有許多場景下,咱們須要進行跨服務器的數據整合,好比兩個表之間,經過Id進行join操做,你必須確保全部具備相同id的數據整合到相同的塊文件中。那麼咱們先說一下mapreduce的shuffle過程。算法
Mapreduce的shuffle的計算過程是在executor中劃分mapper與reducer。Spark的Shuffling中有兩個重要的壓縮參數。spark.shuffle.compress true---是否將會將shuffle中outputs的過程進行壓縮。將spark.io.compression.codec編碼器設置爲壓縮數據,默認是true.同時,經過spark.shuffle.manager 來設置shuffle時的排序算法,有hash,sort,tungsten-sort。(用hash會快一點,我不須要排序啊~)數組
Hash Shuffle緩存
使用hash散列有不少缺點,主要是由於每一個Map task都會爲每一個reduce生成一份文件,因此最後就會有M * R個文件數量。那麼若是在比較多的Map和Reduce的狀況下就會出問題,輸出緩衝區的大小,系統中打開文件的數量,建立和刪除全部這些文件的速度都會受到影響。以下圖:服務器
這裏有一個優化的參數spark.shuffle.consolidateFiles,默認爲false,當設置成true時,會對mapper output時的文件進行合併。若是你集羣有E個executors(「-num-excutors」)以及C個cores("-executor-cores」),以及每一個task又T個CPUs(「spark.task.cpus」),那麼總共的execution的slot在集羣上的個數就是E * C / T(也就是executor個數×CORE的數量/CPU個數)個,那麼shuffle過程當中所建立的文件就爲E * C / T * R(也就是executor個數 × core的個數/CPU個數×reduce個數)個。外文文獻寫的太公式化,那麼我用通俗易懂的形式闡述下。就比如總共的並行度是20(5個executor,4個task) Map階段會將數據寫入磁盤,當它完成時,他將會以reduce的個數來生成文件數。那麼每一個executor就只會計算core的數量/cpu個數的tasks.若是task數量大於總共集羣並行度,那麼將開啓下一輪,輪詢執行。app
速度較快,由於沒有再對中間結果進行排序,減小了reduce打開文件時的性能消耗。性能
固然,當數據是通過序列化以及壓縮的。當從新讀取文件,數據將進行解壓縮與反序列化,這裏reduce端數據的拉取有個參數spark.reducer.maxSizeInFlight(默認爲48MB),它將決定每次數據從遠程的executors中拉取大小。這個拉取過程是由5個並行的request,從不一樣的executor中拉取過來,從而提高了fetch的效率。 若是你加大了這個參數,那麼reducers將會請求更多的文數據進來,它將提升性能,可是也會增長reduce時的內存開銷。fetch
Sort Shuffle優化
Sort Shuffle如同hash shuffle map寫入磁盤,reduce拉取數據的一個性質,當在進行sort shuffle時,總共的reducers要小於spark.shuffle.sort.bypassMergeThrshold(默認爲200),將會執行回退計劃,使用hash將數據寫入單獨的文件中,而後將這些小文件彙集到一個文件中,從而加快了效率。(實現自BypassMergeSortShuffleWriter中)編碼
那麼它的實現邏輯是在reducer端合併mappers的輸出結果。Spark在reduce端的排序是用了TimSort,它就是在reduce前,提早用算法進行了排序。 那麼用算法的思想來講,合併的M N個元素進行排序,那麼其複雜度爲O(MNlogM) 具體算法不講了~要慢慢看~spa
隨之,當你沒有足夠的內存保存map的輸出結果時,在溢出前,會將它們disk到磁盤,那麼緩存到內存的大小即是 spark.shuffle.memoryFraction * spark.shuffle.safyFraction.默認的狀況下是」JVM Heap Size * 0.2 * 0.8 = JVM Heap Size * 0.16」。須要注意的是,當你多個線程同時在一個executor中運行時(spark.executor.cores/spark.task.cpus 大於1的狀況下),那麼map output的每一個task將會擁有 「JVM Heap Size * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction / spark.executor.cores * spark.task.cpus。運行原理以下圖:
使用此種模式,會比使用hashing要慢一點,可經過bypassMergeThreshold找到集羣的最快平衡點。
Tungsten Sort
使用此種排序方法的優勢在於,操做的二進制數據不須要進行反序列化。它使用 sun.misc.Unsafe模式進行直接數據的複製,由於沒有反序列化,因此直接是個字節數組。同時,它使用特殊的高效緩存器ShuffleExtemalSorter壓記錄與指針以及排序的分區id.只用了8 Bytes的空間的排序數組。這將會比使用CPU緩存要效率。
每一個spill的數據、指針進行排序,輸出到一個索引文件中。隨後將這些partitions再次合併到一個輸出文件中。
本文翻譯自一位國外大神的博客:https://0x0fff.com/spark-memory-management/