Vert.x MQTT

使用 Vert.x MQTT

此组件已在 Vert.x 栈中正式发布,只需在构建描述符的 dependencies 部分添加以下依赖即可

  • Maven(在您的 pom.xml 中)

<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-mqtt</artifactId>
    <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

compile "io.vertx:vertx-mqtt:5.0.1"

Vert.x MQTT 服务器

此组件提供了一个服务器,能够处理与远程 MQTT 客户端的连接、通信和消息交换。其 API 提供了与客户端接收到的原始协议消息相关的许多事件,并公开了一些功能以便向客户端发送消息。

在撰写本文时,它支持所有 MQTT 5.0 版本功能,但 AUTH 消息尚未实现。

它不是一个功能齐全的 MQTT 代理,但可以用于构建类似的功能或进行协议转换。

此模块具有技术预览状态,这意味着 API 可能会在版本之间发生变化。

处理客户端连接/断开连接

此示例展示了如何处理来自远程 MQTT 客户端的连接请求。首先,创建一个 MqttServer 实例,并使用 endpointHandler 方法指定当远程客户端发送 CONNECT 消息连接到服务器时调用的处理器。作为参数提供给处理器的 MqttEndpoint 实例,包含了与 CONNECT 消息相关的所有主要信息,例如客户端标识符、用户名/密码、“遗嘱”信息、清除会话标志、协议版本、“保持连接”超时和 CONNECT 消息属性(适用于 MQTT 5.0 版本)。在该处理器内部,endpoint 实例提供了 accept 方法,用于向远程客户端回复相应的 CONNACK 消息:通过这种方式,连接得以建立。最后,使用 listen 方法以默认行为(在 localhost 和默认 MQTT 端口 1883 上)启动服务器。同一方法允许指定一个处理器,以检查服务器是否正常启动。

MqttServer mqttServer = MqttServer.create(vertx);
mqttServer.endpointHandler(endpoint -> {

  // shows main connect info
  System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());

  if (endpoint.auth() != null) {
    System.out.println("[username = " + endpoint.auth().getUsername() + ", password = " + endpoint.auth().getPassword() + "]");
  }
  System.out.println("[properties = " + endpoint.connectProperties() + "]");
  if (endpoint.will() != null) {
    System.out.println("[will topic = " + endpoint.will().getWillTopic() + " msg = " + new String(endpoint.will().getWillMessageBytes()) +
      " QoS = " + endpoint.will().getWillQos() + " isRetain = " + endpoint.will().isWillRetain() + "]");
  }

  System.out.println("[keep alive timeout = " + endpoint.keepAliveTimeSeconds() + "]");

  // accept connection from the remote client
  endpoint.accept(false);

})
  .listen()
  .onComplete(ar -> {

    if (ar.succeeded()) {

      System.out.println("MQTT server is listening on port " + ar.result().actualPort());
    } else {

      System.out.println("Error on starting the server");
      ar.cause().printStackTrace();
    }
  });

同样的 endpoint 实例提供了 disconnectMessageHandler,用于指定当远程客户端发送 DISCONNECT 消息以断开与服务器的连接时调用的处理器;此处理器将 MqttDisconnectMessage 作为参数。

endpoint.disconnectMessageHandler(disconnectMessage -> {

  System.out.println("Received disconnect from client, reason code = " + disconnectMessage.code());
});

如果使用 MQTT 5.0 或更新版本,服务器可以使用 disconnect 向客户端发送包含原因码和属性的 DISCONNECT 消息。

通过 SSL/TLS 支持处理客户端连接/断开连接

服务器支持通过 SSL/TLS 协议接受连接请求以进行身份验证和加密。为此,MqttServerOptions 类提供了 setSsl 方法来设置 SSL/TLS 的使用(传递“true”作为值),以及其他一些有用的方法来提供服务器证书和相关的私钥(以 Java 密钥库引用、PEM 或 PFX 格式)。在以下示例中,setKeyCertOptions 方法用于以 PEM 格式传递证书。此方法需要 KeyCertOptions 接口的可能实现之一的实例,在此示例中,PemKeyCertOptions 类用于提供服务器证书和私钥的路径,并使用相应的 setCertPathsetKeyPath 方法。MQTT 服务器的启动方式是:将 Vert.x 实例照常传递给创建方法,并将上述 MQTT 选项实例也传递给创建方法。

