赞
踩
【Eureka】【源码+图解】【二】Eureka客户端启动过程
HelloWorld总是如此简单,接下来就要去看看eureka server内部的工作原理了。
如果要研究它的源码,我们总得找到一个入口,服务端的代码我们就只加了一个@EnableEurekaServer
就成功创建了一个server,跟着这个蛛丝马迹一探它的究竟
/** * Annotation to activate Eureka Server related configuration. * {@link EurekaServerAutoConfiguration} * * @author Dave Syer * @author Biju Kunjummen * */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(EurekaServerMarkerConfiguration.class) public @interface EnableEurekaServer { }
好吧,啥都没有?别急,我们看下注解,它是用来激活Eureka Server的相关配置的,还告诉我们是在哪个配置文件,我们先看下EurekaServerAutoConfiguration
类图
配置类注册了几个和服务端工作相关的类,看不懂不要紧,我们暂时先放下,再详细看下EurekaServerAutoConfiguration
的源码,当然不会看全部,先看下头部的注解
@Configuration(proxyBeanMethods = false)
// 7、它的作用主要是启动定时线程,定期检查客户端实例的状态,实现自我保护机制
@Import(EurekaServerInitializerConfiguration.class)
// 还记得前面的@EnableEurekaServer吗?它@Import(EurekaServerMarkerConfiguration.class)注入了Marker类
// 这就是为什么eureka server启动时要加@EnableEurekaServer的原因
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
// 配置参数
@EnableConfigurationProperties({
EurekaDashboardProperties.class, // eureka.dashboard.*
InstanceRegistryProperties.class, // eureka.instance.registry.*
EurekaProperties.class }) // eureka.*
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
......
}
先介绍下这几个类的作用,看图
jerseyApplication:其实是它里面的各个xxxResource类,用于接收来自客户端以及其它服务端的请求
PeerAwareInstanceRegistry:处理请求
PeerEurekaNode:服务端与服务端相互复制实例信息
RemoteRegionRegistry:获取其他region的注册实例
而EurekaServerContext、EurekaServerBootstrap、EurekaServerInitializerConfiguration则是用来初始化途中的四个类的。
先看配置
@EnableConfigurationProperties({ EurekaDashboardProperties.class, ... })
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
@Bean
@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
public EurekaController eurekaController(EurekaProperties eurekaProperties) {
return new EurekaController(this.applicationInfoManager, eurekaProperties);
}
}
先看它的配置信息类EurekaDashboardProperties
@ConfigurationProperties("eureka.dashboard")
public class EurekaDashboardProperties {
// dashboard页面的跟path,
// 前面的HelloWorld中我们直接输入http://localhost:3333/就可以显示,
// 可配置eureka.dashboard.path修改
private String path = "/";
// 默认开启的,如果false我们就不能看到dashboard页面
private boolean enabled = true;
}
再看EurekaController
提供了两个page,第一个是默认,可以结合下面的图看;第二个是/lastn
@Controller @RequestMapping("${eureka.dashboard.path:/}") public class EurekaController { @Value("${eureka.dashboard.path:/}") private String dashboardPath = ""; @RequestMapping(method = RequestMethod.GET) public String status(HttpServletRequest request, Map<String, Object> model) { // 自身基础信息 populateBase(request, model); // 已向它注册的实例信息 populateApps(model); StatusInfo statusInfo; ...... model.put("statusInfo", statusInfo); populateInstanceInfo(model, statusInfo); filterReplicas(model, statusInfo); return "eureka/status"; } @RequestMapping(value = "/lastn", method = RequestMethod.GET) public String lastn(HttpServletRequest request, Map<String, Object> model) { ...... return "eureka/lastn"; } }
public class EurekaServerAutoConfiguration implements WebMvcConfigurer { @Bean public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) { // 强制初始化,还记得前面客户端的DiscoveryClient吗?成为一个Server之前你得先是一个Client, // 与客户端稍微不同的是,客户端的eurekaTransport.queryClient是RestTemplateEurekaHttpClient, // 而服务端的是JerseyApplicationClient this.eurekaClient.getApplications(); // 注意,这里创建的是InstanceRegistry类,不是InstanceRegistry接口 return new InstanceRegistry(this.eurekaServerConfig, // EurekaServerConfigBean,eureka.server.*的配置 this.eurekaClientConfig, // EurekaClientConfigBean,eureka.client.*的配置 serverCodecs, // 加密解密 this.eurekaClient, // 作为客户端身份的信息 // eureka.instance.registry.expectedNumberOfClientsSendingRenews,默认1 this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(), // eureka.instance.registry.defaultOpenForTrafficCount,默认1 this.instanceRegistryProperties.getDefaultOpenForTrafficCount()); } }
先看下InstanceRegistry类图
先看下InstanceRegistry的接口,这些接口的方法名是不是很熟悉?和客户端RestTemplateEurekaHttpClient
的方法一一对应,所以客户端发送的请求最终都会由InstanceRegistry
受理,在这里可以简单把InstanceRegistry
当做我们平时常见的service
。从类图中我们看到PeerAwareInstanceRegistry
接口定义了init()
方法,对于这种init
、start
等方法都值得去看一下
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry { @Override public void init(PeerEurekaNodes peerEurekaNodes) throws Exception { // 启动统计 this.numberOfReplicationsLastMin.start(); // 赋值副本节点 this.peerEurekaNodes = peerEurekaNodes; // 1、初始化缓存 initializedResponseCache(); // 2、开启定时刷新 scheduleRenewalThresholdUpdateTask(); // 3、初始化 initRemoteRegionRegistry(); ...... } }
ResponseCacheImpl
主要是缓存注册信息,当客户端发出请求的时候可以从缓存中取,还记得客户端启动时的fetchRegistry
吗?
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
@Override
public synchronized void initializedResponseCache() {
if (responseCache == null) {
responseCache = new ResponseCacheImpl(serverConfig, serverCodecs, this);
}
}
}
先看下类图
最重要的是两个绿色的缓存数据结构,先看下他们的比较
他们之前所不同的是一个map,一个是LoadingCache,从名字中大概能看出一个是静态的,一个是动态的。map是通过一个定时任务维护的,而LoadingCache是每次拿值的时候动态更新,我们先看下ResponseCacheImpl
构造函数
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) { this.serverConfig = serverConfig; this.serverCodecs = serverCodecs; // useReadOnlyResponseCache,使用只读缓存,默认true // 只读缓存:readOnlyCacheMap,每次读的时候先从自己里读,没有再去readWriteCacheMap读,读到值后更新 // 读写缓存:readWriteCacheMap,每次读值都会去获取最新的Applications this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache(); this.registry = registry; this.readWriteCacheMap = CacheBuilder.newBuilder() // eureka.server.initialCapacityOfResponseCache,默认1000,读写缓存大小 .initialCapacity(serverConfig.getInitialCapacityOfResponseCache()) // eureka.server.responseCacheAutoExpirationInSeconds,缓存失效时间,默认180秒 .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS) .removalListener(...) // 监听RemovalNotification,当收到通知时从regionSpecificKeys删除相应的信息 .build(new CacheLoader<Key, Value>() { @Override public Value load(Key key) throws Exception { if (key.hasRegions()) { Key cloneWithNoRegions = key.cloneWithoutRegions(); regionSpecificKeys.put(cloneWithNoRegions, key); } // 1、根据key动态获取value Value value = generatePayload(key); return value; } }); // eureka.server.responseCacheUpdateIntervalMs,只读缓存更新时间,默认30*1000毫秒,即30秒 long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs(); if (shouldUseReadOnlyResponseCache) { // 开启只读缓存定时更新任务 timer.schedule(getCacheUpdateTask(), // 2、定时更新只读缓存 new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs); } ...... }
从构造函数中看到readWriteCacheMap
的关键在于generatePayload(key)
public class ResponseCacheImpl implements ResponseCache { private Value generatePayload(Key key) { String payload; switch (key.getEntityType()) { case Application: boolean isRemoteRegionRequested = key.hasRegions(); // key.getName()即appName // 目的就是获得Application或者Applications,对于从客户端来的请求key的name为ALL_APPS if (ALL_APPS.equals(key.getName())) { // 不管isRemoteRegionRequested是true或false,最终都会走到registry.getApplicationsFromMultipleRegions if (isRemoteRegionRequested) { payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions())); } else { payload = getPayLoad(key, registry.getApplications()); } } else if (ALL_APPS_DELTA.equals(key.getName())) { ...... } else { payload = getPayLoad(key, registry.getApplication(key.getName())); } break; case VIP: case SVIP: payload = getPayLoad(key, getApplicationsForVip(key, registry)); break; default: payload = ""; break; } return new Value(payload); } }
public abstract class AbstractInstanceRegistry implements InstanceRegistry { public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) { boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0; ...... Applications apps = new Applications(); apps.setVersion(1L); for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) { // 先获取当前server的注册实例列表 ...... } if (includeRemoteRegion) { // 再获取remoteRegions的注册实例 for (String remoteRegion : remoteRegions) { // 在服务端启动的时候会初始化regionNameVSRemoteRegistry,并且每个RemoteRegionRegistry会定时更新注册实例的信息,可参考后文 RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion); if (null != remoteRegistry) { Applications remoteApps = remoteRegistry.getApplications(); ...... } else { logger.warn("No remote registry available for the remote region {}", remoteRegion); } } } apps.setAppsHashCode(apps.getReconcileHashCode()); return apps; } }
private TimerTask getCacheUpdateTask() { return new TimerTask() { @Override public void run() { for (Key key : readOnlyCacheMap.keySet()) { try { CurrentRequestVersion.set(key.getVersion()); // 如果readOnlyCacheMap与readWriteCacheMap的值,如果不相同则把readWriteCacheMap中的值更新到readOnlyCacheMap Value cacheValue = readWriteCacheMap.get(key); Value currentCacheValue = readOnlyCacheMap.get(key); if (cacheValue != currentCacheValue) { readOnlyCacheMap.put(key, cacheValue); } } catch (Throwable th) { } finally { CurrentRequestVersion.remove(); } } } }; }
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry { private void scheduleRenewalThresholdUpdateTask() { // 每隔15分钟更新自我保护的阈值,也就是如果续约的实例低于这个阈值就会发出预警 timer.schedule(new TimerTask() { @Override public void run() { updateRenewalThreshold(); } }, serverConfig.getRenewalThresholdUpdateIntervalMs(), // eureka.server.renewalThresholdUpdateIntervalMs,默认15分钟 serverConfig.getRenewalThresholdUpdateIntervalMs()); } private void updateRenewalThreshold() { ...... Applications apps = eurekaClient.getApplications(); int count = 0; // 统计当前已注册的实例总数 for (Application app : apps.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { if (this.isRegisterable(instance)) { ++count; } } } synchronized (lock) { if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews) || (!this.isSelfPreservationModeEnabled())) { // 更新实例总数 this.expectedNumberOfClientsSendingRenews = count; updateRenewsPerMinThreshold(); } } ...... } protected void updateRenewsPerMinThreshold() { // 这个值的作用时,server会每隔一段时间检查续约的实例,续约成功的实例个数要达到numberOfRenewsPerMinThreshold,不然会发出警告,即自我保护 // 实例总数 * 单个实例1分钟需注册的次数 * 百分比 this.numberOfRenewsPerMinThreshold = (int) ( // 当前已注册的实例总数 this.expectedNumberOfClientsSendingRenews // eureka.server.expectedClientRenewalIntervalSeconds,客户端续约时间,默认30秒 // 60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds(),1分钟要注册的次数 * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds()) // eureka.server.renewalPercentThreshold,续约比例不能低于这个值,默认0.85,否则就会发出警告 * serverConfig.getRenewalPercentThreshold()); } }
RemoteRegionRegistry的作用主要是获取运行在其他region的server的实例注册信息
public abstract class AbstractInstanceRegistry implements InstanceRegistry { protected void initRemoteRegionRegistry() throws MalformedURLException { // eureka.server.remoteRegionUrlsWithName,默认new HashMap<>(),key为region,value为region的url Map<String, String> remoteRegionUrlsWithName = serverConfig.getRemoteRegionUrlsWithName(); if (!remoteRegionUrlsWithName.isEmpty()) { allKnownRemoteRegions = new String[remoteRegionUrlsWithName.size()]; int remoteRegionArrayIndex = 0; for (Map.Entry<String, String> remoteRegionUrlWithName : remoteRegionUrlsWithName.entrySet()) { // 初始化RemoteRegionRegistry,初始化完后会开启一个定时更新RemoteRegion的注册实例列表,详见后文 RemoteRegionRegistry remoteRegionRegistry = new RemoteRegionRegistry( serverConfig, clientConfig, serverCodecs, remoteRegionUrlWithName.getKey(), new URL(remoteRegionUrlWithName.getValue())); // 更新到regionNameVSRemoteRegistry regionNameVSRemoteRegistry.put(remoteRegionUrlWithName.getKey(), remoteRegionRegistry); allKnownRemoteRegions[remoteRegionArrayIndex++] = remoteRegionUrlWithName.getKey(); } } } }
先看下RemoteRegionRegistry类图
从图中可以看到,它的作用与客户单的DiscoveryClient
和服务端的InstanceRegistry
作用差不多,它负责的是与其他region的服务端的通信,再看下其构造函数。其构造函数代码过多,为了简便先看下创建流程图
着重分析下绿色的部分
discoveryJerseyClient即EurekaJerseyClientImpl,作用是提供ApacheHttpClient4,用于获取其他region的注册实例
public class RemoteRegionRegistry implements LookupService<String> { public RemoteRegionRegistry(...) { ...... EurekaJerseyClientBuilder clientBuilder = new EurekaJerseyClientBuilder() .withUserAgent("Java-EurekaClient-RemoteRegion") .withEncoderWrapper(serverCodecs.getFullJsonCodec()) .withDecoderWrapper(serverCodecs.getFullJsonCodec()) // eureka.server.remoteRegionConnectTimeoutMs,默认1000毫秒 .withConnectionTimeout(serverConfig.getRemoteRegionConnectTimeoutMs()) // eureka.server.remoteRegionReadTimeoutMs,默认1000毫秒 .withReadTimeout(serverConfig.getRemoteRegionReadTimeoutMs()) // eureka.server.remoteRegionTotalConnectionsPerHost,默认500 .withMaxConnectionsPerHost(serverConfig.getRemoteRegionTotalConnectionsPerHost()) // eureka.server.remoteRegionTotalConnections,默认1000 .withMaxTotalConnections(serverConfig.getRemoteRegionTotalConnections()) // eureka.server.remoteRegionConnectionIdleTimeoutSeconds,默认30秒 .withConnectionIdleTimeout(serverConfig.getRemoteRegionConnectionIdleTimeoutSeconds()); if (remoteRegionURL.getProtocol().equals("http")) { ...... } else { clientBuilder.withClientName("Discovery-RemoteRegionSecureClient-" + regionName) .withTrustStoreFile( // eureka.server.remoteRegionTrustStore,默认null serverConfig.getRemoteRegionTrustStore(), // eureka.server.remoteRegionTrustStorePassword,默认changeit serverConfig.getRemoteRegionTrustStorePassword() ); } discoveryJerseyClient = clientBuilder.build(); discoveryApacheClient = discoveryJerseyClient.getClient(); } }
过程不再赘述,这里简单比较下不同角色所用的client,作用也是用于获取其他region的注册实例
public class RemoteRegionRegistry implements LookupService<String> { public RemoteRegionRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs, String regionName, URL remoteRegionURL) { ...... // 1、定义更新remote region的server的注册信息的task Runnable remoteRegionFetchTask = new Runnable() { @Override public void run() { // 具体逻辑,最终会到fetchRemoteRegistry函数 if (fetchRegistry()) { readyForServingData = true; } } }; // 2、创建执行task的线程池 ThreadPoolExecutor remoteRegionFetchExecutor = new ThreadPoolExecutor( 1, // eureka.server.remoteRegionFetchThreadPoolSize,默认20 serverConfig.getRemoteRegionFetchThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); // 3、创建定时调度器 scheduler = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() .setNameFormat("Eureka-RemoteRegionCacheRefresher_" + regionName + "-%d") .setDaemon(true) .build()); // 4、开启定时任务,每次完成更新后再过30秒进行下一次更新,循环反复 scheduler.schedule( new TimedSupervisorTask( "RemoteRegionFetch_" + regionName, scheduler, remoteRegionFetchExecutor, // eureka.server.remoteRegionRegistryFetchInterval, 默认30秒 serverConfig.getRemoteRegionRegistryFetchInterval(), TimeUnit.SECONDS, 5, // 最大超时倍数 remoteRegionFetchTask ), serverConfig.getRemoteRegionRegistryFetchInterval(), TimeUnit.SECONDS); } private Applications fetchRemoteRegistry(boolean delta) { // eureka.server.experimental.transport.enabled,默认false if (shouldUseExperimentalTransport()) { ...... // 最终由JerseyApplicationClient发出http请求 EurekaHttpResponse<Applications> httpResponse = delta ? eurekaHttpClient.getDelta() : eurekaHttpClient.getApplications(); ...... } else { ...... // delta为false String urlPath = delta ? "apps/delta" : "apps/"; // discoveryApacheClient为ApacheHttpClient4 response = discoveryApacheClient.resource(this.remoteRegionURL + urlPath) .accept(MediaType.APPLICATION_JSON_TYPE) .get(ClientResponse.class); ...... } return null; } }
未完待续
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。