当前位置:   article > 正文

使用go的MQTT Client接入EMQX_golang mqtt client

golang mqtt client

目录

1. MQTT协议介绍

2. MQTT的client库Eclipse Paho MQTT Go Client

3. Eclipse Paho MQTT Go Client的库介绍

4.使用Eclipse Paho MQTT Go client实现消息发布和订阅

5.参考链接


1. MQTT协议介绍

MQTT是客户机服务器发布/订阅消息传输协议。它重量轻,开放,简单,设计便于实现。这些特性使它非常适合在许多情况下使用,包括受限的环境,如机器到机器(M2M)和物联网(IoT)环境中的通信,这些环境需要很小的代码占用和/或网络带宽非常高。该协议运行在TCP/IP或其他提供有序、无损、双向连接的网络协议上,这样可以实现双向的消息推送服务。

mqtt协议有如下优点:

  • 使用发布/订阅消息模式,提供一对多消息分发和应用程序解耦。
  • 与有效负载内容无关的消息传输。
  • 信息传递的服务质量控制Qos: “最多一次”,“至少一次”,“精确一次”的三种类型
  • 较小的传输开销和最小化的协议交换以减少网络流量。

正因为这些优点,mqtt协议已经作为开发物联网应用的首选。

2. MQTT的client库Eclipse Paho MQTT Go Client

前面的文章我们已经使用emqx搭建了mqtt的broker,该文章我们讲使用Eclipse Paho MQTT Go client来连接emqx,并且进行消息的发布和订阅。

首先安装paho mqtt client的库:

go get github.com/eclipse/paho.mqtt.golang

3. Eclipse Paho MQTT Go Client的库介绍

  • 使用Mqtt连接broker, 我们需要创建一个mqtt的Client对象, 查看paho.mqtt.golang的文档,提供了如下创建Client方法:
func NewClient(o *ClientOptions) Client

这里需要传入一个ClientOptions的对象,该对象主要是为Client设置一些参数,我们查看文档的ClientOptions函数列表可以看到,该对象提供了众多参数选项:

  •  我们根据ClientOptions的要求,首先使用NewClientOptions构造一个ClientOptions的对象:
opts := mqtt.NewClientOptions()

      通过Options的模式添加选项值,这里我们主要添加上Broker,ClientId和Username,其他的参数可以根据自己的需要自行添加:

  1. clientId := fmt.Sprintf("%s-%d", nodeId, rand.Intn(100000000))
  2. opts.AddBroker("tcp://192.168.1.181:1883").SetClientID(clientId).SetUsername(username)
  • 准备好ClientOptions后,我们可以NewClient函数创建Client对象:
mqttClient := mqtt.NewClient(opts)

       查看Client的方法,Client其实是一个接口,该接口提供了如下方法接口:

  1. type Client interface {​
  2. IsConnected() bool
  3. IsConnectionOpen() bool
  4. Connect() Token​
  5. Disconnect(quiesce uint)​
  6. Publish(topic string, qos byte, retained bool, payload interface{}) Token​
  7. Subscribe(topic string, qos byte, callback MessageHandler) Token
  8. SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token
  9. Unsubscribe(topics ...string) Token​
  10. AddRoute(topic string, callback MessageHandler)​
  11. OptionsReader() ClientOptionsReader
  12. }

        而具体实现Client接口是内部的client的struct结构,具体我们可以通过NewClient方法去源码查看,这里我们不就不展开了。

  • 有了MqttClient对象,我们先进行连接到Broker
  1. if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
  2. panic(token.Error())
  3. }

调用连接方法会返回一个Token的对象,我们可以根据token的Wait方法等待操作返回,根据Token的Error方法来检测返回是否有错误。

我看查看一下Token,实际上可以看到Token是一个如下定义的interface,它提供了如下方法:

  1. type Token interface {
  2. Wait() bool
  3. WaitTimeout(time.Duration) bool
  4. Done() <-chan struct{}
  5. Error() error
  6. }

Token接口定义了用于指示操作何时完成的方法,我们可以看到mqtt Client对象的里面好几个方法都返回了Token对象。

