<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mqtt</artifactId>
<version>5.0.1</version>
</dependency>
Vert.x MQTT
使用 Vert.x MQTT
此组件已在 Vert.x 栈中正式发布,只需在构建描述符的 dependencies 部分添加以下依赖即可
-
Maven(在您的
pom.xml
中)
-
Gradle(在您的
build.gradle
文件中)
compile "io.vertx:vertx-mqtt:5.0.1"
Vert.x MQTT 服务器
此组件提供了一个服务器,能够处理与远程 MQTT 客户端的连接、通信和消息交换。其 API 提供了与客户端接收到的原始协议消息相关的许多事件,并公开了一些功能以便向客户端发送消息。
在撰写本文时,它支持所有 MQTT 5.0 版本功能,但 AUTH 消息尚未实现。
它不是一个功能齐全的 MQTT 代理,但可以用于构建类似的功能或进行协议转换。
此模块具有技术预览状态,这意味着 API 可能会在版本之间发生变化。 |
处理客户端连接/断开连接
此示例展示了如何处理来自远程 MQTT 客户端的连接请求。首先,创建一个 MqttServer
实例,并使用 endpointHandler
方法指定当远程客户端发送 CONNECT 消息连接到服务器时调用的处理器。作为参数提供给处理器的 MqttEndpoint
实例,包含了与 CONNECT 消息相关的所有主要信息,例如客户端标识符、用户名/密码、“遗嘱”信息、清除会话标志、协议版本、“保持连接”超时和 CONNECT 消息属性(适用于 MQTT 5.0 版本)。在该处理器内部,endpoint 实例提供了 accept
方法,用于向远程客户端回复相应的 CONNACK 消息:通过这种方式,连接得以建立。最后,使用 listen
方法以默认行为(在 localhost 和默认 MQTT 端口 1883 上)启动服务器。同一方法允许指定一个处理器,以检查服务器是否正常启动。
MqttServer mqttServer = MqttServer.create(vertx);
mqttServer.endpointHandler(endpoint -> {
// shows main connect info
System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());
if (endpoint.auth() != null) {
System.out.println("[username = " + endpoint.auth().getUsername() + ", password = " + endpoint.auth().getPassword() + "]");
}
System.out.println("[properties = " + endpoint.connectProperties() + "]");
if (endpoint.will() != null) {
System.out.println("[will topic = " + endpoint.will().getWillTopic() + " msg = " + new String(endpoint.will().getWillMessageBytes()) +
" QoS = " + endpoint.will().getWillQos() + " isRetain = " + endpoint.will().isWillRetain() + "]");
}
System.out.println("[keep alive timeout = " + endpoint.keepAliveTimeSeconds() + "]");
// accept connection from the remote client
endpoint.accept(false);
})
.listen()
.onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("MQTT server is listening on port " + ar.result().actualPort());
} else {
System.out.println("Error on starting the server");
ar.cause().printStackTrace();
}
});
同样的 endpoint 实例提供了 disconnectMessageHandler
,用于指定当远程客户端发送 DISCONNECT 消息以断开与服务器的连接时调用的处理器;此处理器将 MqttDisconnectMessage
作为参数。
endpoint.disconnectMessageHandler(disconnectMessage -> {
System.out.println("Received disconnect from client, reason code = " + disconnectMessage.code());
});
如果使用 MQTT 5.0 或更新版本,服务器可以使用 disconnect
向客户端发送包含原因码和属性的 DISCONNECT 消息。
通过 SSL/TLS 支持处理客户端连接/断开连接
服务器支持通过 SSL/TLS 协议接受连接请求以进行身份验证和加密。为此,MqttServerOptions
类提供了 setSsl
方法来设置 SSL/TLS 的使用(传递“true”作为值),以及其他一些有用的方法来提供服务器证书和相关的私钥(以 Java 密钥库引用、PEM 或 PFX 格式)。在以下示例中,setKeyCertOptions
方法用于以 PEM 格式传递证书。此方法需要 KeyCertOptions
接口的可能实现之一的实例,在此示例中,PemKeyCertOptions
类用于提供服务器证书和私钥的路径,并使用相应的 setCertPath
和 setKeyPath
方法。MQTT 服务器的启动方式是:将 Vert.x 实例照常传递给创建方法,并将上述 MQTT 选项实例也传递给创建方法。
MqttServerOptions options = new MqttServerOptions()
.setPort(8883)
.setKeyCertOptions(new PemKeyCertOptions()
.setKeyPath("./src/test/resources/tls/server-key.pem")
.setCertPath("./src/test/resources/tls/server-cert.pem"))
.setSsl(true);
MqttServer mqttServer = MqttServer.create(vertx, options);
mqttServer.endpointHandler(endpoint -> {
// shows main connect info
System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());
if (endpoint.auth() != null) {
System.out.println("[username = " + endpoint.auth().getUsername() + ", password = " + endpoint.auth().getPassword() + "]");
}
if (endpoint.will() != null) {
System.out.println("[will topic = " + endpoint.will().getWillTopic() + " msg = " + new String(endpoint.will().getWillMessageBytes()) +
" QoS = " + endpoint.will().getWillQos() + " isRetain = " + endpoint.will().isWillRetain() + "]");
}
System.out.println("[keep alive timeout = " + endpoint.keepAliveTimeSeconds() + "]");
// accept connection from the remote client
endpoint.accept(false);
})
.listen()
.onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("MQTT server is listening on port " + ar.result().actualPort());
} else {
System.out.println("Error on starting the server");
ar.cause().printStackTrace();
}
});
通过 WebSocket 处理客户端连接
如果您想支持通过 WebSockets 的连接,也可以通过 MqttServerOptions
启用此功能。通过将 true
传递给 setUseWebSocket
,它将在路径 /mqtt
上监听 WebSocket 连接。
与其他设置配置一样,最终的端点连接和相关的断开连接的管理方式与常规连接相同。
DeploymentOptions options = new DeploymentOptions().setInstances(10);
vertx.deployVerticle("com.mycompany.MyVerticle", options);
处理客户端订阅/取消订阅请求
在客户端和服务器建立连接后,客户端可以使用 SUBSCRIBE 消息发送主题订阅请求。MqttEndpoint
接口允许使用 subscribeHandler
方法为传入的订阅请求指定处理器。该处理器接收 MqttSubscribeMessage
接口的一个实例,该实例包含主题列表以及客户端期望的相关订阅选项。订阅选项包括 QoS 级别和相关标志,对于 MQTT 5.0 版本还包括额外的标志,例如 noLocal
和 retainAsPublished
。最后,端点实例提供了 subscribeAcknowledge
方法,用于向客户端回复相关的 SUBACK 消息,其中包含原因码(可以是 QoS 级别或错误码 - 每个主题或模式独立)和消息属性。
endpoint.subscribeHandler(subscribe -> {
List<MqttSubAckReasonCode> reasonCodes = new ArrayList<>();
for (MqttTopicSubscription s: subscribe.topicSubscriptions()) {
System.out.println("Subscription for " + s.topicName() + " with QoS " + s.qualityOfService());
reasonCodes.add(MqttSubAckReasonCode.qosGranted(s.qualityOfService()));
}
// ack the subscriptions request
endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES);
});
同样,可以在端点上使用 unsubscribeHandler
方法来指定当客户端发送 UNSUBSCRIBE 消息时调用的处理器。此处理器接收一个 MqttUnsubscribeMessage
接口的实例作为参数,其中包含要取消订阅的主题列表。最后,端点实例提供了 unsubscribeAcknowledge
和 unsubscribeAcknowledge
方法,用于向客户端回复相关的 UNSUBACK 消息——要么简单地确认所有取消订阅,要么为每个主题指定原因,并指定 UNSUBSCRIBE 请求中的属性(在 MQTT v 5.0 或更高版本中支持)。
endpoint.unsubscribeHandler(unsubscribe -> {
for (String t: unsubscribe.topics()) {
System.out.println("Unsubscription for " + t);
}
// ack the subscriptions request
endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
});
处理客户端发布的消息
为了处理远程客户端发布的传入消息,MqttEndpoint
接口提供了 publishHandler
方法,用于指定当客户端发送 PUBLISH 消息时调用的处理器。此处理器接收一个 MqttPublishMessage
接口的实例作为参数,其中包含有效载荷、QoS 级别、重复和保留标志以及消息属性。
如果 QoS 级别为 0 (AT_MOST_ONCE),端点无需回复客户端。
如果 QoS 级别为 1 (AT_LEAST_ONCE),端点需要使用可用的 publishAcknowledge
或 publishAcknowledge
方法回复 PUBACK 消息。
如果 QoS 级别为 2 (EXACTLY_ONCE),端点需要使用可用的 publishReceived
或 publishReceived
方法回复 PUBREC 消息;在这种情况下,同一端点也应该处理从客户端收到的 PUBREL 消息(远程客户端在收到端点的 PUBREC 后发送此消息),并且可以通过 publishReleaseHandler
或 publishReleaseMessageHandler
方法指定处理器,具体取决于服务器是否需要访问 MQTT 5.0 扩展功能(原因码、消息属性)。为了关闭 QoS 级别 2 的传递,端点可以使用 publishComplete
或 publishComplete
方法向客户端发送 PUBCOMP 消息。
endpoint.publishHandler(message -> {
System.out.println("Just received message [" + message.payload().toString(Charset.defaultCharset()) + "] with QoS [" + message.qosLevel() + "]");
if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
endpoint.publishAcknowledge(message.messageId());
} else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
endpoint.publishReceived(message.messageId());
}
}).publishReleaseHandler(messageId -> {
endpoint.publishComplete(messageId);
});
向客户端发布消息
端点可以使用 publish
方法向远程客户端发布消息(发送 PUBLISH 消息),该方法接受以下输入参数:要发布的主题、有效载荷、QoS 级别、重复和保留标志。如果您正在使用 MQTT 5.0 或更新版本,并且希望指定消息属性,则可以使用 publish
方法,该方法除了上述方法外,还接受消息 ID 和消息属性。
如果 QoS 级别为 0 (AT_MOST_ONCE),端点将不会收到来自客户端的任何反馈。
如果 QoS 级别为 1 (AT_LEAST_ONCE),端点需要处理从客户端收到的 PUBACK 消息,以接收最终的交付确认。可以通过指定 publishAcknowledgeHandler
或 publishAcknowledgeMessageHandler
方法来完成。
如果 QoS 级别为 2 (EXACTLY_ONCE),端点需要处理从客户端收到的 PUBREC 消息。publishReceivedHandler
和 publishReceivedMessageHandler
方法允许为此指定处理器。在该处理器内部,端点可以使用 publishRelease
或 publishRelease
方法通过 PUBREL 消息回复客户端。最后一步是处理从客户端收到的 PUBCOMP 消息,作为已发布消息的最终确认;这可以通过 publishCompletionHandler
或 publishCompletionMessageHandler
指定,以便在收到最终 PUBCOMP 消息时调用处理器。
endpoint.publish("my_topic",
Buffer.buffer("Hello from the Vert.x MQTT server"),
MqttQoS.EXACTLY_ONCE,
false,
false);
// specifing handlers for handling QoS 1 and 2
endpoint.publishAcknowledgeHandler(messageId -> {
System.out.println("Received ack for message = " + messageId);
}).publishReceivedHandler(messageId -> {
endpoint.publishRelease(messageId);
}).publishCompletionHandler(messageId -> {
System.out.println("Received ack for message = " + messageId);
});
接收客户端心跳通知
底层的 MQTT 保活机制由服务器内部处理。当收到 CONNECT 消息时,服务器会根据消息中指定的保活超时时间,检查客户端是否在此超时时间内没有发送消息。同时,对于收到的每个 PINGREQ,服务器都会回复相关的 PINGRESP。
即使高级应用程序无需处理,MqttEndpoint
接口仍提供了 pingHandler
方法,用于指定当从客户端收到 PINGREQ 消息时调用的处理器。这只是向应用程序发出通知,表明客户端没有发送有意义的消息,而只是发送 ping 以保持连接活跃;无论如何,如上所述,PINGRESP 会由服务器内部自动发送。
endpoint.pingHandler(v -> {
System.out.println("Ping received from client");
});
关闭服务器
MqttServer
接口提供了 close
方法,可用于关闭服务器;它将停止监听传入连接并关闭所有与远程客户端的活动连接。此方法是异步的,并且其中一个重载提供了指定完成处理器的可能性,该处理器将在服务器真正关闭时被调用。
mqttServer.close().onComplete(v -> {
System.out.println("MQTT server closed");
});
处理客户端认证包/向远程客户端发送 AUTH 包(仅限 MQTT 5 版本)
在客户端和服务器建立连接后,客户端可以使用 AUTH 消息向服务器发送认证包。MqttEndpoint
接口允许使用 authenticationExchangeHandler
方法为传入的认证包指定处理器。该处理器接收 MqttAuthenticationExchangeMessage
接口的一个实例,该实例包含原因码、认证方法和数据。服务器可以继续使用 authenticationExchange
发送 AUTH 包以进行认证或直接通过。
endpoint.authenticationExchange(MqttAuthenticationExchangeMessage.create(MqttAuthenticateReasonCode.SUCCESS, MqttProperties.NO_PROPERTIES));
// handling auth from client
endpoint.authenticationExchangeHandler(auth -> {
System.out.println("AUTH packet received from client. code: " + auth.reasonCode());
});
verticle 中的自动清理
如果您从 verticle 内部创建 MQTT 服务器,则这些服务器将在 verticle 卸载时自动关闭。
扩展:共享 MQTT 服务器
与 MQTT 服务器相关的处理器始终在同一个事件循环线程中执行。这意味着在具有多个核心的系统上,即使只部署了一个实例,也只使用了一个核心。为了使用更多核心,可以部署 MQTT 服务器的更多实例。
这可以通过编程方式实现
for (int i = 0; i < 10; i++) {
MqttServer mqttServer = MqttServer.create(vertx);
mqttServer.endpointHandler(endpoint -> {
// handling endpoint
})
.listen()
.onComplete(ar -> {
// handling start listening
});
}
或使用 verticle 指定实例数量
DeploymentOptions options = new DeploymentOptions().setInstances(10);
vertx.deployVerticle("com.mycompany.MyVerticle", options);
真正发生的是,即使只部署了一个 MQTT 服务器,但随着传入连接的到来,Vert.x 会以轮询方式将它们分发到在不同核心上执行的任何连接处理器。
Vert.x MQTT 客户端
此组件提供了一个符合 3.1.1 规范的 MQTT 客户端。其 API 提供了许多方法,用于连接/断开与代理的连接、发布消息(具有所有三种不同 QoS 级别)以及订阅主题。
此模块具有技术预览状态,这意味着 API 可能会在版本之间发生变化。 |
连接/断开连接
客户端提供了连接和断开服务器的便利。此外,您可以通过构造函数传递 MqttClientOptions
实例作为参数,来指定您想要连接的服务器的主机和端口等信息。
此示例展示了如何使用 Vert.x MQTT 客户端并调用 connect
和 disconnect
方法来连接和断开服务器。
MqttClient client = MqttClient.create(vertx);
client.connect(1883, "mqtt.eclipse.org").onComplete(s -> {
client.disconnect();
});
通过 MqttClientOptions 提供的服务器默认地址是 localhost:1883,如果您使用 SSL/TSL,则是 localhost:8883。 |
订阅主题
现在,让我们深入了解这个例子
client.publishHandler(s -> {
System.out.println("There are new message in topic: " + s.topicName());
System.out.println("Content(as string) of the message: " + s.payload().toString());
System.out.println("QoS: " + s.qosLevel());
})
.subscribe("rpi2/temp", 2);
这里我们有 subscribe
方法的使用示例。为了从 rpi2/temp 主题接收消息,我们调用 subscribe
方法。此外,为了处理从服务器接收到的消息,您需要提供一个处理器,每当您订阅的主题中有新消息时,该处理器都会被调用。如本示例所示,处理器可以通过 publishHandler
方法提供。
向主题发布消息
如果您想向主题发布一些消息,则应调用 publish
。让我们看看这个例子
client.publish("temperature",
Buffer.buffer("hello"),
MqttQoS.AT_LEAST_ONCE,
false,
false);
在此示例中,我们向名为“temperature”的主题发送消息。
处理服务器认证请求/向服务器发送 AUTH 包(仅限 MQTT 5 版本)
在客户端和服务器之间建立连接后,客户端可以使用 authenticationExchange
向服务器发送认证请求以进行认证。服务器可能会返回一个 AUTH 包。MqttClient
接口允许使用 authenticationExchangeHandler
方法为传入的认证包指定处理器。该处理器接收 MqttAuthenticationExchangeMessage
接口的一个实例,该实例包含原因码、认证方法和数据。
client.authenticationExchange(MqttAuthenticationExchangeMessage.create(MqttAuthenticateReasonCode.SUCCESS, MqttProperties.NO_PROPERTIES));
client.authenticationExchangeHandler(auth -> {
//The handler will be called time to time by default
System.out.println("We have just received AUTH packet: " + auth.reasonCode());
});
保持与服务器的连接
为了保持与服务器的连接,您应该不时向服务器发送一些内容,否则服务器将关闭连接。保持连接的正确方法是 ping
方法。
默认情况下,您的客户端会自动保持与服务器的连接。这意味着您无需调用 ping 来保持与服务器的连接。MqttClient 将为您完成此操作。 |
如果要禁用此功能,应将 setAutoKeepAlive
参数设置为 false
options.setAutoKeepAlive(false);
在以下情况下收到通知
-
发布完成
您可以调用
publishCompletionHandler
提供处理器。每次发布完成后,该处理器都将被调用。这非常有用,因为您可以看到刚刚收到的 PUBACK 或 PUBCOMP 数据包的 packetId。client.publishCompletionHandler(id -> { System.out.println("Id of just received PUBACK or PUBCOMP packet is " + id); }); // The line of code below will trigger publishCompletionHandler (QoS 2) client.publish("hello", Buffer.buffer("hello"), MqttQoS.EXACTLY_ONCE, false, false); // The line of code below will trigger publishCompletionHandler (QoS is 1) client.publish("hello", Buffer.buffer("hello"), MqttQoS.AT_LEAST_ONCE, false, false); // The line of code below does not trigger because QoS value is 0 client.publish("hello", Buffer.buffer("hello"), MqttQoS.AT_LEAST_ONCE, false, false);
如果发送的发布数据包的 QoS=0,则处理器将**不会被调用**。 -
订阅完成
client.subscribeCompletionHandler(mqttSubAckMessage -> { System.out.println("Id of just received SUBACK packet is " + mqttSubAckMessage.messageId()); for (int s : mqttSubAckMessage.grantedQoSLevels()) { if (s == 0x80) { System.out.println("Failure"); } else { System.out.println("Success. Maximum QoS is " + s); } } }); client.subscribe("temp", 1); client.subscribe("temp2", 2);
-
取消订阅完成
client .unsubscribeCompletionHandler(id -> { System.out.println("Id of just received UNSUBACK packet is " + id); }); client.subscribe("temp", 1); client.unsubscribe("temp");
-
已发送取消订阅
client.subscribe("temp", 1); client.unsubscribe("temp").onSuccess(id -> { System.out.println("Id of just sent UNSUBSCRIBE packet is " + id); });
-
收到 PINGRESP
client.pingResponseHandler(s -> { //The handler will be called time to time by default System.out.println("We have just received PINGRESP packet"); });
使用 TLS 连接
您可以通过配置客户端 TCP 选项来使用 TLS 连接 MQTT 服务器,请确保设置:
-
SSL 标志
-
服务器证书或信任所有标志
-
主机名验证算法为
"HTTPS"
(如果您想验证服务器身份)或""
(否则)
MqttClientOptions options = new MqttClientOptions();
options
.setSsl(true)
.setTrustOptions(new PemTrustOptions().addCertPath("/path/to/server.crt"))
// Algo can be the empty string "" or "HTTPS" to verify the server hostname
.setHostnameVerificationAlgorithm(algo);
MqttClient client = MqttClient.create(vertx);
client.connect(1883, "mqtt.eclipse.org").onComplete(s -> {
client.disconnect();
});
有关 TLS 客户端配置的更多详细信息,请参见 此处 |
使用代理协议
MqttServer mqttServer = MqttServer
.create(vertx, new MqttServerOptions()
// set true to use proxy protocol
.setUseProxyProtocol(true));
mqttServer.endpointHandler(endpoint -> {
// remote address is origin real address, not proxy's address
System.out.println(endpoint.remoteAddress());
endpoint.accept(false);
})
.listen()
.onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("MQTT server is listening on port " + ar.result().actualPort());
} else {
System.out.println("Error on starting the server");
ar.cause().printStackTrace();
}
});
如果您的服务器位于 haproxy 或 nginx 之后,并且您想获取客户端的原始 IP 和端口,则需要将 setUseProxyProtocol
设置为 true
要启用此功能,您需要添加依赖 netty-codec-haproxy ,但它默认不引入,因此您需要手动添加它 |
-
Maven(在您的
pom.xml
中)
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-haproxy</artifactId>
<version>5.0.1</version>
</dependency>
-
Gradle(在您的
build.gradle
文件中)
compile "io.netty:netty-codec-haproxy:5.0.1"