当前位置:   article > 正文

Nacos对spring-cloud服务注册的实现_de-registering

de-registering

Nacos架构图

在这里插入图片描述

  • Provider APP:服务提供者。

  • Consumer APP:服务消费者。

  • Name Server:通过Virtual IP或者DNS的方式实现Nacos高可用集群的服务路由。

  • Nacos Server:Nacos服务提供者

    • OpenAPI是功能访问入口
    • Config Service:Nacos提供的配置服务模块
    • Naming Service:Nacos提供的名字服务模块
    • Consistency Protocol:一致性协议,用来实现Nacos集群节点的数据同步,使用Raft算法实现。
  • Nacos Console:Nacos控制台。

    服务提供者通过VIP(Virtual IP)访问Nacos Server高可用集群,基于OpenAPI完成服务的注册和服务的查询。
    Nacos Server本身可以支持主备模式,所以底层会采用数据一致性算法(Raft)来完成节点的数据同步。
    消费者也是如此,基于OpenAPI从Nacos Server中查询服务列表。

Nacos服务注册与发现的实现原理图

在这里插入图片描述
服务注册与发现的功能:

  • 服务实例启动时注册到服务注册表、关闭时则注销(服务注册)
  • 服务注册中心根据服务实例定时发送的心跳包,实现健康检测(健康检查BeatReactor中的BeatTask)
  • 服务消费者可以通过查询服务注册表来获得可用的实例(服务发现)
  • 服务消费者定时拉取服务注册中心的服务实例数据(HostReactor中的UpdateTask)
  • 服务注册中心检测到服务提供者异常,主动通过UDP协议推送更新给服务消费者(PushReceiver)

NacosServiceRegistry

public class NacosServiceRegistry implements ServiceRegistry<Registration> {
    private static final String STATUS_UP = "UP";
    private static final String STATUS_DOWN = "DOWN";
    //Nacos服务注册与发现的配置属性信息
    private final NacosDiscoveryProperties nacosDiscoveryProperties;
    //NacosService管理者,通过其创建NamingService,NamingService通过OpenAPI与Nacos服务器通信,调用Nacos服务器对外暴露的接口
    @Autowired
    private NacosServiceManager nacosServiceManager;

