vertx-lang-kotlin-coroutines

`vertx-lang-kotlin-coroutines` 集成了 Kotlin *协程*,用于执行异步操作和处理事件。这使得编程模型看起来像顺序代码,但它不会阻塞内核线程。

简介

Vert.x 相对于许多传统应用平台的一个主要优势是,它几乎完全是非阻塞的(不阻塞内核线程)。这使得基于 Vert.x 的应用程序能够使用极少数内核线程处理大量并发(例如,大量连接和消息),从而实现出色的可伸缩性。

Vert.x 的非阻塞特性导致了异步 API。异步 API 可以采用多种形式,包括回调、Promise、纤程或响应式扩展。Vert.x 的核心 API 使用回调风格,但它也支持其他模型,如 RxJava。

在某些情况下,使用异步 API 进行编程可能比使用经典/顺序代码风格更具挑战性,尤其是在需要按顺序完成多个操作时。此外,在使用异步 API 时,错误传播通常更为复杂。

`vertx-lang-kotlin-coroutines` 使用*协程*。协程是非常轻量级的线程,它们不对应底层的内核线程,因此当一个*协程*需要“阻塞”时,它会*挂起*并释放其当前的内核线程,以便另一个协程可以处理事件。

`vertx-lang-kotlin-coroutines` 使用 `kotlinx.coroutines` 来实现协程。

从 Vert.x 上下文运行协程

导入 `io.vertx.kotlin.coroutines.VertxCoroutine` 后,`GlobalScope.launch` 方法允许将一段代码作为协程在“全局”应用程序范围(受限于应用程序的生命周期)中运行。

val vertx = Vertx.vertx()

GlobalScope.launch(vertx.dispatcher()) {
  val timerId = awaitEvent<Long> { handler ->
    vertx.setTimer(1000, handler)
  }
  println("Event fired from timer with id $timerId")
}

`vertx.dispatcher()` 返回一个协程调度器,该调度器使用 Vert.x 事件循环执行协程。

`awaitEvent` 函数会挂起协程的执行,直到计时器触发,然后用传递给处理程序的值恢复协程。

关于处理程序、事件和事件流的更多详细信息将在接下来的章节中给出。

扩展 CoroutineVerticle

您可以将代码部署为 `io.vertx.kotlin.coroutines.CoroutineVerticle` 的实例,这是一种专为 Kotlin 协程设计的 verticle 类型。`CoroutineVerticle` 类实现了 `kotlinx.coroutines.CoroutineScope` 接口,使得所有协程构建器方法默认绑定到 verticle 上下文。您应该重写 verticle 的挂起 `start()` 方法和(可选的)挂起 `stop()` 方法。

class MyVerticle : CoroutineVerticle() {
  override suspend fun start() {
    // ...
  }

  override suspend fun stop() {
    // ...
  }
}

下面的所有代码示例都假定在 `CoroutineVerticle` 实现中运行,但您可以将所有 ` { .. }` 调用替换为 `GlobalScope. { .. }`,以改用应用程序范围。

获取一次性异步结果

Vert.x 中的许多异步操作都将 `Handler>` 作为最后一个参数。例如,使用 Vert.x Mongo 客户端执行对象检索,或者发送事件总线消息然后等待回复。

这可以通过使用 `awaitResult` 方法来实现,该方法返回结果值或抛出异常。

协程在事件处理完成之前被挂起,并且没有内核线程被阻塞。

该方法通过指定需要执行的异步操作来执行,该操作以在运行时传递给处理程序的代码块的形式给出。

这是一个示例

suspend fun awaitResultExample() {
  val consumer = vertx.eventBus().localConsumer<String>("a.b.c")
  consumer.handler { message ->
    println("Consumer received: ${message.body()}")
    message.reply("pong")
  }

  // Send a message and wait for a reply
  val reply = awaitResult<Message<String>> { h ->
    vertx.eventBus().request<String>("a.b.c", "ping").onComplete(h)
  }
  println("Reply received: ${reply.body()}")
}

