當第一次對RDD2執行算子,獲取RDD3的時候,就會從RDD1開始計算,就是讀取HDFS文件,而後對RDD1執行算子,獲取到RDD2,而後再計算,獲得RDD3數組
默認狀況下,屢次對一個RDD執行算子,去獲取不一樣的RDD;都會對這個RDD以及以前的父RDD,所有從新計算一次;讀取HDFS->RDD1->RDD2-RDD4
這種狀況,是絕對絕對,必定要避免的,一旦出現一個RDD重複計算的狀況,就會致使性能急劇下降。緩存
好比,HDFS->RDD1-RDD2的時間是15分鐘,那麼此時就要走兩遍,變成30分鐘網絡
另一種狀況,從一個RDD到幾個不一樣的RDD,算子和計算邏輯實際上是徹底同樣的,結果由於人爲的疏忽,計算了屢次,獲取到了多個RDD。session
因此,建議採用如下方法能夠優化:數據結構
第一,RDD架構重構與優化
儘可能去複用RDD,差很少的RDD,能夠抽取稱爲一個共同的RDD,供後面的RDD計算時,反覆使用。架構
第二,公共RDD必定要實現持久化
持久化,也就是說,將RDD的數據緩存到內存中/磁盤中,(BlockManager),之後不管對這個RDD作多少次計算,那麼都是直接取這個RDD的持久化的數據,好比從內存中或者磁盤中,直接提取一份數據。
第三,持久化,是能夠進行序列化的
若是正常將數據持久化在內存中,那麼可能會致使內存的佔用過大,這樣的話,也許,會致使OOM內存溢出。
當純內存沒法支撐公共RDD數據徹底存放的時候,就優先考慮,使用序列化的方式在純內存中存儲。將RDD的每一個partition的數據,序列化成一個大的字節數組,就一個對象;序列化後,大大減小內存的空間佔用。
序列化的方式,惟一的缺點就是,在獲取數據的時候,須要反序列化。
若是序列化純內存方式,仍是致使OOM,內存溢出;就只能考慮磁盤的方式,內存+磁盤的普通方式(無序列化)。內存+磁盤,序列化。
第四,爲了數據的高可靠性,並且內存充足,可使用雙副本機制,進行持久化
持久化的雙副本機制,持久化後的一個副本,由於機器宕機了,副本丟了,就仍是得從新計算一次;持久化的每一個數據單元,存儲一份副本,放在其餘節點上面;從而進行容錯;一個副本丟了,不用從新計算,還可使用另一份副本。這種方式,僅僅針對你的內存資源極度充足。
sessionid2actionRDD = sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY());
/**
* 持久化,很簡單,就是對RDD調用persist()方法,並傳入一個持久化級別
*
* 若是是persist(StorageLevel.MEMORY_ONLY()),純內存,無序列化,那麼就能夠用cache()方法來替代
* StorageLevel.MEMORY_ONLY_SER(),第二選擇
* StorageLevel.MEMORY_AND_DISK(),第三選擇
* StorageLevel.MEMORY_AND_DISK_SER(),第四選擇
* StorageLevel.DISK_ONLY(),第五選擇
*
* 若是內存充足,要使用雙副本高可靠機制
* 選擇後綴帶_2的策略
* StorageLevel.MEMORY_ONLY_2()
*
*/
sessionid2actionRDD = sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY());
廣播變量,
其實就是SparkContext的broadcast()方法,傳入你要廣播的變量,便可
final Broadcast<Map<String, Map<String, IntList>>> broadcast = sc.broadcast(fastutilDateHourExtractMap);
使用廣播變量的時候,直接調用廣播變量(Broadcast類型)的value() / getValue() ,能夠獲取到以前封裝的廣播變量
Map<String, Map<String, IntList>> dateHourExtractMap = broadcast .value();
像隨機抽取的map,1M,舉例。還算小的。若是你是從哪一個表裏面讀取了一些維度數據,比方說,全部商品品類的信息,在某個算子函數中要使用到。100M。1000個task。100G的數據,網絡傳輸。集羣瞬間由於這個緣由消耗掉100G的內存。
這種默認的,task執行的算子中,使用了外部的變量,每一個task都會獲取一份變量的副本,有什麼缺點呢?在什麼狀況下,會出現性能上的惡劣的影響呢?map,自己是不小,存放數據的一個單位是Entry,還有可能會用鏈表的格式的來存放Entry鏈條。因此map是比較消耗內存的數據格式。
好比,map是1M。總共,你前面調優都調的特好,資源給的到位,配合着資源,並行度調節的絕對到位,1000個task。大量task的確都在並行運行。這些task裏面都用到了佔用1M內存的map,那麼首先,map會拷貝1000份副本,經過網絡傳輸到各個task中去,給task使用。總計有1G的數據,會經過網絡傳輸。網絡傳輸的開銷,不容樂觀啊!!!網絡傳輸,也許就會消耗掉你的spark做業運行的總時間的一小部分。
map副本,傳輸到了各個task上以後,是要佔用內存的。1個map的確不大,1M;1000個map分佈在你的集羣中,一會兒就耗費掉1G的內存。對性能會有什麼影響呢?沒必要要的內存的消耗和佔用,就致使了,你在進行RDD持久化到內存,也許就無法徹底在內存中放下;就只能寫入磁盤,最後致使後續的操做在磁盤IO上消耗性能;
你的task在建立對象的時候,也許會發現堆內存放不下全部對象,也許就會致使頻繁的垃圾回收器的回收,GC。GC的時候,必定是會致使工做線程中止,也就是致使Spark暫停工做那麼一點時間。頻繁GC的話,對Spark做業的運行的速度會有至關可觀的影響。
若是說,task使用大變量(1m~100m),明知道會致使性能出現惡劣的影響。那麼咱們怎麼來解決呢?
廣播,Broadcast,將大變量廣播出去。而不是直接使用。
廣播變量的好處,不是每一個task一份變量副本,而是變成每一個節點的executor才一份副本。這樣的話,就可讓變量產生的副本大大減小。
廣播變量,初始的時候,就在Drvier上有一份副本。task在運行的時候,想要使用廣播變量中的數據,此時首先會在本身本地的Executor對應的
BlockManager中,嘗試獲取變量副本;若是本地沒有,BlockManager,也許會從遠程的Driver上面去獲取變量副本;也有可能從距離比較近的其餘
節點的Executor的BlockManager上去獲取,並保存在本地的BlockManager中;BlockManager負責管理某個Executor對應的內存和磁盤上的數據,
此後這個executor上的task,都會直接使用本地的BlockManager中的副本。
好比,50個executor,1000個task。一個map,10M:
默認狀況下,1000個task,1000份副本。10G的數據,網絡傳輸,在集羣中,耗費10G的內存資源。
若是使用了廣播變量。50個execurtor,50個副本。500M的數據,網絡傳輸,並且不必定都是從Driver傳輸到每一個節點,還多是就近從最近的
節點的executor的bockmanager上拉取變量副本,網絡傳輸速度大大增長;500M的內存消耗。
在SparkConf中設置一個屬性,spark.serializer,org.apache.spark.serializer.KryoSerializer類;註冊你使用到的,須要經過Kryo序列化的,
一些自定義類,SparkConf.registerKryoClasses()
SparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{CategorySortKey.class})
Kryo之因此沒有被做爲默認的序列化類庫的緣由,就要出現了:主要是由於Kryo要求,若是要達到它的最佳性能的話,那麼就必定要註冊你自定義的類(好比,你的算子函數中使用到了外部自定義類型的對象變量,這時,就要求必須註冊你的類,不然Kryo達不到最佳性能)。
當使用了序列化的持久化級別時,在將每一個RDD partition序列化成一個大的字節數組時,就會使用Kryo進一步優化序列化的效率和性能。默認狀況下,Spark內部是使用Java的序列化機制,ObjectOutputStream / ObjectInputStream,對象輸入輸出流機制,來進行序列化。
這種默認序列化機制的好處在於,處理起來比較方便;也不須要咱們手動去作什麼事情,只是,你在算子裏面使用的變量,必須是實現Serializable接口的,可序列化便可。可是缺點在於,默認的序列化機制的效率不高,序列化的速度比較慢;序列化之後的數據,佔用的內存空間相對仍是比較大。
Spark支持使用Kryo序列化機制。Kryo序列化機制,比默認的Java序列化機制,速度要快,序列化後的數據要更小,大概是Java序列化機制的1/10
。因此Kryo序列化優化之後,可讓網絡傳輸的數據變少;在集羣中耗費的內存資源大大減小。在進行stage間的task的shuffle操做時,節點與節點之
間的task會互相大量經過網絡拉取和傳輸文件,此時,這些數據既然經過網絡傳輸,也是可能要序列化的,就會使用Kryo。
Kryo序列化機制,一旦啓用之後,會生效的幾個地方:
一、算子函數中使用到的外部變量,使用Kryo之後:優化網絡傳輸的性能,能夠優化集羣中內存的佔用和消耗
二、持久化RDD,StorageLevel.MEMORY_ONLY_SER優化內存的佔用和消耗;持久化RDD佔用的內存越少,task執行的時候,建立的對象,就
不至於頻繁的佔滿內存,頻繁發生GC。
三、shuffle:能夠優化網絡傳輸的性能。
本地化級別
PROCESS_LOCAL:進程本地化,代碼和數據在同一個進程中,也就是在同一個executor中;計算數據的task由executor執行,數據在executor的BlockManager中;性能最好
NODE_LOCAL:節點本地化,代碼和數據在同一個節點中;好比說,數據做爲一個HDFS block塊,就在節點上,而task在節點上某個executor中運行;或者是,數據和task在一個節點上的不一樣executor中;數據須要在進程間進行傳輸
NO_PREF:對於task來講,數據從哪裏獲取都同樣,沒有好壞之分
RACK_LOCAL:機架本地化,數據和task在一個機架的兩個節點上;數據須要經過網絡在節點之間進行傳輸
ANY:數據和task可能在集羣中的任何地方,並且不在一個機架中,性能最差
Spark.locality.wait,默認是3s
Spark在Driver上,對Application的每個stage的task,進行分配以前,都會計算出每一個task要計算的是哪一個分片數據,RDD的某個partition;Spark的task分配算法,優先,會但願每一個task正好分配到它要計算的數據所在的節點,這樣的話,就不用在網絡間傳輸數據;
可是可能task沒有機會分配到它的數據所在的節點,由於可能那個節點的計算資源和計算能力都滿了;因此呢,這種時候,一般來講,Spark會等待一段時間,默認狀況下是3s鍾(不是絕對的,還有不少種狀況,對不一樣的本地化級別,都會去等待),到最後,實在是等待不了了,就會選擇一個比較差的本地化級別,好比說,將task分配到靠它要計算的數據所在節點,比較近的一個節點,而後進行計算。
可是對於第二種狀況,一般來講,確定是要發生數據傳輸,task會經過其所在節點的BlockManager來獲取數據,BlockManager發現本身本地沒有數據,會經過一個getRemote()方法,經過TransferService(網絡數據傳輸組件)從數據所在節點的BlockManager中,獲取數據,經過網絡傳輸回task所在節點。
對於咱們來講,固然不但願是相似於第二種狀況的了。最好的,固然是task和數據在一個節點上,直接從本地executor的BlockManager中獲取數據,純內存,或者帶一點磁盤IO;若是要經過網絡傳輸數據的話,那麼實在是,性能確定會降低的,大量網絡傳輸,以及磁盤IO,都是性能的殺手。
時候要調節這個參數?
觀察日誌,spark做業的運行日誌,推薦你們在測試的時候,先用client模式,在本地就直接能夠看到比較全的日誌。
日誌裏面會顯示,starting task。。。,PROCESS LOCAL、NODE LOCAL,觀察大部分task的數據本地化級別。
若是大多都是PROCESS_LOCAL,那就不用調節了
若是是發現,好多的級別都是NODE_LOCAL、ANY,那麼最好就去調節一下數據本地化的等待時長
調節完,應該是要反覆調節,每次調節完之後,再來運行,觀察日誌
看看大部分的task的本地化級別有沒有提高;看看,整個spark做業的運行時間有沒有縮短
可是注意別本末倒置,本地化級別卻是提高了,可是由於大量的等待時長,spark做業的運行時間反而增長了,那就仍是不要調節了。
spark.locality.wait,默認是3s;能夠改爲6s,10s
默認狀況下,下面3個的等待時長,都是跟上面那個是同樣的,都是3sspark.locality.wait.processspark.locality.wait.nodespark.locality.wait.racknew SparkConf() .set("spark.locality.wait", "10")