Spark性能調優的第一步,就是爲任務分配更多的資源,在必定範圍內,增長資源的分配與性能的提高是成正比的,實現了最優的資源配置後,在此基礎上再考慮進行後面論述的性能調優策略。算法
資源的分配在使用腳本提交Spark任務時進行指定,標準的Spark任務提交腳本如代碼清單2-1所示:sql
代碼清單2-1 標準Spark提交腳本數據庫
/usr/opt/modules/spark/bin/spark-submit \apache
--class com.atguigu.spark.Analysis \數組
--num-executors 80 \緩存
--driver-memory 6g \網絡
--executor-memory 6g \架構
--executor-cores 3 \app
/usr/opt/modules/spark/jar/spark.jar \運維
能夠進行分配的資源如表2-1所示:
表2-1 可分配資源表
名稱 |
說明 |
--num-executors |
配置Executor的數量 |
--driver-memory |
配置Driver內存(影響不大) |
--executor-memory |
配置每一個Executor的內存大小 |
--executor-cores |
配置每一個Executor的CPU core數量 |
調節原則:儘可能將任務分配的資源調節到可使用的資源的最大限度。
對於具體資源的分配,咱們分別討論Spark的兩種Cluster運行模式:
第一種是Spark Standalone模式,你在提交任務前,必定知道或者能夠從運維部門獲取到你可使用的資源狀況,在編寫submit腳本的時候,就根據可用的資源狀況進行資源的分配,好比說集羣有15臺機器,每臺機器爲8G內存,2個CPU core,那麼就指定15個Executor,每一個Executor分配8G內存,2個CPU core。
第二種是Spark Yarn模式,因爲Yarn使用資源隊列進行資源的分配和調度,在表寫submit腳本的時候,就根據Spark做業要提交到的資源隊列,進行資源的分配,好比資源隊列有400G內存,100個CPU core,那麼指定50個Executor,每一個Executor分配8G內存,2個CPU core。
對錶2-1中的各項資源進行了調節後,獲得的性能提高如表2-2所示:
表2-2 資源調節後的性能提高
名稱 |
解析 |
增長Executor·個數 |
在資源容許的狀況下,增長Executor的個數能夠提升執行task的並行度。好比有4個Executor,每一個Executor有2個CPU core,那麼能夠並行執行8個task,若是將Executor的個數增長到8個(資源容許的狀況下),那麼能夠並行執行16個task,此時的並行能力提高了一倍。 |
增長每一個Executor的CPU core個數 |
在資源容許的狀況下,增長每一個Executor的Cpu core個數,能夠提升執行task的並行度。好比有4個Executor,每一個Executor有2個CPU core,那麼能夠並行執行8個task,若是將每一個Executor的CPU core個數增長到4個(資源容許的狀況下),那麼能夠並行執行16個task,此時的並行能力提高了一倍。 |
增長每一個Executor的內存量 |
在資源容許的狀況下,增長每一個Executor的內存量之後,對性能的提高有三點:
|
補充:生產環境Spark submit腳本配置
/usr/local/spark/bin/spark-submit \
--class com.atguigu.spark.dataetl \
--num-executors 80 \
--driver-memory 6g \
--executor-memexecutoory 6g \
--r-cores 3 \
--master yarn-cluster \
--queue root.default \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.core.connection.ack.wait.timeout =300 \
/usr/local/spark/spark.jar
參數配置參考值:
--num-executors:50~100
--driver-memory:1G~5G
--executor-memory:6G~10G
--executor-cores:3
--master:實際生產環境必定使用yarn-cluster
1.2.1 RDD複用
在對RDD進行算子時,要避免相同的算子和計算邏輯之下對RDD進行重複的計算,如圖2-1所示:
圖2-1 RDD的重複計算
對圖2-1中的RDD計算架構進行修改,獲得如圖2-2所示的優化結果:
圖2-2 RDD架構優化
1.2.2 RDD持久化
在Spark中,當屢次對同一個RDD執行算子操做時,每一次都會對這個RDD以以前的父RDD從新計算一次,這種狀況是必需要避免的,對同一個RDD的重複計算是對資源的極大浪費,所以,必須對屢次使用的RDD進行持久化,經過持久化將公共RDD的數據緩存到內存/磁盤中,以後對於公共RDD的計算都會從內存/磁盤中直接獲取RDD數據。
對於RDD的持久化,有兩點須要說明:
第一,RDD的持久化是能夠進行序列化的,當內存沒法將RDD的數據完整的進行存放的時候,能夠考慮使用序列化的方式減少數據體積,將數據完整存儲在內存中。
第二,若是對於數據的可靠性要求很高,而且內存充足,可使用副本機制,對RDD數據進行持久化。當持久化啓用了複本機制時,對於持久化的每一個數據單元都存儲一個副本,放在其餘節點上面,由此實現數據的容錯,一旦一個副本數據丟失,不須要從新計算,還可使用另一個副本。
1.2.3 RDD儘量早的filter操做
獲取到初始RDD後,應該考慮儘早地過濾掉不須要的數據,進而減小對內存的佔用,從而提高Spark做業的運行效率。
默認狀況下,task中的算子中若是使用了外部的變量,每一個task都會獲取一份變量的複本,這就形成了內存的極大消耗。一方面,若是後續對RDD進行持久化,可能就沒法將RDD數據存入內存,只能寫入磁盤,磁盤IO將會嚴重消耗性能;另外一方面,task在建立對象的時候,也許會發現堆內存沒法存放新建立的對象,這就會致使頻繁的GC,GC會致使工做線程中止,進而致使Spark暫停工做一段時間,嚴重影響Spark性能。
假設當前任務配置了20個Executor,指定500個task,有一個20M的變量被全部task共用,此時會在500個task中產生500個副本,耗費集羣10G的內存,若是使用了廣播變量, 那麼每一個Executor保存一個副本,一共消耗400M內存,內存消耗減小了5倍。
廣播變量在每一個Executor保存一個副本,此Executor的全部task共用此廣播變量,這讓變量產生的副本數量大大減小。
在初始階段,廣播變量只在Driver中有一份副本。task在運行的時候,想要使用廣播變量中的數據,此時首先會在本身本地的Executor對應的BlockManager中嘗試獲取變量,若是本地沒有,BlockManager就會從Driver或者其餘節點的BlockM=anager上遠程拉取變量的複本,並由本地的BlockManager進行管理;以後此Executor的全部task都會直接從本地的BlockManager中獲取變量。
默認狀況下,Spark使用Java的序列化機制。Java的序列化機制使用方便,不須要額外的配置,在算子中使用的變量實現Serializable接口便可,可是,Java序列化機制的效率不高,序列化速度慢而且序列化後的數據所佔用的空間依然較大。
Kryo序列化機制比Java序列化機制性能提升10倍左右,Spark之因此沒有默認使用Kryo做爲序列化類庫,是由於它不支持全部對象的序列化,同時Kryo須要用戶在使用前註冊須要序列化的類型,不夠方便,但從Spark 2.0.0版本開始,簡單類型、簡單類型數組、字符串類型的Shuffling RDDs 已經默認使用Kryo序列化方式了。
Kryo序列化註冊方式的實例代碼如代碼清單2-3所示:
代碼清單2-3 Kryo序列化機制配置代碼
public class MyKryoRegistrator implements KryoRegistrator
{
@Override
public void registerClasses(Kryo kryo)
{
kryo.register(StartupReportLogs.class);
}
}
配置Kryo序列化方式的實例代碼如代碼清單2-4所示:
代碼清單2-4 Kryo序列化機制配置代碼
//建立SparkConf對象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用Kryo序列化庫,若是要使用Java序列化庫,須要把該行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//在Kryo序列化庫中註冊自定義的類集合,若是要使用Java序列化庫,須要把該行屏蔽掉
conf.set("spark.kryo.registrator", "atguigu.com.MyKryoRegistrator");
Spark做業運行過程當中,Driver會對每個stage的task進行分配。根據Spark的task分配算法,Spark但願task可以運行在它要計算的數據算在的節點(數據本地化思想),這樣就能夠避免數據的網絡傳輸。一般來講,task可能不會被分配到它處理的數據所在的節點,由於這些節點可用的資源可能已經用盡,此時,Spark會等待一段時間,默認3s,若是等待指定時間後仍然沒法在指定節點運行,那麼會自動降級,嘗試將task分配到比較差的本地化級別所對應的節點上,好比將task分配到離它要計算的數據比較近的一個節點,而後進行計算,若是當前級別仍然不行,那麼繼續降級。
當task要處理的數據不在task所在節點上時,會發生數據的傳輸。task會經過所在節點的BlockManager獲取數據,BlockManager發現數據不在本地時,戶經過網絡傳輸組件從數據所在節點的BlockManager處獲取數據。
網絡傳輸數據的狀況是咱們不肯意看到的,大量的網絡傳輸會嚴重影響性能,所以,咱們但願經過調節本地化等待時長,若是在等待時長這段時間內,目標節點處理完成了一部分task,那麼當前的task將有機會獲得執行,這樣就可以改善Spark做業的總體性能。
Spark的本地化等級如表2-3所示:
表2-3 Spark本地化等級
名稱 |
解析 |
PROCESS_LOCAL |
進程本地化,task和數據在同一個Executor中,性能最好。 |
NODE_LOCAL |
節點本地化,task和數據在同一個節點中,可是task和數據不在同一個Executor中,數據須要在進程間進行傳輸。 |
RACK_LOCAL |
機架本地化,task和數據在同一個機架的兩個節點上,數據須要經過網絡在節點之間進行傳輸。 |
NO_PREF |
對於task來講,從哪裏獲取都同樣,沒有好壞之分。 |
ANY |
task和數據能夠在集羣的任何地方,並且不在一個機架中,性能最差。 |
在Spark項目開發階段,可使用client模式對程序進行測試,此時,能夠在本地看到比較全的日誌信息,日誌信息中有明確的task數據本地化的級別,若是大部分都是PROCESS_LOCAL,那麼就無需進行調節,可是若是發現不少的級別都是NODE_LOCAL、ANY,那麼須要對本地化的等待時長進行調節,經過延長本地化等待時長,看看task的本地化級別有沒有提高,並觀察Spark做業的運行時間有沒有縮短。
注意,過猶不及,不要將本地化等待時長延長地過長,致使由於大量的等待時長,使得Spark做業的運行時間反而增長了。
Spark本地化等待時長的設置如代碼清單2-5所示:
代碼清單2-5 Spark本地化等待時長設置示例
val conf = new SparkConf()
.set("spark.locality.wait", "6")
普通的map算子對RDD中的每個元素進行操做,而mapPartitions算子對RDD中每個分區進行操做。若是是普通的map算子,假設一個partition有1萬條數據,那麼map算子中的function要執行1萬次,也就是對每一個元素進行操做。
圖2-3 map算子
若是是mapPartition算子,因爲一個task處理一個RDD的partition,那麼一個task只會執行一次function,function一次接收全部的partition數據,效率比較高。
圖2-4 mapPartitions算子
好比,當要把RDD中的全部數據經過JDBC寫入數據,若是使用map算子,那麼須要對RDD中的每個元素都建立一個數據庫鏈接,這樣對資源的消耗很大,若是使用mapPartitions算子,那麼針對一個分區的數據,只須要創建一個數據庫鏈接。
mapPartitions算子也存在一些缺點:對於普通的map操做,一次處理一條數據,若是在處理了2000條數據後內存不足,那麼能夠將已經處理完的2000條數據從內存中垃圾回收掉;可是若是使用mapPartitions算子,但數據量很是大時,function一次處理一個分區的數據,若是一旦內存不足,此時沒法回收內存,就可能會OOM,即內存溢出。
所以,mapPartitions算子適用於數據量不是特別大的時候,此時使用mapPartitions算子對性能的提高效果仍是不錯的。(當數據量很大的時候,一旦使用mapPartitions算子,就會直接OOM)
在項目中,應該首先估算一下RDD的數據量、每一個partition的數據量,以及分配給每一個Executor的內存資源,若是資源容許,能夠考慮使用mapPartitions算子代替map。
在生產環境中,一般使用foreachPartition算子來完成數據庫的寫入,經過foreachPartition算子的特性,能夠優化寫數據庫的性能。
若是使用foreach算子完成數據庫的操做,因爲foreach算子是遍歷RDD的每條數據,所以,每條數據都會創建一個數據庫鏈接,這是對資源的極大浪費,所以,對於寫數據庫操做,咱們應當使用foreachPartition算子。
與mapPartitions算子很是類似,foreachPartition是將RDD的每一個分區做爲遍歷對象,一次處理一個分區的數據,也就是說,若是涉及數據庫的相關操做,一個分區的數據只須要建立一次數據庫鏈接,如圖2-5所示:
圖2-5 foreachPartition算子
使用了foreachPartition算子後,能夠得到如下的性能提高:
1. 對於咱們寫的function函數,一次處理一整個分區的數據;
2. 對於一個分區內的數據,建立惟一的數據庫鏈接;
3. 只須要向數據庫發送一次SQL語句和多組參數;
在生產環境中,所有都會使用foreachPartition算子完成數據庫操做-。foreachPartition算子存在一個問題,與mapPartitions算子相似,若是一個分區的數據量特別大,可能會形成OOM,即內存溢出。
在Spark任務中咱們常常會使用filter算子完成RDD中數據的過濾,在任務初始階段,從各個分區中加載到的數據量是相近的,可是一旦進過filter過濾後,每一個分區的數據量有可能會存在較大差別,如圖2-6所示:
圖2-6 分區數據過濾結果
根據圖2-6咱們能夠發現兩個問題:
1. 每一個partition的數據量變小了,若是還按照以前與partition相等的task個數去處理當前數據,有點浪費task的計算資源;
2. 每一個partition的數據量不同,會致使後面的每一個task處理每一個partition數據的時候,每一個task要處理的數據量不一樣,這頗有可能致使數據傾斜問題。
如圖2-6所示,第二個分區的數據過濾後只剩100條,而第三個分區的數據過濾後剩下800條,在相同的處理邏輯下,第二個分區對應的task處理的數據量與第三個分區對應的task處理的數據量差距達到了8倍,這也會致使運行速度可能存在數倍的差距,這也就是數據傾斜問題。
針對上述的兩個問題,咱們分別進行分析:
1. 針對第一個問題,既然分區的數據量變小了,咱們但願能夠對分區數據進行從新分配,好比將原來4個分區的數據轉化到2個分區中,這樣只須要用後面的兩個task進行處理便可,避免了資源的浪費。
2. 針對第二個問題,解決方法和第一個問題的解決方法很是類似,對分區數據從新分配,讓每一個partition中的數據量差很少,這就避免了數據傾斜問題。
那麼具體應該如何實現上面的解決思路?咱們須要coalesce算子。
repartition與coalesce均可以用來進行重分區,其中repartition只是coalesce接口中shuffle爲true的簡易實現,coalesce默認狀況下不進行shuffle,可是能夠經過參數進行設置。
假設咱們但願將本來的分區個數A經過從新分區變爲B,那麼有如下幾種狀況:
① A與B相差值不大
此時使用coalesce便可,無需shuffle過程。
② A與B相差值很大
此時可使用coalesce而且不啓用shuffle過程,可是會致使合併過程性能低下,因此推薦設置coalesce的第二個參數爲true,即啓動shuffle過程。
此時使用repartition便可,若是使用coalesce須要將shuffle設置爲true,不然coalesce無效。
咱們能夠在filter操做以後,使用coalesce算子針對每一個partition的數據量各不相同的狀況,壓縮partition的數量,並且讓每一個partition的數據量儘可能均勻緊湊,以便於後面的task進行計算操做,在某種程度上可以在必定程度上提高性能。
注意:local模式是進程內模擬集羣運行,已經對並行度和分區數量有了必定的內部優化,所以不用去設置並行度和分區數量。
Spark SQL的並行度不容許用戶本身指定,Spark SQL本身會默認根據hive表對應的HDFS文件的block個數自動設置Spark SQL所在的那個stage的並行度,但有時此默認並行度太低,致使任務運行緩慢。
因爲Spark SQL所在stage的並行度沒法手動設置,若是數據量較大,而且此stage中後續的transformation操做有着複雜的業務邏輯,而Spark SQL自動設置的task數量不多,這就意味着每一個task要處理爲數很多的數據量,而後還要執行很是複雜的處理邏輯,這就可能表現爲第一個有Spark SQL的stage速度很慢,然後續的沒有Spark SQL的stage運行速度很是快。
爲了解決Spark SQL沒法設置並行度和task數量的問題,咱們可使用repartition算子。
圖2-7 repartition算子使用先後對比圖
Spark SQL這一步的並行度和task數量確定是沒有辦法去改變了,可是,對於Spark SQL查詢出來的RDD,當即使用repartition算子,去從新進行分區,這樣能夠從新分區爲多個partition,從repartition以後的RDD操做,因爲再也不設計Spark SQL,所以stage的並行度就會等於你手動設置的值,這樣就避免了Spark SQL所在的stage只能用少許的task去處理大量數據並執行復雜的算法邏輯。使用repartition算子的先後對好比圖2-7所示。
reduceByKey相較於普通的shuffle操做一個顯著的特色就是會進行map端的本地聚合,map端會先對本地的數據進行combine操做,而後將數據寫入給下個stage的每一個task建立的文件中,也就是在map端,對每個key對應的value,執行reduceByKey算子函數。reduceByKey算子的執行過程如圖2-8所示:
圖2-8 reduceByKey算子執行過程
使用reduceByKey對性能的提高以下:
基於reduceByKey的本地聚合特徵,咱們應該考慮使用reduceByKey代替其餘的shuffle算子,例如groupByKey。reduceByKey與groupByKey的運行原理如圖2-9和圖2-10所示:
圖2-9 groupByKey原理
圖2-10 reduceByKey原理
根據上圖可知,groupByKey不會進行map端的聚合,而是將全部map端的數據shuffle到reduce端,而後在reduce端進行數據的聚合操做。因爲reduceByKey有map端聚合的特性,使得網絡傳輸的數據量減少,所以效率要明顯高於groupByKey。
在Spark任務運行過程當中,若是shuffle的map端處理的數據量比較大,可是map端緩衝的大小是固定的,可能會出現map端緩衝數據頻繁spill溢寫到磁盤文件中的狀況,使得性能很是低下,經過調節map端緩衝的大小,能夠避免頻繁的磁盤IO操做,進而提高Spark任務的總體性能。
map端緩衝的默認配置是32KB,若是每一個task處理640KB的數據,那麼會發生640/32 = 20次溢寫,若是每一個task處理64000KB的數據,機會發生64000/32=2000此溢寫,這對於性能的影響是很是嚴重的。
map端緩衝的配置方法如代碼清單2-7所示:
代碼清單2-7 map端緩衝配置
val conf = new SparkConf()
.set("spark.shuffle.file.buffer", "64")
Spark Shuffle過程當中,shuffle reduce task的buffer緩衝區大小決定了reduce task每次可以緩衝的數據量,也就是每次可以拉取的數據量,若是內存資源較爲充足,適當增長拉取數據緩衝區的大小,能夠減小拉取數據的次數,也就能夠減小網絡傳輸的次數,進而提高性能。
reduce端數據拉取緩衝區的大小能夠經過spark.reducer.maxSizeInFlight參數進行設置,默認爲48MB,該參數的設置方法如代碼清單2-8所示:
代碼清單2-8 reduce端數據拉取緩衝區配置
val conf = new SparkConf()
.set("spark.reducer.maxSizeInFlight", "96")
Spark Shuffle過程當中,reduce task拉取屬於本身的數據時,若是由於網絡異常等緣由致使失敗會自動進行重試。對於那些包含了特別耗時的shuffle操做的做業,建議增長重試最大次數(好比60次),以免因爲JVM的full gc或者網絡不穩定等因素致使的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~上百億)的shuffle過程,調節該參數能夠大幅度提高穩定性。
reduce端拉取數據重試次數能夠經過spark.shuffle.io.maxRetries參數進行設置,該參數就表明了能夠重試的最大次數。若是在指定次數以內拉取仍是沒有成功,就可能會致使做業執行失敗,默認爲3,該參數的設置方法如代碼清單2-9所示:
代碼清單2-9 reduce端拉取數據重試次數配置
val conf = new SparkConf()
.set("spark.shuffle.io.maxRetries", "6")
Spark Shuffle過程當中,reduce task拉取屬於本身的數據時,若是由於網絡異常等緣由致使失敗會自動進行重試,在一次失敗後,會等待必定的時間間隔再進行重試,能夠經過加大間隔時長(好比60s),以增長shuffle操做的穩定性。
reduce端拉取數據等待間隔能夠經過spark.shuffle.io.retryWait參數進行設置,默認值爲5s,該參數的設置方法如代碼清單2-10所示:
代碼清單2-10 reduce端拉取數據等待間隔配置
val conf = new SparkConf()
.set("spark.shuffle.io.retryWait", "10s")
對於SortShuffleManager,若是shuffle reduce task的數量小於某一閾值則shuffle write過程當中不會進行排序操做,而是直接按照未經優化的HashShuffleManager的方式去寫數據,可是最後會將每一個task產生的全部臨時磁盤文件都合併成一個文件,並會建立單獨的索引文件。
當你使用SortShuffleManager時,若是的確不須要排序操做,那麼建議將這個參數調大一些,大於shuffle read task的數量,那麼此時map-side就不會進行排序了,減小了排序的性能開銷,可是這種方式下,依然會產生大量的磁盤文件,所以shuffle write性能有待提升。
SortShuffleManager排序操做閾值的設置能夠經過spark.shuffle.sort. bypassMergeThreshold這一參數進行設置,默認值爲200,該參數的設置方法如代碼清單2-11所示:
代碼清單2-10 reduce端拉取數據等待間隔配置
val conf = new SparkConf()
.set("spark.shuffle.sort.bypassMergeThreshold", "400")
對於JVM調優,首先應該明確,full gc/minor gc,都會致使JVM的工做線程中止工做,即stop the world。
1. 靜態內存管理機制
根據Spark靜態內存管理機制,堆內存被劃分爲了兩塊,Storage和Execution。Storage主要用於緩存RDD數據和broadcast數據,Execution主要用於緩存在shuffle過程當中產生的中間數據,Storage佔系統內存的60%,Execution佔系統內存的20%,而且二者徹底獨立。
在通常狀況下,Storage的內存都提供給了cache操做,可是若是在某些狀況下cache操做內存不是很緊張,而task的算子中建立的對象不少,Execution內存又相對較小,這回致使頻繁的minor gc,甚至於頻繁的full gc,進而致使Spark頻繁的中止工做,性能影響會很大。
在Spark UI中能夠查看每一個stage的運行狀況,包括每一個task的運行時間、gc時間等等,若是發現gc太頻繁,時間太長,就能夠考慮調節Storage的內存佔比,讓task執行算子函數式,有更多的內存可使用。
Storage內存區域能夠經過spark.storage.memoryFraction參數進行指定,默認爲0.6,即60%,能夠逐級向下遞減,如代碼清單2-6所示:
代碼清單2-6 Storage內存佔比設置
val conf = new SparkConf()
.set("spark.storage.memoryFraction", "0.4")
2. 統一內存管理機制
根據Spark統一內存管理機制,堆內存被劃分爲了兩塊,Storage和Execution。Storage主要用於緩存數據,Execution主要用於緩存在shuffle過程當中產生的中間數據,二者所組成的內存部分稱爲統一內存,Storage和Execution各佔統一內存的50%,因爲動態佔用機制的實現,shuffle過程須要的內存過大時,會自動佔用Storage的內存區域,所以無需手動進行調節。
Executor的堆外內存主要用於程序的共享庫、Perm Space、 線程Stack和一些Memory mapping等, 或者類C方式allocate object。
有時,若是你的Spark做業處理的數據量很是大,達到幾億的數據量,此時運行Spark做業會時不時地報錯,例如shuffle output file cannot find,executor lost,task lost,out of memory等,這多是Executor的堆外內存不太夠用,致使Executor在運行的過程當中內存溢出。
stage的task在運行的時候,可能要從一些Executor中去拉取shuffle map output文件,可是Executor可能已經因爲內存溢出掛掉了,其關聯的BlockManager也沒有了,這就可能會報出shuffle output file cannot find,executor lost,task lost,out of memory等錯誤,此時,就能夠考慮調節一下Executor的堆外內存,也就能夠避免報錯,與此同時,堆外內存調節的比較大的時候,對於性能來說,也會帶來必定的提高。
默認狀況下,Executor堆外內存上限大概爲300多MB,在實際的生產環境下,對海量數據進行處理的時候,這裏都會出現問題,致使Spark做業反覆崩潰,沒法運行,此時就會去調節這個參數,到至少1G,甚至於2G、4G。
Executor堆外內存的配置須要在spark-submit腳本里配置,如代碼清單2-7所示:
代碼清單2-7 Executor堆外內存配置
--conf spark.yarn.executor.memoryOverhead=2048
以上參數配置完成後,會避免掉某些JVM OOM的異常問題,同時,能夠提高總體Spark做業的性能。
在Spark做業運行過程當中,Executor優先從本身本地關聯的BlockManager中獲取某份數據,若是本地BlockManager沒有的話,會經過TransferService遠程鏈接其餘節點上Executor的BlockManager來獲取數據。
若是task在運行過程當中建立大量對象或者建立的對象較大,會佔用大量的內存,這回致使頻繁的垃圾回收,可是垃圾回收會致使工做現場所有中止,也就是說,垃圾回收一旦執行,Spark的Executor進程就會中止工做,沒法提供相應,此時,因爲沒有響應,沒法創建網絡鏈接,會致使網絡鏈接超時。
在生產環境下,有時會遇到file not found、file lost這類錯誤,在這種狀況下,頗有多是Executor的BlockManager在拉取數據的時候,沒法創建鏈接,而後超過默認的鏈接等待時長60s後,宣告數據拉取失敗,若是反覆嘗試都拉取不到數據,可能會致使Spark做業的崩潰。這種狀況也可能會致使DAGScheduler反覆提交幾回stage,TaskScheduler返回提交幾回task,大大延長了咱們的Spark做業的運行時間。
此時,能夠考慮調節鏈接的超時時長,鏈接等待時長鬚要在spark-submit腳本中進行設置,設置方式如代碼清單2-8所示:
代碼清單2-8 鏈接等待時長配置
--conf spark.core.connection.ack.wait.timeout=300
調節鏈接等待時長後,一般能夠避免部分的XX文件拉取失敗、XX文件lost等報錯。
Spark中的數據傾斜問題主要指shuffle過程當中出現的數據傾斜問題,是因爲不一樣的key對應的數據量不一樣致使的不一樣task所處理的數據量不一樣的問題。
例如,reduce點一共要處理100萬條數據,第一個和第二個task分別被分配到了1萬條數據,計算5分鐘內完成,第三個task分配到了98萬數據,此時第三個task可能須要10個小時完成,這使得整個Spark做業須要10個小時才能運行完成,這就是數據傾斜所帶來的後果。
注意,要區分開數據傾斜與數據量過量這兩種狀況,數據傾斜是指少數task被分配了絕大多數的數據,所以少數task運行緩慢;數據過量是指全部task被分配的數據量都很大,相差很少,全部task都運行緩慢。
數據傾斜的表現:
1. Spark做業的大部分task都執行迅速,只有有限的幾個task執行的很是慢,此時可能出現了數據傾斜,做業能夠運行,可是運行得很是慢;
2. Spark做業的大部分task都執行迅速,可是有的task在運行過程當中會忽然報出OOM,反覆執行幾回都在某一個task報出OOM錯誤,此時可能出現了數據傾斜,做業沒法正常運行。
定位數據傾斜問題:
1. 查閱代碼中的shuffle算子,例如reduceByKey、countByKey、groupByKey、join等算子,根據代碼邏輯判斷此處是否會出現數據傾斜;
2. 查看Spark做業的log文件,log文件對於錯誤的記錄會精確到代碼的某一行,能夠根據異常定位到的代碼位置來明確錯誤發生在第幾個stage,對應的shuffle算子是哪個;
絕大多數狀況下,Spark做業的數據來源都是Hive表,這些Hive表基本都是通過ETL以後的昨天的數據。
爲了不數據傾斜,咱們能夠考慮避免shuffle過程,若是避免了shuffle過程,那麼從根本上就消除了發生數據傾斜問題的可能。
若是Spark做業的數據來源於Hive表,那麼能夠先在Hive表中對數據進行聚合,例如按照key進行分組,將同一key對應的全部value用一種特殊的格式拼接到一個字符串裏去,這樣,一個key就只有一條數據了;以後,對一個key的全部value進行處理時,只須要進行map操做便可,無需再進行任何的shuffle操做。經過上述方式就避免了執行shuffle操做,也就不可能會發生任何的數據傾斜問題。
對於Hive表中數據的操做,不必定是拼接成一個字符串,也能夠是直接對key的每一條數據進行累計計算。
若是在Spark做業中容許丟棄某些數據,那麼能夠考慮將可能致使數據傾斜的key進行過濾,濾除可能致使數據傾斜的key對應的數據,這樣,在Spark做業中就不會發生數據傾斜了。
當方案一和方案二對於數據傾斜的處理沒有很好的效果時,能夠考慮提升shuffle過程當中的reduce端並行度,reduce端並行度的提升就增長了reduce端task的數量,那麼每一個task分配到的數據量就會相應減小,由此緩解數據傾斜問題。
在大部分的shuffle算子中,均可以傳入一個並行度的設置參數,好比reduceByKey(500),這個參數會決定shuffle過程當中reduce端的並行度,在進行shuffle操做的時候,就會對應着建立指定數量的reduce task。對於Spark SQL中的shuffle類語句,好比group by、join等,須要設置一個參數,即spark.sql.shuffle.partitions,該參數表明了shuffle read task的並行度,該值默認是200,對於不少場景來講都有點太小。
增長shuffle read task的數量,可讓本來分配給一個task的多個key分配給多個task,從而讓每一個task處理比原來更少的數據。舉例來講,若是本來有5個key,每一個key對應10條數據,這5個key都是分配給一個task的,那麼這個task就要處理50條數據。而增長了shuffle read task之後,每一個task就分配到一個key,即每一個task就處理10條數據,那麼天然每一個task的執行時間都會變短了。
提升reduce端並行度並無從根本上改變數據傾斜的本質和問題(方案一和方案二從根本上避免了數據傾斜的發生),只是儘量地去緩解和減輕shuffle reduce task的數據壓力,以及數據傾斜的問題,適用於有較多key對應的數據量都比較大的狀況。
該方案一般沒法完全解決數據傾斜,由於若是出現一些極端狀況,好比某個key對應的數據量有100萬,那麼不管你的task數量增長到多少,這個對應着100萬數據的key確定仍是會分配到一個task中去處理,所以註定仍是會發生數據傾斜的。因此這種方案只能說是在發現數據傾斜時嘗試使用的第一種手段,嘗試去用嘴簡單的方法緩解數據傾斜而已,或者是和其餘方案結合起來使用。
在理想狀況下,reduce端並行度提高後,會在必定程度上減輕數據傾斜的問題,甚至基本消除數據傾斜;可是,在一些狀況下,只會讓原來因爲數據傾斜而運行緩慢的task運行速度稍有提高,或者避免了某些task的OOM問題,可是,仍然運行緩慢,此時,要及時放棄方案三,開始嘗試後面的方案。
當使用了相似於groupByKey、reduceByKey這樣的算子時,能夠考慮使用隨機key實現雙重聚合,如圖3-1所示:
圖3-1 隨機key實現雙重聚合
首先,經過map算子給每一個數據的key添加隨機數前綴,對key進行打散,將原先同樣的key變成不同的key,而後進行第一次聚合,這樣就可讓本來被一個task處理的數據分散到多個task上去作局部聚合;隨後,去除掉每一個key的前綴,再次進行聚合。
此方法對於由groupByKey、reduceByKey這類算子形成的數據傾斜由比較好的效果,僅僅適用於聚合類的shuffle操做,適用範圍相對較窄。若是是join類的shuffle操做,還得用其餘的解決方案。
此方法也是前幾種方案沒有比較好的效果時要嘗試的解決方案。
正常狀況下,join操做都會執行shuffle過程,而且執行的是reduce join,也就是先將全部相同的key和對應的value匯聚到一個reduce task中,而後再進行join。普通join的過程以下圖所示:
圖3-2 普通join過程
普通的join是會走shuffle過程的,而一旦shuffle,就至關於會將相同key的數據拉取到一個shuffle read task中再進行join,此時就是reduce join。可是若是一個RDD是比較小的,則能夠採用廣播小RDD全量數據+map算子來實現與join一樣的效果,也就是map join,此時就不會發生shuffle操做,也就不會發生數據傾斜。
(注意,RDD是並不能進行廣播的,只能將RDD內部的數據經過collect拉取到Driver內存而後再進行廣播)
不使用join算子進行鏈接操做,而使用Broadcast變量與map類算子實現join操做,進而徹底規避掉shuffle類的操做,完全避免數據傾斜的發生和出現。將較小RDD中的數據直接經過collect算子拉取到Driver端的內存中來,而後對其建立一個Broadcast變量;接着對另一個RDD執行map類算子,在算子函數內,從Broadcast變量中獲取較小RDD的全量數據,與當前RDD的每一條數據按照鏈接key進行比對,若是鏈接key相同的話,那麼就將兩個RDD的數據用你須要的方式鏈接起來。
根據上述思路,根本不會發生shuffle操做,從根本上杜絕了join操做可能致使的數據傾斜問題。
當join操做有數據傾斜問題而且其中一個RDD的數據量較小時,能夠優先考慮這種方式,效果很是好。map join的過程如圖3-3所示:
圖3-3 map join過程
因爲Spark的廣播變量是在每一個Executor中保存一個副本,若是兩個RDD數據量都比較大,那麼若是將一個數據量比較大的 RDD作成廣播變量,那麼頗有可能會形成內存溢出。
在Spark中,若是某個RDD只有一個key,那麼在shuffle過程當中會默認將此key對應的數據打散,由不一樣的reduce端task進行處理。
當由單個key致使數據傾斜時,可有將發生數據傾斜的key單獨提取出來,組成一個RDD,而後用這個本來會致使傾斜的key組成的RDD根其餘RDD單獨join,此時,根據Spark的運行機制,此RDD中的數據會在shuffle階段被分散到多個task中去進行join操做。傾斜key單獨join的流程如圖3-4所示:
圖3-4 傾斜key單獨join流程
1. 適用場景分析:
對於RDD中的數據,能夠將其轉換爲一箇中間表,或者是直接使用countByKey()的方式,看一個這個RDD中各個key對應的數據量,此時若是你發現整個RDD就一個key的數據量特別多,那麼就能夠考慮使用這種方法。
當數據量很是大時,能夠考慮使用sample採樣獲取10%的數據,而後分析這10%的數據中哪一個key可能會致使數據傾斜,而後將這個key對應的數據單獨提取出來。
2. 不適用場景分析:
若是一個RDD中致使數據傾斜的key不少,那麼此方案不適用。
若是在進行join操做時,RDD中有大量的key致使數據傾斜,那麼進行分拆key也沒什麼意義,此時就只能使用最後一種方案來解決問題了,對於join操做,咱們能夠考慮對其中一個RDD數據進行擴容,另外一個RDD進行稀釋後再join。
咱們會將原先同樣的key經過附加隨機前綴變成不同的key,而後就能夠將這些處理後的「不一樣key」分散到多個task中去處理,而不是讓一個task處理大量的相同key。這一種方案是針對有大量傾斜key的狀況,無法將部分key拆分出來進行單獨處理,須要對整個RDD進行數據擴容,對內存資源要求很高。
1. 核心思想:
選擇一個RDD,使用flatMap進行擴容,對每條數據的key添加數值前綴(1~N的數值),將一條數據映射爲多條數據;(擴容)
選擇另一個RDD,進行map映射操做,每條數據的key都打上一個隨機數做爲前綴(1~N的隨機數);(稀釋)
將兩個處理後的RDD,進行join操做。
圖3-6 使用隨機數以及擴容進行join
2. 侷限性:
若是兩個RDD都很大,那麼將RDD進行N倍的擴容顯然行不通;
使用擴容的方式只能緩解數據傾斜,不能完全解決數據傾斜問題。
當RDD中有幾個key致使數據傾斜時,方案六再也不適用,而方案七又很是消耗資源,此時能夠引入方案七的思想完善方案六:
1. 對包含少數幾個數據量過大的key的那個RDD,經過sample算子採樣出一份樣原本,而後統計一下每一個key的數量,計算出來數據量最大的是哪幾個key。
2. 而後將這幾個key對應的數據從原來的RDD中拆分出來,造成一個單獨的RDD,並給每一個key都打上n之內的隨機數做爲前綴,而不會致使傾斜的大部分key造成另一個RDD。
3. 接着將須要join的另外一個RDD,也過濾出來那幾個傾斜key對應的數據並造成一個單獨的RDD,將每條數據膨脹成n條數據,這n條數據都按順序附加一個0~n的前綴,不會致使傾斜的大部分key也造成另一個RDD。
4. 再將附加了隨機前綴的獨立RDD與另外一個膨脹n倍的獨立RDD進行join,此時就能夠將原先相同的key打散成n份,分散到多個task中去進行join了。
5. 而另外兩個普通的RDD就照常join便可。
6. 最後將兩次join的結果使用union算子合併起來便可,就是最終的join結果。
在Shuffle過程,reduce端task並非等到map端task將其數據所有寫入磁盤後再去拉取,而是map端寫一點數據,reduce端task就會拉取一小部分數據,而後當即進行後面的聚合、算子函數的使用等操做。
reduce端task可以拉取多少數據,由reduce拉取數據的緩衝區buffer來決定,由於拉取過來的數據都是先放在buffer中,而後再進行後續的處理,buffer的默認大小爲48MB。
reduce端task會一邊拉取一邊計算,不必定每次都會拉滿48MB的數據,可能大多數時候拉取一部分數據就處理掉了。
雖說增大reduce端緩衝區大小能夠減小拉取次數,提高Shuffle性能,可是有時map端的數據量很是大,寫出的速度很是快,此時reduce端的全部task在拉取的時候,有可能所有達到本身緩衝的最大極限值,即48MB,此時,再加上reduce端執行的聚合函數的代碼,可能會建立大量的對象,這可難會致使內存溢出,即OOM。
若是一旦出現reduce端內存溢出的問題,咱們能夠考慮減少reduce端拉取數據緩衝區的大小,例如減小爲12MB。
在實際生產環境中是出現過這種問題的,這是典型的以性能換執行的原理。reduce端拉取數據的緩衝區減少,不容易致使OOM,可是相應的,reudce端的拉取次數增長,形成更多的網絡傳輸開銷,形成性能的降低。
注意,要保證任務可以運行,再考慮性能的優化。
在Spark做業中,有時會出現shuffle file not found的錯誤,這是很是常見的一個報錯,有時出現這種錯誤之後,選擇從新執行一遍,就再也不報出這種錯誤。
出現上述問題可能的緣由是Shuffle操做中,後面stage的task想要去上一個stage的task所在的Executor拉取數據,結果對方正在執行GC,執行GC會致使Executor內全部的工做現場所有中止,好比BlockManager、基於netty的網絡通訊等,這就會致使後面的task拉取數據拉取了半天都沒有拉取到,就會報出shuffle file not found的錯誤,而第二次再次執行就不會再出現這種錯誤。
能夠經過調整reduce端拉取數據重試次數和reduce端拉取數據時間間隔這兩個參數來對Shuffle性能進行調整,增大參數值,使得reduce端拉取數據的重試次數增長,而且每次失敗後等待的時間間隔加長。
代碼清單4-1 JVM GC致使的shuffle文件拉取失敗
val conf = new SparkConf()
.set("spark.shuffle.io.maxRetries", "6")
.set("spark.shuffle.io.retryWait", "6s")
當Spark做業在運行過程當中報錯,並且報錯信息中含有Serializable等相似詞彙,那麼多是序列化問題致使的報錯。
序列化問題要注意如下三點:
在一些算子函數裏,須要咱們有一個返回值,可是在一些狀況下咱們不但願有返回值,此時咱們若是直接返回NULL,會報錯,例如Scala.Math(NULL)異常。
若是你遇到某些狀況,不但願有返回值,那麼能夠經過下述方式解決:
2. 在經過算子獲取到了一個RDD以後,能夠對這個RDD執行filter操做,進行數據過濾,將數值爲-1的數據給過濾掉;
3. 在使用完filter算子後,繼續調用coalesce算子進行優化。
YARN-client模式的運行原理以下圖所示:
圖4-1 YARN-client模式運行原理
在YARN-client模式下,Driver啓動在本地機器上,而Driver負責全部的任務調度,須要與YARN集羣上的多個Executor進行頻繁的通訊。
假設有100個Executor, 1000個task,那麼每一個Executor分配到10個task,以後,Driver要頻繁地跟Executor上運行的1000個task進行通訊,通訊數據很是多,而且通訊品類特別高。這就致使有可能在Spark任務運行過程當中,因爲頻繁大量的網絡通信,本地機器的網卡流量會激增。
注意,YARN-client模式只會在測試環境中使用,而之因此使用YARN-client模式,是因爲能夠看到詳細全面的log信息,經過查看log,能夠鎖定程序中存在的問題,避免在生產環境下發送故障。
在生產環境下,使用的必定是YARN-cluster模式。在YARN-cluster模式下,就不會形成本地機器網卡流量激增問題,若是YARN-cluster模式下存在網絡通訊的問題,須要運維團隊進行解決。
YARN-cluster模式的運行原理以下圖所示:
圖4-1 YARN-client模式運行原理
當Spark做業中包含SparkSQL的內容時,可能會碰到YARN-client模式下能夠運行,可是YARN-cluster模式下沒法提交運行(報出OOM錯誤)的狀況。
YARN-client模式下,Driver是運行在本地機器上的,Spark使用的JVM的PermGen的配置(JDK1.8以前),是本地機器上的spark-class文件,JVM永久代的大小是128MB,這個是沒有問題的,可是在YARN-cluster模式下,Driver運行在YARN集羣的某個節點上,使用的是沒有通過配置的默認設置,PermGen永久代大小爲82MB。
SparkSQL的內部要進行很複雜的SQL的語義解析、語法樹轉換等等,很是複雜,若是sql語句自己就很是複雜,那麼頗有可能會致使性能的損耗和內存的佔用,特別是對PermGen的佔用會比較大。
因此,此時若是PermGen的佔用好過了82MB,可是又小於128MB,就會出現YARN-client模式下能夠運行,YARN-cluster模式下沒法運行的狀況。
解決上述問題的方法時增長PermGen的容量,須要在spark-submit腳本中對相關參數進行設置,設置方法如代碼清單4-2所示。
代碼清單4-2 配置
--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"
經過上述方法就設置了Driver永久代的大小,默認爲128MB,最大256MB,這樣就能夠避免上面所說的問題。
當SparkSQL的sql語句有成百上千的or關鍵字時,就可能會出現Driver端的JVM棧內存溢出。
JVM棧內存溢出基本上就是因爲調用的方法層級過多,產生了大量的,很是深的,超出了JVM棧深度限制的遞歸。(咱們猜想SparkSQL有大量or語句的時候,在解析SQL時,例如轉換爲語法樹或者進行執行計劃的生成的時候,對於or的處理是遞歸,or很是多時,會發生大量的遞歸)
此時,建議將一條sql語句拆分爲多條sql語句來執行,每條sql語句儘可能保證100個之內的子句。根據實際的生產環境試驗,一條sql語句的or關鍵字控制在100個之內,一般不會致使JVM棧內存溢出。
Spark持久化在大部分狀況下是沒有問題的,可是有時數據可能會丟失,若是數據一旦丟失,就須要對丟失的數據從新進行計算,計算完後再緩存和使用,爲了不數據的丟失,能夠選擇對這個RDD進行checkpoint,也就是將數據持久化一份到容錯的文件系統上(好比HDFS)。
一個RDD緩存並checkpoint後,若是一旦發現緩存丟失,就會優先查看checkpoint數據存不存在,若是有,就會使用checkpoint數據,而不用從新計算。也便是說,checkpoint能夠視爲cache的保障機制,若是cache失敗,就使用checkpoint的數據。
使用checkpoint的優勢在於提升了Spark做業的可靠性,一旦緩存出現問題,不須要從新計算數據,缺點在於,checkpoint時須要將數據寫入HDFS等文件系統,對性能的消耗較大。