当前位置:   article > 正文

RabbitMQ高级特性-死信队列和延迟队列_deadletterroutingkey

deadletterroutingkey

一、初识死信交换机

如下情况的消息会成为死信:

  • 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数参数设置为false
  • 消息时一个过期消息,超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果该队列配置了dead-letter-exchange属性,指定了一个交换机,name队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

在这里插入图片描述

队列发现消费者无法处理消息时,比如失败次数太多,队列就会根据配置的死信交换机,路由到死信队列,在之前的消费失败处理策略中RepublishMessageRecoverer(重试耗尽后,将失败消息投递到指定的交换机),是由消费者来投递到error.queue。比如下对比:
在这里插入图片描述

二、如何绑定死信交换机

  • 给队列指定dead-letter-exchange属性,指定一个交换机
  • 给队列设置dead-letter-routing-key属性,设置死信交换机与死信队列的RoutingKey

三、TTL

TTL(Time-To-Live),如果一个队列中的消息TTL结束仍未消费,则会变成死信,ttl超时分为两种情况:

  • 消息所在的队列设置了存活时间
  • 消息本身设置了存活时间

在这里插入图片描述

如果设置了两个时间时,以最短时间计算。

1、应用场景

超时未处理的订单

2、申明延迟消息的消费者

/**
     * 延迟消息的消费者
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dl.queue", durable = "true"),
            exchange = @Exchange(name = "dl.direct"),
            key = "dl"
    ))
    public void listenDlQueue(String msg) {
        log.info("消费者接收到了dl.queue的延迟消息");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

3、申明死信队列

/**
 * 声明死信队列
 * @author edevp
 */
@Configuration
public class TTLMessageConfig {

    @Bean
    public DirectExchange ttlDirectExchange(){
        return new DirectExchange("ttl.direct");
    }

    @Bean
    public Queue ttlQueue(){
        return QueueBuilder
                .durable("ttl.queue")
                .ttl(10000)
                .deadLetterExchange("dl.direct")
                .deadLetterRoutingKey("dl")
                .build();
    }

    @Bean
    public Binding ttlBinding(){
        return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

4、测试发送消息

@Test
    public void testTTLMessage() {
        // 1.准备消息
        Message message = MessageBuilder
                .withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
                // 不设置默认是持久化
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setExpiration("5000")
                .build();
        // 2.发送消息
        rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);
        // 3.记录日志
        log.info("消息已经成功发送!");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

监听过了5秒之后(因为队列申明是10秒,但是消息自身是5秒,取最小)受到了延迟消息

10:55:18:726  INFO 15068 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener     : 消费者接收到了dl.queue的延迟消息
  • 1

四、延迟队列

TTL的实现方式稍微负责,我们可以使用插件利用TTL结合死信交换机,实现消息发出去后,消息者延迟收到消息的效果。这种消息模式就称之为延迟队列(Delay Queue)模式。

1、使用场景

  • 延迟发送短信
  • 用户下单,如果用户在15分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

2、安装DelayExchange插件

参考:https://blog.csdn.net/Blueeyedboy521/article/details/124001883

3、管理平台使用

DelayExchange插件的原理是对官方原生的Exchange做了功能的升级:

  • 将DelayExchange接收到的消息暂存在内存中(官方的exchange是无法存储消息的)
  • 在DelayExchange中计时,超时后才投递消息到队列中

RabbitMQ的管理平台声明一个DelayExchage

在这里插入图片描述

消息的延迟时间需要在发送消息的时候指定

在这里插入图片描述

4、SpringAMQP使用延迟队列插件

DelayExchange的本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以任意类型,然后设定delayed属性为true即可。

基于注解

/**
     * 死信队列的消费者
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(name = "delay.direct", delayed = "true"),
            key = "delay"
    ))
    public void listenDelayExchange(String msg) {
        log.info("消费者接收到了delay.queue的延迟消息");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

基于Java

在这里插入图片描述

发送测试

@Test
    public void testSendDelayMessage() throws InterruptedException {
        // 1.准备消息
        Message message = MessageBuilder
                .withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setHeader("x-delay", 5000)
                .build();
        // 2.准备CorrelationData
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 3.发送消息
        rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);

        log.info("发送消息成功");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

.setHeader(“x-delay”, 5000)设置延迟5秒

注意:发送之后虽然消息最终进入了延时队列,但是由于系统根据消息做了延时,会先缓存到内存不发送到队列,导致抛出异常,所以我们需要在ReturnCallback中判断是否延迟,不让这种消息抛异常

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 判断是否是延迟消息
            Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
            if (receivedDelay != null && receivedDelay > 0) {
                // 是一个延迟消息,忽略这个错误提示
                return;
            }
            // 记录日志
            log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的话,重发消息
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

5、通过DelayQueue实现延时任务

使用DelayQueue实现延时任务非常简单,而且简便,全部都是标准的JDK代码实现,不用引入第三方依赖(不依赖redis实现、消息队列实现等),非常的轻量级。

它的缺点就是所有的操作都是基于应用内存的,一旦出现应用单点故障,可能会造成延时任务数据的丢失。如果订单并发量非常大,因为DelayQueue是无界的,订单量越大,队列内的对象就越多,可能造成OOM的风险。所以使用DelayQueue实现延时任务,只适用于任务量较小的情况。
参考:完整实现-通过DelayQueue实现延时任务

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/930172
推荐阅读
相关标签
  

闽ICP备14008679号