赞
踩
RabbitMQ 提供了一系列的特性和机制来确保消息的可靠性,即确保消息不丢失、按需到达目的地。要实现在 RabbitMQ 中消息的可靠性,可通过以下几个方面进行操作:
消息队列(MQ)的生产者重试机制是指当生产者尝试将消息发送到消息队列中时,如果因为某些原因(如网络问题、队列不可用或其他任何导致消息发送失败的问题)导致消息未能成功发送,那么生产者会根据预设的策略尝试重新发送消息的过程。
修改配置文件即可:
- spring:
- rabbitmq:
- connection-timeout: 1s # 设置MQ的连接超时时间
- template:
- retry:
- enabled: true # 开启超时重试机制
- initial-interval: 1000ms # 失败后的初始等待时间
- multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
- max-attempts: 3 # 最大重试次数
生产者确认机制是一种确保消息从生产者安全达到消息队列的方法,尤其是在使用像RabbitMQ这样的消息中间件时,这种机制尤为重要。生产者确认(Publisher Confirms)也被视为是一种可靠性保障,使得生产者在发送消息后可以知晓该消息是否已被消息队列成功接收。
在不使用生产者确认机制的情况下,生产者将消息发送出去之后,就没有办法知道消息是否正确到达队列。这在某些场景下可能会导致数据丢失。因此,通过实现生产者确认机制,可以显著增加消息系统的可靠性。
RabbitMQ提供的生产者消息确认机制包括Publisher Confirm
和Publisher Return
两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。
当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
其它情况都会返回NACK,告知投递失败
其中ack
和nack
属于Publisher Confirm机制,ack
是投递成功;nack
是投递失败。而return
则属于Publisher Return机制。
默认两种机制都是关闭状态,需要通过配置文件来开启:
- spring:
- rabbitmq:
- publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
- publisher-returns: true # 开启publisher return机制
这里publisher-confirm-type
有三种模式可选:
none
:关闭confirm机制
simple
:同步阻塞等待MQ的回执
correlated
:MQ异步回调返回回执
一般推荐使用correlated模式,兼顾了性能可靠性
ReturnCallback
ReturnCallback
是RabbitMQ中Java客户端库提供的一个接口,用于处理不能被路由到合适队列的消息。每个RabbitTemplate
只能配置一个ReturnCallback
,因此我们可以在配置类中统一设置:
- package com.itheima.publisher.config;
-
- import lombok.AllArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.ReturnedMessage;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.context.annotation.Configuration;
-
- import javax.annotation.PostConstruct;
-
- @Slf4j
- @AllArgsConstructor
- @Configuration
- public class MqConfig {
- private final RabbitTemplate rabbitTemplate;
-
- @PostConstruct
- public void init(){
- rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
- @Override
- public void returnedMessage(ReturnedMessage returned) {
- log.error("触发return callback,");
- log.debug("exchange: {}", returned.getExchange());
- log.debug("routingKey: {}", returned.getRoutingKey());
- log.debug("message: {}", returned.getMessage());
- log.debug("replyCode: {}", returned.getReplyCode());
- log.debug("replyText: {}", returned.getReplyText());
- }
- });
- }
- }

