当前位置:   article > 正文

springboot整合mqtt(emqx)_springboot 整合mqtt客户端连接多个服务端(多个emqx)

springboot 整合mqtt客户端连接多个服务端(多个emqx)

目录

1.安装emqx

2.导入pom依赖

3.yml配置

4.emqx配置类+服务端

5.客户端


1.安装emqx

可以参考:(278条消息) WINDOWS下搭建MQTT服务EMQX_emqx windows_李夕的博客-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/qq_19294353/article/details/123290346?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522169033795716800213030622%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=169033795716800213030622&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~top_click~default-1-123290346-null-null.142%5Ev91%5Einsert_down28v1,239%5Ev3%5Econtrol&utm_term=windows%E5%AE%89%E8%A3%85emqx&spm=1018.2226.3001.4187为什么使用emqx:

(279条消息) EMQX以及用MQTT的测试和性能测试_emqx和rabbitmq性能_yiyang1208的博客-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/yiyang1208/article/details/128283367?ops_request_misc=&request_id=&biz_id=102&utm_term=emqx%E7%9A%84%E5%A5%BD%E5%A4%84&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduweb~default-1-128283367.142%5Ev91%5Einsert_down28v1,239%5Ev3%5Econtrol&spm=1018.2226.3001.4187

2.导入pom依赖

  1. <!--mqtt依赖 start-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-integration</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.integration</groupId>
  8. <artifactId>spring-integration-stream</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.integration</groupId>
  12. <artifactId>spring-integration-mqtt</artifactId>
  13. </dependency>
  14. <!--mqtt依赖 end-->

3.yml配置

  1. mqtt:
  2. host: tcp://127.0.0.1:1883
  3. username: admin
  4. password: admin123

emqx默认端口是1883,账号:admin  密码:public

4.emqx配置类+服务端

  1. @Configuration
  2. // MQTT配置类
  3. public class MqttConfig {
  4. // exmq服务器地址
  5. @Value("${mqtt.host}")
  6. private String host;
  7. // 定义客户端ID,使用"DC"加上一个随机生成的数字
  8. private final String clientId = "DC" + new Random().nextInt(100000000);
  9. @Value("${mqtt.username}")
  10. private String username;
  11. @Value("${mqtt.password}")
  12. private String password;
  13. // 定义连接超时时间,默认为10秒,如果未在属性中指定,则使用默认值
  14. @Value("${mqtt.connection.timeout:10}")
  15. private int connectionTimeout;
  16. private static MqttClient mqttClient;
  17. /*
  18. * MQTT连接参数设置
  19. */
  20. private MqttConnectOptions mqttConnectOptions(String userName, String passWord, String host) throws MqttException {
  21. mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
  22. MqttConnectOptions options = new MqttConnectOptions();
  23. options.setUserName(userName);
  24. options.setPassword(passWord.toCharArray());
  25. options.setConnectionTimeout(connectionTimeout); // 设置连接超时时间
  26. options.setAutomaticReconnect(true); // 开启自动重连
  27. options.setCleanSession(false); // 设置为false,表示不清除会话session
  28. // 可以根据需要设置其他参数,例如 options.setKeepAliveInterval(20); 设置心跳间隔时间,默认为60
  29. return options;
  30. }
  31. // 创建一个MqttClient的Bean实例,用于连接MQTT代理
  32. @Bean
  33. public MqttClient mqttClient() throws MqttException {
  34. MqttConnectOptions options = mqttConnectOptions(username, password, host);
  35. try {
  36. mqttClient.connect(options);
  37. } catch (MqttException e) {
  38. System.out.println("连接失败:" + e.getMessage());
  39. }
  40. return mqttClient;
  41. }
  42. // 发布消息
  43. public void publish(String topic, String msg, int qos) throws MqttException {
  44. MqttMessage mqttMessage = new MqttMessage();
  45. mqttMessage.setQos(qos); // 设置消息质量
  46. mqttMessage.setRetained(true); // 设置保留消息
  47. mqttMessage.setPayload(msg.getBytes()); // 设置消息内容
  48. try {
  49. MqttTopic mqttTopic = mqttClient.getTopic(topic);
  50. MqttDeliveryToken token = mqttTopic.publish(mqttMessage); // 发布消息
  51. token.waitForCompletion(); // 等待发布完成
  52. System.out.println("发送消息:Topic=" + topic + ", Message=" + msg.getBytes());
  53. } catch (MqttException e) {
  54. MqttConnectOptions options = mqttConnectOptions(username, password, host);
  55. reconnect(mqttClient, options, topic, mqttMessage); // 递归重新连接并重试发送消息
  56. }
  57. }
  58. // 重新连接并重试发送消息
  59. private static void reconnect(MqttClient mqttClient, MqttConnectOptions mqttConnectOptions, String topic, MqttMessage mqttMessage) throws MqttException {
  60. try {
  61. // 等待一段时间,可以根据需要调整等待时间
  62. Thread.sleep(5000);
  63. // 重新连接 MqttClient
  64. mqttClient.connect(mqttConnectOptions);
  65. // 判断是否连接成功
  66. if (mqttClient.isConnected()) {
  67. System.out.println("发送方MQTT 客户端已成功连接到 MQTT 代理。");
  68. MqttTopic mqttTopic = mqttClient.getTopic(topic);
  69. MqttDeliveryToken token = mqttTopic.publish(mqttMessage); // 重新发布消息
  70. token.waitForCompletion();
  71. }
  72. } catch (MqttException | InterruptedException e) {
  73. System.out.println("发送方重新连接失败:" + e.getMessage());
  74. reconnect(mqttClient, mqttConnectOptions, topic, mqttMessage); // 重连失败,继续重试
  75. }
  76. }
  77. }

