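Flink Source Code Reading (2): Checkpoint Source Code Analysis

Preface

  The article "Flink Principles: Fault Tolerance Mechanism" gave a fairly basic introduction to the checkpoint mechanism. This article analyzes the checkpoint process from the source code. It covers only how a checkpoint is scheduled and aims to make the overall logic clear; the implementation details are not covered, which is a pity, and I will try to analyze them later. If you find any mistakes, please leave a comment.

  This article is based on Flink 1.9.

1. Parameter settings

  1.1 Common checkpoint parameters: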

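    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10000); // interval in ms; checkpointing is disabled by default
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // default is EXACTLY_ONCE
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000); // in ms; default is 0, maximum is 1 year
    env.getCheckpointConfig().setCheckpointTimeout(150000); // in ms; default is 10 min
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // default is 1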

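   The defaults for these parameters are defined in CheckpointConfig.java in flink-streaming-java*.jar. The configured values are passed to the runtime layer through jobGraph.setSnapshotSettings(settings), which is called in the private method configureCheckpointing() of StreamingJobGraphGenerator; see these classes for more settings.

  1.2 Parameter analysis

  This section focuses on the relationship between baseInterval, which is set by enableCheckpointing(), and minPauseBetweenCheckpoints. The definitions in the source code are given first: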

    /** The base checkpoint interval. Actual trigger time may be affected by the
     * max concurrent checkpoints and minimum-pause values */
    // Checkpoint trigger period; the actual trigger time is also affected by
    // maxConcurrentCheckpointAttempts and minPauseBetweenCheckpointsNanos
    private final long baseInterval;

    /** The min time(in ns) to delay after a checkpoint could be triggered. Allows to
     * enforce minimum processing time between checkpoint attempts */
    // Minimum pause between two checkpoints once a checkpoint may be triggered
    private final long minPauseBetweenCheckpointsNanos;

   For the case baseInterval < minPauseBetweenCheckpoints, CheckpointCoordinator.java contains the following:

    // it does not make sense to schedule checkpoints more often then the desired
    // time between checkpoints
    long baseInterval = chkConfig.getCheckpointInterval();
    if (baseInterval < minPauseBetweenCheckpoints) {
        baseInterval = minPauseBetweenCheckpoints;
    }

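   This shows that although checkpoints are configured to trigger periodically, the actual trigger time also depends on minPauseBetweenCheckpoints and maxConcurrentCheckpointAttempts. If maxConcurrentCheckpointAttempts is 1, a new checkpoint has to wait for the running one to finish even when its trigger time has arrived.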
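To make the interaction concrete, here is a minimal sketch in plain Java. It is not Flink code: CheckpointIntervalSketch and both method names are hypothetical, and it assumes at most one concurrent checkpoint. It only models the two rules described above, namely the clamp of baseInterval and the wait for the minimum pause after the previous checkpoint completes.

```java
/** Hypothetical helper, not a Flink class: models how the two settings interact. */
final class CheckpointIntervalSketch {

    /** Mirrors the clamp shown above: never schedule more often than the minimum pause. */
    static long effectiveIntervalMillis(long baseIntervalMillis, long minPauseMillis) {
        return Math.max(baseIntervalMillis, minPauseMillis);
    }

    /**
     * Earliest time (ms) at which the next checkpoint may start, assuming at most one
     * concurrent checkpoint: both the periodic schedule and the minimum pause after
     * the previous completion have to be respected.
     */
    static long earliestNextTriggerMillis(
            long lastTriggerMillis,
            long lastCompletionMillis,
            long baseIntervalMillis,
            long minPauseMillis) {
        long nextBySchedule =
                lastTriggerMillis + effectiveIntervalMillis(baseIntervalMillis, minPauseMillis);
        long nextByPause = lastCompletionMillis + minPauseMillis;
        return Math.max(nextBySchedule, nextByPause);
    }

    public static void main(String[] args) {
        // Interval 10 s, pause 5 s, previous checkpoint triggered at t = 0 and finished at t = 8 s.
        System.out.println(earliestNextTriggerMillis(0, 8_000, 10_000, 5_000));
    }
}
```

With the settings from section 1.1 (interval 10 s, pause 5 s), a checkpoint that takes 8 s pushes the next one past the 10 s mark, because the pause is counted from the completion of the previous checkpoint.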

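2. Checkpoint call flow

  After the JobGraph is submitted to the Dispatcher, the Dispatcher runs createJobManagerRunner and startJobManagerRunner; see the createJobManagerRunner(...) method in the Dispatcher class.

  2.1 The createJobManagerRunner phase

  This phase creates a JobManagerRunner instance. The checkpoint-related part is that a listener is registered to watch the job status.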

    // #JobManagerRunner.java
    public JobManagerRunner(...) throws Exception {

        //..........

        // make sure we cleanly shut down out JobManager services if initialization fails
        try {
            //..........
            // load the JobGraph and libraries, set up leader election, etc.

            // now start the JobManager
            this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader);
        }
        catch (Throwable t) {
            //......
        }
    }

    // createJobMasterService() in DefaultJobMasterServiceFactory creates a new JobMaster object
    // #JobMaster.java
    public JobMaster(...) throws Exception {

        //........
        // this constructor mainly checks arguments, creates the slotPool and its scheduler, and so on

        // create a scheduler
        this.schedulerNG = createScheduler(jobManagerJobMetricGroup);
        //......
    }

   The core statements in creating the scheduler are:

    // LegacyScheduler() in #LegacyScheduler.java
    // create the ExecutionGraph
    this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker));


    private ExecutionGraph createAndRestoreExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        ShuffleMaster<?> shuffleMaster,
        PartitionTracker partitionTracker) throws Exception {

        ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);

        final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();

        if (checkpointCoordinator != null) {
            // check whether we find a valid checkpoint
            // if no state was restored from a checkpoint, check whether it can be restored from a savepoint
            //......
        }

        return newExecutionGraph;
    }

   The call chain reaches buildGraph() in ExecutionGraphBuilder, the core class for building the ExecutionGraph. This method mainly builds the ExecutionGraph and configures checkpointing. Its core code is:

    //..............
    // core method that builds the ExecutionGraph; to be analyzed in detail in a later article
    executionGraph.attachJobGraph(sortedTopology);

    //.......................

    // enableCheckpointing() sets up the CheckpointCoordinator
    executionGraph.enableCheckpointing(
        chkConfig,
        triggerVertices,
        ackVertices,
        confirmVertices,
        hooks,
        checkpointIdCounter,
        completedCheckpoints,
        rootBackend,
        checkpointStatsTracker);

   enableCheckpointing() mainly creates the manager that handles checkpoint failures and sets up CheckpointCoordinator, the core checkpoint class.

    // #ExecutionGraph.java
    public void enableCheckpointing(
            CheckpointCoordinatorConfiguration chkConfig,
            List<ExecutionJobVertex> verticesToTrigger,
            List<ExecutionJobVertex> verticesToWaitFor,
            List<ExecutionJobVertex> verticesToCommitTo,
            List<MasterTriggerRestoreHook<?>> masterHooks,
            CheckpointIDCounter checkpointIDCounter,
            CompletedCheckpointStore checkpointStore,
            StateBackend checkpointStateBackend,
            CheckpointStatsTracker statsTracker) {
        // the job status must be CREATED
        checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
        checkState(checkpointCoordinator == null, "checkpointing already enabled");
        // tasks in the different checkpoint roles: trigger, acknowledge, commit
        ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
        ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
        ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);

        checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
        // checkpoint failure manager; when a checkpoint fails it decides the next step based on the configuration
        CheckpointFailureManager failureManager = new CheckpointFailureManager(
            chkConfig.getTolerableCheckpointFailureNumber(),
            new CheckpointFailureManager.FailJobCallback() {
                @Override
                public void failJob(Throwable cause) {
                    getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause));
                }

                @Override
                public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {
                    getJobMasterMainThreadExecutor().execute(() -> failGlobalIfExecutionIsStillRunning(cause, failingTask));
                }
            }
        );

        // create the coordinator that triggers and commits checkpoints and holds the state
        // CheckpointCoordinator is the core checkpoint class
        checkpointCoordinator = new CheckpointCoordinator(
            jobInformation.getJobId(),
            chkConfig,
            tasksToTrigger,
            tasksToWaitFor,
            tasksToCommitTo,
            checkpointIDCounter,
            checkpointStore,
            checkpointStateBackend,
            ioExecutor,
            SharedStateRegistry.DEFAULT_FACTORY,
            failureManager);

        // register the master hooks on the checkpoint coordinator
        for (MasterTriggerRestoreHook<?> hook : masterHooks) {
            if (!checkpointCoordinator.addMasterHook(hook)) {
                LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier());
            }
        }
        // checkpoint statistics
        checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);

        // interval of max long value indicates disable periodic checkpoint,
        // the CheckpointActivatorDeactivator should be created only if the interval is not max value
        // an interval of Long.MAX_VALUE means periodic checkpointing is turned off
        if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) {
            // the periodic checkpoint scheduler is activated and deactivated as a result of
            // job status changes (running -> on, all other states -> off)
            // the checkpoint scheduler is only started while the job status is RUNNING
            // createActivatorDeactivator() creates a listener
            // registerJobStatusListener() adds the listener to the jobStatusListeners collection
            registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
        }
    }


    // #CheckpointCoordinator.java
    // ------------------------------------------------------------------------
    // job status listener that schedules / cancels periodic checkpoints
    // ------------------------------------------------------------------------
    // checkpointCoordinator.createActivatorDeactivator() creates the listener
    public JobStatusListener createActivatorDeactivator() {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }

            if (jobStatusListener == null) {
                jobStatusListener = new CheckpointCoordinatorDeActivator(this);
            }

            return jobStatusListener;
        }
    }
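The role of the failure manager can be illustrated with a small plain-Java sketch. This is a simplification under stated assumptions rather than the real class: FailureCounterSketch is a hypothetical name, the Runnable stands in for FailJobCallback.failJob(...), and every failure is counted, whereas the real CheckpointFailureManager only counts certain failure reasons.

```java
/** Hypothetical simplification of the idea behind CheckpointFailureManager. */
final class FailureCounterSketch {
    private final int tolerableFailures;
    private final Runnable failJob; // stands in for FailJobCallback.failJob(...)
    private int continuousFailures;

    FailureCounterSketch(int tolerableFailures, Runnable failJob) {
        this.tolerableFailures = tolerableFailures;
        this.failJob = failJob;
    }

    /** A failed checkpoint increases the counter; exceeding the tolerance fails the job. */
    void onCheckpointFailure() {
        continuousFailures++;
        if (continuousFailures > tolerableFailures) {
            failJob.run();
        }
    }

    /** Any successful checkpoint resets the counter. */
    void onCheckpointSuccess() {
        continuousFailures = 0;
    }

    int continuousFailures() {
        return continuousFailures;
    }

    public static void main(String[] args) {
        FailureCounterSketch manager =
                new FailureCounterSketch(1, () -> System.out.println("failing the job"));
        manager.onCheckpointFailure(); // tolerated
        manager.onCheckpointFailure(); // exceeds the tolerance, so the callback runs
    }
}
```

The point of the design is that only consecutive failures matter: a single success in between resets the counter, so an occasional failed checkpoint does not bring the job down.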

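   This concludes the createJobManagerRunner phase; the checkpoint configuration in the ExecutionGraph is now in place.

  2.2 The startJobManagerRunner phase

  In this phase, once leadership is granted, startJobExecution is started. Only the classes and methods on the call path are listed here: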

    // in #JobManagerRunner.java:
    // grantLeadership(...) ==> verifyJobSchedulingStatusAndStartJobManager(...)
    // ==> startJobMaster(...), whose core code is
    startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));

    // which in turn calls start() ==> startJobExecution(...) in #JobMaster.java

   startJobExecution() is a private method of JobMaster. The code is analyzed below:

    //----------------------------------------------------------------------------------------------
    // Internal methods
    //----------------------------------------------------------------------------------------------

    //-- job starting and stopping -----------------------------------------------------------------

    private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {

        validateRunsInMainThread();

        checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

        if (Objects.equals(getFencingToken(), newJobMasterId)) {
            log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);

            return Acknowledge.get();
        }

        setNewFencingToken(newJobMasterId);
        // start the slotPool and request resources; worth reading for the resource request process
        startJobMasterServices();

        log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
        // entry point for executing the ExecutionGraph: checks that the job status is CREATED, then calls executionGraph.scheduleForExecution()
        resetAndStartScheduler();

        return Acknowledge.get();
    }

   The scheduling performed by scheduleForExecution() (a method of ExecutionGraph, reached through LegacyScheduler) is as follows:

    public void scheduleForExecution() throws JobException {

        assertRunningInJobMasterMainThread();

        final long currentGlobalModVersion = globalModVersion;
        // before execution, switch the job status from CREATED to RUNNING;
        // transitionState(...) calls notifyJobStatusChange(newState, error) to notify the listeners in jobStatusListeners of the change
        if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
            // choose the scheduling strategy according to the schedule mode
            final CompletableFuture<Void> newSchedulingFuture = SchedulingUtils.schedule(
                scheduleMode,
                getAllExecutionVertices(),
                this);

            //..............
        }
        else {
            throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
        }
    }

    private void notifyJobStatusChange(JobStatus newState, Throwable error) {
        if (jobStatusListeners.size() > 0) {
            final long timestamp = System.currentTimeMillis();
            final Throwable serializedError = error == null ? null : new SerializedThrowable(error);

            for (JobStatusListener listener : jobStatusListeners) {
                try {
                    listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError);
                } catch (Throwable t) {
                    LOG.warn("Error while notifying JobStatusListener", t);
                }
            }
        }
    }


    // #CheckpointCoordinatorDeActivator.java
    public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
        if (newJobStatus == JobStatus.RUNNING) {
            // start the checkpoint scheduler
            // entry point for triggering checkpoints
            coordinator.startCheckpointScheduler();
        } else {
            // anything else should stop the trigger for now
            coordinator.stopCheckpointScheduler();
        }
    }
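The listener wiring above can be reduced to a few lines of plain Java. The sketch below is illustrative only: StatusListenerSketch and its nested Graph, Coordinator and JobStatusListener types are hypothetical stand-ins for ExecutionGraph, CheckpointCoordinator and Flink's listener interface, with a reduced set of job states.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the activator/deactivator wiring; none of these types are Flink's. */
final class StatusListenerSketch {
    enum JobStatus { CREATED, RUNNING, FAILING, FINISHED }

    interface JobStatusListener {
        void jobStatusChanges(JobStatus newStatus);
    }

    /** Stands in for the coordinator: only tracks whether periodic triggering is on. */
    static final class Coordinator {
        boolean schedulerRunning;

        void startCheckpointScheduler() { schedulerRunning = true; }

        void stopCheckpointScheduler() { schedulerRunning = false; }
    }

    /** Stands in for the graph: notifies listeners after every successful transition. */
    static final class Graph {
        private final List<JobStatusListener> listeners = new ArrayList<>();
        private JobStatus state = JobStatus.CREATED;

        void registerJobStatusListener(JobStatusListener listener) {
            listeners.add(listener);
        }

        boolean transitionState(JobStatus expected, JobStatus next) {
            if (state != expected) {
                return false;
            }
            state = next;
            for (JobStatusListener listener : listeners) {
                listener.jobStatusChanges(next);
            }
            return true;
        }
    }

    /** Registers a listener that turns the scheduler on for RUNNING and off otherwise. */
    static Coordinator wire(Graph graph) {
        Coordinator coordinator = new Coordinator();
        graph.registerJobStatusListener(status -> {
            if (status == JobStatus.RUNNING) {
                coordinator.startCheckpointScheduler();
            } else {
                coordinator.stopCheckpointScheduler();
            }
        });
        return coordinator;
    }

    public static void main(String[] args) {
        Graph graph = new Graph();
        Coordinator coordinator = wire(graph);
        graph.transitionState(JobStatus.CREATED, JobStatus.RUNNING);
        System.out.println("scheduler running: " + coordinator.schedulerRunning);
    }
}
```

Keeping the scheduler behind a status listener means the coordinator never has to poll the job state; every transition away from RUNNING stops periodic triggering automatically.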

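   The following analyzes startCheckpointScheduler(), the core method that triggers checkpoints.

  startCheckpointScheduler() is fairly easy to follow with its comments. It schedules a periodic trigger that ends up in triggerCheckpoint(...); since that method is too long to paste in full, here is an outline of what it does, followed by its core code:

  1) Check the preconditions for triggering a checkpoint, for example whether the coordinator has been shut down, whether periodic checkpointing is disabled, and, unless the checkpoint is forced, whether the minimum pause between checkpoints has elapsed and whether the maximum number of concurrent checkpoints has been exceeded;

  2) Check that all tasks that need to be triggered and all tasks that need to acknowledge the checkpoint are in the RUNNING state; otherwise throw an exception;

  3) If all checks pass, run checkpointID = checkpointIdCounter.getAndIncrement(); to generate a new checkpointID, then create a PendingCheckpoint. A PendingCheckpoint is a checkpoint that has been started but not yet confirmed; only after all tasks have acknowledged it is it converted into a CompletedCheckpoint;

  4) Schedule a timer to clean up failed checkpoints;

  5) Define a timeout callback that cancels the checkpoint if it has run for too long without completing;

  6) Trigger the MasterHooks, which let users define extra operations that extend checkpointing (such as preparing and cleaning up external resources);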
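Steps 1) and 2) amount to a chain of early exits. The following plain-Java sketch shows one way to express that chain; TriggerPreconditionSketch, the Decision enum and the parameter names are hypothetical, and the real method reports these cases through exceptions and failure reasons rather than an enum.

```java
/** Hypothetical model of the precondition checks in steps 1) and 2); not Flink code. */
final class TriggerPreconditionSketch {
    enum Decision {
        OK,
        COORDINATOR_SHUTDOWN,
        PERIODIC_DISABLED,
        TOO_MANY_CONCURRENT,
        MIN_PAUSE_NOT_ELAPSED,
        TASK_NOT_RUNNING
    }

    static Decision check(
            boolean shutdown,
            boolean periodicRequest,
            boolean periodicSchedulingOn,
            boolean forced,
            int pendingCheckpoints,
            int maxConcurrentCheckpoints,
            long nanosSinceLastCompletion,
            long minPauseNanos,
            boolean allTasksRunning) {
        if (shutdown) {
            return Decision.COORDINATOR_SHUTDOWN;
        }
        // A request coming from the periodic timer is dropped once periodic scheduling is off.
        if (periodicRequest && !periodicSchedulingOn) {
            return Decision.PERIODIC_DISABLED;
        }
        // Forced checkpoints skip the concurrency and minimum-pause limits.
        if (!forced) {
            if (pendingCheckpoints >= maxConcurrentCheckpoints) {
                return Decision.TOO_MANY_CONCURRENT;
            }
            if (nanosSinceLastCompletion < minPauseNanos) {
                return Decision.MIN_PAUSE_NOT_ELAPSED;
            }
        }
        // Every task to trigger and every task to acknowledge must be running.
        if (!allTasksRunning) {
            return Decision.TASK_NOT_RUNNING;
        }
        return Decision.OK;
    }

    public static void main(String[] args) {
        // One checkpoint is still pending and the limit is 1, so a periodic request is rejected.
        System.out.println(check(false, true, true, false, 1, 1, 10L, 5L, true));
    }
}
```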

  The core code is:

    // send the messages to the tasks that trigger their checkpoint
    // iterate over the executions; trigger a synchronous savepoint or a regular checkpoint
    for (Execution execution: executions) {
        if (props.isSynchronous()) {
            execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
        } else {
            execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
        }
    }

   Whether or not the checkpoint is triggered synchronously, the call ends up in the private method triggerCheckpointHelper(...) of the Execution class:

    // Execution.java
    private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {

        final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
        if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
            throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
        }

        final LogicalSlot slot = assignedResource;

        if (slot != null) {
            // TaskManagerGateway is the component used to communicate with the TaskManager
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
        }
    }

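   At this point the checkpointCoordinator has sent the checkpoint command to the TaskManager. The next section focuses on how the checkpoint is executed in the TM.

  2.3 Checkpoint in the TaskManager

  After the TaskManager receives the RPC that triggers a checkpoint, it generates the checkpoint barrier. RpcTaskManagerGateway is the message entry point; its triggerCheckpoint(...) calls triggerCheckpoint(...) of TaskExecutor: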

    // RpcTaskManagerGateway.java
    public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
        taskExecutorGateway.triggerCheckpoint(
            executionAttemptID,
            checkpointId,
            timestamp,
            checkpointOptions,
            advanceToEndOfEventTime);
    }

    // TaskExecutor.java
    @Override
    public CompletableFuture<Acknowledge> triggerCheckpoint(
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            long checkpointTimestamp,
            CheckpointOptions checkpointOptions,
            boolean advanceToEndOfEventTime) {
        log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

        //...........

        if (task != null) {
            // core call: triggers barrier generation
            task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);

            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

            //.........
        }
    }

   The triggerCheckpointBarrier(...) method of Task creates an anonymous Runnable that performs the checkpoint and then runs it asynchronously:

    public void triggerCheckpointBarrier(
            final long checkpointID,
            final long checkpointTimestamp,
            final CheckpointOptions checkpointOptions,
            final boolean advanceToEndOfEventTime) {

        final AbstractInvokable invokable = this.invokable;
        // create a CheckpointMetaData, which holds only checkpointID and checkpointTimestamp
        final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

        if (executionState == ExecutionState.RUNNING && invokable != null) {

            //..............

            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    // set safety net from the task's context for checkpointing thread
                    LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                    FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                    try {
                        // SourceStreamTask and StreamTask take different paths here
                        boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
                        if (!success) {
                            checkpointResponder.declineCheckpoint(
                                    getJobID(), getExecutionId(), checkpointID,
                                    new CheckpointException("Task Name" + taskName, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
                        }
                    }
                    catch (Throwable t) {
                        if (getExecutionState() == ExecutionState.RUNNING) {
                            failExternally(new Exception(
                                "Error while triggering checkpoint " + checkpointID + " for " +
                                    taskNameWithSubtask, t));
                        } else {
                            LOG.debug("Encountered error while triggering checkpoint {} for " +
                                "{} ({}) while being not in state running.", checkpointID,
                                taskNameWithSubtask, executionId, t);
                        }
                    } finally {
                        FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                    }
                }
            };
            // run the Runnable asynchronously
            executeAsyncCallRunnable(
                    runnable,
                    String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
        }
        else {
            LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

            // send back a message that we did not do the checkpoint
            checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                    new CheckpointException("Task name with subtask : " + taskNameWithSubtask, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
        }
    }
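The control flow of triggerCheckpointBarrier(...) can be sketched with a plain executor. Everything in this example is an assumption made for illustration: AsyncTriggerSketch is a hypothetical class, the returned list of messages stands in for checkpointResponder.declineCheckpoint(...) and failExternally(...), and the sketch waits for the executor so that the result can be inspected, which the real Task does not do.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical model of "decline or trigger asynchronously"; not Flink code. */
final class AsyncTriggerSketch {

    /** Returns the decline/failure messages produced while handling one trigger request. */
    static List<String> trigger(boolean taskRunning, BooleanSupplier invokableTrigger, long checkpointId) {
        List<String> messages = new CopyOnWriteArrayList<>();
        if (!taskRunning) {
            // Non-running task: decline immediately on the calling thread.
            messages.add("declined " + checkpointId + ": task not running");
            return messages;
        }
        ExecutorService asyncCalls = Executors.newSingleThreadExecutor();
        asyncCalls.execute(() -> {
            try {
                if (!invokableTrigger.getAsBoolean()) {
                    messages.add("declined " + checkpointId + ": task not ready");
                }
            } catch (Throwable t) {
                // The real task fails itself here if it is still running.
                messages.add("failed " + checkpointId + ": " + t.getMessage());
            }
        });
        asyncCalls.shutdown();
        try {
            // Only for the sketch: wait so the caller can observe the outcome.
            asyncCalls.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return messages;
    }

    public static void main(String[] args) {
        System.out.println(trigger(true, () -> false, 3L));
    }
}
```

Running the trigger on a separate thread keeps the RPC handler free; the caller learns about problems only through the decline or failure path, never through a return value.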

   For both SourceStreamTask and StreamTask, triggerCheckpoint ends up in the triggerCheckpoint(...) method of StreamTask, whose core code is:

    // #StreamTask.java
    return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, advanceToEndOfEventTime);

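   performCheckpoint(...) handles two cases:

  1. If the task is running, the checkpoint can proceed in three steps:

    1) Prepare for the checkpoint; usually nothing is done here and the checkpoint is simply accepted;

    2) Generate the barrier and broadcast it downstream;

    3) Trigger this task to snapshot its state;

  2. If the task is not running, notify downstream tasks to cancel this checkpoint by sending a CancelCheckpointMarker, a message similar to a barrier.

   The code is: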

    // #StreamTask.java
    private boolean performCheckpoint(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics,
            boolean advanceToEndOfTime) throws Exception {
        //......

        synchronized (lock) {
            if (isRunning) {

                if (checkpointOptions.getCheckpointType().isSynchronous()) {
                    syncSavepointLatch.setCheckpointId(checkpointId);

                    if (advanceToEndOfTime) {
                        advanceToEndOfEventTime();
                    }
                }

                // All of the following steps happen as an atomic step from the perspective of barriers and
                // records/watermarks/timers/callbacks.
                // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
                // checkpoint alignments

                // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
                // The pre-barrier work should be nothing or minimal in the common case.
                operatorChain.prepareSnapshotPreBarrier(checkpointId);

                // Step (2): Send the checkpoint barrier downstream
                operatorChain.broadcastCheckpointBarrier(
                    checkpointId,
                    checkpointMetaData.getTimestamp(),
                    checkpointOptions);

                // Step (3): Take the state snapshot. This should be largely asynchronous, to not
                // impact progress of the streaming topology
                checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);

                return true;
            }
            else {
                //.......
            }
        }
    }
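The ordering inside performCheckpoint(...) is the important part: the barrier goes out before the local snapshot is taken. A minimal plain-Java sketch that only records that order follows. PerformCheckpointSketch is hypothetical, and broadcastCancelCheckpointMarker is an invented label for the elided else branch, not a real Flink method name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model that records the order of the steps; not Flink code. */
final class PerformCheckpointSketch {
    private static final Object LOCK = new Object();

    /** Returns the steps taken for one checkpoint, in the order they happen. */
    static List<String> performCheckpoint(boolean isRunning, long checkpointId) {
        List<String> steps = new ArrayList<>();
        // The steps run under one lock so they look atomic to record processing.
        synchronized (LOCK) {
            if (isRunning) {
                steps.add("prepareSnapshotPreBarrier(" + checkpointId + ")");
                steps.add("broadcastCheckpointBarrier(" + checkpointId + ")");
                steps.add("checkpointState(" + checkpointId + ")");
            } else {
                // Invented label: the real code broadcasts a CancelCheckpointMarker event.
                steps.add("broadcastCancelCheckpointMarker(" + checkpointId + ")");
            }
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(performCheckpoint(true, 7L));
    }
}
```

Emitting the barrier before snapshotting lets downstream tasks start aligning while this task is still writing its own state.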

    Next comes the checkpointState(...) step.

  checkpointState(...) eventually calls executeCheckpointing() in StreamTask, which creates an AsyncCheckpointRunnable that reports the completed checkpoint. The key code is:

    // executeCheckpointing() in #StreamTask.java
    public void executeCheckpointing() throws Exception {
        startSyncPartNano = System.nanoTime();

        try {
            // entry point that calls snapshotState on each StreamOperator; behavior depends on the operator
            for (StreamOperator<?> op : allOperators) {
                checkpointStreamOperator(op);
            }
            //.........

            // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
            AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                owner,
                operatorSnapshotsInProgress,
                checkpointMetaData,
                checkpointMetrics,
                startAsyncPartNano);

            owner.cancelables.registerCloseable(asyncCheckpointRunnable);
            owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);

            //.........
        } catch (Exception ex) {
            //.......
        }
    }
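The split into a synchronous part and an asynchronous part can be modeled with FutureTask. The sketch below is a simplification under stated assumptions: SyncAsyncSnapshotSketch is hypothetical, each operator's snapshot is reduced to an integer "state size", and the returned sum stands in for the acknowledgement sent once all snapshots have finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/** Hypothetical model of the sync/async snapshot split; not Flink code. */
final class SyncAsyncSnapshotSketch {

    /** Returns the total state size acknowledged after all snapshots have finished. */
    static long checkpoint(List<Integer> operatorStateSizes) {
        // Synchronous part: every operator hands back a snapshot task that has not run yet.
        List<FutureTask<Integer>> snapshotsInProgress = new ArrayList<>();
        for (Integer size : operatorStateSizes) {
            snapshotsInProgress.add(new FutureTask<>(() -> size));
        }

        // Asynchronous part: a pool thread runs the snapshot tasks and then reports once.
        Callable<Long> asyncPart = () -> {
            long totalBytes = 0;
            for (FutureTask<Integer> snapshot : snapshotsInProgress) {
                snapshot.run();
                totalBytes += snapshot.get();
            }
            return totalBytes;
        };

        ExecutorService asyncPool = Executors.newSingleThreadExecutor();
        try {
            Future<Long> acknowledged = asyncPool.submit(asyncPart);
            return acknowledged.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("asynchronous part of the checkpoint failed", e);
        } finally {
            asyncPool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> sizes = new ArrayList<>();
        sizes.add(10);
        sizes.add(20);
        System.out.println(checkpoint(sizes));
    }
}
```

The synchronous loop only collects handles, so record processing is blocked for as short a time as possible; the expensive work happens on the pool thread.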

   The run() method of AsyncCheckpointRunnable calls reportCompletedSnapshotStates(...) in StreamTask (for a stateless job the reported state is null), which in turn calls reportTaskStateSnapshots(...) in TaskStateManagerImpl to report the TM's checkpoint to the JM. The key code is:

    // TaskStateManagerImpl.java
    checkpointResponder.acknowledgeCheckpoint(
            jobId,
            executionAttemptID,
            checkpointId,
            checkpointMetrics,
            acknowledgedState);

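  The report is delivered by remotely calling the corresponding JobManager method over RPC.

  2.4 Checkpoint handling in the JobManager

   The checkpoint acknowledgement is sent through acknowledgeCheckpoint(...) in RpcCheckpointResponder. The subsequent call chain and the core method are: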

    // acknowledgeCheckpoint in #JobMaster ==>
    // acknowledgeCheckpoint in #LegacyScheduler ==>
    // receiveAcknowledgeMessage(...) in #CheckpointCoordinator ==>
    // completePendingCheckpoint(checkpoint);

    // <p>Important: This method should only be called in the checkpoint lock scope
    private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
        final long checkpointId = pendingCheckpoint.getCheckpointId();
        final CompletedCheckpoint completedCheckpoint;

        // As a first step to complete the checkpoint, we register its state with the registry
        Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
        sharedStateRegistry.registerAll(operatorStates.values());

        try {
            try {
                // finalize the checkpoint
                completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
                failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
            }
            catch (Exception e1) {
                // abort the current pending checkpoint if we fails to finalize the pending checkpoint.
                if (!pendingCheckpoint.isDiscarded()) {
                    failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
                }

                throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.',
                    CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
            }

            // the pending checkpoint must be discarded after the finalization
            Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);

            try {
                // add the new checkpoint and, if necessary (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain), remove old ones
                completedCheckpointStore.addCheckpoint(completedCheckpoint);
            } catch (Exception exception) {
                // we failed to store the completed checkpoint. Let's clean up
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            completedCheckpoint.discardOnFailedStoring();
                        } catch (Throwable t) {
                            LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
                        }
                    }
                });

                throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.',
                    CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
            }
        } finally {
            pendingCheckpoints.remove(checkpointId);

            triggerQueuedRequests();
        }

        rememberRecentCheckpointId(checkpointId);

        // drop those pending checkpoints that are at prior to the completed one
        // remove pending checkpoints older than this one (the newer checkpoint takes precedence)
        dropSubsumedCheckpoints(checkpointId);

        // record the time when this was completed, to calculate
        // the 'min delay between checkpoints'
        lastCheckpointCompletionNanos = System.nanoTime();

        LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
            completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());

        if (LOG.isDebugEnabled()) {
            StringBuilder builder = new StringBuilder();
            builder.append("Checkpoint state: ");
            for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
                builder.append(state);
                builder.append(", ");
            }
            // Remove last two chars ", "
            builder.setLength(builder.length() - 2);

            LOG.debug(builder.toString());
        }

        // send the "notify complete" call to all vertices
        final long timestamp = completedCheckpoint.getTimestamp();

        // notify all operators (in the TMs) that this checkpoint has completed
        for (ExecutionVertex ev : tasksToCommitTo) {
            Execution ee = ev.getCurrentExecutionAttempt();
            if (ee != null) {
                ee.notifyCheckpointComplete(checkpointId, timestamp);
            }
        }
    }
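How acknowledgements turn a pending checkpoint into a completed one, and how older pending checkpoints are then dropped, can be sketched in plain Java. AckTrackingSketch is a hypothetical class and tasks are identified by plain strings. Unlike the real dropSubsumedCheckpoints(...), which only drops checkpoints that may be subsumed, this sketch drops every older pending checkpoint.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical model of acknowledgement tracking in the coordinator; not Flink code. */
final class AckTrackingSketch {
    private final Set<String> tasksToWaitFor;
    // checkpoint id -> tasks that have not acknowledged yet
    private final TreeMap<Long, Set<String>> pending = new TreeMap<>();
    private long latestCompleted = -1;

    AckTrackingSketch(Set<String> tasksToWaitFor) {
        this.tasksToWaitFor = new HashSet<>(tasksToWaitFor);
    }

    /** Starts a pending checkpoint that waits for every task. */
    void trigger(long checkpointId) {
        pending.put(checkpointId, new HashSet<>(tasksToWaitFor));
    }

    /** Returns true if this acknowledgement completed the checkpoint. */
    boolean acknowledge(long checkpointId, String task) {
        Set<String> notYetAcknowledged = pending.get(checkpointId);
        if (notYetAcknowledged == null || !notYetAcknowledged.remove(task)) {
            return false; // unknown, completed, or subsumed checkpoint, or a duplicate message
        }
        if (!notYetAcknowledged.isEmpty()) {
            return false; // still waiting for other tasks
        }
        pending.remove(checkpointId);
        latestCompleted = checkpointId;
        // Older pending checkpoints are superseded by the one that just completed.
        pending.headMap(checkpointId).clear();
        return true;
    }

    int pendingCount() {
        return pending.size();
    }

    long latestCompleted() {
        return latestCompleted;
    }

    public static void main(String[] args) {
        Set<String> tasks = new HashSet<>();
        tasks.add("source");
        tasks.add("sink");
        AckTrackingSketch tracker = new AckTrackingSketch(tasks);
        tracker.trigger(1L);
        tracker.acknowledge(1L, "source");
        System.out.println(tracker.acknowledge(1L, "sink"));
    }
}
```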

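   This completes the analysis of the overall checkpoint flow. It is best read together with the underlying principles. The three references listed under Ref are all well written and worth reading if you have time.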

 

Ref:

[1]https://www.jianshu.com/p/a40a1b92f6a2

[2]http://www.javashuo.com/article/p-khdfegoi-mh.html

[3]https://blog.csdn.net/qq475781638/article/details/92698301
