当前位置:   article > 正文

RabbitMQ 之 死信队列_rabbitmq 死信队列

rabbitmq 死信队列

目录

​编辑一、死信的概念

二、死信的来源

三、死信实战

1、代码架构图

2、消息 TTL 过期

(1)消费者

(2)生产者

(3)结果展示​编辑

 3、队列达到最大长度

(1)消费者

(2)生产者

(3)结果展示

4、消息被拒

(1)消费者

(2)生产者

(3)结果展示


一、死信的概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理
解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息
消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效


二、死信的来源

1、消息 TTL 过期
2、队列达到最大长度(队列满了,无法再添加数据到 mq 中)
3、消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.


三、死信实战

1、代码架构图

生产者正常情况下走的是普通的交换机,这个交换机的类型是 direct ,它和普通队列之间的关系是一个叫 "zhangsan" 的路由 key, 正常情况下会被 C1 消费。

但是发生了上面所说的三种情况中的一种,成为了死信,然后被转换到死信交换机中,这个死信交换机也是 direct 类型,它们之间的 routingKey 是 "lisi",然后就进入了死信队列,死信队列由  C2 消费。


2、消息 TTL 过期

(1)消费者

  1. // 死信队列 实战
  2. // 消费者 1
  3. public class Comsumer01 {
  4. // 普通交换机名称
  5. public static final String NORMAL_EXCHANGE = "normal_exchange";
  6. // 死信交换机名称
  7. public static final String DEAD_EXCHANGE = "dead_exchange";
  8. // 普通队列名称
  9. public static final String NORMAL_QUEUE = "normal_queue";
  10. // 死信队列名称
  11. public static final String DEAD_QUEUE = "dead_queue";
  12. public static void main(String[] args) throws IOException, TimeoutException {
  13. Channel channel = RabbitMqUtils.getChannel();
  14. // 交换机的声明
  15. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  16. channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
  17. // 普通队列的声明
  18. Map<String, Object> arguments = new HashMap<>();
  19. // 过期时间
  20. //arguments.put("x-message-ttl",100000);
  21. // 正常队列设置死信交换机
  22. arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
  23. // 设置死信 RoutingKey
  24. arguments.put("x-dead-letter-routing-key","lisi");
  25. channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
  26. // 死信队列的声明
  27. channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
  28. // 绑定普通的交换机与普通队列
  29. channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
  30. // 绑定死信的交换机与死信的队列
  31. channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
  32. System.out.println("等待接收消息.....");
  33. DeliverCallback deliverCallback = ( consumerTag, message) ->{
  34. System.out.println("Consumer01 接收的消息是: " + new String(message.getBody(),"UTF-8"));
  35. };
  36. CancelCallback cancelCallback = consumerTag->{};
  37. channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);
  38. }
  39. }

  1. // 消费者 2
  2. public class Comsumer02 {
  3. // 死信队列名称
  4. public static final String DEAD_QUEUE = "dead_queue";
  5. public static void main(String[] args) throws IOException, TimeoutException {
  6. Channel channel = RabbitMqUtils.getChannel();
  7. System.out.println("等待接收消息.....");
  8. DeliverCallback deliverCallback = ( consumerTag, message) ->{
  9. System.out.println("Consumer02 接收的消息是: " + new String(message.getBody(),"UTF-8"));
  10. };
  11. CancelCallback cancelCallback = consumerTag->{};
  12. channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
  13. }
  14. }

(2)生产者

  1. // 死信队列 生产者代码
  2. public class Producer {
  3. // 普通交换机名称
  4. public static final String NORMAL_EXCHANGE = "normal_exchange";
  5. public static void main(String[] args) throws IOException, TimeoutException {
  6. Channel channel = RabbitMqUtils.getChannel();
  7. // 死信消息 设置 TTL 的时间
  8. AMQP.BasicProperties properties = new AMQP.BasicProperties().
  9. builder().expiration("10000").build();
  10. for (int i = 1; i < 11; i++) {
  11. String message = "info" + i;
  12. channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
  13. }
  14. }
  15. }

(3)结果展示


 3、队列达到最大长度

(1)消费者

  1. // 死信队列 实战
  2. // 消费者 1
  3. public class Comsumer01 {
  4. // 普通交换机名称
  5. public static final String NORMAL_EXCHANGE = "normal_exchange";
  6. // 死信交换机名称
  7. public static final String DEAD_EXCHANGE = "dead_exchange";
  8. // 普通队列名称
  9. public static final String NORMAL_QUEUE = "normal_queue";
  10. // 死信队列名称
  11. public static final String DEAD_QUEUE = "dead_queue";
  12. public static void main(String[] args) throws IOException, TimeoutException {
  13. Channel channel = RabbitMqUtils.getChannel();
  14. // 交换机的声明
  15. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  16. channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
  17. // 普通队列的声明
  18. Map<String, Object> arguments = new HashMap<>();
  19. // 过期时间
  20. //arguments.put("x-message-ttl",100000);
  21. // 正常队列设置死信交换机
  22. arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
  23. // 设置死信 RoutingKey
  24. arguments.put("x-dead-letter-routing-key","lisi");
  25. // 设置正常队列的长度的限制
  26. arguments.put("x-max-length",6);
  27. channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
  28. // 死信队列的声明
  29. channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
  30. // 绑定普通的交换机与普通队列
  31. channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
  32. // 绑定死信的交换机与死信的队列
  33. channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
  34. System.out.println("等待接收消息.....");
  35. DeliverCallback deliverCallback = ( consumerTag, message) ->{
  36. System.out.println("Consumer01 接收的消息是: " + new String(message.getBody(),"UTF-8"));
  37. };
  38. CancelCallback cancelCallback = consumerTag->{};
  39. channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);
  40. }
  41. }
  1. // 死信队列 实战
  2. // 消费者 2
  3. public class Comsumer02 {
  4. // 死信队列名称
  5. public static final String DEAD_QUEUE = "dead_queue";
  6. public static void main(String[] args) throws IOException, TimeoutException {
  7. Channel channel = RabbitMqUtils.getChannel();
  8. System.out.println("等待接收消息.....");
  9. DeliverCallback deliverCallback = ( consumerTag, message) ->{
  10. System.out.println("Consumer02 接收的消息是: " + new String(message.getBody(),"UTF-8"));
  11. };
  12. CancelCallback cancelCallback = consumerTag->{};
  13. channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
  14. }
  15. }

