赞
踩
服务启动入口类为Elasticsearch
/** * Main entry point for starting elasticsearch */ public static void main(final String[] args) throws Exception { LogConfigurator.registerErrorListener(); final Elasticsearch elasticsearch = new Elasticsearch(); //启动 int status = main(args, elasticsearch, Terminal.DEFAULT); if (status != ExitCodes.OK) { final String basePath = System.getProperty("es.logs.base_path"); // It's possible to fail before logging has been configured, in which case there's no point // suggesting that the user look in the log file. if (basePath != null) { Terminal.DEFAULT.errorPrintln( "ERROR: Elasticsearch did not exit normally - check the logs at " + basePath + System.getProperty("file.separator") + System.getProperty("es.logs.cluster_name") + ".log" ); } exit(status); } } static int main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) throws Exception { return elasticsearch.main(args, terminal); }
elasticsearch类继承了EnvironmentAwareCommand,首先会解析配置文件,将配置信息加载进来
@Override protected void execute(Terminal terminal, OptionSet options) throws Exception { final Map<String, String> settings = new HashMap<>(); //遍历配置存入map for (final KeyValuePair kvp : settingOption.values(options)) { if (kvp.value.isEmpty()) { throw new UserException(ExitCodes.USAGE, "setting [" + kvp.key + "] must not be empty"); } if (settings.containsKey(kvp.key)) { final String message = String.format( Locale.ROOT, "setting [%s] already set, saw [%s] and [%s]", kvp.key, settings.get(kvp.key), kvp.value); throw new UserException(ExitCodes.USAGE, message); } settings.put(kvp.key, kvp.value); } //设置data存储 putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data"); //home目录 putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home"); //日志记录目录 putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs"); execute(terminal, options, createEnv(settings)); }
然后调用elasticsearch下的execute方法
@Override protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException { //后台运行 final boolean daemonize = options.has(daemonizeOption); //记录pid的文件 final Path pidFile = pidfileOption.value(options); //静默模式 final boolean quiet = options.has(quietOption); // a misconfigured java.io.tmpdir can cause hard-to-diagnose problems later, so reject it immediately try { //确保配置的临时目录是有效目录 env.validateTmpFile(); } catch (IOException e) { throw new UserException(ExitCodes.CONFIG, e.getMessage()); } try { //初始化 init(daemonize, pidFile, quiet, env); } catch (NodeValidationException e) { throw new UserException(ExitCodes.CONFIG, e.getMessage()); } }
这里首先会读取不同的启动参数,即是否后台运行、是否指定了pid文件、是否为静默模式,并检查配置的临时目录是否有效,然后调用Bootstrap的init方法
Bootstrap.init(!daemonize, pidFile, quiet, initialEnv); static void init( final boolean foreground, final Path pidFile, final boolean quiet, final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException { INSTANCE = new Bootstrap(); final SecureSettings keystore = loadSecureSettings(initialEnv); //创建运行环境 final Environment environment = createEnvironment(pidFile, keystore, initialEnv.settings(), initialEnv.configFile()); //设置node名称 LogConfigurator.setNodeName(Node.NODE_NAME_SETTING.get(environment.settings())); try { LogConfigurator.configure(environment); } catch (IOException e) { throw new BootstrapException(e); } //校验java版本 if (JavaVersion.current().compareTo(JavaVersion.parse("11")) < 0) { final String message = String.format( Locale.ROOT, "future versions of Elasticsearch will require Java 11; " + "your Java version from [%s] does not meet this requirement", System.getProperty("java.home")); new DeprecationLogger(LogManager.getLogger(Bootstrap.class)).deprecatedAndMaybeLog("java_version_11_required", message); } if (environment.pidFile() != null) { try { //创建pid文件 PidFile.create(environment.pidFile(), true); } catch (IOException e) { throw new BootstrapException(e); } } final boolean closeStandardStreams = (foreground == false) || quiet; try { if (closeStandardStreams) { final Logger rootLogger = LogManager.getRootLogger(); final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class); if (maybeConsoleAppender != null) { Loggers.removeAppender(rootLogger, maybeConsoleAppender); } closeSystOut(); } //校验lucene依赖版本,防止有人替换了lucene jar包 checkLucene(); // install the default uncaught exception handler; must be done before security is // initialized as we do not want to grant the runtime permission // setDefaultUncaughtExceptionHandler //设置异常处理类 Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler()); //创建node实例,并创建相关的服务 INSTANCE.setup(true, environment); try { // any 
secure settings must be read during node construction IOUtils.close(keystore); } catch (IOException e) { throw new BootstrapException(e); } //启动节点 INSTANCE.start(); } catch (NodeValidationException | RuntimeException e) { ... } }
这里会先创建bootstrap实例,并添加一个shutdown hook:当JVM退出(例如接收到signal)时,hook会调用keepAliveLatch.countDown(),使阻塞在keepAliveLatch.await()上的keepAliveThread线程结束
/**
 * Creates the bootstrap instance: prepares the keep-alive thread, which blocks
 * on {@code keepAliveLatch}, and registers a shutdown hook that counts the
 * latch down.
 */
Bootstrap() {
    final String keepAliveName = "elasticsearch[keepAlive/" + Version.CURRENT + "]";
    keepAliveThread = new Thread(() -> {
        try {
            keepAliveLatch.await();
        } catch (InterruptedException e) {
            // bail out
        }
    }, keepAliveName);
    keepAliveThread.setDaemon(false);
    // keep this thread alive (non daemon thread) until we shutdown
    Runtime.getRuntime().addShutdownHook(new Thread(() -> keepAliveLatch.countDown()));
}
然后创建运行环境,检查jdk版本是否低于11(低于11时只会输出一条弃用警告,并不会阻止启动),同时检查lucene的jar包依赖是否正常,然后调用setup方法创建node对象,node构造函数会加载所有组件。
// Plugin service: built from the temporary settings, the config / modules / plugins
// paths of the initial environment, and the classpath plugins.
this.pluginsService = new PluginsService(tmpSettings, initialEnvironment.configFile(), initialEnvironment.modulesFile(),
initialEnvironment.pluginsFile(), classpathPlugins);
ES插件服务,很多组件都是通过插件的形式加载进来的比如底层的通信服务等,就是将modules和plugins目录下的内容加载进来。
// Create the environment from the final settings and the initial config path
this.environment = new Environment(settings, initialEnvironment.configFile(), Node.NODE_LOCAL_STORAGE_SETTING.get(settings));
// Check the new environment against the initial one
Environment.assertEquivalent(initialEnvironment, this.environment);
// Create the node environment
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
创建运行环境,封装一些配置信息
//初始化创建线程池 final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0])); public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) { assert Node.NODE_NAME_SETTING.exists(settings); final Map<String, ExecutorBuilder> builders = new HashMap<>(); //分片分配操作处理线程数 final int allocatedProcessors = EsExecutors.allocatedProcessors(settings); //allocatedProcessors的一半,最大为5 final int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors); //allocatedProcessors的一半,最大为10 final int halfProcMaxAt10 = halfAllocatedProcessorsMaxTen(allocatedProcessors); //generic线程池最大数 final int genericThreadPoolMax = boundedBy(4 * allocatedProcessors, 128, 512); //定义线程池 builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30))); builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, allocatedProcessors, 200)); builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, allocatedProcessors, 1000)); builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16)); builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(allocatedProcessors), 1000, 1000, 1000, 2000)); builders.put(Names.SEARCH_THROTTLED, new AutoQueueAdjustingExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100, 100, 100, 200)); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1, true)); builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); 
builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))); builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5))); builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1)); builders.put(Names.FETCH_SHARD_STORE, new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5))); for (final ExecutorBuilder<?> builder : customBuilders) { if (builders.containsKey(builder.name())) { throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists"); } builders.put(builder.name(), builder); } this.builders = Collections.unmodifiableMap(builders); threadContext = new ThreadContext(settings); final Map<String, ExecutorHolder> executors = new HashMap<>(); for (final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) { final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings); final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext); if (executors.containsKey(executorHolder.info.getName())) { throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered"); } logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info)); executors.put(entry.getKey(), executorHolder); } executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT))); this.executors = unmodifiableMap(executors); final List<Info> infos = executors .values() .stream() .filter(holder -> 
holder.info.getName().equals("same") == false) .map(holder -> holder.info) .collect(Collectors.toList()); this.threadPoolInfo = new ThreadPoolInfo(infos); //初始化定时线程池 this.scheduler = Scheduler.initScheduler(settings); TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings); //缓存时间线程 this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis()); this.cachedTimeThread.start(); }
创建自定义的线程池,ES中线程池分为generic、write、get、analyze、search、snapshot、same等(上面的代码中写入操作统一使用write线程池)
// Script module, built from the settings and all installed ScriptPlugin instances;
// the script service is then created from the module's engines and contexts.
final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
final ScriptService scriptService = newScriptService(settings, scriptModule.engines, scriptModule.contexts);
加载ES扩展脚本模块painless,复杂查询支持通过脚本形式
// Analysis module, built from the environment and all installed AnalysisPlugin instances
AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
创建分词器模块,用于索引和搜索时进行分词
// Settings module: combines the node settings with the additional settings, settings filters and upgraders
final SettingsModule settingsModule =
new SettingsModule(settings, additionalSettings, additionalSettingsFilter, settingsUpgraders);
// Let the script module register its listeners on the cluster settings
scriptModule.registerClusterSettingsListeners(scriptService, settingsModule.getClusterSettings());
加载各种配置参数,如elasticsearch.yml和jvm.options中配置的参数
// Network service, using the custom name resolvers obtained from the DiscoveryPlugin instances
final NetworkService networkService = new NetworkService(
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
// Cluster plugins
List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
// Cluster service
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
// Register the script service as a cluster state applier
clusterService.addStateApplier(scriptService);
创建网络服务和集群组件服务,用于后期集群选举主节点、生成集群状态、分片分配、发布集群状态、分片恢复、数据同步等功能
// Cluster info service
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
// Service for monitoring usage of Elasticsearch features
final UsageService usageService = new UsageService();
周期性同步集群状态服务和监控ES使用情况服务
// Collects the modules of the node; the Guice modules created by plugins go in first
ModulesBuilder modules = new ModulesBuilder();
// plugin modules must be added here, before others or we can get crazy injection errors...
for (Module pluginModule : pluginsService.createGuiceModules()) {
modules.add(pluginModule);
}
加载组件
// Monitor service
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);
创建监控服务,监控jvm运行参数和GC运行情况、句柄使用、操作系统等信息
//集群模块 ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService); modules.add(clusterModule); public ClusterModule(Settings settings, ClusterService clusterService, List<ClusterPlugin> clusterPlugins, ClusterInfoService clusterInfoService) { this.clusterPlugins = clusterPlugins; //分配选择器 this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins); this.allocationDeciders = new AllocationDeciders(deciderList); //分片分配 this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins); this.clusterService = clusterService; this.indexNameExpressionResolver = new IndexNameExpressionResolver(); //分配服务 this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService); } public static Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings, List<ClusterPlugin> clusterPlugins) { // collect deciders by class so that we can detect duplicates Map<Class, AllocationDecider> deciders = new LinkedHashMap<>(); addAllocationDecider(deciders, new MaxRetryAllocationDecider()); addAllocationDecider(deciders, new ResizeAllocationDecider()); addAllocationDecider(deciders, new ReplicaAfterPrimaryActiveAllocationDecider()); addAllocationDecider(deciders, new RebalanceOnlyWhenActiveAllocationDecider()); addAllocationDecider(deciders, new ClusterRebalanceAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new NodeVersionAllocationDecider()); addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider()); addAllocationDecider(deciders, new RestoreInProgressAllocationDecider()); addAllocationDecider(deciders, new FilterAllocationDecider(settings, 
clusterSettings)); addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings)); addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings)); clusterPlugins.stream() .flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream()) .forEach(d -> addAllocationDecider(deciders, d)); return deciders.values(); }
创建集群模块,这里创建了分片分配决策的各种策略,用于后面的分片分配
// Indices module, built from all installed MapperPlugin instances
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
modules.add(indicesModule);
创建索引服务模块
//最主要的查询处理模块 SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class)); //断路器服务 CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(), settingsModule.getClusterSettings()); public SearchModule(Settings settings, boolean transportClient, List<SearchPlugin> plugins) { this.settings = settings; //交互组件 this.transportClient = transportClient; //查询建议模块 registerSuggesters(plugins); //高亮 highlighters = setupHighlighters(settings, plugins); registerScoreFunctions(plugins); //查询解析器 registerQueryParsers(plugins); //查询出来的文档打分 registerRescorers(plugins); //文档排序 registerSorts(); //数据根式化 registerValueFormats(); registerSignificanceHeuristics(plugins); //注册聚合组件 this.valuesSourceRegistry = registerAggregations(plugins); registerMovingAverageModels(plugins); //一些管道处理聚合比如bucket_selector registerPipelineAggregations(plugins); //获取数据阶段 registerFetchSubPhases(plugins); //查询是否存在 registerSearchExts(plugins); registerShapes(); registerIntervalsSourceProviders(); namedWriteables.addAll(SortValue.namedWriteables()); }
查询模块中又创建了子组件,比如查询建议模块、高亮查询、查询解析器、文档打分、文档排序、聚合服务等
查询建议词
// Registers the smoothing models, the three built-in suggesters (term, phrase,
// completion) and then any suggesters contributed by plugins.
private void registerSuggesters(List<SearchPlugin> plugins) {
registerSmoothingModels(namedWriteables);
// term suggester
registerSuggester(new SuggesterSpec<>(TermSuggestionBuilder.SUGGESTION_NAME,
TermSuggestionBuilder::new, TermSuggestionBuilder::fromXContent, TermSuggestion::new));
// phrase suggester
registerSuggester(new SuggesterSpec<>(PhraseSuggestionBuilder.SUGGESTION_NAME,
PhraseSuggestionBuilder::new, PhraseSuggestionBuilder::fromXContent, PhraseSuggestion::new));
// completion suggester
registerSuggester(new SuggesterSpec<>(CompletionSuggestionBuilder.SUGGESTION_NAME,
CompletionSuggestionBuilder::new, CompletionSuggestionBuilder::fromXContent, CompletionSuggestion::new));
registerFromPlugin(plugins, SearchPlugin::getSuggesters, this::registerSuggester);
}
查询解析器包含很多种,这里有兴趣可以自己阅读一下
private void registerQueryParsers(List<SearchPlugin> plugins) { registerQuery(new QuerySpec<>(MatchQueryBuilder.NAME, MatchQueryBuilder::new, MatchQueryBuilder::fromXContent)); registerQuery(new QuerySpec<>(MatchPhraseQueryBuilder.NAME, MatchPhraseQueryBuilder::new, MatchPhraseQueryBuilder::fromXContent)); registerQuery(new QuerySpec<>(MatchPhrasePrefixQueryBuilder.NAME, MatchPhrasePrefixQueryBuilder::new, MatchPhrasePrefixQueryBuilder::fromXContent)); registerQuery(new QuerySpec<>(MultiMatchQueryBuilder.NAME, MultiMatchQueryBuilder::new, MultiMatchQueryBuilder::fromXContent)); registerQuery(new QuerySpec<>(NestedQueryBuilder.NAME, NestedQueryBuilder::new, NestedQueryBuilder::fromXContent)); registerQuery(new QuerySpec<>(IdsQueryBuilder.NAME, IdsQueryBuilder::new, IdsQueryBuilder::fromXContent)); registerQuery(new QuerySpec<>(MatchAllQueryBuilder.NAME, MatchAllQueryBuilder::new, MatchAllQueryBuilder::fromXContent)); registerQuery(new QuerySpec<>(QueryStringQueryBuilder.NAME, QueryStringQueryBuilder::new, QueryStringQueryBuilder::fromXContent)); registerQuery(new QuerySpec<>(BoostingQueryBuilder.NAME, BoostingQueryBuilder::new, BoostingQueryBuilder::fromXContent)); BooleanQuery.setMaxClauseCount(INDICES_MAX_CLAUSE_COUNT_SETTING.get(settings)); registerQuery(new QuerySpec<>(BoolQueryBuilder.NAME, BoolQueryBuilder::new, BoolQueryBuilder::fromXContent)); registerQuery(new QuerySpec<>(TermQueryBuilder.NAME, TermQueryBuilder::new, TermQueryBuilder::fromXContent)); registerQuery(new QuerySpec<>(TermsQueryBuilder.NAME, TermsQueryBuilder::new, TermsQueryBuilder::fromXContent)); registerQuery(new QuerySpec<>(FuzzyQueryBuilder.NAME, FuzzyQueryBuilder::new, FuzzyQueryBuilder::fromXContent)); registerQuery(new QuerySpec<>(RegexpQueryBuilder.NAME, RegexpQueryBuilder::new, RegexpQueryBuilder::fromXContent)); registerQuery(new QuerySpec<>(RangeQueryBuilder.NAME, RangeQueryBuilder::new, RangeQueryBuilder::fromXContent)); registerQuery(new QuerySpec<>(PrefixQueryBuilder.NAME, 
PrefixQueryBuilder::new, PrefixQueryBuilder::fromXContent)); registerQuery(new QuerySpec<>(WildcardQueryBuilder.NAME, WildcardQueryBuilder::new, WildcardQueryBuilder::fromXContent)); ... registerFromPlugin(plugins, SearchPlugin::getQueries, this::registerQuery); }
文档重打分(rescore):对查询返回的结果进行二次打分,内置实现为QueryRescorer,插件也可以注册自己的rescorer
/**
 * Registers the built-in query rescorer, then any rescorers contributed by plugins.
 */
