当前位置:   article > 正文

RabbitMQ确认机制_rabbitmq生产者确认机制

rabbitmq生产者确认机制
介绍

概要:RabbitMQ中间件的引入对于整个系统来说是一把双刃剑,在对系统进行解耦的同时也降低了消息的可靠性,但是对于某些系统来说我们又必须保证我们的消息是不会丢失的,因此rabbitmq给提供了以下一些功能来保证消息的可靠性,本文我们主要讲解消息可靠性中的 发送端确认机制 以及 消费端确认机制,需要保证消费者、RabbitMQ自己和消费者都不能丢消息

RabbitMQ通过 publisher confirm 机制来实现的消息发送端确认。生产者将信道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上⾯面发布的消息都会被指派一个唯一的ID(从 1 开始),一旦消息被投递到所有匹配的队列之后 (如果消息和队列是持久化的,那么确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确送达了。

img

​ 在生产者向rabbitmq发送消息的整个流程中,生产者首先是要将消息发送给 交换机,然后交换机 根据指定的 路由键 把消息路由到指定的消息队列中,然后消费者从对应的消息队列对消息进行消费,因此我们要实现生产端的消息确认就需要保证 消息发送到交换机 以及 交换机路由消息到队列 的时候消息是不会丢失的

img

1、导入依赖
<dependencies>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
2、配置文件
server:
  port: 7001

#RabbitMQ
spring:
  rabbitmq:
    username: admin
    password: admin
    host: 192.168.132.128
    port: 5672
    virtual-host: /
    publisher-returns: true
    listener:
      simple:
        concurrency: 10 #消费者数量
        max-concurrency: 10 #最大消费者的数量
        prefetch: 1 #限流(消费者每次从队列获取的消息数量)
        auto-startup: true  #启动时自动启动容器
        acknowledge-mode: manual #手动ack
#        retry:
#          enabled: true
#          max-attempts: 3 # 重试次数
#          max-interval: 10000   # 重试最大间隔时间
#          initial-interval: 2000  # 重试初始间隔时间
#          multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
    publisher-confirm-type: correlated   # 开启确认机制/老版 publisher-confirms: true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
3、创建交换机
1、Direct交换机模式
package edu.hunan.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {

    //声明注册Direct交换机模式
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("direct_exchange");
    }

    //声明队列
    @Bean
    public Queue directQueue(){
        /**
         * 参数详解
         * durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
         * exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
         * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
         * 一般设置一下队列的持久化就好,其余两个就是默认false
         */
        return new Queue("direct_queue",true);
    }

    @Bean
    public Queue directQueue2(){
        /**
         * 参数详解
         * durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
         * exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
         * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
         * 一般设置一下队列的持久化就好,其余两个就是默认false
         */
        return new Queue("direct_queue2",true);
    }

    //交换机和队列绑定
    @Bean
    public Binding directBinding(){
        return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct1");
    }

    @Bean
    public Binding directBinding2(){
        return BindingBuilder.bind(directQueue2()).to(directExchange()).with("direct2");
    }


}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
2、FanOut交换机模式
package edu.hunan.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanOutConfig {

    //声明注册FanOut交换机模式
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout_exchange");
    }

    //声明队列
    @Bean
    public Queue fanoutQueue(){
        /**
         * 参数详解
         * durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
         * exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
         * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
         * 一般设置一下队列的持久化就好,其余两个就是默认false
         */
        return new Queue("fanout_queue",true);
    }

    //交换机和队列绑定
    @Bean
    public Binding fanoutBinding(){
        return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
    }


}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
4、初始化回调方法
package edu.hunan.rabbitmq.service;

import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

@Service
public class RabbitMQService {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。
     */
    @PostConstruct
    public void init(){
        /**
         * 消息投递到交换机后触发回调
         * 使用该功能需要开启确认,spring-boot中配置如下:
         * publisher-confirm-type: correlated   # 开启确认机制/老版 publisher-confirms: true
         */
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if(ack){
                    System.err.println("消息投递成功,消息已确认->"+cause+"\t"+correlationData);
                }else{
                    /**
                     * 如果消息投递失败需要设置重发   如果一直重发失败投递到死信队列中/数据库 进行手动排查
                     * 重发数据库+定时任务实现
                     */
                    System.err.println("消息投递失败,消息未确认->"+cause+"\t"+correlationData);
                }
            }
        });

        /**
         * 通过实现ReturnsCallback接口
         * 如果消息从交换机投递到队列中失败时触发
         * 比如根据发送消息指定Routingkey找不到队列时触发
         * 使用该功能需要开启确认,spring-boot中配置如下:
         * spring.rabbitmq.publisher-returns = true
         */
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                //需要将接收失败的保存到DB中并且手动排错
                System.err.println("队列接收消息失败,消息被退回"+returned);
            }
        });
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
5、生产者
package edu.hunan.rabbitmq.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * 生产者
 */
@Service
public class DirectServiceImpl {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void make(String message) {
        //交换机名称
        String exchangeName = "direct_exchange";
        //如果有多个队列 通过routingkey 投递到指定的队列
        String routingkey ="direct1";
        //投递到交换机中
        System.err.println("生产者准备开始投递消息");
        rabbitTemplate.convertAndSend(exchangeName,routingkey,message);
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
6、消费者
package edu.hunan.rabbitmq.service;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * 消费者
 */
@Service
@RabbitListener(queues = "direct_queue")
public class ConsumerDirectService {
    @RabbitHandler
    public void reviceMessage(@Payload String Message2, @Headers Channel channel, Message message) throws IOException, InterruptedException {
        System.err.println("测试异步不影响前端返回");
           
            /**
             * 接收失败设置重试  拒绝当前消息,并把消息返回原队列 都设置false表示拒绝重发 消息则丢失
             * 方式1 使用自带的重试机制并且控制重试次数  重试机制不能用try/catch否则会死循环 而是将异常抛出
             * 触发重试机制需要消费者抛出异常,而不能try/catch捕捉异常,不然会死循环。
             * 方式2 将接收失败的消息使用死行队列接盘   死行队列+try/catch
             */
        
        try {
            //处理消息
             System.err.println("消费者消息接收成功-》"+Message2);
//           确认消息已经消费成功
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            System.err.println("消费处理异常"+userId+"\t"+e);
//             拒绝当前消息,并把消息返回原队列 都设置false表示拒绝重发
                   /**
             * 接收失败设置重试  拒绝当前消息,并把消息返回原队列 都设置false表示拒绝重发 消息则丢失
             * 方式1 使用自带的重试机制并且控制重试次数  重试机制不能用try/catch否则会死循环 而是将异常抛出
             * 触发重试机制需要消费者抛出异常,而不能try/catch捕捉异常,不然会死循环。
             * 方式2 将接收失败的消息使用死信队列接盘   死信队列+try/catch
             */
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/700841
推荐阅读
相关标签
  

闽ICP备14008679号