spark 調優概述

分爲幾個部分:
開發調優、資源調優、數據傾斜調優、shuffle調優html


開發調優:

主要包括這幾個方面 RDD lineage設計、算子的合理使用、特殊操做的優化等java

避免建立重複的RDD,儘量複用同一個RDD 一個RDD包含另一個RDD,對屢次使用的RDD進行持久化 內存(序列化),磁盤(序列化)
儘可能避免使用shuffle類算子

shuffle過程當中,各個節點上的相同key都會先寫入本地磁盤文件中,而後其餘節點須要經過網絡傳輸拉取各個節點上的磁盤文件中的相同key。並且相同key都拉取到同一個節點進行聚合操做時,還有可能會由於一個節點上處理的key過多,致使內存不夠存放,進而溢寫到磁盤文件中。所以在shuffle過程當中,可能會發生大量的磁盤文件讀寫的IO操做,以及數據的網絡傳輸操做。磁盤IO和網絡數據傳輸也是shuffle性能較差的主要緣由。數據庫

使用高性能的函數算子
  • 使用map-side預聚合的shuffle操做
    所謂的map-side預聚合,說的是在每一個節點本地對相同的key進行一次聚合操做,相似於MapReduce中的本地combiner。map-side預聚合以後,每一個節點本地就只會有一條相同的key,由於多條相同的key都被聚合起來了。其餘節點在拉取全部節點上的相同key時,就會大大減小須要拉取的數據數量,從而也就減小了磁盤IO以及網絡傳輸開銷。數組

  • 使用reduceByKey/aggregateByKey替代groupByKey
    由於reduceByKey和aggregateByKey算子都會使用用戶自定義的函數對每一個節點本地的相同key進行預聚合。而groupByKey算子是不會進行預聚合的,全量的數據會在集羣的各個節點之間分發和傳輸,性能相對來講比較差。網絡

  • 使用mapPartitions替代普通map
    mapPartitions類的算子,一次函數調用會處理一個partition全部的數據,而不是一次函數調用處理一條,性能相對來講會高一些。可是有的時候,使用mapPartitions會出現OOM(內存溢出)的問題。由於單次函數調用就要處理掉一個partition全部的數據,若是內存不夠,垃圾回收時是沒法回收掉太多對象的,極可能出現OOM異常。因此使用這類操做時要慎重!數據結構

  • 使用foreachPartitions替代foreach
    原理相似於「使用mapPartitions替代map」,也是一次函數調用處理一個partition的全部數據,而不是一次函數調用處理一條數據。在實踐中發現,foreachPartitions類的算子,對性能的提高仍是頗有幫助的。好比在foreach函數中,將RDD中全部數據寫MySQL,那麼若是是普通的foreach算子,就會一條數據一條數據地寫,每次函數調用可能就會建立一個數據庫鏈接,此時就勢必會頻繁地建立和銷燬數據庫鏈接,性能是很是低下;可是若是用foreachPartitions算子一次性處理一個partition的數據,那麼對於每一個partition,只要建立一個數據庫鏈接便可,而後執行批量插入操做,此時性能是比較高的。多線程

  • 使用repartitionAndSortWithinPartitions替代repartition與sort類操做併發

使用Kryo優化序列化性能 比默認的java序列化性能高
優化數據結構
  • 儘可能使用字符串替代對象,使用原始類型(好比Int、Long)替代字符串,使用數組替代集合類型,這樣儘量地減小內存佔用,從而下降GC頻率,提高性能。

資源調優

運行時的關鍵點
Spark是根據shuffle類算子來進行stage的劃分。若是咱們的代碼中執行了某個shuffle類算子(好比reduceByKey、join等),那麼就會在該算子處,劃分出一個stage界限來。能夠大體理解爲,shuffle算子執行以前的代碼會被劃分爲一個stage,shuffle算子執行以及以後的代碼會被劃分爲下一個stage。所以一個stage剛開始執行的時候,它的每一個task可能都會從上一個stage的task所在的節點,去經過網絡傳輸拉取須要本身處理的全部key,而後對拉取到的全部相同的key使用咱們本身編寫的算子函數執行聚合操做(好比reduceByKey()算子接收的函數)。這個過程就是shuffle。ide

所以Executor的內存主要分爲三塊:
第一塊是讓task執行咱們本身編寫的代碼時使用,默認是佔Executor總內存的20%;
第二塊是讓task經過shuffle過程拉取了上一個stage的task的輸出後,進行聚合等操做時使用,默認也是佔Executor總內存的20%;
第三塊是讓RDD持久化時使用,默認佔Executor總內存的60%。函數

