Spark 技術調優,別告訴我你不會?

目錄java

 

1、性能調優sql

2、jvm調優 數據庫

3、shuffle調優(優先使用前面兩點,實測有效)apache

4、算子調優緩存

5、troubleshooting網絡

6、數據傾斜解決方案架構


1、性能調優

1.1 配更多資源:app

    --num-executors 3 \  配置executor的數量--driver-memory 100m \  配置driver的內存(影響不大) --executor-memory 100m \  配置每一個executor的內存大小 --executor-cores 3 \  配置每一個executor的cpu core數量運維

    num-executors、executor-cores可提高任務的並行度;driver-memory、executor-memory增長內存,可緩存更多的數據減小磁盤IO,減小suffle時reduce端磁盤IO,可下降堆內存滿了頻繁GC,避免頻繁垃圾回收jvm

1.2 調節並行度:

    一、task數量,至少設置成與Spark application的總cpu core數量相同(最理想狀況,好比總共150個cpu core,分配了150個task,一塊兒運行,差很少同一時間運行完畢)

    二、官方是推薦,task數量,設置成spark application總cpu core數量的2~3倍,好比150個cpu core,基本要設置task數量爲300~500;

    如何設置一個Spark Application的並行度? SparkConf conf = new SparkConf()  .set("spark.default.parallelism", "500")

1.3 重構RDD架構以及RDD持久化:

    第一,RDD架構重構與優化 儘可能去複用RDD,差很少的RDD,能夠抽取稱爲一個共同的RDD,供後面的RDD計算時,反覆使用。

    第二,公共RDD必定要實現持久化,對於要屢次計算和使用的公共RDD,必定要進行持久化。

    第三,持久化,是能夠進行序列化的。優化使用memory,再考慮磁盤

    第四,爲了數據的高可靠性,並且內存充足,可使用雙副本機制,進行持久化;持久化後的一個副本,由於機器宕機了,副本丟了,就仍是得從新計算一次;這種方式,僅僅針對你的內存資源極度充足

1.4 廣播大變量

    廣播變量的好處,不是每一個task一份變量副本,而是變成每一個節點的executor才一份副本。這樣的話,就可讓變量產生的副本大大減小。減小網絡開銷、內存佔用、磁盤IO、GC垃圾回收的次數

1.5 使用Kryo序列化set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    Spark內部是使用Java的序列化機制,ObjectOutputStream / ObjectInputStream,對象輸入輸出流機制,來進行序列化

    Spark支持使用Kryo序列化機制。Kryo序列化機制,比默認的Java序列化機制,速度要快,序列化後的數據要更小,大概是Java序列化機制的1/10。 因此Kryo序列化優化之後,可讓網絡傳輸的數據變少;在集羣中耗費的內存資源大大減小。

1.6 使用fastutil優化數據格式

    fastutil儘可能提供了在任何場景下都是速度最快的集合類庫

1.7 調節數據本地化等待時長

    BlockManager > PROCESS_LOCAL > NODE_LOCAL > NO_PREF > RACK_LOCAL > ANY    

    觀察日誌,spark做業的運行日誌,推薦你們在測試的時候,先用client模式,在本地就直接能夠看到比較全的日誌。日誌裏面會顯示,starting task。。。,PROCESS LOCAL、NODE LOCAL 觀察大部分task的數據本地化級別

    new SparkConf()  .set("spark.locality.wait", "10") 

BlockManager

存放位置

詳細說明

PROCESS_LOCAL

進程本地化 

task要計算的數據在同一個Executor中

NODE_LOCAL

節點本地化  

速度比PROCESS_LOCAL稍慢,由於數據須要在不一樣進程之間傳遞或從文件中讀取

NO_PREF

沒有最佳位置這一說

數據從哪裏訪問都同樣快,不須要位置優先。好比SparkSQL讀取MySQL中的數據

RACK_LOCAL

本架本地化

數據在同一機架的不一樣節點。須要經過網絡傳輸數據及文件IO,比NODE_LOCAL慢

ANY

跨機架

數據在非同一機架的網絡上,速度最慢


