赞
踩
CSDN 格式阅览观赏性可能较差,可移步阅览:
https://www.yuque.com/docs/share/1902147c-8c40-4300-b3ee-4f2e7b8f3bba?# 《RabbitMQ(Message Queue,消息队列)》
● 消息中间件是软件和软件之间发送消息的软件
● 消息中间件最大的作用就是 异步处理和系统解耦
● 另外消息中间件还有 消息收集广播、流量控制等功能
综合对比:
总结:
● ActiveMQ最"老",老牌,但维护较慢
● RabbitMQ最"火",适合大小公司,各种场景通杀
● RocketMQ最"猛",功能强,但考验公司运维能力
● Kafka最"强",支持超大量数据,但消息可靠性弱
1、主要得力于语言(Ericsson Language)
由爱立信公司开发,专门为交换机软件开发诞生的编程语言
2、Erlang特点
● 通用的面向并发的编程语言,适用于分布式系统
● 基于虚拟机解释运行,友好的跨平台部署
● 进程间上下文切换效率远高于C语言和Java,进一步提高了 RabbitMQ 并发性能
● 有着和原生Socket一样的延迟,使得RabbitMQ的网络IO性能极高
3、总结:
● RabbitMQ底层使用Erlang实现,天生具有高性能基因
● RabbitMQ在互联网和金融领域都有广泛的应用
安装 Erlang 参考博客:
https://blog.csdn.net/weixin_47627102/article/details/126128198?spm=1001.2014.3001.5502安装 RabbitMQ参考博客: https://blog.csdn.net/weixin_47627102/article/details/126128601?spm=1001.2014.3001.5502
● 管理台是RabbitMQ开发和管理的必备工具
● 管理台有配置、验证、监控等多种功能
● 在业务开发中,要注意利用管控台,有效提升开发效率
使用场景:
● 生产环境、端口限制等不便打开网页端工具的场景
● 使用脚本自动化配置 RabbitMQ
使用口诀:
关键:
● AMQP协议直接定义了 RabbitMQ 的内部结构和外部行为
● 我们使用 RabbitMQ 本质上就是在是使用 AMQP 协议
● AMQP 协议被多种消息中间件使用,可以举一反三
流程:
● 交换机数量不能过多,一般来说同一个业务,或者同一类业务使用同一个交换机
● 合理设置队列数量,一般来说一个微服务监听一个队列,或者一个微服务的一个业务监听一个队列
● 合理配置交换机类型,使用 Topic 模式时仔细设置绑定键
● 尽量使用自动化配置:
○ 将创建交换机 / 队列的操作固化在应用代码中,免去复杂的运维操作,高效且不易出错
○ 一般来说,交换机由双方同时声明,队列由接收方声明并配置绑定关系
○ 交换机 / 队列的参数一定要由双方开发团队确认,否则重复声明时,若参数不一致,会导致声明失败
总结:
● 理解AMQP协议架构
● 熟悉消息流转流程
● 合理设置交换机队列
● 尽量使用自动化配置
提一个问题:为什么AMQP要设计Exchange消息流转机制?
1)模拟交换机处理的机制,通信输入根据绑定规则找到相应的队列发送给消费者,信息传递可靠性高
2)流转机制:同一个交换机可以绑定多个队列,分布式的处理,使性能提高
3)灵活多变的绑定机制可以适用于多种情况的应用
1.1.1、消息真的发出去了嘛?
● 消息发送后,发送端不知道RabbitMQ是否真的收到了消息
● 若RabbitMQ异常,消息丢失后,订单处理流程停止,业务异常
● 需要使用RabbitMQ发送端确认机制,确认消息发送
三种确认机制
● 单条同步确认
○ 配置channel,开启确认模式:channel.confirmSelect()
○ 每发送一条消息,调用channel.waitForConfirms()方法
● 多条同步确认 (不推荐)
○ 配置channel,开启确认模式:channel.confirmSelect()
○ 发送多条消息后,调用channel.waitForConfirms()方法,等待确认
● 异步确认(不推荐,但值得学习其思路)
○ 配置channel,开启确认模式:channel.confirmSelect()
○ 在channel上添加监听:addConfirmListenner,发送消息后,会回调此方法,通知是否发送成功
○ 异步确认有可能是单条,也有可能是多条,取决于MQ
1.1.2、消息真的被路由了嘛?
● 消息发送后,发送端不知道消息是否被正确路由,若路由异常,消息会被抛弃
● 消息被丢弃后,订单处理流程停止,业务异常
● 需要使用RabbitMQ消息返回机制,确认消息被正确路由
消息返回的使用方法:
● 在RabbitMQ基础配置中有一个关键配置项: Mandatory
● Mandatory若为false,RabbitMQ将直接丢弃无法路由的消息
● Mandatory若为true,RabbitMQ才会处理无法路由的消息
1.1.3、消费端处理的过来嘛?
● 业务高峰期,可能出现发送端与接收端性能不一致,大量消息被同时推送给接收端,造成接收端服务崩溃
● 需要使用RabbitMQ消费端限流机制,限制消息推送速度,保障接收端服务稳定
RabbitMQ - QoS
● 针对以上问题,RabbitMQ开发了 QoS (服务质量保证功能)
● QoS 功能保证了在一定数目的消息未被确认前,不消费新的消息
● QoS 功能的前提是不使用自动确认
QoS 原理
● QoS 原理是当消费端有一定数量的消息未被 ACK 确认时,RabbitMQ 不给消费端推送新的消息
● RabbitMQ 使用 QoS 机制实现了消费端限流
消费端限流流参数设置
● prefetchCount:针对一个消费端最多推送多少未确认消息
● global: true 针对整个消费端限流 false:针对当前channel
● prefetchSize:0 (单个消息大小限制, 一般为0)
● prefetchSize与global两项,RabbitMQ暂时未实现
1.1.4、消费端处理异常怎么办?
● 默认情况下,消费端接收消息时,消息会被自动确认(ACK)
● 消费端消息处理异常时,发送端与消息中间件无法得知消息处理情况
● 需要使用RabbitMQ消费端确认机制,确认消息被正确处理
消费端ACK类型
● 自动ACK:消费端收到消息后,会自动签收消息
● 手动ACK:消费端收到消息后,不会自动签收消息,需要我们在业务代码中显式签收消息
○ 单条手动ACK:mulitple=false(推荐使用)
○ 多条手动ACK:mulitple=true
重回队列
● 若设置了重回队列,消息被NACK之后,会返回队列末尾,等待进一步被处理
● 一般不建议开启重回队列,因为第一次处理异常的消息,再次处理,基本上也是异常
1.1.5、队列爆满怎么办?
● 默认情况下,消息进入队列,会永远存在,直到被消费
● 大量堆积的消息会给RabbitMQ产生很大的压力
● 需要使用RabbitMQ消息过期时间,防止消息大量积压
RabbitMQ的过期时间(TTL)
● RabbitMQ 的过期时间称为 TTL (Time to Live) ,生存时间
● RabbitMQ 的过期时间分为消息 TTL 和队列 TTL
● 消息 TTL 设置了单条消息的过期时间
● 队列 TLL 设置了队列中所有消息的过期时间
如何找到适合自己的TTL
● TTL 的设置主要考虑技术架构与业务
● TTL 应该明显长于服务的平均重启时间
● 建议 TTL 长于业务高峰期时间
1.1.6、如何转移过期消息?
● 消息被设置了过期时间,过期后会直接被丢弃
● 直接被丢弃的消息,无法对系统运行异常发出警报
● 需要使用RabbitMQ死信队列,收集过期消息,以供分析
什么是死信队列
● 死信队列:队列被配置了DLX属性 (Dead-Letter-Exchange)
● 当一个消息变成死信(dead message)后,能重新被发布到另一个Exchange,这个Exchange也是一个普通交换机
● 死信被死信交换机路由后,一般进入一个固定队列
怎么变成死信
● 消息被拒绝 ( reject/nack ) 并且requeue = false
● 消息过期(TTL到期)
● 队列达到最大长度
死信队列设置方法
● 设置转发、接收死信的交换机和队列:
◆Exchange: dlx.exchange
◆Queue: dlx.queue
◆RoutingKey: #
● 在需要设置死信的队列加入参数:
◆x-dead-letter-exchange = dlx.exchange
总结:
● 发送端确认机制
● 消息返回机制
● 消费端限流机制
● 消费端确认机制
● 消息过期时间
● 死信队列
● 异步消息监听容器
● 原生提供 RabbitTemplate,方便收发消息
● 原生提供 RabbitAdmin,方便队列、交换机的声明
● SpringBoot Config 原生支持 RabbitMQ
1、异步消息监听容器
● 原生实现:自己实现线程池、回调方法,并注册回调方法
● SpringBoot:自动实现可配置的线程池,并自动注册回调方法,只需实现回调方法
1.1、SimpleMessageListenerContainer简单消息监听容器
● 设置同时监听多个队列、自动启动、自动配置RabbitMQ
● 设置消费者数量(最大数量、最小数量、批量消费)
● 设置消费确认模式、是否重回队列、异常捕获
● 设置是否独占、其他消费者属性等
● 设置具体的监听器、消息转换器等
● 支持动态设置,运行中修改监听器配置
@Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ // 创建简单消息监听容器 SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); // 设置监听的队列 simpleMessageListenerContainer.addQueueNames("queue.order"); // 设置消费者线程数为3 simpleMessageListenerContainer.setConcurrentConsumers(3); // 设置消费者线程最大并发数为5 simpleMessageListenerContainer.setMaxConcurrentConsumers(5); /*以下为自动确认配置*/ // // 设置监听确认的方式为自动确认 // simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO); // // 设置通自动进行回调处理 // simpleMessageListenerContainer.setMessageListener(message -> { // log.info("RabbitMqConfigMessage:{}",message); // }); /*以下为手动确认配置*/ // 设置监听确认的方式为手动确认 simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置通自动进行回调处理 simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { log.info("RabbitMqConfigMessage:{}",message); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }); // 消费端限流,最多可取一条消息 simpleMessageListenerContainer.setPrefetchCount(1); return simpleMessageListenerContainer; }
1.2、MessageListenerAdapter消息监听适配器
● 适配器设计模式
● 解决业务逻辑代码无法修改的问题
1.2.1、使用方法
● 简单模式:实现 handleMessage 方法
● 高阶模式:自定义 “队列名 --> 方法名” 映射关系
@Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ // 创建简单消息监听容器 SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); // 设置监听的队列 simpleMessageListenerContainer.addQueueNames("queue.order"); // 设置消费者线程数为3 simpleMessageListenerContainer.setConcurrentConsumers(3); // 设置消费者线程最大并发数为5 simpleMessageListenerContainer.setMaxConcurrentConsumers(5); /*以下为自动确认配置*/ // // 设置监听确认的方式为自动确认 // simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO); // // 设置通自动进行回调处理 // simpleMessageListenerContainer.setMessageListener(message -> { // log.info("RabbitMqConfigMessage:{}",message); // }); /*以下为手动确认配置*/ // 设置监听确认的方式为手动确认 simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置通自动进行回调处理(此方式不够优雅,下面采用消息监听适配器使调用变的优雅) // simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { // log.info("RabbitMqConfigMessage:{}",message); // // 调用 // orderMessageService.handleMessage(message.getBody()); // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // }); // 消费端限流,最多可取一条消息 simpleMessageListenerContainer.setPrefetchCount(1); // 创建 orderMessageService 类适配器 MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(orderMessageService); // 可以通过构造器直接构造,也可以当属性写入 // messageListenerAdapter.setDelegate(orderMessageService); // 可以通过自定义队列名调用哪个回调方法(非常方便且优雅) // Map<String,String> methodMap = new HashMap<>(8); // methodMap.put("queue.order1","handleMessage1"); // methodMap.put("queue.order2","handleMessage2"); // methodMap.put("queue.order3","handleMessage3"); // messageListenerAdapter.setQueueOrTagToMethodName(methodMap); // 将适配器设置为此队列消息监听器 simpleMessageListenerContainer.setMessageListener(messageListenerAdapter); return simpleMessageListenerContainer; }
1.3、MessageConverter
● 之前收发消息时,使用了Byte[] 数组作为消息体
● 编写业务逻辑时,需要使用Java对象
● MessageConverter用来在收发消息时自动转换消息
● 作为一个接口,使用需要继承,spring中的实现是Jackson2JsonMessageConverter
1.3.1、Jackson2JsonMessageConverter
● 最常用的MessageConverter,用来转换Json格式消息
● 配合ClassMapper可以直接转换为POJO对象
// 创建JSON消息转换器对象 Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); // 转换方式一:(有需求才用) // 通过实现ClassMapper方法实现转化 // jackson2JsonMessageConverter.setClassMapper(new ClassMapper() { // @Override // public void fromClass(Class<?> aClass, MessageProperties messageProperties) { // // } // // @Override // public Class<?> toClass(MessageProperties messageProperties) { // // 将消息转化为 OrderMessageDTO 类型 // return OrderMessageDTO.class; // } // }); // 转换方式二:(不推荐) // Jackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper(); // jackson2JsonMessageConverter.setJavaTypeMapper(); // 根据消息属性的不同去调用不同的MessageConverter(很少用) // 其属性:private final Map<String, MessageConverter> delegates; ContentTypeDelegatingMessageConverter contentTypeDelegatingMessageConverter = new ContentTypeDelegatingMessageConverter();
1.3.2、自定义MessageConverter
● 实现MessageConverter接口
● 重写 toMessage、fromMessage方法
// 转换方式一:(有需求才用) // 通过实现ClassMapper方法实现转化 // jackson2JsonMessageConverter.setClassMapper(new ClassMapper() { // @Override // public void fromClass(Class<?> aClass, MessageProperties messageProperties) { // // } // // @Override // public Class<?> toClass(MessageProperties messageProperties) { // // 将消息转化为 OrderMessageDTO 类型 // return OrderMessageDTO.class; // } // }); 1.4、RabbitListener ● RabbitListener是SpringBoot架构中监听消息的"终极方案" ● RabbitListener使用注解声明,对业务代码无侵入 ● RabbitListener可以在SpringBoot配置文件中进行配置 1.4.1、@RabbitListener注解 ● @RabbitListener是一个组合注解,可以嵌套以下注解: ● @Exchange:自动声明Exchange ● @Queue:自动声明队列 ● @QueueBinding:自动声明绑定关系 /** * 声明消息队列、交换机、绑定、消息的处理(注解版) * * @param message 消息对象 */ @RabbitListener( containerFactory = "rabbitListenerContainerFactory", admin = "rabbitAdmin", bindings = { @QueueBinding( value = @Queue( name = "queue.order" // ,arguments = { // @Argument( // name = "x-message-ttl", // value = "1000", // type = "java.lang.Integer"// 如果是String类型就不需要声明 // ),// 添加单条消息过期时间 // @Argument( // name = "x-dead-letter-exchange", // value = "exchange.dlx" // ),// 声明这个死信队列 // }// 添加参数 ),//声明队列 exchange = @Exchange(name = "exchange.order.restaurant",type = ExchangeTypes.DIRECT),//声明交换机,默认为DIRECT key = "key.order"//绑定key ),//声明绑定 @QueueBinding( value = @Queue(name = "queue.order"),//声明队列 exchange = @Exchange(name = "exchange.order.deliveryman",type = ExchangeTypes.DIRECT),//声明交换机,默认为DIRECT key = "key.order"//绑定key ),//声明绑定 @QueueBinding( value = @Queue(name = "queue.order"),//声明队列 exchange = @Exchange(name = "exchange.settlement.order",type = ExchangeTypes.FANOUT),//声明交换机,默认为DIRECT key = "key.order"//绑定key ),//声明绑定 @QueueBinding( value = @Queue(name = "queue.order"),//声明队列 exchange = @Exchange(name = "exchange.order.reward",type = ExchangeTypes.TOPIC),//声明交换机,默认为DIRECT key = "key.order"//绑定key ),//声明绑定 } )// 联合使用 // @RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "queue.order")// 放在方法上直接监听使用 // @RabbitHandler(isDefault = true) // public void handleMessage(byte[] messageBody) { // public void handleMessage(OrderMessageDTO orderMessageDTO) { public void handleMessage(@Payload Message message) {........}
2、RabbitTemplate
● 相比basicPublish,功能更加强大,能自动实现消息转换等功能
● RabbitTemplate与RestTemplate类似,使用了模板方法设计模式
● RabbitTemplate提供了丰富的功能,方便消息收发
● RabbitTemplate可以显式传入配置也可以隐式声明配置
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 开启消息返回机制,确认消息是否被正确路由
rabbitTemplate.setMandatory(true);
// 消息返回的回调
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}",message,replyCode,replyText,exchange,routingKey);
});
// 确认是否收到的回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("correlationData:{},ack:{},cause:{}",correlationData,ack,cause);
});
return rabbitTemplate;
}
3、RabbitAdmin
● 声明式提供队列、交换机、绑定关系的注册方法
● 甚至不需要显示的注册代码
3.1、创建方法:
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectFactory);
3.2、功能:(红色为常用)
● declareExchange:创建交换机
● deleteExchange:删除交换机
● declareQueue:创建队列
● deleteQueue:删除队列
● purgeQueue: 清空队列
● declareBinding: 新建绑定关系
● removeBinding: 删除绑定关系
● getQueueProperties: 查询队列属性
3.3、RabbitAdmin声明式配置
● 将Exchange、Queue、Binding声明为Bean
● 再将RabbitAdmin声明为Bean
● Exchange、Queue、Binding即可自动创建
优点:
● 将声明和创建工作分开,解耦多人工作
● 不需要显示声明,减少代码量,减少BUG
以下为创建配置类代码,全部交由Spring管理;
同样可通过此方法达到不修改源代码增加交换机、队列、绑定关系等目的
@Configuration public class RabbitConfig { private final OrderMessageService orderMessageService; public RabbitConfig(OrderMessageService orderMessageService) { this.orderMessageService = orderMessageService; } @Autowired public void startListenMessage() throws IOException, InterruptedException, TimeoutException { orderMessageService.handleMessage(); } /*-------ConnectionFactory------*/ @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.createConnection(); return connectionFactory; } /*-------RabbitAdmin------*/ @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } /*-------restaurant------*/ @Bean public Exchange exchange1() { return new DirectExchange("exchange.order.restaurant"); } @Bean public Queue queue1() { return new Queue("queue.order"); } @Bean public Binding binding1() { return new Binding( "queue.order", Binding.DestinationType.QUEUE, "exchange.order.restaurant", "key.order", null ); } /*-------deliveryman------*/ @Bean public Exchange exchange2() { return new DirectExchange("exchange.order.deliveryman"); } @Bean public Binding binding2() { return new Binding( "queue.order", Binding.DestinationType.QUEUE, "exchange.order.deliveryman", "key.order", null ); } /*-------settlement------*/ @Bean public Exchange exchange3() { return new FanoutExchange("exchange.order.settlement"); } @Bean public Exchange exchange4() { return new FanoutExchange("exchange.settlement.order"); } @Bean public Binding binding3() { return new Binding( "queue.order", Binding.DestinationType.QUEUE, "exchange.settlement.order", "key.order", null ); } /*-------reward------*/ @Bean public Exchange exchange5() { return new TopicExchange("exchange.order.reward"); } @Bean public Binding binding4() { return new Binding( "queue.order", Binding.DestinationType.QUEUE, "exchange.order.reward", "key.order", null ); } // 通过RabbitAdmin手动声明 // @Autowired // public void initRabbit() { // CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); // connectionFactory.setHost("127.0.0.1"); // connectionFactory.setPort(5672); // connectionFactory.setPassword("guest"); // connectionFactory.setUsername("guest"); // // RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); // // /*---------------------restaurant---------------------*/ // Exchange exchange = new DirectExchange("exchange.order.restaurant"); // rabbitAdmin.declareExchange(exchange); // // Queue queue = new Queue("queue.order"); // rabbitAdmin.declareQueue(queue); // // Binding binding = new Binding( // "queue.order", // Binding.DestinationType.QUEUE, // "exchange.order.restaurant", // "key.order", // null); // // rabbitAdmin.declareBinding(binding); // // /*---------------------deliveryman---------------------*/ // exchange = new DirectExchange("exchange.order.deliveryman"); // rabbitAdmin.declareExchange(exchange); // binding = new Binding( // "queue.order", // Binding.DestinationType.QUEUE, // "exchange.order.deliveryman", // "key.order", // null); // rabbitAdmin.declareBinding(binding); // // // /*---------settlement---------*/ // exchange = new FanoutExchange("exchange.order.settlement"); // rabbitAdmin.declareExchange(exchange); // exchange = new FanoutExchange("exchange.settlement.order"); // rabbitAdmin.declareExchange(exchange); // binding = new Binding( // "queue.order", // Binding.DestinationType.QUEUE, // "exchange.order.settlement", // "key.order", // null); // rabbitAdmin.declareBinding(binding); // // // /*--------------reward----------------*/ // exchange = new TopicExchange("exchange.order.reward"); // rabbitAdmin.declareExchange(exchange); // binding = new Binding( // "queue.order", // Binding.DestinationType.QUEUE, // "exchange.order.reward", // "key.order", // null); // rabbitAdmin.declareBinding(binding); } }
原理解析:
主要是实现了 ApplicationContextAware 和 InitializingBean 两个接口
① 通过 ApplicationContextAware 的 setApplicationContext(ApplicationContext applicationContext)方法可以获取到 Spring 容器中的所有内容
② 实现 InitializingBean 中的 afterPropertiesSet() 方法相当于这个Bean被自动声明,然后处理好放进spring容器的时候这个方法就会被自动调用
在 afterPropertiesSet() 方法中 通过 initializing.compareAndSet(false, true) 设置为状态初始化 ,然后再将整个lambda表达式添加到 addConnectionListener() 中
@Override public void afterPropertiesSet() { this.connectionFactory.addConnectionListener(connection -> { if (!initializing.compareAndSet(false, true)) { // If we are already initializing, we don't need to do it again... return; } try { /* * ...but it is possible for this to happen twice in the same ConnectionFactory (if more than * one concurrent Connection is allowed). It's idempotent, so no big deal (a bit of network * chatter). In fact it might even be a good thing: exclusive queues only make sense if they are * declared for every connection. If anyone has a problem with it: use auto-startup="false". */ if (this.retryTemplate != null) { this.retryTemplate.execute(c -> { initialize(); return null; }); }else { initialize(); } }finally { initializing.compareAndSet(true, false); } }); }
其中initialize()找到并注册所有的交换机、队列、绑定关系、消费者
@Override // NOSONAR complexity public void initialize() { if (this.applicationContext == null) { this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings"); return; } this.logger.debug("Initializing declarations"); Collection<Exchange> contextExchanges = new LinkedList<Exchange>( this.applicationContext.getBeansOfType(Exchange.class).values()); Collection<Queue> contextQueues = new LinkedList<Queue>( this.applicationContext.getBeansOfType(Queue.class).values()); Collection<Binding> contextBindings = new LinkedList<Binding>( this.applicationContext.getBeansOfType(Binding.class).values()); Collection<DeclarableCustomizer> customizers = this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values(); processDeclarables(contextExchanges, contextQueues, contextBindings); final Collection<Exchange> exchanges = filterDeclarables(contextExchanges, customizers); final Collection<Queue> queues = filterDeclarables(contextQueues, customizers); final Collection<Binding> bindings = filterDeclarables(contextBindings, customizers); for (Exchange exchange : exchanges) { if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) { this.logger.info("Auto-declaring a non-durable or auto-delete Exchange (" + exchange.getName() + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". " + "It will be deleted by the broker if it shuts down, and can be redeclared by closing and " + "reopening the connection."); } } for (Queue queue : queues) { if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) { this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue (" + queue.getName() + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:" + queue.isExclusive() + ". " + "It will be redeclared if the broker stops and is restarted while the connection factory is " + "alive, but all messages will be lost."); } } if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) { this.logger.debug("Nothing to declare"); return; } this.rabbitTemplate.execute(channel -> { declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()])); declareQueues(channel, queues.toArray(new Queue[queues.size()])); declareBindings(channel, bindings.toArray(new Binding[bindings.size()])); return null; }); this.logger.debug("Declarations finished"); }
通过 addConnectionListener(ConnectionListener listener) 能够得知一但有connection连接的时候,就会调用lambd实现方法,然后通过上述流程完成了构建。
@Override
public void addConnectionListener(ConnectionListener listener) {
this.connectionListener.addDelegate(listener);
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.addConnectionListener(listener);
}
}
4、Spring Boot Config
● 充分发挥 Spring Boot 约定大于配置的特性
● 可以隐式建立 Connection、Channel
spring:
rabbitmq:
addresses: localhost
port: 5672
username: guest
password: guest
listener:
direct:
acknowledge-mode: auto #设置为自动应答
总结:
● Spring Boot提供了易用的RabbitMQ连接方式
● Spring Boot提供了方便的RabbitMQ配置方式
● Spring Boot提供了完善的消息收发方式
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。