curl -G https://start.vertx.io/starter.zip -d "groupId=io.vertx.howtos" -d "artifactId=protobuf-eventbus-howto" -d "packageName=io.vertx.howtos.protobuf.eventbus" -d "vertxDependencies=vertx-hazelcast" -d "jdkVersion=11" -d "buildTool=maven" --output protobuf-eventbus-howto.zip
unzip -d protobuf-eventbus-howto protobuf-eventbus-howto.zip
在事件总线上交换生成的Protobuf类
本文档将向您展示如何在事件总线上交换由Protocol Buffers生成的类型的消息。
您将构建什么
您将构建一个定期生成问候消息的应用程序。该应用程序包括
-
一个发送者verticle,它发送一个
GreetingRequest
-
一个接收者verticle,它用一个
GreetingResponse
回复请求
您需要什么
-
一个文本编辑器或IDE,
-
Java 11 或更高版本
创建项目
浏览至https://start.vertx.io。点击“高级选项”以展开隐藏面板,然后更改以下字段的值
-
Group Id: 设置为
io.vertx.howtos
-
Artifact Id: 设置为
protobuf-eventbus-howto
-
Dependencies: 添加
Hazelcast Cluster Manager
-
Package: 设置为
io.vertx.howtos.protobuf.eventbus
完成后,点击Generate Project并将其生成的存档内容提取到您的文件系统中的某个位置。
您也可以从命令行执行此操作 |
在编码之前,我们需要对构建文件进行一些调整
-
配置一个自定义的Vert.x
Launcher
类(在运行可执行JAR时将用作入口点) -
添加一个对Protocol Buffers的依赖
-
配置Maven插件,从
.proto
文件生成消息类。
以下是您应该使用的 pom.xml
文件的内容
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.vertx.howtos</groupId>
<artifactId>protobuf-eventbus-howto</artifactId>
<version>1.0.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.verticle>io.vertx.howtos.protobuf.eventbus.MainVerticle</main.verticle>
<launcher.class>io.vertx.howtos.protobuf.eventbus.CustomLauncher</launcher.class>
<vertx.version>5.0.0.CR2</vertx.version>
<protobuf.version>3.22.2</protobuf.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-stack-depchain</artifactId>
<version>${vertx.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-launcher-application</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-hazelcast</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.0</version>
</extension>
</extensions>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.13.0</version>
<configuration>
<release>11</release>
</configuration>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<manifestEntries>
<Main-Class>${launcher.class}</Main-Class>
<Main-Verticle>${main.verticle}</Main-Verticle>
</manifestEntries>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
<outputFile>${project.build.directory}/${project.artifactId}-${project.version}-fat.jar
</outputFile>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
应用程序的实现
消息的定义
在src/main/proto/greetings.proto
中,我们定义了
-
一个包含
name
的GreetingRequest
,以及 -
一个包含
message
的GreetingReply
syntax = "proto3";
package greeting;
option java_multiple_files = true;
option java_package = "io.vertx.howtos.protobuf.eventbus";
option java_outer_classname = "GreetingProtos";
message GreetingRequest {
string name = 1;
}
message GreetingReply {
string message = 1;
}
接收者verticle
接收者verticle在事件总线上注册一个消费者。当接收到请求时
-
请求连同其系统哈希码一起打印到控制台
-
生成一个回复
-
回复连同其系统哈希码一起打印到控制台
-
发送回复
package io.vertx.howtos.protobuf.eventbus;
import io.vertx.core.Future;
import io.vertx.core.VerticleBase;
public class ReceiverVerticle extends VerticleBase {
@Override
public Future<?> start() {
return vertx.eventBus().<GreetingRequest>consumer("greetings", msg -> {
var request = msg.body();
System.out.printf("Received request = %s (%d)%n", request.getName(), System.identityHashCode(request));
var greeting = String.format("Hello %s", request.getName());
var reply = GreetingReply.newBuilder().setMessage(greeting).build();
System.out.printf("Sending reply = %s (%d)%n", reply.getMessage(), System.identityHashCode(reply));
msg.reply(reply);
}).completion();
}
}
打印系统哈希码是为了当我们把应用程序运行在单个虚拟机中时,可以查看对象是否被事件总线复制。 |
发送者verticle
发送者verticle安排了一个周期性任务。每隔五秒钟
-
生成一个请求
-
请求连同其系统哈希码一起打印到控制台
-
发送请求
-
回复连同其系统哈希码一起打印到控制台
package io.vertx.howtos.protobuf.eventbus;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
public class SenderVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
vertx.setPeriodic(5000, l -> {
var request = GreetingRequest.newBuilder().setName("Jane Doe").build();
System.out.printf("Sending request = %s (%d)%n", request.getName(), System.identityHashCode(request));
vertx.eventBus().<GreetingReply>request("greetings", request)
.map(Message::body)
.onFailure(Throwable::printStackTrace)
.onSuccess(reply -> System.out.printf("Received reply = %s (%d)%n", reply.getMessage(), System.identityHashCode(reply)));
});
}
}
EventBus编解码器
在为Protocol Buffer消息类设计编解码器时,我们可以利用它们的属性
-
所有消息在Java平台的意义上都是
Serializable
的 -
消息对象是不可变的
因此,消息类在通过网络发送/接收时可以透明地进行序列化/反序列化。此外,当消息对象在本地交换时,我们无需复制它们。
package io.vertx.howtos.protobuf.eventbus;
import com.google.protobuf.GeneratedMessageV3;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.impl.SerializableUtils;
public class ProtobufCodec implements MessageCodec<GeneratedMessageV3, GeneratedMessageV3> {
static final String PROTOS_PACKAGE_NAME = "io.vertx.howtos.protobuf.eventbus.";
@Override
public void encodeToWire(Buffer buffer, GeneratedMessageV3 o) {
var bytes = SerializableUtils.toBytes(o);
buffer.appendInt(bytes.length);
buffer.appendBytes(bytes);
}
@Override
public GeneratedMessageV3 decodeFromWire(int pos, Buffer buffer) {
var length = buffer.getInt(pos);
pos += 4;
var bytes = buffer.getBytes(pos, pos + length);
return (GeneratedMessageV3) SerializableUtils.fromBytes(bytes, CheckedClassNameObjectInputStream::new);
}
@Override
public GeneratedMessageV3 transform(GeneratedMessageV3 o) {
return o;
}
@Override
public String name() {
return "ProtobufCodec";
}
@Override
public byte systemCodecID() {
return -1; // -1 for a user codec
}
public boolean appliesTo(String className) {
return className.startsWith(PROTOS_PACKAGE_NAME);
}
}
出于安全原因,我们不希望在接收端反序列化任何对象。这就是为什么我们使用CheckedClassNameObjectInputStream
而不是普通的ObjectInputStream
。
该实现保证只允许某些类
-
当然,包括我们的消息类
-
Protocol Buffer的Java实现类
-
Vert.x Event Bus默认允许的类(例如字节数组)
package io.vertx.howtos.protobuf.eventbus;
import io.vertx.core.eventbus.EventBus;
import java.io.*;
class CheckedClassNameObjectInputStream extends ObjectInputStream {
CheckedClassNameObjectInputStream(InputStream in) throws IOException {
super(in);
}
@Override
protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
var name = desc.getName();
if (name.startsWith("com.google.protobuf.")
|| name.startsWith(ProtobufCodec.PROTOS_PACKAGE_NAME)
|| EventBus.DEFAULT_SERIALIZABLE_CHECKER.apply(name)) {
return super.resolveClass(desc);
}
throw new InvalidClassException("Class not allowed: " + name);
}
}
最后,在一个自定义的Launcher
类中,我们必须
-
注册这个编解码器
-
配置Event Bus,使其在消息体类型属于我们包时使用此编解码器
package io.vertx.howtos.protobuf.eventbus;
import io.vertx.launcher.application.HookContext;
import io.vertx.launcher.application.VertxApplication;
import io.vertx.launcher.application.VertxApplicationHooks;
public class CustomLauncher extends VertxApplication implements VertxApplicationHooks {
public CustomLauncher(String[] args) {
super(args);
}
public static void main(String[] args) {
new CustomLauncher(args).launch();
}
@Override
public void afterVertxStarted(HookContext context) {
var vertx = context.vertx();
var protobufCodec = new ProtobufCodec();
vertx.eventBus().registerCodec(protobufCodec);
vertx.eventBus().codecSelector(body -> {
return protobufCodec.appliesTo(body.getClass().getName()) ? protobufCodec.name() : null;
});
}
}
运行应用程序
首先您必须构建应用程序
./mvnw clean package
然后启动接收器
java -Djava.net.preferIPv4Stack=true -jar target/protobuf-eventbus-howto-1.0.0-SNAPSHOT-fat.jar io.vertx.howtos.protobuf.eventbus.ReceiverVerticle -cluster
当它准备好时,您将看到:INFO: Succeeded in deploying verticle
。
现在在另一个终端启动发送器
java -Djava.net.preferIPv4Stack=true -jar target/protobuf-eventbus-howto-1.0.0-SNAPSHOT-fat.jar io.vertx.howtos.protobuf.eventbus.SenderVerticle -cluster
当它准备好时,您将看到:INFO: Succeeded in deploying verticle
。
过一段时间后,您将在发送者控制台中看到
Sending request = Jane Doe (1445840961) Received reply = Hello Jane Doe (654163465)
而在接收者控制台中
Received request = Jane Doe (449456520) Sending reply = Hello Jane Doe (522259462)
在集群模式下,打印的系统哈希码不重要:位于不同虚拟机中的对象,显然是不同的。
本地模式呢?要在同一个虚拟机中运行发送者和接收者,我们可以使用第三个verticle,其唯一目的是部署它们。
package io.vertx.howtos.protobuf.eventbus;
import io.vertx.core.Future;
import io.vertx.core.VerticleBase;
public class MainVerticle extends VerticleBase {
@Override
public Future<?> start() {
return Future.join(
vertx.deployVerticle(new ReceiverVerticle()),
vertx.deployVerticle(new SenderVerticle())
);
}
}
打开一个终端,再次构建项目并运行可执行JAR。
./mvnw clean package
java -jar target/protobuf-eventbus-howto-1.0.0-SNAPSHOT-fat.jar
当它准备好时,您将看到:INFO: Succeeded in deploying verticle
。
过一段时间后,您将在控制台中看到
Sending request = Jane Doe (346056258) Received request = Jane Doe (346056258) Sending reply = Hello Jane Doe (1483137857) Received reply = Hello Jane Doe (1483137857)
请注意系统哈希码。请注意,请求对象在发送者和接收者中是相同的。回复对象也是如此。
总结
本文档涵盖了
-
为Protocol Buffers生成的类型的消息创建编解码器
-
注册此编解码器并配置Event Bus使其默认使用它
-
在本地和跨网络发送和接收消息对象