private void registerRescorers(List<SearchPlugin> plugins) {
    final RescorerSpec<QueryRescorerBuilder> queryRescorer =
        new RescorerSpec<>(QueryRescorerBuilder.NAME, QueryRescorerBuilder::new, QueryRescorerBuilder::fromXContent);
    registerRescorer(queryRescorer);
    registerFromPlugin(plugins, SearchPlugin::getRescorers, this::registerRescorer);
}
查询排序组件
/**
 * Adds the named-writeable entries for the built-in sort builders:
 * geo distance, score, script and field.
 */
private void registerSorts() {
    final Class<SortBuilder> sortCategory = SortBuilder.class;
    // sort by geo distance
    namedWriteables.add(
        new NamedWriteableRegistry.Entry(sortCategory, GeoDistanceSortBuilder.NAME, GeoDistanceSortBuilder::new));
    // sort by score
    namedWriteables.add(
        new NamedWriteableRegistry.Entry(sortCategory, ScoreSortBuilder.NAME, ScoreSortBuilder::new));
    // sort by script
    namedWriteables.add(
        new NamedWriteableRegistry.Entry(sortCategory, ScriptSortBuilder.NAME, ScriptSortBuilder::new));
    // sort by field
    namedWriteables.add(
        new NamedWriteableRegistry.Entry(sortCategory, FieldSortBuilder.NAME, FieldSortBuilder::new));
}
注册各种聚合查询服务,如指标聚合、桶聚合等
private ValuesSourceRegistry registerAggregations(List<SearchPlugin> plugins) { ValuesSourceRegistry.Builder builder = new ValuesSourceRegistry.Builder(); registerAggregation(new AggregationSpec(AvgAggregationBuilder.NAME, AvgAggregationBuilder::new, AvgAggregationBuilder.PARSER) .addResultReader(InternalAvg::new) .setAggregatorRegistrar(AvgAggregationBuilder::registerAggregators), builder); registerAggregation(new AggregationSpec(WeightedAvgAggregationBuilder.NAME, WeightedAvgAggregationBuilder::new, WeightedAvgAggregationBuilder.PARSER).addResultReader(InternalWeightedAvg::new) .setAggregatorRegistrar(WeightedAvgAggregationBuilder::registerUsage), builder); registerAggregation(new AggregationSpec(SumAggregationBuilder.NAME, SumAggregationBuilder::new, SumAggregationBuilder.PARSER) .addResultReader(InternalSum::new) .setAggregatorRegistrar(SumAggregationBuilder::registerAggregators), builder); registerAggregation(new AggregationSpec(MinAggregationBuilder.NAME, MinAggregationBuilder::new, MinAggregationBuilder.PARSER) .addResultReader(InternalMin::new) .setAggregatorRegistrar(MinAggregationBuilder::registerAggregators), builder); registerAggregation(new AggregationSpec(MaxAggregationBuilder.NAME, MaxAggregationBuilder::new, MaxAggregationBuilder.PARSER) .addResultReader(InternalMax::new) .setAggregatorRegistrar(MaxAggregationBuilder::registerAggregators), builder); registerAggregation(new AggregationSpec(StatsAggregationBuilder.NAME, StatsAggregationBuilder::new, StatsAggregationBuilder.PARSER) .addResultReader(InternalStats::new) .setAggregatorRegistrar(StatsAggregationBuilder::registerAggregators), builder); registerAggregation(new AggregationSpec(ExtendedStatsAggregationBuilder.NAME, ExtendedStatsAggregationBuilder::new,ExtendedStatsAggregationBuilder.PARSER) .addResultReader(InternalExtendedStats::new) .setAggregatorRegistrar(ExtendedStatsAggregationBuilder::registerAggregators), builder); registerAggregation(new 
AggregationSpec(ValueCountAggregationBuilder.NAME, ValueCountAggregationBuilder::new,ValueCountAggregationBuilder.PARSER) .addResultReader(InternalValueCount::new) .setAggregatorRegistrar(ValueCountAggregationBuilder::registerAggregators), builder); registerAggregation(new AggregationSpec(PercentilesAggregationBuilder.NAME, PercentilesAggregationBuilder::new, PercentilesAggregationBuilder::parse) .addResultReader(InternalTDigestPercentiles.NAME, InternalTDigestPercentiles::new) .addResultReader(InternalHDRPercentiles.NAME, InternalHDRPercentiles::new) .setAggregatorRegistrar(PercentilesAggregationBuilder::registerAggregators), builder); registerAggregation(new AggregationSpec(PercentileRanksAggregationBuilder.NAME, PercentileRanksAggregationBuilder::new, PercentileRanksAggregationBuilder::parse) .addResultReader(InternalTDigestPercentileRanks.NAME, InternalTDigestPercentileRanks::new) .addResultReader(InternalHDRPercentileRanks.NAME, InternalHDRPercentileRanks::new) .setAggregatorRegistrar(PercentileRanksAggregationBuilder::registerAggregators), builder); registerAggregation(new AggregationSpec(MedianAbsoluteDeviationAggregationBuilder.NAME, MedianAbsoluteDeviationAggregationBuilder::new, MedianAbsoluteDeviationAggregationBuilder.PARSER) .addResultReader(InternalMedianAbsoluteDeviation::new) .setAggregatorRegistrar(MedianAbsoluteDeviationAggregationBuilder::registerAggregators), builder); registerAggregation(new AggregationSpec(CardinalityAggregationBuilder.NAME, CardinalityAggregationBuilder::new, ... return builder.build(); }
查询子阶段服务
private void registerFetchSubPhases(List<SearchPlugin> plugins) { //根据doc id执行fetch结果 registerFetchSubPhase(new ExplainPhase()); //获取doc values 列式存储,用于排序和聚合 registerFetchSubPhase(new FetchDocValuesPhase()); registerFetchSubPhase(new ScriptFieldsPhase()); //获取源数据 registerFetchSubPhase(new FetchSourcePhase()); registerFetchSubPhase(new FetchVersionPhase()); registerFetchSubPhase(new SeqNoPrimaryTermPhase()); registerFetchSubPhase(new MatchedQueriesPhase()); registerFetchSubPhase(new HighlightPhase(highlighters)); registerFetchSubPhase(new FetchScorePhase()); FetchPhaseConstructionContext context = new FetchPhaseConstructionContext(highlighters); registerFromPlugin(plugins, p -> p.getFetchSubPhases(context), this::registerFetchSubPhase); }
我们继续回到node的构造函数中
这里创建了GatewayModule
// Gateway module.
// NOTE(review): the original author describes it as handling cluster-level and
// index-level metadata; that is not verifiable from this excerpt.
modules.add(new GatewayModule());
创建页缓存管理,ES非常依赖pagecache,一般es内存设置的时候尽量有一半内存留给页缓存,提升io效率
// Page cache recycler; it is handed to BigArrays together with the circuit breaker service
PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
modules.add(settingsModule);
创建元数据管理服务和持久化元数据服务,其实就是集群的状态,有哪些索引、索引有几个分片、几个副本、分片的allocation id、主分片信息等
// Index store: merge the directory factories of all IndexStorePlugin instances into one map
// keyed by the factory key (Collectors.toMap without a merge function throws on duplicate keys)
final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories =
pluginsService.filterPlugins(IndexStorePlugin.class)
.stream()
.map(IndexStorePlugin::getDirectoryFactories)
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
索引存储服务,底层其实每个分片就是一个lucene实例
创建索引服务
// Indices service: wired with the registries, services and factories created earlier
// (analysis registry, mapper registry, thread pool, circuit breaker, script service,
// cluster service, index store factories, values source registry, ...)
final IndicesService indicesService =
new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(),
clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptService,
clusterService, client, metaStateService, engineFactoryProviders, indexStoreFactories,
searchModule.getValuesSourceRegistry());
内部通信各种action处理服务
// --- Call site (excerpt; the surrounding article places it in the Node constructor) ---
// Module that handles the various internal request actions. The first argument (transportClient) is false here.
ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
    settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
    threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService,
    clusterService);
modules.add(actionModule);

/**
 * Builds the action module: stores the supplied collaborators, sets up the actions and action
 * filters (including those contributed by plugins), collects the REST headers, picks at most one
 * plugin-provided REST handler wrapper, builds the request validators and, unless running as a
 * transport client, creates the {@code RestController}.
 *
 * @throws IllegalArgumentException if more than one plugin supplies a REST handler wrapper
 */
public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
                    IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
                    ThreadPool threadPool, List<ActionPlugin> actionPlugins, NodeClient nodeClient,
                    CircuitBreakerService circuitBreakerService, UsageService usageService, ClusterService clusterService) {
    this.transportClient = transportClient;
    this.settings = settings;
    this.indexNameExpressionResolver = indexNameExpressionResolver;
    this.indexScopedSettings = indexScopedSettings;
    this.clusterSettings = clusterSettings;
    this.settingsFilter = settingsFilter;
    this.actionPlugins = actionPlugins;
    this.clusterService = clusterService;
    // Set up all actions (built-in and plugin-provided).
    actions = setupActions(actionPlugins);
    actionFilters = setupActionFilters(actionPlugins);
    // No AutoCreateIndex is built for a transport client.
    autoCreateIndex = transportClient ? null : new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver);
    destructiveOperations = new DestructiveOperations(settings, clusterSettings);
    // REST headers: everything declared by plugins plus the X-Opaque-Id task header.
    Set<RestHeaderDefinition> headers = Stream.concat(
        actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
        Stream.of(new RestHeaderDefinition(Task.X_OPAQUE_ID, false))
    ).collect(Collectors.toSet());
    // At most one plugin may supply a REST handler wrapper; a second one is rejected.
    UnaryOperator<RestHandler> restWrapper = null;
    for (ActionPlugin plugin : actionPlugins) {
        UnaryOperator<RestHandler> newRestWrapper = plugin.getRestHandlerWrapper(threadPool.getThreadContext());
        if (newRestWrapper != null) {
            logger.debug("Using REST wrapper from plugin " + plugin.getClass().getName());
            if (restWrapper != null) {
                throw new IllegalArgumentException("Cannot have more than one plugin implementing a REST wrapper");
            }
            restWrapper = newRestWrapper;
        }
    }
    mappingRequestValidators = new RequestValidators<>(
        actionPlugins.stream().flatMap(p -> p.mappingRequestValidators().stream()).collect(Collectors.toList()));
    indicesAliasesRequestRequestValidators = new RequestValidators<>(
        actionPlugins.stream().flatMap(p -> p.indicesAliasesRequestValidators().stream()).collect(Collectors.toList()));

    // A transport client gets no RestController.
    if (transportClient) {
        restController = null;
    } else {
        restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService);
    }
}

