val vertx = Vertx.vertx()
GlobalScope.launch(vertx.dispatcher()) {
val timerId = awaitEvent<Long> { handler ->
vertx.setTimer(1000, handler)
}
println("Event fired from timer with id $timerId")
}
vertx-lang-kotlin-coroutines
`vertx-lang-kotlin-coroutines` 集成了 Kotlin *协程*,用于执行异步操作和处理事件。这使得编程模型看起来像顺序代码,但它不会阻塞内核线程。
简介
Vert.x 相对于许多传统应用平台的一个主要优势是,它几乎完全是非阻塞的(不阻塞内核线程)。这使得基于 Vert.x 的应用程序能够使用极少数内核线程处理大量并发(例如,大量连接和消息),从而实现出色的可伸缩性。
Vert.x 的非阻塞特性导致了异步 API。异步 API 可以采用多种形式,包括回调、Promise、纤程或响应式扩展。Vert.x 的核心 API 使用回调风格,但它也支持其他模型,如 RxJava。
在某些情况下,使用异步 API 进行编程可能比使用经典/顺序代码风格更具挑战性,尤其是在需要按顺序完成多个操作时。此外,在使用异步 API 时,错误传播通常更为复杂。
`vertx-lang-kotlin-coroutines` 使用*协程*。协程是非常轻量级的线程,它们不对应底层的内核线程,因此当一个*协程*需要“阻塞”时,它会*挂起*并释放其当前的内核线程,以便另一个协程可以处理事件。
`vertx-lang-kotlin-coroutines` 使用 `kotlinx.coroutines` 来实现协程。
从 Vert.x 上下文运行协程
导入 `io.vertx.kotlin.coroutines.VertxCoroutine` 后,`GlobalScope.launch` 方法允许将一段代码作为协程在“全局”应用程序范围(受限于应用程序的生命周期)中运行。
`vertx.dispatcher()` 返回一个协程调度器,该调度器使用 Vert.x 事件循环执行协程。
`awaitEvent` 函数会挂起协程的执行,直到计时器触发,然后用传递给处理程序的值恢复协程。
关于处理程序、事件和事件流的更多详细信息将在接下来的章节中给出。
扩展 CoroutineVerticle
您可以将代码部署为 `io.vertx.kotlin.coroutines.CoroutineVerticle` 的实例,这是一种专为 Kotlin 协程设计的 verticle 类型。`CoroutineVerticle` 类实现了 `kotlinx.coroutines.CoroutineScope` 接口,使得所有协程构建器方法默认绑定到 verticle 上下文。您应该重写 verticle 的挂起 `start()` 方法和(可选的)挂起 `stop()` 方法。
class MyVerticle : CoroutineVerticle() {
override suspend fun start() {
// ...
}
override suspend fun stop() {
// ...
}
}
下面的所有代码示例都假定在 `CoroutineVerticle` 实现中运行,但您可以将所有 `
获取一次性异步结果
Vert.x 中的许多异步操作都将 `Handler
这可以通过使用 `awaitResult` 方法来实现,该方法返回结果值或抛出异常。
协程在事件处理完成之前被挂起,并且没有内核线程被阻塞。
该方法通过指定需要执行的异步操作来执行,该操作以在运行时传递给处理程序的代码块的形式给出。
这是一个示例
suspend fun awaitResultExample() {
val consumer = vertx.eventBus().localConsumer<String>("a.b.c")
consumer.handler { message ->
println("Consumer received: ${message.body()}")
message.reply("pong")
}
// Send a message and wait for a reply
val reply = awaitResult<Message<String>> { h ->
vertx.eventBus().request<String>("a.b.c", "ping").onComplete(h)
}
println("Reply received: ${reply.body()}")
}
当代码块产生失败时,调用者可以使用常见的异常 `try`/`catch` 结构将其作为异常处理。
suspend fun awaitResultFailureExample() {
val consumer = vertx.eventBus().localConsumer<String>("a.b.c")
consumer.handler { message ->
// The consumer will get a failure
message.fail(0, "it failed!!!")
}
// Send a message and wait for a reply
try {
awaitResult<Message<String>> { h ->
vertx.eventBus().request<String>("a.b.c", "ping").onComplete(h)
}
} catch (e: ReplyException) {
// Handle specific reply exception here
println("Reply failure: ${e.message}")
}
}
获取一次性事件
处理一次性事件(以及不处理后续发生的事件,如果有的话)是使用 `awaitEvent` 函数实现的。
suspend fun awaitEventExample() {
val id = awaitEvent<Long> { h -> vertx.setTimer(2000L, h) }
println("This should be fired in 2s by some time with id=$id")
}
获取一次性 worker 结果
处理阻塞计算是使用 `awaitBlocking` 函数实现的。
suspend fun awaitBlockingExample() {
awaitBlocking {
Thread.sleep(1000)
"some-string"
}
}
事件流
在 Vert.x API 的许多地方,事件流通过处理程序进行处理。示例包括事件总线消息消费者和 HTTP 服务器请求。
`ReceiveChannelHandler` 类允许通过(可挂起的)`receive` 方法接收事件。
suspend fun streamExample() {
val adapter = vertx.receiveChannelHandler<Message<Int>>()
vertx.eventBus().localConsumer<Int>("a.b.c").handler(adapter)
// Send 15 messages
for (i in 0..15) vertx.eventBus().send("a.b.c", i)
// Receive the first 10 messages
for (i in 0..10) {
val message = adapter.receive()
println("Received: ${message.body()}")
}
}
等待 Vert.x 异步结果完成
Vert.x 4 提供了 Future,并且 `Future` 有一个 `coAwait()` 挂起方法,用于锁定异步结果。
Vert.x `Future` 实例上的 `coAwait` 扩展方法会挂起协程,直到它们完成,在这种情况下,该方法将返回对应 `AsyncResult
suspend fun awaitingFuture(anotherFuture: Future<String>) {
// Getting a future
val httpServerFuture = vertx.createHttpServer()
.requestHandler { req -> req.response().end("Hello!") }
.listen(8000)
val httpServer = httpServerFuture.coAwait()
println("HTTP server port: ${httpServer.actualPort()}")
// It also works for composite futures
val result = Future.all(httpServerFuture, anotherFuture).coAwait()
if (result.succeeded()) {
println("The server is now running!")
} else {
result.cause().printStackTrace()
}
}
渠道
通道类似于 Java `BlockingQueue`,不同之处在于它们会挂起协程而不是阻塞线程。
-
将值发送到已满的通道会挂起协程。
-
从空通道接收值也会挂起协程。
Vert.x `ReadStream` 和 `WriteStream` 可以通过 `toChannel` 扩展方法适应通道。
这些适配器负责管理背压和流终止。
-
`ReadStream
` 适应于 `ReceiveChannel ` -
`WriteStream
` 适应于 `SendChannel `
接收数据
当你需要处理一系列相关值流时,通道会非常有用。
suspend fun handleTemperatureStream() {
val stream = vertx.eventBus().consumer<Double>("temperature")
val channel = stream.toReceiveChannel(vertx)
var min = Double.MAX_VALUE
var max = Double.MIN_VALUE
// Iterate until the stream is closed
// Non-blocking
for (msg in channel) {
val temperature = msg.body()
min = min(min, temperature)
max = max(max, temperature)
}
// The stream is now closed
}
它也常用于解析协议。我们将构建一个非阻塞的 HTTP 请求解析器,以展示通道的强大功能。
我们将依赖 `RecordParser` 将缓冲区流切分为由 `\r\n` 分隔的缓冲区流。
这是解析器的初始版本,它只处理 HTTP 请求行。
vertx.createNetServer().connectHandler { socket ->
// The record parser provides a stream of buffers delimited by \r\n
val stream = RecordParser.newDelimited("\r\n", socket)
// Convert the stream to a Kotlin channel
val channel = stream.toReceiveChannel(vertx)
// Run the coroutine
launch {
// Receive the request-line
// Non-blocking
val line = channel.receive().toString().split(" ")
val method = line[0]
val uri = line[1]
println("Received HTTP request ($method, $uri)")
// Still need to parse headers and body...
}
}
解析请求行就像在通道上调用 `receive` 一样简单。
下一步通过接收数据块来解析 HTTP 头部,直到接收到一个空块。
// Receive HTTP headers
val headers = HashMap<String, String>()
while (true) {
// Non-blocking
val header = channel.receive().toString()
// Done with parsing headers
if (header.isEmpty()) {
break
}
val pos = header.indexOf(':')
headers[header.substring(0, pos).lowercase(Locale.getDefault())] = header.substring(pos + 1).trim()
}
println("Received HTTP request ($method, $uri) with headers ${headers.keys}")
最后,通过处理可选的请求体来终止解析器。
// Receive the request body
val transferEncoding = headers["transfer-encoding"]
val contentLength = headers["content-length"]
val body: Buffer?
if (transferEncoding == "chunked") {
// Handle chunked encoding, e.g
// 5\r\n
// HELLO\r\n
// 0\r\n
// \r\n
body = Buffer.buffer()
while (true) {
// Parse length chunk
// Non-blocking
val len = channel.receive().toString().toInt(16)
if (len == 0) {
break
}
// The stream is flipped to parse a chunk of the exact size
stream.fixedSizeMode(len + 2)
// Receive the chunk and append it
// Non-blocking
val chunk = channel.receive()
body.appendBuffer(chunk, 0, chunk.length() - 2)
// The stream is flipped back to the \r\n delimiter to parse the next chunk
stream.delimitedMode("\r\n")
}
} else if (contentLength != null) {
// The stream is flipped to parse a body of the exact size
stream.fixedSizeMode(contentLength.toInt())
// Non-blocking
body = channel.receive()
} else {
body = null
}
val bodySize = body?.length() ?: 0
println("Received HTTP request ($method, $uri) with headers ${headers.keys} and body with size $bodySize")
发送数据
使用通道发送数据非常直接。
suspend fun sendChannel(httpResponse : HttpServerResponse) {
val channel = httpResponse.toSendChannel(vertx)
while (true) {
val buffer = readBuffer()
// Broadcast the temperature
// Non-blocking but could be suspended
channel.send(buffer)
// Wait for one second
awaitEvent<Long> { vertx.setTimer(1000, it) }
}
}
`SendChannel#send` 和 `WriteStream#write` 都是非阻塞操作,然而与 `SendChannel#send` 在通道满时可以挂起执行不同,没有通道的等效代码会是这样:
suspend fun sendChannel(httpResponse : HttpServerResponse) {
val channel = httpResponse.toSendChannel(vertx)
while (true) {
val buffer = readBuffer()
// Broadcast the temperature
// Non-blocking but could be suspended
channel.send(buffer)
// Wait for one second
awaitEvent<Long> { vertx.setTimer(1000, it) }
}
}
延迟、取消和超时
Vert.x 调度器通过 Vert.x 计时器完全支持协程 `delay` 函数。
launch {
// Set a one-second Vertx timer
delay(1000)
}
计时器支持取消。
val job = launch {
// Set a one-second Vertx timer
while (true) {
delay(1000)
// Do something periodically
}
}
// Sometimes later
job.cancel()
取消是协作式的。
您还可以使用 `withTimeout` 函数安排超时。
launch {
try {
val id = withTimeout(1000) {
awaitEvent { anAsyncMethod(it) }
}
} catch (e: TimeoutCancellationException) {
// Cancelled
}
}
协程构建器
Vert.x 与所有协程构建器配合使用,只要 `CoroutineScope` 实例可用即可:`launch`、`async`、`produce` 等。有几点重要事项需要记住:
-
`runBlocking` 不需要 `CoroutineScope`,并且不得在 Vert.x 事件循环线程中使用。
-
为了避免内存泄漏,始终使用 `coroutineScope {..}` 来定义子作用域。这样,如果作用域内的一个协程失败,该作用域内定义的所有其他协程也将被取消。
Vert.x 还提供了一个协程构建器,它返回一个 `io.vertx.core.Future` 实例。
// Can be called on any thread
val future1: Future<String> = vertxFuture(vertx) {
computeSomethingWithSuspendingFunction()
}
// Can be called only when running on a Vert.x context
val future2: Future<String> = vertxFuture {
computeSomethingWithSuspendingFunction()
}
协程互操作性
Vert.x 集成设计为与 Kotlin 协程完全互操作。
-
使用 Vert.x 调度器时,`kotlinx.coroutines.sync.Mutex` 在事件循环线程上执行。
RxJava 互操作性
模块 `vertx-lang-kotlin-coroutines` 没有与 RxJava 的特定集成。然而,Kotlin 协程提供了与 RxJava 的集成,这与 `vertx-lang-kotlin-coroutines` 配合得很好。
您可以在 适用于响应式流的协程 中阅读更多内容。
适用于协程的 Vert.x API 扩展
事件总线
Vert.x `EventBus` 和 `MessageConsumer` 对象通过 `coroutineEventBus` 作用域函数扩展了对协程的支持。
val bus = vertx.eventBus()
coroutineEventBus {
bus.coConsumer<String>("some-address") {
computeSomethingWithSuspendingFunction()
it.reply("done")
}
}
如果包围类型实现了 `io.vertx.kotlin.coroutines.CoroutineEventBusSupport`,则不需要作用域函数。例如,使用协程 verticle 时。 |
Vert.x Web
Vert.x Web 的 `Router` 和 `Route` 对象通过 `coroutineRouter` 作用域函数扩展了对协程的支持。
val router = Router.router(vertx)
coroutineRouter {
// Route.coRespond is similar to Route.respond but using a suspending function
router.get("/my-resource").coRespond {
// similar to Route.respond but using a suspending function
val response = computeSomethingWithSuspendingFunction()
response // sent by Vert.x to the client
}
// Router.coErrorHandler is similar to Router.errorHandler but using a suspending function
router.coErrorHandler(404) { rc ->
val html = computeHtmlPageWithSuspendingFunction()
rc.response().setStatusCode(404).putHeader(CONTENT_TYPE, TEXT_HTML).end(html)
}
}
如果包围类型实现了 `io.vertx.kotlin.coroutines.CoroutineRouterSupport`,则不需要作用域函数。例如,使用协程 verticle 时。 |