赞
踩
一、Eureka Client 的工作
二、源码解析
Eureka Client 是通过Starter的方式引入依赖,Spring Boot 将会为项目使用以下自动配置类:
1. EurekaClientAutoConfiguration :Eureka Client 自动配置类,负责 Eureka Client 中关键 Beans 的配置和初始化,如 ApplicationInfoManager 和 EurekaClientConfig等。
2. RibbonEurekaAutoConfiguration: Ribbon 负载均衡相关配置。
3. EurekaDiscoveryClientConfiguration: 配置自动注册和应用的健康检查器。
2.1 读取应用自身配置信息
通过EurekaDiscoverClientConfig 配置类, Spring Boot 帮助 Eureka Client 完成很多必要 Bean 的属性读取和配置, 现列出 EurekaDiscoveryClientConfiguation 中的属性读取和配置类。
类名 | 作用和介绍 |
EurekaClientConfig | 封装 Eureka Client 与 Eureka Server 交互所需要的配置信息。 Spring Cloud 为其提供了一个默认配置类的 EurekaClientConfigBean ,可以在配置文件中通过前缀 eureka.client+属性名进行属性覆盖 |
ApplicationInfoManager | 作为应用信息管理器,管理服务实例的信息类 InstanceInfo 和服务实例的配置信息类 EurekaInstanceConfig |
InstanceInfo | 封装将被发送到Eureka Server 进行服务注册的服务实例元数据。它在 Eureka Server 的注册中代表一个服务实例,其他服务实例可以通过 InstanceInfo 了解该服务实例的相关信息从而发起服务请求 |
EurekaInstanceConfig | 封装 Eureka Client 自身服务实例的配置信息,主要用于构建 InstanceInfo 通常这些信息在配置文件中的 eureka.instance 前缀下进行设置, Spring Cloud 通过EurekaInstanceConfigBean 配置类提供了默认配置 |
DiscoveryClient | Spring Cloud 中定义用来服务发现的客户端接口 |
2.2 服务发现客户端
2.1.1 DiscoveryClient 职责
DiscoveryClient 是 Eureka Client 的核心类,包括与 Eureka Server 交互的关键逻辑,具备了以下职能:
2.1.2 DiscoveryClient 构造函数
构造函数中初始化发送心跳、缓存刷新等定时任务
- // com.netflix.discovery.DiscoveryClient#DiscoveryClient()
- // 对应配置为 eureka.client.fetch-register , true 表示Eureka Client 将从 Eureka Server 中拉取注册表信息
- if (config.shouldFetchRegistry()) {
- this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
- } else {
- this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
- }
- // 对应配置为 eureka.client.register-with-eureka , true 表示Eureka Client 将注册到 Eureka Server 中
- if (config.shouldRegisterWithEureka()) {
- this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
- } else {
- this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
- }
接着定义一个基于线程池的定时器线程池 ScheduledExecutorService,线程池大小为2,一个线程用于发送心跳,另一个线程用于缓存刷新,同时定义了发送心跳和缓存刷新线程池,代码如下所示:
- // com.netflix.discovery.DiscoveryClient#DiscoveryClient()
-
- scheduler = Executors.newScheduledThreadPool(2,
- new ThreadFactoryBuilder()
- .setNameFormat("DiscoveryClient-%d")
- .setDaemon(true)
- .build());
-
- heartbeatExecutor = new ThreadPoolExecutor(
- 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(),
- new ThreadFactoryBuilder()
- .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
- .setDaemon(true)
- .build()
- ); // use direct handoff
-
- cacheRefreshExecutor = new ThreadPoolExecutor(...);
-
- // 内部类,封装 http 调用的 Jersey 客户端
- eurekaTransport = new EurekaTransport();
- scheduleServerEndpointTask(eurekaTransport, args);

