Vert.x RxJava

RxJava2 已于 2021 年 2 月 28 日终止生命周期。新的 Vert.x 模块将不再生成绑定。请考虑迁移到 Vert.x RxJava3

Vert.x RxJava2 API

RxJava 是一个流行的库,用于使用 Java VM 的可观测序列编写异步和基于事件的程序。

Vert.x 自然地与 RxJava 集成,允许在任何可以使用流或异步结果的地方使用 RxJava。

使用 Vert.x RxJava2 API

要使用 Vert.x RxJava2 API,请将以下依赖添加到您的构建描述符的 dependencies 部分

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-rx-java2</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

compile 'io.vertx:vertx-rx-java2:5.0.1'

使用 Vert.x 的 RxJava 2 API 有两种方式

读取流支持

RxJava 的 Flowable 与 Vert.x 的 ReadStream 类完美匹配:两者都提供项目流。

FlowableHelper.toFlowable 静态方法将 Vert.x 读取流转换为 Flowable

FileSystem fileSystem = vertx.fileSystem();
fileSystem.open("/data.txt", new OpenOptions()).onComplete(result -> {
  AsyncFile file = result.result();
  Flowable<Buffer> observable = FlowableHelper.toFlowable(file);
  observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
});

Rxified Vert.x API 在 ReadStream 上提供了一个 toFlowable 方法。

FileSystem fs = vertx.fileSystem();
fs.open("/data.txt", new OpenOptions()).onComplete( result -> {
  AsyncFile file = result.result();
  Flowable<Buffer> observable = file.toFlowable();
  observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
});

此类 Flowable 是 Flowable,即无论是否订阅,它们都会产生通知,因为 ReadStream 可能会根据实现情况自发地或不自发地发出项目。

在订阅时,适配器调用 handler 来设置其自己的处理程序。

一些 ReadStream 实现可能会在此调用后开始发出事件,而另一些则无论是否设置了处理程序都会发出事件。

  • AsyncFile 在设置处理程序后生成缓冲区事件

  • HttpServerRequest 独立于处理程序生成事件(即,如果未设置处理程序,缓冲区可能会丢失)

在这两种情况下,在同一调用中订阅 Flowable 是安全的,因为事件循环或工作 Verticle 不能并发调用,因此订阅总会在处理程序开始发出数据之前发生。

当您需要延迟订阅时,您需要暂停 ReadStream,然后恢复它,这与您使用 ReadStream 的做法相同。

server.requestHandler(request -> {
  if (request.method() == HttpMethod.POST) {

    // Stop receiving buffers
    request.pause();

    checkAuth(res -> {

      // Now we can receive buffers again
      request.resume();

      if (res.succeeded()) {
        Flowable<Buffer> flowable = request.toFlowable();
        flowable.subscribe(buff -> {
          // Get buffers
        });
      }
    });
  }
});

同样,可以将现有的 Flowable 转换为 Vert.x ReadStream

FlowableHelper.toReadStream 静态方法将 Flowable 转换为 Vert.x 读取流。

Flowable<Buffer> observable = getFlowable();
ReadStream<Buffer> readStream = FlowableHelper.toReadStream(observable);
readStream.pipeTo(response);

写入流支持

一个 WriteStream,就像 org.reactivestreams.Subscriber 一样,消费项目,当它无法跟上时,会与生产者协作以避免不断增长的积压。

Vert.x 提供了 WriteStreamSubscriber 适配器,您可以使用它将 Flowable 项目发送到任何 WriteStream

将缓冲区发送到 HTTP 服务器响应
response.setChunked(true);
WriteStreamSubscriber<io.vertx.core.buffer.Buffer> subscriber = io.vertx.reactivex.RxHelper.toSubscriber(response);
flowable.subscribe(subscriber);
还有一个 io.vertx.reactivex.WriteStreamObserver 适配器,用于非背压的 io.reactivex.Observable。不同之处在于,即使 WriteStream 无法跟上生产者的速率,此适配器也会将项目发送到它。

如果您正在使用 Rxified Vert.x API 进行编程,则 WriteStream 实现提供了一个 toSubscriber 方法。这样,前面的示例会变得更加直接。

response.setChunked(true);
flowable.subscribe(response.toSubscriber());
Flowable 成功终止时,适配器调用 end 方法。
适配器设置 WriteStreamdrainexception 处理程序,因此在订阅后不要使用它们。

WriteStreamSubscriber 适配器能够在以下情况时调用回调:

  • Flowable 发生错误终止,或者

  • WriteStream 失败(例如,HTTP 连接关闭或文件系统已满),或者

  • WriteStream 结束(即所有写入完成且文件已关闭),或者

  • WriteStream 以错误结束(即所有写入完成并在关闭文件时发生错误)

