Vert.x 的 RabbitMQ 客户端

一个 Vert.x 客户端,允许应用程序与 RabbitMQ 代理(AMQP 0.9.1)进行交互

此服务处于实验阶段,其 API 在稳定之前可能会发生变化。

开始使用

Maven

将以下依赖添加到您的 Maven 项目中

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-rabbitmq-client</artifactId>
  <version>5.0.1</version>
</dependency>

Gradle

将以下依赖添加到您的 Gradle 项目中

dependencies {
  compile 'io.vertx:vertx-rabbitmq-client:5.0.1'
}

创建客户端

您可以使用完整的 AMQP URI 如下创建客户端实例

RabbitMQOptions config = new RabbitMQOptions();
// full amqp uri
config.setUri("amqp://xvjvsrrc:[email protected]/xvjvsrrc");
RabbitMQClient client = RabbitMQClient.create(vertx, config);

// Connect
client
  .start()
  .onComplete(asyncResult -> {
  if (asyncResult.succeeded()) {
    System.out.println("RabbitMQ successfully connected!");
  } else {
    System.out.println("Fail to connect to RabbitMQ " + asyncResult.cause().getMessage());
  }
});

或者您也可以手动指定单独的参数

RabbitMQOptions config = new RabbitMQOptions();
// Each parameter is optional
// The default parameter with be used if the parameter is not set
config.setUser("user1");
config.setPassword("password1");
config.setHost("localhost");
config.setPort(5672);
config.setVirtualHost("vhost1");
config.setConnectionTimeout(6000); // in milliseconds
config.setRequestedHeartbeat(60); // in seconds
config.setHandshakeTimeout(6000); // in milliseconds
config.setRequestedChannelMax(5);
config.setNetworkRecoveryInterval(500); // in milliseconds
config.setAutomaticRecoveryEnabled(true);

RabbitMQClient client = RabbitMQClient.create(vertx, config);

// Connect
client
  .start()
  .onComplete(asyncResult -> {
  if (asyncResult.succeeded()) {
    System.out.println("RabbitMQ successfully connected!");
  } else {
    System.out.println("Fail to connect to RabbitMQ " + asyncResult.cause().getMessage());
  }
});

您可以设置多个地址来连接到集群;

RabbitMQOptions config = new RabbitMQOptions();
config.setUser("user1");
config.setPassword("password1");
config.setVirtualHost("vhost1");

config.setAddresses(Arrays.asList(Address.parseAddresses("firstHost,secondHost:5672")));

RabbitMQClient client = RabbitMQClient.create(vertx, config);

// Connect
client
  .start()
  .onComplete(asyncResult -> {
  if (asyncResult.succeeded()) {
    System.out.println("RabbitMQ successfully connected!");
  } else {
    System.out.println("Fail to connect to RabbitMQ " + asyncResult.cause().getMessage());
  }
});

恢复与重连

RabbitMQClient 中有两种独立且不兼容的重连机制

  • Java RabbitMQ 客户端库自动恢复;

  • RabbitMQClient 重启。

默认情况下,这两种机制都未启用。

Java RabbitMQ 客户端库提供的重连功能并非在所有情况下都有效(如果与服务器的连接正常断开,客户端将关闭且不会恢复)。

为了使用 Java RabbitMQ 客户端库的自动恢复功能,需要同时启用它并禁用 RabbitMQClient 库的重连尝试

RabbitMQOptions options = new RabbitMQOptions();
options.setAutomaticRecoveryEnabled(true);
options.setReconnectAttempts(0);

客户端库还将尝试拓扑恢复,具体细节请参阅其文档(此功能在库中默认启用,并且未在 RabbitMQClientOptions 中暴露)。

或者,可以将 RabbitMQClient 配置为在出现连接问题时重试连接到 RabbitMQ 服务器。连接失败可能是由瞬态网络故障(客户端可能会重新连接到同一个 RabbitMQ 服务器)或故障转移场景引起的。这种方法比客户端库所采用的方法更“粗暴”——RabbitMQClient 的重启工作原理是,当客户端库报告问题时关闭连接,然后反复尝试从头开始重新连接。

可以通过在配置中设置 setReconnectIntervalsetReconnectAttempts 属性来配置重连策略