接着从Eureka Server 中拉取注册表信息,代码如下所示:
- if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
- fetchRegistryFromBackup();
- }
拉取完 Eureka Server 中的注册表信息后,将对服务实例进行注册,代码如下所示:
- if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
- try {
- // 发起服务注册
- if (!register() ) {
- throw new IllegalStateException("Registration error at startup. Invalid server response.");
- }
- } catch (Throwable th) {
- logger.error("Registration error at startup: {}", th.getMessage());
- throw new IllegalStateException(th);
- }
- }
- // 初始化定时任务
- // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
- initScheduledTasks();
最后总结一下,在 DiscoveryClient 的构造函数中,主要依次做了以下的事情:
2.3 拉取注册表信息
com.netflix.discovery.DiscoveryClient#fetchRegistry
- private boolean fetchRegistry(boolean forceFullRegistryFetch) {
- Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
-
- try {
- //如果增量式拉取被禁止,或者Applications 为 null ,进行全是拉取
- // If the delta is disabled or if it is the first time, get all
- // applications
- Applications applications = getApplications();
-
- if (clientConfig.shouldDisableDelta()
- || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
- || forceFullRegistryFetch
- || (applications == null)
- || (applications.getRegisteredApplications().size() == 0)
- || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
- {
- // 全量拉取注册表信息
- getAndStoreFullRegistry();
- } else {
- // 增量拉取注册表信息
- getAndUpdateDelta(applications);
- }
- // 计算应用集合一致性哈希码
- applications.setAppsHashCode(applications.getReconcileHashCode());
- // 打印注册表上所有服务实例的总数量
- logTotalInstances();
- } catch (Throwable e) {
- logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
- return false;
- } finally {
- if (tracer != null) {
- tracer.stop();
- }
- }
- // 在更新远程实例状态之前推送缓存刷新事件,但是Eureka中并没有提供默认的事件监听器
- // Notify about cache refresh before updating the instance remote status
- onCacheRefreshed();
-
- // 基于缓存中被刷新的数据更新远程实例状态
- // Update remote status based on refreshed data held in the cache
- updateInstanceRemoteStatus();
-
- // registry was fetched successfully, so return true
- return true;
- }

2.3.1 全量拉取注册表信息
com.netflix.discovery.DiscoveryClient#getAndStoreFullRegistry
接口: http://localhost:20000/eureka/apps/
getAndStoreFullRegistry 方法可能被多个线程同时调用,导致新拉取的注册表被旧的注册表覆盖,产生脏数据,对此,Eureka 通过类型为 AtomicLong 的 currentUpdateGeneration 对 apps 的更新版本进行跟踪。如果更新版本不一致,说明本次拉取的注册信息已过时,不需要保存到本地。拉取到注册表信息之后会对获取到的apps进行筛选,只保留状态为 UP 的服务实例信息。
2.3.2 增量式拉取注册表信息
com.netflix.discovery.DiscoveryClient#getAndUpdateDelta
接口: http://localhost:20000/eureka/apps/delta
- private void getAndUpdateDelta(Applications applications) throws Throwable {
- long currentUpdateGeneration = fetchRegistryGeneration.get();
-
- Applications delta = null;
- EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
- if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
- delta = httpResponse.getEntity();
- }
- // 获取增量拉取失败,则进行全量拉取
- if (delta == null) {
- logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
- + "Hence got the full registry.");
- getAndStoreFullRegistry();
- } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
- logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
- String reconcileHashCode = "";
- if (fetchRegistryUpdateLock.tryLock()) {
- try {
- // 更新本地缓存
- updateDelta(delta);
- // 计算应用集合一致性哈希码
- reconcileHashCode = getReconcileHashCode(applications);
- } finally {
- fetchRegistryUpdateLock.unlock();
- }
- } else {
- logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
- }
- // 比较应用集合一致性哈希码,如果不一致将认为本次增量式拉取数据已脏,将发起全量拉取更新本地注册表信息
- // There is a diff in number of instances for some reason
- if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
- reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
- }
- } else {
- logger.warn("Not updating application delta as another thread is updating it already");
- logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
- }
- }

appsHashCode 的一般表示方式为:
appsHashCode = ${status}_${count}_
它通过将应用状态和数量拼接成字符串,表示了当前注册表中服务实例状态的统计信息。举个简单的例子,有10个应用实例的状态为UP,有5个应用实例状态为DOWN,其他的状态数据为0(不进行表示),那么appsHashCode 的形式将是:
appsHashCode = UP_10_DOWN_5_
2.4 服务注册
com.netflix.discovery.DiscoveryClient#register
接口: http://localhost:20000/eureka/apps/${APP_NAME}
-
- /**
- * Register with the eureka service by making the appropriate REST call.
- */
- boolean register() throws Throwable {
- logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
- EurekaHttpResponse<Void> httpResponse;
- try {
- // 将自身服务实例元数据封装到 InstanceInfo 中,发送到 Eureka Server中请求服务注册, 当Eureka Server 返回 204 状态码时,说明服务注册成功。
- httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
- } catch (Exception e) {
- logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
- throw e;
- }
- if (logger.isInfoEnabled()) {
- logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
- }
- return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
- }

