spark性能調優

第一:提升並行度

並行度就是Spark做業中,各個stage的task數量,也就表明了Spark做業的在各個階段(stage)的並行度。

若是不調節並行度,致使並行度太低,會怎麼樣?
假設,如今已經在spark-submit腳本里面,給咱們的spark做業分配了足夠多的資源,好比50個executor,每一個executor有10G內存,每一個executor有3個cpu core。基本已經達到了集羣或者yarn隊列的資源上限。
task沒有設置,或者設置的不多,好比就設置了,100個task。50個executor,每一個executor有3個cpu core,也就是說,你的Application任何一個stage運行的時候,都有總數在150個cpu core,能夠並行運行。可是你如今,只有100個task,平均分配一下,每一個executor分配到2個task,ok,那麼同時在運行的task,只有100個,每一個executor只會並行運行2個task。每一個executor剩下的一個cpu core,就浪費掉了。
合理的並行度的設置,應該是要設置的足夠大,大到能夠徹底合理的利用你的集羣資源;好比上面的例子,總共集羣有150個cpu core,能夠並行運行150個task。那麼就應該將你的Application的並行度,至少設置成150,才能徹底有效的利用你的集羣資源,讓150個task,並行執行;並且task增長到150個之後,便可以同時並行運行,還可讓每一個task要處理的數據量變少;好比總共150G的數據要處理,若是是100個task,每一個task計算1.5G的數據;如今增長到150個task,能夠並行運行,並且每一個task主要處理1G的數據就能夠。
一、task數量,至少設置成與Spark application的總cpu core數量相同(最理想狀況,好比總共150個cpu core,分配了150個task,一塊兒運行,差很少同一時間運行完畢)。
二、官方是推薦,task數量,設置成spark application總cpu core數量的2~3倍,好比150個cpu core,基本要設置task數量爲300~500。

由於實際狀況,與理想狀況不一樣的,有些task會運行的快一點,好比50s就完了,有些task,可能會慢一點,要1分半才運行完,因此若是你的task數量,恰好設置的跟cpu core數量相同,可能仍是會致使資源的浪費,由於,好比150個task,10個先運行完了,剩餘140個還在運行,可是這個時候,有10個cpu core就空閒出來了,就致使了浪費。那若是task數量設置成cpu core總數的2~3倍,那麼一個task運行完了之後,另外一個task立刻能夠補上來,就儘可能讓cpu core不要空閒,同時也是儘可能提高spark做業運行的效率和速度,提高性能。
三、如何設置一個Spark Application的並行度?
spark.default.parallelism
SparkConf conf = new SparkConf()
conf.set("spark.default.parallelism", "500")
 

第二,資源分配優化node

Spark的分配資源主要就是 executor、cpu per executor、memory per executor、driver memory 等的調節,在咱們在生產環境中,提交spark做業時,用的spark-submit shell腳本,裏面調整對應的參數:算法

/usr/local/spark/bin/spark-submit \shell

--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \ 配置executor的數量
--driver-memory 100m \ 配置driver的內存(影響不大)
--executor-memory 100m \ 配置每一個executor的內存大小
--executor-cores 3 \ 配置每一個executor的cpu core數量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \

首先要了解你的機子的資源,多大的內存,多少個cpu core,就根據這個實際狀況去設置,能使用多少資源,就儘可能去調節到最大的大小(executor的數量,幾十個到上百個不等;executor內存;executor cpu core)。
Spark Standalone 模式下,若是每臺機器可用內存是4G,2個cpu core,20臺機器,那能夠設置:
20個executor;每一個executor4G內存2個cpu core。
yarn 模式下,根據spark要提交的資源隊列資源來考慮,若是所在隊列資源爲500G內存,100個cpu core,那能夠設置:
50個executor;每一個executor10G內存2個cpu core。
 
