Spark RDD概念學習系列之Spark Hash Shuffle內幕完全解密(二十)

 

本博文的主要內容:算法

  一、Hash Shuffle完全解密apache

  二、Shuffle Pluggable解密緩存

  三、Sorted Shuffle解密性能優化

  四、Shuffle性能優化微信

 

一:到底什麼是Shuffle?網絡

  Shuffle中文翻譯爲「洗牌」,須要Shuffle的關鍵性緣由是某種具備共同特徵的數據須要最終匯聚到一個計算節點上進行計算。app

 

 

二:Shuffle可能面臨的問題?負載均衡

  運行Task的時候纔會產生Shuffle(Shuffle已經融化在Spark的算子中了)。框架

  一、 數據量很是大;【幾千甚至上萬臺機器進行Shuffle的數據量會很大,從其餘各臺機器上收集過來數據的時候,網絡傳輸量會很恐怖】分佈式

  二、 數據如何分類,即如何Partition,Hash、Sort、鎢絲計算;【不一樣的Partition的不一樣實現,他會影響集羣規模的大小,會影響內存的使用,會影響性能等等方面,也就有了Shuffle幾個不一樣的淨化階段】

  三、 負載均衡(數據傾斜);【由於採用不一樣的Shuffle的方式對數據進行不一樣的分類,而分類以後數據又分到不一樣的節點上進行計算,若是Shuffle分類不恰當,會致使負載均衡,也就是數據傾斜】

  四、 網絡傳輸效率,須要在壓縮和解壓縮之間作出權衡,序列化和反序列也是要考慮的問題;【若是壓縮,則須要解壓縮,解壓縮須要消耗CPU,因此須要衡量帶寬和CPU解壓的時間,作出正確的權衡】

  說明:具體的Task進行計算的時候盡一切最大可能使得數據具有Process Locality的特性

  【由於這是它運行最快的方式,數據在內存中,也就是默認採起的方式,若是無可奈何,數據不能所有放在內存中,從實際生成角度講(即不具有內存本地性)】;退而求次是增長數據分片,減小每一個Task處理的數據量

  【致使任務運行的批次更多,任務更多】。

  【1,cache自己具備風險,Memory溢出風險,它被其餘計算佔用掉內存的風險,致使從新計算,除非計算特別複雜,計算鏈條特別長,可能有必要爲了容錯,爲了再次數據複用,來進行中間結果的持久化,不然的話,尤爲是持久化到disk時,還不如在內存中直接計算,這樣的速度有可能比從磁盤中讀取曾經計算結果來的更快2,度磁盤I/O是一個高風險的動做,讀內存分享會下降不少。

  在一個Stage內部,不持久化中間結果,數據丟失從新計算依賴的RDD;可是在產生Shuffle的時候,會產生網絡通訊,這是須要持久化。

持久化默認狀況下放在磁盤中,也能夠調整Spark的框架,將數據放在內存中,如今通常放在Local FileSystem上面,也能夠放在Tachyon中,這些均可以經過調整Spark的配置和改造Spark源碼來實現。】

 

 

 

 

