Spark性能優化指南—— shuffle調優

大多數Spark做業的性能主要就是消耗在了shuffle環節,由於該環節包含了大量的磁盤IO、序列化、網絡數據傳輸等操做。所以,若是要讓做業的性能更上一層樓,就有必要對shuffle過程進行調優。可是也必須提醒你們的是,影響一個Spark做業性能的因素,主要仍是代碼開發、資源參數以及數據傾斜,shuffle調優只能在整個Spark的性能調優中佔到一小部分而已。所以你們務必把握住調優的基本原則,千萬不要捨本逐末。下面咱們就給你們詳細講解shuffle的原理,以及相關參數的說明,同時給出各個參數的調優建議。算法

#ShuffleManager發展概述
在Spark的源碼中,負責shuffle過程的執行、計算和處理的組件主要就是ShuffleManager,也即shuffle管理器。而隨着Spark的版本的發展,ShuffleManager也在不斷迭代,變得愈來愈先進。網絡

在Spark 1.2之前,默認的shuffle計算引擎是HashShuffleManager。該ShuffleManager而HashShuffleManager有着一個很是嚴重的弊端,就是會產生大量的中間磁盤文件,進而由大量的磁盤IO操做影響了性能。數據結構

所以在Spark 1.2之後的版本中,默認的ShuffleManager改爲了SortShuffleManager。SortShuffleManager相較於HashShuffleManager來講,有了必定的改進。主要就在於,每一個Task在進行shuffle操做時,雖然也會產生較多的臨時磁盤文件,可是最後會將全部的臨時文件合併(merge)成一個磁盤文件,所以每一個Task就只有一個磁盤文件。在下一個stage的shuffle read task拉取本身的數據時,只要根據索引讀取每一個磁盤文件中的部分數據便可。ide

下面咱們詳細分析一下HashShuffleManager和SortShuffleManager的原理。性能

#HashShuffleManager運行原理大數據

##未經優化的HashShuffleManager優化

下圖說明了未經優化的HashShuffleManager的原理。這裏咱們先明確一個假設前提:每一個Executor只有1個CPU core,也就是說,不管這個Executor上分配多少個task線程,同一時間都只能執行一個task線程。spa

咱們先從shuffle write開始提及。shuffle write階段,主要就是在一個stage結束計算以後,爲了下一個stage能夠執行shuffle類的算子(好比reduceByKey),而將每一個task處理的數據按key進行「分類」。所謂「分類」,就是對相同的key執行hash算法,從而將相同key都寫入同一個磁盤文件中,而每個磁盤文件都只屬於下游stage的一個task。在將數據寫入磁盤以前,會先將數據寫入內存緩衝中,當內存緩衝填滿以後,纔會溢寫到磁盤文件中去。線程

那麼每一個執行shuffle write的task,要爲下一個stage建立多少個磁盤文件呢?很簡單,下一個stage的task有多少個,當前stage的每一個task就要建立多少份磁盤文件。好比下一個stage總共有100個task,那麼當前stage的每一個task都要建立100份磁盤文件。若是當前stage有50個task,總共有10個Executor,每一個Executor執行5個Task,那麼每一個Executor上總共就要建立500個磁盤文件,全部Executor上會建立5000個磁盤文件。因而可知,未經優化的shuffle write操做所產生的磁盤文件的數量是極其驚人的。排序

接着咱們來講說shuffle read。shuffle read,一般就是一個stage剛開始時要作的事情。此時該stage的每個task就須要將上一個stage的計算結果中的全部相同key,從各個節點上經過網絡都拉取到本身所在的節點上,而後進行key的聚合或鏈接等操做。因爲shuffle write的過程當中,task給下游stage的每一個task都建立了一個磁盤文件,所以shuffle read的過程當中,每一個task只要從上游stage的全部task所在節點上,拉取屬於本身的那一個磁盤文件便可。

shuffle read的拉取過程是一邊拉取一邊進行聚合的。每一個shuffle read task都會有一個本身的buffer緩衝,每次都只能拉取與buffer緩衝相同大小的數據,而後經過內存中的一個Map進行聚合等操做。聚合完一批數據後,再拉取下一批數據,並放到buffer緩衝中進行聚合操做。以此類推,直到最後將全部數據到拉取完,並獲得最終的結果。 輸入圖片說明

##優化後的HashShuffleManager

下圖說明了優化後的HashShuffleManager的原理。這裏說的優化,是指咱們能夠設置一個參數,spark.shuffle.consolidateFiles。該參數默認值爲false,將其設置爲true便可開啓優化機制。一般來講,若是咱們使用HashShuffleManager,那麼都建議開啓這個選項。

開啓consolidate機制以後,在shuffle write過程當中,task就不是爲下游stage的每一個task建立一個磁盤文件了。此時會出現shuffleFileGroup的概念,每一個shuffleFileGroup會對應一批磁盤文件,磁盤文件的數量與下游stage的task數量是相同的。一個Executor上有多少個CPU core,就能夠並行執行多少個task。而第一批並行執行的每一個task都會建立一個shuffleFileGroup,並將數據寫入對應的磁盤文件內。

