Vert.x gRPC

gRPC 的最佳描述可见于维基百科。

gRPC 是一个最初由 Google 开发的开源远程过程调用(RPC)系统。它使用 HTTP/2 作为传输协议,Protocol Buffers 作为接口描述语言,并提供认证、双向流和流量控制、阻塞或非阻塞绑定以及取消和超时等功能。它为多种语言生成跨平台的客户端和服务器绑定。

— 维基百科

Vert.x gRPC 是一个模块,它将使 gRPC 的编程风格与 Vert.x 风格保持一致。作为本模块的用户,您将更熟悉使用 Vert.x 流(Streams)和 Future 的代码风格,同时受益于 gRPC 的所有优点。

有关 gRPC 的更多信息,请查阅官方文档网站 http://www.grpc.io/

Vert.x gRPC 分为几个部分

  • Vert.x gRPC 服务器

  • Vert.x gRPC 客户端

  • Vert.x gRPC/IO 服务器

  • Vert.x gRPC/IO 客户端

  • Vert.x gRPC/IO 上下文存储

Vert.x gRPC Protoc 插件 2

使用 Vert.x gRPC 最简单的方法是利用其内置的代码生成器插件。为此,必须按照 gRPC 的要求,以 protobuffer 格式定义协议。

syntax = "proto3";

option java_multiple_files = true;
option java_package = "examples";
option java_outer_classname = "HelloWorldProto";
package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

这是一个非常简单的示例,展示了单请求、单响应模式。

编译 RPC 定义

使用上述定义,我们需要对其进行编译。

您可以选择使用 protoc 编译器编译 proto 文件(如果需要),或者将其集成到您的构建中。

如果您使用 Apache Maven,则需要添加以下插件

<plugin>
  <groupId>org.xolstice.maven.plugins</groupId>
  <artifactId>protobuf-maven-plugin</artifactId>
  <version>0.6.1</version>
  <configuration>
    <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
    <pluginId>grpc-java</pluginId>
    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
    <protocPlugins>
      <protocPlugin>
        <id>vertx-grpc-protoc-plugin2</id>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-grpc-protoc-plugin2</artifactId>
        <version>${stack.version}</version>
        <mainClass>io.vertx.grpc.plugin.VertxGrpcGenerator</mainClass>
      </protocPlugin>
    </protocPlugins>
  </configuration>
  <executions>
    <execution>
      <id>compile</id>
      <configuration>
        <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
        <clearOutputDirectory>false</clearOutputDirectory>
      </configuration>
      <goals>
        <goal>compile</goal>
        <goal>compile-custom</goal>
      </goals>
    </execution>
  </executions>
</plugin>

${os.detected.classifier} 属性用于使构建与操作系统无关,在 OSX 上它被替换为 osx-x86_64,依此类推。要使用它,您需要在 pom.xmlbuild 部分中添加 os-maven-plugin[https://github.com/trustin/os-maven-plugin]。

<build>
  ...
  <extensions>
    <extension>
      <groupId>kr.motd.maven</groupId>
      <artifactId>os-maven-plugin</artifactId>
      <version>1.4.1.Final</version>
    </extension>
  </extensions>
  ...
</build>

此插件将编译 src/main/proto 下的 proto 文件,并使其可用于您的项目。

默认情况下,此插件会生成客户端和服务端文件。如果您只需要其中一个,可以配置生成器以仅生成特定类

<protocPlugin>
  <id>vertx-grpc-protoc-plugin2</id>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-protoc-plugin2</artifactId>
  <version>${stack.version}</version>
  <mainClass>io.vertx.grpc.plugin.VertxGrpcGenerator</mainClass>
  <args>
    <arg>--grpc-client</arg>
  </args>
</protocPlugin>

该插件接受以下参数

  • --grpc-client: 生成客户端文件

  • --grpc-service: 生成服务端文件

  • --grpc-io: 生成与 io.grpc:grpc-stub 兼容的存根文件,默认不生成

  • --grpc-transcoding: 是否为带有 HTTP 注解的方法生成转码选项

  • --vertx-codegen: 是否将 Vert.x 注解添加到生成的类(@VertxGen)中。默认情况下,此功能是禁用的

  • --service-prefix: 生成带前缀的服务类。例如,如果您将其设置为 MyService,则生成的服务类将是 MyServiceGreeterService,而不是 GreeterService

  • --help: 显示帮助信息

  • --version: 显示版本信息

如果未提供特定的生成选项,默认情况下将同时生成客户端和服务端文件。默认情况下,支持所有扩展(目前仅支持 'http')。

如果您使用 Gradle,则需要添加以下插件

...
apply plugin: 'com.google.protobuf'
...
buildscript {
  ...
  dependencies {
    // ASSUMES GRADLE 2.12 OR HIGHER. Use plugin version 0.7.5 with earlier gradle versions
    classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.0'
  }
}
...
protobuf {
  protoc {
    artifact = 'com.google.protobuf:protoc:3.2.0'
  }
  plugins {
    grpc {
      artifact = "io.grpc:protoc-gen-grpc-java:1.25.0"
    }
    vertx {
      artifact = "io.vertx:vertx-grpc-protoc-plugin2:${vertx.grpc.version}"
    }
  }
  generateProtoTasks {
    all()*.plugins {
      grpc
      vertx
    }
  }
}

此插件将编译 build/generated/source/proto/main 下的 proto 文件,并使其可用于您的项目。

生成器插件处于技术预览阶段,可能会有所更改

生成的 RPC 文件

对于每个服务定义,该插件都会创建多个 Java RPC 文件,让我们快速浏览一下:

  • examples/Greeter.java

  • examples/GreeterClient.java

  • examples/GreeterService.java

  • examples/GreeterGrpcClient.java

  • examples/GreeterGrpcService.java

Vert.x gRPC 服务器

Vert.x gRPC 服务器是一个由 Vert.x HTTP 服务器驱动的 gRPC 服务器,取代了集成的基于 Netty 的 gRPC 客户端。

该服务器提供面向 gRPC 请求/响应的 API,以及使用 Vert.x gRPC 生成器生成的存根方法。

使用 Vert.x gRPC 服务器

要使用 Vert.x gRPC 服务器,请将以下依赖项添加到您的构建描述符的 dependencies 部分:

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-server</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

dependencies {
  compile 'io.vertx:vertx-grpc-server:5.0.1'
}

创建 gRPC 服务器

GrpcServer 是一个 Handler<HttpServerRequest>,可以用作 HTTP 服务器请求处理器。

GrpcServer grpcServer = GrpcServer.server(vertx);

HttpServer server = vertx.createHttpServer(options);

server
  .requestHandler(grpcServer)
  .listen();

GrpcServer 可以在 Vert.x Web 路由器中使用

route.consumes("application/grpc").handler(rc -> grpcServer.handle(rc.request()));

惯用服务器存根生成

Vert.x gRPC protoc 插件生成惯用服务存根

  • examples/Greeter.java

  • examples/GreeterService.java

  • examples/GreeterGrpcService.java

Greeter 接口定义了服务的主要契约

public interface Greeter {

  Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest request);

}