2、jvm調優 

2.1 JVM調優之下降cache操做的內存佔比

    JVM調優的第一個點:下降cache操做的內存佔比 spark中,堆內存又被劃分紅了兩塊兒,一起是專門用來給RDD的cache、persist操做進行RDD數據緩存用的;

    另一塊兒,就是咱們剛纔所說的,用來給spark算子函數的運行使用的,存放函數中本身建立的對象

    spark.storage.memoryFraction,0.6 -> 0.5 -> 0.4 -> 0.2

2.2 JVM調優之調節executor堆外內存與鏈接等待時長

    --conf spark.yarn.executor.memoryOverhead=2048(最小300m),通常會調大些

    --conf spark.core.connection.ack.wait.timeout=300(某某file,not found。file lost。頗有多是有那份數據的executor在jvm gc。因此拉取數據的時候,創建不了鏈接。而後超過默認60s之後,直接宣告失敗。)


3、shuffle調優(優先使用前面兩點,實測有效)

3.1 Shuffle調優之合併map端輸出文件

    new SparkConf().set("spark.shuffle.consolidateFiles", "true") 開啓shuffle map端輸出文件合併的機制;默認狀況下,是不開啓的,就是會發生如上所述的大量map端輸出文件的操做,嚴重影響性能。

    沒有開啓的話,每個task都會建立一份文件,先後有多少個task建立就會有多少個文件生成;若是開啓的話,就會生成task的並行度 * executor的數量 份文件

3.2 Shuffle調優之調節map端內存緩衝與reduce端內存佔比

    spark.shuffle.file.buffer,默認32k,能夠調成64K      // spark.shuffle.file.buffer,每次擴大一倍,而後看看效果,64,128;spark.shuffle.memoryFraction,每次提升0.1,看看效果。

    spark.shuffle.memoryFraction,默認0.2,能夠調成0.3   // 若是數據量比較大,reduce task拉取過來的數據不少,那麼就會頻繁發生reduce端聚合內存不夠用,頻繁發生spill操做,溢寫到磁盤上去。

3.3 Shuffle調優之HashShuffleManager與SortShuffleManager

  1. 在生產環境中,不建議你們貿然使用第三點和第四點:
  2. 若是你不想要你的數據在shuffle時排序,那麼就本身設置一下,用hash shuffle manager。 
  3. 若是你的確是須要你的數據在shuffle時進行排序的,那麼就默認不用動,默認就是sort shuffle manager;或者是什麼?若是你壓根兒不care是否排序這個事兒,那麼就默認讓他就是sort的。調節一些其餘的參數(consolidation機制)。(80%,都是用這種) 
    spark.shuffle.manager:hash、sort、tungsten-sort 
    new SparkConf().set("spark.shuffle.manager", "hash") 
    new SparkConf().set("spark.shuffle.manager", "tungsten-sort") 
    // 默認就是,new SparkConf().set("spark.shuffle.manager", "sort") 
    new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "550")

4、算子調優

4.1 MapPartitions提高Map類操做性能

    何時比較適合用MapPartitions系列操做,就是說,數據量不是特別大的時候,均可以用這種MapPartitions系列操做,性能仍是很是不錯的,是有提高的。好比原來是15分鐘,(曾經有一次性能調優),12分鐘。10分鐘->9分鐘。

4.2 filter事後使用coalesce減小分區數量

    主要就是用於在filter操做以後,針對每一個partition的數據量各不相同的狀況,來壓縮partition的數量。減小partition的數量,並且讓每一個partition的數據量都儘可能均勻緊湊。 從而便於後面的task進行計算操做,可以必定程度的提高性能。

4.3 使用foreachPartition優化寫數據庫性能

    在實際生產環境中,清一色,都是使用foreachPartition操做;可是有個問題,跟mapPartitions操做同樣,若是一個partition的數量真的特別特別大,好比真的是100萬,那基本上就不太靠譜了。 一會兒進來,頗有可能會發生OOM,內存溢出的問題。

