Vert.x RxJava

适用于 RxJava3 的 Vert.x API

RxJava 是一个流行的库,用于使用 Java VM 的可观察序列来组合异步和基于事件的程序。

Vert.x 与 RxJava 自然集成,允许您在任何可以使用流或异步结果的地方使用 RxJava。

使用适用于 RxJava3 的 Vert.x API

要使用适用于 RxJava3 的 Vert.x API,请将以下依赖项添加到构建描述符的 dependencies 部分

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-rx-java3</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

compile 'io.vertx:vertx-rx-java3:5.0.1'

有两种方法可以在 Vert.x 中使用 RxJava 3 API

读取流支持

RxJava 的 Flowable 与 Vert.x 的 ReadStream 类完美匹配:两者都提供项目流。

FlowableHelper.toFlowable 静态方法将 Vert.x 读取流转换为 Flowable

FileSystem fileSystem = vertx.fileSystem();
fileSystem.open("/data.txt", new OpenOptions()).onComplete(result -> {
  AsyncFile file = result.result();
  Flowable<Buffer> observable = FlowableHelper.toFlowable(file);
  observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
});

Rxified Vert.x API 在 ReadStream 上提供了一个 toFlowable 方法

FileSystem fs = vertx.fileSystem();
fs.rxOpen("/data.txt", new OpenOptions())
  .flatMapPublisher(file -> file.toFlowable())
  .subscribe(data -> System.out.println("Read data: " + data.toString("UTF-8")));

此类 flowable 是 flowable,即它们会产生通知而不管订阅如何,因为 ReadStream 可能会根据实现情况自发地发出项目或不发出。

在订阅时,适配器调用 handler 来设置其自身的处理器。

一些 ReadStream 实现在此调用后可以开始发出事件,其他一些无论是否设置了处理器都会发出事件

  • AsyncFile 在设置处理器后产生缓冲区事件

  • HttpServerRequest 独立于处理器产生事件(即如果未设置处理器,则缓冲区可能会丢失)

在这两种情况下,在同一调用中订阅 Flowable 是安全的,因为事件循环或工作 verticle 不能并发调用,所以订阅总是在处理器开始发出数据之前发生。

当您需要延迟订阅时,您需要 pause (暂停) ReadStream,然后 resume (恢复) 它,这与您使用 ReadStream 的操作相同。

server.requestHandler(request -> {
  if (request.method() == HttpMethod.POST) {

    // Stop receiving buffers
    request.pause();

    checkAuth(res -> {

      // Now we can receive buffers again
      request.resume();

      if (res.succeeded()) {
        Flowable<Buffer> flowable = request.toFlowable();
        flowable.subscribe(buff -> {
          // Get buffers
        });
      }
    });
  }
});

同样,可以将现有 Flowable 转换为 Vert.x ReadStream

FlowableHelper.toReadStream 静态方法将 Flowable 转换为 Vert.x 读取流

Flowable<Buffer> observable = getFlowable();
ReadStream<Buffer> readStream = FlowableHelper.toReadStream(observable);
readStream.pipeTo(response);

写入流支持

WriteStream,就像 org.reactivestreams.Subscriber 一样,消费项目,当它无法跟上时,会与生产者协作以避免积压不断增长。

Vert.x 提供了 WriteStreamSubscriber 适配器,您可以使用它将 Flowable 项目发送到任何 WriteStream

将缓冲区发送到 HTTP 服务器响应
response.setChunked(true);
WriteStreamSubscriber<io.vertx.core.buffer.Buffer> subscriber = io.vertx.rxjava3.RxHelper.toSubscriber(response);
flowable.subscribe(subscriber);
还有一个 io.vertx.rxjava3.WriteStreamObserver 适配器,适用于非背压的 io.reactivex.Observable。不同之处在于,即使 WriteStream 无法跟上生产者速率,此适配器也会将项目发送到其中。

如果您正在使用 Rxified Vert.x API 编程,WriteStream 实现提供了 toSubscriber 方法。这样,前面的示例将变得更加简单

response.setChunked(true);
flowable.subscribe(response.toSubscriber());
Flowable 成功终止时,适配器会调用 end 方法。
适配器会设置 WriteStreamdrainexception 处理器,因此请勿在订阅后使用它们。

当以下情况发生时,WriteStreamSubscriber 适配器能够调用回调函数:

  • Flowable 因错误终止,或者

  • WriteStream 失败(例如 HTTP 连接关闭或文件系统已满),或者

  • WriteStream 结束(即所有写入完成且文件已关闭),或者

  • WriteStream 以错误结束(即所有写入完成并且在关闭文件时发生错误)

这允许更健壮的程序设计,以及在处理完流之后调度其他任务

response.setChunked(true);

WriteStreamSubscriber<Buffer> subscriber = response.toSubscriber();

subscriber.onError(throwable -> {
  if (!response.headWritten() && response.closed()) {
    response.setStatusCode(500).end("oops");
  } else {
    // log error
  }
});

subscriber.onWriteStreamError(throwable -> {
  // log error
});

