赞
踩
当queue中的消息无法被消费时,消息成为死信,产生条件如下三个:
basic.reject
拒绝 或 basic.nack
否定)并且requeue = false
不重新入队失败消息如何转发到(绑定到)死信队列
应用场景举例:
但是对于大数据量、时效性强的场景,需要用到延迟队列
延迟功能有保障(时间要求严格)
不会像轮训一样一瞬间处理太多
RabbitMQ本身是不支持延迟队列的,如果不使用插件,就只能改造死信队列————C1消费着直接不存在,消息到期TTL之后一定会进入死信队列进行处理
延迟队列的实现分为“死信队列实现”和“插件直接实现”,结构图分别为:
注意:由于队列的先进先出特性,只有当过期的消息到了队列的顶端(队首),才会被真正的丢弃或者进入死信队列。
如果遇到不同的任务类型需要不同的延时的话,需要为每一种不同延迟时间的消息建立单独的消息队列。
下面两个
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
spring.rabbitmq.host=124...
#spring.rabbitmq.port=5672 默认
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
server.port=8080
左边的YD要与右边的YD相同(routingkey保证相同才能正确转发消息)
@Configuration
public class DeadConfig {
// 1.交换机
public static final String X_EXCHANGE = "X";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
// 2. 队列
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String DEAD_LETTER_QUEUE = "QD";
}
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
关于ttl,实际环境尽量不要写在队列声明期间,而应写在生产者send的时候
@Bean("queueA")//10s过期时间 public Queue queueA(){//AMQP包下的Queue //绑定参数————转发到死信交换机 /* 原始写法 HashMap<String, Object> arg = new HashMap<>(); arg.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE); arg.put("x-dead-letter-routing-key","YD"); arg.put("x-message-ttl",10000); return QueueBuilder. durable(QUEUE_A). withArguments(arg). build(); *///SpringBoot写法 return QueueBuilder. durable(QUEUE_A). deadLetterExchange(Y_DEAD_LETTER_EXCHANGE). deadLetterRoutingKey("YD"). ttl(10000). build(); } @Bean("queueB")//40s过期时间 public Queue queueB(){ return QueueBuilder. durable(QUEUE_B). deadLetterExchange(Y_DEAD_LETTER_EXCHANGE). deadLetterRoutingKey("YD"). ttl(40000). build(); } @Bean("queueD")//死信队列 public Queue queueD(){ return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); }
@Bean //参数:QA xExchange
public Binding QAbindX(@Qualifier("queueA") Queue QA, @Qualifier("xExchange") DirectExchange X){
return BindingBuilder.bind(QA).to(X).with("XA");
}
@Bean
public Binding QBbindX(@Qualifier("queueB") Queue QB, @Qualifier("xExchange") DirectExchange X){
return BindingBuilder.bind(QB).to(X).with("XB");
}
@Bean
public Binding QDbindY(@Qualifier("queueD") Queue QD, @Qualifier("yExchange") DirectExchange Y){
return BindingBuilder.bind(QD).to(Y).with("YB");
}
@RestController
@RequestMapping("ttl")
public class ProducerController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("send")
public void sendA(String msg){
System.out.println("生产者发送消息:" + new Date().toString());
rabbitTemplate.convertAndSend("X","XA","10s延迟");
rabbitTemplate.convertAndSend("X","XB","40s延迟");
}
}
消费者其实只需要用@RabbitListener
进行监听即可,而不是做成一个接口
@Service
//@Component也行
public class ConsumerService {
@RabbitListener(queues = "QD")//监听QD队列消息(QD是死信队列)
public void receiveD(Message message , Channel channel) {//导入AMQP包
System.out.println("收到死信队列消息:" + new String(message.getBody()) + new Date().toString());
}
}
具体百度RabbitMQ延迟队列插件,官网下载,当安装好后可以在web界面看到多了一种交换机————由此可知,插件是通过交换机实现延迟的
因为基于插件的延迟队列只有交换机不同,所以只需要配置交换机
而新的交换机在Spring中没有对应的API,所以需要用自定义交换机CustomExchange
@Bean public Queue delayQueue(){ return new Queue("delayQueue"); } @Bean public CustomExchange delayExchange(){ //只能用原生写法,没有Spring提供的API HashMap<String, Object> arg = new HashMap<>(); arg.put("x-delayed-type","direct"); return new CustomExchange("dExchange","x-delayed-message",true,false,arg); } @Bean public Binding delayBinding(@Qualifier("delayQueue") Queue QC, @Qualifier("delayExchange") CustomExchange cExchange){ return BindingBuilder. bind(QC). to(cExchange). with("delayRoutingKey"). noargs(); }
@GetMapping("delay")
public void delaySent(String msg){
System.out.println("生产者发送消息:" + new Date().toString());
rabbitTemplate.convertAndSend("dExchange","delayRoutingKey",msg,m -> {
m.getMessageProperties().setDelay(5000);//5秒延迟
return m;
});
}
@RabbitListener(queues = "delayQueue")
public void delayListenner(Message message , Channel channel){
System.out.println("收到死信队列消息:" + new String(message.getBody()) + new Date().toString());
}
延迟队列的选择还有:Java中的DelayQueue、Quartz;Redis中的zset;Kafka的时间轮。但RabbitMQ的方案最全面
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。