当前位置:   article > 正文

flink源码阅读第一篇—入口

flink源码阅读

前序

  • 由于最近接触了flink相关项目,封装flink-table模块,这部分模块应该在flink官方1.9x版本进行发布,截止目前还是beta版本,等待最终的release版本发布。在开发期间,出于工作和兴趣的需求,就阅读了部分源码,阅读源码期间也是阅读了很多博客文章,发下文章写错的也比比皆是呀,哎有时也会误导人。先叙述第一篇总体轮廓篇。该篇总体思路是从flink任务提交开始,从本地提交代码逻辑,到服务端如果接收任务,最后运行的不同分支逻辑。了解这部分逻辑,需要一些基础知识,包括yarn, netty最基本的了解。

flink任务提交方式

  • flink提交方式和spark类似,比spark还略微复杂些。大体分这么几类 1、单机本地体检,2、多机集群提交,3、yarn-session提交,4、yarn-cluster per-job提交、5、还包括mesos和docker提交(这俩个略叙)。 生产环境中用第四种比就多,每个任务作为一个yarn application提交到集群,申请的资源和其他任务是隔离的,其他方式相对这个都略显简单。下面主要介绍第四种Per-Job-Cluster。
  • 在看代码之前先对flink组件有个大概的初步认知:
    • 1、Dispatcher(Application Master)提供REST接口来接收client的application提交,它负责启动JM和提交application,同时运行Web UI。
    • 2、ResourceManager:一般是Yarn,当TM有空闲的slot就会告诉JM,没有足够的slot也会启动新的TM。kill掉长时间空闲的TM。
    • 3、JobManager :接受application,包含StreamGraph(DAG)、JobGraph(logical dataflow graph,已经进过优化,如task chain)和JAR,将JobGraph转化为ExecutionGraph(physical dataflow graph,并行化),包含可以并发执行的tasks。其他工作类似Spark driver,如向RM申请资源、schedule tasks、保存作业的元数据,如checkpoints。如今JM可分为JobMaster和ResourceManager(和下面的不同),分别负责任务和资源,在Session模式下启动多个job就会有多个JobMaster。
    • 4、TaskManager:类似Spark的executor,会跑多个线程的task、数据缓存与交换。

代码分析:

  • Per-Job-Cluster模式也分为本地和远端。

  • 本地模式:

本地流程

  • 与Session-Cluster模式类似,入口也为CliFrontend#main
  • 解析处理参数
  • 根据用户jar、main、程序参数、savepoint信息生成PackagedProgram
  • 根据PackagedProgram创建JobGraph(对于非分离模式还是和Session模式一样,模式Session-Cluster)
  • 获取集群资源信息
  • 部署集群YarnClusterDesriptor#deployJobCluster -> AbstractYarnClusterDescriptor#deployInternal;
    • 进行资源校验(如内存大小、vcore大小、队列)
    • 通过YarnClient创建Application
    • 再次校验资源
    • AbstractYarnClusterDescriptor#startAppMaster启动AppMaster
      • 初始化文件系统(HDFS)
      • 将log4j、logback、flink-conf.yaml、jar包上传至HDFS
      • 构造AppMaster的Container(确定Container进程的入口类YarnSessionClusterEntrypoint),构造相应的Env
      • YarnClient向Yarn提交Container申请
      • 跟踪ApplicationReport状态(确定是否启动成功,可能会由于资源不够,一直等待)
    • 启动成功后将对应的ip和port写入flinkConfiguration中
    • 创建与将集群交互的ClusterClient
      • 根据flink-conf的HA配置创建对应的服务(如StandaloneHaServices、ZooKeeperHaServices等)
      • 创建基于Netty的RestClient;
      • 创建/rest_server_lock、/dispatcher_lock节点(以ZK为例)
      • 启动监听节点的变化(主备切换)
  • 通过ClusterClient获取到appId信息并写入本地临时文件AbstractYarnClusterDescriptor#startAppMaster中与Session-Cluster有一个显著不同的就是其会将任务的JobGraph上传至Hdfs供后续服务端使用

经过上述步骤,客户端提交任务过程就完成了,主要涉及到文件(JobGraph和jar包)的上传。