調節資源後,SparkContext,DAGScheduler,TaskScheduler,會將咱們的算子,切割成大量的task,提交到Application的executor上面去執行。
增長每一個executor的cpu core,也是增長了執行的並行能力。本來20個executor,每一個才2個cpu core。可以並行執行的task數量,就是40個task。
如今每一個executor的cpu core,增長到了5個。可以並行執行的task數量,就是100個task。執行的速度,提高了2.5倍。
若是executor數量比較少,那麼,可以並行執行的task數量就比較少,就意味着,咱們的Application的並行執行的能力就很弱。
好比有3個executor,每一個executor有2個cpu core,那麼同時可以並行執行的task,就是6個。6個執行完之後,再換下一批6個task。
增長了executor數量之後,那麼,就意味着,可以並行執行的task數量,也就變多了。好比原先是6個,如今可能能夠並行執行10個,甚至20個,100個。那麼並行能力就比以前提高了數倍,數十倍。相應的,性能(執行的速度),也能提高數倍~數十倍。
增長每一個executor的內存量。增長了內存量之後,對性能的提高,有兩點:
一、若是須要對RDD進行cache,那麼更多的內存,就能夠緩存更多的數據,將更少的數據寫入磁盤,甚至不寫入磁盤。減小了磁盤IO。
二、對於shuffle操做,reduce端,會須要內存來存放拉取的數據並進行聚合。若是內存不夠,也會寫入磁盤。若是給executor分配更多內存之後,就有更少的數據,須要寫入磁盤,甚至不須要寫入磁盤。減小了磁盤IO,提高了性能。
三、對於task的執行,可能會建立不少對象。若是內存比較小,可能會頻繁致使JVM堆內存滿了,而後頻繁GC,垃圾回收,minor GC和full GC。(速度很慢)。內存加大之後,帶來更少的GC,垃圾回收,避免了速度變慢,速度變快了。
 

第三,RDD持久化或緩存apache

當第一次對RDD2執行算子,獲取RDD3的時候,就會從RDD1開始計算,就是讀取HDFS文件,而後對RDD1執行算子,獲取到RDD2,而後再計算,獲得RDD3數組

默認狀況下,屢次對一個RDD執行算子,去獲取不一樣的RDD;都會對這個RDD以及以前的父RDD,所有從新計算一次;讀取HDFS->RDD1->RDD2-RDD4
這種狀況,是絕對絕對,必定要避免的,一旦出現一個RDD重複計算的狀況,就會致使性能急劇下降。緩存

好比,HDFS->RDD1-RDD2的時間是15分鐘,那麼此時就要走兩遍,變成30分鐘網絡

 另一種狀況,從一個RDD到幾個不一樣的RDD,算子和計算邏輯實際上是徹底同樣的,結果由於人爲的疏忽,計算了屢次,獲取到了多個RDD。session

因此,建議採用如下方法能夠優化:數據結構

 

第一,RDD架構重構與優化
儘可能去複用RDD,差很少的RDD,能夠抽取稱爲一個共同的RDD,供後面的RDD計算時,反覆使用。架構

 

第二,公共RDD必定要實現持久化

持久化,也就是說,將RDD的數據緩存到內存中/磁盤中,(BlockManager),之後不管對這個RDD作多少次計算,那麼都是直接取這個RDD的持久化的數據,好比從內存中或者磁盤中,直接提取一份數據。

 

第三,持久化,是能夠進行序列化的

若是正常將數據持久化在內存中,那麼可能會致使內存的佔用過大,這樣的話,也許,會致使OOM內存溢出。

當純內存沒法支撐公共RDD數據徹底存放的時候,就優先考慮,使用序列化的方式在純內存中存儲。將RDD的每一個partition的數據,序列化成一個大的字節數組,就一個對象;序列化後,大大減小內存的空間佔用。

序列化的方式,惟一的缺點就是,在獲取數據的時候,須要反序列化。

若是序列化純內存方式,仍是致使OOM,內存溢出;就只能考慮磁盤的方式,內存+磁盤的普通方式(無序列化)。內存+磁盤,序列化。

 

第四,爲了數據的高可靠性,並且內存充足,可使用雙副本機制,進行持久化

持久化的雙副本機制,持久化後的一個副本,由於機器宕機了,副本丟了,就仍是得從新計算一次;持久化的每一個數據單元,存儲一份副本,放在其餘節點上面;從而進行容錯;一個副本丟了,不用從新計算,還可使用另一份副本。這種方式,僅僅針對你的內存資源極度充足。

  

sessionid2actionRDD = sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY());

/**
* 持久化,很簡單,就是對RDD調用persist()方法,並傳入一個持久化級別
*
* 若是是persist(StorageLevel.MEMORY_ONLY()),純內存,無序列化,那麼就能夠用cache()方法來替代
* StorageLevel.MEMORY_ONLY_SER(),第二選擇
* StorageLevel.MEMORY_AND_DISK(),第三選擇
* StorageLevel.MEMORY_AND_DISK_SER(),第四選擇
* StorageLevel.DISK_ONLY(),第五選擇
*
* 若是內存充足,要使用雙副本高可靠機制
* 選擇後綴帶_2的策略
* StorageLevel.MEMORY_ONLY_2()
*
*/
sessionid2actionRDD = sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY());
 

第四:使用廣播變量

廣播變量, 其實就是SparkContext的broadcast()方法,傳入你要廣播的變量,便可
final Broadcast<Map<String, Map<String, IntList>>> broadcast = sc.broadcast(fastutilDateHourExtractMap);
使用廣播變量的時候,直接調用廣播變量(Broadcast類型)的value() / getValue() ,能夠獲取到以前封裝的廣播變量
Map<String, Map<String, IntList>> dateHourExtractMap = broadcast .value();

