赞
踩
目录
VC++常用功能开发汇总(专栏文章列表,欢迎订阅,持续更新...)
https://blog.csdn.net/chenlycly/article/details/124272585C++软件异常排查从入门到精通系列教程(专栏文章列表,欢迎订阅,持续更新...)
https://blog.csdn.net/chenlycly/article/details/125529931C++软件分析工具从入门到精通案例集锦(专栏文章正在更新中...)
https://blog.csdn.net/chenlycly/article/details/131405795C/C++基础与进阶(专栏文章,持续更新中...)
https://blog.csdn.net/chenlycly/category_11931267.html开源组件及数据库技术(专栏文章,持续更新中...)
https://blog.csdn.net/chenlycly/category_12458859.html网络编程与网络问题分享(专栏文章,持续更新中...)
https://blog.csdn.net/chenlycly/category_2276111.html 为了方便第三方厂商对接我们的系统,后台服务器对外提供了基于HTTP的API。其中,为了解决HTTP API无法实时感知到业务实体的各种状态变化的问题,基于CometD技术,开发了消息推送服务,使用者可以通过CometD订阅关注的信息。当业务实体的状态发生变化时,可以实时收到状态变化的推送消息。但消息推送服务发布后,使用的人却不是很多,主要是因为CometD协议较为复杂,且仅有Java和JavaScript客户端,无法匹配客户的开发环境及开发语言。在对现有技术进行比较和分析后,发现MQTT协议是一个较为简单、成熟、且对各类开发语言支持都较好,决定使用MQTT协议替代CometD协议。今天就给大家详细介绍一下MQTT协议相关的内容。

MQTT是由IBM开发的一个Publish/Subscribe(发布/订阅)模型的即时通讯协议:
MQTT 协议由 Andy Stanford-Clark (IBM)和 Arlen Nipper(Arcom,现为 Cirrus Link,现任该公司的总裁兼CTO)于 1999 年发布。据 Arlen Nipper 在 IBM Podcast 上的自述,MQTT 原名是 MQ TT,注意 MQ 与 TT之间的空格,其全称为:MQ Telemetry Transport,是九十年代早期他在参与 Conoco Phillips 公司的一个原油管道数据采集监控系统(pipeline SCADA system)时开发的一个实时数据传输协议。它的目的在于让传感器通过带宽有限的 VSAT ,与 IBM 的 MQ Integrator 通信。由于 Nipper 是遥感和数据采集监控专业出身,所以按业内惯例取了 MQTT 这个名字。
MQTT基于TCP/IP协议,与其他的推送协议,如(XMPP、CometD)相比,有以下优势:
1)协议简单,开销极小
MQTT的固定消息头只有2字节,与XMPP所使用的XML和CometD所使用的HTTP协议相比,协议开销几乎可以忽略。因此MQTT非常适合于运算能力有限的嵌入式设备及网络带宽受限的移动设备。
2)客户端/服务器端实现丰富
几乎所有语言都有MQTT客户端实现,包括但不限于C、C++、Objective-C、C#、Java、JavaScript、Python、PHP、Go、Erlang等常用语言,而CometD只提供了Java及JavaScript客户端。
3)支持多种QoS选项,以满足不同场景下对消息可靠性的需求
相比之下、XMPP和CometD都没有类似的消息保障机制
4)MQTT支持消息路由
MQTT的消息路由方式与CometD类似,都是基于路径的路由方式。
MQTT相比于HTTP,有以下的优势:
- MQTT 的最小报文仅为 2 个字节,比 HTTP 占用更少的网络开销。
- MQTT 与 HTTP 都能使用 TCP 连接,并实现稳定、可靠的网络连接。
- MQTT 基于发布订阅模型,HTTP 基于请求响应,因此 MQTT 支持双工通信。
- MQTT 可实时推送消息,但 HTTP 需要通过轮询获取数据更新。
- MQTT 是有状态的,但是 HTTP 是无状态的。
- MQTT 可从连接异常断开中恢复,HTTP 无法实现此目标。
MQTT 协议设计简单轻量、路由灵活,将在移动互联网、物联网消息领域,全面取代 PC 时代的 XMPP 协议。MQTT相比于XMPP,有以下的优势:
- MQTT 报文体积小且编解码容易,XMPP 基于繁重的 XML,报文体积大且交互繁琐。
- MQTT 基于发布订阅模式,相比 XMPP 基于 JID 的点对点消息路由更为灵活。
- MQTT 支持 JSON、二进制等不同类型报文。XMPP 采用 XML 承载报文,二进制必须 Base64 编码等处理。
- MQTT 通过 QoS 保证消息可靠传输,XMPP 主协议并未定义类似机制。
基于MQTT的上述特点,MQTT常被用于以下场景:

1)物联网M2M通信,物联网大数据采集;
2)Android消息推送,WEB消息推送;
3)移动即时消息,例如Facebook Messenger;
4)智能硬件、智能家具、智能电器;
5)车联网通信,电动车站桩采集;
6)智慧城市、远程医疗、远程教育;7)电力、石油与能源等行业市场。

据 IoT Analytics 最新发布的《2022 年春季物联网状况》研究报告显示,到 2022 年,物联网市场预计将增长 18%,达到 144 亿活跃连接。
在如此大规模的物联网需求下,海量的设备接入和设备管理对网络带宽、通信协议以及平台服务架构都带来了巨大的挑战。对于物联网协议来说,必须针对性地解决物联网设备通信的几个关键问题:网络环境复杂而不可靠、内存和闪存容量小、处理器能力有限。

MQTT 协议正是为了应对以上问题而创建,经过多年的发展凭借其轻量高效、可靠的消息传递、海量连接支持、安全的双向通信等优点已成为物联网行业的首选协议。
MQTT 将协议本身占用的额外消耗最小化,消息头部最小只需要占用 2 个字节,可稳定运行在带宽受限的网络环境下。同时,MQTT 客户端只需占用非常小的硬件资源,能运行在各种资源受限的边缘端设备上。
MQTT 协议提供了 3 种消息服务质量等级(Quality of Service),保证了在不同的网络环境下消息传递的可靠性。
- QoS 0:消息最多传递一次。
如果当时客户端不可用,则会丢失该消息。发布者发送一条消息之后,就不再关心它有没有发送到对方,也不设置任何重发机制。- QoS 1:消息传递至少 1 次。
包含了简单的重发机制,发布者发送消息之后等待接收者的 ACK,如果没收到 ACK 则重新发送消息。这种模式能保证消息至少能到达一次,但无法保证消息重复。- QoS 2:消息仅传送一次。
设计了重发和重复消息发现机制,保证消息到达对方并且严格只到达一次。
除了 QoS 之外,MQTT 还提供了清除会话(Clean Session)机制。对于那些想要在重新连接后,收到离线期间错过的消息的客户端,可在连接时设置关闭清除会话,此时服务端将会为客户端存储订阅关系及离线消息,并在客户端再次上线后发送给客户端。
MQTT 协议从诞生之时便考虑到了日益增长的海量物联网设备,得益于其优秀的设计,基于 MQTT 的物联网应用及服务可轻松具备高并发、高吞吐、高可扩展能力。
连接海量的物联网设备,离不开 MQTT 服务器的支持。目前,MQTT 服务器中支持并发连接数最多的是 EMQX。最近发布的 EMQX 5.0 通过一个 23 节点的集群达成了 1 亿 MQTT 连接+每秒 100 万消息吞吐,这使得 EMQX 5.0 成为目前为止全球最具扩展性的 MQTT 服务器。
依赖于发布订阅模式,MQTT 允许在设备和云之间进行双向消息通信。发布订阅模式的优点在于:发布者与订阅者不需要建立直接连接,也不需要同时在线,而是由消息服务器负责所有消息的路由和分发工作。

安全性是所有物联网应用的基石,MQTT 支持通过 TLS/SSL 确保安全的双向通信,同时 MQTT 协议中提供的客户端 ID、用户名和密码允许我们实现应用层的身份验证和授权。
为了应对网络不稳定的情况,MQTT 提供了心跳保活(Keep Alive)机制。在客户端与服务端长时间无消息交互的情况下,Keep Alive 保持连接不被断开,若一旦断开,客户端可即时感知并立即重连。
同时,MQTT 设计了遗愿(Last Will) 消息,让服务端在发现客户端异常下线的情况下,帮助客户端发布一条遗愿消息到指定的 MQTT 主题。
另外,部分 MQTT 服务器也提供了上下线事件通知功能,当后端服务订阅了特定主题后,即可收到所有客户端的上下线事件,这样有助于后端服务统一处理客户端的上下线事件。