RabbitMQOptions options = new RabbitMQOptions();
options.setAutomaticRecoveryEnabled(false);
options.setReconnectAttempts(Integer.MAX_VALUE);
options.setReconnectInterval(500);

RabbitMQClient 的重连不具备任何形式的*自动*拓扑恢复功能。这可能导致竞态条件,即消息在服务器拓扑准备就绪(即交换器和队列创建/绑定之前)之前就被发送。为了在连接被视为就绪之前提供创建这些对象的机会,RabbitMQClient 提供了 ConnectionEstablishedCallback。ConnectionEstablishedCallback 可用于在其他用户(包括 RabbitMQConsumer 和 RabbitMQPublisher)能够访问 RabbitMQClient 之前对其执行任何操作。

RabbitMQClient client = RabbitMQClient.create(vertx, config);
client.addConnectionEstablishedCallback(promise -> {
            client.exchangeDeclare("exchange", "fanout", true, false)
                .compose(v -> {
                  return client.queueDeclare("queue", false, true, true);
                })
                .compose(declareOk -> {
                  return client.queueBind(declareOk.getQueue(), "exchange", "");
                })
                .onComplete(promise);
});

// At this point the exchange, queue and binding will have been declared even if the client connects to a new server
client.basicConsumer("queue");

如果 RabbitMQConsumer 正在监听自动删除的服务器命名队列上的消息,并且代理重启,则当客户端重新连接时,该队列将被移除。在这种情况下,需要同时重新创建队列并在 RabbitMQConsumer 上设置新的队列名称。

RabbitMQClient client = RabbitMQClient.create(vertx, config);
AtomicReference<RabbitMQConsumer> consumerRef = new AtomicReference<>();
AtomicReference<String> queueName = new AtomicReference<>();
client.addConnectionEstablishedCallback(promise -> {
      client.exchangeDeclare("exchange", "fanout", true, false)
              .compose(v -> client.queueDeclare("", false, true, true))
              .compose(dok -> {
                  queueName.set(dok.getQueue());
                  // The first time this runs there will be no existing consumer
                  // on subsequent connections the consumer needs to be update with the new queue name
                  RabbitMQConsumer currentConsumer = consumerRef.get();
                  if (currentConsumer != null) {
                    currentConsumer.setQueueName(queueName.get());
                  }
                  return client.queueBind(queueName.get(), "exchange", "");
              })
              .onComplete(promise);
});

client.start()
        .onSuccess(v -> {
            // At this point the exchange, queue and binding will have been declared even if the client connects to a new server
            client
              .basicConsumer(queueName.get())
              .onSuccess(c -> consumerRef.set(c));
        })
        .onFailure(ex -> {
            System.out.println("It went wrong: " + ex.getMessage());
        });

在客户端启用 SSL/TLS

RabbitMQClient 可以轻松配置为使用 SSL。

RabbitMQOptions options = new RabbitMQOptions()
  .setSsl(true);
客户端信任配置

如果 trustAll 设置为 true,客户端将信任所有服务器证书。连接仍将加密,但容易受到“中间人”攻击。**严重警告**,请勿在生产环境中使用此选项!默认值为 false。

RabbitMQOptions options = new RabbitMQOptions()
  .setSsl(true)
  .setTrustAll(true));

如果 trustAll 设置为 false,将进行适当的服务器身份验证。有三个主要选项可用。

  • 您的默认信任存储已“信任”服务器,在这种情况下一切正常

  • 您可以使用 -Djavax.net.ssl.trustStore=xxx.jks 启动 Java 进程,指定自定义信任存储

  • 您通过 RabbitMQOptions 提供自定义信任存储

JKS 信任存储选项
RabbitMQOptions options = new RabbitMQOptions()
  .setSsl(true)
  .setTrustOptions(new JksOptions()
    .setPath("/path/myKeyStore.jks")
    .setPassword("myKeyStorePassword"));
p12/pfx 信任存储选项
RabbitMQOptions options = new RabbitMQOptions()
  .setSsl(true)
  .setPfxTrustOptions(
    new PfxOptions().
      setPath("/path/myKeyStore.p12").
      setPassword("myKeyStorePassword"));
PEM 信任选项
RabbitMQOptions options = new RabbitMQOptions()
  .setSsl(true)
  .setPemTrustOptions(
    new PemTrustOptions().
      addCertPath("/path/ca-cert.pem"));

