<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java3</artifactId>
<version>5.0.1</version>
</dependency>
Vert.x RxJava
适用于 RxJava3 的 Vert.x API
RxJava 是一个流行的库,用于使用 Java VM 的可观察序列来组合异步和基于事件的程序。
Vert.x 与 RxJava 自然集成,允许您在任何可以使用流或异步结果的地方使用 RxJava。
使用适用于 RxJava3 的 Vert.x API
要使用适用于 RxJava3 的 Vert.x API,请将以下依赖项添加到构建描述符的 dependencies 部分
-
Maven(在您的
pom.xml
中)
-
Gradle(在您的
build.gradle
文件中)
compile 'io.vertx:vertx-rx-java3:5.0.1'
有两种方法可以在 Vert.x 中使用 RxJava 3 API
-
通过原始 Vert.x API,带有提供静态方法用于在 Vert.x 核心 API 和 RxJava 3 API 之间转换对象的辅助类
-
通过增强核心 Vert.x API 的 Rxified Vert.x API。
读取流支持
RxJava 的 Flowable
与 Vert.x 的 ReadStream
类完美匹配:两者都提供项目流。
FlowableHelper.toFlowable
静态方法将 Vert.x 读取流转换为 Flowable
FileSystem fileSystem = vertx.fileSystem();
fileSystem.open("/data.txt", new OpenOptions()).onComplete(result -> {
AsyncFile file = result.result();
Flowable<Buffer> observable = FlowableHelper.toFlowable(file);
observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
});
Rxified Vert.x API 在 ReadStream
上提供了一个 toFlowable
方法
FileSystem fs = vertx.fileSystem();
fs.rxOpen("/data.txt", new OpenOptions())
.flatMapPublisher(file -> file.toFlowable())
.subscribe(data -> System.out.println("Read data: " + data.toString("UTF-8")));
此类 flowable 是热 flowable,即它们会产生通知而不管订阅如何,因为 ReadStream
可能会根据实现情况自发地发出项目或不发出。
在订阅时,适配器调用 handler
来设置其自身的处理器。
一些 ReadStream
实现在此调用后可以开始发出事件,其他一些无论是否设置了处理器都会发出事件
-
AsyncFile
在设置处理器后产生缓冲区事件 -
HttpServerRequest
独立于处理器产生事件(即如果未设置处理器,则缓冲区可能会丢失)
在这两种情况下,在同一调用中订阅 Flowable
是安全的,因为事件循环或工作 verticle 不能并发调用,所以订阅总是在处理器开始发出数据之前发生。
当您需要延迟订阅时,您需要 pause
(暂停) ReadStream
,然后 resume
(恢复) 它,这与您使用 ReadStream
的操作相同。
server.requestHandler(request -> {
if (request.method() == HttpMethod.POST) {
// Stop receiving buffers
request.pause();
checkAuth(res -> {
// Now we can receive buffers again
request.resume();
if (res.succeeded()) {
Flowable<Buffer> flowable = request.toFlowable();
flowable.subscribe(buff -> {
// Get buffers
});
}
});
}
});
同样,可以将现有 Flowable
转换为 Vert.x ReadStream
。
FlowableHelper.toReadStream
静态方法将 Flowable
转换为 Vert.x 读取流
Flowable<Buffer> observable = getFlowable();
ReadStream<Buffer> readStream = FlowableHelper.toReadStream(observable);
readStream.pipeTo(response);
写入流支持
WriteStream
,就像 org.reactivestreams.Subscriber
一样,消费项目,当它无法跟上时,会与生产者协作以避免积压不断增长。
Vert.x 提供了 WriteStreamSubscriber
适配器,您可以使用它将 Flowable
项目发送到任何 WriteStream
response.setChunked(true);
WriteStreamSubscriber<io.vertx.core.buffer.Buffer> subscriber = io.vertx.rxjava3.RxHelper.toSubscriber(response);
flowable.subscribe(subscriber);
还有一个 io.vertx.rxjava3.WriteStreamObserver 适配器,适用于非背压的 io.reactivex.Observable 。不同之处在于,即使 WriteStream 无法跟上生产者速率,此适配器也会将项目发送到其中。 |
如果您正在使用 Rxified Vert.x API 编程,WriteStream
实现提供了 toSubscriber
方法。这样,前面的示例将变得更加简单
response.setChunked(true);
flowable.subscribe(response.toSubscriber());
当 Flowable 成功终止时,适配器会调用 end 方法。 |
适配器会设置 WriteStream 的 drain 和 exception 处理器,因此请勿在订阅后使用它们。 |
当以下情况发生时,WriteStreamSubscriber
适配器能够调用回调函数:
-
Flowable
因错误终止,或者 -
WriteStream
失败(例如 HTTP 连接关闭或文件系统已满),或者 -
WriteStream
结束(即所有写入完成且文件已关闭),或者 -
WriteStream
以错误结束(即所有写入完成并且在关闭文件时发生错误)
这允许更健壮的程序设计,以及在处理完流之后调度其他任务
response.setChunked(true);
WriteStreamSubscriber<Buffer> subscriber = response.toSubscriber();
subscriber.onError(throwable -> {
if (!response.headWritten() && response.closed()) {
response.setStatusCode(500).end("oops");
} else {
// log error
}
});
subscriber.onWriteStreamError(throwable -> {
// log error
});
subscriber.onWriteStreamEnd(() -> {
// log end of transaction to audit system...
});
flowable.subscribe(subscriber);
如果 WriteStream 失败,适配器会取消 org.reactivestreams.Subscription 。 |
异步结果支持
您可以从现有 Vert.x Handler<AsyncResult<T>>
创建一个 RxJava Observer
并订阅它
Handler<AsyncResult<String>> handler = getHandler();
// Subscribe to a Single
Single.just("hello").subscribe(SingleHelper.toObserver(handler));
Handler<AsyncResult<String>> handler = getHandler();
// Subscribe to a Single
Maybe.just("hello").subscribe(MaybeHelper.toObserver(handler));
Handler<AsyncResult<Void>> handler = getHandler();
// Subscribe to a Single
Completable.complete().subscribe(CompletableHelper.toObserver(handler));
Rxified Vert.x API 将每个此类方法复制为带 rx
前缀的方法,返回 RxJava 的 Single
、Maybe
或 Completable
Single<HttpServer> single = vertx
.createHttpServer()
.rxListen(1234, "localhost");
// Subscribe to bind the server
single.
subscribe(
server -> {
// Server is listening
},
failure -> {
// Server could not start
}
);
此类 Single 是冷 Single,并且相应的 API 方法在订阅时被调用。
Maybe
可以产生结果或不产生结果
DnsClient client = vertx.createDnsClient(dnsPort, dnsHost);
// Obtain a maybe that performs the actual reverse lookup on subscribe
Maybe<String> maybe = client.rxReverseLookup(ipAddress);
// Subscribe to perform the lookup
maybe.
subscribe(
name -> {
// Lookup produced a result
},
failure -> {
// Lookup failed
},
() -> {
// Lookup produced no result
}
);
Completable
通常映射到 Handler<AsyncResult<Void>>
Completable single = server.rxClose();
// Subscribe to close the server
single.
subscribe(
() -> {
// Server is closed
},
failure -> {
// Server closed but encountered issue
}
);
如果您不能使用 Vert.x Rxified API,或者如果您有自己的基于回调的异步方法,Vert.x 提供了适配器 |
Maybe<String> maybe = MaybeHelper.toMaybe(handler -> {
vertx.executeBlocking(() -> invokeBlocking()).onComplete(handler);
});
调度器支持
反应式扩展有时需要调度操作,例如 Flowable#timer
创建并返回一个计时器,该计时器发出周期性事件。默认情况下,调度操作由 RxJava 管理,这意味着计时器线程不是 Vert.x 线程,因此不会在 Vert.x 事件循环或 Vert.x 工作线程上执行。
当 RxJava 方法处理调度器时,它接受一个重载方法,该方法接受一个额外的 io.reactivex.Scheduler
,RxHelper.scheduler
方法将返回一个可用于此类情况的调度器。
Scheduler scheduler = RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
对于阻塞的调度操作,可以使用 RxHelper.blockingScheduler
方法创建一个调度器
Scheduler scheduler = RxHelper.blockingScheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
RxJava 也可以重新配置为使用 Vert.x 调度器
RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(vertx));
RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJava 使用 computation 来表示非阻塞任务,使用 io 来表示阻塞任务,这与 Vert.x 的术语相反 |
Rxified Vert.x API 也在 RxHelper
类上提供了类似的方法
Scheduler scheduler = RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(vertx));
RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(vertx));
还可以创建一个由命名工作池支持的调度器。如果您希望重用特定线程池来调度阻塞操作,这会很有用
Scheduler scheduler = RxHelper.blockingScheduler(workerExecutor);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
Json 反序列化
FlowableHelper.unmarshaller
创建一个 io.reactivex.rxjava3.FlowableOperator
,将 JSON 格式的 Flowable<Buffer>
转换为对象 Flowable
fileSystem.open("/data.txt", new OpenOptions()).onComplete(result -> {
AsyncFile file = result.result();
Flowable<Buffer> observable = FlowableHelper.toFlowable(file);
observable.compose(FlowableHelper.unmarshaller(MyPojo.class)).subscribe(
mypojo -> {
// Process the object
}
);
});
使用 Rxified 辅助器也可以实现相同的操作
fileSystem
.rxOpen("/data.txt", new OpenOptions())
.flatMapObservable(file -> file.toObservable())
.compose(ObservableHelper.unmarshaller((MyPojo.class)))
.subscribe(mypojo -> {
// Process the object
});
部署 Verticle
要部署现有 Verticle 实例,您可以使用 RxHelper.deployVerticle
,它会部署一个 Verticle
并返回部署 ID 的 Single<String>
。
Single<String> deployment = RxHelper.deployVerticle(vertx, verticle);
deployment.subscribe(id -> {
// Deployed
}, err -> {
// Could not deploy
});
Rxified API
Rxified API 是 Vert.x API 的代码生成版本。该 API 使用 io.vertx.rxjava3
前缀,例如 io.vertx.core.Vertx
类被转换为 Vertx
类。
Rxified API 以两种方式公开 Vert.x 异步方法
-
将原始方法转换为 RxJava 等效方法,返回一个急切且缓存的订阅
-
一个带 rx 前缀的派生方法,在订阅时调用原始方法
// Immediate write
// no need to subscribe
// completion provides the asynchronous result
response.write(buffer);
// No write happened
completion = response.rxWrite(buffer);
// Perform an actual write
completion.subscribe(() -> ..., err -> ...);
您可以根据需要使用原始方法或 rxified 方法,例如当您不想订阅或不关心结果时,可以调用原始方法。
嵌入 Rxified Vert.x
只需使用 Vertx.vertx
方法
Vertx vertx = io.vertx.rxjava3.core.Vertx.vertx();
作为 Verticle
扩展 AbstractVerticle
类,它会为您进行包装
class MyVerticle extends AbstractVerticle {
public void start() {
// Use Rxified Vertx here
}
}
RxJava verticle 的部署仍然由 Java 部署器执行,不需要特定的部署器。
具有异步启动的 Verticle 可以重写 rxStart
方法并返回一个 Completable
class MyVerticle extends AbstractVerticle {
public Completable rxStart() {
return vertx.createHttpServer()
.requestHandler(req -> req.response().end("Hello World"))
.rxListen()
.ignoreElement();
}
}
API 示例
现在我们来学习一些 Vert.x 与 RxJava 结合使用的示例。
事件总线消息流
事件总线 MessageConsumer
自然地提供了一个 Observable<Message<T>>
EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Flowable<Message<String>> flowable = consumer.toFlowable();
Disposable sub = flowable.subscribe(msg -> {
// Got message
});
// Unregisters the stream after 10 seconds
vertx.setTimer(10000, id -> {
sub.dispose();
});
MessageConsumer
提供 Message
流。如果需要,body
可以访问新的消息体流
EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Flowable<String> flowable = consumer.bodyStream().toFlowable();
然后可以使用 RxJava 的 map/reduce 组合风格
Flowable<Double> flowable = vertx.eventBus().
<Double>consumer("heat-sensor").
bodyStream().
toFlowable();
flowable.
buffer(1, TimeUnit.SECONDS).
map(samples -> samples.
stream().
collect(Collectors.averagingDouble(d -> d))).
subscribe(heat -> {
vertx.eventBus().send("news-feed", "Current heat is " + heat);
});
HTTP 客户端请求
您可以轻松地使用 HTTP 客户端创建请求并处理响应
HttpClient client = vertx.createHttpClient();
client.rxRequest(HttpMethod.GET, 8080, "localhost", "/")
.flatMap(request -> request
.rxSend()
.flatMap(response -> {
if (response.statusCode() == 200) {
return response.body();
} else {
return Single.error(VertxException.noStackTrace("Invalid response"));
}
}))
.subscribe(body -> {
// Process the body
});
当您需要处理大型流式响应时,可以从 HTTP 响应中获取一个 Flowable<Buffer>
HttpClient client = vertx.createHttpClient();
client.rxRequest(HttpMethod.GET, 8080, "localhost", "/")
.flatMapPublisher(request -> request
.rxSend()
.flatMapPublisher(response -> {
if (response.statusCode() == 200) {
return response.toFlowable();
} else {
return Flowable.error(VertxException.noStackTrace("Invalid response"));
}
}))
.subscribe(chunk -> {
// Process the response chunks
});
您也可以使用 Vert.x Web 客户端。
HTTP 服务器请求
HttpServerRequest
可以适配为 Observable<Buffer>
Observable<Buffer> observable = request.toObservable();
ObservableHelper.unmarshaller
可用于解析 JSON 请求并将其映射到对象
Flowable<MyPojo> flowable = request.
toFlowable().
compose(FlowableHelper.unmarshaller(MyPojo.class));
WebSocket 客户端
connect
在 WebSocket 连接成功时提供单个回调,否则提供失败回调
WebSocketClient client = vertx.createWebSocketClient(new WebSocketClientOptions());
client.rxConnect(8080, "localhost", "/the_uri").subscribe(
ws -> {
// Use the websocket
},
error -> {
// Could not connect
}
);
然后 WebSocket
可以轻松地转换为 Observable<Buffer>
socketObservable.subscribe(
socket -> {
Flowable<Buffer> dataObs = socket.toFlowable();
dataObs.subscribe(buffer -> {
System.out.println("Got message " + buffer.toString("UTF-8"));
});
}
);
WebSocket 服务器
ServerWebSocket
可以轻松地转换为 Observable<Buffer>
socketObservable.subscribe(
socket -> {
Flowable<Buffer> dataObs = socket.toFlowable();
dataObs.subscribe(buffer -> {
System.out.println("Got message " + buffer.toString("UTF-8"));
});
}
);