Checkpointing is enabled by calling the enableCheckpointing method of StreamExecutionEnvironment. The interval must be at least 10 ms:
```java
public StreamExecutionEnvironment enableCheckpointing(long interval) {
    checkpointCfg.setCheckpointInterval(interval);
    return this;
}
```
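To make the 10 ms rule concrete, here is a minimal, self-contained Java sketch of the validation behind this call. CheckpointIntervalConfig and MiniEnvironment are hypothetical stand-ins, not Flink classes. They only model the lower bound described above and the fluent `return this` style. In a real job you would simply call `env.enableCheckpointing(1000)` on a StreamExecutionEnvironment.

```java
// Hypothetical stand-in for Flink's CheckpointConfig: it only models the
// "interval must be >= 10 ms" rule described in the text.
final class CheckpointIntervalConfig {
    // Assumed minimum, matching the 10 ms lower bound stated above.
    static final long MINIMAL_CHECKPOINT_TIME_MS = 10;

    private long checkpointIntervalMs = -1; // -1 means "checkpointing disabled"

    void setCheckpointInterval(long intervalMs) {
        if (intervalMs < MINIMAL_CHECKPOINT_TIME_MS) {
            throw new IllegalArgumentException("Checkpoint interval must be at least "
                    + MINIMAL_CHECKPOINT_TIME_MS + " ms, got " + intervalMs);
        }
        this.checkpointIntervalMs = intervalMs;
    }

    long getCheckpointInterval() {
        return checkpointIntervalMs;
    }

    boolean isCheckpointingEnabled() {
        return checkpointIntervalMs > 0;
    }
}

// Hypothetical environment mirroring the fluent enableCheckpointing(...) call.
final class MiniEnvironment {
    private final CheckpointIntervalConfig checkpointCfg = new CheckpointIntervalConfig();

    MiniEnvironment enableCheckpointing(long intervalMs) {
        checkpointCfg.setCheckpointInterval(intervalMs); // throws if below 10 ms
        return this; // fluent style, like StreamExecutionEnvironment
    }

    CheckpointIntervalConfig getCheckpointConfig() {
        return checkpointCfg;
    }
}
```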
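While building the JobGraph, the StreamingJobGraphGenerator creates three node lists of type List<JobVertexID>:

- triggerVertices: the vertices (the sources) that receive the triggerCheckpoint request sent by the CheckpointCoordinator;
- ackVertices: the vertices that must reply to the CheckpointCoordinator with an acknowledge message;
- commitVertices: the vertices on which the CheckpointCoordinator calls notifyCheckpointComplete to tell them that the checkpoint has fully completed.

If the user has enabled checkpointing, a timer task in the CheckpointCoordinator periodically generates a new checkpoint ID and calls triggerCheckpoint on every node in CheckpointCoordinator.tasksToTrigger (the executions corresponding to triggerVertices). That call triggers TaskExecutor.triggerCheckpoint over RPC; the TaskExecutor then looks up the corresponding Task and calls its triggerCheckpointBarrier method, which in turn invokes StreamTask's triggerCheckpoint asynchronously. When a Task has finished its part of the checkpoint, it remotely calls acknowledgeCheckpoint to notify the CheckpointCoordinator. Once every node has acknowledged the checkpoint, the CheckpointCoordinator calls notifyCheckpointComplete on each node in tasksToCommitTo to inform it that the checkpoint has completed successfully.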
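The bookkeeping described above can be sketched roughly as follows, assuming a single checkpoint and one acknowledgement per vertex (Flink actually tracks acknowledgements per execution attempt, i.e. per subtask). Vertex, CheckpointRound, acknowledge and notified are hypothetical names chosen for illustration, not Flink API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical job vertex: only its id and whether it is a source matter here.
final class Vertex {
    final String id;
    final boolean isSource;

    Vertex(String id, boolean isSource) {
        this.id = id;
        this.isSource = isSource;
    }
}

// Hypothetical, simplified model of one checkpoint round (not Flink's classes).
final class CheckpointRound {
    final List<String> triggerVertices = new ArrayList<>(); // sources only
    final List<String> ackVertices = new ArrayList<>();     // every vertex
    final List<String> commitVertices = new ArrayList<>();  // every vertex
    final List<String> notified = new ArrayList<>();        // who got "checkpoint complete"

    private final Set<String> notYetAcknowledged = new HashSet<>();

    CheckpointRound(List<Vertex> vertices) {
        for (Vertex v : vertices) {
            if (v.isSource) {
                triggerVertices.add(v.id);
            }
            ackVertices.add(v.id);
            commitVertices.add(v.id);
        }
        notYetAcknowledged.addAll(ackVertices);
    }

    /** Records one acknowledgement; returns true only for the ack that completes the checkpoint. */
    boolean acknowledge(String vertexId) {
        notYetAcknowledged.remove(vertexId);
        if (notYetAcknowledged.isEmpty() && notified.isEmpty()) {
            // all acks received: "notifyCheckpointComplete" on every commit vertex
            notified.addAll(commitVertices);
            return true;
        }
        return false;
    }
}
```

Splitting the lists this way means only the sources need an explicit trigger; the other tasks start their checkpoint when the barrier reaches them through the stream, but every task still has to acknowledge and be notified.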
The CheckpointCoordinator's triggerCheckpoint method sends the trigger message to each of these executions:
```java
final CheckpointOptions checkpointOptions = new CheckpointOptions(
    props.getCheckpointType(),
    checkpointStorageLocation.getLocationReference());

// send the messages to the tasks that trigger their checkpoint
for (Execution execution : executions) {
    execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
```
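The periodic part of this (a timer that hands out increasing checkpoint IDs and fans the trigger out to every task) can be sketched in plain Java. PeriodicTrigger, FakeExecution and triggerOnce are hypothetical names, the ID counter starting at 1 is an assumption, and no RPC is modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for an Execution: records the checkpoint IDs it was asked to trigger.
final class FakeExecution {
    private final List<Long> triggered = new ArrayList<>();

    synchronized void triggerCheckpoint(long checkpointId, long timestamp) {
        triggered.add(checkpointId); // a real Execution would issue an RPC here
    }

    synchronized List<Long> snapshot() {
        return new ArrayList<>(triggered);
    }
}

// Hypothetical coordinator sketch: each round gets a fresh, increasing checkpoint ID
// and the same (ID, timestamp) pair is sent to every task that must be triggered.
final class PeriodicTrigger {
    private final AtomicLong checkpointIdCounter = new AtomicLong(1); // assumed to start at 1
    private final List<FakeExecution> tasksToTrigger;

    PeriodicTrigger(List<FakeExecution> tasksToTrigger) {
        this.tasksToTrigger = tasksToTrigger;
    }

    long triggerOnce() {
        long checkpointId = checkpointIdCounter.getAndIncrement();
        long timestamp = System.currentTimeMillis();
        for (FakeExecution execution : tasksToTrigger) {
            execution.triggerCheckpoint(checkpointId, timestamp);
        }
        return checkpointId;
    }

    // Runs triggerOnce every intervalMs milliseconds on a single timer thread.
    ScheduledExecutorService startPeriodic(long intervalMs) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(this::triggerOnce, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        return timer;
    }
}
```

A caller would start it with `ScheduledExecutorService timer = trigger.startPeriodic(1000);` and stop it with `timer.shutdown()`.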
Execution.triggerCheckpoint calls TaskExecutor.triggerCheckpoint via RPC, through the TaskManagerGateway of the slot assigned to the execution:
```java
/**
 * Trigger a new checkpoint on the task of this execution.
 *
 * @param checkpointId of the checkpoint to trigger
 * @param timestamp of the checkpoint to trigger
 * @param checkpointOptions of the checkpoint to trigger
 */
public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
    final LogicalSlot slot = assignedResource;

    if (slot != null) {
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        taskManagerGateway.triggerCheckpoint(
            attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
    } else {
        LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
            "no longer running.");
    }
}
```
TaskExecutor.triggerCheckpoint in turn calls Task.triggerCheckpointBarrier:
```java
@Override
public CompletableFuture<Acknowledge> triggerCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp,
        CheckpointOptions checkpointOptions) {
    log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

    final Task task = taskSlotTable.getTask(executionAttemptID);

    if (task != null) {
        task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);

        return CompletableFuture.completedFuture(Acknowledge.get());
    } else {
        final String message = "TaskManager received a checkpoint request for unknown task " +
            executionAttemptID + '.';

        log.debug(message);
        return FutureUtils.completedExceptionally(new CheckpointException(message));
    }
}
```
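The TaskExecutor side of this hop, looking up the task by attempt ID and answering immediately with either an acknowledgement or a failed future, can be modeled in plain Java as below. SketchTask, SketchTaskExecutor and the string "ACK" are hypothetical; the map stands in for taskSlotTable and no real RPC layer is involved.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical task: only remembers the last checkpoint barrier it was asked to inject.
final class SketchTask {
    volatile long lastBarrierCheckpointId = -1;

    void triggerCheckpointBarrier(long checkpointId) {
        lastBarrierCheckpointId = checkpointId;
    }
}

// Hypothetical model of the TaskExecutor handling a trigger request.
final class SketchTaskExecutor {
    // stands in for taskSlotTable: execution attempt id -> running task
    private final Map<String, SketchTask> tasksByAttempt = new ConcurrentHashMap<>();

    void register(String attemptId, SketchTask task) {
        tasksByAttempt.put(attemptId, task);
    }

    CompletableFuture<String> triggerCheckpoint(String attemptId, long checkpointId) {
        SketchTask task = tasksByAttempt.get(attemptId);
        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId);
            // the reply only acknowledges receipt; the checkpoint itself runs later
            return CompletableFuture.completedFuture("ACK");
        }
        // unknown task: answer with an exceptionally completed future
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException(
            "TaskManager received a checkpoint request for unknown task " + attemptId + '.'));
        return failed;
    }
}
```

Note that the returned future only confirms that the request was delivered; the checkpoint itself is performed later and asynchronously.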
Task.triggerCheckpointBarrier invokes StreamTask.triggerCheckpoint asynchronously:
```java
if (executionState == ExecutionState.RUNNING && invokable != null) {

    // build a local closure
    final String taskName = taskNameWithSubtask;
    final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
        FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();

    Runnable runnable = new Runnable() {
        @Override
        public void run() {
            // set safety net from the task's context for checkpointing thread
            LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
            FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

            try {
                boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                if (!success) {
                    checkpointResponder.declineCheckpoint(
                        getJobID(), getExecutionId(), checkpointID,
                        new CheckpointDeclineTaskNotReadyException(taskName));
                }
            } catch (Throwable t) {
                if (getExecutionState() == ExecutionState.RUNNING) {
                    failExternally(new Exception(
                        "Error while triggering checkpoint " + checkpointID + " for " +
                            taskNameWithSubtask, t));
                } else {
                    LOG.debug("Encountered error while triggering checkpoint {} for " +
                        "{} ({}) while being not in state running.", checkpointID,
                        taskNameWithSubtask, executionId, t);
                }
            } finally {
                FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
            }
        }
    };
    executeAsyncCallRunnable(
        runnable,
        String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
}
```
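The essential pattern here is "wrap the call in a Runnable, hand it to another thread, and report a decline or a failure from that thread". A hedged plain-Java sketch follows: SketchInvokable, AsyncCheckpointTrigger and the two result lists are hypothetical, and a single-thread ExecutorService plays the role of the task's asynchronous call dispatcher.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the invokable (StreamTask): returns false when it cannot checkpoint yet.
interface SketchInvokable {
    boolean triggerCheckpoint(long checkpointId) throws Exception;
}

final class AsyncCheckpointTrigger {
    private final ExecutorService asyncCallDispatcher = Executors.newSingleThreadExecutor();
    final List<Long> declined = new CopyOnWriteArrayList<>();
    final List<Throwable> failures = new CopyOnWriteArrayList<>();

    Future<?> triggerCheckpointBarrier(SketchInvokable invokable, long checkpointId) {
        Runnable runnable = () -> {
            try {
                if (!invokable.triggerCheckpoint(checkpointId)) {
                    declined.add(checkpointId); // plays the role of declineCheckpoint(...)
                }
            } catch (Throwable t) {
                failures.add(t); // plays the role of failExternally(...) for a running task
            }
        };
        // the caller (the RPC handler) returns immediately; the work runs on another thread
        return asyncCallDispatcher.submit(runnable);
    }

    void shutdown() {
        asyncCallDispatcher.shutdown();
    }
}
```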
StreamTask.triggerCheckpoint then calls performCheckpoint, which performs three steps while holding the checkpoint lock:

```java
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    LOG.debug("Starting checkpoint ({}) {} on task {}",
        checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

    synchronized (lock) {
        if (isRunning) {
            // we can do a checkpoint

            // All of the following steps happen as an atomic step from the perspective of barriers and
            // records/watermarks/timers/callbacks.
            // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
            // checkpoint alignments

            // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
            //           The pre-barrier work should be nothing or minimal in the common case.
            operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());

            // Step (2): Send the checkpoint barrier downstream
            operatorChain.broadcastCheckpointBarrier(
                checkpointMetaData.getCheckpointId(),
                checkpointMetaData.getTimestamp(),
                checkpointOptions);

            // Step (3): Take the state snapshot. This should be largely asynchronous, to not
            //           impact progress of the streaming topology
            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
            return true;
        } else {
            // not running: downstream tasks are told not to wait for this checkpoint
            // (branch omitted in this excerpt)
            return false;
        }
    }
}
```
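The ordering guarantee of performCheckpoint can be illustrated with a toy task that logs events instead of doing real work. SketchStreamTask and its event strings are hypothetical. The point is that, because checkpointing and record processing take the same lock, no record can land between steps (1) through (3).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the three-step order inside performCheckpoint.
final class SketchStreamTask {
    private final Object lock = new Object();
    private volatile boolean isRunning = true;
    private final List<String> events = new ArrayList<>(); // guarded by lock

    boolean performCheckpoint(long checkpointId) {
        synchronized (lock) {
            if (!isRunning) {
                events.add("cancel-marker-" + checkpointId); // downstream must not wait for us
                return false;
            }
            events.add("prepareSnapshotPreBarrier-" + checkpointId); // step (1)
            events.add("broadcastBarrier-" + checkpointId);          // step (2)
            events.add("snapshotState-" + checkpointId);             // step (3), async part not modeled
            return true;
        }
    }

    void processRecord(String record) {
        synchronized (lock) { // records can never interleave with the three steps above
            events.add("record-" + record);
        }
    }

    void stop() {
        isRunning = false;
    }

    List<String> eventsSnapshot() {
        synchronized (lock) {
            return new ArrayList<>(events);
        }
    }
}
```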
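The state snapshot in step (3) eventually calls snapshotState on each operator. For operators that wrap a user function, this is AbstractUdfStreamOperator.snapshotState: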
```java
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);
    StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
}
```
It then calls the appropriate method depending on whether userFunction implements the CheckpointedFunction or the ListCheckpointed interface:
```java
private static boolean trySnapshotFunctionState(
        StateSnapshotContext context,
        OperatorStateBackend backend,
        Function userFunction) throws Exception {

    if (userFunction instanceof CheckpointedFunction) {
        ((CheckpointedFunction) userFunction).snapshotState(context);

        return true;
    }

    if (userFunction instanceof ListCheckpointed) {
        @SuppressWarnings("unchecked")
        List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction)
            .snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());

        ListState<Serializable> listState = backend
            .getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

        listState.clear();

        if (null != partitionableState) {
            try {
                for (Serializable statePartition : partitionableState) {
                    listState.add(statePartition);
                }
            } catch (Exception e) {
                listState.clear();

                throw new Exception("Could not write partitionable state to operator " +
                    "state backend.", e);
            }
        }

        return true;
    }

    return false;
}
```
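The instanceof dispatch above can be reproduced with two simplified interfaces. SketchCheckpointedFunction and SketchListCheckpointed are hypothetical mirrors of CheckpointedFunction and ListCheckpointed; the real CheckpointedFunction.snapshotState takes a FunctionSnapshotContext, and the real list goes into operator ListState rather than an ArrayList.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of CheckpointedFunction: the function snapshots its own state.
interface SketchCheckpointedFunction {
    void snapshotState(long checkpointId);
}

// Hypothetical mirror of ListCheckpointed: the function returns its state as a list.
interface SketchListCheckpointed<T extends Serializable> {
    List<T> snapshotState(long checkpointId, long timestamp);
}

final class SnapshotDispatcher {
    // stands in for the operator state backend's default list state
    final List<Serializable> defaultListState = new ArrayList<>();

    boolean trySnapshotFunctionState(Object userFunction, long checkpointId, long timestamp) {
        if (userFunction instanceof SketchCheckpointedFunction) {
            // the function manages its own state objects
            ((SketchCheckpointedFunction) userFunction).snapshotState(checkpointId);
            return true;
        }
        if (userFunction instanceof SketchListCheckpointed) {
            @SuppressWarnings("unchecked")
            List<Serializable> partitions = ((SketchListCheckpointed<Serializable>) userFunction)
                .snapshotState(checkpointId, timestamp);
            // the framework replaces the stored list with whatever the function returned
            defaultListState.clear();
            if (partitions != null) {
                defaultListState.addAll(partitions);
            }
            return true;
        }
        return false; // stateless function: nothing to snapshot
    }
}
```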
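StreamTask and StreamOperator synchronize their interactions on the StreamTask.lock object, which guarantees that checkpoints are taken consistently with respect to record processing: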
```java
/**
 * Gets the lock object on which all operations that involve data and state mutation have to lock.
 * @return The checkpoint lock object.
 */
public Object getCheckpointLock() {
    return lock;
}

// SourceStreamTask: the lock is handed to the source operator
@Override
protected void run() throws Exception {
    headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
}
```
StreamSource.run then passes this lock (lockingObject) into the SourceContext that the user function receives:
```java
this.ctx = StreamSourceContexts.getSourceContext(
    timeCharacteristic,
    getProcessingTimeService(),
    lockingObject,
    streamStatusMaintainer,
    collector,
    watermarkInterval,
    -1);

try {
    userFunction.run(ctx);

    // if we get here, then the user function either exited after being done (finite source)
    // or the function was canceled or stopped. For the finite source case, we should emit
    // a final watermark that indicates that we reached the end of event-time
    if (!isCanceledOrStopped()) {
        ctx.emitWatermark(Watermark.MAX_WATERMARK);
    }
} finally {
    // make sure that the context is closed in any case (remaining cleanup omitted)
    ctx.close();
}
```
Therefore, if a SourceFunction needs to be checkpointed (that is, it implements CheckpointedFunction or ListCheckpointed), its run method must synchronize on ctx.getCheckpointLock(), for example:
```java
// count is a long field; isRunning is a volatile boolean flag cleared by cancel()
@Override
public void run(SourceContext<Long> ctx) {
    while (isRunning && count < 1000) {
        // this synchronized block ensures that state checkpointing,
        // internal state updates and emission of elements are an atomic operation
        synchronized (ctx.getCheckpointLock()) {
            ctx.collect(count);
            count++;
        }
    }
}
```
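Why the lock matters can be shown with a small experiment: a source thread emits and increments under the lock while another thread repeatedly takes "checkpoints" under the same lock. LockedCounterSource and its snapshot method are hypothetical. With the synchronized blocks in place, the stored count and the number of emitted records match in every snapshot; without them, a snapshot could see a record that was emitted but not yet counted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical source that, like the example above, emits and updates its counter
// inside one synchronized block on the shared checkpoint lock.
final class LockedCounterSource {
    final Object checkpointLock = new Object();
    private final List<Long> emitted = new ArrayList<>(); // guarded by checkpointLock
    private long count = 0;                               // guarded by checkpointLock
    private final AtomicBoolean running = new AtomicBoolean(true);

    void run(long limit) {
        while (running.get() && count < limit) { // only this thread writes count
            synchronized (checkpointLock) {
                emitted.add(count); // stands in for ctx.collect(count)
                count++;
            }
        }
    }

    void cancel() {
        running.set(false);
    }

    /** Simulated checkpoint: returns {stored count, number of records emitted so far}. */
    long[] snapshot() {
        synchronized (checkpointLock) {
            return new long[] {count, emitted.size()};
        }
    }
}
```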
For non-source tasks, OneInputStreamTask.run simply loops over StreamInputProcessor.processInput:

```java
@Override
protected void run() throws Exception {
    // cache processor reference on the stack, to make the code more JIT friendly
    final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

    while (running && inputProcessor.processInput()) {
        // all the work happens in the "processInput" method
    }
}
```
StreamInputProcessor.processInput ensures that every call into user-defined code happens inside a block synchronized on lock:
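```java
if (recordOrMark.isWatermark()) {
    // ... watermark handling (omitted in this excerpt)
    continue;
} else if (recordOrMark.isStreamStatus()) {
    // ... stream status handling (omitted in this excerpt)
    continue;
} else if (recordOrMark.isLatencyMarker()) {
    // handle latency marker
    synchronized (lock) {
        streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
    }
    continue;
} else {
    // now we can do the actual processing
    StreamRecord<IN> record = recordOrMark.asRecord();
    synchronized (lock) {
        numRecordsIn.inc();
        streamOperator.setKeyContextElement1(record);
        streamOperator.processElement(record);
    }
    return true;
}
```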
StreamMap.processElement:
```java
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    output.collect(element.replace(userFunction.map(element.getValue())));
}
```
Custom methods of non-source Functions therefore do not need any additional synchronization on the checkpoint lock.
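For comparison, here is a sketch of the non-source case. The hypothetical SketchOneInputTask takes the lock around each call, so the user function passed to it (a plain lambda) contains no synchronization, yet checkpoints still never observe a half-processed record. The class and its checkpoint method are illustrative assumptions, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical one-input task: the task, not the user function, wraps each call in the lock.
final class SketchOneInputTask<IN, OUT> {
    private final Object lock = new Object();
    private final Function<IN, OUT> userMap;            // plain function, no locking inside
    private final List<OUT> output = new ArrayList<>(); // guarded by lock
    private long numRecordsIn = 0;                      // guarded by lock

    SketchOneInputTask(Function<IN, OUT> userMap) {
        this.userMap = userMap;
    }

    void processElement(IN value) {
        synchronized (lock) {
            numRecordsIn++;
            output.add(userMap.apply(value)); // like StreamMap.processElement
        }
    }

    /** Simulated checkpoint: runs between two records, never in the middle of one. */
    long[] checkpoint() {
        synchronized (lock) {
            return new long[] {numRecordsIn, output.size()};
        }
    }
}
```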