syntax = "proto3";
option java_multiple_files = true;
option java_package = "examples";
option java_outer_classname = "HelloWorldProto";
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
Vert.x gRPC
关于 gRPC 最好的描述可以在维基百科上找到。
gRPC 是一个开源的远程过程调用 (RPC) 系统,最初由 Google 开发。它使用 HTTP/2 进行传输,使用 Protocol Buffers 作为接口描述语言,并提供诸如身份验证、双向流和流量控制、阻塞或非阻塞绑定、取消和超时等功能。它为多种语言生成跨平台的客户端和服务器绑定。
维基百科
Vert.x gRPC 是一个模块,它将 Google gRPC 的编程风格与 Vert.x 风格对齐。作为此模块的用户,您将更熟悉使用 Vert.x Streams 和 Futures 的代码风格,同时受益于 gRPC 的所有优点。
有关 gRPC 的更多信息,请查阅官方文档网站 http://www.grpc.io/。
此外,Vert.x gRPC 支持
-
使用 Verticles 实现 gRPC 服务扩展
-
非阻塞原生传输
gRPC 类型
使用 gRPC,您可以受益于 HTTP/2,这意味着您将获得异步流支持,即您的远程过程调用可以具有以下特性:
-
客户端流式传输请求对象,而服务器以单个响应对象回复
-
客户端流式传输请求对象,而服务器以响应对象流回复
-
客户端发送单个请求对象,而服务器以单个响应对象回复
-
客户端发送单个请求对象,而服务器以响应对象流回复
对于不熟悉的人来说,这可能看起来与其他基于 HTTP 的 RPC 方法没有太大区别,但您应该知道,使用 HTTP/2,您的请求不必在响应开始到达之前完成。这意味着您的通信通道是全双工的。全双工允许您减少响应延迟并使应用程序响应更迅速。
一个简单的 Hello World 示例
要开始您的第一个 Hello World 示例,需要定义协议。gRPC 要求您使用 protobuffer
格式定义此协议。
这是一个非常简单的示例,展示了单请求、单响应模式。
编译 RPC 定义
使用上述定义,我们需要对其进行编译。
如果您愿意,可以使用 protoc
编译器编译 proto 文件,或者将其集成到您的构建中。
如果您正在使用 Apache Maven,则需要添加以下插件
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.2.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${vertx.grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
<protocPlugins>
<protocPlugin>
<id>vertx-grpc-protoc-plugin</id>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-protoc-plugin</artifactId>
<version>${stack.version}</version>
<mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
</protocPlugin>
</protocPlugins>
</configuration>
<executions>
<execution>
<id>compile</id>
<configuration>
<outputDirectory>${project.basedir}/src/main/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>test-compile</goal>
<goal>test-compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
${os.detected.classifier}
属性用于使构建独立于操作系统,在 OSX 上它会被 osx-x86_64 等替换。要使用它,您需要在 pom.xml
的 build
部分添加 os-maven-plugin[https://github.com/trustin/os-maven-plugin]。
<build>
...
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.1.Final</version>
</extension>
</extensions>
...
</build>
此插件将编译 src/main/proto
下的 proto 文件,并使其可用于您的项目。
如果您正在使用 Gradle,则需要添加以下插件
...
apply plugin: 'com.google.protobuf'
...
buildscript {
...
dependencies {
// ASSUMES GRADLE 2.12 OR HIGHER. Use plugin version 0.7.5 with earlier gradle versions
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.0'
}
}
...
protobuf {
protoc {
artifact = 'com.google.protobuf:protoc:3.2.0'
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:1.25.0"
}
vertx {
artifact = "io.vertx:vertx-grpc-protoc-plugin:${vertx.grpc.version}"
}
}
generateProtoTasks {
all()*.plugins {
grpc
vertx
}
}
}
此插件将编译 build/generated/source/proto/main
下的 proto 文件,并使其可用于您的项目。
gRPC 服务器
现在您应该已经设置好了 RPC 基础代码,是时候实现您的服务器了。正如您应该从上面回顾到的那样,我们描述了我们的服务器应该实现一个 sayHello
方法,该方法接收 HelloRequest
对象并返回 HelloReply
对象。因此您可以将其实现为
GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(
HelloRequest request,
StreamObserver<HelloReply> responseObserver) {
responseObserver.onNext(
HelloReply.newBuilder()
.setMessage(request.getName())
.build());
responseObserver.onCompleted();
}
};
一旦您满意,就可以在服务器上提供您的服务。Vert.x 使服务器的创建非常简单,您只需添加
VertxServer rpcServer = VertxServerBuilder
.forAddress(vertx, "my.host", 8080)
.addService(service)
.build();
// Start is asynchronous
rpcServer.start();
使用 Vert.x Future 和 Stream
前面的示例使用 gRPC 服务器,通过 gRPC 异步构造(如 io.grpc.stub.StreamObserver
)进行异步处理。此代码由 protoc 编译器生成。
上面的插件配置配置了以下插件
<protocPlugin>
<id>vertx-grpc-protoc-plugin</id>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-protoc-plugin</artifactId>
<version>${stack.version}</version>
<mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
</protocPlugin>
这会生成一个服务版本,它使用 Vert.x 异步构造,例如 Future
、ReadStream
或 WriteStream
,这在 Vert.x 生态系统中可能更方便。
VertxGreeterGrpc.GreeterVertxImplBase service =
new VertxGreeterGrpc.GreeterVertxImplBase() {
@Override
public Future<HelloReply> sayHello(HelloRequest request) {
return Future.succeededFuture(
HelloReply.newBuilder()
.setMessage(request.getName())
.build());
}
};
服务器 Gzip 压缩
您可以启用 gzip 压缩,以告知服务器发送压缩响应(压缩请求由服务器自动处理)。
VertxGreeterGrpc.GreeterVertxImplBase service =
new VertxGreeterGrpc.GreeterVertxImplBase() {
@Override
public Future<HelloReply> sayHello(HelloRequest request) {
return Future.succeededFuture(
HelloReply.newBuilder()
.setMessage(request.getName())
.build());
}
}
.withCompression("gzip");
withCompression
配置由 Vert.x gRPC protoc 插件生成。您还可以通过将 ResponseObserver
转换为 ServerCallStreamObserver
并在发送响应之前调用 setCompression
来在默认服务上启用压缩。
GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(
HelloRequest request,
StreamObserver<HelloReply> responseObserver) {
((ServerCallStreamObserver) responseObserver)
.setCompression("gzip");
responseObserver.onNext(
HelloReply.newBuilder()
.setMessage(request.getName())
.build());
responseObserver.onCompleted();
}
};
只要服务器支持其他压缩器,并且在构建 ManagedChannel 时将它们注册到压缩器注册表中,您就可以使用它们。 |
SSL 配置
前面的示例很简单,但您的 RPC 不安全。为了使其安全,我们应该启用 SSL/TLS
VertxServerBuilder builder = VertxServerBuilder.forPort(vertx, 8080)
.useSsl(options -> options
.setSsl(true)
.setUseAlpn(true)
.setKeyCertOptions(new JksOptions()
.setPath("server-keystore.jks")
.setPassword("secret")));
恭喜您,您刚刚完成了您的第一个 gRPC 服务器。
由于 gRPC 使用 HTTP/2 传输,SSL/TLS 设置需要在您的服务器中进行应用层协议协商。 |
服务器扩展
当您部署同一 Verticle 的多个实例时,gRPC 服务器将在 Verticle 的事件循环上进行扩展。
vertx.deployVerticle(
// Verticle supplier - should be called 4 times
() -> new AbstractVerticle() {
BindableService service = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(
HelloRequest request,
StreamObserver<HelloReply> responseObserver) {
responseObserver.onNext(
HelloReply.newBuilder()
.setMessage(request.getName())
.build());
responseObserver.onCompleted();
}
};
@Override
public void start() throws Exception {
VertxServerBuilder
.forAddress(vertx, "my.host", 8080)
.addService(service)
.build()
.start();
}
},
// Deploy 4 instances, i.e the service is scaled on 4 event-loops
new DeploymentOptions()
.setInstances(4));
阻塞服务器拦截器
gRPC ServerInterceptor 是一种在传入调用发送到服务之前对其进行拦截的机制。它具有同步行为,将在 Vert.x 事件循环上执行。
VertxServer rpcServer = VertxServerBuilder
.forAddress(vertx, "my.host", 8080)
.addService(ServerInterceptors.intercept(service, myInterceptor))
.build();
假设我们有一个会阻塞事件循环的拦截器
class MyInterceptor implements ServerInterceptor {
@Override
public <Q, A> ServerCall.Listener<Q> interceptCall(
ServerCall<Q, A> call, Metadata headers, ServerCallHandler<Q, A> next) {
// do something hard and update the metadata, for example
return next.startCall(call, headers);
}
}
MyInterceptor myInterceptor = new MyInterceptor();
为避免阻塞,应包装拦截器。然后它将在 Vert.x 工作线程上被调用。
ServerInterceptor wrapped =
BlockingServerInterceptor.wrap(vertx, myInterceptor);
// Create the server
VertxServer rpcServer = VertxServerBuilder
.forAddress(vertx, "my.host", 8080)
.addService(ServerInterceptors.intercept(service, wrapped))
.build();
// Start it
rpcServer.start();
上下文服务器拦截器
提供了一个抽象的上下文服务器拦截器,允许拦截服务器调用并将元数据提取到 vert.x 上下文中。此上下文不依赖于线程局部变量,因此在 vert.x API 上使用是安全的。此拦截器应是第一个(或首批)添加到拦截器列表中的拦截器。
一个典型示例是会话 ID 的使用。客户端可以创建一个客户端拦截器,在所有连接中设置会话 ID,如下所示:
Metadata extraHeaders = new Metadata();
extraHeaders.put(
Metadata.Key.of("sessionId", Metadata.ASCII_STRING_MARSHALLER), theSessionId);
ClientInterceptor clientInterceptor = MetadataUtils
.newAttachHeadersInterceptor(extraHeaders);
channel = VertxChannelBuilder.forAddress(vertx, "localhost", port)
.intercept(clientInterceptor)
.build();
然后,在服务器端可以添加一个拦截器,如下所示:
BindableService service = new VertxGreeterGrpc.GreeterVertxImplBase() {
@Override
public Future<HelloReply> sayHello(HelloRequest request) {
return Future.succeededFuture(
HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
}
};
ServerInterceptor contextInterceptor = new ContextServerInterceptor() {
@Override
public void bind(Metadata metadata) {
put("sessionId", metadata.get(SESSION_ID_METADATA_KEY));
}
};
// Create the server
VertxServer rpcServer = VertxServerBuilder
.forAddress(vertx, "my.host", 8080)
.addService(ServerInterceptors.intercept(service, contextInterceptor))
.build();
gRPC 客户端
没有客户端的服务器是没用的,所以我们来创建一个客户端。为此,有些步骤与服务器重叠。首先,我们需要有 RPC 定义,这应该已经完成,否则就不会有服务器,并且相同的定义也应该已经编译过了。
请注意,编译器将始终生成基础服务器和客户端存根,因此如果您已经编译过一次,则无需再次编译。
每个客户端存根都将始终需要一个到服务器的通信通道,因此我们首先需要创建一个 gRPC 通道
ManagedChannel channel = VertxChannelBuilder
.forAddress(vertx, "localhost", 8080)
.usePlaintext()
.build();
// Get a stub to use for interacting with the remote service
GreeterGrpc.GreeterStub stub = GreeterGrpc.newStub(channel);
一旦存根创建成功,我们就可以与服务器通信了,这次更容易,因为存根已经提供了正确的方法定义和参数类型
HelloRequest request = HelloRequest.newBuilder().setName("Julien").build();
// Call the remote service
stub.sayHello(request, new StreamObserver<HelloReply>() {
private HelloReply helloReply;
@Override
public void onNext(HelloReply helloReply) {
this.helloReply = helloReply;
}
@Override
public void onError(Throwable throwable) {
System.out.println("Coult not reach server " + throwable.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Got the server response: " + helloReply.getMessage());
}
});
使用 Vert.x Future 和 Stream
前面的示例使用 gRPC 客户端,通过 gRPC 异步构造(如 io.grpc.stub.StreamObserver
)进行异步处理。此代码由 protoc 编译器生成。
上面的插件配置配置了以下插件
<protocPlugin>
<id>vertx-grpc-protoc-plugin</id>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-protoc-plugin</artifactId>
<version>${stack.version}</version>
<mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
</protocPlugin>
这会生成一个客户端版本,它使用 Vert.x 异步构造,例如 Future
、ReadStream
或 WriteStream
,这在 Vert.x 生态系统中可能更方便。
HelloRequest request = HelloRequest.newBuilder().setName("Julien").build();
// Call the remote service
Future<HelloReply> future = stub.sayHello(request);
// Listen to completion events
future
.onSuccess(helloReply -> System.out.println("Got the server response: " + helloReply.getMessage())).onFailure(err -> System.out.println("Coult not reach server " + err));
客户端 Gzip 压缩
您可以启用 gzip 压缩,以告知客户端发送压缩消息。
GreeterGrpc.GreeterStub stub = GreeterGrpc
.newStub(channel)
.withCompression("gzip");
只要服务器支持其他压缩器,并且在构建 ManagedChannel 时将它们注册到压缩器注册表中,您就可以使用它们。 |
SSL 配置
如果您之前启用了 SSL,您的客户端也将需要 SSL,为此我们需要配置通道
ManagedChannel channel = VertxChannelBuilder.
forAddress(vertx, "localhost", 8080)
.useSsl(options -> options
.setSsl(true)
.setUseAlpn(true)
.setKeyCertOptions(new JksOptions()
.setPath("client-truststore.jks")
.setPassword("secret")))
.build();
由于 gRPC 使用 HTTP/2 传输,SSL/TLS 设置需要在您的客户端中进行应用层协议协商。 |
高级配置
到目前为止,所有 gRPC 示例都使用了合理的默认值,但还有更多内容。如果您需要完全控制服务器配置,应查阅文档:VertxServerBuilder
;如果您需要控制客户端通道,则应查阅 VertxChannelBuilder
。Vert.x gRPC 扩展了 grpc-java 项目(Netty 传输),因此建议阅读其文档。
原生传输
客户端和服务器可以使用 Netty 的原生传输进行部署,这在创建 Vert.x 实例时实现。
Vertx.vertx(new VertxOptions().setPreferNativeTransport(true));
有关原生传输的更多信息,请参阅 Vert.x Core 文档。