赞
踩
Provider APP:服务提供者。
Consumer APP:服务消费者。
Name Server:通过Virtual IP或者DNS的方式实现Nacos高可用集群的服务路由。
Nacos Server:Nacos服务提供者。
Nacos Console:Nacos控制台。
服务提供者通过VIP(Virtual IP)访问Nacos Server高可用集群,基于OpenAPI完成服务的注册和服务的查询。
Nacos Server本身可以支持主备模式,所以底层会采用数据一致性算法(Raft)来完成节点的数据同步。
消费者也是如此,基于OpenAPI从Nacos Server中查询服务列表。
服务注册与发现的功能:
public class NacosServiceRegistry implements ServiceRegistry<Registration> { private static final String STATUS_UP = "UP"; private static final String STATUS_DOWN = "DOWN"; //Nacos服务注册与发现的配置属性信息 private final NacosDiscoveryProperties nacosDiscoveryProperties; //NacosService管理者,通过其创建NamingService,NamingService通过OpenAPI与Nacos服务器通信,调用Nacos服务器对外暴露的接口 @Autowired private NacosServiceManager nacosServiceManager; public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) { this.nacosDiscoveryProperties = nacosDiscoveryProperties; } //注册服务 public void register(Registration registration) { if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No service to register for nacos client..."); } else { NamingService namingService = this.namingService(); String serviceId = registration.getServiceId(); String group = this.nacosDiscoveryProperties.getGroup(); Instance instance = this.getNacosInstanceFromRegistration(registration); try { //通过nacos的namingService注册服务实例 namingService.registerInstance(serviceId, group, instance); log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()}); } catch (Exception var7) { if (this.nacosDiscoveryProperties.isFailFast()) { log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var7}); ReflectionUtils.rethrowRuntimeException(var7); } else { log.warn("Failfast is false. 
{} register failed...{},", new Object[]{serviceId, registration.toString(), var7}); } } } } //服务实例注销 public void deregister(Registration registration) { log.info("De-registering from Nacos Server now..."); if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No dom to de-register for nacos client..."); } else { NamingService namingService = this.namingService(); String serviceId = registration.getServiceId(); String group = this.nacosDiscoveryProperties.getGroup(); try { namingService.deregisterInstance(serviceId, group, registration.getHost(), registration.getPort(), this.nacosDiscoveryProperties.getClusterName()); } catch (Exception var6) { log.error("ERR_NACOS_DEREGISTER, de-register failed...{},", registration.toString(), var6); } log.info("De-registration finished."); } } public void close() { try { this.nacosServiceManager.nacosServiceShutDown(); } catch (NacosException var2) { log.error("Nacos namingService shutDown failed", var2); } } //设置服务实例的状态 public void setStatus(Registration registration, String status) { if (!"UP".equalsIgnoreCase(status) && !"DOWN".equalsIgnoreCase(status)) { log.warn("can't support status {},please choose UP or DOWN", status); } else { String serviceId = registration.getServiceId(); Instance instance = this.getNacosInstanceFromRegistration(registration); if ("DOWN".equalsIgnoreCase(status)) { instance.setEnabled(false); } else { instance.setEnabled(true); } try { Properties nacosProperties = this.nacosDiscoveryProperties.getNacosProperties(); this.nacosServiceManager.getNamingMaintainService(nacosProperties).updateInstance(serviceId, this.nacosDiscoveryProperties.getGroup(), instance); } catch (Exception var6) { throw new RuntimeException("update nacos instance status fail", var6); } } } //获取服务实例的状态 public Object getStatus(Registration registration) { String serviceName = registration.getServiceId(); String group = this.nacosDiscoveryProperties.getGroup(); try { List<Instance> instances = 
this.namingService().getAllInstances(serviceName, group); Iterator var5 = instances.iterator(); while(var5.hasNext()) { Instance instance = (Instance)var5.next(); if (instance.getIp().equalsIgnoreCase(this.nacosDiscoveryProperties.getIp()) && instance.getPort() == this.nacosDiscoveryProperties.getPort()) { return instance.isEnabled() ? "UP" : "DOWN"; } } } catch (Exception var7) { log.error("get all instance of {} error,", serviceName, var7); } return null; } //将Registration转换成nacos的服务实例 private Instance getNacosInstanceFromRegistration(Registration registration) { Instance instance = new Instance(); instance.setIp(registration.getHost()); instance.setPort(registration.getPort()); instance.setWeight((double)this.nacosDiscoveryProperties.getWeight()); instance.setClusterName(this.nacosDiscoveryProperties.getClusterName()); instance.setEnabled(this.nacosDiscoveryProperties.isInstanceEnabled()); instance.setMetadata(registration.getMetadata()); instance.setEphemeral(this.nacosDiscoveryProperties.isEphemeral()); return instance; } //获取namingService private NamingService namingService() { return this.nacosServiceManager.getNamingService(this.nacosDiscoveryProperties.getNacosProperties()); } }
public class NacosRegistration implements Registration, ServiceInstance { public static final String MANAGEMENT_PORT = "management.port"; public static final String MANAGEMENT_CONTEXT_PATH = "management.context-path"; public static final String MANAGEMENT_ADDRESS = "management.address"; public static final String MANAGEMENT_ENDPOINT_BASE_PATH = "management.endpoints.web.base-path"; private List<NacosRegistrationCustomizer> registrationCustomizers; private NacosDiscoveryProperties nacosDiscoveryProperties; private ApplicationContext context; public NacosRegistration(List<NacosRegistrationCustomizer> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) { this.registrationCustomizers = registrationCustomizers; this.nacosDiscoveryProperties = nacosDiscoveryProperties; this.context = context; } //@PostConstruct注解用来修饰非静态void方法,用于依赖注入之后,进行一些初始化工作 @PostConstruct public void init() { Map<String, String> metadata = this.nacosDiscoveryProperties.getMetadata(); Environment env = this.context.getEnvironment(); String endpointBasePath = env.getProperty("management.endpoints.web.base-path"); if (!StringUtils.isEmpty(endpointBasePath)) { metadata.put("management.endpoints.web.base-path", endpointBasePath); } Integer managementPort = ManagementServerPortUtils.getPort(this.context); if (null != managementPort) { metadata.put("management.port", managementPort.toString()); String contextPath = env.getProperty("management.server.servlet.context-path"); String address = env.getProperty("management.server.address"); if (!StringUtils.isEmpty(contextPath)) { metadata.put("management.context-path", contextPath); } if (!StringUtils.isEmpty(address)) { metadata.put("management.address", address); } } if (null != this.nacosDiscoveryProperties.getHeartBeatInterval()) { metadata.put("preserved.heart.beat.interval", this.nacosDiscoveryProperties.getHeartBeatInterval().toString()); } if (null != this.nacosDiscoveryProperties.getHeartBeatTimeout()) 
{ metadata.put("preserved.heart.beat.timeout", this.nacosDiscoveryProperties.getHeartBeatTimeout().toString()); } if (null != this.nacosDiscoveryProperties.getIpDeleteTimeout()) { metadata.put("preserved.ip.delete.timeout", this.nacosDiscoveryProperties.getIpDeleteTimeout().toString()); } customize(this.registrationCustomizers, this); } private static void customize(List<NacosRegistrationCustomizer> registrationCustomizers, NacosRegistration registration) { if (registrationCustomizers != null) { Iterator var2 = registrationCustomizers.iterator(); while(var2.hasNext()) { NacosRegistrationCustomizer customizer = (NacosRegistrationCustomizer)var2.next(); customizer.customize(registration); } } } public String getServiceId() { return this.nacosDiscoveryProperties.getService(); } public String getHost() { return this.nacosDiscoveryProperties.getIp(); } public int getPort() { return this.nacosDiscoveryProperties.getPort(); } public void setPort(int port) { this.nacosDiscoveryProperties.setPort(port); } public boolean isSecure() { return this.nacosDiscoveryProperties.isSecure(); } public URI getUri() { return DefaultServiceInstance.getUri(this); } public Map<String, String> getMetadata() { return this.nacosDiscoveryProperties.getMetadata(); } public boolean isRegisterEnabled() { return this.nacosDiscoveryProperties.isRegisterEnabled(); } public String getCluster() { return this.nacosDiscoveryProperties.getClusterName(); } public float getRegisterWeight() { return this.nacosDiscoveryProperties.getWeight(); } public NacosDiscoveryProperties getNacosDiscoveryProperties() { return this.nacosDiscoveryProperties; } }
/**
 * Auto service registration for Nacos: wraps a {@link NacosRegistration} and decides, based on
 * the Nacos discovery properties, whether the inherited register / registerManagement steps run.
 *
 * NOTE(review): the article elides part of this class (the "......" marker below), so this
 * documentation covers only the members that are visible.
 */