subscriber.onWriteStreamEnd(() -> {
  // log end of transaction to audit system...
});

flowable.subscribe(subscriber);
如果 WriteStream 失败,适配器会取消 org.reactivestreams.Subscription

异步结果支持

您可以从现有 Vert.x Handler<AsyncResult<T>> 创建一个 RxJava Observer 并订阅它

Handler<AsyncResult<String>> handler = getHandler();

// Subscribe to a Single
Single.just("hello").subscribe(SingleHelper.toObserver(handler));
Handler<AsyncResult<String>> handler = getHandler();

// Subscribe to a Single
Maybe.just("hello").subscribe(MaybeHelper.toObserver(handler));
Handler<AsyncResult<Void>> handler = getHandler();

// Subscribe to a Single
Completable.complete().subscribe(CompletableHelper.toObserver(handler));

Rxified Vert.x API 将每个此类方法复制为带 rx 前缀的方法,返回 RxJava 的 SingleMaybeCompletable

Single<HttpServer> single = vertx
  .createHttpServer()
  .rxListen(1234, "localhost");

// Subscribe to bind the server
single.
    subscribe(
        server -> {
          // Server is listening
        },
        failure -> {
          // Server could not start
        }
    );

此类 Single 是 Single,并且相应的 API 方法在订阅时被调用。

Maybe 可以产生结果或不产生结果

DnsClient client = vertx.createDnsClient(dnsPort, dnsHost);

// Obtain a maybe that performs the actual reverse lookup on subscribe
Maybe<String> maybe = client.rxReverseLookup(ipAddress);

// Subscribe to perform the lookup
maybe.
  subscribe(
    name -> {
      // Lookup produced a result
    },
    failure -> {
      // Lookup failed
    },
    () -> {
      // Lookup produced no result
    }
  );

Completable 通常映射到 Handler<AsyncResult<Void>>

Completable single = server.rxClose();

// Subscribe to close the server
single.
  subscribe(
    () -> {
      // Server is closed
    },
    failure -> {
      // Server closed but encountered issue
    }
  );

如果您不能使用 Vert.x Rxified API,或者如果您有自己的基于回调的异步方法,Vert.x 提供了适配器

适配 Vert.x 核心的 executeBlocking 方法
Maybe<String> maybe = MaybeHelper.toMaybe(handler -> {
  vertx.executeBlocking(() -> invokeBlocking()).onComplete(handler);
});

调度器支持

反应式扩展有时需要调度操作,例如 Flowable#timer 创建并返回一个计时器,该计时器发出周期性事件。默认情况下,调度操作由 RxJava 管理,这意味着计时器线程不是 Vert.x 线程,因此不会在 Vert.x 事件循环或 Vert.x 工作线程上执行。

当 RxJava 方法处理调度器时,它接受一个重载方法,该方法接受一个额外的 io.reactivex.SchedulerRxHelper.scheduler 方法将返回一个可用于此类情况的调度器。

Scheduler scheduler = RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);

对于阻塞的调度操作,可以使用 RxHelper.blockingScheduler 方法创建一个调度器

Scheduler scheduler = RxHelper.blockingScheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);

RxJava 也可以重新配置为使用 Vert.x 调度器

RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(vertx));
RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJava 使用 computation 来表示非阻塞任务,使用 io 来表示阻塞任务,这与 Vert.x 的术语相反

Rxified Vert.x API 也在 RxHelper 类上提供了类似的方法

Scheduler scheduler = RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(vertx));
RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(vertx));

还可以创建一个由命名工作池支持的调度器。如果您希望重用特定线程池来调度阻塞操作,这会很有用

Scheduler scheduler = RxHelper.blockingScheduler(workerExecutor);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);

Json 反序列化

FlowableHelper.unmarshaller 创建一个 io.reactivex.rxjava3.FlowableOperator,将 JSON 格式的 Flowable<Buffer> 转换为对象 Flowable

fileSystem.open("/data.txt", new OpenOptions()).onComplete(result -> {
  AsyncFile file = result.result();
  Flowable<Buffer> observable = FlowableHelper.toFlowable(file);
  observable.compose(FlowableHelper.unmarshaller(MyPojo.class)).subscribe(
      mypojo -> {
        // Process the object
      }
  );
});

使用 Rxified 辅助器也可以实现相同的操作

fileSystem
  .rxOpen("/data.txt", new OpenOptions())
  .flatMapObservable(file -> file.toObservable())
  .compose(ObservableHelper.unmarshaller((MyPojo.class)))
  .subscribe(mypojo -> {
    // Process the object
  });

部署 Verticle

要部署现有 Verticle 实例,您可以使用 RxHelper.deployVerticle,它会部署一个 Verticle 并返回部署 ID 的 Single<String>

Single<String> deployment = RxHelper.deployVerticle(vertx, verticle);

deployment.subscribe(id -> {
  // Deployed
}, err -> {
  // Could not deploy
});

Rxified API

Rxified API 是 Vert.x API 的代码生成版本。该 API 使用 io.vertx.rxjava3 前缀,例如 io.vertx.core.Vertx 类被转换为 Vertx 类。

