赞
踩
前面对MQTT进行了简单的介绍,并了解了如何在Linux上搭建MQTT的运行环境(参考链接:MQTT通信协议(mosquitto)在Linux上的环境构建与测试)。之前仅仅是通过命令去测试,现在我们基于mosquitto的官方源码,通过编程来实现MQTT协议的发布与订阅。
上一章中通过源码编译安装后,将会有几个我们需要用到的文件。对应路径如下:
mosquitto-1.6.10/lib/libmosquitto.so.1
mosquitto-1.6.10/lib/cpp/libmosquittopp.so.1
mosquitto-1.6.10/lib/mosquitto.h
mosquitto-1.6.10/lib/mosquittopp.h
下面的程序是使用标准C语言来实现,所以我们只需要用到mosquitto.h和libmosquitto.so.1两个文件就可以了。
mosquitto有同步和异步两种通信方式。这里的异步是一种非阻塞的方式,性能通常比同步通信更好,因为同步方式是“通信+等待”的阻塞模式。不过接下来先使用mosquitto的同步函数编写程序,再使用异步函数编写。
具体函数如下:
对mosquitto的函数说明可以参照官方网站的API解释:参考链接如:mosquitto_connect
订阅端(sub.c)
#include <stdio.h> #include <stdlib.h> #include <string.h> #include "mosquitto.h" #define HOST "localhost" #define PORT 1883 #define KEEP_ALIVE 60 #define MSG_MAX_SIZE 512 // 定义运行标志决定是否需要结束 static int running = 1; void my_connect_callback(struct mosquitto *mosq, void *obj, int rc) { printf("Call the function: on_connect\n"); if(rc){ // 连接错误,退出程序 printf("on_connect error!\n"); exit(1); }else{ // 订阅主题 // 参数:句柄、id、订阅的主题、qos if(mosquitto_subscribe(mosq, NULL, "topic1", 2)){ printf("Set the topic error!\n"); exit(1); } } } void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc) { printf("Call the function: my_disconnect_callback\n"); running = 0; } void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos) { printf("Call the function: on_subscribe\n"); } void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg) { printf("Call the function: on_message\n"); printf("Recieve a message of %s : %s\n", (char *)msg->topic, (char *)msg->payload); if(0 == strcmp(msg->payload, "quit")){ mosquitto_disconnect(mosq); } } int main() { int ret; struct mosquitto *mosq; // 初始化mosquitto库 ret = mosquitto_lib_init(); if(ret){ printf("Init lib error!\n"); return -1; } // 创建一个订阅端实例 // 参数:id(不需要则为NULL)、clean_start、用户数据 mosq = mosquitto_new("sub_test", true, NULL); if(mosq == NULL){ printf("New sub_test error!\n"); mosquitto_lib_cleanup(); return -1; } // 设置回调函数 // 参数:句柄、回调函数 mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); mosquitto_message_callback_set(mosq, my_message_callback); // 连接至服务器 // 参数:句柄、ip(host)、端口、心跳 ret = mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE); if(ret){ printf("Connect server error!\n"); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return -1; } // 开始通信:循环执行、直到运行标志running被改变 printf("Start!\n"); while(running) { 
mosquitto_loop(mosq, -1, 1); } // 结束后的清理工作 mosquitto_destroy(mosq); mosquitto_lib_cleanup(); printf("End!\n"); return 0; }
发布端(pub.c)
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include "mosquitto.h" #define HOST "localhost" #define PORT 1883 #define KEEP_ALIVE 60 #define MSG_MAX_SIZE 512 static int running = 1; void my_connect_callback(struct mosquitto *mosq, void *obj, int rc) { printf("Call the function: my_connect_callback\n"); } void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc) { printf("Call the function: my_disconnect_callback\n"); running = 0; } void my_publish_callback(struct mosquitto *mosq, void *obj, int mid) { printf("Call the function: my_publish_callback\n"); } int main() { int ret; struct mosquitto *mosq; char buff[MSG_MAX_SIZE]; //初始化libmosquitto库 ret = mosquitto_lib_init(); if(ret){ printf("Init lib error!\n"); return -1; } //创建一个发布端实例 mosq = mosquitto_new("pub_test", true, NULL); if(mosq == NULL){ printf("New pub_test error!\n"); mosquitto_lib_cleanup(); return -1; } //设置回调函数 mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); mosquitto_publish_callback_set(mosq, my_publish_callback); // 连接至服务器 // 参数:句柄、ip(host)、端口、心跳 ret = mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE); if(ret){ printf("Connect server error!\n"); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return -1; } printf("Start!\n"); //mosquitto_loop_start作用是开启一个线程,在线程里不停的调用 mosquitto_loop() 来处理网络信息 int loop = mosquitto_loop_start(mosq); if(loop != MOSQ_ERR_SUCCESS) { printf("mosquitto loop error\n"); return 1; } while(fgets(buff, MSG_MAX_SIZE, stdin) != NULL) { /*发布消息*/ mosquitto_publish(mosq,NULL,"topic1",strlen(buff)+1,buff,0,0); memset(buff,0,sizeof(buff)); } mosquitto_destroy(mosq); mosquitto_lib_cleanup(); printf("End!\n"); return 0; }
makefile文件
# Builds the MQTT demo programs; both link against libmosquitto.
# Recipe lines must start with a TAB character.

