事件总线

在事件总线上交换生成的Protobuf类

本文档将向您展示如何在事件总线上交换由Protocol Buffers生成的类型的消息。

您将构建什么

您将构建一个定期生成问候消息的应用程序。该应用程序包括

  • 一个发送者verticle,它发送一个GreetingRequest

  • 一个接收者verticle,它用一个GreetingResponse回复请求

您需要什么

  • 一个文本编辑器或IDE,

  • Java 11 或更高版本

创建项目

浏览至https://start.vertx.io。点击“高级选项”以展开隐藏面板,然后更改以下字段的值

  • Group Id: 设置为io.vertx.howtos

  • Artifact Id: 设置为protobuf-eventbus-howto

  • Dependencies: 添加Hazelcast Cluster Manager

  • Package: 设置为io.vertx.howtos.protobuf.eventbus

完成后,点击Generate Project并将其生成的存档内容提取到您的文件系统中的某个位置。

您也可以从命令行执行此操作

curl -G https://start.vertx.io/starter.zip -d "groupId=io.vertx.howtos" -d "artifactId=protobuf-eventbus-howto" -d "packageName=io.vertx.howtos.protobuf.eventbus" -d "vertxDependencies=vertx-hazelcast" -d "jdkVersion=11" -d "buildTool=maven" --output protobuf-eventbus-howto.zip
unzip -d protobuf-eventbus-howto protobuf-eventbus-howto.zip

在编码之前,我们需要对构建文件进行一些调整

  • 配置一个自定义的Vert.x Launcher类(在运行可执行JAR时将用作入口点)

  • 添加一个对Protocol Buffers的依赖

  • 配置Maven插件,从.proto文件生成消息类。

以下是您应该使用的 pom.xml 文件的内容

应用程序的实现

消息的定义

src/main/proto/greetings.proto中,我们定义了

  • 一个包含nameGreetingRequest,以及

  • 一个包含messageGreetingReply

greetings.proto
syntax = "proto3";

package greeting;

option java_multiple_files = true;
option java_package = "io.vertx.howtos.protobuf.eventbus";
option java_outer_classname = "GreetingProtos";

message GreetingRequest {
  string name = 1;
}

message GreetingReply {
  string message = 1;
}

接收者verticle

接收者verticle在事件总线上注册一个消费者。当接收到请求时

  1. 请求连同其系统哈希码一起打印到控制台

  2. 生成一个回复

  3. 回复连同其系统哈希码一起打印到控制台

  4. 发送回复

ReceiverVerticle.java
package io.vertx.howtos.protobuf.eventbus;

import io.vertx.core.Future;
import io.vertx.core.VerticleBase;

public class ReceiverVerticle extends VerticleBase {

  @Override
  public Future<?> start() {
    return vertx.eventBus().<GreetingRequest>consumer("greetings", msg -> {
      var request = msg.body();
      System.out.printf("Received request = %s (%d)%n", request.getName(), System.identityHashCode(request));
      var greeting = String.format("Hello %s", request.getName());
      var reply = GreetingReply.newBuilder().setMessage(greeting).build();
      System.out.printf("Sending reply = %s (%d)%n", reply.getMessage(), System.identityHashCode(reply));
      msg.reply(reply);
    }).completion();
  }
}
打印系统哈希码是为了当我们把应用程序运行在单个虚拟机中时,可以查看对象是否被事件总线复制。

发送者verticle

发送者verticle安排了一个周期性任务。每隔五秒钟

  1. 生成一个请求

  2. 请求连同其系统哈希码一起打印到控制台

  3. 发送请求

  4. 回复连同其系统哈希码一起打印到控制台

SenderVerticle.java
package io.vertx.howtos.protobuf.eventbus;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;

public class SenderVerticle extends AbstractVerticle {

  @Override
  public void start() throws Exception {
    vertx.setPeriodic(5000, l -> {
      var request = GreetingRequest.newBuilder().setName("Jane Doe").build();
      System.out.printf("Sending request = %s (%d)%n", request.getName(), System.identityHashCode(request));
      vertx.eventBus().<GreetingReply>request("greetings", request)
        .map(Message::body)
        .onFailure(Throwable::printStackTrace)
        .onSuccess(reply -> System.out.printf("Received reply = %s (%d)%n", reply.getMessage(), System.identityHashCode(reply)));
    });
  }
}

EventBus编解码器

在为Protocol Buffer消息类设计编解码器时,我们可以利用它们的属性

  • 所有消息在Java平台的意义上都是Serializable

  • 消息对象是不可变的

因此,消息类在通过网络发送/接收时可以透明地进行序列化/反序列化。此外,当消息对象在本地交换时,我们无需复制它们。

ProtobufCodec.java
package io.vertx.howtos.protobuf.eventbus;

import com.google.protobuf.GeneratedMessageV3;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.impl.SerializableUtils;

public class ProtobufCodec implements MessageCodec<GeneratedMessageV3, GeneratedMessageV3> {

  static final String PROTOS_PACKAGE_NAME = "io.vertx.howtos.protobuf.eventbus.";

  @Override
  public void encodeToWire(Buffer buffer, GeneratedMessageV3 o) {
    var bytes = SerializableUtils.toBytes(o);
    buffer.appendInt(bytes.length);
    buffer.appendBytes(bytes);
  }