三:Hash Shuffle完全解密

  一、key不能是Array;

  【key若是是Array,則就沒法很是友好的計算具體的hashcode值】

  二、 Hash Shuffle不須要排序

  【使得速度很快,其工做機制根據Shuffle的前面的Stage的最後一個final RDD,依據Partition把數據分紅不一樣的類,按照Key的hashcode,而後按照必定的業務邏輯規則(例如,假以下一個Stage有3個並行任務,最簡單的就是取模3運算,分紅3種類型的數據)無需排序,性能很好】,此時從理論上講就節省了Hadoop MapReduce中進行Shuffle須要排序時候的時間浪費,由於實際生產環境有大量的不須要排序的Shuffle類型;

  思考:不須要排序的Hash Shuffle是否必定比須要排序的Sorted Shuffle速度更快?不必定!若是數據規模比較小的情形下,Hash Shuffle會比Sorted Shuffle速度快(不少)!可是若是數據量大,此時Sorted Shuffle通常都會比Hash Shuffle快(不少)

  【數據量大的狀況下,Sorted Shuffle比Hash Shuffle快的緣由:若是數據規模比較 大,可能Hash Shuffle沒法處理,由於hash的方式時會有key和句柄之類,還有許 多小文件,此時,磁盤的性能會成爲瓶頸,內存也會變成瓶頸。Sorted Shuffle會極 大地節省磁盤、內存的訪問,更有利於更大規模的數據運算】

  三、每一個ShuffleMapTask會根據key的哈希值計算出當前的key須要寫入的Partition,而後把決定後的結果寫入當單獨的文件,此時會致使每一個Task產生R(指下一個Stage的並行度)個文件,若是當前的Stage中有M個ShuffleMapTask,則會M*R個文件!!!

  注意:Shuffle操做絕大多數狀況下都要經過網絡,若是Mapper和Reducer在同一臺機器上,此時只須要讀取本地磁盤便可。

  【每一個任務都產生R個小文件,因爲其須要將數據分紅幾種不一樣類型,就是下一個Stage的具體的Task會讀取的與本身相關的數據,由於已經分好類了,此時會產生M*R個小文件,那麼下一個Stage就會經過網絡根據Driver的註冊信息(因爲上一個Stage寫過的內容會註冊給Driver),而後詢問Driver上一個Stage具體的輸出在哪裏,以及哪些屬於該Stage的部分,經過網絡讀取數據;同時Shuffle的數據不必定都須要經過網絡(有可能在同一臺機器上)】

  Hash Shuffle的兩大死穴:第一:Shuffle前會產生海量的小文件於磁盤之上,此時會產生大量耗時低效的IO操做;第二:內存不共用!!!因爲內存中須要保存海量的文件操做句柄和臨時緩存信息,若是數據處理規模比較龐大的話,內存不可承受,出現OOM等問題!

            

  

  

 

  Hash-based Shuffle另外一說法

  一、  Spark Shuffle在最開始的時候只支持Hash-based Shuffle:默認Mapper階段會爲Reducer階段的每個Task單首創建一個文件來保存該Task中要使用的數據。

  優勢:就是操做數據簡單。

  缺點:可是在一些狀況下(例如數據量很是大的狀況)會形成大量文件(M*R,其中M表明Mapper中的全部的並行任務數量,R表明Reducer中全部的並行任務數據)大數據的隨機磁盤I/O操做且會造成大量的Memory(極易形成OOM)。

  二、Hash-based Shuffle產生的問題:

  第一:不可以處理大規模的數據

  第二:Spark不可以運行在大規模的分佈式集羣上!

     三、Consolidate機制:

  後來的改善是加入了Consolidate機制來將Shuffle時候產生的文件數量減小到C*R個(C表明在Mapper端,同時可以使用的cores數量,R表明Reducer中全部的並行任務數量)。可是此時若是Reducer端的並行數據分片過多的話則C*R可能已通過大,此時依舊沒有逃脫文件打開過多的厄運!!!Consolidate並無下降並行度,只是下降了臨時文件的數量,此時Mapper端的內存消耗就會變少,因此OOM也就會下降,另一方面磁盤的性能也會變得更好。

  Spark在引入Sort-Based Shuffle以前,適合中小型數據規模的大數據處理!

 

 

四: Sorted Shuffle解密

  爲了改善上述的問題(同時打開過多文件致使Writer Handler內存使用過大以及產生過分文件致使大量的隨機讀寫帶來的效率極爲低下的磁盤IO操做),Spark後來推出了Consalidate機制,來把小文件合併【根據TaskId進行合併】,此時Shuffle時文件產生的數量爲cores*R,對於ShuffleMapTask的數量明顯多於同時可用的並行Cores的數量的狀況下,Shuffle產生的文件會大幅度減小,會極大下降OOM的可能;【consalidate機制減小了文件,同時也減小了文件句柄的數量;但對於並行度很是高時,及R值特別大時,仍是很麻煩。】

  【在接口ShuffleManager中:registerShuffle:由Driver註冊源數據中的信息,系統默認狀況下其有HashBasedShuffle和SortedBasedShuffle兩種狀況。getReader和getWriter:獲取怎麼在Shuffle的時候寫本地數據,獲取下一個Stage讀取上一個Stage的具體數據的閱讀器。unregisterShuffle:刪除掉本地的Shuffle的源數據。Stop:中止ShuffleManager】

  爲此Spark推出了Shuffle Pluggable開放框架,方便系統升級的時候定製Shuffle功能模塊,也方便第三方系統改造人員根據實際的業務場景來開放具體最佳的Shuffle模塊;核心接口ShuffleManager,具體默認實現有HashShuffleManager、SortShuffleManager等,Spark 1.6.0/Spark 1.5.2中具體的配置以下:

 

val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
"tungsten-sort" -> "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager")

  默認採起SortedBasedShuffle的方式。

 

