Vert.x-Stomp

STOMP 是简单(或流式)文本导向消息协议。STOMP 提供可互操作的线缆格式,以便 STOMP 客户端可以与任何 STOMP 消息代理通信,从而在多种语言、平台和代理之间提供简单而广泛的消息互操作性。有关 STOMP 的更多详细信息,请访问 https://stomp.github.io/index.html

Vertx-Stomp 是 STOMP 服务器和客户端的实现。您可以将 STOMP 服务器与其他客户端一起使用,也可以将 STOMP 客户端与其他服务器一起使用。服务器和客户端支持 STOMP 协议的 1.0、1.1 和 1.2 版本(参见 https://stomp.github.io/stomp-specification-1.2.html)。STOMP 服务器也可以作为与 vert.x 事件总线的桥接,或直接与 WebSockets(使用 StompJS)一起使用。

使用 vertx-stomp

要使用 Vert.x Stomp 服务器和客户端,请将以下依赖项添加到您的构建描述符的 dependencies 部分

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-stomp</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

compile 'io.vertx:vertx-stomp:5.0.1'

STOMP 服务器

创建 STOMP 服务器

创建 STOMP 服务器最简单的方式是使用所有默认选项,如下所示

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx))
    .listen();

这将创建一个在 localhost:61613 上监听的 STOMP 服务器,该服务器符合 STOMP 规范。

您可以在 listen 方法中配置端口和主机

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx))
    .listen(1234, "0.0.0.0");

如果将 -1 作为端口,TCP 服务器将不会启动。这在使用 WebSocket 桥接时很有用。要在服务器准备就绪时收到通知,请按如下方式使用处理程序

StompServer server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx));
server
  .listen()
  .onComplete(ar -> {
      if (ar.failed()) {
        System.out.println("Failing to start the STOMP server : " + ar.cause().getMessage());
      } else {
        System.out.println("Ready to receive STOMP frames");
      }
    });

处理程序会收到 StompServer 的引用。

您还可以在 StompServerOptions 中配置主机和端口

Future<StompServer> server = StompServer.create(vertx, new StompServerOptions().setPort(1234).setHost("0.0.0.0"))
    .handler(StompServerHandler.create(vertx))
    .listen();

关闭 STOMP 服务器

STOMP 服务器按如下方式关闭

server
  .close()
  .onComplete(ar -> {
  if (ar.succeeded()) {
    System.out.println("The STOMP server has been closed");
  } else {
    System.out.println("The STOMP server failed to close : " + ar.cause().getMessage());
  }
});

配置

StompServerOptions 允许您配置 STOMP 服务器的某些方面。

首先,STOMP 服务器基于 NetServer,因此您可以从 StompServerOptions 配置底层 NetServer。或者,您也可以传递您想要使用的 NetServer

Future<StompServer> server = StompServer.create(vertx, netServer)
    .handler(StompServerHandler.create(vertx))
    .listen();

StompServerOptions 允许您配置

  • STOMP 服务器的主机和端口 - 默认为 0.0.0.0:61613

  • STOMP 服务器是否安全 - 默认为 false

  • STOMP 帧的最大主体大小 - 默认为 10 MB

  • STOMP 帧中接受的最大头数量 - 默认为 1000

  • STOMP 帧中头行的最大长度 - 默认为 10240

  • STOMP 心跳时间 - 默认为 1000, 1000

  • 支持的 STOMP 协议版本(默认为 1.0、1.1 和 1.2)

  • 事务中允许的最大帧数量(默认为 1000)

  • 事务块的大小 - 默认为 1000(参见 setTransactionChunkSize

  • 客户端可以处理的最大订阅数量 - 默认为 1000

STOMP 心跳使用 JSON 对象配置,如下所示

Future<StompServer> server = StompServer.create(vertx, new StompServerOptions().setHeartbeat(
    new JsonObject().put("x", 1000).put("y", 1000)))
    .handler(StompServerHandler.create(vertx))
    .listen();

启用安全需要额外的 AuthenticationProvider 来处理认证请求

Future<StompServer> server = StompServer.create(vertx, new StompServerOptions().setSecured(true))
    .handler(StompServerHandler.create(vertx).authProvider(provider))
    .listen();

有关 AuthenticationProvider 的更多信息,请参阅此处

如果帧超出任何大小限制,该帧将被拒绝,并且客户端将收到一个 ERROR 帧。按照规范要求,在发送错误后,客户端连接将立即关闭。其他阈值也会发生相同的行为。

订阅

默认的 STOMP 服务器将订阅目的地处理为不透明字符串。因此,它不促进结构化,也不分层。默认情况下,STOMP 服务器遵循 topic 语义(因此消息会被分派给所有订阅者)。

目的地类型

默认情况下,STOMP 服务器将 目的地 管理为主题。因此,消息会被分派给所有订阅者。您可以配置服务器使用队列,或者混合两种类型

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
        .destinationFactory((v, name) -> {
          if (name.startsWith("/queue")) {
            return Destination.queue(vertx, name);
          } else {
            return Destination.topic(vertx, name);
          }
        }))
    .listen();

