一段程序只能完成功能是沒有用的,只能可以穩定、高效率地運行纔是生成環境所須要的。html
本篇記錄了Spark各個角度的調優技巧,以備不時之需。算法
額。。。從最基本的開始講,可能一些剛接觸Spark的人不是很清楚Spark的一些參數變量到底要配置在哪裏。數據庫
能夠經過三種方式配置參數,任選其一皆可。apache
值得一提的是一個略顯詭異的現象,有些參數在spark-env.sh中配置並不起做用,反而要在程序中設置纔有效果。markdown
Spark的參數不少,一些默認的設置能夠參考官網推薦的配置參數:/docs/latest/configuration.html網絡
能夠經過如下幾種方式來觀察Spark集羣的狀態和相關性能問題:app
前景交代完畢,下面進入正題:分佈式
一、小分區合併的問題函數
因爲程序中過分使用filter算子或者使用不當,都會形成大量的小分區出現。
由於每次過濾獲得的結果只有原來數據集的一小部分,而這些量很小的數據一樣會以必定的分區數並行化分配到各個節點中執行。工具
帶來的問題就是:任務處理的數據量很小,反覆地切換任務所消耗的資源反而會帶來很大的系統開銷。
解決方案:使用重分區函數coalesce進行數據緊縮、減小分區數並設置shuffle=true保證任務是並行計算的
減小分區數,雖然意味着並行度下降,可是相對比以前的大量小任務過分切換的消耗,倒是比較值得的。
這裏也能夠直接使用repartition重分區函數進行操做,由於其底層使用的是coalesce並設置Shuffle=true
二、數據傾斜問題
這是一個生產環境中常常遇到的問題,典型的場景是:大量的數據被分配到小部分節點計算,而其餘大部分節點卻只計算小部分數據。
問題產生的緣由有不少,可能且不所有包括:
可選的解決方案有:
還有一種場景是任務執行速度傾斜問題:集羣中其餘節點都計算完畢了,可是隻有少數幾個節點死活運行不完。(其實這和上面的那個場景是差很少的)
解決方案:
三、並行度調整
官方推薦每一個CPU CORE分配2-3個任務。
Spark會根據文件大小默認配置Map階段的任務數,因此咱們可以自行調整的就是Reduce階段的分區數了。
四、DAG調度執行優化
DAG圖是Spark計算的基本依賴,因此建議:
儘量地在Transformation算子中完成對數據的計算,由於過多的Action算子會產生不少多餘的Shuffle,在劃分DAG圖時會造成衆多Stage。
一、大任務分發問題
Spark採用Akka的Actor模型來進行消息傳遞,包括數據、jar包和相關文件等。
而Akka消息通訊傳遞默認的容量最大爲10M,一旦傳遞的消息超過這個限制就會出現這樣的錯誤:
Worker任務失敗後Master上會打印「Lost TID:」
根據這個信息找到對應的Worker節點後查看SparkHome/work/目錄下的日誌,查看Serialized size of result是否超過10M,就能夠知道是否是Akka這邊的問題了。
一旦確認是Akka通訊容量限制以後,就能夠經過配置spark.akka.frameSize控制Akka通訊消息的最大容量。
二、Broadcast在調優場景的使用
Broadcast廣播,主要是用於共享Spark每一個Task都會用到的一些只讀變量。
對於那些每一個Task都會用到的變量來講,若是每一個Task都爲這些變量分配內存空間顯然會使用不少多餘的資源,使用廣播能夠有效的避免這個問題,廣播以後,這些變量僅僅會在每臺機器上保存一份,有Task須要使用時就到本身的機器上讀取就ok。
官方推薦,Task大於20k時可使用,能夠在控制檯上看Task的大小。
三、Collect結果過大的問題
大量數據時將數據存儲在HDFS上或者其餘,不是大量數據,可是超出Akka傳輸的Buffer大小,經過配置spark.akka.frameSize調整。
一、經過序列化手段優化
序列化以前說過,好處多多,因此是推薦能用就用,Spark上的序列化方式有幾種,具體的能夠參考官方文檔。
這裏只簡單介紹一下Kryo。
配置參數的時候使用spark.serializer=」org.apache.spark.serializer.KryoSerializer」配置
自定義定義能夠被Kryo序列化的類的步驟:
二、經過壓縮手段優化
Spark的Job大體能夠分爲兩種:
對於I/O密集型的Job,能壓縮就壓縮,由於讀磁盤的時候數據壓縮了,佔用空間小了,讀取速度不就快了。
對於CPU密集型的Job,看具體CPU使用狀況再作決定,由於使用壓縮是須要消耗一些CPU資源的,若是當前CPU已經超負荷了,再使用壓縮反而拔苗助長。
Spark支持兩種壓縮算法:
一些壓縮相關的參數配置:
一、對外部資源的批處理操做
如操做數據庫時,每一個分區的數據應該一塊兒執行一次批處理,而不是一條數據寫一次,即map=>mapPartition。
二、reduce和reduceByKey
reduce:內部調用了runJob方法,是一個action操做。
reduceByKey:內部只是調用了combineBykey,是Transformation操做。
大量的數據操做時,reduce彙總全部數據到主節點會有性能瓶頸,將數據轉換爲Key-Value的形式使用reduceByKey實現邏輯,會作相似mr程序中的Combiner的操做,Transformation操做分佈式進行。
三、Shuffle操做符的內存使用
使用會觸發Shuffle過程的操做符時,操做的數據集合太大形成OOM,每一個任務執行過程當中會在各自的內存建立Hash表來進行數據分組。
能夠解決的方案可能有: