Flink的checkpoint是通過定時器週期性觸發的。checkpoint觸發最關鍵的類是CheckpointCoordinator,稱它爲檢查點協調器。
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
CheckpointCoordinator的主要作用是協調operators和state的分佈式快照。它通過向相關的tasks發送觸發消息,並從各tasks收集確認消息(Ack)來完成checkpoint。同時,它還收集和維護各個tasks上報的狀態句柄/狀態引用(state handles)。
CheckpointCoordinator主要屬性:
1 /** Coordinator-wide lock to safeguard the checkpoint updates */ 2 private final Object lock = new Object(); //Coordinator範圍的鎖 3 4 /** Lock specially to make sure that trigger requests do not overtake each other. 5 * This is not done with the coordinator-wide lock, because as part of triggering, 6 * blocking operations may happen (distributed atomic counters). 7 * Using a dedicated lock, we avoid blocking the processing of 'acknowledge/decline' 8 * messages during that phase. */ 9 private final Object triggerLock = new Object(); //trigger requests的專用鎖,避免在獲取checkpointID時阻塞對消息的處理。 10 11 /** Tasks who need to be sent a message when a checkpoint is started */ 12 private final ExecutionVertex[] tasksToTrigger; 13 14 /** Tasks who need to acknowledge a checkpoint before it succeeds */ 15 private final ExecutionVertex[] tasksToWaitFor; 16 17 /** Tasks who need to be sent a message when a checkpoint is confirmed */ 18 private final ExecutionVertex[] tasksToCommitTo; 19 20 /** Map from checkpoint ID to the pending checkpoint */ 21 private final Map<Long, PendingCheckpoint> pendingCheckpoints;//待處理的checkpoint 22 23 /** Actor that receives status updates from the execution graph this coordinator works for */ 24 private JobStatusListener jobStatusListener;//Actor實例,監聽Job狀態變化並根據變化啓停定時任務 25 26 /** Flag whether a trigger request could not be handled immediately. Non-volatile, because only 27 * accessed in synchronized scope */ 28 private boolean triggerRequestQueued;//標記一個觸發請求是否不能被當即處理。 29 30 /** Flag marking the coordinator as shut down (not accepting any messages any more) */ 31 private volatile boolean shutdown;//coordinator的關閉標誌
ScheduledTrigger是檢查點定時任務類,主要是調用了triggerCheckpoint方法。
1 private final class ScheduledTrigger implements Runnable { 2 @Override 3 public void run() { 4 try { 5 triggerCheckpoint(System.currentTimeMillis(), true); 6 } 7 catch (Exception e) { 8 LOG.error("Exception while triggering checkpoint.", e); 9 } 10 } 11 }
下面看一下 triggerCheckpoint 方法的具體實現:
1 //觸發一個新的標準檢查點。timestamp爲觸發檢查點的時間戳,isPeriodic標誌是不是週期性的觸發 2 public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) { 3 return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, isPeriodic).isSuccess(); 4 }
觸發檢查點的核心邏輯:
1. 首先進行觸發Checkpoint之前的預檢查,判斷是否滿足條件;
2. 然後獲取一個CheckpointID,創建PendingCheckpoint實例;
3. 之後重新檢查觸發條件是否滿足要求,防止產生競態條件;
4. 最後將PendingCheckpoint實例checkpoint加入到pendingCheckpoints中,並向tasks發送消息觸發它們的檢查點。
1 CheckpointTriggerResult triggerCheckpoint( 2 long timestamp, 3 CheckpointProperties props, 4 String targetDirectory, 5 boolean isPeriodic) { 6 7 // Sanity check 若是檢查點是存儲在外部系統中且targetDirectory爲空,報錯 8 if (props.externalizeCheckpoint() && targetDirectory == null) { 9 throw new IllegalStateException("No target directory specified to persist checkpoint to."); 10 } 11 12 // make some eager pre-checks 一些checkpoint以前的預檢查 13 synchronized (lock) { 14 // abort if the coordinator has been shutdown in the meantime 15 if (shutdown) { 16 return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN); 17 } 18 19 // Don't allow periodic checkpoint if scheduling has been disabled 20 if (isPeriodic && !periodicScheduling) { 21 return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN); 22 } 23 24 // validate whether the checkpoint can be triggered, with respect to the limit of 25 // concurrent checkpoints, and the minimum time between checkpoints. 26 // these checks are not relevant for savepoints 27 // 驗證checkpoint是否能被觸發,關於併發檢查點的限制和檢查點之間的最小時間。 28 // 判斷checkpoint是否被強制。強制checkpoint不受併發檢查點最大數量和檢查點之間最小時間的限制。 29 if (!props.forceCheckpoint()) { 30 // sanity check: there should never be more than one trigger request queued 31 if (triggerRequestQueued) { 32 //若是不能被當即觸發,直接返回異常 33 LOG.warn("Trying to trigger another checkpoint while one was queued already"); 34 return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED); 35 } 36 37 // if too many checkpoints are currently in progress, we need to mark that a request is queued 38 if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { 39 //若是未完成的檢查點太多,大於配置的併發檢查點最大數量,則將當前檢查點的觸發請求設置爲不能當即執行。 40 triggerRequestQueued = true; 41 //若是定時任務已經啓動,則取消定時任務的執行。 42 if (currentPeriodicTrigger != null) { 43 currentPeriodicTrigger.cancel(false); 44 currentPeriodicTrigger = null; 45 } 46 return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS); 47 } 48 
49 // make sure the minimum interval between checkpoints has passed 50 //檢查是否知足checkpoint之間的最小時間間隔的條件 51 final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos; 52 final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000; 53 54 if (durationTillNextMillis > 0) { 55 if (currentPeriodicTrigger != null) { 56 currentPeriodicTrigger.cancel(false); 57 currentPeriodicTrigger = null; 58 } 59 // Reassign the new trigger to the currentPeriodicTrigger 60 //此時延遲時間設置爲durationTillNextMillis 61 currentPeriodicTrigger = timer.scheduleAtFixedRate( 62 new ScheduledTrigger(), 63 durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS); 64 65 return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS); 66 } 67 } 68 } 69 70 // check if all tasks that we need to trigger are running. 71 // if not, abort the checkpoint 72 // 檢查須要觸發checkpoint的全部Tasks是否處於運行狀態,若是有一個不知足條件,則不觸發檢查點 73 Execution[] executions = new Execution[tasksToTrigger.length]; 74 for (int i = 0; i < tasksToTrigger.length; i++) { 75 Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); 76 if (ee != null && ee.getState() == ExecutionState.RUNNING) { 77 executions[i] = ee; 78 } else { 79 LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.", 80 tasksToTrigger[i].getTaskNameWithSubtaskIndex()); 81 return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING); 82 } 83 } 84 85 // next, check if all tasks that need to acknowledge the checkpoint are running. 
86 // if not, abort the checkpoint 87 //檢查全部須要ack的tasks是否都處於運行狀態,若是有一個不知足條件,則不觸發檢查點。 88 Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length); 89 90 for (ExecutionVertex ev : tasksToWaitFor) { 91 Execution ee = ev.getCurrentExecutionAttempt(); 92 if (ee != null) { 93 ackTasks.put(ee.getAttemptId(), ev); 94 } else { 95 LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.", 96 ev.getTaskNameWithSubtaskIndex()); 97 return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING); 98 } 99 } 100 101 // we will actually trigger this checkpoint! 102 103 // we lock with a special lock to make sure that trigger requests do not overtake each other. 104 // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter' 105 // may issue blocking operations. Using a different lock than the coordinator-wide lock, 106 // we avoid blocking the processing of 'acknowledge/decline' messages during that time. 107 // 觸發檢查點,在triggerLock同步代碼塊中完成,而不是使用coordinator範圍的鎖。 108 synchronized (triggerLock) { 109 final long checkpointID; 110 //首先獲取checkpointID 111 try { 112 // this must happen outside the coordinator-wide lock, because it communicates 113 // with external services (in HA mode) and may block for a while. 
114 checkpointID = checkpointIdCounter.getAndIncrement(); 115 } 116 catch (Throwable t) { 117 int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet(); 118 LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t); 119 return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION); 120 } 121 122 //建立PendingCheckpoint實例,表示待處理檢查點 123 final PendingCheckpoint checkpoint = new PendingCheckpoint( 124 job, 125 checkpointID, 126 timestamp, 127 ackTasks, 128 props, 129 targetDirectory, 130 executor); 131 132 if (statsTracker != null) { 133 PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint( 134 checkpointID, 135 timestamp, 136 props); 137 138 checkpoint.setStatsCallback(callback); 139 } 140 141 // schedule the timer that will clean up the expired checkpoints 142 // 針對當前checkpoints超時進行資源清理的canceller 143 final Runnable canceller = new Runnable() { 144 @Override 145 public void run() { 146 synchronized (lock) { 147 // only do the work if the checkpoint is not discarded anyways 148 // note that checkpoint completion discards the pending checkpoint object 149 if (!checkpoint.isDiscarded()) { 150 LOG.info("Checkpoint " + checkpointID + " expired before completing."); 151 152 checkpoint.abortExpired(); 153 pendingCheckpoints.remove(checkpointID); 154 rememberRecentCheckpointId(checkpointID); 155 156 triggerQueuedRequests(); 157 } 158 } 159 } 160 }; 161 162 try { 163 //從新請求coordinator-wide lock 164 // re-acquire the coordinator-wide lock 165 synchronized (lock) { 166 // since we released the lock in the meantime, we need to re-check 167 // that the conditions still hold. 
168 // 從新檢查觸發條件,防止產生競態條件。這裏作二次檢查的緣由是,中間有一段關於得到checkpointId的代碼,不在同步塊中。 169 if (shutdown) { 170 return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN); 171 } 172 else if (!props.forceCheckpoint()) { 173 if (triggerRequestQueued) { 174 LOG.warn("Trying to trigger another checkpoint while one was queued already"); 175 return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED); 176 } 177 178 if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { 179 triggerRequestQueued = true; 180 if (currentPeriodicTrigger != null) { 181 currentPeriodicTrigger.cancel(false); 182 currentPeriodicTrigger = null; 183 } 184 return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS); 185 } 186 187 // make sure the minimum interval between checkpoints has passed 188 final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos; 189 final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000; 190 191 if (durationTillNextMillis > 0) { 192 if (currentPeriodicTrigger != null) { 193 currentPeriodicTrigger.cancel(false); 194 currentPeriodicTrigger = null; 195 } 196 197 // Reassign the new trigger to the currentPeriodicTrigger 198 currentPeriodicTrigger = timer.scheduleAtFixedRate( 199 new ScheduledTrigger(), 200 durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS); 201 202 return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS); 203 } 204 } 205 206 LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp); 207 208 //將checkpoint加入到pendingCheckpoints中 209 pendingCheckpoints.put(checkpointID, checkpoint); 210 211 //啓動超時canceller,延遲checkpointTimeout執行 212 ScheduledFuture<?> cancellerHandle = timer.schedule( 213 canceller, 214 checkpointTimeout, TimeUnit.MILLISECONDS); 215 216 if (!checkpoint.setCancellerHandle(cancellerHandle)) { 217 // checkpoint is already disposed! 
218 cancellerHandle.cancel(false); 219 } 220 221 // trigger the master hooks for the checkpoint 222 final List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(), 223 checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout)); 224 for (MasterState s : masterStates) { 225 checkpoint.addMasterState(s); 226 } 227 } 228 // end of lock scope 229 230 CheckpointOptions checkpointOptions; 231 if (!props.isSavepoint()) { 232 checkpointOptions = CheckpointOptions.forFullCheckpoint(); 233 } else { 234 checkpointOptions = CheckpointOptions.forSavepoint(targetDirectory); 235 } 236 237 // send the messages to the tasks that trigger their checkpoint 238 // 向tasks發送消息,觸發它們的檢查點 239 for (Execution execution: executions) { 240 execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions); 241 } 242 243 numUnsuccessfulCheckpointsTriggers.set(0); 244 return new CheckpointTriggerResult(checkpoint); 245 } 246 catch (Throwable t) { 247 // guard the map against concurrent modifications 248 synchronized (lock) { 249 pendingCheckpoints.remove(checkpointID); 250 } 251 252 int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet(); 253 LOG.warn("Failed to trigger checkpoint {}. ({} consecutive failed attempts so far)", 254 checkpointID, numUnsuccessful, t); 255 256 if (!checkpoint.isDiscarded()) { 257 checkpoint.abortError(new Exception("Failed to trigger checkpoint", t)); 258 } 259 return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION); 260 } 261 262 } // end trigger lock 263 }
啓動定時任務方法:startCheckpointScheduler
1 public void startCheckpointScheduler() { 2 synchronized (lock) { 3 if (shutdown) { 4 throw new IllegalArgumentException("Checkpoint coordinator is shut down"); 5 } 6 //保證全部之前的timer被取消 7 stopCheckpointScheduler(); 8 9 periodicScheduling = true; 10 //scheduleAtFixedRate方法是以固定延遲和固定時間間隔週期性的執行任務 11 currentPeriodicTrigger = timer.scheduleAtFixedRate( 12 new ScheduledTrigger(), 13 baseInterval, baseInterval, TimeUnit.MILLISECONDS); 14 } 15 }
停止定時任務方法:stopCheckpointScheduler
1 //重置一些標記變量,釋放資源 2 public void stopCheckpointScheduler() { 3 synchronized (lock) { 4 triggerRequestQueued = false; 5 periodicScheduling = false; 6 7 if (currentPeriodicTrigger != null) { 8 currentPeriodicTrigger.cancel(false);//取消當前週期的觸發任務 9 currentPeriodicTrigger = null; 10 } 11 12 //pendingCheckpoints中存的是待執行的檢查點 13 for (PendingCheckpoint p : pendingCheckpoints.values()) { 14 p.abortError(new Exception("Checkpoint Coordinator is suspending.")); 15 } 16 pendingCheckpoints.clear();//清空pendingCheckpoints 17 numUnsuccessfulCheckpointsTriggers.set(0); 18 } 19 }
啓動和停止定時任務的機制是怎樣的?Flink使用的是基於Akka的Actor模型的消息驅動機制。
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator
CheckpointCoordinatorDeActivator是actor的實現類(實現了JobStatusListener接口),監聽JobStatus的變化,啓動和停止週期性的checkpoint調度任務。
1 //actor的實現類,監聽JobStatus的變化,激活和取消週期性的checkpoint調度任務。 2 public class CheckpointCoordinatorDeActivator implements JobStatusListener { 3 4 private final CheckpointCoordinator coordinator; 5 6 public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) { 7 this.coordinator = checkNotNull(coordinator); 8 } 9 10 @Override 11 public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) { 12 if (newJobStatus == JobStatus.RUNNING) { 13 // start the checkpoint scheduler 14 // 一旦監聽到JobStatus變爲RUNNING,就會啓動定時任務 15 coordinator.startCheckpointScheduler(); 16 } else { 17 // anything else should stop the trigger for now 18 coordinator.stopCheckpointScheduler(); 19 } 20 } 21 }
CheckpointCoordinatorDeActivator的實例是在CheckpointCoordinator中被創建的,方法爲createActivatorDeactivator。
1 public JobStatusListener createActivatorDeactivator() { 2 synchronized (lock) { 3 if (shutdown) { 4 throw new IllegalArgumentException("Checkpoint coordinator is shut down"); 5 } 6 7 if (jobStatusListener == null) { 8 jobStatusListener = new CheckpointCoordinatorDeActivator(this); 9 } 10 11 return jobStatusListener; 12 } 13 }
AbstractCheckpointMessage :全部checkpoint消息的基礎抽象類
org.apache.flink.runtime.messages.checkpoint.AbstractCheckpointMessage
AbstractCheckpointMessage主要屬性:
1 /** The job to which this message belongs */ 2 private final JobID job; 3 /** The task execution that is source/target of the checkpoint message */ 4 private final ExecutionAttemptID taskExecutionId; //檢查點的source/target task 5 /** The ID of the checkpoint that this message coordinates */ 6 private final long checkpointId;
它有如下實現類:
TriggerCheckpoint :JobManager向TaskManager發送的檢查點觸發消息;
AcknowledgeCheckpoint :TaskManager向JobManager發送的某個獨立task的檢查點完成確認的消息;
DeclineCheckpoint :TaskManager向JobManager發送的檢查點尚未被處理的消息;
NotifyCheckpointComplete :JobManager向TaskManager發送的檢查點完成的消息。
TriggerCheckpoint消息:從JobManager發送到TaskManager,通知指定的task觸發checkpoint。
發送消息
發送消息的邏輯是在CheckpointCoordinator中,上文提到過:
1 // send the messages to the tasks that trigger their checkpoint 2 for (Execution execution: executions) { 3 execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions); 4 }
其中executions是Execution[]數組,存儲的元素是在檢查點觸發時需要被發送消息的Tasks的集合(即CheckpointCoordinator成員變量tasksToTrigger中各task當前的Execution)。對每個要發送的Task執行triggerCheckpoint()方法。
接下來,看一下Execution的triggerCheckpoint方法。
1 //在該execution的task上觸發一個新的checkpoint 2 public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) { 3 //獲取Resource 4 final SimpleSlot slot = assignedResource;//獲取Slot 5 6 if (slot != null) { 7 //TaskManagerGateway是用於和TaskManager通訊的抽象基礎類 8 final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); 9 //經過taskManagerGateway向TaskManager發送消息 10 taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions); 11 } else { 12 LOG.debug("The execution has no slot assigned. This indicates that the execution is " + 13 "no longer running."); 14 } 15 }
繼續進入ActorTaskManagerGateway(TaskManagerGateway抽象類的Actor實現)類的triggerCheckpoint()方法:
1 public void triggerCheckpoint( 2 ExecutionAttemptID executionAttemptID, 3 JobID jobId, 4 long checkpointId, 5 long timestamp, 6 CheckpointOptions checkpointOptions) { 7 8 Preconditions.checkNotNull(executionAttemptID); 9 Preconditions.checkNotNull(jobId); 10 //新建了一個TriggerCheckpoint消息,經過actorGateway的tell方法(異步發送,沒有返回結果)發送這個消息 11 //ActorGateway是基於actor通訊的接口 12 actorGateway.tell(new TriggerCheckpoint(jobId, executionAttemptID, checkpointId, timestamp, checkpointOptions)); 13 }
AkkaActorGateway類是ActorGateway接口的一種實現,它使用 Akka 與遠程的actors進行通訊。看一下AkkaActorGateway的tell方法:
1 @Override 2 public void tell(Object message) { 3 Object newMessage = decorator.decorate(message); 4 //經過ActorRef實例actor發送消息,ActorRef是akka中的類。之後須要研究Akka的實現機制。 5 actor.tell(newMessage, ActorRef.noSender()); 6 }
至此,發送TriggerCheckpoint消息的過程結束。下面將看一下TaskManager接收消息的過程。
接收消息
TaskManager接收消息的部分是用scala實現的。
org.apache.flink.runtime.taskmanager.TaskManager
TaskManager類的handleMessage方法是消息處理中心。
1 //該方法爲TaskManager的消息處理中心。接收消息,按消息的種類調用不一樣的方法處理。 2 override def handleMessage: Receive = { 3 case message: TaskMessage => handleTaskMessage(message) 4 5 //這個就是處理checkpoints相關的消息 6 case message: AbstractCheckpointMessage => handleCheckpointingMessage(message) 7 8 case JobManagerLeaderAddress(address, newLeaderSessionID) => 9 handleJobManagerLeaderAddress(address, newLeaderSessionID) 10 11 case message: RegistrationMessage => handleRegistrationMessage(message) 12 13 ... 14 }
接下來,看方法handleCheckpointingMessage(),主要是觸發Checkpoint Barrier。
1 //處理Checkpoint相關的消息 2 private def handleCheckpointingMessage(actorMessage: AbstractCheckpointMessage): Unit = { 3 4 actorMessage match { 5 //觸發Checkpoint消息 6 case message: TriggerCheckpoint => 7 val taskExecutionId = message.getTaskExecutionId 8 val checkpointId = message.getCheckpointId 9 val timestamp = message.getTimestamp 10 val checkpointOptions = message.getCheckpointOptions 11 12 log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.") 13 14 val task = runningTasks.get(taskExecutionId) 15 if (task != null) { 16 //調用Task的triggerCheckpointBarrier方法,觸發Checkpoint Barrier,Barrier實現機制的細節之後討論。 17 task.triggerCheckpointBarrier(checkpointId, timestamp, checkpointOptions) 18 } else { 19 log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.") 20 } 21 //Checkpoint完成通知消息 22 case message: NotifyCheckpointComplete => 23 val taskExecutionId = message.getTaskExecutionId 24 val checkpointId = message.getCheckpointId 25 val timestamp = message.getTimestamp 26 27 log.debug(s"Receiver ConfirmCheckpoint $checkpointId@$timestamp for $taskExecutionId.") 28 29 val task = runningTasks.get(taskExecutionId) 30 if (task != null) { 31 //調用Task的notifyCheckpointComplete方法,進行相關處理 32 task.notifyCheckpointComplete(checkpointId) 33 } else { 34 log.debug( 35 s"TaskManager received a checkpoint confirmation for unknown task $taskExecutionId.") 36 } 37 38 // unknown checkpoint message 39 case _ => unhandled(actorMessage) 40 } 41 }
NotifyCheckpointComplete消息:由JobManager發送到TaskManager,通知task它的檢查點已經得到完成確認,task可以向第三方提交checkpoint。
發送消息
發送NotifyCheckpointComplete消息的部分在CheckpointCoordinator類的receiveAcknowledgeMessage方法中。
1 //該方法接收一個AcknowledgeCheckpoint消息,返回該Message是否與一個pending checkpoint相關聯 2 public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException { 3 if (shutdown || message == null) { 4 return false; 5 } 6 if (!job.equals(message.getJob())) { 7 LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message); 8 return false; 9 } 10 11 final long checkpointId = message.getCheckpointId(); 12 13 synchronized (lock) { 14 // we need to check inside the lock for being shutdown as well, otherwise we 15 // get races and invalid error log messages 16 if (shutdown) { 17 return false; 18 } 19 20 final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId); 21 22 //若是是待處理的檢查點而且沒有被Discarded 23 if (checkpoint != null && !checkpoint.isDiscarded()) { 24 25 //根據TaskExecutionId和SubtaskState,Acknowledges the task。確認該任務 26 switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) { 27 //確認成功 28 case SUCCESS: 29 LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.", 30 checkpointId, message.getTaskExecutionId(), message.getJob()); 31 //若是收到了所有task的確認消息(即notYetAcknowledgedTasks爲空) 32 if (checkpoint.isFullyAcknowledged()) { 33 //嘗試完成PendingCheckpoint(Try to complete the given pending checkpoint) 34 //將完成的checkpointId從checkpoint中刪除和一下標誌修改,最後,發送notify complete消息 35 completePendingCheckpoint(checkpoint); 36 } 37 break; 38 //重複消息 39 case DUPLICATE: 40 LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.", 41 message.getCheckpointId(), message.getTaskExecutionId(), message.getJob()); 42 break; 43 //未知消息 44 case UNKNOWN: 45 LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " + 46 "because the task's execution attempt id was unknown. 
Discarding " + 47 "the state handle to avoid lingering state.", message.getCheckpointId(), 48 message.getTaskExecutionId(), message.getJob()); 49 50 discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState()); 51 break; 52 //廢棄消息 53 case DISCARDED: 54 LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " + 55 "because the pending checkpoint had been discarded. Discarding the " + 56 "state handle tp avoid lingering state.", 57 message.getCheckpointId(), message.getTaskExecutionId(), message.getJob()); 58 discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState()); 59 } 60 61 return true; 62 } 63 else if (checkpoint != null) { 64 // this should not happen 65 throw new IllegalStateException( 66 "Received message for discarded but non-removed checkpoint " + checkpointId); 67 } 68 else { 69 boolean wasPendingCheckpoint; 70 71 // message is for an unknown checkpoint, or comes too late (checkpoint disposed) 72 if (recentPendingCheckpoints.contains(checkpointId)) { 73 wasPendingCheckpoint = true; 74 LOG.warn("Received late message for now expired checkpoint attempt {} from " + 75 "{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob()); 76 } 77 else { 78 LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.", 79 checkpointId, message.getTaskExecutionId(), message.getJob()); 80 wasPendingCheckpoint = false; 81 } 82 83 // try to discard the state so that we don't have lingering state lying around 84 discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState()); 85 86 return wasPendingCheckpoint; 87 } 88 } 89 }
completePendingCheckpoint方法中發送NotifyCheckpointComplete消息的代碼如下:
1 for (ExecutionVertex ev : tasksToCommitTo) { 2 Execution ee = ev.getCurrentExecutionAttempt(); 3 if (ee != null) { 4 ee.notifyCheckpointComplete(checkpointId, timestamp); 5 } 6 }
接收消息
接收邏輯已包含在上文TriggerCheckpoint消息接收部分給出的handleCheckpointingMessage方法中:TaskManager收到NotifyCheckpointComplete消息後,調用Task的notifyCheckpointComplete方法,即 task.notifyCheckpointComplete(checkpointId)。
AcknowledgeCheckpoint消息:由TaskManager發向JobManager,告知JobManager指定task的checkpoint已完成。該消息可能攜帶task的狀態和checkpointMetrics。
AcknowledgeCheckpoint消息類的兩個屬性:
private final SubtaskState subtaskState;//任務狀態 private final CheckpointMetrics checkpointMetrics;
發送消息
發送消息的過程在RuntimeEnvironment類中的acknowledgeCheckpoint方法
1 public void acknowledgeCheckpoint( 2 long checkpointId, 3 CheckpointMetrics checkpointMetrics, 4 SubtaskState checkpointStateHandles) { 5 //經過CheckpointResponder接口的實例checkpointResponder發送ack消息 6 checkpointResponder.acknowledgeCheckpoint( 7 jobId, executionId, checkpointId, checkpointMetrics, 8 checkpointStateHandles); 9 }
CheckpointResponder接口用於應答checkpoint的確認(acknowledge)和拒絕(decline)消息。ActorGatewayCheckpointResponder是基於ActorGateway的CheckpointResponder接口實現類,包含acknowledgeCheckpoint和declineCheckpoint兩個方法。
1 @Override 2 public void acknowledgeCheckpoint( 3 JobID jobID, 4 ExecutionAttemptID executionAttemptID, 5 long checkpointId, 6 CheckpointMetrics checkpointMetrics, 7 SubtaskState checkpointStateHandles) { 8 //新建一個AcknowledgeCheckpoint消息 9 AcknowledgeCheckpoint message = new AcknowledgeCheckpoint( 10 jobID, executionAttemptID, checkpointId, checkpointMetrics, 11 checkpointStateHandles); 12 //經過actorGateway發送出去 13 actorGateway.tell(message); 14 }
接收消息
通過CheckpointCoordinator的receiveAcknowledgeMessage方法接收(與NotifyCheckpointComplete消息的發送過程在同一個方法中)。
DeclineCheckpoint消息:該消息由TaskManager發送給JobManager,用於告知CheckpointCoordinator:檢查點的請求尚無法被處理。這種情況通常發生於:某task已處於RUNNING狀態,但在內部可能尚未準備好執行檢查點。
發送消息
位於Task類的triggerCheckpointBarrier方法中。
org.apache.flink.runtime.taskmanager.Task
1 try { 2 boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions); 3 if (!success) { 4 //經過CheckpointResponder發送消息,相似發送AcknowledgeCheckpoint消息 5 checkpointResponder.declineCheckpoint( 6 getJobID(), getExecutionId(), checkpointID, 7 new CheckpointDeclineTaskNotReadyException(taskName)); 8 } 9 }
接收消息
CheckpointCoordinator中的receiveDeclineMessage方法。
1 public void receiveDeclineMessage(DeclineCheckpoint message) { 2 if (shutdown || message == null) { 3 return; 4 } 5 if (!job.equals(message.getJob())) { 6 throw new IllegalArgumentException("Received DeclineCheckpoint message for job " + 7 message.getJob() + " while this coordinator handles job " + job); 8 } 9 10 final long checkpointId = message.getCheckpointId(); 11 final String reason = (message.getReason() != null ? message.getReason().getMessage() : ""); 12 13 PendingCheckpoint checkpoint; 14 15 synchronized (lock) { 16 // we need to check inside the lock for being shutdown as well, otherwise we 17 // get races and invalid error log messages 18 if (shutdown) { 19 return; 20 } 21 22 checkpoint = pendingCheckpoints.get(checkpointId); 23 24 if (checkpoint != null && !checkpoint.isDiscarded()) { 25 //若是是待處理的Checkpoint且沒有被遺棄 26 LOG.info("Discarding checkpoint {} because of checkpoint decline from task {} : {}", 27 checkpointId, message.getTaskExecutionId(), reason); 28 29 pendingCheckpoints.remove(checkpointId);//將checkpointId從pendingCheckpoints中刪除 30 checkpoint.abortDeclined(); 31 rememberRecentCheckpointId(checkpointId); 32 33 // we don't have to schedule another "dissolving" checkpoint any more because the 34 // cancellation barriers take care of breaking downstream alignments 35 // we only need to make sure that suspended queued requests are resumed 36 37 //是否還有更多pending 的checkpoint 38 boolean haveMoreRecentPending = false; 39 for (PendingCheckpoint p : pendingCheckpoints.values()) { 40 if (!p.isDiscarded() && p.getCheckpointId() >= checkpoint.getCheckpointId()) { 41 haveMoreRecentPending = true; 42 break; 43 } 44 } 45 // 46 if (!haveMoreRecentPending) { 47 triggerQueuedRequests(); 48 } 49 } 50 else if (checkpoint != null) { 51 // this should not happen 52 throw new IllegalStateException( 53 "Received message for discarded but non-removed checkpoint " + checkpointId); 54 } 55 else if (LOG.isDebugEnabled()) { 56 if 
(recentPendingCheckpoints.contains(checkpointId)) { 57 // message is for an unknown checkpoint, or comes too late (checkpoint disposed) 58 LOG.debug("Received another decline message for now expired checkpoint attempt {} : {}", 59 checkpointId, reason); 60 } else { 61 // message is for an unknown checkpoint. might be so old that we don't even remember it any more 62 LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} : {}", 63 checkpointId, reason); 64 } 65 } 66 } 67 }