当前位置:   article > 正文

Flink CheckPoint的触发过程_flink 触发checkpoint

flink 触发checkpoint

CheckpointCoordinator的转换及调度

1、转换过程

在Flink JobMaster中有用于协调和触发checkpoint机制的协调管理器CheckpointCoordinator,其是Flink分布式快照的核心管理控制组件,其主要维护的功能如下:

  • 发起checkpoint触发的消息,并接收不同task对checkpoint的响应信息(Ack)
  • 维护Ack中附带的状态句柄(state-handle)的全局视图

其针对checkpoint的配置及最终触发调度主要集中在两个转换过程中:

  1. StreamGraph-->JobGraph的转换流程:在客户端中的streamGraph向jobGraph转换过程中,其主要依赖于StreamingJobGraphGenerator#createJobGraph()方法,在生成JobGraph之后会调用StreamingJobGraphGenerator#configureCheckpointing()方法进行Checkpoint相关的配置。其主要的工作就是读取streamGraph中的checkpoint配置,并将其节点划分为trigger节点、ack节点以及commit节点。在该类内部标识为对应的三个列表:
    1. List<JobVertexID> triggerVertices
    2. List<JobVertexID> ackVertices
    3. List<JobVertexID> commitVertices
      1. class StreamingJobGraphGenerator {
      2. private void configureCheckpointing() {
      3. CheckpointConfig cfg = streamGraph.getCheckpointConfig();
      4. long interval = cfg.getCheckpointInterval();
      5. if (interval > 0) {
      6. ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
      7. // propagate the expected behaviour for checkpoint errors to task.
      8. executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors());
      9. } else {
      10. // a non-positive interval is replaced by Long.MAX_VALUE, which means periodic checkpointing is disabled
      11. interval = Long.MAX_VALUE;
      12. }
      13. // --- configure the participating vertices ---
      14. // collect the vertices that receive "trigger checkpoint" messages.
      15. // currently, these are all the sources
      16. List<JobVertexID> triggerVertices = new ArrayList<>();
      17. // collect the vertices that need to acknowledge the checkpoint
      18. // currently, these are all vertices
      19. List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());
      20. // collect the vertices that receive "commit checkpoint" messages
      21. // currently, these are all vertices
      22. List<JobVertexID> commitVertices = new ArrayList<>(jobVertices.size());
      23. for (JobVertex vertex : jobVertices.values()) {
      24. if (vertex.isInputVertex()) {
      25. triggerVertices.add(vertex.getID());
      26. }
      27. commitVertices.add(vertex.getID());
      28. ackVertices.add(vertex.getID());
      29. }
      30. // --- configure options ---
      31. CheckpointRetentionPolicy retentionAfterTermination;
      32. if (cfg.isExternalizedCheckpointsEnabled()) {
      33. CheckpointConfig.ExternalizedCheckpointCleanup cleanup = cfg.getExternalizedCheckpointCleanup();
      34. // Sanity check: externalized checkpoints require a configured cleanup mode
      35. if (cleanup == null) {
      36. throw new IllegalStateException("Externalized checkpoints enabled, but no cleanup mode configured.");
      37. }
      38. retentionAfterTermination = cleanup.deleteOnCancellation() ?
      39. CheckpointRetentionPolicy.RETAIN_ON_FAILURE :
      40. CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION;
      41. } else {
      42. retentionAfterTermination = CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
      43. }
      44. CheckpointingMode mode = cfg.getCheckpointingMode();
      45. boolean isExactlyOnce;
      46. if (mode == CheckpointingMode.EXACTLY_ONCE) {
      47. isExactlyOnce = true;
      48. } else if (mode == CheckpointingMode.AT_LEAST_ONCE) {
      49. isExactlyOnce = false;
      50. } else {
      51. throw new IllegalStateException("Unexpected checkpointing mode. " +
      52. "Did not expect there to be another checkpointing mode besides " +
      53. "exactly-once or at-least-once.");
      54. }
      55. // --- configure the master-side checkpoint hooks ---
      56. final ArrayList<MasterTriggerRestoreHook.Factory> hooks = new ArrayList<>();
      57. for (StreamNode node : streamGraph.getStreamNodes()) {
      58. StreamOperator<?> op = node.getOperator();
      59. if (op instanceof AbstractUdfStreamOperator) {
      60. Function f = ((AbstractUdfStreamOperator<?, ?>) op).getUserFunction();
      61. if (f instanceof WithMasterCheckpointHook) {
      62. hooks.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook<?>) f));
      63. }
      64. }
      65. }
      66. // because the hooks can have user-defined code, they need to be stored as
      67. // eagerly serialized values
      68. final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks;
      69. if (hooks.isEmpty()) {
      70. serializedHooks = null;
      71. } else {
      72. try {
      73. MasterTriggerRestoreHook.Factory[] asArray = hooks.toArray(new MasterTriggerRestoreHook.Factory[hooks.size()]);
      74. serializedHooks = new SerializedValue<>(asArray);
      75. } catch (IOException e) {
      76. throw new FlinkRuntimeException("Trigger/restore hook is not serializable", e);
      77. }
      78. }
      79. // because the state backend can have user-defined code, it needs to be stored as
      80. // eagerly serialized value
      81. final SerializedValue<StateBackend> serializedStateBackend;
      82. if (streamGraph.getStateBackend() == null) {
      83. serializedStateBackend = null;
      84. } else {
      85. try {
      86. serializedStateBackend = new SerializedValue<StateBackend>(streamGraph.getStateBackend());
      87. } catch (IOException e) {
      88. throw new FlinkRuntimeException("State backend is not serializable", e);
      89. }
      90. }
      91. // --- done, put it all together ---
      92. JobCheckpointingSettings settings = new JobCheckpointingSettings(
      93. triggerVertices,
      94. ackVertices,
      95. commitVertices,
      96. new CheckpointCoordinatorConfiguration(
      97. interval,
      98. cfg.getCheckpointTimeout(),
      99. cfg.getMinPauseBetweenCheckpoints(),
      100. cfg.getMaxConcurrentCheckpoints(),
      101. retentionAfterTermination,
      102. isExactlyOnce),
      103. serializedStateBackend,
      104. serializedHooks);
      105. jobGraph.setSnapshotSettings(settings);
      106. }
      107. }
  2. JobGraph-->ExecutionGraph的转换流程:在客户端将JobGraph提交到Dispatcher之后,其会生成对应的JobMaster来处理该JobGraph到ExecutionGraph的可执行图的转换,转换过程中主要依赖于ExecutionGraphBuilder#buildGraph()方法,在构建过程中,如果作业开启了checkpoint,则会调用ExecutionGraph.enableCheckpointing()方法,这里会创建CheckpointCoordinator对象,并注册一个作业状态的监听CheckpointCoordinatorDeActivator,CheckpointCoordinatorDeActivator会在作业状态发生改变时得到通知。
     
    1. class ExecutionGraph {
    2. public void enableCheckpointing(
    3. long interval,
    4. long checkpointTimeout,
    5. long minPauseBetweenCheckpoints,
    6. int maxConcurrentCheckpoints,
    7. CheckpointRetentionPolicy retentionPolicy,
    8. List<ExecutionJobVertex> verticesToTrigger,
    9. List<ExecutionJobVertex> verticesToWaitFor,
    10. List<ExecutionJobVertex> verticesToCommitTo,
    11. List<MasterTriggerRestoreHook<?>> masterHooks,
    12. CheckpointIDCounter checkpointIDCounter,
    13. CompletedCheckpointStore checkpointStore,
    14. StateBackend checkpointStateBackend,
    15. CheckpointStatsTracker statsTracker) {
    16. // simple sanity checks
    17. checkArgument(interval >= 10, "checkpoint interval must not be below 10ms");
    18. checkArgument(checkpointTimeout >= 10, "checkpoint timeout must not be below 10ms");
    19. checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
    20. checkState(checkpointCoordinator == null, "checkpointing already enabled");
    21. ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
    22. ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
    23. ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
    24. checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
    25. // create the coordinator that triggers and commits checkpoints and holds the state
    26. checkpointCoordinator = new CheckpointCoordinator(
    27. jobInformation.getJobId(),
    28. interval,
    29. checkpointTimeout,
    30. minPauseBetweenCheckpoints,
    31. maxConcurrentCheckpoints,
    32. retentionPolicy,
    33. tasksToTrigger,
    34. tasksToWaitFor,
    35. tasksToCommitTo,
    36. checkpointIDCounter,
    37. checkpointStore,
    38. checkpointStateBackend,
    39. ioExecutor,
    40. SharedStateRegistry.DEFAULT_FACTORY);
    41. // register the master hooks on the checkpoint coordinator
    42. for (MasterTriggerRestoreHook<?> hook : masterHooks) {
    43. if (!checkpointCoordinator.addMasterHook(hook)) {
    44. LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier());
    45. }
    46. }
    47. checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
    48. // interval of max long value indicates disable periodic checkpoint,
    49. // the CheckpointActivatorDeactivator should be created only if the interval is not max value
    50. if (interval != Long.MAX_VALUE) {
    51. // the periodic checkpoint scheduler is activated and deactivated as a result of
    52. // job status changes (running -> on, all other states -> off)
    53. // register the listener that is notified of job status changes
    54. registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
    55. }
    56. }
    57. private void notifyJobStatusChange(JobStatus newState, Throwable error) {
    58. if (jobStatusListeners.size() > 0) {
    59. final long timestamp = System.currentTimeMillis();
    60. final Throwable serializedError = error == null ? null : new SerializedThrowable(error);
    61. for (JobStatusListener listener : jobStatusListeners) {
    62. try {
    63. listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError);
    64. } catch (Throwable t) {
    65. LOG.warn("Error while notifying JobStatusListener", t);
    66. }
    67. }
    68. }
    69. }
    70. }

    当JobStatus状态变为RUNNING时,listener.jobStatusChanges()会通知其上注册的监听器;CheckpointCoordinatorDeActivator会得到通知,并且通过CheckpointCoordinator.startCheckpointScheduler启动checkpoint的定时器。

    1. class CheckpointCoordinatorDeActivator implements JobStatusListener {
    2. @Override
    3. public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
    4. if (newJobStatus == JobStatus.RUNNING) {
    5. // the job is now RUNNING: start the checkpoint scheduler
    6. coordinator.startCheckpointScheduler();
    7. } else {
    8. // any other job status stops the checkpoint scheduler for now
    9. coordinator.stopCheckpointScheduler();
    10. }
    11. }
    12. }

