当前位置:   article > 正文

Linux环境下使用Eclipse Paho C 实现(MQTT Client)同步模式发布和订阅Message_eclipse paho c client

eclipse paho c client

目录

概述

1 同步模式和异步模式

1.1 同步模式

1.2 异步模式

2 下载和安装paho.mqtt.c

3 同步方式发布和订阅消息功能实现

3.1 MQTT Client参数配置

3.2 初始化MQTT Client

3.3 发布消息功能

3.4 订阅消息功能

3.5 解析订阅的信息

4 编译和测试

4.1 编译代码

4.2 运行

5 验证MQTT Client功能

5.1 EMQX服务器上查看MQTT Client

5.2 MQTT.fx发布Topic

5.3 MQTT.fx订阅的主题

6 完整代码


概述

本文主要介绍在linux环境(ubuntu)环境下,下载和安装Eclipse Paho C MQTT 软件包,还编写一个范例实现同步发布Message的功能,并使用基于EMQX的服务验证其功能,还是用MQTT.fx订阅消息,已验证发布消息功能的可靠性。

1 同步模式和异步模式

1.1 同步模式

在同步模式下,客户机应用程序在单个线程上运行。使用MQTTClient_publish()和MQTTClient_publishMessage()函数发布消息。要确定QoS1或QoS2(请参阅服务质量)消息已成功交付,应用程必须调用MQTTClient_waitForCompletion()函数。同步发布示例中显示了显示同步发布的示例。在同步模式下接消息使用MQTTClient_receive()函数。客户机应用程序必须相对频繁地调用 MQTTClient_receive()MQTTClient_yield(),以便允许处理确认和MQTT“ping”,从而保持与服务器的网络连接处于活动状态。

总结同步模式应用方法

1)客户机应用程序在单个线程上运行

2)使用MQTTClient_publish()或者MQTTClient_publishMessage()发布消息

3)使用MQTTClient_waitForCompletion()确认消息是否发布成功

4)使用MQTTClient_receive()接收消息

5)必须频繁调用MQTTClient_receive()和MQTTClient_yield(),以确认消息

1.2 异步模式

在异步模式下,客户机应用程序在多个线程上运行。主程序调用客户端库中的函数来发布和订阅,就像同步模式一样。但是,握手和维护网络连接的处理是在后台执行的。使用调用MQTTClient_setCallbacks()(参见MQTTClient_messageArrived()、MQTTClient_connectionLost()和MQTTClient_deliveryComplete())向库注册的回调,向客户端应用程序提供状态通知和消息接收。然而,这个API不是线程安全的——在没有同步的情况下,不可能从多个线程调用它。可以为此使用MQTTAsync API来实现这些功能。

总结异步模式应用方法

1)客户机应用程序在多个线程上运行

2)主程序调用客户端库中的函数来发布和订阅,使用MQTTClient_publish()或者MQTTClient_publishMessage()发布消息;使用MQTTClient_publishMessage订阅消息

3)使用调用MQTTClient_setCallbacks(),向客户端应用程序提供状态通知和消息接收

异步模式的详细使用范例,参看文章:

Linux环境下使用Eclipse Paho C 实现(MQTT Client)异步订阅Message-CSDN博客

Linux环境下使用Eclipse Paho C 实现(MQTT Client)异步方式发布Message-CSDN博客

2 下载和安装paho.mqtt.c

登录mqtt官网,点击Software,可以看见如下页面,选择Eclipse Paho C进入下载页面

https://mqtt.org/

下载paho.mqtt.c

笔者选择使用命令直接安装该软件包,具体操作步骤如下:

Step -1: 下载软件包执行命令:

git clone https://github.com/eclipse/paho.mqtt.c.git

step-2: 进入paho.mqtt.c目录,执行make

  1. cd paho.mqtt.c
  2. make

系统会自动编译代码,等待编译结果。

编译完成后,会自动生成build文件,这时可以安装

step-3 : 执行如下命令就可以安装软件

sudo make install

3 同步方式发布和订阅消息功能实现

3.1 MQTT Client参数配置

初始化MQTT Client,必须配置一些参数,包括broker的IP地址,订阅的topic等,具体参数如下表所示:

参数功能介绍:

