Spark技術內幕:Shuffle的性能調優

經過上面的架構和源碼實現的分析,不可貴出Shuffle是Spark Core比較複雜的模塊的結論。它也是很是影響性能的操做之一。所以,在這裏整理了會影響Shuffle性能的各項配置。儘管大部分的配置項在前文已經解釋過它的含義,因爲這些參數的確是很是重要,這裏算是作一個詳細的總結。apache

1.1.1  spark.shuffle.manager

前文也屢次提到過,Spark1.2.0官方支持兩種方式的Shuffle,即Hash Based Shuffle和Sort Based Shuffle。其中在Spark 1.0以前僅支持Hash Based Shuffle。Spark 1.1的時候引入了Sort Based Shuffle。Spark 1.2的默認Shuffle機制從Hash變成了Sort。若是須要Hash Based Shuffle,能夠將spark.shuffle.manager設置成「hash」便可。網絡

若是對性能有比較苛刻的要求,那麼就要理解這兩種不一樣的Shuffle機制的原理,結合具體的應用場景進行選擇。架構

Hash Based Shuffle,就是將數據根據Hash的結果,將各個Reducer partition的數據寫到單獨的文件中去,寫數據時不會有排序的操做。這個問題就是若是Reducer的partition比較多的時候,會產生大量的磁盤文件。這會帶來兩個問題:ide

1)       同時打開的文件比較多,那麼大量的文件句柄和寫操做分配的臨時內存會很是大,對於內存的使用和GC帶來不少的壓力。尤爲是在Sparkon YARN的模式下,Executor分配的內存廣泛比較小的時候,這個問題會更嚴重。性能

2)       從總體來看,這些文件帶來大量的隨機讀,讀性能可能會遇到瓶頸。測試

更加細節的討論能夠參見7.1節和7.6.6(嘗試去解決寫的文件太多的問題)。大數據

 

Sort Based Shuffle會根據實際狀況對數據採用不一樣的方式進行Sort。這個排序可能僅僅是按照Reducer的partition進行排序,保證同一個Shuffle Map Task的對應於不一樣的Reducer的partition的數據均可以寫到同一個數據文件,經過一個Offset來標記不一樣的Reducer partition的分界。所以一個Shuffle Map Task僅僅會生成一個數據文件(還有一個index索引文件),從而避免了Hash Based Shuffle文件數量過多的問題。優化

 

選擇Hash仍是Sort,取決於內存,排序和文件操做等因素的綜合影響。spa

對於不須要進行排序的Shuffle並且Shuffle產生的文件數量不是特別多,Hash Based Shuffle多是個更好的選擇;畢竟Sort Based Shuffle至少會按照Reducer的partition進行排序。netty

而Sort BasedShuffle的優點就在於Scalability,它的出現實際上很大程度上是解決Hash Based Shuffle的Scalability的問題。因爲Sort Based Shuffle還在不斷的演進中,所以Sort Based Shuffle的性能會獲得不斷的改善。

對選擇那種Shuffle,若是對於性能要求苛刻,最好仍是經過實際的場景中測試後再決定。不過選擇默認的Sort,能夠知足大部分的場景須要。

1.1.2  spark.shuffle.spill

這個參數的默認值是true,用於指定Shuffle過程當中若是內存中的數據超過閾值(參考spark.shuffle.memoryFraction的設置),那麼是否須要將部分數據臨時寫入外部存儲。若是設置爲false,那麼這個過程就會一直使用內存,會有Out Of Memory的風險。所以只有在肯定內存足夠使用時,才能夠將這個選項設置爲false。

對於Hash BasedShuffle的Shuffle Write過程當中使用的org.apache.spark.util.collection.AppendOnlyMap就是全內存的方式,而org.apache.spark.util.collection.ExternalAppendOnlyMap對org.apache.spark.util.collection.AppendOnlyMap有了進一步的封裝,在內存使用超過閾值時會將它spill到外部存儲,在最後的時候會對這些臨時文件進行Merge。

而Sort BasedShuffle Write使用到的org.apache.spark.util.collection.ExternalSorter也會有相似的spill。

而對於ShuffleRead,若是須要作aggregate,也可能在aggregate的過程當中將數據spill的外部存儲。

1.1.3  spark.shuffle.memoryFraction和spark.shuffle.safetyFraction