2、调度触发

        在CheckpointCoordinator.startCheckpointScheduler启动checkpoint定时器之后,也就意味着checkpoint机制已启动并会周期性地调度执行。在flink中,checkpoint调度的定时任务被封装为ScheduledTrigger,运行时会调用CheckpointCoordinator.triggerCheckpoint()触发一次checkpoint。triggerCheckpoint()方法的主要执行步骤如下:

  • 检查是否可以触发checkpoint,包括是否需要强制进行checkpoint,当前正在排队的并发checkpoint的数目是否超过阈值,距离上一次成功checkpoint的间隔时间是否过小等,如果这些条件不满足,则当前检查点的触发请求不会执行
  • 检查是否所有需要触发checkpoint的Execution都是RUNNING状态
  • 生成此次checkpoint的checkpointID(id是严格自增的),并初始化CheckpointStorageLocation,CheckpointStorageLocation是此次checkpoint存储位置的抽象,通过CheckpointStorage.initializeLocationForCheckpoint()创建(CheckpointStorage目前有两个具体实现,分别为FsCheckpointStorage和MemoryBackendCheckpointStorage),CheckpointStorage则是从StateBackend中创建
  • 生成PendingCheckpoint,这表示一个处于中间状态的checkpoint,并保存在checkpointId->PendingCheckpoint这样的映射关系中
  • 注册一个调度任务,在checkpoint超时后取消此次checkpoint,并重新触发一次新的checkpoint
  • 调用Execution.triggerCheckpoint()方法向所有需要trigger的task发起checkpoint请求

