赞
踩
作物IoT的公司当然要有物联网的服务,在公司规模还没有形成之初,就计划要将公司全部智能产品线的设备全部联网,与服务端有很好的通信,这样既可以拿到设备的使用数据,通过大数据的分析形成用户画相,还可以下放其他的一些物联网的服务。这是所有IoT的公司需要构建的基础服务,在做基础架构的时候,数据库选了阿里开发的tablestore,tablestrore适合大数据的公司存储海量的运行数据。消息系统则选了mqtt,理由是mqtt的低消耗、低带宽、轻量化的特点,下面是mqtt简述:
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
详细的介绍看这里:
https://www.runoob.com/w3cnote/mqtt-intro.html
MQTT协议实现方式
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
看图会比较直观:

实际的使用中客户端和服务端都即可以是生产者,也可以是消费者,将生产的消息发送到broker队列,谁订阅谁消费。这样,大体上工作步骤就非常明确了:
1.注册客户端/服务端
2.获取broker地址
3.订阅需要的topic
4.生产/消费消息
以我开发的客户端的代码为例,供大家讨论:
1.模拟新设备开机激活,拿到mac,sn等信息
//设备注册请求 // func Actdev(device []string) (DeviceName string,Sk string,productAlias string,Isact bool,ProductKey string){ func Actdev(device models.DevInfo) (error,models.DevInfo){ body := Body{} Header := make(map[string]string) form := make(map[string]string) var dev map[string]string //计算sign // var sign string var ttime string = strconv.FormatInt(time.Now().UnixNano() / 1e9,10) dev = map[string]string{ "Product-Key": device.ProductKey, "Mac": device.Mac , "Sn": device.Sn, "Model": device.ModelName, "Version": "V010101RCN01C016011B1901081S", //"Time": strconv.FormatInt(time.Now().UnixNano() / 1e9,10), "Time":ttime, } // log.Info(dev) sign := md5.Hmac(device.ProductSecert, dev) // log.Info("sign: %s\n",sign) url := "http://beta.evice/v1/register" //添加 header Header["Content-Type"]="application/x-www-form-urlencoded" Header["Product-Key"]=device.ProductKey Header["Mac"]=device.Mac Header["Sn"]= device.Sn Header["Model"]=device.ModelName Header["Version"]="V010101RCN01C016011B1901081S" Header["Time"]= ttime form["sign"]= sign err,rsp := Postdata(Header, form, dev, url) if err !=nil{//fail log.Info(string(rsp)) return err,device }else{//ok err1 := json.Unmarshal(rsp, &body) // log.Info(string(rsp)) if err1 != nil { log.Info(err1) return err,device } // for k,v :=range(Header){ // log.Info(k+":"+v) // } if body.Code == 10000 { // log.Info("--------Device active success!") // log.Info(body.Code) device.ProductAlias= body.Data.ProductAlias device.DeviceName= body.Data.DeviceName device.DeviceSecret= body.Data.DeviceSecret // log.Info("\n") }else{ log.Info(body.Code) return nil,device } } return nil, device }
2.设备激活注册成功后获取broker地址:
//传入下发devname获取broker func Getbroker(device models.DevInfo) (error){ body := Body{} Header := make(map[string]string) form := make(map[string]string) var dev map[string]string key := device.ProductKey + device.DeviceSecret var ttime string = strconv.FormatInt(time.Now().UnixNano() / 1e9,10) Header["Content-Type"]="application/x-www-form-urlencoded" Header["Product-Key"]=device.ProductKey Header["Mac"]=device.Mac Header["Sn"]= device.Sn Header["Model"]=device.ModelName Header["Version"]="V010101RCN01C016011B1901081S" Header["Time"]= ttime nsign := md5.Hmac(key, dev) //log.Info("sign: %s\n",nsign ) url2 := "http://beta./broker/v1/address?" + "deviceName=" + device.DeviceName + "&sign=" + nsign rsp, err := Get(url2, Header, form) if err !=nil{//fail // log.Info(string(rsp)) return err }else{//ok err1 := json.Unmarshal(rsp, &body) // log.Info(string(rsp)) if err1 != nil { log.Info(err1) return err1 } // for k,v :=range(Header){ // log.Info(k+":"+v) // } if body.Code == 10000 { // log.Info("--------GET broker success!") // log.Info(body.Code) // dev["ProductAlias"]= Alias // dev["deviceName"]= body.Data.DeviceName // dev["DeviceSecret"]= Secret // log.Info(dev) // log.Info("\n") }else{ log.Info(body.Code) return nil } } return nil }
3.获取到broker地址后注册mqtt客户端:
func Newmqclient(device models.DevInfo) (Client,error) { broker:= "tcp://127.0.0.1:1883"//支持大多数通信协议 opts := mqtt.NewClientOptions().AddBroker(broker)//beta devname := device.DeviceName+"@"+device.ProductAlias passwd:= device.DeviceSecret opts.Username = devname opts.Password = passwd //timestamp := strconv.FormatInt(time.Now().UTC().UnixNano(), 10) //timestamp := time.Now().Format("2006-01-02 15:04:05.000000") clientID:="|aiot|device|"+devname+","+"deviceId="+device.DeviceID+","+"sn="+device.Sn+ "|" opts.SetClientID(clientID) // opts.SetDefaultPublishHandler(defaultHandler) //opts.SetConnectionLostHandler(onLostHandler) opts.SetAutoReconnect(true) client := NewClient(broker,devname,passwd,clientID) // client = mqtt.NewClient(opts) var err = client.Connect() if err != nil { return nil, err } // log.Info(opts.ClientID) // deviceproductkey = productkey ClientSuccess++ log.Info("%v | MQ is connected %d\n",device.DeviceName,ClientSuccess) // DevicemqName = deviceName //log := timestamp+" | "+deviceName+"connect to MQ " +strings.ToUpper(strconv.FormatBool(Isconn))+"\n" //logs.Logfile(log) // for { // time.Sleep(3*time.Second) // } return client,nil }
这里先简单介绍mqtt,后续有完整的事件再详细介绍mqtt的其他方法
4.订阅topic:
func (c *defaultClient) SubscribeUplink(client Client, topic string, device models.DevInfo) Token {
return c.client.Subscribe(topic, QoS, func(mqtt MQTT.Client, msg MQTT.Message) {
// Determine the actual topic
log.Info("Success SubscribeUplink with device: %v", device.DeviceName)
CheckReq(client, device, msg.Payload(), topic)//listen
})
}
5.监听消息并处理:
// CheckReq ... func CheckReq(client Client, device models.DevInfo, msg []byte, topic string) { //log.Infof("[MESSSAGE_RECEIVED] topic=%s, msg=%s, device=%v", topic, string(msg), device.DeviceName) var msgInfo models.MsgInfo json.Unmarshal(msg, &msgInfo) // replyId := msgInfo.MsgId ota_check_launch := "/aiot/{ProductKey}/{DeviceName}/running/launch" ota_check_online := "/aiot/{ProductKey}/{DeviceName}/req/status/online" ota_version_confirm := "/ota/device/{ProductKey}/{DeviceName}/req/version/inform" ota_version_notify := "/ota/device/{ProductKey}/{DeviceName}/req/version/notify" ota_check_launch =strings.Replace(ota_check_launch, "{ProductKey}", device.ProductKey, -1) ota_check_online = strings.Replace(ota_check_online, "{ProductKey}", device.ProductKey, -1) ota_check_online = strings.Replace(ota_check_online, "{DeviceName}", device.DeviceName, -1) ota_version_confirm = strings.Replace(ota_version_confirm, "{ProductKey}", device.ProductKey, -1) ota_version_confirm = strings.Replace(ota_version_confirm, "{DeviceName}", device.DeviceName, -1) ota_version_notify = strings.Replace(ota_version_notify, "{ProductKey}", device.ProductKey, -1) ota_version_notify = strings.Replace(ota_version_notify, "{DeviceName}", device.DeviceName, -1) //log.Infof("Received Topic: %s, device: %s, match: %s", topic, device.DeviceName, ota_check_online) switch topic { case ota_check_online: //log.Infof("Processing... topic=%s,match=%s,method=%s", topic, ota_check_online, "OTA_CHECK_ONLINE") // DeviceStatusOnlineReq(client, device, replyId) case ota_version_confirm: //log.Debugf("o=%s,t=%s,method=%s", topic, ota_version_confirm, "OTA_VERSION_CONFORM") // DeviceVersionInformReq(client, device, replyId) case ota_version_notify: //log.Debugf("o=%s,t=%s,method=%s", topic, ota_version_notify, "OTA_VERSION_NOTIFY") // DevicePushVersionReq(client, device, replyId, string(msg)) case ota_check_launch: log.Info(topic) } }
6.发布消息:
func LaunchReq(client Client, device models.DevInfo) { topic := "/aiot/" + device.ProductKey + "/" + device.DeviceName + "/running/launch" // log.Info(topic) msgInfo := models.MsgInfo{ Code: 10000, Data: "", MsgId: strconv.FormatInt(time.Now().Unix(), 10), Time: time.Now().UnixNano() / 1e6, Version: "1.0", } jsonBytes, _ := json.Marshal(msgInfo) if client != nil { client.PublishUplink(topic, string(jsonBytes)) Publistcount++ now :=time.Now().Format("2006-01-02 15:04:05") // db.Logfile(now +" | "+strconv.Itoa(Publistcount)+" | "+topic) log.Info("published %d "+topic, Publistcount) db.InsertDB(device.DeviceName,now) } }
模拟发布设备开机的消息
实际应用中可能更复杂,这里只介绍最基本的使用,欢迎有兴趣的童靴留言讨论
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。