Spark Shuffle

1. Shuffle相關

當Map的輸出結果要被Reduce使用時,輸出結果須要按key哈希,而且分發到每個Reducer上去,這個過程就是shuffle。因爲shuffle涉及到了磁盤的讀寫和網絡的傳輸,所以shuffle性能的高低直接影響到了整個程序的運行效率。概念上shuffle就是一個溝通數據鏈接(map和reduce)的橋樑。每一個ReduceTask從每一個Map Task產生數的據中讀取一片數據,極限狀況下可能觸發M*R個數據拷貝通道(M是MapTask數目,R是Reduce Task數目)。算法

在Spark1.1以前,其shuffle只存在一種模式,即hash base。在Spark1.1版本以後加入了sort base。Spark1.1默認採用的shuffle模式仍是hash base。在Spark1.2中,sort base將做爲默認模式。固然,你能夠經過shuffle manager進行配置。apache

2. Spark shuffle流程

·        首先每個Mapper會根據Reducer的數量建立出相應的bucket,bucket的數量是M×R,其中M是Map的個數,R是Reduce的個數。網絡

·        其次Mapper產生的結果會根據設置的partition算法填充到每一個bucket中去。這裏的partition算法是能夠自定義的,固然默認的算法是根據key哈希到不一樣的bucket中去。數據結構

·        當Reducer啓動時,它會根據本身task的id和所依賴的Mapper的id從遠端或是本地的block manager中取得相應的bucket做爲Reducer的輸入進行處理。併發

這裏的bucket是一個抽象概念,在實現中每一個bucket能夠對應一個文件,能夠對應文件的一部分或是其餘等。app

一般shuffle分爲兩部分:Map階段的數據準備和Reduce階段的數據拷貝。首先,Map階段需根據Reduce階段的Task數量決定每一個MapTask輸出的數據分片數目,有多種方式存放這些數據分片:框架

1) 保存在內存中或者磁盤上(Spark和MapReduce都存放在磁盤上);ide

2) 每一個分片一個文件(如今Spark採用的方式,若干年前MapReduce採用的方式),或者全部分片放到一個數據文件中,外加一個索引文件記錄每一個分片在數據文件中的偏移量(如今MapReduce採用的方式)。性能

在Map端,不一樣的數據存放方式各有優缺點和適用場景。通常而言,shuffle在Map端的數據要存儲到磁盤上,以防止容錯觸發重算帶來的龐大開銷(若是保存到Reduce端內存中,一旦Reduce Task掛掉了,全部Map Task須要重算)。但數據在磁盤上存放方式有多種可選方案,在MapReduce前期設計中,採用瞭如今Spark的方案(目前一直在改進),每一個Map Task爲每一個Reduce Task產生一個文件,該文件只保存特定Reduce Task需處理的數據,這樣會產生M*R個文件,若是M和R很是龐大,好比均爲1000,則會產生100w個文件,產生和讀取這些文件會產生大量的隨機IO,效率很是低下。解決這個問題的一種直觀方法是減小文件數目,經常使用的方法有:1) 將一個節點上全部Map產生的文件合併成一個大文件(MapReduce如今採用的方案),2) 每一個節點產生{(slot數目)*R}個文件(Spark優化後的方案)。對後面這種方案簡單解釋一下:不論是MapReduce 1.0仍是Spark,每一個節點的資源會被抽象成若干個slot,因爲一個Task佔用一個slot,所以slot數目可當作是最多同時運行的Task數目。若是一個Job的Task數目很是多,限於slot數目有限,可能須要運行若干輪。這樣,只須要由第一輪產生{(slot數目)*R}個文件,後續幾輪產生的數據追加到這些文件末尾便可。所以,後一種方案可減小大做業產生的文件數目。優化

在Reduce端,各個Task會併發啓動多個線程同時從多個Map Task端拉取數據。因爲Reduce階段的主要任務是對數據進行按組規約。也就是說,須要將數據分紅若干組,以便以組爲單位進行處理。你們知道,分組的方式很是多,常見的有:Map/HashTable(key相同的,放到同一個value list中)和Sort(按key進行排序,key相同的一組,經排序後會挨在一塊兒),這兩種方式各有優缺點,第一種複雜度低,效率高,可是須要將數據所有放到內存中,第二種方案複雜度高,但可以藉助磁盤(外部排序)處理龐大的數據集。Spark前期採用了第一種方案,而在最新的版本中加入了第二種方案, MapReduce則從一開始就選用了基於sort的方案。