savepoint和checkpoint的处理逻辑基本一致,只是savepoint是强制触发的,需要调用Execution.triggerSynchronousSavepoint()进行触发。

 

在CheckpointCoordinator内部也有三个列表:

  • ExecutionVertex[] tasksToTrigger;
  • ExecutionVertex[] tasksToWaitFor;
  • ExecutionVertex[] tasksToCommitTo;

        这就对应了前面JobGraph中的三个列表。在触发checkpoint的时候,只有作为source的Execution会调用Execution.triggerCheckpoint()方法,该方法会通过RPC调用通知对应的RpcTaskManagerGateway调用triggerCheckpoint。

  1. public class CheckpointCoordinator {
  2. public CheckpointTriggerResult triggerCheckpoint(
  3. long timestamp,
  4. CheckpointProperties props,
  5. @Nullable String externalSavepointLocation,
  6. boolean isPeriodic) {
  7. // make some eager pre-checks
  8. synchronized (lock) {
  9. // check whether the checkpoint can be triggered (filter conditions; the checks themselves are elided here)
  10. // abort if the coordinator has been shutdown in the meantime
  11. // Don't allow periodic checkpoint if scheduling has been disabled
  12. // validate whether the checkpoint can be triggered, with respect to the limit of
  13. // concurrent checkpoints, and the minimum time between checkpoints.
  14. // these checks are not relevant for savepoints
  15. // check if all tasks that we need to trigger are running.
  16. // if not, abort the checkpoint
  17. // verify that every Execution that needs to be triggered is in RUNNING state
  18. Execution[] executions = new Execution[tasksToTrigger.length];
  19. for (int i = 0; i < tasksToTrigger.length; i++) {
  20. Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
  21. if (ee == null) {
  22. // ......
  23. } else if (ee.getState() == ExecutionState.RUNNING) {
  24. executions[i] = ee;
  25. } else {
  26. // ......
  27. }
  28. }
  29. // ......
  30. // we will actually trigger this checkpoint!
  31. // we lock with a special lock to make sure that trigger requests do not overtake each other.
  32. // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
  33. // may issue blocking operations. Using a different lock than the coordinator-wide lock,
  34. // we avoid blocking the processing of 'acknowledge/decline' messages during that time.
  35. synchronized (triggerLock) {
  36. final CheckpointStorageLocation checkpointStorageLocation;
  37. final long checkpointID;
  38. try {
  39. // this must happen outside the coordinator-wide lock, because it communicates
  40. // with external services (in HA mode) and may block for a while.
  41. checkpointID = checkpointIdCounter.getAndIncrement();
  42. // the checkpointID for this checkpoint has been generated (IDs strictly increase); now initialize the CheckpointStorageLocation
  43. checkpointStorageLocation = props.isSavepoint() ?
  44. checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
  45. checkpointStorage.initializeLocationForCheckpoint(checkpointID);
  46. } catch (Throwable t) {
  47. // ......
  48. }
  49. final PendingCheckpoint checkpoint = new PendingCheckpoint(
  50. job,
  51. checkpointID,
  52. timestamp,
  53. ackTasks,
  54. props,
  55. checkpointStorageLocation,
  56. executor);
  57. if (statsTracker != null) {
  58. PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
  59. checkpointID,
  60. timestamp,
  61. props);
  62. checkpoint.setStatsCallback(callback);
  63. }
  64. // schedule the timer that will clean up the expired checkpoints
  65. // register a scheduled task that aborts this checkpoint once it times out and then triggers the queued requests
  66. final Runnable canceller = () -> {
  67. synchronized (lock) {
  68. // only do the work if the checkpoint is not discarded anyways
  69. // note that checkpoint completion discards the pending checkpoint object
  70. if (!checkpoint.isDiscarded()) {
  71. LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
  72. checkpoint.abortExpired();
  73. pendingCheckpoints.remove(checkpointID);
  74. rememberRecentCheckpointId(checkpointID);
  75. triggerQueuedRequests(); // re-schedule: trigger a new checkpoint from the queued requests
  76. }
  77. }
  78. };
  79. try {
  80. // re-acquire the coordinator-wide lock
  81. synchronized (lock) {
  82. // since we released the lock in the meantime, we need to re-check
  83. // that the conditions still hold.
  84. // ......
  85. LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
  86. pendingCheckpoints.put(checkpointID, checkpoint);
  87. ScheduledFuture<?> cancellerHandle = timer.schedule(
  88. canceller,
  89. checkpointTimeout, TimeUnit.MILLISECONDS);
  90. if (!checkpoint.setCancellerHandle(cancellerHandle)) {
  91. // checkpoint is already disposed!
  92. cancellerHandle.cancel(false);
  93. }
  94. // trigger the master hooks for the checkpoint
  95. final List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(),
  96. checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout));
  97. for (MasterState s : masterStates) {
  98. checkpoint.addMasterState(s);
  99. }
  100. }
  101. // end of lock scope
  102. final CheckpointOptions checkpointOptions = new CheckpointOptions(
  103. props.getCheckpointType(),
  104. checkpointStorageLocation.getLocationReference());
  105. // send the messages to the tasks that trigger their checkpoint
  106. // call Execution.triggerCheckpoint() to send the checkpoint request to every task that must be triggered
  107. for (Execution execution: executions) {
  108. execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
  109. }
  110. numUnsuccessfulCheckpointsTriggers.set(0);
  111. return new CheckpointTriggerResult(checkpoint);
  112. } catch (Throwable t) {
  113. // guard the map against concurrent modifications
  114. // ......
  115. }
  116. } // end trigger lock
  117. }
  118. }
  119. public class Execution implements AccessExecution, Archiveable<ArchivedExecution>, LogicalSlot.Payload {
  120. /**
  121. * Trigger a new checkpoint on the task of this execution.
  122. *
  123. * @param checkpointId of the checkpoint to trigger
  124. * @param timestamp of the checkpoint to trigger
  125. * @param checkpointOptions of the checkpoint to trigger
  126. */
  127. public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
  128. final LogicalSlot slot = assignedResource;
  129. if (slot != null) {
  130. final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
  131. // per the article: only Executions acting as sources have Execution.triggerCheckpoint() called on them
  132. // the gateway call below notifies the task manager (RpcTaskManagerGateway, per the article) via RPC to run triggerCheckpoint
  133. taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
  134. } else {
  135. LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
  136. "no longer running.");
  137. }
  138. }
  139. }

