赞
踩
上次生产服务器出问题,导致上面的rabbitmq主节点和一系列应用服务宕机。运维先重启应用服务(消费者)后再启动MQ主节点后发现,主节点队列上堆积的消息没有被消费,点进队列一看,没有消费者!
在开发环境搭建mq集群,模拟生产场景。
启动消费者应用,可以发现日志报错信息如下:
解释一下意思就是: 消费者发现没有要监听的 queue 时,默认会进行三次重试监听 queue,三次都失败后就无法重试了(三次时间很短,默认10s一次)
此时就算你重启了MQ,但是消费者不会再去监听mq对应队列了,所以MQ的队列上就不会有消费者信息了。
spring通过发布事件的方式,可以通知观察者(即事件监听器)消费者的一些行为,消费者相关的事件如下所示。
基于事件机制,可以通过监听事件ListenerContainerConsumerFailedEvent,当有消费者发生致命错误时,重新创建消费者消费消息,并发送告警信息给相关责任人。具体实现如下:
- import java.util.Arrays;
-
- import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent;
- import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
- import org.springframework.context.ApplicationListener;
- import org.springframework.stereotype.Component;
- import org.springframework.util.Assert;
-
- import lombok.extern.slf4j.Slf4j;
-
- /**
- * MQ消费者失败事件监听器
- * @author wxyh
- * @date 2018/04/02
- */
- @Slf4j
- @Component
- public class ListenerContainerConsumerFailedEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent> {
-
- @Override
- public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
- log.error("消费者失败事件发生:{}", event);
-
- if (event.isFatal()) {
- log.error(String.format("Stopping container from aborted consumer. Reason::%s.",
- event.getReason()), event.getThrowable());
- SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
- String queueNames = Arrays.toString(container.getQueueNames());
- // 重启
- try {
- restart(container);
- log.info("重启队列%s的监听成功!", queueNames);
- } catch (Exception e) {
- log.error(String.format("重启队列%s的监听失败!", queueNames), e);
- }
-
- // TODO 告警,包含队列信息,监听断开原因,断开时异常信息,重启是否成功等...
-
- }
- }
-
- /**
- * 重启监听
- * @param container
- * @return
- */
- private void restart(SimpleMessageListenerContainer container) {
- // 暂停30s
- try {
- Thread.sleep(30000);
- } catch (Exception e) {
- log.error(e.getMessage());
- }
-
- Assert.state(!container.isRunning(), String.format("监听容器%s正在运行!", container));
- container.start();
- }
-
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。