赞
踩
首先,声明一下,因为懒的原因,将mqtt的小文章搁置了两个月,现在终于要补上了!哈哈
MQTT(Message Queuing Telemetry Transport)是一种轻量级、基于发布-订阅模式的消息传输协议,适用于资源受限的设备和低带宽、高延迟或不稳定的网络环境。它在物联网应用中广受欢迎,能够实现传感器、执行器和其它设备之间的高效通信。
MQTT用于收发消息的主要组件有:
三者的关系如下图:
从图中可以看出,如果把发送消息或接收消息的端成为Client,那么一个Client即可以是发布者(Publisher)也可以是订阅者(Subscriber)。因为一个client可以通过实现publish接口发送消息,也可以用过实现subscribe接口订阅消息。
代理(Broker)是链接发布者(Publisher)和订阅者(Subscriber)的中心。一个代理可以链接上千个Client。代理的职责是接收所有发布者(Publisher)的消息,判断消息应该发往哪个订阅者(Subscriber)
详细介绍移步至:MQTT 协议入门:基础知识和快速教程 | EMQ (emqx.com)
涉及到3个,一个是客户端工具,一个是服务端,最后是客户端发布订阅代码实现,用于发布和订阅。
mqtt客户端工具可以不用代码发送你想要发送的数据到各个订阅端(mqtt发布订阅等参照上面的链接),也可以用于测试写出的代码是否可以正常发布数据(客户端工具相当于一个)。
下载链接:MQTTX:全功能 MQTT 客户端工具
不管是订阅还是发送都是一个client,而服务则是一个BROKER(代理)。
下载链接 :Download EMQX
下载后解压名字为emqx,用cmd命令进入到bin文件夹,直接emqx.cmd start命令即可启动mqtt服务
这里只实现发布的C++代码。代码编写的逻辑是:写一个mqtt发布端,在链接到代理之后开始发布缓存中的数据,这里通过定时器每秒发送数据给到代理,如果某个订阅者订阅到消息则会触发回调函数,以显示订阅者收到了数据。
main.cpp:
- #include "mqttclient.hpp"
- #include <Qapplication>
- #include <mutex>
- /*
- 1.收到消息的结果回调
- 2.qtimer,到了时间自动publish
- 3.异步方式(生产者消费者方式)
- 3.满足用户不同方式使用客户端(多个构造)
- */
- const QHostAddress EXAMPLE_HOST = QHostAddress("127.0.0.1");
- const quint16 EXAMPLE_PORT = 1883;
- const QString EXAMPLE_TOPIC = "mqtt/lmy";
- #if 1
- int main(int argc,char **argv)
- {
- QApplication app(argc, argv);
- //QHostAddress ip(EXAMPLE_HOST);
- std::shared_ptr<main_test>ptr;
- mqtt_test mtest(EXAMPLE_HOST, EXAMPLE_PORT);
-
- mtest.SetTopic(EXAMPLE_TOPIC);
- mtest.SetCallback( ptr);
-
-
- for (int i = 0; i < 100; ++i)
- {
- std::unique_lock<std::mutex>lock(mtest.m_mtx);
- mtest.m_queue.push(i);
- }
-
- return app.exec();
- }
- #include "main.moc"
- #endif
- //#include <filesystem>
- //namespace fs = std::filesystem;
- //int main()
- //{
- // std::cout<<fs::current_path().generic_string();
- // return 0;
- //}

mqtt 发布类代码实现:
- #ifndef MQTTCLIENT_HPP
- #define MQTTCLIENT_HPP
-
- #include <iostream>
- #include <Qdebug>
- #include <QObject>
- #include <qmqtt.h>
- #include <queue>
- #include <QTimer>
- #include <mutex>
- #include <memory>
- using namespace QMQTT;
- const QString EXAMPLE_NAME = "LMY";
-
- class Oncallback
- {
- public:
- virtual void Callback() = 0;
- };
-
- class main_test :public Oncallback
- {
- void Callback()
- {
- std::cout << "recive meassage ";
- }
- };
-
-
- class mqtt_test :public QObject
- {
- Q_OBJECT
- public:
- QHostAddress m_ip;
- quint16 m_port;
- QString m_topic;
- QMQTT::Client* m_client;
- QTimer m_timer;
- std::queue<int>m_queue;
- std::mutex m_mtx;
- std::shared_ptr<Oncallback> m_call;
- public:
- mqtt_test(QHostAddress ip, quint16 port = 1883)
- {
- m_client = new Client();
- m_client->setHost(ip);
- m_client->setPort(port);
- //m_client->setHostName(EXAMPLE_NAME);
- m_client->connectToHost();
- connect(m_client, &Client::connected,this,
- &mqtt_test::Onconnected);
- connect(&m_timer, &QTimer::timeout, this,
- &mqtt_test::Published);
-
- connect(m_client, &Client::received, this,
- &mqtt_test::Received);
- connect(m_client, &Client::disconnected, this,
- &mqtt_test::OnDisconnected);
- }
- ~mqtt_test() { m_client->disconnectFromHost(); }
-
-
- public slots:
- void SetTopic(QString topic)
- {
- //m_port = PORT;
- //m_ip = IP;
- m_topic = topic;
- m_client->subscribe(topic, 0);
- }
-
- bool SetCallback(std::shared_ptr<Oncallback>ptr)
- {
- m_call = ptr;
- return true;
- }
-
- void Onconnected()
- {
- m_timer.start(1);
- qDebug() << "connected:";
- }
-
- void Published()
- {
- int x;
- {
- std::unique_lock < std::mutex >lock(m_mtx);
- if (!m_queue.empty())
- {
- x = m_queue.front();
- if (x == 99)
- {
- std::cout << x;
- }
- m_queue.pop();
- }
- }
- if (m_queue.empty())return;
-
- QMQTT::Message message(x, m_topic,QString("publish number %1").arg(x).toUtf8());
- m_client->publish(message);
- qDebug()<< "PUBLISH:" ;
- }
-
- void OnDisconnected()
- {
- qDebug() << "disconnected:";
- m_timer.stop();
- }
-
- void Received(const QMQTT::Message& message)
- {
- //开始回调
- m_call->Callback();
- qDebug() << "publish received: \"" << QString::fromUtf8(message.payload());
- }
-
- };
- #endif // !MQTTCLIENT_HPP
-
-
-

代理收到了发布者的消息并显示在界面上。注意订阅主题需要按规范且保持一致。
这里mqtt客户类的构造中,使用信号槽方式来接受和处理信号。
mqtt_test(QHostAddress ip, quint16 port = 1883)
{
m_client = new Client();
m_client->setHost(ip);
m_client->setPort(port);
//m_client->setHostName(EXAMPLE_NAME);
m_client->connectToHost();
connect(m_client, &Client::connected,this,
&mqtt_test::Onconnected); // 一旦收到了链接成功的信号则会触发Onconnected()函数
connect(&m_timer, &QTimer::timeout, this,
&mqtt_test::Published);
connect(m_client, &Client::received, this,
&mqtt_test::Received);//这里没收到回复,则不调动槽函数执行Received().
connect(m_client, &Client::disconnected, this,
&mqtt_test::OnDisconnected);
}
需要注意的是,这里并没有用到集成mqtt类去实现(不会),而是直接使用qmqtt的头文件加DLL库的形式调动qmqtt。
这里qmqtt库的获取方式可直接去github上下载代码,拿到本地,编译成动态库即可。链接:
GitHub - emqx/qmqtt: MQTT client for Qt
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。