当前位置:   article > 正文

rabbitMQ引入死信队列_php rabbitmq 死信队列

php rabbitmq 死信队列

一、基本概念

1、死信定义

        指的是,从队列当中取出来的消息,到达消费方后,因为某些原因导致消息并没有被正常消费掉,这些没有被后续处理的消息就是“死信”,而保存死信的队列,就是死信队列

2、死信出现的场景举例

        为了保证订单业务的消息数据不丢失,需要使用死信队列机制,在消息消费发生异常的时候,将消息给投入到死信队列当中;

          用户在商城下单成功并点进去准备支付,超过指定时间未支付时,消息自动失效成为死信(消息超时情况);

3、死信的来源

        消息TTL过期;

        队列已经到达最大长度,数据无法再添加到MQ;

        消息被拒绝了;

4、死信架构图

分析:

对于消息生产者而言,只需要关注将消息发送给交换机即可;

而对于普通消费者C1而言,需要关注普通交换机、普通队列、死信交换机的相关信息,要做两次绑定操作(普通交换机和普通队列绑定,普通队列和死信队列绑定),难点在于---普通队列怎么与死信交换机进行绑定?

而消费者C2也是一个普通消费者,专注于死信队列当中消息的处理,需要关注死信交换机、死信队列的信息,在死信交换机和死信队列绑定之后,从队列当中拿到死信进行处理;

二、代码部分

(0)提前准备工具类

封装获取MQ的connection方法,以及释放资源的方法

  1. public class AMQPUtils {
  2. //用于获取客户端和MQ绑定的连接对象
  3. public static Connection getConnection() throws Exception{
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setVirtualHost("/test");//用于隔离资源的虚拟主机
  6. factory.setHost("MQServer的ip地址");
  7. factory.setPort(5672);
  8. factory.setUsername("zhangsan");
  9. factory.setPassword("1234");
  10. Connection connection = factory.newConnection();
  11. return connection;
  12. }
  13. //释放资源
  14. public static void close(Channel channel,Connection connection) throws Exception{
  15. channel.close();
  16. connection.close();
  17. }
  18. }

(一)模拟消息超时情况

1、消息发布者

  1. public class Provider {
  2. public static void main(String[] args) throws Exception {
  3. Connection connection = AMQPUtils.getConnection();
  4. Channel channel = connection.createChannel();
  5. //声明一个普通交换机
  6. String normalExchange = "normal_Exchange";
  7. channel.exchangeDeclare(normalExchange,"direct");
  8. String key = "zhangsan";
  9. //设置消息的ttl时间为5s
  10. AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("5000").build();
  11. //发布消息-发布多条消息验证
  12. for (int i = 1; i <= 10 ; i++) {
  13. channel.basicPublish(normalExchange,key,properties,("message---->"+i).getBytes());
  14. }
  15. //释放资源
  16. AMQPUtils.close(channel,connection);
  17. }
  18. }

2、消费者C1

代码:

  1. public class Consumer1 {
  2. public static void main(String[] args) throws Exception{
  3. Connection connection = AMQPUtils.getConnection();
  4. Channel channel = connection.createChannel();
  5. //提前准备一些名字
  6. String normalExchange = "normal_exchange";
  7. String deadExchange = "dead_exchange";
  8. String normalQueue = "normal_queue";
  9. String deadQueue = "dead_queue";
  10. String normal_key = "zhangsan";
  11. String dead_key = "lisi";
  12. //声明普通交换机
  13. channel.exchangeDeclare(normalExchange,"direct");
  14. //声明死信交换机
  15. channel.exchangeDeclare(deadExchange,"direct");
  16. //设置普通队列当中需要携带的其他信息(死信交换机、死信队列、路由key)
  17. Map<String, Object> params = new HashMap<>();
  18. params.put("x-dead-letter-exchange",deadExchange);
  19. params.put("x-dead-letter-routing-key",dead_key);
  20. //声明普通队列
  21. channel.queueDeclare(normalQueue,false,false,false,params);
  22. //声明死信队列
  23. channel.queueDeclare(deadQueue,false,false,false,null);
  24. //binding
  25. //将普通交换机和普通队列绑定
  26. channel.queueBind(normalQueue,normalExchange,normal_key);
  27. //将将死信交换机和死信队列绑定
  28. channel.queueBind(deadQueue,deadExchange,dead_key);
  29. //消费消息
  30. channel.basicConsume(normalQueue,true,new DefaultConsumer(channel){
  31. @Override
  32. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  33. System.out.println("C1消费的消息是---->"+new String(body));
  34. }
  35. });
  36. }
  37. }