Token只是一个interface接口,库提供了多种Token interface的实现,比如Connect操作返回的ConnectToken, Publish方法返回的PublishToken, Subscribe方法返回的SubscribeToken, Unsubscribe方法返回的UnsubscribeToken。他们都实现Token接口

  • 创建了mqtt Client对象,完成连接之后,我们可以使用Client的方法来进行消息的收发
  1. func send(mqttClient mqtt.Client, topic string, msg Msg) error {
  2. data, err := json.Marshal(msg)
  3. if err != nil {
  4. return MsgMarshalError
  5. }
  6. fmt.Printf("[%s]send data:%s\n", time.Now().Format("2006-01-02 15:04:05"),msg)
  7. if token := mqttClient.Publish(topic, 1, false, data); token.Wait() && token.Error() != nil {
  8. fmt.Println(token.Error())
  9. return token.Error()
  10. }
  11. return nil
  12. }

我们可以通过Client的Publish方法发布消息, 这里我们封装一个发送Json消息的方法:

mqtt中有一个重要的概念叫topic,它跟http协议中的url和消息队列的中的主题有点类似, 他用来识别将消息发送的通道。我们将消息发送该topic的通道之后,当有人通过相同的topic在broker上订阅该消息后, broker会将publish到该topic的消息转发到订阅该topic的客户端上。

我们也可以通过Client的的Subscribe方法来订阅某个topic的消息:

  1. func Subscribe(mqttClient mqtt.Client, topic string, callback mqtt.MessageHandler) error{
  2. if token := mqttClient.Subscribe(topic, 1, callback); token.Wait() && token.Error() != nil {
  3. return token.Error()
  4. }
  5. return nil
  6. }

订阅topic消息后,我们可以传入一个mqtt.MessageHandler的函数方法, MessageHandler方法签名如下:

type MessageHandler func(Client, Message)

通过该传入该回调方法,我们可以获取到订阅主题的接收的Message消息。

订阅消息后,我们还可以通过Unsubscribe方法来取消topic的订阅

  • 当我们要主动断开mqtt Client的连接时,可以使用Disconnect(quiesce uint)方法开关闭连接, quiesce参数表示关闭连接后等待时间。

4.使用Eclipse Paho MQTT Go client实现消息发布和订阅

上面我们已经分析了Eclipse Paho MQTT Go client库的使用方法,下面我们来具体具体实现mqtt的client,来进行消息的收发

  • 我们我们使用json来定义一个消息的格式,实际开发的中我们可以使用其他的协议来定义,比如pb
  1. // MQTT传输的消息的协议
  2. type Msg struct {
  3. DeviceNo string `json:deviceNo; comment: 设备号`
  4. MsgNo uint16 `json:msgNo; comment: 消息号`
  5. MsgSn uint64 `json:msgSn;comment: 消息序号`
  6. Body string `json:body;comment: 消息内容`
  7. }
  8. func NewMsg(deviceNo string, msgNo uint16, msgSn uint64, body string) Msg{
  9. return Msg{
  10. DeviceNo: deviceNo,
  11. MsgNo: msgNo,
  12. MsgSn: msgSn,
  13. Body: body,
  14. }
  15. }
  16. func (msg *Msg)String() string{
  17. return fmt.Sprintf("msg:%+v\n", msg)
  18. }
  • 我们封装一下MQTT的Client对象
  1. type Client struct {
  2. viper *viper.Viper
  3. MqttClient mqtt.Client
  4. }
  5. func NewMqttClient(viper *viper.Viper) *Client{
  6. return &Client{
  7. viper: viper,
  8. }
  9. }

       创建mqtt Client对象的参数通过viper从配置文件读取,所以这里我们直接传入viper.Viper,下面我们定一个初始化方法,来配置参数,并且连接到broker

  1. func (c *Client)InitService() {
  2. clientId := fmt.Sprintf("%s-%d", viper.GetString("mqtt.node"), rand.Intn(100000000))
  3. opts := mqtt.NewClientOptions().AddBroker(c.viper.GetString("mqtt.url")).SetClientID(clientId).SetUsername(c.viper.GetString("mqtt.username")).SetPassword(c.viper.GetString("mqtt.password"))
  4. c.MqttClient = mqtt.NewClient(opts)
  5. if token := c.MqttClient.Connect(); token.Wait() && token.Error() != nil {
  6. panic(token.Error())
  7. }
  8. }