    public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
        this.nacosDiscoveryProperties = nacosDiscoveryProperties;
    }
    //注册服务
    public void register(Registration registration) {
        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
        } else {
            NamingService namingService = this.namingService();
            String serviceId = registration.getServiceId();
            String group = this.nacosDiscoveryProperties.getGroup();
            Instance instance = this.getNacosInstanceFromRegistration(registration);

            try {
                //通过nacos的namingService注册服务实例
                namingService.registerInstance(serviceId, group, instance);
                log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()});
            } catch (Exception var7) {
                if (this.nacosDiscoveryProperties.isFailFast()) {
                    log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var7});
                    ReflectionUtils.rethrowRuntimeException(var7);
                } else {
                    log.warn("Failfast is false. {} register failed...{},", new Object[]{serviceId, registration.toString(), var7});
                }
            }

        }
    }
    //服务实例注销
    public void deregister(Registration registration) {
        log.info("De-registering from Nacos Server now...");
        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No dom to de-register for nacos client...");
        } else {
            NamingService namingService = this.namingService();
            String serviceId = registration.getServiceId();
            String group = this.nacosDiscoveryProperties.getGroup();

            try {
                namingService.deregisterInstance(serviceId, group, registration.getHost(), registration.getPort(), this.nacosDiscoveryProperties.getClusterName());
            } catch (Exception var6) {
                log.error("ERR_NACOS_DEREGISTER, de-register failed...{},", registration.toString(), var6);
            }

            log.info("De-registration finished.");
        }
    }

    public void close() {
        try {
            this.nacosServiceManager.nacosServiceShutDown();
        } catch (NacosException var2) {
            log.error("Nacos namingService shutDown failed", var2);
        }

    }
    //设置服务实例的状态
    public void setStatus(Registration registration, String status) {
        if (!"UP".equalsIgnoreCase(status) && !"DOWN".equalsIgnoreCase(status)) {
            log.warn("can't support status {},please choose UP or DOWN", status);
        } else {
            String serviceId = registration.getServiceId();
            Instance instance = this.getNacosInstanceFromRegistration(registration);
            if ("DOWN".equalsIgnoreCase(status)) {
                instance.setEnabled(false);
            } else {
                instance.setEnabled(true);
            }

            try {
                Properties nacosProperties = this.nacosDiscoveryProperties.getNacosProperties();
                this.nacosServiceManager.getNamingMaintainService(nacosProperties).updateInstance(serviceId, this.nacosDiscoveryProperties.getGroup(), instance);
            } catch (Exception var6) {
                throw new RuntimeException("update nacos instance status fail", var6);
            }
        }
    }
    //获取服务实例的状态
    public Object getStatus(Registration registration) {
        String serviceName = registration.getServiceId();
        String group = this.nacosDiscoveryProperties.getGroup();

        try {
            List<Instance> instances = this.namingService().getAllInstances(serviceName, group);
            Iterator var5 = instances.iterator();

            while(var5.hasNext()) {
                Instance instance = (Instance)var5.next();
                if (instance.getIp().equalsIgnoreCase(this.nacosDiscoveryProperties.getIp()) && instance.getPort() == this.nacosDiscoveryProperties.getPort()) {
                    return instance.isEnabled() ? "UP" : "DOWN";
                }
            }
        } catch (Exception var7) {
            log.error("get all instance of {} error,", serviceName, var7);
        }

        return null;
    }
    //将Registration转换成nacos的服务实例
    private Instance getNacosInstanceFromRegistration(Registration registration) {
        Instance instance = new Instance();
        instance.setIp(registration.getHost());
        instance.setPort(registration.getPort());
        instance.setWeight((double)this.nacosDiscoveryProperties.getWeight());
        instance.setClusterName(this.nacosDiscoveryProperties.getClusterName());
        instance.setEnabled(this.nacosDiscoveryProperties.isInstanceEnabled());
        instance.setMetadata(registration.getMetadata());
        instance.setEphemeral(this.nacosDiscoveryProperties.isEphemeral());
        return instance;
    }
    //获取namingService
    private NamingService namingService() {
        return this.nacosServiceManager.getNamingService(this.nacosDiscoveryProperties.getNacosProperties());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124

NacosRegistration

public class NacosRegistration implements Registration, ServiceInstance {
    public static final String MANAGEMENT_PORT = "management.port";
    public static final String MANAGEMENT_CONTEXT_PATH = "management.context-path";
    public static final String MANAGEMENT_ADDRESS = "management.address";
    public static final String MANAGEMENT_ENDPOINT_BASE_PATH = "management.endpoints.web.base-path";
    private List<NacosRegistrationCustomizer> registrationCustomizers;
    private NacosDiscoveryProperties nacosDiscoveryProperties;
    private ApplicationContext context;

    public NacosRegistration(List<NacosRegistrationCustomizer> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) {
        this.registrationCustomizers = registrationCustomizers;
        this.nacosDiscoveryProperties = nacosDiscoveryProperties;
        this.context = context;
    }
    //@PostConstruct注解用来修饰非静态void方法,用于依赖注入之后,进行一些初始化工作
    @PostConstruct
    public void init() {
        Map<String, String> metadata = this.nacosDiscoveryProperties.getMetadata();
        Environment env = this.context.getEnvironment();
        String endpointBasePath = env.getProperty("management.endpoints.web.base-path");
        if (!StringUtils.isEmpty(endpointBasePath)) {
            metadata.put("management.endpoints.web.base-path", endpointBasePath);
        }

        Integer managementPort = ManagementServerPortUtils.getPort(this.context);
        if (null != managementPort) {
            metadata.put("management.port", managementPort.toString());
            String contextPath = env.getProperty("management.server.servlet.context-path");
            String address = env.getProperty("management.server.address");
            if (!StringUtils.isEmpty(contextPath)) {
                metadata.put("management.context-path", contextPath);
            }

            if (!StringUtils.isEmpty(address)) {
                metadata.put("management.address", address);
            }
        }

        if (null != this.nacosDiscoveryProperties.getHeartBeatInterval()) {
            metadata.put("preserved.heart.beat.interval", this.nacosDiscoveryProperties.getHeartBeatInterval().toString());
        }

        if (null != this.nacosDiscoveryProperties.getHeartBeatTimeout()) {
            metadata.put("preserved.heart.beat.timeout", this.nacosDiscoveryProperties.getHeartBeatTimeout().toString());
        }

        if (null != this.nacosDiscoveryProperties.getIpDeleteTimeout()) {
            metadata.put("preserved.ip.delete.timeout", this.nacosDiscoveryProperties.getIpDeleteTimeout().toString());
        }

        customize(this.registrationCustomizers, this);
    }

    private static void customize(List<NacosRegistrationCustomizer> registrationCustomizers, NacosRegistration registration) {
        if (registrationCustomizers != null) {
            Iterator var2 = registrationCustomizers.iterator();

            while(var2.hasNext()) {
                NacosRegistrationCustomizer customizer = (NacosRegistrationCustomizer)var2.next();
                customizer.customize(registration);
            }
        }

    }

    public String getServiceId() {
        return this.nacosDiscoveryProperties.getService();
    }

    public String getHost() {
        return this.nacosDiscoveryProperties.getIp();
    }

    public int getPort() {
        return this.nacosDiscoveryProperties.getPort();
    }

    public void setPort(int port) {
        this.nacosDiscoveryProperties.setPort(port);
    }

    public boolean isSecure() {
        return this.nacosDiscoveryProperties.isSecure();
    }

    public URI getUri() {
        return DefaultServiceInstance.getUri(this);
    }

    public Map<String, String> getMetadata() {
        return this.nacosDiscoveryProperties.getMetadata();
    }

    public boolean isRegisterEnabled() {
        return this.nacosDiscoveryProperties.isRegisterEnabled();
    }

    public String getCluster() {
        return this.nacosDiscoveryProperties.getClusterName();
    }

    public float getRegisterWeight() {
        return this.nacosDiscoveryProperties.getWeight();
    }

    public NacosDiscoveryProperties getNacosDiscoveryProperties() {
        return this.nacosDiscoveryProperties;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109

NacosAutoServiceRegistration

public class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration> {
    //Nacos对Registration的实现
    private NacosRegistration registration;

    public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {
        super(serviceRegistry, autoServiceRegistrationProperties);
        this.registration = registration;
    }

    /** @deprecated */
    @Deprecated
    public void setPort(int port) {
        this.getPort().set(port);
    }

    protected NacosRegistration getRegistration() {
        if (this.registration.getPort() < 0 && this.getPort().get() > 0) {
            this.registration.setPort(this.getPort().get());
        }

        Assert.isTrue(this.registration.getPort() > 0, "service.port has not been set");
        return this.registration;
    }

    protected NacosRegistration getManagementRegistration() {
        return null;
    }
    //进行服务注册,调用类的register,再有父类的调用ServiceRegistry的实现类NacosServiceRegistry
    protected void register() {
        //判断Nacos服务注册的配置决定是否进行服务注册
        if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
            log.debug("Registration disabled.");
        } else {
            if (this.registration.getPort() < 0) {
                this.registration.setPort(this.getPort().get());
            }
            //调用父类register,再由父类调用NacosServiceRegistry进行真实的服务注册
            super.register();
        }
    }

    protected void registerManagement() {
        if (this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
            super.registerManagement();
        }
    }
    ......
    //监听NacosDiscoveryInfoChangedEvent事件
    @EventListener
    public void onNacosDiscoveryInfoChangedEvent(NacosDiscoveryInfoChangedEvent event) {
        this.restart();
    }

    private void restart() {
        this.stop();
        this.start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

NacosNamingService

Nacos Client包中的NamingService实现类为NacosNamingService,通过封装好的SDK供用户使用,来调用nacos对外暴露的OpenAPI

SDK方式只是提供了一种访问的封装,在底层仍然是基于HTTP协议完成请求的。

NamingService提供了以下方法:

  • registerInstance:注册实例

  • deregisterInstance:注销实例

  • getAllInstances:获取某一服务的所有实例

  • selectInstances:获取某一服务健康或不健康的实例

  • selectOneHealthyInstance:根据权重选择一个健康的实例

  • getServerStatus:检测服务端健康状态

  • subscribe:注册对某个服务的监听

  • unsubscribe:注销对某个服务的监听

  • getSubscribeServices:获取被监听的服务

  • getServicesOfServer:获取命名空间(namespace)下的所有服务名

NacosNamingService还初始化了其他核心类,外提供的方法都是委托给其他核心类处理的。按顺序将依次初始化NamingProxy、BeatReactor、HostReactor

  • NamingProxy:用于与Nacos服务端通信,注册服务、注销服务、发送心跳等都经由NamingProxy来请求服务端
  • BeatReactor:本地实例心跳,用于向Nacos服务端发送本地服务的心跳
  • HostReactor:用于从注册中心获取、保存、更新各服务实例信息
public class NacosNamingService implements NamingService {
    private String namespace;
    private String endpoint;
    private String serverList;
    private String cacheDir;
    private String logName;
    private HostReactor hostReactor;
    //心跳包响应
    private BeatReactor beatReactor;
    //进行服务注册的带来,通过NamingProxy与Nacos Server进行最终的通信
    private NamingProxy serverProxy;

    public NacosNamingService(String serverList) throws NacosException {
        Properties properties = new Properties();
        properties.setProperty("serverAddr", serverList);
        this.init(properties);
    }

    public NacosNamingService(Properties properties) throws NacosException {
        this.init(properties);
    }

    private void init(Properties properties) throws NacosException {
        ValidatorUtils.checkInitParam(properties);
        this.namespace = InitUtils.initNamespaceForNaming(properties);
        InitUtils.initSerialization();
        this.initServerAddr(properties);
        InitUtils.initWebRootContext(properties);
        this.initCacheDir();
        this.initLogName(properties);
        //NamingService网络层代理
        this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
        //心跳包检测线程池
        this.beatReactor = new BeatReactor(this.serverProxy, this.initClientBeatThreadCount(properties));
        this.hostReactor = new HostReactor(this.serverProxy, this.beatReactor, this.cacheDir, this.isLoadCacheAtStart(properties), this.isPushEmptyProtect(properties), this.initPollingThreadCount(properties));
    }
     ......省略......

    //注册服务,委托NamingProxy处理
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        //如果是临时节点
        if (instance.isEphemeral()) {
            //构造心跳包
            BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance);
            //将心跳包加到定时线程池中定时执行
            this.beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        //服务注册
        this.serverProxy.registerService(groupedServiceName, groupName, instance);
    }
    //注销服务,委托NamingProxy处理
    public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        //如果是临时节点,则移除心跳包
        if (instance.isEphemeral()) {
            this.beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(), instance.getPort());
        }
        //调用NamingProxy进行服务注销
        this.serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);
    }

    //获取所有服务实例的方法,委托HostReactor处理
    public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {
        ServiceInfo serviceInfo;
        // 如果该消费者订阅了这个服务,那么会先从本地维护的服务列表中获取,本地为空再从服务注册中心获取服务
        if (subscribe) {
            serviceInfo = this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            // 否则实例会从服务中心进行获取
            serviceInfo = this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }

        List list;
        return (List)(serviceInfo != null && !CollectionUtils.isEmpty(list = serviceInfo.getHosts()) ? list : new ArrayList());
    }

    //获取健康(不健康)服务实例方法,委托HostReactor处理
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
        ServiceInfo serviceInfo;
        // 如果该消费者订阅了这个服务,那么会在本地维护一个服务列表,服务从本地获取
        if (subscribe) {
            serviceInfo = this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            // 否则实例会从服务中心进行获取
            serviceInfo = this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }

        return this.selectInstances(serviceInfo, healthy);
    }

    private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {
        List list;
        if (serviceInfo != null && !CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
            Iterator iterator = list.iterator();

            while(true) {
                Instance instance;
                do {
                    if (!iterator.hasNext()) {
                        return list;
                    }

                    instance = (Instance)iterator.next();
                } while(healthy == instance.isHealthy() && instance.isEnabled() && instance.getWeight() > 0.0D);

                iterator.remove();
            }
        } else {
            return new ArrayList();
        }
    }

    //获取一个健康的实例,委托HostReactor处理
    public Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {
        return subscribe ? RandomByWeight.selectHost(this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","))) : RandomByWeight.selectHost(this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")));
    }

    //监听服务实例,委托HostReactor处理
    public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {
        this.hostReactor.subscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","), listener);
    }
    //取消监听服务
    public void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {
        this.hostReactor.unSubscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","), listener);
    }

    //查询服务列表
    public ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName, AbstractSelector selector) throws NacosException {
        return this.serverProxy.getServiceList(pageNo, pageSize, groupName, selector);
    }

    public List<ServiceInfo> getSubscribeServices() {
        return this.hostReactor.getSubscribeServices();
    }

    public String getServerStatus() {
        return this.serverProxy.serverHealthy() ? "UP" : "DOWN";
    }

    public BeatReactor getBeatReactor() {
        return this.beatReactor;
    }

    public void shutDown() throws NacosException {
        this.beatReactor.shutdown();
        this.hostReactor.shutdown();
        this.serverProxy.shutdown();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150

NamingProxy

NamingProxy用于与Nacos服务端通信,注册服务、注销服务、发送心跳等都经由NamingProxy来请求服务端。

NamingProxy会启动1个名为com.alibaba.nacos.client.naming.serverlist.updater的线程,用于定期调用refreshSrvIfNeed()方法更新Nacos服务端地址,默认间隔为30秒。

public class NamingProxy implements Closeable {
    //Nacos自定义的RestTemplate
    private final NacosRestTemplate nacosRestTemplate = NamingHttpClientManager.getInstance().getNacosRestTemplate();
    //默认服务端口
    private static final int DEFAULT_SERVER_PORT = 8848;
    private int serverPort = 8848;
    //命名空间
    private final String namespaceId;
    private final String endpoint;
    private String nacosDomain;
    private List<String> serverList;
    private List<String> serversFromEndpoint = new ArrayList();
    private final SecurityProxy securityProxy;
    private long lastSrvRefTime = 0L;
    private final long vipSrvRefInterMillis;
    private final long securityInfoRefreshIntervalMills;
    private Properties properties;
    //刷新定时任务
    private ScheduledExecutorService executorService;
    //最大重试次数,默认值是3
    private int maxRetry;

    public NamingProxy(String namespaceId, String endpoint, String serverList, Properties properties) {
        this.vipSrvRefInterMillis = TimeUnit.SECONDS.toMillis(30L);
        this.securityInfoRefreshIntervalMills = TimeUnit.SECONDS.toMillis(5L);
        this.securityProxy = new SecurityProxy(properties, this.nacosRestTemplate);
        this.properties = properties;
        this.setServerPort(8848);
        this.namespaceId = namespaceId;
        this.endpoint = endpoint;
        this.maxRetry = ConvertUtils.toInt(properties.getProperty("namingRequestDomainMaxRetryCount", String.valueOf(3)));
        if (StringUtils.isNotEmpty(serverList)) {
            this.serverList = Arrays.asList(serverList.split(","));
            if (this.serverList.size() == 1) {
                this.nacosDomain = serverList;
            }
        }

        this.initRefreshTask();
    }
    //初始化刷新定时任务
    private void initRefreshTask() {
        //初始化线程池
        this.executorService = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.naming.updater");
                t.setDaemon(true);
                return t;
            }
        });
        this.refreshSrvIfNeed();
        this.securityProxy.login(this.getServerList());
        //设置定时刷新任务
        this.executorService.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                NamingProxy.this.refreshSrvIfNeed();
            }
        }, 0L, this.vipSrvRefInterMillis, TimeUnit.MILLISECONDS);
        //
        this.executorService.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                NamingProxy.this.securityProxy.login(NamingProxy.this.getServerList());
            }
        }, 0L, this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
    }

    //
    public List<String> getServerListFromEndpoint() {
        try {
            String urlString = "http://" + this.endpoint + "/nacos/serverlist";
            Header header = this.builderHeader();
            HttpRestResult<String> restResult = this.nacosRestTemplate.get(urlString, header, Query.EMPTY, String.class);
            if (!restResult.ok()) {
                throw new IOException("Error while requesting: " + urlString + "'. Server returned: " + restResult.getCode());
            } else {
                String content = (String)restResult.getData();
                List<String> list = new ArrayList();
                Iterator var6 = IoUtils.readLines(new StringReader(content)).iterator();

                while(var6.hasNext()) {
                    String line = (String)var6.next();
                    if (!line.trim().isEmpty()) {
                        list.add(line.trim());
                    }
                }

                return list;
            }
        } catch (Exception var8) {
            var8.printStackTrace();
            return null;
        }
    }
    //进行刷新
    private void refreshSrvIfNeed() {
        try {
            if (!CollectionUtils.isEmpty(this.serverList)) {
                return;
            }
            if (System.currentTimeMillis() - this.lastSrvRefTime < this.vipSrvRefInterMillis) {
                return;
            }

            List<String> list = this.getServerListFromEndpoint();
            if (CollectionUtils.isEmpty(list)) {
                throw new Exception("Can not acquire Nacos list");
            }

            if (!CollectionUtils.isEqualCollection(list, this.serversFromEndpoint)) {
            }

            this.serversFromEndpoint = list;
            this.lastSrvRefTime = System.currentTimeMillis();
        } catch (Throwable var2) {
            LogUtils.NAMING_LOGGER.warn("failed to update server list", var2);
        }

    }
    //注册服务
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        
        Map<String, String> params = new HashMap(16);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", serviceName);
        params.put("groupName", groupName);
        params.put("clusterName", instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
        // 把上述服务实例的一些必要参数保存到一个Map中,通过OpenAPI的方式发送注册请求
        this.reqApi(UtilAndComs.nacosUrlInstance, params, "POST");
    }
    //注销服务
    public void deregisterService(String serviceName, Instance instance) throws NacosException {
       
        Map<String, String> params = new HashMap(8);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", serviceName);
        params.put("clusterName", instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        this.reqApi(UtilAndComs.nacosUrlInstance, params, "DELETE");
    }

    public void updateInstance(String serviceName, String groupName, Instance instance) throws NacosException {
       
        Map<String, String> params = new HashMap(8);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", serviceName);
        params.put("groupName", groupName);
        params.put("clusterName", instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enabled", String.valueOf(instance.isEnabled()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
        this.reqApi(UtilAndComs.nacosUrlInstance, params, "PUT");
    }

    public Service queryService(String serviceName, String groupName) throws NacosException {
        
        Map<String, String> params = new HashMap(3);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", serviceName);
        params.put("groupName", groupName);
        String result = this.reqApi(UtilAndComs.nacosUrlService, params, "GET");
        return (Service)JacksonUtils.toObj(result, Service.class);
    }

    public void createService(Service service, AbstractSelector selector) throws NacosException {
        
        Map<String, String> params = new HashMap(6);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", service.getName());
        params.put("groupName", service.getGroupName());
        params.put("protectThreshold", String.valueOf(service.getProtectThreshold()));
        params.put("metadata", JacksonUtils.toJson(service.getMetadata()));
        params.put("selector", JacksonUtils.toJson(selector));
        this.reqApi(UtilAndComs.nacosUrlService, params, "POST");
    }

    public boolean deleteService(String serviceName, String groupName) throws NacosException {
        
        Map<String, String> params = new HashMap(6);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", serviceName);
        params.put("groupName", groupName);
        String result = this.reqApi(UtilAndComs.nacosUrlService, params, "DELETE");
        return "ok".equals(result);
    }

    public void updateService(Service service, AbstractSelector selector) throws NacosException {
       
        Map<String, String> params = new HashMap(6);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", service.getName());
        params.put("groupName", service.getGroupName());
        params.put("protectThreshold", String.valueOf(service.getProtectThreshold()));
        params.put("metadata", JacksonUtils.toJson(service.getMetadata()));
        params.put("selector", JacksonUtils.toJson(selector));
        this.reqApi(UtilAndComs.nacosUrlService, params, "PUT");
    }

    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException {
        Map<String, String> params = new HashMap(8);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));
        return this.reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, "GET");
    }

    public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
        if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {
            LogUtils.NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", this.namespaceId, beatInfo.toString());
        }

        Map<String, String> params = new HashMap(8);
        Map<String, String> bodyMap = new HashMap(2);
        if (!lightBeatEnabled) {
            bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
        }

        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", beatInfo.getServiceName());
        params.put("clusterName", beatInfo.getCluster());
        params.put("ip", beatInfo.getIp());
        params.put("port", String.valueOf(beatInfo.getPort()));
        String result = this.reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, "PUT");
        return JacksonUtils.toObj(result);
    }

    public boolean serverHealthy() {
        try {
            String result = this.reqApi(UtilAndComs.nacosUrlBase + "/operator/metrics", new HashMap(2), "GET");
            JsonNode json = JacksonUtils.toObj(result);
            String serverStatus = json.get("status").asText();
            return "UP".equals(serverStatus);
        } catch (Exception var4) {
            return false;
        }
    }
    //查询服务列表
    public ListView<String> getServiceList(int pageNo, int pageSize, String groupName) throws NacosException {
        return this.getServiceList(pageNo, pageSize, groupName, (AbstractSelector)null);
    }

    public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector) throws NacosException {
        Map<String, String> params = new HashMap(4);
        params.put("pageNo", String.valueOf(pageNo));
        params.put("pageSize", String.valueOf(pageSize));
        params.put("namespaceId", this.namespaceId);
        params.put("groupName", groupName);
        if (selector != null) {
            switch(SelectorType.valueOf(selector.getType())) {
            case none:
            default:
                break;
            case label:
                ExpressionSelector expressionSelector = (ExpressionSelector)selector;
                params.put("selector", JacksonUtils.toJson(expressionSelector));
            }
        }

        String result = this.reqApi(UtilAndComs.nacosUrlBase + "/service/list", params, "GET");
        JsonNode json = JacksonUtils.toObj(result);
        ListView<String> listView = new ListView();
        listView.setCount(json.get("count").asInt());
        listView.setData((List)JacksonUtils.toObj(json.get("doms").toString(), new TypeReference<List<String>>() {
        }));
        return listView;
    }

    public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
        return this.reqApi(api, params, Collections.EMPTY_MAP, method);
    }

    public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException {
        return this.reqApi(api, params, body, this.getServerList(), method);
    }

    public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException {
        params.put("namespaceId", this.getNamespaceId());
        if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(this.nacosDomain)) {
            throw new NacosException(400, "no server available");
        } else {
            NacosException exception = new NacosException();
            if (StringUtils.isNotBlank(this.nacosDomain)) {
                int i = 0;

                while(i < this.maxRetry) {
                    try {
                        return this.callServer(api, params, body, this.nacosDomain, method);
                    } catch (NacosException var12) {
                        exception = var12;
                        if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {
                            LogUtils.NAMING_LOGGER.debug("request {} failed.", this.nacosDomain, var12);
                        }

                        ++i;
                    }
                }
            } else {
                Random random = new Random(System.currentTimeMillis());
                int index = random.nextInt(servers.size());
                int i = 0;

                while(i < servers.size()) {
                    String server = (String)servers.get(index);

                    try {
                        return this.callServer(api, params, body, server, method);
                    } catch (NacosException var13) {
                        exception = var13;
                        if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {
                            LogUtils.NAMING_LOGGER.debug("request {} failed.", server, var13);
                        }

                        index = (index + 1) % servers.size();
                        ++i;
                    }
                }
            }

            throw new NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
        }
    }

    private List<String> getServerList() {
        List<String> snapshot = this.serversFromEndpoint;
        if (!CollectionUtils.isEmpty(this.serverList)) {
            snapshot = this.serverList;
        }

        return snapshot;
    }

    public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer) throws NacosException {
        return this.callServer(api, params, body, curServer, "GET");
    }

    public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException {
        long start = System.currentTimeMillis();
        long end = 0L;
        this.injectSecurityInfo(params);
        Header header = this.builderHeader();
        String url;
        if (!curServer.startsWith("https://") && !curServer.startsWith("http://")) {
            if (!IPUtil.containsPort(curServer)) {
                curServer = curServer + ":" + this.serverPort;
            }

            url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
        } else {
            url = curServer + api;
        }

        try {
            HttpRestResult<String> restResult = this.nacosRestTemplate.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
            end = System.currentTimeMillis();
            MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())).observe((double)(end - start));
            if (restResult.ok()) {
                return (String)restResult.getData();
            } else if (304 == restResult.getCode()) {
                return "";
            } else {
                throw new NacosException(restResult.getCode(), restResult.getMessage());
            }
        } catch (Exception var13) {
            LogUtils.NAMING_LOGGER.error("[NA] failed to request", var13);
            throw new NacosException(500, var13);
        }
    }

    private void injectSecurityInfo(Map<String, String> params) {
        if (StringUtils.isNotBlank(this.securityProxy.getAccessToken())) {
            params.put("accessToken", this.securityProxy.getAccessToken());
        }

        String ak = this.getAccessKey();
        String sk = this.getSecretKey();
        params.put("app", AppNameUtils.getAppName());
        if (StringUtils.isNotBlank(ak) && StringUtils.isNotBlank(sk)) {
            try {
                String signData = getSignData((String)params.get("serviceName"));
                String signature = SignUtil.sign(signData, sk);
                params.put("signature", signature);
                params.put("data", signData);
                params.put("ak", ak);
            } catch (Exception var6) {
                LogUtils.NAMING_LOGGER.error("inject ak/sk failed.", var6);
            }
        }

    }

    public Header builderHeader() {
        Header header = Header.newInstance();
        header.addParam("Client-Version", VersionUtils.version);
        header.addParam("User-Agent", UtilAndComs.VERSION);
        header.addParam("Accept-Encoding", "gzip,deflate,sdch");
        header.addParam("Connection", "Keep-Alive");
        header.addParam("RequestId", UuidUtils.generateUuid());
        header.addParam("Request-Module", "Naming");
        return header;
    }

    private static String getSignData(String serviceName) {
        return StringUtils.isNotEmpty(serviceName) ? System.currentTimeMillis() + "@@" + serviceName : String.valueOf(System.currentTimeMillis());
    }

    public String getAccessKey() {
        return this.properties == null ? SpasAdapter.getAk() : TemplateUtils.stringEmptyAndThenExecute(this.properties.getProperty("accessKey"), new Callable<String>() {
            public String call() {
                return SpasAdapter.getAk();
            }
        });
    }

    public String getSecretKey() {
        return this.properties == null ? SpasAdapter.getSk() : TemplateUtils.stringEmptyAndThenExecute(this.properties.getProperty("secretKey"), new Callable<String>() {
            public String call() throws Exception {
                return SpasAdapter.getSk();
            }
        });
    }

    public void setProperties(Properties properties) {
        this.properties = properties;
        this.setServerPort(8848);
    }

    public String getNamespaceId() {
        return this.namespaceId;
    }

    public void setServerPort(int serverPort) {
        this.serverPort = serverPort;
        String sp = System.getProperty("nacos.naming.exposed.port");
        if (StringUtils.isNotBlank(sp)) {
            this.serverPort = Integer.parseInt(sp);
        }

    }

    public void shutdown() throws NacosException {
        String className = this.getClass().getName();
        LogUtils.NAMING_LOGGER.info("{} do shutdown begin", className);
        ThreadUtils.shutdownThreadPool(this.executorService, LogUtils.NAMING_LOGGER);
        NamingHttpClientManager.getInstance().shutdown();
        SpasAdapter.freeCredentialInstance();
        LogUtils.NAMING_LOGGER.info("{} do shutdown stop", className);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
  • 296
  • 297
  • 298
  • 299
  • 300
  • 301
  • 302
  • 303
  • 304
  • 305
  • 306
  • 307
  • 308
  • 309
  • 310
  • 311
  • 312
  • 313
  • 314
  • 315
  • 316
  • 317
  • 318
  • 319
  • 320
  • 321
  • 322
  • 323
  • 324
  • 325
  • 326
  • 327
  • 328
  • 329
  • 330
  • 331
  • 332
  • 333
  • 334
  • 335
  • 336
  • 337
  • 338
  • 339
  • 340
  • 341
  • 342
  • 343
  • 344
  • 345
  • 346
  • 347
  • 348
  • 349
  • 350
  • 351
  • 352
  • 353
  • 354
  • 355
  • 356
  • 357
  • 358
  • 359
  • 360
  • 361
  • 362
  • 363
  • 364
  • 365
  • 366
  • 367
  • 368
  • 369
  • 370
  • 371
  • 372
  • 373
  • 374
  • 375
  • 376
  • 377
  • 378
  • 379
  • 380
  • 381
  • 382
  • 383
  • 384
  • 385
  • 386
  • 387
  • 388
  • 389
  • 390
  • 391
  • 392
  • 393
  • 394
  • 395
  • 396
  • 397
  • 398
  • 399
  • 400
  • 401
  • 402
  • 403
  • 404
  • 405
  • 406
  • 407
  • 408
  • 409
  • 410
  • 411
  • 412
  • 413
  • 414
  • 415
  • 416
  • 417
  • 418
  • 419
  • 420
  • 421
  • 422
  • 423
  • 424
  • 425
  • 426
  • 427
  • 428
  • 429
  • 430
  • 431
  • 432
  • 433
  • 434
  • 435
  • 436
  • 437
  • 438
  • 439
  • 440
  • 441
  • 442
  • 443
  • 444
  • 445
  • 446
  • 447
  • 448
  • 449
  • 450
  • 451
  • 452
  • 453
  • 454
  • 455
  • 456
  • 457
  • 458
  • 459
  • 460
  • 461
  • 462
  • 463

BeatReactor

在这里插入图片描述

BeatReactor用于向Nacos服务端发送已注册服务的心跳。

成员变量Map<String, BeatInfo> dom2Beat中保存了需要发送的BeatInfo,key为{serviceName}#{ip}#{port},value为对应的BeatInfo。

BeatReactor会启动名为com.alibaba.nacos.naming.beat.sender的线程来发送心跳,默认线程数为1~CPU核心数的一半,可由namingClientBeatThreadCount参数指定。

默认情况下每5秒发送一次心跳,可根据Nacos服务端返回的clientBeatInterval的值调整心跳间隔。

public class BeatReactor implements Closeable {
    //定时任务线程池
    private final ScheduledExecutorService executorService;
    //
    private final NamingProxy serverProxy;
    private boolean lightBeatEnabled;
    //心跳包Map
    public final Map<String, BeatInfo> dom2Beat;

    public BeatReactor(NamingProxy serverProxy) {
        this(serverProxy, UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
    }
    public BeatReactor(NamingProxy serverProxy, int threadCount) {
        this.lightBeatEnabled = false;
        this.dom2Beat = new ConcurrentHashMap();
        this.serverProxy = serverProxy;
        this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.beat.sender");
                return thread;
            }
        });
    }
    //添加心跳包到定时线程池中
    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        //构造心跳包key
        String key = this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        //从dom2Beat移除该key,如果key对应的value不为空的话,表明该beatInfo已经存在了,则停止心跳检测
        if ((existBeat = (BeatInfo)this.dom2Beat.remove(key)) != null) {
            //停止心跳检测
            existBeat.setStopped(true);
        }
        //如果dom2Beat中不存在该key,则将key放到map中,并进行定时心跳检测
        this.dom2Beat.put(key, beatInfo);
        //定时任务线程池,定时执行里面的线程
        this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
    }

    public void removeBeatInfo(String serviceName, String ip, int port) {
        //从心跳包map中删除对应的心跳包信息
        BeatInfo beatInfo = (BeatInfo)this.dom2Beat.remove(this.buildKey(serviceName, ip, port));
        //将心跳包状态设置为stop
        if (beatInfo != null) {
            beatInfo.setStopped(true);
            MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
        }
    }
    //根据服务实例,构造心跳包
    public BeatInfo buildBeatInfo(Instance instance) {
        return this.buildBeatInfo(instance.getServiceName(), instance);
    }

    public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(groupedServiceName);
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
        return beatInfo;
    }

    public String buildKey(String serviceName, String ip, int port) {
        return serviceName + "#" + ip + "#" + port;
    }

    public void shutdown() throws NacosException {
        String className = this.getClass().getName();
        ThreadUtils.shutdownThreadPool(this.executorService, LogUtils.NAMING_LOGGER);
    }
    //心跳包发送线程
    class BeatTask implements Runnable {
        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        public void run() {
            //如果心跳包检查没有停止,则发送心跳包
            if (!this.beatInfo.isStopped()) {
                long nextTime = this.beatInfo.getPeriod();

                try {
                    //发送心跳包
                    JsonNode result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo, BeatReactor.this.lightBeatEnabled);
                    long interval = result.get("clientBeatInterval").asLong();
                    boolean lightBeatEnabled = false;
                    if (result.has("lightBeatEnabled")) {
                        lightBeatEnabled = result.get("lightBeatEnabled").asBoolean();
                    }

                    BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                    if (interval > 0L) {
                        nextTime = interval;
                    }

                    int code = 10200;
                    if (result.has("code")) {
                        code = result.get("code").asInt();
                    }
                    //如果该实例没有在注册中心注册,则进行注册
                    if (code == 20404) {
                        Instance instance = new Instance();
                        instance.setPort(this.beatInfo.getPort());
                        instance.setIp(this.beatInfo.getIp());
                        instance.setWeight(this.beatInfo.getWeight());
                        instance.setMetadata(this.beatInfo.getMetadata());
                        instance.setClusterName(this.beatInfo.getCluster());
                        instance.setServiceName(this.beatInfo.getServiceName());
                        instance.setInstanceId(instance.getInstanceId());
                        instance.setEphemeral(true);

                        try {
                           //注册服务
             BeatReactor.this.serverProxy.registerService(this.beatInfo.getServiceName(), NamingUtils.getGroupName(this.beatInfo.getServiceName()), instance);
                        } catch (Exception var15) {
                           //log日志
                        }
                    }
                } catch (NacosException var16) {
                    //log日志
                } catch (Exception var17) {
                    //log日志
                } finally {
                    //将线程再次放到定时任务线程池中执行下次的心跳包发送
                    BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS);
                }

            }
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141