在啓用spark.shuffle.spill的狀況下,spark.shuffle.memoryFraction決定了當Shuffle過程當中使用的內存達到總內存多少比例的時候開始Spill。在Spark 1.2.0裏,這個值是0.2。經過這個參數能夠設置Shuffle過程佔用內存的大小,它直接影響了Spill的頻率和GC。

 若是Spill的頻率過高,那麼能夠適當的增長spark.shuffle.memoryFraction來增長Shuffle過程的可用內存數,進而減小Spill的頻率。固然爲了不OOM(內存溢出),可能就須要減小RDD cache所用的內存,即須要減小spark.storage.memoryFraction的值;可是減小RDD cache所用的內存有可能會帶來其餘的影響,所以須要綜合考量。

在Shuffle過程當中,Shuffle佔用的內存數是估計出來的,並非每次新增的數據項都會計算一次佔用的內存大小,這樣作是爲了下降時間開銷。可是估計也會有偏差,所以存在實際使用的內存數比估算值要大的狀況,所以參數 spark.shuffle.safetyFraction做爲一個保險係數下降實際Shuffle過程所須要的內存值,下降實際內存超出用戶配置值的風險。

1.1.4  spark.shuffle.sort.bypassMergeThreshold

這個配置的默認值是200,用於設置在Reducer的partition數目少於多少的時候,Sort Based Shuffle內部不使用Merge Sort的方式處理數據,而是直接將每一個partition寫入單獨的文件。這個方式和Hash Based的方式是相似的,區別就是在最後這些文件仍是會合併成一個單獨的文件,並經過一個index索引文件來標記不一樣partition的位置信息。從Reducer看來,數據文件和索引文件的格式和內部是否作過Merge Sort是徹底相同的。

這個能夠看作SortBased Shuffle在Shuffle量比較小的時候對於Hash Based Shuffle的一種折衷。固然了它和Hash Based Shuffle同樣,也存在同時打開文件過多致使內存佔用增長的問題。所以若是GC比較嚴重或者內存比較緊張,能夠適當的下降這個值。

1.1.5  spark.shuffle.blockTransferService

在Spark 1.2.0,這個配置的默認值是netty,而以前是nio。這個主要是用於在各個Executor之間傳輸Shuffle數據。Netty的實現更加簡潔,但實際上用戶不用太關心這個選項。除非是有特殊的需求,不然採用默認配置就能夠。

1.1.6  spark.shuffle.consolidateFiles

這個配置的默認配置是false。主要是爲了解決在Hash Based Shuffle過程當中產生過多文件的問題。若是配置選項爲true,那麼對於同一個Core上運行的Shuffle Map Task不會新產生一個Shuffle文件而是重用原來的。可是每一個Shuffle Map Task仍是須要產生下游Task數量的文件,所以它並無減小同時打開文件的數量。若是須要了解更加詳細的細節,能夠閱讀7.1節。

可是consolidateFiles的機制在Spark 0.8.1就引入了,到Spark 1.2.0仍是沒有穩定下來。從源碼實現的角度看,實現源碼是很是簡單的,可是因爲涉及本地的文件系統等限制,這個策略可能會帶來各類各樣的問題。因爲它並無減小同時打開文件的數量,所以不能減小由文件句柄帶來的內存消耗。若是面臨Shuffle的文件數量很是大,那麼是否打開這個選項最好仍是經過實際測試後再決定。

 

1.1.7  spark.shuffle.service.enabled

(false)

1.1.8  spark.shuffle.compress和 spark.shuffle.spill.compress

 這兩個參數的默認配置都是true。spark.shuffle.compress和spark.shuffle.spill.compress都是用來設置Shuffle過程當中是否對Shuffle數據進行壓縮;其中前者針對最終寫入本地文件系統的輸出文件,後者針對在處理過程須要spill到外部存儲的中間數據,後者針對最終的shuffle輸出文件。

 如何設置spark.shuffle.compress?

若是下游的Task經過網絡獲取上游Shuffle Map Task的結果的網絡IO成爲瓶頸,那麼就須要考慮將它設置爲true:經過壓縮數據來減小網絡IO。因爲上游Shuffle Map Task和下游的Task現階段是不會並行處理的,即上游Shuffle Map Task處理完成,而後下游的Task纔會開始執行。所以若是須要壓縮的時間消耗就是Shuffle MapTask壓縮數據的時間 + 網絡傳輸的時間 + 下游Task解壓的時間;而不須要壓縮的時間消耗僅僅是網絡傳輸的時間。所以須要評估壓縮解壓時間帶來的時間消耗和由於數據壓縮帶來的時間節省。若是網絡成爲瓶頸,好比集羣廣泛使用的是千兆網絡,那麼可能將這個選項設置爲true是合理的;若是計算是CPU密集型的,那麼可能將這個選項設置爲false才更好。

 如何設置spark.shuffle.spill.compress?

