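The earlier article "Flink Principles: Fault Tolerance Mechanism" gave a basic introduction to the checkpoint mechanism. This article analyzes the checkpoint process from the source code. It only covers how a checkpoint is scheduled and tries to make the overall logic clear; it does not go into the implementation details, which is a regret, and I will try to analyze those later. If you find any mistakes, please point them out in the comments!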
This article is based on Flink 1.9.
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000); // checkpointing is disabled by default
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // default: EXACTLY_ONCE
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000); // default: 0, maximum: 1 year
env.getCheckpointConfig().setCheckpointTimeout(150000); // default: 10 min
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // default: 1
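The default values of these parameters can be found in CheckpointConfig.java in flink-streaming-java*.jar, which also lists further settings. The configured values are passed to the runtime layer by jobGraph.setSnapshotSettings(settings), which is called in the private method configureCheckpointing() of StreamingJobGraphGenerator.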
This section focuses on the relationship between baseInterval, which is set by enableCheckpointing(), and minPauseBetweenCheckpoints. To analyze it, here are the definitions from the source code:
/** The base checkpoint interval. Actual trigger time may be affected by the
 * max concurrent checkpoints and minimum-pause values */
// checkpoint trigger period; the actual trigger time is also affected by
// maxConcurrentCheckpointAttempts and minPauseBetweenCheckpointsNanos
private final long baseInterval;

/** The min time(in ns) to delay after a checkpoint could be triggered. Allows to
 * enforce minimum processing time between checkpoint attempts */
// minimum time between two checkpoints once a checkpoint could be triggered
private final long minPauseBetweenCheckpointsNanos;
When baseInterval < minPauseBetweenCheckpoints, CheckpointCoordinator.java handles it as follows:
// it does not make sense to schedule checkpoints more often than the desired
// time between checkpoints
long baseInterval = chkConfig.getCheckpointInterval();
if (baseInterval < minPauseBetweenCheckpoints) {
    baseInterval = minPauseBetweenCheckpoints;
}
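From this we can see that although checkpoints are configured to trigger periodically, the actual trigger time also depends on minPauseBetweenCheckpoints and maxConcurrentCheckpointAttempts. If maxConcurrentCheckpointAttempts is 1, a new checkpoint has to wait for the running one to finish even when its trigger time has arrived.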
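The two rules above can be sketched in a few lines of plain Java. This is a simplified model and not Flink code: the class CheckpointTiming and the methods effectiveInterval and mayTrigger are hypothetical names chosen for illustration, and the real coordinator checks more conditions than these.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of Flink: models the interval rules discussed above. */
class CheckpointTiming {

    /** The periodic interval is never shorter than the minimum pause (both in ms). */
    static long effectiveInterval(long baseIntervalMs, long minPauseMs) {
        return Math.max(baseIntervalMs, minPauseMs);
    }

    /**
     * Decides whether a periodic trigger may start a new checkpoint now:
     * the concurrency limit must not be reached and the minimum pause since
     * the last completed checkpoint must have elapsed.
     */
    static boolean mayTrigger(int pendingCheckpoints, int maxConcurrent,
                              long nowNanos, long lastCompletionNanos, long minPauseMs) {
        if (pendingCheckpoints >= maxConcurrent) {
            return false; // wait for a running checkpoint to finish
        }
        long elapsedNanos = nowNanos - lastCompletionNanos;
        return elapsedNanos >= TimeUnit.MILLISECONDS.toNanos(minPauseMs);
    }

    public static void main(String[] args) {
        System.out.println(effectiveInterval(10_000, 5_000)); // prints 10000
        System.out.println(effectiveInterval(1_000, 5_000));  // prints 5000
    }
}
```

With the settings from the beginning of the article (interval 10000 ms, minimum pause 5000 ms) the interval stays at 10000 ms; only an interval shorter than the pause would be raised.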
After the JobGraph is submitted to the Dispatcher, createJobManagerRunner and startJobManagerRunner are executed; see the createJobManagerRunner(...) method of the Dispatcher class.
This stage creates a JobManagerRunner instance. The part of it that relates to checkpointing is the registration of a listener that watches the job status.
// JobManagerRunner.java
public JobManagerRunner(...) throws Exception {

    // ...

    // make sure we cleanly shut down out JobManager services if initialization fails
    try {
        // ...
        // load the JobGraph and libraries, set up leader election, etc.

        // now start the JobManager
        this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader);
    }
    catch (Throwable t) {
        // ...
    }
}

// createJobMasterService() of DefaultJobMasterServiceFactory creates a new JobMaster object
// JobMaster.java
public JobMaster(...) throws Exception {

    // ...
    // this constructor mainly checks its arguments and creates the slotPool,
    // the slotPool's scheduler, and so on

    // create a scheduler
    this.schedulerNG = createScheduler(jobManagerJobMetricGroup);
    // ...
}
The core statements in creating the scheduler are as follows:
// LegacyScheduler.java, in the LegacyScheduler() constructor
// create the ExecutionGraph
this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker));


private ExecutionGraph createAndRestoreExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        ShuffleMaster<?> shuffleMaster,
        PartitionTracker partitionTracker) throws Exception {

    ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);

    final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();

    if (checkpointCoordinator != null) {
        // check whether we find a valid checkpoint
        // if no state was restored, check whether it can be restored from a savepoint
        // ...
    }

    return newExecutionGraph;
}
The call chain then reaches buildGraph() of ExecutionGraphBuilder, the core class for generating the ExecutionGraph. This method mainly builds the ExecutionGraph and sets up checkpointing. Its core code is:
// ...
// core method that builds the ExecutionGraph; to be analyzed in detail later
executionGraph.attachJobGraph(sortedTopology);

// ...

// the CheckpointCoordinator is set up inside enableCheckpointing
executionGraph.enableCheckpointing(
    chkConfig,
    triggerVertices,
    ackVertices,
    confirmVertices,
    hooks,
    checkpointIdCounter,
    completedCheckpoints,
    rootBackend,
    checkpointStatsTracker);
enableCheckpointing() mainly creates the manager that handles checkpoint failures and sets up CheckpointCoordinator, the core class of checkpointing.
// ExecutionGraph.java
public void enableCheckpointing(
        CheckpointCoordinatorConfiguration chkConfig,
        List<ExecutionJobVertex> verticesToTrigger,
        List<ExecutionJobVertex> verticesToWaitFor,
        List<ExecutionJobVertex> verticesToCommitTo,
        List<MasterTriggerRestoreHook<?>> masterHooks,
        CheckpointIDCounter checkpointIDCounter,
        CompletedCheckpointStore checkpointStore,
        StateBackend checkpointStateBackend,
        CheckpointStatsTracker statsTracker) {
    // the job must be in state CREATED
    checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
    checkState(checkpointCoordinator == null, "checkpointing already enabled");
    // the task sets for the different checkpointing phases (trigger, wait for ack, commit)
    ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
    ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
    ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);

    checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
    // checkpoint failure manager: decides what to do next when a checkpoint fails, based on the configuration
    CheckpointFailureManager failureManager = new CheckpointFailureManager(
        chkConfig.getTolerableCheckpointFailureNumber(),
        new CheckpointFailureManager.FailJobCallback() {
            @Override
            public void failJob(Throwable cause) {
                getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause));
            }

            @Override
            public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {
                getJobMasterMainThreadExecutor().execute(() -> failGlobalIfExecutionIsStillRunning(cause, failingTask));
            }
        }
    );

    // create the coordinator that triggers and commits checkpoints and holds the state
    // CheckpointCoordinator is the core class of checkpointing
    checkpointCoordinator = new CheckpointCoordinator(
        jobInformation.getJobId(),
        chkConfig,
        tasksToTrigger,
        tasksToWaitFor,
        tasksToCommitTo,
        checkpointIDCounter,
        checkpointStore,
        checkpointStateBackend,
        ioExecutor,
        SharedStateRegistry.DEFAULT_FACTORY,
        failureManager);

    // register the master hooks on the checkpoint coordinator
    for (MasterTriggerRestoreHook<?> hook : masterHooks) {
        if (!checkpointCoordinator.addMasterHook(hook)) {
            LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier());
        }
    }
    // checkpoint statistics
    checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);

    // interval of max long value indicates disable periodic checkpoint,
    // the CheckpointActivatorDeactivator should be created only if the interval is not max value
    // an interval of Long.MAX_VALUE means periodic checkpointing is turned off
    if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) {
        // the periodic checkpoint scheduler is activated and deactivated as a result of
        // job status changes (running -> on, all other states -> off)
        // the checkpoint scheduler is only started when the job status is RUNNING
        // createActivatorDeactivator() creates a listener
        // registerJobStatusListener() adds the listener to the jobStatusListeners collection
        registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
    }
}


// CheckpointCoordinator.java
// ------------------------------------------------------------------------
//  job status listener that schedules / cancels periodic checkpoints
// ------------------------------------------------------------------------
// creates the listener returned by checkpointCoordinator.createActivatorDeactivator()
public JobStatusListener createActivatorDeactivator() {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }

        if (jobStatusListener == null) {
            jobStatusListener = new CheckpointCoordinatorDeActivator(this);
        }

        return jobStatusListener;
    }
}
At this point the createJobManagerRunner stage is finished and the checkpoint configuration in the ExecutionGraph is in place.
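In the next stage, startJobManagerRunner, startJobExecution is invoked once leadership has been granted. Only the classes and methods on the call path are listed here: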
// In JobManagerRunner.java:
// grantLeadership(...) ==> verifyJobSchedulingStatusAndStartJobManager(...)
// ==> startJobMaster(...), whose core statement is
startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));

// which goes on to call start() ==> startJobExecution(...) in JobMaster.java
startJobExecution() is a private method of the JobMaster class. The code is analyzed below:
//----------------------------------------------------------------------------------------------
// Internal methods
//----------------------------------------------------------------------------------------------

//-- job starting and stopping  -----------------------------------------------------------------

private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {

    validateRunsInMainThread();

    checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

    if (Objects.equals(getFencingToken(), newJobMasterId)) {
        log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);

        return Acknowledge.get();
    }

    setNewFencingToken(newJobMasterId);
    // start the slotPool and request resources; see this method for the details of resource allocation
    startJobMasterServices();

    log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
    // entry point for executing the ExecutionGraph: first checks that the job status is CREATED,
    // then calls executionGraph.scheduleForExecution()
    resetAndStartScheduler();

    return Acknowledge.get();
}
LegacyScheduler then calls scheduleForExecution() of the ExecutionGraph class, whose scheduling process is as follows:
public void scheduleForExecution() throws JobException {

    assertRunningInJobMasterMainThread();

    final long currentGlobalModVersion = globalModVersion;
    // before the tasks run, switch the job status from CREATED to RUNNING;
    // transitionState(...) calls notifyJobStatusChange(newState, error) to notify
    // the listeners in jobStatusListeners of the status change
    if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
        // choose the scheduling strategy according to the schedule mode
        final CompletableFuture<Void> newSchedulingFuture = SchedulingUtils.schedule(
            scheduleMode,
            getAllExecutionVertices(),
            this);

        // ...
    }
    else {
        throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
    }
}

private void notifyJobStatusChange(JobStatus newState, Throwable error) {
    if (jobStatusListeners.size() > 0) {
        final long timestamp = System.currentTimeMillis();
        final Throwable serializedError = error == null ? null : new SerializedThrowable(error);

        for (JobStatusListener listener : jobStatusListeners) {
            try {
                listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError);
            } catch (Throwable t) {
                LOG.warn("Error while notifying JobStatusListener", t);
            }
        }
    }
}


// CheckpointCoordinatorDeActivator.java
public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
    if (newJobStatus == JobStatus.RUNNING) {
        // start the checkpoint scheduler
        // the core method that triggers checkpoints
        coordinator.startCheckpointScheduler();
    } else {
        // anything else should stop the trigger for now
        coordinator.stopCheckpointScheduler();
    }
}
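The listener mechanism above can be reduced to a small self-contained sketch. None of the types below are Flink classes: StatusListenerSketch, Graph, Coordinator and activatorDeactivator are simplified stand-ins invented here, and the real listener also receives the job ID, a timestamp and an error.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-ins; all names are illustrative, not Flink's actual classes. */
class StatusListenerSketch {

    enum JobStatus { CREATED, RUNNING, FAILED }

    interface JobStatusListener {
        void jobStatusChanges(JobStatus newStatus);
    }

    /** Plays the role of the coordinator: only tracks whether the periodic scheduler is on. */
    static class Coordinator {
        boolean schedulerRunning = false;
        void startCheckpointScheduler() { schedulerRunning = true; }
        void stopCheckpointScheduler() { schedulerRunning = false; }
    }

    /** Plays the role of the execution graph: holds the status and notifies listeners. */
    static class Graph {
        private JobStatus state = JobStatus.CREATED;
        private final List<JobStatusListener> listeners = new ArrayList<>();

        void registerJobStatusListener(JobStatusListener listener) {
            listeners.add(listener);
        }

        /** Switches the state only if the current state matches, then notifies all listeners. */
        boolean transitionState(JobStatus expected, JobStatus next) {
            if (state != expected) {
                return false;
            }
            state = next;
            for (JobStatusListener listener : listeners) {
                listener.jobStatusChanges(next);
            }
            return true;
        }
    }

    /** RUNNING turns the scheduler on, every other status turns it off. */
    static JobStatusListener activatorDeactivator(Coordinator coordinator) {
        return status -> {
            if (status == JobStatus.RUNNING) {
                coordinator.startCheckpointScheduler();
            } else {
                coordinator.stopCheckpointScheduler();
            }
        };
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        Graph graph = new Graph();
        graph.registerJobStatusListener(activatorDeactivator(coordinator));

        graph.transitionState(JobStatus.CREATED, JobStatus.RUNNING);
        System.out.println(coordinator.schedulerRunning); // prints true
        graph.transitionState(JobStatus.RUNNING, JobStatus.FAILED);
        System.out.println(coordinator.schedulerRunning); // prints false
    }
}
```

The point of this design is that the coordinator never polls the job status; it is switched on and off purely by the notifications the graph sends on each transition.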
Next, the core method that triggers checkpoints, startCheckpointScheduler(), is analyzed in detail.
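With its comments the logic is fairly easy to follow. startCheckpointScheduler() itself registers a periodic timer task; the actual work is done in the triggerCheckpoint(...) method that this task calls. That method is too long to reproduce in full, so here is an outline of what it does, followed by its core code: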
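1) Check the preconditions for triggering a checkpoint. The trigger is rejected if, for example, the coordinator has been shut down, periodic checkpointing is disabled, the minimum pause between checkpoints has not elapsed (unless the checkpoint is forced), or the number of concurrent checkpoints would be exceeded;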
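2) Check that all tasks that need to be triggered and all tasks that need to acknowledge the checkpoint (ACK) are in the RUNNING state; otherwise throw an exception;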
3) If all checks pass, run checkpointID = checkpointIdCounter.getAndIncrement(); to generate a new checkpointID, then create a PendingCheckpoint. A PendingCheckpoint is a checkpoint that has been started but not yet confirmed; only after all tasks have acknowledged it is it turned into a CompletedCheckpoint;
4) Schedule a timer that cleans up failed checkpoints;
5) Define a timeout callback that cancels the checkpoint if it has been running for too long without completing;
6) Trigger the MasterHooks, which let users define extra operations that extend checkpointing (such as preparing and cleaning up external resources).
The core code is as follows:
// send the messages to the tasks that trigger their checkpoint
// iterate over the executions and trigger either a synchronous savepoint or a regular checkpoint
for (Execution execution: executions) {
    if (props.isSynchronous()) {
        execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
    } else {
        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    }
}
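Step 3 of the outline says that a pending checkpoint becomes a completed one only after every task has acknowledged it. A minimal model of that bookkeeping might look like the following. PendingCheckpointSketch is a hypothetical class written for this article, tasks are identified by plain strings, and Flink's real PendingCheckpoint additionally collects the reported state, statistics and failure handling.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Illustrative model of a pending checkpoint; not Flink's PendingCheckpoint class. */
class PendingCheckpointSketch {
    private final long checkpointId;
    private final Set<String> notYetAcknowledged;

    PendingCheckpointSketch(long checkpointId, Set<String> tasksToWaitFor) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasksToWaitFor); // defensive copy
    }

    /** Records one acknowledgement; returns false for unknown or duplicate acks. */
    boolean acknowledge(String taskId) {
        return notYetAcknowledged.remove(taskId);
    }

    /** True once every expected task has acknowledged, i.e. the checkpoint can be completed. */
    boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty();
    }

    long getCheckpointId() {
        return checkpointId;
    }

    public static void main(String[] args) {
        Set<String> tasks = new HashSet<>(Arrays.asList("source-0", "map-0"));
        PendingCheckpointSketch pending = new PendingCheckpointSketch(1L, tasks);

        pending.acknowledge("source-0");
        System.out.println(pending.isFullyAcknowledged()); // prints false
        pending.acknowledge("map-0");
        System.out.println(pending.isFullyAcknowledged()); // prints true
    }
}
```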
Whether or not the trigger is synchronous, the call ends up in the private method triggerCheckpointHelper(...) of the Execution class:
// Execution.java
private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {

    final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
    if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
        throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
    }

    final LogicalSlot slot = assignedResource;

    if (slot != null) {
        // TaskManagerGateway is the component used to communicate with the TaskManager
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
    } else {
        LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
    }
}
At this point the CheckpointCoordinator has sent the checkpoint command to the TaskManager. The following focuses on how the checkpoint is executed in the TM.
After the TaskManager receives the RPC that triggers a checkpoint, it starts generating checkpoint barriers. RpcTaskManagerGateway is the message entry point: its triggerCheckpoint(...) calls triggerCheckpoint(...) of TaskExecutor, as shown below:
// RpcTaskManagerGateway.java
public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
    taskExecutorGateway.triggerCheckpoint(
        executionAttemptID,
        checkpointId,
        timestamp,
        checkpointOptions,
        advanceToEndOfEventTime);
}

// TaskExecutor.java
@Override
public CompletableFuture<Acknowledge> triggerCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp,
        CheckpointOptions checkpointOptions,
        boolean advanceToEndOfEventTime) {
    log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

    // ...

    if (task != null) {
        // core method: triggers the generation of the barrier
        task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);

        return CompletableFuture.completedFuture(Acknowledge.get());
    } else {
        final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

        // ...
    }
}
triggerCheckpointBarrier(...) of the Task class creates an anonymous Runnable that performs the checkpoint and then runs it asynchronously:
public void triggerCheckpointBarrier(
        final long checkpointID,
        final long checkpointTimestamp,
        final CheckpointOptions checkpointOptions,
        final boolean advanceToEndOfEventTime) {

    final AbstractInvokable invokable = this.invokable;
    // create a CheckpointMetaData, which only holds checkpointID and checkpointTimestamp
    final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

    if (executionState == ExecutionState.RUNNING && invokable != null) {

        // ...

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // set safety net from the task's context for checkpointing thread
                LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                try {
                    // dispatches to a different implementation for SourceStreamTask and StreamTask
                    boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
                    if (!success) {
                        checkpointResponder.declineCheckpoint(
                            getJobID(), getExecutionId(), checkpointID,
                            new CheckpointException("Task Name" + taskName, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
                    }
                }
                catch (Throwable t) {
                    if (getExecutionState() == ExecutionState.RUNNING) {
                        failExternally(new Exception(
                            "Error while triggering checkpoint " + checkpointID + " for " +
                                taskNameWithSubtask, t));
                    } else {
                        LOG.debug("Encountered error while triggering checkpoint {} for " +
                            "{} ({}) while being not in state running.", checkpointID,
                            taskNameWithSubtask, executionId, t);
                    }
                } finally {
                    FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                }
            }
        };
        // run the Runnable asynchronously
        executeAsyncCallRunnable(
            runnable,
            String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
    }
    else {
        LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

        // send back a message that we did not do the checkpoint
        checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
            new CheckpointException("Task name with subtask : " + taskNameWithSubtask, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
    }
}
For both SourceStreamTask and StreamTask, the triggerCheckpoint call ends up in triggerCheckpoint(...) of the StreamTask class, whose core code is:
// StreamTask.java
return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, advanceToEndOfEventTime);
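performCheckpoint(...) handles two cases:
1. If the task is running, the checkpoint can be taken, which involves three steps:
1) Prepare for the checkpoint. Usually nothing needs to be done here and the checkpoint is simply accepted;
2) Generate the barrier and broadcast it downstream;
3) Trigger the snapshot of this task's state;
2. If the task is not running, tell the downstream tasks to cancel this checkpoint by sending a CancelCheckpointMarker, a message similar to a barrier.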
The code is as follows:
// StreamTask.java
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics,
        boolean advanceToEndOfTime) throws Exception {
    // ...

    synchronized (lock) {
        if (isRunning) {

            if (checkpointOptions.getCheckpointType().isSynchronous()) {
                syncSavepointLatch.setCheckpointId(checkpointId);

                if (advanceToEndOfTime) {
                    advanceToEndOfEventTime();
                }
            }

            // All of the following steps happen as an atomic step from the perspective of barriers and
            // records/watermarks/timers/callbacks.
            // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
            // checkpoint alignments

            // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
            //           The pre-barrier work should be nothing or minimal in the common case.
            operatorChain.prepareSnapshotPreBarrier(checkpointId);

            // Step (2): Send the checkpoint barrier downstream
            operatorChain.broadcastCheckpointBarrier(
                checkpointId,
                checkpointMetaData.getTimestamp(),
                checkpointOptions);

            // Step (3): Take the state snapshot. This should be largely asynchronous, to not
            //           impact progress of the streaming topology
            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);

            return true;
        }
        else {
            // ...
        }
    }
}
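The ordering of the three steps, and the cancel path for a task that is not running, can be made visible with a toy model that records events instead of doing real work. PerformCheckpointSketch and its event strings are invented for illustration, and returning false in the non-running branch is an assumption of this sketch, since that branch is elided in the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: records the order of the steps instead of doing real work. */
class PerformCheckpointSketch {
    final List<String> events = new ArrayList<>();
    private final Object lock = new Object();
    private final boolean running;

    PerformCheckpointSketch(boolean running) {
        this.running = running;
    }

    boolean performCheckpoint(long checkpointId) {
        // All steps run under one lock, so no record can slip in between barrier and snapshot.
        synchronized (lock) {
            if (running) {
                events.add("prepare " + checkpointId);  // step 1: pre-barrier work
                events.add("barrier " + checkpointId);  // step 2: broadcast the barrier downstream
                events.add("snapshot " + checkpointId); // step 3: snapshot this task's state
                return true;
            } else {
                // not running: tell downstream tasks to give up on this checkpoint
                events.add("cancel-marker " + checkpointId);
                return false; // assumption of this sketch
            }
        }
    }

    public static void main(String[] args) {
        PerformCheckpointSketch task = new PerformCheckpointSketch(true);
        task.performCheckpoint(7L);
        System.out.println(task.events); // prints [prepare 7, barrier 7, snapshot 7]
    }
}
```

Emitting the barrier before taking the snapshot matches the comment in the source: downstream tasks can start aligning while this task is still writing its state.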
Next, the checkpointState(...) process.
checkpointState(...) eventually calls executeCheckpointing() in StreamTask, which creates an asynchronous object, AsyncCheckpointRunnable, that reports the checkpoint as completed. The key code is:
// executeCheckpointing() in StreamTask.java
public void executeCheckpointing() throws Exception {
    startSyncPartNano = System.nanoTime();

    try {
        // entry point for calling snapshotState on each StreamOperator; the behavior depends on the operator
        for (StreamOperator<?> op : allOperators) {
            checkpointStreamOperator(op);
        }
        // ...

        // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
        AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
            owner,
            operatorSnapshotsInProgress,
            checkpointMetaData,
            checkpointMetrics,
            startAsyncPartNano);

        owner.cancelables.registerCloseable(asyncCheckpointRunnable);
        owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);

        // ...
    } catch (Exception ex) {
        // ...
    }
}
The run() method of AsyncCheckpointRunnable calls reportCompletedSnapshotStates(...) in StreamTask (for a stateless job the reported state is null), which in turn calls reportTaskStateSnapshots(...) of TaskStateManagerImpl to report the TM's checkpoint to the JM. The key code is:
// TaskStateManagerImpl.java
checkpointResponder.acknowledgeCheckpoint(
    jobId,
    executionAttemptID,
    checkpointId,
    checkpointMetrics,
    acknowledgedState);
This reports the event by remotely invoking the corresponding JobManager method over RPC.
acknowledgeCheckpoint(...) of the RpcCheckpointResponder class sends the checkpoint acknowledgement back. The subsequent call chain and the core method involved are as follows:
// acknowledgeCheckpoint in JobMaster ==>
// acknowledgeCheckpoint in LegacyScheduler ==>
// receiveAcknowledgeMessage(...) in CheckpointCoordinator ==>
// completePendingCheckpoint(checkpoint);

// <p>Important: This method should only be called in the checkpoint lock scope
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
    final long checkpointId = pendingCheckpoint.getCheckpointId();
    final CompletedCheckpoint completedCheckpoint;

    // As a first step to complete the checkpoint, we register its state with the registry
    Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
    sharedStateRegistry.registerAll(operatorStates.values());

    try {
        try {
            // finalize the checkpoint
            completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
            failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
        }
        catch (Exception e1) {
            // abort the current pending checkpoint if we fails to finalize the pending checkpoint.
            if (!pendingCheckpoint.isDiscarded()) {
                failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
            }

            throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.',
                CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
        }

        // the pending checkpoint must be discarded after the finalization
        Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);

        try {
            // add the new checkpoint and, if necessary
            // (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain), remove old ones
            completedCheckpointStore.addCheckpoint(completedCheckpoint);
        } catch (Exception exception) {
            // we failed to store the completed checkpoint. Let's clean up
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        completedCheckpoint.discardOnFailedStoring();
                    } catch (Throwable t) {
                        LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
                    }
                }
            });

            throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.',
                CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
        }
    } finally {
        pendingCheckpoints.remove(checkpointId);

        triggerQueuedRequests();
    }

    rememberRecentCheckpointId(checkpointId);

    // drop those pending checkpoints that are at prior to the completed one
    dropSubsumedCheckpoints(checkpointId);

    // record the time when this was completed, to calculate
    // the 'min delay between checkpoints'
    lastCheckpointCompletionNanos = System.nanoTime();

    LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
        completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());

    if (LOG.isDebugEnabled()) {
        StringBuilder builder = new StringBuilder();
        builder.append("Checkpoint state: ");
        for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
            builder.append(state);
            builder.append(", ");
        }
        // Remove last two chars ", "
        builder.setLength(builder.length() - 2);

        LOG.debug(builder.toString());
    }

    // send the "notify complete" call to all vertices
    final long timestamp = completedCheckpoint.getTimestamp();

    // notify all operators (in the TMs) that this checkpoint has completed
    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }
}
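Two bookkeeping effects of completePendingCheckpoint(...) are easy to miss in the long listing: the store keeps only a bounded number of completed checkpoints, and pending checkpoints older than the completed one are dropped. The sketch below models just these two effects with checkpoint IDs. CompletionSketch and its methods are hypothetical, and it ignores cases the real coordinator handles, for example pending checkpoints that must not be subsumed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.TreeSet;

/** Illustrative bookkeeping model; not Flink's CheckpointCoordinator. */
class CompletionSketch {
    private final int maxRetained;
    private final Deque<Long> completed = new ArrayDeque<>();
    private final TreeSet<Long> pending = new TreeSet<>();

    CompletionSketch(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    void addPending(long checkpointId) {
        pending.add(checkpointId);
    }

    /** Completes a pending checkpoint; returns false if the id is not pending. */
    boolean complete(long checkpointId) {
        if (!pending.remove(checkpointId)) {
            return false;
        }
        completed.addLast(checkpointId);
        // retain only the newest maxRetained completed checkpoints
        while (completed.size() > maxRetained) {
            completed.removeFirst();
        }
        // drop pending checkpoints that were triggered before the completed one
        pending.headSet(checkpointId).clear();
        return true;
    }

    List<Long> completedIds() {
        return new ArrayList<>(completed);
    }

    List<Long> pendingIds() {
        return new ArrayList<>(pending);
    }

    public static void main(String[] args) {
        CompletionSketch sketch = new CompletionSketch(2);
        for (long id = 1; id <= 4; id++) {
            sketch.addPending(id);
        }
        sketch.complete(2); // checkpoint 1 is subsumed and dropped
        sketch.complete(3);
        sketch.complete(4); // checkpoint 2 falls out of the retained window
        System.out.println(sketch.completedIds()); // prints [3, 4]
        System.out.println(sketch.pendingIds());   // prints []
    }
}
```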
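This completes the analysis of the overall checkpoint flow. It is best read together with the underlying principles. The three references below are all well written and worth reading if you have time.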
Ref:
[1] https://www.jianshu.com/p/a40a1b92f6a2
[2] http://www.javashuo.com/article/p-khdfegoi-mh.html
[3] https://blog.csdn.net/qq475781638/article/details/92698301