源文件放在github,若有謬誤之處,歡迎指正。原文連接https://github.com/jacksu/utils4s/blob/master/spark-knowledge/md/hash-shuffle.mdhtml
正如你所知,spark實現了多種shuffle方法,經過 spark.shuffle.manager來肯定。暫時總共有三種:hash shuffle、sort shuffle和tungsten-sort shuffle,從1.2.0開始默認爲sort shuffle。本節主要介紹hash shuffle。node
spark在1.2前默認爲hash shuffle(spark.shuffle.manager = hash),但hash shuffle也經歷了兩個發展階段。git
上圖有 4 個 ShuffleMapTask 要在同一個 worker node 上運行,CPU core 數爲 2,能夠同時運行兩個 task。每一個 task 的執行結果(該 stage 的 finalRDD 中某個 partition 包含的 records)被逐一寫到本地磁盤上。每一個 task 包含 R 個緩衝區,R = reducer 個數(也就是下一個 stage 中 task 的個數),緩衝區被稱爲 bucket,其大小爲spark.shuffle.file.buffer.kb ,默認是 32KB(Spark 1.1 版本之前是 100KB)。github
這樣的實現很簡單,但有幾個問題:apache
1 產生的 FileSegment 過多。每一個 ShuffleMapTask 產生 R(reducer 個數)個 FileSegment,M 個 ShuffleMapTask 就會產生 M * R
個文件。通常 Spark job 的 M 和 R 都很大,所以磁盤上會存在大量的數據文件。緩存
2 緩衝區佔用內存空間大。每一個 ShuffleMapTask 須要開 R 個 bucket,M 個 ShuffleMapTask 就會產生 M * R 個 bucket。雖然一個 ShuffleMapTask 結束後,對應的緩衝區能夠被回收,但一個 worker node 上同時存在的 bucket 個數能夠達到 cores R 個(通常 worker 同時能夠運行 cores 個 ShuffleMapTask),佔用的內存空間也就達到了cores * R * 32 KB。對於 8 核 1000 個 reducer 來講,佔用內存就是 256MB。markdown
spark.shuffle.consolidateFiles默認爲false,若是爲true,shuffleMapTask輸出文件能夠被合併。如圖性能
能夠明顯看出,在一個 core 上連續執行的 ShuffleMapTasks 能夠共用一個輸出文件 ShuffleFile。先執行完的 ShuffleMapTask 造成 ShuffleBlock i,後執行的 ShuffleMapTask 能夠將輸出數據直接追加到 ShuffleBlock i 後面,造成 ShuffleBlock i',每一個 ShuffleBlock 被稱爲 FileSegment。下一個 stage 的 reducer 只須要 fetch 整個 ShuffleFile 就好了。這樣,每一個 worker 持有的文件數降爲 cores * R
。可是緩存空間佔用大尚未解決。fetch
固然,數據通過序列化、壓縮寫入文件,讀取的時候,須要反序列化、解壓縮。reduce fetch的時候有一個很是重要的參數spark.reducer.maxSizeInFlight
,這裏用 softBuffer 表示,默認大小爲 48MB。一個 softBuffer 裏面通常包含多個 FileSegment,但若是某個 FileSegment 特別大的話,這一個就能夠填滿甚至超過 softBuffer 的界限。若是增大,reduce請求的chunk就會變大,能夠提升性能,可是增長了reduce的內存使用量。spa
若是排序在reduce不強制執行,那麼reduce只返回一個依賴於map的迭代器。若是須要排序, 那麼在reduce端,調用ExternalSorter。