MqttServerOptions options = new MqttServerOptions()
  .setPort(8883)
  .setKeyCertOptions(new PemKeyCertOptions()
    .setKeyPath("./src/test/resources/tls/server-key.pem")
    .setCertPath("./src/test/resources/tls/server-cert.pem"))
  .setSsl(true);

MqttServer mqttServer = MqttServer.create(vertx, options);
mqttServer.endpointHandler(endpoint -> {

  // shows main connect info
  System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());

  if (endpoint.auth() != null) {
    System.out.println("[username = " + endpoint.auth().getUsername() + ", password = " + endpoint.auth().getPassword() + "]");
  }
  if (endpoint.will() != null) {
    System.out.println("[will topic = " + endpoint.will().getWillTopic() + " msg = " + new String(endpoint.will().getWillMessageBytes()) +
      " QoS = " + endpoint.will().getWillQos() + " isRetain = " + endpoint.will().isWillRetain() + "]");
  }

  System.out.println("[keep alive timeout = " + endpoint.keepAliveTimeSeconds() + "]");

  // accept connection from the remote client
  endpoint.accept(false);

})
  .listen()
  .onComplete(ar -> {

    if (ar.succeeded()) {

      System.out.println("MQTT server is listening on port " + ar.result().actualPort());
    } else {

      System.out.println("Error on starting the server");
      ar.cause().printStackTrace();
    }
  });

通过 WebSocket 处理客户端连接

如果您想支持通过 WebSockets 的连接,也可以通过 MqttServerOptions 启用此功能。通过将 true 传递给 setUseWebSocket,它将在路径 /mqtt 上监听 WebSocket 连接。

与其他设置配置一样,最终的端点连接和相关的断开连接的管理方式与常规连接相同。

DeploymentOptions options = new DeploymentOptions().setInstances(10);
vertx.deployVerticle("com.mycompany.MyVerticle", options);

处理客户端订阅/取消订阅请求

在客户端和服务器建立连接后,客户端可以使用 SUBSCRIBE 消息发送主题订阅请求。MqttEndpoint 接口允许使用 subscribeHandler 方法为传入的订阅请求指定处理器。该处理器接收 MqttSubscribeMessage 接口的一个实例,该实例包含主题列表以及客户端期望的相关订阅选项。订阅选项包括 QoS 级别和相关标志,对于 MQTT 5.0 版本还包括额外的标志,例如 noLocalretainAsPublished。最后,端点实例提供了 subscribeAcknowledge 方法,用于向客户端回复相关的 SUBACK 消息,其中包含原因码(可以是 QoS 级别或错误码 - 每个主题或模式独立)和消息属性。

endpoint.subscribeHandler(subscribe -> {

  List<MqttSubAckReasonCode> reasonCodes = new ArrayList<>();
  for (MqttTopicSubscription s: subscribe.topicSubscriptions()) {
    System.out.println("Subscription for " + s.topicName() + " with QoS " + s.qualityOfService());
    reasonCodes.add(MqttSubAckReasonCode.qosGranted(s.qualityOfService()));
  }
  // ack the subscriptions request
  endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES);

});

同样,可以在端点上使用 unsubscribeHandler 方法来指定当客户端发送 UNSUBSCRIBE 消息时调用的处理器。此处理器接收一个 MqttUnsubscribeMessage 接口的实例作为参数,其中包含要取消订阅的主题列表。最后,端点实例提供了 unsubscribeAcknowledgeunsubscribeAcknowledge 方法,用于向客户端回复相关的 UNSUBACK 消息——要么简单地确认所有取消订阅,要么为每个主题指定原因,并指定 UNSUBSCRIBE 请求中的属性(在 MQTT v 5.0 或更高版本中支持)。

endpoint.unsubscribeHandler(unsubscribe -> {

  for (String t: unsubscribe.topics()) {
    System.out.println("Unsubscription for " + t);
  }
  // ack the subscriptions request
  endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
});

处理客户端发布的消息

为了处理远程客户端发布的传入消息,MqttEndpoint 接口提供了 publishHandler 方法,用于指定当客户端发送 PUBLISH 消息时调用的处理器。此处理器接收一个 MqttPublishMessage 接口的实例作为参数,其中包含有效载荷、QoS 级别、重复和保留标志以及消息属性。

如果 QoS 级别为 0 (AT_MOST_ONCE),端点无需回复客户端。

如果 QoS 级别为 1 (AT_LEAST_ONCE),端点需要使用可用的 publishAcknowledgepublishAcknowledge 方法回复 PUBACK 消息。

