赞
踩
先做一个简单的近日总结吧
最近在学习怎么用vertx框架实现一个mqtt服务和物联网平台进一步的搭建
这两天又折腾了笔记软件,想从语雀迁到obsidian,也花了不少时间,想着最近都没怎么写过博客,就写一写自己这几天学习vertx框架实现mqtt服务的收获和学习到东西
MQTT是基于发布/订阅(publish/subscribe)模式的”轻量级“通信协议,该协议构建于TCP/IP协议之上,由于其低开销,低带宽占用的特点,非常适合在物联网设备通信方面使用。
关于MQTT相关的基础知识我不再赘述,如果还不清楚的同学可以看看这个文档,快速了解一下相关知识。MQTT 协议入门:基础知识和快速教程
而Vert.x MQTT是一个使用Vert.x实现MQTT协议中客户端与服务端的组件,里边有一些基础的api,可以利用api写出自己想要的服务。以下是其官方文档,可以参考一下,虽然是汉化过的文档,但是我看的过程还是比较困难,可能是作者太菜了。Vert.x MQTT
这里我只用了Vertx MQTT的服务端组件实现了MQTT服务,客户端部分采用了MQTTX创建虚拟设备。MQTTX是一款客户端工具,可以用于创建虚拟设备快速进行测试,MQTT 客户端工具演示 | EMQX 文档
先分方法放出来,最后再放完整代码
代码的入口,创建了一个MQTT服务器并进行了监听。在接收到来自客户端的连接请求时,会调用不同的处理方法来处理订阅、退订、收到消息和断开连接等操作。
@Override public void start() throws Exception { MqttServer mqttServer = MqttServer.create(vertx); topicSubscribers = new HashMap<>(); subscriptions=new HashMap<>(); mqttServer.endpointHandler(endpoint -> { //使用lambda表达式实现了endpointHandler方法,传入参数endpoint; System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession()); if (endpoint.auth() != null) { System.out.println("[username = " + endpoint.auth().getUsername() + ", password = " + endpoint.auth().getPassword() + "]"); } System.out.println("[properties = " + endpoint.connectProperties() + "]"); // accept connection from the remote client endpoint.accept(false); SubscribeHandle(endpoint); UnSubscribeHandle(endpoint); ReceiveHandle(endpoint); DisConnectHandle(endpoint); }); mqttServer.listen(ar -> { if (ar.succeeded()) { System.out.println("MQTT server is listening on port " + ar.result().actualPort()); } else { System.out.println("Error on starting the server"); ar.cause().printStackTrace(); } }); }
用于处理设备断开连接的情况。当设备发送断开连接消息时,会从订阅关系列表中删除该设备,并从订阅列表中删除该设备的所有订阅。
private void DisConnectHandle(MqttEndpoint endpoint) {
endpoint.disconnectMessageHandler(disconnectMessage -> {
System.out.println("Received disconnect from client, reason code = " + disconnectMessage.code());
System.out.println("Client "+endpoint.auth().getUsername()+"disconnect with");
//对两个映射订阅关系的列表进行更新
for (String topic:subscriptions.get(endpoint)){
topicSubscribers.get(topic).remove(endpoint);
System.out.println("["+topic+"]");
}
subscriptions.remove(endpoint);
});
}
用于处理订阅消息。当设备发送订阅消息时,会检查订阅的主题是否合法。如果主题合法,则将设备添加到订阅关系列表和订阅列表中,并向设备发送订阅确认消息。
private void SubscribeHandle(MqttEndpoint endpoint) { endpoint.subscribeHandler(subscribe -> { Boolean IsValidTopic=false; //存储订阅消息中要订阅的topic的列表 List<MqttTopicSubscription> topicSubscriptions = subscribe.topicSubscriptions(); //存储订阅topic Qos级别的列表 List<MqttSubAckReasonCode> reasonCodes = new ArrayList<>(); //遍历列表 for (MqttTopicSubscription s : topicSubscriptions) { //topic String topic = s.topicName(); //Qos级别 MqttQoS qos = s.qualityOfService(); //判断topic是否合法 if (!isValidTopic(topic)){ //不合法则向设备发送消息 endpoint.publish(topic, Buffer.buffer("非法topic,topic不可包含空格"), qos, false, false); continue; }else { IsValidTopic=true; } System.out.println("Subscription for " + topic + " with QoS " + qos); reasonCodes.add(MqttSubAckReasonCode.qosGranted(qos)); //判断是否已有此topic,如果有则直接添加,没有则新建键值对 if (!topicSubscribers.containsKey(topic)) { topicSubscribers.put(topic, new ArrayList<MqttEndpoint>()); } topicSubscribers.get(topic).add(endpoint); //同上 if (!subscriptions.containsKey(endpoint)) { subscriptions.put(endpoint, new ArrayList<String>()); } subscriptions.get(endpoint).add(topic); } if(IsValidTopic){ endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES); } }); }
用于处理退订消息。当设备发送退订消息时,会从订阅关系列表中删除该设备,并检查是否需要删除订阅列表中的主题。
private void UnSubscribeHandle(MqttEndpoint endpoint) { endpoint.unsubscribeHandler(unsubscribe -> { //遍历要退订的topic for (String unsubscribedTopic : unsubscribe.topics()) { topicSubscribers.get(unsubscribedTopic).remove(endpoint); //如果某topic的订阅列表为空,删除topic if (topicSubscribers.get(unsubscribedTopic).size() == 0) { topicSubscribers.remove(unsubscribedTopic); } subscriptions.get(endpoint).remove(unsubscribedTopic); //同上 if (subscriptions.get(endpoint).size()==0){ subscriptions.remove(endpoint); } System.out.println("unsubscribed :" + endpoint.auth().getUsername() + "for" + unsubscribedTopic); } endpoint.unsubscribeAcknowledge(unsubscribe.messageId()); }); }
用于处理收到的消息。当设备发布消息时,会检查发布的主题是否合法。如果主题合法,则遍历订阅关系列表,将消息发布给订阅了匹配主题的设备。
private void ReceiveHandle(MqttEndpoint endpoint) { endpoint.publishHandler(publish -> { String topic = publish.topicName(); Buffer payload = publish.payload(); //对topic的合法性进行判断 if (!isValidTopic(topic)){ endpoint.publish(topic, Buffer.buffer("非法topic,topic不可包含空格"), MqttQoS.AT_MOST_ONCE, false, false); return; } //记录日志接收到设备发布的消息 System.out.println("Received message [" + publish.payload().toString(Charset.defaultCharset()) + "] with QoS [" + publish.qosLevel() + "]"); if (publish.qosLevel() == MqttQoS.AT_LEAST_ONCE) { endpoint.publishAcknowledge(publish.messageId()); } else if (publish.qosLevel() == MqttQoS.EXACTLY_ONCE) { endpoint.publishReceived(publish.messageId()); } //遍历订阅关系,进行消息发布 for (Map.Entry<String, List<MqttEndpoint>> entry : topicSubscribers.entrySet()) { String subscribedTopic = entry.getKey(); //被订阅的topic List<MqttEndpoint> subscribers = entry.getValue(); //订阅上方topic的订阅者 //判断消息发布的topic是否能和被设备订阅的topic按照规则匹配 if (isTopicMatch(subscribedTopic, topic)) { //若匹配,则遍历topic订阅者列表,并进行消息发布 for (MqttEndpoint subscriber : subscribers) { subscriber.publish(topic, payload, publish.qosLevel(), publish.isDup(), publish.isRetain()); } } } }); endpoint.publishAcknowledgeHandler(messageId -> { System.out.println("received ack for message =" + messageId); }).publishReceivedHandler(messageId -> { endpoint.publishRelease(messageId); }).publishCompletionHandler(messageId -> { System.out.println("Received ack for message =" + messageId); }); endpoint.publishReleaseHandler(endpoint::publishComplete); }
用于判断订阅的主题和发布的主题是否匹配。它会将主题按照"/“进行分割,并进行逐层匹配,支持通配符”+“和”#"。
private boolean isTopicMatch(String subscribedTopic, String publishedTopic) { String[] publishTopicArray = publishedTopic.split("/"); String[] subscribedTopicArray = subscribedTopic.split("/"); //将两个要比较的topic分割 //订阅的topic长度不能比发布的topic长一个以上 if (subscribedTopicArray.length - 1 > publishTopicArray.length) { return false; } //如果发布的topic长度比订阅的topic长度要长 //并且订阅的topic最后不是以#结尾都返回false,因为这不可能 if (subscribedTopicArray.length<publishTopicArray.length){ if (!subscribedTopicArray[subscribedTopicArray.length-1].equals("#")){ return false; } } //对两个topic进行比较 for (int i = 0; i < publishTopicArray.length && i < subscribedTopicArray.length; i++) { //如果匹配成功或者匹配到了+,进行下一层匹配 if (subscribedTopicArray[i].equals(publishTopicArray[i])||subscribedTopicArray[i].equals("+")){ continue; } //如果匹配到了#,直接通过 if (subscribedTopicArray[i].equals("#")) { break; } return false; } return true; }
用于判断主题是否合法。它会检查主题是否包含空格,并且要么以"/#“结尾,要么不包含”#"。
public boolean isValidTopic(String topic) {
//topic 不能包含任何空格,并且要么以 /# 结尾,要么不包含 #
return (!topic.matches(".*\\s+.*"))&&(topic.matches(".*(?:\\/#)?$"));
}
package com.yichen.starter; import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttQoS; import io.vertx.core.AbstractVerticle; import io.vertx.core.buffer.Buffer; import io.vertx.mqtt.MqttEndpoint; import io.vertx.mqtt.MqttServer; import io.vertx.mqtt.MqttTopicSubscription; import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /* * 缺少 * */ public class MqttVertical extends AbstractVerticle { private static Map<String, List<MqttEndpoint>> topicSubscribers; //存储每个topic的订阅关系 private static Map<MqttEndpoint, List<String>> subscriptions; //存储每个设备订阅的topic @Override public void start() throws Exception { MqttServer mqttServer = MqttServer.create(vertx); topicSubscribers = new HashMap<>(); subscriptions=new HashMap<>(); mqttServer.endpointHandler(endpoint -> { //使用lambda表达式实现了endpointHandler方法,传入参数endpoint; System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession()); if (endpoint.auth() != null) { System.out.println("[username = " + endpoint.auth().getUsername() + ", password = " + endpoint.auth().getPassword() + "]"); } System.out.println("[properties = " + endpoint.connectProperties() + "]"); // accept connection from the remote client endpoint.accept(false); SubscribeHandle(endpoint); UnSubscribeHandle(endpoint); ReceiveHandle(endpoint); DisConnectHandle(endpoint); }); mqttServer.listen(ar -> { if (ar.succeeded()) { System.out.println("MQTT server is listening on port " + ar.result().actualPort()); } else { System.out.println("Error on starting the server"); ar.cause().printStackTrace(); } }); } /* * 设备断开处理 * */ private void DisConnectHandle(MqttEndpoint endpoint) { endpoint.disconnectMessageHandler(disconnectMessage -> { System.out.println("Received disconnect from client, reason code = " + disconnectMessage.code()); System.out.println("Client "+endpoint.auth().getUsername()+"disconnect with"); //对两个映射订阅关系的列表进行更新 for (String topic:subscriptions.get(endpoint)){ topicSubscribers.get(topic).remove(endpoint); System.out.println("["+topic+"]"); } subscriptions.remove(endpoint); }); } /* *处理订阅消息 * */ private void SubscribeHandle(MqttEndpoint endpoint) { endpoint.subscribeHandler(subscribe -> { Boolean IsValidTopic=false; //存储订阅消息中要订阅的topic的列表 List<MqttTopicSubscription> topicSubscriptions = subscribe.topicSubscriptions(); //存储订阅topic Qos级别的列表 List<MqttSubAckReasonCode> reasonCodes = new ArrayList<>(); //遍历列表 for (MqttTopicSubscription s : topicSubscriptions) { //topic String topic = s.topicName(); //Qos级别 MqttQoS qos = s.qualityOfService(); //判断topic是否合法 if (!isValidTopic(topic)){ //不合法则向设备发送消息 endpoint.publish(topic, Buffer.buffer("非法topic,topic不可包含空格"), qos, false, false); continue; }else { IsValidTopic=true; } System.out.println("Subscription for " + topic + " with QoS " + qos); reasonCodes.add(MqttSubAckReasonCode.qosGranted(qos)); //判断是否已有此topic,如果有则直接添加,没有则新建键值对 if (!topicSubscribers.containsKey(topic)) { topicSubscribers.put(topic, new ArrayList<MqttEndpoint>()); } topicSubscribers.get(topic).add(endpoint); //同上 if (!subscriptions.containsKey(endpoint)) { subscriptions.put(endpoint, new ArrayList<String>()); } subscriptions.get(endpoint).add(topic); } if(IsValidTopic){ endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES); } }); } /* * 处理退订 * */ private void UnSubscribeHandle(MqttEndpoint endpoint) { endpoint.unsubscribeHandler(unsubscribe -> { //遍历要退订的topic for (String unsubscribedTopic : unsubscribe.topics()) { topicSubscribers.get(unsubscribedTopic).remove(endpoint); //如果某topic的订阅列表为空,删除topic if (topicSubscribers.get(unsubscribedTopic).size() == 0) { topicSubscribers.remove(unsubscribedTopic); } subscriptions.get(endpoint).remove(unsubscribedTopic); //同上 if (subscriptions.get(endpoint).size()==0){ subscriptions.remove(endpoint); } System.out.println("unsubscribed :" + endpoint.auth().getUsername() + "for" + unsubscribedTopic); } endpoint.unsubscribeAcknowledge(unsubscribe.messageId()); }); } private void ReceiveHandle(MqttEndpoint endpoint) { endpoint.publishHandler(publish -> { String topic = publish.topicName(); Buffer payload = publish.payload(); //对topic的合法性进行判断 if (!isValidTopic(topic)){ endpoint.publish(topic, Buffer.buffer("非法topic,topic不可包含空格"), MqttQoS.AT_MOST_ONCE, false, false); return; } //记录日志接收到设备发布的消息 System.out.println("Received message [" + publish.payload().toString(Charset.defaultCharset()) + "] with QoS [" + publish.qosLevel() + "]"); if (publish.qosLevel() == MqttQoS.AT_LEAST_ONCE) { endpoint.publishAcknowledge(publish.messageId()); } else if (publish.qosLevel() == MqttQoS.EXACTLY_ONCE) { endpoint.publishReceived(publish.messageId()); } //遍历订阅关系,进行消息发布 for (Map.Entry<String, List<MqttEndpoint>> entry : topicSubscribers.entrySet()) { String subscribedTopic = entry.getKey(); //被订阅的topic List<MqttEndpoint> subscribers = entry.getValue(); //订阅上方topic的订阅者 //判断消息发布的topic是否能和被设备订阅的topic按照规则匹配 if (isTopicMatch(subscribedTopic, topic)) { //若匹配,则遍历topic订阅者列表,并进行消息发布 for (MqttEndpoint subscriber : subscribers) { subscriber.publish(topic, payload, publish.qosLevel(), publish.isDup(), publish.isRetain()); } } } }); endpoint.publishAcknowledgeHandler(messageId -> { System.out.println("received ack for message =" + messageId); }).publishReceivedHandler(messageId -> { endpoint.publishRelease(messageId); }).publishCompletionHandler(messageId -> { System.out.println("Received ack for message =" + messageId); }); endpoint.publishReleaseHandler(endpoint::publishComplete); } /* * 判断topic是否匹配 * */ private boolean isTopicMatch(String subscribedTopic, String publishedTopic) { String[] publishTopicArray = publishedTopic.split("/"); String[] subscribedTopicArray = subscribedTopic.split("/"); //将两个要比较的topic分割 //订阅的topic长度不能比发布的topic长一个以上 if (subscribedTopicArray.length - 1 > publishTopicArray.length) { return false; } //如果发布的topic长度比订阅的topic长度要长 //并且订阅的topic最后不是以#结尾都返回false,因为这不可能 if (subscribedTopicArray.length<publishTopicArray.length){ if (!subscribedTopicArray[subscribedTopicArray.length-1].equals("#")){ return false; } } //对两个topic进行比较 for (int i = 0; i < publishTopicArray.length && i < subscribedTopicArray.length; i++) { //如果匹配成功或者匹配到了+,进行下一层匹配 if (subscribedTopicArray[i].equals(publishTopicArray[i])||subscribedTopicArray[i].equals("+")){ continue; } //如果匹配到了#,直接通过 if (subscribedTopicArray[i].equals("#")) { break; } return false; } return true; } public boolean isValidTopic(String topic) { //topic 不能包含任何空格,并且要么以 /# 结尾,要么不包含 # return (!topic.matches(".*\\s+.*"))&&(topic.matches(".*(?:\\/#)?$")); } }
以上就是我的全部代码和相关内容解释,如果有疑问欢迎评论区交流,如果帮到你了可以点个赞
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。