赞
踩
可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。
值得我们注意的是:RabbitMQ中的事务与数据库的事务有稍许不同,数据库每次都需要打开事务,且最后与之对应的有commit或者rollback,而RabbitMQ中channel中的事务只需要开启一次,可以多次commit或者rollback
代码样例:
- // 开启事务
- channel.txSelect();
- try {
- // 这里发送消息
- } catch (Exception e) {
- channel.txRollback();
- // 这里再次重发这条消息
- }
- // 提交事务
- channel.txCommit();
这样看可能不太直观,下面我简单写一段使用RabbitMQ的代码,然后给大家解释一下
- //channel开启事务
- channel.txSelect();
- //发送3条消息
- String msgTemplate = "测试事务消息内容[%d]";
- channel.basicPublish("tx.exchange", "tx", new AMQP.BasicProperties(), String.format(msgTemplate,1).getBytes(StandardCharsets.UTF_8));
- channel.basicPublish("tx.exchange", "tx", new AMQP.BasicProperties(), String.format(msgTemplate,2).getBytes(StandardCharsets.UTF_8));
- channel.basicPublish("tx.exchange", "tx", new AMQP.BasicProperties(), String.format(msgTemplate,3).getBytes(StandardCharsets.UTF_8));
- //消息回滚
- channel.txRollback();
- //成功提交
- channel.basicPublish("tx.exchange", "tx", new AMQP.BasicProperties(), String.format(msgTemplate,4).getBytes(StandardCharsets.UTF_8));
- channel.txCommit();
上面的方法中一共发送了4次消息,前三次发送后最后调用了txRollback,这将导致前三条消息回滚而没有发送成功。而第四次发送之后调用commit,最后在RabbitMQ中只会有一条消息。
虽然事务可以保证消息一定被提交到服务器,而且在客户端编码方面足够简单。但是它也不是那么完美,在性能方面事务会带来较大的性能影响。RabbitMQ 事务机制是同步的,你提交一个事务之后会阻塞在那儿,采用这种方式基本上吞吐量会下来,因为太耗性能。
事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的。
confirm机制是为了解决事务性能问题的一种方案,我们可以通过使用channel.confirmSelect方法开启confirm模式,在生产者开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;
如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发
代码样例:
- //创建channel
- Channel channel = connection.createChannel();
-
- //消息的确认模式
- channel.confirmSelect();
-
- String exchangeName="test_confirm_exchange";
- String routeKey="confirm.test";
- String msg="RabbitMQ send message confirm test!";
- for (int i=0;i<5;i++){
- channel.basicPublish(exchangeName,routeKey,null,msg.getBytes());
- }
- //确定监听事件
- channel.addConfirmListener(new ConfirmListener() {
-
- /**
- * 消息成功发送
- * @param deliveryTag 消息唯一标签
- * @param multiple 是否批量
- * @throws IOException
- */
- @Override
- public void handleAck(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("**********Ack*********");
- }
-
- /**
- * 消息没有成功发送
- * @param deliveryTag
- * @param multiple
- * @throws IOException
- */
- @Override
- public void handleNack(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("**********No Ack*********");
- }
-
- });

- //创建channel
- Channel channel = connection.createChannel();
- String exchangeName="test_confirm_exchange";
- String exchangeType="topic";
- //声明Exchange
- channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
- String queueName="test_confirm_queue";
- //声明队列
- channel.queueDeclare(queueName,true,false,false,null);
- String routeKey="confirm.#";
- //绑定队列和交换机
- channel.queueBind(queueName,exchangeName,routeKey);
- channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
-
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("接收到消息::"+new String(body));
- }
- });

