Spark 學習筆記 (二): 深刻Spark計算引擎

Spark 學習筆記 (二): 深刻Spark計算引擎

先來回顧一下Spark的程序運行架構:
Spark0node

  • 對於任何一個Spark程序,有且僅有一個SparkContext,其實一個SparkContext就對應了一個Drivergit

  • 一個Driver就是一個進城,運行在一個節點上,程序的main函數就運行在Driver上;github

  • main函數經過分析程序,將程序轉化成一些列Task,而後分發到各個節點的Executor上去執行;一個節點能夠運行一個或多個Executor;而後一個Executor能夠同時跑若干個Task算法

    • 每一個節點有多少個Executor,每一個Executor上有多少個Task,都是能夠由用戶指定的計算資源)架構

    • (分佈式計算:主要就是須要分佈式地調度計算資源和計算任務)併發

Job執行過程:做業、階段與任務

Spark1

Job邏輯執行圖

Job的實際執行流程比用戶頭腦中的要複雜,須要先創建邏輯執行圖(或者叫數據依賴圖),而後劃分邏輯執行圖生成DAG型的物理執行圖,而後生成具體Task執行。app

如何產生RDD,產生哪些RDD

一些典型的transformation()及其建立的RDD:框架

iterator(split) 的意思是 foreach record in the partition分佈式

Spark2

RDD的依賴關係

