赞
踩
免费创建网址:https://cloud.emqx.cn/
或者使用开放免费:
broker: broker.emqx.io
port: 1833
ClientID: go_mqtt_client
Username: emqx
Password: public
案例1:
package main import ( "fmt" "sync" "time" mqtt "github.com/eclipse/paho.mqtt.golang" ) //12847(mqtt), 12173(mqtts), 8083(ws), 8084(wss) const broker = "tcp://tee2b1be.cn.emqx.cloud:12847" const username = "xhcomvip" const password = "" const ClientID = "go_mqtt_client" //message的回调 var onMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { fmt.Printf("[%s] -> %s\n", msg.Topic(), msg.Payload()) } var wg sync.WaitGroup var client mqtt.Client func main() { //连接MQTT服务器 mqttConnect() defer client.Disconnect(250) //注册销毁 wg.Add(1) go mqttSubScribe("topic/test") wg.Add(1) go testPublish() wg.Wait() } func mqttConnect() { //配置 clinetOptions := mqtt.NewClientOptions().AddBroker(broker).SetUsername(username).SetPassword(password) clinetOptions.SetClientID(ClientID) clinetOptions.SetConnectTimeout(time.Duration(60) * time.Second) //连接 client = mqtt.NewClient(clinetOptions) //客户端连接判断 if token := client.Connect(); token.WaitTimeout(time.Duration(60)*time.Second) && token.Wait() && token.Error() != nil { panic(token.Error()) } } func mqttSubScribe(topic string) { defer wg.Done() for { token := client.Subscribe(topic, 1, onMessage) token.Wait() } } //测试 3秒发送一次,然后自己接收 func testPublish() { defer wg.Done() for { client.Publish("topic/test", 1, false, "TEST") time.Sleep(time.Duration(3) * time.Second) } }
案例2:
package main import ( "crypto/tls" "crypto/x509" "fmt" "io/ioutil" "log" "time" mqtt "github.com/eclipse/paho.mqtt.golang" ) var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()) } var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) { fmt.Println("Connected") } var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) { fmt.Printf("Connect lost: %v", err) } //https://cloud.emqx.cn/console/deployments/0?oper=new // ClientOptions:用于设置 broker,端口,客户端 id ,用户名密码等选项 // messagePubHandler:全局 MQTT pub 消息处理 // connectHandler:连接的回调 // connectLostHandler:连接丢失的回调 func main() { var broker = "broker.emqx.io" var port = 1883 opts := mqtt.NewClientOptions() opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port)) opts.SetClientID("go_mqtt_client") opts.SetUsername("emqx") opts.SetPassword("public") opts.SetDefaultPublishHandler(messagePubHandler) opts.OnConnect = connectHandler opts.OnConnectionLost = connectLostHandler client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } sub(client) publish(client) client.Disconnect(250) } //发布消息 func publish(client mqtt.Client) { num := 10 for i := 0; i < num; i++ { text := fmt.Sprintf("Message %d", i) token := client.Publish("topic/test", 0, false, text) token.Wait() time.Sleep(time.Second) } } //订阅 func sub(client mqtt.Client) { topic := "topic/test" token := client.Subscribe(topic, 1, nil) token.Wait() fmt.Printf("Subscribed to topic: %s", topic) } //如果想使用 TLS 连接,可以如下设置: func NewTlsConfig() *tls.Config { certpool := x509.NewCertPool() ca, err := ioutil.ReadFile("ca.pem") if err != nil { log.Fatalln(err.Error()) } certpool.AppendCertsFromPEM(ca) // Import client certificate/key pair clientKeyPair, err := tls.LoadX509KeyPair("client-crt.pem", "client-key.pem") if err != nil { panic(err) } return &tls.Config{ RootCAs: certpool, ClientAuth: tls.NoClientCert, ClientCAs: nil, InsecureSkipVerify: true, Certificates: []tls.Certificate{clientKeyPair}, } } //如果不设置客户端证书,可以如下设置: func NewTlsConfigs() *tls.Config { certpool := x509.NewCertPool() ca, err := ioutil.ReadFile("ca.pem") if err != nil { log.Fatalln(err.Error()) } certpool.AppendCertsFromPEM(ca) return &tls.Config{ RootCAs: certpool, ClientAuth: tls.NoClientCert, ClientCAs: nil, InsecureSkipVerify: true, } }
案例3:
package main import ( "crypto/tls" "crypto/x509" "flag" "fmt" "io/ioutil" "log" "time" mqtt "github.com/eclipse/paho.mqtt.golang" ) type Config struct { Host string Port int Action string Topic string Username string Password string Qos int Tls bool CaCert string } var Host = flag.String("host", "broker.emqx.io", "server hostname or IP") var Port = flag.Int("port", 1883, "server port") var Action = flag.String("action", "pubsub", "pub/sub/pubsub action") var Protocol = flag.String("protocol", "mqtt", "mqtt/mqtts/ws/wss") var Topic = flag.String("topic", "golang-mqtt/test", "publish/subscribe topic") var Username = flag.String("username", "emqx", "username") var Password = flag.String("password", "public", "password") var Qos = flag.Int("qos", 0, "MQTT QOS") var Tls = flag.Bool("tls", false, "Enable TLS/SSL") var CaCert = flag.String("cacert", "./broker.emqx.io-ca.crt", "tls cacert") func main() { flag.Parse() config := Config{Host: *Host, Port: *Port, Action: *Action, Topic: *Topic, Username: *Username, Password: *Password, Qos: *Qos, Tls: *Tls, CaCert: *CaCert} protocol := *Protocol switch protocol { case "mqtt": MQTTConnection(config) case "mqtts": MQTTSConnection(config) case "ws": WSConnection(config) case "wss": WSSConnection(config) default: log.Fatalf("Unsupported protocol: %s", protocol) } } func Pub(client mqtt.Client, topic string) { pubClient := client i := 1 for { payload := fmt.Sprintf("%d", i) pubClient.Publish(topic, 0, false, payload) log.Printf("pub [%s] %s\n", topic, payload) //i += 1 i++ time.Sleep(1 * time.Second) } } func Sub(client mqtt.Client, topic string) { subClient := client subClient.Subscribe(topic, 0, func(subClient mqtt.Client, msg mqtt.Message) { log.Printf("sub [%s] %s\n", msg.Topic(), string(msg.Payload())) }) for { time.Sleep(1 * time.Second) } } func PubSub(client mqtt.Client, topic string) { go Sub(client, topic) Pub(client, topic) } func connectByMQTT(config Config) mqtt.Client { opts := mqtt.NewClientOptions() broker := fmt.Sprintf("tcp://%s:%d", config.Host, config.Port) opts.AddBroker(broker) opts.SetUsername(config.Username) opts.SetPassword(config.Password) client := mqtt.NewClient(opts) token := client.Connect() for !token.WaitTimeout(3 * time.Second) { } if err := token.Error(); err != nil { log.Fatal(err) } return client } func connectByMQTTS(config Config) mqtt.Client { var tlsConfig tls.Config if config.Tls && config.CaCert == "" { log.Fatalln("TLS field in config is required") } certpool := x509.NewCertPool() ca, err := ioutil.ReadFile(config.CaCert) if err != nil { log.Fatalln(err.Error()) } certpool.AppendCertsFromPEM(ca) tlsConfig.RootCAs = certpool opts := mqtt.NewClientOptions() broker := fmt.Sprintf("ssl://%s:%d", config.Host, config.Port) println(broker) opts.AddBroker(broker) opts.SetUsername(config.Username) opts.SetPassword(config.Password) opts.SetTLSConfig(&tlsConfig) client := mqtt.NewClient(opts) token := client.Connect() for !token.WaitTimeout(3 * time.Second) { } if err := token.Error(); err != nil { log.Fatal(err) } return client } func connectByWS(config Config) mqtt.Client { opts := mqtt.NewClientOptions() broker := fmt.Sprintf("ws://%s:%d/mqtt", config.Host, config.Port) opts.AddBroker(broker) opts.SetUsername(config.Username) opts.SetPassword(config.Password) client := mqtt.NewClient(opts) token := client.Connect() for !token.WaitTimeout(3 * time.Second) { } if err := token.Error(); err != nil { log.Fatal(err) } return client } func connectByWSS(config Config) mqtt.Client { var tlsConfig tls.Config if config.Tls && config.CaCert == "" { log.Fatalln("TLS field in config is required") } certpool := x509.NewCertPool() ca, err := ioutil.ReadFile(config.CaCert) if err != nil { log.Fatalln(err.Error()) } certpool.AppendCertsFromPEM(ca) tlsConfig.RootCAs = certpool opts := mqtt.NewClientOptions() broker := fmt.Sprintf("wss://%s:%d/mqtt", config.Host, config.Port) opts.AddBroker(broker) opts.SetUsername(config.Username) opts.SetPassword(config.Password) opts.SetTLSConfig(&tlsConfig) client := mqtt.NewClient(opts) token := client.Connect() for !token.WaitTimeout(3 * time.Second) { } if err := token.Error(); err != nil { log.Fatal(err) } return client } func MQTTSConnection(config Config) { client := connectByMQTTS(config) action := config.Action switch action { case "pub": Pub(client, config.Topic) case "sub": Sub(client, config.Topic) case "pubsub": PubSub(client, config.Topic) default: log.Fatalf("Unsupported action: %s", action) } } func MQTTConnection(config Config) { client := connectByMQTT(config) action := config.Action switch action { case "pub": Pub(client, config.Topic) case "sub": Sub(client, config.Topic) case "pubsub": PubSub(client, config.Topic) default: log.Fatalf("Unsupported action: %s", action) } } func WSConnection(config Config) { client := connectByWS(config) action := config.Action switch action { case "pub": Pub(client, config.Topic) case "sub": Sub(client, config.Topic) case "pubsub": PubSub(client, config.Topic) default: log.Fatalf("Unsupported action: %s", action) } } func WSSConnection(config Config) { client := connectByWSS(config) action := config.Action switch action { case "pub": Pub(client, config.Topic) case "sub": Sub(client, config.Topic) case "pubsub": PubSub(client, config.Topic) default: log.Fatalf("Unsupported action: %s", action) } }


Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。