当前位置:   article > 正文

RabbitMq生产者发送消息确认_channel error; protocol method: #method

channel error; protocol method: #method(reply-code=404, reply

RabbitMq生产者发送消息失败现象

一般情况下RabbitMq的生产者能够正常的把消息投递到交换机Exchange,Exchange能够根据路由键routingKey把消息投递到队列Queue,但是一旦出现消息无法投递到交换机Exchange,或无法路由到Queue的这种特殊情况下,则需要对生产者的消息进行缓存或者保存到数据库,后续在调查完RabbitMq服务器的问题之后,待RabbitMq服务器正常之后,需要对这些消息进行重新投递。正常来说RabbitMq做了集群之后是不会出现这种问题,整个集群挂断的概率也是非常小。

错误信息

当项目启动后,然后把交换机Exchange删除后,然后生产者发送消息时会提示交换机不存在。Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange' in vhost '/', class-id=60, method-id=40)

SpringBoot代码示例

SpringBoot的application.properties需要新增spring.rabbitmq.publisher-confirm-type配置要求值是correlated。默认值是none表示无需触发交换机收到消息的回调接口。correlated表示消息发布后会触发交换机收到消息的回调接口。

  1. # springboot整合rabbitMq的配置
  2. spring.rabbitmq.host=192.168.15.200
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=admin
  5. spring.rabbitmq.password=123
  6. spring.rabbitmq.publisher-confirm-type=correlated

队列和交换机配置类

  1. package springbootrabbitmq.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. public class ConfirmConfig {
  8. // 普通交换机名称
  9. public static final String EXCHANGE_NAME = "confirm_exchange";
  10. // 队列名称
  11. public static final String QUEUE_NAME = "confirm_queue";
  12. public static final String ROUTING_KEY = "key1";
  13. @Bean("confirmExchange")
  14. public DirectExchange confirmExchange() {
  15. return new DirectExchange(EXCHANGE_NAME);
  16. }
  17. @Bean("confirmQueue")
  18. public Queue confirmQueue() {
  19. return QueueBuilder.durable(QUEUE_NAME).build();
  20. }
  21. @Bean
  22. public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) {
  23. return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);
  24. }
  25. }

生产者消息发送确认配置类

  1. package springbootrabbitmq.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.connection.CorrelationData;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Component;
  7. import javax.annotation.PostConstruct;
  8. // 1.先实现RabbitTemplate.ConfirmCallback接口,从写confirm回调函数
  9. @Slf4j
  10. @Component
  11. public class RabbitMqCallBack implements RabbitTemplate.ConfirmCallback {
  12. // 2.注入
  13. @Autowired
  14. private RabbitTemplate rabbitTemplate;
  15. /**
  16. *
  17. * @param correlationData 消息
  18. * @param b 发送成功是true,失败是false
  19. * @param s 发送失败时的原因
  20. */
  21. @Override
  22. public void confirm(CorrelationData correlationData, boolean b, String s) {
  23. String id = correlationData != null ? correlationData.getId() : "";
  24. if (b) {
  25. log.info("交换机已经收到id为{}的消息", id);
  26. } else {
  27. log.error("交换机未收到id为{}的消息, 原因是:{}", id, s);
  28. // 消息缓存或入库,邮件提醒运维
  29. }
  30. }
  31. // 3.然后在springBoot对象初始化之后再执行rabbitTemplate.setConfirmCallback(this);设置回调函数,避免使用默认的ConfirmCallback
  32. @PostConstruct
  33. public void init() {
  34. rabbitTemplate.setConfirmCallback(this);
  35. }
  36. }

生产者类

  1. package springbootrabbitmq.controller;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.AmqpException;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.core.MessagePostProcessor;
  6. import org.springframework.amqp.rabbit.connection.CorrelationData;
  7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.web.bind.annotation.GetMapping;
  10. import org.springframework.web.bind.annotation.PathVariable;
  11. import org.springframework.web.bind.annotation.RequestMapping;
  12. import org.springframework.web.bind.annotation.RestController;
  13. import springbootrabbitmq.config.ConfirmConfig;
  14. import springbootrabbitmq.config.TtlQueueConfig;
  15. import java.util.Date;
  16. @Slf4j
  17. @RestController
  18. @RequestMapping("/confirm")
  19. public class ConfirmController {
  20. @Autowired
  21. private RabbitTemplate rabbitTemplate;
  22. @GetMapping("/sendMsg/{message}")
  23. public String sendMsg(@PathVariable String message) {
  24. log.info("当前时间:{}, 发送一条消息:{} 到队列", new Date().toString(), message);
  25. rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message);
  26. return "success";
  27. }
  28. @GetMapping("/sendMsg2/{message}")
  29. public String sendMsg2(@PathVariable String message) {
  30. log.info("当前时间:{}, 发送一条消息:{} 到队列", new Date().toString(), message);
  31. CorrelationData data = new CorrelationData();
  32. data.setId("1111");
  33. rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message, data);
  34. return "success";
  35. }
  36. }

消费者类

  1. package springbootrabbitmq.consumer;
  2. import com.rabbitmq.client.Channel;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. import springbootrabbitmq.config.ConfirmConfig;
  8. import java.util.Date;
  9. @Component
  10. @Slf4j
  11. public class ConfirmConsumer {
  12. //监听器接收消息
  13. @RabbitListener(queues = ConfirmConfig.QUEUE_NAME)
  14. public void receiveD(Message message, Channel channel) {
  15. String msg = new String(message.getBody());
  16. log.info("当前时间:{}, 收到一条消息:{} ", new Date().toString(), msg);
  17. }
  18. }