声明带额外配置的交换器

您可以向 RabbitMQ 的 exchangeDeclare 方法传递额外的配置参数

JsonObject config = new JsonObject();

config.put("x-dead-letter-exchange", "my.deadletter.exchange");
config.put("alternate-exchange", "my.alternate.exchange");
// ...
client
  .exchangeDeclare("my.exchange", "fanout", true, false, config)
  .onComplete(onResult -> {
    if (onResult.succeeded()) {
      System.out.println("Exchange successfully declared with config");
    } else {
      onResult.cause().printStackTrace();
    }
  });

声明带额外配置的队列

您可以向 RabbitMQ 的 queueDeclare 方法传递额外的配置参数

JsonObject config = new JsonObject();
config.put("x-message-ttl", 10_000L);

client
  .queueDeclare("my-queue", true, false, true, config)
  .onComplete(queueResult -> {
    if (queueResult.succeeded()) {
      System.out.println("Queue declared!");
    } else {
      System.err.println("Queue failed to be declared!");
      queueResult.cause().printStackTrace();
    }
  });

操作

以下是 RabbitMQService API 支持的一些操作示例。有关所有 API 方法的详细信息,请查阅 javadoc/文档。

发布

向队列发布消息

Buffer message = Buffer.buffer("Hello RabbitMQ, from Vert.x !");
client
  .basicPublish("", "my.queue", message)
  .onComplete(pubResult -> {
    if (pubResult.succeeded()) {
      System.out.println("Message published !");
    } else {
      pubResult.cause().printStackTrace();
    }
  });

发布带确认

向队列发布消息并确认代理已收到。

Buffer message = Buffer.buffer("Hello RabbitMQ, from Vert.x !");

// Put the channel in confirm mode. This can be done once at init.
client
  .confirmSelect()
  .compose(v -> client.basicPublish("", "my.queue", message))
  .compose(v -> client.waitForConfirms()) // Check the message got confirmed by the broker.
  .onComplete(waitResult -> {
    if(waitResult.succeeded())
      System.out.println("Message published !");
    else
      waitResult.cause().printStackTrace();
  });

可靠消息发布

为了可靠地向 RabbitMQ 发布消息,必须处理服务器对每条消息的确认。最简单的确认方法是使用上述的 basicPublishWithConfirm 方法,它在消息发送时同步确认每条消息——阻塞发布通道直到收到确认。

为了实现更高的吞吐量,RabbitMQ 提供了异步确认。异步确认可以一次性确认多条消息,因此客户端需要按照消息发布顺序跟踪所有消息。此外,在消息被服务器确认之前,可能需要重新发送它们,因此客户端必须保留这些消息。

RabbitMQPublisher 类实现了一种处理异步确认的标准方法,避免了原本所需的许多样板代码。

RabbitMQPublisher 的工作原理是

  • 将所有已发送的消息添加到内部队列。

  • 在条件允许时从队列发送消息,并在单独的队列中跟踪这些待确认的消息。

  • 处理来自 RabbitMQ 的异步确认,一旦消息得到确认,就从 pendingAck 队列中移除消息。

  • 通知调用者每条已确认的消息(这始终是单条消息,而不是 RabbitMQ 使用的批量确认)。

RabbitMQPublisher publisher = RabbitMQPublisher.create(vertx, client, options);

messages.forEach((k,v) -> {
  com.rabbitmq.client.BasicProperties properties = new AMQP.BasicProperties.Builder()
          .messageId(k)
          .build();
  publisher.publish("exchange", "routingKey", properties, v.toBuffer());
});

publisher.getConfirmationStream().handler(conf -> {
  if (conf.isSucceeded()) {
    messages.remove(conf.getMessageId());
  }
});

投递标签

本节是一个实现细节,对于希望实现自己的 RabbitMQPublisher 替代方案的人很有用。

为了使 RabbitMQPublisher 工作,它必须知道 RabbitMQ 将为每条发布的消息使用的投递标签。来自 RabbitMQ 的确认可能在 basicPublish 调用完成之前到达客户端,因此如果使用异步确认,则无法通过 basicPublish 的任何返回值来识别投递标签。因此,RabbitMQClient 需要通过一个单独的回调将每条消息的投递标签告知 RabbitMQPublisher,该回调发生在 RabbitMQClient::basicPublish 调用中,但在消息实际通过网络发送之前。单条消息的投递标签也可能改变(投递标签是按通道的,因此如果消息在重新连接后被重新发送,它将有一个新的投递标签)——这意味着我们不能使用 Future 来通知客户端投递标签。如果 deliveryTagHandler 为给定消息被调用多次,则始终可以安全地忽略先前的值——任何时候一条消息只能有一个有效的投递标签。

