本次分享者:辰石,來自阿里巴巴計算平臺事業部EMR團隊技術專家,目前從事大數據存儲以及Spark相關方面的工做。面試
Spark 0.8及之前 Hash Based ShuffleSpark 0.8.1 爲Hash Based Shuffle引入File Consolidation機制Spark 0.9 引入ExternalAppendOnlyMapSpark 1.1 引入Sort Based Shuffle,但默認仍爲Hash Based ShuffleSpark 1.2 默認的Shuffle方式改成Sort Based ShuffleSpark 1.4 引入Tungsten-Sort Based ShuffleSpark 1.6 Tungsten-sort併入Sort Based ShuffleSpark 2.0 Hash Based Shuffle退出歷史舞臺apache
總結一下, 就是最開始的時候使用的是 Hash Based Shuffle, 這時候每個Mapper會根據Reducer的數量建立出相應的bucket,bucket的數量是M x R ,其中M是Map的個數,R是Reduce的個數。這樣會產生大量的小文件,對文件系統壓力很大,並且也不利於IO吞吐量。後面忍不了了就作了優化,把在同一core上運行的多個Mapper 輸出的合併到同一個文件,這樣文件數目就變成了 cores x R 個了。網絡
這個方式的選擇是在org.apache.spark.SparkEnv完成的:架構
// Let the user specify short names forshuffle managers
val shortShuffleMgrNames = Map(
"hash" ->"org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" ->"org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName =conf.get("spark.shuffle.manager", "sort") //得到Shuffle Manager的type,sort爲默認
val shuffleMgrClass =shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager =instantiateClass[ShuffleManager](shuffleMgrClass)複製代碼
Hashbased shuffle的每一個mapper都須要爲每一個reducer寫一個文件,供reducer讀取,即須要產生M x R個數量的文件,若是mapper和reducer的數量比較大,產生的文件數會很是多。Hash based shuffle設計的目標之一就是避免不須要的排序(Hadoop Map Reduce被人詬病的地方,不少不須要sort的地方的sort致使了沒必要要的開銷)。可是它在處理超大規模數據集的時候,產生了大量的DiskIO和內存的消耗,這無疑很影響性能。Hash based shuffle也在不斷的優化中,正如前面講到的Spark 0.8.1引入的file consolidation在必定程度上解決了這個問題。爲了更好的解決這個問題,Spark 1.1 引入了Sort based shuffle。首先,每一個Shuffle Map Task不會爲每一個Reducer生成一個單獨的文件;相反,它會將全部的結果寫到一個文件裏,同時會生成一個index文件,Reducer能夠經過這個index文件取得它須要處理的數據。避免產生大量的文件的直接收益就是節省了內存的使用和順序Disk IO帶來的低延時。節省內存的使用能夠減小GC的風險和頻率。而減小文件的數量能夠避免同時寫多個文件對系統帶來的壓力。app
目前writer的實現分爲三種, 分爲 BypassMergeSortShuffleWriter, SortShuffleWriter 和 UnsafeShuffleWriter。異步
SortShuffleManager只有BlockStoreShuffleReader這一種ShuffleReader。oop
同步操做性能
Shuffle數據只有等map task任務結束後可能會觸發多路歸併生成最終數據。大數據
大量的磁盤IO優化
Shuffle的數據在Merge階段存在大量的磁盤讀寫IO,在sort-merge階段對磁盤IO帶寬要求較高。
計算與網絡的串行
Task任務計算和網絡IO的串行操做。
shuffle數據的pipeline
shuffle數據在map端累積到必定數量發送到reduce端。
避免沒必要要的網絡IO
根據partition數量的位置,能夠調度該reduce任務到相應的節點。
計算和網絡IO的異步化
shuffle數據的生成和shuffle數據的發送能夠並行執行。
避免sort-merge減小磁盤IO
shuffle數據是按照partition進行分區,shuffle數據無需sort-merge
硬件及軟件資源:
TPC-DS性能:
Smart shuffle TPC-DS性能提高28%:
提取Q2和Q49查詢性能分析:
單個查詢對比:
左側爲sorted shuffle,右邊爲smart shuffle。Q2查詢相對簡單,shuffle數據也比較少,smart shuffle性能保持不變。
Q2 CPU對比:左側爲sorted shuffle,右側是smart shuffle
磁盤對比:
左側爲sorted shuffle,右側是smart shuffle
聲明:本號全部文章除特殊註明,都爲原創,公衆號讀者擁有優先閱讀權,未經做者本人容許不得轉載,不然追究侵權責任。
關注個人公衆號,後臺回覆【JAVAPDF】獲取200頁面試題!5萬人關注的大數據成神之路,不來了解一下嗎?5萬人關注的大數據成神之路,真的不來了解一下嗎?5萬人關注的大數據成神之路,肯定真的不來了解一下嗎?