在最后一个示例中,所有以 /queue 开头的目的地都是队列,而其他的是主题。目的地是在收到对该目的地的第一个订阅时创建的。

服务器可以通过返回 null 来决定拒绝目的地创建

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
        .destinationFactory((v, name) -> {
          if (name.startsWith("/forbidden")) {
            return null;
          } else if (name.startsWith("/queue")) {
            return Destination.queue(vertx, name);
          } else {
            return Destination.topic(vertx, name);
          }
        }))
    .listen();

在这种情况下,订阅者收到了一个 ERROR 帧。

队列使用轮询策略分派消息。

提供您自己的目的地类型

STOMP 服务器特意没有实现任何高级功能。如果您需要更高级的分派策略,可以通过提供一个返回您自己的 Destination 对象的 DestinationFactory 来实现您自己的目的地类型。

确认

默认情况下,当消息未被确认时,STOMP 服务器不做任何处理。您可以通过提供您自己的 Destination 实现来定制此行为。

自定义目的地应调用

onAckonNack 方法,以便 StompServerHandler 可以定制行为

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
        .onAckHandler(acknowledgement -> {
          // Action to execute when the frames (one in `client-individual` mode, several
          // in `client` mode are acknowledged.
        })
        .onNackHandler(acknowledgement -> {
          // Action to execute when the frames (1 in `client-individual` mode, several in
          // `client` mode are not acknowledged.
        }))
    .listen();

定制 STOMP 服务器

除了上面看到的处理程序之外,您几乎可以配置 STOMP 服务器的所有方面,例如接收到特定帧时执行的操作、发送给客户端的 ping(用于实现心跳)。以下是一些示例

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
            .closeHandler(connection -> {
              // client connection closed
            })
            .beginHandler(frame -> {
              // transaction starts
            })
            .commitHandler(frame -> {
                  // transaction committed
                }
            )
        //...
    ).listen();

请注意,更改默认行为可能会破坏与 STOMP 规范的兼容性。因此,请参考默认实现。

STOMP 客户端

STOMP 客户端连接到 STOMP 服务器并可以发送和接收帧。

创建 STOMP 客户端

您可以使用默认选项创建 StompClient 实例,如下所示

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    // use the connection
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

上一个代码片段创建了一个连接到 "0.0.0.0:61613" 的 STOMP 客户端。连接后,您将获得一个 StompClientConnection,它允许您与服务器交互。您可以按如下方式配置主机和端口