當Executor的CPU core執行完一批task,接着執行下一批task時,下一批task就會複用以前已有的shuffleFileGroup,包括其中的磁盤文件。也就是說,此時task會將數據寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。所以,consolidate機制容許不一樣的task複用同一批磁盤文件,這樣就能夠有效將多個task的磁盤文件進行必定程度上的合併,從而大幅度減小磁盤文件的數量,進而提高shuffle write的性能。

假設第二個stage有100個task,第一個stage有50個task,總共仍是有10個Executor,每一個Executor執行5個task。那麼本來使用未經優化的HashShuffleManager時,每一個Executor會產生500個磁盤文件,全部Executor會產生5000個磁盤文件的。可是此時通過優化以後,每一個Executor建立的磁盤文件的數量的計算公式爲:CPU core的數量 * 下一個stage的task數量。也就是說,每一個Executor此時只會建立100個磁盤文件,全部Executor只會建立1000個磁盤文件。 輸入圖片說明

#SortShuffleManager運行原理

SortShuffleManager的運行機制主要分紅兩種,一種是普通運行機制,另外一種是bypass運行機制。當shuffle read task的數量小於等於spark.shuffle.sort.bypassMergeThreshold參數的值時(默認爲200),就會啓用bypass機制。

##普通運行機制

下圖說明了普通的SortShuffleManager的原理。在該模式下,數據會先寫入一個內存數據結構中,此時根據不一樣的shuffle算子,可能選用不一樣的數據結構。若是是reduceByKey這種聚合類的shuffle算子,那麼會選用Map數據結構,一邊經過Map進行聚合,一邊寫入內存;若是是join這種普通的shuffle算子,那麼會選用Array數據結構,直接寫入內存。接着,每寫一條數據進入內存數據結構以後,就會判斷一下,是否達到了某個臨界閾值。若是達到臨界閾值的話,那麼就會嘗試將內存數據結構中的數據溢寫到磁盤,而後清空內存數據結構。

在溢寫到磁盤文件以前,會先根據key對內存數據結構中已有的數據進行排序。排序事後,會分批將數據寫入磁盤文件。默認的batch數量是10000條,也就是說,排序好的數據,會以每批1萬條數據的形式分批寫入磁盤文件。寫入磁盤文件是經過Java的BufferedOutputStream實現的。BufferedOutputStream是Java的緩衝輸出流,首先會將數據緩衝在內存中,當內存緩衝滿溢以後再一次寫入磁盤文件中,這樣能夠減小磁盤IO次數,提高性能。

一個task將全部數據寫入內存數據結構的過程當中,會發生屢次磁盤溢寫操做,也就會產生多個臨時文件。最後會將以前全部的臨時磁盤文件都進行合併,這就是merge過程,此時會將以前全部臨時磁盤文件中的數據讀取出來,而後依次寫入最終的磁盤文件之中。此外,因爲一個task就只對應一個磁盤文件,也就意味着該task爲下游stage的task準備的數據都在這一個文件中,所以還會單獨寫一份索引文件,其中標識了下游各個task的數據在文件中的start offset與end offset。

SortShuffleManager因爲有一個磁盤文件merge的過程,所以大大減小了文件數量。好比第一個stage有50個task,總共有10個Executor,每一個Executor執行5個task,而第二個stage有100個task。因爲每一個task最終只有一個磁盤文件,所以此時每一個Executor上只有5個磁盤文件,全部Executor只有50個磁盤文件。
輸入圖片說明

##bypass運行機制 下圖說明了bypass SortShuffleManager的原理。bypass運行機制的觸發條件以下:

  • shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold參數的值。
  • 不是聚合類的shuffle算子(好比reduceByKey)。

此時task會爲每一個下游task都建立一個臨時磁盤文件,並將數據按key進行hash而後根據key的hash值,將key寫入對應的磁盤文件之中。固然,寫入磁盤文件時也是先寫入內存緩衝,緩衝寫滿以後再溢寫到磁盤文件的。最後,一樣會將全部臨時磁盤文件都合併成一個磁盤文件,並建立一個單獨的索引文件。

該過程的磁盤寫機制其實跟未經優化的HashShuffleManager是如出一轍的,由於都要建立數量驚人的磁盤文件,只是在最後會作一個磁盤文件的合併而已。所以少許的最終磁盤文件,也讓該機制相對未經優化的HashShuffleManager來講,shuffle read的性能會更好。

而該機制與普通SortShuffleManager運行機制的不一樣在於:第一,磁盤寫機制不一樣;第二,不會進行排序。也就是說,啓用該機制的最大好處在於,shuffle write過程當中,不須要進行數據的排序操做,也就節省掉了這部分的性能開銷。
輸入圖片說明