这允许更健壮的程序设计,以及在流处理完毕后安排其他任务。

response.setChunked(true);

WriteStreamSubscriber<Buffer> subscriber = response.toSubscriber();

subscriber.onError(throwable -> {
  if (!response.headWritten() && response.closed()) {
    response.setStatusCode(500).end("oops");
  } else {
    // log error
  }
});

subscriber.onWriteStreamError(throwable -> {
  // log error
});

subscriber.onWriteStreamEnd(() -> {
  // log end of transaction to audit system...
});

flowable.subscribe(subscriber);
如果 WriteStream 失败,适配器将取消 org.reactivestreams.Subscription

异步结果支持

您可以从现有的 Vert.x Handler<AsyncResult<T>> 创建一个 RxJava Observer 并订阅它。

Handler<AsyncResult<String>> handler = getHandler();

// Subscribe to a Single
Single.just("hello").subscribe(SingleHelper.toObserver(handler));
Handler<AsyncResult<String>> handler = getHandler();

// Subscribe to a Single
Maybe.just("hello").subscribe(MaybeHelper.toObserver(handler));
Handler<AsyncResult<Void>> handler = getHandler();

// Subscribe to a Single
Completable.complete().subscribe(CompletableHelper.toObserver(handler));

Rxified Vert.x API 将每个此类方法复制并加上 rx 前缀,返回一个 RxJava SingleMaybeCompletable

Single<HttpServer> single = vertx
  .createHttpServer()
  .rxListen(1234, "localhost");

// Subscribe to bind the server
single.
    subscribe(
        server -> {
          // Server is listening
        },
        failure -> {
          // Server could not start
        }
    );

此类 Single 是 Single,并且在订阅时调用相应的 API 方法。

Maybe 可以产生结果或不产生结果。

DnsClient client = vertx.createDnsClient(dnsPort, dnsHost);

// Obtain a maybe that performs the actual reverse lookup on subscribe
Maybe<String> maybe = client.rxReverseLookup(ipAddress);

// Subscribe to perform the lookup
maybe.
  subscribe(
    name -> {
      // Lookup produced a result
    },
    failure -> {
      // Lookup failed
    },
    () -> {
      // Lookup produced no result
    }
  );

Completable 通常映射到 Handler<AsyncResult<Void>>

Completable single = server.rxClose();

// Subscribe to bind the server
single.
  subscribe(
    () -> {
      // Server is closed
    },
    failure -> {
      // Server closed but encoutered issue
    }
  );

如果您无法使用 Vert.x Rxified API,或者您有自己的基于回调的异步方法,Vert.x 提供了适配器。

适配 Vert.x 核心 executeBlocking 方法
Maybe<String> maybe = MaybeHelper.toMaybe(handler -> {
  vertx.executeBlocking(() -> invokeBlocking()).onComplete(handler);
});

调度器支持

响应式扩展有时需要调度操作,例如 Flowable#timer 创建并返回一个定时器,该定时器发出周期性事件。默认情况下,调度操作由 RxJava 管理,这意味着定时器线程不是 Vert.x 线程,因此不在 Vert.x 事件循环或 Vert.x 工作线程上执行。

当 RxJava 方法处理调度器时,它接受一个重载方法,该方法接受一个额外的 io.reactivex.SchedulerRxHelper.scheduler 方法将返回一个可以在此类地方使用的调度器。

Scheduler scheduler = RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);

对于阻塞调度操作,可以使用 RxHelper.blockingScheduler 方法创建一个调度器。

Scheduler scheduler = RxHelper.blockingScheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);

RxJava 也可以重新配置为使用 Vert.x 调度器。

RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(vertx));
RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJava 使用 computation 表示非阻塞任务,使用 io 表示阻塞任务,这与 Vert.x 的术语相反。

Rxified Vert.x API 也在 RxHelper 类上提供了类似的方法。

Scheduler scheduler = RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(vertx));
RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(vertx));

也可以创建一个由命名工作池支持的调度器。如果您希望重用特定线程池来调度阻塞操作,这会很有用。

Scheduler scheduler = RxHelper.blockingScheduler(workerExecutor);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);

Json 反序列化

FlowableHelper.unmarshaller 创建一个 io.reactivex.rxjava2.FlowableOperator,它将 JSON 格式的 Flowable<Buffer> 转换为对象 Flowable。

