响应式

Vert.x 响应式流集成

Reactive Streams 是一项旨在为 JVM 上的异步流处理提供带非阻塞背压标准的倡议。

此库为 Vert.x 提供了一个响应式流的实现。

Vert.x 提供其自己的机制,用于使用 io.vertx.core.streams.ReadStreamio.vertx.core.streams.WriteStreamio.vertx.core.streams.Pump 处理数据流并带背压地将它们从一个泵送到另一个。请参阅 Vert.x 核心手册以获取有关 Vert.x 流的更多信息。

此库提供了读流和写流的实现,它们也充当响应式流的发布者和订阅者。这使我们能够将任何响应式流发布者或订阅者视为任何其他 Vert.x 读流或写流来处理。

使用 Vert.x 响应式流

要使用 Vert.x 响应式流,请将以下依赖项添加到您的构建描述符的 dependencies 部分

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-reactive-streams</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

compile io.vertx:vertx-reactive-streams:5.0.1

响应式读流

我们提供了一个 Vert.x ReadStream 接口的实现,通过 ReactiveReadStream,它也实现了响应式流的 Subscriber

您可以将此实例传递给任何响应式流的 Publisher(例如来自 Akka 的 Publisher),然后您就可以像任何其他 Vert.x ReadStream 一样从它读取数据(例如使用 Pump 将其泵送到 WriteStream)。

这是一个示例,展示如何从其他响应式流实现(例如 Akka)获取一个发布者,并将该流泵送到服务器端 HTTP 响应的主体。这将自动处理背压。

ReactiveReadStream<Buffer> rrs = ReactiveReadStream.readStream();

// Subscribe the read stream to the publisher
otherPublisher.subscribe(rrs);

// Pipe from the read stream to the http response
rrs.pipeTo(response);

响应式写流

我们还提供了一个 Vert.x WriteStream 接口的实现,通过 ReactiveWriteStream,它也实现了响应式流的 Publisher。您可以获取任何响应式流的 Subscriber(例如来自 Akka 的 Subscriber),然后您就可以像任何其他 Vert.x WriteStream 一样写入它*。(例如使用 PumpReadStream 泵送数据)。

您可以使用 pauseresumewriteQueueFull,就像您使用任何 Vert.x 读流一样,来处理您的背压。这在内部会自动转换为响应式流传播背压(请求更多项)的方法。

这是一个示例,展示如何从其他响应式流实现中获取一个订阅者,并将服务器端 HTTP 请求的主体泵送到该订阅者。这将自动处理背压。

ReactiveWriteStream<Buffer> rws = ReactiveWriteStream.writeStream(vertx);

// Subscribe the other subscriber to the write stream
rws.subscribe(otherSubscriber);

// Pipe the http request to the write stream
request.pipeTo(rws);