大多數Spark做業的性能主要就是消耗在了shuffle環節,由於該環節包含了大量的磁盤IO、序列化、網絡數據傳輸等操做。所以,若是要讓做業的性能更上一層樓,就有必要對shuffle過程進行調優。可是也必須提醒你們的是,影響一個Spark做業性能的因素,主要仍是代碼開發、資源參數以及數據傾斜,shuffle調優只能在整個Spark的性能調優中佔到一小部分而已。所以你們務必把握住調優的基本原則,千萬不要捨本逐末。下面咱們就給你們詳細講解shuffle的原理,以及相關參數的說明,同時給出各個參數的調優建議。算法
#ShuffleManager發展概述
在Spark的源碼中,負責shuffle過程的執行、計算和處理的組件主要就是ShuffleManager,也即shuffle管理器。而隨着Spark的版本的發展,ShuffleManager也在不斷迭代,變得愈來愈先進。網絡
在Spark 1.2之前,默認的shuffle計算引擎是HashShuffleManager。該ShuffleManager而HashShuffleManager有着一個很是嚴重的弊端,就是會產生大量的中間磁盤文件,進而由大量的磁盤IO操做影響了性能。數據結構
所以在Spark 1.2之後的版本中,默認的ShuffleManager改爲了SortShuffleManager。SortShuffleManager相較於HashShuffleManager來講,有了必定的改進。主要就在於,每一個Task在進行shuffle操做時,雖然也會產生較多的臨時磁盤文件,可是最後會將全部的臨時文件合併(merge)成一個磁盤文件,所以每一個Task就只有一個磁盤文件。在下一個stage的shuffle read task拉取本身的數據時,只要根據索引讀取每一個磁盤文件中的部分數據便可。ide
下面咱們詳細分析一下HashShuffleManager和SortShuffleManager的原理。性能
#HashShuffleManager運行原理大數據
##未經優化的HashShuffleManager優化
下圖說明了未經優化的HashShuffleManager的原理。這裏咱們先明確一個假設前提:每一個Executor只有1個CPU core,也就是說,不管這個Executor上分配多少個task線程,同一時間都只能執行一個task線程。spa
咱們先從shuffle write開始提及。shuffle write階段,主要就是在一個stage結束計算以後,爲了下一個stage能夠執行shuffle類的算子(好比reduceByKey),而將每一個task處理的數據按key進行「分類」。所謂「分類」,就是對相同的key執行hash算法,從而將相同key都寫入同一個磁盤文件中,而每個磁盤文件都只屬於下游stage的一個task。在將數據寫入磁盤以前,會先將數據寫入內存緩衝中,當內存緩衝填滿以後,纔會溢寫到磁盤文件中去。線程
那麼每一個執行shuffle write的task,要爲下一個stage建立多少個磁盤文件呢?很簡單,下一個stage的task有多少個,當前stage的每一個task就要建立多少份磁盤文件。好比下一個stage總共有100個task,那麼當前stage的每一個task都要建立100份磁盤文件。若是當前stage有50個task,總共有10個Executor,每一個Executor執行5個Task,那麼每一個Executor上總共就要建立500個磁盤文件,全部Executor上會建立5000個磁盤文件。因而可知,未經優化的shuffle write操做所產生的磁盤文件的數量是極其驚人的。排序
接着咱們來講說shuffle read。shuffle read,一般就是一個stage剛開始時要作的事情。此時該stage的每個task就須要將上一個stage的計算結果中的全部相同key,從各個節點上經過網絡都拉取到本身所在的節點上,而後進行key的聚合或鏈接等操做。因爲shuffle write的過程當中,task給下游stage的每一個task都建立了一個磁盤文件,所以shuffle read的過程當中,每一個task只要從上游stage的全部task所在節點上,拉取屬於本身的那一個磁盤文件便可。
shuffle read的拉取過程是一邊拉取一邊進行聚合的。每一個shuffle read task都會有一個本身的buffer緩衝,每次都只能拉取與buffer緩衝相同大小的數據,而後經過內存中的一個Map進行聚合等操做。聚合完一批數據後,再拉取下一批數據,並放到buffer緩衝中進行聚合操做。以此類推,直到最後將全部數據到拉取完,並獲得最終的結果。
##優化後的HashShuffleManager
下圖說明了優化後的HashShuffleManager的原理。這裏說的優化,是指咱們能夠設置一個參數,spark.shuffle.consolidateFiles。該參數默認值爲false,將其設置爲true便可開啓優化機制。一般來講,若是咱們使用HashShuffleManager,那麼都建議開啓這個選項。
開啓consolidate機制以後,在shuffle write過程當中,task就不是爲下游stage的每一個task建立一個磁盤文件了。此時會出現shuffleFileGroup的概念,每一個shuffleFileGroup會對應一批磁盤文件,磁盤文件的數量與下游stage的task數量是相同的。一個Executor上有多少個CPU core,就能夠並行執行多少個task。而第一批並行執行的每一個task都會建立一個shuffleFileGroup,並將數據寫入對應的磁盤文件內。
當Executor的CPU core執行完一批task,接着執行下一批task時,下一批task就會複用以前已有的shuffleFileGroup,包括其中的磁盤文件。也就是說,此時task會將數據寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。所以,consolidate機制容許不一樣的task複用同一批磁盤文件,這樣就能夠有效將多個task的磁盤文件進行必定程度上的合併,從而大幅度減小磁盤文件的數量,進而提高shuffle write的性能。
假設第二個stage有100個task,第一個stage有50個task,總共仍是有10個Executor,每一個Executor執行5個task。那麼本來使用未經優化的HashShuffleManager時,每一個Executor會產生500個磁盤文件,全部Executor會產生5000個磁盤文件的。可是此時通過優化以後,每一個Executor建立的磁盤文件的數量的計算公式爲:CPU core的數量 * 下一個stage的task數量。也就是說,每一個Executor此時只會建立100個磁盤文件,全部Executor只會建立1000個磁盤文件。
#SortShuffleManager運行原理
SortShuffleManager的運行機制主要分紅兩種,一種是普通運行機制,另外一種是bypass運行機制。當shuffle read task的數量小於等於spark.shuffle.sort.bypassMergeThreshold參數的值時(默認爲200),就會啓用bypass機制。
##普通運行機制
下圖說明了普通的SortShuffleManager的原理。在該模式下,數據會先寫入一個內存數據結構中,此時根據不一樣的shuffle算子,可能選用不一樣的數據結構。若是是reduceByKey這種聚合類的shuffle算子,那麼會選用Map數據結構,一邊經過Map進行聚合,一邊寫入內存;若是是join這種普通的shuffle算子,那麼會選用Array數據結構,直接寫入內存。接着,每寫一條數據進入內存數據結構以後,就會判斷一下,是否達到了某個臨界閾值。若是達到臨界閾值的話,那麼就會嘗試將內存數據結構中的數據溢寫到磁盤,而後清空內存數據結構。
在溢寫到磁盤文件以前,會先根據key對內存數據結構中已有的數據進行排序。排序事後,會分批將數據寫入磁盤文件。默認的batch數量是10000條,也就是說,排序好的數據,會以每批1萬條數據的形式分批寫入磁盤文件。寫入磁盤文件是經過Java的BufferedOutputStream實現的。BufferedOutputStream是Java的緩衝輸出流,首先會將數據緩衝在內存中,當內存緩衝滿溢以後再一次寫入磁盤文件中,這樣能夠減小磁盤IO次數,提高性能。
一個task將全部數據寫入內存數據結構的過程當中,會發生屢次磁盤溢寫操做,也就會產生多個臨時文件。最後會將以前全部的臨時磁盤文件都進行合併,這就是merge過程,此時會將以前全部臨時磁盤文件中的數據讀取出來,而後依次寫入最終的磁盤文件之中。此外,因爲一個task就只對應一個磁盤文件,也就意味着該task爲下游stage的task準備的數據都在這一個文件中,所以還會單獨寫一份索引文件,其中標識了下游各個task的數據在文件中的start offset與end offset。
SortShuffleManager因爲有一個磁盤文件merge的過程,所以大大減小了文件數量。好比第一個stage有50個task,總共有10個Executor,每一個Executor執行5個task,而第二個stage有100個task。因爲每一個task最終只有一個磁盤文件,所以此時每一個Executor上只有5個磁盤文件,全部Executor只有50個磁盤文件。
##bypass運行機制 下圖說明了bypass SortShuffleManager的原理。bypass運行機制的觸發條件以下:
此時task會爲每一個下游task都建立一個臨時磁盤文件,並將數據按key進行hash而後根據key的hash值,將key寫入對應的磁盤文件之中。固然,寫入磁盤文件時也是先寫入內存緩衝,緩衝寫滿以後再溢寫到磁盤文件的。最後,一樣會將全部臨時磁盤文件都合併成一個磁盤文件,並建立一個單獨的索引文件。
該過程的磁盤寫機制其實跟未經優化的HashShuffleManager是如出一轍的,由於都要建立數量驚人的磁盤文件,只是在最後會作一個磁盤文件的合併而已。所以少許的最終磁盤文件,也讓該機制相對未經優化的HashShuffleManager來講,shuffle read的性能會更好。
而該機制與普通SortShuffleManager運行機制的不一樣在於:第一,磁盤寫機制不一樣;第二,不會進行排序。也就是說,啓用該機制的最大好處在於,shuffle write過程當中,不須要進行數據的排序操做,也就節省掉了這部分的性能開銷。
#shuffle相關參數調優
如下是Shffule過程當中的一些主要參數,這裏詳細講解了各個參數的功能、默認值以及基於實踐經驗給出的調優建議。
spark.shuffle.file.buffer
spark.reducer.maxSizeInFlight
spark.shuffle.io.maxRetries
spark.shuffle.io.retryWait
spark.shuffle.memoryFraction
spark.shuffle.manager
spark.shuffle.sort.bypassMergeThreshold
spark.shuffle.consolidateFiles