3. shuffle分析


3.1 shuffle寫


Spark中須要Shuffle輸出的ShuffleMapTask會爲每一個ResultTask建立對應的Bucket,ShuffleMapTask產生的結果會根據設置的partitioner獲得對應的BucketId,而後填充到相應的Bucket中區。每一個ShuffleMapTask的輸出結果可能包含全部的ResultTask所須要的數據,因此每一個ShuffleMapTask建立Bucket的數目是和ResultTask的數目相等。

ShuffleMapTask建立的Bucket對應磁盤上的一個文件,用於存儲結果,此文件也被稱爲BlockFile。經過屬性spark.shuffle.file.buffer.kb配置的緩衝區就是用來建立FastBufferedOutputStream輸出流的。若是在配置文件中設置了屬性spark.shuffle.consolidateFiles爲true的話,ShuffleMapTask所產生的Bucket就不必定單獨對應一個文件了,而是對應文件的一部分,這樣作會大量減小產生的BlockFile文件數量。

ShuffleMapTask在某個節點上第一次執行時,會被每一個ResultTask建立一個輸出文件,並把這些文件組織成ShuffleFileGroup,當這個ShuffleMapTask執行結束後,當前建立的ShuffleFileGroup能夠被釋放掉,進行循環使用,當又有ShuffleMapTask在這個節點執行時,不須要建立新的輸出文件,而是在上次的ShuffleFileGroup中已經建立的文件裏追加寫一個Segment;若是當前的ShuffleMapTask還沒執行完,此時又在此節點上啓動了新的ShuffleMapTask,那麼新的ShuffleMapTask只能又建立新的輸出文件再組成一個ShuffleFileGroup來進行結果輸出。

3.2 shuffle讀

前面ShuffleMapTask寫結果,如今輪到ResultTask去讀那些數據了。Spark可使用兩種方式來讀取數據,一種是普通的Socket方式,一種是使用Netty框架。使用Netty方式的話,能夠經過配置屬性spark.shuffle.use.netty爲true來啓動。

ResultTask讀數據時,會經過BlockManager根據BlockID把相關的數據返回給ResultTask。若是使用是Netty框架,BlockManaget會建立ShuffleSender專門用於發送數據。若是ResultTask所須要的數據剛好在本節點,那就直接去磁盤上讀便可,不在經過網絡獲取,這點比MapReduce作得更好,MapReduce取數據時,即便數據在本地仍是要走一遍網絡傳輸。

Spark默認的Shuffle過程當中的數據都沒有通過排序(Hash模式),這一點也要比MapReduce框架節省不少時間。ResultTask讀取過來的數據首先存放到HashMap中,若是數據量比較小,佔用內存空間不會太大,若是數據量比較大,那就須要較多內存,內存不足該如何解決?

Spark提供了兩種方式,根據spark.shuffle.spill的設置,當內存不夠時,直接就失敗。若是設置了能夠Spill到磁盤,那就把內存中的數據溢寫到磁盤中。寫到磁盤前,先把內存中的HashMap排序,而且把內存緩衝區中的數據排序以後和寫到磁盤上文件數據組成一個最小堆,每次從最小堆中讀取最小的數據。


4. sort與hash模式

用來配置所使用的shuffle manager,目前可用的有:

org.apache.spark.shuffle.sort.HashShuffleManager(配置參數值爲hash)org.apache.spark.shuffle.sort.SortShuffleManager(配置參數值爲sort)

可在spark-default.conf中加入以下內容使用SORT模式:

Spark.shuffle.maager SORT

這兩個ShuffleManager如何選擇呢,首先須要瞭解他們在實現方式上的區別。

HashShuffleManager,故名思義也就是在Shuffle的過程當中寫數據時不作排序操做,只是將數據根據Hash的結果,將各個Reduce分區的數據寫到各自的磁盤文件中。帶來的問題就是若是Reduce分區的數量比較大的話,將會產生大量的磁盤文件。若是文件數量特別巨大,對文件讀寫的性能會帶來比較大的影響,此外因爲同時打開的文件句柄數量衆多,序列化,以及壓縮等操做須要分配的臨時內存空間也可能會迅速膨脹到沒法接受的地步,對內存的使用和GC帶來很大的壓力,在Executor內存比較小的狀況下尤其突出,例如Spark on Yarn模式。

