赞
踩
目录
在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢:?
原理上的架构:
实际代码的架构:
1.首先配置application.yml,加上publish-confirm-type : correlated
NONE 禁用发布确认模式,是默认值
CORRELATED 发布消息成功到交换机后会触发回调方法
SIMPLE 有两种效果:
其一效果和 CORRELATED 值一样会触发回调方法, 其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
另外,还要添加一个属性:publisher-returns: true
- spring:
- rabbitmq:
- host: 192.168.80.128
- port: 5672
- username: admin
- password: 123
- publisher-confirm-type: correlated
- publisher-returns: true
2.配置类
- @Configuration
- public class ConfirmConfig {
- public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
- public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
-
- //声明交换机
- @Bean("confirmExchange")
- public DirectExchange confirmExchange(){
- return new DirectExchange(CONFIRM_EXCHANGE_NAME);
- }
- //声明队列
- @Bean("confirmQueue")
- public Queue confirmQueue(){
- return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
- }
- //声明绑定关系
- @Bean
- public Binding binding(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange exchange){
- return BindingBuilder.bind(queue).to(exchange).with("key1");
- }
- }

3.回调配置类
- @Component
- public class MyConfirmCallBack implements RabbitTemplate.ConfirmCallback {
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String reason) {
- String id;
- if(correlationData!=null) id= correlationData.getId();
- else id="";
- if(ack) System.out.println("已收到id为"+id+"的消息");
- else System.out.println("未收到id为"+id+"的消息,原因是:"+reason);
- }
- }
4.生产者
- @RestController
- public class Producer {
- public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Autowired
- private MyConfirmCallBack callBack;
- //依赖注入 rabbitTemplate 之后再设置它的回调对象
- @PostConstruct
- public void init(){
- rabbitTemplate.setConfirmCallback(callBack);
- }
-
- @GetMapping("sendMessage/{message}")
- public void sendMessage(@PathVariable String message){
- //指定消息 id 为 1
- CorrelationData correlationData1=new CorrelationData("1");
- String routingKey="key1";
- rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData1);
-
- //指定消息 id 为 2 (这条消息应该消费者接收不到)
- CorrelationData correlationData2=new CorrelationData("2");
- routingKey="key2";
- rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData2);
- System.out.println("发送消息内容:"+message);
- }
-
- }

5.消费者
- @Component
- public class Consumer {
- public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
- @RabbitListener(queues =CONFIRM_QUEUE_NAME)
- public void receiveMsg(Message message){
- String msg=new String(message.getBody());
- System.out.println("接受到队列 confirm.queue 消息:"+msg);;
- }
- }
6.测试结果
- 发送消息内容:我爱新世纪百货
- 已收到id为1的消息
- 已收到id为2的消息
- 接受到队列 confirm.queue 消息:我爱新世纪百货key1
可以看到,发送了两条消息,第一条消息的 RoutingKey 为 "key1",第二条消息的 RoutingKey 为 "key2",两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了。
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息帮我想办法处理一下?通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
这里我们直接将回调函数的配置写在生产者内部了:
- @RestController
- public class Producer002 implements
- RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{
- public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
- @Autowired
- private RabbitTemplate rabbitTemplate;
- //rabbitTemplate 注入之后就设置该值
- @PostConstruct
- public void init(){
- rabbitTemplate.setConfirmCallback(this);//发布的回调函数
- rabbitTemplate.setMandatory(true);//换机无法将消息进行路由时,会将该消息返回给生产者
- rabbitTemplate.setReturnCallback(this);//消息投递失败的回调函数
- }
-
- @GetMapping("sendMessage/{message}")
- public void sendMessage(@PathVariable String message){
- //指定消息 id 为 1
- CorrelationData correlationData1=new CorrelationData("1");
- String routingKey="key1";
- rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData1);
-
- //指定消息 id 为 2 (这条消息应该是发送失败的)
- CorrelationData correlationData2=new CorrelationData("2");
- routingKey="key2";
- rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData2);
- }
-
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String reason) {
- String id;
- if(correlationData!=null) id= correlationData.getId();
- else id="";
- if(ack) System.out.println("交换机已收到id为"+id+"的消息");
- else System.out.println("交换机未收到id为"+id+"的消息,原因是:"+reason);
- }
-
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
- System.out.println("消息"+new String(message.getBody())+
- "被服务器退回,原因:"+replyText+",交换机:"+exchange+",路由:"+routingKey);
- }
- }