如果 QoS 级别为 2 (EXACTLY_ONCE),端点需要使用可用的 publishReceivedpublishReceived 方法回复 PUBREC 消息;在这种情况下,同一端点也应该处理从客户端收到的 PUBREL 消息(远程客户端在收到端点的 PUBREC 后发送此消息),并且可以通过 publishReleaseHandlerpublishReleaseMessageHandler 方法指定处理器,具体取决于服务器是否需要访问 MQTT 5.0 扩展功能(原因码、消息属性)。为了关闭 QoS 级别 2 的传递,端点可以使用 publishCompletepublishComplete 方法向客户端发送 PUBCOMP 消息。

endpoint.publishHandler(message -> {

  System.out.println("Just received message [" + message.payload().toString(Charset.defaultCharset()) + "] with QoS [" + message.qosLevel() + "]");

  if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
    endpoint.publishAcknowledge(message.messageId());
  } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
    endpoint.publishReceived(message.messageId());
  }

}).publishReleaseHandler(messageId -> {

  endpoint.publishComplete(messageId);
});

向客户端发布消息

端点可以使用 publish 方法向远程客户端发布消息(发送 PUBLISH 消息),该方法接受以下输入参数:要发布的主题、有效载荷、QoS 级别、重复和保留标志。如果您正在使用 MQTT 5.0 或更新版本,并且希望指定消息属性,则可以使用 publish 方法,该方法除了上述方法外,还接受消息 ID 和消息属性。

如果 QoS 级别为 0 (AT_MOST_ONCE),端点将不会收到来自客户端的任何反馈。

如果 QoS 级别为 1 (AT_LEAST_ONCE),端点需要处理从客户端收到的 PUBACK 消息,以接收最终的交付确认。可以通过指定 publishAcknowledgeHandlerpublishAcknowledgeMessageHandler 方法来完成。

如果 QoS 级别为 2 (EXACTLY_ONCE),端点需要处理从客户端收到的 PUBREC 消息。publishReceivedHandlerpublishReceivedMessageHandler 方法允许为此指定处理器。在该处理器内部,端点可以使用 publishReleasepublishRelease 方法通过 PUBREL 消息回复客户端。最后一步是处理从客户端收到的 PUBCOMP 消息,作为已发布消息的最终确认;这可以通过 publishCompletionHandlerpublishCompletionMessageHandler 指定,以便在收到最终 PUBCOMP 消息时调用处理器。

endpoint.publish("my_topic",
  Buffer.buffer("Hello from the Vert.x MQTT server"),
  MqttQoS.EXACTLY_ONCE,
  false,
  false);

// specifing handlers for handling QoS 1 and 2
endpoint.publishAcknowledgeHandler(messageId -> {

  System.out.println("Received ack for message = " +  messageId);

}).publishReceivedHandler(messageId -> {

  endpoint.publishRelease(messageId);

}).publishCompletionHandler(messageId -> {

  System.out.println("Received ack for message = " +  messageId);
});

接收客户端心跳通知

底层的 MQTT 保活机制由服务器内部处理。当收到 CONNECT 消息时,服务器会根据消息中指定的保活超时时间,检查客户端是否在此超时时间内没有发送消息。同时,对于收到的每个 PINGREQ,服务器都会回复相关的 PINGRESP。

即使高级应用程序无需处理,MqttEndpoint 接口仍提供了 pingHandler 方法,用于指定当从客户端收到 PINGREQ 消息时调用的处理器。这只是向应用程序发出通知,表明客户端没有发送有意义的消息,而只是发送 ping 以保持连接活跃;无论如何,如上所述,PINGRESP 会由服务器内部自动发送。

endpoint.pingHandler(v -> {

  System.out.println("Ping received from client");
});

关闭服务器

MqttServer 接口提供了 close 方法,可用于关闭服务器;它将停止监听传入连接并关闭所有与远程客户端的活动连接。此方法是异步的,并且其中一个重载提供了指定完成处理器的可能性,该处理器将在服务器真正关闭时被调用。

mqttServer.close().onComplete(v -> {

  System.out.println("MQTT server closed");
});

处理客户端认证包/向远程客户端发送 AUTH 包(仅限 MQTT 5 版本)

在客户端和服务器建立连接后,客户端可以使用 AUTH 消息向服务器发送认证包。MqttEndpoint 接口允许使用 authenticationExchangeHandler 方法为传入的认证包指定处理器。该处理器接收 MqttAuthenticationExchangeMessage 接口的一个实例,该实例包含原因码、认证方法和数据。服务器可以继续使用 authenticationExchange 发送 AUTH 包以进行认证或直接通过。