首先正常发送,然后再删除交换机然后再发送。测试结果如下

  1. 2023-01-29 21:07:12.367 INFO 79848 --- [nio-8080-exec-1] s.controller.ConfirmController : 当前时间:Sun Jan 29 21:07:12 CST 2023, 发送一条消息:12 到队列
  2. 2023-01-29 21:07:12.399 INFO 79848 --- [nectionFactory1] s.config.RabbitMqCallBack : 交换机已经收到id为1111的消息
  3. 2023-01-29 21:07:12.403 INFO 79848 --- [ntContainer#0-1] s.consumer.ConfirmConsumer : 当前时间:Sun Jan 29 21:07:12 CST 2023, 收到一条消息:12
  4. 2023-01-29 21:08:01.282 INFO 79848 --- [nio-8080-exec-2] s.controller.ConfirmController : 当前时间:Sun Jan 29 21:08:01 CST 2023, 发送一条消息:123 到队列
  5. 2023-01-29 21:08:01.289 ERROR 79848 --- [168.15.200:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange' in vhost '/', class-id=60, method-id=40)
  6. 2023-01-29 21:08:01.290 ERROR 79848 --- [nectionFactory2] s.config.RabbitMqCallBack : 交换机未收到id为1111的消息, 原因是:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange' in vhost '/', class-id=60, method-id=40)

消息回退

如果不开启消息回退,默认是消息即使无法发送到队列(如路由键错误等场景),也不会进行提醒,生产者不知道消息能否成功发送到队列。

解决方案

当消息无法到达队列的时候进行提醒

消息回退代码示例

配置,开启消息不可达目的地时的回调

spring.rabbitmq.publisher-returns=true

配置类,实现RabbitTemplate.ReturnCallback接口

  1. package springbootrabbitmq.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.connection.CorrelationData;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Component;
  8. import javax.annotation.PostConstruct;
  9. // 1.先实现RabbitTemplate.ConfirmCallback接口,从写confirm回调函数
  10. @Slf4j
  11. @Component
  12. public class RabbitMqCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
  13. // 2.注入
  14. @Autowired
  15. private RabbitTemplate rabbitTemplate;
  16. /**
  17. *
  18. * @param correlationData 消息
  19. * @param b 发送成功是true,失败是false
  20. * @param s 发送失败时的原因
  21. */
  22. @Override
  23. public void confirm(CorrelationData correlationData, boolean b, String s) {
  24. String id = correlationData != null ? correlationData.getId() : "";
  25. if (b) {
  26. log.info("交换机已经收到id为{}的消息", id);
  27. } else {
  28. log.error("交换机未收到id为{}的消息, 原因是:{}", id, s);
  29. // 消息缓存或入库,邮件提醒运维
  30. }
  31. }
  32. // 3.然后在springBoot对象初始化之后再执行rabbitTemplate.setConfirmCallback(this);设置回调函数,避免使用默认的ConfirmCallback
  33. @PostConstruct
  34. public void init() {
  35. rabbitTemplate.setConfirmCallback(this);
  36. rabbitTemplate.setReturnCallback(this);
  37. }
  38. // 当消息传递过程中不可达到目的地时将消息返回给生产者,只有不可达到目的地时才会调用这个方法
  39. @Override
  40. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  41. log.error("消息无法被写入队列:{}, 退回原因:{}, 路由Key: {}", message, replyText, routingKey);
  42. // 邮件发送,缓存或存到数据库
  43. }
  44. }

生产者

  1. @GetMapping("/sendMsg3/{message}")
  2. public String sendMsg3(@PathVariable String message) {
  3. log.info("当前时间:{}, 发送一条消息:{} 到队列", new Date().toString(), message);
  4. CorrelationData data = new CorrelationData();
  5. data.setId("1111");
  6. rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message, data);
  7. rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY+"222", message +"222", data);
  8. return "success";
  9. }

消费者与上一个消费者相同

测试结果如下:调用:http://127.0.0.1:8080/confirm/sendMsg3/123生产者的接口可以看到当路由键错误导致交换机无法把消息投递到队列时会回调returnedMessage方法。

  1. 2023-01-29 21:27:48.910 INFO 74512 --- [nio-8080-exec-1] s.controller.ConfirmController : 当前时间:Sun Jan 29 21:27:48 CST 2023, 发送一条消息:123 到队列
  2. 2023-01-29 21:27:48.934 INFO 74512 --- [nectionFactory1] s.config.RabbitMqCallBack : 交换机已经收到id为1111的消息
  3. 2023-01-29 21:27:48.941 ERROR 74512 --- [nectionFactory1] s.config.RabbitMqCallBack : 消息无法被写入队列:(Body:'123222' MessageProperties [headers={spring_returned_message_correlation=1111}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), 退回原因:NO_ROUTE, 路由Key: key1222
  4. 2023-01-29 21:27:48.943 INFO 74512 --- [nectionFactory2] s.config.RabbitMqCallBack : 交换机已经收到id为1111的消息
  5. 2023-01-29 21:27:48.946 INFO 74512 --- [ntContainer#0-1] s.consumer.ConfirmConsumer : 当前时间:Sun Jan 29 21:27:48 CST 2023, 收到一条消息:123
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/36599
推荐阅读
相关标签
  

闽ICP备14008679号