Rxified API 以两种方式公开 Vert.x 异步方法

  • 将原始方法转换为 RxJava 等效方法,返回一个急切且缓存的订阅

  • 一个带 rx 前缀的派生方法,在订阅时调用原始方法

// Immediate write
// no need to subscribe
// completion provides the asynchronous result
response.write(buffer);

// No write happened
completion = response.rxWrite(buffer);

// Perform an actual write
completion.subscribe(() -> ..., err -> ...);

您可以根据需要使用原始方法或 rxified 方法,例如当您不想订阅或不关心结果时,可以调用原始方法。

嵌入 Rxified Vert.x

只需使用 Vertx.vertx 方法

Vertx vertx = io.vertx.rxjava3.core.Vertx.vertx();

作为 Verticle

扩展 AbstractVerticle 类,它会为您进行包装

class MyVerticle extends AbstractVerticle {
  public void start() {
    // Use Rxified Vertx here
  }
}

RxJava verticle 的部署仍然由 Java 部署器执行,不需要特定的部署器。

具有异步启动的 Verticle 可以重写 rxStart 方法并返回一个 Completable

class MyVerticle extends AbstractVerticle {
  public Completable rxStart() {
    return vertx.createHttpServer()
      .requestHandler(req -> req.response().end("Hello World"))
      .rxListen()
      .ignoreElement();
  }
}

API 示例

现在我们来学习一些 Vert.x 与 RxJava 结合使用的示例。

事件总线消息流

事件总线 MessageConsumer 自然地提供了一个 Observable<Message<T>>

EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Flowable<Message<String>> flowable = consumer.toFlowable();
Disposable sub = flowable.subscribe(msg -> {
  // Got message
});

// Unregisters the stream after 10 seconds
vertx.setTimer(10000, id -> {
  sub.dispose();
});

MessageConsumer 提供 Message 流。如果需要,body 可以访问新的消息体流

EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Flowable<String> flowable = consumer.bodyStream().toFlowable();

然后可以使用 RxJava 的 map/reduce 组合风格

Flowable<Double> flowable = vertx.eventBus().
    <Double>consumer("heat-sensor").
    bodyStream().
    toFlowable();

flowable.
    buffer(1, TimeUnit.SECONDS).
    map(samples -> samples.
        stream().
        collect(Collectors.averagingDouble(d -> d))).
    subscribe(heat -> {
      vertx.eventBus().send("news-feed", "Current heat is " + heat);
    });

HTTP 客户端请求

您可以轻松地使用 HTTP 客户端创建请求并处理响应

HttpClient client = vertx.createHttpClient();
client.rxRequest(HttpMethod.GET, 8080, "localhost", "/")
  .flatMap(request -> request
    .rxSend()
    .flatMap(response -> {
      if (response.statusCode() == 200) {
        return response.body();
      } else {
        return Single.error(VertxException.noStackTrace("Invalid response"));
      }
    }))
  .subscribe(body -> {
    // Process the body
  });

当您需要处理大型流式响应时,可以从 HTTP 响应中获取一个 Flowable<Buffer>

HttpClient client = vertx.createHttpClient();
client.rxRequest(HttpMethod.GET, 8080, "localhost", "/")
  .flatMapPublisher(request -> request
    .rxSend()
    .flatMapPublisher(response -> {
      if (response.statusCode() == 200) {
        return response.toFlowable();
      } else {
        return Flowable.error(VertxException.noStackTrace("Invalid response"));
      }
    }))
  .subscribe(chunk -> {
    // Process the response chunks
  });

您也可以使用 Vert.x Web 客户端

HTTP 服务器请求

HttpServerRequest 可以适配为 Observable<Buffer>

Observable<Buffer> observable = request.toObservable();

ObservableHelper.unmarshaller 可用于解析 JSON 请求并将其映射到对象

Flowable<MyPojo> flowable = request.
  toFlowable().
  compose(FlowableHelper.unmarshaller(MyPojo.class));

WebSocket 客户端

connect 在 WebSocket 连接成功时提供单个回调,否则提供失败回调

WebSocketClient client = vertx.createWebSocketClient(new WebSocketClientOptions());
client.rxConnect(8080, "localhost", "/the_uri").subscribe(
    ws -> {
      // Use the websocket
    },
    error -> {
      // Could not connect
    }
);

然后 WebSocket 可以轻松地转换为 Observable<Buffer>

socketObservable.subscribe(
    socket -> {
      Flowable<Buffer> dataObs = socket.toFlowable();
      dataObs.subscribe(buffer -> {
        System.out.println("Got message " + buffer.toString("UTF-8"));
      });
    }
);

WebSocket 服务器

ServerWebSocket 可以轻松地转换为 Observable<Buffer>

socketObservable.subscribe(
    socket -> {
      Flowable<Buffer> dataObs = socket.toFlowable();
      dataObs.subscribe(buffer -> {
        System.out.println("Got message " + buffer.toString("UTF-8"));
      });
    }
);