<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-reactive-streams</artifactId>
<version>5.0.1</version>
</dependency>
Vert.x 响应式流集成
Reactive Streams 是一项旨在为 JVM 上的异步流处理提供带非阻塞背压标准的倡议。
此库为 Vert.x 提供了一个响应式流的实现。
Vert.x 提供其自己的机制,用于使用 io.vertx.core.streams.ReadStream
、io.vertx.core.streams.WriteStream
和 io.vertx.core.streams.Pump
处理数据流并带背压地将它们从一个泵送到另一个。请参阅 Vert.x 核心手册以获取有关 Vert.x 流的更多信息。
此库提供了读流和写流的实现,它们也充当响应式流的发布者和订阅者。这使我们能够将任何响应式流发布者或订阅者视为任何其他 Vert.x 读流或写流来处理。
使用 Vert.x 响应式流
要使用 Vert.x 响应式流,请将以下依赖项添加到您的构建描述符的 dependencies 部分
-
Maven(在您的
pom.xml
中)
-
Gradle(在您的
build.gradle
文件中)
compile io.vertx:vertx-reactive-streams:5.0.1
响应式读流
我们提供了一个 Vert.x ReadStream
接口的实现,通过 ReactiveReadStream
,它也实现了响应式流的 Subscriber
。
您可以将此实例传递给任何响应式流的 Publisher
(例如来自 Akka 的 Publisher),然后您就可以像任何其他 Vert.x ReadStream
一样从它读取数据(例如使用 Pump
将其泵送到 WriteStream
)。
这是一个示例,展示如何从其他响应式流实现(例如 Akka)获取一个发布者,并将该流泵送到服务器端 HTTP 响应的主体。这将自动处理背压。
ReactiveReadStream<Buffer> rrs = ReactiveReadStream.readStream();
// Subscribe the read stream to the publisher
otherPublisher.subscribe(rrs);
// Pipe from the read stream to the http response
rrs.pipeTo(response);
响应式写流
我们还提供了一个 Vert.x WriteStream
接口的实现,通过 ReactiveWriteStream
,它也实现了响应式流的 Publisher
。您可以获取任何响应式流的 Subscriber
(例如来自 Akka 的 Subscriber),然后您就可以像任何其他 Vert.x WriteStream
一样写入它*。(例如使用 Pump
从 ReadStream
泵送数据)。
您可以使用 pause
、resume
和 writeQueueFull
,就像您使用任何 Vert.x 读流一样,来处理您的背压。这在内部会自动转换为响应式流传播背压(请求更多项)的方法。
这是一个示例,展示如何从其他响应式流实现中获取一个订阅者,并将服务器端 HTTP 请求的主体泵送到该订阅者。这将自动处理背压。
ReactiveWriteStream<Buffer> rws = ReactiveWriteStream.writeStream(vertx);
// Subscribe the other subscriber to the write stream
rws.subscribe(otherSubscriber);
// Pipe the http request to the write stream
request.pipeTo(rws);