Spark Shuffle大揭祕

什麼是Shuffle:緩存

Shuffle中文翻譯爲「洗牌」,須要Shuffle的關鍵緣由是某種具備共同特徵的數據須要最終匯聚到一個計算節點上進行計算。網絡

Shuffle面臨的問題:app

1. 數據量很是大;負載均衡

2 數據如何分類,及如何Partition,Hash、Sort、鎢絲計劃框架

3. 負載均衡(數據傾斜)oop

4. 網絡傳輸效率,須要在壓縮和解壓縮作出權衡,序列化和反序列化也是須要考慮的問題。spa

Hash Shuffle:翻譯

1. Key不能是Array3d

2. Hash Shuffle不須要排序,從理論上就節省了Hadoop MapReduce中進行Shuffle須要排序時候的時間浪費,由於實際生產環境有大量不要排序的Shuffle類型。blog

思考:不要排序的Hash Shuffle是否必定比不須要排序的Sort額度 Shuffle速度更快? 不必定,若是數據規模比較小的狀況下,Hash Shuffle會比Sorted Shuffle速度快(不少)!可是若是數據量大,此時Sorted Shuffle通常會比Hash Shuffle快(不少)。

3. 每一個ShuffleMapTask會根據key的哈希值計算出當前的key須要寫入的Partition,而後把決定後的結果寫入單獨的文件,此時會致使每一個Task產生R(指下一個Stage的並行度)個文件,若是當前的Stage中有M個ShuffleMapTask,則會M*R個文件。

  注意:Shuffle操做絕大多數都要經過網路,若是Mapper和Reducer在同一臺機器上,此時只須要讀取本地磁盤便可。

Hash Shuffle的兩大死穴:第一:Shuffle前會產生大量的小文件到磁盤之上,此時會產生大量耗時低效的IO操做;第二:因爲內存中須要保存海量的文件操做句柄和臨時緩存信息,若是數據量比較龐大的話,內存不可承受,出現OOM等問題。

爲了改善上述問題(同時打開太多文件致使Write Handler內存使用過大以及過多文件致使大量的隨機讀寫帶來的效率低下的磁盤IO操做),後來推出了Consalidate機制,來把小文件合併,此時Shuffle時產生的文件數量爲cores*R,對於ShuffleMapTask的數量明顯多於同時可用的並行cores的數量的狀況下,Shuffle產生的文件大幅減小,會極大減低OOM的可能。

爲此Spark推出了Shuffle Pluggable開發框架,方便系統升級的時候定製Shuffle功能模塊,業方面第三方系統改造人員根據實際的業務場景來開發具體最佳的Shuffle模塊;核心接口ShuffleManager,具體默認的實現由HashShuffleManager、SortShuffleManager等,Spark1.6.0中具體的配置以下:

 

爲何須要Sort-Based Shuffle?

1. Shuffle通常包含兩個階段任務:第一部分,產生Shuffle數據的階段(Map階段,額外的補充,須要實現ShuffleManager中getWriter來寫數據(數據能夠以BlockManager寫到Memory、Disk、Tachyon等,例如像很是快的Shuffle,此時能夠考慮把數據寫在內存中,可是內存不穩定,建議採用MEMOrY_AND_DISK方式)),第二部分,使用Shuffle數據的階段(Reduce階段,額外的補充,須要實現ShuffleManager的getReader,Reader會向Driver去獲取上一個Stage產生的Shuffle數據)。

2.Spark的Job會被劃分紅不少Stage:

    若是隻有一個Stage,則這個Job就至關於只有一個Mapper階段,固然不會產生Shuffle,適合於簡單的ETL;

   若是不止一個Stage,則最後一個Stage就是最終的Reducer,最左側的第一個Stage就僅僅是整個Job的Mapper,中間全部的任意一個Stage是其父Stage的Reducer且是其子Stage的Mapper。

3.Spark Shuffle在最開始的時候只支持Hash-base Shuffle:默認Mappper階段會爲Reducer階段的每個Task單首創建一個文件來保存該Task中要使用的數據,可是在一些狀況下(例如數據量很是大的狀況)會形成大量文件(M*R,其中M表明Mapper中的全部的並行任務的數量,R表明)

3.

 

 

相關文章
相關標籤/搜索