若是設置爲true,表明處理的中間結果在spill到本地硬盤時都會進行壓縮,在將中間結果取回進行merge的時候,要進行解壓。所以要綜合考慮CPU因爲引入壓縮解壓的消耗時間和Disk IO由於壓縮帶來的節省時間的比較。在Disk IO成爲瓶頸的場景下,這個被設置爲true可能比較合適;若是本地硬盤是SSD,那麼這個設置爲false可能比較合適。

1.1.9  spark.reducer.maxMbInFlight

這個參數用於限制一個ReducerTask向其餘的Executor請求Shuffle數據時所佔用的最大內存數,尤爲是若是網卡是千兆和千兆如下的網卡時。默認值是48MB。設置這個值須要中和考慮網卡帶寬和內存。

Spark性能調優之Shuffle調優

   • Spark底層shuffle的傳輸方式是使用netty傳輸,netty在進行網絡傳輸的過程會申請堆外內存(netty是零拷貝),因此使用了堆外內存

 

    shuffle過程當中常出現的問題

常見問題一:reduce oom?

    問題緣由:

       reduce task 去map端獲取數據,reduce一邊拉取數據一邊聚合,reduce端有一塊聚合內存(executor memory * 0.2),也就是這塊內存不夠

    解決辦法:

                          1.增長reduce 聚合操做的內存的比例  

                          2.增長Executor memory的大小  --executor-memory 5G

                          3.減小reduce task每次拉取的數據量 設置spak.reducer.maxSizeInFlight 24m, 拉取的次數就多了,所以創建鏈接的次數增多,有可能會鏈接不上(正好遇上map task端進行GC)

 

常見問題二:錯誤描述--shuffle file cannot find   or   executor lost

   • 什

麼時候須要調節Executor的堆外內存大小?

       • shuffle file cannot find (DAGScheduler,resubmitting task)

       • executor lost

       • task lost

       • out of memory

    問題緣由:

        1.map task所運行的executor內存不足,致使executor

掛掉了,executor裏面的BlockManager就掛掉了,致使ConnectionManager不能用,也就沒法創建鏈接,從而不能拉取數據

        2.executor並無掛掉

            2.1 BlockManage之間的鏈接失敗(map task所運行的executor正在GC)

            2.2創建鏈接成功,map task所運行的executor正在GC

        3.reduce task向Driver中的MapOutputTracker獲取shuffle file位置的時候出現了問題

    解決辦法:

        1.增大Executor內存(即堆內內存) ,申請的堆外內存也會隨之增長--executor-memory 5G

        2.增大堆外內存 --conf spark.yarn.executor.memoryoverhead 2048M

   --conf spark.executor.memoryoverhead 2048M

 (默認申請的堆外內存是Executor內存的10%,真正處理大數據的時候,這裏都會出現問題,致使spark做業反覆崩潰,沒法運行;此時就會去調節這個參數,到至少1G(1024M),甚至說2G、4G)

)

 

buffer 32k    //緩衝區默認大小爲32k  SparkConf.set("spark.shuffle.file.buffer","64k")

reduce 48M //reduce端拉取數據的時候,默認大小是48M 

SparkConf.set("spark.reducer.maxSizeInFlight","96M")

    

spark.shuffle.file.buffer

默認值:32k

參數說明:該參數用於設置shuffle write task的BufferedOutputStream的buffer緩衝大小。將數據寫到磁盤文件以前,會先寫入buffer緩衝中,待緩衝寫滿以後,纔會溢寫到磁盤。

調優建議:若是做業可用的內存資源較爲充足的話,能夠適當增長這個參數的大小(好比64k),從而減小shuffle write過程當中溢寫磁盤文件的次數,也就能夠減小磁盤IO次數,進而提高性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提高。

 

spark.reducer.maxSizeInFlight

默認值:48m

參數說明:該參數用於設置shuffle read task的buffer緩衝大小,而這個buffer緩衝決定了每次可以拉取多少數據。

