<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rabbitmq-client</artifactId>
<version>5.0.1</version>
</dependency>
Vert.x 的 RabbitMQ 客户端
一个 Vert.x 客户端,允许应用程序与 RabbitMQ 代理(AMQP 0.9.1)进行交互
此服务处于实验阶段,其 API 在稳定之前可能会发生变化。
开始使用
Maven
将以下依赖添加到您的 Maven 项目中
创建客户端
您可以使用完整的 AMQP URI 如下创建客户端实例
RabbitMQOptions config = new RabbitMQOptions();
// full amqp uri
config.setUri("amqp://xvjvsrrc:[email protected]/xvjvsrrc");
RabbitMQClient client = RabbitMQClient.create(vertx, config);
// Connect
client
.start()
.onComplete(asyncResult -> {
if (asyncResult.succeeded()) {
System.out.println("RabbitMQ successfully connected!");
} else {
System.out.println("Fail to connect to RabbitMQ " + asyncResult.cause().getMessage());
}
});
或者您也可以手动指定单独的参数
RabbitMQOptions config = new RabbitMQOptions();
// Each parameter is optional
// The default parameter with be used if the parameter is not set
config.setUser("user1");
config.setPassword("password1");
config.setHost("localhost");
config.setPort(5672);
config.setVirtualHost("vhost1");
config.setConnectionTimeout(6000); // in milliseconds
config.setRequestedHeartbeat(60); // in seconds
config.setHandshakeTimeout(6000); // in milliseconds
config.setRequestedChannelMax(5);
config.setNetworkRecoveryInterval(500); // in milliseconds
config.setAutomaticRecoveryEnabled(true);
RabbitMQClient client = RabbitMQClient.create(vertx, config);
// Connect
client
.start()
.onComplete(asyncResult -> {
if (asyncResult.succeeded()) {
System.out.println("RabbitMQ successfully connected!");
} else {
System.out.println("Fail to connect to RabbitMQ " + asyncResult.cause().getMessage());
}
});
您可以设置多个地址来连接到集群;
RabbitMQOptions config = new RabbitMQOptions();
config.setUser("user1");
config.setPassword("password1");
config.setVirtualHost("vhost1");
config.setAddresses(Arrays.asList(Address.parseAddresses("firstHost,secondHost:5672")));
RabbitMQClient client = RabbitMQClient.create(vertx, config);
// Connect
client
.start()
.onComplete(asyncResult -> {
if (asyncResult.succeeded()) {
System.out.println("RabbitMQ successfully connected!");
} else {
System.out.println("Fail to connect to RabbitMQ " + asyncResult.cause().getMessage());
}
});
恢复与重连
RabbitMQClient 中有两种独立且不兼容的重连机制
-
Java RabbitMQ 客户端库自动恢复;
-
RabbitMQClient 重启。
默认情况下,这两种机制都未启用。
Java RabbitMQ 客户端库提供的重连功能并非在所有情况下都有效(如果与服务器的连接正常断开,客户端将关闭且不会恢复)。
为了使用 Java RabbitMQ 客户端库的自动恢复功能,需要同时启用它并禁用 RabbitMQClient 库的重连尝试
RabbitMQOptions options = new RabbitMQOptions();
options.setAutomaticRecoveryEnabled(true);
options.setReconnectAttempts(0);
客户端库还将尝试拓扑恢复,具体细节请参阅其文档(此功能在库中默认启用,并且未在 RabbitMQClientOptions 中暴露)。
或者,可以将 RabbitMQClient 配置为在出现连接问题时重试连接到 RabbitMQ 服务器。连接失败可能是由瞬态网络故障(客户端可能会重新连接到同一个 RabbitMQ 服务器)或故障转移场景引起的。这种方法比客户端库所采用的方法更“粗暴”——RabbitMQClient 的重启工作原理是,当客户端库报告问题时关闭连接,然后反复尝试从头开始重新连接。
可以通过在配置中设置 setReconnectInterval
和 setReconnectAttempts
属性来配置重连策略
RabbitMQOptions options = new RabbitMQOptions();
options.setAutomaticRecoveryEnabled(false);
options.setReconnectAttempts(Integer.MAX_VALUE);
options.setReconnectInterval(500);
RabbitMQClient 的重连不具备任何形式的*自动*拓扑恢复功能。这可能导致竞态条件,即消息在服务器拓扑准备就绪(即交换器和队列创建/绑定之前)之前就被发送。为了在连接被视为就绪之前提供创建这些对象的机会,RabbitMQClient 提供了 ConnectionEstablishedCallback。ConnectionEstablishedCallback 可用于在其他用户(包括 RabbitMQConsumer 和 RabbitMQPublisher)能够访问 RabbitMQClient 之前对其执行任何操作。
RabbitMQClient client = RabbitMQClient.create(vertx, config);
client.addConnectionEstablishedCallback(promise -> {
client.exchangeDeclare("exchange", "fanout", true, false)
.compose(v -> {
return client.queueDeclare("queue", false, true, true);
})
.compose(declareOk -> {
return client.queueBind(declareOk.getQueue(), "exchange", "");
})
.onComplete(promise);
});
// At this point the exchange, queue and binding will have been declared even if the client connects to a new server
client.basicConsumer("queue");
如果 RabbitMQConsumer 正在监听自动删除的服务器命名队列上的消息,并且代理重启,则当客户端重新连接时,该队列将被移除。在这种情况下,需要同时重新创建队列并在 RabbitMQConsumer 上设置新的队列名称。
RabbitMQClient client = RabbitMQClient.create(vertx, config);
AtomicReference<RabbitMQConsumer> consumerRef = new AtomicReference<>();
AtomicReference<String> queueName = new AtomicReference<>();
client.addConnectionEstablishedCallback(promise -> {
client.exchangeDeclare("exchange", "fanout", true, false)
.compose(v -> client.queueDeclare("", false, true, true))
.compose(dok -> {
queueName.set(dok.getQueue());
// The first time this runs there will be no existing consumer
// on subsequent connections the consumer needs to be update with the new queue name
RabbitMQConsumer currentConsumer = consumerRef.get();
if (currentConsumer != null) {
currentConsumer.setQueueName(queueName.get());
}
return client.queueBind(queueName.get(), "exchange", "");
})
.onComplete(promise);
});
client.start()
.onSuccess(v -> {
// At this point the exchange, queue and binding will have been declared even if the client connects to a new server
client
.basicConsumer(queueName.get())
.onSuccess(c -> consumerRef.set(c));
})
.onFailure(ex -> {
System.out.println("It went wrong: " + ex.getMessage());
});
在客户端启用 SSL/TLS
RabbitMQClient 可以轻松配置为使用 SSL。
RabbitMQOptions options = new RabbitMQOptions()
.setSsl(true);
客户端信任配置
如果 trustAll
设置为 true,客户端将信任所有服务器证书。连接仍将加密,但容易受到“中间人”攻击。**严重警告**,请勿在生产环境中使用此选项!默认值为 false。
RabbitMQOptions options = new RabbitMQOptions()
.setSsl(true)
.setTrustAll(true));
如果 trustAll
设置为 false,将进行适当的服务器身份验证。有三个主要选项可用。
-
您的默认信任存储已“信任”服务器,在这种情况下一切正常
-
您可以使用 -Djavax.net.ssl.trustStore=xxx.jks 启动 Java 进程,指定自定义信任存储
-
您通过 RabbitMQOptions 提供自定义信任存储
JKS 信任存储选项
RabbitMQOptions options = new RabbitMQOptions()
.setSsl(true)
.setTrustOptions(new JksOptions()
.setPath("/path/myKeyStore.jks")
.setPassword("myKeyStorePassword"));
p12/pfx 信任存储选项
RabbitMQOptions options = new RabbitMQOptions()
.setSsl(true)
.setPfxTrustOptions(
new PfxOptions().
setPath("/path/myKeyStore.p12").
setPassword("myKeyStorePassword"));
PEM 信任选项
RabbitMQOptions options = new RabbitMQOptions()
.setSsl(true)
.setPemTrustOptions(
new PemTrustOptions().
addCertPath("/path/ca-cert.pem"));
声明带额外配置的交换器
您可以向 RabbitMQ 的 exchangeDeclare 方法传递额外的配置参数
JsonObject config = new JsonObject();
config.put("x-dead-letter-exchange", "my.deadletter.exchange");
config.put("alternate-exchange", "my.alternate.exchange");
// ...
client
.exchangeDeclare("my.exchange", "fanout", true, false, config)
.onComplete(onResult -> {
if (onResult.succeeded()) {
System.out.println("Exchange successfully declared with config");
} else {
onResult.cause().printStackTrace();
}
});
声明带额外配置的队列
您可以向 RabbitMQ 的 queueDeclare 方法传递额外的配置参数
JsonObject config = new JsonObject();
config.put("x-message-ttl", 10_000L);
client
.queueDeclare("my-queue", true, false, true, config)
.onComplete(queueResult -> {
if (queueResult.succeeded()) {
System.out.println("Queue declared!");
} else {
System.err.println("Queue failed to be declared!");
queueResult.cause().printStackTrace();
}
});
操作
以下是 RabbitMQService API 支持的一些操作示例。有关所有 API 方法的详细信息,请查阅 javadoc/文档。
发布
向队列发布消息
Buffer message = Buffer.buffer("Hello RabbitMQ, from Vert.x !");
client
.basicPublish("", "my.queue", message)
.onComplete(pubResult -> {
if (pubResult.succeeded()) {
System.out.println("Message published !");
} else {
pubResult.cause().printStackTrace();
}
});
发布带确认
向队列发布消息并确认代理已收到。
Buffer message = Buffer.buffer("Hello RabbitMQ, from Vert.x !");
// Put the channel in confirm mode. This can be done once at init.
client
.confirmSelect()
.compose(v -> client.basicPublish("", "my.queue", message))
.compose(v -> client.waitForConfirms()) // Check the message got confirmed by the broker.
.onComplete(waitResult -> {
if(waitResult.succeeded())
System.out.println("Message published !");
else
waitResult.cause().printStackTrace();
});
可靠消息发布
为了可靠地向 RabbitMQ 发布消息,必须处理服务器对每条消息的确认。最简单的确认方法是使用上述的 basicPublishWithConfirm 方法,它在消息发送时同步确认每条消息——阻塞发布通道直到收到确认。
为了实现更高的吞吐量,RabbitMQ 提供了异步确认。异步确认可以一次性确认多条消息,因此客户端需要按照消息发布顺序跟踪所有消息。此外,在消息被服务器确认之前,可能需要重新发送它们,因此客户端必须保留这些消息。
RabbitMQPublisher 类实现了一种处理异步确认的标准方法,避免了原本所需的许多样板代码。
RabbitMQPublisher 的工作原理是
-
将所有已发送的消息添加到内部队列。
-
在条件允许时从队列发送消息,并在单独的队列中跟踪这些待确认的消息。
-
处理来自 RabbitMQ 的异步确认,一旦消息得到确认,就从 pendingAck 队列中移除消息。
-
通知调用者每条已确认的消息(这始终是单条消息,而不是 RabbitMQ 使用的批量确认)。
RabbitMQPublisher publisher = RabbitMQPublisher.create(vertx, client, options);
messages.forEach((k,v) -> {
com.rabbitmq.client.BasicProperties properties = new AMQP.BasicProperties.Builder()
.messageId(k)
.build();
publisher.publish("exchange", "routingKey", properties, v.toBuffer());
});
publisher.getConfirmationStream().handler(conf -> {
if (conf.isSucceeded()) {
messages.remove(conf.getMessageId());
}
});
投递标签
本节是一个实现细节,对于希望实现自己的 RabbitMQPublisher 替代方案的人很有用。
为了使 RabbitMQPublisher 工作,它必须知道 RabbitMQ 将为每条发布的消息使用的投递标签。来自 RabbitMQ 的确认可能在 basicPublish 调用完成之前到达客户端,因此如果使用异步确认,则无法通过 basicPublish 的任何返回值来识别投递标签。因此,RabbitMQClient 需要通过一个单独的回调将每条消息的投递标签告知 RabbitMQPublisher,该回调发生在 RabbitMQClient::basicPublish 调用中,但在消息实际通过网络发送之前。单条消息的投递标签也可能改变(投递标签是按通道的,因此如果消息在重新连接后被重新发送,它将有一个新的投递标签)——这意味着我们不能使用 Future 来通知客户端投递标签。如果 deliveryTagHandler 为给定消息被调用多次,则始终可以安全地忽略先前的值——任何时候一条消息只能有一个有效的投递标签。
要捕获投递标签,应使用 RabbitMqClient::basicPublishWithDeliveryTag 方法之一。
void basicPublishWithDeliveryTag(String exchange, String routingKey, BasicProperties properties, Buffer body, Handler<Long> deliveryTagHandler, Handler<AsyncResult<Void>> resultHandler);
Future<Void> basicPublishWithDeliveryTag(String exchange, String routingKey, BasicProperties properties, Buffer body, @Nullable Handler<Long> deliveryTagHandler);
消费
从队列消费消息。
// Create a stream of messages from a queue
client
.basicConsumer("my.queue")
.onComplete(rabbitMQConsumerAsyncResult -> {
if (rabbitMQConsumerAsyncResult.succeeded()) {
System.out.println("RabbitMQ consumer created !");
RabbitMQConsumer mqConsumer = rabbitMQConsumerAsyncResult.result();
mqConsumer.handler(message -> {
System.out.println("Got message: " + message.body().toString());
});
} else {
rabbitMQConsumerAsyncResult.cause().printStackTrace();
}
});
在任何时候,您都可以暂停或恢复流。当流暂停时,您将不会收到任何消息。
consumer.pause();
consumer.resume();
实际上,在创建消费流时有一系列选项可以指定。
QueueOptions
允许您指定
-
内部队列的大小,使用
setMaxInternalQueueSize
-
当队列大小超出时,流是否应保留最新消息,使用
setKeepMostRecent
QueueOptions options = new QueueOptions()
.setMaxInternalQueueSize(1000);
client
.basicConsumer("my.queue", options)
.onComplete(rabbitMQConsumerAsyncResult -> {
if (rabbitMQConsumerAsyncResult.succeeded()) {
System.out.println("RabbitMQ consumer created !");
} else {
rabbitMQConsumerAsyncResult.cause().printStackTrace();
}
});
当您想停止从队列消费消息时,您可以执行
rabbitMQConsumer
.cancel()
.onComplete(cancelResult -> {
if (cancelResult.succeeded()) {
System.out.println("Consumption successfully stopped");
} else {
System.out.println("Tired in attempt to stop consumption");
cancelResult.cause().printStackTrace();
}
});
当队列不再处理任何消息时,您可以通过结束处理程序收到通知
rabbitMQConsumer.endHandler(v -> {
System.out.println("It is the end of the stream");
});
您可以设置异常处理程序,以便在处理消息时发生任何错误时收到通知
consumer.exceptionHandler(e -> {
System.out.println("An exception occurred in the process of message handling");
e.printStackTrace();
});
最后,您可能希望检索与消费者标签相关的内容
String consumerTag = consumer.consumerTag();
System.out.println("Consumer tag is: " + consumerTag);
获取
将从队列中获取一条消息
client
.basicGet("my.queue", true)
.onComplete(getResult -> {
if (getResult.succeeded()) {
RabbitMQMessage msg = getResult.result();
System.out.println("Got message: " + msg.body());
} else {
getResult.cause().printStackTrace();
}
});
消费消息不自动确认
client
.basicConsumer("my.queue", new QueueOptions().setAutoAck(false))
.onComplete(consumeResult -> {
if (consumeResult.succeeded()) {
System.out.println("RabbitMQ consumer created !");
RabbitMQConsumer consumer = consumeResult.result();
// Set the handler which messages will be sent to
consumer.handler(msg -> {
JsonObject json = (JsonObject) msg.body();
System.out.println("Got message: " + json.getString("body"));
// ack
client.basicAck(json.getLong("deliveryTag"), false);
});
} else {
consumeResult.cause().printStackTrace();
}
});
运行测试
您需要安装 RabbitMQ 并在本地主机上以默认端口运行,此功能才能正常工作。