需要注意的是confirm机制与事务是不能共存的,简单的说就是开启事务就无法使用confirm,开启confirm就无法使用事务
在生产者将消息推送到RabbitMQ时,我们可以通过事务或者confirm模式来保证消息不会丢失。但是这两种措施只能保证消息到达Exchange,如果我们的消息无法根据RoutingKey到达对应的Queue中,那么我们的消息最后就会丢失。
对于这种情况,RabbitMQ中在发送消息时提供了mandatory参数。如果mandatory为true时,Exchange根据自身的类型和RoutingKey无法找到对应的Queue,它将不会丢掉该消息,而是会将消息返回给生产者
- //创建Exchange
- channel.exchangeDeclare("mandatory.exchange", BuiltinExchangeType.DIRECT, true, false, new HashMap<>());
- //创建Queue
- channel.queueDeclare("mandatory.queue", true, false, false, new HashMap<>());
- //绑定路由
- channel.queueBind("mandatory.queue", "mandatory.exchange", "mandatory");
- channel.addReturnListener(new ReturnListener() {
- @Override
- public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
- log.error("replyCode = {},replyText ={},exchange={},routingKey={},body={}",replyCode,replyText,exchange,routingKey,new String(body));
- }
- });
- //设置mandatory = true
- //void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
- channel.basicPublish("mandatory.exchange", "mandatory-1",true, new AMQP.BasicProperties(), "测试mandatory的消息".getBytes(StandardCharsets.UTF_8));
在我们调用BasicPublish方法的时候,我们设置了mandatory为true,同时还给channel设置了ReturnListener用来监听路由到队列失败的消息
RabbitMQ本身主要应对三点:
要保证rabbitMQ不丢失消息,那么就需要开启rabbitMQ的持久化机制,即把消息持久化到硬盘上,这样即使rabbitMQ挂掉在重启后仍然可以从硬盘读取消息;
如果rabbitMQ单点故障怎么办,这种情况倒不会造成消息丢失,这里就要提到rabbitMQ的3种安装模式,单机模式、普通集群模式、镜像集群模式,这里要保证rabbitMQ的高可用就要配合HAP ROXY做镜像集群模式;
如果硬盘坏掉怎么保证消息不丢失
RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失,所以就要对消息进行持久化处理。
在RabbitMQ中,我们可以通过将durable的值设置为true来保证持久化。如何持久化,下面具体说明下。要想做到消息持久化,必须满足以下三个条件,缺一不可。
Exchange 设置持久化
Queue 设置持久化
Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息
先来介绍下RabbitMQ三种部署模式:
单节点模式:最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。
普通模式:消息只会存在与当前节点中,并不会同步到其他节点,当前节点宕机,有影响的业务会瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。
镜像模式:消息会同步到其他节点上,可以设置同步的节点个数,但吞吐量会下降。属于RabbitMQ的HA方案
为什么设置镜像模式集群,因为队列的内容仅仅存在某一个节点上面,不会存在所有节点上面,所有节点仅仅存放消息结构和元数据
如果想解决上面途中问题,保证消息不丢失,需要采用HA 镜像模式队列。
下面介绍下三种HA策略模式:
同步至所有的
同步最多N个机器
只同步至符合指定名称的nodes
但是:HA 镜像队列有一个很大的缺点就是系统的吞吐量会有所下降
系统是在一个复杂的环境,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,但是仍然会遇到消息丢失的问题,如:持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,这种情况下消息仍然会丢失。
为了避免上面这个问题,我们可以让生产端首先将业务数据以及消息数据入库,需要在同一个事务中,消息数据入库失败,则整体回滚
消息表的主键:消息id ,消息状态,重试次数,创建时间
然后我们根据消息表中消息状态,失败则进行消息补偿措施,重新发送消息处理
消费者获取消息后处理消息失败
通过上面的方式我们保证了从生产者到RabbitMQ消息不会丢失,现在到了消费者消费消息了。
在消费者处理业务时,可能由于我们业务代码的异常导致消息没有被正常处理完,但是消息已经从RabbitMQ中的队列移除了,这样我们的消息就丢失了。
我同样也可以通过ACK确认机制去避免这种情况
在生产者发送消息到RabbitMQ时我们可以通过ack来确认消息是否到达了服务端,与之类似的是,消费者在消费消息时同样提供手动ack模式。默认情况下,消费者从队列中获取消息后会自动ack,我们可以通过手动ack来保证消费者主动的控制ack行为,这样我们可以避免业务异常导致消息丢失的情况。
- DeliverCallback deliverCallback = new DeliverCallback() {
- @Override
- public void handle(String consumerTag, Delivery message) throws IOException {
- try {
- byte[] body = message.getBody();
- String messageContent = new String(body, StandardCharsets.UTF_8);
- if("error".equals(messageContent)){
- throw new RuntimeException("业务异常");
- }
- log.info("收到的消息内容:{}",messageContent);
- channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
- }catch (Exception e){
- log.info("消费消息失败!重回队列!");
- channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);
- }
- }
- };
- CancelCallback cancelCallback = new CancelCallback() {
- @Override
- public void handle(String consumerTag) throws IOException {
- log.info("取消订阅:{}",consumerTag);
- }
- };
- channel.basicConsume("confirm.queue",false,deliverCallback,cancelCallback);

我们通过分析消息从生产者发送消息到消费者消费消息的全过程,得出了消息可能丢失的几种场景,并给出了相应的解决方案,如果需要保证消息在整条链路中不丢失,那就需要生产端、mq自身与消费端共同去保障。
生产端:对生产的消息进行状态标记,开启confirm机制,依据mq的响应来更新消息状态,使用定时任务重新投递超时的消息,多次投递失败进行报警。
mq自身:开启持久化,并在落盘后再进行ack。如果是镜像部署模式,需要在同步到多个副本之后再进行ack。
消费端:开启手动ack模式,在业务处理完成后再进行ack,并且需要保证幂等。
整个过程如下图所示
通过以上的处理,理论上不存在消息丢失的情况,但是系统的吞吐量以及性能有所下降。在实际开发中,需要考虑消息丢失的影响程度,来做出对可靠性以及性能之间的权衡。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。