Spark 源碼系列(四)圖解做業生命週期

這一章咱們探索了 Spark 做業的運行過程,可是沒把整個過程描繪出來,好,跟着我走吧,let you know!html

img

咱們先回顧一下這個圖,Driver Program 是咱們寫的那個程序,它的核心是 SparkContext,回想一下,從 api 的使用角度,RDD 都必須經過它來得到。apache

下面講一講它所不爲認知的一面,它和其它組件是如何交互的。api

Driver 向 Master 註冊 Application 過程

SparkContext 實例化以後,在內部實例化兩個很重要的類,DAGScheduler 和 TaskScheduler。app

在 standalone 的模式下,TaskScheduler 的實現類是 TaskSchedulerImpl,在初始化它的時候 SparkContext 會傳入一個 SparkDeploySchedulerBackend。函數

在 SparkDeploySchedulerBackend 的 start 方法裏面啓動了一個 AppClient。ui

val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args,
    sc.executorEnvs, classPathEntries, libraryPathEntries, extraJavaOpts)
    val sparkHome = sc.getSparkHome()
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
    sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    client.start()
複製代碼

maxCores 是由參數 spark.cores.max 來指定的,executorMemoy 是由 spark.executor.memory 指定的。this

AppClient 啓動以後就會去向 Master 註冊 Applicatoin 了,後面的過程我用圖來表達。spa

img

上面的圖中涉及到了三方通訊,具體的過程以下:線程

一、Driver 經過 AppClient 向 Master 發送了 RegisterApplication 消息來註冊 Application,Master 收到消息以後會發送 RegisteredApplication 通知 Driver 註冊成功,Driver 的接收類仍是 AppClient。scala

二、Master 接受到 RegisterApplication 以後會觸發調度過程,在資源足夠的狀況下會向 Woker 和 Driver 分別發送 LaunchExecutor、ExecutorAdded 消息。

三、Worker 接收到 LaunchExecutor 消息以後,會執行消息中攜帶的命令,執行 CoarseGrainedExecutorBackend 類 (圖中僅以它繼承的接口 ExecutorBackend 代替),執行完畢以後會發送 ExecutorStateChanged 消息給 Master。

四、Master 接收 ExecutorStateChanged 以後,當即發送 ExecutorUpdated 消息通知 Driver。

五、Driver 中的 AppClient 接收到 Master 發過來的 ExecutorAdded 和 ExecutorUpdated 後進行相應的處理。

六、啓動以後的 CoarseGrainedExecutorBackend 會向 Driver 發送 RegisterExecutor 消息。

七、Driver 中的 SparkDeploySchedulerBackend(具體代碼在 CoarseGrainedSchedulerBackend 裏面)接收到 RegisterExecutor 消息,回覆註冊成功的消息 RegisteredExecutor 給 ExecutorBackend,而且立馬準備給它發送任務。

八、CoarseGrainedExecutorBackend 接收到 RegisteredExecutor 消息以後,實例化一個 Executor 等待任務的到來。

資源的調度

好,在咱們講完了整個註冊 Application 的通訊過程以後,其中一個比較重要的地方是它的調度這塊,它是怎麼調度的?這也是我在前面爲何那麼強調 maxCores 和 executorMemoy 的緣由。

細心的讀者若是看了第一章《spark-submit 提交做業過程》的就知道,其實我已經講過調度了,由於當時不知道這個 app 是啥。可是如今咱們知道 app 是啥了。代碼我不就貼了,總結一下吧。

一、先調度 Driver,再調度 Application。

二、它的調度 Application 的方式是先進先出,因此就不要奇怪爲何你的 App 總得不到調度了,就像去北京的醫院看病,去晚了號就沒了,是一個道理。

三、Executor 的分配方式有兩種,一種是傾向於把任務分散在多個節點上,一種是在儘可能少的節點上運行,由參數spark.deploy.spreadOut 參數來決定的,默認是 true,把任務分散到多個節點上。

遍歷全部等待的 Application,給它分配 Executor 運行的 Worker,默認分配方式以下:

一、先從 workers 當中選出內存大於 executorMemoy 的 worker,而且按照空閒 cpu 數從大到小的順序來排序。

二、遍歷 worker,從可用的 worker 分配須要的 cpu 數,每一個 worker 提供一個 cpu 核心,直到 cpu 數不足或者 maxCores 分配完畢。

三、給選出來的 worker 發送任務,讓它們啓動 Executor,每一個 Executor 佔用的內存是咱們設定的 executorMemoy。

