赞
踩
RabbitMQ如何防止消息丢失及重复消费
如图所示,RabbitMQ丢失消息的情况可以发送在任何一个节点。记住下面图的执行流程
丢失的原因 :因为网络传输的不稳定性,当生产者在向MQ发送消息的过程中,MQ没有成功接收到消息,但是生产者却以为MQ成功接收到了消息,不会再次重复发送该消息,从而导致消息的丢失。
解决办法 : 有两个解决办法:事务机制和confirm机制,最常用的是confirm机制(发布确认机制)
注意:
1、RabbitMQ的事务机制是同步的,很耗型能,会降低RabbitMQ的吞吐量。
2、confirm机制是异步的,生成者发送完一个消息之后,不需要等待RabbitMQ的回调,就可以发送下一个消息,当RabbitMQ成功接收到消息之后会自动异步的回调生产者的一个接口返回成功与否的消息。
两个机制说明如下:
confirm(发布确认)机制: RabbitMQ可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,生产者每次写的消息都会分配一个唯一的 id,如果消息成功写入 RabbitMQ 中,RabbitMQ 会给生产者回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没写成功,会回调你的一个 nack 接口,告诉你这个消息接收失败,生产者可以再次向RabbitMQ发送消息。
RabbitMQ 提供了事务功能,生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。伪代码如下:
//1、开启事务
channel.txSelect
try {
//2、这里发送消息
} catch (Exception e) {
channel.txRollback
//3、这里再次重发这条消息
}
//4、提交事务
channel.txCommit
丢失的原因 :RabbitMQ接收到生产者发送过来的消息,是存在内存中的,如果没有发给消费者,此时RabbitMQ宕机了,那么再次启动的时候,原来内存中的那些消息都丢失了。
解决办法
1、开启RabbitMQ的持久化。当生产者把消息成功写入RabbitMQ之后,RabbitMQ就把消息持久化到磁盘。结合上面的说到的confirm机制,如果开启了消息的持久化,只有当消息成功持久化磁盘之后,才会回调生产者的接口返回ack消息,否则都算失败
2、重启之后,他会读取磁盘中的消息,不会导致消息的丢失。
持久化的配置:
1、queue队列持久化,创建 queue 的时候将其设置为持久化,这个时候即使重启 rabbitmq 队列,它也依然存在,但是它是不会持久化queue 里的数据的。
2、Message消息持久化,发送消息的时候将消息的 deliveryMode 设置为 2,设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
3、注意:持久化要起作用必须同时设置这两个持久化才行
丢失的原因:如果RabbitMQ成功的把消息发送给了消费者,那么RabbitMQ的ack机制会自动的返回成功,表明发送消息成功,下次就不会发送这个消息。但如果就在此时,消费者还没处理完该消息,然后宕机了,那么这个消息就丢失了。
解决的办法 :简单来说,就是必须关闭 RabbitMQ 的自动 ack,采用手动ack(消息应答机制)。这样的话,如果你还没处理完,不就没有 ack了! 那 RabbitMQ就认为你还没处理完,当到达超时时间还没有提交,这个时候 RabbitMQ 认为你处理失败,消息自动重新入队,会把这个消费分配给别的consumer 去处理。当处理成功后rabbitmq 才会把该消息删除
消息应答机制,分为两种:自动应答、手动应答。
正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给RabbitMQ(假如RabbitMQ没有收到呢),消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。
特殊情况下:
因为网络传输等等故障,如果这个确认信息没有回传送到RabbitMQ,RabbitMQ可能认为消费者处理失败,就会再次将消息分发给其他的消费者。
解决思路是:保证消息的唯一性,就算是多次传输,不要让消息的多次消费带来影响;保证消息的幂等性;
1、在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重和幂等的依据,避免重复的消息进入队列
2、在消息消费时,要求消息体中必须要有一个bizId(全局唯一: 如支付ID、订单ID等)作为去重和幂等的依据,避免同一条消息被重复消费。
所以在保证稳定的情况下:最好上面2种手段都做一下,然后消费的时候,也把msgId入库,在消费是查询是否被消费过,如果有就不处理
本文引子:
https://blog.csdn.net/wang_luwei/article/details/123613091
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。