Sort是如何解決內存不夠、小文件過多的問題?

  採用Hash的方式的適用場景是數據規模相對比較小,並且不須要排序。Hadoop的MapReduce進行排序,使得處理數據規模更大,集羣規模更大。

 

   Consalidate必定程度上解決了該問題,但仍不完全,SortedBasedShuffle更好的解決了該問題。首先,每一個ShuffleMapTask不會爲每一個Reducer生成一個單獨的文件,它護肩全部的結果寫到一個文件裏,同時生成一個Index索引文件,每一個Reducer能夠根據這個Index索引文件取得它所須要處理的數據,這樣就避免產生大量文件,沒有了大量文件,也就沒有了大量的文件句柄,節省了內存;同時因爲磁盤上文佳變少了,並且有Index索引,不用隨機的去讀寫,而是順序的disk I/O,帶來了低延遲,節省了內存;另外一方面,減小了GC風險和頻率,而減小具體的文件數量能夠避免同時些多個文件是給系統帶來的壓力,這就是優點所在。

 

  具體的實現:ShuffleMapTask會按照Key相應的Partition的ID進行Sort,若是屬於同一個Partition的Key,自己不進行Sort,所以對不須要sort的操做來講,若是內存不夠用,他就會把那些已經排序的內容寫到外部disk,結束的時候再進行歸併排序(merge-sort)

爲高效讀取這些file Seagate,它有一個Index文件,會記錄不一樣的Partition的位置信息,BlockManager也會對它的尋址算法進行優化性的實現。歸併排序最優是打開10-100個文件。

  最後生成文件時須要同時生成Index索引文件。

 

  對具體的ShuffleMapTask,它外部有具體的歸併排序方式,mergeSort,sort以後會產生兩個文件,這兩個文件其中一個是Index索引文件,一個是存放具體的Task的輸出內容,在Reducer端讀取數據的時候,其實首先訪問Index,具體在工做的時候,BlockManager首先訪問Index,經過Index去定位具體文件內容。避免了大量文件句柄,節省內存。

  採用Sort方式集羣的規模和數據的計算規模就不受限制了。

 

 

 

Sort-Based Shuffle的另外一說法

  一、爲了讓Spark在更大規模的集羣上更高性能處理更大規模的數據,因而就引入了Sort-based Shuffle!今後之後(Spark1.1版本開始),Spark能夠勝任任何規模(包括PB級別及PB以上的級別)的大數據的處理,尤爲是鎢絲計劃的引入和優化,Spark更快速的在更大規模的集羣處理更海量的數據的能力推向了一個新的巔峯!

  二、Spark1.6版本支持最少三種類型Shuffle:

        

實現ShuffleManager接口能夠根據本身的業務實際須要最優化的使用自定義的Shuffle實現;

   三、Spark1.6默認採用的就是Sort-based Shuffle的方式:

val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")

  上述源碼說明,你能夠在Spark配置文件中配置Spark框架運行時要使用的具體的ShuffleManager的實現。能夠在conf/spark-default.conf加入以下內容:

      spark.shuffle.manager SORT   配置Shuffle方式是SORT

  四、  Sort-based Shuffle的工做方式以下:Shuffle的目的就是:數據分類,而後數據彙集

  1)       首先每一個ShuffleMapTask不會爲每一個Reducer單獨生成一個文件,相反,Sort-based Shuffle會把Mapper中每一個ShuffleMapTask全部的輸出數據Data只寫到一個文件中。由於每一個ShuffleMapTask中的數據會被分類,因此Sort-based Shuffle使用了index文件存儲具體ShuffleMapTask輸出數據在同一個Data文件中是如何分類的信息!!

  2)       基於Sort-base的Shuffle會在Mapper中的每個ShuffleMapTask中產生兩個文件:Data文件和Index文件,其中Data文件是存儲當前Task的Shuffle輸出的。而index文件中則存儲了Data文件中的數據經過Partitioner的分類信息,此時下一個階段的Stage中的Task就是根據這個Index文件獲取本身所要抓取的上一個Stage中的ShuffleMapTask產生的數據的,Reducer就是根據index文件來獲取屬於本身的數據。

涉及問題:Sorted-based Shuffle:會產生 2*M(M表明了Mapper階段中並行的Partition的總數量,其實就是ShuffleMapTask的總數量)個Shuffle臨時文件。

Shuffle產生的臨時文件的數量的變化一次爲:

                  Basic Hash Shuffle: M*R;

                  Consalidate方式的Hash Shuffle: C*R;

                  Sort-based Shuffle: 2*M;

 

 

 

 

 

 

