当前位置:   article > 正文

RabbitMQ(python)_python消息队列rabbitmq入门详解

python消息队列rabbitmq入门详解

 一、认识MQ

MQ全称为Message Queue 消息队列(MQ)是一种应用程序对应用程序的通信方法。MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息。这样发布者和使用者都不用知道对方的存在。

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

 如下图所示:

二、为什么要MQ 

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ。

接下来利用一个外卖系统的消息推送给大家解释下MQ的意义。

三、RabbitMQ 

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。RabbitMQ是一个消息代理 - 一个消息系统的媒介。它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全。使用消息中间件利于应用之间的解耦,生产者(客户端)无需知道消费者(服务端)的存在。而且两端可以使用不同的语言编写,大大提供了灵活性。

四、RabbitMQ的安装

【Windows安装RabbitMQ详细教程】_rabbitmq windows-CSDN博客

五、RabbitMQ的工作模型

5.1简单模式

  1. #生产者
  2. import pika
  3. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  4. channel = connection.channel()
  5. #创建队列
  6. channel.queue_declare(queue='hello')
  7. #往队列插入数据
  8. channel.basic_publish(exchange='',
  9. routing_key='hello',
  10. body='Hello World!')
  11. print(" [x] Sent 'Hello World!'")
  12. # 消费者
  13. import pika
  14. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  15. channel = connection.channel()
  16. channel.queue_declare(queue='hello')
  17. #回调函数
  18. def callback(ch, method, properties, body):
  19. print(" [x] Received %r" % body)
  20. #确定监听队列
  21. channel.basic_consume(queue='hello',
  22. auto_ack=True,#默认应答
  23. on_message_callback=callback)
  24. print(' [*] Waiting for messages. To exit press CTRL+C')
  25. channel.start_consuming()

5.2RabbitMQ参数

应答参数

  1. #生产者代码不变
  2. #消费者
  3. import pika
  4. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  5. channel = connection.channel()
  6. channel.queue_declare(queue='hello')
  7. #回调函数
  8. def callback(ch, method, properties, body):
  9. print(" [x] Received %r" % body)
  10. ch.basic_ack(delivery_tag=method.delivery_tag) #添加这一句
  11. #确定监听队列
  12. channel.basic_consume(queue='hello',
  13. auto_ack=False,#改为False,手动应答
  14. on_message_callback=callback)
  15. print(' [*] Waiting for messages. To exit press CTRL+C')
  16. channel.start_consuming()

默认应答的意思是当生产者向消费者发送数据的时候,假如消费者在处理数据的过程中出现了一些错误,这时候生产者队列中的数据不会进行保留,当再次运行消费者的时候,数据已经丢失。手动应答就是在处理数据过程中,如果出现了错误,生产者队列的数据不会丢失,还保留在那,直到消费者向生产者传递了信号ch.basic_ack(delivery_tag=method.delivery_tag),这时候,生产者中的数据才会删除。

分发参数

有两个消费者同时监听一个的队列。其中一个线程sleep2秒,另一个消费者线程sleep1秒,但是处理的消息是一样多。这种方式叫轮询分发不管谁忙,都不会多给消息,总是你一个我一个。想要做到公平分发(根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;),必须关闭自动应答ack,改成手动应答。使用basicQos(perfetch=1)限制每次只发送不超过1条消息到同一个消费者,消费者必须手动反馈告知队列,才会发送下一个。

  1. ### 消费者
  2. import pika
  3. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  4. channel = connection.channel()
  5. channel.queue_declare(queue='hello',durable=True)
  6. #回调函数
  7. def callback(ch, method, properties, body):
  8. print(" [x] Received %r" % body)
  9. #公平分发
  10. channel.basic_qos(prefetch_count=1)
  11. #确定监听队列
  12. channel.basic_consume(queue='hello',
  13. auto_ack=False,#手动应答
  14. on_message_callback=callback)
  15. print(' [*] Waiting for messages. To exit press CTRL+C')
  16. channel.start_consuming()

持久化参数

持久化的意思就是把数据写入磁盘;举个例子:当生产者往队列里面插入数据的时候i,此时RabbitMQ突然崩了,而消费者一开始没有去监听这个队列,当它崩掉的时候,消费者此时去监听这个队列,这个队列是没有信息了的,因为RabbitMQ往队列插入的数据是写到内存中的,而它已经崩掉了,那就意味着数据丢失了。要解决这个问题就需要把数据写入磁盘。

  1. #只对生产者代码进行修改,这里注意,如果之前声明过一个队列,此时就不能再次把它重新定义为持久化队列
  2. #持久化参数解决的是RabbitMQ在运行过程中突然宕机导致数据丢失的问题
  3. #声明queue
  4. channel.queue_declare(queue='hello2', durable=True) # 若声hello2明过,则换一个名字
  5. channel.basic_publish(exchange='',
  6. routing_key='hello2',
  7. body='Hello World!',
  8. properties=pika.BasicProperties(
  9. delivery_mode=2, # make message persistent
  10. )
  11. )

5.3交换机模式

