赞
踩
消息从生产者发送到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性?
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:
确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突。
在publisher服务的application.yml中添加配置
spring:
rabbitmq:
host: rabbitmq # rabbitMQ的ip地址
port: 5672 # 端口
username: demo
password: 123456
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
配置说明
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动的过程之,配置:
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate对象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 判断是否是延迟消息
Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
if (receivedDelay != null && receivedDelay > 0) {
// 是一个延迟消息,忽略这个错误提示
return;
}
// 记录日志
log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有需要的话,重发消息
});
}
}
发送消息,指定消息ID,消息ConfirmCallback
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1.准备消息
String message = "hello, spring amqp!";
// 2.准备CorrelationData
// 2.1.消息ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 2.2.准备ConfirmCallback
correlationData.getFuture().addCallback(result -> {
// 判断结果
if (result.isAck()) {
// ACK
log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());
} else {
// NACK
log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
// 重发消息
}
}, ex -> {
// 记录日志
log.error("消息发送失败!", ex);
// 重发消息
});
// 3.发送消息
rabbitTemplate.convertAndSend("amq.topic", "a.simple.test", message, correlationData);
}
MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ的消息不丢失
@Bean
public TopicExchange topicExchange(){
// 参数介绍
// 1.交换器名 2.是否持久化 3.自动删除 4.其他参数
return new TopicExchange("amq.topic",true,false,null);
}
@Bean
public DirectExchange simpleDirect(){
return new DirectExchange("simple.direct");
}
@Bean
public Queue queue(){
// 参数介绍
// 1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
return new Queue("simple.queue",true,false,false,null);
}
// 或者
@Bean
public Queue queue(){
return QueueBuilder.durable("simple.queue").build();
}
队列持久化只会保证队列在mq重启不丢失,但是如果是用rabbit网页publish的消息重启会丢失。
但是SpringAMQP中的消息默认是持久的。当然也可以显示的通过MessageProperties中的DeliveryMode来指定:
// 1.准备消息
Message message = MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8))
// 持久化消息
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
// 2.发送消息
rabbitTemplate.convertAndSend("simple.queue", message);
其实不需要上面显示指定,因为SpringAMQP中默认创建的队列,交换机,消息都是持久的
@Bean
public Binding bingExchange(){
// 绑定队列
return BindingBuilder.bind(queue())
// 队列绑定到哪个交换器
.to(topicExchange())
// 绑定路由key,必须指定
.with("simple.#");
}
RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。
而SpringAMQP则允许配置三种确认模式:
spring:
rabbitmq:
host: 192.168.150.101 # rabbitMQ的ip地址
port: 5672 # 端口
username: demo
password: 123321
virtual-host: /
listener:
simple:
prefetch: 1
acknowledge-mode: auto # none,关闭ack; manual,手动ack; auto: 自动ack
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
log.debug("消费者接收到simple.queue的消息:【" + msg + "】");
System.out.println(1 / 0);
log.info("消费者处理消息成功!");
}
当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:
我们可以利用Spring的retry机制,在消费者出现异常时,利用本地重试,而不是无限制的requeue到mq队列。
spring:
rabbitmq:
host: rabbitmq # rabbitMQ的ip地址
port: 7105 # 端口
username: hikinner
password: hikinner
virtual-host: /
listener:
simple:
prefetch: 1
acknowledge-mode: auto
retry:
enabled: true #开启消费者失败重试
initial-interval: 1000 # 初始的失败等待时长为1秒
multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态; false有状态。如果业务中包含事务,这里改为false
重试次数耗尽之后报错,并丢弃,因为也没必要再回到队列了
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
使用RepublishMessageRecoverer策略
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue");
}
@Bean
public Binding errorMessageBinding(){
return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
发布一条消息之后,可以发现重试了3次之后,消息被投递到error.direct
在rabbitMQ的error.queue中看到如下,不但记录了该消息,还记录了异常信息
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。