远端流程

  • 远端宿主在Container中的集群入口为YarnJobClusterEntrypoint#main
  • ClusterEntrypoint#runClusterEntrypoint -> ClusterEntrypoint#startCluster启动集群
  • 创建JobDispatcherResourceManagerComponentFactory(用于创建JobDispatcherResourceManagerComponent)
  • 创建ResourceManager(YarnResourceManager)、Dispatcher(MiniDispatcher),其中在创建MiniDispatcher时会从之前的JobGraph文件中读取出JobGraph,并启动进行ZK选举
  • 当为主时会调用Dispatcher#grantLeadership方法
    • Dispatcher#recoverJobs恢复任务,获取JobGraph
    • Dispatcher#tryAcceptLeadershipAndRunJobs确认获取主并开始运行任务
      • Dispatcher#runJob开始运行任务(创建JobManagerRunner并启动进行ZK选举),
        • 创建JobManagerRunner(处理leader选举)
        • 创建JobMaster(实际执行任务入口,包含在JobManagerRunner)
        • 启动JobManagerRunner(会进行leader选举,ZK目录为leader/${jobId}/job_manager_lock)
        • 当为主时会调用JobManagerRunner#grantLeadership方法
          • 启动JobMaster
          • 将任务运行状态信息写入ZK(/${AppID}/running_job_registry/${jobId})
          • 启动JobMaster的Endpoint
          • 开始调度任务JobMaster#startJobExecution

接下来就进行任务具体调度(构造ExecutionGraph、申请Slot等)流程。

Flink On Yarn With dispatcher

主要结合一下yarn的执行流程,介绍一下任务调转执行流程。

  1. Dispatcher组件负责接收作业提交,持久化它们,生成JobManagers以执行作业并在Master故障时恢复它们。此外,它知道Flink会话群集的状态。
  • 某些集群管理器需要一个集中的作业生成和监视实例
  • 它包含独立JobManager的角色,等待提交作业
  1. 当开始一个新的Flink yarn 会话时,客户端首先检查所请求的资源(containers和内存)是否可用。如果资源够用,之后,上传一个jar包,包含Flink和HDFS的配置。
  2. 客户端向yarn resource manager发送请求,申请一个yarn container去启动ApplicationMaster。
  3. yarn resource manager会在nodemanager上分配一个container,去启动ApplicationMaster
  4. yarn nodemanager会将配置文件和jar包下载到对应的container中,进行container容器的初始化。
  5. 初始化完成后,ApplicationMaster构建完成。ApplicationMaster会为TaskManagers生成新的Flink配置文件(使得TaskManagers根据配置文件去连接到JobManager),配置文件会上传到HDFS。
  6. ApplicationMaster开始为该Flink应用的TaskManagers分配containers,这个过程会从HDFS上下载jar和配置文件(此处的配置文件是AM修改过的,包含了JobManager的一些信息,比如说JobManager的地址)
  7. 一旦上面的步骤完成,Flink已经建立并准备好接受jobs。

代码详细分析

./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar

以这个执行脚本入口进行分析,入口函数org.apache.flink.client.cli.CliFrontend,此类中的Main方法是所有提交操作的开始

大体框架

  • CliFrontend[Main] :Client提交任务的入口,AM创建,提交程序
  • ClusterEntrypoint[Main] : 与Yarn集群交互,启动集群的基本服务,如Dispatcher,ResourceManager和WebMonitorEndpoint等
  • YarnTaskExecutorRunner[Main] :TaskExecutor(即TaskManager)上的Task执行Main入口
  • JobSubmitHandler与Dispatcher :处理Client端任务提交,启动JobMaster,构建ExecutionGraph,并deploy所有Task任务
  • ResourceManager :资源管理器,卖游戏账号平台指明TaskExecutor入口类,启动TaskExecutor的Container

本地提交逻辑


CliFrontend[Main]调用栈

  1. CliFrontend[Main]
  2. -> cli.parseParameters(args)
  3. -> buildProgram(runOptions)
  4. -> runProgram(customCommandLine, commandLine, runOptions, program)
  5. (根据yarn提交模式,走不同分支,以Job小Session集群方式为例)
  6. -> customCommandLine.createClusterDescriptor
  7. -> clusterDescriptor.deploySessionCluster(clusterSpecification)
  8. deployInternal -- block,直到ApplicationMaster/JobManager在YARN上部署完毕
  9. startAppMaster
  10. setupApplicationMasterContainer
  11. startCommandValues.put("class", yarnClusterEntrypoint) -- 此处是 YarnJobClusterEntrypoint[Main]
  12. -> executeProgram(program, client, userParallelism);
  13. (执行程序就是优化得到JobGraph,远程提交的过程)