.PHONY: all clean

# Rebuild both demos unconditionally, with progress messages.
all:
	@echo ""
	@echo "Start compiling......"
	@echo ""
	gcc -o sub sub.c -lmosquitto
	gcc -o pub pub.c -lmosquitto
	@echo "end"

# Individual targets are rebuilt only when their source file changes.
sub: sub.c
	gcc -o sub sub.c -lmosquitto

pub: pub.c
	gcc -o pub pub.c -lmosquitto

# -f keeps clean quiet when the binaries do not exist yet.
clean:
	rm -f sub pub
1、先执行make
2、启动mosquitto服务器
3、运行pub和sub
4、测试发布订阅信息
异步函数与同步函数两者的差别就是在连接服务器的connect函数、loop循环函数。那接下来就简单探究一下loop函数的调用方式:同步函数是调用mosquitto_loop函数来阻塞等待实现的一种通信;而查看源码我们就会发现,异步方式的"loop"函数就是创建了一个线程去完成同步方式中导致阻塞等待的mosquitto_loop函数,其调用过程如下:
mosquitto_loop_start(mosq); // 异步方式的loop
pthread_create(&mosq->thread_id, NULL, mosquitto__thread_main, mosq)
mosquitto_loop_forever(mosq, 1000*86400, 1);
mosquitto_loop(mosq, timeout, max_packets); // 同步方式的loop
mosquitto_loop_stop(mosq, false);
pthread_cancel(mosq->thread_id);
pthread_join(mosq->thread_id, NULL);
了解同步异步函数的调用区别之后,我们继续看一下异步方式连接服务器函数mosquitto_connect_async的官方说明:
订阅端(sub.c)
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include "mosquitto.h" #define HOST "localhost" #define PORT 1883 #define KEEP_ALIVE 60 #define MSG_MAX_SIZE 512 // 定义运行标志决定是否需要结束 static int running = 1; void my_connect_callback(struct mosquitto *mosq, void *obj, int rc) { printf("Call the function: on_connect\n"); if(rc){ // 连接错误,退出程序 printf("on_connect error!\n"); exit(1); }else{ // 订阅主题 // 参数:句柄、id、订阅的主题、qos if(mosquitto_subscribe(mosq, NULL, "topic2", 2)){ printf("Set the topic error!\n"); exit(1); } } } void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc) { printf("Call the function: my_disconnect_callback\n"); running = 0; } void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos) { printf("Call the function: on_subscribe\n"); } void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg) { printf("Call the function: on_message\n"); printf("Recieve a message of %s : %s\n", (char *)msg->topic, (char *)msg->payload); if(0 == strcmp(msg->payload, "quit")){ mosquitto_disconnect(mosq); } } int main() { int ret; struct mosquitto *mosq; // 初始化mosquitto库 ret = mosquitto_lib_init(); if(ret){ printf("Init lib error!\n"); return -1; } // 创建一个订阅端实例 // 参数:id(不需要则为NULL)、clean_start、用户数据 mosq = mosquitto_new("sub_test", true, NULL); if(mosq == NULL){ printf("New sub_test error!\n"); mosquitto_lib_cleanup(); return -1; } // 设置回调函数 // 参数:句柄、回调函数 mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); mosquitto_message_callback_set(mosq, my_message_callback); // 连接至服务器 // 参数:句柄、ip(host)、端口、心跳 ret = mosquitto_connect_async(mosq, HOST, PORT, KEEP_ALIVE); if(ret){ printf("Connect server error!\n"); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return -1; } ret = mosquitto_loop_start(mosq); if(ret){ printf("Start 
loop error!\n"); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return -1; } // 开始通信:循环执行、直到运行标志g_iRunFlag被改变 printf("Start!\n"); while(running) { //mosquitto_loop(mosq, -1, 1); sleep(1); } // 结束后的清理工作 mosquitto_loop_stop(mosq, false); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); printf("End!\n"); return 0; }
发布端(pub.c)
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include "mosquitto.h" #define HOST "localhost" #define PORT 1883 #define KEEP_ALIVE 60 #define MSG_MAX_SIZE 512 static int running = 1; void my_connect_callback(struct mosquitto *mosq, void *obj, int rc) { printf("Call the function: my_connect_callback\n"); } void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc) { printf("Call the function: my_disconnect_callback\n"); running = 0; } void my_publish_callback(struct mosquitto *mosq, void *obj, int mid) { printf("Call the function: my_publish_callback\n"); } int main() { int ret; struct mosquitto *mosq; char buff[MSG_MAX_SIZE]; // 初始化mosquitto库 ret = mosquitto_lib_init(); if(ret){ printf("Init lib error!\n"); return -1; } // 创建一个发布端实例 // 参数:id(不需要则为NULL)、clean_start、用户数据 mosq = mosquitto_new("pub_test", true, NULL); if(mosq == NULL){ printf("New pub_test error!\n"); mosquitto_lib_cleanup(); return -1; } // 设置回调函数 // 参数:句柄、回调函数 mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); mosquitto_publish_callback_set(mosq, my_publish_callback); // 连接至服务器 // 参数:句柄、ip(host)、端口、心跳 // ret = mosquitto_connect_async(mosq, HOST, PORT, KEEP_ALIVE); ret = mosquitto_connect_async(mosq, HOST, PORT, KEEP_ALIVE); if(ret){ printf("Connect server error!\n"); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return -1; } int loop = mosquitto_loop_start(mosq); if(loop != MOSQ_ERR_SUCCESS) { printf("mosquitto loop error\n"); return 1; } // 开始通信:循环执行、直到运行标志g_iRunFlag被改变 printf("Start!\n"); while(fgets(buff, MSG_MAX_SIZE, stdin) != NULL) { /*发布消息*/ mosquitto_publish(mosq,NULL,"topic2",strlen(buff)+1,buff,0,0); memset(buff,0,sizeof(buff)); } // 结束后的清理工作 mosquitto_loop_stop(mosq, false); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); printf("End!\n"); return 0; }
makefile文件同上:
# Builds the MQTT demo programs; both link against libmosquitto.
# Recipe lines must start with a TAB character.

