赞
踩
第一步开启MQTT插件,开启Web MQTT插件,WebMQTT用来前端websocket连接mqtt。参考:https://blog.51cto.com/guzt/3995771
本人遇到的问题外部计算机无法连接mqtt,原因:没开webmqtt,然后防火墙还把请求给拦截了,导致其他计算机无法访问。遇到类似情况可以在RabbitMQ控制台查看是否开启webmqtt。之后可以让外部计算机用cmd的telnet ip 端口,来测试是否能访问服务器。
- 监听
- package com.jqxx.digtwinresop.module.equipment.mqtt;
-
-
- import io.netty.util.CharsetUtil;
- import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
- import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
- import org.eclipse.paho.client.mqttv3.MqttException;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import javax.annotation.Resource;
- import java.util.Map;
-
- /*
- 监听接受消息
- */
-
- public class JwMqttCallback implements MqttCallbackExtended {
- //手动注入
- @Resource
- private MQTTConfig mqttConfig;
-
- // private MQTTConfig mqttConfig = SpringUtils.getBean(MQTTConfig.class);
-
- private static final Logger log = LoggerFactory.getLogger(JwMqttCallback.class);
-
- private JwMqttClient jwMqttClient;
-
- public JwMqttCallback(JwMqttClient jwMqttClient) {
- this.jwMqttClient = jwMqttClient;
- }
-
-
- /**
- * 丢失连接,可在这里做重连
- * 只会调用一次
- *
- * @param throwable
- */
- @Override
- public void connectionLost(Throwable throwable) {
- log.error("mqtt connectionLost 连接断开,5S之后尝试重连: {}", throwable.getMessage());
- long reconnectTimes = 1;
- while (true) {
- try {
- if (jwMqttClient.getClient().isConnected()) {
- //判断已经重新连接成功 需要重新订阅主题 可以在这个if里面订阅主题 或者 connectComplete(方法里面) 看你们自己选择
- log.warn("mqtt reconnect success end 重新连接 重新订阅成功");
- return;
- }
- reconnectTimes+=1;
- log.warn("mqtt reconnect times = {} try again... mqtt重新连接时间 {}", reconnectTimes, reconnectTimes);
- jwMqttClient.getClient().reconnect();
- } catch (MqttException e) {
- log.error("mqtt断连异常", e);
- }
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e1) {
- }
- }
- }
-
- /**
- * @param topic
- * @param mqttMessage
- * @throws Exception
- * subscribe后得到的消息会执行到这里面
- */
- @Override
- public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
- log.info("MQTT接收消息主题 : {},接收消息内容 : {}", topic, new String(mqttMessage.getPayload()));
- //发布消息主题
- if (topic.equals("gj/embed/resp")){
- // Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), CharsetUtil.UTF_8));
- //你自己的业务接口
- // insertCmdResults(maps);
- }
- //接收报警主题
- if (topic.equals("gj/embed/warn")){
- // Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), CharsetUtil.UTF_8));
- //你自己的业务接口
- // insertPushAlarm(maps);
- }
- if (topic.equals("dp")) {
- log.info("主题为dp");
- }
- }
-
-
- /**
- *连接成功后的回调 可以在这个方法执行 订阅主题 生成Bean的 MqttConfiguration方法中订阅主题 出现bug
- *重新连接后 主题也需要再次订阅 将重新订阅主题放在连接成功后的回调 比较合理
- * @param reconnect
- * @param serverURI
- */
- @Override
- public void connectComplete(boolean reconnect,String serverURI){
- log.info("MQTT 连接成功,连接方式:{}",reconnect?"重连":"直连");
- //订阅主题
- // jwMqttClient.subscribe(mqttConfiguration.topic1, 1);
- // jwMqttClient.subscribe(mqttConfiguration.topic2, 1);
- // jwMqttClient.subscribe(mqttConfiguration.topic3, 1);
- // jwMqttClient.subscribe(mqttConfiguration.topic4, 1);
- }
-
- /**
- * 消息到达后
- * subscribe后,执行的回调函数
- *
- * @param s
- * @param mqttMessage
- * @throws Exception
- */
- /**
- * publish后,配送完成后回调的方法
- *
- * @param iMqttDeliveryToken
- */
- @Override
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
- log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());
- }
- }
- 发布订阅主题
- package com.jqxx.digtwinresop.module.equipment.mqtt;
-
- import org.eclipse.paho.client.mqttv3.*;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import java.nio.charset.Charset;
-
- /**
- * 发送订阅消息
- */
- public class JwMqttClient {
- private static final Logger LOGGER = LoggerFactory.getLogger(JwMqttClient.class);
-
- private static MqttClient client;
- private String host;
- private String username;
- private String password;
- private String clientId;
- private int timeout;
- private int keepalive;
- public JwMqttClient(String host, String username, String password, String clientId, int timeOut, int keepAlive) {
- this.host = host;
- this.username = username;
- this.password = password;
- this.clientId = clientId;
- this.timeout = timeOut;
- this.keepalive = keepAlive;
- }
-
- public static MqttClient getClient() {
- return client;
- }
-
- public static void setClient(MqttClient client) {
- JwMqttClient.client = client;
- }
-
- /**
- * 设置mqtt连接参数
- *
- * @param username
- * @param password
- * @param timeout
- * @param keepalive
- * @return
- */
- public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) {
- MqttConnectOptions options = new MqttConnectOptions();
- options.setUserName(username);
- options.setPassword(password.toCharArray());
- options.setConnectionTimeout(timeout);
- options.setKeepAliveInterval(keepalive);
- options.setCleanSession(true);
- options.setAutomaticReconnect(true);
- return options;
- }
-
- /**
- * 连接mqtt服务端,得到MqttClient连接对象
- */
- public void connect() throws MqttException {
- if (client == null) {
- client = new MqttClient(host, clientId, new MemoryPersistence());
- client.setCallback(new JwMqttCallback(JwMqttClient.this));
- }
- MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive);
- if (!client.isConnected()) {
- client.connect(mqttConnectOptions);
- } else {
- client.disconnect();
- client.connect(mqttConnectOptions);
- }
- LOGGER.info("MQTT connect success");//未发生异常,则连接成功
- }
-
- /**
- * 发布,默认qos为0,非持久化
- *
- * @param pushMessage
- * @param topic
- */
- public void publish(String pushMessage, String topic) {
- publish(pushMessage, topic, 0, false);
- }
-
- /**
- * 发布消息
- *
- * @param pushMessage
- * @param topic
- * @param qos
- * @param retained:留存
- */
- public void publish(String pushMessage, String topic, int qos, boolean retained) {
- MqttMessage message = new MqttMessage();
- System.out.println(Charset.defaultCharset());
- try{
- message.setPayload(pushMessage.getBytes("UTF-8"));
- }
- catch(Exception e){
- System.out.println("MQTT消息使用默认编码");
- System.out.println(Charset.defaultCharset());
- message.setPayload(pushMessage.getBytes());
- }
- message.setQos(qos);
- message.setRetained(retained);
- MqttTopic mqttTopic = JwMqttClient.getClient().getTopic(topic);
- if (null == mqttTopic) {
- LOGGER.error("topic is not exist");
- }
- MqttDeliveryToken token;//Delivery:配送
- synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充
- try {
- token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件
- token.waitForCompletion(1000L);
- } catch (MqttPersistenceException e) {
- e.printStackTrace();
- } catch (MqttException e) {
- e.printStackTrace();
- }
- }
- }
-
-
- /**
- * 订阅某个主题
- *
- * @param topic
- * @param qos
- */
- public void subscribe(String topic, int qos) {
- try {
- JwMqttClient.getClient().subscribe(topic, qos);
- } catch (MqttException e) {
- e.printStackTrace();
- }
- }
-
-
- /**
- * 取消订阅主题
- *
- * @param topic 主题名称
- */
- public void cleanTopic(String topic) {
- if (client != null && client.isConnected()) {
- try {
- client.unsubscribe(topic);
- } catch (MqttException e) {
- e.printStackTrace();
- }
- } else {
- System.out.println("取消订阅失败!");
- }
- }
- }
- 消息实体
- package com.jqxx.digtwinresop.module.equipment.mqtt;
-
-
- public class JwMqttMessage {
- private String topic;
- private String content;
- private String time;
-
- public String getTopic() {
- return topic;
- }
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public String getContent() {
- return content;
- }
- public void setContent(String content) {
- this.content = content;
- }
-
- public String getTime() {
- return time;
- }
- public void setTime(String time) {
- this.time = time;
- }
- @Override
- public String toString(){
- return "JsonStr";
- // return JSON.toJSONString(this);
- }
- }
- MQTT配置
- package com.jqxx.digtwinresop.module.equipment.mqtt;
-
- import org.eclipse.paho.client.mqttv3.MqttException;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class MQTTConfig {
- private static final Logger log = LoggerFactory.getLogger(MQTTConfig.class);
- @Value("${mqtt.host}")
- String host;
- @Value("${mqtt.userName}")
- String username;
- @Value("${mqtt.password}")
- String password;
- @Value("${mqtt.clientId}")
- String clientId;
- @Value("${mqtt.timeout}")
- int timeOut;
- @Value("${mqtt.keepalive}")
- int keepAlive;
-
- @Bean//注入spring
- public JwMqttClient jwMqttClient() {
- JwMqttClient jwMqttClient = new JwMqttClient(host, username, password, clientId, timeOut, keepAlive);
- for (int i = 0; i < 10; i++) {
- try {
- jwMqttClient.connect();
- //不同的主题
- jwMqttClient.subscribe(MqttConstants.TOPIC_DXGIS, 1);
- // jwMqttClient.subscribe(MqttConstants.TOPIC_DXGIS_APP, 1);
- // jwMqttClient.subscribe(MqttConstants.TOPIC_WL_IOT_ONLINE, 1);
- // jwMqttClient.subscribe(MqttConstants.TOPIC_WL_IOT_MONITOR, 1);
- return jwMqttClient;
- } catch (MqttException e) {
- log.error("MQTT connect exception,connect time = " + i);
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- }
- }
- return jwMqttClient;
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public String getUsername() {
- return username;
- }
-
- public void setUsername(String username) {
- this.username = username;
- }
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public String getClientId() {
- return clientId;
- }
-
- public void setClientId(String clientId) {
- this.clientId = clientId;
- }
-
- public int getTimeOut() {
- return timeOut;
- }
-
- public void setTimeOut(int timeOut) {
- this.timeOut = timeOut;
- }
-
- public int getKeepAlive() {
- return keepAlive;
- }
-
- public void setKeepAlive(int keepAlive) {
- this.keepAlive = keepAlive;
- }
- }
- MQTT主题枚举
- package com.jqxx.digtwinresop.module.equipment.mqtt;
-
- /**
- * @author wll
- * date 2024/3/12
- */
- public class MqttConstants {
-
- public static final String TOPIC_DXGIS = "dp";//大屏队列
-
- public static final String EQUIPMENT_DIRECT_EXCHANGE = "equipment_direct_exchange";//大屏Direct交换机
-
- public static final String EQUIPMENT_QUEUE_BIDING = "dp";//大屏
-
- }
-
-
-
依赖:
<!-- MQTT--> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。