runProgram(customCommandLine, commandLine, runOptions, program)

  1. private <T> void runProgram(
  2. CustomCommandLine<T> customCommandLine,
  3. CommandLine commandLine,
  4. RunOptions runOptions,
  5. PackagedProgram program) throws ProgramInvocationException, FlinkException {
  6. // 获取yarnClusterDescriptor,用户创建集群
  7. final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
  8. try {
  9. // 此处clusterId如果不为null,则表示是session模式
  10. final T clusterId = customCommandLine.getClusterId(commandLine);
  11. final ClusterClient<T> client;
  12. /*
  13. * Yarn模式:
  14. * 1. Job模式:每个flink job 单独在yarn上声明一个flink集群
  15. * 2. Session模式:在集群中维护flink master,即一个yarn application master,运行多个job。
  16. */
  17. if (clusterId == null && runOptions.getDetachedMode()) {
  18. // job + DetachedMode模式
  19. int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();
  20. // 从jar包中获取jobGraph
  21. final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
  22. // clusterDescriptor.deployJobCluster
  23. // -> YarnClusterDescriptor.deployInternal
  24. // -> AbstractYarnClusterDescriptor.startAppMaster
  25. // -> AbstractYarnClusterDescriptor.yarnClient.submitApplication(appContext);
  26. // 新建一个RestClusterClient,在yarn集群中启动应用(ClusterEntrypoint)
  27. final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
  28. client = clusterDescriptor.deployJobCluster(
  29. clusterSpecification,
  30. jobGraph,
  31. runOptions.getDetachedMode());
  32. ......
  33. } else {
  34. final Thread shutdownHook;
  35. if (clusterId != null) {
  36. // session模式
  37. client = clusterDescriptor.retrieve(clusterId);
  38. shutdownHook = null;
  39. } else {
  40. // job + non-DetachedMode模式
  41. final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
  42. // 新建一个小session集群,会启动ClusterEntrypoint,提供Dispatcher,ResourceManager和WebMonitorEndpoint等服务
  43. client = clusterDescriptor.deploySessionCluster(clusterSpecification);
  44. // 进行资源清理的钩子
  45. if (!runOptions.getDetachedMode() && runOptions.isShutdownOnAttachedExit()) {
  46. shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
  47. } else {
  48. shutdownHook = null;
  49. }
  50. }
  51. try {
  52. ......
  53. // 优化图,执行程序的远程提交
  54. executeProgram(program, client, userParallelism);
  55. } finally {
  56. ......
  57. }
  58. }
  59. } finally {
  60. ......
  61. }
  62. }

clusterDescriptor.deploySessionCluster

  1. clusterDescriptor.deploySessionCluster(clusterSpecification)
  2. deployInternal -- block,直到ApplicationMaster/JobManager在YARN上部署完毕
  3. startAppMaster
  4. setupApplicationMasterContainer
  5. startCommandValues.put("class", yarnClusterEntrypoint) -- 此处是 YarnJobClusterEntrypoint[Main]

deployInternal方法,部署集群:

  1. protected ClusterClient<ApplicationId> deployInternal(
  2. ClusterSpecification clusterSpecification,
  3. String applicationName,
  4. String yarnClusterEntrypoint,
  5. @Nullable JobGraph jobGraph,
  6. boolean detached) throws Exception {
  7. // ------------------ Check if configuration is valid --------------------
  8. ......
  9. // ------------------ Check if the specified queue exists --------------------
  10. checkYarnQueues(yarnClient);
  11. // ------------------ Add dynamic properties to local flinkConfiguraton ------
  12. ......
  13. // ------------------ Check if the YARN ClusterClient has the requested resources --------------
  14. // Create application via yarnClient
  15. final YarnClientApplication yarnApplication = yarnClient.createApplication();
  16. ......
  17. // ------------------启动ApplicationMaster ----------------
  18. ApplicationReport report = startAppMaster(
  19. flinkConfiguration,
  20. applicationName,
  21. yarnClusterEntrypoint,
  22. jobGraph,
  23. yarnClient,
  24. yarnApplication,
  25. validClusterSpecification);
  26. ......
  27. // the Flink cluster is deployed in YARN. Represent cluster
  28. return createYarnClusterClient(
  29. this,
  30. validClusterSpecification.getNumberTaskManagers(),
  31. validClusterSpecification.getSlotsPerTaskManager(),
  32. report,
  33. flinkConfiguration,
  34. true);
  35. }

