赞
踩
目录
- <!--mqtt依赖 start-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-integration</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.integration</groupId>
- <artifactId>spring-integration-stream</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.integration</groupId>
- <artifactId>spring-integration-mqtt</artifactId>
- </dependency>
- <!--mqtt依赖 end-->
- mqtt:
- host: tcp://127.0.0.1:1883
- username: admin
- password: admin123
emqx默认端口是1883,账号:admin 密码:public
- @Configuration
- // MQTT配置类
- public class MqttConfig {
-
- // exmq服务器地址
- @Value("${mqtt.host}")
- private String host;
-
- // 定义客户端ID,使用"DC"加上一个随机生成的数字
- private final String clientId = "DC" + new Random().nextInt(100000000);
-
- @Value("${mqtt.username}")
- private String username;
-
- @Value("${mqtt.password}")
- private String password;
-
- // 定义连接超时时间,默认为10秒,如果未在属性中指定,则使用默认值
- @Value("${mqtt.connection.timeout:10}")
- private int connectionTimeout;
-
- private static MqttClient mqttClient;
-
- /*
- * MQTT连接参数设置
- */
- private MqttConnectOptions mqttConnectOptions(String userName, String passWord, String host) throws MqttException {
- mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
- MqttConnectOptions options = new MqttConnectOptions();
- options.setUserName(userName);
- options.setPassword(passWord.toCharArray());
- options.setConnectionTimeout(connectionTimeout); // 设置连接超时时间
- options.setAutomaticReconnect(true); // 开启自动重连
- options.setCleanSession(false); // 设置为false,表示不清除会话session
- // 可以根据需要设置其他参数,例如 options.setKeepAliveInterval(20); 设置心跳间隔时间,默认为60秒
- return options;
- }
-
- // 创建一个MqttClient的Bean实例,用于连接MQTT代理
- @Bean
- public MqttClient mqttClient() throws MqttException {
- MqttConnectOptions options = mqttConnectOptions(username, password, host);
- try {
- mqttClient.connect(options);
- } catch (MqttException e) {
- System.out.println("连接失败:" + e.getMessage());
- }
- return mqttClient;
- }
-
- // 发布消息
- public void publish(String topic, String msg, int qos) throws MqttException {
- MqttMessage mqttMessage = new MqttMessage();
- mqttMessage.setQos(qos); // 设置消息质量
- mqttMessage.setRetained(true); // 设置保留消息
- mqttMessage.setPayload(msg.getBytes()); // 设置消息内容
- try {
- MqttTopic mqttTopic = mqttClient.getTopic(topic);
- MqttDeliveryToken token = mqttTopic.publish(mqttMessage); // 发布消息
- token.waitForCompletion(); // 等待发布完成
- System.out.println("发送消息:Topic=" + topic + ", Message=" + msg.getBytes());
- } catch (MqttException e) {
- MqttConnectOptions options = mqttConnectOptions(username, password, host);
- reconnect(mqttClient, options, topic, mqttMessage); // 递归重新连接并重试发送消息
- }
- }
-
-
- // 重新连接并重试发送消息
- private static void reconnect(MqttClient mqttClient, MqttConnectOptions mqttConnectOptions, String topic, MqttMessage mqttMessage) throws MqttException {
- try {
- // 等待一段时间,可以根据需要调整等待时间
- Thread.sleep(5000);
- // 重新连接 MqttClient
- mqttClient.connect(mqttConnectOptions);
- // 判断是否连接成功
- if (mqttClient.isConnected()) {
- System.out.println("发送方MQTT 客户端已成功连接到 MQTT 代理。");
- MqttTopic mqttTopic = mqttClient.getTopic(topic);
- MqttDeliveryToken token = mqttTopic.publish(mqttMessage); // 重新发布消息
- token.waitForCompletion();
- }
- } catch (MqttException | InterruptedException e) {
- System.out.println("发送方重新连接失败:" + e.getMessage());
- reconnect(mqttClient, mqttConnectOptions, topic, mqttMessage); // 重连失败,继续重试
- }
- }
-
- }
1.导入相关的包,包括MqttClient和MqttException以及其他必要的类。
2.使用@Configuration注解将这个类声明为一个配置类,使得Spring Boot能够识别它。
3.使用@Value注解注入配置文件中的MQTT相关配置信息,包括服务器地址host、用户名username和密码等。password
4.定义一个静态的MqttClient对象mqttClient,用于管理MQTT客户端连接。
5.定义了一个私有方法mqttConnectOptions,用于设置MQTT连接参数,包括用户名、密码、连接超时时间、自动重连等。
6.在@Bean注解的方法mqttClient()中创建一个MqttClient的实例,并连接到MQTT代理。如果连接失败,会打印连接失败的信息。
7.定义了一个publish方法,用于发布消息。该方法接收三个参数:topic表示要发布的主题,msg表示要发布的消息内容,qos表示消息的质量。如果发布消息失败,会调用reconnect方法进行重新连接并重试发送消息。
8.reconnect方法用于处理连接丢失后的重新连接逻辑。它会等待一段时间(这里是5000毫秒),然后尝试重新连接MqttClient。如果连接成功,就会重新发布之前的消息。
需要注意的是,这里的代码实现了一个消息的发布功能,没有涉及到订阅功能。如果需要实现消息的订阅功能,可以通过在mqttClient()方法中设置消息回调来处理接收到的消息,类似之前的示例代码。同时,也需要添加订阅主题的逻辑
- public static void main(String[] args) {
-
- String host = "tcp://localhost:1883";
- String clientId = "Client_B";
- String topic = "li";
- try {
- MqttClient mqttClient = new MqttClient(host, clientId);
- MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
- mqttConnectOptions.setUserName("admin");
- mqttConnectOptions.setPassword("admin123".toCharArray());
-
- // 连接到 EMQ X Broker
- mqttClient.connect(mqttConnectOptions);
-
- // 设置消息回调
- mqttClient.setCallback(new MqttCallback() {
- @SneakyThrows
- @Override
- public void connectionLost(Throwable cause) {
- // 处理连接丢失的情况
- System.out.println("连接丢失,尝试重新连接...");
- reconnect(mqttClient, mqttConnectOptions, topic);
- }
-
- @Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- // 处理接收到的消息
- String payload = new String(message.getPayload());
- System.out.println("收到消息:Topic=" + topic + ", Message=" + payload);
- }
-
- @SneakyThrows
- @Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- // 处理消息发送完成的情况
- System.out.println("消息发送完成: " + token.getMessage().getPayload());
- }
- });
-
- // 订阅主题
- mqttClient.subscribe(topic);
-
- // 保持连接,防止程序退出
- // 这里可以根据需要,设置一个条件或者等待一段时间,确保程序能够保持连接
- while (true) {
- Thread.sleep(1000);
- }
- } catch (MqttException | InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- private static void reconnect(MqttClient mqttClient, MqttConnectOptions mqttConnectOptions, String topic) throws MqttException {
- try {
- // 等待一段时间,可以根据需要调整等待时间
- Thread.sleep(5000);
- // 重新连接 MqttClient
- mqttClient.connect(mqttConnectOptions);
- // 判断是否连接成功
- if (mqttClient.isConnected()) {
- System.out.println("MQTT 客户端已成功连接到 MQTT 代理。");
- // 重新订阅主题
- mqttClient.subscribe(topic);
- }
- } catch (MqttException | InterruptedException e) {
- System.out.println("重新连接失败:" + e.getMessage());
- reconnect(mqttClient, mqttConnectOptions, topic);
- }
- }
1.导入MqttClient相关的包,以及SneakyThrows注解。
2.定义MQTT Broker服务器地址 host、客户端ID clientId、订阅主题 topic。
3..在main方法中,创建MqttClient实例,并设置连接参数,包括用户名和密码。然后使用mqttClient.connect(mqttConnectOptions)连接到MQTT Broker。
4.设置消息回调,通过实现MqttCallback接口,处理连接丢失、接收到消息和消息发送完成的情况。
5.使用mqttClient.subscribe(topic)订阅指定主题。
6.使用一个无限循环保持连接,防止程序退出。可以根据需要设置一个条件或者等待一段时间,确保程序保持连接。
7.当连接丢失时,MqttCallback中的connectionLost方法会被触发,执行reconnect方法进行重新连接,并重新订阅主题。
8.reconnect方法是用来处理连接丢失后的重新连接逻辑。它会等待一段时间(这里是5000毫秒),然后尝试重新连接MqttClient。如果连接成功,就会重新订阅之前的主题。
需要注意的是,MqttClient是一个阻塞的客户端,它会一直运行在主线程中,除非程序被显式终止,否则不会自动退出。因此,这段代码将保持连接状态并持续接收处理来自MQTT Broker的消息。
最后写个发送的接口调用发送消息:
- @RestController
- public class MessageControlle {
-
- @Autowired
- private MqttConfig mqttConfig;
-
-
- @PostMapping("/publish")
- public String publishMessage(@RequestBody String message) {
- try {
- mqttConfig.publish("li", message,1);
- return "发送成功.";
- } catch (MqttException e) {
- return "发送失败: " + e.getMessage();
- }
- }
- }
让我们来看看效果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。