Checkpoint的执行

1、barrier的流动

        CheckpointCoordinator发出触发checkpoint的消息,最终通过RPC调用TaskExecutorGateway.triggerCheckpoint,即请求执行TaskExecutor.triggerCheckpoint()。因为一个TaskExecutor中可能有多个Task正在运行,因而要根据触发checkpoint的ExecutionAttemptID找到对应的Task,然后调用Task.triggerCheckpointBarrier()方法。只有作为source的Task才会触发triggerCheckpointBarrier()方法的调用。

        在Task中,checkpoint的触发被封装为一个异步任务执行:

  1. class Task {
  2. /**
  3. * Calls the invokable to trigger a checkpoint.
  4. *
  5. * @param checkpointID The ID identifying the checkpoint.
  6. * @param checkpointTimestamp The timestamp associated with the checkpoint.
  7. * @param checkpointOptions Options for performing this checkpoint.
  8. */
  9. public void triggerCheckpointBarrier(
  10. final long checkpointID,
  11. long checkpointTimestamp,
  12. final CheckpointOptions checkpointOptions) {
  13. final AbstractInvokable invokable = this.invokable;
  14. final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
  15. if (executionState == ExecutionState.RUNNING && invokable != null) {
  16. // build a local closure
  17. final String taskName = taskNameWithSubtask;
  18. final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
  19. FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
  20. Runnable runnable = new Runnable() {
  21. @Override
  22. public void run() {
  23. // set safety net from the task's context for checkpointing thread
  24. LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
  25. FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
  26. try {
  27. // the call that actually triggers the checkpoint on the invokable
  28. boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
  29. if (!success) {
  30. checkpointResponder.declineCheckpoint(
  31. getJobID(), getExecutionId(), checkpointID,
  32. new CheckpointDeclineTaskNotReadyException(taskName));
  33. }
  34. } catch (Throwable t) {
  35. // ......
  36. } finally {
  37. FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
  38. }
  39. }
  40. };
  41. // run the trigger asynchronously
  42. executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
  43. }
  44. else {
  45. LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
  46. // send back a message that we did not do the checkpoint
  47. checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
  48. new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
  49. }
  50. }
  51. }

Task执行checkpoint的真正逻辑被封装在AbstractInvokable.triggerCheckpoint(...)中,AbstractInvokable中有两个触发checkpoint的方法:

  • triggerCheckpoint
  • triggerCheckpointOnBarrier

       其中triggerCheckpoint是触发checkpoint的源头,会向下游注入CheckpointBarrier;而下游的其他任务在收到CheckpointBarrier后调用triggerCheckpointOnBarrier方法。这两个方法的具体实现有一些细微的差异,但主要的逻辑是一致的,在StreamTask.performCheckpoint()方法中:1)先向下游发送barrier,2)存储检查点快照。

