阿里雲Spark Shuffle的優化

本次分享者:辰石,來自阿里巴巴計算平臺事業部EMR團隊技術專家,目前從事大數據存儲以及Spark相關方面的工做。面試

  • Spark Shuffle介紹
  • Smart Shuffle設計
  • 性能分析

Spark Shuffle流程

Spark 0.8及之前 Hash Based ShuffleSpark 0.8.1 爲Hash Based Shuffle引入File Consolidation機制Spark 0.9 引入ExternalAppendOnlyMapSpark 1.1 引入Sort Based Shuffle,但默認仍爲Hash Based ShuffleSpark 1.2 默認的Shuffle方式改成Sort Based ShuffleSpark 1.4 引入Tungsten-Sort Based ShuffleSpark 1.6 Tungsten-sort併入Sort Based ShuffleSpark 2.0 Hash Based Shuffle退出歷史舞臺apache

總結一下, 就是最開始的時候使用的是 Hash Based Shuffle, 這時候每個Mapper會根據Reducer的數量建立出相應的bucket,bucket的數量是M x R ,其中M是Map的個數,R是Reduce的個數。這樣會產生大量的小文件,對文件系統壓力很大,並且也不利於IO吞吐量。後面忍不了了就作了優化,把在同一core上運行的多個Mapper 輸出的合併到同一個文件,這樣文件數目就變成了 cores x R 個了。網絡

file

Spark Shuffle實現

Sort-based shuffle介紹

這個方式的選擇是在org.apache.spark.SparkEnv完成的:架構

// Let the user specify short names forshuffle managers
    val shortShuffleMgrNames = Map(
      "hash" ->"org.apache.spark.shuffle.hash.HashShuffleManager",
      "sort" ->"org.apache.spark.shuffle.sort.SortShuffleManager")
    val shuffleMgrName =conf.get("spark.shuffle.manager", "sort") //得到Shuffle Manager的type,sort爲默認
    val shuffleMgrClass =shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
    val shuffleManager =instantiateClass[ShuffleManager](shuffleMgrClass)複製代碼

Hashbased shuffle的每一個mapper都須要爲每一個reducer寫一個文件,供reducer讀取,即須要產生M x R個數量的文件,若是mapper和reducer的數量比較大,產生的文件數會很是多。Hash based shuffle設計的目標之一就是避免不須要的排序(Hadoop Map Reduce被人詬病的地方,不少不須要sort的地方的sort致使了沒必要要的開銷)。可是它在處理超大規模數據集的時候,產生了大量的DiskIO和內存的消耗,這無疑很影響性能。Hash based shuffle也在不斷的優化中,正如前面講到的Spark 0.8.1引入的file consolidation在必定程度上解決了這個問題。爲了更好的解決這個問題,Spark 1.1 引入了Sort based shuffle。首先,每一個Shuffle Map Task不會爲每一個Reducer生成一個單獨的文件;相反,它會將全部的結果寫到一個文件裏,同時會生成一個index文件,Reducer能夠經過這個index文件取得它須要處理的數據。避免產生大量的文件的直接收益就是節省了內存的使用和順序Disk IO帶來的低延時。節省內存的使用能夠減小GC的風險和頻率。而減小文件的數量能夠避免同時寫多個文件對系統帶來的壓力。app

目前writer的實現分爲三種, 分爲 BypassMergeSortShuffleWriter, SortShuffleWriter 和 UnsafeShuffleWriter。異步

file

SortShuffleManager只有BlockStoreShuffleReader這一種ShuffleReader。oop

Spark-shuffle存在的問題

同步操做性能

Shuffle數據只有等map task任務結束後可能會觸發多路歸併生成最終數據。大數據

大量的磁盤IO優化

Shuffle的數據在Merge階段存在大量的磁盤讀寫IO,在sort-merge階段對磁盤IO帶寬要求較高。

計算與網絡的串行

Task任務計算和網絡IO的串行操做。

Smart Shuffle

file

shuffle數據的pipeline

shuffle數據在map端累積到必定數量發送到reduce端。

避免沒必要要的網絡IO

根據partition數量的位置,能夠調度該reduce任務到相應的節點。

計算和網絡IO的異步化

shuffle數據的生成和shuffle數據的發送能夠並行執行。

避免sort-merge減小磁盤IO

shuffle數據是按照partition進行分區,shuffle數據無需sort-merge

Smart Shuffle使用

  • 配置spark.shuffle.manager : org.apache.spark.shuffle.hash.HashShuffleManager
  • 配置spark.shuffle.smart.spill.memorySizeForceSpillThreshold:控制shuffle數據佔用內存的大小,默認爲128M
  • 配置spark.shuffle.smart.transfer.blockSize:控制shuffle在網絡傳輸數據塊的大小

性能分析

硬件及軟件資源:

file

TPC-DS性能:

file

Smart shuffle TPC-DS性能提高28%:

  • Smart shuffle沒有打來單個query性能的降低
  • 單個query最大可以帶來最大2倍的性能提高

提取Q2和Q49查詢性能分析:

  • Q2在兩種shuffle性能保持一致
  • Q49在Smart shuffle下性能有很大提高

單個查詢對比:

file

左側爲sorted shuffle,右邊爲smart shuffle。Q2查詢相對簡單,shuffle數據也比較少,smart shuffle性能保持不變。

Q2 CPU對比:左側爲sorted shuffle,右側是smart shufflefile

磁盤對比:

左側爲sorted shuffle,右側是smart shufflefile

聲明:本號全部文章除特殊註明,都爲原創,公衆號讀者擁有優先閱讀權,未經做者本人容許不得轉載,不然追究侵權責任。

關注個人公衆號,後臺回覆【JAVAPDF】獲取200頁面試題!5萬人關注的大數據成神之路,不來了解一下嗎?5萬人關注的大數據成神之路,真的不來了解一下嗎?5萬人關注的大數據成神之路,肯定真的不來了解一下嗎?

歡迎您關注《大數據成神之路》

大數據技術與架構

相關文章
相關標籤/搜索