/**
 * Registers every built-in action together with its transport action class, then the actions
 * contributed by plugins, then the data stream (only when {@code DATASTREAMS_FEATURE_ENABLED}),
 * persistent task and retention lease actions.
 *
 * @param actionPlugins plugins whose {@code getActions()} handlers are added to the registry
 * @return an unmodifiable view of the registry built here
 */
static Map<String, ActionHandler<?, ?>> setupActions(List<ActionPlugin> actionPlugins) {
    // Subclass NamedRegistry for easy registration
    class ActionRegistry extends NamedRegistry<ActionHandler<?, ?>> {
        ActionRegistry() {
            super("action");
        }

        // Registers a handler under the name of its action.
        public void register(ActionHandler<?, ?> handler) {
            register(handler.getAction().name(), handler);
        }

        // Convenience overload: wraps the arguments in an ActionHandler and registers it.
        public <Request extends ActionRequest, Response extends ActionResponse> void register(
            ActionType<Response> action, Class<? extends TransportAction<Request, Response>> transportAction,
            Class<?>... supportTransportActions) {
            register(new ActionHandler<>(action, transportAction, supportTransportActions));
        }
    }
    ActionRegistry actions = new ActionRegistry();

    actions.register(MainAction.INSTANCE, TransportMainAction.class);
    actions.register(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
    actions.register(RemoteInfoAction.INSTANCE, TransportRemoteInfoAction.class);
    actions.register(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
    actions.register(NodesUsageAction.INSTANCE, TransportNodesUsageAction.class);
    actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
    actions.register(ListTasksAction.INSTANCE, TransportListTasksAction.class);
    actions.register(GetTaskAction.INSTANCE, TransportGetTaskAction.class);
    actions.register(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class);

    actions.register(AddVotingConfigExclusionsAction.INSTANCE, TransportAddVotingConfigExclusionsAction.class);
    actions.register(ClearVotingConfigExclusionsAction.INSTANCE, TransportClearVotingConfigExclusionsAction.class);
    actions.register(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class);
    actions.register(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class);
    actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
    actions.register(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class);
    actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
    actions.register(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class);
    actions.register(ClusterSearchShardsAction.INSTANCE, TransportClusterSearchShardsAction.class);
    actions.register(PendingClusterTasksAction.INSTANCE, TransportPendingClusterTasksAction.class);
    actions.register(PutRepositoryAction.INSTANCE, TransportPutRepositoryAction.class);
    actions.register(GetRepositoriesAction.INSTANCE, TransportGetRepositoriesAction.class);
    actions.register(DeleteRepositoryAction.INSTANCE, TransportDeleteRepositoryAction.class);
    actions.register(VerifyRepositoryAction.INSTANCE, TransportVerifyRepositoryAction.class);
    actions.register(CleanupRepositoryAction.INSTANCE, TransportCleanupRepositoryAction.class);
    actions.register(GetSnapshotsAction.INSTANCE, TransportGetSnapshotsAction.class);
    actions.register(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class);
    actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
    actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
    actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);

    actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
    actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
    actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
    // Create-index request
    actions.register(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
    actions.register(ResizeAction.INSTANCE, TransportResizeAction.class);
    actions.register(RolloverAction.INSTANCE, TransportRolloverAction.class);
    actions.register(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class);
    actions.register(GetIndexAction.INSTANCE, TransportGetIndexAction.class);
    actions.register(OpenIndexAction.INSTANCE, TransportOpenIndexAction.class);
    actions.register(CloseIndexAction.INSTANCE, TransportCloseIndexAction.class);
    actions.register(IndicesExistsAction.INSTANCE, TransportIndicesExistsAction.class);
    actions.register(TypesExistsAction.INSTANCE, TransportTypesExistsAction.class);
    actions.register(GetMappingsAction.INSTANCE, TransportGetMappingsAction.class);
    actions.register(GetFieldMappingsAction.INSTANCE, TransportGetFieldMappingsAction.class,
        TransportGetFieldMappingsIndexAction.class);
    actions.register(PutMappingAction.INSTANCE, TransportPutMappingAction.class);
    actions.register(IndicesAliasesAction.INSTANCE, TransportIndicesAliasesAction.class);
    actions.register(UpdateSettingsAction.INSTANCE, TransportUpdateSettingsAction.class);
    actions.register(AnalyzeAction.INSTANCE, TransportAnalyzeAction.class);
    actions.register(PutIndexTemplateAction.INSTANCE, TransportPutIndexTemplateAction.class);
    actions.register(GetIndexTemplatesAction.INSTANCE, TransportGetIndexTemplatesAction.class);
    actions.register(DeleteIndexTemplateAction.INSTANCE, TransportDeleteIndexTemplateAction.class);
    actions.register(PutComponentTemplateAction.INSTANCE, TransportPutComponentTemplateAction.class);
    actions.register(GetComponentTemplateAction.INSTANCE, TransportGetComponentTemplateAction.class);
    actions.register(DeleteComponentTemplateAction.INSTANCE, TransportDeleteComponentTemplateAction.class);
    actions.register(PutComposableIndexTemplateAction.INSTANCE, TransportPutComposableIndexTemplateAction.class);
    actions.register(GetComposableIndexTemplateAction.INSTANCE, TransportGetComposableIndexTemplateAction.class);
    actions.register(DeleteComposableIndexTemplateAction.INSTANCE, TransportDeleteComposableIndexTemplateAction.class);
    actions.register(SimulateIndexTemplateAction.INSTANCE, TransportSimulateIndexTemplateAction.class);
    actions.register(ValidateQueryAction.INSTANCE, TransportValidateQueryAction.class);
    actions.register(RefreshAction.INSTANCE, TransportRefreshAction.class);
    actions.register(FlushAction.INSTANCE, TransportFlushAction.class);
    actions.register(SyncedFlushAction.INSTANCE, TransportSyncedFlushAction.class);
    actions.register(ForceMergeAction.INSTANCE, TransportForceMergeAction.class);
    actions.register(UpgradeAction.INSTANCE, TransportUpgradeAction.class);
    actions.register(UpgradeStatusAction.INSTANCE, TransportUpgradeStatusAction.class);
    actions.register(UpgradeSettingsAction.INSTANCE, TransportUpgradeSettingsAction.class);
    actions.register(ClearIndicesCacheAction.INSTANCE, TransportClearIndicesCacheAction.class);
    actions.register(GetAliasesAction.INSTANCE, TransportGetAliasesAction.class);
    actions.register(AliasesExistAction.INSTANCE, TransportAliasesExistAction.class);
    actions.register(GetSettingsAction.INSTANCE, TransportGetSettingsAction.class);

    actions.register(IndexAction.INSTANCE, TransportIndexAction.class);
    actions.register(GetAction.INSTANCE, TransportGetAction.class);
    actions.register(TermVectorsAction.INSTANCE, TransportTermVectorsAction.class);
    actions.register(MultiTermVectorsAction.INSTANCE, TransportMultiTermVectorsAction.class,
        TransportShardMultiTermsVectorAction.class);
    actions.register(DeleteAction.INSTANCE, TransportDeleteAction.class);
    actions.register(UpdateAction.INSTANCE, TransportUpdateAction.class);
    actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class,
        TransportShardMultiGetAction.class);
    // Bulk (batch) processing
    actions.register(BulkAction.INSTANCE, TransportBulkAction.class,
        TransportShardBulkAction.class);
    actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
    actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
    actions.register(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
    actions.register(ExplainAction.INSTANCE, TransportExplainAction.class);
    actions.register(ClearScrollAction.INSTANCE, TransportClearScrollAction.class);
    actions.register(RecoveryAction.INSTANCE, TransportRecoveryAction.class);
    actions.register(NodesReloadSecureSettingsAction.INSTANCE, TransportNodesReloadSecureSettingsAction.class);
    actions.register(AutoCreateAction.INSTANCE, AutoCreateAction.TransportAction.class);

    //Indexed scripts
    actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class);
    actions.register(GetStoredScriptAction.INSTANCE, TransportGetStoredScriptAction.class);
    actions.register(DeleteStoredScriptAction.INSTANCE, TransportDeleteStoredScriptAction.class);
    actions.register(GetScriptContextAction.INSTANCE, TransportGetScriptContextAction.class);
    actions.register(GetScriptLanguageAction.INSTANCE, TransportGetScriptLanguageAction.class);

    actions.register(FieldCapabilitiesAction.INSTANCE, TransportFieldCapabilitiesAction.class,
        TransportFieldCapabilitiesIndexAction.class);

    actions.register(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class);
    actions.register(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class);
    actions.register(DeletePipelineAction.INSTANCE, DeletePipelineTransportAction.class);
    actions.register(SimulatePipelineAction.INSTANCE, SimulatePipelineTransportAction.class);

    // Plugin-provided actions are registered here, before the remaining built-in groups below.
    actionPlugins.stream().flatMap(p -> p.getActions().stream()).forEach(actions::register);

    // Data streams:
    if (DATASTREAMS_FEATURE_ENABLED) {
        actions.register(CreateDataStreamAction.INSTANCE, CreateDataStreamAction.TransportAction.class);
        actions.register(DeleteDataStreamAction.INSTANCE, DeleteDataStreamAction.TransportAction.class);
        actions.register(GetDataStreamsAction.INSTANCE, GetDataStreamsAction.TransportAction.class);
    }

    // Persistent tasks:
    actions.register(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class);
    actions.register(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class);
    actions.register(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class);
    actions.register(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class);

    // retention leases
    actions.register(RetentionLeaseActions.Add.INSTANCE, RetentionLeaseActions.Add.TransportAction.class);
    actions.register(RetentionLeaseActions.Renew.INSTANCE, RetentionLeaseActions.Renew.TransportAction.class);
    actions.register(RetentionLeaseActions.Remove.INSTANCE, RetentionLeaseActions.Remove.TransportAction.class);

    // Return the registered actions as a read-only map.
    return unmodifiableMap(actions.getRegistry());
}
可以看到,这里把所有的action以及处理它们的TransportXxxAction都注册了进来
// REST handling: reuse the controller owned by the action module.
final RestController restController = actionModule.getRestController();
// Module that handles registering and binding all network-related classes.
// The second argument (transportClient) is false here.
final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
networkService, restController, clusterService.getClusterSettings());
// One index-template metadata upgrader per plugin, collected in plugin order.
Collection<UnaryOperator<Map<String, IndexTemplateMetadata>>> indexTemplateMetadataUpgraders =
pluginsService.filterPlugins(Plugin.class).stream()
.map(Plugin::getIndexTemplateMetadataUpgrader)
.collect(Collectors.toList());
创建rest请求处理器和网络模块
public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool, BigArrays bigArrays, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher dispatcher, ClusterSettings clusterSettings) { this.settings = settings; this.transportClient = transportClient; for (NetworkPlugin plugin : plugins) { //获取http服务端 Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, xContentRegistry, networkService, dispatcher, clusterSettings); if (transportClient == false) { for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) { registerHttpTransport(entry.getKey(), entry.getValue()); } } Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) { registerTransport(entry.getKey(), entry.getValue()); } List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry, threadPool.getThreadContext()); for (TransportInterceptor interceptor : transportInterceptors) { registerTransportInterceptor(interceptor); } } }
主要注册三类组件:HTTP服务端传输(HttpServerTransport)、节点间的TCP传输(Transport)以及传输请求拦截器(TransportInterceptor)
// --- Call site (excerpt; the surrounding article places it in the Node constructor) ---
final TransportService transportService = newTransportService(settings, transport, threadPool,
    networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
final GatewayMetaState gatewayMetaState = new GatewayMetaState();
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
// Search transport service
final SearchTransportService searchTransportService = new SearchTransportService(transportService,
    SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
// HTTP server transport
final HttpServerTransport httpServerTransport = newHttpTransport(networkModule);

/**
 * Builds the transport service: stores the collaborators, configures the tracer log
 * include/exclude lists, creates the task manager and the remote cluster service, subscribes to
 * setting updates when {@code clusterSettings} is non-null, and registers the handshake handler.
 *
 * @param clusterSettings may be {@code null}; in that case no settings-update consumers are added
 */
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
                        Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
                        Set<String> taskHeaders, ConnectionManager connectionManager) {
    // The only time we do not want to validate node connections is when this is a transport client using the simple node sampler
    this.validateConnections = TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey())) == false ||
        TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings);
    this.transport = transport;
    this.threadPool = threadPool;
    this.localNodeFactory = localNodeFactory;
    this.connectionManager = connectionManager;
    this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
    setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings));
    setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings));
    tracerLog = Loggers.getLogger(logger, ".tracer");
    taskManager = createTaskManager(settings, threadPool, taskHeaders);
    // The interceptor must be assigned before asyncSender, which is derived from it.
    this.interceptor = transportInterceptor;
    this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
    this.remoteClusterClient = Node.NODE_REMOTE_CLUSTER_CLIENT.get(settings);
    remoteClusterService = new RemoteClusterService(settings, this);
    responseHandlers = transport.getResponseHandlers();
    if (clusterSettings != null) {
        // Keep the tracer include/exclude lists in sync with setting updates.
        clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude);
        clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude);
        if (remoteClusterClient) {
            remoteClusterService.listenForUpdates(clusterSettings);
        }
    }
    // Register the handshake handler: it replies with the local node, cluster name and local node version.
    registerRequestHandler(
        HANDSHAKE_ACTION_NAME,
        ThreadPool.Names.SAME,
        false, false,
        HandshakeRequest::new,
        (request, channel, task) -> channel.sendResponse(
            new HandshakeResponse(localNode, clusterName, localNode.getVersion())));
}
创建传输服务,底层依赖于transport插件真正发送请求和接收请求
//仓库 RepositoriesModule repositoriesModule = new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), transportService, clusterService, threadPool, xContentRegistry); RepositoriesService repositoryService = repositoriesModule.getRepositoryService(); repositoriesServiceReference.set(repositoryService); //快照 SnapshotsService snapshotsService = new SnapshotsService(settings, clusterService, clusterModule.getIndexNameExpressionResolver(), repositoryService, threadPool); SnapshotShardsService snapshotShardsService = new SnapshotShardsService(settings, clusterService, repositoryService, threadPool, transportService, indicesService, actionModule.getActionFilters(), clusterModule.getIndexNameExpressionResolver()); //节点快照状态 TransportNodesSnapshotsStatus nodesSnapshotsStatus = new TransportNodesSnapshotsStatus(threadPool, clusterService, transportService, snapshotShardsService, actionModule.getActionFilters()); //快照恢复 RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(), metadataCreateIndexService, metadataIndexUpgradeService, clusterService.getClusterSettings()); //集群重新平衡服务 final RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute); //磁盘使用监控 final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state, clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, rerouteService); clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);
创建仓库、快照等服务
// --- Call site (excerpt; the surrounding article places it in the Node constructor) ---
// Discovery
final DiscoveryModule discoveryModule = new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry,
    networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
    clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
    clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, rerouteService);