参数名称参数值描述
ADDRESStcp://192.168.1.11:1883mqtt broker的IP地址
CLIENTIDmqtt_ubuntu_asys设备ID
TOPICMQTTAsync发布的Topic
SUBTOPICswitch订阅的Topic
PAYLOAD12.56Topic下的payload
QOS1服务质量等级=1
TIMEOUT10000L超时计数
USERNAMEmqtt_ubuntu_user终端认证username
PASSWORD123456终端认证username对应的password

在代码中定义这些参数的位置:

3.2 初始化MQTT Client

初始化MQTT终端需要完成以下2个步骤:

step-1: 创建MQTT Client

step-2: 连接服务器

具体实现代码如下:

代码66行:创建MQTT Client,需要传入服务器IP和Client ID信息

代码73行: 心跳包时间间隔设置为20s

代码74行: 清除会话 标记设置为1,不接受离线消息

代码75行: 配置设备终端用户

代码76行: 配置设备终端用户password

代码78行: MQTT连接Broker

3.3 发布消息功能

要实现发布消息功能,需要将payload及其相关参数填到MQTTClient_message定义的数据结构中,下面介绍整个public message 函数的功能。

代码39行: 装载payload

代码40行:payload的字符长度

代码41行:消息服务等级参数

代码42行:配置为保留消息

代码44行:使用MQTTClient_publishMessage函数发布消息

代码53行:使用MQTTClient_waitForCompletion等待消息发布完毕

3.4 订阅消息功能

要取消订阅的Topic,调用MQTTClient_unsubscribe函数可以实现该功能。实现范例如下:

3.5 解析订阅的信息

代码69行: 使用MQTTClient_receive接收订阅的消息

代码70行: 通过检测rc的值,并判断topic的值是否有效,以确定是否要解析消息

4 编译和测试

4.1 编译代码

使用如下命令编译代码

 gcc test_03_Synchronous.c -lpaho-mqtt3c -lpthread

4.2 运行

执行.out文件后,可以看见,MQTT Client订阅和发布消息成功了,服务器端收到消息后,token值会自动加1

5 验证MQTT Client功能

5.1 EMQX服务器上查看MQTT Client

在ubuntu上运行MQTT Client后,EMQX服务器会显示MQTT Client的运行状态,登录EMQX服务器可以看见

在订阅管理面板上,也可以看见mqtt_ubuntu_asys订阅了Topic为"switch"

5.2 MQTT.fx发布Topic

使用MQTT.fx发布Topic为switch的消息,Client ID 为mqtt_ubuntu_asys的客户端订阅了该消息,那么当MQTT.fx发布消息之后,mqtt_ubuntu_asys会收到该消息,并在终端上打印出来。

要使用MQTT.fx MQTT Client工具订阅MQTTsync,首先保证MQTT.fx能正常连接至EMQX服务器

使用MQTT.fx发布Topic为switch的消息

  1. {
  2. "switch": false
  3. }

使用MQTT.fx发布Topic为switch的消息

  1. {
  2. "switch": false
  3. }

在EMQX的保留信息页面,查看MQTT.fx发布Topic为switch的消息,该信息和Client ID 为mqtt_ubuntu_asys的客户端

5.3 MQTT.fx订阅的主题

在EMQX的订阅管理页面,查看MQTT.fx订阅Topic为MQTTsync的消息,该信息和Client ID 为mqtt_ubuntu_asys的客户端发布的消息

在MQTT.fx上查看Topic为MQTTsync的消息

6 完整代码

