Spark調優-初級

寫在前面

先列出全部的調優參數,方便快速查詢sql

set hive.exec.dynamic.partition=true; ##--動態分區
set hive.exec.dynamic.partition.mode=nonstrict; ##--動態分區
set hive.auto.convert.join=true; ##-- 自動判斷大表和小表

##-- hive並行
set hive.exec.parallel=true;
set hive.merge.mapredfiles=true;

##-- 內存能力
set spark.driver.memory=8G; 
set spark.executor.memory=2G; 

##-- 併發度
set spark.dynamicAllocation.enabled=true;
set spark.dynamicAllocation.maxExecutors=50;
set spark.executor.cores=2;

##-- shuffle
set spark.sql.shuffle.partitions=100; -- 默認的partition數,及shuffle的reader數
set spark.sql.adaptive.enabled=true; -- 啓用自動設置 Shuffle Reducer 的特性,動態設置Shuffle Reducer個數(Adaptive Execution 的自動設置 Reducer 是由 ExchangeCoordinator 根據 Shuffle Write 統計信息決定)
set spark.sql.adaptive.join.enabled=true; -- 開啓 Adaptive Execution 的動態調整 Join 功能 (根據前面stage的shuffle write信息操做來動態調整是使用sortMergeJoin仍是broadcastJoin)
set spark.sql.adaptiveBroadcastJoinThreshold=268435456; -- 64M ,設置 SortMergeJoin 轉 BroadcastJoin 的閾值,默認與spark.sql.autoBroadcastJoinThreshold相同
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=134217728; -- shuffle時每一個reducer讀取的數據量大小,Adaptive Execution就是根據這個值動態設置Shuffle reader的數量
set spark.sql.adaptive.allowAdditionalShuffle=true; -- 是否容許爲了優化 Join 而增長 Shuffle,默認爲false
set spark.shuffle.service.enabled=true; 


##-- orc
set spark.sql.orc.filterPushdown=true;
set spark.sql.orc.splits.include.file.footer=true;
set spark.sql.orc.cache.stripe.details.size=10000;
set hive.exec.orc.split.strategy=ETL -- ETL:會切分文件,多個stripe組成一個split,BI:按文件進行切分,HYBRID:平均文件大小大於hadoop最大split值使用ETL,不然BI
set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize=134217728; -- 128M 讀ORC時,設置一個split的最大值,超出後會進行文件切分
set spark.hadoop.mapreduce.input.fileinputformat.split.minsize=67108864; -- 64M 讀ORC時,設置小文件合併的閾值

##-- 其餘
set spark.sql.hive.metastorePartitionPruning=true;

##-- 廣播表
set spark.sql.autoBroadcastJoinThreshold=268435456; -- 256M

##-- 小文件
set spark.sql.mergeSmallFileSize=10485760; -- 10M -- 小文件合併的閾值
set spark.hadoopRDD.targetBytesInPartition=67108864; -- 64M 設置stage 輸入端的map(不涉及shuffle的時候)合併文件大小
set spark.sql.targetBytesInPartitionWhenMerge=67108864; --64M 設置額外的(非最開始)合併job的map端輸入size

複製代碼

一 運行行爲

1.1 動態生成分區

下列Hive參數對Spark一樣起做用。緩存

set hive.exec.dynamic.partition=true; // 是否容許動態生成分區

set hive.exec.dynamic.partition.mode=nonstrict; // 是否容忍指定分區所有動態生成

set hive.exec.max.dynamic.partitions = 100; // 動態生成的最多分區數
複製代碼

1.2 broadcast join

當大表JOIN小表時,若是小表足夠小,能夠將大表分片,分別用小表和每一個大表的分片進行JOIN,最後彙總,可以大大提高做業性能。bash

spark.sql.autoBroadcastJoinThreshold 默認值爲26214400(25M),若是小表的大小小於該值,則會將小表廣播到全部executor中,使JOIN快速完成。若是該值設置太大,則會致使executor內存壓力過大,容易出現OOM。併發

注:ORC格式的表會對數據進行壓縮,一般壓縮比爲2到3左右,但有些表的壓縮比就會很高,有時能夠達到10。請妥善配置該參數,並配合spark.executor.memory,使做業可以順利執行。app

1.3 動態資源分配

spark.dynamicAllocation.enabled:是否開啓動態資源分配,默認開啓,同時強烈建議用戶不要關閉。理由:開啓動態資源分配後,Spark能夠根據當前做業的負載動態申請和釋放資源。ide

