Spark底層原理詳細解析(深度好文,建議收藏)

Spark簡介

Apache Spark是用於大規模數據處理的統一分析引擎,基於內存計算,提升了在大數據環境下數據處理的實時性,同時保證了高容錯性高可伸縮性,容許用戶將Spark部署在大量硬件之上,造成集羣。算法

Spark源碼從1.x的40w行發展到如今的超過100w行,有1400多位大牛貢獻了代碼。整個Spark框架源碼是一個巨大的工程。下面咱們一塊兒來看下spark的底層執行原理。多線程

Spark運行流程

Spark運行流程

具體運行流程以下:架構

  1. SparkContext 向資源管理器註冊並向資源管理器申請運行Executor框架

  2. 資源管理器分配Executor,而後資源管理器啓動Executoride

  3. Executor 發送心跳至資源管理器函數

  4. SparkContext 構建DAG有向無環圖學習

  5. 將DAG分解成Stage(TaskSet)大數據

  6. 把Stage發送給TaskScheduler優化

  7. Executor 向 SparkContext 申請 Taskspa

  8. TaskScheduler 將 Task 發送給 Executor 運行

  9. 同時 SparkContext 將應用程序代碼發放給 Executor

  10. Task 在 Executor 上運行,運行完畢釋放全部資源

1. 從代碼角度看DAG圖的構建

Val lines1 = sc.textFile(inputPath1).map(...).map(...)

Val lines2 = sc.textFile(inputPath2).map(...)

Val lines3 = sc.textFile(inputPath3)

Val dtinone1 = lines2.union(lines3)

Val dtinone = lines1.join(dtinone1)

dtinone.saveAsTextFile(...)

dtinone.filter(...).foreach(...)

上述代碼的DAG圖以下所示:

構建DAG圖

Spark內核會在須要計算髮生的時刻繪製一張關於計算路徑的有向無環圖,也就是如上圖所示的DAG。

Spark 的計算髮生在RDD的Action操做,而對Action以前的全部Transformation,Spark只是記錄下RDD生成的軌跡,而不會觸發真正的計算

2. 將DAG劃分爲Stage核心算法

一個Application能夠有多個job多個Stage:

Spark Application中能夠由於不一樣的Action觸發衆多的job,一個Application中能夠有不少的job,每一個job是由一個或者多個Stage構成的,後面的Stage依賴於前面的Stage,也就是說只有前面依賴的Stage計算完畢後,後面的Stage纔會運行。

劃分依據:

Stage劃分的依據就是寬依賴,像reduceByKey,groupByKey等算子,會致使寬依賴的產生。

回顧下寬窄依賴的劃分原則:  
窄依賴:父RDD的一個分區只會被子RDD的一個分區依賴。即一對一或者多對一的關係,可理解爲獨生子女。  常見的窄依賴有:map、filter、union、mapPartitions、mapValues、join(父RDD是hash-partitioned)等。      
寬依賴:父RDD的一個分區會被子RDD的多個分區依賴(涉及到shuffle)。即一對多的關係,可理解爲超生。 常見的寬依賴有groupByKey、partitionBy、reduceByKey、join(父RDD不是hash-partitioned)等。

核心算法:回溯算法

從後往前回溯/反向解析,遇到窄依賴加入本Stage,碰見寬依賴進行Stage切分。

Spark內核會從觸發Action操做的那個RDD開始從後往前推,首先會爲最後一個RDD建立一個Stage,而後繼續倒推,若是發現對某個RDD是寬依賴,那麼就會將寬依賴的那個RDD建立一個新的Stage,那個RDD就是新的Stage的最後一個RDD。
而後依次類推,繼續倒推,根據窄依賴或者寬依賴進行Stage的劃分,直到全部的RDD所有遍歷完成爲止。

3. 將DAG劃分爲Stage剖析

DAG劃分Stage

一個Spark程序能夠有多個DAG(有幾個Action,就有幾個DAG,上圖最後只有一個Action(圖中未表現),那麼就是一個DAG)

一個DAG能夠有多個Stage(根據寬依賴/shuffle進行劃分)。

同一個Stage能夠有多個Task並行執行(task數=分區數,如上圖,Stage1 中有三個分區P一、P二、P3,對應的也有三個 Task)。

能夠看到這個DAG中只reduceByKey操做是一個寬依賴,Spark內核會以此爲邊界將其先後劃分紅不一樣的Stage。