当代码块产生失败时,调用者可以使用常见的异常 `try`/`catch` 结构将其作为异常处理。

suspend fun awaitResultFailureExample() {
  val consumer = vertx.eventBus().localConsumer<String>("a.b.c")
  consumer.handler { message ->
    // The consumer will get a failure
    message.fail(0, "it failed!!!")
  }

  // Send a message and wait for a reply
  try {
    awaitResult<Message<String>> { h ->
      vertx.eventBus().request<String>("a.b.c", "ping").onComplete(h)
    }
  } catch (e: ReplyException) {
    // Handle specific reply exception here
    println("Reply failure: ${e.message}")
  }
}

获取一次性事件

处理一次性事件(以及不处理后续发生的事件,如果有的话)是使用 `awaitEvent` 函数实现的。

suspend fun awaitEventExample() {
  val id = awaitEvent<Long> { h -> vertx.setTimer(2000L, h) }
  println("This should be fired in 2s by some time with id=$id")
}

获取一次性 worker 结果

处理阻塞计算是使用 `awaitBlocking` 函数实现的。

suspend fun awaitBlockingExample() {
  awaitBlocking {
    Thread.sleep(1000)
    "some-string"
  }
}

事件流

在 Vert.x API 的许多地方,事件流通过处理程序进行处理。示例包括事件总线消息消费者和 HTTP 服务器请求。

`ReceiveChannelHandler` 类允许通过(可挂起的)`receive` 方法接收事件。

suspend fun streamExample() {
  val adapter = vertx.receiveChannelHandler<Message<Int>>()
  vertx.eventBus().localConsumer<Int>("a.b.c").handler(adapter)

  // Send 15 messages
  for (i in 0..15) vertx.eventBus().send("a.b.c", i)

  // Receive the first 10 messages
  for (i in 0..10) {
    val message = adapter.receive()
    println("Received: ${message.body()}")
  }
}

等待 Vert.x 异步结果完成

Vert.x 4 提供了 Future,并且 `Future` 有一个 `coAwait()` 挂起方法,用于锁定异步结果。

Vert.x `Future` 实例上的 `coAwait` 扩展方法会挂起协程,直到它们完成,在这种情况下,该方法将返回对应 `AsyncResult` 对象的值或抛出其失败。

suspend fun awaitingFuture(anotherFuture: Future<String>) {
  // Getting a future
  val httpServerFuture = vertx.createHttpServer()
    .requestHandler { req -> req.response().end("Hello!") }
    .listen(8000)

  val httpServer = httpServerFuture.coAwait()
  println("HTTP server port: ${httpServer.actualPort()}")

  // It also works for composite futures
  val result = Future.all(httpServerFuture, anotherFuture).coAwait()
  if (result.succeeded()) {
    println("The server is now running!")
  } else {
    result.cause().printStackTrace()
  }
}

渠道

通道类似于 Java `BlockingQueue`,不同之处在于它们会挂起协程而不是阻塞线程。

  • 将值发送到已满的通道会挂起协程。

  • 从空通道接收值也会挂起协程。

Vert.x `ReadStream` 和 `WriteStream` 可以通过 `toChannel` 扩展方法适应通道。

这些适配器负责管理背压和流终止。

  • `ReadStream` 适应于 `ReceiveChannel`

  • `WriteStream` 适应于 `SendChannel`

接收数据

当你需要处理一系列相关值流时,通道会非常有用。

suspend fun handleTemperatureStream() {
  val stream = vertx.eventBus().consumer<Double>("temperature")
  val channel = stream.toReceiveChannel(vertx)

  var min = Double.MAX_VALUE
  var max = Double.MIN_VALUE

  // Iterate until the stream is closed
  // Non-blocking
  for (msg in channel) {
    val temperature = msg.body()
    min = min(min, temperature)
    max = max(max, temperature)
  }

  // The stream is now closed
}

