Vert.x gRPC

已弃用

关于 gRPC 最好的描述可以在维基百科上找到。

gRPC 是一个开源的远程过程调用 (RPC) 系统,最初由 Google 开发。它使用 HTTP/2 进行传输,使用 Protocol Buffers 作为接口描述语言,并提供诸如身份验证、双向流和流量控制、阻塞或非阻塞绑定、取消和超时等功能。它为多种语言生成跨平台的客户端和服务器绑定。

— 维基百科
维基百科

Vert.x gRPC 是一个模块,它将 Google gRPC 的编程风格与 Vert.x 风格对齐。作为此模块的用户,您将更熟悉使用 Vert.x Streams 和 Futures 的代码风格,同时受益于 gRPC 的所有优点。

有关 gRPC 的更多信息,请查阅官方文档网站 http://www.grpc.io/

此外,Vert.x gRPC 支持

  • 使用 Verticles 实现 gRPC 服务扩展

  • 非阻塞原生传输

gRPC 类型

使用 gRPC,您可以受益于 HTTP/2,这意味着您将获得异步流支持,即您的远程过程调用可以具有以下特性:

  • 客户端流式传输请求对象,而服务器以单个响应对象回复

  • 客户端流式传输请求对象,而服务器以响应对象流回复

  • 客户端发送单个请求对象,而服务器以单个响应对象回复

  • 客户端发送单个请求对象,而服务器以响应对象流回复

对于不熟悉的人来说,这可能看起来与其他基于 HTTP 的 RPC 方法没有太大区别,但您应该知道,使用 HTTP/2,您的请求不必在响应开始到达之前完成。这意味着您的通信通道是全双工的。全双工允许您减少响应延迟并使应用程序响应更迅速。

一个简单的 Hello World 示例

要开始您的第一个 Hello World 示例,需要定义协议。gRPC 要求您使用 protobuffer 格式定义此协议。

syntax = "proto3";

option java_multiple_files = true;
option java_package = "examples";
option java_outer_classname = "HelloWorldProto";
package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

这是一个非常简单的示例,展示了单请求、单响应模式。

编译 RPC 定义

使用上述定义,我们需要对其进行编译。

如果您愿意,可以使用 protoc 编译器编译 proto 文件,或者将其集成到您的构建中。