/**
 * Builds the discovery implementation: gathers seed host providers, join validators and election
 * strategies (built-in plus plugin-provided), resolves the configured seed providers and election
 * strategy, then instantiates the discovery that matches the configured discovery type.
 *
 * @throws IllegalArgumentException if a seed provider or election strategy is registered twice,
 *         if a configured seed provider or election strategy is unknown, or if the discovery
 *         type is unknown
 */
public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportService transportService,
                       NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService,
                       ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
                       AllocationService allocationService, Path configFile, GatewayMetaState gatewayMetaState,
                       RerouteService rerouteService) {
    final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
    // Built-in seed host providers: "settings" and "file".
    final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
    hostProviders.put("settings", () -> new SettingsBasedSeedHostsProvider(settings, transportService));
    hostProviders.put("file", () -> new FileBasedSeedHostsProvider(configFile));
    final Map<String, ElectionStrategy> electionStrategies = new HashMap<>();
    electionStrategies.put(DEFAULT_ELECTION_STRATEGY, ElectionStrategy.DEFAULT_INSTANCE);
    // Merge plugin contributions; a name that is already taken is rejected.
    for (DiscoveryPlugin plugin : plugins) {
        plugin.getSeedHostProviders(transportService, networkService).forEach((key, value) -> {
            if (hostProviders.put(key, value) != null) {
                throw new IllegalArgumentException("Cannot register seed provider [" + key + "] twice");
            }
        });
        BiConsumer<DiscoveryNode, ClusterState> joinValidator = plugin.getJoinValidator();
        if (joinValidator != null) {
            joinValidators.add(joinValidator);
        }
        plugin.getElectionStrategies().forEach((key, value) -> {
            if (electionStrategies.put(key, value) != null) {
                throw new IllegalArgumentException("Cannot register election strategy [" + key + "] twice");
            }
        });
    }

    List<String> seedProviderNames = getSeedProviderNames(settings);
    // for bwc purposes, add settings provider even if not explicitly specified
    if (seedProviderNames.contains("settings") == false) {
        List<String> extendedSeedProviderNames = new ArrayList<>();
        extendedSeedProviderNames.add("settings");
        extendedSeedProviderNames.addAll(seedProviderNames);
        seedProviderNames = extendedSeedProviderNames;
    }

    // Any configured provider name without a registered factory is an error.
    final Set<String> missingProviderNames = new HashSet<>(seedProviderNames);
    missingProviderNames.removeAll(hostProviders.keySet());
    if (missingProviderNames.isEmpty() == false) {
        throw new IllegalArgumentException("Unknown seed providers " + missingProviderNames);
    }

    List<SeedHostsProvider> filteredSeedProviders = seedProviderNames.stream()
        .map(hostProviders::get).map(Supplier::get).collect(Collectors.toList());

    String discoveryType = DISCOVERY_TYPE_SETTING.get(settings);

    // Cluster seed host addresses: the concatenation of every selected provider's addresses,
    // returned as an unmodifiable list.
    final SeedHostsProvider seedHostsProvider = hostsResolver -> {
        final List<TransportAddress> addresses = new ArrayList<>();
        for (SeedHostsProvider provider : filteredSeedProviders) {
            addresses.addAll(provider.getSeedAddresses(hostsResolver));
        }
        return Collections.unmodifiableList(addresses);
    };

    final ElectionStrategy electionStrategy = electionStrategies.get(ELECTION_STRATEGY_SETTING.get(settings));
    if (electionStrategy == null) {
        throw new IllegalArgumentException("Unknown election strategy " + ELECTION_STRATEGY_SETTING.get(settings));
    }

    // Newer discovery mode: the zen2 and single-node types both use the Coordinator.
    if (ZEN2_DISCOVERY_TYPE.equals(discoveryType) || SINGLE_NODE_DISCOVERY_TYPE.equals(discoveryType)) {
        discovery = new Coordinator(NODE_NAME_SETTING.get(settings),
            settings, clusterSettings,
            transportService, namedWriteableRegistry, allocationService, masterService, gatewayMetaState::getPersistedState,
            seedHostsProvider, clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), rerouteService,
            electionStrategy);
    } else if (ZEN_DISCOVERY_TYPE.equals(discoveryType)) {
        discovery = new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
            clusterSettings, seedHostsProvider, allocationService, joinValidators, rerouteService);
    } else {
        throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
    }

    logger.info("using discovery type [{}] and seed hosts providers {}", discoveryType, seedProviderNames);
}
节点发现服务,用于节点发现和加入集群:根据配置(DISCOVERY_TYPE_SETTING)选择不同的发现实现,默认为zen,并且在启动时会把所使用的发现类型和种子主机提供者打印到日志中
//节点服务 this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(), transportService, indicesService, pluginsService, circuitBreakerService, scriptService, httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService, searchTransportService); //查询服务 final SearchService searchService = newSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, searchModule.getFetchPhase(), responseCollectorService, circuitBreakerService); final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService .filterPlugins(PersistentTaskPlugin.class).stream() .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule, clusterModule.getIndexNameExpressionResolver())) .flatMap(List::stream) .collect(toList()); final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(tasksExecutors); //仅在主节点上运行的组件,负责将正在运行的任务分配给节点 final PersistentTasksClusterService persistentTasksClusterService = new PersistentTasksClusterService(settings, registry, clusterService, threadPool); resourcesToClose.add(persistentTasksClusterService); //传递任务 final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
然后开始创建容器将组件都注入进来
// Bind the already-constructed components as instances (the article elides the remaining bindings with "...").
modules.add(b -> {
b.bind(Node.class).toInstance(this);
b.bind(NodeService.class).toInstance(nodeService);
...
}
);
// Create the injector from all registered modules (dependency injection).
injector = modules.createInjector();
注册已存在分片的分配器和生命周期组件,把各种action的处理器加载到client中,并注册各种rest请求处理器
//设置已存在分片的分配器 clusterModule.setExistingShardsAllocators(injector.getInstance(GatewayAllocator.class)); //声明周期组件 List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream() .filter(p -> p instanceof LifecycleComponent) .map(p -> (LifecycleComponent) p).collect(Collectors.toList()); pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream() .map(injector::getInstance).collect(Collectors.toList())); resourcesToClose.addAll(pluginLifecycleComponents); resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class)); this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents); //初始化注册本地处理action client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {}), () -> clusterService.localNode().getId(), transportService.getRemoteClusterService()); logger.debug("initializing HTTP handlers ..."); //注册rest处理handler actionModule.initRestHandlers(() -> clusterService.state().nodes()); logger.info("initialized");
/**
 * Registers the REST request handlers with the {@code restController}.
 *
 * <p>Built-in handlers are registered first, followed by the handlers contributed by each
 * {@code ActionPlugin}. Every handler that is an {@code AbstractCatAction} is also collected so
 * that the final {@code RestCatAction} can be constructed from the full list.
 *
 * @param nodesInCluster supplier of the nodes in the cluster, passed to the handlers that take it
 */
