赞
踩
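在Flink JobMaster中,CheckpointCoordinator是用于协调和触发checkpoint的协调管理器,也是Flink分布式快照的核心管理控制组件,其主要维护的功能如下:(1)根据配置周期性地触发checkpoint(以及触发savepoint);(2)接收各Task对checkpoint的确认(Ack)和回绝(Decline)消息;(3)在所有Task都确认后完成checkpoint,将其保存到CompletedCheckpointStore中,并通知各Task该checkpoint已完成;(4)在作业恢复时,从已完成的checkpoint中恢复状态。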
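checkpoint的配置及最终的触发调度主要集中在两个转换过程中:StreamGraph到JobGraph的转换(StreamingJobGraphGenerator.configureCheckpointing()),以及JobGraph到ExecutionGraph的转换(ExecutionGraph.enableCheckpointing()):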
- // Excerpt (not the full class): the part of StreamingJobGraphGenerator that copies the
- // StreamGraph's CheckpointConfig into the JobGraph as JobCheckpointingSettings.
- class StreamingJobGraphGenerator {
- /**
- * Translates the StreamGraph's checkpoint configuration into JobCheckpointingSettings and
- * attaches them to the JobGraph via jobGraph.setSnapshotSettings(...).
- *
- * <p>Collects the trigger/ack/commit vertex lists, the retention policy, the checkpointing
- * mode, the master-side checkpoint hooks and the state backend. Hooks and state backend are
- * eagerly serialized because they may contain user-defined code.
- *
- * @throws IllegalStateException if externalized checkpoints are enabled without a cleanup
- *         mode, or if the checkpointing mode is neither EXACTLY_ONCE nor AT_LEAST_ONCE
- * @throws FlinkRuntimeException if a hook factory or the state backend is not serializable
- */
- private void configureCheckpointing() {
- CheckpointConfig cfg = streamGraph.getCheckpointConfig();
- long interval = cfg.getCheckpointInterval();
- if (interval > 0) {
- ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
- // propagate the expected behaviour for checkpoint errors to task.
- executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors());
- } else {
- // interval of max value means disable periodic checkpoint
- interval = Long.MAX_VALUE;
- }
- // --- configure the participating vertices ---
-
- // collect the vertices that receive "trigger checkpoint" messages.
- // currently, these are all the sources
- List<JobVertexID> triggerVertices = new ArrayList<>();
-
- // collect the vertices that need to acknowledge the checkpoint
- // currently, these are all vertices
- List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());
-
- // collect the vertices that receive "commit checkpoint" messages
- // currently, these are all vertices
- List<JobVertexID> commitVertices = new ArrayList<>(jobVertices.size());
-
- // input vertices (sources) are triggered; every vertex acknowledges and is committed to
- for (JobVertex vertex : jobVertices.values()) {
- if (vertex.isInputVertex()) {
- triggerVertices.add(vertex.getID());
- }
- commitVertices.add(vertex.getID());
- ackVertices.add(vertex.getID());
- }
-
- // --- configure options ---
- // Map the externalized-checkpoint cleanup mode to a retention policy:
- // deleteOnCancellation() -> RETAIN_ON_FAILURE, otherwise RETAIN_ON_CANCELLATION.
- // Without externalized checkpoints nothing is retained after termination.
- CheckpointRetentionPolicy retentionAfterTermination;
- if (cfg.isExternalizedCheckpointsEnabled()) {
- CheckpointConfig.ExternalizedCheckpointCleanup cleanup = cfg.getExternalizedCheckpointCleanup();
- // Sanity check
- if (cleanup == null) {
- throw new IllegalStateException("Externalized checkpoints enabled, but no cleanup mode configured.");
- }
- retentionAfterTermination = cleanup.deleteOnCancellation() ?
- CheckpointRetentionPolicy.RETAIN_ON_FAILURE :
- CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION;
- } else {
- retentionAfterTermination = CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
- }
- // Only EXACTLY_ONCE and AT_LEAST_ONCE are supported; turn the mode into a boolean flag.
- CheckpointingMode mode = cfg.getCheckpointingMode();
- boolean isExactlyOnce;
- if (mode == CheckpointingMode.EXACTLY_ONCE) {
- isExactlyOnce = true;
- } else if (mode == CheckpointingMode.AT_LEAST_ONCE) {
- isExactlyOnce = false;
- } else {
- throw new IllegalStateException("Unexpected checkpointing mode. " +
- "Did not expect there to be another checkpointing mode besides " +
- "exactly-once or at-least-once.");
- }
-
- // --- configure the master-side checkpoint hooks ---
- // only user functions of AbstractUdfStreamOperator that implement WithMasterCheckpointHook
- // contribute a hook factory
- final ArrayList<MasterTriggerRestoreHook.Factory> hooks = new ArrayList<>();
- for (StreamNode node : streamGraph.getStreamNodes()) {
- StreamOperator<?> op = node.getOperator();
- if (op instanceof AbstractUdfStreamOperator) {
- Function f = ((AbstractUdfStreamOperator<?, ?>) op).getUserFunction();
- if (f instanceof WithMasterCheckpointHook) {
- hooks.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook<?>) f));
- }
- }
- }
-
- // because the hooks can have user-defined code, they need to be stored as
- // eagerly serialized values
- final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks;
- if (hooks.isEmpty()) {
- serializedHooks = null;
- } else {
- try {
- MasterTriggerRestoreHook.Factory[] asArray = hooks.toArray(new MasterTriggerRestoreHook.Factory[hooks.size()]);
- serializedHooks = new SerializedValue<>(asArray);
- } catch (IOException e) {
- throw new FlinkRuntimeException("Trigger/restore hook is not serializable", e);
- }
- }
-
- // because the state backend can have user-defined code, it needs to be stored as
- // eagerly serialized value
- final SerializedValue<StateBackend> serializedStateBackend;
- if (streamGraph.getStateBackend() == null) {
- serializedStateBackend = null;
- } else {
- try {
- serializedStateBackend = new SerializedValue<StateBackend>(streamGraph.getStateBackend());
- } catch (IOException e) {
- throw new FlinkRuntimeException("State backend is not serializable", e);
- }
- }
-
- // --- done, put it all together ---
- // note: 'interval' is Long.MAX_VALUE here when periodic checkpointing is disabled
- JobCheckpointingSettings settings = new JobCheckpointingSettings(
- triggerVertices,
- ackVertices,
- commitVertices,
- new CheckpointCoordinatorConfiguration(
- interval,
- cfg.getCheckpointTimeout(),
- cfg.getMinPauseBetweenCheckpoints(),
- cfg.getMaxConcurrentCheckpoints(),
- retentionAfterTermination,
- isExactlyOnce),
- serializedStateBackend,
- serializedHooks);
-
- jobGraph.setSnapshotSettings(settings);
- }
- }

- // Excerpt (not the full class): the checkpoint-related parts of ExecutionGraph.
- class ExecutionGraph {
- /**
- * Creates the CheckpointCoordinator for this graph.
- *
- * <p>The given job vertices are expanded into ExecutionVertex arrays (to trigger, to wait
- * for, to commit to) and the master hooks are registered on the new coordinator. Unless
- * periodic checkpointing is disabled (interval == Long.MAX_VALUE), the coordinator's
- * activator/deactivator is registered as a job status listener, so the periodic checkpoint
- * scheduler follows the job status.
- *
- * <p>Preconditions (checked via checkArgument/checkState/checkNotNull): interval and
- * checkpointTimeout are at least 10 ms, the job is in CREATED state, checkpointing has not
- * been enabled before, and statsTracker is not null.
- */
- public void enableCheckpointing(
- long interval,
- long checkpointTimeout,
- long minPauseBetweenCheckpoints,
- int maxConcurrentCheckpoints,
- CheckpointRetentionPolicy retentionPolicy,
- List<ExecutionJobVertex> verticesToTrigger,
- List<ExecutionJobVertex> verticesToWaitFor,
- List<ExecutionJobVertex> verticesToCommitTo,
- List<MasterTriggerRestoreHook<?>> masterHooks,
- CheckpointIDCounter checkpointIDCounter,
- CompletedCheckpointStore checkpointStore,
- StateBackend checkpointStateBackend,
- CheckpointStatsTracker statsTracker) {
-
- // simple sanity checks
- checkArgument(interval >= 10, "checkpoint interval must not be below 10ms");
- checkArgument(checkpointTimeout >= 10, "checkpoint timeout must not be below 10ms");
- checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
- checkState(checkpointCoordinator == null, "checkpointing already enabled");
-
- // expand each ExecutionJobVertex list into its ExecutionVertex instances
- ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
- ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
- ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
-
- checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
-
- // create the coordinator that triggers and commits checkpoints and holds the state
- checkpointCoordinator = new CheckpointCoordinator(
- jobInformation.getJobId(),
- interval,
- checkpointTimeout,
- minPauseBetweenCheckpoints,
- maxConcurrentCheckpoints,
- retentionPolicy,
- tasksToTrigger,
- tasksToWaitFor,
- tasksToCommitTo,
- checkpointIDCounter,
- checkpointStore,
- checkpointStateBackend,
- ioExecutor,
- SharedStateRegistry.DEFAULT_FACTORY);
-
- // register the master hooks on the checkpoint coordinator
- for (MasterTriggerRestoreHook<?> hook : masterHooks) {
- if (!checkpointCoordinator.addMasterHook(hook)) {
- LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier());
- }
- }
- checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
-
- // interval of max long value indicates disable periodic checkpoint,
- // the CheckpointActivatorDeactivator should be created only if the interval is not max value
- if (interval != Long.MAX_VALUE) {
- // the periodic checkpoint scheduler is activated and deactivated as a result of
- // job status changes (running -> on, all other states -> off)
- // register a job status listener so the coordinator is notified of status changes
- registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
- }
- }
-
- /**
- * Notifies every registered JobStatusListener of the new job status.
- *
- * <p>All listeners receive the same timestamp; a non-null error is wrapped in a
- * SerializedThrowable. An exception thrown by one listener is logged and does not prevent
- * the remaining listeners from being notified.
- */
- private void notifyJobStatusChange(JobStatus newState, Throwable error) {
- if (jobStatusListeners.size() > 0) {
- final long timestamp = System.currentTimeMillis();
- final Throwable serializedError = error == null ? null : new SerializedThrowable(error);
-
- for (JobStatusListener listener : jobStatusListeners) {
- try {
- listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError);
- } catch (Throwable t) {
- LOG.warn("Error while notifying JobStatusListener", t);
- }
- }
- }
- }
- }

当JobStatus变为RUNNING时,ExecutionGraph.notifyJobStatusChange()会依次调用所有已注册监听器的jobStatusChanges()方法;CheckpointCoordinatorDeActivator收到通知后,会通过CheckpointCoordinator.startCheckpointScheduler()启动checkpoint的定时器(作业进入其他任何状态时则调用stopCheckpointScheduler()停止定时器)。
- class CheckpointCoordinatorDeActivator implements JobStatusListener {
- /**
- * Keeps the coordinator's periodic checkpoint scheduler in sync with the job status:
- * the scheduler runs only while the job is RUNNING and is stopped on every other status.
- */
- @Override
- public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
- if (newJobStatus != JobStatus.RUNNING) {
- // any non-RUNNING status halts periodic triggering for now
- coordinator.stopCheckpointScheduler();
- return;
- }
- // the job is running: begin periodic checkpoint triggering
- coordinator.startCheckpointScheduler();
- }
- }
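在CheckpointCoordinator.startCheckpointScheduler()启动checkpoint定时器之后,checkpoint便会被周期性地调度执行。在Flink中,该定时任务被封装为ScheduledTrigger,每次运行时都会调用CheckpointCoordinator.triggerCheckpoint()触发一次checkpoint。triggerCheckpoint()方法主要执行以下步骤:(1)前置检查:协调器是否已关闭、周期性调度是否已被停止,以及(非savepoint时)并发checkpoint数量和两次checkpoint之间最小间隔的限制;(2)检查所有需要触发的Task是否都处于RUNNING状态;(3)生成严格自增的checkpointID,并初始化CheckpointStorageLocation;(4)创建PendingCheckpoint,并注册一个超时取消任务;(5)触发master hooks;(6)调用Execution.triggerCheckpoint()向所有需要触发的(source)Task发送checkpoint请求。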
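savepoint和checkpoint的处理逻辑基本一致,同样经过triggerCheckpoint()(见下文代码中根据props.isSavepoint()选择存储位置的分支);区别在于savepoint由用户手动、强制触发,不受并发数量和最小间隔等限制。对于同步savepoint(如stop-with-savepoint),则需要调用Execution.triggerSynchronousSavepoint()进行触发。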
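在CheckpointCoordinator内部也有三个列表:tasksToTrigger(需要触发checkpoint的Task,即所有source)、tasksToWaitFor(需要确认checkpoint的Task,即所有Task)以及tasksToCommitTo(需要接收checkpoint完成通知的Task,即所有Task)。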
这三个列表分别对应了前面JobGraph中的triggerVertices、ackVertices和commitVertices。在触发checkpoint的时候,只有作为source的Execution会调用Execution.triggerCheckpoint()方法,该方法通过RPC调用通知对应的RpcTaskManagerGateway执行triggerCheckpoint。
- // Excerpt (heavily abridged; "// ......" marks omitted code): how CheckpointCoordinator
- // triggers a checkpoint or savepoint.
- // NOTE(review): in this listing the triggerLock section appears nested inside the first
- // synchronized (lock) block, although the comment before it says the coordinator-wide lock
- // is not held there; the closing brace of the first lock scope (and of the class) seems to
- // have been lost while abridging. Confirm against the Flink source before relying on it.
- public class CheckpointCoordinator {
- /**
- * Triggers a checkpoint, or a savepoint when props.isSavepoint() is true.
- *
- * <p>Steps visible in this excerpt: eager pre-checks under lock; verifying that every task
- * to trigger has a RUNNING current execution attempt; obtaining the next checkpoint ID and
- * initializing the storage location under triggerLock; creating the PendingCheckpoint and
- * scheduling its timeout canceller; triggering the master hooks; and finally sending the
- * trigger message to every collected Execution.
- *
- * @param timestamp the timestamp of the checkpoint
- * @param props the checkpoint properties (checkpoint vs. savepoint, checkpoint type)
- * @param externalSavepointLocation target location, only used when triggering a savepoint; may be null
- * @param isPeriodic presumably whether the request comes from the periodic scheduler
- *        (see the "scheduling has been disabled" check) — confirm
- * @return a CheckpointTriggerResult wrapping the new PendingCheckpoint on success
- */
- public CheckpointTriggerResult triggerCheckpoint(
- long timestamp,
- CheckpointProperties props,
- @Nullable String externalSavepointLocation,
- boolean isPeriodic) {
-
- // make some eager pre-checks
- synchronized (lock) {
- // check whether a checkpoint may be triggered at all (pre-conditions)
- // abort if the coordinator has been shutdown in the meantime
- // Don't allow periodic checkpoint if scheduling has been disabled
-
- // validate whether the checkpoint can be triggered, with respect to the limit of
- // concurrent checkpoints, and the minimum time between checkpoints.
- // these checks are not relevant for savepoints
-
- // check if all tasks that we need to trigger are running.
- // if not, abort the checkpoint
- // i.e. every Execution that has to be triggered must be in RUNNING state
- Execution[] executions = new Execution[tasksToTrigger.length];
- for (int i = 0; i < tasksToTrigger.length; i++) {
- Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
- if (ee == null) {
- // ......
- } else if (ee.getState() == ExecutionState.RUNNING) {
- executions[i] = ee;
- } else {
- // ......
- }
- }
- // ......
- // we will actually trigger this checkpoint!
-
- // we lock with a special lock to make sure that trigger requests do not overtake each other.
- // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
- // may issue blocking operations. Using a different lock than the coordinator-wide lock,
- // we avoid blocking the processing of 'acknowledge/decline' messages during that time.
- synchronized (triggerLock) {
- final CheckpointStorageLocation checkpointStorageLocation;
- final long checkpointID;
- try {
- // this must happen outside the coordinator-wide lock, because it communicates
- // with external services (in HA mode) and may block for a while.
- checkpointID = checkpointIdCounter.getAndIncrement();
- // generate this checkpoint's ID (IDs strictly increase) and initialize its
- // CheckpointStorageLocation (savepoint or checkpoint location, depending on props)
- checkpointStorageLocation = props.isSavepoint() ?
- checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
- checkpointStorage.initializeLocationForCheckpoint(checkpointID);
- } catch (Throwable t) {
- // ......
- }
-
- final PendingCheckpoint checkpoint = new PendingCheckpoint(
- job,
- checkpointID,
- timestamp,
- ackTasks,
- props,
- checkpointStorageLocation,
- executor);
- if (statsTracker != null) {
- PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
- checkpointID,
- timestamp,
- props);
- checkpoint.setStatsCallback(callback);
- }
-
- // schedule the timer that will clean up the expired checkpoints
- // i.e. a task that aborts this checkpoint once it times out and then lets a new
- // checkpoint be triggered
- final Runnable canceller = () -> {
- synchronized (lock) {
- // only do the work if the checkpoint is not discarded anyways
- // note that checkpoint completion discards the pending checkpoint object
- if (!checkpoint.isDiscarded()) {
- LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
- checkpoint.abortExpired();
- pendingCheckpoints.remove(checkpointID);
- rememberRecentCheckpointId(checkpointID);
- triggerQueuedRequests(); // re-schedule so that a new checkpoint can be triggered
- }
- }
- };
-
- try {
- // re-acquire the coordinator-wide lock
- synchronized (lock) {
- // since we released the lock in the meantime, we need to re-check
- // that the conditions still hold.
- // ......
- LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
- pendingCheckpoints.put(checkpointID, checkpoint);
- ScheduledFuture<?> cancellerHandle = timer.schedule(
- canceller,
- checkpointTimeout, TimeUnit.MILLISECONDS);
- if (!checkpoint.setCancellerHandle(cancellerHandle)) {
- // checkpoint is already disposed!
- cancellerHandle.cancel(false);
- }
-
- // trigger the master hooks for the checkpoint
- final List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(),
- checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout));
- for (MasterState s : masterStates) {
- checkpoint.addMasterState(s);
- }
- }
- // end of lock scope
-
- final CheckpointOptions checkpointOptions = new CheckpointOptions(
- props.getCheckpointType(),
- checkpointStorageLocation.getLocationReference());
-
- // send the messages to the tasks that trigger their checkpoint
- // i.e. call Execution.triggerCheckpoint() on every task that has to be triggered
- for (Execution execution: executions) {
- execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
- }
-
- numUnsuccessfulCheckpointsTriggers.set(0);
- return new CheckpointTriggerResult(checkpoint);
- } catch (Throwable t) {
- // guard the map against concurrent modifications
- // ......
- }
- } // end trigger lock
- }
- }
-
-
- public class Execution implements AccessExecution, Archiveable<ArchivedExecution>, LogicalSlot.Payload {
- /**
- * Asks the TaskManager hosting this execution to start the given checkpoint on its task.
- *
- * <p>When no slot is assigned, the request is dropped and only a debug message is logged.
- *
- * @param checkpointId ID of the checkpoint to trigger
- * @param timestamp timestamp of the checkpoint to trigger
- * @param checkpointOptions options of the checkpoint to trigger
- */
- public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
- final LogicalSlot currentSlot = assignedResource;
- if (currentSlot == null) {
- LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
- "no longer running.");
- return;
- }
- // In the checkpoint flow only source executions reach this call; the request is forwarded
- // to the hosting TaskManager through its gateway (RpcTaskManagerGateway, i.e. an RPC).
- currentSlot.getTaskManagerGateway().triggerCheckpoint(
- attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
- }
- }

CheckpointCoordinator发出触发checkpoint的消息,最终通过RPC调用TaskExecutorGateway.triggerCheckpoint(),即请求执行TaskExecutor.triggerCheckpoint()。因为一个TaskExecutor中可能有多个Task正在运行,所以要根据触发checkpoint的ExecutionAttemptID找到对应的Task,然后调用Task.triggerCheckpointBarrier()方法。只有作为source的Task才会收到triggerCheckpointBarrier()方法的调用。
在Task中,checkpoint的触发被封装为一个异步任务执行:
- // Excerpt (fields omitted, "// ......" marks omitted code): how a Task handles a
- // "trigger checkpoint" request coming from the TaskExecutor.
- class Task {
- /**
- * Calls the invokable to trigger a checkpoint.
- *
- * <p>If the task is RUNNING and has an invokable, the trigger is wrapped in a Runnable and
- * handed to executeAsyncCallRunnable. Otherwise the checkpoint is declined immediately with
- * a CheckpointDeclineTaskNotReadyException. The checkpoint is also declined if the
- * invokable's triggerCheckpoint(...) returns false.
- *
- * @param checkpointID The ID identifying the checkpoint.
- * @param checkpointTimestamp The timestamp associated with the checkpoint.
- * @param checkpointOptions Options for performing this checkpoint.
- */
- public void triggerCheckpointBarrier(
- final long checkpointID,
- long checkpointTimestamp,
- final CheckpointOptions checkpointOptions) {
-
- final AbstractInvokable invokable = this.invokable;
- final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
-
- if (executionState == ExecutionState.RUNNING && invokable != null) {
- // build a local closure
- final String taskName = taskNameWithSubtask;
- // capture the calling thread's safety-net registry so the checkpointing thread can use it
- final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
- FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
-
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- // set safety net from the task's context for checkpointing thread
- LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
- FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
- try {
- // the actual checkpoint logic lives in the invokable (e.g. StreamTask)
- boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
- if (!success) {
- checkpointResponder.declineCheckpoint(
- getJobID(), getExecutionId(), checkpointID,
- new CheckpointDeclineTaskNotReadyException(taskName));
- }
- } catch (Throwable t) {
- // ......
- } finally {
- FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
- }
- }
- };
- // execute asynchronously via executeAsyncCallRunnable
- executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
- }
- else {
- LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
- // send back a message that we did not do the checkpoint
- checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
- new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
- }
- }
- }

Task执行checkpoint的真正逻辑被封装在AbstractInvokable.triggerCheckpoint(...)中。AbstractInvokable中有两个触发checkpoint的方法:triggerCheckpoint()和triggerCheckpointOnBarrier()。
其中,triggerCheckpoint()是触发checkpoint的源头,会向下游注入CheckpointBarrier;下游的其他任务在收到CheckpointBarrier后则调用triggerCheckpointOnBarrier()。这两个方法的具体实现有一些细微的差异,但主要逻辑一致,都在StreamTask.performCheckpoint()方法中完成:1)让算子完成barrier之前的准备工作(prepareSnapshotPreBarrier),2)向下游发送barrier,3)存储检查点快照。
一旦StreamTask.triggerCheckpoint()或StreamTask.triggerCheckpointOnBarrier()被调用,就会通过OperatorChain.broadcastCheckpointBarrier()向下游发送barrier:
- // Excerpt (fields omitted, "// ......" marks omitted code): checkpoint execution in a StreamTask.
- public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
- extends AbstractInvokable
- implements AsyncExceptionHandler {
- /**
- * Performs this task's part of a checkpoint while holding the task lock.
- *
- * <p>If the task is running: lets the operators do pre-barrier work, broadcasts the
- * checkpoint barrier downstream, takes the state snapshot and returns true. Otherwise a
- * CancelCheckpointMarker is broadcast on every stream record writer, so downstream tasks do
- * not wait for this task, and false is returned.
- */
- private boolean performCheckpoint(
- CheckpointMetaData checkpointMetaData,
- CheckpointOptions checkpointOptions,
- CheckpointMetrics checkpointMetrics) throws Exception {
- LOG.debug("Starting checkpoint ({}) {} on task {}",
- checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
-
- synchronized (lock) {
- if (isRunning) {
- // we can do a checkpoint
-
- // All of the following steps happen as an atomic step from the perspective of barriers and
- // records/watermarks/timers/callbacks.
- // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
- // checkpoint alignments
-
- // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
- // The pre-barrier work should be nothing or minimal in the common case.
- operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());
-
- // Step (2): Send the checkpoint barrier downstream
- operatorChain.broadcastCheckpointBarrier(
- checkpointMetaData.getCheckpointId(),
- checkpointMetaData.getTimestamp(),
- checkpointOptions);
-
- // Step (3): Take the state snapshot. This should be largely asynchronous, to not
- // impact progress of the streaming topology
- checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics); // store the checkpoint snapshot
- return true;
- }
- else {
- // we cannot perform our checkpoint - let the downstream operators know that they
- // should not wait for any input from this operator
-
- // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
- // yet be created
- final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
- Exception exception = null;
-
- for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter : streamRecordWriters) {
- try {
- streamRecordWriter.broadcastEvent(message);
- } catch (Exception e) {
- // ......
- }
- }
- // ......
- return false;
- }
- }
- }
- }

- public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {
- /**
- * Sends a checkpoint barrier for the given checkpoint to every stream output of this chain.
- *
- * <p>A single barrier object is built and the same instance is broadcast on each output.
- *
- * @throws IOException if broadcasting the barrier on an output fails
- */
- public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
- final CheckpointBarrier checkpointBarrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
- for (RecordWriterOutput<?> output : streamOutputs) {
- output.broadcastEvent(checkpointBarrier);
- }
- }
- }
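每一个Task都会通过InputGate消费上游Task产生的数据。在Flink实际的任务运行中,checkpoint的主要触发与流转过程是:source Task收到触发请求后向下游广播barrier;下游Task从InputGate读到barrier后交由CheckpointBarrierHandler处理,在收到所有input channel的barrier后触发本地checkpoint,并继续向它自己的下游广播barrier。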
在Task接收上游数据的实现中(OneInputStreamTask和TwoInputStreamTask),会分别在StreamInputProcessor和StreamTwoInputProcessor中创建CheckpointBarrierHandler。CheckpointBarrierHandler是对InputGate的一层封装,增加了对CheckpointBarrier等事件的处理。它有两个具体的实现,即BarrierTracker和BarrierBuffer,分别对应AT_LEAST_ONCE和EXACTLY_ONCE这两种模式。
StreamInputProcessor和StreamTwoInputProcessor中循环调用CheckpointBarrierHandler.getNextNonBlocked()获取新数据,因而在CheckpointBarrierHandler获得CheckpointBarrier后可以及时地进行checkpoint相关的操作。
1、AT_LEAST_ONCE模式下对应的是BarrierTracker。它仅仅追踪每一个input channel接收到的barrier,并不会阻塞已收到barrier的channel,因此barrier之后的数据可能在checkpoint之前就被处理,只能保证at-least-once。当所有input channel的barrier都被接收时,就可以触发checkpoint了:
- // Excerpt (fields omitted): the CheckpointBarrierHandler used in AT_LEAST_ONCE mode.
- // In the code shown it only counts barriers per checkpoint ID across input channels;
- // no channel is ever blocked.
- public class BarrierTracker implements CheckpointBarrierHandler {
- /**
- * Returns the next buffer or non-checkpoint event from the input gate.
- *
- * <p>CheckpointBarrier and CancelCheckpointMarker events are consumed here (handed to
- * processBarrier / processCheckpointAbortBarrier) and are never returned to the caller.
- *
- * @return the next buffer or other event, or null once the input gate has no more data
- */
- @Override
- public BufferOrEvent getNextNonBlocked() throws Exception {
- while (true) {
- Optional<BufferOrEvent> next = inputGate.getNextBufferOrEvent();
- if (!next.isPresent()) {
- // buffer or input exhausted
- return null;
- }
-
- BufferOrEvent bufferOrEvent = next.get();
- if (bufferOrEvent.isBuffer()) {
- return bufferOrEvent;
- }
- else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
- // received a CheckpointBarrier
- processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
- }
- else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
- // received a CancelCheckpointMarker
- processCheckpointAbortBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
- }
- else {
- // some other event
- return bufferOrEvent;
- }
- }
- }
-
- /**
- * Counts a received barrier and notifies the checkpoint once barriers from all input
- * channels have arrived for its ID.
- *
- * <p>With a single input channel the checkpoint is notified immediately. When a checkpoint
- * completes, it and all older pending checkpoints are removed from the queue. A barrier for
- * an untracked ID is only tracked if its ID is newer than latestPendingCheckpointID, and at
- * most MAX_CHECKPOINTS_TO_TRACK checkpoints are tracked (the oldest one is evicted).
- */
- private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
- final long barrierId = receivedBarrier.getId();
- // fast path for single channel trackers
- if (totalNumberOfInputChannels == 1) {
- notifyCheckpoint(barrierId, receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
- return;
- }
-
- // general path for multiple input channels
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelIndex);
- }
- // find the checkpoint barrier in the queue of pending barriers
- CheckpointBarrierCount cbc = null;
- int pos = 0;
-
- for (CheckpointBarrierCount next : pendingCheckpoints) {
- if (next.checkpointId == barrierId) {
- cbc = next;
- break;
- }
- pos++;
- }
-
- if (cbc != null) {
- // add one to the count to that barrier and check for completion
- int numBarriersNew = cbc.incrementBarrierCount();
- if (numBarriersNew == totalNumberOfInputChannels) {
- // checkpoint can be triggered (or is aborted and all barriers have been seen)
- // first, remove this checkpoint and all prior pending
- // checkpoints (which are now subsumed)
- // i.e. every unfinished checkpoint queued before this barrierId can be dropped
- for (int i = 0; i <= pos; i++) {
- pendingCheckpoints.pollFirst();
- }
- // notify the listener
- if (!cbc.isAborted()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received all barriers for checkpoint {}", barrierId);
- }
- // trigger the checkpoint
- notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
- }
- }
- }
- else {
- // first barrier for that checkpoint ID
- // add it only if it is newer than the latest checkpoint.
- // if it is not newer than the latest checkpoint ID, then there cannot be a
- // successful checkpoint for that ID anyways
- if (barrierId > latestPendingCheckpointID) {
- latestPendingCheckpointID = barrierId;
- pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));
-
- // make sure we do not track too many checkpoints
- if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
- pendingCheckpoints.pollFirst();
- }
- }
- }
- }
- }

2、EXACTLY_ONCE模式下对应的是BarrierBuffer。它除了要追踪每一个input channel接收到的barrier之外,还要求在接收到所有的barrier之前,先收到barrier的channel进入阻塞状态。当然,为了避免进入“反压”状态,BarrierBuffer会继续从被阻塞的channel接收数据,但会先将这些数据缓存起来,直到所有的barrier都到达后再处理。
- // Excerpt (other fields and methods omitted): the CheckpointBarrierHandler used in
- // EXACTLY_ONCE mode. Data arriving on channels that are blocked (already delivered the
- // current barrier) is buffered instead of being handed to the caller.
- public class BarrierBuffer implements CheckpointBarrierHandler {
- /** The utility to write blocked data to a file channel. */
- private final BufferBlocker bufferBlocker; // buffers data received on blocked channels
-
- /**
- * The sequence of buffers/events that has been unblocked and must now be consumed before
- * requesting further data from the input gate.
- */
- private BufferOrEventSequence currentBuffered; // currently buffered data being replayed
-
- // ------------------------------------------------------------------------
- // Buffer and barrier handling
- // ------------------------------------------------------------------------
- /**
- * Returns the next buffer or event to process.
- *
- * <p>Previously buffered data (currentBuffered) is drained before new data is read from the
- * input gate. Data from a blocked channel is stored in bufferBlocker and not returned.
- * CheckpointBarriers (handled only while the input is not at its end) and
- * CancelCheckpointMarkers are consumed here and not returned either.
- *
- * @return the next buffer or event, or null once both the input and all buffered data are exhausted
- */
- @Override
- public BufferOrEvent getNextNonBlocked() throws Exception {
- while (true) {
- // process buffered BufferOrEvents before grabbing new ones
- Optional<BufferOrEvent> next;
- if (currentBuffered == null) {
- next = inputGate.getNextBufferOrEvent();
- }
- else {
- next = Optional.ofNullable(currentBuffered.getNext());
- if (!next.isPresent()) {
- // current buffered sequence drained: finish it and continue recursively
- completeBufferedSequence();
- return getNextNonBlocked();
- }
- }
-
- if (!next.isPresent()) {
- if (!endOfStream) {
- // end of input stream. stream continues with the buffered data
- endOfStream = true;
- releaseBlocksAndResetBarriers();
- return getNextNonBlocked();
- }
- else {
- // final end of both input and buffered data
- return null;
- }
- }
-
- BufferOrEvent bufferOrEvent = next.get();
- if (isBlocked(bufferOrEvent.getChannelIndex())) {
- // if the channel is blocked, we just store the BufferOrEvent
- bufferBlocker.add(bufferOrEvent);
- checkSizeLimit();
- }
- else if (bufferOrEvent.isBuffer()) {
- return bufferOrEvent;
- }
- else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
- if (!endOfStream) {
- // process barriers only if there is a chance of the checkpoint completing
- processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
- }
- }
- else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
- processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
- }
- else {
- if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
- processEndOfPartition();
- }
- return bufferOrEvent;
- }
- }
- }
- }

除了CheckpointBarrier消息以外,在checkpoint发生异常或取消checkpoint的时候,会向下游发送CancelCheckpointMarker消息。
Task对checkpoint的响应是通过CheckpointResponder接口完成的:
- /**
- * Channel through which a task reports the outcome of a checkpoint back to the coordinator.
- * (RpcCheckpointResponder is the RPC-based implementation that forwards to the JobMaster.)
- */
- public interface CheckpointResponder {
- /**
- * Acknowledges the given checkpoint.
- *
- * @param jobID the job the checkpoint belongs to
- * @param executionAttemptID the execution attempt of the acknowledging task
- * @param checkpointId the ID of the acknowledged checkpoint
- * @param checkpointMetrics metrics collected while taking the checkpoint
- * @param subtaskState the snapshot of the task's state taken for this checkpoint
- */
- void acknowledgeCheckpoint(
- JobID jobID,
- ExecutionAttemptID executionAttemptID,
- long checkpointId,
- CheckpointMetrics checkpointMetrics,
- TaskStateSnapshot subtaskState);
-
- /**
- * Declines the given checkpoint.
- *
- * @param jobID the job the checkpoint belongs to
- * @param executionAttemptID the execution attempt of the declining task
- * @param checkpointId the ID of the declined checkpoint
- * @param cause the reason why the checkpoint was declined
- */
- void declineCheckpoint(
- JobID jobID,
- ExecutionAttemptID executionAttemptID,
- long checkpointId,
- Throwable cause);
- }

RpcCheckpointResponder作为CheckpointResponder的具体实现,主要是通过RPC调用通知CheckpointCoordinatorGateway,即通知给JobMaster;JobMaster调用CheckpointCoordinator.receiveAcknowledgeMessage()和CheckpointCoordinator.receiveDeclineMessage()进行处理。
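在一个Task完成checkpoint操作后,会调用checkpointResponder.acknowledgeCheckpoint(),通过CheckpointCoordinatorGateway的RPC接口向CheckpointCoordinator发送ACK响应。JobMaster中的CheckpointCoordinator接收到Ack响应后,主要处理流程如下:(1)校验协调器尚未关闭,且消息属于当前作业;(2)根据checkpointId查找对应的PendingCheckpoint;(3)若找到且未被丢弃,则调用PendingCheckpoint.acknowledgeTask()记录该Task的确认及其状态句柄,若此时所有Task都已确认,则调用completePendingCheckpoint()完成此次checkpoint;(4)若找不到对应的PendingCheckpoint(消息已过期或属于未知checkpoint),则丢弃消息携带的状态句柄,避免残留状态。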
相应的代码如下:
- // Excerpt (not the full class): handling of task acknowledgements in CheckpointCoordinator.
- public class CheckpointCoordinator {
- /**
- * Receives an AcknowledgeCheckpoint message and returns whether the
- * message was associated with a pending checkpoint.
- *
- * <p>If the matching PendingCheckpoint exists and is not discarded, the acknowledgement is
- * recorded via acknowledgeTask(). Once every task has acknowledged, the checkpoint is
- * completed via completePendingCheckpoint(). State carried by acknowledgements that cannot
- * be used (unknown task, discarded checkpoint, unknown or expired checkpoint) is discarded
- * to avoid lingering state.
- *
- * @param message the acknowledgement sent by a task; null is ignored
- * @return true if the message belonged to a pending (or recently expired) checkpoint; false if
- *         the coordinator is shut down, the message is for another job, or the checkpoint is unknown
- * @throws CheckpointException as declared; presumably raised while completing the checkpoint — confirm
- * @throws IllegalStateException if a discarded checkpoint is still registered as pending
- */
- public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
- if (shutdown || message == null) {
- return false;
- }
- if (!job.equals(message.getJob())) {
- LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message);
- return false;
- }
-
- final long checkpointId = message.getCheckpointId();
- synchronized (lock) {
- // we need to check inside the lock for being shutdown as well, otherwise we
- // get races and invalid error log messages
- if (shutdown) {
- return false;
- }
- final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId); // look up the matching PendingCheckpoint
- if (checkpoint != null && !checkpoint.isDiscarded()) {
- switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
- case SUCCESS:
- LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
- checkpointId, message.getTaskExecutionId(), message.getJob());
- if (checkpoint.isFullyAcknowledged()) {
- completePendingCheckpoint(checkpoint);
- }
- break;
- case DUPLICATE:
- LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.",
- message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
- break;
- case UNKNOWN:
- LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
- "because the task's execution attempt id was unknown. Discarding " +
- "the state handle to avoid lingering state.", message.getCheckpointId(),
- message.getTaskExecutionId(), message.getJob());
- discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
- break;
- case DISCARDED:
- LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
- "because the pending checkpoint had been discarded. Discarding the " +
- "state handle to avoid lingering state.",
- message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
- discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
- // explicit break so a case added below cannot accidentally fall through
- break;
- }
- return true;
- }
- else if (checkpoint != null) {
- // this should not happen
- throw new IllegalStateException(
- "Received message for discarded but non-removed checkpoint " + checkpointId);
- }
- else {
- boolean wasPendingCheckpoint;
- // message is for an unknown checkpoint, or comes too late (checkpoint disposed)
- if (recentPendingCheckpoints.contains(checkpointId)) {
- wasPendingCheckpoint = true;
- LOG.warn("Received late message for now expired checkpoint attempt {} from " +
- "{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob());
- }
- else {
- LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.",
- checkpointId, message.getTaskExecutionId(), message.getJob());
- wasPendingCheckpoint = false;
- }
- // try to discard the state so that we don't have lingering state lying around
- discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
- return wasPendingCheckpoint;
- }
- }
- }
- }

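对于一个已经触发但还没有完成的checkpoint,即PendingCheckpoint,它是如何处理Ack消息的呢?PendingCheckpoint内部维护了两个Map,分别是:notYetAcknowledgedTasks(尚未确认的Task,ExecutionAttemptID -> ExecutionVertex)和operatorStates(已收到的算子状态,OperatorID -> OperatorState)。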
每当接收到一个Ack消息时,PendingCheckpoint就从notYetAcknowledgedTasks中移除对应的Task,并保存Ack携带的状态句柄。当notYetAcknowledgedTasks为空时,表明所有的Ack消息都已接收到。
其中OperatorState是算子状态句柄的一层封装:
- // Excerpt: the state of one operator in a checkpoint, collected from its parallel subtasks.
- class OperatorState implements CompositeStateHandle {
- /** handles to non-partitioned states, subtaskindex -> subtaskstate (keyed by subtask index) */
- private final Map<Integer, OperatorSubtaskState> operatorSubtaskStates;
- }
-
- // Excerpt: the state handles reported by one operator subtask in a checkpoint, split into
- // managed/raw operator state and managed/raw keyed state.
- public class OperatorSubtaskState implements CompositeStateHandle {
- /** Snapshot from the {@link org.apache.flink.runtime.state.OperatorStateBackend}. */
- @Nonnull
- private final StateObjectCollection<OperatorStateHandle> managedOperatorState;
-
- /** Snapshot written using {@link org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream}. */
- @Nonnull
- private final StateObjectCollection<OperatorStateHandle> rawOperatorState;
-
- /** Snapshot from {@link org.apache.flink.runtime.state.KeyedStateBackend}. */
- @Nonnull
- private final StateObjectCollection<KeyedStateHandle> managedKeyedState;
-
- /** Snapshot written using {@link org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream}. */
- @Nonnull
- private final StateObjectCollection<KeyedStateHandle> rawKeyedState;
- }

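一旦PendingCheckpoint通过acknowledgeTask()确认所有Ack消息都已接收,就可以完成此次checkpoint了,具体包括:(1)调用PendingCheckpoint.finalizeCheckpoint()写入checkpoint元数据,并将PendingCheckpoint转换为CompletedCheckpoint;(2)将CompletedCheckpoint加入CompletedCheckpointStore,并将其从pendingCheckpoints中移除;(3)丢弃在它之前触发、已被其包含(subsumed)的PendingCheckpoint;(4)通知所有需要commit的Task(tasksToCommitTo)该checkpoint已完成(notifyCheckpointComplete)。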
在Task进行checkpoint的过程,可能会发生异常导致checkpoint失败,在这种情况下会通过CheckpointResponder发出回绝的消息。当CheckpointCoordinator接收到DeclineCheckpoint消息后会移除PendingCheckpoint,并尝试丢弃已经接收到的Ack消息中已完成的状态句柄:
- // Excerpt (not the full class): handling of task declines in CheckpointCoordinator.
- public class CheckpointCoordinator {
- /**
- * Receives a {@link DeclineCheckpoint} message for a pending checkpoint.
- *
- * <p>Ignored if the coordinator is shut down or the message is null. If the checkpoint is
- * still pending, it is removed and discarded with the decline reason. Otherwise the message
- * is only logged (at debug level) as late or unknown.
- *
- * @throws IllegalArgumentException if the message belongs to a different job
- * @throws IllegalStateException if a discarded checkpoint was still registered as pending
- */
- public void receiveDeclineMessage(DeclineCheckpoint message) {
- if (shutdown || message == null) {
- return;
- }
- if (!job.equals(message.getJob())) {
- throw new IllegalArgumentException("Received DeclineCheckpoint message for job " +
- message.getJob() + " while this coordinator handles job " + job);
- }
- final long checkpointId = message.getCheckpointId();
- // human-readable reason, only used for the debug logs below
- final String reason = (message.getReason() != null ? message.getReason().getMessage() : "");
-
- PendingCheckpoint checkpoint;
- synchronized (lock) {
- // we need to check inside the lock for being shutdown as well, otherwise we
- // get races and invalid error log messages
- if (shutdown) {
- return;
- }
- // remove first: a declined checkpoint can never complete
- checkpoint = pendingCheckpoints.remove(checkpointId);
- if (checkpoint != null && !checkpoint.isDiscarded()) {
- LOG.info("Decline checkpoint {} by task {} of job {}.", checkpointId, message.getTaskExecutionId(), job);
- discardCheckpoint(checkpoint, message.getReason());
- }
- else if (checkpoint != null) {
- // this should not happen
- throw new IllegalStateException("Received message for discarded but non-removed checkpoint " + checkpointId);
- }
- else if (LOG.isDebugEnabled()) {
- if (recentPendingCheckpoints.contains(checkpointId)) {
- // message is for an unknown checkpoint, or comes too late (checkpoint disposed)
- LOG.debug("Received another decline message for now expired checkpoint attempt {} of job {} : {}", checkpointId, job, reason);
- } else {
- // message is for an unknown checkpoint. might be so old that we don't even remember it any more
- LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} of job {} : {}", checkpointId, job, reason);
- }
- }
- }
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。