如果您正在使用 Apache Maven,则需要添加以下插件

 <plugin>
  <groupId>org.xolstice.maven.plugins</groupId>
  <artifactId>protobuf-maven-plugin</artifactId>
  <version>0.6.1</version>
  <configuration>
    <protocArtifact>com.google.protobuf:protoc:3.2.0:exe:${os.detected.classifier}</protocArtifact>
    <pluginId>grpc-java</pluginId>
    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${vertx.grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
    <protocPlugins>
      <protocPlugin>
        <id>vertx-grpc-protoc-plugin</id>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-grpc-protoc-plugin</artifactId>
        <version>${stack.version}</version>
        <mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
      </protocPlugin>
    </protocPlugins>
  </configuration>
  <executions>
    <execution>
      <id>compile</id>
      <configuration>
        <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
        <clearOutputDirectory>false</clearOutputDirectory>
      </configuration>
      <goals>
        <goal>compile</goal>
        <goal>compile-custom</goal>
      </goals>
    </execution>
    <execution>
      <id>test-compile</id>
      <goals>
        <goal>test-compile</goal>
        <goal>test-compile-custom</goal>
      </goals>
    </execution>
  </executions>
</plugin>

${os.detected.classifier} 属性用于使构建独立于操作系统,在 OSX 上它会被 osx-x86_64 等替换。要使用它,您需要在 pom.xmlbuild 部分添加 os-maven-plugin[https://github.com/trustin/os-maven-plugin]。

<build>
  ...
  <extensions>
    <extension>
      <groupId>kr.motd.maven</groupId>
      <artifactId>os-maven-plugin</artifactId>
      <version>1.4.1.Final</version>
    </extension>
  </extensions>
  ...
</build>

此插件将编译 src/main/proto 下的 proto 文件,并使其可用于您的项目。

如果您正在使用 Gradle,则需要添加以下插件

...
apply plugin: 'com.google.protobuf'
...
buildscript {
  ...
  dependencies {
    // ASSUMES GRADLE 2.12 OR HIGHER. Use plugin version 0.7.5 with earlier gradle versions
    classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.0'
  }
}
...
protobuf {
  protoc {
    artifact = 'com.google.protobuf:protoc:3.2.0'
  }
  plugins {
    grpc {
      artifact = "io.grpc:protoc-gen-grpc-java:1.25.0"
    }
    vertx {
      artifact = "io.vertx:vertx-grpc-protoc-plugin:${vertx.grpc.version}"
    }
  }
  generateProtoTasks {
    all()*.plugins {
      grpc
      vertx
    }
  }
}

此插件将编译 build/generated/source/proto/main 下的 proto 文件,并使其可用于您的项目。

gRPC 服务器

现在您应该已经设置好了 RPC 基础代码,是时候实现您的服务器了。正如您应该从上面回顾到的那样,我们描述了我们的服务器应该实现一个 sayHello 方法,该方法接收 HelloRequest 对象并返回 HelloReply 对象。因此您可以将其实现为

GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
  @Override
  public void sayHello(
    HelloRequest request,
    StreamObserver<HelloReply> responseObserver) {

    responseObserver.onNext(
      HelloReply.newBuilder()
        .setMessage(request.getName())
        .build());
    responseObserver.onCompleted();
  }
};

一旦您满意,就可以在服务器上提供您的服务。Vert.x 使服务器的创建非常简单,您只需添加

VertxServer rpcServer = VertxServerBuilder
  .forAddress(vertx, "my.host", 8080)
  .addService(service)
  .build();

// Start is asynchronous
rpcServer.start();

使用 Vert.x Future 和 Stream

前面的示例使用 gRPC 服务器,通过 gRPC 异步构造(如 io.grpc.stub.StreamObserver)进行异步处理。此代码由 protoc 编译器生成。

上面的插件配置配置了以下插件

<protocPlugin>
  <id>vertx-grpc-protoc-plugin</id>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-protoc-plugin</artifactId>
  <version>${stack.version}</version>
  <mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
</protocPlugin>

这会生成一个服务版本,它使用 Vert.x 异步构造,例如 FutureReadStreamWriteStream,这在 Vert.x 生态系统中可能更方便。

VertxGreeterGrpc.GreeterVertxImplBase service =
  new VertxGreeterGrpc.GreeterVertxImplBase() {
    @Override
    public Future<HelloReply> sayHello(HelloRequest request) {
      return Future.succeededFuture(
        HelloReply.newBuilder()
          .setMessage(request.getName())
          .build());
    }
  };

服务器 Gzip 压缩

您可以启用 gzip 压缩,以告知服务器发送压缩响应(压缩请求由服务器自动处理)。

VertxGreeterGrpc.GreeterVertxImplBase service =
  new VertxGreeterGrpc.GreeterVertxImplBase() {
    @Override
    public Future<HelloReply> sayHello(HelloRequest request) {
      return Future.succeededFuture(
        HelloReply.newBuilder()
          .setMessage(request.getName())
          .build());
    }
  }
    .withCompression("gzip");

withCompression 配置由 Vert.x gRPC protoc 插件生成。您还可以通过将 ResponseObserver 转换为 ServerCallStreamObserver 并在发送响应之前调用 setCompression 来在默认服务上启用压缩。

GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
  @Override
  public void sayHello(
    HelloRequest request,
    StreamObserver<HelloReply> responseObserver) {

    ((ServerCallStreamObserver) responseObserver)
      .setCompression("gzip");

    responseObserver.onNext(
      HelloReply.newBuilder()
        .setMessage(request.getName())
        .build());

    responseObserver.onCompleted();
  }
};
只要服务器支持其他压缩器,并且在构建 ManagedChannel 时将它们注册到压缩器注册表中,您就可以使用它们。

SSL 配置

前面的示例很简单,但您的 RPC 不安全。为了使其安全,我们应该启用 SSL/TLS