要捕获投递标签,应使用 RabbitMqClient::basicPublishWithDeliveryTag 方法之一。

  void basicPublishWithDeliveryTag(String exchange, String routingKey, BasicProperties properties, Buffer body, Handler<Long> deliveryTagHandler, Handler<AsyncResult<Void>> resultHandler);
  Future<Void> basicPublishWithDeliveryTag(String exchange, String routingKey, BasicProperties properties, Buffer body, @Nullable Handler<Long> deliveryTagHandler);

消费

从队列消费消息。

// Create a stream of messages from a queue
client
  .basicConsumer("my.queue")
  .onComplete(rabbitMQConsumerAsyncResult -> {
    if (rabbitMQConsumerAsyncResult.succeeded()) {
      System.out.println("RabbitMQ consumer created !");
      RabbitMQConsumer mqConsumer = rabbitMQConsumerAsyncResult.result();
      mqConsumer.handler(message -> {
        System.out.println("Got message: " + message.body().toString());
      });
    } else {
      rabbitMQConsumerAsyncResult.cause().printStackTrace();
    }
  });

在任何时候,您都可以暂停或恢复流。当流暂停时,您将不会收到任何消息。

consumer.pause();
consumer.resume();

实际上,在创建消费流时有一系列选项可以指定。

QueueOptions 允许您指定

  • 内部队列的大小,使用 setMaxInternalQueueSize

  • 当队列大小超出时,流是否应保留最新消息,使用 setKeepMostRecent

QueueOptions options = new QueueOptions()
  .setMaxInternalQueueSize(1000);

client
  .basicConsumer("my.queue", options)
  .onComplete(rabbitMQConsumerAsyncResult -> {
    if (rabbitMQConsumerAsyncResult.succeeded()) {
      System.out.println("RabbitMQ consumer created !");
    } else {
      rabbitMQConsumerAsyncResult.cause().printStackTrace();
    }
  });

当您想停止从队列消费消息时,您可以执行

rabbitMQConsumer
  .cancel()
  .onComplete(cancelResult -> {
    if (cancelResult.succeeded()) {
      System.out.println("Consumption successfully stopped");
    } else {
      System.out.println("Tired in attempt to stop consumption");
      cancelResult.cause().printStackTrace();
    }
  });

当队列不再处理任何消息时,您可以通过结束处理程序收到通知

rabbitMQConsumer.endHandler(v -> {
  System.out.println("It is the end of the stream");
});

您可以设置异常处理程序,以便在处理消息时发生任何错误时收到通知

consumer.exceptionHandler(e -> {
  System.out.println("An exception occurred in the process of message handling");
  e.printStackTrace();
});

最后,您可能希望检索与消费者标签相关的内容

String consumerTag = consumer.consumerTag();
System.out.println("Consumer tag is: " + consumerTag);

获取

将从队列中获取一条消息

client
  .basicGet("my.queue", true)
  .onComplete(getResult -> {
    if (getResult.succeeded()) {
      RabbitMQMessage msg = getResult.result();
      System.out.println("Got message: " + msg.body());
    } else {
      getResult.cause().printStackTrace();
    }
  });

消费消息不自动确认

client
  .basicConsumer("my.queue", new QueueOptions().setAutoAck(false))
  .onComplete(consumeResult -> {
    if (consumeResult.succeeded()) {
      System.out.println("RabbitMQ consumer created !");
      RabbitMQConsumer consumer = consumeResult.result();

      // Set the handler which messages will be sent to
      consumer.handler(msg -> {
        JsonObject json = (JsonObject) msg.body();
        System.out.println("Got message: " + json.getString("body"));
        // ack
        client.basicAck(json.getLong("deliveryTag"), false);
      });
    } else {
      consumeResult.cause().printStackTrace();
    }
  });

运行测试

您需要安装 RabbitMQ 并在本地主机上以默认端口运行,此功能才能正常工作。