当前位置:   article > 正文

RabbitMQ高级特性-消息可靠性_listener.simple.prefetch

listener.simple.prefetch

一、常见问题

在这里插入图片描述

  • 消息可靠性
  • 延迟消息问题
  • 高可用问题
  • 消息堆积问题

二、消息可靠性问题

消息从生产者发送到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性?
在这里插入图片描述

  • 发送时丢失
    • 生产者发送的消息味道大exchange
    • 消息到达exchange首未到达queue
  • MQ宕机,queue将消息丢失
  • cunsumer接收到消息后未消费就宕机

三、生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息为投递到交换机,返回nack
  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败的原因

在这里插入图片描述

确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突。

1、配置yml

在publisher服务的application.yml中添加配置

spring:
  rabbitmq:
    host: rabbitmq # rabbitMQ的ip地址
    port: 5672 # 端口
    username: demo
    password: 123456
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

配置说明

  • publisher-confirm-type:开启publisher-confirm,这里支持两种类型
    • simple:同步等待confirm结果,知道超时
    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会调用这个ConfirmCallback
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false,则直接丢弃消息

2、配置ReturnCallback(消息发送到队列成功/失败)

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动的过程之,配置:

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 判断是否是延迟消息
            Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
            if (receivedDelay != null && receivedDelay > 0) {
                // 是一个延迟消息,忽略这个错误提示
                return;
            }
            // 记录日志
            log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的话,重发消息
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

3、配置ConfirmCallback(消息投递到交换机成功/失败)

发送消息,指定消息ID,消息ConfirmCallback

@Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
        // 1.准备消息
        String message = "hello, spring amqp!";
        // 2.准备CorrelationData
        // 2.1.消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 2.2.准备ConfirmCallback
        correlationData.getFuture().addCallback(result -> {
            // 判断结果
            if (result.isAck()) {
                // ACK
                log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());
            } else {
                // NACK
                log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
                // 重发消息
            }
        }, ex -> {
            // 记录日志
            log.error("消息发送失败!", ex);
            // 重发消息
        });
        // 3.发送消息
        rabbitTemplate.convertAndSend("amq.topic", "a.simple.test", message, correlationData);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

4、总结

在这里插入图片描述

四、消息持久化

MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ的消息不丢失

1、交换机持久化

@Bean
public TopicExchange topicExchange(){
     // 参数介绍
     // 1.交换器名 2.是否持久化 3.自动删除 4.其他参数
     return new TopicExchange("amq.topic",true,false,null);
 }
@Bean
public DirectExchange simpleDirect(){
    return new DirectExchange("simple.direct");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

2、队列持久化

@Bean
public Queue queue(){
    // 参数介绍
    // 1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
    return new Queue("simple.queue",true,false,false,null);
}
// 或者
@Bean
public Queue queue(){
    return QueueBuilder.durable("simple.queue").build();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

3、消息持久化

队列持久化只会保证队列在mq重启不丢失,但是如果是用rabbit网页publish的消息重启会丢失。
但是SpringAMQP中的消息默认是持久的。当然也可以显示的通过MessageProperties中的DeliveryMode来指定:

// 1.准备消息
Message message = MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8))
        // 持久化消息
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
        .build();
// 2.发送消息
rabbitTemplate.convertAndSend("simple.queue", message);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

其实不需要上面显示指定,因为SpringAMQP中默认创建的队列,交换机,消息都是持久的

在这里插入图片描述

4、绑定

@Bean
public Binding bingExchange(){
    // 绑定队列
    return BindingBuilder.bind(queue())
            // 队列绑定到哪个交换器
            .to(topicExchange())
            // 绑定路由key,必须指定
            .with("simple.#");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

五、消费者消息确认

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。
而SpringAMQP则允许配置三种确认模式:

  • manaul:手动ack,需要在业务代码结束后,调用api发送ack.
  • auto:自动ack,有Springj检测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  • none:关闭ack,MQ假定消费者获取消息后会处成功处理,因此消息投递后立即被删除

1、配置方式

spring:
  rabbitmq:
    host: 192.168.150.101 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: demo
    password: 123321
    virtual-host: /
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto # none,关闭ack; manual,手动ack; auto: 自动ack
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

2、监听

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
    log.debug("消费者接收到simple.queue的消息:【" + msg + "】");
    System.out.println(1 / 0);
    log.info("消费者处理消息成功!");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

六、消息失败重试机制

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:
在这里插入图片描述
我们可以利用Spring的retry机制,在消费者出现异常时,利用本地重试,而不是无限制的requeue到mq队列。

1、配置

spring:
  rabbitmq:
    host: rabbitmq # rabbitMQ的ip地址
    port: 7105 # 端口
    username: hikinner
    password: hikinner
    virtual-host: /
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto
        retry:
          enabled: true #开启消费者失败重试
          initial-interval: 1000 # 初始的失败等待时长为1秒
          multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态; false有状态。如果业务中包含事务,这里改为false
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

重试次数耗尽之后报错,并丢弃,因为也没必要再回到队列了

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
  • 1

七、消费者失败消息处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

1、失败时把消息投递到error的队列

使用RepublishMessageRecoverer策略
在这里插入图片描述

2、配置

@Configuration
public class ErrorMessageConfig {

    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorMessageBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

3、启动测试

发布一条消息之后,可以发现重试了3次之后,消息被投递到error.direct
在这里插入图片描述

在rabbitMQ的error.queue中看到如下,不但记录了该消息,还记录了异常信息
在这里插入图片描述

八、代码

仓库:https://gitee.com/edevp/mq-advanced-demo

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/1018359
推荐阅读
相关标签
  

闽ICP备14008679号