VertxServerBuilder builder = VertxServerBuilder.forPort(vertx, 8080)
  .useSsl(options -> options
    .setSsl(true)
    .setUseAlpn(true)
    .setKeyCertOptions(new JksOptions()
      .setPath("server-keystore.jks")
      .setPassword("secret")));

恭喜您,您刚刚完成了您的第一个 gRPC 服务器。

由于 gRPC 使用 HTTP/2 传输,SSL/TLS 设置需要在您的服务器中进行应用层协议协商

服务器扩展

当您部署同一 Verticle 的多个实例时,gRPC 服务器将在 Verticle 的事件循环上进行扩展。

vertx.deployVerticle(

  // Verticle supplier - should be called 4 times
  () -> new AbstractVerticle() {

    BindableService service = new GreeterGrpc.GreeterImplBase() {
      @Override
      public void sayHello(
        HelloRequest request,
        StreamObserver<HelloReply> responseObserver) {

        responseObserver.onNext(
          HelloReply.newBuilder()
            .setMessage(request.getName())
            .build());

        responseObserver.onCompleted();
      }
    };

    @Override
    public void start() throws Exception {
      VertxServerBuilder
        .forAddress(vertx, "my.host", 8080)
        .addService(service)
        .build()
        .start();
    }
  },

  // Deploy 4 instances, i.e the service is scaled on 4 event-loops
  new DeploymentOptions()
    .setInstances(4));

阻塞服务器拦截器

gRPC ServerInterceptor 是一种在传入调用发送到服务之前对其进行拦截的机制。它具有同步行为,将在 Vert.x 事件循环上执行。

VertxServer rpcServer = VertxServerBuilder
  .forAddress(vertx, "my.host", 8080)
  .addService(ServerInterceptors.intercept(service, myInterceptor))
  .build();

假设我们有一个会阻塞事件循环的拦截器

class MyInterceptor implements ServerInterceptor {
  @Override
  public <Q, A> ServerCall.Listener<Q> interceptCall(
    ServerCall<Q, A> call, Metadata headers, ServerCallHandler<Q, A> next) {
    // do something hard and update the metadata, for example
    return next.startCall(call, headers);
  }
}
MyInterceptor myInterceptor = new MyInterceptor();

为避免阻塞,应包装拦截器。然后它将在 Vert.x 工作线程上被调用。

ServerInterceptor wrapped =
  BlockingServerInterceptor.wrap(vertx, myInterceptor);

// Create the server
VertxServer rpcServer = VertxServerBuilder
  .forAddress(vertx, "my.host", 8080)
  .addService(ServerInterceptors.intercept(service, wrapped))
  .build();

// Start it
rpcServer.start();

上下文服务器拦截器

提供了一个抽象的上下文服务器拦截器,允许拦截服务器调用并将元数据提取到 vert.x 上下文中。此上下文不依赖于线程局部变量,因此在 vert.x API 上使用是安全的。此拦截器应是第一个(或首批)添加到拦截器列表中的拦截器。

一个典型示例是会话 ID 的使用。客户端可以创建一个客户端拦截器,在所有连接中设置会话 ID,如下所示:

Metadata extraHeaders = new Metadata();
extraHeaders.put(
  Metadata.Key.of("sessionId", Metadata.ASCII_STRING_MARSHALLER), theSessionId);

ClientInterceptor clientInterceptor = MetadataUtils
  .newAttachHeadersInterceptor(extraHeaders);

channel = VertxChannelBuilder.forAddress(vertx, "localhost", port)
  .intercept(clientInterceptor)
  .build();

然后,在服务器端可以添加一个拦截器,如下所示:

BindableService service = new VertxGreeterGrpc.GreeterVertxImplBase() {
  @Override
  public Future<HelloReply> sayHello(HelloRequest request) {
    return Future.succeededFuture(
      HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
  }
};

ServerInterceptor contextInterceptor = new ContextServerInterceptor() {
  @Override
  public void bind(Metadata metadata) {
    put("sessionId", metadata.get(SESSION_ID_METADATA_KEY));
  }
};

// Create the server
VertxServer rpcServer = VertxServerBuilder
  .forAddress(vertx, "my.host", 8080)
  .addService(ServerInterceptors.intercept(service, contextInterceptor))
  .build();

gRPC 客户端

没有客户端的服务器是没用的,所以我们来创建一个客户端。为此,有些步骤与服务器重叠。首先,我们需要有 RPC 定义,这应该已经完成,否则就不会有服务器,并且相同的定义也应该已经编译过了。

请注意,编译器将始终生成基础服务器和客户端存根,因此如果您已经编译过一次,则无需再次编译。

每个客户端存根都将始终需要一个到服务器的通信通道,因此我们首先需要创建一个 gRPC 通道

ManagedChannel channel = VertxChannelBuilder
  .forAddress(vertx, "localhost", 8080)
  .usePlaintext()
  .build();

// Get a stub to use for interacting with the remote service
GreeterGrpc.GreeterStub stub = GreeterGrpc.newStub(channel);

一旦存根创建成功,我们就可以与服务器通信了,这次更容易,因为存根已经提供了正确的方法定义和参数类型

HelloRequest request = HelloRequest.newBuilder().setName("Julien").build();

// Call the remote service
stub.sayHello(request, new StreamObserver<HelloReply>() {
  private HelloReply helloReply;

  @Override
  public void onNext(HelloReply helloReply) {
    this.helloReply = helloReply;
  }

  @Override
  public void onError(Throwable throwable) {
    System.out.println("Coult not reach server " + throwable.getMessage());
  }

  @Override
  public void onCompleted() {
    System.out.println("Got the server response: " + helloReply.getMessage());
  }
});

使用 Vert.x Future 和 Stream

前面的示例使用 gRPC 客户端,通过 gRPC 异步构造(如 io.grpc.stub.StreamObserver)进行异步处理。此代码由 protoc 编译器生成。

上面的插件配置配置了以下插件

<protocPlugin>
  <id>vertx-grpc-protoc-plugin</id>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-protoc-plugin</artifactId>
  <version>${stack.version}</version>
  <mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
</protocPlugin>

这会生成一个客户端版本,它使用 Vert.x 异步构造,例如 FutureReadStreamWriteStream,这在 Vert.x 生态系统中可能更方便。

HelloRequest request = HelloRequest.newBuilder().setName("Julien").build();

// Call the remote service
Future<HelloReply> future = stub.sayHello(request);

// Listen to completion events
future
  .onSuccess(helloReply -> System.out.println("Got the server response: " + helloReply.getMessage())).onFailure(err -> System.out.println("Coult not reach server " + err));

客户端 Gzip 压缩

您可以启用 gzip 压缩,以告知客户端发送压缩消息。

GreeterGrpc.GreeterStub stub = GreeterGrpc
  .newStub(channel)
  .withCompression("gzip");
只要服务器支持其他压缩器,并且在构建 ManagedChannel 时将它们注册到压缩器注册表中,您就可以使用它们。

SSL 配置

如果您之前启用了 SSL,您的客户端也将需要 SSL,为此我们需要配置通道

ManagedChannel channel = VertxChannelBuilder.
  forAddress(vertx, "localhost", 8080)
  .useSsl(options -> options
    .setSsl(true)
    .setUseAlpn(true)
    .setKeyCertOptions(new JksOptions()
      .setPath("client-truststore.jks")
      .setPassword("secret")))
  .build();
由于 gRPC 使用 HTTP/2 传输,SSL/TLS 设置需要在您的客户端中进行应用层协议协商

高级配置

到目前为止,所有 gRPC 示例都使用了合理的默认值,但还有更多内容。如果您需要完全控制服务器配置,应查阅文档:VertxServerBuilder;如果您需要控制客户端通道,则应查阅 VertxChannelBuilder。Vert.x gRPC 扩展了 grpc-java 项目(Netty 传输),因此建议阅读其文档

原生传输

客户端和服务器可以使用 Netty 的原生传输进行部署,这在创建 Vert.x 实例时实现。

Vertx.vertx(new VertxOptions().setPreferNativeTransport(true));

有关原生传输的更多信息,请参阅 Vert.x Core 文档。