一旦StreamTask.triggerCheckpoint()或StreamTask.triggerCheckpointOnBarrier()被调用,就会通过OperatorChain.broadcastCheckpointBarrier()向下游发送barrier:

  1. public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
  2. extends AbstractInvokable
  3. implements AsyncExceptionHandler {
  4. private boolean performCheckpoint(
  5. CheckpointMetaData checkpointMetaData,
  6. CheckpointOptions checkpointOptions,
  7. CheckpointMetrics checkpointMetrics) throws Exception {
  8. LOG.debug("Starting checkpoint ({}) {} on task {}",
  9. checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
  10. synchronized (lock) {
  11. if (isRunning) {
  12. // we can do a checkpoint
  13. // All of the following steps happen as an atomic step from the perspective of barriers and
  14. // records/watermarks/timers/callbacks.
  15. // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
  16. // checkpoint alignments
  17. // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
  18. // The pre-barrier work should be nothing or minimal in the common case.
  19. operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());
  20. // Step (2): Send the checkpoint barrier downstream
  21. operatorChain.broadcastCheckpointBarrier(
  22. checkpointMetaData.getCheckpointId(),
  23. checkpointMetaData.getTimestamp(),
  24. checkpointOptions);
  25. // Step (3): Take the state snapshot. This should be largely asynchronous, to not
  26. // impact progress of the streaming topology
  27. checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics); // take and store the checkpoint snapshot
  28. return true;
  29. }
  30. else {
  31. // we cannot perform our checkpoint - let the downstream operators know that they
  32. // should not wait for any input from this operator
  33. // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
  34. // yet be created
  35. final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
  36. Exception exception = null;
  37. for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter : streamRecordWriters) {
  38. try {
  39. streamRecordWriter.broadcastEvent(message);
  40. } catch (Exception e) {
  41. // ......
  42. }
  43. }
  44. // ......
  45. return false;
  46. }
  47. }
  48. }
  49. }
  1. public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {
  2. public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
  3. // create a CheckpointBarrier carrying the checkpoint id, timestamp and options
  4. CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
  5. for (RecordWriterOutput<?> streamOutput : streamOutputs) {
  6. // broadcast the barrier on every stream output, i.e. to all downstream consumers
  7. streamOutput.broadcastEvent(barrier);
  8. }
  9. }
  10. }

在每一个Task中,其会通过InputGate消费上游Task产生的数据。在flink实际的任务执行过程中,checkpoint的触发与流向主要如下:

  • 在执行图JobGraph转化为JobMaster中的ExecutionGraph的过程中,其会根据配置调用enableCheckpointing();在这里会初始化CheckpointCoordinator检查点协调组件(主要包括ckp的周期、输入源、操作算子等ExecutionVertex),并通过ExecutionGraph#registerJobStatusListener注册状态监听器CheckpointCoordinatorDeActivator;当任务状态JobStatus变为RUNNING时,该监听器会接收到回调并触发coordinator.startCheckpointScheduler(),开启周期性ckp的触发;
  • CheckpointCoordinator.startCheckpointScheduler启动定时器后,每次触发checkpoint时会找到作为source输入的Execution,并调用其Execution.triggerCheckpoint()方法;该方法会通过RPC调用通知对应的RpcTaskManagerGateway调用triggerCheckpoint。
  • 在对应的TaskExecutor接收到该调用后,其会通过taskSlotTable获取到对应的Task任务(SourceStreamTask),并调用其task.triggerCheckpointBarrier()进行检查点的触发;在SourceStreamTask触发检查点Barrier操作时,其会委托给抽象的父类StreamTask进行触发;
  • 在SourceStreamTask委托给抽象父类StreamTask进行ckp触发后;1、其会先向下游算子(OneInputStreamTask、TwoInputStreamTask等)广播当前的CheckpointBarrier;2、再对本算子进行当前状态的status存储;
  • 下游算子(以OneInputStreamTask为例)在其自己初始化的时候会获取其对应的InputGate;并将其封装委托给对应初始化的StreamInputProcessor;在StreamInputProcessor初始化的时候会根据ckp模式和InputGate进行对应CheckpointBarrierHandler的初始化;并将其作为接收到上游发出的CheckpointBarrier的处理核心;
  • 下游算子在周期性处理函数inputProcessor.processInput()中接收到任务数据时:1、其会先调用其内部持有的操作算子streamOperator.processElement(record),对数据进行自定义userFunction的处理,并通过output.collect(element)向下游传递;2、其会接收下一个数据块BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked(),在接收的过程中会判断其是不是CheckpointBarrier事件,如果是,则会触发当前算子的ckp操作;

2、barrier的接收及处理

       在Task接收上游数据的实现中(OneInputStreamTask),其主要会在StreamInputProcessor和StreamTwoInputProcessor中创建CheckpointBarrierHandler;CheckpointBarrierHandler是对InputGate的一层封装,增加了对CheckpointBarrier等事件的处理。CheckpointBarrierHandler有两个具体的实现,即BarrierTracker和BarrierBuffer,分别对应AT_LEAST_ONCE和EXACTLY_ONCE这两种模式。

StreamInputProcessor和StreamTwoInputProcessor中循环调用CheckpointBarrierHandler.getNextNonBlocked()获取新数据,因而在CheckpointBarrierHandler获得CheckpointBarrier后可以及时地进行checkpoint相关的操作。