實際上,從parent RDD 通過轉換操做,生成RDD x的過過程當中,須要考慮的問題是主要會分爲三種:函數

  • RDD 自己的依賴關係。要生成的 RDD(之後用 RDD x 表示)是依賴一個 parent RDD,仍是多個 parent RDDs?
    • (這個比較直接能夠由代碼就可分析出好比 x = rdda.transformation(rddb) (e.g., x = a.join(b)) 就表示 RDD x 同時依賴於 RDD a 和 RDD b)
  • RDD x 中會有多少個 partition ?
    • (這個通常可由用戶只指定,不指定的話通常取 max(numPartitions[parent RDD 1], ..., numPartitions[parent RDD n])

numPartitions[parent RDD n])`)

  • RDD x 與其 parent RDDs 中 partition 之間是什麼依賴關係?是依賴 parent RDD 中一個仍是多個 partition?

主要須要考慮的是最後一個問題。這要結合不一樣 transformation() 的語義,不一樣的 transformation() 的依賴關係不一樣。

RDD x 中每一個 partition 能夠依賴於 parent RDD 中一個或者多個 partition。並且這個依賴能夠是NarrowDependency 徹底依賴(窄依賴)或者ShuffleDependency 部分依賴(寬依賴)

下圖展現了徹底依賴和部分依賴

Spark4

前三個是徹底依賴,RDD x 中的 partition 與 parent RDD 中的 partition/partitions 徹底相關。也就是說parent RDD中的每一個partition只會被RDD x中的一個partiton使用

最後一個是部分依賴,RDD x 中的 partition 只與 parent RDD 中的 partition 一部分數據相關,另外一部分數據與 RDD x 中的其餘 partition 相關。也就是說父RDD的每一個partiton都有可能被多個子RDD的partition使用

  • 部分依賴(寬依賴)是Spark計算的主要耗時階段,須要重點優化的部分

在 Spark 中,徹底依賴被稱爲 NarrowDependency,部分依賴被稱爲 ShuffleDependency。其實 ShuffleDependency 跟 MapReduce 中 shuffle 的數據依賴相同(mapper 將其 output 進行 partition,而後每一個 reducer 會將全部 mapper 輸出中屬 於本身的 partition 經過 HTTP fetch 獲得)

Job物理執行圖

主要的問題是:給定job的邏輯執行圖,如何生成物理執行圖(也就是 stages 和 tasks)?

邏輯執行計劃到物理執行計劃的轉化須要執行:

  • (1) 劃分Stage
  • (2) 生成Task

Spark Task的類型只有兩種:ShuffleMapTaskResultTask

問:每一個Stage的Task數目?

  • First Stage: 由hdfs block或hbase regioin 數目決定
  • Other Stages: 由用戶設置,默認與第一個階段相等

Stage劃分算法

根據RDD的依賴關係,劃分Stage:

從後往前推算,遇到ShuffleDependency就斷開,遇到NarrowDependency就將其加入該stage。 每一個stage裏面task的數目由該stage最後一個 RDD 中的partition個數決定

Spark4

物理圖的執行

生成了Stage和Task後,接下來的問題就是怎麼去執行物理圖:

回想 pipeline 的思想是 數據用的時候再算,並且數據是流到要計算的位置的

Result 產生的地方的就是要計算的位置,要肯定 「須要計算的數據」,咱們能夠從後往前推,須要哪一個 partition 就計算哪一個 partition,若是 partition 裏面沒有數據,就繼續向前推,造成 computing chain。這樣推下去,結果就是:須要首先計算出每一個 stage 最左邊的 RDD 中的某些 partition

整個 computing chain 根據數據依賴關係自後向前創建,遇到 ShuffleDependency 後造成 stage。在每一個 stage 中,每一個 RDD 中的 compute() 調用 parentRDD.iter() 來將 parent RDDs 中的 records 一個個 fetch 過來。

生成Job

用戶的 driver 程序中一旦出現 action(),就會生成一個 job

每個 job 包含 n 個 stage,最後一個 stage 產生 result。好比,第一章的 GroupByTest 例子中存在兩個 job,一共產生了兩組 result。在提交 job 過程當中,DAGScheduler 會首先劃分 stage,而後先提交無 parent stage 的 stages,並在提交過程當中肯定該 stage 的 task 個數及類型,並提交具體的 task。無 parent stage 的 stage 提交完後,依賴該 stage 的 stage 纔可以提交。從 stage 和 task 的執行角度來說,一個 stage 的 parent stages 執行完後,該 stage 才能執行

Spark 資源調度和任務調度

從TaskScheduler開始

TaskScheduler的主要做用就是得到須要處理的任務集合,並將其發送到集羣進行處理。而且還有彙報任務運行狀態的做用。 因此其是在Master端。具體有如下4個做用:

  • 接收來自Executor的心跳信息,使Master知道該Executer的BlockManager還「活着」

  • 對於失敗的任務進行重試

  • 對於stragglers(拖後腿的任務)放到其餘的節點執行

  • 向集羣提交任務集,交給集羣運行

調度和執行任務

  • 做業調度
    • FIFO或Fair

    • 優化機制:數據本地性和推測執行

  • 任務執行
    • Task被序列化後,發送到executor上執行

    • ShuffleMapTask將中間數據寫到本地,ResultTask遠程讀取數據
      • 爲何要寫在本地?
        後面的 RDD 多個分區都要去讀這個信息,若是放到內存,若是出現數據丟失,後面的全部步驟所有不能進行,違背了以前所說的須要父 RDD 分區數據所有 ready 的原則

        同一個 stage 裏面的 task 是能夠併發執行的,下一個 stage 要等前一個 stage ready

    • 數據用的時候再算,並且數據是流到要計算的位置的

  • Spark 任務調度與執行模塊,源碼閱讀與理解

Spark Shuffle過程

Shuffle是分佈式計算框架的核心s數據交換方式,它的實現方式直接決定了計算框架的性能和可擴展性

Shuffle階段主要解決的問題是:數據是怎麼經過 ShuffleDependency 流向下一個 stage 的

產生shuffle的算子:join, cogroup, 和*ByKey(reduceByKey, groupByKey, sortByKey,…

Shuffle write

Shuffle中map端輸出的數據要先寫到磁盤,而後由reduce進行拉取

4

上圖有 4 個 ShuffleMapTask 要在同一個 worker node 上運行,CPU core 數爲 2,能夠同時運行兩個 task。每一個 task 的執行結果(該 stage 的 finalRDD 中某個 partition 包含的 records)被逐一寫到本地磁盤上。每一個 task 包含 R 個緩衝區,R = reducer 個數(也就是下一個 stage 中 task 的個數),緩衝區被稱爲 bucket

ShuffleMapTask 的執行過程很簡單:先利用 pipeline 計算獲得 finalRDD 中對應 partition 的 records。每獲得一個 record 就將其送到對應的 bucket 裏,具體是哪一個 bucket 由partitioner.partition(record.getKey()))決定。每一個 bucket 裏面的數據會不斷被寫到本地磁盤上,造成一個 ShuffleBlockFile,或者簡稱 FileSegment。以後的 reducer 會去 fetch 屬於本身的FileSegment,而後進入 shuffle read 階段。

這樣的實現很簡單,但有幾個問題:

  • 產生的 FileSegment 過多

  • 緩衝區佔用內存空間大

第一個問題有一些方法去解決,下面介紹已經在 Spark 裏面實現的 FileConsolidation 方法: 一個MapTask只產生一個FileSegment;前面每一個MapTask有幾個Reduce就產生幾個FileSegment;以下圖:

5

在一個 core 上連續執行的 ShuffleMapTasks 能夠共用一個輸出文件 ShuffleFile。先執行完的 ShuffleMapTask 造成 ShuffleBlock i,後執行的 ShuffleMapTask 能夠將輸出數據直接追加到 ShuffleBlock i 後面,造成 ShuffleBlock i;數據塊在數據文件中的偏移量存儲在不一樣的索引文件中

Shuffle read

Shuffle read就是將以前Shuffle write寫的數據拉過來再作處理;

fetch 來的 records 被逐個 aggreagte 到 HashMap 中,等到全部 records 都進入 HashMap,就獲得最後的處理結果

9

參考資料

Spark的任務調度機制

相關文章
相關標籤/搜索