市面上有多种基于不同语言实现的MQTT服务器,它们对MQTT标准的支持也各不相同,如下所示:
| Server | QoS 0 | QoS 1 | QoS 2 | auth | SSL | cluster | websockets | plugin system | |||
| ✔ | ✔ | ✔ | ✔ | ✔ | § | ✔ | ✔ | ✔ | ✔ | ✘ | |
| ✔ | ✔ | ✔ | ✔ | ✘ | ✘ | ✔ | ✔ | ✔ | ✔ | ✔ | |
| ✔ | ✔ | ✔ | ✔ | ✘ | ✘ | ✔ | ✔ | ? | ✔ | ? | |
| ✔ | ✔ | ✔ | ✔ | rm | ✔ | ✔ | ✔ | ✔ | ✔ | rm | |
| ✔ | § | ✘ | ✔ | ✘ | ✘ | ✔ | ✔ | ✔ | ✔ | ✘ | |
| ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | |
| ✔ | ✔ | ✔ | ✔ | ✘ | ✘ | ✔ | ✔ | ✔ | ✔ | ✘ | |
| ✔ | ✔ | ✔ | ✔ | ✘ | ✘ | ✘ | ✔ | ✘ | ✘ | ✘ | |
| ✔ | ✔ | ✔ | ✔ | ✘ | ✔ | ✔ | ✔ | ✘ | ✔ | ✔ | |
| ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | |
| ✔ | ✔ | ✔ | ✔ | ✘ | ✔ | ✔ | ✔ | § | ✔ | ✘ | |
| ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | |
| ✔ | ✔ | ? | ? | ? | ? | ? | ? | ? | ? | ? | |
| ✔ | ✔ | ✔ | ✔ | ? | ? | ✔ | ? | rm | ✔ | ✘ | |
| ✔ | ✔ | ✘ | ✔ | ? | ? | ? | ? | ✘ | ✔ | ✘ | |
| ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | § | ✔ | ✔ | |
| ✔ | ✔ | ✔ | § | ✘ | ✘ | ✔ | ✔ | ✘ | ✔ | ✘ | |
| ✔ | ✔ | ✘ | ✔ | ✘ | ✘ | ✔ | ✔ | ? | ? | ? | |
| ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✘ | ✔ | ✘ | ✘ | ? | |
| ✔ | ✔ | ✔ | ✔ | ✘ | ✘ | ✔ | ✔ | ✔ | rm | ✘ | |
| ✔ | ✔ | ✘ | ✔ | § | ✔ | ✔ | ✔ | ✔ | ✔ | ✘ | |
| ✔ | ✔ | ✔ | ✔ | ✔ | ✘ | ✔ | ✔ | ✔ | ✘ | ✔ | |
| ✔ | ✔ | ✔ | ✔ | ✘ | ✘ | ✔ | ✔ | ✘ | ✘ | ✘ | |
| ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | |
| ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ? | ? | ? |
Key: ✔ supported ✘ not supported ? unknown § see limitations rm roadmap (planned)
本文选择基于erlang实现的emqttd,emqttd由国人实现,支持了MQTT所有标准,并且提供了相对完善的中文文档,相关的帮助文档链接为:MQTT 最全教程:从入门到精通 | EMQ
emqttd(现改名为emqx)是一款应用广泛的大规模分布式物联网 MQTT 服务器。自 2013 年在 GitHub 发布开源版本以来,目前全球下载量已超千万,累计连接物联网关键设备超过 1 亿台。