public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
    List<AbstractCatAction> catActions = new ArrayList<>();
    // Callback that registers a single REST handler.
    Consumer<RestHandler> registerHandler = handler -> {
        if (handler instanceof AbstractCatAction) {
            // Also remember it in the collection of cat (runtime status) handlers.
            catActions.add((AbstractCatAction) handler);
        }
        restController.registerHandler(handler);
    };
    registerHandler.accept(new RestAddVotingConfigExclusionAction());
    registerHandler.accept(new RestClearVotingConfigExclusionsAction());
    registerHandler.accept(new RestMainAction());
    registerHandler.accept(new RestNodesInfoAction(settingsFilter));
    registerHandler.accept(new RestRemoteClusterInfoAction());
    registerHandler.accept(new RestNodesStatsAction());
    registerHandler.accept(new RestNodesUsageAction());
    registerHandler.accept(new RestNodesHotThreadsAction());
    registerHandler.accept(new RestClusterAllocationExplainAction());
    registerHandler.accept(new RestClusterStatsAction());
    registerHandler.accept(new RestClusterStateAction(settingsFilter));
    registerHandler.accept(new RestClusterHealthAction());
    registerHandler.accept(new RestClusterUpdateSettingsAction());
    registerHandler.accept(new RestClusterGetSettingsAction(settings, clusterSettings, settingsFilter));
    registerHandler.accept(new RestClusterRerouteAction(settingsFilter));
    registerHandler.accept(new RestClusterSearchShardsAction());
    registerHandler.accept(new RestPendingClusterTasksAction());
    // Snapshot repositories.
    registerHandler.accept(new RestPutRepositoryAction());
    registerHandler.accept(new RestGetRepositoriesAction(settingsFilter));
    registerHandler.accept(new RestDeleteRepositoryAction());
    registerHandler.accept(new RestVerifyRepositoryAction());
    registerHandler.accept(new RestCleanupRepositoryAction());
    // Snapshot handling.
    registerHandler.accept(new RestGetSnapshotsAction());
    registerHandler.accept(new RestCreateSnapshotAction());
    registerHandler.accept(new RestRestoreSnapshotAction());
    registerHandler.accept(new RestDeleteSnapshotAction());
    registerHandler.accept(new RestSnapshotsStatusAction());
    // NOTE(review): the article has a bare "//" at this point; it is treated as an empty comment marker,
    // so the following registration stays active - confirm against the upstream source.
    registerHandler.accept(new RestGetIndicesAction());
    registerHandler.accept(new RestIndicesStatsAction());
    registerHandler.accept(new RestIndicesSegmentsAction());
    registerHandler.accept(new RestIndicesShardStoresAction());
    // Index alias handling.
    registerHandler.accept(new RestGetAliasesAction());
    registerHandler.accept(new RestIndexDeleteAliasesAction());
    registerHandler.accept(new RestIndexPutAliasAction());
    registerHandler.accept(new RestIndicesAliasesAction());
    // Create index.
    registerHandler.accept(new RestCreateIndexAction());
    registerHandler.accept(new RestResizeHandler.RestShrinkIndexAction());
    registerHandler.accept(new RestResizeHandler.RestSplitIndexAction());
    registerHandler.accept(new RestResizeHandler.RestCloneIndexAction());
    registerHandler.accept(new RestRolloverIndexAction());
    // Delete index.
    registerHandler.accept(new RestDeleteIndexAction());
    // Close index.
    registerHandler.accept(new RestCloseIndexAction());
    // Open index.
    registerHandler.accept(new RestOpenIndexAction());

    registerHandler.accept(new RestUpdateSettingsAction());
    registerHandler.accept(new RestGetSettingsAction());

    registerHandler.accept(new RestAnalyzeAction());
    // Get index template.
    registerHandler.accept(new RestGetIndexTemplateAction());
    // Update / create index template.
    registerHandler.accept(new RestPutIndexTemplateAction());
    // Delete index template.
    registerHandler.accept(new RestDeleteIndexTemplateAction());
    registerHandler.accept(new RestPutComponentTemplateAction());
    registerHandler.accept(new RestGetComponentTemplateAction());
    registerHandler.accept(new RestDeleteComponentTemplateAction());
    registerHandler.accept(new RestPutComposableIndexTemplateAction());
    registerHandler.accept(new RestGetComposableIndexTemplateAction());
    registerHandler.accept(new RestDeleteComposableIndexTemplateAction());
    registerHandler.accept(new RestSimulateIndexTemplateAction());
    // Update or create index mapping.
    registerHandler.accept(new RestPutMappingAction());
    registerHandler.accept(new RestGetMappingAction());
    registerHandler.accept(new RestGetFieldMappingAction());

    registerHandler.accept(new RestRefreshAction());
    registerHandler.accept(new RestFlushAction());
    registerHandler.accept(new RestSyncedFlushAction());
    registerHandler.accept(new RestForceMergeAction());
    registerHandler.accept(new RestUpgradeAction());
    registerHandler.accept(new RestUpgradeStatusAction());
    registerHandler.accept(new RestClearIndicesCacheAction());

    registerHandler.accept(new RestIndexAction());
    registerHandler.accept(new CreateHandler());
    registerHandler.accept(new AutoIdHandler(nodesInCluster));
    registerHandler.accept(new RestGetAction());
    registerHandler.accept(new RestGetSourceAction());
    registerHandler.accept(new RestMultiGetAction(settings));
    registerHandler.accept(new RestDeleteAction());
    registerHandler.accept(new RestCountAction());
    registerHandler.accept(new RestTermVectorsAction());
    registerHandler.accept(new RestMultiTermVectorsAction());
    // Bulk operations.
    registerHandler.accept(new RestBulkAction(settings));
    registerHandler.accept(new RestUpdateAction());
    // Search.
    registerHandler.accept(new RestSearchAction());
    // Scroll (snapshot-based paging) search.
    registerHandler.accept(new RestSearchScrollAction());
    // Clear scroll search snapshots.
    registerHandler.accept(new RestClearScrollAction());
    // Multi search.
    registerHandler.accept(new RestMultiSearchAction(settings));

    registerHandler.accept(new RestValidateQueryAction());
    // Query explain.
    registerHandler.accept(new RestExplainAction());
    // Index recovery.
    registerHandler.accept(new RestRecoveryAction());

    registerHandler.accept(new RestReloadSecureSettingsAction());

    // Scripts API
    registerHandler.accept(new RestGetStoredScriptAction());
    registerHandler.accept(new RestPutStoredScriptAction());
    registerHandler.accept(new RestDeleteStoredScriptAction());
    registerHandler.accept(new RestGetScriptContextAction());
    registerHandler.accept(new RestGetScriptLanguageAction());

    registerHandler.accept(new RestFieldCapabilitiesAction());

    // Tasks API
    registerHandler.accept(new RestListTasksAction(nodesInCluster));
    registerHandler.accept(new RestGetTaskAction());
    registerHandler.accept(new RestCancelTasksAction(nodesInCluster));

    // Ingest API (pre-processing pipelines)
    registerHandler.accept(new RestPutPipelineAction());
    registerHandler.accept(new RestGetPipelineAction());
    registerHandler.accept(new RestDeletePipelineAction());
    registerHandler.accept(new RestSimulatePipelineAction());

    // Data Stream API
    if (DATASTREAMS_FEATURE_ENABLED) {
        registerHandler.accept(new RestCreateDataStreamAction());
        registerHandler.accept(new RestDeleteDataStreamAction());
        registerHandler.accept(new RestGetDataStreamsAction());
    }

    // CAT API (runtime status)
    registerHandler.accept(new RestAllocationAction());
    registerHandler.accept(new RestShardsAction());
    registerHandler.accept(new RestMasterAction());
    registerHandler.accept(new RestNodesAction());
    registerHandler.accept(new RestTasksAction(nodesInCluster));
    registerHandler.accept(new RestIndicesAction());
    registerHandler.accept(new RestSegmentsAction());
    // Fully qualified to prevent interference with rest.action.count.RestCountAction
    registerHandler.accept(new org.elasticsearch.rest.action.cat.RestCountAction());
    // Fully qualified to prevent interference with rest.action.indices.RestRecoveryAction
    registerHandler.accept(new RestCatRecoveryAction());
    registerHandler.accept(new RestHealthAction());
    registerHandler.accept(new org.elasticsearch.rest.action.cat.RestPendingClusterTasksAction());
    registerHandler.accept(new RestAliasAction());
    registerHandler.accept(new RestThreadPoolAction());
    registerHandler.accept(new RestPluginsAction());
    registerHandler.accept(new RestFielddataAction());
    registerHandler.accept(new RestNodeAttrsAction());
    registerHandler.accept(new RestRepositoriesAction());
    registerHandler.accept(new RestSnapshotAction());
    registerHandler.accept(new RestTemplatesAction());
    // Handlers contributed by action plugins.
    for (ActionPlugin plugin : actionPlugins) {
        for (RestHandler handler : plugin.getRestHandlers(settings, restController, clusterSettings, indexScopedSettings,
            settingsFilter, indexNameExpressionResolver, nodesInCluster)) {
            registerHandler.accept(handler);
        }
    }
    // Registered last, once every cat action has been collected.
    registerHandler.accept(new RestCatAction(catActions));
}
到这里node的构造函数执行结束,各种组件已经创建完成
调用start方法启动服务
// Start the node.
INSTANCE.start();
/**
 * Starts the node and then the keep-alive thread.
 *
 * @throws NodeValidationException declared by this method; presumably raised by {@code node.start()}
 */
