当前位置:   article > 正文

paho-mqtt实现多客户端订阅一个主题,并保证消息只被接收一次_mqtt 多个订阅者 只允许一个接收

mqtt 多个订阅者 只允许一个接收

项目需求:原本做的项目是单进程单线程模式订阅mqtt,发现在消息回调处理消息时耗时较久,我们业务对消息处理是一次性的,只要求处理一次,所以需要提升并发处理能力。看了网上建议改为多线程模式,然而本人实践过程,采用多进程or多线程模式方式运行,发现并没达到预期效果。下面时本人的一下实践记录,仅供参考学习。

环境:python3.7

本地mqtt服务使用的emqx

操作工具用的MQTTX客户端

 1、下面是mqtt多线程模式运行代码实现,只实现消息订阅端。

  1. import random, string
  2. from paho.mqtt.client import Client
  3. from threading import Thread
  4. broker = '192.168.8.205'
  5. port = 1883
  6. topic = "python-mqtt"
  7. def connect_mqtt():
  8. def on_connect(_, __, ___, rc):
  9. if rc == 0:
  10. print("Connected to MQTT Broker!")
  11. else:
  12. print("Failed to connect, return code %d\n", rc)
  13. client_id = f"test-client_{''.join(random.choice(string.ascii_lowercase) for _ in range(4))}"
  14. client = Client(client_id)
  15. client.on_connect = on_connect
  16. client.connect(broker, port)
  17. return client
  18. def subscribe(client):
  19. def on_message(_, __, mesgage):
  20. print(f"Received `{mesgage.payload.decode()}` from `{mesgage.topic}` topic\n")
  21. client.subscribe(topic)
  22. client.on_message = on_message
  23. def main():
  24. c = connect_mqtt()
  25. subscribe(c)
  26. c.loop_forever()
  27. # c.loop_start()
  28. if __name__ == '__main__':
  29. lt = []
  30. for i in range(10):
  31. t = Thread(target=main, args=(), name=f'thread-{i}')
  32. lt.append(t)
  33. for t in lt:
  34. t.start()
  35. print(t.name)
  36. for t in lt:
  37. t.join()

 2、MQTTX连接发送消息:

 3、运行效果,发现发布一条消息可以被接收10次(订阅客户端10个,分别被处理了10次),而我的需求是想要发布一条消息,被这个10个客户端之一消费掉,且只处理一次。

代码参考前文,修改如下

 4、后来又换个思路,尝试了下还是10个线程,客户端唯一(client_id唯一),这种模式由于mqtt协议要求客户端唯一,导致10个线程并发启动,出现抢占式连接mqqt服务,出现不停的断开连接,重新连接。这种模式下运行客户端实际也只有一个,订阅处理等能力同于一个线程模式下的客户端方式,也无法达到预期。

-----------------------------------------------分割线------------------------------- 

后面无意间找到一篇文章,了解到mqtt服务有一种叫共享订阅模式。以emqx为例,emqx支持两种格式的共享订阅前缀:$share/topic 和$queue/topic,然后通过修改emqx服务的配置etc/emqx.conf

如图:

消息发布时,topic配置不变,订阅时,沿用$share/topic 和$queue/topic即可。

代码参考前文修改如下:

 运行效果:

 可以看到程序运行只收到一条消息了

 参考文章:

   1.(mqtt集群订阅如何只消费一个(一次)消息? - 程序新视界

   2.(paho-mqtt 实现通信_nuc_baixu的博客-CSDN博客_paho mqtt

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/42865
推荐阅读
相关标签
  

闽ICP备14008679号