1、AT_LEAST_ONCE模式下对应的BarrierTracker,它仅仅追踪从每一个inputchannel接收到的barrier,当所有inputchannel的barrier都被接收时,就可以触发checkpoint了:

  1. public class BarrierTracker implements CheckpointBarrierHandler {
  2. @Override
  3. public BufferOrEvent getNextNonBlocked() throws Exception {
  4. while (true) {
  5. Optional<BufferOrEvent> next = inputGate.getNextBufferOrEvent();
  6. if (!next.isPresent()) {
  7. // buffer or input exhausted
  8. return null;
  9. }
  10. BufferOrEvent bufferOrEvent = next.get();
  11. if (bufferOrEvent.isBuffer()) {
  12. return bufferOrEvent;
  13. }
  14. else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
  15. // received a CheckpointBarrier
  16. processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
  17. }
  18. else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
  19. // received a CancelCheckpointMarker
  20. processCheckpointAbortBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
  21. }
  22. else {
  23. // some other event
  24. return bufferOrEvent;
  25. }
  26. }
  27. }
  28. private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
  29. final long barrierId = receivedBarrier.getId();
  30. // fast path for single channel trackers
  31. if (totalNumberOfInputChannels == 1) {
  32. notifyCheckpoint(barrierId, receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
  33. return;
  34. }
  35. // general path for multiple input channels
  36. if (LOG.isDebugEnabled()) {
  37. LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelIndex);
  38. }
  39. // find the checkpoint barrier in the queue of pending barriers
  40. CheckpointBarrierCount cbc = null;
  41. int pos = 0;
  42. for (CheckpointBarrierCount next : pendingCheckpoints) {
  43. if (next.checkpointId == barrierId) {
  44. cbc = next;
  45. break;
  46. }
  47. pos++;
  48. }
  49. if (cbc != null) {
  50. // add one to the count to that barrier and check for completion
  51. int numBarriersNew = cbc.incrementBarrierCount();
  52. if (numBarriersNew == totalNumberOfInputChannels) {
  53. // checkpoint can be triggered (or is aborted and all barriers have been seen)
  54. // first, remove this checkpoint and all prior pending
  55. // checkpoints (which are now subsumed)
  56. // every pending checkpoint queued before the current barrierId can be dropped now
  57. for (int i = 0; i <= pos; i++) {
  58. pendingCheckpoints.pollFirst();
  59. }
  60. // notify the listener
  61. if (!cbc.isAborted()) {
  62. if (LOG.isDebugEnabled()) {
  63. LOG.debug("Received all barriers for checkpoint {}", barrierId);
  64. }
  65. // notify that the checkpoint should be performed
  66. notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
  67. }
  68. }
  69. }
  70. else {
  71. // first barrier for that checkpoint ID
  72. // add it only if it is newer than the latest checkpoint.
  73. // if it is not newer than the latest checkpoint ID, then there cannot be a
  74. // successful checkpoint for that ID anyways
  75. if (barrierId > latestPendingCheckpointID) {
  76. latestPendingCheckpointID = barrierId;
  77. pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));
  78. // make sure we do not track too many checkpoints
  79. if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
  80. pendingCheckpoints.pollFirst();
  81. }
  82. }
  83. }
  84. }
  85. }

2、EXACTLY_ONCE模式下对应的BarrierBuffer,它除了要追踪每一个inputchannel接收到的barrier之外,在接收到所有的barrier之前,先收到barrier的channel要进入阻塞状态。当然为了避免进入“反压”状态,BarrierBuffer会继续接收数据,但会对接收到的数据进行缓存,直到所有的barrier都到达。

  1. public class BarrierBuffer implements CheckpointBarrierHandler {
  2. /** The utility to write blocked data to a file channel. */
  3. private final BufferBlocker bufferBlocker; // buffers the data received from blocked channels
  4. /**
  5. * The sequence of buffers/events that has been unblocked and must now be consumed before
  6. * requesting further data from the input gate.
  7. */
  8. private BufferOrEventSequence currentBuffered; // the currently buffered data
  9. // ------------------------------------------------------------------------
  10. // Buffer and barrier handling
  11. // ------------------------------------------------------------------------
  12. @Override
  13. public BufferOrEvent getNextNonBlocked() throws Exception {
  14. while (true) {
  15. // process buffered BufferOrEvents before grabbing new ones
  16. // i.e. buffered data is consumed first
  17. Optional<BufferOrEvent> next;
  18. if (currentBuffered == null) {
  19. next = inputGate.getNextBufferOrEvent();
  20. }
  21. else {
  22. next = Optional.ofNullable(currentBuffered.getNext());
  23. if (!next.isPresent()) {
  24. completeBufferedSequence();
  25. return getNextNonBlocked();
  26. }
  27. }
  28. if (!next.isPresent()) {
  29. if (!endOfStream) {
  30. // end of input stream. stream continues with the buffered data
  31. endOfStream = true;
  32. releaseBlocksAndResetBarriers();
  33. return getNextNonBlocked();
  34. }
  35. else {
  36. // final end of both input and buffered data
  37. return null;
  38. }
  39. }
  40. BufferOrEvent bufferOrEvent = next.get();
  41. if (isBlocked(bufferOrEvent.getChannelIndex())) {
  42. // if the channel is blocked, we just store the BufferOrEvent
  43. // the channel is currently blocked, so the data goes into the buffer first
  44. bufferBlocker.add(bufferOrEvent);
  45. checkSizeLimit();
  46. }
  47. else if (bufferOrEvent.isBuffer()) {
  48. return bufferOrEvent;
  49. }
  50. else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
  51. if (!endOfStream) {
  52. // process barriers only if there is a chance of the checkpoint completing
  53. processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
  54. }
  55. }
  56. else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
  57. processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
  58. }
  59. else {
  60. if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
  61. processEndOfPartition();
  62. }
  63. return bufferOrEvent;
  64. }
  65. }
  66. }
  67. }