spark.dynamicAllocation.maxExecutors: 開啓動態資源分配後,同一時刻,最多可申請的executor個數。平臺默認設置爲1000。當在Spark UI中觀察到task較多時,可適當調大此參數,保證task可以併發執行完成,縮短做業執行時間。函數

下圖是一個因爲併發不足致使做業執行較慢的一個明顯的任務:oop

打開執行時間較長的stage,查看其任務數爲2w+。性能

image.png

點擊stage的連接,進入查看stage中的任務,將任務按照Launch Time排序,先有小到大再由大到小。優化

image.png

image.png

能夠看到任務啓動時間差了3個多小時。能夠肯定該任務是因爲spark.dynamicAllocation.maxExecutors太小致使的。該參數能夠和spark.executor.cores配合增大做業併發度。

spark.dynamicAllocation.minExecutors: 和s,d,maxExecutors相反,此參數限定了某一時刻executor的最小個數。平臺默認設置爲3,即在任什麼時候刻,做業都會保持至少有3個及以上的executor存活,保證任務能夠迅速調度。

1.4 Shuflle相關

spark.sql.shuffle.partitions: 在有JOIN或聚合等須要shuffle的操做時,從mapper端寫出的partition個數,默認設置爲2000。

select a, avg(c) from test_table group by a語句,不考慮優化行爲,若是一個map端的task中包含有3000個a,根據spark.sql.shuffle.partitions=2000,會將計算結果分紅2000份partition(例如按2000取餘),寫到磁盤,啓動2000個reducer,每一個reducer從每一個mapper端拉取對應索引的partition。

看成業數據較多時,適當調大該值,看成業數據較少時,適當調小以節省資源。

spark.sql.adaptive.enabled:是否開啓調整partition功能,若是開啓,spark.sql.shuffle.partitions設置的partition可能會被合併到一個reducer裏運行。默認開啓,同時強烈建議開啓。理由:更好利用單個executor的性能,還能緩解小文件問題。

spark.sql.adaptive.shuffle.targetPostShuffleInputSize:和spark.sql.adaptive.enabled配合使用,當開啓調整partition功能後,當mapper端兩個partition的數據合併後數據量小於targetPostShuffleInputSize時,Spark會將兩個partition進行合併到一個reducer端進行處理。平臺默認爲67108864(64M),用戶可根據自身做業的狀況酌情調整該值。當調大該值時,一個reduce端task處理的數據量變大,最終產出的數據,存到HDFS上的文件也變大。當調小該值時,相反。

spark.sql.adaptive.minNumPostShufflePartitions: 當spark.sql.adaptive.enabled參數開啓後,有時會致使不少分區被合併,爲了防止分區過少,能夠設置spark.sql.adaptive.minNumPostShufflePartitions參數,防止分區過少而影響性能。

1.5 讀ORC表優化

hive.exec.orc.split.strategy參數控制在讀取ORC表時生成split的策略。BI策略以文件爲粒度進行split劃分;ETL策略會將文件進行切分,多個stripe組成一個split;HYBRID策略爲:當文件的平均大小大於hadoop最大split值(默認256 * 1024 * 1024)時使用ETL策略,不然使用BI策略。

對於一些較大的ORC表,可能其footer較大,ETL策略可能會致使其從hdfs拉取大量的數據來切分split,甚至會致使driver端OOM,所以這類表的讀取建議使用BI策略。

對於一些較小的尤爲有數據傾斜的表(這裏的數據傾斜指大量stripe存儲於少數文件中),建議使用ETL策略。

另外,spark.hadoop.mapreduce.input.fileinputformat.split.maxsize參數能夠控制在ORC切分時stripe的合併處理。具體邏輯是,當幾個stripe的大小大於spark.hadoop.mapreduce.input.fileinputformat.split.maxsize時,會合併到一個task中處理。能夠適當調小該值,如set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize=134217728。以此增大讀ORC表的併發。

二 executor能力

2.1內存

spark.executor.memory:executor用於緩存數據、代碼執行的堆內存以及JVM運行時須要的內存。當executor端因爲OOM時,多數是因爲spark.executor.memory設置較小引發的。該參數通常能夠根據表中單個文件的大小進行估計,可是若是是壓縮表如ORC,則須要對文件大小乘以2~3倍,這是因爲文件解壓後所佔空間要增加2~3倍。平臺默認設置爲2G。

