当前位置:   article > 正文

RabbitMQ中消息确认机制_@rabbitlistener ackmode

@rabbitlistener ackmode

目录

1.发送端确认机制

2.消费端确认机制


概要:RabbitMQ中间件的引入对于整个系统来说是一把双刃剑,在对系统进行解耦的同时也降低了消息的可靠性,但是对于某些系统来说我们又必须保证我们的消息是不会丢失的,因此rabbitmq给提供了以下一些功能来保证消息的可靠性,本文我们主要讲解消息可靠性中的 发送端确认机制 以及 消费端确认机制

1.发送端确认机制

RabbitMQ通过 publisher confirm 机制来实现的消息发送端确认。生产者将信道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上⾯面发布的消息都会被指派一个唯一的ID(从 1 开始),一旦消息被投递到所有匹配的队列之后 (如果消息和队列是持久化的,那么确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确送达了。

        在生产者向rabbitmq发送消息的整个流程中,生产者首先是要将消息发送给 交换机,然后交换机 根据指定的 路由键 把消息路由到指定的消息队列中,然后消费者从对应的消息队列对消息进行消费,因此我们要实现生产端的消息确认就需要保证 消息发送到交换机 以及 交换机路由消息到队列 的时候消息是不会丢失的

消息发送到交换机

在配置文件中开启消息确认模式

  1. # SIMPLE 禁用发布确认模式,是默认值
  2. # CORRELATED 发布消息成功到交换器或失败后 会触发回调方法
  3. # NONE 有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用
  4. rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回
  5. 发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果
  6. 返回false则会关闭channel,则接下来无法发送消息到broker;
  7. spring.rabbitmq.publisher-confirm-type=CORRELATED

 通过实现 RabbitTemplate.ConfirmCallback 类来对消息发送结果进行处理

  1. @Component
  2. public class RabbitConfirmConfig implements RabbitTemplate.ConfirmCallback {
  3. @Override
  4. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  5. if (!ack) {
  6. # 根据具体的业务进行相应的处理
  7. System.out.println("【交换机】 生产者消息确认失败了====" + cause);
  8. } else {
  9. System.out.println("【交换机】 生产者消息确认成功====");
  10. }
  11. }
  12. }

 对rabbitTemplate的callback进行设置

  1. @Configuration
  2. public class RabbitConfig {
  3. @Autowired
  4. private RabbitConfirmConfig rabbitConfirmConfig;
  5. @Autowired
  6. private RabbitTemplate rabbitTemplate;
  7. @PostConstruct
  8. public void initRabbitTemplate(){
  9. rabbitTemplate.setConfirmCallback(rabbitConfirmConfig);
  10. }
  11. }

 交换机路由消息到队列

在配置文件中开启消息异常重新入队

  1. # 确保消息发送失败后可以重新返回到队列中
  2. # 也可以通过 rabbitTemplate.setMandatory(true) 来设置
  3. spring.rabbitmq.publisher-returns=true

 通过实现 RabbitTemplate.ConfirmCallback 类来对消息发送结果进行处理

  1. @Component
  2. public class RabbitConfirmConfig implements RabbitTemplate.ReturnCallback {
  3. @Override
  4. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  5. # 根据具体的业务对异常进行处理,自行判断是否消息可以丢弃
  6. if (AMQP.NO_ROUTE == replyCode){
  7. System.out.println("【队列】 交换机路由到队列失败====" + message);
  8. }
  9. }
  10. }

对rabbitTemplate的 returnback 进行设置

  1. @Configuration
  2. public class RabbitConfig {
  3. @Autowired
  4. private RabbitConfirmConfig rabbitConfirmConfig;
  5. @Autowired
  6. private RabbitTemplate rabbitTemplate;
  7. @PostConstruct
  8. public void initRabbitTemplate(){
  9. rabbitTemplate.setReturnCallback(rabbitConfirmConfig);
  10. }
  11. }

2.消费端确认机制

在配置文件中对消费者消息应答方式进行设置

  1. # NONE 则只要收到消息后就立即确认(消息出列,标记已消费),有丢失数据的风险
  2. # MANUAL 需要显式的调用当前channel的basicAck方法
  3. # AUTO 看情况确认,如果此时消费者抛出异常则消息会返回到队列中
  4. spring.rabbitmq.listener.direct.acknowledge-mode=MANUAL
  5. SpringBoot项目中支持如下的一些配置:
  6. #最大重试次数
  7. spring.rabbitmq.listener.simple.retry.max-attempts=5
  8. #是否开启消费者重试(为false时关闭消费者重试,意思不是“不重试”,而是一直收到消息直到jack 确认或者一直到超时)
  9. spring.rabbitmq.listener.simple.retry.enabled=true
  10. #重试间隔时间(单位毫秒)
  11. spring.rabbitmq.listener.simple.retry.initial-interval=5000 #
  12. 重试超过最大次数后是否拒绝
  13. spring.rabbitmq.listener.simple.default-requeue-rejected=false
  14. #ack模式
  15. spring.rabbitmq.listener.simple.acknowledge-mode=manual

通过实现监听器的方式实现消费端消息的推送

  1. @Component
  2. public class OrderListener {
  3. @RabbitListener(queues = RabbitConfig.QUEUE_ORDER_NAME, ackMode = "MANUAL")
  4. public void toOrderMessage(Channel channel, Message message) throws IOException {
  5. try {
  6. /**
  7. * deliveryTag 消息编号
  8. * multiple 是否全应答 true: deliveryTag 之前的消息全部设为应答
  9. */
  10. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  11. } catch (Exception e){
  12. /**
  13. * deliveryTag 消息编号
  14. * multiple 是否全应答 true: deliveryTag 之前的消息全部设为应答
  15. * requeue 是否重新入队
  16. */
  17. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  18. }
  19. }
  20. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/395117?site
推荐阅读
相关标签
  

闽ICP备14008679号