private void start() throws NodeValidationException {
    // Start the node.
    node.start();
    // Start the keep-alive thread; per the original note, the process will not exit while this thread exists.
    keepAliveThread.start();
}
保证只会启动一次
// The node can only be started once: bail out if the lifecycle cannot move to STARTED.
if (!lifecycle.moveToStarted()) {
    return this;
}
// Start the plugin-provided lifecycle components.
pluginLifecycleComponents.forEach(LifecycleComponent::start);
injector.getInstance(MappingUpdatedAction.class).setClient(client);
// Start the core services obtained from the injector, in this order.
injector.getInstance(IndicesService.class).start();
injector.getInstance(IndicesClusterStateService.class).start();
injector.getInstance(SnapshotsService.class).start();
injector.getInstance(SnapshotShardsService.class).start();
injector.getInstance(RepositoriesService.class).start();
injector.getInstance(SearchService.class).start();
nodeService.getMonitorService().start();
组件都继承了AbstractLifecycleComponent类,在调用start方法时会调用该类的start方法
/**
 * Starts this component while holding the {@code lifecycle} monitor.
 *
 * <p>Returns without doing anything when the lifecycle cannot move to the started state.
 * Otherwise the listeners are notified before the start, {@code doStart()} runs, the lifecycle is
 * moved to started, and the listeners are notified again.
 */