1.导入相关的包,包括MqttClient和MqttException以及其他必要的类。
2.使用@Configuration注解将这个类声明为一个配置类,使得Spring Boot能够识别它。
3.使用@Value注解注入配置文件中的MQTT相关配置信息,包括服务器地址host、用户名username和密码等。password
4.定义一个静态的MqttClient对象mqttClient,用于管理MQTT客户端连接。
5.定义了一个私有方法mqttConnectOptions,用于设置MQTT连接参数,包括用户名、密码、连接超时时间、自动重连等。
6.在@Bean注解的方法mqttClient()中创建一个MqttClient的实例,并连接到MQTT代理。如果连接失败,会打印连接失败的信息。
7.定义了一个publish方法,用于发布消息。该方法接收三个参数:topic表示要发布的主题,msg表示要发布的消息内容,qos表示消息的质量。如果发布消息失败,会调用reconnect方法进行重新连接并重试发送消息。
8.reconnect方法用于处理连接丢失后的重新连接逻辑。它会等待一段时间(这里是5000毫秒),然后尝试重新连接MqttClient。如果连接成功,就会重新发布之前的消息。
需要注意的是,这里的代码实现了一个消息的发布功能,没有涉及到订阅功能。如果需要实现消息的订阅功能,可以通过在mqttClient()方法中设置消息回调来处理接收到的消息,类似之前的示例代码。同时,也需要添加订阅主题的逻辑

