Task執行容錯實現原理

Spark在調度和執行任務時具備容錯能力,所謂任務的容錯能力其實就是任務失敗重試的能力,當Task失敗時,Spark會從新提交Task,那麼Spark是如何得知Task失敗?得知Task失敗後是由誰來負責重試的呢?緩存

本文分析Spark任務執行時容錯機制的實現原理。框架

Task狀態更新消息

當Executor端的Task狀態發生改變時,會經過與Executor對應的執行器後臺(即:CoarseGrainedExecutorBackend)的statusUpdate函數來向Driver端的調度後臺發送更新後的狀態信息。ide

向Driver端的調度後臺彙報的Task狀態主要有如下幾種:函數

Task狀態 說明
TaskState.RUNNING 啓動Task(啓動TaskRunner線程)時發送該狀態信息。
TaskState.FINISHED Task執行完成時,發送該狀態信息。並會發送任務執行的結果。
TaskState.KILLED Task執行線程被殺掉時,發送該狀態信息。可能會發送被殺掉的緣由。
TaskState.FAILED Task任務執行失敗時,發送該狀態信息。並會發送失敗緣由。
TaskState.LOST Task任務執行失敗,且執行線程退出。

Task狀態更新消息的傳遞過程如圖1所示:oop

image.png

       圖1 Task狀態更新處理流程
spa

在Executor中,若任務執行線程TaskRunner發生了異常或者是執行完成,就會調用執行器後臺的statusUpdate函數,該函數會經過Executor的RPC發送客戶端向Driver的調度後臺發送StatusUpdate消息。該消息中包含Task的最新狀態信息和緣由(也可能沒有)。線程

Driver端處理Task狀態信息

Driver處理Executor任務失敗狀態消息

當Driver端的調度後臺服務(RPC服務)接收到StatusUpdate消息時,會調用TaskSchedulerImpl#statusUpdate來處理該消息。orm

若Task的狀態是:TaskState.FAILED、TaskState.KILLED或TaskState.LOST,說明Task失敗了,此時會先獲取Task失敗的緣由,而後調用TaskSchedulerImpl#handleFailedTask函數來進行處理,該函數最終會調用TaskSetManager#handleFailedTask函數來處理Task的失敗。在該函數中,會根據Task的失敗緣由採起不一樣的處理流程。對象

這些Task失敗的緣由可能有如下一些:blog

錯誤碼 緣由說明
FetchFailed Task從遠端節點獲取shuffle數據失敗。多是因爲遠端的Executor失敗了,所以須要從新運行前一個Stage。
ExceptionFailure 因爲Task運行時發生異常而產生了錯誤。能夠經過代碼來捕捉異常信息。
TaskKilled Task被殺死,須要從新調度執行。
ExecutorLostFailure Task失敗。由於在執行時,執行任務的Executor由於與Task運行無關的緣由而退出。這種錯誤不會

最後,會調用DAGScheduler#taskEnded函數來向DAGScheduler的事件處理框架發送CompletionEvent事件。DAGScheduler會在事件處理框架中處理該事件。

該處理過程如圖2所示:

image.png

圖2 任務失敗處理流程

處理CompletionEvent事件

如圖2所示,當Task結束或失敗時,會向DAGScheduler的事件處理框架發送CompletionEvent事件。也就是說,該事件意味着Executor端執行的某個Task已經結束了或則失敗了,此時須要決定那個stage須要從新發送。

該事件的處理流程以下:

1)根據CompletionEvent對象獲取task對象,並根據task對象獲取stage對象

2)若任務執行成功(CompletionEvent對象中的reason是Success),則根據任務的類型來處理獲取並處理執行結果。

3)若任務執行失敗的緣由是:FetchFailed,說明是Task獲取遠端的shuffle數據失敗。此時,先判斷是否應該放棄繼續嘗試提交,若嘗試次數大於參數spark.stage.maxConsecutiveAttempts設定的值(默認是4)則放棄該Stage,宣告Stage執行失敗。若小於4,則經過DAGSchedulerEventProcessLoop發送ResubmitFailedStages事件,再次嘗試提交該Stage。

ResubmitFailedStages事件的產生

由任務完成處理函數,發起。當任務完成時,會向DAGScheduler的事件處理框架:DAGSchedulerEventProcessLoop發送CompletionEvent事件。在DAGScheduler事件處理框架中,該事件會經過DAGScheduler#handleTaskCompletion函數來進行處理。

處理ResubmitFailedStages事件

  1. 遍歷failedStages集合,

  2. 清除DAGScheduler的緩存位置信息,此時緩存可能已經失效了。

  3. 清空失敗的Stage集合:failedStages;每次提交後,就須要清空一次,避免下次重複提交。該集合會根據Executor端的狀態來更新。

  4. 遍歷失敗的Stage集合,而後調用DAGScheduler#submitStage來提交Stage。

小結

本文講述了Task提交容錯的實現過程。當在Executor端執行的任務失敗時,會把結束狀態和緣由發送給Driver端,若知足重試條件,則會發起ResubmitFailedStages事件,經過DAGScheduler來繼續嘗試提交Task失敗所在的Stage。

相關文章
相關標籤/搜索