当前位置:   article > 正文

【Go】MQTT的发布和订阅_mqtt 订阅

mqtt 订阅

背景

MQTT(Message Queue Telemetry Transport)是一种轻量级的消息传递协议,支持发布-订阅模式。在MQTT中,发布者将消息发布到主题(topic),而订阅者可以订阅一个或多个主题以接收相关消息。

要发布消息,客户端需要连接到MQTT代理(broker)并指定要发布的主题和消息内容。代理将消息转发给所有已经订阅该主题的客户端。

要订阅主题,客户端需要连接到MQTT代理并指定要订阅的主题。代理将根据订阅信息,向客户端发送所订阅主题下的所有消息。

可以使用不同QoS(Quality of Service)等级来控制消息传递的可靠性和效率。QoS级别越高,消息传递越可靠但是也会增加网络开销。

实现原理

MQTT的发布和订阅模式是基于代理(broker)来实现的。客户端通过连接到代理并发送相应的消息,代理将根据主题(topic)将消息转发给所有已经订阅该主题的客户端。

当客户端发布一个消息时,它需要指定要发布的主题和消息内容,并将消息发送到代理。代理会记录下该消息并向所有已经订阅该主题的客户端转发该消息。

当客户端订阅一个主题时,它需要指定所需的QoS(Quality of Service)等级和订阅主题的名称。代理会将此信息记录下来,并将该主题的所有未读消息发送给该客户端。在此之后,代理将继续向该客户端发送任何新的与该主题相关的消息。

注意:当客户端订阅主题时,可以使用通配符来匹配多个主题。例如,使用“+”订阅“a/+”将匹配所有以“a/”开头的主题。或者使用“#”订阅“a/#”将匹配所有以“a/”开头的主题及其子主题。

代码实现

初始化

  1. package start
  2. import (
  3. "fmt"
  4. mqtt "github.com/eclipse/paho.mqtt.golang"
  5. log "github.com/sirupsen/logrus"
  6. "xxxx/common/config"
  7. mqttService "xxxx/common/mqtt"
  8. "time"
  9. )
  10. var MqttClient mqtt.Client
  11. // 初始化MQTT
  12. func MQTT() {
  13. opts := mqtt.NewClientOptions().AddBroker(config.MqttUrl)
  14. clientId := fmt.Sprintf("xxxx/xxxx/%d", time.Now().Unix())
  15. log.Info("clientId:", clientId)
  16. opts.SetClientID(clientId).
  17. SetUsername(config.MqttUsername).
  18. SetPassword(config.MqttPassword)
  19. // 重连订阅
  20. opts.SetReconnectingHandler(mqttService.Subscribe)
  21. opts.SetAutoReconnect(true)
  22. opts.SetKeepAlive(30 * time.Second)
  23. opts.SetPingTimeout(5 * time.Second)
  24. opts.SetDefaultPublishHandler(mqttService.Receive)
  25. MqttClient = mqtt.NewClient(opts)
  26. if token := MqttClient.Connect(); token.Wait() && token.Error() != nil {
  27. log.Fatalf("连接mqtt失败: %v", token.Error())
  28. }
  29. log.Info("Connect mqtt server successfully")
  30. // 订阅
  31. mqttService.Subscribe(MqttClient, opts)
  32. }

 发布、接收、订阅

  1. package mqttService
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. mqtt "github.com/eclipse/paho.mqtt.golang"
  6. log "github.com/sirupsen/logrus"
  7. "io/ioutil"
  8. "xxxx/common/config"
  9. "os"
  10. "strings"
  11. )
  12. // Send 发送MQTT
  13. func Send(client mqtt.Client, topic string, body interface{}) bool {
  14. bytes, err := json.Marshal(body)
  15. if err != nil {
  16. log.Error("序列化失败", err)
  17. return false
  18. }
  19. if token := client.Publish(topic, 1, false, bytes); token.Wait() && token.Error() != nil {
  20. log.Error("发送 MQTT 失败", string(bytes), err)
  21. return false
  22. }
  23. return true
  24. }
  25. // Receive 接收 MQTT
  26. func Receive(client mqtt.Client, msg mqtt.Message) {
  27. message := &vo.ReceiveVO{}
  28. if err := json.Unmarshal(msg.Payload(), message); err != nil {
  29. log.Errorf("Unmarshal mqtt message from %s failed, err = %s", msg.Topic(), err)
  30. return
  31. }
  32. log.Info("message = ", message)
  33. body := message.Body
  34. if &body == nil {
  35. return
  36. }
  37. log.Info("body = ", body)
  38. }
  39. // Subscribe 订阅
  40. func Subscribe(client mqtt.Client, clientOptions *mqtt.ClientOptions) {
  41. topic := fmt.Sprintf("%s", config.MqttServerTopic)
  42. if token := client.Subscribe(fmt.Sprintf("$queue/%s", topic), 2, Receive); token.Wait() && token.Error() != nil {
  43. log.Fatalf("Subscribe topic %s failed, err: %s", topic, token.Error())
  44. }
  45. log.Infof("Subscribe topic %s successfully", topic)
  46. }

 配置信息

  1. package config
  2. const (
  3. // mqtt 地址
  4. MqttUrl = "tcp://xxxxx:1883"
  5. // mqtt 用户名
  6. MqttUsername = "用户名"
  7. // mqtt 密码
  8. MqttPassword = "密码"
  9. // mqtt topic
  10. MqttServerTopic = "Topic"
  11. )

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/42887
推荐阅读
相关标签
  

闽ICP备14008679号