当前位置:   article > 正文

eureka监听各服务状态,下线、重连等,并做相应的处理_eureka-server 熔断

eureka-server 熔断

在一些场景下,我们需要监听eureka服务中心的一些状态,譬如某个微服务挂掉了,我们希望能监听到,并给管理员发送邮件通知。

Eureka的server端会发出5个事件通知,分别是:

EurekaInstanceCanceledEvent 服务下线事件
EurekaInstanceRegisteredEvent 服务注册事件
EurekaInstanceRenewedEvent 服务续约事件
EurekaRegistryAvailableEvent Eureka注册中心启动事件
EurekaServerStartedEvent Eureka Server启动事件

这5个事件类都位于 org.springframework.cloud.netflix.eureka.server.event 包下,我们可以从源码中很方便地看到。

eureka的server端源码一共也没几个类,Controller是给界面用的,还有一些配置类,我们简单来看几个。


  1. @Configuration
  2. @CommonsLog
  3. public class EurekaServerInitializerConfiguration
  4. implements ServletContextAware, SmartLifecycle, Ordered {
  5. ...
  6. @Override
  7. public void start() {
  8. new Thread(new Runnable() {
  9. @Override
  10. public void run() {
  11. try {
  12. //TODO: is this class even needed now?
  13. // 初始化eureka server
  14. eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
  15. log.info("Started Eureka Server");
  16. // 传递eureka注册事件,找来找去没看到有listen去处理这件事件
  17. // spring包中没有去处理,如果我们有需要可以自己处理
  18. // https://github.com/spring-cloud/spring-cloud-netflix/issues/1726
  19. publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
  20. // 设置在spring容器关闭的时候执行stop方法
  21. EurekaServerInitializerConfiguration.this.running = true;
  22. publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
  23. }
  24. catch (Exception ex) {
  25. // Help!
  26. log.error("Could not initialize Eureka servlet context", ex);
  27. }
  28. }
  29. }).start();
  30. }
  31. @Bean
  32. public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
  33. EurekaServerContext serverContext) {
  34. return new EurekaServerBootstrap(this.applicationInfoManager,
  35. this.eurekaClientConfig, this.eurekaServerConfig, registry,
  36. serverContext);
  37. }
  38. ...
  39. }

  1. @CommonsLog
  2. public class EurekaServerBootstrap {
  3. ....
  4. public void contextInitialized(ServletContext context) {
  5. try {
  6. //初始化eureka环境
  7. initEurekaEnvironment();
  8. //初始化eureka上下文
  9. initEurekaServerContext();
  10. context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
  11. }
  12. catch (Throwable e) {
  13. log.error("Cannot bootstrap eureka server :", e);
  14. throw new RuntimeException("Cannot bootstrap eureka server :", e);
  15. }
  16. }
  17. protected void initEurekaEnvironment() throws Exception {
  18. log.info("Setting the eureka configuration..");
  19. //如果在云环境中运行,需要传递java命令行属性-Deureka.datacenter = cloud,以便Eureka客户端/服务器知道初始化AWS云特定的信息
  20. String dataCenter = ConfigurationManager.getConfigInstance()
  21. .getString(EUREKA_DATACENTER);
  22. if (dataCenter == null) {
  23. log.info(
  24. "Eureka data center value eureka.datacenter is not set, defaulting to default");
  25. ConfigurationManager.getConfigInstance()
  26. .setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
  27. }
  28. else {
  29. ConfigurationManager.getConfigInstance()
  30. .setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
  31. }
  32. //eureka 运行环境,java命令行属性-Deureka.environment=eureka-client-{test,prod}
  33. String environment = ConfigurationManager.getConfigInstance()
  34. .getString(EUREKA_ENVIRONMENT);
  35. if (environment == null) {
  36. ConfigurationManager.getConfigInstance()
  37. .setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
  38. log.info(
  39. "Eureka environment value eureka.environment is not set, defaulting to test");
  40. }
  41. else {
  42. ConfigurationManager.getConfigInstance()
  43. .setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
  44. }
  45. }
  46. // eureka使用XStream来对json、xml格式的转换
  47. protected void initEurekaServerContext() throws Exception {
  48. // 向后兼容
  49. //注册状态转换器
  50. JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
  51. XStream.PRIORITY_VERY_HIGH);
  52. XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
  53. XStream.PRIORITY_VERY_HIGH);
  54. //判断是否亚马逊的数据中心
  55. if (isAws(this.applicationInfoManager.getInfo())) {
  56. this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
  57. this.eurekaClientConfig, this.registry, this.applicationInfoManager);
  58. //
  59. this.awsBinder.start();
  60. }
  61. //初始化eureka server上下文
  62. EurekaServerContextHolder.initialize(this.serverContext);
  63. log.info("Initialized server context");
  64. // 从相邻的eureka节点复制注册表
  65. int registryCount = this.registry.syncUp();
  66. // 默认每30秒发送心跳,1分钟就是2次
  67. // 修改eureka状态为up
  68. this.registry.openForTraffic(this.applicationInfoManager, registryCount);
  69. // 注册所有监控
  70. EurekaMonitors.registerAllStats();
  71. }
  72. ...
  73. }

  1. public class InstanceRegistry extends PeerAwareInstanceRegistryImpl implements ApplicationContextAware {
  2. private static final Log log = LogFactory.getLog(InstanceRegistry.class);
  3. private ApplicationContext ctxt;
  4. private int defaultOpenForTrafficCount;
  5. public InstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs, EurekaClient eurekaClient, int expectedNumberOfRenewsPerMin, int defaultOpenForTrafficCount) {
  6. super(serverConfig, clientConfig, serverCodecs, eurekaClient);
  7. this.expectedNumberOfRenewsPerMin = expectedNumberOfRenewsPerMin;
  8. this.defaultOpenForTrafficCount = defaultOpenForTrafficCount;
  9. }
  10. public void setApplicationContext(ApplicationContext context) throws BeansException {
  11. this.ctxt = context;
  12. }
  13. public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
  14. super.openForTraffic(applicationInfoManager, count == 0 ? this.defaultOpenForTrafficCount : count);
  15. }
  16. public void register(InstanceInfo info, int leaseDuration, boolean isReplication) {
  17. this.handleRegistration(info, leaseDuration, isReplication);
  18. super.register(info, leaseDuration, isReplication);
  19. }
  20. public void register(InstanceInfo info, boolean isReplication) {
  21. this.handleRegistration(info, this.resolveInstanceLeaseDuration(info), isReplication);
  22. super.register(info, isReplication);
  23. }
  24. public boolean cancel(String appName, String serverId, boolean isReplication) {
  25. this.handleCancelation(appName, serverId, isReplication);
  26. return super.cancel(appName, serverId, isReplication);
  27. }
  28. public boolean renew(String appName, String serverId, boolean isReplication) {
  29. this.log("renew " + appName + " serverId " + serverId + ", isReplication {}" + isReplication);
  30. List<Application> applications = this.getSortedApplications();
  31. Iterator var5 = applications.iterator();
  32. while(var5.hasNext()) {
  33. Application input = (Application)var5.next();
  34. if (input.getName().equals(appName)) {
  35. InstanceInfo instance = null;
  36. Iterator var8 = input.getInstances().iterator();
  37. while(var8.hasNext()) {
  38. InstanceInfo info = (InstanceInfo)var8.next();
  39. if (info.getId().equals(serverId)) {
  40. instance = info;
  41. break;
  42. }
  43. }
  44. //这里发布重连事件
  45. this.publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication));
  46. break;
  47. }
  48. }
  49. return super.renew(appName, serverId, isReplication);
  50. }
  51. protected boolean internalCancel(String appName, String id, boolean isReplication) {
  52. this.handleCancelation(appName, id, isReplication);
  53. return super.internalCancel(appName, id, isReplication);
  54. }
  55. private void handleCancelation(String appName, String id, boolean isReplication) {
  56. this.log("cancel " + appName + ", serverId " + id + ", isReplication " + isReplication);
  57. this.publishEvent(new EurekaInstanceCanceledEvent(this, appName, id, isReplication));
  58. }
  59. private void handleRegistration(InstanceInfo info, int leaseDuration, boolean isReplication) {
  60. this.log("register " + info.getAppName() + ", vip " + info.getVIPAddress() + ", leaseDuration " + leaseDuration + ", isReplication " + isReplication);
  61. this.publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration, isReplication));
  62. }
  63. private void log(String message) {
  64. if (log.isDebugEnabled()) {
  65. log.debug(message);
  66. }
  67. }
  68. private void publishEvent(ApplicationEvent applicationEvent) {
  69. this.ctxt.publishEvent(applicationEvent);
  70. }
  71. private int resolveInstanceLeaseDuration(InstanceInfo info) {
  72. int leaseDuration = 90;
  73. if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
  74. leaseDuration = info.getLeaseInfo().getDurationInSecs();
  75. }
  76. return leaseDuration;
  77. }
  78. }
