Spark Shuffle之Sort Shuffle

源文件放在github,隨着理解的深刻,不斷更新,若有謬誤之處,歡迎指正。原文連接https://github.com/jacksu/utils4s/blob/master/spark-knowledge/md/sort-shuffle.mdjava

正如你所知,spark實現了多種shuffle方法,經過 spark.shuffle.manager來肯定。暫時總共有三種:hash shuffle、sort shuffle和tungsten-sort shuffle,從1.2.0開始默認爲sort shuffle。本節主要介紹sort shuffle。git

從1.2.0開始默認爲sort shuffle(spark.shuffle.manager = sort),實現邏輯相似於Hadoop MapReduce,Hash Shuffle每個reducers產生一個文件,可是Sort Shuffle只是產生一個按照reducer id排序可索引的文件,這樣,只需獲取有關文件中的相關數據塊的位置信息,並fseek就能夠讀取指定reducer的數據。但對於rueducer數比較少的狀況,Hash Shuffle明顯要比Sort Shuffle快,所以Sort Shuffle有個「fallback」計劃,對於reducers數少於 「spark.shuffle.sort.bypassMergeThreshold」 (200 by default),咱們使用fallback計劃,hashing相關數據到分開的文件,而後合併這些文件爲一個,具體實現爲BypassMergeSortShuffleWritergithub

image

在map進行排序,在reduce端應用Timsort[1]進行合併。map端是否允許spill,經過spark.shuffle.spill來設置,默認是true。設置爲false,若是沒有足夠的內存來存儲map的輸出,那麼就會致使OOM錯誤,所以要慎用。算法

用於存儲map輸出的內存爲:「JVM Heap Size」 \* spark.shuffle.memoryFraction \* spark.shuffle.safetyFraction,默認爲「JVM Heap Size」 \* 0.2 \* 0.8 = 「JVM Heap Size」 \* 0.16。若是你在同一個執行程序中運行多個線程(設定spark.executor.cores/ spark.task.cpus超過1),每一個map任務存儲的空間爲「JVM Heap Size」 * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction / spark.executor.cores * spark.task.cpus, 默認2個cores,那麼爲0.08 * 「JVM Heap Size」。 spark使用AppendOnlyMap存儲map輸出的數據,利用開源hash函數MurmurHash3和平方探測法把key和value保存在相同的array中。這種保存方法能夠是spark進行combine。若是spill爲true,會在spill前sort。apache

Sort Shuffle內存的源碼級別更詳細說明能夠參考[4],讀寫過程能夠參考[5]函數

##優勢oop

  1. map建立文件量較少
  2. 少許的IO隨機操做,大部分是順序讀寫

##缺點spa

  1. 要比Hash Shuffle要慢,須要本身經過spark.shuffle.sort.bypassMergeThreshold來設置合適的值。
  2. 若是使用SSD盤存儲shuffle數據,那麼Hash Shuffle可能更合適。

##參考.net

[1]Timsort原理介紹線程

[2]形式化方法的逆襲——如何找出Timsort算法和玉兔月球車中的Bug?

[3]Spark Architecture: Shuffle

[4]Spark Sort Based Shuffle內存分析

[5]Spark Shuffle Write階段磁盤文件分析

相關文章
相關標籤/搜索