HashShuffle調優概述算法
大多數Spark做業的性能主要就是消耗在了shuffle環 節,由於該環節包含了大量的磁盤IO、序列化、網絡數據傳輸等操做。所以,若是要讓做業的性能更上一層樓,就有必要對shuffle過程進行調優。可是也 必須提醒你們的是,影響一個Spark做業性能的因素,主要仍是代碼開發、資源參數以及數據傾斜,shuffle調優只能在整個Spark的性能調優中佔 到一小部分而已。所以你們務必把握住調優的基本原則,千萬不要捨本逐末。下面咱們就給你們詳細講解shuffle的原理,以及相關參數的說明,同時給出各 個參數的調優建議。網絡
ShuffleManager發展概述數據結構
在Spark的源碼中,負責shuffle過程的執行、計算和處理的組件主要就是ShuffleManager,也即shuffle管理器。性能
在Spark 1.2之前,默認的shuffle計算引擎是HashShuffleManager。該ShuffleManager而HashShuffleManager有着一個很是嚴重的弊端,就是會產生大量的中間磁盤文件,進而由大量的磁盤IO操做影響了性能。優化
所以在Spark 1.2之後的版本中,默認的ShuffleManager改爲了SortShuffleManager。SortShuffleManager相較於 HashShuffleManager來講,有了必定的改進。主要就在於,每一個Task在進行shuffle操做時,雖然也會產生較多的臨時磁盤文件,但 是最後會將全部的臨時文件合併(merge)成一個磁盤文件,所以每一個Task就只有一個磁盤文件。在下一個stage的shuffle read task拉取本身的數據時,只要根據索引讀取每一個磁盤文件中的部分數據便可。spa
未優化的HashShuffleManager線程
圖說明了未經優化的HashShuffleManager的原理。這裏咱們先明確一個假設前提:每一個Executor只有1個CPU core,也就是說,不管這個Executor上分配多少個task線程,同一時間都只能執行一個task線程。blog
shuffle write:shuffle write階段就是在一個stage結束計算以後,爲了下一個stage能夠執行shuffle類的算子(好比reduceByKey),而將每一個 task處理的數據按key進行「分類」。也就是對相同的key執行hash算法,從而將相同key都寫入同一個磁盤文件中,而每個磁盤文件都只屬於下游stage的一個task。在將數據寫入磁盤以前,會先將數據寫入內存緩衝中,當內存緩衝填滿以後,纔會溢寫到磁盤文件中去。排序
那麼每一個執行shuffle write的task,要爲下一個stage建立多少個磁盤文件呢?下一個stage的task有多少個,當前stage的每一個task就要建立多少份磁盤文件。好比下一個stage總共有100個task,那麼當前stage的每一個task都要建立100份磁盤文件。若是當前stage有50個 task,則總共會創建5000個磁盤文件。因而可知,未經優化的shuffle write操做所產生的磁盤文件的數量是極其驚人的。索引
shuffle read:shuffle read,一般就是一個stage剛開始時要作的事情。此時該stage的每個task就須要將上一個stage的計算結果中的全部相同key,從各個 節點上經過網絡都拉取到本身所在的節點上,而後進行key的聚合或鏈接等操做。因爲shuffle write的過程當中,task給下游stage的每一個task都建立了一個磁盤文件,所以shuffle read的過程當中,每一個task只要從上游stage的全部task所在節點上,拉取屬於本身的那一個磁盤文件便可。
shuffle read的拉取過程是一邊拉取一邊進行聚合的。每一個shuffle read task都會有一個本身的buffer緩衝,每次都只能拉取與buffer緩衝相同大小的數據,而後經過內存中的一個Map進行聚合等操做。聚合完一批數 據後,再拉取下一批數據,並放到buffer緩衝中進行聚合操做。以此類推,直到最後將全部數據到拉取完,並獲得最終的結果。
優化後的HashShuffleManager
圖說明了優化後的HashShuffleManager的原理。這裏說的優化是指咱們能夠設置一個參數,spark.shuffle.consolidateFiles=true。該參數默認值爲false,一般來講若是咱們使用HashShuffleManager,那麼都建議開啓這個選項。
開啓consolidate機制以後,在shuffle write過程當中,task不會爲下游stage的每一個task建立一個磁盤文件,此時會出現shuffleFileGroup的概念,每一個 shuffleFileGroup會對應一批磁盤文件,磁盤文件的數量與下游stage的task數量是相同的。一個Executor上有多少個CPU core,就能夠並行執行多少個task。而第一批並行執行的每一個task都會建立一個shuffleFileGroup,並將數據寫入對應的磁盤文件內。
當Executor的CPU core執行完一批task,接着執行下一批task時,下一批task就會複用以前已有的shuffleFileGroup,包括其中的磁盤文件。也就是說task會將數據寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。 這樣就能夠有效將多個task的磁盤文件進行必定程度上的合併,從而大幅度減小磁盤文件的數量,進而提高shuffle write的性能。
優化與未優化對比
假設第一個stage有 50個task, 第二個stage有100個task,總共仍是有10個Executor(CPU核心數),每一個Executor執行5個task。
未經優化的HashShuffleManager:每一個Executor會產生500個磁盤文件,全部Executor會產生5000個磁盤文件的。
每一個Executor建立的磁盤文件: 單個Executor執行task數 * 下一個stage的task數量 (5*100)
全部Executor建立的磁盤文件: 當前stage的task數 * 下一個stage的task數量 (50*100)
優化以後的HashShuffleManager: 每一個Executor此時只會建立100個磁盤文件,全部Executor只會建立1000個磁盤文件。
每一個Executor建立的磁盤文件:單個Executor CPU核心數量 * 下一個stage的task數量 (1*100)
全部 Executor建立的磁盤文件: 全部Executor CPU核心數量 * 下一個stage的task數量(10*100)
SortShuffle概述
普通機制:
a) map task 的計算結果會寫入到一個內存數據結構裏面,內存數據結構默認是5M
b) 在shuffle的時候會有一個定時器,不按期的去估算這個內存結構的大小,當內存結構中的數據超過5M時,好比如今內存結構中的數據爲5.01M,那麼他會申請5.01*2-5=5.02M內存給內存數據結構。
c) 若是申請成功不會進行溢寫,若是申請不成功,這時候會發生溢寫磁盤。
d) 在溢寫以前內存結構中的數據會進行排序分區
e) 而後開始溢寫磁盤,寫磁盤是以batch的形式去寫,一個batch是1萬條數據,
f) map task執行完成後,會將這些磁盤小文件合併成一個大的磁盤文件(有序),同時生成一個索引文件。
g) reduce task去map端拉取數據的時候,首先解析索引文件,根據索引文件再去拉取對應的數據。
產生磁盤小文件的個數: 2*M(map task的個數)索引文件-和磁盤文件
bypass機制:
a) bypass運行機制的觸發條件以下:
shuffle reduce task的數量小於spark.shuffle.sort.bypassMergeThreshold的參數值。這個值默認是200。
b)產生的磁盤小文件爲:2*M(map task的個數)