This article takes a look at Flink's CheckpointScheduler.
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
/**
 * This actor listens to changes in the JobStatus and activates or deactivates the periodic
 * checkpoint scheduler.
 */
public class CheckpointCoordinatorDeActivator implements JobStatusListener {

    private final CheckpointCoordinator coordinator;

    public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
        this.coordinator = checkNotNull(coordinator);
    }

    @Override
    public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
        if (newJobStatus == JobStatus.RUNNING) {
            // start the checkpoint scheduler
            coordinator.startCheckpointScheduler();
        } else {
            // anything else should stop the trigger for now
            coordinator.stopCheckpointScheduler();
        }
    }
}
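The deactivator is framework-internal: the CheckpointCoordinator creates it and the ExecutionGraph registers it as a JobStatusListener when checkpointing is enabled for the job (the 1.x sources expose this as CheckpointCoordinator#createActivatorDeactivator, if I read them correctly), so the scheduler starts as soon as the job reaches RUNNING. From the application side all of this is switched on simply by enabling checkpointing; a minimal sketch (class name and interval are arbitrary, not from the article):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableCheckpointingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // once checkpointing is enabled, the job's CheckpointCoordinator (and the
        // CheckpointCoordinatorDeActivator above) are created for the ExecutionGraph
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        env.fromElements(1, 2, 3).print();

        env.execute("checkpoint-scheduler-demo");
    }
}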
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
/**
 * The checkpoint coordinator coordinates the distributed snapshots of operators and state.
 * It triggers the checkpoint by sending the messages to the relevant tasks and collects the
 * checkpoint acknowledgements. It also collects and maintains the overview of the state handles
 * reported by the tasks that acknowledge the checkpoint.
 */
public class CheckpointCoordinator {

    /** Map from checkpoint ID to the pending checkpoint */
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;

    /** The number of consecutive failed trigger attempts */
    private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);

    //......

    public void startCheckpointScheduler() {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }

            // make sure all prior timers are cancelled
            stopCheckpointScheduler();

            periodicScheduling = true;
            long initialDelay = ThreadLocalRandom.current().nextLong(
                minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                    new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
        }
    }

    public void stopCheckpointScheduler() {
        synchronized (lock) {
            triggerRequestQueued = false;
            periodicScheduling = false;

            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false);
                currentPeriodicTrigger = null;
            }

            for (PendingCheckpoint p : pendingCheckpoints.values()) {
                p.abortError(new Exception("Checkpoint Coordinator is suspending."));
            }

            pendingCheckpoints.clear();
            numUnsuccessfulCheckpointsTriggers.set(0);
        }
    }

    private final class ScheduledTrigger implements Runnable {

        @Override
        public void run() {
            try {
                triggerCheckpoint(System.currentTimeMillis(), true);
            }
            catch (Exception e) {
                LOG.error("Exception while triggering checkpoint for job {}.", job, e);
            }
        }
    }

    //......
}
The startCheckpointScheduler method first calls stopCheckpointScheduler to make sure all prior timers are cancelled, then uses timer.scheduleAtFixedRate to schedule a ScheduledTrigger with a random initial delay and a period of baseInterval. stopCheckpointScheduler cancels the currentPeriodicTrigger, aborts and clears pendingCheckpoints (Map<Long, PendingCheckpoint>), and resets numUnsuccessfulCheckpointsTriggers (AtomicInteger). ScheduledTrigger's run method simply calls triggerCheckpoint(System.currentTimeMillis(), true), which is shown below.
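The baseInterval and minPauseBetweenCheckpointsNanos used above are not fixed by the coordinator itself; they come from the job's checkpoint configuration. A hedged sketch of the user-facing knobs that, by my reading of the 1.7 code, feed those fields (class name and values are arbitrary):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointIntervalConfigDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // the checkpoint interval ends up as the coordinator's baseInterval
        env.enableCheckpointing(60_000L);

        // ends up as minPauseBetweenCheckpointsNanos (converted to nanoseconds)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);
    }
}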
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
/** * The checkpoint coordinator coordinates the distributed snapshots of operators and state. * It triggers the checkpoint by sending the messages to the relevant tasks and collects the * checkpoint acknowledgements. It also collects and maintains the overview of the state handles * reported by the tasks that acknowledge the checkpoint. */ public class CheckpointCoordinator { /** Tasks who need to be sent a message when a checkpoint is started */ private final ExecutionVertex[] tasksToTrigger; /** Tasks who need to acknowledge a checkpoint before it succeeds */ private final ExecutionVertex[] tasksToWaitFor; /** Map from checkpoint ID to the pending checkpoint */ private final Map<Long, PendingCheckpoint> pendingCheckpoints; /** The maximum number of checkpoints that may be in progress at the same time */ private final int maxConcurrentCheckpointAttempts; /** The min time(in ns) to delay after a checkpoint could be triggered. Allows to * enforce minimum processing time between checkpoint attempts */ private final long minPauseBetweenCheckpointsNanos; /** * Triggers a new standard checkpoint and uses the given timestamp as the checkpoint * timestamp. * * @param timestamp The timestamp for the checkpoint. * @param isPeriodic Flag indicating whether this triggered checkpoint is * periodic. If this flag is true, but the periodic scheduler is disabled, * the checkpoint will be declined. * @return <code>true</code> if triggering the checkpoint succeeded. */ public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) { return triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic).isSuccess(); } @VisibleForTesting public CheckpointTriggerResult triggerCheckpoint( long timestamp, CheckpointProperties props, @Nullable String externalSavepointLocation, boolean isPeriodic) { // make some eager pre-checks synchronized (lock) { // abort if the coordinator has been shutdown in the meantime if (shutdown) { return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN); } // Don't allow periodic checkpoint if scheduling has been disabled if (isPeriodic && !periodicScheduling) { return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN); } // validate whether the checkpoint can be triggered, with respect to the limit of // concurrent checkpoints, and the minimum time between checkpoints. 
// these checks are not relevant for savepoints if (!props.forceCheckpoint()) { // sanity check: there should never be more than one trigger request queued if (triggerRequestQueued) { LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job); return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED); } // if too many checkpoints are currently in progress, we need to mark that a request is queued if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { triggerRequestQueued = true; if (currentPeriodicTrigger != null) { currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS); } // make sure the minimum interval between checkpoints has passed final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos; final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000; if (durationTillNextMillis > 0) { if (currentPeriodicTrigger != null) { currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } // Reassign the new trigger to the currentPeriodicTrigger currentPeriodicTrigger = timer.scheduleAtFixedRate( new ScheduledTrigger(), durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS); return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS); } } } // check if all tasks that we need to trigger are running. // if not, abort the checkpoint Execution[] executions = new Execution[tasksToTrigger.length]; for (int i = 0; i < tasksToTrigger.length; i++) { Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); if (ee == null) { LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.", tasksToTrigger[i].getTaskNameWithSubtaskIndex(), job); return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING); } else if (ee.getState() == ExecutionState.RUNNING) { executions[i] = ee; } else { LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.", tasksToTrigger[i].getTaskNameWithSubtaskIndex(), job, ExecutionState.RUNNING, ee.getState()); return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING); } } // next, check if all tasks that need to acknowledge the checkpoint are running. // if not, abort the checkpoint Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length); for (ExecutionVertex ev : tasksToWaitFor) { Execution ee = ev.getCurrentExecutionAttempt(); if (ee != null) { ackTasks.put(ee.getAttemptId(), ev); } else { LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.", ev.getTaskNameWithSubtaskIndex(), job); return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING); } } // we will actually trigger this checkpoint! // we lock with a special lock to make sure that trigger requests do not overtake each other. // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter' // may issue blocking operations. Using a different lock than the coordinator-wide lock, // we avoid blocking the processing of 'acknowledge/decline' messages during that time. 
synchronized (triggerLock) { final CheckpointStorageLocation checkpointStorageLocation; final long checkpointID; try { // this must happen outside the coordinator-wide lock, because it communicates // with external services (in HA mode) and may block for a while. checkpointID = checkpointIdCounter.getAndIncrement(); checkpointStorageLocation = props.isSavepoint() ? checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) : checkpointStorage.initializeLocationForCheckpoint(checkpointID); } catch (Throwable t) { int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet(); LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).", job, numUnsuccessful, t); return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION); } final PendingCheckpoint checkpoint = new PendingCheckpoint( job, checkpointID, timestamp, ackTasks, props, checkpointStorageLocation, executor); if (statsTracker != null) { PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint( checkpointID, timestamp, props); checkpoint.setStatsCallback(callback); } // schedule the timer that will clean up the expired checkpoints final Runnable canceller = () -> { synchronized (lock) { // only do the work if the checkpoint is not discarded anyways // note that checkpoint completion discards the pending checkpoint object if (!checkpoint.isDiscarded()) { LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job); checkpoint.abortExpired(); pendingCheckpoints.remove(checkpointID); rememberRecentCheckpointId(checkpointID); triggerQueuedRequests(); } } }; try { // re-acquire the coordinator-wide lock synchronized (lock) { // since we released the lock in the meantime, we need to re-check // that the conditions still hold. if (shutdown) { return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN); } else if (!props.forceCheckpoint()) { if (triggerRequestQueued) { LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job); return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED); } if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { triggerRequestQueued = true; if (currentPeriodicTrigger != null) { currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS); } // make sure the minimum interval between checkpoints has passed final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos; final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000; if (durationTillNextMillis > 0) { if (currentPeriodicTrigger != null) { currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } // Reassign the new trigger to the currentPeriodicTrigger currentPeriodicTrigger = timer.scheduleAtFixedRate( new ScheduledTrigger(), durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS); return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS); } } LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job); pendingCheckpoints.put(checkpointID, checkpoint); ScheduledFuture<?> cancellerHandle = timer.schedule( canceller, checkpointTimeout, TimeUnit.MILLISECONDS); if (!checkpoint.setCancellerHandle(cancellerHandle)) { // checkpoint is already disposed! 
cancellerHandle.cancel(false); } // trigger the master hooks for the checkpoint final List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(), checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout)); for (MasterState s : masterStates) { checkpoint.addMasterState(s); } } // end of lock scope final CheckpointOptions checkpointOptions = new CheckpointOptions( props.getCheckpointType(), checkpointStorageLocation.getLocationReference()); // send the messages to the tasks that trigger their checkpoint for (Execution execution: executions) { execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions); } numUnsuccessfulCheckpointsTriggers.set(0); return new CheckpointTriggerResult(checkpoint); } catch (Throwable t) { // guard the map against concurrent modifications synchronized (lock) { pendingCheckpoints.remove(checkpointID); } int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet(); LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)", checkpointID, job, numUnsuccessful, t); if (!checkpoint.isDiscarded()) { checkpoint.abortError(new Exception("Failed to trigger checkpoint", t)); } try { checkpointStorageLocation.disposeOnFailure(); } catch (Throwable t2) { LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2); } return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION); } } // end trigger lock } //...... }
triggerCheckpoint starts with some eager pre-checks: for a non-force checkpoint it declines with ALREADY_QUEUED, TOO_MANY_CONCURRENT_CHECKPOINTS or MINIMUM_TIME_BETWEEN_CHECKPOINTS if a trigger request is already queued, too many checkpoints are pending, or the minimum pause since the last completed checkpoint has not yet elapsed. It then checks whether tasksToTrigger (the tasks that have to be notified when a checkpoint is triggered) are all in RUNNING state; if not, it fails fast and returns CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING). It also checks whether tasksToWaitFor (the tasks that have to ack the checkpoint for it to succeed) are all in RUNNING state; if not, it likewise fails fast with CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING). After that it creates the PendingCheckpoint along with a canceller (used to abort the checkpoint when it expires); then, for a non-force checkpoint, it runs another round of the TOO_MANY_CONCURRENT_CHECKPOINTS and MINIMUM_TIME_BETWEEN_CHECKPOINTS validation before finally calling Execution.triggerCheckpoint on each task to trigger.
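The limits behind TOO_MANY_CONCURRENT_CHECKPOINTS and the expiry handled by the canceller are driven by the same checkpoint configuration; another hedged sketch of how CheckpointConfig appears to map onto maxConcurrentCheckpointAttempts and checkpointTimeout (class name and values are arbitrary):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointLimitsConfigDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);

        // feeds maxConcurrentCheckpointAttempts: once this many checkpoints are pending,
        // triggerCheckpoint declines with TOO_MANY_CONCURRENT_CHECKPOINTS
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // feeds checkpointTimeout: the canceller aborts a PendingCheckpoint that has not
        // completed within this time
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);
    }
}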
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/executiongraph/Execution.java
public class Execution implements AccessExecution, Archiveable<ArchivedExecution>, LogicalSlot.Payload {

    /**
     * Trigger a new checkpoint on the task of this execution.
     *
     * @param checkpointId of th checkpoint to trigger
     * @param timestamp of the checkpoint to trigger
     * @param checkpointOptions of the checkpoint to trigger
     */
    public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        final LogicalSlot slot = assignedResource;

        if (slot != null) {
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
                "no longer running.");
        }
    }

    //......
}
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
/**
 * Implementation of the {@link TaskManagerGateway} for Flink's RPC system.
 */
public class RpcTaskManagerGateway implements TaskManagerGateway {

    private final TaskExecutorGateway taskExecutorGateway;

    public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        taskExecutorGateway.triggerCheckpoint(
            executionAttemptID,
            checkpointId,
            timestamp,
            checkpointOptions);
    }

    //......
}
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
/**
 * TaskExecutor implementation. The task executor is responsible for the execution of multiple
 * {@link Task}.
 */
public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {

    public CompletableFuture<Acknowledge> triggerCheckpoint(
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            long checkpointTimestamp,
            CheckpointOptions checkpointOptions) {
        log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

        final Task task = taskSlotTable.getTask(executionAttemptID);

        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);

            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

            log.debug(message);
            return FutureUtils.completedExceptionally(new CheckpointException(message));
        }
    }

    //......
}
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
public class Task implements Runnable, TaskActions, CheckpointListener { /** The invokable of this task, if initialized. All accesses must copy the reference and * check for null, as this field is cleared as part of the disposal logic. */ @Nullable private volatile AbstractInvokable invokable; /** * Calls the invokable to trigger a checkpoint. * * @param checkpointID The ID identifying the checkpoint. * @param checkpointTimestamp The timestamp associated with the checkpoint. * @param checkpointOptions Options for performing this checkpoint. */ public void triggerCheckpointBarrier( final long checkpointID, long checkpointTimestamp, final CheckpointOptions checkpointOptions) { final AbstractInvokable invokable = this.invokable; final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp); if (executionState == ExecutionState.RUNNING && invokable != null) { // build a local closure final String taskName = taskNameWithSubtask; final SafetyNetCloseableRegistry safetyNetCloseableRegistry = FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread(); Runnable runnable = new Runnable() { @Override public void run() { // set safety net from the task's context for checkpointing thread LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName()); FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry); try { boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions); if (!success) { checkpointResponder.declineCheckpoint( getJobID(), getExecutionId(), checkpointID, new CheckpointDeclineTaskNotReadyException(taskName)); } } catch (Throwable t) { if (getExecutionState() == ExecutionState.RUNNING) { failExternally(new Exception( "Error while triggering checkpoint " + checkpointID + " for " + taskNameWithSubtask, t)); } else { LOG.debug("Encountered error while triggering checkpoint {} for " + "{} ({}) while being not in state running.", checkpointID, taskNameWithSubtask, executionId, t); } } finally { FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null); } } }; executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId)); } else { LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId); // send back a message that we did not do the checkpoint checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID, new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask)); } } //...... }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@Internal
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
    extends StreamTask<OUT, OP> {

    private volatile boolean externallyInducedCheckpoints;

    @Override
    public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
        if (!externallyInducedCheckpoints) {
            return super.triggerCheckpoint(checkpointMetaData, checkpointOptions);
        }
        else {
            // we do not trigger checkpoints here, we simply state whether we can trigger them
            synchronized (getCheckpointLock()) {
                return isRunning();
            }
        }
    }

    //......
}

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@Internal public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends AbstractInvokable implements AsyncExceptionHandler { @Override public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception { try { // No alignment if we inject a checkpoint CheckpointMetrics checkpointMetrics = new CheckpointMetrics() .setBytesBufferedInAlignment(0L) .setAlignmentDurationNanos(0L); return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics); } catch (Exception e) { // propagate exceptions only if the task is still in "running" state if (isRunning) { throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + getName() + '.', e); } else { LOG.debug("Could not perform checkpoint {} for operator {} while the " + "invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e); return false; } } } private boolean performCheckpoint( CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception { LOG.debug("Starting checkpoint ({}) {} on task {}", checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName()); synchronized (lock) { if (isRunning) { // we can do a checkpoint // All of the following steps happen as an atomic step from the perspective of barriers and // records/watermarks/timers/callbacks. // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream // checkpoint alignments // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work. // The pre-barrier work should be nothing or minimal in the common case. operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId()); // Step (2): Send the checkpoint barrier downstream operatorChain.broadcastCheckpointBarrier( checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions); // Step (3): Take the state snapshot. This should be largely asynchronous, to not // impact progress of the streaming topology checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics); return true; } else { // we cannot perform our checkpoint - let the downstream operators know that they // should not wait for any input from this operator // we cannot broadcast the cancellation markers on the 'operator chain', because it may not // yet be created final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId()); Exception exception = null; for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter : streamRecordWriters) { try { streamRecordWriter.broadcastEvent(message); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed( new Exception("Could not send cancel checkpoint marker to downstream tasks.", e), exception); } } if (exception != null) { throw exception; } return false; } } } private void checkpointState( CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception { CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation( checkpointMetaData.getCheckpointId(), checkpointOptions.getTargetLocation()); CheckpointingOperation checkpointingOperation = new CheckpointingOperation( this, checkpointMetaData, checkpointOptions, storage, checkpointMetrics); checkpointingOperation.executeCheckpointing(); } //...... }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@Internal
public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {

    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // go forward through the operator chain and tell each operator
        // to prepare the checkpoint
        final StreamOperator<?>[] operators = this.allOperators;
        for (int i = operators.length - 1; i >= 0; --i) {
            final StreamOperator<?> op = operators[i];
            if (op != null) {
                op.prepareSnapshotPreBarrier(checkpointId);
            }
        }
    }

    //......
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@Internal
public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {

    public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
        CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
        for (RecordWriterOutput<?> streamOutput : streamOutputs) {
            streamOutput.broadcastEvent(barrier);
        }
    }

    //......
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java
private static final class CheckpointingOperation { private final StreamTask<?, ?> owner; private final CheckpointMetaData checkpointMetaData; private final CheckpointOptions checkpointOptions; private final CheckpointMetrics checkpointMetrics; private final CheckpointStreamFactory storageLocation; private final StreamOperator<?>[] allOperators; private long startSyncPartNano; private long startAsyncPartNano; // ------------------------ private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress; public CheckpointingOperation( StreamTask<?, ?> owner, CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointStreamFactory checkpointStorageLocation, CheckpointMetrics checkpointMetrics) { this.owner = Preconditions.checkNotNull(owner); this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData); this.checkpointOptions = Preconditions.checkNotNull(checkpointOptions); this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics); this.storageLocation = Preconditions.checkNotNull(checkpointStorageLocation); this.allOperators = owner.operatorChain.getAllOperators(); this.operatorSnapshotsInProgress = new HashMap<>(allOperators.length); } public void executeCheckpointing() throws Exception { startSyncPartNano = System.nanoTime(); try { for (StreamOperator<?> op : allOperators) { checkpointStreamOperator(op); } if (LOG.isDebugEnabled()) { LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), owner.getName()); } startAsyncPartNano = System.nanoTime(); checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000); // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable( owner, operatorSnapshotsInProgress, checkpointMetaData, checkpointMetrics, startAsyncPartNano); owner.cancelables.registerCloseable(asyncCheckpointRunnable); owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable); if (LOG.isDebugEnabled()) { LOG.debug("{} - finished synchronous part of checkpoint {}. " + "Alignment duration: {} ms, snapshot duration {} ms", owner.getName(), checkpointMetaData.getCheckpointId(), checkpointMetrics.getAlignmentDurationNanos() / 1_000_000, checkpointMetrics.getSyncDurationMillis()); } } catch (Exception ex) { // Cleanup to release resources for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) { if (null != operatorSnapshotResult) { try { operatorSnapshotResult.cancel(); } catch (Exception e) { LOG.warn("Could not properly cancel an operator snapshot result.", e); } } } if (LOG.isDebugEnabled()) { LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. 
" + "Alignment duration: {} ms, snapshot duration {} ms", owner.getName(), checkpointMetaData.getCheckpointId(), checkpointMetrics.getAlignmentDurationNanos() / 1_000_000, checkpointMetrics.getSyncDurationMillis()); } owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, ex); } } @SuppressWarnings("deprecation") private void checkpointStreamOperator(StreamOperator<?> op) throws Exception { if (null != op) { OperatorSnapshotFutures snapshotInProgress = op.snapshotState( checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions, storageLocation); operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress); } } private enum AsyncCheckpointState { RUNNING, DISCARDED, COMPLETED } }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@PublicEvolving public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>, Serializable { @Override public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory factory) throws Exception { KeyGroupRange keyGroupRange = null != keyedStateBackend ? keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE; OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures(); try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl( checkpointId, timestamp, factory, keyGroupRange, getContainingTask().getCancelables())) { snapshotState(snapshotContext); snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture()); snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture()); if (null != operatorStateBackend) { snapshotInProgress.setOperatorStateManagedFuture( operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions)); } if (null != keyedStateBackend) { snapshotInProgress.setKeyedStateManagedFuture( keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions)); } } catch (Exception snapshotException) { try { snapshotInProgress.cancel(); } catch (Exception e) { snapshotException.addSuppressed(e); } String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " + getOperatorName() + "."; if (!getContainingTask().isCanceled()) { LOG.info(snapshotFailMessage, snapshotException); } throw new Exception(snapshotFailMessage, snapshotException); } return snapshotInProgress; } /** * Stream operators with state, which want to participate in a snapshot need to override this hook method. * * @param context context that provides information and means required for taking a snapshot */ public void snapshotState(StateSnapshotContext context) throws Exception { final KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend(); //TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots if (keyedStateBackend instanceof AbstractKeyedStateBackend && ((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) { KeyedStateCheckpointOutputStream out; try { out = context.getRawKeyedOperatorStateOutput(); } catch (Exception exception) { throw new Exception("Could not open raw keyed operator state stream for " + getOperatorName() + '.', exception); } try { KeyGroupsList allKeyGroups = out.getKeyGroupList(); for (int keyGroupIdx : allKeyGroups) { out.startNewKeyGroup(keyGroupIdx); timeServiceManager.snapshotStateForKeyGroup( new DataOutputViewStreamWrapper(out), keyGroupIdx); } } catch (Exception exception) { throw new Exception("Could not write timer service of " + getOperatorName() + " to checkpoint state stream.", exception); } finally { try { out.close(); } catch (Exception closeException) { LOG.warn("Could not close raw keyed operator state stream for {}. This " + "might have prevented deleting some state data.", getOperatorName(), closeException); } } } } //...... }
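As the javadoc above says, a stateful operator hooks into this by overriding snapshotState(StateSnapshotContext), and usually initializeState as well. Below is an illustrative sketch of a custom operator doing so; the operator, its state name and the pre-barrier flush are my own example, not taken from the Flink sources:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class CountingOperator extends AbstractStreamOperator<Long>
        implements OneInputStreamOperator<Long, Long> {

    private transient ListState<Long> countState; // managed operator state
    private long count;

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        countState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("count", Long.class));
        for (Long restored : countState.get()) {
            count += restored;
        }
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // step (1) of StreamTask#performCheckpoint: last chance to emit records
        // before the barrier is broadcast downstream
        output.collect(new StreamRecord<>(count));
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        countState.clear();
        countState.add(count);
    }

    @Override
    public void processElement(StreamRecord<Long> element) {
        count++;
        output.collect(element);
    }
}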
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        extends AbstractStreamOperator<OUT>
        implements OutputTypeConfigurable<OUT> {

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
    }

    //......
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
@Internal
public final class StreamingFunctionUtils {

    public static void snapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {

        Preconditions.checkNotNull(context);
        Preconditions.checkNotNull(backend);

        while (true) {

            if (trySnapshotFunctionState(context, backend, userFunction)) {
                break;
            }

            // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
            if (userFunction instanceof WrappingFunction) {
                userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
            } else {
                break;
            }
        }
    }

    private static boolean trySnapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {

        if (userFunction instanceof CheckpointedFunction) {
            ((CheckpointedFunction) userFunction).snapshotState(context);

            return true;
        }

        if (userFunction instanceof ListCheckpointed) {
            @SuppressWarnings("unchecked")
            List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
                    snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());

            ListState<Serializable> listState = backend.
                    getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

            listState.clear();

            if (null != partitionableState) {
                try {
                    for (Serializable statePartition : partitionableState) {
                        listState.add(statePartition);
                    }
                } catch (Exception e) {
                    listState.clear();

                    throw new Exception("Could not write partitionable state to operator " +
                            "state backend.", e);
                }
            }

            return true;
        }

        return false;
    }

    //......
}
trySnapshotFunctionState checks the type of the user function: if it implements the CheckpointedFunction interface, CheckpointedFunction.snapshotState is called; if it implements the ListCheckpointed interface, ListCheckpointed.snapshotState is called and the returned partitionable state is written into the backend's default operator ListState.
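To tie this back to user code: a function only reaches this point if it implements one of those two interfaces (ListCheckpointed works analogously, returning its state as a List from snapshotState(checkpointId, timestamp)). Below is an illustrative CheckpointedFunction in the style of the buffering-sink example from the Flink documentation; the class name, state name and flush threshold are made up for the sketch:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class BufferingSink implements SinkFunction<String>, CheckpointedFunction {

    private transient ListState<String> checkpointedState;
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) throws Exception {
        buffer.add(value);
        if (buffer.size() >= 100) {
            // flush to the external system here
            buffer.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // reached via StreamingFunctionUtils#trySnapshotFunctionState above
        checkpointedState.clear();
        for (String element : buffer) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("buffered-elements", String.class));
        if (context.isRestored()) {
            for (String element : checkpointedState.get()) {
                buffer.add(element);
            }
        }
    }
}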