<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-amqp-client</artifactId>
<version>5.0.1</version>
</dependency>
Vert.x AMQP 客户端
Vert.x AMQP 客户端允许与 AMQP 1.0 代理和路由器进行交互。它支持:
-
连接到 AMQP 代理或路由器 - 支持 SASL 和 TLS 连接
-
从队列或主题消费消息
-
向队列或主题发送消息
-
检查已发送消息的确认
AMQP 1.0 协议支持持久订阅、持久化、安全性、会话、复杂的路由等。更多协议详情可在 AMQP 主页上找到。
Vert.x AMQP 客户端基于 Vert.x Proton。如果您需要细粒度控制,我们建议直接使用 Vert.x Proton。
使用 Vert.x AMQP 客户端
要使用 Vert.x AMQP 客户端,请将以下依赖项添加到您的构建描述符的 dependencies 部分:
-
Maven(在您的
pom.xml
中)
-
Gradle(在您的
build.gradle
文件中)
compile 'io.vertx:vertx-amqp-client:5.0.1'
创建 AMQP 客户端
将客户端添加到您的 CLASSPATH 后,您可以按如下方式实例化一个 AmqpClient
:
AmqpClientOptions options = new AmqpClientOptions()
.setHost("localhost")
.setPort(5672)
.setUsername("user")
.setPassword("secret");
// Create a client using its own internal Vert.x instance.
AmqpClient client1 = AmqpClient.create(options);
// USe an explicit Vert.x instance.
AmqpClient client2 = AmqpClient.create(vertx, options);
有两种方法可以实例化 AmqpClient
。您可以传入一个显式的 Vert.x 实例。如果您在 Vert.x 应用程序或 Vert.x verticle 中,请使用此方法。否则,您可以省略传入 Vert.x 实例,客户端关闭时将创建一个内部实例并将其关闭。
要实例化 AmqpClient
,您需要传入 AmqpClientOptions
。这些选项包含代理或路由器的位置、凭据等。AMQP 客户端的许多方面都可以使用这些选项进行配置。请注意,您也可以使用这些选项来配置底层的 Proton 客户端。
主机、端口、用户名和密码也可以从系统属性或环境变量中配置:
-
主机:系统属性:
amqp-client-host
,环境变量:AMQP_CLIENT_HOST
(必填) -
端口:系统属性:
amqp-client-port
,环境变量:AMQP_CLIENT_PORT
(默认为 5672) -
用户名:系统属性:
amqp-client-username
,环境变量:AMQP_CLIENT_USERNAME
-
密码:系统属性:
amqp-client-password
,环境变量:AMQP_CLIENT_PASSWORD
建立连接
创建客户端后,您需要显式连接到远程服务器。这可以通过 connect
方法完成:
client
.connect()
.onComplete(ar -> {
if (ar.failed()) {
System.out.println("Unable to connect to the broker");
} else {
System.out.println("Connection succeeded");
AmqpConnection connection = ar.result();
}
});
连接建立或失败后,将调用处理程序。请注意,连接用于创建接收器和发送器。
创建接收器
接收器用于接收消息。AMQP 接收器可以通过以下两种方法之一获取:
connection
.createReceiver("my-queue")
.onComplete(
done -> {
if (done.failed()) {
System.out.println("Unable to create receiver");
} else {
AmqpReceiver receiver = done.result();
receiver.handler(msg -> {
// called on every received messages
System.out.println("Received " + msg.bodyAsString());
});
}
}
);
connection
.createReceiver("my-queue")
.onComplete(
done -> {
if (done.failed()) {
System.out.println("Unable to create receiver");
} else {
AmqpReceiver receiver = done.result();
receiver
.exceptionHandler(t -> {
// Error thrown.
})
.handler(msg -> {
// Attach the message handler
});
}
}
);
这两种方法的主要区别在于消息处理程序 *何时* 附加到接收器。第一种方法是立即传递处理程序,并立即开始接收消息。第二种方法是在完成后手动附加处理程序。这为您提供了更多控制权,并允许您附加其他处理程序。
完成处理程序中传入的接收器可以作为流使用。因此,您可以暂停和恢复消息的接收。背压协议是使用 AMQP 信用点实现的。
接收到的消息是 AmqpMessage
的实例。实例是不可变的,并提供对 AMQP 支持的大多数元数据的访问。请参阅属性列表作为参考。请注意,从正文中检索 JSON 对象或 JSON 数组需要将值作为 AMQP Data 传递。
您也可以直接从客户端创建接收器:
client
.createReceiver("my-queue")
.onComplete(
done -> {
if (done.failed()) {
System.out.println("Unable to create receiver");
} else {
AmqpReceiver receiver = done.result();
receiver.handler(msg -> {
// called on every received messages
System.out.println("Received " + msg.bodyAsString());
});
}
}
);
在这种情况下,会自动建立连接。您可以使用 connection
方法检索它。
默认情况下,消息会自动确认。您可以使用 setAutoAcknowledgement
禁用此行为。然后,您需要使用以下方法显式确认传入消息:* accepted
* rejected
* released
创建发送器
发送器允许将消息发布到队列和主题。您可以按如下方式获取发送器:
connection
.createSender("my-queue")
.onComplete(done -> {
if (done.failed()) {
System.out.println("Unable to create a sender");
} else {
AmqpSender result = done.result();
}
});
获取 AMQP 发送器后,您可以创建消息。由于 AmqpMessage
是不可变的,因此创建消息使用 AmqpMessageBuilder
构建器类。以下代码片段提供了一些示例:
AmqpMessageBuilder builder = AmqpMessage.create();
// Very simple message
AmqpMessage m1 = builder.withBody("hello").build();
// Message overriding the destination
AmqpMessage m2 = builder.withBody("hello").address("another-queue").build();
// Message with a JSON object as body, metadata and TTL
AmqpMessage m3 = builder
.withJsonObjectAsBody(new JsonObject().put("message", "hello"))
.subject("subject")
.ttl(10000)
.applicationProperties(new JsonObject().put("prop1", "value1"))
.build();
有了发送器并创建消息后,您可以使用以下方法发送它:
-
send
- 发送消息 -
sendWithAck
- 发送消息并监控其确认
发送消息的最简单方法如下:
sender.send(AmqpMessage.create().withBody("hello").build());
发送消息时,您可以监控确认:
sender
.sendWithAck(AmqpMessage.create().withBody("hello").build())
.onComplete(acked -> {
if (acked.succeeded()) {
System.out.println("Message accepted");
} else {
System.out.println("Message not accepted");
}
});
请注意,如果交付设置为 ACCEPTED
,则消息被视为已确认。其他交付值被视为未确认(详细信息可在传入的原因中找到)。
AmqpSender
可以用作写入流。流控制是使用 AMQP 信用点实现的。
您也可以直接从客户端创建发送器:
client
.createSender("my-queue")
.onComplete(maybeSender -> {
//...
});
在这种情况下,会自动建立连接。您可以使用 connection
方法检索它。
实现请求-回复
要实现请求-回复行为,您可以使用动态接收器和匿名发送器。动态接收器不与用户提供的地址关联,而是由代理提供地址。匿名发送器也不与特定地址关联,要求所有消息都包含一个地址。
以下代码片段展示了如何实现请求-回复:
connection
.createAnonymousSender()
.onComplete(responseSender -> {
// You got an anonymous sender, used to send the reply
// Now register the main receiver:
connection
.createReceiver("my-queue")
.onComplete(done -> {
if (done.failed()) {
System.out.println("Unable to create receiver");
} else {
AmqpReceiver receiver = done.result();
receiver.handler(msg -> {
// You got the message, let's reply.
responseSender.result().send(AmqpMessage.create()
.address(msg.replyTo())
.correlationId(msg.id()) // send the message id as correlation id
.withBody("my response to your request")
.build()
);
});
}
});
});
// On the sender side (sending the initial request and expecting a reply)
connection
.createDynamicReceiver()
.onComplete(replyReceiver -> {
// We got a receiver, the address is provided by the broker
String replyToAddress = replyReceiver.result().address();
// Attach the handler receiving the reply
replyReceiver.result().handler(msg -> {
System.out.println("Got the reply! " + msg.bodyAsString());
});
// Create a sender and send the message:
connection
.createSender("my-queue")
.onComplete(sender -> {
sender.result().send(AmqpMessage.create()
.replyTo(replyToAddress)
.id("my-message-id")
.withBody("This is my request").build());
});
});
要回复消息,请将其发送到 reply-to
中指定的地址。此外,建议使用 message id
指示 correlation id
,以便回复接收器可以将响应与请求关联起来。
关闭客户端
完成连接、接收器或发送器后,您应该使用 close
方法关闭它们。关闭连接会关闭所有已创建的接收器和发送器。
当客户端不再使用时,您也必须将其关闭。这将关闭所有打开的连接,从而关闭接收器和发送器。