endpoint.authenticationExchange(MqttAuthenticationExchangeMessage.create(MqttAuthenticateReasonCode.SUCCESS, MqttProperties.NO_PROPERTIES));
// handling auth from client
endpoint.authenticationExchangeHandler(auth -> {
  System.out.println("AUTH packet received from client. code: " + auth.reasonCode());
});

verticle 中的自动清理

如果您从 verticle 内部创建 MQTT 服务器,则这些服务器将在 verticle 卸载时自动关闭。

扩展:共享 MQTT 服务器

与 MQTT 服务器相关的处理器始终在同一个事件循环线程中执行。这意味着在具有多个核心的系统上,即使只部署了一个实例,也只使用了一个核心。为了使用更多核心,可以部署 MQTT 服务器的更多实例。

这可以通过编程方式实现

for (int i = 0; i < 10; i++) {

  MqttServer mqttServer = MqttServer.create(vertx);
  mqttServer.endpointHandler(endpoint -> {
    // handling endpoint
  })
    .listen()
    .onComplete(ar -> {

      // handling start listening
    });

}

或使用 verticle 指定实例数量

DeploymentOptions options = new DeploymentOptions().setInstances(10);
vertx.deployVerticle("com.mycompany.MyVerticle", options);

真正发生的是,即使只部署了一个 MQTT 服务器,但随着传入连接的到来,Vert.x 会以轮询方式将它们分发到在不同核心上执行的任何连接处理器。

Vert.x MQTT 客户端

此组件提供了一个符合 3.1.1 规范的 MQTT 客户端。其 API 提供了许多方法,用于连接/断开与代理的连接、发布消息(具有所有三种不同 QoS 级别)以及订阅主题。

此模块具有技术预览状态,这意味着 API 可能会在版本之间发生变化。

连接/断开连接

客户端提供了连接和断开服务器的便利。此外,您可以通过构造函数传递 MqttClientOptions 实例作为参数,来指定您想要连接的服务器的主机和端口等信息。

此示例展示了如何使用 Vert.x MQTT 客户端并调用 connectdisconnect 方法来连接和断开服务器。

MqttClient client = MqttClient.create(vertx);

client.connect(1883, "mqtt.eclipse.org").onComplete(s -> {
  client.disconnect();
});
通过 MqttClientOptions 提供的服务器默认地址是 localhost:1883,如果您使用 SSL/TSL,则是 localhost:8883。

订阅主题

现在,让我们深入了解这个例子

client.publishHandler(s -> {
  System.out.println("There are new message in topic: " + s.topicName());
  System.out.println("Content(as string) of the message: " + s.payload().toString());
  System.out.println("QoS: " + s.qosLevel());
})
  .subscribe("rpi2/temp", 2);

这里我们有 subscribe 方法的使用示例。为了从 rpi2/temp 主题接收消息,我们调用 subscribe 方法。此外,为了处理从服务器接收到的消息,您需要提供一个处理器,每当您订阅的主题中有新消息时,该处理器都会被调用。如本示例所示,处理器可以通过 publishHandler 方法提供。

向主题发布消息

如果您想向主题发布一些消息,则应调用 publish。让我们看看这个例子

client.publish("temperature",
  Buffer.buffer("hello"),
  MqttQoS.AT_LEAST_ONCE,
  false,
  false);

在此示例中,我们向名为“temperature”的主题发送消息。

处理服务器认证请求/向服务器发送 AUTH 包(仅限 MQTT 5 版本)

在客户端和服务器之间建立连接后,客户端可以使用 authenticationExchange 向服务器发送认证请求以进行认证。服务器可能会返回一个 AUTH 包。MqttClient 接口允许使用 authenticationExchangeHandler 方法为传入的认证包指定处理器。该处理器接收 MqttAuthenticationExchangeMessage 接口的一个实例,该实例包含原因码、认证方法和数据。

client.authenticationExchange(MqttAuthenticationExchangeMessage.create(MqttAuthenticateReasonCode.SUCCESS, MqttProperties.NO_PROPERTIES));
client.authenticationExchangeHandler(auth -> {
  //The handler will be called time to time by default
  System.out.println("We have just received AUTH packet: " + auth.reasonCode());
});

保持与服务器的连接

