赞
踩
目录
2. MQTT的client库Eclipse Paho MQTT Go Client
3. Eclipse Paho MQTT Go Client的库介绍
4.使用Eclipse Paho MQTT Go client实现消息发布和订阅
MQTT是客户机服务器发布/订阅消息传输协议。它重量轻,开放,简单,设计便于实现。这些特性使它非常适合在许多情况下使用,包括受限的环境,如机器到机器(M2M)和物联网(IoT)环境中的通信,这些环境需要很小的代码占用和/或网络带宽非常高。该协议运行在TCP/IP或其他提供有序、无损、双向连接的网络协议上,这样可以实现双向的消息推送服务。
mqtt协议有如下优点:
正因为这些优点,mqtt协议已经作为开发物联网应用的首选。
前面的文章我们已经使用emqx搭建了mqtt的broker,该文章我们讲使用Eclipse Paho MQTT Go client来连接emqx,并且进行消息的发布和订阅。
首先安装paho mqtt client的库:
go get github.com/eclipse/paho.mqtt.golang
func NewClient(o *ClientOptions) Client
这里需要传入一个ClientOptions的对象,该对象主要是为Client设置一些参数,我们查看文档的ClientOptions函数列表可以看到,该对象提供了众多参数选项:

opts := mqtt.NewClientOptions()
通过Options的模式添加选项值,这里我们主要添加上Broker,ClientId和Username,其他的参数可以根据自己的需要自行添加:
- clientId := fmt.Sprintf("%s-%d", nodeId, rand.Intn(100000000))
- opts.AddBroker("tcp://192.168.1.181:1883").SetClientID(clientId).SetUsername(username)
mqttClient := mqtt.NewClient(opts)
查看Client的方法,Client其实是一个接口,该接口提供了如下方法接口:
- type Client interface {
-
- IsConnected() bool
- IsConnectionOpen() bool
-
- Connect() Token
- Disconnect(quiesce uint)
-
- Publish(topic string, qos byte, retained bool, payload interface{}) Token
-
- Subscribe(topic string, qos byte, callback MessageHandler) Token
-
- SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token
-
- Unsubscribe(topics ...string) Token
-
- AddRoute(topic string, callback MessageHandler)
-
- OptionsReader() ClientOptionsReader
-
- }

而具体实现Client接口是内部的client的struct结构,具体我们可以通过NewClient方法去源码查看,这里我们不就不展开了。
- if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
- panic(token.Error())
- }
调用连接方法会返回一个Token的对象,我们可以根据token的Wait方法等待操作返回,根据Token的Error方法来检测返回是否有错误。
我看查看一下Token,实际上可以看到Token是一个如下定义的interface,它提供了如下方法:
- type Token interface {
-
- Wait() bool
-
- WaitTimeout(time.Duration) bool
-
- Done() <-chan struct{}
-
- Error() error
- }
Token接口定义了用于指示操作何时完成的方法,我们可以看到mqtt Client对象的里面好几个方法都返回了Token对象。
Token只是一个interface接口,库提供了多种Token interface的实现,比如Connect操作返回的ConnectToken, Publish方法返回的PublishToken, Subscribe方法返回的SubscribeToken, Unsubscribe方法返回的UnsubscribeToken。他们都实现Token接口
- func send(mqttClient mqtt.Client, topic string, msg Msg) error {
-
- data, err := json.Marshal(msg)
- if err != nil {
- return MsgMarshalError
- }
-
- fmt.Printf("[%s]send data:%s\n", time.Now().Format("2006-01-02 15:04:05"),msg)
- if token := mqttClient.Publish(topic, 1, false, data); token.Wait() && token.Error() != nil {
- fmt.Println(token.Error())
- return token.Error()
- }
- return nil
- }
我们可以通过Client的Publish方法发布消息, 这里我们封装一个发送Json消息的方法:
mqtt中有一个重要的概念叫topic,它跟http协议中的url和消息队列的中的主题有点类似, 他用来识别将消息发送的通道。我们将消息发送该topic的通道之后,当有人通过相同的topic在broker上订阅该消息后, broker会将publish到该topic的消息转发到订阅该topic的客户端上。
我们也可以通过Client的的Subscribe方法来订阅某个topic的消息:
- func Subscribe(mqttClient mqtt.Client, topic string, callback mqtt.MessageHandler) error{
- if token := mqttClient.Subscribe(topic, 1, callback); token.Wait() && token.Error() != nil {
- return token.Error()
- }
- return nil
- }
订阅topic消息后,我们可以传入一个mqtt.MessageHandler的函数方法, MessageHandler方法签名如下:
type MessageHandler func(Client, Message)
通过该传入该回调方法,我们可以获取到订阅主题的接收的Message消息。
订阅消息后,我们还可以通过Unsubscribe方法来取消topic的订阅
上面我们已经分析了Eclipse Paho MQTT Go client库的使用方法,下面我们来具体具体实现mqtt的client,来进行消息的收发
- // MQTT传输的消息的协议
- type Msg struct {
- DeviceNo string `json:deviceNo; comment: 设备号`
- MsgNo uint16 `json:msgNo; comment: 消息号`
- MsgSn uint64 `json:msgSn;comment: 消息序号`
- Body string `json:body;comment: 消息内容`
- }
-
- func NewMsg(deviceNo string, msgNo uint16, msgSn uint64, body string) Msg{
- return Msg{
- DeviceNo: deviceNo,
- MsgNo: msgNo,
- MsgSn: msgSn,
- Body: body,
- }
- }
-
- func (msg *Msg)String() string{
- return fmt.Sprintf("msg:%+v\n", msg)
- }