同時咱們能夠注意到,在圖中Stage1中,從textFile到flatMap到map都是窄依賴,這幾步操做能夠造成一個流水線操做,經過flatMap操做生成的partition能夠不用等待整個RDD計算結束,而是繼續進行map操做,這樣大大提升了計算的效率

4. 提交Stages

調度階段的提交,最終會被轉換成一個任務集的提交,DAGScheduler經過TaskScheduler接口提交任務集,這個任務集最終會觸發TaskScheduler構建一個TaskSetManager的實例來管理這個任務集的生命週期,對於DAGScheduler來講,提交調度階段的工做到此就完成了。

而TaskScheduler的具體實現則會在獲得計算資源的時候,進一步經過TaskSetManager調度具體的任務到對應的Executor節點上進行運算。

5. 監控Job、Task、Executor

  1. DAGScheduler監控Job與Task:

要保證相互依賴的做業調度階段可以獲得順利的調度執行,DAGScheduler須要監控當前做業調度階段乃至任務的完成狀況。

這經過對外暴露一系列的回調函數來實現的,對於TaskScheduler來講,這些回調函數主要包括任務的開始結束失敗、任務集的失敗,DAGScheduler根據這些任務的生命週期信息進一步維護做業和調度階段的狀態信息。

  1. DAGScheduler監控Executor的生命狀態:

TaskScheduler經過回調函數通知DAGScheduler具體的Executor的生命狀態,若是某一個Executor崩潰了,則對應的調度階段任務集的ShuffleMapTask的輸出結果也將標誌爲不可用,這將致使對應任務集狀態的變動,進而從新執行相關計算任務,以獲取丟失的相關數據

6. 獲取任務執行結果

  1. 結果DAGScheduler:

一個具體的任務在Executor中執行完畢後,其結果須要以某種形式返回給DAGScheduler,根據任務類型的不一樣,任務結果的返回方式也不一樣。

  1. 兩種結果,中間結果與最終結果:

對於FinalStage所對應的任務,返回給DAGScheduler的是運算結果自己。

而對於中間調度階段對應的任務ShuffleMapTask,返回給DAGScheduler的是一個MapStatus裏的相關存儲信息,而非結果自己,這些存儲位置信息將做爲下一個調度階段的任務獲取輸入數據的依據。

  1. 兩種類型,DirectTaskResult與IndirectTaskResult

根據任務結果大小的不一樣,ResultTask返回的結果又分爲兩類:

若是結果足夠小,則直接放在DirectTaskResult對象內中。

若是超過特定尺寸則在Executor端會將DirectTaskResult先序列化,再把序列化的結果做爲一個數據塊存放在BlockManager中,而後將BlockManager返回的BlockID放在IndirectTaskResult對象中返回給TaskScheduler,TaskScheduler進而調用TaskResultGetter將IndirectTaskResult中的BlockID取出並經過BlockManager最終取得對應的DirectTaskResult。

7. 任務調度整體詮釋

一張圖說明任務整體調度:

任務整體調度

Spark運行架構特色

1. Executor進程專屬

每一個Application獲取專屬的Executor進程,該進程在Application期間一直駐留,並以多線程方式運行Tasks

Spark Application不能跨應用程序共享數據,除非將數據寫入到外部存儲系統。如圖所示:

Executor進程專屬

2. 支持多種資源管理器

Spark與資源管理器無關,只要可以獲取Executor進程,並能保持相互通訊就能夠了。

Spark支持資源管理器包含: Standalone、On Mesos、On YARN、Or On EC2。如圖所示:

支持多種資源管理器支持多種資源管理器

3. Job提交就近原則

提交SparkContext的Client應該靠近Worker節點(運行Executor的節點),最好是在同一個Rack(機架)裏,由於Spark Application運行過程當中SparkContext和Executor之間有大量的信息交換;

若是想在遠程集羣中運行,最好使用RPC將SparkContext提交給集羣,不要遠離Worker運行SparkContext

如圖所示:

Job提交就近原則

4. 移動程序而非移動數據的原則執行

移動程序而非移動數據的原則執行,Task採用了數據本地性和推測執行的優化機制

關鍵方法:taskIdToLocations、getPreferedLocations。

如圖所示:


數據本地性

搜索公衆號:五分鐘學大數據,帶你學習大數據技術!

相關文章
相關標籤/搜索