Shuffle就是將不一樣節點上相同的Key拉取到一個節點的過程。這之中涉及到各類IO,因此執行時間勢必會較長。對shuffle的優化也是spark job優化的重點。html
Spark的Shuffle在1.2以前默認的計算引擎是HashShuffleManagergit
假設每一個executor只有一個core,意味着一個executor只能同時運行一個task。有三個Reducer,每一個reducer都會從上游拉取對應block file,每一個task會爲下游的每個reducer生成一個block文件,這樣算,總的文件個數就是上一個stage的分區數 × 下游的分區數 (如圖是12個)
若是分區數比較多,map task就比較多,下個stage的reduce task也比較多,那就會不少小文件產生,IO消耗很大,顯然很差。github
優化就是複用buffer,也就使輸出的block文件合併了。開啓合併機制spark.shuffle.consolidateFiles=true
。數據結構
如圖,一個core無論有幾個task都會複用同一個buffer,這樣生成的文件個數即爲core × reducer。很明顯比優化前少了不少。但若是下游stage的分區不少的話,文件仍然多。優化
在Spark1.2版本以後,出現了SortShuffle,這種方式以更少的中間磁盤文件產生而遠遠優於HashShuffle。spa
在該模式下,數據會先寫入一個內存數據結構中(默認5M),Map或者Array。若是使reduceByKey類算子,就用Map,join類算子就用Array。每條數據寫入內存後就會判斷是否達到閾值,若是達到了就溢寫磁盤,最後清空內存。shuffle中的定時器會定時會檢查內存數據結構的大小,若是內存數據結構空間不夠,那麼會申請額外的內存
在溢寫到磁盤文件以前,會先根據key對內存數據結構中已有的數據進行排序,再以默認每批1w條數據經過BufferedOutputStream
寫入磁盤。
最後,再把這些文件合併成一個文件,並多出一個索引文件來告訴下游task從哪一個offset開始讀取。
結果生成的文件個數爲 map task × 2,已經大大減小了。code
知足如下兩個條件:
shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold
參數的值(默認200)。
不是聚合類的shuffle算子(好比reduceByKey
)。htm
在這種機制下,當前stage的task會爲每一個下游的task都建立臨時磁盤文件。將數據按照key值進行hash,而後根據hash值,將key寫入對應的磁盤文件中。最終,一樣會將全部臨時文件依次合併成一個磁盤文件,創建索引。
本質上就是在Hash Shuffle後進行了小文件的合併。相比普通機制的Sort Shuffle,文件個數也是map task × 2,但省去了排序的過程消耗。blog
參考連接:
https://ruozedata.github.io/2...
https://www.cnblogs.com/itboy...排序