赞
踩
死信,顾名思义就是无法被消费的消息,有些时候由于某些原因导致队列中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,死信的来源如下:
basic.reject
或 basic.nack
)并且 requeue=false
使用场景:
消费者 C1 代码如下:
Channel channel = RabbitMqUtils.getChannel(); // 1.声明普通交换机 channel.exchangeDeclare("normal_exchange", BuiltinExchangeType.DIRECT); // 声明普通队列,最后一个参数设置死信交换机 channel.queueDeclare("normal_queue", false, false, false, Stream.of( // new AbstractMap.SimpleEntry<>("x-message-ttl", 10000), // 设置过期时间 10s,一般由生产者指定 new AbstractMap.SimpleEntry<>("x-dead-letter-exchange", "dead_exchange"),// 设置死信交换机名称 new AbstractMap.SimpleEntry<>("x-dead-letter-routing-key", "lisi") // 设置死信交换机 routingKey ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); // 普通绑定 channel.queueBind("normal_queue", "normal_exchange", "zhangsan"); // 2.声明死信交换机 channel.exchangeDeclare("dead_exchange", BuiltinExchangeType.DIRECT); // 声明死信队列 channel.queueDeclare("dead_queue", false, false, false, null); // 死信绑定 channel.queueBind("dead_queue", "dead_exchange", "lisi"); System.out.println("C1 等待接收消息"); // 3.消费消息 channel.basicConsume("normal_queue", true, (consumerTag, message) -> { System.out.println("C1 接收的消息:" + new String(message.getBody())); }, consumerTag -> { // 消费者被关闭时执行的操作 });
消费者 C2 代码如下:
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C2 等待接收消息");
// .消费消息
channel.basicConsume("dead_queue", true, (consumerTag, message) -> {
System.out.println("C2 接收的消息:" + new String(message.getBody()));
}, consumerTag -> {
// 消费者被关闭时执行的操作
});
生产者 Producer 代码如下:
Channel channel = RabbitMqUtils.getChannel();
// 发送消息,设置 TTL 时间
for (int i = 0; i < 10; i++) {
String message = "xiaohua" + i;
channel.basicPublish("normal_exchange", "zhangsan",
new AMQP.BasicProperties()
.builder().expiration("10000").build() // 设置 TTL 时间
, message.getBytes());
}
先启动消费者 C1 声明好队列、交换机及其关系后再关闭(模拟超时),之后启动生产者 Producer 和 消费者 C2 ,此时消息会到死信队列被 C2 所消费
在声明普通队列时设置普通队列最大长度,代码如下:
// 声明普通队列,最后一个参数设置死信交换机
channel.queueDeclare("normal_queue", false, false, false, Stream.of(
new AbstractMap.SimpleEntry<>("x-max-length", 6),// 设置普通队列最大长度
new AbstractMap.SimpleEntry<>("x-dead-letter-exchange", "dead_exchange"),// 设置死信交换机名称
new AbstractMap.SimpleEntry<>("x-dead-letter-routing-key", "lisi") // 设置死信交换机 routingKey
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
C1 拒绝消息,此时消息就会到死信队列,代码如下:
// 开启手动应答 channel.basicConsume("normal_queue", false, (consumerTag, message) -> { String msg = new String(message.getBody()); if (msg.equals("xiaohua2")) { // 拒绝 xiaohua2,死信队列中只会有 1 条 System.out.println(msg + ":此消息被 C1 拒绝"); // 第二个参数即 requeue,false 表示不重新放回普通队列 channel.basicReject(message.getEnvelope().getDeliveryTag(), false); } else { System.out.println("C1 接收的消息:" + msg); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } }, consumerTag -> { // 消费者被关闭时执行的操作 });
基于死信队列,即消息 TTL 过期,使用场景如下:
购买车票的基本架构图如下:
@Configuration public class RabbitmqConfig1 { //声明 X 交换机 @Bean public DirectExchange X(){ return new DirectExchange("X"); } //声明 Y 交换机 @Bean public DirectExchange Y(){ return new DirectExchange("Y"); } // 声明 QA 队列 @Bean public Queue QA(){ return QueueBuilder.durable("QA").withArguments( Stream.of( new AbstractMap.SimpleEntry<>("x-dead-letter-exchange", "Y"), // 绑定死信交换机 new AbstractMap.SimpleEntry<>("x-dead-letter-routing-key", "YD"), // 设置 routingKey new AbstractMap.SimpleEntry<>("x-message-ttl", 10000) // 设置过期时间 ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) .build(); } // 声明 QB 队列 @Bean public Queue QB(){ return QueueBuilder.durable("QB").withArguments( Stream.of( new AbstractMap.SimpleEntry<>("x-dead-letter-exchange", "Y"), // 绑定死信交换机 new AbstractMap.SimpleEntry<>("x-dead-letter-routing-key", "YD"), // 设置 routingKey new AbstractMap.SimpleEntry<>("x-message-ttl", 40000) // 设置过期时间 ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) .build(); } // 声明 QD 队列 @Bean public Queue QD(){ return QueueBuilder.durable("QD").build(); } // 绑定队列 QA 和交换机 X(QA 和死信交换机的绑定已经在声明 QA 时进行) @Bean public Binding queueQABingX(@Qualifier("QA") Queue QA, @Qualifier("X") DirectExchange X) { return BindingBuilder.bind(QA).to(X).with("XA"); } // 绑定队列 QB 和交换机 X @Bean public Binding queueQBBingX(@Qualifier("QB") Queue QB, @Qualifier("X") DirectExchange X) { return BindingBuilder.bind(QB).to(X).with("XB"); } // 绑定队列 QD 和交换机 Y @Bean public Binding queueQDBingY(@Qualifier("QD") Queue QD, @Qualifier("Y") DirectExchange Y) { return BindingBuilder.bind(QD).to(Y).with("YD"); } }
@Slf4j @RestController @RequestMapping("/ttl") public class sendMegController { @Resource private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{message}") public void sendMsg(@PathVariable String message) { log.info("当前时间:{},发送消息{}给 QA 和 QB 两个队列", new Date().toString(), message); rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10s 的队列:" + message); rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40s 的队列:" + message); } }
因为并没有设置方法使 QA 和 QB 处理消息,所以最终消息都会到队列 QD 上面,代码如下:
@Component
public class DeadQueueConsumer {
private static final Logger log = LoggerFactory.getLogger(DeadQueueConsumer.class);
@RabbitListener(queues = "QD")
public void receiveD(Message message) throws Exception {
String msg = new String(message.getBody());
log.info("当前时间{},收到延迟队列的消息:{}", new Date().toString(), msg);
}
}
问题:上面代码中,如果每添加一个新的时间需求,就要新增一个普通队列
只使用 1 个 QC 作为普通队列,该队列不设置 TTL,时间由生产者发送消息时指定
@Configuration public class RabbitmqConfig2 { // 声明 QC 队列 @Bean public Queue QC() { return QueueBuilder.durable("X").withArguments( Stream.of( new AbstractMap.SimpleEntry<>("x-dead-letter-exchange", "Y"), // 绑定死信交换机 new AbstractMap.SimpleEntry<>("x-dead-letter-routing-key", "YD") // 设置 routingKey ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) .build(); } // 绑定队列 QC 和交换机 X public Binding queueCBindingX(@Qualifier("QC") Queue QC, @Qualifier("X") DirectExchange X) { return BindingBuilder.bind(QC).to(X).with("XC"); } }
@RestController @RequestMapping("/ttl") public class sendMegController { private static final Logger log = LoggerFactory.getLogger(sendMegController.class); @Resource private RabbitTemplate rabbitTemplate; @GetMapping("/sendExpireMeg/{message}/{ttlTime}") public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) { log.info("当前时间:{},发送 TTL 为{}的消息{}给 QC 队列", new Date().toString(), ttlTime, message); rabbitTemplate.convertAndSend("X", "XC", message, msg -> { // 设置发送消息的 TTL 时间 msg.getMessageProperties().setExpiration(ttlTime); return msg; }); } }
消费者代码不变
发送多个信息,如果第 1 个消息的 TTL 很长,而第 2 个消息的 TTL 很短,由于 rabbitmq 只会检查第 1 个消息是否过期,如果第 1 个消息还未过期,那么第 2 个消息并不会优先得到执行。
基于插件的延迟队列是在交换机上面做延迟,该交换机支持延迟投递机制 消息传递后并不会立即投递到目标队列中,而是存储在 mnesia (一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。
@Configuration public class DelayedQueueConfig { // 声明交换机,基于插件的 @Bean public CustomExchange delayedExchange() { /** * 1.交换机的名称 * 2.交换机的类型 * 3.是否持久化 * 4.是否自动删除 * 5.其他参数 * */ return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, Stream.of( new AbstractMap.SimpleEntry<>("x-delayed-type", "direct") // 延迟交换机类型 ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) ); } @Bean // 声明队列 public Queue delayedQueue() { return new Queue("delayed.queue"); } // 绑定 @Bean public Binding delayedBindingDelayedExchange( @Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange ) { return BindingBuilder.bind(delayedQueue).to(delayedExchange). with("delayed.routingkey").noargs(); } }
@RestController @RequestMapping("/ttl") public class sendMegController { private static final Logger log = LoggerFactory.getLogger(sendMegController.class); @Resource private RabbitTemplate rabbitTemplate; // 开始发基于插件的延迟消息 @GetMapping("/sendDelayMeg/{message}/{delayTime}") public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) { log.info("当前时间:{},发送延迟为{}的消息{}给 delayed.queue 队列", new Date().toString(), delayTime, message); rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingkey", message, msg -> { // 设置发送消息的延迟时间 msg.getMessageProperties().setDelay(delayTime); return msg; }); } }
@Component
public class DelayQueueConsumer {
private static final Logger log = LoggerFactory.getLogger(DelayQueueConsumer.class);
// 监听消息
@RabbitListener(queues = "delayed.queue")
public void receiveDelayQueue(Message message) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到的延迟消息:{}", new Date(), msg);
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。