赞
踩
最近在接触一些物联网的知识,学到了一种轻量级的消息传输协议——MQTT。下面就展示如何使用 "github.com/eclipse/paho.mqtt.golang" 包去连接 MQTT 服务器(Broker)。
话不多说,直接上代码,里面有注释,不懂的可以在评论区问我。
// Package mqttclient is a thin wrapper around the Eclipse Paho MQTT client.
// It bundles connection setup, publishing, subscribing, and re-subscribing to
// previously registered topics whenever the connection is (re-)established.
package mqttclient

import (
	"crypto/tls"
	"errors"
	"fmt"
	"log"
	"sync"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

const (
	// maxReconnectInterval caps the delay between automatic reconnect attempts.
	maxReconnectInterval = 5 * time.Second

	// disconnectQuiesceMs is the time, in milliseconds, handed to
	// Client.Disconnect so in-flight work can finish before the connection
	// is torn down.
	disconnectQuiesceMs = 250

	// resubscribeTimeout bounds how long a re-subscription started from the
	// on-connect callback is watched before it is reported as unacknowledged.
	resubscribeTimeout = 30 * time.Second
)

// ErrNotConnected is returned when an operation is attempted on a nil
// *MqttClient, on one without an underlying Paho client, or (for Publish) on
// one that is currently disconnected.
var ErrNotConnected = errors.New("mqttClient is nil or disconnected")

// MqttConnectConfig holds the settings used to establish a connection.
type MqttConnectConfig struct {
	Broker      string // broker URL passed to AddBroker
	User        string // user name; only applied when Certificate is empty
	Password    string // password; only applied when Certificate is empty
	Certificate string // path to the client certificate file; enables TLS when non-empty
	PrivateKey  string // path to the private key matching Certificate
	ClientId    string // MQTT client identifier
	WillEnabled bool   // whether to register a last-will message
	WillTopic   string // last-will topic
	WillPayload string // last-will payload
	WillQos     byte   // last-will quality of service
	Qos         byte   // quality of service used for Publish and Subscribe
	// Retained is the retained flag used for Publish. It is also used as the
	// retained flag of the last-will message.
	Retained bool

	// OnConnect, if non-nil, is invoked after the wrapper has re-issued the
	// remembered subscriptions.
	OnConnect mqtt.OnConnectHandler
	// OnConnectionLost, if non-nil, is invoked when the connection is lost.
	OnConnectionLost mqtt.ConnectionLostHandler
}

// MqttClient wraps a Paho client together with the default QoS / retained
// flag and the set of subscriptions to restore after a reconnect.
//
// A MqttClient contains a mutex and must not be copied after first use.
type MqttClient struct {
	qos      byte
	retained bool
	Client   mqtt.Client

	// mu guards topics: Subscribe/Unsubscribe mutate the map while the
	// on-connect callback reads it.
	mu     sync.Mutex
	topics map[string]mqtt.MessageHandler
}

// newTLSConfig loads the client certificate/key pair and returns a TLS
// configuration that presents it to the broker.
//
// WARNING: InsecureSkipVerify is enabled, so the broker's certificate chain
// and host name are NOT verified. This is kept for backward compatibility
// (e.g. self-signed brokers) but leaves the connection open to
// man-in-the-middle attacks.
func newTLSConfig(certFile string, privateKey string) (*tls.Config, error) {
	cert, err := tls.LoadX509KeyPair(certFile, privateKey)
	if err != nil {
		return nil, fmt.Errorf("loading key pair %q / %q: %w", certFile, privateKey, err)
	}
	// ClientAuth and ClientCAs are server-side settings and were only ever
	// set to their zero values, so they are omitted here.
	return &tls.Config{
		InsecureSkipVerify: true,
		Certificates:       []tls.Certificate{cert},
	}, nil
}

// Connect builds a client from config, connects to the broker and waits for
// the connection attempt to finish. It returns an error instead of panicking
// when the TLS material cannot be loaded or the connection fails.
//
// When config.Certificate is non-empty the connection uses TLS client
// authentication and User/Password are ignored; otherwise User/Password are
// sent.
func Connect(config MqttConnectConfig) (*MqttClient, error) {
	opts := mqtt.NewClientOptions().
		AddBroker(config.Broker).
		SetClientID(config.ClientId).
		SetMaxReconnectInterval(maxReconnectInterval)

	if config.WillEnabled {
		opts.SetWill(config.WillTopic, config.WillPayload, config.WillQos, config.Retained)
	}

	if config.Certificate != "" {
		tlsConfig, err := newTLSConfig(config.Certificate, config.PrivateKey)
		if err != nil {
			return nil, err
		}
		opts.SetTLSConfig(tlsConfig)
	} else {
		opts.SetUsername(config.User).SetPassword(config.Password)
	}

	mc := &MqttClient{
		qos:      config.Qos,
		retained: config.Retained,
		topics:   make(map[string]mqtt.MessageHandler),
	}
	opts.SetOnConnectHandler(mc.connectHandler(config.OnConnect)).
		SetConnectionLostHandler(mc.onConnectionLostHandler(config.OnConnectionLost))

	// The client must be stored before Connect is called because the
	// callbacks registered above may fire as soon as the connection is up.
	mc.Client = mqtt.NewClient(opts)

	if tc := mc.Client.Connect(); tc.Wait() && tc.Error() != nil {
		return nil, fmt.Errorf("connecting to %q: %w", config.Broker, tc.Error())
	}
	return mc, nil
}

// NewMqttClient connects to the broker described by config and returns the
// ready-to-use client. It panics (via log.Panic) if the TLS material cannot
// be loaded or the connection fails; use Connect to receive an error instead.
func NewMqttClient(config MqttConnectConfig) *MqttClient {
	mc, err := Connect(config)
	if err != nil {
		log.Panic(err)
	}
	return mc
}

// usable reports whether mc and its underlying Paho client are non-nil.
// It is safe to call on a nil receiver.
func (mc *MqttClient) usable() bool {
	return mc != nil && mc.Client != nil
}

// Publish sends payload to topic using the client's configured QoS and
// retained flag and waits for the operation to complete. It returns
// ErrNotConnected if the client is nil or not currently connected.
func (mc *MqttClient) Publish(topic string, payload []byte) error {
	if !mc.usable() || !mc.Client.IsConnected() {
		return ErrNotConnected
	}
	if tc := mc.Client.Publish(topic, mc.qos, mc.retained, payload); tc.Wait() && tc.Error() != nil {
		return tc.Error()
	}
	return nil
}

// Subscribe subscribes to every topic in topics with the same handler and
// remembers each successful subscription so that it can be restored after a
// reconnect. It stops at, and returns, the first error; topics subscribed
// before the failure stay subscribed.
func (mc *MqttClient) Subscribe(topics []string, onMessage mqtt.MessageHandler) error {
	if !mc.usable() {
		return ErrNotConnected
	}
	for _, topic := range topics {
		if tc := mc.Client.Subscribe(topic, mc.qos, onMessage); tc.Wait() && tc.Error() != nil {
			return tc.Error()
		}
		mc.remember(topic, onMessage)
		log.Printf("订阅主题[%s]成功", topic)
	}
	return nil
}

// Unsubscribe removes the subscriptions for topics and forgets them so they
// are no longer restored after a reconnect.
func (mc *MqttClient) Unsubscribe(topics ...string) error {
	if !mc.usable() {
		return ErrNotConnected
	}
	if tc := mc.Client.Unsubscribe(topics...); tc.Wait() && tc.Error() != nil {
		return tc.Error()
	}
	mc.mu.Lock()
	defer mc.mu.Unlock()
	for _, topic := range topics {
		delete(mc.topics, topic)
	}
	return nil
}

// Close disconnects from the broker, allowing disconnectQuiesceMs
// milliseconds for pending work to complete. It is a no-op on a nil client.
func (mc *MqttClient) Close() {
	if !mc.usable() {
		return
	}
	mc.Client.Disconnect(disconnectQuiesceMs)
}

// remember records topic and its handler for re-subscription.
func (mc *MqttClient) remember(topic string, onMessage mqtt.MessageHandler) {
	mc.mu.Lock()
	defer mc.mu.Unlock()
	if mc.topics == nil {
		// Tolerate a MqttClient that was not built by Connect/NewMqttClient.
		mc.topics = make(map[string]mqtt.MessageHandler)
	}
	mc.topics[topic] = onMessage
}

// subscriptions returns a copy of the remembered topic/handler pairs so the
// caller can iterate without holding the lock.
func (mc *MqttClient) subscriptions() map[string]mqtt.MessageHandler {
	mc.mu.Lock()
	defer mc.mu.Unlock()
	subs := make(map[string]mqtt.MessageHandler, len(mc.topics))
	for topic, onMessage := range mc.topics {
		subs[topic] = onMessage
	}
	return subs
}

// connectHandler wraps the user's on-connect handler: it first re-issues all
// remembered subscriptions, then calls handler (if non-nil).
func (mc *MqttClient) connectHandler(handler mqtt.OnConnectHandler) mqtt.OnConnectHandler {
	return func(c mqtt.Client) {
		for topic, onMessage := range mc.subscriptions() {
			// The subscription is issued without waiting, so the user's
			// handler is not delayed; the outcome is reported from a
			// short-lived goroutine bounded by resubscribeTimeout.
			go logResubscribe(topic, c.Subscribe(topic, mc.qos, onMessage))
		}
		if handler != nil {
			handler(c)
		}
	}
}

// logResubscribe waits (bounded) for a re-subscription token and logs a
// failure or a missing acknowledgement instead of silently dropping it.
func logResubscribe(topic string, tok mqtt.Token) {
	if !tok.WaitTimeout(resubscribeTimeout) {
		log.Printf("re-subscribing to %q: no acknowledgement within %s", topic, resubscribeTimeout)
		return
	}
	if err := tok.Error(); err != nil {
		log.Printf("re-subscribing to %q: %v", topic, err)
	}
}

// onConnectionLostHandler wraps the user's connection-lost handler so that a
// nil handler is tolerated.
func (mc *MqttClient) onConnectionLostHandler(handler mqtt.ConnectionLostHandler) mqtt.ConnectionLostHandler {
	return func(c mqtt.Client, e error) {
		if handler != nil {
			handler(c, e)
		}
	}
}
上面的代码已经封装好了初始化、连接、发布消息、订阅消息等功能,下面是一个调用示例:
// main connects to the broker, publishes the ten single-byte payloads
// 65..74 to topic "a/b/c" at one-second intervals, and disconnects on exit.
func main() {
	mqttConnectConfig := mqttclient.MqttConnectConfig{
		// Fill in your own connection parameters here.
	}
	mc := mqttclient.NewMqttClient(mqttConnectConfig)
	// Disconnect cleanly on every exit path, including the panic below.
	defer mc.Close()

	for i := 65; i < 75; i++ {
		payload := []byte{byte(i)}
		if err := mc.Publish("a/b/c", payload); err != nil {
			// log.Panic does not return, so no explicit return is needed.
			log.Panic(err)
		}
		log.Println(payload)
		time.Sleep(1 * time.Second)
	}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。