ShuffleManager 原理

在 Spark 的源碼中,負責 shuffle 過程的執行、計算、處理的組件主要是 ShuffleManager。網絡

在 Spark 1.2 之前,默認的 shuffle 計算引擎是 HashShuffleManager。該 ShuffleMananger 有一個很是嚴重的弊端,就是會產生大量的磁盤文件,進而有大量的磁盤 IO 操做,比較影響性能。數據結構

所以在 Spark 1.2 以後,默認的 ShuffleManager 改爲了 SortShuffleManager。SortShuffleManager 相對來講,有了必定的改進。主要就在於,每一個 Task 在 Shuffle Write 操做時,雖然也會產生較大的磁盤文件,但最後會將全部的臨時文件合併 (merge) 成一個磁盤文件,所以每一個 Task 就只有一個磁盤文件。在下一個 Stage 的 Shuffle Read Task 拉取本身數據的時候,只要根據索引拉取每一個磁盤文件中的部分數據便可。ide

一,HashShuffleManager 運行原理

普通模式下,在 Shuffle Write 階段,每一個 Task 將數據按照 Key 進行 Hash 計算,而後按照計算結果,將相同的 Key 對應的數據寫入內存緩衝區,當內存緩衝區寫滿以後會直接溢寫到磁盤文件。這裏須要寫多少個磁盤文件,和下一個 stage 的 Shuffle Read Task 的數量一致。性能

而後,Shuffle Read 階段的每一個 Task 會拉取 Shuffle Write 階段全部相同 Key 的文件,一遍拉取一遍聚合。每一個 Shuffle Read 階段的 Task 都有本身的緩衝區,每次只能拉取與緩衝區大小一致的數據,而後經過內存中的 Map 進行聚合等操做,聚合完一批再取下一批數據。大數據

好比,當前 Stage 有 5 個 Executor,每一個 Executor 分配一個 cpu core,有 50 個 task,每一個 Executor 執行 10 個 task;下一個 stage 有100 個 task。那麼在 Shuffle Write 階段每一個 task 要建立 100 個磁盤文件,每一個 Executor 進程要建立 1000 個文件,一共要建立 1000 * 5 = 5000 個磁盤文件,數量不少。優化

具體執行原理圖以下圖所示:spa

針對 HashShuffleManager 咱們能夠設置一個參數:spark.shuffle.consolidateFiles。這個參數的值默認是 fasle,若是設置成 true 以後就會開啓優化機制。3d

當開啓這個參數以後,在 Shuffle Write 階段寫文件的時候會複用文件,每一個 task 不會爲 Shuffle Read 階段的 task 都建立一份文件。此時會出現一個 shuffleFileGroup 的概念,每一個 shuffleFileGroup 會對應一批磁盤文件,磁盤文件的數量和 Shuffle Read 階段的 task 數量一致。每一個 Executor 上有多少個 cpu core 就會並行執行幾個 task,每一個 task 會建立一個 shuffleFileGroup,而後後續並行執行的 task 會複用前面生成的這個 shuffleFileGroup。code

好比,當前 stage 有 5 個 Executor,每一個 Executor 分配 3 個 cpu core,一共有 50 個 task,每一個 Executor 執行 10 個 task,Shuffle Read 階段有 100 個 task。那麼此時,每一個 Executor 進程會建立 3 * 100 個文件,一共會建立 5 * 3 * 100 個文件。cdn

具體原理如圖示:

二,SortShuffleManager 運行原理

SortShuffleManager 運行機制有兩種,一種是普通運行機制,另外一種是 bypass 運行機制。當 shuffle read task 的數量小於等於 spark.shuffle.sort.bypassMergeThreshold 參數值時 (默認是 200 ) ,就會啓用 bypass 機制。

1,普通機制

