当前位置:   article > 正文

Golang使用MQTT_golang mqtt

golang mqtt

Golang使用MQTT

最近在接触一些物联网的知识,学到了一款产品,就是MQTT,下面就展示如何使用"github.com/eclipse/paho.mqtt.golang"包去连接MQTT

话不多说,直接上代码,里面有注释,不懂得可以评论区问我

package mqttclient

import (
	"crypto/tls"
	"errors"
	"fmt"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"log"
	"time"
)

// MqttConnectConfig 连接的相关配置
type MqttConnectConfig struct {
	Broker           string
	User             string
	Password         string
	Certificate      string //证书文件
	PrivateKey       string //秘钥
	ClientId         string
	WillEnabled      bool   //遗愿
	WillTopic        string //遗愿主题
	WillPayload      string //遗愿消息
	WillQos          byte   //遗愿服务质量
	Qos              byte   //服务质量
	Retained         bool   //保留消息
	OnConnect        mqtt.OnConnectHandler
	OnConnectionLost mqtt.ConnectionLostHandler
}

// MqttClient MQTT客户端,此外也包含了几个参数
type MqttClient struct {
	qos      byte
	retained bool
	Client   mqtt.Client
	topics   map[string]mqtt.MessageHandler
}

// 新建证书,也可以不用
func newTLSConfig(certFile string, privateKey string) (*tls.Config, error) {
	cert, err := tls.LoadX509KeyPair(certFile, privateKey)
	if err != nil {
		return nil, err
	}
	return &tls.Config{
		ClientAuth:         tls.NoClientCert, //不需要证书
		ClientCAs:          nil,              //不验证证书
		InsecureSkipVerify: true,             //接受服务器提供的任何证书和该证书中的任何主机名
		Certificates:       []tls.Certificate{cert},
	}, nil
}

func NewMqttClient(config MqttConnectConfig) *MqttClient {
	var c MqttClient
	opts := mqtt.NewClientOptions().AddBroker(config.Broker).SetClientID(config.ClientId).SetMaxReconnectInterval(time.Second * 5)
	if config.WillEnabled {
		opts.SetWill(config.WillTopic, config.WillPayload, config.WillQos, config.Retained)
	}
	//判断是否设置证书
	if config.Certificate != "" {
		tlsConfig, err := newTLSConfig(config.Certificate, config.PrivateKey)
		if err != nil {
			log.Panic(err)
			return nil
		}
		opts.SetTLSConfig(tlsConfig)
	} else {
		opts.SetUsername(config.User).SetPassword(config.Password)
	}
	//初始化
	if config.OnConnect == nil {
		config.OnConnect = func(c mqtt.Client) {}
	}
	if config.OnConnectionLost == nil {
		config.OnConnectionLost = func(c mqtt.Client, err error) {}
	}
	opts.SetOnConnectHandler(c.connectHandler(config.OnConnect)).SetConnectionLostHandler(c.onConnectionLostHandler(config.OnConnectionLost))
	c.Client = mqtt.NewClient(opts)
	c.qos = config.Qos                              // qos的级别
	c.retained = config.Retained                    // 保留消息
	c.topics = make(map[string]mqtt.MessageHandler) //topic
	// 用token的状态判断
	if tc := c.Client.Connect(); tc.Wait() && tc.Error() != nil {
		log.Panic(tc.Error())
		return nil
	}
	return &c
}

// Publish  Mqtt message.
func (mc *MqttClient) Publish(topic string, payload []byte) error {
	if mc != nil && mc.Client.IsConnected() {
		if tc := mc.Client.Publish(topic, mc.qos, mc.retained, payload); tc.Wait() && tc.Error() != nil {
			return tc.Error()
		}
		return nil
	}
	return errors.New("mqttClient is nil or disconnected")
}

// Subscribe subscribe a Mqtt topic.
func (mc *MqttClient) Subscribe(topics []string, onMessage mqtt.MessageHandler) error {
	for _, topic := range topics {
		if tc := mc.Client.Subscribe(topic, mc.qos, onMessage); tc.Wait() && tc.Error() != nil {
			return tc.Error()
		}
		mc.topics[topic] = onMessage
		log.Println(fmt.Sprintf("订阅主题[%s]成功", topic))
	}
	return nil
}

// Unsubscribe unsubscribe a Mqtt topic.
func (mc *MqttClient) Unsubscribe(topics ...string) error {
	if tc := mc.Client.Unsubscribe(topics...); tc.Wait() && tc.Error() != nil {
		return tc.Error()
	}
	for _, topic := range topics {
		delete(mc.topics, topic)
	}
	return nil
}

func (mc *MqttClient) Close() {
	mc.Client.Disconnect(250) //Millisecond
}

func (mc *MqttClient) connectHandler(handler mqtt.OnConnectHandler) mqtt.OnConnectHandler {
	return func(c mqtt.Client) {
		for topic, onMessage := range mc.topics {
			mc.Client.Subscribe(topic, mc.qos, onMessage)
		}
		handler(c)
	}
}

func (mc *MqttClient) onConnectionLostHandler(handler mqtt.ConnectionLostHandler) mqtt.ConnectionLostHandler {
	return func(c mqtt.Client, e error) {
		handler(c, e)
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141

上面代码已经封装好了初始化、连接、推送消息、订阅消息等功能

func main() {
	mqttConnectConfig := mqttclient.MqttConnectConfig{
		//自己设置参数
	}
	mc := mqttclient.NewMqttClient(mqttConnectConfig)
	for i := 65; i < 75; i++ {
		err := mc.Publish("a/b/c", []byte{byte(i)})
		if err != nil {
			log.Panic(err)
			return
		}
		log.Println([]byte{byte(i)})
		time.Sleep(1 * time.Second)
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/42894
推荐阅读
相关标签
  

闽ICP备14008679号