(2)生产者

  1. // 死信队列 生产者代码
  2. public class Producer {
  3. // 普通交换机名称
  4. public static final String NORMAL_EXCHANGE = "normal_exchange";
  5. public static void main(String[] args) throws IOException, TimeoutException {
  6. Channel channel = RabbitMqUtils.getChannel();
  7. // 死信消息 设置 TTL 的时间
  8. /*AMQP.BasicProperties properties = new AMQP.BasicProperties().
  9. builder().expiration("10000").build();*/
  10. for (int i = 1; i < 11; i++) {
  11. String message = "info" + i;
  12. channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());
  13. }
  14. }
  15. }

(3)结果展示


4、消息被拒

(1)消费者

  1. // 死信队列 实战
  2. // 消费者 1
  3. public class Comsumer01 {
  4. // 普通交换机名称
  5. public static final String NORMAL_EXCHANGE = "normal_exchange";
  6. // 死信交换机名称
  7. public static final String DEAD_EXCHANGE = "dead_exchange";
  8. // 普通队列名称
  9. public static final String NORMAL_QUEUE = "normal_queue";
  10. // 死信队列名称
  11. public static final String DEAD_QUEUE = "dead_queue";
  12. public static void main(String[] args) throws IOException, TimeoutException {
  13. Channel channel = RabbitMqUtils.getChannel();
  14. // 交换机的声明
  15. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  16. channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
  17. // 普通队列的声明
  18. Map<String, Object> arguments = new HashMap<>();
  19. // 过期时间
  20. //arguments.put("x-message-ttl",100000);
  21. // 正常队列设置死信交换机
  22. arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
  23. // 设置死信 RoutingKey
  24. arguments.put("x-dead-letter-routing-key","lisi");
  25. // 设置正常队列的长度的限制
  26. // arguments.put("x-max-length",6);
  27. channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
  28. // 死信队列的声明
  29. channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
  30. // 绑定普通的交换机与普通队列
  31. channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
  32. // 绑定死信的交换机与死信的队列
  33. channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
  34. System.out.println("等待接收消息.....");
  35. DeliverCallback deliverCallback = ( consumerTag, message) ->{
  36. String msg = new String(message.getBody(),"UTF-8");
  37. if (msg.equals("info5")){
  38. System.out.println("Consumer01 接收的消息是: " + msg + ": 此消息是被 C1 拒绝的");
  39. // 拒绝,且不放囧普通队列
  40. channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
  41. }else {
  42. System.out.println("Consumer01 接收的消息是: " + new String(message.getBody(),"UTF-8"));
  43. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
  44. }
  45. };
  46. CancelCallback cancelCallback = consumerTag->{};
  47. // 开启手动应答
  48. channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback);
  49. }
  50. }
  1. // 死信队列 实战
  2. // 消费者 2
  3. public class Comsumer02 {
  4. // 死信队列名称
  5. public static final String DEAD_QUEUE = "dead_queue";
  6. public static void main(String[] args) throws IOException, TimeoutException {
  7. Channel channel = RabbitMqUtils.getChannel();
  8. System.out.println("等待接收消息.....");
  9. DeliverCallback deliverCallback = ( consumerTag, message) ->{
  10. System.out.println("Consumer02 接收的消息是: " + new String(message.getBody(),"UTF-8"));
  11. };
  12. CancelCallback cancelCallback = consumerTag->{};
  13. channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
  14. }
  15. }

(2)生产者

  1. // 死信队列 生产者代码
  2. public class Producer {
  3. // 普通交换机名称
  4. public static final String NORMAL_EXCHANGE = "normal_exchange";
  5. public static void main(String[] args) throws IOException, TimeoutException {
  6. Channel channel = RabbitMqUtils.getChannel();
  7. // 死信消息 设置 TTL 的时间
  8. /*AMQP.BasicProperties properties = new AMQP.BasicProperties().
  9. builder().expiration("10000").build();*/
  10. for (int i = 1; i < 11; i++) {
  11. String message = "info" + i;
  12. channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());
  13. }
  14. }
  15. }

(3)结果展示

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/930127
推荐阅读
相关标签
  

闽ICP备14008679号