在該模式下,Shuffle Write 階段會將數據寫入一個內存的數據結構中,此時根據不一樣的算子會有不一樣的數據結構。好比是 reduceByKey 這種聚合類的 shuffle 算子,會選用 Map 數據結構,一遍用 Map 進行聚合(HashShuffleManager 聚合操做是放在 Shuffle Read 階段),一遍寫入內存;若是是 join 相關的普通 shuffle 算子的話,會用 Array 數據結構,直接寫入內存。當內存達到臨界閾值以後,會將內存中的數據進行排序,而後分批次寫入磁盤 (默認每批次有 1W 條數據),在寫入磁盤的時候不會像 HashShuffleManager 那樣直接寫入磁盤,這裏會先寫入內存緩衝流,當緩衝流滿溢以後一次性寫入磁盤。

此時也會生成大批量的文件,最後會將以前全部的臨時磁盤文件進行合併,這就是 merge 過程 (就是將全部的臨時磁盤文件中的數據讀取出來,而後依次寫入最終的文件中)。每一個 task 最終會生成一份磁盤文件和一份索引文件,索引文件中標示了下游每一個 task 的數據在文件中的 start offset 和 end offset。

好比,當前 stage 有 5 個 Executor,每一個 Executor 分配 1 個 cpu core,共有 50 個 task,每一個 Executor 執行 10 個 task;下一個 stage 有 100 個 task。那麼每一個 Executor 建立 10 個磁盤文件,一共有 50 個磁盤文件。

具體以下圖所示:

2,bypass 機制

觸發該機制的條件:

1,shuffle reduce 端的 task 數量小於 spark.shuffle.sort.bypassMergeThreshold 參數值的時候;

2,不是聚合類的shuffle算子(好比reduceByKey);

該機制下,當前 stage 的每一個 task 會將數據的 key 進行 hash,而後將相同 hash 的 key 鎖對應的數據寫入到同一個內存緩衝區,緩衝寫滿後會溢寫到磁盤文件,這裏和 HashShuffleManager一致。

而後會進入 merge 階段,將全部的磁盤文件合併成一個磁盤文件,並建立一個索引文件。

相比較於普通機制,這裏有兩個地方不一樣:

1,將數據寫入內存時候,普通模式是將數據寫入 Map 或者 Array 這樣的內存數據結構中,這裏是根據 key 的 Hash 值直接寫入內存;

2,該模式下在寫入磁盤以前不會排序;

3,磁盤寫機制不一樣。

具體如圖示:

三,shuffle 相關的參數

spark.shuffle.file.buffer

  • 默認值:32k
  • 參數說明:該參數用於設置 shuffle write task 的 BufferedOutputStream 的 buffer 緩衝大小。將數據寫到磁盤文件以前,會先寫入 buffer 緩衝中,待緩衝寫滿以後,纔會溢寫到磁盤。
  • 調優建議:若是做業可用的內存資源較爲充足的話,能夠適當增長這個參數的大小(好比 64k),從而減小 shuffle write 過程當中溢寫磁盤文件的次數,也就能夠減小磁盤 IO 次數,進而提高性能。在實踐中發現,合理調節該參數,性能會有 1%~5% 的提高。

spark.reducer.maxSizeInFlight

  • 默認值:48m
  • 參數說明:該參數用於設置 shuffle read task 的 buffer 緩衝大小,而這個 buffer 緩衝決定了每次可以拉取多少數據。
  • 調優建議:若是做業可用的內存資源較爲充足的話,能夠適當增長這個參數的大小(好比 96m),從而減小拉取數據的次數,也就能夠減小網絡傳輸的次數,進而提高性能。在實踐中發現,合理調節該參數,性能會有 1%~5% 的提高。