調優建議:若是做業可用的內存資源較爲充足的話,能夠適當增長這個參數的大小(好比96m),從而減小拉取數據的次數,也就能夠減小網絡傳輸的次數,進而提高性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提高。

錯誤:reduce oom

reduce task去map拉數據,reduce 一邊拉數據一邊聚合   reduce段有一塊聚合內存(executor memory * 0.2)

解決辦法:一、增長reduce 聚合的內存的比例  設置spark.shuffle.memoryFraction

二、 增長executor memory的大小  --executor-memory 5G

三、減小reduce task每次拉取的數據量  設置spark.reducer.maxSizeInFlight  24m

 

spark.shuffle.io.maxRetries

默認值:3

參數說明:shuffle read task從shuffle write task所在節點拉取屬於本身的數據時,若是由於網絡異常致使拉取失敗,是會自動進行重試的。該參數就表明了能夠重試的最大次數。若是在指定次數以內拉取仍是沒有成功,就可能會致使做業執行失敗。

調優建議:對於那些包含了特別耗時的shuffle操做的做業,建議增長重試最大次數(好比60次),以免因爲JVM的full gc或者網絡不穩定等因素致使的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~上百億)的shuffle過程,調節該參數能夠大幅度提高穩定性。

shuffle file not find    taskScheduler不負責重試task,由DAGScheduler負責重試stage

 

spark.shuffle.io.retryWait

默認值:5s

參數說明:具體解釋同上,該參數表明了每次重試拉取數據的等待間隔,默認是5s。

調優建議:建議加大間隔時長(好比60s),以增長shuffle操做的穩定性。

 

spark.shuffle.memoryFraction

默認值:0.2

參數說明:該參數表明了Executor內存中,分配給shuffle read task進行聚合操做的內存比例,默認是20%。

調優建議:在資源參數調優中講解過這個參數。若是內存充足,並且不多使用持久化操做,建議調高這個比例,給shuffle read的聚合操做更多內存,以免因爲內存不足致使聚合過程當中頻繁讀寫磁盤。在實踐中發現,合理調節該參數能夠將性能提高10%左右。

 

spark.shuffle.manager

默認值:sort

參數說明:該參數用於設置ShuffleManager的類型。Spark 1.5之後,有三個可選項:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2之前的默認選項,可是Spark 1.2以及以後的版本默認都是SortShuffleManager了。tungsten-sort與sort相似,可是使用了tungsten計劃中的堆外內存管理機制,內存使用效率更高。

調優建議:因爲SortShuffleManager默認會對數據進行排序,所以若是你的業務邏輯中須要該排序機制的話,則使用默認的SortShuffleManager就能夠;而若是你的業務邏輯不須要對數據進行排序,那麼建議參考後面的幾個參數調優,經過bypass機制或優化的HashShuffleManager來避免排序操做,同時提供較好的磁盤讀寫性能。這裏要注意的是,tungsten-sort要慎用,由於以前發現了一些相應的bug。

 

spark.shuffle.sort.bypassMergeThreshold

默認值:200

參數說明:當ShuffleManager爲SortShuffleManager時,若是shuffle read task的數量小於這個閾值(默認是200),則shuffle write過程當中不會進行排序操做,而是直接按照未經優化的HashShuffleManager的方式去寫數據,可是最後會將每一個task產生的全部臨時磁盤文件都合併成一個文件,並會建立單獨的索引文件。

調優建議:當你使用SortShuffleManager時,若是的確不須要排序操做,那麼建議將這個參數調大一些,大於shuffle read task的數量。那麼此時就會自動啓用bypass機制,map-side就不會進行排序了,減小了排序的性能開銷。可是這種方式下,依然會產生大量的磁盤文件,所以shuffle write性能有待提升。

 

spark.shuffle.consolidateFiles

默認值:false

參數說明:若是使用HashShuffleManager,該參數有效。若是設置爲true,那麼就會開啓consolidate機制,會大幅度合併shuffle write的輸出文件,對於shuffle read task數量特別多的狀況下,這種方法能夠極大地減小磁盤IO開銷,提高性能。

調優建議:若是的確不須要SortShuffleManager的排序機制,那麼除了使用bypass機制,還能夠嘗試將spark.shffle.manager參數手動指定爲hash,使用HashShuffleManager,同時開啓consolidate機制。在實踐中嘗試過,發現其性能比開啓了bypass機制的SortShuffleManager要高出10%~30%。

相關文章
相關標籤/搜索