spark 應用程序性能優化經驗

一 常規性能調優算法

1 . 分配更多資源數據庫

--num-executors 3 \  配置executor的數量數組

--driver-memory 100m \  配置driver的內存(影響不大)緩存

--executor-memory 100m \  配置每一個executor的內存大小網絡

--executor-cores 3 \  配置每一個executor的cpu core數量架構

增長每一個executor的內存量。增長了內存量之後,對性能的提高,有兩點:jvm

一、若是須要對RDD進行cache,那麼更多的內存,就能夠緩存更多的數據,將更少的數據寫入磁盤,甚至不寫入磁盤。減小了磁盤IO。ide

二、對於shuffle操做,reduce端,會須要內存來存放拉取的數據並進行聚合。若是內存不夠,也會寫入磁盤。若是給executor分配更多內存之後,就有更少的數據,須要寫入磁盤,甚至不須要寫入磁盤。減小了磁盤IO,提高了性能。函數

三、對於task的執行,可能會建立不少對象。若是內存比較小,可能會頻繁致使JVM堆內存滿了,而後頻繁GC,垃圾回收,minor GC和full GC。(速度很慢)。內存加大之後,帶來更少的GC,垃圾回收,避免了速度變慢,速度變快了。性能

2.調節並行度

很簡單的道理,只要合理設置並行度,就能夠徹底充分利用你的集羣計算資源,而且減小每一個task要處理的數據量,最終,就是提高你的整個Spark做業的性能和運行速度。


3.重構RDD架構以及RDD持久化

第一,RDD架構重構與優化 儘可能去複用RDD,差很少的RDD,能夠抽取稱爲一個共同的RDD,供後面的RDD計算時,反覆使用

第二,公共RDD必定要實現持久化 北方吃餃子,現包現煮。你人來了,要點一盤餃子。餡料+餃子皮+水->包好的餃子,對包好的餃子去煮,煮開了之後,纔有你須要的熟的,熱騰騰的餃子。 現實生活中,餃子現包現煮,固然是最好的了;可是Spark中,RDD要去「現包現煮」,那就是一場致命的災難。 對於要屢次計算和使用的公共RDD,必定要進行持久化。 持久化,也就是說,將RDD的數據緩存到內存中/磁盤中,(BlockManager),之後不管對這個RDD作多少次計算,那麼都是直接取這個RDD的持久化的數據,好比從內存中或者磁盤中,直接提取一份數據。

第三,持久化,是能夠進行序列化的 若是正常將數據持久化在內存中,那麼可能會致使內存的佔用過大,這樣的話,也許,會致使OOM內存溢出。 當純內存沒法支撐公共RDD數據徹底存放的時候,就優先考慮,使用序列化的方式在純內存中存儲。將RDD的每一個partition的數據,序列化成一個大的字節數組,就一個對象;序列化後,大大減小內存的空間佔用。 序列化的方式,惟一的缺點就是,在獲取數據的時候,須要反序列化。 若是序列化純內存方式,仍是致使OOM,內存溢出;就只能考慮磁盤的方式,內存+磁盤的普通方式(無序列化)。 內存+磁盤,序列化

第四,爲了數據的高可靠性,並且內存充足,可使用雙副本機制,進行持久化 持久化的雙副本機制,持久化後的一個副本,由於機器宕機了,副本丟了,就仍是得從新計算一次;持久化的每一個數據單元,存儲一份副本,放在其餘節點上面;從而進行容錯;一個副本丟了,不用從新計算,還可使用另一份副本。 這種方式,僅僅針對你的內存資源極度充足

4.廣播大變量

廣播變量,初始的時候,就在Drvier上有一份副本。 task在運行的時候,想要使用廣播變量中的數據,此時首先會在本身本地的Executor對應的BlockManager中,嘗試獲取變量副本;若是本地沒有,那麼就從Driver遠程拉取變量副本,並保存在本地的BlockManager中;此後這個executor上的task,都會直接使用本地的BlockManager中的副本。 executor的BlockManager除了從driver上拉取,也可能從其餘節點的BlockManager上拉取變量副本,舉例越近越好。

