当前位置:   article > 正文

C++代码建立mqtt客户端发送数据和接受mqtt消息_c++ mqtt

c++ mqtt

首先,声明一下,因为懒的原因,将mqtt的小文章搁置了两个月,现在终于要补上了!哈哈

一.什么是mqtt以及用途和优点

MQTT(Message Queuing Telemetry Transport)是一种轻量级、基于发布-订阅模式的消息传输协议,适用于资源受限的设备和低带宽、高延迟或不稳定的网络环境。它在物联网应用中广受欢迎,能够实现传感器、执行器和其它设备之间的高效通信。

MQTT用于收发消息的主要组件有:

  • Publisher(发布者)
  • Broker(代理)
  • Subscriber(订阅者)

三者的关系如下图:

从图中可以看出,如果把发送消息或接收消息的端成为Client,那么一个Client即可以是发布者(Publisher)也可以是订阅者(Subscriber)。因为一个client可以通过实现publish接口发送消息,也可以用过实现subscribe接口订阅消息。

代理(Broker)是链接发布者(Publisher)和订阅者(Subscriber)的中心。一个代理可以链接上千个Client。代理的职责是接收所有发布者(Publisher)的消息,判断消息应该发往哪个订阅者(Subscriber)

详细介绍移步至:MQTT 协议入门:基础知识和快速教程 | EMQ (emqx.com)

 二.环境搭建

涉及到3个,一个是客户端工具,一个是服务端,最后是客户端发布订阅代码实现,用于发布和订阅。

一.mqtt客户端工具及

mqtt客户端工具可以不用代码发送你想要发送的数据到各个订阅端(mqtt发布订阅等参照上面的链接),也可以用于测试写出的代码是否可以正常发布数据(客户端工具相当于一个)。

下载链接:MQTTX:全功能 MQTT 客户端工具

二 服务端

不管是订阅还是发送都是一个client,而服务则是一个BROKER(代理)。

下载链接 :Download EMQX

Download EMQX

 下载后解压名字为emqx,用cmd命令进入到bin文件夹,直接emqx.cmd start命令即可启动mqtt服务

三 发布订阅代码实现

这里只实现发布的C++代码。代码编写的逻辑是:写一个mqtt发布端,在链接到代理之后开始发布缓存中的数据,这里通过定时器每秒发送数据给到代理,如果某个订阅者订阅到消息则会触发回调函数,以显示订阅者收到了数据。

