Spark(十二)--性能調優篇

一段程序只能完成功能是沒有用的,只能可以穩定、高效率地運行纔是生成環境所須要的。html

本篇記錄了Spark各個角度的調優技巧,以備不時之需。算法

1、配置參數的方式和觀察性能的方式

額。。。從最基本的開始講,可能一些剛接觸Spark的人不是很清楚Spark的一些參數變量到底要配置在哪裏。數據庫

能夠經過三種方式配置參數,任選其一皆可。apache

  1. spark-env.sh文件中配置:最近常使用的配置方式,格式能夠參考其中的一些官方保留的配置。
  2. 程序中經過SparkConf配置:經過SparkConf對象set方法設置鍵值對,比較直觀。
  3. 程序中經過System.setProperty配置:和方法二差很少。

值得一提的是一個略顯詭異的現象,有些參數在spark-env.sh中配置並不起做用,反而要在程序中設置纔有效果。markdown

Spark的參數不少,一些默認的設置能夠參考官網推薦的配置參數:/docs/latest/configuration.html網絡

能夠經過如下幾種方式來觀察Spark集羣的狀態和相關性能問題:app

  1. Web UI:即8088端口進入的UI界面。
  2. Driver程序日誌:根據程序提交方式的不一樣到指定的節點上觀察Driver程序日誌。
  3. logs文件夾下的日誌:Spark集羣的大部分信息都會記錄在這裏。
  4. works文件夾下的日誌:主要記錄Work節點的信息。
  5. Profiler工具:沒有使用過。

前景交代完畢,下面進入正題:分佈式

2、調度與分區優化

一、小分區合併的問題函數

因爲程序中過分使用filter算子或者使用不當,都會形成大量的小分區出現。
由於每次過濾獲得的結果只有原來數據集的一小部分,而這些量很小的數據一樣會以必定的分區數並行化分配到各個節點中執行。工具

帶來的問題就是:任務處理的數據量很小,反覆地切換任務所消耗的資源反而會帶來很大的系統開銷。

解決方案:使用重分區函數coalesce進行數據緊縮、減小分區數並設置shuffle=true保證任務是並行計算的

減小分區數,雖然意味着並行度下降,可是相對比以前的大量小任務過分切換的消耗,倒是比較值得的。

這裏也能夠直接使用repartition重分區函數進行操做,由於其底層使用的是coalesce並設置Shuffle=true

二、數據傾斜問題

這是一個生產環境中常常遇到的問題,典型的場景是:大量的數據被分配到小部分節點計算,而其餘大部分節點卻只計算小部分數據。

問題產生的緣由有不少,可能且不所有包括:

  • key的數據分佈不均勻
  • 業務數據自己緣由
  • 結構化表設計問題
  • 某些SQL語句會形成數據傾斜

可選的解決方案有:

  1. 增大任務數,減小分區數量:這種方法和解決小分區問題相似。
  2. 對特殊的key進行處理,如空值等:直接過濾掉空值的key以避免對任務產生干擾。
  3. 使用廣播:小數據量直接廣播,大數據量先拆分以後再進行廣播。

還有一種場景是任務執行速度傾斜問題:集羣中其餘節點都計算完畢了,可是隻有少數幾個節點死活運行不完。(其實這和上面的那個場景是差很少的)

解決方案:

  • 設置spark.speculation=true將執行事件過長的節點去掉,從新分配任務
  • spark.speculation.interval用來設置執行間隔

三、並行度調整

官方推薦每一個CPU CORE分配2-3個任務。

  • 任務數太多:並行度過高,產生大量的任務啓動和切換開銷。
  • 任務數過低:並行度太低,沒法發揮集羣並行計算能力,任務執行慢

Spark會根據文件大小默認配置Map階段的任務數,因此咱們可以自行調整的就是Reduce階段的分區數了。

  • reduceByKey等操做時經過numPartitions參數進行分區數量配置。
  • 經過spark.default.parallelism進行默認分區數配置。

四、DAG調度執行優化

DAG圖是Spark計算的基本依賴,因此建議:

  1. 同一個Stage儘可能容納更多地算子,防止多餘的Shuffle。
  2. 複用已經cache的數據。

儘量地在Transformation算子中完成對數據的計算,由於過多的Action算子會產生不少多餘的Shuffle,在劃分DAG圖時會造成衆多Stage。

