赞
踩
连接阿里云mqtt服务
连接
package mqtt
import (
"crypto/tls"
"crypto/x509"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"io/ioutil"
"log"
"src/goProject1/base_64"
"src/goProject1/config"
"strconv"
"time"
)
var (
broker string
port int
AccessKey_ID string
AccessKey_Secret string
InstanceId string
client mqtt.Client
groupId string
clientId string
connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Println("Connected")
}
connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connect lost: %v", err)
}
// 默认处理接收的消息
messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
)
func init() {
broker = config.ENV["mqtt"]["broker"]
port, _ = strconv.Atoi(config.ENV["mqtt"]["port"])
AccessKey_ID = config.ENV["alibaba_mq"]["AccessKey_ID"]
AccessKey_Secret = config.ENV["alibaba_mq"]["AccessKey_Secret"]
InstanceId = config.ENV["alibaba_mq"]["InstanceId"]
groupId = "GID_server"
clientId = groupId + "@@@" + "maochang_test"
conn()
checkToken()
}
func GetClientID() string {
return clientId
}
func checkToken() {
if token := client.Connect(); token.Wait() && token.Error() != nil {
fmt.Println(" -- ")
fmt.Printf(token.Error().Error())
panic(token.Error())
}
}
func conn() {
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
userName := "Signature" + "|" + AccessKey_ID + "|" + InstanceId
password := base_64.HmacSha1(AccessKey_Secret, clientId)
fmt.Println(userName)
fmt.Println("password == ", password)
opts.SetClientID(clientId)
opts.SetUsername(userName)
opts.SetPassword(string(base_64.Decode(password)))
opts.SetDefaultPublishHandler(messagePubHandler)
//opts.SetPingTimeout(5)
//opts.SetKeepAlive(5)
opts.SetAutoReconnect(true)
opts.SetMaxReconnectInterval(3)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client = mqtt.NewClient(opts)
}
func Main() {
//var broker = "broker.emqx.io"
//broker = "127.0.0.1"
//port = 1883
//password := base_64.b64encode(hmac.new(secretKey.encode(), client_id.encode(), sha1).digest()).decode()
//client.username_pw_set(userName, password)
//sub(client)
//m := make(map[string]byte)z
//m["mao"] = 1
//client.SubscribeMultiple(m, MessageHandler)
//publish(client)
// 订阅
//SubOne("mao/p2p/"+clientId, 1, nil)
//SubOne("mao", 1, nil)
//SubOne("mao/mao", 1, nil)
//SubOne("mao/mao/"+clientId, 1, nil)
//client.AddRoute("mao", nil)
//发布消息
//PublishOne("mao/mao/"+clientId, 0, false, "dsggfj")
//PublishOne("mao/p2p/"+clientId, 0, false, "dsggfj")
//PublishOne("mao", 1, true, "dsggfj2")
//PublishOne("mao", 1, true, "dsggfj3")
//PublishOne("mao", 1, true, "dsggfj4")
time.Sleep(time.Second)
client.Disconnect(250)
}
func MessageHandler(client mqtt.Client, msg mqtt.Message) {
//fmt.Printf("%v\n",msg.MessageID())
fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
func NewTlsConfig() *tls.Config {
certpool := x509.NewCertPool()
ca, err := ioutil.ReadFile("ca.pem")
if err != nil {
log.Fatalln(err.Error())
}
certpool.AppendCertsFromPEM(ca)
// Import client certificate/key pair
clientKeyPair, err := tls.LoadX509KeyPair("client-crt.pem", "client-key.pem")
if err != nil {
panic(err)
}
return &tls.Config{
RootCAs: certpool,
ClientAuth: tls.NoClientCert,
ClientCAs: nil,
InsecureSkipVerify: true,
Certificates: []tls.Certificate{clientKeyPair},
}
}
订阅接收
package mqtt
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"time"
)
func sub(client mqtt.Client) {
topic := "mao"
token := client.Subscribe(topic, 1, nil)
token.Wait()
fmt.Printf("Subscribed to topic %s %v \n", topic, token)
}
func SubOne(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token {
token := client.Subscribe(topic, qos, callback)
token.Wait()
fmt.Printf("Subscribed to topic %s %v \n", topic, token)
return token
}
func PublishOne(topic string, qos byte, retained bool, payload interface{}) mqtt.Token {
token := client.Publish(topic, qos, retained, payload)
token.Wait()
if token.Error() != nil {
fmt.Println("token = ", token.Error())
}
return token
}
func publish(client mqtt.Client) {
num := 5
for i := 0; i < num; i++ {
text := fmt.Sprintf("Message %d", i)
token := PublishOne("mao1", 1, false, text)
fmt.Println("token:", token)
time.Sleep(time.Second)
}
}
base_64 编码
package base_64
import (
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"fmt"
"io"
)
func Encode(data string) string {
// Base64 Standard Encoding
sEnc := base64.StdEncoding.EncodeToString([]byte(data))
fmt.Println(sEnc) //
return sEnc
}
func Decode(data string) []byte {
sEnc := base64.StdEncoding.EncodeToString([]byte(data))
// Base64 Standard Decoding
sDec, err := base64.StdEncoding.DecodeString(sEnc)
if err != nil {
fmt.Printf("Error decoding string: %s ", err.Error())
return nil
}
fmt.Println(string(sDec)) //hello world12345!?$*&()'-@~
return sDec
}
func Sha1() {
h := sha1.New()
io.WriteString(h, "aaaaaa")
fmt.Printf("%x\n", h.Sum(nil))
//hmac ,use sha1
key := []byte("123456")
mac := hmac.New(sha1.New, key)
mac.Write([]byte("aaaaaa"))
fmt.Printf("%x\n", mac.Sum(nil))
}
func HmacSha1(keyStr, value string) string {
key := []byte(keyStr)
mac := hmac.New(sha1.New, key)
mac.Write([]byte(value))
//进行base64编码
res := base64.StdEncoding.EncodeToString(mac.Sum(nil))
return res
}
config.ini
[mqtt] broker = port = 1883 [alibaba_mq] broker = port = 1883 addr = AccessKey_ID = AccessKey_Secret = InstanceId =
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。