task的執行速度是跟每一個Executor進程的CPU core數量有直接關係的。一個CPU core同一時間只能執行一個線程。而每一個Executor進程上分配到的多個task,都是以每一個task一條線程的方式,多線程併發運行的。若是CPU core數量比較充足,並且分配到的task數量比較合理,那麼一般來講,能夠比較快速和高效地執行完這些task線程。

瞭解完了Spark做業運行的基本原理以後,對資源相關的參數就容易理解了。

  • num-executors
    設置Spark做業總共要用多少個Executor進程來執行。建議:50~100個左右的Executor進程

  • executor-memory
    設置每一個Executor進程的內存。檢建議:每一個Executor進程的內存設置4G~8G較爲合適,num-executors乘以executor-memory,是不能超過隊列的最大內存量的,通常1/3~1/2

  • executor-cores
    設置每一個Executor進程的CPU core數量。建議:Executor的CPU core數量設置爲2~4個較爲合適

  • driver-memory
    要使用collect算子將RDD的數據所有拉取到Driver上進行處理,那麼必須確保Driver的內存足夠大,不然會出現OOM內存溢出的問題

  • spark.default.parallelism

==該參數用於設置每一個stage的默認task數量。這個參數極爲重要,若是不設置可能會直接影響你的Spark做業性能。==
Spark做業的默認task數量爲500~1000個較爲合適。若是不設置這個參數,那麼此時就會致使Spark本身根據底層HDFS的block數量來設置task的數量,默認是一個HDFS block對應一個task。一般來講,Spark默認設置的數量是偏少的(好比就幾十個task),若是task數量偏少的話,就會致使你前面設置好的Executor的參數都前功盡棄。試想一下,不管你的Executor進程有多少個,內存和CPU core有多大,可是task只有1個或者10個,那麼90%的Executor進程可能根本就沒有task執行,也就是白白浪費了資源!所以Spark官網建議的設置原則是,設置該參數爲num-executors * executor-cores的2~3倍較爲合適,好比Executor的總CPU core數量爲300個,那麼設置1000個task是能夠的,此時能夠充分地利用Spark集羣的資源。

  • spark.storage.memoryFraction
    設置RDD持久化數據在Executor內存中能佔的比例,默認是0.6。當操做中有較多的RDD持久化操做,該參數的值能夠適當提升一些,少的話就相對少一點

  • spark.shuffle.memoryFraction
    設置shuffle過程當中一個task拉取到上個stage的task的輸出後,進行聚合操做時可以使用的Executor內存的比例,默認是0.2。也就是說,Executor默認只有20%的內存用來進行該操做。shuffle操做在進行聚合時,若是發現使用的內存超出了這個20%的限制,那麼多餘的數據就會溢寫到磁盤文件中去,此時就會極大地下降性能。

如下是一份spark-submit命令的示例,你們能夠參考一下,並根據本身的實際狀況進行調節:
./bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1G \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \

數據傾斜調優

數據傾斜發生的原理:
在進行shuffle的時候,必須將各個節點上相同的key拉取到某個節點上的一個task來進行處理,好比按照key進行聚合或join等操做。此時若是某個key對應的數據量特別大的話,就會發生數據傾斜。好比大部分key對應10條數據,可是個別key卻對應了100萬條數據,那麼大部分task可能就只會分配到10條數據,而後1秒鐘就運行完了;可是個別task可能分配到了100萬數據,要運行一兩個小時。所以,整個Spark做業的運行進度是由運行時間最長的那個task決定的。

如何定位致使數據傾斜的代碼和數據分部狀況:

數據傾斜只會發生在shuffle過程當中。這裏給你們羅列一些經常使用的而且可能會觸發shuffle操做的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出現數據傾斜時,可能就是你的代碼中使用了這些算子中的某一個所致使的。
咱們均可以在Spark Web UI上深刻看一下當前這個stage各個task分配的數據量,從而進一步肯定是否是task分配的數據不均勻致使了數據傾斜

  1. 若是是Spark SQL中的group by、join語句致使的數據傾斜,那麼就查詢一下SQL中使用的表的key分佈狀況。
  2. 若是是對Spark RDD執行shuffle算子致使的數據傾斜,那麼能夠在Spark做業中加入查看key分佈的代碼,好比RDD.countByKey()。而後對統計出來的各個key出現的次數,collect/take到客戶端打印一下,就能夠看到key的分佈狀況。

提升shuffle操做的並行度(優先考慮)

增長shuffle read task的數量,可讓本來分配給一個task的多個key分配給多個task,從而讓每一個task處理比原來更少的數據。舉例來講,若是本來有5個key,每一個key對應10條數據,這5個key都是分配給一個task的,那麼這個task就要處理50條數據。而增長了shuffle read task之後,每一個task就分配到一個key,即每一個task就處理10條數據,那麼天然每一個task的執行時間都會變短了