.PHONY: all clean

# Rebuild both demos unconditionally, with progress messages.
all:
	@echo ""
	@echo "Start compiling......"
	@echo ""
	gcc -o sub sub.c -lmosquitto
	gcc -o pub pub.c -lmosquitto
	@echo "end"

# Individual targets are rebuilt only when their source file changes.
sub: sub.c
	gcc -o sub sub.c -lmosquitto

pub: pub.c
	gcc -o pub pub.c -lmosquitto

# -f keeps clean quiet when the binaries do not exist yet.
clean:
	rm -f sub pub
1、先执行make
2、启动mosquitto服务器
3、运行pub和sub
4、测试发布订阅信息
客户端1代码:pub.c
#include <stdio.h> #include <stdlib.h> #include <mosquitto.h> #include <string.h> #define HOST "localhost" #define PORT 1883 #define KEEP_ALIVE 60 #define MSG_MAX_SIZE 512 bool session = true; void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) { if(message->payloadlen){ printf("%s %s", message->topic, (char *)message->payload); }else{ printf("%s (null)\n", message->topic); } fflush(stdout); } void my_connect_callback(struct mosquitto *mosq, void *userdata, int result) { int i; if(!result){ /* Subscribe to broker information topics on successful connect. */ mosquitto_subscribe(mosq, NULL, "topic2 ", 2); }else{ fprintf(stderr, "Connect failed\n"); } } void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos) { int i; printf("Subscribed (mid: %d): %d", mid, granted_qos[0]); for(i=1; i<qos_count; i++){ printf(", %d", granted_qos[i]); } printf("\n"); } void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str) { printf("%s\n", str); } int main() { struct mosquitto *mosq = NULL; char buff[MSG_MAX_SIZE]; //libmosquitto 库初始化 mosquitto_lib_init(); //创建mosquitto客户端 mosq = mosquitto_new(NULL,session,NULL); if(!mosq){ printf("create client failed..\n"); mosquitto_lib_cleanup(); return 1; } //设置回调函数,需要时可使用 mosquitto_log_callback_set(mosq, my_log_callback); mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_message_callback_set(mosq, my_message_callback); mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); //连接服务器 if(mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE)){ fprintf(stderr, "Unable to connect.\n"); return 1; } //开启一个线程,在线程里不停的调用 mosquitto_loop() 来处理网络信息 int loop = mosquitto_loop_start(mosq); if(loop != MOSQ_ERR_SUCCESS) { printf("mosquitto loop error\n"); return 1; } while(fgets(buff, MSG_MAX_SIZE, stdin) != NULL) { /*发布消息*/ mosquitto_publish(mosq,NULL,"topic1 ",strlen(buff)+1,buff,0,0); 
memset(buff,0,sizeof(buff)); } mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return 0; }
客户端2代码:sub.c
#include <stdio.h> #include <stdlib.h> #include <mosquitto.h> #include <string.h> #define HOST "localhost" #define PORT 1883 #define KEEP_ALIVE 60 #define MSG_MAX_SIZE 512 #define TOPIC_NUM 3 bool session = true; void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) { if(message->payloadlen){ printf("%s %s", message->topic, (char *)message->payload); }else{ printf("%s (null)\n", message->topic); } fflush(stdout); } void my_connect_callback(struct mosquitto *mosq, void *userdata, int result) { int i; if(!result){ /* Subscribe to broker information topics on successful connect. */ mosquitto_subscribe(mosq, NULL, "topic1 ", 2); }else{ fprintf(stderr, "Connect failed\n"); } } void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos) { int i; printf("Subscribed (mid: %d): %d", mid, granted_qos[0]); for(i=1; i<qos_count; i++){ printf(", %d", granted_qos[i]); } printf("\n"); } void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str) { /* Pring all log messages regardless of level. 
*/ printf("%s\n", str); } int main() { struct mosquitto *mosq = NULL; char buff[MSG_MAX_SIZE]; //libmosquitto 库初始化 mosquitto_lib_init(); //创建mosquitto客户端 mosq = mosquitto_new(NULL,session,NULL); if(!mosq){ printf("create client failed..\n"); mosquitto_lib_cleanup(); return 1; } //设置回调函数,需要时可使用 mosquitto_log_callback_set(mosq, my_log_callback); mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_message_callback_set(mosq, my_message_callback); mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); //连接服务器 if(mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE)){ fprintf(stderr, "Unable to connect.\n"); return 1; } //开启一个线程,在线程里不停的调用 mosquitto_loop() 来处理网络信息 int loop = mosquitto_loop_start(mosq); if(loop != MOSQ_ERR_SUCCESS) { printf("mosquitto loop error\n"); return 1; } while(fgets(buff, MSG_MAX_SIZE, stdin) != NULL) { /*发布消息*/ mosquitto_publish(mosq,NULL,"topic2 ",strlen(buff)+1,buff,0,0); memset(buff,0,sizeof(buff)); } mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return 0; }
makefile文件
# Builds the MQTT demo programs; both link against libmosquitto.
# Recipe lines must start with a TAB character.

.PHONY: all clean

# Rebuild both demos unconditionally, with progress messages.
all:
	@echo ""
	@echo "Start compiling......"
	@echo ""
	gcc -o sub sub.c -lmosquitto
	gcc -o pub pub.c -lmosquitto
	@echo "end"

# Individual targets are rebuilt only when their source file changes.
sub: sub.c
	gcc -o sub sub.c -lmosquitto

pub: pub.c
	gcc -o pub pub.c -lmosquitto

# -f keeps clean quiet when the binaries do not exist yet.
clean:
	rm -f sub pub
1、先执行make
2、启动mosquitto服务器
3、运行pub和sub
4、测试发布订阅信息,每个客户端都可以发布和订阅,双向收发。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。