赞
踩
软件 | 版本 |
---|---|
Spring Boot | 2.5.0 |
spring-boot-starter-amqp | 2.5.0 |
Erlang | 24.0 |
RabbitMQ | 3.8.19 |
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
virtual-host: 'example'
# 生产者 ==>> Exchange 确认方式
publisher-confirm-type: correlated
# Exchange ==>> Queue
publisher-returns: true
Use RabbitTemplate#waitForConfirms() (or waitForConfirmsOrDie() within scoped operations
。经测试发现,会触发回调方法,在回调方法中可以做自己的业务逻辑。还可以在发布消息的同步线程中调用 waitForConfirms
或者 waitForConfirmsOrDie
方法等待返回结果来进行下一步操作。public void send() { Demo demo = Demo.builder() .name("大漠知秋") .age(25) .height(new BigDecimal("175")) .build(); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().toUpperCase(Locale.ROOT)); Boolean invokeResult = rabbitTemplate.invoke( operations -> { rabbitTemplate.convertAndSend("example", "example", demo, correlationData); boolean flag = false; try { flag = rabbitTemplate.waitForConfirms(5000); log.info("等待结果:${}$", flag); } catch (Exception e) { e.printStackTrace(); } return flag; }, (deliveryTag, multiple) -> { log.info("【SIMPLE waitForConfirms 形式】- deliveryTag:${}$, multiple:${}$", deliveryTag, multiple); }, (deliveryTag, multiple) -> { log.info("【SIMPLE waitForConfirms 形式】- deliveryTag:${}$, multiple:${}$", deliveryTag, multiple); } ); }
上方代码块(前者)的 9 - 27 行也可以使用 rabbitTemplate.convertAndSend("example", "example", demo, correlationData);(后者)
直接发送,上方使用的是带等待结果的形式,建议直接使用后者
。
后者
使用起来对 publisher-confirm-type
没有要求,前者
对 publisher-confirm-type
的要求不同,如下:
rabbitTemplate#waitForConfirms
会报错。下方代码块中的 rabbitTemplate#setConfirmCallback
ack 为 false。rabbitTemplate#waitForConfirms
正常,下方代码块中的 rabbitTemplate#setConfirmCallback
ack 正常。rabbitTemplate#waitForConfirms
正常,并且 rabbitTemplate#setConfirmCallback
第二个、第三个参数也会正常回调。下方代码块中的 rabbitTemplate#setConfirmCallback
ack 正常。@Configuration @Slf4j public class AutoRabbitMqConfiguration { @Resource private CachingConnectionFactory connectionFactory; @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); // 消息是否成功发送到 Exchange rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.debug("【发送消息到 Exchange】- 成功了。correlationData:${}$", correlationData); } else { log.warn("【发送消息到 Exchange】- 失败了。correlationData ID:${}$,returned:${}$,cause:${}$", correlationData, null != correlationData ? correlationData.getReturned() : "空", cause); } }); // 触发 setReturnCallback 回调必须设置 mandatory=true, 否则 Exchange 没有找到 Queue 就会丢弃掉消息, 而不会触发回调 rabbitTemplate.setMandatory(true); // 消息是否从 Exchange 路由到 Queue, 注意: 这是一个失败回调, 只有消息从 Exchange 路由到 Queue 失败才会回调这个方法 rabbitTemplate.setReturnsCallback(returned -> { log.warn("【从 Exchange 发送消息到 Queue】- 失败了。returned:${}$", returned); }); return rabbitTemplate; } }
生产者 ==>> Exchange
的回调确认,当 publisher-confirm-type
配置为 CORRELATED
或者 SIMPLE
时都会在此处进行回调。Exchange ==>> Queue
的回调,只有在失败的时候才会进行此处回调。注意 rabbitTemplate#setMandatory
。通过以上的回调方法,我们可以在发消息前对消息做个状态标记,在回调结果中无论成功或者失败,就把标记更改为对应的状态。如果失败我们还可以把失败的消息放到死信队列、放到数据库、放到 Redis 等等,后续操作是定时任务轮询或者事件触发就根据业务来选择了。
spring: rabbitmq: host: localhost port: 5672 username: admin password: 123456 virtual-host: 'example' # 生产者 ==>> Exchange 确认方式 publisher-confirm-type: correlated # Exchange ==>> Queue publisher-returns: true listener: simple: # ACK 模式,此处选择手动 ACK acknowledge-mode: manual # 每次处理 100 条消息 prefetch: 100 # 决定由于监听器抛出异常而拒绝的消息是否被重新放回队列。默认值为 true,重新放回队列。这里设置为 false,如果多次重试还是异常就转发到死信队列 default-requeue-rejected: true retry: # 开启重试 enabled: true # 最大重试 3 次,涵盖当前次 max-attempts: 3 # 每次重试间隔 initial-interval: 3000 # 最大允许重试时间 max-interval: 30000 # 下一次重试的时间间隔 = 上次重试时间间隔 * multiplier multiplier: 2
@RabbitListener(queues = "example")
public void consume(Message message, Channel channel) throws Exception {
log.info("进入消费了");
TimeUnit.SECONDS.sleep(3);
log.info("消费完毕了");
}
配置改成手动确认 Ack,这里没有进行手动确认操作,这就会造成消费的所有消息 Unacked
:
断开连接,重新建立连接即可把消息恢复成正常的
Ready
。
@RabbitListener(queues = "example")
public void consume(Message message, Channel channel) throws Exception {
log.info("进入消费了");
int i = MyRandomUtils.nextInt(1, 100);
if (i % 2 == 1) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
TimeUnit.SECONDS.sleep(3);
log.info("消费完毕了");
}
如果一直走 channel#basicNack
会出现如下无限循环消费的场景,这是因为被 Nack 的消息又重新进行投递了:
channel#basicNack(Nack 重新投递消息) 的三个参数说明
Connection
,每一个消费者都会与 RabbitMQ 服务端建立一个 Channel
,RabbitMQ 服务端会向对应的消费者推送消息,这个消息携带了一个 deliveryTag
,它代表了 RabbitMQ 服务端向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag
的范围仅限于 Channel
。deliveryTag
小于等于传入值的所有消息。否则只处理当前这条消息。Queue
中。如果为 false 那这条消息将丢弃了。
channel#basicAck
的参数同理,只是这里是 Ack(正常消费消息)。
手动 Ack 模式正确的做法应该是自己把异常捕获掉,正常就 Ack,异常就 Nack,也可以扩展出现 N 次 Nack 之后就入库或者其他地方保存,就别在 Nack 了,不然都成死循环了。这里只是延伸一下如果把异常抛出去会怎样?
@RabbitListener(queues = "example")
public void consume(Message message, Channel channel) throws Exception {
log.info("进入消费了");
if (1 == 1) {
throw new Exception();
}
TimeUnit.SECONDS.sleep(3);
log.info("消费完毕了");
}
消息卡住,断开连接后消息恢复 Ready
与 default-requeue-rejected=false
时一样。
也由此可以推断出,在
acknowledge-mode=manual
、抛出异常时,default-requeue-rejected=true
的设置值并不能决定消息是否重新放入 Queue。
org.springframework.amqp.rabbit.retry.MessageRecoverer
在经过一番查找之后发现在最后一次重试之后如果还出现了异常,会调用 org.springframework.retry.support.RetryTemplate#handleRetryExhausted
方法,进而调用 org.springframework.amqp.rabbit.retry.MessageRecoverer#recover
方法,如下:
而 MessageRecoverer
的具体实现有以下几个:
现在就是要确定使用的是哪一个实现,最终追踪代码发现是 org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer
:
本着刨根问底的心态,为什么就是 RejectAndDontRequeueRecoverer
?为什么不是其他几个实现,继续向上探究 RejectAndDontRequeueRecoverer
是怎么来的,找到了这里 org.springframework.boot.autoconfigure.amqp.AbstractRabbitListenerContainerFactoryConfigurer#configure
:
显然 this.messageRecoverer
是 NULL
,所以直接 new 了一个 RejectAndDontRequeueRecoverer
。那 this.messageRecoverer
为什么为 NULL 呢?再往上追,发现 org.springframework.boot.autoconfigure.amqp.RabbitAnnotationDrivenConfiguration#simpleRabbitListenerContainerFactoryConfigurer
方法中给 this.messageRecoverer 赋值的:
那这里的 this.messageRecoverer
是个啥?
发现是个 ObjectProvider
,ObjectProvider
自行了解,确实项目中并没有配置 org.springframework.amqp.rabbit.retry.MessageRecoverer
的实现类为 Bean,所以这里就注入的也就没有,继而引发了后续代码的 new RejectAndDontRequeueRecoverer()
。
所以想解决这个问题也简单,只需使用对应的 MessageRecoverer
实现,把它生成 Bean 交由 Spring 管理即可。
言归正传,还回到 RejectAndDontRequeueRecoverer
类中,进到类中发现 recover
仅仅只是打印了下 warn
级别日志,再就是抛出异常:
可以看到这里抛出了异常 ListenerExecutionFailedException
,再继续跟踪代码发现此异常最终会被方法 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#rollbackOnExceptionIfNecessary
拿来处理后续是否需要把抛错的消息重新入 Queue 操作,如下:
783
行做了一些判断,结果 ackRequired 影响着 792
行是否会进入 Nack 逻辑,先看这些判断。
只要 acknowledge-mode
不是配置的 NONE
就不会为 false。所以这里根据上下文是:true
只要 acknowledge-mode
不是配置的 MANUAL
就不会为 false。所以这里根据上下文是:false
根据前文已经知道,抛的异常是 ListenerExecutionFailedException
,所以这里肯定返回:false
所以最终结果是:true && (false || false) = false
。所以也就无法走后续的 Nack 逻辑。进而导致以下一直 Unacked 的局面:
org.springframework.amqp.rabbit.retry.ImmediateRequeueMessageRecoverer
换上 MessageRecoverer
的另一个实现 ImmediateRequeueMessageRecoverer
测试一下。查看 recover
方法:
也是打印日志、抛异常,只是抛的异常类 ImmediateRequeueAmqpException
不一样。先说结论:ImmediateRequeueMessageRecoverer
实现也不会在 rollbackOnExceptionIfNecessary
方法中进行 Nack,但是在 rollbackOnExceptionIfNecessary
执行之后会进行重启。
先把代码定位到 rollbackOnExceptionIfNecessary
的调用者 org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java:1010
:
继续把异常 ImmediateRequeueAmqpException
向上抛,由于中间一直没有能正常捕获此异常的 catch,所以直接就抛到了上图中下面的红框截图上:
继续向下走,打一下消费者异常日志,然后进入方法 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#killOrRestart
:
就是在此处进行了断开连接,closeChannel
执行完会释放 Unacked 的消息,变成 Ready。closeConnection
不会关闭,因为内部就不会关闭成功:
这里是并不是通过 Nack 进行消息重新投递,而是通过关闭对应的 Channel 做到消息重新投递,这样可以一直无限循环消费此消息,但也会存在问题:后续消息都堵塞住了
回到原来的的 RejectAndDontRequeueRecoverer
,为什么它就没走这一套关闭连接的流程呢?回到调用 rollbackOnExceptionIfNecessary
方法处:
此时抛出的异常为:org.springframework.amqp.rabbit.support.ListenerExecutionFailedException
,此异常在 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#mainLoop
已被捕获:
之后也没有存在有效的处理,就继续监听下一个消息了:
org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer
与前两个实现的区别主要在于 recover
方法上:
这里做了向其他队列转发的操作,但是没有抛异常,所以也就不会走 rollbackOnExceptionIfNecessary
以及其之后的逻辑。但是他也没有 Ack 或者 Nack 当前出问题的这条消息,所以会出现原消息一直 Unacked
,一旦断开连接又会恢复 Ready
,被转发的那个队列也会存在这条消息。
所以在 acknowledge-mode=manual
时,default-requeue-rejected
是什么根本无关紧要,消息是不是会重新投递到 Queue 取决于 MessageRecoverer
的实现。
Unacked
状态,当断开连接时,消息会转为 Ready
,继续等待被消费Unacked
状态,当断开连接时,消息会转为 Ready
,继续等待被消费Queue
中spring: rabbitmq: host: localhost port: 5672 username: admin password: 123456 virtual-host: 'example' # 生产者 ==>> Exchange 确认方式 publisher-confirm-type: correlated # Exchange ==>> Queue publisher-returns: true listener: simple: # ACK 模式,此处选择手动 ACK acknowledge-mode: auto # 每次处理 100 条消息 prefetch: 100 # 决定由于监听器抛出异常而拒绝的消息是否被重新放回队列。默认值为 true,重新放回队列。这里设置为 false,如果多次重试还是异常就转发到死信队列 default-requeue-rejected: false retry: # 开启重试 enabled: true # 最大重试 3 次,涵盖当前次 max-attempts: 3 # 每次重试间隔 initial-interval: 3000 # 最大允许重试时间 max-interval: 30000 # 下一次重试的时间间隔 = 上次重试时间间隔 * multiplier multiplier: 2
@RabbitListener(queues = "example")
public void consume(Message message) throws Exception {
log.info("进入消费了");
TimeUnit.SECONDS.sleep(3);
log.info("消费完毕了");
}
依照上面 yaml
的配置,这里会进行总共三次消费,但是都会报错,所以三次之后会走异常处理逻辑。
大部分内容已经说过,这里不再赘述,直接从 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#rollbackOnExceptionIfNecessary
开始:
这里与上面不同的是 !this.acknowledgeMode.isAutoAck()
和 !this.acknowledgeMode.isAutoAck()
永远都是 true
,所以 ackRequired
的值就是 true
,那下面的代码就与 acknowledge-mode=manual
时走的代码分支不一样了:
这里的关键点就落在了 ContainerUtils.shouldRequeue(this.defaultRequeueRejected, ex, logger)
方法上,进去查看:
default-requeue-rejected
,但是他起不到决定性作用 注释写的也比较清楚,这个方法是用来确定消息是否应该重新投递;如果可抛出对象为 MessageRejectedWhileStoppingException
或 defaultRequeueRejected
为 true 并且在异常原因链中没有 AmqpRejectAndDontRequeueException
或者在异常原因链中有 ImmediateRequeueAmqpException
,则返回true。
所以这里的 defaultRequeueRejected
用处只能说是有点用,但具体的还是要看抛的什么异常,如果是 AmqpRejectAndDontRequeueException
异常,那不好意思,这个消息就没了,不会重新投递到 Queue 了。如果是 ImmediateRequeueAmqpException
或者是其他自定义异常,都是可以重新投递到 Queue 中的。
会重新进行投递消息到 Queue,后续代码把 Channel 断开并重连。整体流程与 acknowledge-mode=manual
不差啥。
此类的实现方法 org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer#recover
并没有抛出异常,所以也就不会走到 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#rollbackOnExceptionIfNecessary
,最终会把消息投递到其他指定队列,原消息消费完毕正常 Ack。
详细 Ack 方法见 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#commitIfNecessary
:
这里 ackRequired
的结果肯定为 true
,就会走下方的 Ack 方法。
所以在 acknowledge-mode=auto
时,default-requeue-rejected
是什么以及其起到的作用要受 MessageRecoverer
的实现影响,消息是不是会重新投递到 Queue 取决于 MessageRecoverer
的实现。如果是自己实现 MessageRecoverer
接口,那 default-requeue-rejected
的设置是生效的。
Queue
中spring:
rabbitmq:
listener:
simple:
retry:
# 开启重试
enabled: true
# 最大重试 3 次,涵盖当前次
max-attempts: 3
# 每次重试间隔
initial-interval: 3000
# 最大允许重试时间
max-interval: 5000
# 下一次重试的时间间隔 = 上次重试时间间隔 * multiplier
multiplier: 2
initial-interval
并不大于 max-interval
,所以按照 initial-interval
的间隔时间来。上次重试时间间隔 * multiplier
,也就是 6 秒,显然 6 秒已经大于 max-interva
,所以这次的重试间隔时间是 5 秒。spring: rabbitmq: host: localhost port: 5672 username: admin password: 123456 virtual-host: 'example' # 生产者 ==>> Exchange 确认方式 publisher-confirm-type: correlated # Exchange ==>> Queue publisher-returns: true listener: simple: # ACK 模式,此处选择手动 ACK acknowledge-mode: auto # 每次处理 100 条消息 prefetch: 100 # 决定由于监听器抛出异常而拒绝的消息是否被重新放回队列。默认值为 true,重新放回队列。这里设置为 false,如果多次重试还是异常就转发到死信队列 default-requeue-rejected: false retry: # 开启重试 enabled: true # 最大重试 3 次,涵盖当前次 max-attempts: 3 # 每次重试间隔时间 initial-interval: 3000 # 最大允许重试间隔时间,用来限制 initial-interval。 max-interval: 10000 # 下一次重试的时间间隔 = 上次重试时间间隔 * multiplier multiplier: 2
生产端,一定要重写 org.springframework.amqp.rabbit.core.RabbitTemplate#setConfirmCallback
和 org.springframework.amqp.rabbit.core.RabbitTemplate#setReturnsCallback
,前者保证消息 生产者 ==>> Exchange
,后者保证消息 Exchange ==>> Queue
,可以根据这两处来对消息进行标记是否正常发送,可以存库。如果有多次发送的情况产生,同时也要记得处理幂等问题。
消费端 acknowledge-mode: auto
、default-requeue-rejected: false
即可,这个前提就保证了消息不会再次重新投递到原来的队列。
MessageRecoverer
,在 recover
方法中是入库或者是发送到其他 Queue 都可,不抛出异常,这样最终消息会被 Ack,消息也被存入库中或者是发送到其他 Queue 并等待后续处理。MessageRecoverer
,默认会使用 RejectAndDontRequeueRecoverer
,最终会进行 Nack 并且 No Requeue
,在 RabbitMQ 服务端为每个业务队列都配置死信队列,把消息最终都收集到死信队列中。Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。