  @Override
  public GeneratedMessageV3 decodeFromWire(int pos, Buffer buffer) {
    var length = buffer.getInt(pos);
    pos += 4;
    var bytes = buffer.getBytes(pos, pos + length);
    return (GeneratedMessageV3) SerializableUtils.fromBytes(bytes, CheckedClassNameObjectInputStream::new);
  }

  @Override
  public GeneratedMessageV3 transform(GeneratedMessageV3 o) {
    return o;
  }

  @Override
  public String name() {
    return "ProtobufCodec";
  }

  @Override
  public byte systemCodecID() {
    return -1; // -1 for a user codec
  }

  public boolean appliesTo(String className) {
    return className.startsWith(PROTOS_PACKAGE_NAME);
  }
}

出于安全原因,我们不希望在接收端反序列化任何对象。这就是为什么我们使用CheckedClassNameObjectInputStream而不是普通的ObjectInputStream

该实现保证只允许某些类

  • 当然,包括我们的消息类

  • Protocol Buffer的Java实现类

  • Vert.x Event Bus默认允许的类(例如字节数组)

CheckedClassNameObjectInputStream.java
package io.vertx.howtos.protobuf.eventbus;

import io.vertx.core.eventbus.EventBus;

import java.io.*;

class CheckedClassNameObjectInputStream extends ObjectInputStream {

  CheckedClassNameObjectInputStream(InputStream in) throws IOException {
    super(in);
  }

  @Override
  protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
    var name = desc.getName();
    if (name.startsWith("com.google.protobuf.")
      || name.startsWith(ProtobufCodec.PROTOS_PACKAGE_NAME)
      || EventBus.DEFAULT_SERIALIZABLE_CHECKER.apply(name)) {
      return super.resolveClass(desc);
    }
    throw new InvalidClassException("Class not allowed: " + name);
  }
}

最后,在一个自定义的Launcher类中,我们必须

  • 注册这个编解码器

  • 配置Event Bus,使其在消息体类型属于我们包时使用此编解码器

CustomLauncher.java
package io.vertx.howtos.protobuf.eventbus;

import io.vertx.launcher.application.HookContext;
import io.vertx.launcher.application.VertxApplication;
import io.vertx.launcher.application.VertxApplicationHooks;

public class CustomLauncher extends VertxApplication implements VertxApplicationHooks {

  public CustomLauncher(String[] args) {
    super(args);
  }

  public static void main(String[] args) {
    new CustomLauncher(args).launch();
  }

  @Override
  public void afterVertxStarted(HookContext context) {
    var vertx = context.vertx();
    var protobufCodec = new ProtobufCodec();
    vertx.eventBus().registerCodec(protobufCodec);
    vertx.eventBus().codecSelector(body -> {
      return protobufCodec.appliesTo(body.getClass().getName()) ? protobufCodec.name() : null;
    });
  }
}

运行应用程序

首先您必须构建应用程序

./mvnw clean package

然后启动接收器

java -Djava.net.preferIPv4Stack=true -jar target/protobuf-eventbus-howto-1.0.0-SNAPSHOT-fat.jar io.vertx.howtos.protobuf.eventbus.ReceiverVerticle -cluster

当它准备好时,您将看到:INFO: Succeeded in deploying verticle

现在在另一个终端启动发送器

java -Djava.net.preferIPv4Stack=true -jar target/protobuf-eventbus-howto-1.0.0-SNAPSHOT-fat.jar io.vertx.howtos.protobuf.eventbus.SenderVerticle -cluster

当它准备好时,您将看到:INFO: Succeeded in deploying verticle

过一段时间后,您将在发送者控制台中看到

Sending request = Jane Doe (1445840961)
Received reply = Hello Jane Doe (654163465)

而在接收者控制台中

Received request = Jane Doe (449456520)
Sending reply = Hello Jane Doe (522259462)

在集群模式下,打印的系统哈希码不重要:位于不同虚拟机中的对象,显然是不同的。

本地模式呢?要在同一个虚拟机中运行发送者和接收者,我们可以使用第三个verticle,其唯一目的是部署它们。

MainVerticle.java
package io.vertx.howtos.protobuf.eventbus;

import io.vertx.core.Future;
import io.vertx.core.VerticleBase;

public class MainVerticle extends VerticleBase {

  @Override
  public Future<?> start() {
    return Future.join(
      vertx.deployVerticle(new ReceiverVerticle()),
      vertx.deployVerticle(new SenderVerticle())
    );
  }
}

打开一个终端,再次构建项目并运行可执行JAR。

./mvnw clean package
java -jar target/protobuf-eventbus-howto-1.0.0-SNAPSHOT-fat.jar

当它准备好时,您将看到:INFO: Succeeded in deploying verticle

过一段时间后,您将在控制台中看到

Sending request = Jane Doe (346056258)
Received request = Jane Doe (346056258)
Sending reply = Hello Jane Doe (1483137857)
Received reply = Hello Jane Doe (1483137857)

请注意系统哈希码。请注意,请求对象在发送者和接收者中是相同的。回复对象也是如此。

总结

本文档涵盖了

  1. Protocol Buffers生成的类型的消息创建编解码器

  2. 注册此编解码器并配置Event Bus使其默认使用它

  3. 在本地和跨网络发送和接收消息对象