在Client里封装发送消息方法

  1. func(c *Client)Publish(topic string, msg Msg) error{
  2. data, err := json.Marshal(msg)
  3. if err != nil {
  4. return MsgMarshalError
  5. }
  6. if token := c.MqttClient.Publish(topic, 1, false, data); token.Wait() && token.Error() != nil {
  7. return token.Error()
  8. }
  9. return nil
  10. }

在Client里封装一个添加订阅消息的方法

  1. func(c *Client)Subscribe(topic string, callback mqtt.MessageHandler) error{
  2. if token := c.MqttClient.Subscribe(topic, 1, callback); token.Wait() && token.Error() != nil {
  3. return token.Error()
  4. }
  5. return nil
  6. }

同时我们定义一个操作的topic, 我们通过该topic进行消息发送和订阅

const topicPushMsg = "/iot/device/msg/push"
  • 读取配置文件

   配置文件结构:

  1. // mqtt.yml
  2. mqtt:
  3. node: node1
  4. url: tcp://192.168.1.181:1883
  5. username: snode-10000
  6. password: 123456

我们使用一个函数来封装viper读取配置文件,并且返回*viper.VIper的实例

  1. func NewConfig(path string) (*viper.Viper,error){
  2. v := viper.New()
  3. v.AddConfigPath(".")
  4. v.SetConfigFile(path)
  5. if err := v.ReadInConfig(); err == nil {
  6. return v, err
  7. } else {
  8. return nil, err
  9. }
  10. }
  • 我们写个测试用例来实际测试一下
  1. func subscribe(mqttClient *Client){
  2. mqttClient.Subscribe(topicPushMsg, func(client mqtt.Client, message mqtt.Message) {
  3. msg := string(message.Payload())
  4. log.Printf("receive msg: %+v\n", msg)
  5. })
  6. }
  7. func main(){
  8. v, err := NewConfig("mqtt.yml")
  9. if err != nil {
  10. log.Fatalf("config init failure")
  11. }
  12. mqttClient := NewMqttClient(v)
  13. mqttClient.InitService()
  14. go subscribe(mqttClient)
  15. msg := NewMsg("459432801001034", 100, 1, "cmdNo=1")
  16. for ;; {
  17. mqttClient.Publish(topicPushMsg, msg)
  18. time.Sleep(time.Duration(10) * time.Second)
  19. }
  20. }

启动运行应用之后,在emqx的dashboard中已经可以看到创建的连接了

 同时在控制台可以看到打印到的订阅topic接收的消息

  1. 2022/09/06 17:02:40 receive msg: {"DeviceNo":"459432801001034","MsgNo":100,"MsgSn":1,"Body":"cmdNo=1"}
  2. 2022/09/06 17:02:50 receive msg: {"DeviceNo":"459432801001034","MsgNo":100,"MsgSn":1,"Body":"cmdNo=1"}
  3. 2022/09/06 17:03:00 receive msg: {"DeviceNo":"459432801001034","MsgNo":100,"MsgSn":1,"Body":"cmdNo=1"}
  4. 2022/09/06 17:03:10 receive msg: {"DeviceNo":"459432801001034","MsgNo":100,"MsgSn":1,"Body":"cmdNo=1"}
  5. 2022/09/06 17:03:20 receive msg: {"DeviceNo":"459432801001034","MsgNo":100,"MsgSn":1,"Body":"cmdNo=1"}

5.参考链接

MQTT Client库使用参考: mqtt package - github.com/eclipse/paho.mqtt.golang - Go Packages

MQTT Client源码地址: GitHub - eclipse/paho.mqtt.golang

MQTT协议标准参考:https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.pdf

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/42897
推荐阅读
相关标签
  

闽ICP备14008679号