为了保持与服务器的连接,您应该不时向服务器发送一些内容,否则服务器将关闭连接。保持连接的正确方法是 ping 方法。

默认情况下,您的客户端会自动保持与服务器的连接。这意味着您无需调用 ping 来保持与服务器的连接。MqttClient 将为您完成此操作。

如果要禁用此功能,应将 setAutoKeepAlive 参数设置为 false

options.setAutoKeepAlive(false);

在以下情况下收到通知

  • 发布完成

    您可以调用 publishCompletionHandler 提供处理器。每次发布完成后,该处理器都将被调用。这非常有用,因为您可以看到刚刚收到的 PUBACK 或 PUBCOMP 数据包的 packetId。

    client.publishCompletionHandler(id -> {
      System.out.println("Id of just received PUBACK or PUBCOMP packet is " + id);
    });
      // The line of code below will trigger publishCompletionHandler (QoS 2)
    client.publish("hello", Buffer.buffer("hello"), MqttQoS.EXACTLY_ONCE, false, false);
      // The line of code below will trigger publishCompletionHandler (QoS is 1)
    client.publish("hello", Buffer.buffer("hello"), MqttQoS.AT_LEAST_ONCE, false, false);
      // The line of code below does not trigger because QoS value is 0
    client.publish("hello", Buffer.buffer("hello"), MqttQoS.AT_LEAST_ONCE, false, false);
    如果发送的发布数据包的 QoS=0,则处理器将**不会被调用**。
  • 订阅完成

    client.subscribeCompletionHandler(mqttSubAckMessage -> {
      System.out.println("Id of just received SUBACK packet is " + mqttSubAckMessage.messageId());
      for (int s : mqttSubAckMessage.grantedQoSLevels()) {
        if (s == 0x80) {
          System.out.println("Failure");
        } else {
          System.out.println("Success. Maximum QoS is " + s);
        }
      }
    });
    client.subscribe("temp", 1);
    client.subscribe("temp2", 2);
  • 取消订阅完成

    client
      .unsubscribeCompletionHandler(id -> {
        System.out.println("Id of just received UNSUBACK packet is " + id);
      });
    client.subscribe("temp", 1);
    client.unsubscribe("temp");
  • 已发送取消订阅

    client.subscribe("temp", 1);
    client.unsubscribe("temp").onSuccess(id -> {
        System.out.println("Id of just sent UNSUBSCRIBE packet is " + id);
      });
  • 收到 PINGRESP

    client.pingResponseHandler(s -> {
      //The handler will be called time to time by default
      System.out.println("We have just received PINGRESP packet");
    });

使用 TLS 连接

您可以通过配置客户端 TCP 选项来使用 TLS 连接 MQTT 服务器,请确保设置:

  • SSL 标志

  • 服务器证书或信任所有标志

  • 主机名验证算法为 "HTTPS"(如果您想验证服务器身份)或 ""(否则)

MqttClientOptions options = new MqttClientOptions();
options
  .setSsl(true)
  .setTrustOptions(new PemTrustOptions().addCertPath("/path/to/server.crt"))
  // Algo can be the empty string "" or "HTTPS" to verify the server hostname
  .setHostnameVerificationAlgorithm(algo);
MqttClient client = MqttClient.create(vertx);
client.connect(1883, "mqtt.eclipse.org").onComplete(s -> {
  client.disconnect();
});
有关 TLS 客户端配置的更多详细信息,请参见 此处

使用代理协议

MqttServer mqttServer = MqttServer
  .create(vertx, new MqttServerOptions()
    // set true to use proxy protocol
    .setUseProxyProtocol(true));
mqttServer.endpointHandler(endpoint -> {
  // remote address is origin real address, not proxy's address
  System.out.println(endpoint.remoteAddress());
  endpoint.accept(false);

})
  .listen()
  .onComplete(ar -> {

    if (ar.succeeded()) {

      System.out.println("MQTT server is listening on port " + ar.result().actualPort());
    } else {

      System.out.println("Error on starting the server");
      ar.cause().printStackTrace();
    }
  });

如果您的服务器位于 haproxy 或 nginx 之后,并且您想获取客户端的原始 IP 和端口,则需要将 setUseProxyProtocol 设置为 true

要启用此功能,您需要添加依赖 netty-codec-haproxy,但它默认不引入,因此您需要手动添加它
  • Maven(在您的 pom.xml 中)

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-codec-haproxy</artifactId>
    <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

compile "io.netty:netty-codec-haproxy:5.0.1"