startAppMaster方法,启动ApplicationMaster:

  1. public ApplicationReport startAppMaster(
  2. Configuration configuration,
  3. String applicationName,
  4. String yarnClusterEntrypoint,
  5. JobGraph jobGraph,
  6. YarnClient yarnClient,
  7. YarnClientApplication yarnApplication,
  8. ClusterSpecification clusterSpecification) throws Exception {
  9. // ------------------ Initialize the file systems -------------------------
  10. ......
  11. // ------------- Set-up ApplicationSubmissionContext for the application -------------
  12. ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
  13. final ApplicationId appId = appContext.getApplicationId();
  14. // ------------------ Add Zookeeper namespace to local flinkConfiguraton ------
  15. ......
  16. // ------------------ 准备Yarn所需的资源和文件 ------
  17. // Setup jar for ApplicationMaster
  18. ......
  19. // 准备TaskManager的相关配置信息
  20. configuration.setInteger(
  21. TaskManagerOptions.NUM_TASK_SLOTS,
  22. clusterSpecification.getSlotsPerTaskManager());
  23. configuration.setString(
  24. TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY,
  25. clusterSpecification.getTaskManagerMemoryMB() + "m");
  26. // Upload the flink configuration, write out configuration file
  27. ......
  28. // ------------------ 启动ApplicationMasterContainer ------
  29. final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
  30. yarnClusterEntrypoint,
  31. hasLogback,
  32. hasLog4j,
  33. hasKrb5,
  34. clusterSpecification.getMasterMemoryMB());
  35. // --------- set user specified app master environment variables ---------
  36. ......
  37. // 提交App
  38. yarnClient.submitApplication(appContext);
  39. // --------- Waiting for the cluster to be allocated ---------
  40. ......
  41. }

远端逻辑ClusterEntrypoint[Main]

与yarn集群打交道(这里主要是resourcemananger和taskmamanager),ClusterEntrypoint 包含了 webMonitor、resourceManager、dispatcher 的服务。

  • 封装了Cluster启停的逻辑
  • 根据配置文件来创建RpcService
  • HaService
  • HeartbeatService
  • MetricRegistry
  • 提供了几个抽象方法给子类(createDispatcher,createResourceManager,createRestEndpoint,
  1. YarnJobClusterEntrypoint[Main]
  2. -> ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
  3. -> clusterEntrypoint.startCluster();
  4. -> runCluster(configuration);
  5. -> clusterComponent = dispatcherResourceManagerComponentFactory.create();
  6. * 在同一进程中启动Dispatcher,ResourceManager和WebMonitorEndpoint组件服务
  7. create -> {
  8. webMonitorEndpoint.start();
  9. resourceManager.start();
  10. dispatcher.start();
  11. }
  12. * 重点关注ResourceManager,会创建TaskManager
  13. -> resourceManager = resourceManagerFactory.createResourceManager()
  14. -> YarnResourceManager.initialize()
  15. * 创建 resourceManagerClient 和 nodeManagerClient
  16. * YarnResourceManager 继承自 yarn 的 AMRMClientAsync.CallbackHandler接口,在Container分配完之后,回调如下接口:
  17. -> void onContainersAllocated(List<Container> containers)
  18. -> createTaskExecutorLaunchContext()
  19. -> Utils.createTaskExecutorContext() -- 参数 YarnTaskExecutorRunner.class, 指明TaskManager的Main入口
  20. -> nodeManagerClient.startContainer(container, taskExecutorLaunchContext);

dispatcherResourceManagerComponentFactory.create

  1. public DispatcherResourceManagerComponent<T> create(
  2. Configuration configuration,
  3. RpcService rpcService,
  4. HighAvailabilityServices highAvailabilityServices,
  5. BlobServer blobServer,
  6. HeartbeatServices heartbeatServices,
  7. MetricRegistry metricRegistry,
  8. ArchivedExecutionGraphStore archivedExecutionGraphStore,
  9. MetricQueryServiceRetriever metricQueryServiceRetriever,
  10. FatalErrorHandler fatalErrorHandler) throws Exception {
  11. // 创建服务后会启动部分服务
  12. webMonitorEndpoint.start();
  13. resourceManager.start(); -- 里面指明TaskExecutor(即TaskManager)的Main入口
  14. dispatcher.start(); -- Dispatcher服务会处理client 的 submitjob,促使TaskExecutor上的任务执行
  15. // 返回所有服务的封装类
  16. return createDispatcherResourceManagerComponent(
  17. dispatcher,
  18. resourceManager,
  19. dispatcherLeaderRetrievalService,
  20. resourceManagerRetrievalService,
  21. webMonitorEndpoint,
  22. jobManagerMetricGroup);
  23. } catch (Exception exception) {
  24. ......
  25. }
  26. }