HostReactor

在这里插入图片描述

HostReactor用于获取、保存、更新各Service实例信息。

成员变量Map<String, ServiceInfo> serviceInfoMap中保存了已获取到的服务的信息,key为{服务名}@@{集群名}。

HostReactor会启动名为com.alibaba.nacos.client.naming.updater的线程来更新服务信息,默认线程数为1~CPU核心数的一半,可由namingPollingThreadCount参数指定。

定时任务UpdateTask会根据服务的cacheMillis值定时更新服务信息,默认值为10秒。该定时任务会在获取某一服务信息时创建,保存在成员变量Map<String, ScheduledFuture<?>> futureMap中。

public class HostReactor implements Closeable {
    private static final long DEFAULT_DELAY = 1000L;
    private static final long UPDATE_HOLD_INTERVAL = 5000L;
    private final Map<String, ScheduledFuture<?>> futureMap;
    // 本地已存在的服务列表,key是服务名称,value是ServiceInfo
    private final Map<String, ServiceInfo> serviceInfoMap;
    // 待更新的实例列表
    private final Map<String, Object> updatingMap;
    private final PushReceiver pushReceiver;
    private final BeatReactor beatReactor;
    private final NamingProxy serverProxy;
    private final FailoverReactor failoverReactor;
    private final String cacheDir;
    private final boolean pushEmptyProtection;
    // 定时任务(负责服务列表的实时更新)
    private final ScheduledExecutorService executor;
    //实例变化通知者,负责管理服务的订阅信息,并进行回调
    private final InstancesChangeNotifier notifier;