spark.shuffle.io.maxRetries

  • 默認值:3
  • 參數說明:shuffle read task 從 shuffle write task 所在節點拉取屬於本身的數據時,若是由於網絡異常致使拉取失敗,是會自動進行重試的。該參數就表明了能夠重試的最大次數。若是在指定次數以內拉取仍是沒有成功,就可能會致使做業執行失敗。
  • 調優建議:對於那些包含了特別耗時的 shuffle 操做的做業,建議增長重試最大次數(好比 60 次),以免因爲 JVM 的 full gc 或者網絡不穩定等因素致使的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~上百億)的 shuffle 過程,調節該參數能夠大幅度提高穩定性。

spark.shuffle.io.retryWait

  • 默認值:5s
  • 參數說明:具體解釋同上,該參數表明了每次重試拉取數據的等待間隔,默認是 5s。
  • 調優建議:建議加大間隔時長(好比 60s),以增長 shuffle 操做的穩定性。

spark.shuffle.memoryFraction

  • 默認值:0.2
  • 參數說明:該參數表明了 Executor 內存中,分配給 shuffle read task 進行聚合操做的內存比例,默認是 20%。
  • 調優建議:在資源參數調優中講解過這個參數。若是內存充足,並且不多使用持久化操做,建議調高這個比例,給 shuffle read 的聚合操做更多內存,以免因爲內存不足致使聚合過程當中頻繁讀寫磁盤。在實踐中發現,合理調節該參數能夠將性能提高 10% 左右。

spark.shuffle.manager

  • 默認值:sort
  • 參數說明:該參數用於設置 ShuffleManager 的類型。Spark 1.5 之後,有三個可選項:hash、sort 和 tungsten-sort。HashShuffleManager 是 Spark 1.2 之前的默認選項,可是 Spark 1.2 以及以後的版本默認都是 SortShuffleManager 了。tungsten-sort 與 sort 相似,可是使用了 tungsten 計劃中的堆外內存管理機制,內存使用效率更高。
  • 調優建議:因爲 SortShuffleManager 默認會對數據進行排序,所以若是你的業務邏輯中須要該排序機制的話,則使用默認的 SortShuffleManager 就能夠;而若是你的業務邏輯不須要對數據進行排序,那麼建議參考後面的幾個參數調優,經過 bypass 機制或優化的 HashShuffleManager 來避免排序操做,同時提供較好的磁盤讀寫性能。這裏要注意的是,tungsten-sort 要慎用,由於以前發現了一些相應的 bug。

spark.shuffle.sort.bypassMergeThreshold

  • 默認值:200
  • 參數說明:當 ShuffleManager 爲 SortShuffleManager 時,若是 shuffle read task 的數量小於這個閾值(默認是200),則 shuffle write 過程當中不會進行排序操做,而是直接按照未經優化的 HashShuffleManager 的方式去寫數據,可是最後會將每一個task產生的全部臨時磁盤文件都合併成一個文件,並會建立單獨的索引文件。
  • 調優建議:當你使用 SortShuffleManager 時,若是的確不須要排序操做,那麼建議將這個參數調大一些,大於 shuffle read task 的數量。那麼此時就會自動啓用 bypass 機制,map-side 就不會進行排序了,減小了排序的性能開銷。可是這種方式下,依然會產生大量的磁盤文件,所以 shuffle write 性能有待提升。

spark.shuffle.consolidateFiles

  • 默認值:false
  • 參數說明:若是使用 HashShuffleManager,該參數有效。若是設置爲 true,那麼就會開啓 consolidate 機制,會大幅度合併 shuffle write 的輸出文件,對於 shuffle read task 數量特別多的狀況下,這種方法能夠極大地減小磁盤 IO 開銷,提高性能。
  • 調優建議:若是的確不須要 SortShuffleManager 的排序機制,那麼除了使用 bypass 機制,還能夠嘗試將 spark.shffle.manager 參數手動指定爲 hash,使用 HashShuffleManager,同時開啓 consolidate 機制。在實踐中嘗試過,發現其性能比開啓了 bypass 機制的 SortShuffleManager 要高出 10%~30%。
相關文章
相關標籤/搜索