ClusterEntrypoint会启动Dispatcher服务:

  1. Dispatcher
  2. --> onStart()
  3. --> startDispatcherServices()
  4. -> submittedJobGraphStore.start(this)
  5. -> leaderElectionService.start(this)

LeaderRetrievalHandler会从netty处理从Client发来的submitjob消息:

  1. LeaderRetrievalHandler
  2. -> channelRead0() -- 一个netty对象
  3. -> AbstractHandler.respondAsLeader()
  4. -> AbstractRestHandler.respondToRequest()
  5. -> JobSubmitHandler.handleRequest
  6. -> Dispatcher.submitJob
  7. -> Dispatcher.internalSubmitJob
  8. -> Dispatcher.persistAndRunJob
  9. -> Dispatcher.runJob
  10. -> Dispatcher.createJobManagerRunner -- 创建JobManagerRunner
  11. -> jobManagerRunnerFactory.createJobManagerRunner
  12. * 创建DefaultJobMasterServiceFactory
  13. * new JobManagerRunner()
  14. -> dispatcher.startJobManagerRunner -- 启动JobManagerRunner
  15. -> jobManagerRunner.start();
  16. -> ZooKeeperLeaderElectionService.start
  17. -> ZooKeeperLeaderElectionService.isLeader
  18. -> leaderContender.grantLeadership(issuedLeaderSessionID)
  19. -> jobManagerRunner.verifyJobSchedulingStatusAndStartJobManager
  20. -> startJobMaster(leaderSessionId) -- 启动JobMaster
  21. -> jobMasterService.start
  22. -> startJobExecution(newJobMasterId)
  23. -> startJobMasterServices -- 包括slotPool和scheduler的启动,告知flinkresourceManager leader的地址,当FlinkRM和JM建立好连接后,slot就可以开始requesting slots
  24. -> resetAndScheduleExecutionGraph -- 执行job
  25. --> createAndRestoreExecutionGraph -- 生成ExecutionGraph
  26. --> scheduleExecutionGraph
  27. --> executionGraph.scheduleForExecution()
  28. --> scheduleEager {
  29. * 给Execution 分配 slots
  30. --> allocateResourcesForAll()
  31. * 遍历 execution,调用其 deploy 方法
  32. --> execution.deploy()
  33. --> taskManagerGateway.submitTask
  34. --> [TaskExecutor] new Task()
  35. --> [TaskExecutor] task.startTaskThread() -- 至此,任务真正执行
  36. }

总结

  1. 运行 flink 脚本,从CliFrontend类开始提交流程;
  2. 创建 yarnClusterDescriptor,准备集群创建所需的信息;
  3. 部署Session集群,启动ApplicationMaster/JobManager,通过ClusterEntrypoint[Main]启动Flink所需的服务,如Dispatcher,ResourceManager和WebMonitorEndpoint等;
  4. ResourceManager会创建resourceManagerClient 和 nodeManagerClient,在Container分配完成,启动TaskExecutor的Container(同步指定TaskExecutor的Main入口);
  5. 3、4集群部署完毕,Client会进行任务提交,DIspatcher服务会接收到命令;
  6. Dispatcher通过JobManagerRunner启动JobMaster服务,构建ExecutionGraph,分配slot,通知TaskExecutor执行Task;
  7. 至此,任务真正执行。

后续安排.....

后续会继续分析

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/265985
推荐阅读
相关标签
  

闽ICP备14008679号