ConfirmCallback
ConfirmCallback
是RabbitMQ在Java客户端库中提供的一个接口,用于处理消息确认情况。当生产者将消息发送到RabbitMQ服务器时,可以通过实现ConfirmCallback
接口来监听这些消息是否成功被服务器接收(acknowledged)或被拒绝(nacked)。这是一种异步通信模式,允许生产者在不阻塞当前线程的情况下获得消息发送后的状态反馈。由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:
这里的CorrelationData中包含两个核心的东西:
id
:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
SettableListenableFuture
:回执结果的Future对象
将来MQ的回执就会通过这个Future
来返回,我们可以提前给CorrelationData
中的Future
添加回调函数来处理消息回执:
示例,新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback
:
- @Test
- void testPublisherConfirm() {
- // 1.创建CorrelationData
- CorrelationData cd = new CorrelationData();
- // 2.给Future添加ConfirmCallback
- cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
- @Override
- public void onFailure(Throwable ex) {
- // 2.1.Future发生异常时的处理逻辑,基本不会触发
- log.error("send message fail", ex);
- }
- @Override
- public void onSuccess(CorrelationData.Confirm result) {
- // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
- if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
- log.debug("发送消息成功,收到 ack!");
- }else{ // result.getReason(),String类型,返回nack时的异常描述
- log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
- }
- }
- });
- // 3.发送消息
- rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
- }

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:
交换机持久化
队列持久化
消息持久化
在控制台配置数据持久化:
交换机&队列:添加时配置Durability参数为Durable
就是持久化模式,Transient
就是临时模式。
消息:发送消息时配置Dilivery mode参数为persistent即可
在java客户端配置数据持久化:
交换机&队列:创建时配置属性durable为true即可
消息:发送时设置MessageProperties.PERSISTENT_TEXT_PLAIN
属性:
- String message = "Hello World!";
- channel.basicPublish("my_exchange", "routing_key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
注意:尽管消息标记为持久化,RabbitMQ也不会为每条消息都做fsync操作,因为这样会严重影响性能。RabbitMQ会按照自己的节奏批量地将消息写入磁盘。
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:
消费者宕机或出现网络故障
消息发送量激增,超过了消费者处理速度
消费者处理业务发生阻塞
一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut
. PageOut
会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。
为了解决这个问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:
接收到消息后直接存入磁盘而非内存
消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
支持数百万条的消息存储
而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。
控制台配置Lazy模式
在添加队列的时候,添加x-queue-mod=lazy
参数即可设置队列为Lazy模式:
代码配置Lazy模式
在利用SpringAMQP声明队列的时候,添加x-queue-mod=lazy
参数也可设置队列为Lazy模式:
- @Bean
- public Queue lazyQueue(){
- return QueueBuilder
- .durable("lazy.queue")
- .lazy() // 开启Lazy模式
- .build();
- }
这里是通过QueueBuilder
的lazy()
函数配置Lazy模式,底层源码如下:
也可以基于注解来声明队列并设置为Lazy模式:
- @RabbitListener(queuesToDeclare = @Queue(
- name = "lazy.queue",
- durable = "true",
- arguments = @Argument(name = "x-queue-mode", value = "lazy")
- ))
- public void listenLazyQueue(String msg){
- log.info("接收到 lazy.queue的消息:{}", msg);
- }
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
ack:成功处理消息,RabbitMQ从队列中删除该消息
nack:消息处理失败,RabbitMQ需要再次投递消息
reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch
机制捕获,消息处理成功时返回ack,处理失败时返回nack.
由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:
none
:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
manual
:手动模式。需要自己在业务代码中调用api,发送ack
或reject
,存在业务入侵,但更灵活
auto
:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack
. 当业务出现异常时,根据异常判断返回不同结果:
如果是业务异常,会自动返回nack
;
如果是消息处理或校验异常,自动返回reject
;
通过下面的配置可以修改SpringAMQP的ACK处理方式:
- spring:
- rabbitmq:
- listener:
- simple:
- acknowledge-mode: none # 不做处理
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力:
当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
修改consumer服务的application.yml文件,添加内容:
- spring:
- rabbitmq:
- listener:
- simple:
- retry:
- enabled: true # 开启消费者失败重试
- initial-interval: 1000ms # 初识的失败等待时长为1秒
- multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
- max-attempts: 3 # 最大重试次数
- stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery
接口来定义的,它有3个不同实现:
RejectAndDontRequeueRecoverer
:重试耗尽后,直接reject
,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer
:重试耗尽后,返回nack
,消息重新入队
RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer
,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
定义处理失败消息的交换机和队列,定义一个RepublishMessageRecoverer,关联队列和交换机:
- package com.itheima.consumer.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.retry.MessageRecoverer;
- import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
- import org.springframework.context.annotation.Bean;
-
- @Configuration
- @ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
- public class ErrorMessageConfig {
- @Bean
- public DirectExchange errorMessageExchange(){
- return new DirectExchange("error.direct");
- }
- @Bean
- public Queue errorQueue(){
- return new Queue("error.queue", true);
- }
- @Bean
- public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
- return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
- }
-
- @Bean
- public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
- return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
- }
- }

何为幂等性?
幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x))
,例如求绝对值函数。
在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:
根据id删除数据
查询数据
新增数据
但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:
取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
退款业务。重复退款对商家而言会有经济损失。
所以,我们要尽可能避免业务被重复执行。
然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:
页面卡顿时频繁刷新导致表单重复提交
服务间调用的重试
MQ消息的重复投递
因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:
唯一消息ID
这个思路非常简单:
每一条消息都生成一个唯一的id,与消息一起投递给消费者。
消费者接收到消息后处理业务,业务处理成功后将消息ID保存到数据库
如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
我们该如何给消息添加唯一ID呢?
其实很简单,SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。
以Jackson的消息转换器为例:
- @Bean
- public MessageConverter messageConverter(){
- // 1.定义消息转换器
- Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
- // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
- jjmc.setCreateMessageIds(true);
- return jjmc;
- }
业务状态判断
业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。
例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。
相比较而言,消息ID的方案需要改造原有的数据库,所以更推荐使用业务判断的方案。
虽然我们利用各种机制尽可能增加了消息的可靠性,但也不好说能保证消息100%的可靠。万一真的MQ通知失败该怎么办呢?
有没有其它兜底方案,能够确保订单的支付状态一致呢?
其实思想很简单:既然MQ通知不一定发送到消费者,那就消费者定时去查询相关生产者状态,这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保状态的最终一致性。
而这就需要使用到延迟消息的技术:
延迟消息是指在消息队列(MQ)系统中允许消息在一定时间后才被消费者消费的消息。这意味着,消息被发送到消息队列后,并不会立即被消费者处理,而是在指定的延迟时间之后才可用于消费。这种机制允许开发者在设计系统时,能够处理那些需要延迟执行的任务,例如定时任务、延迟通知、过期订单的处理等。
在RabbitMQ中实现延迟消息也有两种方案:
死信交换机+TTL
延迟消息插件
首先,什么是死信?
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
消费者使用basic.reject
或 basic.nack
声明消费失败,并且消息的requeue
参数设置为false
消息是一个过期消息,超时无人消费
要投递的队列消息满了,无法投递
那么,什么是死信交换机呢?
如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange
属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。
TTL(Time-To-Live,生存时间):是指消息或队列存在的时长。
那么如何通过死信交换机+TTL实现延迟消息呢?
设想一下这样的场景:
如图,有一组绑定的交换机(ttl.fanout
)和队列(ttl.queue
)。但是ttl.queue
没有消费者监听,而是设定了死信交换机hmall.direct
,而队列direct.queue1
则与死信交换机绑定,RoutingKey是blue:
假如我们现在发送一条消息到ttl.fanout
,RoutingKey为blue,并设置消息的有效期为5000毫秒,消息肯定会被投递到ttl.queue
之后,由于没有消费者,因此消息无人消费。5秒之后,消息的有效期到期,成为死信,死信被再次投递到死信交换机hmall.direct
,并沿用之前的RoutingKey,也就是blue,
由于direct.queue1
与hmall.direct
绑定的key是blue,因此最终消息被成功路由到direct.queue1
,如果此时有消费者与direct.queue1
绑定, 也就能成功消费消息了。但此时已经是5秒钟以后了:
也就是说,publisher发送了一条消息,但最终consumer在5秒后才收到消息。我们成功实现了延迟消息。
基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。
插件下载地址:rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ (github.com)
下载好后将插件上传到RabbitMQ的插件目录对应的数据卷,然后执行命令安装插件(docker):
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
在java客户端声明延迟交换机
基于注解方式:
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "delay.queue", durable = "true"),
- exchange = @Exchange(name = "delay.direct", delayed = "true"),
- key = "delay"
- ))
- public void listenDelayMessage(String msg){
- log.info("接收到delay.queue的延迟消息:{}", msg);
- }
基于@Bean
的方式:
- package com.itheima.consumer.config;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Slf4j
- @Configuration
- public class DelayExchangeConfig {
-
- @Bean
- public DirectExchange delayExchange(){
- return ExchangeBuilder
- .directExchange("delay.direct") // 指定交换机类型和名称
- .delayed() // 设置delay的属性为true
- .durable(true) // 持久化
- .build();
- }
-
- @Bean
- public Queue delayedQueue(){
- return new Queue("delay.queue");
- }
-
- @Bean
- public Binding delayQueueBinding(){
- return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
- }
- }

发送延迟消息
发送消息时,必须通过x-delay属性设定延迟时间:
- @Test
- void testPublisherDelayMessage() {
- // 1.创建消息
- String message = "hello, delayed message";
- // 2.发送消息,利用消息后置处理器添加消息头
- rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- // 添加延迟消息属性
- message.getMessageProperties().setDelay(5000);
- return message;
- }
- });
- }
注意:
延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。