像隨機抽取的map,1M,舉例。還算小的。若是你是從哪一個表裏面讀取了一些維度數據,比方說,全部商品品類的信息,在某個算子函數中要使用到。100M。1000個task。100G的數據,網絡傳輸。集羣瞬間由於這個緣由消耗掉100G的內存。
這種默認的,task執行的算子中,使用了外部的變量,每一個task都會獲取一份變量的副本,有什麼缺點呢?在什麼狀況下,會出現性能上的惡劣的影響呢?map,自己是不小,存放數據的一個單位是Entry,還有可能會用鏈表的格式的來存放Entry鏈條。因此map是比較消耗內存的數據格式。
好比,map是1M。總共,你前面調優都調的特好,資源給的到位,配合着資源,並行度調節的絕對到位,1000個task。大量task的確都在並行運行。這些task裏面都用到了佔用1M內存的map,那麼首先,map會拷貝1000份副本,經過網絡傳輸到各個task中去,給task使用。總計有1G的數據,會經過網絡傳輸。網絡傳輸的開銷,不容樂觀啊!!!網絡傳輸,也許就會消耗掉你的spark做業運行的總時間的一小部分。
map副本,傳輸到了各個task上以後,是要佔用內存的。1個map的確不大,1M;1000個map分佈在你的集羣中,一會兒就耗費掉1G的內存。對性能會有什麼影響呢?沒必要要的內存的消耗和佔用,就致使了,你在進行RDD持久化到內存,也許就無法徹底在內存中放下;就只能寫入磁盤,最後致使後續的操做在磁盤IO上消耗性能;
你的task在建立對象的時候,也許會發現堆內存放不下全部對象,也許就會致使頻繁的垃圾回收器的回收,GC。GC的時候,必定是會致使工做線程中止,也就是致使Spark暫停工做那麼一點時間。頻繁GC的話,對Spark做業的運行的速度會有至關可觀的影響。

若是說,task使用大變量(1m~100m),明知道會致使性能出現惡劣的影響。那麼咱們怎麼來解決呢?
廣播,Broadcast,將大變量廣播出去。而不是直接使用。
 
廣播變量的好處,不是每一個task一份變量副本,而是變成每一個節點的executor才一份副本。這樣的話,就可讓變量產生的副本大大減小。
廣播變量,初始的時候,就在Drvier上有一份副本。task在運行的時候,想要使用廣播變量中的數據,此時首先會在本身本地的Executor對應的
BlockManager中,嘗試獲取變量副本;若是本地沒有,BlockManager,也許會從遠程的Driver上面去獲取變量副本;也有可能從距離比較近的其餘
節點的Executor的BlockManager上去獲取,並保存在本地的BlockManager中;BlockManager負責管理某個Executor對應的內存和磁盤上的數據,
此後這個executor上的task,都會直接使用本地的BlockManager中的副本。

好比,50個executor,1000個task。一個map,10M:
默認狀況下,1000個task,1000份副本。10G的數據,網絡傳輸,在集羣中,耗費10G的內存資源。
若是使用了廣播變量。50個execurtor,50個副本。500M的數據,網絡傳輸,並且不必定都是從Driver傳輸到每一個節點,還多是就近從最近的
節點的executor的bockmanager上拉取變量副本,網絡傳輸速度大大增長;500M的內存消耗。
 

第五:使用Kryo序列化

在SparkConf中設置一個屬性,spark.serializer,org.apache.spark.serializer.KryoSerializer類;註冊你使用到的,須要經過Kryo序列化的,
一些自定義類,SparkConf.registerKryoClasses()
SparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{CategorySortKey.class})
Kryo之因此沒有被做爲默認的序列化類庫的緣由,就要出現了:主要是由於Kryo要求,若是要達到它的最佳性能的話,那麼就必定要註冊你自定義的類(好比,你的算子函數中使用到了外部自定義類型的對象變量,這時,就要求必須註冊你的類,不然Kryo達不到最佳性能)。

