赞
踩
项目需求:原本做的项目是单进程单线程模式订阅mqtt,发现在消息回调处理消息时耗时较久,我们业务对消息处理是一次性的,只要求处理一次,所以需要提升并发处理能力。看了网上建议改为多线程模式,然而本人实践过程,采用多进程or多线程模式方式运行,发现并没达到预期效果。下面时本人的一下实践记录,仅供参考学习。
环境:python3.7
本地mqtt服务使用的emqx
操作工具用的MQTTX客户端
1、下面是mqtt多线程模式运行代码实现,只实现消息订阅端。
- import random, string
- from paho.mqtt.client import Client
- from threading import Thread
-
- broker = '192.168.8.205'
- port = 1883
- topic = "python-mqtt"
-
-
- def connect_mqtt():
- def on_connect(_, __, ___, rc):
- if rc == 0:
- print("Connected to MQTT Broker!")
- else:
- print("Failed to connect, return code %d\n", rc)
-
- client_id = f"test-client_{''.join(random.choice(string.ascii_lowercase) for _ in range(4))}"
- client = Client(client_id)
- client.on_connect = on_connect
- client.connect(broker, port)
- return client
-
-
- def subscribe(client):
- def on_message(_, __, mesgage):
- print(f"Received `{mesgage.payload.decode()}` from `{mesgage.topic}` topic\n")
- client.subscribe(topic)
- client.on_message = on_message
-
-
- def main():
- c = connect_mqtt()
- subscribe(c)
- c.loop_forever()
- # c.loop_start()
-
-
- if __name__ == '__main__':
-
- lt = []
- for i in range(10):
- t = Thread(target=main, args=(), name=f'thread-{i}')
- lt.append(t)
- for t in lt:
- t.start()
- print(t.name)
- for t in lt:
- t.join()

2、MQTTX连接发送消息:

3、运行效果,发现发布一条消息可以被接收10次(订阅客户端10个,分别被处理了10次),而我的需求是想要发布一条消息,被这个10个客户端之一消费掉,且只处理一次。
代码参考前文,修改如下


4、后来又换个思路,尝试了下还是10个线程,客户端唯一(client_id唯一),这种模式由于mqtt协议要求客户端唯一,导致10个线程并发启动,出现抢占式连接mqqt服务,出现不停的断开连接,重新连接。这种模式下运行客户端实际也只有一个,订阅处理等能力同于一个线程模式下的客户端方式,也无法达到预期。

-----------------------------------------------分割线-------------------------------
后面无意间找到一篇文章,了解到mqtt服务有一种叫共享订阅模式。以emqx为例,emqx支持两种格式的共享订阅前缀:$share/topic 和$queue/topic,然后通过修改emqx服务的配置etc/emqx.conf
如图:
消息发布时,topic配置不变,订阅时,沿用$share/topic 和$queue/topic即可。
代码参考前文修改如下:

运行效果:

可以看到程序运行只收到一条消息了
参考文章:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。