public class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration> {

    // The Nacos implementation of Registration that this class registers.
    private NacosRegistration registration;

    public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {
        super(serviceRegistry, autoServiceRegistrationProperties);
        this.registration = registration;
    }

    /**
     * Stores the given port in the holder returned by {@code getPort()}.
     *
     * @deprecated marked deprecated in the original; no replacement is visible here.
     */
    @Deprecated
    public void setPort(int port) {
        this.getPort().set(port);
    }

    /**
     * Returns the registration. If the registration's port is negative and {@code getPort()}
     * holds a positive value, that value is copied into the registration first. Asserts that
     * the resulting port is greater than zero.
     */
    protected NacosRegistration getRegistration() {
        if (this.registration.getPort() < 0 && this.getPort().get() > 0) {
            this.registration.setPort(this.getPort().get());
        }
        Assert.isTrue(this.registration.getPort() > 0, "service.port has not been set");
        return this.registration;
    }

    /** Always returns {@code null}: no separate management registration is provided. */
    protected NacosRegistration getManagementRegistration() {
        return null;
    }

    /**
     * Performs service registration.
     *
     * Per the original note, super.register() ends up calling the ServiceRegistry
     * implementation (NacosServiceRegistry) — the parent class is not shown here, TODO confirm.
     */
    protected void register() {
        // The Nacos discovery configuration decides whether registration happens at all.
        if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
            log.debug("Registration disabled.");
        } else {
            // A negative port means "not set yet": take it from getPort().
            if (this.registration.getPort() < 0) {
                this.registration.setPort(this.getPort().get());
            }
            // Delegate to the parent class, which performs the actual registration.
            super.register();
        }
    }

    /** Runs the parent's management registration only when registration is enabled. */
    protected void registerManagement() {
        if (this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
            super.registerManagement();
        }
    }

    ......

    // Listens for NacosDiscoveryInfoChangedEvent and restarts the registration when it fires.
    @EventListener
    public void onNacosDiscoveryInfoChangedEvent(NacosDiscoveryInfoChangedEvent event) {
        this.restart();
    }

    // Restart = stop() followed by start(); both are inherited and not shown here.
    private void restart() {
        this.stop();
        this.start();
    }
}
Nacos Client包中的NamingService实现类为NacosNamingService,它以封装好的SDK形式供用户使用,用来调用Nacos对外暴露的OpenAPI。
SDK方式只是提供了一种访问的封装,在底层仍然是基于HTTP协议完成请求的。
NamingService提供了以下方法:
registerInstance:注册实例
deregisterInstance:注销实例
getAllInstances:获取某一服务的所有实例
selectInstances:获取某一服务健康或不健康的实例
selectOneHealthyInstance:根据权重选择一个健康的实例
getServerStatus:检测服务端健康状态
subscribe:注册对某个服务的监听
unsubscribe:注销对某个服务的监听
getSubscribeServices:获取被监听的服务
getServicesOfServer:获取命名空间(namespace)下的所有服务名
NacosNamingService还初始化了其他核心类,对外提供的方法都是委托给这些核心类处理的。初始化时会按顺序依次创建NamingProxy、BeatReactor、HostReactor。
/**
 * Client-side NamingService implementation. The public methods visible here delegate to three
 * collaborators created in init(): NamingProxy (server calls), BeatReactor (heartbeats for
 * ephemeral instances) and HostReactor (service-info lookup and subscriptions).
 *
 * NOTE(review): the article elides part of this class (the "......省略......" marker below);
 * helpers such as initServerAddr / initCacheDir / initLogName are called but not shown.
 */
public class NacosNamingService implements NamingService {
    private String namespace;
    private String endpoint;
    private String serverList;
    private String cacheDir;
    private String logName;
    private HostReactor hostReactor;
    // Heartbeat reactor: beat infos for ephemeral instances are added to / removed from it.
    private BeatReactor beatReactor;
    // Proxy used for registration; the final communication with the Nacos server goes through it.
    private NamingProxy serverProxy;

    /** Creates the service from a server address list, passed on as the "serverAddr" property. */
    public NacosNamingService(String serverList) throws NacosException {
        Properties properties = new Properties();
        properties.setProperty("serverAddr", serverList);
        this.init(properties);
    }

    public NacosNamingService(Properties properties) throws NacosException {
        this.init(properties);
    }

    /**
     * Validates the properties, resolves namespace / server address / cache dir / log name and
     * then creates, in this order, the NamingProxy, the BeatReactor and the HostReactor.
     */
    private void init(Properties properties) throws NacosException {
        ValidatorUtils.checkInitParam(properties);
        this.namespace = InitUtils.initNamespaceForNaming(properties);
        InitUtils.initSerialization();
        this.initServerAddr(properties);
        InitUtils.initWebRootContext(properties);
        this.initCacheDir();
        this.initLogName(properties);
        // Network-layer proxy of the NamingService.
        this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
        // Heartbeat reactor, sized by the client-beat thread count taken from the properties.
        this.beatReactor = new BeatReactor(this.serverProxy, this.initClientBeatThreadCount(properties));
        this.hostReactor = new HostReactor(this.serverProxy, this.beatReactor, this.cacheDir, this.isLoadCacheAtStart(properties), this.isPushEmptyProtect(properties), this.initPollingThreadCount(properties));
    }

    ......省略......