main.cpp:

  1. #include "mqttclient.hpp"
  2. #include <Qapplication>
  3. #include <mutex>
  4. /*
  5. 1.收到消息的结果回调
  6. 2.qtimer,到了时间自动publish
  7. 3.异步方式(生产者消费者方式)
  8. 3.满足用户不同方式使用客户端(多个构造)
  9. */
  10. const QHostAddress EXAMPLE_HOST = QHostAddress("127.0.0.1");
  11. const quint16 EXAMPLE_PORT = 1883;
  12. const QString EXAMPLE_TOPIC = "mqtt/lmy";
  13. #if 1
  14. int main(int argc,char **argv)
  15. {
  16. QApplication app(argc, argv);
  17. //QHostAddress ip(EXAMPLE_HOST);
  18. std::shared_ptr<main_test>ptr;
  19. mqtt_test mtest(EXAMPLE_HOST, EXAMPLE_PORT);
  20. mtest.SetTopic(EXAMPLE_TOPIC);
  21. mtest.SetCallback( ptr);
  22. for (int i = 0; i < 100; ++i)
  23. {
  24. std::unique_lock<std::mutex>lock(mtest.m_mtx);
  25. mtest.m_queue.push(i);
  26. }
  27. return app.exec();
  28. }
  29. #include "main.moc"
  30. #endif
  31. //#include <filesystem>
  32. //namespace fs = std::filesystem;
  33. //int main()
  34. //{
  35. // std::cout<<fs::current_path().generic_string();
  36. // return 0;
  37. //}

 mqtt 发布类代码实现:

  1. #ifndef MQTTCLIENT_HPP
  2. #define MQTTCLIENT_HPP
  3. #include <iostream>
  4. #include <Qdebug>
  5. #include <QObject>
  6. #include <qmqtt.h>
  7. #include <queue>
  8. #include <QTimer>
  9. #include <mutex>
  10. #include <memory>
  11. using namespace QMQTT;
  12. const QString EXAMPLE_NAME = "LMY";
  13. class Oncallback
  14. {
  15. public:
  16. virtual void Callback() = 0;
  17. };
  18. class main_test :public Oncallback
  19. {
  20. void Callback()
  21. {
  22. std::cout << "recive meassage ";
  23. }
  24. };
  25. class mqtt_test :public QObject
  26. {
  27. Q_OBJECT
  28. public:
  29. QHostAddress m_ip;
  30. quint16 m_port;
  31. QString m_topic;
  32. QMQTT::Client* m_client;
  33. QTimer m_timer;
  34. std::queue<int>m_queue;
  35. std::mutex m_mtx;
  36. std::shared_ptr<Oncallback> m_call;
  37. public:
  38. mqtt_test(QHostAddress ip, quint16 port = 1883)
  39. {
  40. m_client = new Client();
  41. m_client->setHost(ip);
  42. m_client->setPort(port);
  43. //m_client->setHostName(EXAMPLE_NAME);
  44. m_client->connectToHost();
  45. connect(m_client, &Client::connected,this,
  46. &mqtt_test::Onconnected);
  47. connect(&m_timer, &QTimer::timeout, this,
  48. &mqtt_test::Published);
  49. connect(m_client, &Client::received, this,
  50. &mqtt_test::Received);
  51. connect(m_client, &Client::disconnected, this,
  52. &mqtt_test::OnDisconnected);
  53. }
  54. ~mqtt_test() { m_client->disconnectFromHost(); }
  55. public slots:
  56. void SetTopic(QString topic)
  57. {
  58. //m_port = PORT;
  59. //m_ip = IP;
  60. m_topic = topic;
  61. m_client->subscribe(topic, 0);
  62. }
  63. bool SetCallback(std::shared_ptr<Oncallback>ptr)
  64. {
  65. m_call = ptr;
  66. return true;
  67. }
  68. void Onconnected()
  69. {
  70. m_timer.start(1);
  71. qDebug() << "connected:";
  72. }
  73. void Published()
  74. {
  75. int x;
  76. {
  77. std::unique_lock < std::mutex >lock(m_mtx);
  78. if (!m_queue.empty())
  79. {
  80. x = m_queue.front();
  81. if (x == 99)
  82. {
  83. std::cout << x;
  84. }
  85. m_queue.pop();
  86. }
  87. }
  88. if (m_queue.empty())return;
  89. QMQTT::Message message(x, m_topic,QString("publish number %1").arg(x).toUtf8());
  90. m_client->publish(message);
  91. qDebug()<< "PUBLISH:" ;
  92. }
  93. void OnDisconnected()
  94. {
  95. qDebug() << "disconnected:";
  96. m_timer.stop();
  97. }
  98. void Received(const QMQTT::Message& message)
  99. {
  100. //开始回调
  101. m_call->Callback();
  102. qDebug() << "publish received: \"" << QString::fromUtf8(message.payload());
  103. }
  104. };
  105. #endif // !MQTTCLIENT_HPP

 代理收到了发布者的消息并显示在界面上。注意订阅主题需要按规范且保持一致。

 这里mqtt客户类的构造中,使用信号槽方式来接受和处理信号。

     mqtt_test(QHostAddress ip, quint16 port = 1883)
    {
        m_client = new Client();
        m_client->setHost(ip);
        m_client->setPort(port);
        //m_client->setHostName(EXAMPLE_NAME);
        m_client->connectToHost();
        connect(m_client, &Client::connected,this,
            &mqtt_test::Onconnected);
// 一旦收到了链接成功的信号则会触发Onconnected()函数
        connect(&m_timer, &QTimer::timeout, this,
            &mqtt_test::Published);
        
        connect(m_client, &Client::received, this,
            &mqtt_test::Received);//
这里没收到回复,则不调动槽函数执行Received().
        connect(m_client, &Client::disconnected, this,
            &mqtt_test::OnDisconnected);
    }

 需要注意的是,这里并没有用到集成mqtt类去实现(不会),而是直接使用qmqtt的头文件加DLL库的形式调动qmqtt。

这里qmqtt库的获取方式可直接去github上下载代码,拿到本地,编译成动态库即可。链接:

GitHub - emqx/qmqtt: MQTT client for Qt

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/937097
推荐阅读
相关标签
  

闽ICP备14008679号