資源調度的過程大致是這樣的,說到這裏有些童鞋在有點兒疑惑了,那咱們任務調度裏面 FIFO/FAIR 調度是在哪裏用的?任務調度器調度的不是 Application,而是你的代碼裏面被解析出來的全部 Task,這在上一章當中有提到。

基於這個緣由,在共用 SparkContext 的狀況下,好比 Shark、JobServer 什麼的,任務調度器的做用纔會明顯。

Driver 向 Executor 發佈 Task 過程

下面咱們講一講 Driver 向 Executor 發佈 Task 過程,這在上一章是講過的,如今把圖給你們放出來了。

img

一、Driver 程序的代碼運行到 action 操做,觸發了 SparkContext 的 runJob 方法。

二、SparkContext 比較懶,轉手就交給 DAGScheduler。

三、DAGScheduler 把 Job 劃分 stage,而後把 stage 轉化爲相應的 Tasks,把 Tasks 交給 TaskScheduler。

四、經過 TaskScheduler 把 Tasks 添加到任務隊列當中,轉手就交給 SchedulerBackend 了。

五、調度器給 Task 分配執行 Executor,ExecutorBackend 負責執行 Task 了。

補充:ExecutorBackend 執行 Task,是經過它內部的 Executor 來執行的,Executor 內部有個線程池,new 了一個 TaskRunner 交給線程池了。

Task 狀態更新

Task 執行是經過 TaskRunner 來運行,它須要經過 ExecutorBackend 和 Driver 通訊,通訊消息是 StatusUpdate:

一、Task 運行以前,告訴 Driver 當前 Task 的狀態爲 TaskState.RUNNING。

二、Task 運行以後,告訴 Driver 當前 Task 的狀態爲 TaskState.FINISHED,並返回計算結果。

三、若是 Task 運行過程當中發生錯誤,告訴 Driver 當前 Task 的狀態爲 TaskState.FAILED,並返回錯誤緣由。

四、若是 Task 在中途被 Kill 掉了,告訴 Driver 當前 Task 的狀態爲 TaskState.FAILED。

下面講的是運行成功的狀態,具體過程以文字爲主。

img

一、Task 運行結束以後,調用 ExecutorBackend 的 statusUpdate 方法,把結果返回。結果超過 10M,就把結果保存在 blockManager 處,返回 blockId,須要的時候經過 blockId 到 blockManager 認領。

二、ExecutorBackend 直接向 Driver 發送 StatusUpdate 返回 Task 的信息。

三、Driver(這裏具體指的是 SchedulerBackend)接收到 StatusUpdate 消息以後,調用 TaskScheduler 的 statusUpdate 方法,而後準備給 ExecutorBackend 發送下一批 Task。

四、TaskScheduler 經過 TaskId 找到管理這個 Task 的 TaskSetManager(負責管理一批 Task 的類),從 TaskSetManager 裏面刪掉這個 Task,並把 Task 插入到 TaskResultGetter(負責獲取 Task 結果的類)的成功隊列裏。

五、TaskResultGetter 獲取到結果以後,調用 TaskScheduler 的 handleSuccessfulTask 方法把結果返回。

六、TaskScheduler 調用 TaskSetManager 的 handleSuccessfulTask 方法,處理成功的 Task。

七、TaskSetManager 調用 DAGScheduler 的 taskEnded 方法,告訴 DAGScheduler 這個 Task 運行結束了,若是這個時候 Task 所有成功了,就會結束 TaskSetManager。

八、DAGScheduler 在 taskEnded 方法裏觸發 CompletionEvent 事件,CompletionEvent 分 ResultTask 和 ShuffleMapTask 來處理。

  1)ResultTask:job 的 numFinished 加 1,若是 numFinished 等於它的分片數,則表示任務該 Stage 結束,標記這個 Stage 爲結束,最後調用 JobListener(具體實如今 JobWaiter)的 taskSucceeded 方法,把結果交給 resultHandler(通過包裝的本身寫的那個匿名函數)處理,若是完成的 Task 數量等於總任務數,任務退出。

  2)ShuffleMapTask:

   (1)調用 Stage 的 addOutputLoc 方法,把結果添加到 Stage 的 outputLocs 列表裏。

   (2)若是該 Stage 沒有等待的 Task 了,就標記該 Stage 爲結束。

   (3)把 Stage 的 outputLocs 註冊到 MapOutputTracker 裏面,留個下一個 Stage 用。

   (4)若是 Stage 的 outputLocs 爲空,表示它的計算失敗,從新提交 Stage。

   (5)找出下一個在等待而且沒有父親的 Stage 提交。

參考文獻

Spark源碼解析四

相關文章
相關標籤/搜索