它也常用于解析协议。我们将构建一个非阻塞的 HTTP 请求解析器,以展示通道的强大功能。

我们将依赖 `RecordParser` 将缓冲区流切分为由 `\r\n` 分隔的缓冲区流。

这是解析器的初始版本,它只处理 HTTP 请求行。

vertx.createNetServer().connectHandler { socket ->

  // The record parser provides a stream of buffers delimited by \r\n
  val stream = RecordParser.newDelimited("\r\n", socket)

  // Convert the stream to a Kotlin channel
  val channel = stream.toReceiveChannel(vertx)

  // Run the coroutine
  launch {

    // Receive the request-line
    // Non-blocking
    val line = channel.receive().toString().split(" ")
    val method = line[0]
    val uri = line[1]

    println("Received HTTP request ($method, $uri)")

    // Still need to parse headers and body...
  }
}

解析请求行就像在通道上调用 `receive` 一样简单。

下一步通过接收数据块来解析 HTTP 头部,直到接收到一个空块。

// Receive HTTP headers
val headers = HashMap<String, String>()
while (true) {

  // Non-blocking
  val header = channel.receive().toString()

  // Done with parsing headers
  if (header.isEmpty()) {
    break
  }

  val pos = header.indexOf(':')
  headers[header.substring(0, pos).lowercase(Locale.getDefault())] = header.substring(pos + 1).trim()
}

println("Received HTTP request ($method, $uri) with headers ${headers.keys}")

最后,通过处理可选的请求体来终止解析器。

// Receive the request body
val transferEncoding = headers["transfer-encoding"]
val contentLength = headers["content-length"]

val body: Buffer?
if (transferEncoding == "chunked") {

  // Handle chunked encoding, e.g
  // 5\r\n
  // HELLO\r\n
  // 0\r\n
  // \r\n

  body = Buffer.buffer()
  while (true) {

    // Parse length chunk
    // Non-blocking
    val len = channel.receive().toString().toInt(16)
    if (len == 0) {
      break
    }

    // The stream is flipped to parse a chunk of the exact size
    stream.fixedSizeMode(len + 2)

    // Receive the chunk and append it
    // Non-blocking
    val chunk = channel.receive()
    body.appendBuffer(chunk, 0, chunk.length() - 2)

    // The stream is flipped back to the \r\n delimiter to parse the next chunk
    stream.delimitedMode("\r\n")
  }
} else if (contentLength != null) {

  // The stream is flipped to parse a body of the exact size
  stream.fixedSizeMode(contentLength.toInt())

  // Non-blocking
  body = channel.receive()
} else {
  body = null
}

val bodySize = body?.length() ?: 0
println("Received HTTP request ($method, $uri) with headers ${headers.keys} and body with size $bodySize")

发送数据

使用通道发送数据非常直接。

suspend fun sendChannel(httpResponse : HttpServerResponse) {
  val channel = httpResponse.toSendChannel(vertx)

  while (true) {
    val buffer = readBuffer()

    // Broadcast the temperature
    // Non-blocking but could be suspended
    channel.send(buffer)

    // Wait for one second
    awaitEvent<Long> { vertx.setTimer(1000, it) }
  }
}

`SendChannel#send` 和 `WriteStream#write` 都是非阻塞操作,然而与 `SendChannel#send` 在通道满时可以挂起执行不同,没有通道的等效代码会是这样:

suspend fun sendChannel(httpResponse : HttpServerResponse) {
  val channel = httpResponse.toSendChannel(vertx)

  while (true) {
    val buffer = readBuffer()

    // Broadcast the temperature
    // Non-blocking but could be suspended
    channel.send(buffer)

    // Wait for one second
    awaitEvent<Long> { vertx.setTimer(1000, it) }
  }
}

延迟、取消和超时

Vert.x 调度器通过 Vert.x 计时器完全支持协程 `delay` 函数。

launch {
  // Set a one-second Vertx timer
  delay(1000)
}