    /**
     * Registers an instance; the server call is delegated to NamingProxy.
     * For an ephemeral instance a beat info is built and handed to the BeatReactor first.
     */
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        // Ephemeral instance: needs heartbeats.
        if (instance.isEphemeral()) {
            // Build the heartbeat info for this instance.
            BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance);
            // Hand it to the BeatReactor, which schedules it for periodic sending.
            this.beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        // Register the service instance with the server.
        this.serverProxy.registerService(groupedServiceName, groupName, instance);
    }

    /** Deregisters an instance; the server call is delegated to NamingProxy. */
    public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        // Ephemeral instance: remove its heartbeat info first.
        if (instance.isEphemeral()) {
            this.beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(), instance.getPort());
        }
        // Ask NamingProxy to deregister the instance.
        this.serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);
    }

    /**
     * Returns all instances of a service, delegating the lookup to HostReactor.
     *
     * @return the hosts of the resolved ServiceInfo, or a new empty list when the ServiceInfo is
     *         null or has no hosts
     */
    public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {
        ServiceInfo serviceInfo;
        // Subscribed: per the original note, HostReactor serves this from its locally maintained
        // service list and falls back to the registry when that is empty — HostReactor is not
        // shown here, TODO confirm.
        if (subscribe) {
            serviceInfo = this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            // Not subscribed: fetch directly from the server.
            serviceInfo = this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        List list;
        return (List)(serviceInfo != null && !CollectionUtils.isEmpty(list = serviceInfo.getHosts()) ? list : new ArrayList());
    }

    /** Returns the healthy (or unhealthy) instances of a service; lookup delegated to HostReactor. */
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
        ServiceInfo serviceInfo;
        // Subscribed: presumably served from HostReactor's local service list — TODO confirm.
        if (subscribe) {
            serviceInfo = this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            // Not subscribed: fetch directly from the server.
            serviceInfo = this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        return this.selectInstances(serviceInfo, healthy);
    }

    /**
     * Filters the hosts of the given ServiceInfo in place: an instance is kept only when its
     * health flag equals {@code healthy}, it is enabled and its weight is positive; every other
     * instance is removed through the iterator. Returns a new empty list when the ServiceInfo is
     * null or has no hosts.
     *
     * NOTE(review): the list returned by getHosts() is mutated; whether that is a defensive copy
     * cannot be determined from here.
     */
    private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {
        List list;
        if (serviceInfo != null && !CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
            Iterator iterator = list.iterator();
            while(true) {
                Instance instance;
                // Skip over instances that satisfy the filter; leave the inner loop on the first
                // one that does not, or return once the iterator is exhausted.
                do {
                    if (!iterator.hasNext()) {
                        return list;
                    }
                    instance = (Instance)iterator.next();
                } while(healthy == instance.isHealthy() && instance.isEnabled() && instance.getWeight() > 0.0D);
                iterator.remove();
            }
        } else {
            return new ArrayList();
        }
    }

    /**
     * Picks one instance via RandomByWeight from the ServiceInfo resolved through HostReactor
     * (local lookup when subscribed, direct server query otherwise).
     */
    public Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {
        return subscribe ? RandomByWeight.selectHost(this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","))) : RandomByWeight.selectHost(this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")));
    }

    /** Registers a listener for a service; delegated to HostReactor. */
    public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {
        this.hostReactor.subscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","), listener);
    }

    /** Removes a listener for a service; delegated to HostReactor. */
    public void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {
        this.hostReactor.unSubscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","), listener);
    }

    /** Queries a page of the service list; delegated to NamingProxy. */
    public ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName, AbstractSelector selector) throws NacosException {
        return this.serverProxy.getServiceList(pageNo, pageSize, groupName, selector);
    }

    /** Returns the subscribed services as reported by HostReactor. */
    public List<ServiceInfo> getSubscribeServices() {
        return this.hostReactor.getSubscribeServices();
    }

    /** Returns "UP" when NamingProxy reports the server as healthy, otherwise "DOWN". */
    public String getServerStatus() {
        return this.serverProxy.serverHealthy() ? "UP" : "DOWN";
    }

    public BeatReactor getBeatReactor() {
        return this.beatReactor;
    }

    /** Shuts down the BeatReactor, the HostReactor and the NamingProxy, in that order. */
    public void shutDown() throws NacosException {
        this.beatReactor.shutdown();
        this.hostReactor.shutdown();
        this.serverProxy.shutdown();
    }
}
NamingProxy用于与Nacos服务端通信,注册服务、注销服务、发送心跳等都经由NamingProxy来请求服务端。
NamingProxy会创建一个核心线程数为2、线程名为com.alibaba.nacos.client.naming.updater的定时线程池:一个任务定期调用refreshSrvIfNeed()方法更新Nacos服务端地址,默认间隔为30秒;另一个任务每5秒调用一次securityProxy.login()刷新登录信息。
public class NamingProxy implements Closeable { //Nacos自定义的RestTemplate private final NacosRestTemplate nacosRestTemplate = NamingHttpClientManager.getInstance().getNacosRestTemplate(); //默认服务端口 private static final int DEFAULT_SERVER_PORT = 8848; private int serverPort = 8848; //命名空间 private final String namespaceId; private final String endpoint; private String nacosDomain; private List<String> serverList; private List<String> serversFromEndpoint = new ArrayList(); private final SecurityProxy securityProxy; private long lastSrvRefTime = 0L; private final long vipSrvRefInterMillis; private final long securityInfoRefreshIntervalMills; private Properties properties; //刷新定时任务 private ScheduledExecutorService executorService; //最大重试次数,默认值是3 private int maxRetry; public NamingProxy(String namespaceId, String endpoint, String serverList, Properties properties) { this.vipSrvRefInterMillis = TimeUnit.SECONDS.toMillis(30L); this.securityInfoRefreshIntervalMills = TimeUnit.SECONDS.toMillis(5L); this.securityProxy = new SecurityProxy(properties, this.nacosRestTemplate); this.properties = properties; this.setServerPort(8848); this.namespaceId = namespaceId; this.endpoint = endpoint; this.maxRetry = ConvertUtils.toInt(properties.getProperty("namingRequestDomainMaxRetryCount", String.valueOf(3))); if (StringUtils.isNotEmpty(serverList)) { this.serverList = Arrays.asList(serverList.split(",")); if (this.serverList.size() == 1) { this.nacosDomain = serverList; } } this.initRefreshTask(); } //初始化刷新定时任务 private void initRefreshTask() { //初始化线程池 this.executorService = new ScheduledThreadPoolExecutor(2, new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.naming.updater"); t.setDaemon(true); return t; } }); this.refreshSrvIfNeed(); this.securityProxy.login(this.getServerList()); //设置定时刷新任务 this.executorService.scheduleWithFixedDelay(new Runnable() { public void run() { NamingProxy.this.refreshSrvIfNeed(); } }, 0L, 
this.vipSrvRefInterMillis, TimeUnit.MILLISECONDS); // this.executorService.scheduleWithFixedDelay(new Runnable() { public void run() { NamingProxy.this.securityProxy.login(NamingProxy.this.getServerList()); } }, 0L, this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS); } // public List<String> getServerListFromEndpoint() { try { String urlString = "http://" + this.endpoint + "/nacos/serverlist"; Header header = this.builderHeader(); HttpRestResult<String> restResult = this.nacosRestTemplate.get(urlString, header, Query.EMPTY, String.class); if (!restResult.ok()) { throw new IOException("Error while requesting: " + urlString + "'. Server returned: " + restResult.getCode()); } else { String content = (String)restResult.getData(); List<String> list = new ArrayList(); Iterator var6 = IoUtils.readLines(new StringReader(content)).iterator(); while(var6.hasNext()) { String line = (String)var6.next(); if (!line.trim().isEmpty()) { list.add(line.trim()); } } return list; } } catch (Exception var8) { var8.printStackTrace(); return null; } } //进行刷新 private void refreshSrvIfNeed() { try { if (!CollectionUtils.isEmpty(this.serverList)) { return; } if (System.currentTimeMillis() - this.lastSrvRefTime < this.vipSrvRefInterMillis) { return; } List<String> list = this.getServerListFromEndpoint(); if (CollectionUtils.isEmpty(list)) { throw new Exception("Can not acquire Nacos list"); } if (!CollectionUtils.isEqualCollection(list, this.serversFromEndpoint)) { } this.serversFromEndpoint = list; this.lastSrvRefTime = System.currentTimeMillis(); } catch (Throwable var2) { LogUtils.NAMING_LOGGER.warn("failed to update server list", var2); } } //注册服务 public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { Map<String, String> params = new HashMap(16); params.put("namespaceId", this.namespaceId); params.put("serviceName", serviceName); params.put("groupName", groupName); params.put("clusterName", instance.getClusterName()); 
params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JacksonUtils.toJson(instance.getMetadata())); // 把上述服务实例的一些必要参数保存到一个Map中,通过OpenAPI的方式发送注册请求 this.reqApi(UtilAndComs.nacosUrlInstance, params, "POST"); } //注销服务 public void deregisterService(String serviceName, Instance instance) throws NacosException { Map<String, String> params = new HashMap(8); params.put("namespaceId", this.namespaceId); params.put("serviceName", serviceName); params.put("clusterName", instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); this.reqApi(UtilAndComs.nacosUrlInstance, params, "DELETE"); } public void updateInstance(String serviceName, String groupName, Instance instance) throws NacosException { Map<String, String> params = new HashMap(8); params.put("namespaceId", this.namespaceId); params.put("serviceName", serviceName); params.put("groupName", groupName); params.put("clusterName", instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enabled", String.valueOf(instance.isEnabled())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JacksonUtils.toJson(instance.getMetadata())); this.reqApi(UtilAndComs.nacosUrlInstance, params, "PUT"); } public Service queryService(String serviceName, String groupName) throws NacosException { Map<String, String> params = new HashMap(3); params.put("namespaceId", this.namespaceId); params.put("serviceName", serviceName); params.put("groupName", 
groupName); String result = this.reqApi(UtilAndComs.nacosUrlService, params, "GET"); return (Service)JacksonUtils.toObj(result, Service.class); } public void createService(Service service, AbstractSelector selector) throws NacosException { Map<String, String> params = new HashMap(6); params.put("namespaceId", this.namespaceId); params.put("serviceName", service.getName()); params.put("groupName", service.getGroupName()); params.put("protectThreshold", String.valueOf(service.getProtectThreshold())); params.put("metadata", JacksonUtils.toJson(service.getMetadata())); params.put("selector", JacksonUtils.toJson(selector)); this.reqApi(UtilAndComs.nacosUrlService, params, "POST"); } public boolean deleteService(String serviceName, String groupName) throws NacosException { Map<String, String> params = new HashMap(6); params.put("namespaceId", this.namespaceId); params.put("serviceName", serviceName); params.put("groupName", groupName); String result = this.reqApi(UtilAndComs.nacosUrlService, params, "DELETE"); return "ok".equals(result); } public void updateService(Service service, AbstractSelector selector) throws NacosException { Map<String, String> params = new HashMap(6); params.put("namespaceId", this.namespaceId); params.put("serviceName", service.getName()); params.put("groupName", service.getGroupName()); params.put("protectThreshold", String.valueOf(service.getProtectThreshold())); params.put("metadata", JacksonUtils.toJson(service.getMetadata())); params.put("selector", JacksonUtils.toJson(selector)); this.reqApi(UtilAndComs.nacosUrlService, params, "PUT"); } public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException { Map<String, String> params = new HashMap(8); params.put("namespaceId", this.namespaceId); params.put("serviceName", serviceName); params.put("clusters", clusters); params.put("udpPort", String.valueOf(udpPort)); params.put("clientIP", NetUtils.localIP()); params.put("healthyOnly", 
String.valueOf(healthyOnly)); return this.reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, "GET"); } public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException { if (LogUtils.NAMING_LOGGER.isDebugEnabled()) { LogUtils.NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", this.namespaceId, beatInfo.toString()); } Map<String, String> params = new HashMap(8); Map<String, String> bodyMap = new HashMap(2); if (!lightBeatEnabled) { bodyMap.put("beat", JacksonUtils.toJson(beatInfo)); } params.put("namespaceId", this.namespaceId); params.put("serviceName", beatInfo.getServiceName()); params.put("clusterName", beatInfo.getCluster()); params.put("ip", beatInfo.getIp()); params.put("port", String.valueOf(beatInfo.getPort())); String result = this.reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, "PUT"); return JacksonUtils.toObj(result); } public boolean serverHealthy() { try { String result = this.reqApi(UtilAndComs.nacosUrlBase + "/operator/metrics", new HashMap(2), "GET"); JsonNode json = JacksonUtils.toObj(result); String serverStatus = json.get("status").asText(); return "UP".equals(serverStatus); } catch (Exception var4) { return false; } } //查询服务列表 public ListView<String> getServiceList(int pageNo, int pageSize, String groupName) throws NacosException { return this.getServiceList(pageNo, pageSize, groupName, (AbstractSelector)null); } public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector) throws NacosException { Map<String, String> params = new HashMap(4); params.put("pageNo", String.valueOf(pageNo)); params.put("pageSize", String.valueOf(pageSize)); params.put("namespaceId", this.namespaceId); params.put("groupName", groupName); if (selector != null) { switch(SelectorType.valueOf(selector.getType())) { case none: default: break; case label: ExpressionSelector expressionSelector = (ExpressionSelector)selector; params.put("selector", 
JacksonUtils.toJson(expressionSelector)); } } String result = this.reqApi(UtilAndComs.nacosUrlBase + "/service/list", params, "GET"); JsonNode json = JacksonUtils.toObj(result); ListView<String> listView = new ListView(); listView.setCount(json.get("count").asInt()); listView.setData((List)JacksonUtils.toObj(json.get("doms").toString(), new TypeReference<List<String>>() { })); return listView; } public String reqApi(String api, Map<String, String> params, String method) throws NacosException { return this.reqApi(api, params, Collections.EMPTY_MAP, method); } public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException { return this.reqApi(api, params, body, this.getServerList(), method); } public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException { params.put("namespaceId", this.getNamespaceId()); if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(this.nacosDomain)) { throw new NacosException(400, "no server available"); } else { NacosException exception = new NacosException(); if (StringUtils.isNotBlank(this.nacosDomain)) { int i = 0; while(i < this.maxRetry) { try { return this.callServer(api, params, body, this.nacosDomain, method); } catch (NacosException var12) { exception = var12; if (LogUtils.NAMING_LOGGER.isDebugEnabled()) { LogUtils.NAMING_LOGGER.debug("request {} failed.", this.nacosDomain, var12); } ++i; } } } else { Random random = new Random(System.currentTimeMillis()); int index = random.nextInt(servers.size()); int i = 0; while(i < servers.size()) { String server = (String)servers.get(index); try { return this.callServer(api, params, body, server, method); } catch (NacosException var13) { exception = var13; if (LogUtils.NAMING_LOGGER.isDebugEnabled()) { LogUtils.NAMING_LOGGER.debug("request {} failed.", server, var13); } index = (index + 1) % servers.size(); ++i; } } } throw new 
NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage()); } } private List<String> getServerList() { List<String> snapshot = this.serversFromEndpoint; if (!CollectionUtils.isEmpty(this.serverList)) { snapshot = this.serverList; } return snapshot; } public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer) throws NacosException { return this.callServer(api, params, body, curServer, "GET"); } public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException { long start = System.currentTimeMillis(); long end = 0L; this.injectSecurityInfo(params); Header header = this.builderHeader(); String url; if (!curServer.startsWith("https://") && !curServer.startsWith("http://")) { if (!IPUtil.containsPort(curServer)) { curServer = curServer + ":" + this.serverPort; } url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api; } else { url = curServer + api; } try { HttpRestResult<String> restResult = this.nacosRestTemplate.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class); end = System.currentTimeMillis(); MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())).observe((double)(end - start)); if (restResult.ok()) { return (String)restResult.getData(); } else if (304 == restResult.getCode()) { return ""; } else { throw new NacosException(restResult.getCode(), restResult.getMessage()); } } catch (Exception var13) { LogUtils.NAMING_LOGGER.error("[NA] failed to request", var13); throw new NacosException(500, var13); } } private void injectSecurityInfo(Map<String, String> params) { if (StringUtils.isNotBlank(this.securityProxy.getAccessToken())) { params.put("accessToken", this.securityProxy.getAccessToken()); } String ak = this.getAccessKey(); String sk = this.getSecretKey(); 
params.put("app", AppNameUtils.getAppName()); if (StringUtils.isNotBlank(ak) && StringUtils.isNotBlank(sk)) { try { String signData = getSignData((String)params.get("serviceName")); String signature = SignUtil.sign(signData, sk); params.put("signature", signature); params.put("data", signData); params.put("ak", ak); } catch (Exception var6) { LogUtils.NAMING_LOGGER.error("inject ak/sk failed.", var6); } } } public Header builderHeader() { Header header = Header.newInstance(); header.addParam("Client-Version", VersionUtils.version); header.addParam("User-Agent", UtilAndComs.VERSION); header.addParam("Accept-Encoding", "gzip,deflate,sdch"); header.addParam("Connection", "Keep-Alive"); header.addParam("RequestId", UuidUtils.generateUuid()); header.addParam("Request-Module", "Naming"); return header; } private static String getSignData(String serviceName) { return StringUtils.isNotEmpty(serviceName) ? System.currentTimeMillis() + "@@" + serviceName : String.valueOf(System.currentTimeMillis()); } public String getAccessKey() { return this.properties == null ? SpasAdapter.getAk() : TemplateUtils.stringEmptyAndThenExecute(this.properties.getProperty("accessKey"), new Callable<String>() { public String call() { return SpasAdapter.getAk(); } }); } public String getSecretKey() { return this.properties == null ? 
SpasAdapter.getSk() : TemplateUtils.stringEmptyAndThenExecute(this.properties.getProperty("secretKey"), new Callable<String>() { public String call() throws Exception { return SpasAdapter.getSk(); } }); } public void setProperties(Properties properties) { this.properties = properties; this.setServerPort(8848); } public String getNamespaceId() { return this.namespaceId; } public void setServerPort(int serverPort) { this.serverPort = serverPort; String sp = System.getProperty("nacos.naming.exposed.port"); if (StringUtils.isNotBlank(sp)) { this.serverPort = Integer.parseInt(sp); } } public void shutdown() throws NacosException { String className = this.getClass().getName(); LogUtils.NAMING_LOGGER.info("{} do shutdown begin", className); ThreadUtils.shutdownThreadPool(this.executorService, LogUtils.NAMING_LOGGER); NamingHttpClientManager.getInstance().shutdown(); SpasAdapter.freeCredentialInstance(); LogUtils.NAMING_LOGGER.info("{} do shutdown stop", className); } }
BeatReactor用于向Nacos服务端发送已注册服务的心跳。
成员变量Map<String, BeatInfo> dom2Beat中保存了需要发送的BeatInfo,key为{serviceName}#{ip}#{port},value为对应的BeatInfo。
BeatReactor会启动名为com.alibaba.nacos.naming.beat.sender的线程来发送心跳,默认线程数为1~CPU核心数的一半,可由namingClientBeatThreadCount参数指定。
默认情况下每5秒发送一次心跳,可根据Nacos服务端返回的clientBeatInterval的值调整心跳间隔。
public class BeatReactor implements Closeable { //定时任务线程池 private final ScheduledExecutorService executorService; // private final NamingProxy serverProxy; private boolean lightBeatEnabled; //心跳包Map public final Map<String, BeatInfo> dom2Beat; public BeatReactor(NamingProxy serverProxy) { this(serverProxy, UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT); } public BeatReactor(NamingProxy serverProxy, int threadCount) { this.lightBeatEnabled = false; this.dom2Beat = new ConcurrentHashMap(); this.serverProxy = serverProxy; this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() { public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.alibaba.nacos.naming.beat.sender"); return thread; } }); } //添加心跳包到定时线程池中 public void addBeatInfo(String serviceName, BeatInfo beatInfo) { //构造心跳包key String key = this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()); BeatInfo existBeat = null; //从dom2Beat移除该key,如果key对应的value不为空的话,表明该beatInfo已经存在了,则停止心跳检测 if ((existBeat = (BeatInfo)this.dom2Beat.remove(key)) != null) { //停止心跳检测 existBeat.setStopped(true); } //如果dom2Beat中不存在该key,则将key放到map中,并进行定时心跳检测 this.dom2Beat.put(key, beatInfo); //定时任务线程池,定时执行里面的线程 this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size()); } public void removeBeatInfo(String serviceName, String ip, int port) { //从心跳包map中删除对应的心跳包信息 BeatInfo beatInfo = (BeatInfo)this.dom2Beat.remove(this.buildKey(serviceName, ip, port)); //将心跳包状态设置为stop if (beatInfo != null) { beatInfo.setStopped(true); MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size()); } } //根据服务实例,构造心跳包 public BeatInfo buildBeatInfo(Instance instance) { return this.buildBeatInfo(instance.getServiceName(), instance); } public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) { BeatInfo beatInfo = new 
BeatInfo(); beatInfo.setServiceName(groupedServiceName); beatInfo.setIp(instance.getIp()); beatInfo.setPort(instance.getPort()); beatInfo.setCluster(instance.getClusterName()); beatInfo.setWeight(instance.getWeight()); beatInfo.setMetadata(instance.getMetadata()); beatInfo.setScheduled(false); beatInfo.setPeriod(instance.getInstanceHeartBeatInterval()); return beatInfo; } public String buildKey(String serviceName, String ip, int port) { return serviceName + "#" + ip + "#" + port; } public void shutdown() throws NacosException { String className = this.getClass().getName(); ThreadUtils.shutdownThreadPool(this.executorService, LogUtils.NAMING_LOGGER); } //心跳包发送线程 class BeatTask implements Runnable { BeatInfo beatInfo; public BeatTask(BeatInfo beatInfo) { this.beatInfo = beatInfo; } public void run() { //如果心跳包检查没有停止,则发送心跳包 if (!this.beatInfo.isStopped()) { long nextTime = this.beatInfo.getPeriod(); try { //发送心跳包 JsonNode result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo, BeatReactor.this.lightBeatEnabled); long interval = result.get("clientBeatInterval").asLong(); boolean lightBeatEnabled = false; if (result.has("lightBeatEnabled")) { lightBeatEnabled = result.get("lightBeatEnabled").asBoolean(); } BeatReactor.this.lightBeatEnabled = lightBeatEnabled; if (interval > 0L) { nextTime = interval; } int code = 10200; if (result.has("code")) { code = result.get("code").asInt(); } //如果该实例没有在注册中心注册,则进行注册 if (code == 20404) { Instance instance = new Instance(); instance.setPort(this.beatInfo.getPort()); instance.setIp(this.beatInfo.getIp()); instance.setWeight(this.beatInfo.getWeight()); instance.setMetadata(this.beatInfo.getMetadata()); instance.setClusterName(this.beatInfo.getCluster()); instance.setServiceName(this.beatInfo.getServiceName()); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(true); try { //注册服务 BeatReactor.this.serverProxy.registerService(this.beatInfo.getServiceName(), 
NamingUtils.getGroupName(this.beatInfo.getServiceName()), instance); } catch (Exception var15) { //log日志 } } } catch (NacosException var16) { //log日志 } catch (Exception var17) { //log日志 } finally { //将线程再次放到定时任务线程池中执行下次的心跳包发送 BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS); } } } } }
HostReactor用于获取、保存、更新各Service实例信息。
成员变量Map<String, ServiceInfo> serviceInfoMap中保存了已获取到的服务的信息,key为{服务名}@@{集群名}。
HostReactor会启动名为com.alibaba.nacos.client.naming.updater的线程来更新服务信息,默认线程数为1~CPU核心数的一半,可由namingPollingThreadCount参数指定。
定时任务UpdateTask会根据服务的cacheMillis值定时更新服务信息,默认值为10秒。该定时任务会在获取某一服务信息时创建,保存在成员变量Map<String, ScheduledFuture<?>> futureMap中。
public class HostReactor implements Closeable { private static final long DEFAULT_DELAY = 1000L; private static final long UPDATE_HOLD_INTERVAL = 5000L; private final Map<String, ScheduledFuture<?>> futureMap; // 本地已存在的服务列表,key是服务名称,value是ServiceInfo private final Map<String, ServiceInfo> serviceInfoMap; // 待更新的实例列表 private final Map<String, Object> updatingMap; private final PushReceiver pushReceiver; private final BeatReactor beatReactor; private final NamingProxy serverProxy; private final FailoverReactor failoverReactor; private final String cacheDir; private final boolean pushEmptyProtection; // 定时任务(负责服务列表的实时更新) private final ScheduledExecutorService executor; //实例变化通知者,负责管理服务的订阅信息,并进行回调 private final InstancesChangeNotifier notifier; public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir) { this(serverProxy, beatReactor, cacheDir, false, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT); } public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir, boolean loadCacheAtStart, boolean pushEmptyProtection, int pollingThreadCount) { this.futureMap = new HashMap(); this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() { public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.alibaba.nacos.client.naming.updater"); return thread; } }); this.beatReactor = beatReactor; this.serverProxy = serverProxy; this.cacheDir = cacheDir; if (loadCacheAtStart) { this.serviceInfoMap = new ConcurrentHashMap(DiskCache.read(this.cacheDir)); } else { this.serviceInfoMap = new ConcurrentHashMap(16); } this.pushEmptyProtection = pushEmptyProtection; this.updatingMap = new ConcurrentHashMap(); this.failoverReactor = new FailoverReactor(this, cacheDir); this.pushReceiver = new PushReceiver(this); this.notifier = new InstancesChangeNotifier(); NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384); 
NotifyCenter.registerSubscriber(this.notifier); } public Map<String, ServiceInfo> getServiceInfoMap() { return this.serviceInfoMap; } //增加定时刷新服务任务 public synchronized ScheduledFuture<?> addTask(HostReactor.UpdateTask task) { return this.executor.schedule(task, 1000L, TimeUnit.MILLISECONDS); } //订阅服务,serviceName服务名称,clusters集群列表,EventLintener回调Listener public void subscribe(String serviceName, String clusters, EventListener eventListener) { //给该服务增加监听器,服务发生变化后进行回调 this.notifier.registerListener(serviceName, clusters, eventListener); //将该服务添加到HostReactor的定时任务中,定时刷新 this.getServiceInfo(serviceName, clusters); } //取消订阅服务,serviceName服务名称,clusters集群列表,EventLintener回调Listener public void unSubscribe(String serviceName, String clusters, EventListener eventListener) { this.notifier.deregisterListener(serviceName, clusters, eventListener); } public List<ServiceInfo> getSubscribeServices() { return this.notifier.getSubscribeServices(); } //处理从注册中心获取到的JSON格式的服务实例,并更新到本地serviceInfoMap中 public ServiceInfo processServiceJson(String json) { ServiceInfo serviceInfo = (ServiceInfo)JacksonUtils.toObj(json, ServiceInfo.class); String serviceKey = serviceInfo.getKey(); if (serviceKey == null) { return null; } else { ServiceInfo oldService = (ServiceInfo)this.serviceInfoMap.get(serviceKey); if (this.pushEmptyProtection && !serviceInfo.validate()) { return oldService; } else { boolean changed = false; if (oldService != null) { if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) { LogUtils.NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + serviceInfo.getLastRefTime()); } this.serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); Map<String, Instance> oldHostMap = new HashMap(oldService.getHosts().size()); Iterator var7 = oldService.getHosts().iterator(); while(var7.hasNext()) { Instance host = (Instance)var7.next(); oldHostMap.put(host.toInetAddr(), host); } Map<String, Instance> newHostMap = new 
HashMap(serviceInfo.getHosts().size()); Iterator var17 = serviceInfo.getHosts().iterator(); while(var17.hasNext()) { Instance host = (Instance)var17.next(); newHostMap.put(host.toInetAddr(), host); } Set<Instance> modHosts = new HashSet(); Set<Instance> newHosts = new HashSet(); Set<Instance> remvHosts = new HashSet(); List<Entry<String, Instance>> newServiceHosts = new ArrayList(newHostMap.entrySet()); Iterator var12 = newServiceHosts.iterator(); while(true) { Entry entry; Instance host; String key; while(var12.hasNext()) { entry = (Entry)var12.next(); host = (Instance)entry.getValue(); key = (String)entry.getKey(); if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(), ((Instance)oldHostMap.get(key)).toString())) { modHosts.add(host); } else if (!oldHostMap.containsKey(key)) { newHosts.add(host); } } var12 = oldHostMap.entrySet().iterator(); while(var12.hasNext()) { entry = (Entry)var12.next(); host = (Instance)entry.getValue(); key = (String)entry.getKey(); if (!newHostMap.containsKey(key) && !newHostMap.containsKey(key)) { remvHosts.add(host); } } if (newHosts.size() > 0) { changed = true; } if (remvHosts.size() > 0) { changed = true; } if (modHosts.size() > 0) { changed = true; this.updateBeatInfo(modHosts); } serviceInfo.setJsonFromServer(json); if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) { NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); DiskCache.write(serviceInfo, this.cacheDir); } break; } } else { changed = true; this.serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); serviceInfo.setJsonFromServer(json); DiskCache.write(serviceInfo, this.cacheDir); } MetricsMonitor.getServiceInfoMapSizeMonitor().set((double)this.serviceInfoMap.size()); if (changed) { 
//记录日志 } return serviceInfo; } } } private void updateBeatInfo(Set<Instance> modHosts) { Iterator var2 = modHosts.iterator(); while(var2.hasNext()) { Instance instance = (Instance)var2.next(); String key = this.beatReactor.buildKey(instance.getServiceName(), instance.getIp(), instance.getPort()); if (this.beatReactor.dom2Beat.containsKey(key) && instance.isEphemeral()) { BeatInfo beatInfo = this.beatReactor.buildBeatInfo(instance); this.beatReactor.addBeatInfo(instance.getServiceName(), beatInfo); } } } //从本地缓存中获取服务实例信息 private ServiceInfo getServiceInfo0(String serviceName, String clusters) { String key = ServiceInfo.getKey(serviceName, clusters); return (ServiceInfo)this.serviceInfoMap.get(key); } //直接从服务注册中心获取服务 public ServiceInfo getServiceInfoDirectlyFromServer(String serviceName, String clusters) throws NacosException { String result = this.serverProxy.queryList(serviceName, clusters, 0, false); return StringUtils.isNotEmpty(result) ? (ServiceInfo)JacksonUtils.toObj(result, ServiceInfo.class) : null; } //获取服务,先从本地获取,本地没有,则进行维护,并从注册中心更新最新服务信息 public ServiceInfo getServiceInfo(String serviceName, String clusters) { String key = ServiceInfo.getKey(serviceName, clusters); if (this.failoverReactor.isFailoverSwitch()) { return this.failoverReactor.getService(key); } else { // 1.先通过serverName即服务名获得一个serviceInfo ServiceInfo serviceObj = this.getServiceInfo0(serviceName, clusters); if (null == serviceObj) { //如果没有serviceInfo,则通过传进来的参数new出一个新的serviceInfo对象,并且同时维护到本地Map和更新Map serviceObj = new ServiceInfo(serviceName, clusters); this.serviceInfoMap.put(serviceObj.getKey(), serviceObj); this.updatingMap.put(serviceName, new Object()); // 2.updateServiceNow(),立刻去Nacos服务端拉取该服务最新实例列表,更新serviceInfoMap this.updateServiceNow(serviceName, clusters); this.updatingMap.remove(serviceName); } else if (this.updatingMap.containsKey(serviceName)) { synchronized(serviceObj) { try { serviceObj.wait(5000L); } catch (InterruptedException var8) { } } } // 3.定时更新实例信息 
this.scheduleUpdateIfAbsent(serviceName, clusters); // 4.最后返回服务实例数据(前面已经进行了更新) return (ServiceInfo)this.serviceInfoMap.get(serviceObj.getKey()); } } //立即从注册中心拉取该服务最新实例列表,并更新到本地 private void updateServiceNow(String serviceName, String clusters) { try { this.updateService(serviceName, clusters); } catch (NacosException var4) { } } //通过定时任务,每10秒去更新一次数据 public void scheduleUpdateIfAbsent(String serviceName, String clusters) { if (this.futureMap.get(ServiceInfo.getKey(serviceName, clusters)) == null) { synchronized(this.futureMap) { if (this.futureMap.get(ServiceInfo.getKey(serviceName, clusters)) == null) { //创建一个UpdateTask的更新线程任务,每10秒去异步更新集合数据 ScheduledFuture<?> future = this.addTask(new HostReactor.UpdateTask(serviceName, clusters)); this.futureMap.put(ServiceInfo.getKey(serviceName, clusters), future); } } } } //从注册中心拉取该服务最新实例列表,并更新到本地 public void updateService(String serviceName, String clusters) throws NacosException { ServiceInfo oldService = this.getServiceInfo0(serviceName, clusters); boolean var12 = false; try { var12 = true; //从注册中心查询服务下的实例列表 String result = this.serverProxy.queryList(serviceName, clusters, this.pushReceiver.getUdpPort(), false); if (StringUtils.isNotEmpty(result)) { //处理从注册中心获取到的服务实例JSON数据,更新本地服务列表 this.processServiceJson(result); var12 = false; } else { var12 = false; } } finally { if (var12) { if (oldService != null) { synchronized(oldService) { oldService.notifyAll(); } } } } if (oldService != null) { synchronized(oldService) { oldService.notifyAll(); } } } //仅仅执行刷新,从Nacos注册中心获取服务,但不刷新本地列表 public void refreshOnly(String serviceName, String clusters) { try { this.serverProxy.queryList(serviceName, clusters, this.pushReceiver.getUdpPort(), false); } catch (Exception var4) { } } public void shutdown() throws NacosException { String className = this.getClass().getName(); LogUtils.NAMING_LOGGER.info("{} do shutdown begin", className); ThreadUtils.shutdownThreadPool(this.executor, LogUtils.NAMING_LOGGER); this.pushReceiver.shutdown(); 
this.failoverReactor.shutdown(); NotifyCenter.deregisterSubscriber(this.notifier); LogUtils.NAMING_LOGGER.info("{} do shutdown stop", className); } //更新任务线程 public class UpdateTask implements Runnable { long lastRefTime = 9223372036854775807L; private final String clusters; private final String serviceName; private int failCount = 0; public UpdateTask(String serviceName, String clusters) { this.serviceName = serviceName; this.clusters = clusters; } private void incFailCount() { int limit = 6; if (this.failCount != limit) { ++this.failCount; } } private void resetFailCount() { this.failCount = 0; } public void run() { long delayTime = 1000L; try { //从本地缓存中获取服务实例列表 ServiceInfo serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters)); //如果服务为空,则更新本地服务列表 if (serviceObj == null) { HostReactor.this.updateService(this.serviceName, this.clusters); return; } // 过期服务(服务的最新更新时间小于等于缓存刷新时间),从注册中心重新查询 if (serviceObj.getLastRefTime() <= this.lastRefTime) { HostReactor.this.updateService(this.serviceName, this.clusters); serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters)); } else { HostReactor.this.refreshOnly(this.serviceName, this.clusters); } // 刷新更新时间 this.lastRefTime = serviceObj.getLastRefTime(); // 判断该注册的Service是否被订阅,如果没有订阅则不再执行 if (!HostReactor.this.notifier.isSubscribed(this.serviceName, this.clusters) && !HostReactor.this.futureMap.containsKey(ServiceInfo.getKey(this.serviceName, this.clusters))) { return; } if (CollectionUtils.isEmpty(serviceObj.getHosts())) { this.incFailCount(); return; } // 下次更新缓存时间设置,默认为10秒 delayTime = serviceObj.getCacheMillis(); //任务运行成功,重置失败次数为0 this.resetFailCount(); } catch (Throwable var7) { //任务执行失败,失败次数+1 this.incFailCount(); } finally { //取delayTime<<failCount的值与60000L之间的小值,作为下次运行的时间间隔 //下次调度刷新时间,下次执行的时间与failCount有关 HostReactor.this.executor.schedule(this, Math.min(delayTime << this.failCount, 60000L), 
TimeUnit.MILLISECONDS); } } } }
PushReceiver用于接收Nacos服务端的推送,初始化时会创建DatagramSocket使用UDP的方式接收推送。
会启动1个名为com.alibaba.nacos.naming.push.receiver的线程。
public class PushReceiver implements Runnable, Closeable { private static final Charset UTF_8 = Charset.forName("UTF-8"); private static final int UDP_MSS = 65536; private ScheduledExecutorService executorService; private DatagramSocket udpSocket; private HostReactor hostReactor; private volatile boolean closed = false; public static String getPushReceiverUdpPort() { return System.getenv("push.receiver.udp.port"); } public PushReceiver(HostReactor hostReactor) { try { this.hostReactor = hostReactor; String udpPort = getPushReceiverUdpPort(); if (StringUtils.isEmpty(udpPort)) { this.udpSocket = new DatagramSocket(); } else { this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort))); } this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.alibaba.nacos.naming.push.receiver"); return thread; } }); this.executorService.execute(this); } catch (Exception var3) { LogUtils.NAMING_LOGGER.error("[NA] init udp socket failed", var3); } } public void run() { while(!this.closed) { try { byte[] buffer = new byte[65536]; DatagramPacket packet = new DatagramPacket(buffer, buffer.length); this.udpSocket.receive(packet); String json = (new String(IoUtils.tryDecompress(packet.getData()), UTF_8)).trim(); LogUtils.NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString()); PushReceiver.PushPacket pushPacket = (PushReceiver.PushPacket)JacksonUtils.toObj(json, PushReceiver.PushPacket.class); String ack; if (!"dom".equals(pushPacket.type) && !"service".equals(pushPacket.type)) { if ("dump".equals(pushPacket.type)) { ack = "{\"type\": \"dump-ack\", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(this.hostReactor.getServiceInfoMap())) + "\"}"; } else { ack = "{\"type\": \"unknown-ack\", \"lastRefTime\":\"" + 
pushPacket.lastRefTime + "\", \"data\":\"\"}"; } } else { //调用HostReactor处理收到的json数据 this.hostReactor.processServiceJson(pushPacket.data); ack = "{\"type\": \"push-ack\", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":\"\"}"; } this.udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length, packet.getSocketAddress())); } catch (Exception var6) { if (this.closed) { return; } LogUtils.NAMING_LOGGER.error("[NA] error while receiving push data", var6); } } } public void shutdown() throws NacosException { String className = this.getClass().getName(); LogUtils.NAMING_LOGGER.info("{} do shutdown begin", className); ThreadUtils.shutdownThreadPool(this.executorService, LogUtils.NAMING_LOGGER); this.closed = true; this.udpSocket.close(); LogUtils.NAMING_LOGGER.info("{} do shutdown stop", className); } public int getUdpPort() { return this.udpSocket.getLocalPort(); } public static class PushPacket { public String type; public long lastRefTime; public String data; public PushPacket() { } } }
InstancesChangeNotifier负责保存各服务的实例变化监听器;当服务实例发生变化时,由它通知订阅者,即回调对应监听器的onEvent方法。
public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> { //监听器map private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap(); private final Object lock = new Object(); public InstancesChangeNotifier() { } //注册服务监听器 public void registerListener(String serviceName, String clusters, EventListener listener) { String key = ServiceInfo.getKey(serviceName, clusters); ConcurrentHashSet<EventListener> eventListeners = (ConcurrentHashSet)this.listenerMap.get(key); if (eventListeners == null) { synchronized(this.lock) { eventListeners = (ConcurrentHashSet)this.listenerMap.get(key); if (eventListeners == null) { eventListeners = new ConcurrentHashSet(); this.listenerMap.put(key, eventListeners); } } } eventListeners.add(listener); } //取消注册服务监听器 public void deregisterListener(String serviceName, String clusters, EventListener listener) { String key = ServiceInfo.getKey(serviceName, clusters); ConcurrentHashSet<EventListener> eventListeners = (ConcurrentHashSet)this.listenerMap.get(key); if (eventListeners != null) { eventListeners.remove(listener); if (CollectionUtils.isEmpty(eventListeners)) { this.listenerMap.remove(key); } } } //判断是否被订阅 public boolean isSubscribed(String serviceName, String clusters) { String key = ServiceInfo.getKey(serviceName, clusters); ConcurrentHashSet<EventListener> eventListeners = (ConcurrentHashSet)this.listenerMap.get(key); return CollectionUtils.isNotEmpty(eventListeners); } public List<ServiceInfo> getSubscribeServices() { List<ServiceInfo> serviceInfos = new ArrayList(); Iterator var2 = this.listenerMap.keySet().iterator(); while(var2.hasNext()) { String key = (String)var2.next(); serviceInfos.add(ServiceInfo.fromKey(key)); } return serviceInfos; } //服务实例变化事件 public void onEvent(InstancesChangeEvent event) { String key = ServiceInfo.getKey(event.getServiceName(), event.getClusters()); //获取该服务的订阅者 ConcurrentHashSet<EventListener> eventListeners = 
(ConcurrentHashSet)this.listenerMap.get(key); if (!CollectionUtils.isEmpty(eventListeners)) { Iterator var4 = eventListeners.iterator(); while(true) { //循环该服务的订阅者,调用订阅者的回调方法 while(var4.hasNext()) { final EventListener listener = (EventListener)var4.next(); final Event namingEvent = this.transferToNamingEvent(event); //如果该listener继承了Nacos定义的AbstractEventListener,并且executor不为空,则通过executor以新的线程的方式回调监听onEvent方法 if (listener instanceof AbstractEventListener && ((AbstractEventListener)listener).getExecutor() != null) { ((AbstractEventListener)listener).getExecutor().execute(new Runnable() { public void run() { listener.onEvent(namingEvent); } }); } //否则直接调用该监听器的onEvent方法 else { listener.onEvent(namingEvent); } } //回调结束,返回 return; } } } //转换事件 private Event transferToNamingEvent(InstancesChangeEvent instancesChangeEvent) { return new NamingEvent(instancesChangeEvent.getServiceName(), instancesChangeEvent.getGroupName(), instancesChangeEvent.getClusters(), instancesChangeEvent.getHosts()); } public Class<? extends com.alibaba.nacos.common.notify.Event> subscribeType() { return InstancesChangeEvent.class; } }
参考http://dreamphp.cn/blog/detail?blog_id=25851
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。