赞
踩
MQTT(Message Queue Telemetry Transport)是一种轻量级的消息传递协议,支持发布-订阅模式。在MQTT中,发布者将消息发布到主题(topic),而订阅者可以订阅一个或多个主题以接收相关消息。
要发布消息,客户端需要连接到MQTT代理(broker)并指定要发布的主题和消息内容。代理将消息转发给所有已经订阅该主题的客户端。
要订阅主题,客户端需要连接到MQTT代理并指定要订阅的主题。代理将根据订阅信息,向客户端发送所订阅主题下的所有消息。
可以使用不同QoS(Quality of Service)等级来控制消息传递的可靠性和效率。QoS级别越高,消息传递越可靠但是也会增加网络开销。
MQTT的发布和订阅模式是基于代理(broker)来实现的。客户端通过连接到代理并发送相应的消息,代理将根据主题(topic)将消息转发给所有已经订阅该主题的客户端。
当客户端发布一个消息时,它需要指定要发布的主题和消息内容,并将消息发送到代理。代理会记录下该消息并向所有已经订阅该主题的客户端转发该消息。
当客户端订阅一个主题时,它需要指定所需的QoS(Quality of Service)等级和订阅主题的名称。代理会将此信息记录下来,并将该主题的所有未读消息发送给该客户端。在此之后,代理将继续向该客户端发送任何新的与该主题相关的消息。
注意:当客户端订阅主题时,可以使用通配符来匹配多个主题。例如,使用“+”订阅“a/+”将匹配所有以“a/”开头的主题。或者使用“#”订阅“a/#”将匹配所有以“a/”开头的主题及其子主题。
- package start
-
- import (
- "fmt"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- log "github.com/sirupsen/logrus"
- "xxxx/common/config"
- mqttService "xxxx/common/mqtt"
- "time"
- )
-
- var MqttClient mqtt.Client
-
- // 初始化MQTT
- func MQTT() {
- opts := mqtt.NewClientOptions().AddBroker(config.MqttUrl)
- clientId := fmt.Sprintf("xxxx/xxxx/%d", time.Now().Unix())
- log.Info("clientId:", clientId)
- opts.SetClientID(clientId).
- SetUsername(config.MqttUsername).
- SetPassword(config.MqttPassword)
-
- // 重连订阅
- opts.SetReconnectingHandler(mqttService.Subscribe)
- opts.SetAutoReconnect(true)
- opts.SetKeepAlive(30 * time.Second)
- opts.SetPingTimeout(5 * time.Second)
- opts.SetDefaultPublishHandler(mqttService.Receive)
-
- MqttClient = mqtt.NewClient(opts)
- if token := MqttClient.Connect(); token.Wait() && token.Error() != nil {
- log.Fatalf("连接mqtt失败: %v", token.Error())
- }
- log.Info("Connect mqtt server successfully")
- // 订阅
- mqttService.Subscribe(MqttClient, opts)
- }

- package mqttService
-
- import (
- "encoding/json"
- "fmt"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- log "github.com/sirupsen/logrus"
- "io/ioutil"
- "xxxx/common/config"
- "os"
- "strings"
- )
-
- // Send 发送MQTT
- func Send(client mqtt.Client, topic string, body interface{}) bool {
- bytes, err := json.Marshal(body)
- if err != nil {
- log.Error("序列化失败", err)
- return false
- }
-
- if token := client.Publish(topic, 1, false, bytes); token.Wait() && token.Error() != nil {
- log.Error("发送 MQTT 失败", string(bytes), err)
- return false
- }
- return true
- }
-
- // Receive 接收 MQTT
- func Receive(client mqtt.Client, msg mqtt.Message) {
- message := &vo.ReceiveVO{}
- if err := json.Unmarshal(msg.Payload(), message); err != nil {
- log.Errorf("Unmarshal mqtt message from %s failed, err = %s", msg.Topic(), err)
- return
- }
- log.Info("message = ", message)
- body := message.Body
- if &body == nil {
- return
- }
- log.Info("body = ", body)
- }
-
- // Subscribe 订阅
- func Subscribe(client mqtt.Client, clientOptions *mqtt.ClientOptions) {
- topic := fmt.Sprintf("%s", config.MqttServerTopic)
- if token := client.Subscribe(fmt.Sprintf("$queue/%s", topic), 2, Receive); token.Wait() && token.Error() != nil {
- log.Fatalf("Subscribe topic %s failed, err: %s", topic, token.Error())
- }
- log.Infof("Subscribe topic %s successfully", topic)
- }

- package config
-
- const (
- // mqtt 地址
- MqttUrl = "tcp://xxxxx:1883"
- // mqtt 用户名
- MqttUsername = "用户名"
- // mqtt 密码
- MqttPassword = "密码"
- // mqtt topic
- MqttServerTopic = "Topic"
- )
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。