当前位置:   article > 正文

重启rabbitmq后,队列中没有消费者的原因_rabbitmq没有消费者

rabbitmq没有消费者

问题:

      上次生产服务器出问题,导致上面的rabbitmq主节点和一系列应用服务宕机。运维先重启应用服务(消费者)后再启动MQ主节点后发现,主节点队列上堆积的消息没有被消费,点进队列一看,没有消费者!

场景复现:

        在开发环境搭建mq集群,模拟生产场景。

场景重现:

        启动消费者应用,可以发现日志报错信息如下:

解释一下意思就是: 消费者发现没有要监听的 queue 时,默认会进行三次重试监听 queue,三次都失败后就无法重试了(三次时间很短,默认10s一次)

此时就算你重启了MQ,但是消费者不会再去监听mq对应队列了,所以MQ的队列上就不会有消费者信息了。

解决办法:

spring通过发布事件的方式,可以通知观察者(即事件监听器)消费者的一些行为,消费者相关的事件如下所示。

  • AsyncConsumerStartedEvent:An event that is published whenever a new consumer is started.
  • AsyncConsumerStoppedEvent:An event that is published whenever a consumer is stopped (and not restarted).
  • AsyncConsumerRestartedEvent:An event that is published whenever a consumer is restarted.
  • ListenerContainerConsumerFailedEvent:Published when a listener consumer fails.

基于事件机制,可以通过监听事件ListenerContainerConsumerFailedEvent,当有消费者发生致命错误时,重新创建消费者消费消息,并发送告警信息给相关责任人。具体实现如下:

  1. import java.util.Arrays;
  2. import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent;
  3. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
  4. import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
  5. import org.springframework.context.ApplicationListener;
  6. import org.springframework.stereotype.Component;
  7. import org.springframework.util.Assert;
  8. import lombok.extern.slf4j.Slf4j;
  9. /**
  10. * MQ消费者失败事件监听器
  11. * @author wxyh
  12. * @date 2018/04/02
  13. */
  14. @Slf4j
  15. @Component
  16. public class ListenerContainerConsumerFailedEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent> {
  17. @Override
  18. public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
  19. log.error("消费者失败事件发生:{}", event);
  20. if (event.isFatal()) {
  21. log.error(String.format("Stopping container from aborted consumer. Reason::%s.",
  22. event.getReason()), event.getThrowable());
  23. SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
  24. String queueNames = Arrays.toString(container.getQueueNames());
  25. // 重启
  26. try {
  27. restart(container);
  28. log.info("重启队列%s的监听成功!", queueNames);
  29. } catch (Exception e) {
  30. log.error(String.format("重启队列%s的监听失败!", queueNames), e);
  31. }
  32. // TODO 告警,包含队列信息,监听断开原因,断开时异常信息,重启是否成功等...
  33. }
  34. }
  35. /**
  36. * 重启监听
  37. * @param container
  38. * @return
  39. */
  40. private void restart(SimpleMessageListenerContainer container) {
  41. // 暂停30s
  42. try {
  43. Thread.sleep(30000);
  44. } catch (Exception e) {
  45. log.error(e.getMessage());
  46. }
  47. Assert.state(!container.isRunning(), String.format("监听容器%s正在运行!", container));
  48. container.start();
  49. }
  50. }

参考:一文教你如何解决RabbitMQ队列无消费者

        RabbitMQ异常监控及动态控制队列消费的解决方案

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/712797
推荐阅读
相关标签
  

闽ICP备14008679号