- type Client struct {
- viper *viper.Viper
- MqttClient mqtt.Client
- }
-
- func NewMqttClient(viper *viper.Viper) *Client{
- return &Client{
- viper: viper,
- }
- }
创建mqtt Client对象的参数通过viper从配置文件读取,所以这里我们直接传入viper.Viper,下面我们定一个初始化方法,来配置参数,并且连接到broker
- func (c *Client)InitService() {
- clientId := fmt.Sprintf("%s-%d", viper.GetString("mqtt.node"), rand.Intn(100000000))
- opts := mqtt.NewClientOptions().AddBroker(c.viper.GetString("mqtt.url")).SetClientID(clientId).SetUsername(c.viper.GetString("mqtt.username")).SetPassword(c.viper.GetString("mqtt.password"))
- c.MqttClient = mqtt.NewClient(opts)
- if token := c.MqttClient.Connect(); token.Wait() && token.Error() != nil {
- panic(token.Error())
- }
- }
在Client里封装发送消息方法
- func(c *Client)Publish(topic string, msg Msg) error{
-
- data, err := json.Marshal(msg)
- if err != nil {
- return MsgMarshalError
- }
-
- if token := c.MqttClient.Publish(topic, 1, false, data); token.Wait() && token.Error() != nil {
- return token.Error()
- }
- return nil
- }
在Client里封装一个添加订阅消息的方法
- func(c *Client)Subscribe(topic string, callback mqtt.MessageHandler) error{
-
- if token := c.MqttClient.Subscribe(topic, 1, callback); token.Wait() && token.Error() != nil {
- return token.Error()
- }
- return nil
- }
同时我们定义一个操作的topic, 我们通过该topic进行消息发送和订阅
const topicPushMsg = "/iot/device/msg/push"
配置文件结构:
- // mqtt.yml
-
- mqtt:
- node: node1
- url: tcp://192.168.1.181:1883
- username: snode-10000
- password: 123456
我们使用一个函数来封装viper读取配置文件,并且返回*viper.VIper的实例
- func NewConfig(path string) (*viper.Viper,error){
- v := viper.New()
- v.AddConfigPath(".")
- v.SetConfigFile(path)
-
- if err := v.ReadInConfig(); err == nil {
- return v, err
- } else {
- return nil, err
- }
- }
- func subscribe(mqttClient *Client){
- mqttClient.Subscribe(topicPushMsg, func(client mqtt.Client, message mqtt.Message) {
- msg := string(message.Payload())
- log.Printf("receive msg: %+v\n", msg)
- })
- }
-
- func main(){
-
- v, err := NewConfig("mqtt.yml")
-
- if err != nil {
- log.Fatalf("config init failure")
- }
-
- mqttClient := NewMqttClient(v)
- mqttClient.InitService()
-
- go subscribe(mqttClient)
-
- msg := NewMsg("459432801001034", 100, 1, "cmdNo=1")
-
- for ;; {
-
- mqttClient.Publish(topicPushMsg, msg)
-
- time.Sleep(time.Duration(10) * time.Second)
- }
- }

启动运行应用之后,在emqx的dashboard中已经可以看到创建的连接了

同时在控制台可以看到打印到的订阅topic接收的消息
- 2022/09/06 17:02:40 receive msg: {"DeviceNo":"459432801001034","MsgNo":100,"MsgSn":1,"Body":"cmdNo=1"}
- 2022/09/06 17:02:50 receive msg: {"DeviceNo":"459432801001034","MsgNo":100,"MsgSn":1,"Body":"cmdNo=1"}
- 2022/09/06 17:03:00 receive msg: {"DeviceNo":"459432801001034","MsgNo":100,"MsgSn":1,"Body":"cmdNo=1"}
- 2022/09/06 17:03:10 receive msg: {"DeviceNo":"459432801001034","MsgNo":100,"MsgSn":1,"Body":"cmdNo=1"}
- 2022/09/06 17:03:20 receive msg: {"DeviceNo":"459432801001034","MsgNo":100,"MsgSn":1,"Body":"cmdNo=1"}
MQTT Client库使用参考: mqtt package - github.com/eclipse/paho.mqtt.golang - Go Packages
MQTT Client源码地址: GitHub - eclipse/paho.mqtt.golang
MQTT协议标准参考:https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.pdf
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。