上一篇,我們分析了任務在executor端的運行流程:任務運行結束後,在Executor.launchTask方法的最後,通過調用execBackend.statusUpdate方法將任務結果以及任務狀態發送給driver。回到driver端,我們在driver的rpc服務端DriverEndpoint的receive方法中尋找對StatusUpdate消息的處理邏輯。
case StatusUpdate(executorId, taskId, state, data) =>
  // Report the task's new state (and its serialized payload) to the TaskScheduler.
  // This happens for every state, not only for completed tasks.
  scheduler.statusUpdate(taskId, state, data.value)
  // Once the task has finished (TaskState.isFinished; per the original notes this covers
  // FINISHED, FAILED, KILLED and LOST), the cores it occupied are free again. Return them
  // to the executor and immediately make new resource offers on it.
  if (TaskState.isFinished(state)) {
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        executorInfo.freeCores += scheduler.CPUS_PER_TASK
        makeOffers(executorId)
      case None =>
        // Ignoring the update since we don't know about the executor.
        logWarning(s"Ignored task status update ($taskId state $state) " +
          s"from unknown executor with ID $executorId")
    }
  }
可以看到,除了在任務結束後回收executor的CPU核並調用makeOffers重新分配任務之外,主要的處理邏輯都在scheduler.statusUpdate中,所以重點是這個調用:
/**
 * Handles a task state update reported by an executor.
 *
 * While holding this scheduler's lock it looks up the task's TaskSetManager.
 *  - A LOST task also causes its executor to be removed.
 *  - A finished task has its bookkeeping cleared and is removed from the running set.
 *  - The payload is then handed to taskResultGetter: successful results via
 *    enqueueSuccessfulTask, FAILED/KILLED/LOST via enqueueFailedTask.
 * Any executor loss is reported to the DAGScheduler only after the lock is released.
 *
 * @param tid            id of the task whose state changed
 * @param state          the reported task state
 * @param serializedData serialized payload sent with the update (task result or failure data)
 */
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer): Unit = {
  var failedExecutor: Option[String] = None
  var reason: Option[ExecutorLossReason] = None
  synchronized {
    try {
      taskIdToTaskSetManager.get(tid) match {
        case Some(taskSet) =>
          if (state == TaskState.LOST) {
            // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
            // where each executor corresponds to a single task, so mark the executor as failed.
            val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
              "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
            if (executorIdToRunningTaskIds.contains(execId)) {
              reason = Some(
                SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
              removeExecutor(execId, reason.get)
              failedExecutor = Some(execId)
            }
          }
          if (TaskState.isFinished(state)) {
            // Clear the scheduler's bookkeeping for this task and drop it from the
            // task set's running tasks.
            cleanupTaskState(tid)
            taskSet.removeRunningTask(tid)
            if (state == TaskState.FINISHED) {
              // The result is deserialized (and fetched, if indirect) by taskResultGetter
              // rather than here under the lock.
              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
            }
          }
        case None =>
          logError(
            ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
              "likely the result of receiving duplicate task finished status updates) or its " +
              "executor has been marked as failed.")
              .format(state, tid))
      }
    } catch {
      case e: Exception => logError("Exception in statusUpdate", e)
    }
  }
  // Update the DAGScheduler without holding a lock on this, since that can deadlock
  if (failedExecutor.isDefined) {
    assert(reason.isDefined)
    dagScheduler.executorLost(failedExecutor.get, reason.get)
    backend.reviveOffers()
  }
}
這裏對於成功結束(FINISHED)的任務,通過taskResultGetter.enqueueSuccessfulTask啓動了一個異步任務來處理任務結果,所以我們分析一下這個異步任務的處理邏輯。
/**
 * Processes the result of a successfully finished task on the task-result thread pool,
 * so that the caller does not deserialize or fetch results itself.
 *
 * There are two kinds of result:
 *  - A DirectTaskResult carries the value inline.
 *  - An IndirectTaskResult carries only a block id and size. The bytes are then fetched
 *    from a remote block manager, and the block is removed afterwards.
 *
 * In both cases the result value is deserialized eagerly, outside any scheduler lock. The
 * result-size accumulator is then filled in, and the result is passed to
 * `scheduler.handleSuccessfulTask`.
 *
 * If accepting the result would exceed the allowed total result size, the result is dropped.
 * If the indirect block can no longer be fetched, the task is reported as failed with
 * TaskResultLost.
 *
 * @param taskSetManager manager of the task set the task belongs to
 * @param tid            id of the finished task
 * @param serializedData serialized TaskResult sent by the executor
 */
def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    serializedData: ByteBuffer): Unit = {
  getTaskResultExecutor.execute(new Runnable {
    override def run(): Unit = Utils.logUncaughtExceptions {
      try {
        // First decode the outer TaskResult envelope sent by the executor.
        val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
          // The value was sent back inline, so it can be read straight from the envelope.
          case directResult: DirectTaskResult[_] =>
            // Give up if accepting this result would exceed the limit on the total size of
            // results the driver may collect (the original notes say the default is 1g --
            // presumably spark.driver.maxResultSize; confirm).
            if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
              return
            }
            // deserialize "value" without holding any lock so that it won't block other threads.
            // We should call it here, so that when it's called again in
            // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
            directResult.value(taskResultSerializer.get())
            (directResult, serializedData.limit())
          // Only a block id and size came back; the bytes live in a block manager.
          case IndirectTaskResult(blockId, size) =>
            if (!taskSetManager.canFetchMoreResults(size)) {
              // dropped by executor if size is larger than maxResultSize
              // The result is being discarded, so its block is no longer needed either.
              sparkEnv.blockManager.master.removeBlock(blockId)
              return
            }
            logDebug("Fetching indirect task result for TID %s".format(tid))
            // Tell the scheduler the result is being fetched (per the original notes this
            // ends up posting a "getting result" event through the DAGScheduler).
            scheduler.handleTaskGettingResult(taskSetManager, tid)
            // Fetch the serialized result bytes from a remote block manager.
            val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
            if (!serializedTaskResult.isDefined) {
              /* We won't be able to get the task result if the machine that ran the task failed
               * between when the task ended and when we tried to fetch the result, or if the
               * block manager had to flush the result. */
              // Either way the task is reported as failed with TaskResultLost; the original
              // notes say handleFailedTask then re-runs the task.
              scheduler.handleFailedTask(
                taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
              return
            }
            // Decode the fetched bytes into the DirectTaskResult they contain.
            val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
              serializedTaskResult.get.toByteBuffer)
            // force deserialization of referenced value
            deserializedResult.value(taskResultSerializer.get())
            // The result has been fetched, so remove its block.
            sparkEnv.blockManager.master.removeBlock(blockId)
            (deserializedResult, size)
        }

        // Set the task result size in the accumulator updates received from the executors.
        // We need to do this here on the driver because if we did this on the executors then
        // we would have to serialize the result again after updating the size.
        result.accumUpdates = result.accumUpdates.map { a =>
          if (a.name == Some(InternalAccumulator.RESULT_SIZE)) {
            val acc = a.asInstanceOf[LongAccumulator]
            assert(acc.sum == 0L, "task result size should not have been set on the executors")
            acc.setValue(size.toLong)
            acc
          } else {
            a
          }
        }

        // Hand the fully deserialized result to TaskSchedulerImpl.
        scheduler.handleSuccessfulTask(taskSetManager, tid, result)
      } catch {
        case cnf: ClassNotFoundException =>
          val loader = Thread.currentThread.getContextClassLoader
          taskSetManager.abort("ClassNotFound with classloader: " + loader)
        // Matching NonFatal so we don't catch the ControlThrowable from the "return" above.
        case NonFatal(ex) =>
          logError("Exception while getting task result", ex)
          taskSetManager.abort("Exception while getting task result: %s".format(ex))
      }
    }
  })
}
這裏會有好幾次反序列化,這是因為在executor端處理任務結果數據時就經過了好幾次序列化,
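可以看到在結果傳回driver端後,是按照與上面相反的順序進行反序列化的:先反序列化外層的TaskResult(對於IndirectTaskResult,還要先從blockManager拉取序列化後的結果),再通過taskResultSerializer反序列化其中的結果值。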
最後拿到任務運行的結果數據之後,將結果數據交給TaskSchedulerImpl做進一步處理,TaskSchedulerImpl在加鎖後又把它轉交給TaskSetManager.handleSuccessfulTask:
/**
 * Passes a successfully deserialized task result on to the TaskSetManager that owns the task.
 * The call is made while holding this scheduler's monitor.
 *
 * @param taskSetManager manager of the task set the task belongs to
 * @param tid            id of the finished task
 * @param taskResult     the task's result, with its value already deserialized
 */
def handleSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskResult: DirectTaskResult[_]): Unit = {
  this.synchronized {
    taskSetManager.handleSuccessfulTask(tid, taskResult)
  }
}
/**
 * Marks task `tid` as successfully finished.
 *
 * Steps:
 *  1. Update the task's bookkeeping.
 *  2. Kill any other running attempts of the same task.
 *  3. Count the success once per task index.
 *  4. Notify the DAGScheduler via `taskEnded`.
 *
 * @param tid    id of the finished task attempt
 * @param result the task's result; its value was already deserialized by the caller
 */
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  val info = taskInfos(tid)
  val index = info.index
  info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
  if (speculationEnabled) {
    successfulTaskDurations.insert(info.duration)
  }
  removeRunningTask(tid)

  // Kill any other attempts for the same task (since those are unnecessary now that one
  // attempt completed successfully). Such copies exist e.g. when speculation runs
  // several attempts of one task at the same time.
  for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
    logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
      s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
      s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
    killedByOtherAttempt(index) = true
    // Ask the scheduler backend to kill that attempt.
    sched.backend.killTask(
      attemptInfo.taskId,
      attemptInfo.executorId,
      interruptThread = true,
      reason = "another attempt succeeded")
  }

  // Only the first successful attempt of this task index updates the success counters,
  // so several finishing copies of the same task cannot count twice.
  if (!successful(index)) {
    tasksSuccessful += 1
    logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +
      s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +
      s" ($tasksSuccessful/$numTasks)")
    // Mark successful and stop if all the tasks have succeeded.
    successful(index) = true
    if (tasksSuccessful == numTasks) {
      // Every task in this set (stage attempt) has succeeded.
      isZombie = true
    }
  } else {
    logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
      " because task " + index + " has already completed successfully")
  }

  // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
  // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
  // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
  // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
  // Note: "result.value()" only deserializes the value when it's called at the first time, so
  // here "result.value()" just returns the value and won't block other threads.
  // Flow note: work is submitted DAGScheduler -> TaskScheduler -> SchedulerBackend ->
  // executor, and results travel back in the reverse order; here the task set manager
  // reports the success up to the DAGScheduler.
  sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
  // Per its name, finishes this task set if it is now complete -- see maybeFinishTaskSet.
  maybeFinishTaskSet()
}
這個方法的主要工作是更新一些簿記量;殺掉同一任務其他仍在運行的副本;
然後通知DAGScheduler做進一步處理。
DAGScheduler收到通知後,最終會在handleTaskCompletion方法中處理這個任務完成事件(CompletionEvent)。這個方法很長,所以我們把這個方法的主要邏輯做一個總結:
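1. 處理累加器:ResultTask的累加器對每個分區只更新一次,其他任務(如ShuffleMapTask)的累加器則可能被重複計數;隨後向事件總線投遞一個任務結束的事件。
2. 處理任務成功的情況:
   - 對於ResultTask,更新作業已完成的分區數。若作業的全部分區都已完成,則將stage標記為已完成、清理作業狀態並投遞作業結束事件,最後回調作業監聽器。
   - 對於ShuffleMapTask,在mapOutputTracker中註冊map輸出。當stage的全部分區都已完成時,若仍有分區的輸出缺失則重新提交該stage,否則提交下游的子stage。
3. 處理拉取數據失敗(FetchFailed)的情況。除了更新一些簿記量,主要做的事就是判斷是否要再次提交stage:
   - 如果不能再次提交(該stage因拉取失敗而失敗的嘗試次數已達到閾值),那麼就需要取消這個stage以及關聯的job;
   - 否則將失敗的stage和它所依賴的map stage一起重新提交。
   這裏需要注意的是,再次提交stage並不會把所有的任務全部重新運行一遍,只會把那些因失敗而沒有完成的任務重新提交。這是通過mapOutputTrackerMaster組件追蹤ShuffleMap任務的輸出情況實現的。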
/**
 * Handles a task-completion event.
 *
 * Steps:
 *  1. Notify the outputCommitCoordinator.
 *  2. Apply the task's accumulator updates.
 *  3. Post a task-end event to the listener bus.
 *  4. Update job/stage bookkeeping according to `event.reason`:
 *     - Success of a ResultTask or ShuffleMapTask.
 *     - Resubmitted.
 *     - FetchFailed: the failed stage is marked finished with the failure message and then
 *       either aborted or scheduled for resubmission, and the lost map outputs are
 *       unregistered.
 *     - Other reasons need no work here.
 *
 * No lock is taken here. The original notes explain this by saying that the DAGScheduler's
 * EventLoop processes events on a single thread, so completion events -- even for concurrent
 * attempts of the same task -- are handled one at a time. That thread model is assumed, not
 * visible in this method.
 *
 * @param event the completion event for one task attempt
 */
private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit = {
  val task = event.task
  val stageId = task.stageId

  // Let the outputCommitCoordinator know how this task attempt ended.
  outputCommitCoordinator.taskCompleted(
    stageId,
    task.partitionId,
    event.taskInfo.attemptNumber, // this is a task attempt number
    event.reason)

  if (!stageIdToStage.contains(task.stageId)) {
    // The stage may have already finished when we get this event -- eg. maybe it was a
    // speculative task. It is important that we send the TaskEnd event in any case, so listeners
    // are properly notified and can chose to handle it. For instance, some listeners are
    // doing their own accounting and if they don't get the task end event they think
    // tasks are still running when they really aren't.
    postTaskEnd(event)

    // Skip all the actions if the stage has been cancelled.
    return
  }

  val stage = stageIdToStage(task.stageId)

  // Make sure the task's accumulators are updated before any other processing happens, so that
  // we can post a task end event before any jobs or stages are updated. The accumulators are
  // only updated in certain cases.
  event.reason match {
    case Success =>
      task match {
        case rt: ResultTask[_, _] =>
          val resultStage = stage.asInstanceOf[ResultStage]
          resultStage.activeJob match {
            case Some(job) =>
              // Only update the accumulator once for each result task.
              if (!job.finished(rt.outputId)) {
                updateAccumulators(event)
              }
            case None => // Ignore update if task's job has finished.
          }
        case _ =>
          // There is no once-per-partition guard for other task types (e.g. ShuffleMapTask).
          // Per the original notes, their accumulator updates can therefore be counted
          // more than once.
          updateAccumulators(event)
      }
    case _: ExceptionFailure => updateAccumulators(event)
    case _ =>
  }
  postTaskEnd(event)

  // Job and stage bookkeeping: when all partitions of a job are done, the job is removed
  // together with the state of stages that no other job depends on.
  event.reason match {
    case Success =>
      task match {
        case rt: ResultTask[_, _] =>
          // Cast to ResultStage here because it's part of the ResultTask
          // TODO Refactor this out to a function that accepts a ResultStage
          val resultStage = stage.asInstanceOf[ResultStage]
          resultStage.activeJob match {
            case Some(job) =>
              if (!job.finished(rt.outputId)) {
                job.finished(rt.outputId) = true
                job.numFinished += 1
                // If the whole job has finished, remove it
                if (job.numFinished == job.numPartitions) {
                  markStageAsFinished(resultStage)
                  // Drop state for this job and for stages no other job depends on.
                  cleanupStateForJobAndIndependentStages(job)
                  listenerBus.post(
                    SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                }

                // taskSucceeded runs some user code that might throw an exception. Make sure
                // we are resilient against that.
                try {
                  job.listener.taskSucceeded(rt.outputId, event.result)
                } catch {
                  case e: Exception =>
                    // TODO: Perhaps we want to mark the resultStage as failed?
                    job.listener.jobFailed(new SparkDriverExecutionException(e))
                }
              }
            case None =>
              logInfo("Ignoring result from " + rt + " because its job has finished")
          }

        case smt: ShuffleMapTask =>
          val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
          val status = event.result.asInstanceOf[MapStatus]
          val execId = status.location.executorId
          logDebug("ShuffleMapTask finished on " + execId)
          if (stageIdToStage(task.stageId).latestInfo.attemptNumber == task.stageAttemptId) {
            // This task was for the currently running attempt of the stage. Since the task
            // completed successfully from the perspective of the TaskSetManager, mark it as
            // no longer pending (the TaskSetManager may consider the task complete even
            // when the output needs to be ignored because the task's epoch is too small below.
            // In this case, when pending partitions is empty, there will still be missing
            // output locations, which will cause the DAGScheduler to resubmit the stage below.)
            shuffleStage.pendingPartitions -= task.partitionId
          }
          // Ignore output from a task whose epoch is not newer than the last recorded
          // failure of its executor.
          if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
            logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
          } else {
            // The epoch of the task is acceptable (i.e., the task was launched after the most
            // recent failure we're aware of for the executor), so mark the task's output as
            // available.
            mapOutputTracker.registerMapOutput(
              shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
            // Remove the task's partition from pending partitions. This may have already been
            // done above, but will not have been done yet in cases where the task attempt was
            // from an earlier attempt of the stage (i.e., not the attempt that's currently
            // running). This allows the DAGScheduler to mark the stage as complete when one
            // copy of each task has finished successfully, even if the currently active stage
            // still has tasks running.
            shuffleStage.pendingPartitions -= task.partitionId
          }

          if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
            markStageAsFinished(shuffleStage)
            logInfo("looking for newly runnable stages")
            logInfo("running: " + runningStages)
            logInfo("waiting: " + waitingStages)
            logInfo("failed: " + failedStages)

            // This call to increment the epoch may not be strictly necessary, but it is retained
            // for now in order to minimize the changes in behavior from an earlier version of the
            // code. This existing behavior of always incrementing the epoch following any
            // successful shuffle map stage completion may have benefits by causing unneeded
            // cached map outputs to be cleaned up earlier on executors. In the future we can
            // consider removing this call, but this will require some extra investigation.
            // See https://github.com/apache/spark/pull/17955/files#r117385673 for more details.
            mapOutputTracker.incrementEpoch()

            // Per the original notes, this clears the cached RDD partition locations so that
            // they are looked up again from the block manager.
            clearCacheLocs()

            if (!shuffleStage.isAvailable) {
              // Some tasks had failed; let's resubmit this shuffleStage.
              // TODO: Lower-level scheduler should also deal with this
              logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
                ") because some of its tasks had failed: " +
                shuffleStage.findMissingPartitions().mkString(", "))
              submitStage(shuffleStage)
            } else {
              // Mark any map-stage jobs waiting on this stage as finished
              if (shuffleStage.mapStageJobs.nonEmpty) {
                val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
                for (job <- shuffleStage.mapStageJobs) {
                  markMapStageJobAsFinished(job, stats)
                }
              }
              // Submit the downstream stages that were waiting on this one.
              submitWaitingChildStages(shuffleStage)
            }
          }
      }

    case Resubmitted =>
      logInfo("Resubmitted " + task + ", so marking it as still running")
      stage match {
        case sms: ShuffleMapStage =>
          sms.pendingPartitions += task.partitionId

        case _ =>
          assert(false, "TaskSetManagers should only send Resubmitted task statuses for " +
            "tasks in ShuffleMapStages.")
      }

    case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
      val failedStage = stageIdToStage(task.stageId)
      val mapStage = shuffleIdToMapStage(shuffleId)

      // A failure from an older attempt of the stage is ignored, since a newer attempt is
      // already running.
      if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
        logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
          s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
          s"(attempt ${failedStage.latestInfo.attemptNumber}) running")
      } else {
        // It is likely that we receive multiple FetchFailed for a single stage (because we have
        // multiple tasks running concurrently on different executors). In that case, it is
        // possible the fetch failure has already been handled by the scheduler.
        if (runningStages.contains(failedStage)) {
          logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
            s"due to a fetch failure from $mapStage (${mapStage.name})")
          markStageAsFinished(failedStage, Some(failureMessage))
        } else {
          logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " +
            s"longer running")
        }

        // Record which stage attempts failed on fetches; abort once there are too many.
        failedStage.fetchFailedAttemptIds.add(task.stageAttemptId)
        val shouldAbortStage =
          failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts ||
            disallowStageRetryForTest

        if (shouldAbortStage) {
          val abortMessage = if (disallowStageRetryForTest) {
            "Fetch failure will not retry stage due to testing config"
          } else {
            s"""$failedStage (${failedStage.name})
               |has failed the maximum allowable number of
               |times: $maxConsecutiveStageAttempts.
               |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
          }
          abortStage(failedStage, abortMessage, None)
        } else {
          // update failedStages and make sure a ResubmitFailedStages event is enqueued
          // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
          val noResubmitEnqueued = !failedStages.contains(failedStage)
          // Both the failed stage and the map stage it reads from go into failedStages,
          // the set of stages waiting to be resubmitted.
          failedStages += failedStage
          failedStages += mapStage
          if (noResubmitEnqueued) {
            // We expect one executor failure to trigger many FetchFailures in rapid succession,
            // but all of those task failures can typically be handled by a single resubmission of
            // the failed stage.  We avoid flooding the scheduler's event queue with resubmit
            // messages by checking whether a resubmit is already in the event queue for the
            // failed stage.  If there is already a resubmit enqueued for a different failed
            // stage, that event would also be sufficient to handle the current failed stage, but
            // producing a resubmit for each failed stage makes debugging and logging a little
            // simpler while not producing an overwhelming number of scheduler events.
            logInfo(
              s"Resubmitting $mapStage (${mapStage.name}) and " +
                s"$failedStage (${failedStage.name}) due to fetch failure"
            )
            // After DAGScheduler.RESUBMIT_TIMEOUT ms (200 ms per the original notes), post a
            // ResubmitFailedStages event to this scheduler's own event loop.
            messageScheduler.schedule(
              new Runnable {
                override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
              },
              DAGScheduler.RESUBMIT_TIMEOUT,
              TimeUnit.MILLISECONDS
            )
          }
        }

        // Mark the map whose fetch failed as broken in the map stage
        if (mapId != -1) {
          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
        }

        // TODO: mark the executor as failed only if there were lots of fetch failures on it
        // Remove the executor that served the failed fetch and unregister its shuffle outputs
        // (or every output on its host, see below).
        if (bmAddress != null) {
          val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled &&
            unRegisterOutputOnHostOnFetchFailure) {
            // We had a fetch failure with the external shuffle service, so we
            // assume all shuffle data on the node is bad.
            Some(bmAddress.host)
          } else {
            // Unregister shuffle data just for one executor (we don't have any
            // reason to believe shuffle data has been lost for the entire host).
            None
          }
          removeExecutorAndUnregisterOutputs(
            execId = bmAddress.executorId,
            fileLost = true,
            hostToUnregisterOutputs = hostToUnregisterOutputs,
            maybeEpoch = Some(task.epoch))
        }
      }

    case _: TaskCommitDenied =>
      // Do nothing here, left up to the TaskScheduler to decide how to handle denied commit

    case _: ExceptionFailure =>
      // Nothing left to do, already handled above for accumulator updates.

    case TaskResultLost =>
      // Do nothing here; the TaskScheduler handles these failures and resubmits the task.

    case _: ExecutorLostFailure | _: TaskKilled | UnknownReason =>
      // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
      // will abort the job.
  }
}