除了CheckpointBarrier消息以外,在checkpoint发生异常或取消checkpoint的时候,会向下游发送CancelCheckpointMarker消息。

JobMaster对Checkpoint的确认

      Task对checkpoint的响应是通过CheckpointResponder接口完成的:

  1. public interface CheckpointResponder {
  2. /**
  3. * Acknowledges the given checkpoint.
  4. */
  5. void acknowledgeCheckpoint(
  6. JobID jobID,
  7. ExecutionAttemptID executionAttemptID,
  8. long checkpointId,
  9. CheckpointMetrics checkpointMetrics,
  10. TaskStateSnapshot subtaskState);
  11. /**
  12. * Declines the given checkpoint.
  13. */
  14. void declineCheckpoint(
  15. JobID jobID,
  16. ExecutionAttemptID executionAttemptID,
  17. long checkpointId,
  18. Throwable cause);
  19. }

         RpcCheckpointResponder作为CheckpointResponder的具体实现,主要是通过RPC调用通知CheckpointCoordinatorGateway,即通知给JobMaster;JobMaster调用CheckpointCoordinator.receiveAcknowledgeMessage()和CheckpointCoordinator.receiveDeclineMessage()进行处理。

确认完成

         在一个Task完成checkpoint操作后,会调用checkpointResponder.acknowledgeCheckpoint(),通过CheckpointCoordinatorGateway的RPC接口将Ack响应发送给CheckpointCoordinator;JobMaster中的CheckpointCoordinator接收到Ack响应后,处理流程主要如下:

  • 根据Ack的checkpointID从Map<Long,PendingCheckpoint> pendingCheckpoints中查找对应的PendingCheckpoint
  • 若存在对应的PendingCheckpoint
    • 这个PendingCheckpoint没有被丢弃,调用PendingCheckpoint.acknowledgeTask方法处理Ack,根据处理结果的不同:
      • SUCCESS:判断是否已经接受了所有需要响应的Ack,如果是,则调用completePendingCheckpoint完成此次checkpoint
      • DISCARDED:Checkpoint已经被丢弃(discard),清理上报的Ack中携带的状态句柄
      • UNKNOWN:未知的Ack消息,清理上报的Ack中携带的状态句柄
      • DUPLICATE:Ack消息重复接收,直接忽略
    • 这个PendingCheckpoint已经被丢弃,抛出异常
  • 若不存在对应的PendingCheckpoint,则清理上报的Ack中携带的状态句柄

相应的代码如下:

  1. public class CheckpointCoordinator {
  2. /**
  3. * Receives an AcknowledgeCheckpoint message and returns whether the
  4. * message was associated with a pending checkpoint.
  5. */
  6. public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
  7. if (shutdown || message == null) {
  8. return false;
  9. }
  10. if (!job.equals(message.getJob())) {
  11. LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message);
  12. return false;
  13. }
  14. final long checkpointId = message.getCheckpointId();
  15. synchronized (lock) {
  16. // we need to check inside the lock for being shutdown as well, otherwise we
  17. // get races and invalid error log messages
  18. if (shutdown) {
  19. return false;
  20. }
  21. final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId); // 查找对应的PendingCheckpoint
  22. if (checkpoint != null && !checkpoint.isDiscarded()) {
  23. switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
  24. case SUCCESS:
  25. LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
  26. checkpointId, message.getTaskExecutionId(), message.getJob());
  27. if (checkpoint.isFullyAcknowledged()) {
  28. completePendingCheckpoint(checkpoint);
  29. }
  30. break;
  31. case DUPLICATE:
  32. LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.",
  33. message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
  34. break;
  35. case UNKNOWN:
  36. LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
  37. "because the task's execution attempt id was unknown. Discarding " +
  38. "the state handle to avoid lingering state.", message.getCheckpointId(),
  39. message.getTaskExecutionId(), message.getJob());
  40. discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
  41. break;
  42. case DISCARDED:
  43. LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
  44. "because the pending checkpoint had been discarded. Discarding the " +
  45. "state handle tp avoid lingering state.",
  46. message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
  47. discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
  48. }
  49. return true;
  50. }
  51. else if (checkpoint != null) {
  52. // this should not happen
  53. throw new IllegalStateException(
  54. "Received message for discarded but non-removed checkpoint " + checkpointId);
  55. }
  56. else {
  57. boolean wasPendingCheckpoint;
  58. // message is for an unknown checkpoint, or comes too late (checkpoint disposed)
  59. if (recentPendingCheckpoints.contains(checkpointId)) {
  60. wasPendingCheckpoint = true;
  61. LOG.warn("Received late message for now expired checkpoint attempt {} from " +
  62. "{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob());
  63. }
  64. else {
  65. LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.",
  66. checkpointId, message.getTaskExecutionId(), message.getJob());
  67. wasPendingCheckpoint = false;
  68. }
  69. // try to discard the state so that we don't have lingering state lying around
  70. discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
  71. return wasPendingCheckpoint;
  72. }
  73. }
  74. }
  75. }

对于一个已经触发但还没有完成的checkpoint,即PendingCheckpoint,它是如何处理Ack消息的呢?在PendingCheckpoint内部维护了两个Map,分别是:

  • Map<OperatorID, OperatorState> operatorStates;已经接收到Ack的算子的状态句柄
  • Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;需要Ack但还没有接收到的Task