创建test_03_Synchronous.c,编写如下代码:

  1. /***************************************************************
  2. Copyright 2024-2029. All rights reserved.
  3. 文件名 : test_03_Synchronous.c
  4. 作者   : tangmingfei2013@126.com
  5. 版本   : V1.0
  6. 描述   : mqtt同步发布和订阅消息
  7. 日志   : 初版V1.0 2024/03/13
  8. ***************************************************************/
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <string.h>
  12. #include <unistd.h>
  13. #include <pthread.h>
  14. #include "MQTTClient.h"
  15. #define ADDRESS     "tcp://192.168.1.11:1883"
  16. #define CLIENTID   "mqtt_ubuntu_asys"
  17. #define TOPIC       "MQTTsync"
  18. #define SUBTOPIC   "switch"
  19. #define PAYLOAD     "12.56"
  20. #define QOS         1
  21. #define TIMEOUT     10000L
  22. #define USERNAME   "mqtt_ubuntu_user"
  23. #define PASSWORD   "123456"
  24. static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  25. static MQTTClient_message pubmsg = MQTTClient_message_initializer;
  26. static MQTTClient client;
  27. static MQTTClient_deliveryToken deliveredtoken, temptoken;
  28. int count;
  29. MQTTClient_deliveryToken user_publicMsg( void )
  30. {
  31.    MQTTClient_deliveryToken token;
  32.    int rc;
  33.        
  34.    pubmsg.payload = PAYLOAD;
  35.    pubmsg.payloadlen = (int)strlen(PAYLOAD);
  36.    pubmsg.qos = QOS;
  37.    pubmsg.retained = 1;
  38.    
  39.    if ((rc = MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token)) != MQTTCLIENT_SUCCESS)
  40.   {
  41.         printf("Failed to publish message, return code %d\n", rc);
  42.         exit(EXIT_FAILURE);
  43.   }
  44.    printf("Waiting for up to %d seconds for publication of %s\n"
  45.            "on topic %s for client with ClientID: %s\n",
  46.           (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
  47.    rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
  48.    printf("Message with delivery token %d delivered\n", token);
  49.    
  50.    return token;
  51. }
  52. int receive_subMessage( void )
  53. {
  54.    int topicLen;
  55.    int rc;
  56.    char *topic = SUBTOPIC;
  57.    MQTTClient_message *message = NULL;
  58.    
  59.    rc = MQTTClient_receive( client, &topic, &topicLen, &message, TIMEOUT);
  60.    if( rc == MQTTCLIENT_SUCCESS && topic!= NULL ){
  61.        printf("Message arrived \n");
  62.        printf("     topic: %s\n", topic);
  63.        printf("   message: %.*s\n", message->payloadlen, (char*)message->payload);
  64.   }
  65.    printf("MQTTClient receive message, return code %d\n", rc);
  66.    
  67.    return rc;
  68. }
  69. /* 线程 */
  70. void thread_subMsg(void)
  71. {
  72.    usleep(1000000L);
  73.    count++;
  74. }
  75. int main(int argc, char* argv[])
  76. {
  77.    int current_cnt;
  78.    pthread_t id;
  79.    int ret;
  80.    int rc;
  81.    deliveredtoken = 0;
  82.    if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
  83.        MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
  84.   {
  85.         printf("Failed to create client, return code %d\n", rc);
  86.         exit(EXIT_FAILURE);
  87.   }
  88.    conn_opts.keepAliveInterval = 20;
  89.    conn_opts.cleansession = 1;
  90.    conn_opts.username = USERNAME;   //用户名
  91.    conn_opts.password = PASSWORD;   //密码
  92.    
  93.    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
  94.   {
  95.        printf("Failed to connect, return code %d\n", rc);
  96.        exit(EXIT_FAILURE);
  97.   }
  98.    
  99.    printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n",
  100.            SUBTOPIC, CLIENTID, QOS);
  101.            
  102.    if ((rc = MQTTClient_subscribe(client, SUBTOPIC, QOS)) != MQTTCLIENT_SUCCESS)
  103.   {
  104.        printf("Failed to subscribe, return code %d\n", rc);
  105.        rc = EXIT_FAILURE;
  106.   }
  107.    
  108.    ret = pthread_create(&id,NULL,(void *) thread_subMsg,NULL);
  109.    if(ret!=0){
  110.        printf ("Create pthread error!\n");
  111.        exit (1);
  112.   }
  113.    while(1)
  114.   {
  115.        receive_subMessage();
  116.        temptoken = user_publicMsg();
  117.        
  118.        if(count != current_cnt )
  119.       {
  120.            current_cnt = count;
  121.            if(temptoken != deliveredtoken){
  122.                deliveredtoken = temptoken;
  123.           }
  124.       }
  125.   }
  126.    if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS)
  127.        printf("Failed to disconnect, return code %d\n", rc);
  128.    MQTTClient_destroy(&client);
  129.    
  130.    return rc;
  131. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/722363
推荐阅读
相关标签
  

闽ICP备14008679号