SortShuffleManager,是1.1版本以後實現的一個試驗性(也就是一些功能和接口還在開發演變中)的ShuffleManager,它在寫入分區數據的時候,首先會根據實際狀況對數據採用不一樣的方式進行排序操做,底線是至少按照Reduce分區Partition進行排序,這樣來至於同一個Map任務Shuffle到不一樣的Reduce分區中去的全部數據均可以寫入到同一個外部磁盤文件中去,用簡單的Offset標誌不一樣Reduce分區的數據在這個文件中的偏移量。這樣一個Map任務就只須要生成一個shuffle文件,從而避免了上述HashShuffleManager可能遇到的文件數量巨大的問題

二者的性能比較,取決於內存,排序,文件操做等因素的綜合影響。

對於不須要進行排序的Shuffle操做來講,如repartition等,若是文件數量不是特別巨大,HashShuffleManager面臨的內存問題不大,而SortShuffleManager須要額外的根據Partition進行排序,顯然HashShuffleManager的效率會更高。

而對於原本就須要在Map端進行排序的Shuffle操做來講,如ReduceByKey等,使用HashShuffleManager雖然在寫數據時不排序,但在其它的步驟中仍然須要排序,而SortShuffleManager則能夠將寫數據和排序兩個工做合併在一塊兒執行,所以即便不考慮HashShuffleManager的內存使用問題,SortShuffleManager依舊可能更快。


5. Shuffle相關屬性


可在conf/spark-defaults.conf中配置,或者是在spark-submit --conf中提供參數

(eg.spark-submit --conf spark.shuffle.spill=false)

屬性名

缺省值

含義

spark.shuffle.consolidateFiles

false

若是爲true,shuffle時就合併中間文件,對於有大量Reduce任務的shuffle來講,合併文件能夠提升文件系統性能,若是使用的是ext4xfs文件系統,建議設置爲true;對於ext3,因爲文件系統的限制,設置爲true反而會使內核>8的機器下降性能

spark.shuffle.spill

true

若是爲true,經過使數據一出道磁盤對reduce階段內存的使用進行限制。移除的閾值由spark.shuffle.memoryFraction指定

spark.shuffle.spill.compress

true

shuffle時是否壓縮溢出的數據。具體壓縮方式由spark.io.compression.codec屬性設定。

spark.shuffle.memoryFraction

0.2

只有spark.shuffle.spill設爲true,此選項纔有意義,決定了當shuffle過程當中使用的內存達到總內存多少比例的時候開始Spill,默認爲20%,若是Spill的太頻繁,能夠適當增長該數值,減小Spill次數。

spark.shuffle.compress

true

是否壓縮map的輸出文件,一般選擇壓縮。具體壓縮方式spark.io.compression.codec屬性設定。

spark.shuffle.file.buffer.kb

32

每一個shuffle的文件輸出流的內存緩衝區大小,單位是kb。這些緩衝區減小了建立shuffle中間文件時的系統調用以及磁盤尋道的次數。

spark.reducer.maxMbInFlight

48

設定同時從reduce任務中取出的Map輸出最大值.單位是MB。由於要爲每一份輸出建立一個緩衝區進行接收,這表示每一個reduce任務要消耗固定大小的內存,因此,儘可能使這個選項的值較小,除非有大量的內存可用。

spark.shuffle.manager

HASH

對數據進行shuffle時執行的shuffle管理器。基於Hashshuffle管理器是默認的,可是從spark1.1開始,出現了基於排序的shuffle管理器,後者在小的executor環境下,如YARN中會有更好的內存效率。要使用後者,將值設定爲SORT

spark.shuffle.sort.

bypassMergeThreshold

200

該參數只適用於spark.shuffle.manager設置爲SORT,由於SortShuffleManager在處理不須要排序的shuffle操做時,會因爲排序引發性能降低,該參數決定了在Reduce分區少於200時,不使用Merge Sort的方式處理數據,而是與Hash Shuffle相似,直接將分區文件寫入調度的文件,不一樣的是在最後仍是會將這些文件合併成一個獨立的文件。經過取出Sort步驟來加快處理速度,帶價是須要併發打開多個文件,致使農村消耗增長,本質是相對HashShuffleManager的一個折衷方案,若是GC問題嚴重,能夠下降該值

相關文章
相關標籤/搜索