注意:开启C1接受普通交换机的消息之后,关闭C1,让普通队列当中的消息超过超时时间,成为死信,后被死信交换机路由进入dead_queue当中,下图所示的就是消息超时之后进入死信队列:

点击dead_queue可以查看具体的死信来源、交换机、路由key等信息;

此时:注意此时死信消息是保存在MQ当中的;

3、消费者c2

消费者C2要去消费死信队列当中的消息:

  1. public class Consumer2 {
  2. public static void main(String[] args)throws Exception {
  3. Connection connection = AMQPUtils.getConnection();
  4. Channel channel = connection.createChannel();
  5. //准备一些名字
  6. String deadExchange = "dead_exchange";
  7. String deadQueue = "dead_queue";
  8. String key = "lisi";
  9. //声明死信交换机
  10. channel.exchangeDeclare(deadExchange,"direct");
  11. //声明死信队列
  12. channel.queueDeclare(deadQueue,false,false,false,null);
  13. //交换机和队列绑定
  14. channel.queueBind(deadQueue,deadExchange,key);
  15. //消费死信消息
  16. channel.basicConsume(deadQueue,true,new DefaultConsumer(channel){
  17. @Override
  18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  19. System.out.println("C2消费了死信----->"+new String(body));
  20. }
  21. });
  22. }
  23. }

控制台结果:

此时存储在死信队列当中的消息已经被C2消费了!

(二)模拟队列达到最大长度

请提前在MQ的控制台上,将情况1当中设置的队列给删除;

1、消息发布者

  1. public class Provider {
  2. public static void main(String[] args) throws Exception {
  3. Connection connection = AMQPUtils.getConnection();
  4. Channel channel = connection.createChannel();
  5. //声明一个普通交换机
  6. String normalExchange = "normal_exchange";
  7. channel.exchangeDeclare(normalExchange,"direct");
  8. String key = "zhangsan";
  9. //发布消息-发布多条消息验证
  10. for (int i = 1; i <= 10 ; i++) {
  11. channel.basicPublish(normalExchange,key,null,("message---->"+i).getBytes());
  12. }
  13. //释放资源
  14. AMQPUtils.close(channel,connection);
  15. }
  16. }

2、消费者C1

  1. public class Consumer1 {
  2. public static void main(String[] args) throws Exception{
  3. Connection connection = AMQPUtils.getConnection();
  4. Channel channel = connection.createChannel();
  5. //提前准备一些名字
  6. String normalExchange = "normal_exchange";
  7. String deadExchange = "dead_exchange";
  8. String normalQueue = "normal_queue";
  9. String deadQueue = "dead_queue";
  10. String normal_key = "zhangsan";
  11. String dead_key = "lisi";
  12. //声明普通交换机
  13. channel.exchangeDeclare(normalExchange,"direct");
  14. //声明死信交换机
  15. channel.exchangeDeclare(deadExchange,"direct");
  16. //设置普通队列当中需要携带的其他信息(死信交换机、死信队列、路由key)
  17. Map<String, Object> params = new HashMap<>();
  18. params.put("x-dead-letter-exchange",deadExchange);
  19. params.put("x-dead-letter-routing-key",dead_key);
  20. //设置普通队列的最大长度
  21. params.put("x-max-length",6);
  22. //声明普通队列
  23. channel.queueDeclare(normalQueue,false,false,false,params);
  24. //声明死信队列
  25. channel.queueDeclare(deadQueue,false,false,false,null);
  26. //binding
  27. //将普通交换机和普通队列绑定
  28. channel.queueBind(normalQueue,normalExchange,normal_key);
  29. //将将死信交换机和死信队列绑定
  30. channel.queueBind(deadQueue,deadExchange,dead_key);
  31. //消费消息
  32. channel.basicConsume(normalQueue,true,new DefaultConsumer(channel){
  33. @Override
  34. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  35. System.out.println("C1消费的消息是---->"+new String(body));
  36. }
  37. });
  38. }
  39. }

开启C1,然后关闭,再启动消息生产者,结果:

3、消费者C2

  1. public class Consumer2 {
  2. public static void main(String[] args)throws Exception {
  3. Connection connection = AMQPUtils.getConnection();
  4. Channel channel = connection.createChannel();
  5. //准备一些名字
  6. String deadExchange = "dead_exchange";
  7. String deadQueue = "dead_queue";
  8. String key = "lisi";
  9. //声明死信交换机
  10. channel.exchangeDeclare(deadExchange,"direct");
  11. //声明死信队列
  12. channel.queueDeclare(deadQueue,false,false,false,null);
  13. //交换机和队列绑定
  14. channel.queueBind(deadQueue,deadExchange,key);
  15. //消费死信消息
  16. channel.basicConsume(deadQueue,true,new DefaultConsumer(channel){
  17. @Override
  18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  19. System.out.println("C2消费了死信----->"+new String(body));
  20. }
  21. });
  22. }
  23. }