spark.yarn.executor.memoryOverhead:Spark運行還須要一些堆外內存,直接向系統申請,如數據傳輸時的netty等。

Spark根據 spark.executor.memory+spark.yarn.executor.memoryOverhead的值向RM申請一個容器,當executor運行時使用的內存超過這個限制時,會被yarn kill掉。在Spark UI中相應失敗的task的錯誤信息爲:

Container killed by YARN for exceeding memory limits. XXX of YYY physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
複製代碼

這個時候,適當調大spark.yarn.executor.memoryOverhead。默認設置爲1024(1G),注意:該參數的單位爲MB。可是,若是用戶在代碼中無限制的使用堆外內存。調大該參數沒有意義。須要用戶瞭解本身的代碼在executor中的行爲,合理使用堆內堆外內存。

spark.sql.windowExec.buffer.spill.threshold:當用戶的SQL中包含窗口函數時,並不會把一個窗口中的全部數據所有讀進內存,而是維護一個緩存池,當池中的數據條數大於該參數表示的閾值時,spark將數據寫到磁盤。該參數若是設置的太小,會致使spark頻繁寫磁盤,若是設置過大則一個窗口中的數據全都留在內存,有OOM的風險。可是,爲了實現快速讀入磁盤的數據,spark在每次讀磁盤數據時,會保存一個1M的緩存。

舉例:當spark.sql.windowExec.buffer.spill.threshold爲10時,若是一個窗口有100條數據,則spark會寫9((100 - 10)/10)次磁盤,在讀的時候,會建立9個磁盤reader,每一個reader會有一個1M的空間作緩存,也就是說會額外增長9M的空間。

當某個窗口中數據特別多時,會致使寫磁盤特別頻繁,就會佔用很大的內存空間做緩存。所以若是觀察到executor的日誌中存在大量以下內容,則能夠考慮適當調大該參數,平臺默認該參數爲40960。

pilling data because number of spilledRecords crossed the threshold
複製代碼

2.2 executor併發度

spark.executor.cores:單個executor上能夠同時運行的task數。Spark中的task調度在線程上,該參數決定了一個executor上能夠並行執行幾個task。這幾個task共享同一個executor的內存(spark.executor.memory+spark.yarn.executor.memoryOverhead)。適當提升該參數的值,能夠有效增長程序的併發度,是做業執行的更快,但使executor端的日誌變得不易閱讀,同時增長executor內存壓力,容易出現OOM。在做業executor端出現OOM時,若是不能增大spark.executor.memory,能夠適當下降該值。平臺默認設置爲1。

該參數是executor的併發度,和spark.dynamicAllocation.maxExecutors配合,能夠提升整個做業的併發度。

2.4 GC優化(使用較少,當嘗試其餘調優方法均無效時可嘗試此方法) executor的JVM參數傳遞方式爲:

set spark.executor.extraJavaOptions="XXXXXXXXXX "。
例如:
set spark.executor.extraJavaOptions="-XX:NewRatio=3 -XX:+UseG1GC"複製代碼

注:全部的JVM參數必須寫在一塊兒,不能分開。 bad case:

set spark.executor.extraJavaOptions="-XX:NewRatio=3 "; set spark.executor.extraJavaOptions="-XX:+UseG1GC " ;
複製代碼

打開GC打印:-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

full GC 頻繁:內存不夠用,調大spark.executor.memory,調小spark.executor.cores

minor GC頻繁,而full GC比較少:能夠適當提升Eden區大小-Xmn

若是OldGen區快要滿了,適當提升spark.executor.memory(默認2G)或適當下降spark.memory.fraction(默認爲0.3)或適當提升-XX:NewRatio(老年代是年輕代的多少倍,通常默認是2)。

若是spark.executor.memory調的很大且GC還是程序運行的瓶頸,能夠嘗試啓用G1垃圾回收器(-XX:+UseG1GC

修改了GC的參數必定要仔細觀察GC的頻率和時間。

修改方法:set spark.executor.extraJavaOptions="-XX:NewRatio=3 -XX:+UseG1GC ..."

三 driver指標:

3.1 內存

spark.driver.memory:driver使用內存大小, 平臺默認爲10G,根據做業的大小能夠適當增大或減少此值。

3.2 GC優化

經過**set spark.driver.extraJavaOptions="XXXXXXXXXX "**設置,具體設置內容可參考2.4節,通常狀況driver內存較大,可嘗試啓用G1垃圾回收器。

相關文章
相關標籤/搜索