With the steps above, the client-side job submission is complete; it mainly involves uploading files (the JobGraph and the jar packages).
Remote-side flow
Next comes the actual job scheduling flow (building the ExecutionGraph, requesting slots, and so on).
This section walks through the job scheduling and execution flow in the context of the YARN execution path.
./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar
The analysis starts from this launch script. The entry class is org.apache.flink.client.cli.CliFrontend, and its main method is the starting point of every submission.
Overall framework
Local submission logic
CliFrontend[Main] call stack
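For reference only, the same submission can also be triggered programmatically through this entry point. The sketch below simply mirrors the script arguments above and assumes the usual Flink environment (FLINK_CONF_DIR, Hadoop/YARN settings) is configured as bin/flink would configure it; note that CliFrontend.main may terminate the JVM via System.exit when it finishes.
- // Minimal sketch: drive the same submission through CliFrontend's main entry point.
- // Assumes FLINK_CONF_DIR and the YARN/Hadoop environment are set up as bin/flink would.
- public class SubmitWordCount {
-     public static void main(String[] args) throws Exception {
-         String[] flinkArgs = {
-             "run", "-m", "yarn-cluster", "-yn", "2", "-yjm", "1024", "-ytm", "1024",
-             "./examples/batch/WordCount.jar"
-         };
-         // Parses the arguments and eventually reaches runProgram(...) shown below.
-         org.apache.flink.client.cli.CliFrontend.main(flinkArgs);
-     }
- }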
- CliFrontend[Main]
- -> cli.parseParameters(args)
- -> buildProgram(runOptions)
- -> runProgram(customCommandLine, commandLine, runOptions, program)
- (different branches are taken depending on the YARN submission mode; here we use the per-job "small session cluster" mode as the example)
- -> customCommandLine.createClusterDescriptor
- -> clusterDescriptor.deploySessionCluster(clusterSpecification)
- deployInternal -- blocks until the ApplicationMaster/JobManager has been deployed on YARN
- startAppMaster
- setupApplicationMasterContainer
- startCommandValues.put("class", yarnClusterEntrypoint) -- here this is YarnSessionClusterEntrypoint[Main] for a session cluster (YarnJobClusterEntrypoint[Main] for a detached per-job cluster); both extend ClusterEntrypoint
- -> executeProgram(program, client, userParallelism);
- (executing the program means optimizing it into a JobGraph and submitting it remotely)
runProgram(customCommandLine, commandLine, runOptions, program)
- private <T> void runProgram(
- CustomCommandLine<T> customCommandLine,
- CommandLine commandLine,
- RunOptions runOptions,
- PackagedProgram program) throws ProgramInvocationException, FlinkException {
-
- // Obtain the YarnClusterDescriptor, which is used to create the cluster
- final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
-
- try {
- // If clusterId is not null here, this is session mode
- final T clusterId = customCommandLine.getClusterId(commandLine);
-
- final ClusterClient<T> client;
-
- /*
- * YARN modes:
- * 1. Job mode: each Flink job declares its own Flink cluster on YARN
- * 2. Session mode: a Flink master (i.e. one YARN application master) is maintained in the cluster and runs multiple jobs.
- */
-
- if (clusterId == null && runOptions.getDetachedMode()) {
- // per-job + detached mode
- int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();
-
- // Build the JobGraph from the jar package
- final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
-
- // clusterDescriptor.deployJobCluster
- // -> YarnClusterDescriptor.deployInternal
- // -> AbstractYarnClusterDescriptor.startAppMaster
- // -> AbstractYarnClusterDescriptor.yarnClient.submitApplication(appContext);
- // create a new RestClusterClient and start the application (ClusterEntrypoint) on the YARN cluster
- final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
- client = clusterDescriptor.deployJobCluster(
- clusterSpecification,
- jobGraph,
- runOptions.getDetachedMode());
-
- ......
- } else {
- final Thread shutdownHook;
- if (clusterId != null) {
- // session mode
- client = clusterDescriptor.retrieve(clusterId);
- shutdownHook = null;
- } else {
- // per-job + non-detached mode
- final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
- // Deploy a small session cluster; this starts a ClusterEntrypoint providing the Dispatcher, ResourceManager and WebMonitorEndpoint services
- client = clusterDescriptor.deploySessionCluster(clusterSpecification);
-
- // Shutdown hook for resource cleanup
- if (!runOptions.getDetachedMode() && runOptions.isShutdownOnAttachedExit()) {
- shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
- } else {
- shutdownHook = null;
- }
- }
-
- try {
- ......
-
- // Optimize the graph and submit the program remotely
- executeProgram(program, client, userParallelism);
- } finally {
- ......
- }
- }
- } finally {
- ......
- }
- }
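As a side note on the detached branch above, the following is a minimal sketch of how a JobGraph is derived from a user jar with PackagedProgramUtils.createJobGraph. The jar path and parallelism are placeholders and exception handling is omitted; this only illustrates the client-side graph construction step.
- import java.io.File;
- import org.apache.flink.client.program.PackagedProgram;
- import org.apache.flink.client.program.PackagedProgramUtils;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.configuration.GlobalConfiguration;
- import org.apache.flink.runtime.jobgraph.JobGraph;
- 
- // Sketch: turn a packaged user program into a JobGraph on the client side.
- Configuration configuration = GlobalConfiguration.loadConfiguration();
- PackagedProgram program = new PackagedProgram(new File("./examples/batch/WordCount.jar"));
- JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, 2 /* parallelism, placeholder */);
- System.out.println("Built JobGraph with JobID " + jobGraph.getJobID());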

clusterDescriptor.deploySessionCluster
- clusterDescriptor.deploySessionCluster(clusterSpecification)
- deployInternal -- blocks until the ApplicationMaster/JobManager has been deployed on YARN
- startAppMaster
- setupApplicationMasterContainer
- startCommandValues.put("class", yarnClusterEntrypoint) -- here this is YarnSessionClusterEntrypoint[Main] for a session cluster (YarnJobClusterEntrypoint[Main] for a detached per-job cluster)
The deployInternal method deploys the cluster:
- protected ClusterClient<ApplicationId> deployInternal(
- ClusterSpecification clusterSpecification,
- String applicationName,
- String yarnClusterEntrypoint,
- @Nullable JobGraph jobGraph,
- boolean detached) throws Exception {
-
- // ------------------ Check if configuration is valid --------------------
- ......
-
- // ------------------ Check if the specified queue exists --------------------
-
- checkYarnQueues(yarnClient);
-
- // ------------------ Add dynamic properties to local flinkConfiguraton ------
- ......
-
- // ------------------ Check if the YARN ClusterClient has the requested resources --------------
-
- // Create application via yarnClient
- final YarnClientApplication yarnApplication = yarnClient.createApplication();
- ......
-
- // ------------------ Start the ApplicationMaster ----------------
- ApplicationReport report = startAppMaster(
- flinkConfiguration,
- applicationName,
- yarnClusterEntrypoint,
- jobGraph,
- yarnClient,
- yarnApplication,
- validClusterSpecification);
-
- ......
- // the Flink cluster is deployed in YARN. Represent cluster
- return createYarnClusterClient(
- this,
- validClusterSpecification.getNumberTaskManagers(),
- validClusterSpecification.getSlotsPerTaskManager(),
- report,
- flinkConfiguration,
- true);
- }
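The clusterSpecification handed to deployInternal is just a value object describing the requested cluster size. As a rough sketch (the numbers below are placeholders mirroring the -yjm/-ytm/-yn flags of the example command), it can be built like this:
- import org.apache.flink.client.deployment.ClusterSpecification;
- 
- // Placeholder values mirroring -yjm 1024 -ytm 1024 -yn 2 from the example command.
- ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
-     .setMasterMemoryMB(1024)
-     .setTaskManagerMemoryMB(1024)
-     .setNumberTaskManagers(2)
-     .setSlotsPerTaskManager(1)
-     .createClusterSpecification();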

The startAppMaster method starts the ApplicationMaster:
- public ApplicationReport startAppMaster(
- Configuration configuration,
- String applicationName,
- String yarnClusterEntrypoint,
- JobGraph jobGraph,
- YarnClient yarnClient,
- YarnClientApplication yarnApplication,
- ClusterSpecification clusterSpecification) throws Exception {
-
- // ------------------ Initialize the file systems -------------------------
- ......
-
- // ------------- Set-up ApplicationSubmissionContext for the application -------------
- ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
- final ApplicationId appId = appContext.getApplicationId();
-
- // ------------------ Add Zookeeper namespace to local flinkConfiguraton ------
- ......
-
- // ------------------ Prepare the resources and files required by YARN ------
- // Setup jar for ApplicationMaster
- ......
-
- // Prepare the TaskManager-related configuration
- configuration.setInteger(
- TaskManagerOptions.NUM_TASK_SLOTS,
- clusterSpecification.getSlotsPerTaskManager());
-
- configuration.setString(
- TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY,
- clusterSpecification.getTaskManagerMemoryMB() + "m");
-
- // Upload the flink configuration, write out configuration file
- ......
-
- // ------------------ Set up the ApplicationMaster container ------
- final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
- yarnClusterEntrypoint,
- hasLogback,
- hasLog4j,
- hasKrb5,
- clusterSpecification.getMasterMemoryMB());
-
- // --------- set user specified app master environment variables ---------
- ......
-
- // Submit the application
- yarnClient.submitApplication(appContext);
-
- // --------- Waiting for the cluster to be allocated ---------
- ......
- }
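setupApplicationMasterContainer essentially assembles the java command line that YARN runs for the ApplicationMaster, substituting the entrypoint class into the "class" placeholder of startCommandValues. The following is a simplified, generic sketch using the plain Hadoop YARN API rather than Flink's actual implementation; the memory size and log paths are illustrative only:
- import java.util.Collections;
- import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
- import org.apache.hadoop.yarn.util.Records;
- 
- // Illustration of what ends up in the AM ContainerLaunchContext: a java command
- // whose main class is the ClusterEntrypoint subclass chosen by the deploy path.
- String amCommand = "$JAVA_HOME/bin/java"
-     + " -Xmx424m"  // illustrative; derived from the master memory in the real code
-     + " org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint"  // the "class" value
-     + " 1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err";
- 
- ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
- amContainer.setCommands(Collections.singletonList(amCommand));
- // startAppMaster then attaches local resources and environment variables to this
- // context and submits it via yarnClient.submitApplication(appContext).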

Remote-side logic: ClusterEntrypoint[Main]
This is where Flink interacts with the YARN cluster (mainly the resourceManager and the taskManagers here); ClusterEntrypoint hosts the webMonitor, resourceManager and dispatcher services.
- YarnJobClusterEntrypoint[Main]
- -> ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
- -> clusterEntrypoint.startCluster();
- -> runCluster(configuration);
- -> clusterComponent = dispatcherResourceManagerComponentFactory.create();
- * starts the Dispatcher, ResourceManager and WebMonitorEndpoint component services within the same process
- create -> {
- webMonitorEndpoint.start();
- resourceManager.start();
- dispatcher.start();
- }
- * focus on the ResourceManager, which creates the TaskManagers
- -> resourceManager = resourceManagerFactory.createResourceManager()
- -> YarnResourceManager.initialize()
- * creates the resourceManagerClient and the nodeManagerClient
- * YarnResourceManager implements YARN's AMRMClientAsync.CallbackHandler interface; after containers have been allocated, the following callback is invoked (a minimal generic sketch follows this list):
- -> void onContainersAllocated(List<Container> containers)
- -> createTaskExecutorLaunchContext()
- -> Utils.createTaskExecutorContext() -- takes YarnTaskExecutorRunner.class as a parameter, specifying the TaskManager's main entry point
- -> nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
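To make the container callback above concrete, here is a minimal, generic sketch of the Hadoop AMRMClientAsync.CallbackHandler pattern that YarnResourceManager builds on. It is not Flink's implementation; the launch command is a placeholder for what createTaskExecutorLaunchContext would produce, and client initialization/teardown is omitted:
- import java.util.Collections;
- import java.util.List;
- import org.apache.hadoop.yarn.api.records.Container;
- import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
- import org.apache.hadoop.yarn.api.records.ContainerStatus;
- import org.apache.hadoop.yarn.api.records.NodeReport;
- import org.apache.hadoop.yarn.client.api.NMClient;
- import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
- import org.apache.hadoop.yarn.util.Records;
- 
- // Generic sketch of the Hadoop callback pattern used by YarnResourceManager.
- public class TaskExecutorAllocationHandler implements AMRMClientAsync.CallbackHandler {
- 
-     // In real code the NMClient is initialized and started with the YARN configuration.
-     private final NMClient nodeManagerClient = NMClient.createNMClient();
- 
-     @Override
-     public void onContainersAllocated(List<Container> containers) {
-         for (Container container : containers) {
-             // In Flink this context is built by createTaskExecutorLaunchContext and
-             // points its "class" at YarnTaskExecutorRunner (the TaskManager main).
-             ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
-             ctx.setCommands(Collections.singletonList("<TASK_EXECUTOR_COMMAND>")); // placeholder
-             try {
-                 nodeManagerClient.startContainer(container, ctx);
-             } catch (Exception e) {
-                 // real code would release the failed container and request a replacement
-             }
-         }
-     }
- 
-     @Override public void onContainersCompleted(List<ContainerStatus> statuses) {}
-     @Override public void onShutdownRequest() {}
-     @Override public void onNodesUpdated(List<NodeReport> updatedNodes) {}
-     @Override public void onError(Throwable e) {}
-     @Override public float getProgress() { return 0; }
- }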

- public DispatcherResourceManagerComponent<T> create(
- Configuration configuration,
- RpcService rpcService,
- HighAvailabilityServices highAvailabilityServices,
- BlobServer blobServer,
- HeartbeatServices heartbeatServices,
- MetricRegistry metricRegistry,
- ArchivedExecutionGraphStore archivedExecutionGraphStore,
- MetricQueryServiceRetriever metricQueryServiceRetriever,
- FatalErrorHandler fatalErrorHandler) throws Exception {
-
- // After the services are created, some of them are started
- webMonitorEndpoint.start();
- resourceManager.start(); -- internally specifies the main entry point of the TaskExecutor (i.e. the TaskManager)
- dispatcher.start(); -- the Dispatcher service handles the client's submitJob and drives task execution on the TaskExecutors
-
- // Return a wrapper object around all of the services
- return createDispatcherResourceManagerComponent(
- dispatcher,
- resourceManager,
- dispatcherLeaderRetrievalService,
- resourceManagerRetrievalService,
- webMonitorEndpoint,
- jobManagerMetricGroup);
-
- } catch (Exception exception) {
- ......
- }
- }

ClusterEntrypoint starts the Dispatcher service:
- Dispatcher
- --> onStart()
- --> startDispatcherServices()
- -> submittedJobGraphStore.start(this)
- -> leaderElectionService.start(this)
LeaderRetrievalHandler handles the submitJob message sent from the client over netty (a client-side submission sketch follows this call stack):
- LeaderRetrievalHandler
- -> channelRead0() -- the netty channel-read callback
- -> AbstractHandler.respondAsLeader()
- -> AbstractRestHandler.respondToRequest()
- -> JobSubmitHandler.handleRequest
- -> Dispatcher.submitJob
- -> Dispatcher.internalSubmitJob
- -> Dispatcher.persistAndRunJob
- -> Dispatcher.runJob
- -> Dispatcher.createJobManagerRunner -- creates the JobManagerRunner
- -> jobManagerRunnerFactory.createJobManagerRunner
- * creates a DefaultJobMasterServiceFactory
- * new JobManagerRunner()
- -> dispatcher.startJobManagerRunner -- starts the JobManagerRunner
- -> jobManagerRunner.start();
- -> ZooKeeperLeaderElectionService.start
- -> ZooKeeperLeaderElectionService.isLeader
- -> leaderContender.grantLeadership(issuedLeaderSessionID)
- -> jobManagerRunner.verifyJobSchedulingStatusAndStartJobManager
- -> startJobMaster(leaderSessionId) -- starts the JobMaster
- -> jobMasterService.start
- -> startJobExecution(newJobMasterId)
- -> startJobMasterServices -- includes starting the slotPool and the scheduler and obtaining the address of the Flink ResourceManager leader; once the Flink ResourceManager and the JobMaster are connected, the slotPool can start requesting slots
- -> resetAndScheduleExecutionGraph -- runs the job
- --> createAndRestoreExecutionGraph -- builds the ExecutionGraph
- --> scheduleExecutionGraph
- --> executionGraph.scheduleForExecution()
- --> scheduleEager {
- * allocate slots for the Executions
- --> allocateResourcesForAll()
- * iterate over the executions and call deploy() on each
- --> execution.deploy()
- --> taskManagerGateway.submitTask
- --> [TaskExecutor] new Task()
- --> [TaskExecutor] task.startTaskThread() -- at this point the task actually starts executing
- }
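For completeness, the submitJob request that JobSubmitHandler receives above originates from the client roughly as in the sketch below. This is a hedged illustration against the RestClusterClient API of this Flink generation, not the exact executeProgram code path; the configuration must point at the cluster's REST endpoint, and the cluster id, classloader and jobGraph (built as shown earlier) are placeholders.
- import org.apache.flink.api.common.JobSubmissionResult;
- import org.apache.flink.client.program.rest.RestClusterClient;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.configuration.GlobalConfiguration;
- 
- // Sketch: submit an already-built JobGraph over REST; the request is routed to
- // JobSubmitHandler.handleRequest and from there to Dispatcher.submitJob.
- Configuration configuration = GlobalConfiguration.loadConfiguration(); // must contain rest.address / rest.port
- RestClusterClient<String> client = new RestClusterClient<>(configuration, "placeholder-cluster-id");
- JobSubmissionResult result = client.submitJob(jobGraph, Thread.currentThread().getContextClassLoader());
- System.out.println("Submitted job " + result.getJobID());
- client.shutdown();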

A follow-up post will continue the analysis.