public void start() {
    synchronized (lifecycle) {
        if (!lifecycle.canMoveToStarted()) {
            return;
        }
        for (LifecycleListener listener : listeners) {
            listener.beforeStart();
        }
        // Component-specific start logic.
        doStart();
        // The state only changes after doStart() has returned.
        lifecycle.moveToStarted();
        for (LifecycleListener listener : listeners) {
            listener.afterStart();
        }
    }
}
最终调用各组件的doStart方法
nodeService.getMonitorService().start();

// Presumably the monitor service's doStart(): it only delegates to the JVM GC monitor service.
@Override
protected void doStart() {
    jvmGcMonitorService.start();
}

// Presumably the JVM GC monitor service's doStart(): schedules a JvmMonitor with a fixed delay unless disabled.
protected void doStart() {
    if (!enabled) {
        return;
    }
    scheduledFuture = threadPool.scheduleWithFixedDelay(new JvmMonitor(gcThresholds, gcOverheadThreshold) {
        @Override
        void onMonitorFailure(Exception e) {
            logger.debug("failed to monitor", e);
        }

        @Override
        void onSlowGc(final Threshold threshold, final long seq, final SlowGcEvent slowGcEvent) {
            logSlowGc(logger, threshold, seq, slowGcEvent, JvmGcMonitorService::buildPools);
        }

        @Override
        void onGcOverhead(final Threshold threshold, final long current, final long elapsed, final long seq) {
            logGcOverhead(logger, threshold, current, elapsed, seq);
        }
    }, interval, Names.SAME);
}
最终调用到jvmGcMonitorService中的doStart方法,启动一个定时任务监控GC信息
final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
nodeConnectionsService.start();

// Creates a new ConnectionChecker, publishes it in the connectionChecker field and schedules its first run.
protected void doStart() {
    final ConnectionChecker connectionChecker = new ConnectionChecker();
    this.connectionChecker = connectionChecker;
    connectionChecker.scheduleNextCheck();
}

// Runnable that reconnects disconnected targets and then re-schedules itself.
// It only acts while it is still the instance stored in the connectionChecker field.
class ConnectionChecker extends AbstractRunnable {
    protected void doRun() {
        if (connectionChecker == this) {
            // Connect to the other nodes; schedule the next check on completion.
            connectDisconnectedTargets(this::scheduleNextCheck);
        }
    }

    // Schedule the next (re)connection check after reconnectInterval on the GENERIC thread pool.
    void scheduleNextCheck() {
        if (connectionChecker == this) {
            threadPool.scheduleUnlessShuttingDown(reconnectInterval, ThreadPool.Names.GENERIC, this);
        }
    }

    @Override
    public void onFailure(Exception e) {
        logger.warn("unexpected error while checking for node reconnects", e);
        // Keep checking even after a failure.
        scheduleNextCheck();
    }

    @Override
    public String toString() {
        return "periodic reconnection checker";
    }
}
这里创建了一个连接检查器(ConnectionChecker),它会按固定间隔反复检查与各节点的连接状态,如果连接已断开则进行重连
/**
 * Ensures that every known connection target is connected, then runs {@code onCompletion}.
 *
 * <p>The work items are collected while holding {@code mutex} and are only executed after the
 * lock has been released.
 *
 * @param onCompletion callback to run once all targets have been handled, or immediately if there are none
 */
private void connectDisconnectedTargets(Runnable onCompletion) {
    final List<Runnable> runnables = new ArrayList<>();
    synchronized (mutex) {
        final Collection<ConnectionTarget> connectionTargets = targetsByNode.values();
        if (connectionTargets.isEmpty()) {
            runnables.add(onCompletion);
        } else {
            logger.trace("connectDisconnectedTargets: {}", targetsByNode);
            // Grouped listener sized to the number of targets, wrapping onCompletion.
            final GroupedActionListener<Void> listener = new GroupedActionListener<>(
                ActionListener.wrap(onCompletion), connectionTargets.size());
            for (final ConnectionTarget connectionTarget : connectionTargets) {
                // Check the connection.
                runnables.add(connectionTarget.ensureConnected(listener));
            }
        }
    }
    // Executed outside of the synchronized block.
    runnables.forEach(Runnable::run);
}
启动gateway状态服务,将自己注册到监听器中,当集群状态变更时进行通知,执行恢复分片
injector.getInstance(GatewayService.class).start();

// Registers this service as a cluster state listener.
@Override
protected void doStart() {
    // use post applied so that the state will be visible to the background recovery thread we spawn in performStateRecovery
    clusterService.addListener(this);
}