#shuffle相關參數調優
如下是Shffule過程當中的一些主要參數,這裏詳細講解了各個參數的功能、默認值以及基於實踐經驗給出的調優建議。
spark.shuffle.file.buffer

  • 默認值:32k
  • 參數說明:該參數用於設置shuffle write task的BufferedOutputStream的buffer緩衝大小。將數據寫到磁盤文件以前,會先寫入buffer緩衝中,待緩衝寫滿以後,纔會溢寫到磁盤。
  • 調優建議:若是做業可用的內存資源較爲充足的話,能夠適當增長這個參數的大小(好比64k),從而減小shuffle write過程當中溢寫磁盤文件的次數,也就能夠減小磁盤IO次數,進而提高性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提高。

spark.reducer.maxSizeInFlight

  • 默認值:48m
  • 參數說明:該參數用於設置shuffle read task的buffer緩衝大小,而這個buffer緩衝決定了每次可以拉取多少數據。
  • 調優建議:若是做業可用的內存資源較爲充足的話,能夠適當增長這個參數的大小(好比96m),從而減小拉取數據的次數,也就能夠減小網絡傳輸的次數,進而提高性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提高。

spark.shuffle.io.maxRetries

  • 默認值:3
  • 參數說明:shuffle read task從shuffle write task所在節點拉取屬於本身的數據時,若是由於網絡異常致使拉取失敗,是會自動進行重試的。該參數就表明了能夠重試的最大次數。若是在指定次數以內拉取仍是沒有成功,就可能會致使做業執行失敗。
  • 調優建議:對於那些包含了特別耗時的shuffle操做的做業,建議增長重試最大次數(好比60次),以免因爲JVM的full gc或者網絡不穩定等因素致使的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~上百億)的shuffle過程,調節該參數能夠大幅度提高穩定性。

spark.shuffle.io.retryWait

  • 默認值:5s
  • 參數說明:具體解釋同上,該參數表明了每次重試拉取數據的等待間隔,默認是5s。
  • 調優建議:建議加大間隔時長(好比60s),以增長shuffle操做的穩定性。

spark.shuffle.memoryFraction

  • 默認值:0.2
  • 參數說明:該參數表明了Executor內存中,分配給shuffle read task進行聚合操做的內存比例,默認是20%。
  • 調優建議:在資源參數調優中講解過這個參數。若是內存充足,並且不多使用持久化操做,建議調高這個比例,給shuffle read的聚合操做更多內存,以免因爲內存不足致使聚合過程當中頻繁讀寫磁盤。在實踐中發現,合理調節該參數能夠將性能提高10%左右。

spark.shuffle.manager

  • 默認值:sort
  • 參數說明:該參數用於設置ShuffleManager的類型。Spark 1.5之後,有三個可選項:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2之前的默認選項,可是Spark 1.2以及以後的版本默認都是SortShuffleManager了。tungsten-sort與sort相似,可是使用了tungsten計劃中的堆外內存管理機制,內存使用效率更高。
  • 調優建議:因爲SortShuffleManager默認會對數據進行排序,所以若是你的業務邏輯中須要該排序機制的話,則使用默認的SortShuffleManager就能夠;而若是你的業務邏輯不須要對數據進行排序,那麼建議參考後面的幾個參數調優,經過bypass機制或優化的HashShuffleManager來避免排序操做,同時提供較好的磁盤讀寫性能。這裏要注意的是,tungsten-sort要慎用,由於以前發現了一些相應的bug。

spark.shuffle.sort.bypassMergeThreshold

  • 默認值:200
  • 參數說明:當ShuffleManager爲SortShuffleManager時,若是shuffle read task的數量小於這個閾值(默認是200),則shuffle write過程當中不會進行排序操做,而是直接按照未經優化的HashShuffleManager的方式去寫數據,可是最後會將每一個task產生的全部臨時磁盤文件都合併成一個文件,並會建立單獨的索引文件。
  • 調優建議:當你使用SortShuffleManager時,若是的確不須要排序操做,那麼建議將這個參數調大一些,大於shuffle read task的數量。那麼此時就會自動啓用bypass機制,map-side就不會進行排序了,減小了排序的性能開銷。可是這種方式下,依然會產生大量的磁盤文件,所以shuffle write性能有待提升。

spark.shuffle.consolidateFiles

  • 默認值:false
  • 參數說明:若是使用HashShuffleManager,該參數有效。若是設置爲true,那麼就會開啓consolidate機制,會大幅度合併shuffle write的輸出文件,對於shuffle read task數量特別多的狀況下,這種方法能夠極大地減小磁盤IO開銷,提高性能。
  • 調優建議:若是的確不須要SortShuffleManager的排序機制,那麼除了使用bypass機制,還能夠嘗試將spark.shffle.manager參數手動指定爲hash,使用HashShuffleManager,同時開啓consolidate機制。在實踐中嘗試過,發現其性能比開啓了bypass機制的SortShuffleManager要高出10%~30%。