赞
踩
目录
概要:RabbitMQ中间件的引入对于整个系统来说是一把双刃剑,在对系统进行解耦的同时也降低了消息的可靠性,但是对于某些系统来说我们又必须保证我们的消息是不会丢失的,因此rabbitmq给提供了以下一些功能来保证消息的可靠性,本文我们主要讲解消息可靠性中的 发送端确认机制 以及 消费端确认机制
RabbitMQ通过 publisher confirm 机制来实现的消息发送端确认。生产者将信道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上⾯面发布的消息都会被指派一个唯一的ID(从 1 开始),一旦消息被投递到所有匹配的队列之后 (如果消息和队列是持久化的,那么确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确送达了。
在生产者向rabbitmq发送消息的整个流程中,生产者首先是要将消息发送给 交换机,然后交换机 根据指定的 路由键 把消息路由到指定的消息队列中,然后消费者从对应的消息队列对消息进行消费,因此我们要实现生产端的消息确认就需要保证 消息发送到交换机 以及 交换机路由消息到队列 的时候消息是不会丢失的
消息发送到交换机
在配置文件中开启消息确认模式
- # SIMPLE 禁用发布确认模式,是默认值
- # CORRELATED 发布消息成功到交换器或失败后 会触发回调方法
- # NONE 有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用
- rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回
- 发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果
- 返回false则会关闭channel,则接下来无法发送消息到broker;
-
- spring.rabbitmq.publisher-confirm-type=CORRELATED
通过实现 RabbitTemplate.ConfirmCallback 类来对消息发送结果进行处理
- @Component
- public class RabbitConfirmConfig implements RabbitTemplate.ConfirmCallback {
-
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
-
- if (!ack) {
- # 根据具体的业务进行相应的处理
- System.out.println("【交换机】 生产者消息确认失败了====" + cause);
- } else {
- System.out.println("【交换机】 生产者消息确认成功====");
- }
- }
- }
对rabbitTemplate的callback进行设置
- @Configuration
- public class RabbitConfig {
-
- @Autowired
- private RabbitConfirmConfig rabbitConfirmConfig;
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @PostConstruct
- public void initRabbitTemplate(){
- rabbitTemplate.setConfirmCallback(rabbitConfirmConfig);
- }
- }
交换机路由消息到队列
在配置文件中开启消息异常重新入队
- # 确保消息发送失败后可以重新返回到队列中
- # 也可以通过 rabbitTemplate.setMandatory(true) 来设置
- spring.rabbitmq.publisher-returns=true
通过实现 RabbitTemplate.ConfirmCallback 类来对消息发送结果进行处理
- @Component
- public class RabbitConfirmConfig implements RabbitTemplate.ReturnCallback {
-
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
-
- # 根据具体的业务对异常进行处理,自行判断是否消息可以丢弃
- if (AMQP.NO_ROUTE == replyCode){
- System.out.println("【队列】 交换机路由到队列失败====" + message);
- }
-
- }
-
- }
对rabbitTemplate的 returnback 进行设置
- @Configuration
- public class RabbitConfig {
-
- @Autowired
- private RabbitConfirmConfig rabbitConfirmConfig;
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @PostConstruct
- public void initRabbitTemplate(){
- rabbitTemplate.setReturnCallback(rabbitConfirmConfig);
- }
- }
在配置文件中对消费者消息应答方式进行设置
- # NONE 则只要收到消息后就立即确认(消息出列,标记已消费),有丢失数据的风险
- # MANUAL 需要显式的调用当前channel的basicAck方法
- # AUTO 看情况确认,如果此时消费者抛出异常则消息会返回到队列中
- spring.rabbitmq.listener.direct.acknowledge-mode=MANUAL
-
-
- SpringBoot项目中支持如下的一些配置:
- #最大重试次数
- spring.rabbitmq.listener.simple.retry.max-attempts=5
-
- #是否开启消费者重试(为false时关闭消费者重试,意思不是“不重试”,而是一直收到消息直到jack 确认或者一直到超时)
- spring.rabbitmq.listener.simple.retry.enabled=true
-
- #重试间隔时间(单位毫秒)
- spring.rabbitmq.listener.simple.retry.initial-interval=5000 #
-
- 重试超过最大次数后是否拒绝
- spring.rabbitmq.listener.simple.default-requeue-rejected=false
-
- #ack模式
- spring.rabbitmq.listener.simple.acknowledge-mode=manual
通过实现监听器的方式实现消费端消息的推送
- @Component
- public class OrderListener {
-
- @RabbitListener(queues = RabbitConfig.QUEUE_ORDER_NAME, ackMode = "MANUAL")
- public void toOrderMessage(Channel channel, Message message) throws IOException {
-
- try {
- /**
- * deliveryTag 消息编号
- * multiple 是否全应答 true: deliveryTag 之前的消息全部设为应答
- */
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- } catch (Exception e){
- /**
- * deliveryTag 消息编号
- * multiple 是否全应答 true: deliveryTag 之前的消息全部设为应答
- * requeue 是否重新入队
- */
- channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
- }
-
- }
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。