每当接收到一个Ack消息时,PendingCheckpoint就从notYetAcknowledgedTasks中移除对应的Task,并保存Ack携带的状态句柄。当notYetAcknowledgedTasks为空时,表明所有的Ack消息都接收到了。

其中OperatorState是算子状态句柄的一层封装:

  1. class OperatorState implements CompositeStateHandle {
  2. /** handles to non-partitioned states, subtaskindex -> subtaskstate */
  3. private final Map<Integer, OperatorSubtaskState> operatorSubtaskStates;
  4. }
  5. public class OperatorSubtaskState implements CompositeStateHandle {
  6. /** Snapshot from the {@link org.apache.flink.runtime.state.OperatorStateBackend}. */
  7. @Nonnull
  8. private final StateObjectCollection<OperatorStateHandle> managedOperatorState;
  9. /** Snapshot written using {@link org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream}. */
  10. @Nonnull
  11. private final StateObjectCollection<OperatorStateHandle> rawOperatorState;
  12. /** Snapshot from {@link org.apache.flink.runtime.state.KeyedStateBackend}. */
  13. @Nonnull
  14. private final StateObjectCollection<KeyedStateHandle> managedKeyedState;
  15. /** Snapshot written using {@link org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream}. */
  16. @Nonnull
  17. private final StateObjectCollection<KeyedStateHandle> rawKeyedState;
  18. }

一旦PendingCheckpoint在调用checkpoint.acknowledgeTask()处理完Ack之后,通过checkpoint.isFullyAcknowledged()确认所有Ack消息都已经接收,那么就可以完成此次checkpoint了,具体包括:

  • 调用PendingCheckpoint.finalizeCheckpoint()将PendingCheckpoint转化为CompletedCheckpoint
    • 获取CheckpointMetadataOutputStream,将所有的状态句柄信息通过CheckpointMetadataOutputStream写入到存储系统中
    • 创建一个CompletedCheckpoint对象
  • 将CompletedCheckpoint保存到CompletedCheckpointStore中
    • CompletedCheckpointStore有两种实现,分别为StandaloneCompletedCheckpointStore和ZooKeeperCompletedCheckpointStore
    • StandaloneCompletedCheckpointStore简单地将CompletedCheckpoint存放在一个数组中
    • ZooKeeperCompletedCheckpointStore提供高可用实现:先将CompletedCheckpoint写入到RetrievableStateStorageHelper中(通常是文件系统),然后将文件句柄存在ZK中
    • 保存的CompletedCheckpoint数量是有限的,会删除旧的快照
  • 移除被越过的PendingCheckpoint,因为CheckpointID是递增的,那么所有比当前完成的CheckpointID小的PendingCheckpoint都可以被丢弃了
  • 依次调用Execution.notifyCheckpointComplete()通知所有的Task当前Checkpoint已经完成
    • 通过RPC调用TaskExecutor.confirmCheckpoint()告知对应的Task

拒绝

         在Task进行checkpoint的过程中,可能会发生异常导致checkpoint失败,在这种情况下会通过CheckpointResponder发出拒绝(decline)的消息。当CheckpointCoordinator接收到DeclineCheckpoint消息后会移除PendingCheckpoint,并尝试丢弃已经接收到的Ack消息中已完成的状态句柄:

  1. public class CheckpointCoordinator {
  2. /**
  3. * Receives a {@link DeclineCheckpoint} message for a pending checkpoint.
  4. */
  5. public void receiveDeclineMessage(DeclineCheckpoint message) {
  6. if (shutdown || message == null) {
  7. return;
  8. }
  9. if (!job.equals(message.getJob())) {
  10. throw new IllegalArgumentException("Received DeclineCheckpoint message for job " +
  11. message.getJob() + " while this coordinator handles job " + job);
  12. }
  13. final long checkpointId = message.getCheckpointId();
  14. final String reason = (message.getReason() != null ? message.getReason().getMessage() : "");
  15. PendingCheckpoint checkpoint;
  16. synchronized (lock) {
  17. // we need to check inside the lock for being shutdown as well, otherwise we
  18. // get races and invalid error log messages
  19. if (shutdown) {
  20. return;
  21. }
  22. checkpoint = pendingCheckpoints.remove(checkpointId);
  23. if (checkpoint != null && !checkpoint.isDiscarded()) {
  24. LOG.info("Decline checkpoint {} by task {} of job {}.", checkpointId, message.getTaskExecutionId(), job);
  25. discardCheckpoint(checkpoint, message.getReason());
  26. }
  27. else if (checkpoint != null) {
  28. // this should not happen
  29. throw new IllegalStateException("Received message for discarded but non-removed checkpoint " + checkpointId);
  30. }
  31. else if (LOG.isDebugEnabled()) {
  32. if (recentPendingCheckpoints.contains(checkpointId)) {
  33. // message is for an unknown checkpoint, or comes too late (checkpoint disposed)
  34. LOG.debug("Received another decline message for now expired checkpoint attempt {} of job {} : {}", checkpointId, job, reason);
  35. } else {
  36. // message is for an unknown checkpoint. might be so old that we don't even remember it any more
  37. LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} of job {} : {}", checkpointId, job, reason);
  38. }
  39. }
  40. }
  41. }
  42. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/870908
推荐阅读
相关标签
  

闽ICP备14008679号