MQTT使用基于主题(Topic)的消息路由,与CometD类似,格式如下:
chat/room/1
sensor/10/temperature
$SYS/broker/metrics/packets/received
订阅者可以使用通配符(+,#)订阅主题,其中+可以匹配一层目录,#可以匹配多层目录,但#只能用在Topic结尾,如:
chat/room/+
sensor/+/temperature
chat/room/members/#
MQTT中的Topic支持动态创建,当客户端订阅一个不存在的Topic或向一个不存在的Topic中发送消息时,MQTT服务器会自动创建这个Topic。
MQTT支持三种QoS模型:
QoS 0:最多一次,消息最多被发送一次,如果发送失败不会重试
QoS 1:最少一次,消息至少被发送一次,如果失败会重试,但可能造成重复消息
QoS 2:恰好一次,消息不会丢失也不会重复,但会造成更高的延迟和协议开销

MQTT服务器负责接收客户端发起的连接,并将客户端发送的消息转发到另外一些符合条件的客户端。一个成熟的MQTT服务器可支持海量的客户端连接及百万级的消息吞吐,帮助物联网业务提供商专注于业务功能并快速创建一个可靠的MQTT应用。下面详细讲述emqtt服务器的搭建过程。
emqtt服务器搭建比较简单,这里以Windows为例。官方的下载页面提供了编译好的二进制文件下载,这里我们选择Windows 10(64-bit)下载,下载后将文件解压到所需目录,如D:/emqqtd。然后在cmd里执行以下命令,安装emqttd:
cd /d D:\emqttd\bin
emqttd.cmd install
安装成功后,执行以下脚本启动emqttd服务(第一次启动可能会卡住,杀掉后重启一次即可):
emqqtd.cmd start
emqtt成功启动后,就可以通过http://127.0.0.1:18083访问WEB控制台了,默认用户名和密码为admin和public。

emqtt支持多种用户校验方式,包括http、mysql、redis等方式。
emqtt的MySQL认证可以自己配置认证所用的数据库及查询语句,无需创建特定的数据表,因此较为灵活,也更容易与已有系统集成。
我们可以通过修改etc\plugins\emq_auth_mysql.conf文件配置MySQL认证插件。
其中auth.mysql.auth_query为用户登陆认证查询,emqtt通过此语句查询出用户密码,并尝试匹配。
auth.mysql.password_hash可以配置数据库里密码的加密方式,这里我们选择md5。
auth.mysql.super_query用于查询用户是否为管理员,这里暂且不配,auth.mysql.acl_query用来查询用户的权限,也暂且忽略。
- ##--------------------------------------------------------------------
- ## MySQL Auth/ACL Plugin
- ##--------------------------------------------------------------------
-
- ## MySQL server address.
- ##
- ## Value: Port | IP:Port
- ##
- ## Examples: 3306, 127.0.0.1:3306, localhost:3306
- auth.mysql.server = 172.16.218.136:3306
-
- ## MySQL pool size.
- ##
- ## Value: Number
- auth.mysql.pool = 8
-
- ## MySQL username.
- ##
- ## Value: String
- auth.mysql.username = test001
-
- ## MySQL password.
- ##
- ## Value: String
- auth.mysql.password = xxxxxx
-
- ## MySQL database.
- ##
- ## Value: String
- auth.mysql.database = ap
-
- ## Variables: %u = username, %c = clientid
-
- ## Authentication query.
- ##
- ## Note that column names should be 'password' and 'salt' (if used).
- ## In case column names differ in your DB - please use aliases,
- ## e.g. "my_column_name as password".
- ##
- ## Value: SQL
- ##
- ## Variables:
- ## - %u: username
- ## - %c: clientid
- ##
- auth.mysql.auth_query = select password from user_info where (e164 = '%u' or email ='%u' or account = '%u' or mobile = '%s') and binded = 0 and enable = 1 limit 1
- ## auth.mysql.auth_query = select password_hash as password from mqtt_user where username = '%u' limit 1
-
- ## Password hash.
- ##
- ## Value: plain | md5 | sha | sha256 | bcrypt
- auth.mysql.password_hash = md5
-
- ## sha256 with salt prefix
- ## auth.mysql.password_hash = salt,sha256
-
- ## bcrypt with salt only prefix
- ## auth.mysql.password_hash = salt,bcrypt
-
- ## sha256 with salt suffix
- ## auth.mysql.password_hash = sha256,salt
-
- ## pbkdf2 with macfun iterations dklen
- ## macfun: md4, md5, ripemd160, sha, sha224, sha256, sha384, sha512
- ## auth.mysql.password_hash = pbkdf2,sha256,1000,20
-
- ## Superuser query.
- ##
- ## Value: SQL
- ##
- ## Variables:
- ## - %u: username
- ## - %c: clientid
- auth.mysql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1
-
- ## ACL query.
- ##
- ## Value: SQL
- ##
- ## Variables:
- ## - %a: ipaddr
- ## - %u: username
- ## - %c: clientid
- ##auth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'

配置完毕后,可以通过命令或WEB页面启用插件:
bin/emqttd_ctl.cmd plugins load emq_auth_mysql
这样emqtt客户端连接服务器时,就需要密码认证了。
emqtt的HTTP认证可以向指定的认证服务器发送请求判断客户端是否有权限登陆、是否是管理员、是否有权限订阅Topic。
与MySQL认证相比,HTTP认证更为灵活,我们可以在HTTP认证服务器端做一些额外的逻辑以满足需求。
HTTP认证插件的配置文件位于etc\plugins\emq_auth_http.conf:
- ##--------------------------------------------------------------------
- ## HTTP Auth/ACL Plugin
- ##--------------------------------------------------------------------
-
- ##--------------------------------------------------------------------
- ## Authentication request.
- ##
- ## Variables:
- ## - %u: username
- ## - %c: clientid
- ## - %a: ipaddress
- ## - %P: password
- ##
- ## Value: URL
- auth.http.auth_req = http://127.0.0.1:8080/mqtt/auth
- ## Value: post | get | put
- auth.http.auth_req.method = post
- ## Value: Params
- auth.http.auth_req.params = clientid=%c,username=%u,password=%P
-
- ##--------------------------------------------------------------------
- ## Superuser request.
- ##
- ## Variables:
- ## - %u: username
- ## - %c: clientid
- ## - %a: ipaddress
- ##
- ## Value: URL
- auth.http.super_req = http://127.0.0.1:8080/mqtt/superuser
- ## Value: post | get | put
- auth.http.super_req.method = post
- ## Value: Params
- auth.http.super_req.params = clientid=%c,username=%u
-
- ##--------------------------------------------------------------------
- ## ACL request.
- ##
- ## Variables:
- ## - %A: 1 | 2, 1 = sub, 2 = pub
- ## - %u: username
- ## - %c: clientid
- ## - %a: ipaddress
- ## - %t: topic
- ##
- ## Value: URL
- auth.http.acl_req = http://127.0.0.1:8080/mqtt/acl
- ## Value: post | get | put
- auth.http.acl_req.method = get
- ## Value: Params
- auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t

当有客户端向EMQTT服务器建链时,EMQTT会按配置文件中配置的格式向指定的服务器发送HTTP请求,如:
- POST /mqtt/auth HTTP/1.1
- content-type: application/x-www-form-urlencoded
- content-length: 77
- te:
- host: 127.0.0.1:8070
- connection: keep-alive
-
- clientid=hbmqtt%2FY78%5CX7NV81pt%3Fa0w&username=XXXXXXXXXX&password=XXXXXX
HTTP服务器端只要响应并处理该请求即可,如果用户名密码正确,则返回 200 OK给EMQTT服务器,如果错误,则返回4XX。

EMQTT支持ACL控制,可以实现对Topic订阅及发布权限的控制。当客户端订阅或发布消息时,EMQTT服务器会根据ACL配置判断用户是否有权限订阅或在Topic上发布消息。
ACL规则同样支持以配置文件、MySQL、Redis、HTTP等方式实现。但配置文件中所配规则,必需重新加载MQTT才能生效,对于有较多动态Topic的场景,并不适合。
MySQL鉴权需要用户创建一张权限表,并将Topic的权限信息写入数据库,EMQTT服务器会执行一条查询语句来插判断该客户端是否有权限。
Redis鉴权与MySQL鉴权类似,也需要预先在将Topic的权限信息写入Redis,EMQTT会执行一条Redis命令来判断是否有权限。MySQL和Redis鉴权都可以处理简单的鉴权问题,对于需要额外逻辑进行鉴权的场景,可以使用HTTP方式鉴权。
与HTTP认证一样,EMQTT服务器也会通过发送一条鉴权请求来判断客户端是否有权限,如:
- GET /mqtt/auth?clientid=hbmqtt%2FY78%5CX7NV81pt%3Fa0w&username=xxxxxxxxx&password=xxxxxx&topic=test%2Faaa&access=1 HTTP/1.1
- content-type: application/x-www-form-urlencoded
- content-length: 77
- te:
- host: 127.0.0.1:8070
- connection: keep-alive
服务器可以通过回复200 OK授权或者4XX错误拒绝。注:superuser不受ACL控制,EMQTT会默认允许superuser的订阅和发布行为。

MQTT 应用通常需要基于 MQTT 客户端库来实现 MQTT 通信。目前,几乎所有常见语言都有对应的MQTT客户端实现,不同的客户端实现,可能对MQTT标准的支持程度不同,可以在官网上查询到所有的客户端列表。
这里我们以Python的paho-mqtt为例,以下代码实现了订阅、发布消息的功能:
- import paho.mqtt.client as mqtt
-
- def on_connect(client, userdata, flags, rc):
- # rc为0时代表连接成功
- if rc == 0:
- client.subscribe('userdomains/#')
-
- def on_subscribe(client, userdata, mid, granted_qos):
- client.publish('userdomains/xxx', 'hello')
-
- def on_message(client, userdata, msg):
- print(msg.topic+' '+str(msg.payload))
-
- if __name__ == '__main__':
- client = mqtt.Client()
- client.username_pw_set('xxxxxxxxx', 'xxxxxxxx')
- client.on_connect = on_connect
- client.on_message = on_message
- client.on_subscribe = on_subscribe
- client.connect("127.0.0.1", 1883, 60)
- client.loop_forever()

与CometD相比,MQTT协议更简单,网络开销也远小于CometD,并且提供了丰富的语言支持,无论是前端语言JavaScript,还是后端语言Java、php、C++等都有原生的客户端。对于第三方来说,学习和使用成本都降低不少。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。