    public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir) {
        this(serverProxy, beatReactor, cacheDir, false, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
    }

    public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir, boolean loadCacheAtStart, boolean pushEmptyProtection, int pollingThreadCount) {
        this.futureMap = new HashMap();
        this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.client.naming.updater");
                return thread;
            }
        });
        this.beatReactor = beatReactor;
        this.serverProxy = serverProxy;
        this.cacheDir = cacheDir;
        if (loadCacheAtStart) {
            this.serviceInfoMap = new ConcurrentHashMap(DiskCache.read(this.cacheDir));
        } else {
            this.serviceInfoMap = new ConcurrentHashMap(16);
        }

        this.pushEmptyProtection = pushEmptyProtection;
        this.updatingMap = new ConcurrentHashMap();
        this.failoverReactor = new FailoverReactor(this, cacheDir);
        this.pushReceiver = new PushReceiver(this);
        this.notifier = new InstancesChangeNotifier();
        NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
        NotifyCenter.registerSubscriber(this.notifier);
    }

    public Map<String, ServiceInfo> getServiceInfoMap() {
        return this.serviceInfoMap;
    }
    //增加定时刷新服务任务
    public synchronized ScheduledFuture<?> addTask(HostReactor.UpdateTask task) {
        return this.executor.schedule(task, 1000L, TimeUnit.MILLISECONDS);
    }
    //订阅服务,serviceName服务名称,clusters集群列表,EventLintener回调Listener
    public void subscribe(String serviceName, String clusters, EventListener eventListener) {
        //给该服务增加监听器,服务发生变化后进行回调
        this.notifier.registerListener(serviceName, clusters, eventListener);
        //将该服务添加到HostReactor的定时任务中,定时刷新
        this.getServiceInfo(serviceName, clusters);
    }
    //取消订阅服务,serviceName服务名称,clusters集群列表,EventLintener回调Listener
    public void unSubscribe(String serviceName, String clusters, EventListener eventListener) {
        this.notifier.deregisterListener(serviceName, clusters, eventListener);
    }

    public List<ServiceInfo> getSubscribeServices() {
        return this.notifier.getSubscribeServices();
    }
    //处理从注册中心获取到的JSON格式的服务实例,并更新到本地serviceInfoMap中
    public ServiceInfo processServiceJson(String json) {
        ServiceInfo serviceInfo = (ServiceInfo)JacksonUtils.toObj(json, ServiceInfo.class);
        String serviceKey = serviceInfo.getKey();
        if (serviceKey == null) {
            return null;
        } else {
            ServiceInfo oldService = (ServiceInfo)this.serviceInfoMap.get(serviceKey);
            if (this.pushEmptyProtection && !serviceInfo.validate()) {
                return oldService;
            } else {
                boolean changed = false;
                if (oldService != null) {
                    if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
                        LogUtils.NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + serviceInfo.getLastRefTime());
                    }

                    this.serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
                    Map<String, Instance> oldHostMap = new HashMap(oldService.getHosts().size());
                    Iterator var7 = oldService.getHosts().iterator();

                    while(var7.hasNext()) {
                        Instance host = (Instance)var7.next();
                        oldHostMap.put(host.toInetAddr(), host);
                    }

                    Map<String, Instance> newHostMap = new HashMap(serviceInfo.getHosts().size());
                    Iterator var17 = serviceInfo.getHosts().iterator();

                    while(var17.hasNext()) {
                        Instance host = (Instance)var17.next();
                        newHostMap.put(host.toInetAddr(), host);
                    }

                    Set<Instance> modHosts = new HashSet();
                    Set<Instance> newHosts = new HashSet();
                    Set<Instance> remvHosts = new HashSet();
                    List<Entry<String, Instance>> newServiceHosts = new ArrayList(newHostMap.entrySet());
                    Iterator var12 = newServiceHosts.iterator();

                    while(true) {
                        Entry entry;
                        Instance host;
                        String key;
                        while(var12.hasNext()) {
                            entry = (Entry)var12.next();
                            host = (Instance)entry.getValue();
                            key = (String)entry.getKey();
                            if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(), ((Instance)oldHostMap.get(key)).toString())) {
                                modHosts.add(host);
                            } else if (!oldHostMap.containsKey(key)) {
                                newHosts.add(host);
                            }
                        }

                        var12 = oldHostMap.entrySet().iterator();

                        while(var12.hasNext()) {
                            entry = (Entry)var12.next();
                            host = (Instance)entry.getValue();
                            key = (String)entry.getKey();
                            if (!newHostMap.containsKey(key) && !newHostMap.containsKey(key)) {
                                remvHosts.add(host);
                            }
                        }

                        if (newHosts.size() > 0) {
                            changed = true;
                        }

                        if (remvHosts.size() > 0) {
                            changed = true;
                        }

                        if (modHosts.size() > 0) {
                            changed = true;
                            this.updateBeatInfo(modHosts);
                        }

                        serviceInfo.setJsonFromServer(json);
                        if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
                            NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
                            DiskCache.write(serviceInfo, this.cacheDir);
                        }
                        break;
                    }
                } else {
                    changed = true;
     
                    this.serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
                    NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
                    serviceInfo.setJsonFromServer(json);
                    DiskCache.write(serviceInfo, this.cacheDir);
                }

                MetricsMonitor.getServiceInfoMapSizeMonitor().set((double)this.serviceInfoMap.size());
                if (changed) {
                   //记录日志
                }
                return serviceInfo;
            }
        }
    }

    private void updateBeatInfo(Set<Instance> modHosts) {
        Iterator var2 = modHosts.iterator();

        while(var2.hasNext()) {
            Instance instance = (Instance)var2.next();
            String key = this.beatReactor.buildKey(instance.getServiceName(), instance.getIp(), instance.getPort());
            if (this.beatReactor.dom2Beat.containsKey(key) && instance.isEphemeral()) {
                BeatInfo beatInfo = this.beatReactor.buildBeatInfo(instance);
                this.beatReactor.addBeatInfo(instance.getServiceName(), beatInfo);
            }
        }

    }
    //从本地缓存中获取服务实例信息
    private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
        String key = ServiceInfo.getKey(serviceName, clusters);
        return (ServiceInfo)this.serviceInfoMap.get(key);
    }
    //直接从服务注册中心获取服务
    public ServiceInfo getServiceInfoDirectlyFromServer(String serviceName, String clusters) throws NacosException {
        String result = this.serverProxy.queryList(serviceName, clusters, 0, false);
        return StringUtils.isNotEmpty(result) ? (ServiceInfo)JacksonUtils.toObj(result, ServiceInfo.class) : null;
    }
    //获取服务,先从本地获取,本地没有,则进行维护,并从注册中心更新最新服务信息
    public ServiceInfo getServiceInfo(String serviceName, String clusters) {
        
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (this.failoverReactor.isFailoverSwitch()) {
            return this.failoverReactor.getService(key);
        } else {
            // 1.先通过serverName即服务名获得一个serviceInfo
            ServiceInfo serviceObj = this.getServiceInfo0(serviceName, clusters);
            if (null == serviceObj) {
                //如果没有serviceInfo,则通过传进来的参数new出一个新的serviceInfo对象,并且同时维护到本地Map和更新Map
                serviceObj = new ServiceInfo(serviceName, clusters);
                this.serviceInfoMap.put(serviceObj.getKey(), serviceObj);
                this.updatingMap.put(serviceName, new Object());
                // 2.updateServiceNow(),立刻去Nacos服务端拉取该服务最新实例列表,更新serviceInfoMap
                this.updateServiceNow(serviceName, clusters);
                this.updatingMap.remove(serviceName);
            } else if (this.updatingMap.containsKey(serviceName)) {
                synchronized(serviceObj) {
                    try {
                        serviceObj.wait(5000L);
                    } catch (InterruptedException var8) {
                        
                    }
                }
            }
            // 3.定时更新实例信息
            this.scheduleUpdateIfAbsent(serviceName, clusters);
            // 4.最后返回服务实例数据(前面已经进行了更新)
            return (ServiceInfo)this.serviceInfoMap.get(serviceObj.getKey());
        }
    }
    //立即从注册中心拉取该服务最新实例列表,并更新到本地
    private void updateServiceNow(String serviceName, String clusters) {
        try {
            this.updateService(serviceName, clusters);
        } catch (NacosException var4) {  
        }
    }
    //通过定时任务,每10秒去更新一次数据
    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        if (this.futureMap.get(ServiceInfo.getKey(serviceName, clusters)) == null) {
            synchronized(this.futureMap) {
                if (this.futureMap.get(ServiceInfo.getKey(serviceName, clusters)) == null) {
                    //创建一个UpdateTask的更新线程任务,每10秒去异步更新集合数据
                    ScheduledFuture<?> future = this.addTask(new HostReactor.UpdateTask(serviceName, clusters));
                    this.futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
                }
            }
        }
    }
    //从注册中心拉取该服务最新实例列表,并更新到本地
    public void updateService(String serviceName, String clusters) throws NacosException {
        ServiceInfo oldService = this.getServiceInfo0(serviceName, clusters);
        boolean var12 = false;
        try {
            var12 = true;
            //从注册中心查询服务下的实例列表
            String result = this.serverProxy.queryList(serviceName, clusters, this.pushReceiver.getUdpPort(), false);
            if (StringUtils.isNotEmpty(result)) {
                //处理从注册中心获取到的服务实例JSON数据,更新本地服务列表
                this.processServiceJson(result);
                var12 = false;
            } else {
                var12 = false;
            }
        } finally {
            if (var12) {
                if (oldService != null) {
                    synchronized(oldService) {
                        oldService.notifyAll();
                    }
                }
            }
        }
        if (oldService != null) {
            synchronized(oldService) {
                oldService.notifyAll();
            }
        }
    }
    //仅仅执行刷新,从Nacos注册中心获取服务,但不刷新本地列表
    public void refreshOnly(String serviceName, String clusters) {
        try {
            this.serverProxy.queryList(serviceName, clusters, this.pushReceiver.getUdpPort(), false);
        } catch (Exception var4) {    
        }
    }

    public void shutdown() throws NacosException {
        String className = this.getClass().getName();
        LogUtils.NAMING_LOGGER.info("{} do shutdown begin", className);
        ThreadUtils.shutdownThreadPool(this.executor, LogUtils.NAMING_LOGGER);
        this.pushReceiver.shutdown();
        this.failoverReactor.shutdown();
        NotifyCenter.deregisterSubscriber(this.notifier);
        LogUtils.NAMING_LOGGER.info("{} do shutdown stop", className);
    }
    //更新任务线程
    public class UpdateTask implements Runnable {
        long lastRefTime = 9223372036854775807L;
        private final String clusters;
        private final String serviceName;
        private int failCount = 0;

        public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
        }

        private void incFailCount() {
            int limit = 6;
            if (this.failCount != limit) {
                ++this.failCount;
            }
        }

        private void resetFailCount() {
            this.failCount = 0;
        }

        public void run() {
            long delayTime = 1000L;

            try {
                //从本地缓存中获取服务实例列表
                ServiceInfo serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters));
                //如果服务为空,则更新本地服务列表
                if (serviceObj == null) {
                    HostReactor.this.updateService(this.serviceName, this.clusters);
                    return;
                }
                // 过期服务(服务的最新更新时间小于等于缓存刷新时间),从注册中心重新查询
                if (serviceObj.getLastRefTime() <= this.lastRefTime) {
                    HostReactor.this.updateService(this.serviceName, this.clusters);
                    serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters));
                } else {
                    HostReactor.this.refreshOnly(this.serviceName, this.clusters);
                }
                 // 刷新更新时间 
                this.lastRefTime = serviceObj.getLastRefTime();
                // 判断该注册的Service是否被订阅,如果没有订阅则不再执行
                if (!HostReactor.this.notifier.isSubscribed(this.serviceName, this.clusters) && !HostReactor.this.futureMap.containsKey(ServiceInfo.getKey(this.serviceName, this.clusters))) {
                    return;
                }

                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                    this.incFailCount();
                    return;
                }
                // 下次更新缓存时间设置,默认为10秒
                delayTime = serviceObj.getCacheMillis();
                //任务运行成功,重置失败次数为0
                this.resetFailCount();
            } catch (Throwable var7) {
                //任务执行失败,失败次数+1
                this.incFailCount();
            } finally {
                //取delayTime<<failCount的值与60000L之间的小值,作为下次运行的时间间隔
                //下次调度刷新时间,下次执行的时间与failCount有关 
                HostReactor.this.executor.schedule(this, Math.min(delayTime << this.failCount, 60000L), TimeUnit.MILLISECONDS);
            }

        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
  • 296
  • 297
  • 298
  • 299
  • 300
  • 301
  • 302
  • 303
  • 304
  • 305
  • 306
  • 307
  • 308
  • 309
  • 310
  • 311
  • 312
  • 313
  • 314
  • 315
  • 316
  • 317
  • 318
  • 319
  • 320
  • 321
  • 322
  • 323
  • 324
  • 325
  • 326
  • 327
  • 328
  • 329
  • 330
  • 331
  • 332
  • 333
  • 334
  • 335
  • 336
  • 337
  • 338
  • 339
  • 340
  • 341
  • 342
  • 343
  • 344
  • 345
  • 346
  • 347
  • 348
  • 349
  • 350
  • 351
  • 352
  • 353
  • 354
  • 355
  • 356
  • 357
  • 358
  • 359
  • 360
  • 361
  • 362
  • 363
  • 364
  • 365
  • 366
  • 367