发布订阅

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

  1. # 生产者
  2. import pika
  3. connection = pika.BlockingConnection(pika.ConnectionParameters(
  4. host='localhost'))
  5. channel = connection.channel()
  6. #声明交换机 名字为logs 模式为发布订阅(fanout)
  7. channel.exchange_declare(exchange='logs',
  8. exchange_type='fanout')
  9. message = "info: Hello World!"
  10. #向logs交换机插入数据
  11. channel.basic_publish(exchange='logs',
  12. routing_key='',
  13. body=message)
  14. print(" [x] Sent %r" % message)
  15. connection.close()
  16. # 消费者
  17. import pika
  18. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  19. channel = connection.channel()
  20. #声明交换机 名字为logs 模式为发布订阅(fanout)
  21. channel.exchange_declare(exchange='logs',
  22. exchange_type='fanout')
  23. #声明队列,名字是系统随机的,
  24. result = channel.queue_declare("",exclusive=True)
  25. #获取队列名字
  26. queue_name = result.method.queue
  27. #将指定队列绑定到logs交换机上
  28. channel.queue_bind(exchange='logs',
  29. queue=queue_name)
  30. print(' [*] Waiting for logs. To exit press CTRL+C')
  31. def callback(ch, method, properties, body):
  32. print(" [x] %r" % body)
  33. channel.basic_consume(queue=queue_name,
  34. auto_ack=True,
  35. on_message_callback=callback)
  36. channel.start_consuming()

连续启动同一份代码可能会报错,解决如下:

关键字模式

关键字模式的意思就是消费者通过绑定关键字来确定消息是否是自己需要的。之前的发布订阅模式是生产者往交换机中插入数据,而交换机则把数据插入到消费者生产的队列中,无论消息是否是消费者需要的,只要有数据都会往里面插入。而通过关键字绑定就能避开这个问题。一个消费者可以绑定多个关键字。而生产者在发送消息的时候需要指定消息类型,即routing_key。代码如下:

  1. # 生产者
  2. import pika
  3. connection = pika.BlockingConnection(pika.ConnectionParameters(
  4. host='localhost'))
  5. channel = connection.channel()
  6. #声明交换机 名字为logs2 模式为关键字模式(direct)
  7. channel.exchange_declare(exchange='logs2',
  8. exchange_type='direct')
  9. message = "info: Hello Yuan!"
  10. #绑定关键字routing_key='info'
  11. channel.basic_publish(exchange='logs2',
  12. routing_key='info',
  13. body=message)
  14. print(" [x] Sent %r" % message)
  15. connection.close()
  16. # 消费者
  17. import pika
  18. import sys
  19. connection = pika.BlockingConnection(pika.ConnectionParameters(
  20. host='localhost'))
  21. channel = connection.channel()
  22. channel.exchange_declare(exchange='logs2',
  23. exchange_type='direct')
  24. result = channel.queue_declare("",exclusive=True)
  25. queue_name = result.method.queue
  26. severities = sys.argv[1:]
  27. if not severities:
  28. sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
  29. sys.exit(1)
  30. #绑定多个关键字
  31. for severity in severities:
  32. channel.queue_bind(exchange='logs2',
  33. queue=queue_name,
  34. routing_key=severity)
  35. def callback(ch, method, properties, body):
  36. print(" [x] %r" % body)
  37. channel.basic_consume(queue=queue_name,
  38. auto_ack=True,
  39. on_message_callback=callback)
  40. channel.start_consuming()

通配符

通配符交换机”与之前的路由模式相比,它将信息的传输类型的key更加细化,以“key1.key2.keyN....”的模式来指定信息传输的key的大类型和大类型下面的小类型,让消费者可以更加精细的确认自己想要获取的信息类型。而在消费者一段,不用精确的指定具体到哪一个大类型下的小类型的key,而是可以使用类似正则表达式(但与正则表达式规则完全不同)的通配符在指定一定范围或符合某一个字符串匹配规则的key,来获取想要的信息。

“通配符交换机”(Topic Exchange)将路由键和某模式进行匹配。此时队列需要绑定在一个模式上。符号“#”匹配一个或多个词,符号“*”仅匹配一个词因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”只会匹配到“audit.irs”。(这里与我们一般的正则表达式的“*”和“#”刚好相反,这里我们需要注意一下。)
下面是一个解释通配符模式交换机工作的一个样例

  1. # 生产者
  2. import pika
  3. connection = pika.BlockingConnection(pika.ConnectionParameters(
  4. host='localhost'))
  5. channel = connection.channel()
  6. #声明交换机 名字为logs3 模式为通配符模式(topic)
  7. channel.exchange_declare(exchange='logs3',
  8. exchange_type='topic')
  9. message = "info: Hello ERU!"
  10. channel.basic_publish(exchange='logs3',
  11. routing_key='europe.news',
  12. body=message)
  13. print(" [x] Sent %r" % message)
  14. connection.close()
  15. # 消费者
  16. import pika
  17. import sys
  18. connection = pika.BlockingConnection(pika.ConnectionParameters(
  19. host='localhost'))
  20. channel = connection.channel()
  21. #声明交换机 名字为logs3 模式为通配符模式(topic)
  22. channel.exchange_declare(exchange='logs3',
  23. exchange_type='topic')
  24. result = channel.queue_declare("",exclusive=True)
  25. queue_name = result.method.queue
  26. #通配符“#”匹配一个或多个词,符号“*”仅匹配一个词,因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”只会匹配到“audit.irs”。(这里与我们一般的正则表达式的“*”和“#”刚好相反,这里我们需要注意一下。)
  27. channel.queue_bind(exchange='logs3',
  28. queue=queue_name,
  29. routing_key="#.news")
  30. print(' [*] Waiting for logs. To exit press CTRL+C')
  31. def callback(ch, method, properties, body):
  32. print(" [x] %r" % body)
  33. channel.basic_consume(queue=queue_name,
  34. auto_ack=True,
  35. on_message_callback=callback)
  36. channel.start_consuming()

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/956946
推荐阅读
相关标签
  

闽ICP备14008679号