每一個 Executor一個副本,不必定每一個節點。

5.使用Kryo序列化

內存佔用,網絡傳輸

一、算子函數中使用到的外部變量

二、持久化RDD時進行序列化,StorageLevel.MEMORY_ONLY_SER

三、shuffle

一、算子函數中使用到的外部變量,使用Kryo之後:優化網絡傳輸的性能,能夠優化集羣中內存的佔用和消耗

二、持久化RDD,優化內存的佔用和消耗;持久化RDD佔用的內存越少,task執行的時候,建立的對象,就不至於頻繁的佔滿內存,頻繁發生GC。

三、shuffle:能夠優化網絡傳輸的性能

bbg

使用時,要自定義註冊類哦



6.使用 fastutil


7.數據本地化的等待時長

Spark在Driver上,對Application的每個stage的task,進行分配以前,都會計算出每一個task要計算的是哪一個分片數據,RDD的某個partition;Spark的task分配算法,優先,會但願每一個task正好分配到它要計算的數據所在的節點,這樣的話,就不用在網絡間傳輸數據; 可是呢,一般來講,有時,事與願違,可能task沒有機會分配到它的數據所在的節點,爲何呢,可能那個節點的計算資源和計算能力都滿了;因此呢,這種時候,一般來講,Spark會等待一段時間,默認狀況下是3s鍾(不是絕對的,還有不少種狀況,對不一樣的本地化級別,都會去等待),到最後,實在是等待不了了,就會選擇一個比較差的本地化級別,好比說,將task分配到靠它要計算的數據所在節點,比較近的一個節點,而後進行計算。 可是對於第二種狀況,一般來講,確定是要發生數據傳輸,task會經過其所在節點的BlockManager來獲取數據,BlockManager發現本身本地沒有數據,會經過一個getRemote()方法,經過TransferService(網絡數據傳輸組件)從數據所在節點的BlockManager中,獲取數據,經過網絡傳輸回task所在節點。 對於咱們來講,固然不但願是相似於第二種狀況的了。最好的,固然是task和數據在一個節點上,直接從本地executor的BlockManager中獲取數據,純內存,或者帶一點磁盤IO;若是要經過網絡傳輸數據的話,那麼實在是,性能確定會降低的,大量網絡傳輸,以及磁盤IO,都是性能的殺手。


二.JVM調優

JVM調優的第一個點:下降cache操做的內存佔比 spark中,堆內存又被劃分紅了兩塊兒,一起是專門用來給RDD的cache、persist操做進行RDD數據緩存用的;另一塊兒,就是咱們剛纔所說的,用來給spark算子函數的運行使用的,存放函數中本身建立的對象。 默認狀況下,給RDD cache操做的內存佔比,是0.6,60%的內存都給了cache操做了。可是問題是,若是某些狀況下,cache不是那麼的緊張,問題在於task算子函數中建立的對象過多,而後內存又不太大,致使了頻繁的minor gc,甚至頻繁full gc,致使spark頻繁的中止工做。性能影響會很大。 針對上述這種狀況,你們能夠在以前咱們講過的那個spark ui。yarn去運行的話,那麼就經過yarn的界面,去查看你的spark做業的運行統計,很簡單,你們一層一層點擊進去就好。能夠看到每一個stage的運行狀況,包括每一個task的運行時間、gc時間等等。若是發現gc太頻繁,時間太長。此時就能夠適當調價這個比例。 下降cache操做的內存佔比,大不了用persist操做,選擇將一部分緩存的RDD數據寫入磁盤,或者序列化方式,配合Kryo序列化類,減小RDD緩存的內存佔用;下降cache操做內存佔比;對應的,算子函數的內存佔比就提高了。這個時候,可能,就能夠減小minor gc的頻率,同時減小full gc的頻率。對性能的提高是有必定的幫助的。

一句話,讓task執行算子函數時,有更多的內存可使用。

spark.storage.memoryFraction,0.6 -> 0.5 -> 0.4 -> 0.2