结果:

死信队列当中的消息已经被消费了;

(三)模拟消息被拒绝

1、消息发布者

  1. public class Provider {
  2. public static void main(String[] args) throws Exception {
  3. Connection connection = AMQPUtils.getConnection();
  4. Channel channel = connection.createChannel();
  5. //声明一个普通交换机
  6. String normalExchange = "normal_exchange";
  7. channel.exchangeDeclare(normalExchange,"direct");
  8. String key = "zhangsan";
  9. //发布消息-发布多条消息验证
  10. for (int i = 1; i <= 10 ; i++) {
  11. channel.basicPublish(normalExchange,key,null,("message---->"+i).getBytes());
  12. }
  13. //释放资源
  14. AMQPUtils.close(channel,connection);
  15. }
  16. }

2、消费者C1

  1. public class Consumer1 {
  2. public static void main(String[] args) throws Exception{
  3. Connection connection = AMQPUtils.getConnection();
  4. final Channel channel = connection.createChannel();
  5. //提前准备一些名字
  6. String normalExchange = "normal_exchange";
  7. String deadExchange = "dead_exchange";
  8. String normalQueue = "normal_queue";
  9. String deadQueue = "dead_queue";
  10. String normal_key = "zhangsan";
  11. String dead_key = "lisi";
  12. //声明普通交换机
  13. channel.exchangeDeclare(normalExchange,"direct");
  14. //声明死信交换机
  15. channel.exchangeDeclare(deadExchange,"direct");
  16. //设置普通队列当中需要携带的其他信息(死信交换机、死信队列、路由key)
  17. Map<String, Object> params = new HashMap<>();
  18. params.put("x-dead-letter-exchange",deadExchange);
  19. params.put("x-dead-letter-routing-key",dead_key);
  20. //声明普通队列
  21. channel.queueDeclare(normalQueue,false,false,false,params);
  22. //声明死信队列
  23. channel.queueDeclare(deadQueue,false,false,false,null);
  24. //binding
  25. //将普通交换机和普通队列绑定
  26. channel.queueBind(normalQueue,normalExchange,normal_key);
  27. //将将死信交换机和死信队列绑定
  28. channel.queueBind(deadQueue,deadExchange,dead_key);
  29. //消费消息--注意关闭自动确认
  30. channel.basicConsume(normalQueue,false,new DefaultConsumer(channel){
  31. @Override
  32. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  33. /*System.out.println("C1消费的消息是---->"+new String(body));*/
  34. //模拟消息被拒绝--把所有消息都拒绝
  35. channel.basicReject(envelope.getDeliveryTag(),false);
  36. }
  37. });
  38. }
  39. }

注意:先启动C1,然后关闭,启动消息发布者,结果如下:

这10条消息目前都保存在MQ当中,然后再启动C1,把消息全部拒绝掉,让它们成为死信:

点击dead_queue,去查看死信队列当中的一些信息:

3、消费者C2

  1. public class Consumer2 {
  2. public static void main(String[] args)throws Exception {
  3. Connection connection = AMQPUtils.getConnection();
  4. Channel channel = connection.createChannel();
  5. //准备一些名字
  6. String deadExchange = "dead_exchange";
  7. String deadQueue = "dead_queue";
  8. String key = "lisi";
  9. //声明死信交换机
  10. channel.exchangeDeclare(deadExchange,"direct");
  11. //声明死信队列
  12. channel.queueDeclare(deadQueue,false,false,false,null);
  13. //交换机和队列绑定
  14. channel.queueBind(deadQueue,deadExchange,key);
  15. //消费死信消息
  16. channel.basicConsume(deadQueue,true,new DefaultConsumer(channel){
  17. @Override
  18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  19. System.out.println("C2消费了死信----->"+new String(body));
  20. }
  21. });
  22. }
  23. }

启动消费者C2,消费死信队列当中的消息!

三、小结

        死信队列的出现,是为了保存因为特殊原因无法被消费的消息,避免消息直接失效!这些消息通过rabbitMQ的死信队列机制,可以保存在MQ服务的死信队列当中,等待被其他的消费者进行处理!

 需要注意的是:

       只有针对消息的设置会放在消息发布方进行,队列等操作,因为发布方无法自己决定消息被路由到哪个队列,只能决定把消息交给哪个交换机,以及给定路由规则;

      对于消息消费方而言,需要确定交换机、消息队列,已经完成 交换机和队列的绑定操作,所以针对于队列的设置都是放在消费方完成的!

        

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/930116
推荐阅读
相关标签
  

闽ICP备14008679号