测试:
- 交换机已收到id为1的消息
- 消息-我爱新世纪百货key2 被服务器退回,原因:NO_ROUTE,交换机:confirm.exchange,路由:key2
- 交换机已收到id为2的消息
- 消费者接受到队列 confirm.queue 消息:我爱新世纪百货key1
有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?
备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时, 就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
演示:
1.修改配置类
- @Configuration
- public class ConfirmConfig {
- public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
- public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
- public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
- public static final String BACKUP_QUEUE_NAME = "backup.queue";
- public static final String WARNING_QUEUE_NAME = "warning.queue";
- // 声明确认队列
- @Bean("confirmQueue")
- public Queue confirmQueue(){
- return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
- }
- //声明确认队列绑定关系
- @Bean
- public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
- @Qualifier("confirmExchange") DirectExchange exchange){
- return BindingBuilder.bind(queue).to(exchange).with("key1");
- }
- //声明备份 Exchange
- @Bean("backupExchange")
- public FanoutExchange backupExchange(){
- return new FanoutExchange(BACKUP_EXCHANGE_NAME);
- }
- //声明确认 Exchange 交换机的备份交换机
- @Bean("confirmExchange")
- public DirectExchange confirmExchange(){
- ExchangeBuilder exchangeBuilder =
- ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
- .durable(true)
- //设置该交换机的备份交换机
- .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
- return (DirectExchange)exchangeBuilder.build();
- }
- // 声明警告队列
- @Bean("warningQueue")
- public Queue warningQueue(){
- return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
- }
- // 声明报警队列绑定关系
- @Bean
- public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
- @Qualifier("backupExchange") FanoutExchange
- backupExchange){
- return BindingBuilder.bind(queue).to(backupExchange);
- }
- // 声明备份队列
- @Bean("backQueue")
- public Queue backQueue(){
- return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
- }
- // 声明备份队列绑定关系
- @Bean
- public Binding backupBinding(@Qualifier("backQueue") Queue queue,
- @Qualifier("backupExchange") FanoutExchange backupExchange){
- return BindingBuilder.bind(queue).to(backupExchange);
- }
- }

2.创建报警消费者
- @Component
- public class WarningConsumer {
- public static final String WARNING_QUEUE_NAME = "warning.queue";
- @RabbitListener(queues = WARNING_QUEUE_NAME)
- public void receiveWarningMsg(Message message) {
- String msg = new String(message.getBody());
- System.out.println("报警发现不可路由消息:"+msg);
- }
- }
3.测试(先删除原来的exchange)
- 交换机已收到id为1的消息
- 交换机已收到id为2的消息
- 报警发现不可路由消息:我爱新世纪百货key2
- 接受到队列 confirm.queue 消息:我爱新世纪百货key1
当mandatory 参数与备份交换机一起使用的时候,备份交换机优先级高。
什么是幂等性?对同一个系统,使用同样的条件,一次请求和重复的多次请求对系统资源的影响是一致的。
消费的重复消费:消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断, 故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。
解决思路:MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID
业界主流的幂等性有两种操作:
a. 唯一 ID+指纹码机制,利用数据库主键去重,
b.利用 redis 的原子性去实现(setnx 命令,天然具有幂等性)
RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中, 这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。
队列具备两种模式:default 和 lazy。默认为 default 模式,在 3.6.0 之前的版本无需做任何变更。lazy 模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置, 在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。下面示例中演示了一个惰性队列的声明细节:
- Map args = new HashMap();
- args.put("x-queue-mode", "lazy");
- channel.queueDeclare("myqueue", false, false, false, args);
为什么在集群中使用镜像队列?
如果 RabbitMQ 集群中只有一个 Broker节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息丢失。可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true, 但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执行刷盘动作之间存在一个短暂却会产生问题的时间窗。通过 publisher-confirm 机制能够确保客户端知道哪些消息己经存入磁盘, 尽管如此,一般不希望遇到因单点故障导致的服务不可用。
引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能自动切换到镜像中的另一个节点上以保证服务的可用性。(就相当于复制了好几份)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。