Vert.x AMQP 客户端

Vert.x AMQP 客户端允许与 AMQP 1.0 代理和路由器进行交互。它支持:

  • 连接到 AMQP 代理或路由器 - 支持 SASL 和 TLS 连接

  • 从队列或主题消费消息

  • 向队列或主题发送消息

  • 检查已发送消息的确认

AMQP 1.0 协议支持持久订阅、持久化、安全性、会话、复杂的路由等。更多协议详情可在 AMQP 主页上找到。

Vert.x AMQP 客户端基于 Vert.x Proton。如果您需要细粒度控制,我们建议直接使用 Vert.x Proton

使用 Vert.x AMQP 客户端

要使用 Vert.x AMQP 客户端,请将以下依赖项添加到您的构建描述符的 dependencies 部分:

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-amqp-client</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

compile 'io.vertx:vertx-amqp-client:5.0.1'

创建 AMQP 客户端

将客户端添加到您的 CLASSPATH 后,您可以按如下方式实例化一个 AmqpClient

AmqpClientOptions options = new AmqpClientOptions()
  .setHost("localhost")
  .setPort(5672)
  .setUsername("user")
  .setPassword("secret");
// Create a client using its own internal Vert.x instance.
AmqpClient client1 = AmqpClient.create(options);

// USe an explicit Vert.x instance.
AmqpClient client2 = AmqpClient.create(vertx, options);

有两种方法可以实例化 AmqpClient。您可以传入一个显式的 Vert.x 实例。如果您在 Vert.x 应用程序或 Vert.x verticle 中,请使用此方法。否则,您可以省略传入 Vert.x 实例,客户端关闭时将创建一个内部实例并将其关闭。

要实例化 AmqpClient,您需要传入 AmqpClientOptions。这些选项包含代理或路由器的位置、凭据等。AMQP 客户端的许多方面都可以使用这些选项进行配置。请注意,您也可以使用这些选项来配置底层的 Proton 客户端。

主机、端口、用户名和密码也可以从系统属性或环境变量中配置:

  • 主机:系统属性:amqp-client-host,环境变量:AMQP_CLIENT_HOST(必填)

  • 端口:系统属性:amqp-client-port,环境变量:AMQP_CLIENT_PORT(默认为 5672)

  • 用户名:系统属性:amqp-client-username,环境变量:AMQP_CLIENT_USERNAME

  • 密码:系统属性:amqp-client-password,环境变量:AMQP_CLIENT_PASSWORD

建立连接

创建客户端后,您需要显式连接到远程服务器。这可以通过 connect 方法完成:

client
  .connect()
  .onComplete(ar -> {
  if (ar.failed()) {
    System.out.println("Unable to connect to the broker");
  } else {
    System.out.println("Connection succeeded");
    AmqpConnection connection = ar.result();
  }
});

连接建立或失败后,将调用处理程序。请注意,连接用于创建接收器和发送器。

创建接收器

接收器用于接收消息。AMQP 接收器可以通过以下两种方法之一获取:

connection
  .createReceiver("my-queue")
  .onComplete(
  done -> {
    if (done.failed()) {
      System.out.println("Unable to create receiver");
    } else {
      AmqpReceiver receiver = done.result();
      receiver.handler(msg -> {
        // called on every received messages
        System.out.println("Received " + msg.bodyAsString());
      });
    }
  }
);

connection
  .createReceiver("my-queue")
  .onComplete(
  done -> {
    if (done.failed()) {
      System.out.println("Unable to create receiver");
    } else {
      AmqpReceiver receiver = done.result();
      receiver
        .exceptionHandler(t -> {
          // Error thrown.
        })
        .handler(msg -> {
          // Attach the message handler
        });
    }
  }
);

这两种方法的主要区别在于消息处理程序 *何时* 附加到接收器。第一种方法是立即传递处理程序,并立即开始接收消息。第二种方法是在完成后手动附加处理程序。这为您提供了更多控制权,并允许您附加其他处理程序。

完成处理程序中传入的接收器可以作为流使用。因此,您可以暂停和恢复消息的接收。背压协议是使用 AMQP 信用点实现的。

接收到的消息是 AmqpMessage 的实例。实例是不可变的,并提供对 AMQP 支持的大多数元数据的访问。请参阅属性列表作为参考。请注意,从正文中检索 JSON 对象或 JSON 数组需要将值作为 AMQP Data 传递。

您也可以直接从客户端创建接收器:

client
  .createReceiver("my-queue")
  .onComplete(
  done -> {
    if (done.failed()) {
      System.out.println("Unable to create receiver");
    } else {
      AmqpReceiver receiver = done.result();
      receiver.handler(msg -> {
        // called on every received messages
        System.out.println("Received " + msg.bodyAsString());
      });
    }
  }
);

在这种情况下,会自动建立连接。您可以使用 connection 方法检索它。

默认情况下,消息会自动确认。您可以使用 setAutoAcknowledgement 禁用此行为。然后,您需要使用以下方法显式确认传入消息:* accepted * rejected * released

创建发送器

发送器允许将消息发布到队列和主题。您可以按如下方式获取发送器:

connection
  .createSender("my-queue")
  .onComplete(done -> {
  if (done.failed()) {
    System.out.println("Unable to create a sender");
  } else {
    AmqpSender result = done.result();
  }
});

获取 AMQP 发送器后,您可以创建消息。由于 AmqpMessage 是不可变的,因此创建消息使用 AmqpMessageBuilder 构建器类。以下代码片段提供了一些示例:

AmqpMessageBuilder builder = AmqpMessage.create();

// Very simple message
AmqpMessage m1 = builder.withBody("hello").build();

// Message overriding the destination
AmqpMessage m2 = builder.withBody("hello").address("another-queue").build();

// Message with a JSON object as body, metadata and TTL
AmqpMessage m3 = builder
  .withJsonObjectAsBody(new JsonObject().put("message", "hello"))
  .subject("subject")
  .ttl(10000)
  .applicationProperties(new JsonObject().put("prop1", "value1"))
  .build();

有了发送器并创建消息后,您可以使用以下方法发送它:

发送消息的最简单方法如下:

sender.send(AmqpMessage.create().withBody("hello").build());

发送消息时,您可以监控确认:

sender
  .sendWithAck(AmqpMessage.create().withBody("hello").build())
  .onComplete(acked -> {
  if (acked.succeeded()) {
    System.out.println("Message accepted");
  } else {
    System.out.println("Message not accepted");
  }
});

请注意,如果交付设置为 ACCEPTED,则消息被视为已确认。其他交付值被视为未确认(详细信息可在传入的原因中找到)。

AmqpSender 可以用作写入流。流控制是使用 AMQP 信用点实现的。

您也可以直接从客户端创建发送器:

client
  .createSender("my-queue")
  .onComplete(maybeSender -> {
  //...
});

在这种情况下,会自动建立连接。您可以使用 connection 方法检索它。

实现请求-回复

要实现请求-回复行为,您可以使用动态接收器和匿名发送器。动态接收器不与用户提供的地址关联,而是由代理提供地址。匿名发送器也不与特定地址关联,要求所有消息都包含一个地址。

以下代码片段展示了如何实现请求-回复:

connection
  .createAnonymousSender()
  .onComplete(responseSender -> {
  // You got an anonymous sender, used to send the reply
  // Now register the main receiver:
  connection
    .createReceiver("my-queue")
    .onComplete(done -> {
    if (done.failed()) {
      System.out.println("Unable to create receiver");
    } else {
      AmqpReceiver receiver = done.result();
      receiver.handler(msg -> {
        // You got the message, let's reply.
        responseSender.result().send(AmqpMessage.create()
          .address(msg.replyTo())
          .correlationId(msg.id()) // send the message id as correlation id
          .withBody("my response to your request")
          .build()
        );
      });
    }
  });
});

// On the sender side (sending the initial request and expecting a reply)
connection
  .createDynamicReceiver()
  .onComplete(replyReceiver -> {
  // We got a receiver, the address is provided by the broker
  String replyToAddress = replyReceiver.result().address();

  // Attach the handler receiving the reply
  replyReceiver.result().handler(msg -> {
    System.out.println("Got the reply! " + msg.bodyAsString());
  });

  // Create a sender and send the message:
  connection
    .createSender("my-queue")
    .onComplete(sender -> {
    sender.result().send(AmqpMessage.create()
      .replyTo(replyToAddress)
      .id("my-message-id")
      .withBody("This is my request").build());
  });
});

要回复消息,请将其发送到 reply-to 中指定的地址。此外,建议使用 message id 指示 correlation id,以便回复接收器可以将响应与请求关联起来。

关闭客户端

完成连接、接收器或发送器后,您应该使用 close 方法关闭它们。关闭连接会关闭所有已创建的接收器和发送器。

当客户端不再使用时,您也必须将其关闭。这将关闭所有打开的连接,从而关闭接收器和发送器。