4.4 使用repartition解決Spark SQL低並行度的性能問題

    repartition算子,你用Spark SQL這一步的並行度和task數量,確定是沒有辦法去改變了。可是呢,能夠將你用Spark SQL查詢出來的RDD,使用repartition算子,去從新進行分區,此時能夠分區成多個partition,好比從20個partition,分區成100個。

4.5 reduceByKey本地聚合

    reduceByKey,相較於普通的shuffle操做(好比groupByKey),它的一個特色,就是說,會進行map端的本地聚合。


5、troubleshooting

5.1 控制shuffle reduce端緩衝大小以免OOM

    reduce端執行的聚合函數的代碼,可能會建立大量的對象。也許,一會兒,內存就撐不住了,就會OOM。這個時候,就應該減小reduce端task緩衝的大小。我寧願多拉取幾回,可是每次同時可以拉取到reduce端每一個task的數量,比較少,就不容易發生OOM內存溢出的問題。(好比,能夠調節成12M)

    spark.reducer.maxSizeInFlight,48
    spark.reducer.maxSizeInFlight,24

5.2 解決JVM GC致使的shuffle文件拉取失敗

spark.shuffle.io.maxRetries 3

    第一個參數,意思就是說,shuffle文件拉取的時候,若是沒有拉取到(拉取失敗),最多或重試幾回(會從新拉取幾回文件),默認是3次。 

    spark.shuffle.io.retryWait 5s 第二個參數,意思就是說,每一次重試拉取文件的時間間隔,默認是5s鍾。

5.3 解決YARN隊列資源不足致使的application直接失敗

    可能同時提交了相同的做業,以前那個做業就佔據了資源的60%,再提交一個相同的做業,確定會資源不足;或者提交了一個長時間的做業,然後面須要運行2分鐘的做業

    解決:跟運維溝通,實現調度策略。zeus

5.4 解決各類序列化致使的報錯

    序列化報錯要注意的三個點: 

  1. 你的算子函數裏面,若是使用到了外部的自定義類型的變量,那麼此時,就要求你的自定義類型,必須是可序列化的
  2. 若是要將自定義的類型,做爲RDD的元素類型,那麼自定義的類型也必須是能夠序列化的
  3. 不能在上述兩種狀況下,去使用一些第三方的,不支持序列化的類型

    Connection是不支持序列化的

5.5 解決算子函數返回NULL致使的問題

    你們能夠看到,在有些算子函數裏面,是須要咱們有一個返回值的。可是,有時候,咱們可能對某些值,就是不想有什麼返回值。

    咱們若是直接返回NULL的話,那麼能夠不幸的告訴你們,是不行的,會報錯的。 Scala.Math(NULL),異常 若是碰到你的確是對於某些值,不想要有返回值的話,有一個解決的辦法: 

  1. 在返回的時候,返回一些特殊的值,不要返回null,好比「-999」 
  2. 在經過算子獲取到了一個RDD以後,能夠對這個RDD執行filter操做,進行數據過濾。filter內,能夠對數據進行斷定,若是是-999,那麼就返回false,給過濾掉就能夠了。 
  3. 你們不要忘了,以前我們講過的那個算子調優裏面的coalesce算子,在filter以後,可使用coalesce算子壓縮一下RDD的partition的數量,讓各個partition的數據比較緊湊一些。也能提高一些性能。

5.6 解決yarn-client模式致使的網卡流量激增問題

    yarn-client模式下,只是用於測試時使用;

    yarn-cluster模式,就跟你的本地機器引發的網卡流量激增的問題,就沒有關係了。也就是說,就算有問題,也應該是yarn運維團隊和基礎運維團隊之間的事情了。

    使用了yarn-cluster模式之後,就不是你的本地機器運行Driver,進行task調度了。是yarn集羣中,某個節點會運行driver進程,負責task調度。

