当前位置:   article > 正文

golang MQTT消息服务

golang mqtt

作物IoT的公司当然要有物联网的服务,在公司规模还没有形成之初,就计划要将公司全部智能产品线的设备全部联网,与服务端有很好的通信,这样既可以拿到设备的使用数据,通过大数据的分析形成用户画相,还可以下放其他的一些物联网的服务。这是所有IoT的公司需要构建的基础服务,在做基础架构的时候,数据库选了阿里开发的tablestore,tablestrore适合大数据的公司存储海量的运行数据。消息系统则选了mqtt,理由是mqtt的低消耗、低带宽、轻量化的特点,下面是mqtt简述:

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

详细的介绍看这里:
https://www.runoob.com/w3cnote/mqtt-intro.html

MQTT协议实现方式

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

看图会比较直观:
在这里插入图片描述
实际的使用中客户端和服务端都即可以是生产者,也可以是消费者,将生产的消息发送到broker队列,谁订阅谁消费。这样,大体上工作步骤就非常明确了:

1.注册客户端/服务端
2.获取broker地址
3.订阅需要的topic
4.生产/消费消息

以我开发的客户端的代码为例,供大家讨论:

1.模拟新设备开机激活,拿到mac,sn等信息

//设备注册请求
// func Actdev(device []string) (DeviceName string,Sk string,productAlias string,Isact bool,ProductKey string){
func Actdev(device models.DevInfo) (error,models.DevInfo){
    body := Body{}
    Header := make(map[string]string)
    form := make(map[string]string)
    var dev map[string]string


    //计算sign
    // var sign string
    var ttime string = strconv.FormatInt(time.Now().UnixNano() / 1e9,10)

    dev = map[string]string{
        "Product-Key":   device.ProductKey,
        "Mac":      device.Mac ,
        "Sn":   device.Sn,
        "Model": device.ModelName,
        "Version":  "V010101RCN01C016011B1901081S",
        //"Time": strconv.FormatInt(time.Now().UnixNano() / 1e9,10),
        "Time":ttime,
    }
    // log.Info(dev)
    sign := md5.Hmac(device.ProductSecert, dev)
    // log.Info("sign: %s\n",sign)

    url := "http://beta.evice/v1/register"

	//添加 header
    Header["Content-Type"]="application/x-www-form-urlencoded"
    Header["Product-Key"]=device.ProductKey
    Header["Mac"]=device.Mac
    Header["Sn"]= device.Sn
    Header["Model"]=device.ModelName
    Header["Version"]="V010101RCN01C016011B1901081S"
    Header["Time"]= ttime
    form["sign"]= sign
    err,rsp := Postdata(Header, form, dev, url)

    if err !=nil{//fail
        log.Info(string(rsp))
        return err,device
    }else{//ok
		err1 := json.Unmarshal(rsp, &body)
		// log.Info(string(rsp))
		if err1 != nil {
            log.Info(err1)
            return err,device
		}
		// for k,v :=range(Header){
		// 	log.Info(k+":"+v)
		// }
		if body.Code == 10000 {
			// log.Info("--------Device active success!")
            // log.Info(body.Code)
            device.ProductAlias= body.Data.ProductAlias
            device.DeviceName= body.Data.DeviceName
            device.DeviceSecret= body.Data.DeviceSecret
			// log.Info("\n")
		}else{
            log.Info(body.Code)
            return nil,device
        }
	}
    return nil, device
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

2.设备激活注册成功后获取broker地址:

//传入下发devname获取broker
func Getbroker(device models.DevInfo) (error){ 
    body := Body{}
    Header := make(map[string]string)
    form := make(map[string]string)
    var dev map[string]string


    key := device.ProductKey + device.DeviceSecret
    var ttime string = strconv.FormatInt(time.Now().UnixNano() / 1e9,10)

    Header["Content-Type"]="application/x-www-form-urlencoded"
    Header["Product-Key"]=device.ProductKey
    Header["Mac"]=device.Mac
    Header["Sn"]= device.Sn
    Header["Model"]=device.ModelName
    Header["Version"]="V010101RCN01C016011B1901081S"
    Header["Time"]= ttime

    nsign := md5.Hmac(key, dev)
    //log.Info("sign: %s\n",nsign )


    url2 := "http://beta./broker/v1/address?" + "deviceName=" + device.DeviceName + "&sign=" + nsign
	rsp, err := Get(url2, Header, form)
    if err !=nil{//fail
        // log.Info(string(rsp))
        return err
    }else{//ok
		err1 := json.Unmarshal(rsp, &body)
		// log.Info(string(rsp))
		if err1 != nil {
            log.Info(err1)
            return err1
		}
		// for k,v :=range(Header){
		// 	log.Info(k+":"+v)
		// }
		if body.Code == 10000 {
			// log.Info("--------GET broker success!")
            // log.Info(body.Code)
            // dev["ProductAlias"]= Alias
			// dev["deviceName"]= body.Data.DeviceName
            // dev["DeviceSecret"]= Secret
            // log.Info(dev)
			// log.Info("\n")

		}else{
            log.Info(body.Code)
            return nil
        }
    }

    return nil
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

3.获取到broker地址后注册mqtt客户端:

func Newmqclient(device models.DevInfo) (Client,error)  {

	broker:= "tcp://127.0.0.1:1883"//支持大多数通信协议
	opts := mqtt.NewClientOptions().AddBroker(broker)//beta
	devname :=  device.DeviceName+"@"+device.ProductAlias
	passwd:= device.DeviceSecret

	opts.Username = devname
	opts.Password = passwd

	//timestamp := strconv.FormatInt(time.Now().UTC().UnixNano(), 10)
	//timestamp := time.Now().Format("2006-01-02 15:04:05.000000")
	clientID:="|aiot|device|"+devname+","+"deviceId="+device.DeviceID+","+"sn="+device.Sn+ "|"
	opts.SetClientID(clientID)
	// opts.SetDefaultPublishHandler(defaultHandler)
	//opts.SetConnectionLostHandler(onLostHandler)
	opts.SetAutoReconnect(true)
	client := NewClient(broker,devname,passwd,clientID)
	// client = mqtt.NewClient(opts)
	var err = client.Connect()
	if err != nil {
		return nil, err
	} 
	// log.Info(opts.ClientID)

	// deviceproductkey = productkey
	ClientSuccess++
	log.Info("%v | MQ is connected  %d\n",device.DeviceName,ClientSuccess)
	// DevicemqName = deviceName
	//log := timestamp+" | "+deviceName+"connect to MQ " +strings.ToUpper(strconv.FormatBool(Isconn))+"\n"
	//logs.Logfile(log)
	// for {
	// 	time.Sleep(3*time.Second)
	// }
	return client,nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

这里先简单介绍mqtt,后续有完整的事件再详细介绍mqtt的其他方法

4.订阅topic:

func (c *defaultClient) SubscribeUplink(client Client, topic string, device models.DevInfo) Token {
	
	return c.client.Subscribe(topic, QoS, func(mqtt MQTT.Client, msg MQTT.Message) {
		// Determine the actual topic
		log.Info("Success SubscribeUplink with device: %v", device.DeviceName)
		CheckReq(client, device, msg.Payload(), topic)//listen
	})
	
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

5.监听消息并处理:

// CheckReq ...
func CheckReq(client Client, device models.DevInfo, msg []byte, topic string) {

	//log.Infof("[MESSSAGE_RECEIVED] topic=%s, msg=%s, device=%v", topic, string(msg), device.DeviceName)
	var msgInfo models.MsgInfo
	json.Unmarshal(msg, &msgInfo)
	// replyId := msgInfo.MsgId
	ota_check_launch := "/aiot/{ProductKey}/{DeviceName}/running/launch"
	ota_check_online    := "/aiot/{ProductKey}/{DeviceName}/req/status/online"
	ota_version_confirm := "/ota/device/{ProductKey}/{DeviceName}/req/version/inform"
	ota_version_notify  := "/ota/device/{ProductKey}/{DeviceName}/req/version/notify"

	ota_check_launch =strings.Replace(ota_check_launch, "{ProductKey}", device.ProductKey, -1)
	ota_check_online = strings.Replace(ota_check_online, "{ProductKey}", device.ProductKey, -1)
	ota_check_online = strings.Replace(ota_check_online, "{DeviceName}", device.DeviceName, -1)
	ota_version_confirm = strings.Replace(ota_version_confirm, "{ProductKey}", device.ProductKey, -1)
	ota_version_confirm = strings.Replace(ota_version_confirm, "{DeviceName}", device.DeviceName, -1)
	ota_version_notify = strings.Replace(ota_version_notify, "{ProductKey}", device.ProductKey, -1)
	ota_version_notify = strings.Replace(ota_version_notify, "{DeviceName}", device.DeviceName, -1)

	//log.Infof("Received Topic: %s, device: %s, match: %s", topic, device.DeviceName, ota_check_online)
	switch topic {
	case ota_check_online:
		//log.Infof("Processing... topic=%s,match=%s,method=%s", topic, ota_check_online, "OTA_CHECK_ONLINE")
		// DeviceStatusOnlineReq(client, device, replyId)
	case ota_version_confirm:
		//log.Debugf("o=%s,t=%s,method=%s", topic, ota_version_confirm, "OTA_VERSION_CONFORM")
		// DeviceVersionInformReq(client, device, replyId)
	case ota_version_notify:
		//log.Debugf("o=%s,t=%s,method=%s", topic, ota_version_notify, "OTA_VERSION_NOTIFY")
		// DevicePushVersionReq(client, device, replyId, string(msg))
	case ota_check_launch:
		log.Info(topic)

	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

6.发布消息:

func LaunchReq(client Client, device models.DevInfo) {
	topic := "/aiot/" + device.ProductKey + "/" + device.DeviceName + "/running/launch"
	// log.Info(topic)
	msgInfo := models.MsgInfo{
		Code:    10000,
		Data:    "",
		MsgId:   strconv.FormatInt(time.Now().Unix(), 10),
		Time:    time.Now().UnixNano() / 1e6,
		Version: "1.0",
	}
	jsonBytes, _ := json.Marshal(msgInfo)
	if client != nil {
		client.PublishUplink(topic, string(jsonBytes))
		Publistcount++
		now :=time.Now().Format("2006-01-02 15:04:05")
		// db.Logfile(now +" | "+strconv.Itoa(Publistcount)+" | "+topic)
		log.Info("published %d    "+topic, Publistcount)
		db.InsertDB(device.DeviceName,now)
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

模拟发布设备开机的消息

实际应用中可能更复杂,这里只介绍最基本的使用,欢迎有兴趣的童靴留言讨论

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/42895
推荐阅读
相关标签
  

闽ICP备14008679号