2.5 初始化定时任务
com.netflix.discovery.DiscoveryClient#initScheduledTasks
这个方法中初始化三个定时器任务,一个用于向 Eureka Server 拉取注册表信息刷新本地缓存;一个用于向 Eureka Server 发送心跳;一个用于进行按需注册的操作。代码如下所示:
-
- /**
- * Initializes all scheduled tasks.
- */
- private void initScheduledTasks() {
- if (clientConfig.shouldFetchRegistry()) {
- // 注册表缓存刷新定时器
- // 获取配置文件中刷新间隔,默认为30秒,可以通过 eureka.client.registry-fetch-interval-seconds 进行设置
- // registry cache refresh timer
- int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
- int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
- scheduler.schedule(
- new TimedSupervisorTask("cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds,
- TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread()
- ),
- registryFetchIntervalSeconds, TimeUnit.SECONDS);
- }
-
- if (clientConfig.shouldRegisterWithEureka()) {
- int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
- int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
- logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
-
- // 发送心跳定时器,默认30秒发送一次心跳
- // Heartbeat timer
- scheduler.schedule(
- new TimedSupervisorTask("heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs,
- TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread()
- ),
- renewalIntervalInSecs, TimeUnit.SECONDS);
-
- // 按需注册定时器 。。。。
-
- } else {
- logger.info("Not registering with Eureka server per configuration");
- }
- }

2.5.1 按需注册定时任务
按需注册定时任务的作用是当 Eureka Client 中的 InstanceInfo 或者 status 发生变化时,重新向 Eureka Server 发起注册请求,更新注册表中的服务实例信息,保证 Eureka Server 注册表中服务实例信息有效和可用。
- // 按需注册定时器 。。。。
- // InstanceInfo replicator
- // 定时检查刷新服务实例信息,检查是否有变化,是否需要重新注册
- instanceInfoReplicator = new InstanceInfoReplicator(
- this,
- instanceInfo,
- clientConfig.getInstanceInfoReplicationIntervalSeconds(),
- 2); // burstSize
- // 监控应用的 status 变化,发生变化即可发起重新注册
- statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
- @Override
- public String getId() {
- return "statusChangeListener";
- }
-
- @Override
- public void notify(StatusChangeEvent statusChangeEvent) {
- if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
- InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
- // log at warn level if DOWN was involved
- logger.warn("Saw local status change event {}", statusChangeEvent);
- } else {
- logger.info("Saw local status change event {}", statusChangeEvent);
- }
- instanceInfoReplicator.onDemandUpdate();
- }
- };
-
- if (clientConfig.shouldOnDemandUpdateStatusChange()) {
- // 注册应用状态改变监控器
- applicationInfoManager.registerStatusChangeListener(statusChangeListener);
- }
- // 启动定时器按需注册定时任务
- instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());

2.6 服务下线
com.netflix.discovery.DiscoveryClient#shutdown
-
- /**
- * Shuts down Eureka Client. Also sends a deregistration request to the
- * eureka server.
- */
- @PreDestroy
- @Override
- public synchronized void shutdown() {
- // 同步方法
- if (isShutdown.compareAndSet(false, true)) {
- logger.info("Shutting down DiscoveryClient ...");
- // 原子操作,确保只会执行一次
- if (statusChangeListener != null && applicationInfoManager != null) {
- // 注销状态监听器
- applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
- }
- // 取消定时任务
- cancelScheduledTasks();
-
- // If APPINFO was registered
- if (applicationInfoManager != null
- && clientConfig.shouldRegisterWithEureka()
- && clientConfig.shouldUnregisterOnShutdown()) {
- // 服务下线
- applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
- unregister();
- }
- // 关闭 Jersy 客户端
- if (eurekaTransport != null) {
- eurekaTransport.shutdown();
- }
- // 关闭相关 Monitor
- heartbeatStalenessMonitor.shutdown();
- registryStalenessMonitor.shutdown();
-
- logger.info("Completed shut down of DiscoveryClient");
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。