// Listener callback (body elided in this excerpt).
@Override
public void clusterChanged(final ClusterChangedEvent event) {...}
设置集群状态发布服务
Discovery discovery = injector.getInstance(Discovery.class);
// Cluster state publishing: the master service publishes through discovery::publish.
clusterService.getMasterService().setClusterStatePublisher(discovery::publish);
启动传输服务,并启动底层连接,这部分我们会在后面单独分析
//交互服务 TransportService transportService = injector.getInstance(TransportService.class); transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class)); transportService.start(); @Override protected void doStart() { transport.setMessageListener(this); connectionManager.addListener(this); //底层传输服务 transport.start(); if (transport.boundAddress() != null && logger.isInfoEnabled()) { logger.info("{}", transport.boundAddress()); for (Map.Entry<String, BoundTransportAddress> entry : transport.profileBoundAddresses().entrySet()) { logger.info("profile [{}]: {}", entry.getKey(), entry.getValue()); } } localNode = localNodeFactory.apply(transport.boundAddress()); if (remoteClusterClient) { // here we start to connect to the remote clusters remoteClusterService.initializeRemoteClusters(); } }
启动恢复服务
// Start the recovery service.
injector.getInstance(PeerRecoverySourceService.class).start();
开始恢复集群元数据
// Cluster metadata.
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
gatewayMetaState.start(settings(), transportService, clusterService, injector.getInstance(MetaStateService.class),
injector.getInstance(MetadataIndexUpgradeService.class), injector.getInstance(MetadataUpgrader.class),
injector.getInstance(PersistedClusterStateService.class));
集群状态每个节点都会定期保存到本地磁盘中,node刚启动时会从磁盘中加载元数据进入内存中构造节点状态,为后面发布整个状态做准备
/**
 * Builds the initial persisted cluster state of this node and stores it in {@code persistedState}.
 *
 * <p>Three cases are handled: the Zen1 discovery type (in-memory state built from the legacy
 * full state), master-eligible or data nodes (state loaded from disk and backed by a
 * Lucene-based persisted state), and all other nodes (empty in-memory state).
 * Must only be called once; this is asserted at the top of the method.
 */
public void start(Settings settings, TransportService transportService, ClusterService clusterService,
                  MetaStateService metaStateService, MetadataIndexUpgradeService metadataIndexUpgradeService,
                  MetadataUpgrader metadataUpgrader, PersistedClusterStateService persistedClusterStateService) {
    assert persistedState.get() == null : "should only start once, but already have " + persistedState.get();
    // Zen1 discovery.
    if (DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings).equals(DiscoveryModule.ZEN_DISCOVERY_TYPE)) {
        // only for tests that simulate mixed Zen1/Zen2 clusters, see Zen1IT
        final Tuple<Manifest, Metadata> manifestClusterStateTuple;
        try {
            NodeMetadata.FORMAT.writeAndCleanup(new NodeMetadata(persistedClusterStateService.getNodeId(), Version.CURRENT),
                persistedClusterStateService.getDataPaths());
            // Load the metadata.
            manifestClusterStateTuple = metaStateService.loadFullState();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Cluster state built from the loaded manifest and metadata.
        final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
            .version(manifestClusterStateTuple.v1().getClusterStateVersion())
            .metadata(manifestClusterStateTuple.v2()).build();

        final IncrementalClusterStateWriter incrementalClusterStateWriter
            = new IncrementalClusterStateWriter(settings, clusterService.getClusterSettings(), metaStateService,
            manifestClusterStateTuple.v1(),
            prepareInitialClusterState(transportService, clusterService, clusterState),
            transportService.getThreadPool()::relativeTimeInMillis);

        if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {
            clusterService.addLowPriorityApplier(new GatewayClusterApplier(incrementalClusterStateWriter));
        }
        persistedState.set(new InMemoryPersistedState(manifestClusterStateTuple.v1().getCurrentTerm(), clusterState));
        return;
    }
    // Master-eligible node or data node.
    if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {
        try {
            // Load the cluster state from disk.
            final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadBestOnDiskState();

            Metadata metadata = onDiskState.metadata;
            long lastAcceptedVersion = onDiskState.lastAcceptedVersion;
            long currentTerm = onDiskState.currentTerm;

            // Nothing on disk: fall back to the legacy full state if its manifest is not empty.
            if (onDiskState.empty()) {
                assert Version.CURRENT.major <= Version.V_7_0_0.major + 1 :
                    "legacy metadata loader is not needed anymore from v9 onwards";
                final Tuple<Manifest, Metadata> legacyState = metaStateService.loadFullState();
                if (legacyState.v1().isEmpty() == false) {
                    metadata = legacyState.v2();
                    lastAcceptedVersion = legacyState.v1().getClusterStateVersion();
                    currentTerm = legacyState.v1().getCurrentTerm();
                }
            }

            PersistedState persistedState = null;
            boolean success = false;
            try {
                final ClusterState clusterState = prepareInitialClusterState(transportService, clusterService,
                    ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
                        .version(lastAcceptedVersion)
                        .metadata(upgradeMetadataForNode(metadata, metadataIndexUpgradeService, metadataUpgrader))
                        .build());

                // Master-eligible nodes use the Lucene-backed state directly; other nodes wrap it in the async variant.
                if (DiscoveryNode.isMasterNode(settings)) {
                    persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState);
                } else {
                    persistedState = new AsyncLucenePersistedState(settings, transportService.getThreadPool(),
                        new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState));
                }
                if (DiscoveryNode.isDataNode(settings)) {
                    metaStateService.unreferenceAll(); // unreference legacy files (only keep them for dangling indices functionality)
                } else {
                    metaStateService.deleteAll(); // delete legacy files
                }
                // write legacy node metadata to prevent accidental downgrades from spawning empty cluster state
                NodeMetadata.FORMAT.writeAndCleanup(new NodeMetadata(persistedClusterStateService.getNodeId(), Version.CURRENT),
                    persistedClusterStateService.getDataPaths());
                success = true;
            } finally {
                // Close the partially initialized state if anything above failed.
                if (success == false) {
                    IOUtils.closeWhileHandlingException(persistedState);
                }
            }

            this.persistedState.set(persistedState);
        } catch (IOException e) {
            throw new ElasticsearchException("failed to load metadata", e);
        }
    } else {
        final long currentTerm = 0L;
        final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)).build();
        if (persistedClusterStateService.getDataPaths().length > 0) {
            // write empty cluster state just so that we have a persistent node id. There is no need to write out global metadata with
            // cluster uuid as coordinating-only nodes do not snap into a cluster as they carry no state
            try (PersistedClusterStateService.Writer persistenceWriter = persistedClusterStateService.createWriter()) {
                persistenceWriter.writeFullStateAndCommit(currentTerm, clusterState);
            } catch (IOException e) {
                // NOTE(review): this path writes state, yet the message says "load" - confirm whether that is intended.
                throw new ElasticsearchException("failed to load metadata", e);
            }
            try {
                // delete legacy cluster state files
                metaStateService.deleteAll();
                // write legacy node metadata to prevent downgrades from spawning empty cluster state
                NodeMetadata.FORMAT.writeAndCleanup(new NodeMetadata(persistedClusterStateService.getNodeId(), Version.CURRENT),
                    persistedClusterStateService.getDataPaths());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        persistedState.set(new InMemoryPersistedState(currentTerm, clusterState));
    }
}
启动发现服务,这里我们以ZenDiscovery为例,这里会选择主节点,我们在后面会详细分析
//启动发现服务,设置初始集群状态 discovery.start(); @Override protected void doStart() { //本地节点 DiscoveryNode localNode = transportService.getLocalNode(); assert localNode != null; synchronized (stateMutex) { // set initial state 设置初始状态 assert committedState.get() == null; assert localNode != null; //集群状态中添加集群名称 ClusterState.Builder builder = ClusterState.builder(clusterName); //构建初始的集群状态 ClusterState initialState = builder .blocks(ClusterBlocks.builder() .addGlobalBlock(STATE_NOT_RECOVERED_BLOCK) .addGlobalBlock(noMasterBlockService.getNoMasterBlock())) .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())) .build(); //设置初始集群状态 committedState.set(initialState); clusterApplier.setInitialState(initialState); //节点容错 nodesFD.setLocalNode(localNode); //开始查询集群节点 joinThreadControl.start(); } zenPing.start(); }
集群服务启动
//集群服务启动 clusterService.start(); @Override protected synchronized void doStart() { clusterApplierService.start(); masterService.start(); } @Override protected synchronized void doStart() { //添加监听器 addListener(localNodeMasterListeners); //创建一个单线程的线程池,具有优先级。名字为:clusterApplierService#updateTask threadPoolExecutor = createThreadPoolExecutor(); } @Override protected synchronized v //创建一个单线程的具有优先级的线程池,名称为masterService#updateTask threadPoolExecutor = createThreadPoolExecutor(); //创建任务执行 taskBatcher = new Batcher(logger, threadPoolExecutor); }
开始接收请求,启动选举主节点服务,后面会详细分析
// Start accepting incoming requests.
transportService.acceptIncomingRequests();
// The node is starting up: begin electing the master node.
discovery.startInitialJoin();
/**
 * Kicks off the initial join by starting the join thread, unless one is already running.
 * The call is made while holding {@code stateMutex}.
 */
@Override
public void startInitialJoin() {
    // start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
    synchronized (stateMutex) {
        // do the join on a different thread, the caller of this method waits for 30s anyhow till it is discovered
        joinThreadControl.startNewThreadIfNotRunning();
    }
}
然后启动rest请求处理
// Start REST request handling.
injector.getInstance(HttpServerTransport.class).start();
然后打印node started,并且调用插件的节点启动方法
节点启动之后,首先选举主节点,然后选举集群元数据,接着分配主分片、分配副分片,索引开始执行恢复;恢复完成后集群状态变为Green,开始对外提供服务。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。