在集羣中動手實戰Sort-based Shuffle

  在Sorted-based Shuffle中Reducer是如何獲取本身須要的數據呢?具體而言,Reducer首先找Driver去獲取父Stage中的ShuffleMapTask輸出的位置信息,根據位置信息獲取index文件,解析index,從解析的index文件中獲取Data文件中屬於本身的那部份內容;

  Sorted-based Shuffle與排序沒有關係,Sorted-based Shuffle並無對內容進行排序,Sorted-based Shuffle是對Shuffle進行Sort,對咱們具體要執行的內容沒有排序。

  Reducer在何時去fetch數據?

當parent Stage的全部ShuffleMapTasks結束後再fetch。等全部的ShuffleMapTask執行完以後,邊fetch邊計算。

  經過動手實踐確實證實了Sort-based Shuffle產生了2M個文件。M是並行Task的數量。

    Shuffle_0_0_0.data

           shuffle_0_3_0.index

從上能夠看出index文件和data文件數量是同樣的。

 

Sorted Shuffle Writer源碼:

  1. ShuffleMapTask的runTask方法

  反序列化RDD和Dependency

  調用SortShuffleManager的getWriter方法。

  Writer方法寫入結果。

 

   2.       SortShuffleManager複寫了ShuffleManager中的getWriter方法,源碼以下:

 

 

   3.       SorShuffleWriter的write方法源碼以下:

 

 

其中ShuffleBlockId記錄shuffleId和mapId得到Block。

 

 

  1. 其中writeIndexFileAndCommit方法:

用於在Block的索引文件中記錄每一個block的偏移量,其中getBlockData方法能夠根據ShuffleId和mapId讀取索引文件,得到前面partition計算以後,,將結果寫入文件中的偏移量和結果的大小。

/**

 * Write an index file with the offsets of each block, plus a final offset at the end for the

 * end of the output file. This will be used by getBlockData to figure out where each block

 * begins and ends.

 *

 * It will commit the data and index file as an atomic operation, use the existing ones, or

 * replace them with new ones.

 *

 * Note: the `lengths` will be updated to match the existing index file if use the existing ones.

 * */

def writeIndexFileAndCommit(

    shuffleId: Int,

mapId: Int,

lengths: Array[Long],

dataTmp: File): Unit = {

val indexFile = getIndexFile(shuffleId, mapId)

val indexTmp = Utils.tempFileWith(indexFile)

val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))

  Utils.tryWithSafeFinally {

// We take in lengths of each block, need to convert it to offsets.

var offset = 0L

out.writeLong(offset)

for (length <- lengths) {

      offset += length

      out.writeLong(offset)

    }

  } {

    out.close()

  }


val dataFile = getDataFile(shuffleId, mapId)

// There is only one IndexShuffleBlockResolver per executor, this synchronization make sure

  // the following check and rename are atomic.

synchronized {

val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)

if (existingLengths != null) {

// Another attempt for the same task has already written our map outputs successfully,

      // so just use the existing partition lengths and delete our temporary map outputs.

System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)

if (dataTmp != null && dataTmp.exists()) {

        dataTmp.delete()

      }

      indexTmp.delete()

    } else {

// This is the first successful attempt in writing the map outputs for this task,

      // so override any existing index and data files with the ones we wrote.

if (indexFile.exists()) {

        indexFile.delete()

      }

if (dataFile.exists()) {

        dataFile.delete()

      }

if (!indexTmp.renameTo(indexFile)) {

throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)

      }

if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {


 

默認Sort-based Shuffle的幾個缺陷:

  1. 若是Mapper中Task的數量過大,依舊會產生不少小文件,此時在Shuffle傳遞數據的過程當中到Reducer端,reduce會須要同時打開大量的記錄來進行反序列化,致使大量的內存消耗和GC的巨大負擔,形成系統緩慢甚至崩潰!

  2.若是須要在分片內也進行排序的話,此時須要進行Mapper端和Reducer端的兩次排序!!!

優化:

         能夠改造Mapper和Reducer端,改框架來實現一次排序。

         頻繁GC的解決辦法是:鎢絲計劃!!

 

 

 

 

 

感謝下面的博主:

王家林 中國Spark第一人

DT大數據夢工廠

新浪微博: http://weibo.com.ilovepains/

微信公共號DT_Spark

博客:http://bolg.sina.com.cn/ilovepains

手機:18610086859

qq:1740415547

郵箱:18610086859@vip.126.com

相關文章
相關標籤/搜索