當使用了序列化的持久化級別時,在將每一個RDD partition序列化成一個大的字節數組時,就會使用Kryo進一步優化序列化的效率和性能。默認狀況下,Spark內部是使用Java的序列化機制,ObjectOutputStream / ObjectInputStream,對象輸入輸出流機制,來進行序列化。
這種默認序列化機制的好處在於,處理起來比較方便;也不須要咱們手動去作什麼事情,只是,你在算子裏面使用的變量,必須是實現Serializable接口的,可序列化便可。可是缺點在於,默認的序列化機制的效率不高,序列化的速度比較慢;序列化之後的數據,佔用的內存空間相對仍是比較大。
Spark支持使用Kryo序列化機制。Kryo序列化機制,比默認的Java序列化機制,速度要快,序列化後的數據要更小,大概是Java序列化機制的1/10
 
 
。因此Kryo序列化優化之後,可讓網絡傳輸的數據變少;在集羣中耗費的內存資源大大減小。在進行stage間的task的shuffle操做時,節點與節點之
間的task會互相大量經過網絡拉取和傳輸文件,此時,這些數據既然經過網絡傳輸,也是可能要序列化的,就會使用Kryo。
 
Kryo序列化機制,一旦啓用之後,會生效的幾個地方:
一、算子函數中使用到的外部變量,使用Kryo之後:優化網絡傳輸的性能,能夠優化集羣中內存的佔用和消耗
二、持久化RDD,StorageLevel.MEMORY_ONLY_SER優化內存的佔用和消耗;持久化RDD佔用的內存越少,task執行的時候,建立的對象,就
不至於頻繁的佔滿內存,頻繁發生GC。
三、shuffle:能夠優化網絡傳輸的性能。
 

第六:數據本地化

本地化級別

PROCESS_LOCAL:進程本地化,代碼和數據在同一個進程中,也就是在同一個executor中;計算數據的task由executor執行,數據在executor的BlockManager中;性能最好

NODE_LOCAL:節點本地化,代碼和數據在同一個節點中;好比說,數據做爲一個HDFS block塊,就在節點上,而task在節點上某個executor中運行;或者是,數據和task在一個節點上的不一樣executor中;數據須要在進程間進行傳輸
NO_PREF:對於task來講,數據從哪裏獲取都同樣,沒有好壞之分
RACK_LOCAL:機架本地化,數據和task在一個機架的兩個節點上;數據須要經過網絡在節點之間進行傳輸
ANY:數據和task可能在集羣中的任何地方,並且不在一個機架中,性能最差


Spark.locality.wait,默認是3s

 

Spark在Driver上,對Application的每個stage的task,進行分配以前,都會計算出每一個task要計算的是哪一個分片數據,RDD的某個partition;Spark的task分配算法,優先,會但願每一個task正好分配到它要計算的數據所在的節點,這樣的話,就不用在網絡間傳輸數據;

可是可能task沒有機會分配到它的數據所在的節點,由於可能那個節點的計算資源和計算能力都滿了;因此呢,這種時候,一般來講,Spark會等待一段時間,默認狀況下是3s鍾(不是絕對的,還有不少種狀況,對不一樣的本地化級別,都會去等待),到最後,實在是等待不了了,就會選擇一個比較差的本地化級別,好比說,將task分配到靠它要計算的數據所在節點,比較近的一個節點,而後進行計算。

可是對於第二種狀況,一般來講,確定是要發生數據傳輸,task會經過其所在節點的BlockManager來獲取數據,BlockManager發現本身本地沒有數據,會經過一個getRemote()方法,經過TransferService(網絡數據傳輸組件)從數據所在節點的BlockManager中,獲取數據,經過網絡傳輸回task所在節點。

對於咱們來講,固然不但願是相似於第二種狀況的了。最好的,固然是task和數據在一個節點上,直接從本地executor的BlockManager中獲取數據,純內存,或者帶一點磁盤IO;若是要經過網絡傳輸數據的話,那麼實在是,性能確定會降低的,大量網絡傳輸,以及磁盤IO,都是性能的殺手。

 

時候要調節這個參數?

觀察日誌,spark做業的運行日誌,推薦你們在測試的時候,先用client模式,在本地就直接能夠看到比較全的日誌。
日誌裏面會顯示,starting task。。。,PROCESS LOCAL、NODE LOCAL,觀察大部分task的數據本地化級別。

若是大多都是PROCESS_LOCAL,那就不用調節了
若是是發現,好多的級別都是NODE_LOCAL、ANY,那麼最好就去調節一下數據本地化的等待時長
調節完,應該是要反覆調節,每次調節完之後,再來運行,觀察日誌
看看大部分的task的本地化級別有沒有提高;看看,整個spark做業的運行時間有沒有縮短

 

可是注意別本末倒置,本地化級別卻是提高了,可是由於大量的等待時長,spark做業的運行時間反而增長了,那就仍是不要調節了。

spark.locality.wait,默認是3s;能夠改爲6s,10s

默認狀況下,下面3個的等待時長,都是跟上面那個是同樣的,都是3sspark.locality.wait.processspark.locality.wait.nodespark.locality.wait.racknew SparkConf()  .set("spark.locality.wait", "10")

相關文章
相關標籤/搜索