5.客户端

  1. public static void main(String[] args) {
  2. String host = "tcp://localhost:1883";
  3. String clientId = "Client_B";
  4. String topic = "li";
  5. try {
  6. MqttClient mqttClient = new MqttClient(host, clientId);
  7. MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
  8. mqttConnectOptions.setUserName("admin");
  9. mqttConnectOptions.setPassword("admin123".toCharArray());
  10. // 连接到 EMQ X Broker
  11. mqttClient.connect(mqttConnectOptions);
  12. // 设置消息回调
  13. mqttClient.setCallback(new MqttCallback() {
  14. @SneakyThrows
  15. @Override
  16. public void connectionLost(Throwable cause) {
  17. // 处理连接丢失的情况
  18. System.out.println("连接丢失,尝试重新连接...");
  19. reconnect(mqttClient, mqttConnectOptions, topic);
  20. }
  21. @Override
  22. public void messageArrived(String topic, MqttMessage message) throws Exception {
  23. // 处理接收到的消息
  24. String payload = new String(message.getPayload());
  25. System.out.println("收到消息:Topic=" + topic + ", Message=" + payload);
  26. }
  27. @SneakyThrows
  28. @Override
  29. public void deliveryComplete(IMqttDeliveryToken token) {
  30. // 处理消息发送完成的情况
  31. System.out.println("消息发送完成: " + token.getMessage().getPayload());
  32. }
  33. });
  34. // 订阅主题
  35. mqttClient.subscribe(topic);
  36. // 保持连接,防止程序退出
  37. // 这里可以根据需要,设置一个条件或者等待一段时间,确保程序能够保持连接
  38. while (true) {
  39. Thread.sleep(1000);
  40. }
  41. } catch (MqttException | InterruptedException e) {
  42. e.printStackTrace();
  43. }
  44. }
  45. private static void reconnect(MqttClient mqttClient, MqttConnectOptions mqttConnectOptions, String topic) throws MqttException {
  46. try {
  47. // 等待一段时间,可以根据需要调整等待时间
  48. Thread.sleep(5000);
  49. // 重新连接 MqttClient
  50. mqttClient.connect(mqttConnectOptions);
  51. // 判断是否连接成功
  52. if (mqttClient.isConnected()) {
  53. System.out.println("MQTT 客户端已成功连接到 MQTT 代理。");
  54. // 重新订阅主题
  55. mqttClient.subscribe(topic);
  56. }
  57. } catch (MqttException | InterruptedException e) {
  58. System.out.println("重新连接失败:" + e.getMessage());
  59. reconnect(mqttClient, mqttConnectOptions, topic);
  60. }
  61. }

1.导入MqttClient相关的包,以及SneakyThrows注解。
2.定义MQTT Broker服务器地址 host、客户端ID clientId、订阅主题 topic。
3..在main方法中,创建MqttClient实例,并设置连接参数,包括用户名和密码。然后使用mqttClient.connect(mqttConnectOptions)连接到MQTT Broker。
4.设置消息回调,通过实现MqttCallback接口,处理连接丢失、接收到消息和消息发送完成的情况。
5.使用mqttClient.subscribe(topic)订阅指定主题。
6.使用一个无限循环保持连接,防止程序退出。可以根据需要设置一个条件或者等待一段时间,确保程序保持连接。
7.当连接丢失时,MqttCallback中的connectionLost方法会被触发,执行reconnect方法进行重新连接,并重新订阅主题。

8.reconnect方法是用来处理连接丢失后的重新连接逻辑。它会等待一段时间(这里是5000毫秒),然后尝试重新连接MqttClient。如果连接成功,就会重新订阅之前的主题。

需要注意的是,MqttClient是一个阻塞的客户端,它会一直运行在主线程中,除非程序被显式终止,否则不会自动退出。因此,这段代码将保持连接状态并持续接收处理来自MQTT Broker的消息。

最后写个发送的接口调用发送消息:

  1. @RestController
  2. public class MessageControlle {
  3. @Autowired
  4. private MqttConfig mqttConfig;
  5. @PostMapping("/publish")
  6. public String publishMessage(@RequestBody String message) {
  7. try {
  8. mqttConfig.publish("li", message,1);
  9. return "发送成功.";
  10. } catch (MqttException e) {
  11. return "发送失败: " + e.getMessage();
  12. }
  13. }
  14. }

让我们来看看效果:

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/748083
推荐阅读
相关标签
  

闽ICP备14008679号