在 Spark 的源碼中,負責 shuffle 過程的執行、計算、處理的組件主要是 ShuffleManager。網絡
在 Spark 1.2 之前,默認的 shuffle 計算引擎是 HashShuffleManager。該 ShuffleMananger 有一個很是嚴重的弊端,就是會產生大量的磁盤文件,進而有大量的磁盤 IO 操做,比較影響性能。數據結構
所以在 Spark 1.2 以後,默認的 ShuffleManager 改爲了 SortShuffleManager。SortShuffleManager 相對來講,有了必定的改進。主要就在於,每一個 Task 在 Shuffle Write 操做時,雖然也會產生較大的磁盤文件,但最後會將全部的臨時文件合併 (merge) 成一個磁盤文件,所以每一個 Task 就只有一個磁盤文件。在下一個 Stage 的 Shuffle Read Task 拉取本身數據的時候,只要根據索引拉取每一個磁盤文件中的部分數據便可。ide
普通模式下,在 Shuffle Write 階段,每一個 Task 將數據按照 Key 進行 Hash 計算,而後按照計算結果,將相同的 Key 對應的數據寫入內存緩衝區,當內存緩衝區寫滿以後會直接溢寫到磁盤文件。這裏須要寫多少個磁盤文件,和下一個 stage 的 Shuffle Read Task 的數量一致。性能
而後,Shuffle Read 階段的每一個 Task 會拉取 Shuffle Write 階段全部相同 Key 的文件,一遍拉取一遍聚合。每一個 Shuffle Read 階段的 Task 都有本身的緩衝區,每次只能拉取與緩衝區大小一致的數據,而後經過內存中的 Map 進行聚合等操做,聚合完一批再取下一批數據。大數據
好比,當前 Stage 有 5 個 Executor,每一個 Executor 分配一個 cpu core,有 50 個 task,每一個 Executor 執行 10 個 task;下一個 stage 有100 個 task。那麼在 Shuffle Write 階段每一個 task 要建立 100 個磁盤文件,每一個 Executor 進程要建立 1000 個文件,一共要建立 1000 * 5 = 5000 個磁盤文件,數量不少。優化
具體執行原理圖以下圖所示:spa
針對 HashShuffleManager 咱們能夠設置一個參數:spark.shuffle.consolidateFiles
。這個參數的值默認是 fasle,若是設置成 true 以後就會開啓優化機制。3d
當開啓這個參數以後,在 Shuffle Write 階段寫文件的時候會複用文件,每一個 task 不會爲 Shuffle Read 階段的 task 都建立一份文件。此時會出現一個 shuffleFileGroup 的概念,每一個 shuffleFileGroup 會對應一批磁盤文件,磁盤文件的數量和 Shuffle Read 階段的 task 數量一致。每一個 Executor 上有多少個 cpu core 就會並行執行幾個 task,每一個 task 會建立一個 shuffleFileGroup,而後後續並行執行的 task 會複用前面生成的這個 shuffleFileGroup。code
好比,當前 stage 有 5 個 Executor,每一個 Executor 分配 3 個 cpu core,一共有 50 個 task,每一個 Executor 執行 10 個 task,Shuffle Read 階段有 100 個 task。那麼此時,每一個 Executor 進程會建立 3 * 100 個文件,一共會建立 5 * 3 * 100 個文件。cdn
具體原理如圖示:
SortShuffleManager 運行機制有兩種,一種是普通運行機制,另外一種是 bypass 運行機制。當 shuffle read task 的數量小於等於 spark.shuffle.sort.bypassMergeThreshold
參數值時 (默認是 200 ) ,就會啓用 bypass 機制。
在該模式下,Shuffle Write 階段會將數據寫入一個內存的數據結構中,此時根據不一樣的算子會有不一樣的數據結構。好比是 reduceByKey 這種聚合類的 shuffle 算子,會選用 Map 數據結構,一遍用 Map 進行聚合(HashShuffleManager 聚合操做是放在 Shuffle Read 階段),一遍寫入內存;若是是 join 相關的普通 shuffle 算子的話,會用 Array 數據結構,直接寫入內存。當內存達到臨界閾值以後,會將內存中的數據進行排序,而後分批次寫入磁盤 (默認每批次有 1W 條數據),在寫入磁盤的時候不會像 HashShuffleManager 那樣直接寫入磁盤,這裏會先寫入內存緩衝流,當緩衝流滿溢以後一次性寫入磁盤。
此時也會生成大批量的文件,最後會將以前全部的臨時磁盤文件進行合併,這就是 merge 過程 (就是將全部的臨時磁盤文件中的數據讀取出來,而後依次寫入最終的文件中)。每一個 task 最終會生成一份磁盤文件和一份索引文件,索引文件中標示了下游每一個 task 的數據在文件中的 start offset 和 end offset。
好比,當前 stage 有 5 個 Executor,每一個 Executor 分配 1 個 cpu core,共有 50 個 task,每一個 Executor 執行 10 個 task;下一個 stage 有 100 個 task。那麼每一個 Executor 建立 10 個磁盤文件,一共有 50 個磁盤文件。
具體以下圖所示:
觸發該機制的條件:
1,shuffle reduce 端的 task 數量小於 spark.shuffle.sort.bypassMergeThreshold
參數值的時候;
2,不是聚合類的shuffle算子(好比reduceByKey);
該機制下,當前 stage 的每一個 task 會將數據的 key 進行 hash,而後將相同 hash 的 key 鎖對應的數據寫入到同一個內存緩衝區,緩衝寫滿後會溢寫到磁盤文件,這裏和 HashShuffleManager一致。
而後會進入 merge 階段,將全部的磁盤文件合併成一個磁盤文件,並建立一個索引文件。
相比較於普通機制,這裏有兩個地方不一樣:
1,將數據寫入內存時候,普通模式是將數據寫入 Map 或者 Array 這樣的內存數據結構中,這裏是根據 key 的 Hash 值直接寫入內存;
2,該模式下在寫入磁盤以前不會排序;
3,磁盤寫機制不一樣。
具體如圖示:
spark.shuffle.file.buffer
spark.reducer.maxSizeInFlight
spark.shuffle.io.maxRetries
spark.shuffle.io.retryWait
spark.shuffle.memoryFraction
spark.shuffle.manager
spark.shuffle.sort.bypassMergeThreshold
spark.shuffle.consolidateFiles