GreeterService 类实现了 Greeter 契约

public class GreeterService implements Greeter {

  public Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest request) {
    // ...
  }

  protected void sayHello(examples.grpc.HelloRequest request, Completable<examples.grpc.HelloReply> response) {
    // ...
  }
}

通常,您会扩展此服务以提供服务实现。

GreeterGrpcService 类扩展了 GreeterService,让您可以将您的服务实现转变为可以绑定到 Vert.x gRPC 服务器的 gRPC 服务

GreeterGrpcService service = GreeterGrpcService.create(new GreeterService() {
  @Override
  public Future<HelloReply> sayHello(HelloRequest request) {
    return Future.succeededFuture(HelloReply.newBuilder()
      .setMessage("Hello " + request.getName())
      .build());
  }
});

grpcServer.add(service);

每个服务方法有两种形式,您可以根据风格覆盖您喜欢的方法。

一元方法

一元方法返回一个 Vert.x Future

GreeterService service = new GreeterService() {
  @Override
  public Future<HelloReply> sayHello(HelloRequest request) {
    return Future.succeededFuture(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
  }
};

或处理一个 Vert.x Promise

GreeterService service = new GreeterService() {
  @Override
  public void sayHello(HelloRequest request, Completable<HelloReply> response) {
    response.succeed(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
  }
};

在这两种情况下,您都需要将存根绑定到现有的 GrpcServer

server.addService(service);

流式请求

流式请求通过 ReadStream 实现

StreamingGrpcService service = new StreamingGrpcService() {
  @Override
  public void sink(ReadStream<Item> stream, Completable<Empty> response) {
    stream.handler(item -> {
      System.out.println("Process item " + item.getValue());
    });
    // Send response
    stream.endHandler(v -> response.succeed(Empty.getDefaultInstance()));
  }
};
server.addService(service);

流式响应

流式响应通过 Vert.x 流实现,有两种形式。

您可以返回一个 Vert.x ReadStream 并让服务为您发送它

StreamingService service = new StreamingService() {
  @Override
  public Future<ReadStream<Item>> source(Empty request) {
    return streamOfItems();
  }
};

或者您可以处理一个 WriteStream

StreamingService service = new StreamingService() {
  @Override
  public void source(Empty request, WriteStream<Item> response) {
    response.write(Item.newBuilder().setValue("value-1").build());
    response.end(Item.newBuilder().setValue("value-2").build());
  }
};

服务器请求/响应 API

gRPC 请求/响应服务器 API 提供了一种与客户端交互的替代方式,而无需扩展 Java 类。

绑定 gRPC 服务方法

您将使用 ServiceMethod 来绑定 gRPC 服务方法

ServiceName serviceName = ServiceName.create("examples.grpc", "Greeter");
ServiceMethod<HelloRequest, HelloReply> sayHello = ServiceMethod.server(
  serviceName,
  "SayHello",
  GrpcMessageEncoder.encoder(),
  GrpcMessageDecoder.decoder(HelloRequest.newBuilder()));

然而,大多数时候您将使用由 Vert.x gRPC protoc 插件生成的 ServiceMethod 常量

ServiceMethod<HelloRequest, HelloReply> sayHello = GreeterGrpcService.SayHello;

请求/响应

每个服务方法都由一个处理器处理,该处理器使用 ServiceMethod 进行绑定。

server.callHandler(GreeterGrpcService.SayHello, request -> {

  request.handler(hello -> {

    GrpcServerResponse<HelloRequest, HelloReply> response = request.response();

    HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + hello.getName()).build();

    response.end(reply);
  });
});

流式请求

您可以设置处理器来处理请求事件

server.callHandler(StreamingGrpcService.Sink, request -> {
  request.handler(item -> {
    // Process item
  });
  request.endHandler(v ->{
    // No more items
    // Send the response
    request.response().end(Empty.getDefaultInstance());
  });
  request.exceptionHandler(err -> {
    // Something wrong happened
  });
});

流式响应

流式响应涉及对流的每个元素调用 write,并使用 end 结束流

server.callHandler(StreamingGrpcService.Source, request -> {
  GrpcServerResponse<Empty, Item> response = request.response();
  request.handler(empty -> {
    for (int i = 0;i < 10;i++) {
      response.write(Item.newBuilder().setValue("1").build());
    }
    response.end();
  });
});

双向请求/响应

双向请求/响应只是流式请求和流式响应的组合

server.callHandler(StreamingGrpcService.Pipe, request -> {

  request.handler(item -> request.response().write(item));
  request.endHandler(v -> request.response().end());
});

gRPC 协议

HTTP/2 协议

Vert.x gRPC 服务器默认提供 HTTP/2 协议。

此外,Vert.x gRPC 服务器还支持 gRPC-Web 协议和 HTTP/JSON 转码。

gRPC-Web 协议

Vert.x gRPC 服务器默认支持 gRPC-Web 协议。

如果您的网站服务器和 gRPC 服务器不同,您必须为 gRPC 服务器配置 CORS。这可以通过 Vert.x Web 路由器和 CORS 处理器完成

CorsHandler corsHandler = CorsHandler.create()
  .addRelativeOrigin("https://www.mycompany.com")
  .allowedHeaders(Set.of("keep-alive","user-agent","cache-control","content-type","content-transfer-encoding","x-custom-key","x-user-agent","x-grpc-web","grpc-timeout"))
  .exposedHeaders(Set.of("x-custom-key","grpc-status","grpc-message"));
router.route("/com.mycompany.MyService/*").handler(corsHandler);

gRPC 转码

Vert.x gRPC 服务器支持gRPC 转码,它允许在 HTTP/JSON 请求和 gRPC 服务之间进行映射。

协议配置

默认情况下,gRPC 服务器接受所有协议。

要禁用特定协议支持,请使用 removeEnabledProtocol 配置选项,然后使用 GrpcServer#server(vertx, options) 创建服务器。

移除 gRPC-Web 支持
GrpcServer server = GrpcServer.server(vertx, new GrpcServerOptions()
  .removeEnabledProtocol(GrpcProtocol.WEB)
  .removeEnabledProtocol(GrpcProtocol.WEB_TEXT)
);

流量控制

请求和响应是带有反压的 Vert.x 流。

您可以暂停/恢复/获取请求

request.pause();

performAsyncOperation().onComplete(ar -> {
  // And then resume
  request.resume();
});

您可以检查响应的可写性并设置一个排水处理器

if (response.writeQueueFull()) {
  response.drainHandler(v -> {
    // Writable again
  });
} else {
  response.write(item);
}

超时和截止时间

gRPC 服务器处理超时和截止时间。

无论何时服务收到指示超时的请求,都可以检索到该超时。

long timeout = request.timeout();

if (timeout > 0L) {
  // A timeout has been received
}

默认情况下,服务器

  • 不会自动为给定请求安排截止时间

  • 不会自动将截止时间传播到 Vert.x 客户端

服务器可以安排截止时间:当请求带有超时时,服务器会在本地安排一个计时器,以便在响应未及时发送时取消请求。

服务器可以传播截止时间:当请求带有超时时,服务器会计算截止时间,并将当前服务器请求与此截止时间关联。Vert.x gRPC 客户端可以使用此截止时间来计算要发送的超时,并将超时级联到另一个 gRPC 服务器。

GrpcServer server = GrpcServer.server(vertx, new GrpcServerOptions()
  .setScheduleDeadlineAutomatically(true)
  .setDeadlinePropagation(true)
);

JSON 线路格式

gRPC 隐含地假定使用 Protobuf 线路格式。

Vert.x gRPC 服务器也支持 JSON 线路格式,即携带 application/grpc+json 内容类型的 gRPC 请求。

com.google.protobuf:protobuf-java-util 库执行 JSON 编码/解码。

Vert.x JsonObject 也支持贫血 JSON。

ServiceMethod<JsonObject, JsonObject> sayHello = ServiceMethod.server(
  ServiceName.create("helloworld", "Greeter"),
  "SayHello",
  GrpcMessageEncoder.JSON_OBJECT,
  GrpcMessageDecoder.JSON_OBJECT
);

server.callHandler(sayHello, request -> {
  request.last().onSuccess(helloRequest -> {
    request.response().end(new JsonObject().put("message", "Hello " + helloRequest.getString("name")));
  });
});

压缩

您可以在发送任何消息之前通过设置响应编码来压缩响应消息

response.encoding("gzip");

// Write items after encoding has been defined
response.write(Item.newBuilder().setValue("item-1").build());
response.write(Item.newBuilder().setValue("item-2").build());
response.write(Item.newBuilder().setValue("item-3").build());
gRPC-Web 协议不支持压缩。

解压缩

当客户端发送编码请求时,服务器会透明地进行解压缩。

gRPC-Web 协议不支持解压缩。

消息级别 API

服务器提供消息级别 API,用于直接与 protobuf 编码的 gRPC 消息进行交互。

服务器消息级别 API 可以与客户端消息级别 API 结合使用,以编写 gRPC 反向代理

当您不关心消息内容,而只想将它们转发到另一个服务时,例如您正在编写代理,此类 API 非常有用。

ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");

server.callHandler(request -> {

  if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {

    request.handler(protoHello -> {
      // Handle protobuf encoded hello
      performAsyncOperation(protoHello)
        .onSuccess(protoReply -> {
          // Reply with protobuf encoded reply
          request.response().end(protoReply);
        }).onFailure(err -> {
          request.response()
            .status(GrpcStatus.ABORTED)
            .end();
        });
    });
  } else {
    request.response()
      .status(GrpcStatus.NOT_FOUND)
      .end();
  }
});

您还可以设置 messageHandler 来处理 GrpcMessage,此类消息保留客户端编码,这在您转发到的服务可以直接处理压缩消息时非常有用,在这种情况下,消息无需再次解压缩和压缩。`

ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");

server.callHandler(request -> {

  if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {

    request.messageHandler(helloMessage -> {

      // Can be identity or gzip
      String helloEncoding = helloMessage.encoding();

      // Handle hello message
      handleGrpcMessage(helloMessage)
        .onSuccess(replyMessage -> {
          // Reply with reply message

          // Can be identity or gzip
          String replyEncoding = replyMessage.encoding();

          // Send the reply
          request.response().endMessage(replyMessage);
        }).onFailure(err -> {
          request.response()
            .status(GrpcStatus.ABORTED)
            .end();
        });
    });
  } else {
    request.response()
      .status(GrpcStatus.NOT_FOUND)
      .end();
  }
});

writeMessageendMessage 将处理消息编码

  • 当消息使用响应编码时,消息将按原样发送

  • 当消息使用不同的编码时,它将被编码,例如压缩或解压缩

gRPC 反射服务

可以为您的 Vert.x gRPC 服务器添加对 gRPC 反射服务的支持。

要使用反射服务,请添加以下依赖项

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-reflection</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

dependencies {
  compile 'io.vertx:vertx-grpc-reflection:5.0.1'
}

然后您可以在服务器中部署反射服务

GrpcServer grpcServer = GrpcServer.server(vertx);

// Add reflection service
grpcServer.addService(ReflectionService.v1());

GreeterGrpcService greeterService = new GreeterGrpcService() {
  @Override
  public Future<HelloReply> sayHello(HelloRequest request) {
    return Future.succeededFuture(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
  }
};

grpcServer.addService(greeterService);

// Start the server
vertx.createHttpServer(options)
  .requestHandler(grpcServer)
  .listen();

gRPC 健康服务

gRPC 健康服务实现了标准的 gRPC 健康检查协议,它允许客户端检查您的服务的健康状态。

默认情况下,HealthService 返回所有注册的服务,状态为 SERVING,如果您想覆盖此行为,您需要为指定的服务注册健康检查。

健康服务提供了一种通过 gRPC 暴露您的服务健康状态的简单方法。它实现了两个 RPC

  • Check: 用于检查服务的健康状态

  • List: 用于列出服务及其健康状态

  • Watch: 用于持续监控服务的健康状态

要使用健康服务,请添加以下依赖项

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-health</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

dependencies {
  compile 'io.vertx:vertx-grpc-health:5.0.1'
}

以下是如何创建健康服务并将其绑定到您的 gRPC 服务器:

GrpcServer grpcServer = GrpcServer.server(vertx);

// Create a health service instance
HealthService healthService = HealthService.create(vertx);

// Register health checks for your services
healthService.register("my.service.name", () -> Future.succeededFuture(true));

// Add the health service to the gRPC server
grpcServer.addService(healthService);

// Start the server
vertx.createHttpServer(options)
  .requestHandler(grpcServer)
  .listen();

客户端随后可以使用标准的 gRPC 健康检查协议检查您的服务的健康状况。

健康检查服务在 Vert.x 5.0 中处于技术预览阶段,直到 API 稳定。

Vert.x gRPC 客户端

Vert.x gRPC 客户端提供面向 gRPC 请求/响应的 API 以及生成的客户端方法。

使用 Vert.x gRPC 客户端

要使用 Vert.x gRPC 客户端,请将以下依赖项添加到您的构建描述符的 dependencies 部分:

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-client</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

dependencies {
  compile 'io.vertx:vertx-grpc-client:5.0.1'
}

创建 gRPC 客户端

您可以轻松创建 gRPC 客户端

GrpcClient client = GrpcClient.client(vertx);

惯用客户端 API 生成

Vert.x gRPC protoc 插件生成惯用客户端代码

  • examples/Greeter.java

  • examples/GreeterClient.java

  • examples/GreeterGrpcClient.java

惯用客户端封装了一个 GrpcClient 并提供 Vert.x 惯用 API 以与服务交互

Greeter 接口定义了服务的主要契约

public interface Greeter {

  Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest request);

}

GreeterClient 接口扩展了 Greeter 契约,并使用 Vert.x codegen 注解对其进行标注,提供了执行进一步代码生成(例如生成 RxJavaMutiny 客户端)的选项。

@io.vertx.codegen.annotations.VertxGen
public interface GreeterClient extends Greeter {

  @io.vertx.codegen.annotations.GenIgnore(io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE)
  Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest request);
}

GreeterGrpcClient 接口扩展了 GreeterClient 并提供了一个使用 Vert.x gRPC 客户端与服务交互的实现

GreeterGrpcClient greeterClient = GreeterGrpcClient.create(client, SocketAddress.inetSocketAddress(port, host));

一元服务

一元服务返回一个 Vert.x Future

Future<HelloReply> response = greeterClient.sayHello(HelloRequest.newBuilder().setName("John").build());

response.onSuccess(result -> System.out.println("Service responded: " + response.result().getMessage()));

response.onFailure(err -> System.out.println("Service failure: " + response.cause().getMessage()));

流式请求

流式请求使用 lambda,其中传入一个 Vert.x WriteStream,用于发送消息到服务

Future<Empty> response = streamingClient.sink((stream, err) -> {
  stream.write(Item.newBuilder().setValue("Value 1").build());
  stream.write(Item.newBuilder().setValue("Value 2").build());
  stream.end(Item.newBuilder().setValue("Value 3").build());
});

或者,您可以传递一个消息流到服务

Future<Empty> response = streamingClient.sink(stream);

流式响应

流式响应获取一个 Vert.x ReadStream,其中包含服务发送的消息

Future<ReadStream<Item>> response = streamingClient.source(Empty.getDefaultInstance());

response.onSuccess(stream -> stream
  .handler(item -> System.out.println("Item " + item.getValue()))
  .exceptionHandler(err -> System.out.println("Stream failed " + err.getMessage()))
  .endHandler(v -> System.out.println("Stream ended")));

response.onFailure(err -> System.out.println("Service failure: " + err.getMessage()));

RxJava 客户端包装器生成

如果您想使用 Vert.x codegen 注解生成您的客户端,您可以使用 vertx-grpc-protoc-plugin2 中的选项来生成一个带有 Vert.x codegen 注解的客户端服务接口。为此,您需要将 --vertx-codegen 选项传递给 vertx-grpc-protoc-plugin

<protocPlugin>
  <id>vertx-grpc-protoc-plugin2</id>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-protoc-plugin2</artifactId>
  <version>${stack.version}</version>
  <mainClass>io.vertx.grpc.plugin.VertxGrpcGenerator</mainClass>
  <args>
    <arg>--grpc-client</arg>
    <arg>--vertx-codegen</arg>
  </args>
</protocPlugin>

生成的客户端服务接口将使用 @io.vertx.codegen.annotations.VertxGen 进行注解。

@io.vertx.codegen.annotations.VertxGen
public interface GreeterClient {
  ...
}

因此,Vert.x codegen 处理器可以对其进行处理,从而 Vert.x RxJava 生成器将生成一个带有惯用 RxJava API 的 RxJava 客户端包装器。

以下是 Maven 配置示例

<plugin>
  <artifactId>maven-compiler-plugin</artifactId>
  <executions>
    <execution>
      <id>default-compile</id>
      <configuration>
        <annotationProcessorPaths>
          <annotationProcessorPath>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-codegen</artifactId>
            <classifier>processor</classifier>
            <version>${vertx.version}</version>
          </annotationProcessorPath>
          <annotationProcessorPath>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-rx-java3-gen</artifactId>
            <version>${vertx.version}</version>
          </annotationProcessorPath>
        </annotationProcessorPaths>
      </configuration>
    </execution>
  </executions>
</plugin>

然后可以使用生成的

// Use the RxJava version
GreeterClient client = io.grpc.examples.rxjava3.helloworld.GreeterClient.create(grpcClient, SocketAddress.inetSocketAddress(8080, "localhost"));

// Get a Single instead of a Future
Single<HelloReply> reply = client.sayHello(HelloRequest.newBuilder().setName("World").build());

Flowable<HelloRequest> requestStream = Flowable.just("World", "Monde", "Mundo")
      .map(name -> HelloRequest.newBuilder().setName(name).build());

// Use Flowable instead of Vert.x streams
Flowable<String> responseStream = client
  .sayHelloStreaming(stream)
  .map(HelloReply::getMessage);
这需要 GrpcClient 的 RxJava 版本

客户端请求/响应 API

gRPC 请求/响应客户端 API 提供了一种与服务器交互的替代方式,而无需生成的存根。

寻址 gRPC 服务方法

您将使用 ServiceMethod 来寻址 gRPC 服务方法

ServiceName serviceName = ServiceName.create("examples.grpc", "Greeter");
ServiceMethod<HelloReply, HelloRequest> sayHello = ServiceMethod.client(
  serviceName,
  "SayHello",
  GrpcMessageEncoder.encoder(),
  GrpcMessageDecoder.decoder(HelloReply.newBuilder()));

然而,大多数时候您可以使用由 Vert.x gRPC protoc 插件生成的 ServiceMethod 常量

ServiceMethod<HelloReply, HelloRequest> sayHello = GreeterGrpcClient.SayHello;

请求/响应

与 gRPC 服务器交互涉及向远程 gRPC 服务创建请求。

SocketAddress server = SocketAddress.inetSocketAddress(443, "example.com");
Future<GrpcClientRequest<HelloRequest, HelloReply>> fut = client.request(server, GreeterGrpcClient.SayHello);
fut.onSuccess(request -> {
  // The end method calls the service
  request.end(HelloRequest.newBuilder().setName("Bob").build());
});

response 包含响应,last 包含结果

request.response().onSuccess(response -> {
  Future<HelloReply> fut = response.last();
  fut.onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });
});

Future 组合可以紧凑地将所有先前的步骤结合在一起

client
  .request(server, GreeterGrpcClient.SayHello).compose(request -> {
    request.end(HelloRequest
      .newBuilder()
      .setName("Bob")
      .build());
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });

流式请求

流式请求涉及对流的每个元素调用 write,并使用 end 结束流

client
  .request(server, StreamingGrpcClient.Sink)
  .onSuccess(request -> {
  for (int i = 0;i < 10;i++) {
    request.write(Item.newBuilder().setValue("1").build());
  }
  request.end();
});

流式响应

您可以设置处理器来处理流式响应的响应事件

client
  .request(server, StreamingGrpcClient.Source)
  .compose(request -> {
    request.end(Empty.getDefaultInstance());
    return request.response();
  })
  .onSuccess(response -> {
    response.handler(item -> {
      // Process item
    });
    response.endHandler(v -> {
      // Done
    });
    response.exceptionHandler(err -> {
      // Something went bad
    });
  });

双向请求/响应

双向请求/响应只是流式请求和流式响应的组合。

流量控制

无论您使用请求/响应 API 还是惯用客户端 API,您都与带有反压的 Vert.x 流进行交互。

您可以检查请求的可写性并设置一个排水处理器

if (request.writeQueueFull()) {
  request.drainHandler(v -> {
    // Writable again
  });
} else {
  request.write(item);
}

您可以暂停/恢复/获取响应,以精确控制您读取的消息。

response.pause();

performAsyncOperation().onComplete(ar -> {
  // And then resume
  response.resume();
});
您可以在 Vert.x 核心文档中了解更多关于 Vert.x 流的信息

超时和截止时间

gRPC 客户端处理超时和截止时间,在 gRPC 请求上设置超时会指示客户端发送超时信息,以使服务器知道客户端希望在定义的时间内收到响应。

此外,客户端应配置为安排截止时间:当请求上设置了超时时,客户端会在本地安排一个计时器,以便在未及时收到响应时取消请求。

GrpcClient client = GrpcClient.client(vertx, new GrpcClientOptions()
  .setTimeout(10)
  .setTimeoutUnit(TimeUnit.SECONDS)
  .setScheduleDeadlineAutomatically(true));

超时也可以按请求设置。

Future<GrpcClientRequest<HelloRequest, HelloReply>> fut = client.request(server, GreeterGrpcClient.SayHello);
fut.onSuccess(request -> {

  request
    // Given this request, set a 10 seconds timeout that will be sent to the gRPC service
    .timeout(10, TimeUnit.SECONDS);

  request.end(HelloRequest.newBuilder().setName("Bob").build());
});

取消

您可以调用 cancel 来取消请求

request.cancel();
取消会向服务器发送一个 HTTP/2 重置帧

客户端负载均衡

gRPC 客户端可以配置为执行客户端负载均衡。

基于 DNS 的负载均衡

基于 DNS 的负载均衡通过 DNS 查询工作,将单个主机解析为多个 IP 地址(通常是 A 记录)。

您可以设置负载均衡器以启用基于 DNS 的负载均衡

GrpcClient client = GrpcClient
  .builder(vertx)
  .withLoadBalancer(LoadBalancer.ROUND_ROBIN)
  .build();

client
  .request(SocketAddress.inetSocketAddress(port, server), GreeterGrpcClient.SayHello)
  .compose(request -> {
    request.end(HelloRequest
      .newBuilder()
      .setName("Bob")
      .build());
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });

通常的负载均衡策略都可用,您可以参考 Vert.x HTTP 客户端负载均衡文档进行配置。

基于地址的负载均衡

基于地址的负载均衡依赖于 Vert.x 地址解析器将单个地址解析为多个主机/端口套接字地址。

您可以设置一个地址解析器来启用负载均衡,Vert.x 服务解析器实现了一些地址解析器,例如 Kubernetes 解析器。

GrpcClient client = GrpcClient
  .builder(vertx)
  .withAddressResolver(KubeResolver.create())
  .withLoadBalancer(LoadBalancer.ROUND_ROBIN)
  .build();

与基于 DNS 的负载均衡不同,基于地址的负载均衡使用抽象的 Address 而不是 SocketAddress。地址解析器实现将地址解析为套接字地址列表。

Vert.x 服务解析器定义了 ServiceAddress

ServiceAddress address = ServiceAddress.of("GreeterService");

client
  .request(address, GreeterGrpcClient.SayHello)
  .compose(request -> {
    request.end(HelloRequest
      .newBuilder()
      .setName("Bob")
      .build());
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });

您可以参考 Vert.x 服务解析器项目文档了解更多详细信息。

JSON 线路格式

gRPC 隐含地假定使用 Protobuf 线路格式。

Vert.x gRPC 客户端也支持 JSON 线路格式。

您可以使用 application/grpc+json 内容类型调用 JSON 服务方法。

client
  .request(server, GreeterGrpcClient.SayHello).compose(request -> {
    request.format(WireFormat.JSON);
    request.end(HelloRequest
      .newBuilder()
      .setName("Bob")
      .build());
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });

客户端将发送 application/grpc+json 请求。

JSON 编码/解码通过 com.google.protobuf:protobuf-java-util 库实现。

Vert.x JsonObject 也支持贫血 JSON。

ServiceMethod<JsonObject, JsonObject> sayHello = ServiceMethod.client(
  ServiceName.create("helloworld", "Greeter"),
  "SayHello",
  GrpcMessageEncoder.JSON_OBJECT,
  GrpcMessageDecoder.JSON_OBJECT
);
client
  .request(server, sayHello).compose(request -> {
    request.end(new JsonObject().put("name", "Bob"));
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getString("message"));
  });

您还可以在创建惯用客户端时指定 JSON 线路格式

GreeterGrpcClient greeterClient = GreeterGrpcClient.create(client, SocketAddress.inetSocketAddress(port, host), WireFormat.JSON);

压缩

您可以在发送任何消息之前通过设置请求编码来压缩请求消息

request.encoding("gzip");

// Write items after encoding has been defined
request.write(Item.newBuilder().setValue("item-1").build());
request.write(Item.newBuilder().setValue("item-2").build());
request.write(Item.newBuilder().setValue("item-3").build());

解压缩

当服务器发送编码响应时,客户端会透明地进行解压缩。

消息级别 API

客户端提供消息级别 API,用于直接与 protobuf 编码的 gRPC 消息进行交互。

客户端消息级别 API 可以与服务器消息级别 API 结合使用,以编写对消息不透明的 gRPC 中间件,例如反向代理。

当您不关心消息内容,而只想将它们转发到另一个服务时,例如您正在编写代理,此类 API 非常有用。

Future<GrpcClientRequest<Buffer, Buffer>> requestFut = client.request(server);

requestFut.onSuccess(request -> {

  // Set the service name and the method to call
  request.serviceName(ServiceName.create("helloworld", "Greeter"));
  request.methodName("SayHello");

  // Send the protobuf request
  request.end(protoHello);

  // Handle the response
  Future<GrpcClientResponse<Buffer, Buffer>> responseFut = request.response();
  responseFut.onSuccess(response -> {
    response.handler(protoReply -> {
      // Handle the protobuf reply
    });
  });
});

您还可以设置 messageHandler 来处理 GrpcMessage,此类消息保留服务器编码。

Future<GrpcClientRequest<Buffer, Buffer>> requestFut = client.request(server);

requestFut.onSuccess(request -> {

  // Set the service name and the method to call
  request.serviceName(ServiceName.create("helloworld", "Greeter"));
  request.methodName("SayHello");

  // Send the protobuf request
  request.endMessage(GrpcMessage.message("identity", protoHello));

  // Handle the response
  Future<GrpcClientResponse<Buffer, Buffer>> responseFut = request.response();
  responseFut.onSuccess(response -> {
    response.messageHandler(replyMessage -> {
      System.out.println("Got reply message encoded as " + replyMessage.encoding());
    });
  });
});

writeMessageendMessage 将处理消息编码

  • 当消息使用响应编码时,消息将按原样发送

  • 当消息使用不同的编码时,它将被编码,例如压缩或解压缩

gRPC 转码

gRPC 转码允许您的服务同时接受 gRPC 和 HTTP/JSON(接受/回复 application/json 内容类型)请求,从而提供更大的灵活性。此功能在以下情况下特别有用:

  • 您想将 gRPC 服务暴露给不支持 gRPC 的客户端

  • 您需要同时支持传统的 REST API 和 gRPC 端点

  • 您想利用 gRPC 的效率,同时保持 HTTP/JSON 兼容性

gRPC 转码不需要特定配置,因为它服务于 HTTP 协议。

服务开箱即用支持转码,但是,通过额外的配置(例如挂载路径),您可以获得最佳的转码效果。

转码在 Vert.x 5.0 中处于技术预览阶段,直到 API 稳定。

使用转码

要使用 Vert.x gRPC 转码,请将以下依赖项添加到您的构建描述符的 dependencies 部分:

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-transcoding</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

dependencies {
  compile 'io.vertx:vertx-grpc-transcoding:5.0.1'
}

惯用转码生成

Vert.x gRPC protoc 插件为 gRPC 服务生成转码定义。

该插件支持通过 http.proto 为 gRPC 服务生成转码定义。

google.api.http 注解允许您将 gRPC 方法映射到 HTTP 端点,使您的服务能够同时处理 gRPC 和 HTTP/REST 请求。该插件支持各种 HTTP 方法(GET、POST)和自定义方法。

gRPC 转码定义示例
syntax = "proto3";

import "google/api/http.proto";

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply) {
    option (google.api.http) = {
      post: "/v1/hello"
      additional_bindings {
        get: "/v1/hello/{name}"
      }
    };
  }
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

当服务声明 HttpRule 时,转码服务方法由 Vert.x gRPC protoc 插件生成

Service service = new GreeterGrpcService() {
  @Override
  public Future<HelloReply> sayHello(HelloRequest request) {
    return Future.succeededFuture(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
  }
};

server.addService(service);
服务响应所有服务器启用的协议,默认包括 application/grpcapplication/grpc+jsonapplication/grpc-webapplication/grpc-web-text,当然还有 application/json

要测试 gRPC 转码,您可以使用 curl 等工具向 gRPC 服务发送 HTTP 请求。

curl -X POST -H "Content-Type: application/json" -d '{"name":"vert.x"}' https://:8080/v1/hello

您还可以请求路径映射版本

curl -X GET -H "Content-Type: application/json" https://:8080/v1/hello/vert.x

自动 HTTP 映射

当 gRPC 服务在没有特定转码配置的情况下部署时,该服务会映射到 HTTP POST,请求/响应体将由转码器转换为服务消息。

基本 HTTP 映射

service Greeter {
  // Maps a GET endpoint with a URL parameter
  rpc SayHello (HelloRequest) returns (HelloReply) {
    option (google.api.http) = {
      get: "/v1/hello/{name}"
      additional_bindings {
        post: "/v1/hello"  // Alternative POST endpoint
      }
    };
  }

  // Maps a POST endpoint with an alternative GET binding
  rpc SayHelloAgain (HelloRequest) returns (HelloReply) {
    option (google.api.http) = {
      post: "/v2/hello"
      additional_bindings {
        get: "/v2/hello/{name}"
      }
    };
  }
}

高级配置

自定义方法

service Greeter {
  // Define custom HTTP methods
  rpc SayHelloCustom (HelloRequest) returns (HelloReply) {
    option (google.api.http) = {
      custom: {
        kind: "ACL"
        path: "/v1/hello/custom/{name}"
      }
    };
  }
}

请求体处理

service Greeter {
  // Specify which field should be mapped to the HTTP request body
  rpc SayHelloWithBody (HelloBodyRequest) returns (HelloReply) {
    option (google.api.http) = {
      post: "/v1/hello/body"
      body: "request"  // Maps the "request" field to the request body
    };
  }
}

message HelloBodyRequest {
  HelloRequest request = 1;
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string reply = 1;
}

响应体映射

service Greeter {
  // Configure which field should be used as the HTTP response body
  rpc SayHelloWithResponseBody (HelloRequest) returns (HelloBodyResponse) {
    option (google.api.http) = {
      post: "/v1/hello/body/response"
      response_body: "response"  // Maps the "response" field to the response body
    };
  }
}

message HelloRequest {
  string name = 1;
}

message HelloBodyResponse {
  HelloResponse response = 1;
}

message HelloResponse {
  string reply = 1;
}

转码错误处理

如果在转码过程中发生错误,服务器将返回带有相应状态码的 HTTP 错误响应。大多数 gRPC 状态码将尽力映射到相应的 HTTP 状态码。如果状态码未映射,服务器将返回 500 内部服务器错误。

gRPC 状态码

HTTP 状态码

描述

OK

200

操作成功完成。

CANCELLED

408

操作已取消(通常由调用方取消)。

UNKNOWN

500

未知错误。

INVALID_ARGUMENT

400

客户端指定了无效参数。

DEADLINE_EXCEEDED

504

操作在截止时间前未能完成。

NOT_FOUND

404

未找到请求的实体(例如,文件或目录)。

ALREADY_EXISTS

409

我们尝试创建的某些实体(例如,文件或目录)已存在。

PERMISSION_DENIED

403

调用方没有执行指定操作的权限。

RESOURCE_EXHAUSTED

429

某些资源已耗尽,可能是用户配额,或者整个文件系统空间不足。

FAILED_PRECONDITION

400

操作被拒绝,因为系统未处于执行操作所需的状态

ABORTED

409

操作已中止,通常是由于并发问题,如序列器检查失败、事务中止等。

OUT_OF_RANGE

400

操作尝试超出有效范围。

UNIMPLEMENTED

501

此服务中未实现或不支持/未启用该操作。

INTERNAL

500

内部错误。这意味着底层系统预期的某些不变性已被破坏。

UNAVAILABLE

503

服务当前不可用。

DATA_LOSS

500

不可恢复的数据丢失或损坏。

UNAUTHENTICATED

401

该请求没有执行操作的有效认证凭据。

Vert.x gRPC/IO 服务器

Vert.x gRPC/IO 服务器通过 grpc-java 集成扩展了 Vert.x gRPC 服务器。

该服务器通过服务桥接提供与 grpc-java 生成的存根方法的兼容性。

使用 Vert.x gRPC/IO 服务器

要使用 Vert.x gRPC/IO 服务器,请将以下依赖项添加到您的构建描述符的 dependencies 部分:

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpcio-server</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

dependencies {
  compile 'io.vertx:vertx-grpcio-server:5.0.1'
}

服务桥接

Vert.x gRPC 服务器可以将 gRPC 服务桥接,以便与 grpc-java 生成的服务器类一起使用。

GrpcIoServer grpcServer = GrpcIoServer.server(vertx);

GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
  @Override
  public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
    responseObserver.onNext(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
    responseObserver.onCompleted();
  }
};

// Bind the service in the gRPC server
grpcServer.addService(service);

// Start the HTTP/2 server
vertx.createHttpServer(options)
  .requestHandler(grpcServer)
  .listen();

该桥接支持截止时间自动取消:当收到带有超时的 gRPC 请求时,会将一个截止时间与 io.grpc.Context 关联,并可以从当前上下文获取。当关联的超时触发时,此截止时间会自动取消正在进行的请求。

惯用 gRPC/IO 服务

Vert.x gRPC protoc 插件支持生成 gRPC/IO 客户端代码

  • examples/Greeter.java

  • examples/GreeterService.java

  • examples/GreeterGrpcIo.java

默认情况下,不生成 GreeterGrpcIo,要激活它,您需要告诉 Vert.x gRPC protoc 插件生成它

<protocPlugin>
  <id>vertx-grpc-protoc-plugin2</id>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-protoc-plugin2</artifactId>
  <version>${stack.version}</version>
  <mainClass>io.vertx.grpc.plugin.VertxGrpcGenerator</mainClass>
  <args>
    <arg>--grpc-io</arg>
  </args>
</protocPlugin>

GreeterGrpcIo 提供了一个可绑定的服务,该服务使用 Vert.x gRPC/IO 服务器来暴露服务

GrpcIoServer grpcServer = GrpcIoServer.server(vertx);

BindableService service = GreeterGrpcIo.bindableServiceOf(new GreeterService() {
  @Override
  public Future<HelloReply> sayHello(HelloRequest request) {
    return Future.succeededFuture(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
  }
});

// Bind the service in the gRPC server
grpcServer.addService(service);

// Start the HTTP/2 server
vertx.createHttpServer(options)
  .requestHandler(grpcServer)
  .listen();

您可以阅读惯用服务器部分了解更多信息。

gRPC 反射 API

可以为您的 Vert.x gRPC 服务器添加对 gRPC 反射 API 的支持。

GrpcIoServer grpcServer = GrpcIoServer.server(vertx);

// Add reflection service
grpcServer.addService(ReflectionService.v1());

GreeterGrpc.GreeterImplBase greeterService = new GreeterGrpc.GreeterImplBase() {
  @Override
  public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
    responseObserver.onNext(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
    responseObserver.onCompleted();
  }
};

// Bind the service in the gRPC server
grpcServer.addService(greeterService);

// Start the HTTP/2 server
vertx.createHttpServer(options)
  .requestHandler(grpcServer)
  .listen();

然后您可以使用 gRPCurl 等工具探索和调用您的 gRPC API。

grpcurl -plaintext localhost:50051 list

grpcurl -plaintext localhost:50051 describe .helloworld.HelloRequest

grpcurl -plaintext -d '{"name": "Vert.x"}' localhost:50051 helloworld.Greeter

Vert.x gRPC/IO 客户端

Vert.x gRPC/IO 客户端通过 grpc-java 集成扩展了 Vert.x gRPC 客户端。

该客户端通过 gRPC 通道提供生成的存根方法

使用 Vert.x gRPC/IO 客户端

要使用 Vert.x gRPC/IO 客户端,请将以下依赖项添加到您的构建描述符的 dependencies 部分:

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpcio-client</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

dependencies {
  compile 'io.vertx:vertx-grpcio-client:5.0.1'
}

gRPC 通道

Vert.x gRPC/IO 客户端提供 gRPC 通道,用于与 grpc-java 生成的客户端类一起使用。

GrpcIoClientChannel channel = new GrpcIoClientChannel(client, SocketAddress.inetSocketAddress(443, "example.com"));

GreeterGrpc.GreeterStub greeter = GreeterGrpc.newStub(channel);

StreamObserver<HelloReply> observer = new StreamObserver<HelloReply>() {
  @Override
  public void onNext(HelloReply value) {
    // Process response
  }

  @Override
  public void onCompleted() {
    // Done
  }

  @Override
  public void onError(Throwable t) {
    // Something went bad
  }
};

greeter.sayHello(HelloRequest.newBuilder().setName("Bob").build(), observer);

超时和截止时间通过常规 gRPC API 支持。

GreeterGrpc.GreeterStub greeter = GreeterGrpc.newStub(channel).withDeadlineAfter(10, TimeUnit.SECONDS);

greeter.sayHello(HelloRequest.newBuilder().setName("Bob").build(), observer);

截止时间是级联的,例如,当当前 io.grpc.Context 带有截止时间且存根没有显式设置截止时间时,客户端会自动继承隐式截止时间。在使用 gRPC 服务器调用中的存根时可以设置此类截止时间。

惯用 gRPC/IO 客户端

Vert.x gRPC protoc 插件支持生成 gRPC/IO 客户端代码

  • examples/Greeter.java

  • examples/GreeterClient.java

  • examples/GreeterGrpcIo.java

默认情况下,不生成 GreeterGrpcIo,要激活它,您需要告诉 Vert.x gRPC protoc 插件生成它

<protocPlugin>
  <id>vertx-grpc-protoc-plugin2</id>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-protoc-plugin2</artifactId>
  <version>${stack.version}</version>
  <mainClass>io.vertx.grpc.plugin.VertxGrpcGenerator</mainClass>
  <args>
    <arg>grpc-io</arg>
  </args>
</protocPlugin>

GreeterGrpcIo 提供了一个客户端存根,它使用 Vert.x gRPC/IO 客户端与服务交互

GrpcIoClientChannel channel = new GrpcIoClientChannel(client, SocketAddress.inetSocketAddress(443, "example.com"));

GreeterGrpcIo.GreeterStub greeter = GreeterGrpcIo.newStub(vertx, channel);

Future<HelloReply> future = greeter.sayHello(HelloRequest.newBuilder().setName("Bob").build());

future
  .onSuccess(reply -> {
    // Process response
  })
  .onFailure(err -> {
    // Something went bad
  });

您可以阅读惯用客户端部分了解更多信息。

Vert.x gRPC 上下文存储

Vert.x gRPC 上下文存储覆盖了默认的 io.grpc.Context.Storage 实现。

默认实现始终将 gRPC 上下文存储在线程局部变量中。此实现存储 gRPC 上下文的方式与 Vert.x 核心存储请求跟踪数据的方式相同。

这意味着,例如,当您实现服务方法时,gRPC 上下文会在 Vert.x 异步 API 调用之间传播

Context grpcCtx1 = Context.current();

vertx.executeBlocking(() -> {

  // Same as grpcCtx1
  Context grpcCtx2 = Context.current();

  return doSomething();

}).onComplete(ar -> {

  // Same as grpcCtx1 and grpcCtx2
  Context grpcCtx3 = Context.current();

});

gRPC 上下文仅当当前的 Vert.x Context 绑定到 Vert.x HTTP 服务器请求时,才会在 Vert.x 异步 API 调用之间传播。

如果例如您在非 Vert.x 线程上或从 Verticle 的 start 方法中调用存根,则不会传播。

使用 Vert.x gRPC 上下文存储

要使用 Vert.x gRPC 上下文存储,请将以下依赖项添加到您的构建描述符的 dependencies 部分:

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-context-storage</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

dependencies {
  compile 'io.vertx:vertx-grpc-context-storage:5.0.1'
}