获取服务实例流程

在这里插入图片描述

更新服务实例流程

在这里插入图片描述

PushReceiver

PushReceiver用于接收Nacos服务端的推送,初始化时会创建DatagramSocket使用UDP的方式接收推送。

会启动1个名为com.alibaba.nacos.naming.push.receiver的线程。

public class PushReceiver implements Runnable, Closeable {
    private static final Charset UTF_8 = Charset.forName("UTF-8");
    private static final int UDP_MSS = 65536;
    private ScheduledExecutorService executorService;
    private DatagramSocket udpSocket;
    private HostReactor hostReactor;
    private volatile boolean closed = false;

    public static String getPushReceiverUdpPort() {
        return System.getenv("push.receiver.udp.port");
    }

    public PushReceiver(HostReactor hostReactor) {
        try {
            this.hostReactor = hostReactor;
            String udpPort = getPushReceiverUdpPort();
            if (StringUtils.isEmpty(udpPort)) {
                this.udpSocket = new DatagramSocket();
            } else {
                this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
            }

            this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.naming.push.receiver");
                    return thread;
                }
            });
            this.executorService.execute(this);
        } catch (Exception var3) {
            LogUtils.NAMING_LOGGER.error("[NA] init udp socket failed", var3);
        }

    }

    public void run() {
        while(!this.closed) {
            try {
                byte[] buffer = new byte[65536];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                this.udpSocket.receive(packet);
                String json = (new String(IoUtils.tryDecompress(packet.getData()), UTF_8)).trim();
                LogUtils.NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
                PushReceiver.PushPacket pushPacket = (PushReceiver.PushPacket)JacksonUtils.toObj(json, PushReceiver.PushPacket.class);
                String ack;
                if (!"dom".equals(pushPacket.type) && !"service".equals(pushPacket.type)) {
                    if ("dump".equals(pushPacket.type)) {
                        ack = "{\"type\": \"dump-ack\", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(this.hostReactor.getServiceInfoMap())) + "\"}";
                    } else {
                        ack = "{\"type\": \"unknown-ack\", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":\"\"}";
                    }
                } else {
                    //调用HostReactor处理收到的json数据
                    this.hostReactor.processServiceJson(pushPacket.data);
                    ack = "{\"type\": \"push-ack\", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":\"\"}";
                }

                this.udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length, packet.getSocketAddress()));
            } catch (Exception var6) {
                if (this.closed) {
                    return;
                }

                LogUtils.NAMING_LOGGER.error("[NA] error while receiving push data", var6);
            }
        }

    }

    public void shutdown() throws NacosException {
        String className = this.getClass().getName();
        LogUtils.NAMING_LOGGER.info("{} do shutdown begin", className);
        ThreadUtils.shutdownThreadPool(this.executorService, LogUtils.NAMING_LOGGER);
        this.closed = true;
        this.udpSocket.close();
        LogUtils.NAMING_LOGGER.info("{} do shutdown stop", className);
    }

    public int getUdpPort() {
        return this.udpSocket.getLocalPort();
    }

    public static class PushPacket {
        public String type;
        public long lastRefTime;
        public String data;

        public PushPacket() {
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94

InstancesChangeNotifier

该对象负责保存服务实例的监听,当服务实例发生变化的时候,负责进行通知,回调监听器方法

public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
    //监听器map
    private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap();
    private final Object lock = new Object();

    public InstancesChangeNotifier() {
    }
    //注册服务监听器
    public void registerListener(String serviceName, String clusters, EventListener listener) {
        String key = ServiceInfo.getKey(serviceName, clusters);
        ConcurrentHashSet<EventListener> eventListeners = (ConcurrentHashSet)this.listenerMap.get(key);
        if (eventListeners == null) {
            synchronized(this.lock) {
                eventListeners = (ConcurrentHashSet)this.listenerMap.get(key);
                if (eventListeners == null) {
                    eventListeners = new ConcurrentHashSet();
                    this.listenerMap.put(key, eventListeners);
                }
            }
        }

        eventListeners.add(listener);
    }
    //取消注册服务监听器
    public void deregisterListener(String serviceName, String clusters, EventListener listener) {
        String key = ServiceInfo.getKey(serviceName, clusters);
        ConcurrentHashSet<EventListener> eventListeners = (ConcurrentHashSet)this.listenerMap.get(key);
        if (eventListeners != null) {
            eventListeners.remove(listener);
            if (CollectionUtils.isEmpty(eventListeners)) {
                this.listenerMap.remove(key);
            }

        }
    }
    //判断是否被订阅
    public boolean isSubscribed(String serviceName, String clusters) {
        String key = ServiceInfo.getKey(serviceName, clusters);
        ConcurrentHashSet<EventListener> eventListeners = (ConcurrentHashSet)this.listenerMap.get(key);
        return CollectionUtils.isNotEmpty(eventListeners);
    }

    public List<ServiceInfo> getSubscribeServices() {
        List<ServiceInfo> serviceInfos = new ArrayList();
        Iterator var2 = this.listenerMap.keySet().iterator();

        while(var2.hasNext()) {
            String key = (String)var2.next();
            serviceInfos.add(ServiceInfo.fromKey(key));
        }

        return serviceInfos;
    }
    //服务实例变化事件
    public void onEvent(InstancesChangeEvent event) {
        String key = ServiceInfo.getKey(event.getServiceName(), event.getClusters());
        //获取该服务的订阅者
        ConcurrentHashSet<EventListener> eventListeners = (ConcurrentHashSet)this.listenerMap.get(key);
        if (!CollectionUtils.isEmpty(eventListeners)) {
            Iterator var4 = eventListeners.iterator();

            while(true) {
                //循环该服务的订阅者,调用订阅者的回调方法
                while(var4.hasNext()) {
                    final EventListener listener = (EventListener)var4.next();
                    final Event namingEvent = this.transferToNamingEvent(event);
                    //如果该listener继承了Nacos定义的AbstractEventListener,并且executor不为空,则通过executor以新的线程的方式回调监听onEvent方法
                    if (listener instanceof AbstractEventListener && ((AbstractEventListener)listener).getExecutor() != null) {
                        ((AbstractEventListener)listener).getExecutor().execute(new Runnable() {
                            public void run() {
                                listener.onEvent(namingEvent);
                            }
                        });
                    } 
                    //否则直接调用该监听器的onEvent方法
                    else {
                        listener.onEvent(namingEvent);
                    }
                }
                //回调结束,返回
                return;
            }
        }
    }
    //转换事件
    private Event transferToNamingEvent(InstancesChangeEvent instancesChangeEvent) {
        return new NamingEvent(instancesChangeEvent.getServiceName(), instancesChangeEvent.getGroupName(), instancesChangeEvent.getClusters(), instancesChangeEvent.getHosts());
    }

    public Class<? extends com.alibaba.nacos.common.notify.Event> subscribeType() {
        return InstancesChangeEvent.class;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93

参考http://dreamphp.cn/blog/detail?blog_id=25851

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/937138
推荐阅读
相关标签
  

闽ICP备14008679号