差不多能用上的就这几个类了,这5个server端事件在这几个类中都能找到。

知道了事件名,要监听就很简单了

  1. package com.tianyalei.server;
  2. import com.netflix.appinfo.InstanceInfo;
  3. import org.springframework.cloud.netflix.eureka.server.event.EurekaInstanceCanceledEvent;
  4. import org.springframework.cloud.netflix.eureka.server.event.EurekaInstanceRegisteredEvent;
  5. import org.springframework.cloud.netflix.eureka.server.event.EurekaInstanceRenewedEvent;
  6. import org.springframework.cloud.netflix.eureka.server.event.EurekaRegistryAvailableEvent;
  7. import org.springframework.cloud.netflix.eureka.server.event.EurekaServerStartedEvent;
  8. import org.springframework.context.event.EventListener;
  9. import org.springframework.stereotype.Component;
  10. /**
  11. * Created by wuweifeng on 2017/10/10.
  12. */
  13. @Component
  14. public class EurekaStateChangeListener {
  15. @EventListener
  16. public void listen(EurekaInstanceCanceledEvent eurekaInstanceCanceledEvent) {
  17. //服务断线事件
  18. String appName = eurekaInstanceCanceledEvent.getAppName();
  19. String serverId = eurekaInstanceCanceledEvent.getServerId();
  20. System.out.println(appName);
  21. System.out.println(serverId);
  22. }
  23. @EventListener
  24. public void listen(EurekaInstanceRegisteredEvent event) {
  25. InstanceInfo instanceInfo = event.getInstanceInfo();
  26. System.out.println(instanceInfo);
  27. }
  28. @EventListener
  29. public void listen(EurekaInstanceRenewedEvent event) {
  30. event.getAppName();
  31. event.getServerId();
  32. }
  33. @EventListener
  34. public void listen(EurekaRegistryAvailableEvent event) {
  35. }
  36. @EventListener
  37. public void listen(EurekaServerStartedEvent event) {
  38. //Server启动
  39. }
  40. }
在server项目里加上这个监听类,然后在各个事件被触发时就能在这里监听并处理了。

譬如我启动一个client,在服务注册事件(EurekaInstanceRegisteredEvent)的监听方法里就能看到新注册的client的各项详细信息。
然后我再关掉client,在服务下线事件(EurekaInstanceCanceledEvent)的监听方法里可以看到挂掉的client的appName、serverId等信息。那么我们就可以在这里做一些处理,譬如邮件通知管理员,某某服务挂掉了。



声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/818261
推荐阅读
相关标签
  

闽ICP备14008679号