fileSystem.open("/data.txt", new OpenOptions()).onComplete(result -> {
  AsyncFile file = result.result();
  Flowable<Buffer> observable = FlowableHelper.toFlowable(file);
  observable.compose(FlowableHelper.unmarshaller(MyPojo.class)).subscribe(
      mypojo -> {
        // Process the object
      }
  );
});

同样的操作也可以通过 Rxified 助手完成。

fileSystem.open("/data.txt", new OpenOptions()).onComplete(result -> {
  AsyncFile file = result.result();
  Observable<Buffer> observable = file.toObservable();
  observable.compose(ObservableHelper.unmarshaller((MyPojo.class))).subscribe(
    mypojo -> {
      // Process the object
    }
  );
});

部署 Verticle

要部署现有的 Verticle 实例,您可以使用 RxHelper.deployVerticle,它部署一个 Verticle 并返回部署 ID 的 Single<String>

Single<String> deployment = RxHelper.deployVerticle(vertx, verticle);

deployment.subscribe(id -> {
  // Deployed
}, err -> {
  // Could not deploy
});

Rxified API

Rxified API 是 Vert.x API 的代码生成版本,就像 JavaScriptGroovy 语言一样。该 API 使用 io.vertx.rxjava 前缀,例如 io.vertx.core.Vertx 类被转换为 Vertx 类。

嵌入 Rxified Vert.x

只需使用 Vertx.vertx 方法。

Vertx vertx = io.vertx.reactivex.core.Vertx.vertx();

作为 Verticle

扩展 AbstractVerticle 类,它会为您进行包装。

class MyVerticle extends io.vertx.reactivex.core.AbstractVerticle {
  public void start() {
    // Use Rxified Vertx here
  }
}

RxJava Verticle 的部署仍由 Java 部署器执行,不需要特定的部署器。

具有异步启动的 Verticle 可以重写 rxStart 方法并返回一个 Completable

class MyVerticle extends io.vertx.reactivex.core.AbstractVerticle {
  public Completable rxStart() {
    return vertx.createHttpServer()
      .requestHandler(req -> req.response().end("Hello World"))
      .rxListen()
      .toCompletable();
  }
}

API 示例

现在让我们学习一些 Vert.x 与 RxJava 结合使用的示例。

EventBus 消息流

事件总线 MessageConsumer 自然地提供了一个 Observable<Message<T>>

EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Observable<Message<String>> observable = consumer.toObservable();
Disposable sub = observable.subscribe(msg -> {
  // Got message
});

// Unregisters the stream after 10 seconds
vertx.setTimer(10000, id -> {
  sub.dispose();
});

MessageConsumer 提供 Message 流。如果需要,body 提供对新的消息体流的访问。

EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Observable<String> observable = consumer.bodyStream().toObservable();

然后可以使用 RxJava 的 map/reduce 组合风格。

Observable<Double> observable = vertx.eventBus().
    <Double>consumer("heat-sensor").
    bodyStream().
    toObservable();

observable.
    buffer(1, TimeUnit.SECONDS).
    map(samples -> samples.
        stream().
        collect(Collectors.averagingDouble(d -> d))).
    subscribe(heat -> {
      vertx.eventBus().send("news-feed", "Current heat is " + heat);
    });

HTTP 客户端请求

我们建议将 Vert.x Web Client 与 RxJava 结合使用。

HTTP 服务器请求

HttpServerRequest 可以随后适配为 Observable<Buffer>

Observable<Buffer> observable = request.toObservable();

ObservableHelper.unmarshaller 可用于解析 JSON 请求并将其映射到对象。

Observable<MyPojo> observable = request.
  toObservable().
  compose(io.vertx.reactivex.core.ObservableHelper.unmarshaller(MyPojo.class));

WebSocket 客户端

rxConnect 在 WebSocket 连接时提供一个单一回调,否则提供一个失败。

WebSocketClient client = vertx.createWebSocketClient(new WebSocketClientOptions());
client.rxConnect(8080, "localhost", "/the_uri").subscribe(
    ws -> {
      // Use the websocket
    },
    error -> {
      // Could not connect
    }
);

WebSocket 可以很容易地转换为 Observable<Buffer>

socketObservable.subscribe(
    socket -> {
      Flowable<Buffer> dataObs = socket.toFlowable();
      dataObs.subscribe(buffer -> {
        System.out.println("Got message " + buffer.toString("UTF-8"));
      });
    }
);

WebSocket 服务器

一个 ServerWebSocket 可以很容易地转换为 Observable<Buffer>

socketObservable.subscribe(
    socket -> {
      Observable<Buffer> dataObs = socket.toObservable();
      dataObs.subscribe(buffer -> {
        System.out.println("Got message " + buffer.toString("UTF-8"));
      });
    }
);