--conf spark.yarn.executor.memoryOverhead=2048 spark-submit腳本里面,去用--conf的方式,去添加配置;必定要注意!!!切記,不是在你的spark做業代碼中,用new SparkConf().set()這種方式去設置,不要這樣去設置,是沒有用的!必定要在spark-submit腳本中去設置。 spark.yarn.executor.memoryOverhead(看名字,顧名思義,針對的是基於yarn的提交模式) 默認狀況下,這個堆外內存上限大概是300多M;後來咱們一般項目中,真正處理大數據的時候,這裏都會出現問題,致使spark做業反覆崩潰,沒法運行;此時就會去調節這個參數,到至少1G(1024M),甚至說2G、4G 一般這個參數調節上去之後,就會避免掉某些JVM OOM的異常問題,同時呢,會讓總體spark做業的性能,獲得較大的提高。

http://blog.csdn.net/hammertank/article/details/48346285

此時呢,就會沒有響應,沒法創建網絡鏈接;會卡住;ok,spark默認的網絡鏈接的超時時長,是60s;若是卡住60s都沒法創建鏈接的話,那麼就宣告失敗了。 碰到一種狀況,偶爾,偶爾,偶爾!!!沒有規律!!!某某file。一串file id。uuid(dsfsfd-2342vs--sdf--sdfsd)。not found。file lost。 這種狀況下,頗有多是有那份數據的executor在jvm gc。因此拉取數據的時候,創建不了鏈接。而後超過默認60s之後,直接宣告失敗。 報錯幾回,幾回都拉取不到數據的話,可能會致使spark做業的崩潰。也可能會致使DAGScheduler,反覆提交幾回stage。TaskScheduler,反覆提交幾回task。大大延長咱們的spark做業的運行時間。

實際案例腳本:

/usr/local/spark/bin/spark-submit \

--class com.ibeifeng.sparkstudy.WordCount \

--num-executors 80 \

--driver-memory 6g \

--executor-memory 6g \

--executor-cores 3 \

--master yarn-cluster \

--queue root.default \

--conf spark.yarn.executor.memoryOverhead=2048 \

--conf spark.core.connection.ack.wait.timeout=300 \

/usr/local/spark/spark.jar \

三.Shuffle調優

  1. new SparkConf().set("spark.shuffle.consolidateFiles", "true") 開啓shuffle map端輸出文件合併的機制;默認狀況下,是不開啓的,就是會發生如上所述的大量map端輸出文件的操做,嚴重影響性能。

  2. 增大map端溢寫的內存緩衝空間,減小溢寫次數。  spark.shuffle.file.buffer ,32k--》 64k

  3. 增大reduce端聚合內存,減小讀寫次數 spark.shuffle.memoryFraction ,0.2--》0.3 嘗試性的增長

  4. new SparkConf().set(" spark.shuffle.manager", " hash") hash(默認)sort(能夠排序)tungsten-sort鎢絲(1.5版本後纔有,不穩定)


四.spark操做調優(算子調優)

1.MapPartitions替代map操做,不過看具體操做,由於Maprtition容易致使OOM哦!

2.filter以後,數據容易傾斜,採用coalesce算子。主要就是用於在filter操做以後,針對每一個partition的數據量各不相同的狀況,來壓縮partition的數量。減小partition的數量,並且讓每一個partition的數據量都儘可能均勻緊湊。 從而便於後面的task進行計算操做,在某種程度上,可以必定程度的提高性能。

3.foreachPartition替代foreach,例如數據庫鏈接操做的時候,是很是好的。在實際生產環境,都是用這個,但數據量特別大,會有oom的可能。

4.repartition,SparkSQL的初始stage受限於hdfs的block數量限制。repartition算子,你用Spark SQL這一步的並行度和task數量,確定是沒有辦法去改變了。可是呢,能夠將你用Spark SQL查詢出來的RDD,使用repartition算子,去從新進行分區,此時能夠分區成多個partition,好比從20個partition,分區成100個。

5.reduceBykey,map 端本地聚合。

相關文章
相關標籤/搜索