public class MyVerticle extends AbstractVerticle {
public void start() {
JsonObject config = context.config();
}
}
高级 Vert.x 指南
本指南文档介绍了 Vert.x (5.0) 的高级/内部内容。
它旨在解释和讨论以下内容
-
Vert.x 设计
-
内部 API
-
与 Netty 集成
当您希望时,请阅读本指南
-
更好地理解 Vert.x 内部机制
-
将 Vert.x 与第三方库集成
-
使用 Netty 和 Vert.x 进行网络操作
这是一个实时指南,您可以贡献内容,只需在仓库中打开一个 PR 或 Issue。
本指南中暴露了一些 Vert.x 内部 API,您应该记住,这些 API 在需要时可能会发生更改。 |
Vert.x 中的上下文
io.vertx.core.Context
接口是 Vert.x 的一个重要组成部分。
从宏观层面看,上下文可以被认为是控制应用程序如何执行事件(或由处理器创建的任务)的执行方式。
大多数事件都是通过上下文分发的,当应用程序消费一个事件时,很可能有一个上下文与该事件的分发相关联。
Verticle 上下文
当部署一个 verticle 实例时,Vert.x 会创建并关联一个上下文到该实例。您可以使用 AbstractVerticle
的 context
字段在您的 verticle 中访问此上下文。
当 MyVerticle
部署时,Vert.x 会发出一个 start 事件,start
方法由 Verticle 上下文的一个线程调用。
-
默认情况下,上下文始终是事件循环上下文,调用线程是事件循环线程。
-
当 verticle 作为 worker 部署时,调用线程是 Vert.x worker 线程池中的一个。
即时上下文
自 Vert.x 3 以来,不使用 Verticle 而使用 Vert.x API 得到支持,这引出了一个有趣的问题:使用了哪个上下文?
当调用 Vert.x API 时,Vert.x 会将当前线程与一个即时事件循环上下文关联起来,Vertx#getOrCreateContext()
在首次为非 Vert.x 线程调用时创建一个上下文,并在后续调用中返回此上下文。
因此,异步 Vert.x API 上的回调发生在同一个上下文中。
public class Main {
public static void main(String[] args) {
WebClient client = WebClient.create(vertx);
for (int i = 0;i < 4;i++) {
client
.get(8080, "myserver.mycompany.com", "/some-uri")
.send()
.onSuccess(ar -> {
// All callbacks are on the same context
});
}
}
}
此行为与之前的主要版本不同,Vert.x 3 会为每个 HTTP 请求创建不同的上下文。
虽然 Vert.x 鼓励在上下文中限制代码,但这种行为避免了潜在的数据竞争。
上下文的传播
大多数 Vert.x API 都感知上下文。
在上下文中执行的异步操作将使用相同的上下文回调应用程序。
同样,事件处理器也分派到相同的上下文中。
public class MyVerticle extends AbstractVerticle {
public void start() {
Future<HttpServer> future = vertx.createHttpServer()
.requestHandler(request -> {
// Executed in the verticle context
})
.listen(8080, "localhost");
future.onComplete(ar -> {
// Executed in the verticle context
});
}
}
处理上下文
大多数应用程序不需要与上下文进行紧密交互,但有时访问它们会很有用,例如您的应用程序使用另一个库,该库在其自己的线程上执行回调,而您希望在原始上下文中执行代码。
上面我们已经看到 verticle 可以通过 context
字段访问其上下文,但这意味着要使用一个 verticle 并拥有一个 verticle 的引用,这可能并不总是方便的。
您可以使用 getOrCreateContext()
获取当前上下文。
Context context = vertx.getOrCreateContext();
您也可以使用静态方法 Vertx.currentContext()
。
Context context = Vertx.currentContext();
后者在当前线程未与上下文关联时可能返回 null,而前者在需要时会创建一个上下文,因此从不返回 null。
获得上下文后,您可以使用它在该上下文中运行代码。
public void integrateWithExternalSystem(Handler<Event> handler) {
// Capture the current context
Context context = vertx.getOrCreateContext();
// Run the event handler on the application context
externalSystem.onEvent(event -> {
context.runOnContext(v -> handler.handle(event));
});
}
实际上,许多 Vert.x API 和第三方库都是这样实现的。
事件循环上下文
事件循环上下文使用事件循环来运行代码:处理器直接在 IO 线程上执行,因此
-
处理器将始终由同一线程执行。
-
处理器绝不能阻塞线程,否则将导致与该事件循环关联的所有 IO 任务饥饿。
这种行为大大简化了线程模型,因为它保证关联的处理器将始终在同一线程上执行,从而消除了对同步和其他锁定机制的需求。
这是默认且最常用的上下文类型。没有使用 worker 标志部署的 verticle 将始终使用事件循环上下文进行部署。
Worker 上下文
Worker 上下文分配给启用了 worker 选项部署的 verticle。Worker 上下文与标准事件循环上下文的区别在于,worker 在单独的 worker 线程池上执行。
这种与事件循环线程的分离允许 worker 上下文执行会阻塞事件循环的阻塞操作:阻塞此类线程只会阻塞一个线程,而不会影响应用程序。
与事件循环上下文的情况一样,worker 上下文确保处理器在任何给定时间只在一个线程上执行。也就是说,在 worker 上下文中执行的处理器将始终按顺序执行——一个接一个——但不同的操作可能在不同的线程上执行。
上下文异常处理器
可以在上下文中设置一个异常处理器,以捕获在上下文中运行的任务抛出的任何未检查异常。
当没有设置异常处理器时,将调用 Vertx
异常处理器。
context.exceptionHandler(throwable -> {
// Any exception thrown by this context
});
vertx.exceptionHandler(throwable -> {
// Any exception uncaught exception thrown on context
});
当完全没有设置处理器时,异常将以错误消息 Unhandled exception 记录。
您可以使用 reportException
在上下文中报告异常。
context.reportException(new Exception());
触发事件
runOnContext
是在上下文中执行一段代码的最常用方式。尽管它非常适合将外部库与 Vert.x 集成,但它并不总是最适合将事件循环级别执行的代码(例如 Netty 事件)与应用程序代码集成。
根据情况,有一些内部方法可以实现类似的行为:
-
ContextInternal#dispatch(E, Handler<E>)
-
ContextInternal#execute(E, Handler<E>)
-
ContextInternal#emit(E, Handler<E>)
分派
dispatch
假定调用线程是上下文线程,它将当前执行线程与上下文关联起来。
assertNull(Vertx.currentContext());
context.dispatch(event, evt -> {
assertSame(context, Vertx.currentContext());
});
处理器也由阻塞线程检查器监控。
最后,处理器抛出的任何异常都会报告给上下文。
context.exceptionHandler(err -> {
// Should receive the exception thrown below
});
context.dispatch(event, evt -> {
throw new RuntimeException();
});
发出
emit
是 execute
和 dispatch
的组合。
default void emit(E event, Handler<E> eventHandler) {
execute(v -> dispatch(argument, task));
}
emit
可以从任何线程用于向处理器触发事件。
-
从任何线程调用时,它的行为类似于
runOnContext
。 -
从上下文线程调用时,它使用上下文线程局部关联、阻塞线程检查器运行事件处理器,并向上下文报告失败。
在大多数情况下,emit
方法是让应用程序处理事件的首选方式。dispatch
和 execute
方法的主要目的是为了让代码实现非常特定的功能时提供更多控制。
上下文感知型 Future
在 Vert.x 4 之前,Future
是静态创建的对象,与上下文没有特定关系。Vert.x 4 提供了一个基于 Future 的 API,它需要遵循与 Vert.x 3 相同的语义:Future 上的任何回调都应可预测地在同一上下文中运行。
Vert.x 4 API 创建绑定到调用者上下文的 Future,并在该上下文中运行回调。
Promise<String> promise = context.promise();
Future<String> future = promise.future();
future.onSuccess(handler);
任何回调都在创建 Promise 的上下文中发出,上面的代码与此非常相似:
Promise<String> promise = Promise.promise();
Future<String> future = promise.future();
future.onSuccess(result -> context.emit(result, handler));
此外,API 允许创建成功和失败的 Future。
Future<String> succeeded = context.succeededFuture("OK usa");
Future<String> failed = context.failedFuture("Oh sorry");
上下文与追踪
自 Vert.x 4 起,Vert.x 集成了流行的分布式追踪系统。
追踪库通常依赖于线程局部存储来传播追踪数据,例如,处理 HTTP 请求时收到的追踪应在整个 HTTP 客户端中传播。
Vert.x 以类似的方式集成追踪,但依赖于上下文而非线程局部存储。上下文确实由 Vert.x API 传播,因此为实现追踪提供了可靠的存储。
由于给定服务器处理的所有 HTTP 请求都使用创建 HTTP 服务器的同一上下文,因此服务器上下文会为每个 HTTP 请求复制一份,以确保每个 HTTP 请求的唯一性。
public class MyVerticle extends AbstractVerticle {
public void start() {
vertx.createHttpServer()
.requestHandler(request -> {
// Executed in a duplicate verticle context
})
.listen(8080, "localhost");
}
}
复制共享了原始上下文的大部分特性,并提供了特定的局部存储。
vertx.createHttpServer()
.requestHandler(request -> {
JsonObject specificRequestData = getRequestData(request);
Context context = vertx.getOrCreateContext();
context.putLocal("my-stuff", specificRequestData);
processRequest(request);
})
.listen(8080, "localhost");
之后应用程序可以使用它
Context context = vertx.getOrCreateContext();
JsonObject specificRequestData = context.getLocal("my-stuff");
ContextInternal#duplicate()
复制当前上下文,可用于限定与追踪相关的活动范围。
public void startProcessing(Request request) {
Context duplicate = context.duplicate();
request.setContext(duplicate);
}
关闭钩子
关闭钩子是 Vert.x 的一个内部特性,用于创建在 Verticle
或 Vertx
实例关闭时收到通知的组件。它可用于实现 verticle 中的自动清理功能,例如 Vert.x HTTP 服务器。
接收关闭通知的契约由 io.vertx.core.Closeable
接口及其 close(Promise<Void> closePromise)
方法定义。
@Override
public void close(Promise<Void> completion) {
// Do cleanup, the method will complete the future
doClose(completion);
}
ContextInternal#addCloseHook
方法注册一个 Closeable
实例,以便在上下文关闭时收到通知。
context.addCloseHook(closeable);
Verticle 部署创建的上下文会在 verticle 实例停止时调用该钩子。
否则,钩子会在 Vertx 实例关闭时调用。
Context#removeCloseHook
注销关闭钩子,应在关闭钩子被调用之前资源关闭时使用。
context.removeCloseHook(closeable);
钩子通过弱引用实现以避免内存泄漏,但仍应注销钩子。
在复制的上下文上添加钩子,会将其添加到原始上下文。
同样,VertxInternal
也暴露了相同的操作,以便在 Vertx
实例关闭时接收通知。
集成 Netty
Netty 是 Vert.x 的依赖项之一。实际上,Netty 为 Vert.x 的网络服务提供支持。Vert.x Core 提供了人们期望从此类库获得的基本网络服务:
-
TCP
-
HTTP
-
UDP
-
DNS
这些服务是使用 Netty 的各种组件构建的。Netty 社区已经实现了广泛的组件,本章解释了如何在 Vert.x 中集成这些组件。
在本章中,我们将构建一个 TIME 协议客户端和服务器。Netty 文档提供了该简单协议的客户端/服务器实现,我们将重点关注这些组件的集成。
Netty 集成点
本章的主要目的是解释 Vert.x 的一些内部接口。这些接口是扩展,暴露了与 Netty 交互的低级方法,对于直接重用 Netty 的组件非常有用。
大多数用户不需要处理此扩展,因此此类方法被隔离在一个扩展接口中。 |
引导客户端
ContextInternal
扩展了 io.vertx.core.Context
并暴露了各种 Netty 集成点,例如 VertxInternal
。
通常,上下文是通过 Vertx#getOrCreateContext()
方法获得的,该方法返回当前执行上下文或在必要时创建一个新上下文:在 Verticle 中调用时,getOrCreateContext()
返回此 Verticle 的上下文;在非 Vert.x 线程(例如 main
或单元测试)中使用时,它会创建一个新上下文并返回。
Context context = vertx.getOrCreateContext();
// Cast to access extra methods
Internals contextInternal = (Internals) context;
上下文始终与 Netty 事件循环关联,因此使用此上下文可确保我们的组件重用已存在的事件循环或使用新的事件循环。
ContextInternal#nettyEventLoop()
方法返回此特定的事件循环,我们可以在 Bootstrap
(用于客户端)或 ServerBoostrap
(用于服务器)上使用它。
ContextInternal contextInt = (ContextInternal) context; (1)
EventLoop eventLoop = contextInt.nettyEventLoop();
Bootstrap bootstrap = new Bootstrap(); (2)
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(eventLoop);
1 | 获取与此上下文关联的事件循环 |
2 | 为客户端创建引导程序 |
引导服务器
VertxInternal
扩展了 io.vertx.core.Vertx
,其中 VertxInternal#getAcceptorEventLoopGroup()
返回一个用于在服务器上接受连接的 EventLoopGroup
,其典型用法是在 ServerBootstrap
上。
ContextInternal contextInt = (ContextInternal) context; (1)
EventLoop eventLoop = contextInt.nettyEventLoop();
VertxInternal vertxInt = contextInt.owner(); (2)
EventLoopGroup acceptorGroup = vertxInt.getAcceptorEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap(); (3)
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.group(acceptorGroup, eventLoop);
1 | 获取与此上下文关联的事件循环 |
2 | 获取 Vertx 的 acceptor 事件循环组 |
3 | 为服务器创建引导程序 |
处理事件
既然我们对 ContextInternal
更加熟悉了,让我们看看如何使用它来处理 Netty 事件,例如网络事件、通道生命周期等。
ContextInternal#emit
方法用于向应用程序发出事件,因为它确保了:
-
上下文并发性:重用当前事件循环线程或在 worker 线程上执行
-
当前上下文与分派线程的线程局部关联
-
任何未捕获的异常都会报告给上下文,此类异常要么被记录,要么传递给
Context#exceptionHandler
。
以下是一个显示服务器引导程序的简短示例
Handler<Channel> bindHandler = ch -> {
};
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
context.emit(ch, bindHandler);
}
});
Promise<Void> bindPromise = context.promise();
bootstrap.bind(socketAddress).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// Signal application with bind success
bindPromise.complete();
} else {
// Signal application with bind error
bindPromise.fail(future.cause());
}
}
});
return bindPromise.future();
emit
的典型用法是将一个或多个事件分派给同一个处理器,例如事件处理器。
对于 Future,ContextInternal#promise
方法会创建一个 Promise,其监听器行为类似于 emit
方法。
服务器
原始服务器示例可以在这里找到。
Vert.x TIME 服务器暴露了一个简单的 API
-
一个用于创建
TimeServer
的静态方法 -
两个方法:
listen
用于绑定服务器,close
用于解绑 -
requestHandler
用于设置处理请求的处理器
public interface TimeServer {
/**
* @return a new time server
*/
static TimeServer create(Vertx vertx) {
return new TimeServerImpl(vertx);
}
/**
* Set the handler to be called when a time request happens. The handler should complete
* the future with the time value.
*
* @param handler the handler to be called
* @return this object
*/
TimeServer requestHandler(Handler<Promise<Long>> handler);
/**
* Start and bind the time server.
*
* @param port the server port
* @param host the server host
* @return the future completed when the socket is bound
*/
Future<Void> listen(int port, String host);
/**
* Close the time server.
*/
void close();
}
然后,实现一个提供当前 JVM 时间的 TIME 服务器就很简单了。
Vertx vertx = Vertx.vertx();
// Create the time server
TimeServer server = TimeServer.create(vertx);
server.requestHandler(time -> {
time.complete(System.currentTimeMillis());
});
// Start the server
server.listen(8037, "0.0.0.0")
.onSuccess(v -> System.out.println("Server started"))
.onFailure(err -> err.printStackTrace());
现在我们来研究一下服务器的实现。
服务器引导程序
首先我们来看看 ServerBootstrap
的创建和配置。
EventLoopGroup acceptorGroup = vertx.getAcceptorEventLoopGroup(); (1)
EventLoop eventLoop = context.nettyEventLoop(); (2)
bootstrap = new ServerBootstrap(); (3)
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.group(acceptorGroup, eventLoop);
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); (4)
TimeServerHandler handler = new TimeServerHandler(context, requestHandler);
pipeline.addLast(handler);
}
});
1 | VertxInternal 返回用作 acceptor 组的事件循环组。 |
2 | ContextInternal 返回用作 child 组的事件循环。 |
3 | 创建并配置 Netty 的 ServerBootstrap |
4 | 使用以服务器 requestHandler 初始化的 TimeServerHandler 配置通道。 |
ServerBootstrap
的创建相当简单,与原始版本非常相似。主要区别在于我们重用了 Verticle 和 Vert.x 提供的事件循环。这确保了我们的服务器与应用程序共享相同的资源。
请注意,TimeServerHandler
是用服务器 requestHandler
初始化的,此处理器将在处理 TIME 请求时使用。
服务器绑定
现在我们来看看绑定操作,它也非常简单,与原始示例没有太大区别。
Promise<Void> promise = context.promise(); (1)
ChannelFuture bindFuture = bootstrap.bind(host, port);
bindFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
(2)
if (future.isSuccess()) {
channel = future.channel();
promise.complete();
} else {
promise.fail(future.cause());
}
}
});
return promise.future(); (3)
1 | 创建绑定到服务器上下文的 Promise |
2 | 完成或成功结果 Promise |
3 | 返回 Future 结果 |
最重要的部分是创建上下文 Promise,使应用程序知道绑定结果。
服务器处理器
现在我们用 TimeServerHandler
完成我们的服务器,它是 Netty 原始 TimeServerHandler
的一个改编。
Promise<Long> result = Promise.promise(); (1)
context.emit(result, requestHandler); (2)
result.future().onComplete(ar -> { (3)
if (ar.succeeded()) { (4)
ByteBuf time = ctx.alloc().buffer(4);
time.writeInt((int) (ar.result() / 1000L + 2208988800L));
ChannelFuture f = ctx.writeAndFlush(time);
f.addListener((ChannelFutureListener) channelFuture -> ctx.close());
} else { (5)
ctx.close();
}
});
1 | 创建一个新的空白 Promise,该 Promise 将由 requestHandler 解析。 |
2 | 让上下文使用 emit 将事件发出到 requestHandler 。 |
3 | 当 requestHandler 实现完成关联的 Promise 时,调用 Future 处理器。 |
4 | 将当前 TIME 写入通道,并在此之后关闭它。 |
5 | 应用程序失败,我们简单地关闭套接字。 |
当 TIME 请求事件发生时,会使用 emit
,待完成的 Promise 会传递给 requestHandler
。当此 Promise 完成时,处理器将把时间结果写入通道或关闭它。
客户端
原始客户端示例可以在这里找到。
Vert.x time 客户端暴露了一个简单的 API
-
一个用于创建
TimeClient
的静态方法 -
客户端
getTime
方法用于从服务器检索时间值
public interface TimeClient {
/**
* @return a new time client
*/
static TimeClient create(Vertx vertx) {
return new TimeClientImpl(vertx);
}
/**
* Fetch the current time from a server.
*
* @param port the server port
* @param host the server host name
* @return the result future
*/
Future<Long> getTime(int port, String host);
}
TIME 客户端使用起来很简单。
Vertx vertx = Vertx.vertx();
// Create the time client
TimeClient server = TimeClient.create(vertx);
// Fetch the time
server.getTime(8037, "localhost").onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("Time is " + new Date(ar.result()));
} else {
ar.cause().printStackTrace();
}
});
现在我们来研究一下客户端的实现。
客户端引导程序
首先我们来看看 Bootstrap
的创建和配置。
EventLoop eventLoop = context.nettyEventLoop(); (1)
// Create and configure the Netty bootstrap
Bootstrap bootstrap = new Bootstrap(); (2)
bootstrap.group(eventLoop);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ChannelPipeline pipeline = ch.pipeline(); (3)
pipeline.addLast(new TimeClientHandler(result));
}
});
return bootstrap;
1 | ContextInternal 返回用作 child 组的事件循环。 |
2 | 创建并配置 Netty 的 Bootstrap |
3 | 使用以服务器 resultHandler 初始化的 TimeServerHandler 配置通道。 |
Bootstrap
的创建相当简单,与原始版本非常相似。主要区别在于我们重用了 Verticle 提供的事件循环。这确保了我们的客户端重用与我们的 verticle 相同的事件循环。
与服务器示例中一样,我们使用 ContextInternal
获取 Netty 的 EventLoop
并设置到 Bootstrap
上。
请注意,TimeServerHandler
是用客户端 resultHandler
初始化的,此处理器将使用 TIME 请求结果进行调用。
客户端连接
引导程序设置与原始示例非常相似,如果发生故障,应用程序回调会使用一个持有整体结果的 Promise。
ChannelFuture connectFuture = bootstrap.connect(host, port); (1)
connectFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
result.fail(future.cause()); // 2
}
}
});
1 | 连接到服务器 |
2 | 连接错误时,我们使 Promise 失败 |
我们只关心将连接失败传播到应用程序,当引导程序成功连接时,TimeServerHandler
将处理网络响应并传递给应用程序。
客户端处理器
现在我们用 TimeServerHandler
完成我们的客户端,它是 Netty 原始 TimeClientHandler
的一个改编。
ByteBuf m = (ByteBuf) msg;
long currentTimeMillis;
try {
currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; (1)
resultPromise.complete(currentTimeMillis); (2)
resultPromise = null; (3)
ctx.close(); (4)
} finally {
m.release();
}
1 | 解码来自服务器的时间响应 |
2 | 用响应完成 resultPromise |
3 | 将 resultPromise 设置为 null |
4 | 关闭通道 |
同样,在这里,当 TIME 响应事件发生时,我们完成 resultPromise
。
使用 Netty TCP 编解码器
在上一节中,我们探讨了 Vert.x 和 Netty 如何共享资源以及 Netty 事件如何传播到 Vert.x 应用程序。在本节中,我们将研究现有 Netty 编解码器的集成。
Netty 编解码器非常适合封装和重用网络协议的编码器和解码器。基本的 Netty 发行版为流行的协议(如 HTTP、Redis、Memcached 或 MQTT)提供了一些编解码器。
客户端和服务器可以基于这些编解码器与 Vert.x 一起构建,例如 Vert.x HTTP 组件重用了 Netty 的 HTTP 编解码器,Vert.x 的 MQTT 协议也类似。
Vert.x TCP 客户端/服务器可以定制以重用 Netty 编解码器。实际上,NetSocket
的通道可以用于定制管道并读/写任意消息。
以这种方式重用 NetSocket
具有很大的价值:
-
扩展 Vert.x 生态系统,您的客户端/服务器将与此生态系统完全集成,即您可以将您的中间件与现有的 Vert.x 中间件、文件系统等混合搭配使用。
-
基于
NetSocket
功能进行构建-
SSL/TLS
-
域套接字
-
客户端 Socks/HTTP 代理处理
-
服务器 Verticle 扩展
-
指标
-
SNI 处理
-
在本章中,我们将编写一个客户端,但同样的技术也可以用于以相同方式在 Netty 编解码器之上编写服务器。
本章中实现的所有功能也可以使用集成 Netty 章中展示的技术来实现。 |
Memcached 客户端
例如,本章我们将基于 Netty 的 Memcached 二进制编解码器构建一个简单的 Memcached 客户端。
Memcached 是一个流行且免费开源、高性能的分布式内存对象缓存系统。
该协议有两个版本:文本和二进制。在本节中,我们将为本文档中描述的二进制协议构建一个客户端。
客户端使用起来非常简单。
Vertx vertx = Vertx.vertx();
MemcachedClient.connect(vertx, 11211, "localhost")
.compose(client -> {
System.out.println("connected");
// Put a value
return client.set("foo", "bar").compose(v -> {
System.out.println("Put successful");
// Now retrieve the same value
return client.get("foo");
});
}).onSuccess(res -> {
System.out.println("Get successful " + res + "");
}).onFailure(err -> err.printStackTrace());
您可以使用 Docker 轻松启动 Memcached 服务器来尝试此示例。
> docker run --rm --name my-memcache -p 11211:11211 -d memcached
Memcached 客户端剖析
客户端提供了一个简单的 API,用于连接到 Memcached 服务器以及获取/设置条目。
public interface MemcachedClient {
/**
* Connect to memcached, the {@code completionHandler} will get the {@link MemcachedClient} instance.
*/
static Future<MemcachedClient> connect(Vertx vertx, int port, String host) {
return MemcachedClientImpl.connect(vertx, port, host, null);
}
/**
* Connect to memcached, the {@code completionHandler} will get the {@link MemcachedClient} instance.
*/
static Future<MemcachedClient> connect(Vertx vertx, int port, String host, NetClientOptions options) {
return MemcachedClientImpl.connect(vertx, port, host, options);
}
/**
* Get a cached entry.
*
* @param key the entry key
* @return the result future
*/
Future<@Nullable String> get(String key);
/**
* Set a cached entry.
*
* @param key the entry key
* @param value the entry value
* @return the result future
*/
Future<Void> set(String key, String value);
}
Memcached 编解码器
Netty 提供的 Memcached 编解码器负责将 Netty ByteBuf
编码和解码为 Memcached 请求和响应。
我们的客户端只需要使用 Memcached 对象
-
将
FullBinaryMemcacheRequest
写入管道-
具有
key
属性:一个ByteBuf
,用于提供缓存条目键。 -
具有
opCode
属性:一个枚举,指示操作,GET
和SET
。 -
具有
extras
属性:一个Bytebuf
,用于提供额外信息,仅用于 Memcached SET 请求。 -
具有
content
属性:一个Bytebuf
,用于提供缓存条目值,仅用于 Memcached SET 请求。
-
-
从管道读取
FullBinaryMemcacheResponse
-
具有
status
属性:操作成功时值为0
。 -
具有
content
属性:一个Bytebuf
,用于提供缓存条目值,仅用于 Memcached GET 响应。
-
Memcached 提供了比 GET 或 SET 更丰富的协议,但本节不涵盖这些,因为目标只是一个演示,而不是一个完整的客户端。 |
连接到服务器
我们首先看看客户端的连接实现。
NetClient tcpClient = options != null ? vertx.createNetClient(options) : vertx.createNetClient();
// Connect to the memcached instance
Future<NetSocket> connect = tcpClient.connect(port, host);
return connect.map(so -> {
// Create the client
MemcachedClientImpl memcachedClient = new MemcachedClientImpl((VertxInternal) vertx, (NetSocketInternal) so);
// Initialize the client: configure the pipeline and set the handlers
memcachedClient.init();
return memcachedClient;
});
connect
实现创建了一个 Vert.x NetClient
以连接到实际的 Memcached 服务器。当连接成功时:
-
Vert.x
NetSocket
被转换为NetSocketInternal
-
Memcached 客户端被创建并初始化
NetSocketInternal
是一个高级接口,它提供了我们构建客户端所需的一些额外方法:
-
channelHandlerContext()
返回NetSocket
Netty 处理器的上下文。 -
writeMessage(Object, Handler<AsyncResult<Void>>)
将一个对象写入管道。 -
messsageHandler(Handler<Object>)
设置一个处理器用于处理管道消息。
Memcached 客户端的 init
方法使用其中一些来:
-
使用 Memcached 编解码器初始化
NetSocket
-
设置消息处理器以处理 Memcached 响应
ChannelPipeline pipeline = so.channelHandlerContext().pipeline();
// Add the memcached message aggregator
pipeline.addFirst("aggregator", new BinaryMemcacheObjectAggregator(Integer.MAX_VALUE));
// Add the memcached decoder
pipeline.addFirst("memcached", new BinaryMemcacheClientCodec());
// Set the message handler to process memcached message
so.messageHandler(this::processResponse);
请求/响应关联
Memcached 协议是一个流水线协议,响应的接收顺序与请求的发送顺序相同。
因此,客户端需要维护一个 inflight
先进先出(FIFO)队列,它是一个简单的 Java ConcurrentLinkedQueue
。当请求发送到 Memcached 服务器时,响应处理器会被添加到队列中。当收到响应时,处理器从队列中移除并可以处理响应。
发送 Memcached 请求消息
客户端有一个 writeRequest
方法,用于向管道发送请求。
-
写入请求消息
-
写入成功后,将响应处理器添加到
inflight
队列中,以便处理响应。
return so.writeMessage(request).compose(v -> {
// The message has been encoded successfully and sent
// Create a response promise and add it to the inflight queue, so it can be resolved by the server ack
Promise<FullBinaryMemcacheResponse> promise = vertx.promise();
inflight.add(promise);
//
return promise.future();
});
处理 Memcached 响应消息
客户端有一个 processResponse
方法,每当 Memcached 编解码器解码一个响应时就会调用它。
-
从队列中移除响应处理器
-
释放 Netty 消息,因为响应消息是池化的,必须调用此方法,否则将发生内存泄漏。
FullBinaryMemcacheResponse response = (FullBinaryMemcacheResponse) msg;
try {
// Get the handler that will process the response
Promise<FullBinaryMemcacheResponse> handler = inflight.poll();
// Handle the message
handler.complete(response);
} finally {
// Release the referenced counted message
response.release();
}
发送 Memcached GET 请求
Memcached GET 相当简单:
-
创建
FullBinaryMemcacheRequest
-
设置
key
属性 -
将
opCode
属性设置为BinaryMemcacheOpcodes.GET
-
-
调用
writeRequest
,传入请求并提供响应处理器
ByteBuf keyBuf = Unpooled.copiedBuffer(key, StandardCharsets.UTF_8);
// Create the memcached request
FullBinaryMemcacheRequest request = new DefaultFullBinaryMemcacheRequest(keyBuf, Unpooled.EMPTY_BUFFER);
// Set the memcached operation opcode to perform a GET
request.setOpcode(BinaryMemcacheOpcodes.GET);
// Execute the request and process the response
return writeRequest(request).map(response -> processGetResponse(response));
处理 Memcached GET 响应
Memcached GET 响应由 processGetResponse
处理。
short status = response.status();
switch (status) {
case 0:
// Succesfull get
return response.content().toString(StandardCharsets.UTF_8);
case 1:
// Empty response -> null
return null;
default:
// Memcached error
throw new MemcachedError(status);
}
响应的 status
属性指示响应是否成功。当 status
为 1
时需要特别注意,因为客户端将其视为 Java 的 null
值。
发送 Memcached SET 请求
Memcached SET 也很简单:
-
创建
FullBinaryMemcacheRequest
-
设置
key
属性 -
将
opCode
属性设置为BinaryMemcacheOpcodes.SET
-
将
extras
属性设置为值0xDEADBEEF_00001C20
-
根据协议,必须使用
0xDEADBEEF
-
00001C20
是设置为 2 小时的过期时间
-
-
设置
value
属性
-
-
调用
writeRequest
,传入请求并提供响应处理器
ByteBuf keyBuf = Unpooled.copiedBuffer(key, StandardCharsets.UTF_8);
// Create the memcached request
FullBinaryMemcacheRequest request = new DefaultFullBinaryMemcacheRequest(keyBuf, Unpooled.EMPTY_BUFFER);
// Set the memcached operation opcode to perform a GET
request.setOpcode(BinaryMemcacheOpcodes.GET);
// Execute the request and process the response
return writeRequest(request).map(response -> processGetResponse(response));
处理 Memcached SET 响应
Memcached SET 响应由 processSetResponse
处理。
short status = response.status();
if (status == 0) {
// Succesfull get
return null;
} else {
// Memcached error
throw new MemcachedError(status);
}