计时器支持取消。

val job = launch {
  // Set a one-second Vertx timer
  while (true) {
    delay(1000)
    // Do something periodically
  }
}

// Sometimes later
job.cancel()

取消是协作式的。

您还可以使用 `withTimeout` 函数安排超时。

launch {
  try {
    val id = withTimeout(1000) {
      awaitEvent { anAsyncMethod(it) }
    }
  } catch (e: TimeoutCancellationException) {
    // Cancelled
  }
}

协程构建器

Vert.x 与所有协程构建器配合使用,只要 `CoroutineScope` 实例可用即可:`launch`、`async`、`produce` 等。有几点重要事项需要记住:

  • `runBlocking` 不需要 `CoroutineScope`,并且不得在 Vert.x 事件循环线程中使用。

  • 为了避免内存泄漏,始终使用 `coroutineScope {..}` 来定义子作用域。这样,如果作用域内的一个协程失败,该作用域内定义的所有其他协程也将被取消。

Vert.x 还提供了一个协程构建器,它返回一个 `io.vertx.core.Future` 实例。


// Can be called on any thread
val future1: Future<String> = vertxFuture(vertx) {
  computeSomethingWithSuspendingFunction()
}

// Can be called only when running on a Vert.x context
val future2: Future<String> = vertxFuture {
  computeSomethingWithSuspendingFunction()
}

协程互操作性

Vert.x 集成设计为与 Kotlin 协程完全互操作。

  • 使用 Vert.x 调度器时,`kotlinx.coroutines.sync.Mutex` 在事件循环线程上执行。

RxJava 互操作性

模块 `vertx-lang-kotlin-coroutines` 没有与 RxJava 的特定集成。然而,Kotlin 协程提供了与 RxJava 的集成,这与 `vertx-lang-kotlin-coroutines` 配合得很好。

您可以在 适用于响应式流的协程 中阅读更多内容。

适用于协程的 Vert.x API 扩展

事件总线

Vert.x `EventBus` 和 `MessageConsumer` 对象通过 `coroutineEventBus` 作用域函数扩展了对协程的支持。

val bus = vertx.eventBus()
coroutineEventBus {
  bus.coConsumer<String>("some-address") {
    computeSomethingWithSuspendingFunction()
    it.reply("done")
  }
}

如果包围类型实现了 `io.vertx.kotlin.coroutines.CoroutineEventBusSupport`,则不需要作用域函数。例如,使用协程 verticle 时。

class VerticleWithCoroutineEventBusSupport : CoroutineVerticle(), CoroutineEventBusSupport {
  override suspend fun start() {
    val bus = vertx.eventBus()
    bus.coConsumer<String>("some-address") {
      // call suspending functions and do something
    }
  }
}

Vert.x Web

Vert.x Web 的 `Router` 和 `Route` 对象通过 `coroutineRouter` 作用域函数扩展了对协程的支持。

val router = Router.router(vertx)
coroutineRouter {
  // Route.coRespond is similar to Route.respond but using a suspending function
  router.get("/my-resource").coRespond {
    // similar to Route.respond but using a suspending function
    val response = computeSomethingWithSuspendingFunction()
    response // sent by Vert.x to the client
  }
  // Router.coErrorHandler is similar to Router.errorHandler but using a suspending function
  router.coErrorHandler(404) { rc ->
    val html = computeHtmlPageWithSuspendingFunction()
    rc.response().setStatusCode(404).putHeader(CONTENT_TYPE, TEXT_HTML).end(html)
  }
}

如果包围类型实现了 `io.vertx.kotlin.coroutines.CoroutineRouterSupport`,则不需要作用域函数。例如,使用协程 verticle 时。

class VerticleWithCoroutineRouterSupport : CoroutineVerticle(), CoroutineRouterSupport {
  override suspend fun start() {
    val router = Router.router(vertx)
    router.get("/my-resource").coRespond {
      // call suspending functions and build response
    }
  }
}