赞
踩
概要:RabbitMQ中间件的引入对于整个系统来说是一把双刃剑,在对系统进行解耦的同时也降低了消息的可靠性,但是对于某些系统来说我们又必须保证我们的消息是不会丢失的,因此rabbitmq给提供了以下一些功能来保证消息的可靠性,本文我们主要讲解消息可靠性中的 发送端确认机制 以及 消费端确认机制,需要保证消费者、RabbitMQ自己和消费者都不能丢消息
RabbitMQ通过 publisher confirm 机制来实现的消息发送端确认。生产者将信道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上⾯面发布的消息都会被指派一个唯一的ID(从 1 开始),一旦消息被投递到所有匹配的队列之后 (如果消息和队列是持久化的,那么确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确送达了。
在生产者向rabbitmq发送消息的整个流程中,生产者首先是要将消息发送给 交换机,然后交换机 根据指定的 路由键 把消息路由到指定的消息队列中,然后消费者从对应的消息队列对消息进行消费,因此我们要实现生产端的消息确认就需要保证 消息发送到交换机 以及 交换机路由消息到队列 的时候消息是不会丢失的
<dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
server: port: 7001 #RabbitMQ spring: rabbitmq: username: admin password: admin host: 192.168.132.128 port: 5672 virtual-host: / publisher-returns: true listener: simple: concurrency: 10 #消费者数量 max-concurrency: 10 #最大消费者的数量 prefetch: 1 #限流(消费者每次从队列获取的消息数量) auto-startup: true #启动时自动启动容器 acknowledge-mode: manual #手动ack # retry: # enabled: true # max-attempts: 3 # 重试次数 # max-interval: 10000 # 重试最大间隔时间 # initial-interval: 2000 # 重试初始间隔时间 # multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间 publisher-confirm-type: correlated # 开启确认机制/老版 publisher-confirms: true
package edu.hunan.rabbitmq.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DirectConfig { //声明注册Direct交换机模式 @Bean public DirectExchange directExchange(){ return new DirectExchange("direct_exchange"); } //声明队列 @Bean public Queue directQueue(){ /** * 参数详解 * durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 * exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 * 一般设置一下队列的持久化就好,其余两个就是默认false */ return new Queue("direct_queue",true); } @Bean public Queue directQueue2(){ /** * 参数详解 * durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 * exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 * 一般设置一下队列的持久化就好,其余两个就是默认false */ return new Queue("direct_queue2",true); } //交换机和队列绑定 @Bean public Binding directBinding(){ return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct1"); } @Bean public Binding directBinding2(){ return BindingBuilder.bind(directQueue2()).to(directExchange()).with("direct2"); } }
package edu.hunan.rabbitmq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanOutConfig { //声明注册FanOut交换机模式 @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("fanout_exchange"); } //声明队列 @Bean public Queue fanoutQueue(){ /** * 参数详解 * durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 * exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 * 一般设置一下队列的持久化就好,其余两个就是默认false */ return new Queue("fanout_queue",true); } //交换机和队列绑定 @Bean public Binding fanoutBinding(){ return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange()); } }
package edu.hunan.rabbitmq.service; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; @Service public class RabbitMQService { @Autowired private RabbitTemplate rabbitTemplate; /** * 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。 */ @PostConstruct public void init(){ /** * 消息投递到交换机后触发回调 * 使用该功能需要开启确认,spring-boot中配置如下: * publisher-confirm-type: correlated # 开启确认机制/老版 publisher-confirms: true */ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ System.err.println("消息投递成功,消息已确认->"+cause+"\t"+correlationData); }else{ /** * 如果消息投递失败需要设置重发 如果一直重发失败投递到死信队列中/数据库 进行手动排查 * 重发数据库+定时任务实现 */ System.err.println("消息投递失败,消息未确认->"+cause+"\t"+correlationData); } } }); /** * 通过实现ReturnsCallback接口 * 如果消息从交换机投递到队列中失败时触发 * 比如根据发送消息指定Routingkey找不到队列时触发 * 使用该功能需要开启确认,spring-boot中配置如下: * spring.rabbitmq.publisher-returns = true */ rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { //需要将接收失败的保存到DB中并且手动排错 System.err.println("队列接收消息失败,消息被退回"+returned); } }); } }
package edu.hunan.rabbitmq.service; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * 生产者 */ @Service public class DirectServiceImpl { @Autowired private RabbitTemplate rabbitTemplate; public void make(String message) { //交换机名称 String exchangeName = "direct_exchange"; //如果有多个队列 通过routingkey 投递到指定的队列 String routingkey ="direct1"; //投递到交换机中 System.err.println("生产者准备开始投递消息"); rabbitTemplate.convertAndSend(exchangeName,routingkey,message); } }
package edu.hunan.rabbitmq.service; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Service; import java.io.IOException; /** * 消费者 */ @Service @RabbitListener(queues = "direct_queue") public class ConsumerDirectService { @RabbitHandler public void reviceMessage(@Payload String Message2, @Headers Channel channel, Message message) throws IOException, InterruptedException { System.err.println("测试异步不影响前端返回"); /** * 接收失败设置重试 拒绝当前消息,并把消息返回原队列 都设置false表示拒绝重发 消息则丢失 * 方式1 使用自带的重试机制并且控制重试次数 重试机制不能用try/catch否则会死循环 而是将异常抛出 * 触发重试机制需要消费者抛出异常,而不能try/catch捕捉异常,不然会死循环。 * 方式2 将接收失败的消息使用死行队列接盘 死行队列+try/catch */ try { //处理消息 System.err.println("消费者消息接收成功-》"+Message2); // 确认消息已经消费成功 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { System.err.println("消费处理异常"+userId+"\t"+e); // 拒绝当前消息,并把消息返回原队列 都设置false表示拒绝重发 /** * 接收失败设置重试 拒绝当前消息,并把消息返回原队列 都设置false表示拒绝重发 消息则丢失 * 方式1 使用自带的重试机制并且控制重试次数 重试机制不能用try/catch否则会死循环 而是将异常抛出 * 触发重试机制需要消费者抛出异常,而不能try/catch捕捉异常,不然会死循环。 * 方式2 将接收失败的消息使用死信队列接盘 死信队列+try/catch */ channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。