該方案一般沒法完全解決數據傾斜,由於若是出現一些極端狀況,好比某個key對應的數據量有100萬,那麼不管你的task數量增長到多少,這個對應着100萬數據的key確定仍是會分配到一個task中去處理,所以註定仍是會發生數據傾斜的。

image

兩階段聚合(局部聚合+全局聚合)

這個方案的核心實現思路就是進行兩階段聚合。第一次是局部聚合,先給每一個key都打上一個隨機數,好比10之內的隨機數,此時原先同樣的key就變成不同的了,好比(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着對打上隨機數後的數據,執行reduceByKey等聚合操做,進行局部聚合,那麼局部聚合結果,就會變成了(1_hello, 2) (2_hello, 2)。而後將各個key的前綴給去掉,就會變成(hello,2)(hello,2),再次進行全局聚合操做,就能夠獲得最終結果了,==僅僅適用於聚合類的shuffle操做==。

image

將reduce join轉爲map join(廣播+map)

普通的join是會走shuffle過程的,而一旦shuffle,就至關於會將相同key的數據拉取到一個shuffle read task中再進行join,此時就是reduce join。可是若是一個RDD是比較小的,則能夠採用廣播小RDD全量數據+map算子來實現與join一樣的效果,也就是map join,此時就不會發生shuffle操做,也就不會發生數據傾斜。具體原理以下圖所示
適用場景較少,由於這個方案只適用於一個大表和一個小表的狀況。

image

採樣傾斜key並分拆join操做

對於join致使的數據傾斜,若是隻是某幾個key致使了傾斜,能夠將少數幾個key分拆成獨立RDD,並附加隨機前綴打散成n份去進行join,此時這幾個key對應的數據就不會集中在少數幾個task上,而是分散到多個task進行join了

==若是致使傾斜的key特別多的話,好比成千上萬個key都致使數據傾斜,那麼這種方式也不適合==

image

使用隨機前綴和擴容RDD進行join

將原先同樣的key經過附加隨機前綴變成不同的key,而後就能夠將這些處理後的「不一樣key」分散到多個task中去處理,而不是讓一個task處理大量的相同key。該方案上一個的不一樣之處就在於,上一種方案是儘可能只對少數傾斜key對應的數據進行特殊處理,因爲處理過程須要擴容RDD,所以上一種方案擴容RDD後對內存的佔用並不大;而這一種方案是針對有大量傾斜key的狀況,無法將部分key拆分出來進行單獨處理,所以只能對整個RDD進行數據擴容,對內存資源要求很高。


shuffle調優

調優概述

大多數Spark做業的性能主要就是消耗在了shuffle環節,由於該環節包含了大量的磁盤IO、序列化、網絡數據傳輸等操做。所以,若是要讓做業的性能更上一層樓,就有必要對shuffle過程進行調優。

ShuffleManager發展概述

負責shuffle過程的執行、計算和處理的組件主要就是ShuffleManager,也即shuffle管理器。而隨着Spark的版本的發展,ShuffleManager也在不斷迭代,變得愈來愈先進。

在Spark 1.2之前,默認的shuffle計算引擎是HashShuffleManager。該ShuffleManager有着一個很是嚴重的弊端,就是會產生大量的中間磁盤文件,進而由大量的磁盤IO操做影響了性能。

所以在Spark 1.2之後的版本中,默認的ShuffleManager改爲了SortShuffleManager。SortShuffleManager相較於HashShuffleManager來講,有了必定的改進。主要就在於,每一個Task在進行shuffle操做時,雖然也會產生較多的臨時磁盤文件,可是最後會將全部的臨時文件合併(merge)成一個磁盤文件,所以每一個Task就只有一個磁盤文件。在下一個stage的shuffle read task拉取本身的數據時,只要根據索引讀取每一個磁盤文件中的部分數據便可。

HashShuffleManager運行原理

未經優化的HashShuffleManager

shuffle write階段
一個stage結束計算以後,爲了下一個stage能夠執行shuffle類的算子,而將每一個task處理的數據按key進行「分類」,將相同key都寫入(經過一個內存緩衝buffer)同一個磁盤文件中,而每個磁盤文件都只屬於下游stage的一個task。

shuffle read
一般就是一個stage剛開始時要作的事情。此時該stage的每個task就須要將上一個stage的計算結果中的全部相同key,從各個節點上經過網絡都拉取write階段建立的屬於本身的那一個磁盤文件到本身所在的節點上,而後進行key的聚合或鏈接等操做。

優化後的HashShuffleManager

spark.shuffle.consolidateFiles默認爲false,若是設置爲ture開啓優化機制。 write過程當中,task就不是爲下游stage的每一個task建立一個磁盤文件了。==此時會出現shuffleFileGroup的概念==,每一個shuffleFileGroup會對應一批磁盤文件,磁盤文件的數量與下游stage的task數量是相同的。一個Executor上有多少個CPU core,就能夠並行執行多少個task。而第一批並行執行的每一個task都會建立一個shuffleFileGroup,並將數據寫入對應的磁盤文件內。
當Executor的CPU core執行完一批task,接着執行下一批task時,下一批task就會複用以前已有的shuffleFileGroup,包括其中的磁盤文件。

SortShuffleManager運行原理

在該模式下,數據會先寫入一個內存數據結構中,此時根據不一樣的shuffle算子,可能選用不一樣的數據結構。若是是reduceByKey這種聚合類的shuffle算子,那麼會選用Map數據結構,一邊經過Map進行聚合,一邊寫入內存;若是是join這種普通的shuffle算子,那麼會選用Array數據結構,直接寫入內存。接着,每寫一條數據進入內存數據結構以後,就會判斷一下,是否達到了某個臨界閾值。若是達到臨界閾值的話,那麼就會嘗試將內存數據結構中的數據溢寫到磁盤,而後清空內存數據結構。
在溢寫到磁盤文件以前,會先根據key對內存數據結構中已有的數據進行排序。排序事後,會分批將數據寫入磁盤文件
會產生多個臨時文件。最後會將以前全部的臨時磁盤文件都進行合併,這就是merge過程,此時會將以前全部臨時磁盤文件中的數據讀取出來,而後依次寫入最終的磁盤文件之中。

SortShuffleManager因爲有一個磁盤文件merge的過程,所以大大減小了文件數量。好比第一個stage有50個task,總共有10個Executor,每一個Executor執行5個task,而第二個stage有100個task。因爲每一個task最終只有一個磁盤文件,所以此時每一個Executor上只有5個磁盤文件,全部Executor只有50個磁盤文件。

image

bypass運行機制
下圖說明了bypass SortShuffleManager的原理。bypass運行機制的觸發條件以下:

shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold參數的值。
不是聚合類的shuffle算子(好比reduceByKey)。
此時task會爲每一個下游task都建立一個臨時磁盤文件,並將數據按key進行hash而後根據key的hash值,將key寫入對應的磁盤文件之中。固然,寫入磁盤文件時也是先寫入內存緩衝,緩衝寫滿以後再溢寫到磁盤文件的。最後,一樣會將全部臨時磁盤文件都合併成一個磁盤文件,並建立一個單獨的索引文件。
該過程的磁盤寫機制其實跟未經優化的HashShuffleManager是如出一轍的,由於都要建立數量驚人的磁盤文件,只是在最後會作一個磁盤文件的合併而已。所以少許的最終磁盤文件,也讓該機制相對未經優化的HashShuffleManager來講,shuffle read的性能會更好。

而該機制與普通SortShuffleManager運行機制的不一樣在於:第一,磁盤寫機制不一樣;第二,不會進行排序。也就是說,啓用該機制的最大好處在於,shuffle write過程當中,不須要進行數據的排序操做,也就節省掉了這部分的性能開銷。

image

shuffle相關參數調優
  • spark.shuffle.file.buffer
    默認32K ,shuffle write task緩衝大小,適當跳大(64K)能夠減小shuffle write過程當中溢寫磁盤文件的次數

  • spark.reducer.maxSizeInFlight
    默認48M,shuffle read task的buffer緩衝大小

  • spark.shuffle.io.maxRetries
    默認3,huffle read task從shuffle write task拉取數據若是失敗的重試次數,數據量特別大的時候能夠增長,調節該參數能夠大幅度提高穩定性。

  • spark.shuffle.io.retryWait
    默認5S,每次重試拉取數據的等待間隔,增大(60S),提升穩定性

  • spark.shuffle.memoryFraction
    默認0.2,Executor內存中,分配給shuffle read task進行聚合操做的內存比例

  • spark.shuffle.manager
    設置ShuffleManager的類型,可選項:hash、sort和tungsten-sort。業務若是須要排序,就用sort,不須要排序就用bypass機制,避免排序。

  • spark.shuffle.sort.bypassMergeThreshold
    默認200,當ShuffleManager爲SortShuffleManager時,若是shuffle read task的數量小於這個閾值(默認是200),則shuffle write過程當中不會進行排序操做

spark.shuffle.consolidateFiles
默認false,設置爲true,那麼就會開啓consolidate機制,會大幅度合併shuffle write的輸出文件


原本絕大部分觀點來自美團點評(https://tech.meituan.com/spark-tuning-basic.html),通過本身的實踐發如今開發,資源,數據傾斜方面的調優能夠有着明顯的效果,而shuffle調優的結果並不明顯。因此你們在調優的過程當中必定要注意優先級。

相關文章
相關標籤/搜索