StompClient.create(vertx)
  .connect(61613, "0.0.0.0")
  .onSuccess(connection -> {
    // use the connection
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

为了捕获由于认证问题或服务器在连接协商期间发送的任何错误帧导致的连接错误,您可以在 STOMP 客户端上注册一个 错误处理程序。使用该客户端创建的所有连接都将继承此错误处理程序(但它们可以有自己的)

StompClient.create(vertx)
  .errorFrameHandler(frame -> {
    // Received the ERROR frame
  })
  .connect(61613, "0.0.0.0")
  .onSuccess(connection -> {
    // use the connection
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

您还可以在 StompClientOptions 中配置主机和端口

StompClient
  .create(vertx, new StompClientOptions().setHost("localhost").setPort(1234))
  .connect()
  .onSuccess(connection -> {
    // use the connection
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

关闭 STOMP 客户端

您可以关闭 STOMP 客户端

StompClient client = StompClient
  .create(vertx, new StompClientOptions().setHost("localhost").setPort(1234));

client
  .connect()
  .onSuccess(connection -> {
    // use the connection
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

client.close();

但是,这种方式不会通知服务器断开连接。要干净地关闭连接,您应该使用 disconnect 方法

StompClient
  .create(vertx, new StompClientOptions().setHost("localhost").setPort(1234))
  .connect()
  .onSuccess(connection -> {
    // use the connection
    connection.disconnect();
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

如果心跳已启用,并且客户端在配置的超时后未检测到服务器活动,连接将自动关闭。

错误处理

StompClientConnection 上,您可以注册一个错误处理程序,接收服务器发送的 ERROR 帧。请注意,服务器在发送此类帧后会关闭与客户端的连接

StompClient
  .create(vertx, new StompClientOptions().setHost("localhost").setPort(1234))
  .connect()
  .onSuccess(connection -> {
    // use the connection
    connection
      .errorHandler(frame ->
        System.out.println("ERROR frame received : " + frame));
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

当检测到连接断开时,客户端也可以收到通知。连接失败是使用 STOMP 心跳机制检测到的。当服务器在心跳时间窗口内未发送消息时,连接将关闭,并调用 connectionDroppedHandler(如果已设置)。要配置 connectionDroppedHandler,请调用 connectionDroppedHandler。例如,处理程序可以尝试重新连接到服务器

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {

    connection.connectionDroppedHandler(con -> {
      // The connection has been lost
      // You can reconnect or switch to another server.
    });

    connection.send("/queue", Buffer.buffer("Hello"))
      .onSuccess(frame -> System.out.println("Message processed by the server")
      );
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

配置

您可以通过在创建 StompClient 时传递 StompClientOptions 来配置各个方面。由于 STOMP 客户端依赖于 NetClient,您可以从 StompClientOptions 配置底层 Net 客户端。

StompClientOptions 允许您配置

  • STOMP 服务器的主机和端口

  • 连接到服务器的登录名和密码

  • 如果未明确设置,是否应将 content-length 头添加到帧中。(默认启用)

  • 是否应使用 STOMP 命令而不是 CONNECT 命令(默认禁用)

  • 是否应在 CONNECT 帧中忽略 host 头(默认禁用)

  • 心跳配置(默认为 1000, 1000)

订阅目的地

要订阅目的地,请使用

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    // use the connection
    connection.subscribe("/queue", frame ->
      System.out.println("Just received a frame from /queue : " + frame));
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

要取消订阅,请使用

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    // use the connection
    connection.subscribe("/queue", frame ->
      System.out.println("Just received a frame from /queue : " + frame));

    // ....

    connection.unsubscribe("/queue");
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

发送消息

要发送消息,请使用

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    Map<String, String> headers = new HashMap<>();
    headers.put("header1", "value1");
    connection.send("/queue", headers, Buffer.buffer("Hello"));
    // No headers:
    connection.send("/queue", Buffer.buffer("World"));
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

在 Java 和 Groovy 中,您可以使用 Headers 类来简化头创建。

确认

客户端可以发送 ACKNACK

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    connection.subscribe("/queue", frame -> {
      connection.ack(frame.getAck());
      // OR
      connection.nack(frame.getAck());
    });
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

事务

客户端还可以创建事务。在事务中发送的 ACKNACKSEND 帧只有在事务提交时才会交付。

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    Map<String, String> headers = new HashMap<>();
    headers.put("transaction", "my-transaction");
    connection.beginTX("my-transaction");
    connection.send("/queue", headers, Buffer.buffer("Hello"));
    connection.send("/queue", headers, Buffer.buffer("World"));
    connection.send("/queue", headers, Buffer.buffer("!!!"));
    connection.commit("my-transaction");
    // OR
    connection.abort("my-transaction");
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

回执

每个发送的命令都可以有一个 回执 处理程序,当服务器处理完消息后会收到通知

StompClient.create(vertx)
  .connect()
  .onSuccess(connection -> {
    connection
      .send("/queue", Buffer.buffer("Hello"))
      .onSuccess(frame ->
        System.out.println("Message processed by the server"));
  })
  .onFailure(err ->
    System.out.println(
      "Failed to connect to the STOMP server: " + err.toString()));

将 STOMP 服务器用作 vert.x 事件总线的桥接

STOMP 服务器可以用作 vert.x 事件总线的桥接。该桥接是双向的,意味着 STOMP 帧被转换为事件总线消息,事件总线消息被转换为 STOMP 帧。

要启用桥接,您需要配置入站和出站地址。入站地址是传输到事件总线的 STOMP 目的地。STOMP 目的地用作事件总线地址。出站地址是传输到 STOMP 的事件总线地址。

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
        .bridge(new BridgeOptions()
            .addInboundPermitted(new PermittedOptions().setAddress("/toBus"))
            .addOutboundPermitted(new PermittedOptions().setAddress("/toStomp"))
        )
    )
    .listen();

默认情况下,桥接使用发布/订阅交付(主题)。您可以将其配置为使用点对点交付,其中只调用一个 STOMP 客户端或事件总线消费者

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
            .bridge(new BridgeOptions()
                    .addInboundPermitted(new PermittedOptions().setAddress("/toBus"))
                    .addOutboundPermitted(new PermittedOptions().setAddress("/toStomp"))
                    .setPointToPoint(true)
            )
    )
    .listen();

允许的选项也可以表示为“regex”或带有 匹配匹配 是消息负载必须满足的结构。例如,在以下示例中,负载必须包含设置为“bar”的“foo”字段。结构匹配只支持 JSON 对象。

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx)
        .bridge(new BridgeOptions()
            .addInboundPermitted(new PermittedOptions().setAddress("/toBus")
                .setMatch(new JsonObject().put("foo", "bar")))
            .addOutboundPermitted(new PermittedOptions().setAddress("/toStomp"))
            .setPointToPoint(true)
        )
    )
    .listen();

将 STOMP 服务器与 WebSocket 一起使用

如果您想将 JavaScript 客户端(node.js 或浏览器)直接连接到 STOMP 服务器,您可以使用 WebSocket。STOMP 协议已在 StompJS 中进行调整,以便通过 WebSocket 工作。JavaScript 直接连接到 STOMP 服务器并在 WebSocket 上发送 STOMP 帧。它也直接在 WebSocket 上接收 STOMP 帧。

要配置服务器以使用 StompJS,您需要

  1. 启用 WebSocket 桥接并配置监听 WebSocket 的路径(默认为 /stomp)。

  2. 在您的应用程序中导入 StompJS(作为 HTML 页面上的脚本,或作为 npm 模块(https://npmjs.net.cn/package/stompjs))。

  3. 连接到服务器

要实现第一步,您需要一个 HTTP 服务器,并将 webSocketHandler 的结果传递给 webSocketHandler

StompServer server = StompServer.create(vertx, new StompServerOptions()
    .setPort(-1) // Disable the TCP port, optional
    .setWebsocketBridge(true) // Enable the web socket support
    .setWebsocketPath("/stomp")) // Configure the web socket path, /stomp by default
    .handler(StompServerHandler.create(vertx));

Future<HttpServer> http = vertx.createHttpServer(
    new HttpServerOptions().setWebSocketSubProtocols(Arrays.asList("v10.stomp", "v11.stomp"))
)
    .webSocketHandshakeHandler(server.webSocketHandshakeHandler())
    .webSocketHandler(server.webSocketHandler())
    .listen(8080);

不要忘记声明支持的子协议。否则,连接将被拒绝。

然后按照 StompJS 文档 中的说明连接到服务器。这是一个简单示例

var url = "ws://:8080/stomp";
var client = Stomp.client(url);
var callback = function(frame) {
   console.log(frame);
};

client.connect({}, function() {
 var subscription = client.subscribe("foo", callback);
});

注册接收和写入帧处理程序

STOMP 客户端、客户端连接和服务器处理程序支持注册一个接收到的 Frame 处理程序,该处理程序会在每次从线路接收到帧时收到通知。它允许您记录帧或实现自定义行为。该处理程序已经为 PING 帧和 非法/未知 帧调用。

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx).receivedFrameHandler(sf -> {
      System.out.println(sf.frame());
    }))
    .listen();

StompClient client = StompClient.create(vertx).receivedFrameHandler(frame -> System.out.println(frame));

处理程序在帧处理之前被调用,因此您也可以 修改 帧。

未使用有效 STOMP 命令的帧使用 UNKNOWN 命令。原始命令使用 Frame.STOMP_FRAME_COMMAND 键写入到头中。

您还可以注册一个处理程序,以便在帧即将发送(写入线路)时收到通知

Future<StompServer> server = StompServer.create(vertx)
    .handler(StompServerHandler.create(vertx))
    .writingFrameHandler(sf -> {
      System.out.println(sf.frame());
    })
    .listen();

StompClient client = StompClient.create(vertx).writingFrameHandler(frame -> {
  System.out.println(frame);
});