3、網絡傳輸優化

一、大任務分發問題

Spark採用Akka的Actor模型來進行消息傳遞,包括數據、jar包和相關文件等。

而Akka消息通訊傳遞默認的容量最大爲10M,一旦傳遞的消息超過這個限制就會出現這樣的錯誤:

Worker任務失敗後Master上會打印「Lost TID:」

根據這個信息找到對應的Worker節點後查看SparkHome/work/目錄下的日誌,查看Serialized size of result是否超過10M,就能夠知道是否是Akka這邊的問題了。

一旦確認是Akka通訊容量限制以後,就能夠經過配置spark.akka.frameSize控制Akka通訊消息的最大容量。

二、Broadcast在調優場景的使用

Broadcast廣播,主要是用於共享Spark每一個Task都會用到的一些只讀變量。

對於那些每一個Task都會用到的變量來講,若是每一個Task都爲這些變量分配內存空間顯然會使用不少多餘的資源,使用廣播能夠有效的避免這個問題,廣播以後,這些變量僅僅會在每臺機器上保存一份,有Task須要使用時就到本身的機器上讀取就ok。

官方推薦,Task大於20k時可使用,能夠在控制檯上看Task的大小。

三、Collect結果過大的問題

大量數據時將數據存儲在HDFS上或者其餘,不是大量數據,可是超出Akka傳輸的Buffer大小,經過配置spark.akka.frameSize調整。

4、序列化與壓縮

一、經過序列化手段優化

序列化以前說過,好處多多,因此是推薦能用就用,Spark上的序列化方式有幾種,具體的能夠參考官方文檔。

這裏只簡單介紹一下Kryo。

配置參數的時候使用spark.serializer=」org.apache.spark.serializer.KryoSerializer」配置

自定義定義能夠被Kryo序列化的類的步驟:

  1. 自定義類extends KryoRegistrator
  2. 設置序列化方式conf.set(「spark.serializer」,」org.apache.spark.serializer.KryoSerializer」)
  3. conf.set(「spark.kyro.registrator」,」自定義的class」)
  4. 若是對象佔用空間大,須要增長Kryo的緩衝區則配置spark.kryoserializer.buffer.mb上值默認爲2M

二、經過壓縮手段優化

Spark的Job大體能夠分爲兩種:

  • I/O密集型:即存在大量讀取磁盤的操做。
  • CPU密集型:即存在大量的數據計算,使用CPU資源較多。

對於I/O密集型的Job,能壓縮就壓縮,由於讀磁盤的時候數據壓縮了,佔用空間小了,讀取速度不就快了。

對於CPU密集型的Job,看具體CPU使用狀況再作決定,由於使用壓縮是須要消耗一些CPU資源的,若是當前CPU已經超負荷了,再使用壓縮反而拔苗助長。

Spark支持兩種壓縮算法:

  • LZF:高壓縮比
  • Snappy:高速度

一些壓縮相關的參數配置:

  1. spark.broadcast.compress:推薦爲true
  2. spark.rdd.compress:默認爲false,看狀況配置,壓縮花費一些時間,可是能夠節省大量內存空間
  3. spark.io.compression.codec:org.apache.spark.io.LZFCompressionCodec根據狀況選擇壓縮算法
  4. spark.io.compressions.snappy.block.size:設置Snappy壓縮的塊大小

5、其餘優化方式

一、對外部資源的批處理操做

如操做數據庫時,每一個分區的數據應該一塊兒執行一次批處理,而不是一條數據寫一次,即map=>mapPartition。

二、reduce和reduceByKey

reduce:內部調用了runJob方法,是一個action操做。
reduceByKey:內部只是調用了combineBykey,是Transformation操做。

大量的數據操做時,reduce彙總全部數據到主節點會有性能瓶頸,將數據轉換爲Key-Value的形式使用reduceByKey實現邏輯,會作相似mr程序中的Combiner的操做,Transformation操做分佈式進行。

三、Shuffle操做符的內存使用

使用會觸發Shuffle過程的操做符時,操做的數據集合太大形成OOM,每一個任務執行過程當中會在各自的內存建立Hash表來進行數據分組。

能夠解決的方案可能有:

  • 增長並行度即分區數能夠適當解決問題
  • 能夠將任務數量擴展到超過集羣總體的CPU core數
相關文章
相關標籤/搜索