赞
踩
本文将从三个方面详细介绍在使用RabbitMQ时如何确保消息不丢失的方法:
- @Bean
- public Queue myQueue() {
- return new Queue("queue-name", true, true, false);
- }
- @Bean
- public DirectExchange myExchange() {
- return new DirectExchange("exchange-name", true, false);
- }
- Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
- rabbitTemplate.convertAndSend("exchange-name", "routing-key", message);
我们需要手动开启 Confirm 机制,可以在配置文件中添加以下配置:
- spring:
- rabbitmq:
- publisher-confirm-type: correlated # MQ异步回调方式接收回执消息,效率高于simple
- # publisher-confirm-type: none # 关闭confirm机制
- # publisher-confirm-type: simple # 同步阻塞并等待MQ的回执消息,效率很低
- publisher-returns: true
- rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
- if (ack) {
- log.info("message delivery to exchange, message id: {}", correlationData.getId());
- // 处理消息确认
- } else {
- log.error("message not delivery to exchange, cause: {}", cause);
- // 处理消息未确认
- }
- });
- rabbitTemplate.setReturnsCallback(returnedMessage -> {
- log.error("message routing failed: exchange({}), routing({}), replyCode({}), replyText({}), message({})",
- returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getReplyCode(),
- returnedMessage.getReplyText(), returnedMessage.getMessage());
- // 处理消息发送到Queue失败
- });
RabbitMQ消费者处理消息成功后可以向MQ发送 ack 回执,MQ收到 ack 后会在队列中删除该消息,从而确保消息不会丢失。若消费者在处理消息中出现异常,则会发送 nack 回执,MQ收到 nack 后会再次投递消息。
AMQP的 ack 回执处理方式有以下几种:
最好使用手动 ack 回执,这样消费者在处理完消息后,可以决定是否确认收到消息。如果在处理消息时发生错误,可以选择重新将消息放回队列,或者拒绝这条消息,这样可以防止丢失并且减少重复处理的情况。
- spring:
- rabbitmq:
- listener:
- simple:
- acknowledge-mode: manual #手动ack
- retry:
- enabled: true #开启重试机制
虽然我们进行消息持久化机制、生产者 Confirm 异步回调机制、消费者手动 ack 回执机制,等一系列操作,但是由于 RabbitMQ 的持久化过程是异步的,所以无法保证消息在传递过程中做到100%不丢失。
若业务需要做到消息100%不丢失,可以引入本地消息表,通过轮询(或其他方式)的方式来进行消息的重新投递。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。