5.7 解決yarn-cluster模式的JVM棧內存溢出問題

    yarn-client模式下,driver是運行在本地機器上的,JVM的永久代的大小是128M,這個是沒有問題的

    yarn-cluster模式下,driver是運行在yarn集羣的某個節點上的,使用的是沒有通過配置的默認設置(PermGen永久代大小),82M。

    解決:spark-submit腳本中,加入如下配置便可:--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"

    問題二:spark sql,調用的方法層級過多,由於產生了大量的,很是深的,超出了JVM棧深度限制的,遞歸。

    解決:JVM Stack Memory Overflow,棧內存溢出。 這種時候,建議不要搞那麼複雜的spark sql語句。採用替代方案:將一條sql語句,拆解成多條sql語句來執行。每條sql語句,就只有100個or子句之內;一條一條SQL語句來執行。根據生產環境經驗的測試,一條sql語句,100個or子句之內,是還能夠的。一般狀況下,不會報那個棧內存溢出。

5.8 錯誤的持久化方式以及checkpoint的使用


6、數據傾斜解決方案

6.1 聚合源數據以及過濾致使傾斜的key

  • 第一個方案:聚合源數據;將數據按key,將valuse進行聚合,中間使用分隔符進行拼接
  • 第二個方案:過濾致使傾斜的key

    簡單。直接。效果是很是之好的。完全根除了數據傾斜的問題。

6.2 提升shuffle操做reduce並行度

    全部的shuffle算子,好比groupByKey、countByKey、reduceByKey。在調用的時候,傳入進去一個參數。一個數字。那個數字,就表明了那個shuffle操做的reduce端的並行度。那麼在進行shuffle操做的時候,就會對應着建立指定數量的reduce task。 這樣的話,就可讓每一個reduce task分配到更少的數據。基本能夠緩解數據傾斜的問題。

    好比說,本來某個task分配數據特別多,直接OOM,內存溢出了,程序無法運行,直接掛掉。按照log,找到發生數據傾斜的shuffle操做,給它傳入一個並行度數字,這樣的話,原先那個task分配到的數據,確定會變少。就至少能夠避免OOM的狀況,程序至少是能夠跑的。

6.3 使用隨機key實現雙重聚合

    第一輪聚合的時候,對key進行打散,將原先同樣的key,變成不同的key,至關因而將每一個key分爲多組; 

    先針對多個組,進行key的局部聚合;接着,再去除掉每一個key的前綴,而後對全部的key,進行全局的聚合。 

    對groupByKey、reduceByKey形成的數據傾斜,有比較好的效果。 

    若是說,以前的第1、第2、第三種方案,都無法解決數據傾斜的問題,那麼就只能依靠這一種方式了。

6.4 將reduce join轉換爲map join

    若是兩個RDD要進行join,其中一個RDD是比較小的。一個RDD是100萬數據,一個RDD是1萬數據。(一個RDD是1億數據,一個RDD是100萬數據) 

    其中一個RDD必須是比較小的,broadcast出去那個小RDD的數據之後,就會在每一個executor的block manager中都駐留一份。要確保你的內存足夠存放那個小RDD中的數據 

    這種方式下,根本不會發生shuffle操做,確定也不會發生數據傾斜;從根本上杜絕了join操做可能致使的數據傾斜的問題; 

    對於join中有數據傾斜的狀況,你們儘可能第一時間先考慮這種方式,效果很是好;若是某個RDD比較小的狀況下。

6.5 sample採樣傾斜key單獨進行join

    將發生數據傾斜的key,單獨拉出來,放到一個RDD中去;就用這個本來會傾斜的key RDD跟其餘RDD,單獨去join一下,這個時候,key對應的數據,可能就會分散到多個task中去進行join操做。 

    就不至於說是,這個key跟以前其餘的key混合在一個RDD中時,確定是會致使一個key對應的全部數據,都到一個task中去,就會致使數據傾斜。

6.6 使用隨機數以及擴容表進行join

  1. 選擇一個RDD,要用flatMap,進行擴容,將每條數據,映射爲多條數據,每一個映射出來的數據,都帶了一個n之內的隨機數,一般來講,會選擇10。 
  2. 將另一個RDD,作普通的map映射操做,每條數據,都打上一個10之內的隨機數。 
  3. 最後,將兩個處理後的RDD,進行join操做。
相關文章
相關標籤/搜索