当前位置:   article > 正文

RabbitMQ用作MQTT服务器_rabbitmq搭建mqtt服务

rabbitmq搭建mqtt服务

第一步开启MQTT插件,开启Web MQTT插件,WebMQTT用来前端websocket连接mqtt。参考:https://blog.51cto.com/guzt/3995771

本人遇到的问题外部计算机无法连接mqtt,原因:没开webmqtt,然后防火墙还把请求给拦截了,导致其他计算机无法访问。遇到类似情况可以在RabbitMQ控制台查看是否开启webmqtt。之后可以让外部计算机用cmd的telnet ip 端口,来测试是否能访问服务器。

  1. 监听
  2. package com.jqxx.digtwinresop.module.equipment.mqtt;
  3. import io.netty.util.CharsetUtil;
  4. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  5. import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
  6. import org.eclipse.paho.client.mqttv3.MqttException;
  7. import org.eclipse.paho.client.mqttv3.MqttMessage;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. import javax.annotation.Resource;
  11. import java.util.Map;
  12. /*
  13. 监听接受消息
  14. */
  15. public class JwMqttCallback implements MqttCallbackExtended {
  16. //手动注入
  17. @Resource
  18. private MQTTConfig mqttConfig;
  19. // private MQTTConfig mqttConfig = SpringUtils.getBean(MQTTConfig.class);
  20. private static final Logger log = LoggerFactory.getLogger(JwMqttCallback.class);
  21. private JwMqttClient jwMqttClient;
  22. public JwMqttCallback(JwMqttClient jwMqttClient) {
  23. this.jwMqttClient = jwMqttClient;
  24. }
  25. /**
  26. * 丢失连接,可在这里做重连
  27. * 只会调用一次
  28. *
  29. * @param throwable
  30. */
  31. @Override
  32. public void connectionLost(Throwable throwable) {
  33. log.error("mqtt connectionLost 连接断开,5S之后尝试重连: {}", throwable.getMessage());
  34. long reconnectTimes = 1;
  35. while (true) {
  36. try {
  37. if (jwMqttClient.getClient().isConnected()) {
  38. //判断已经重新连接成功 需要重新订阅主题 可以在这个if里面订阅主题 或者 connectComplete(方法里面) 看你们自己选择
  39. log.warn("mqtt reconnect success end 重新连接 重新订阅成功");
  40. return;
  41. }
  42. reconnectTimes+=1;
  43. log.warn("mqtt reconnect times = {} try again... mqtt重新连接时间 {}", reconnectTimes, reconnectTimes);
  44. jwMqttClient.getClient().reconnect();
  45. } catch (MqttException e) {
  46. log.error("mqtt断连异常", e);
  47. }
  48. try {
  49. Thread.sleep(5000);
  50. } catch (InterruptedException e1) {
  51. }
  52. }
  53. }
  54. /**
  55. * @param topic
  56. * @param mqttMessage
  57. * @throws Exception
  58. * subscribe后得到的消息会执行到这里面
  59. */
  60. @Override
  61. public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
  62. log.info("MQTT接收消息主题 : {},接收消息内容 : {}", topic, new String(mqttMessage.getPayload()));
  63. //发布消息主题
  64. if (topic.equals("gj/embed/resp")){
  65. // Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), CharsetUtil.UTF_8));
  66. //你自己的业务接口
  67. // insertCmdResults(maps);
  68. }
  69. //接收报警主题
  70. if (topic.equals("gj/embed/warn")){
  71. // Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), CharsetUtil.UTF_8));
  72. //你自己的业务接口
  73. // insertPushAlarm(maps);
  74. }
  75. if (topic.equals("dp")) {
  76. log.info("主题为dp");
  77. }
  78. }
  79. /**
  80. *连接成功后的回调 可以在这个方法执行 订阅主题 生成Bean的 MqttConfiguration方法中订阅主题 出现bug
  81. *重新连接后 主题也需要再次订阅 将重新订阅主题放在连接成功后的回调 比较合理
  82. * @param reconnect
  83. * @param serverURI
  84. */
  85. @Override
  86. public void connectComplete(boolean reconnect,String serverURI){
  87. log.info("MQTT 连接成功,连接方式:{}",reconnect?"重连":"直连");
  88. //订阅主题
  89. // jwMqttClient.subscribe(mqttConfiguration.topic1, 1);
  90. // jwMqttClient.subscribe(mqttConfiguration.topic2, 1);
  91. // jwMqttClient.subscribe(mqttConfiguration.topic3, 1);
  92. // jwMqttClient.subscribe(mqttConfiguration.topic4, 1);
  93. }
  94. /**
  95. * 消息到达后
  96. * subscribe后,执行的回调函数
  97. *
  98. * @param s
  99. * @param mqttMessage
  100. * @throws Exception
  101. */
  102. /**
  103. * publish后,配送完成后回调的方法
  104. *
  105. * @param iMqttDeliveryToken
  106. */
  107. @Override
  108. public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  109. log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());
  110. }
  111. }
  112. 发布订阅主题
  113. package com.jqxx.digtwinresop.module.equipment.mqtt;
  114. import org.eclipse.paho.client.mqttv3.*;
  115. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  116. import org.slf4j.Logger;
  117. import org.slf4j.LoggerFactory;
  118. import java.nio.charset.Charset;
  119. /**
  120. * 发送订阅消息
  121. */
  122. public class JwMqttClient {
  123. private static final Logger LOGGER = LoggerFactory.getLogger(JwMqttClient.class);
  124. private static MqttClient client;
  125. private String host;
  126. private String username;
  127. private String password;
  128. private String clientId;
  129. private int timeout;
  130. private int keepalive;
  131. public JwMqttClient(String host, String username, String password, String clientId, int timeOut, int keepAlive) {
  132. this.host = host;
  133. this.username = username;
  134. this.password = password;
  135. this.clientId = clientId;
  136. this.timeout = timeOut;
  137. this.keepalive = keepAlive;
  138. }
  139. public static MqttClient getClient() {
  140. return client;
  141. }
  142. public static void setClient(MqttClient client) {
  143. JwMqttClient.client = client;
  144. }
  145. /**
  146. * 设置mqtt连接参数
  147. *
  148. * @param username
  149. * @param password
  150. * @param timeout
  151. * @param keepalive
  152. * @return
  153. */
  154. public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) {
  155. MqttConnectOptions options = new MqttConnectOptions();
  156. options.setUserName(username);
  157. options.setPassword(password.toCharArray());
  158. options.setConnectionTimeout(timeout);
  159. options.setKeepAliveInterval(keepalive);
  160. options.setCleanSession(true);
  161. options.setAutomaticReconnect(true);
  162. return options;
  163. }
  164. /**
  165. * 连接mqtt服务端,得到MqttClient连接对象
  166. */
  167. public void connect() throws MqttException {
  168. if (client == null) {
  169. client = new MqttClient(host, clientId, new MemoryPersistence());
  170. client.setCallback(new JwMqttCallback(JwMqttClient.this));
  171. }
  172. MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive);
  173. if (!client.isConnected()) {
  174. client.connect(mqttConnectOptions);
  175. } else {
  176. client.disconnect();
  177. client.connect(mqttConnectOptions);
  178. }
  179. LOGGER.info("MQTT connect success");//未发生异常,则连接成功
  180. }
  181. /**
  182. * 发布,默认qos为0,非持久化
  183. *
  184. * @param pushMessage
  185. * @param topic
  186. */
  187. public void publish(String pushMessage, String topic) {
  188. publish(pushMessage, topic, 0, false);
  189. }
  190. /**
  191. * 发布消息
  192. *
  193. * @param pushMessage
  194. * @param topic
  195. * @param qos
  196. * @param retained:留存
  197. */
  198. public void publish(String pushMessage, String topic, int qos, boolean retained) {
  199. MqttMessage message = new MqttMessage();
  200. System.out.println(Charset.defaultCharset());
  201. try{
  202. message.setPayload(pushMessage.getBytes("UTF-8"));
  203. }
  204. catch(Exception e){
  205. System.out.println("MQTT消息使用默认编码");
  206. System.out.println(Charset.defaultCharset());
  207. message.setPayload(pushMessage.getBytes());
  208. }
  209. message.setQos(qos);
  210. message.setRetained(retained);
  211. MqttTopic mqttTopic = JwMqttClient.getClient().getTopic(topic);
  212. if (null == mqttTopic) {
  213. LOGGER.error("topic is not exist");
  214. }
  215. MqttDeliveryToken token;//Delivery:配送
  216. synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充
  217. try {
  218. token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件
  219. token.waitForCompletion(1000L);
  220. } catch (MqttPersistenceException e) {
  221. e.printStackTrace();
  222. } catch (MqttException e) {
  223. e.printStackTrace();
  224. }
  225. }
  226. }
  227. /**
  228. * 订阅某个主题
  229. *
  230. * @param topic
  231. * @param qos
  232. */
  233. public void subscribe(String topic, int qos) {
  234. try {
  235. JwMqttClient.getClient().subscribe(topic, qos);
  236. } catch (MqttException e) {
  237. e.printStackTrace();
  238. }
  239. }
  240. /**
  241. * 取消订阅主题
  242. *
  243. * @param topic 主题名称
  244. */
  245. public void cleanTopic(String topic) {
  246. if (client != null && client.isConnected()) {
  247. try {
  248. client.unsubscribe(topic);
  249. } catch (MqttException e) {
  250. e.printStackTrace();
  251. }
  252. } else {
  253. System.out.println("取消订阅失败!");
  254. }
  255. }
  256. }
  257. 消息实体
  258. package com.jqxx.digtwinresop.module.equipment.mqtt;
  259. public class JwMqttMessage {
  260. private String topic;
  261. private String content;
  262. private String time;
  263. public String getTopic() {
  264. return topic;
  265. }
  266. public void setTopic(String topic) {
  267. this.topic = topic;
  268. }
  269. public String getContent() {
  270. return content;
  271. }
  272. public void setContent(String content) {
  273. this.content = content;
  274. }
  275. public String getTime() {
  276. return time;
  277. }
  278. public void setTime(String time) {
  279. this.time = time;
  280. }
  281. @Override
  282. public String toString(){
  283. return "JsonStr";
  284. // return JSON.toJSONString(this);
  285. }
  286. }
  287. MQTT配置
  288. package com.jqxx.digtwinresop.module.equipment.mqtt;
  289. import org.eclipse.paho.client.mqttv3.MqttException;
  290. import org.slf4j.Logger;
  291. import org.slf4j.LoggerFactory;
  292. import org.springframework.beans.factory.annotation.Value;
  293. import org.springframework.context.annotation.Bean;
  294. import org.springframework.context.annotation.Configuration;
  295. @Configuration
  296. public class MQTTConfig {
  297. private static final Logger log = LoggerFactory.getLogger(MQTTConfig.class);
  298. @Value("${mqtt.host}")
  299. String host;
  300. @Value("${mqtt.userName}")
  301. String username;
  302. @Value("${mqtt.password}")
  303. String password;
  304. @Value("${mqtt.clientId}")
  305. String clientId;
  306. @Value("${mqtt.timeout}")
  307. int timeOut;
  308. @Value("${mqtt.keepalive}")
  309. int keepAlive;
  310. @Bean//注入spring
  311. public JwMqttClient jwMqttClient() {
  312. JwMqttClient jwMqttClient = new JwMqttClient(host, username, password, clientId, timeOut, keepAlive);
  313. for (int i = 0; i < 10; i++) {
  314. try {
  315. jwMqttClient.connect();
  316. //不同的主题
  317. jwMqttClient.subscribe(MqttConstants.TOPIC_DXGIS, 1);
  318. // jwMqttClient.subscribe(MqttConstants.TOPIC_DXGIS_APP, 1);
  319. // jwMqttClient.subscribe(MqttConstants.TOPIC_WL_IOT_ONLINE, 1);
  320. // jwMqttClient.subscribe(MqttConstants.TOPIC_WL_IOT_MONITOR, 1);
  321. return jwMqttClient;
  322. } catch (MqttException e) {
  323. log.error("MQTT connect exception,connect time = " + i);
  324. try {
  325. Thread.sleep(2000);
  326. } catch (InterruptedException e1) {
  327. e1.printStackTrace();
  328. }
  329. }
  330. }
  331. return jwMqttClient;
  332. }
  333. public String getHost() {
  334. return host;
  335. }
  336. public void setHost(String host) {
  337. this.host = host;
  338. }
  339. public String getUsername() {
  340. return username;
  341. }
  342. public void setUsername(String username) {
  343. this.username = username;
  344. }
  345. public String getPassword() {
  346. return password;
  347. }
  348. public void setPassword(String password) {
  349. this.password = password;
  350. }
  351. public String getClientId() {
  352. return clientId;
  353. }
  354. public void setClientId(String clientId) {
  355. this.clientId = clientId;
  356. }
  357. public int getTimeOut() {
  358. return timeOut;
  359. }
  360. public void setTimeOut(int timeOut) {
  361. this.timeOut = timeOut;
  362. }
  363. public int getKeepAlive() {
  364. return keepAlive;
  365. }
  366. public void setKeepAlive(int keepAlive) {
  367. this.keepAlive = keepAlive;
  368. }
  369. }
  370. MQTT主题枚举
  371. package com.jqxx.digtwinresop.module.equipment.mqtt;
  372. /**
  373. * @author wll
  374. * date 2024/3/12
  375. */
  376. public class MqttConstants {
  377. public static final String TOPIC_DXGIS = "dp";//大屏队列
  378. public static final String EQUIPMENT_DIRECT_EXCHANGE = "equipment_direct_exchange";//大屏Direct交换机
  379. public static final String EQUIPMENT_QUEUE_BIDING = "dp";//大屏
  380. }

依赖:

        <!--        MQTT-->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/845652
推荐阅读
相关标签
  

闽ICP备14008679号