並行度:其實就是指的是,Spark做業中,各個stage的task數量,也就表明了Spark做業的在各個階段(stage)的並行度。java
假設,如今已經在spark-submit腳本里面,給咱們的spark做業分配了足夠多的資源,好比50個executor,每一個executor有10G內存,每一個executor有3個cpu core。基本已經達到了集羣或者yarn隊列的資源上限。
可是task沒有設置,或者設置的不多,好比就設置了,100個task。50個executor,每一個executor有3個cpu core,也就是說,你的Application任何一個stage運行的時候,都有總數在150個cpu core,能夠並行運行。可是你如今,只有100個task,平均分配一下,每一個executor分配到2個task,ok,那麼同時在運行的task,只有100個,每一個executor只會並行運行2個task。每一個executor剩下的一個cpu core,就浪費掉了。git
官方推薦,task數量設置成spark application總cpu core數量的2~3倍,好比150個cpu core,基本要設置task數量爲300~500github
如何設置一個Spark Application的並行度算法
SparkConf conf = new SparkConf() .set("spark.default.parallelism", "500")
RDD架構重構與優化
儘可能去複用RDD,差很少的RDD,能夠抽取稱爲一個共同的RDD,供後面的RDD計算時,反覆使用。apache
公共RDD必定要實現持久化
對於要屢次計算和使用的公共RDD,必定要進行持久化。數組
持久化,是能夠進行序列化的
若是正常將數據持久化在內存中,那麼可能會致使內存的佔用過大,這樣的話,也許,會致使OOM內存溢出。
當純內存沒法支撐公共RDD數據徹底存放的時候,就優先考慮,使用序列化的方式在純內存中存儲。將RDD的每一個partition的數據,序列化成一個大的字節數組,就一個對象;序列化後,大大減小內存的空間佔用。
若是序列化純內存方式,仍是致使OOM,內存溢出;就只能考慮磁盤的方式,內存+磁盤的普通方式(無序列化)。
缺點:在獲取數據的時候須要反序列化緩存
數據的高可靠性,
在內存資源很充沛的狀況下,能夠持久化一個副本網絡
而每一個task在處理變量的時候,都會拷貝一份變量的副本,若是變量很大的話,就會耗費不少內存。這時能夠採用廣播變量的方式,把這個變量廣播出去,由於廣播變量只在每一個節點的Executor才一份副本
廣播變量在初始的時候,就只在Driver上有一份。task在運行的時候,想要使用廣播變量中的數據,此時首先會在本身本地的Executor對應的BlockManager中,嘗試獲取變量副本;若是本地沒有,那麼就從Driver遠程拉取變量副本,並保存在本地的BlockManager中;此後這個executor上的task,都會直接使用本地的BlockManager中的副本。
executor的BlockManager除了從driver上拉取,也可能從其餘節點的BlockManager上拉取變量副本,總之越近越好。session
默認狀況下,Spark內部是使用Java的序列化機制,ObjectOutputStream/ObjectInputStream,對象輸入輸出流機制,來進行序列化。
優勢:處理起來比較方便,只是在算子裏面使用的變量,必須是實現Serializable接口的。
缺點:默認的序列化機制的效率不高,序列化的速度比較慢;序列化之後的數據,佔用的內存空間相對仍是比較大。
Kryo序列化機制,比默認的Java序列化機制,速度要快,序列化後的數據要更小,大概是Java序列化機制的1/10。
Kryo序列化機制,一旦啓用之後,會生效的幾個地方:架構
使用Kryo序列化步驟:
fastutil是擴展了Java標準集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的類庫,提供了特殊類型的map、set、list和queue;
fastutil可以提供更小的內存佔用,更快的存取速度;咱們使用fastutil提供的集合類,來替代本身平時使用的JDK的原生的Map、List、Set,好處在於,fastutil集合類,能夠減少內存的佔用,而且在進行集合的遍歷、根據索引(或者key)獲取元素的值和設置元素的值的時候,提供更快的存取速度;
fastutil也提供了64位的array、set和list,以及高性能快速的,以及實用的IO類,來處理二進制和文本類型的文件
fastutil的每一種集合類型,都實現了對應的Java中的標準接口(好比fastutil的map,實現了Java的Map接口),所以能夠直接放入已有系統的任何代碼中。
fastutil還提供了一些JDK標準類庫中沒有的額外功能(好比雙向迭代器)。
fastutil除了對象和原始類型爲元素的集合,fastutil也提供引用類型的支持,可是對引用類型是使用等於號(=)進行比較的,而不是equals()方法。
若是算子函數使用了外部變量;那麼第一,你可使用Broadcast廣播變量優化;第二,可使用Kryo序列化類庫,提高序列化性能和效率;第三,若是外部變量是某種比較大的集合,那麼能夠考慮使用fastutil改寫外部變量,首先從源頭上就減小內存的佔用,經過廣播變量進一步減小內存佔用,再經過Kryo序列化類庫進一步減小內存佔用。
在你的算子函數裏,若是要建立比較大的Map、List等集合,可能會佔用較大的內存空間,並且可能涉及到消耗性能的遍歷、存取等集合操做;那麼此時,能夠考慮將這些集合類型使用fastutil類庫重寫,使用了fastutil集合類之後,就能夠在必定程度上,減小task建立出來的集合類型的內存佔用。避免executor內存頻繁佔滿,頻繁喚起GC,致使性能降低。
<dependency> <groupId>it.unimi.dsi</groupId> <artifactId>fastutil</artifactId> <version>7.0.6</version> </dependency>
UserVisitSessionAnalyzeSpark.java中831行有示例。
PROCESS_LOCAL > NODE_LOCAL > NO_PREF > RACK_LOCAL > ANY
Spark要對任務(task)進行分配的時候, 會計算出每一個task要計算的是哪一個分片的數據(partition),Spark的task分配算法,會按照上面的順序來進行分配。
可能PROCESS_LOCAL節點的計算資源和計算能力都滿了;Spark會等待一段時間,默認狀況下是3s鍾(不是絕對的,還有不少種狀況,對不一樣的本地化級別,都會去等待),到最後,就會選擇一個比較差的本地化級別,好比說,將task分配到靠它要計算的數據所在節點,比較近的一個節點,而後進行計算。
觀察日誌,spark做業的運行日誌,先用client模式,在本地就直接能夠看到比較全的日誌。日誌裏面會顯示,starting task...,PROCESS LOCAL、NODE LOCAL
若是是發現,好多的級別都是NODE_LOCAL、ANY,那麼最好就去調節一下數據本地化的等待時長。調節完,應該是要反覆調節,每次調節完之後,再來運行,觀察日誌
spark.locality.wait
, 3s, 6s, 10s...
spark中,堆內存又被劃分紅了兩塊兒,一起是專門用來給RDD的cache、persist操做進行RDD數據緩存用的;另一塊兒,用來給spark算子函數的運行使用的,存放函數中本身建立的對象。
默認狀況下,給RDD cache操做的內存佔比,是0.6,60%的內存都給了cache操做了。可是問題是,若是某些狀況下,cache不是那麼的緊張,問題在於task算子函數中建立的對象過多,而後內存又不太大,致使了頻繁的minor gc,甚至頻繁full gc,致使spark頻繁的中止工做。性能影響會很大。
能夠經過spark ui,若是是spark on yarn的話,那麼就經過yarn的界面,去查看你的spark做業的運行統計。能夠看到每一個stage的運行狀況,包括每一個task的運行時間、gc時間等等。若是發現gc太頻繁,時間太長。此時就能夠適當調價這個比例。
下降cache操做的內存佔比,大不了用persist操做,選擇將一部分緩存的RDD數據寫入磁盤,或者序列化方式,配合Kryo序列化類,減小RDD緩存的內存佔用;下降cache操做內存佔比;對應的,算子函數的內存佔比就提高了。這個時候,可能,就能夠減小minor gc的頻率,同時減小full gc的頻率。對性能的提高是有必定的幫助的。
spark.storage.memoryFraction,0.6 -> 0.5 -> 0.4 -> 0.2
有時候,若是你的spark做業處理的數據量特別特別大,幾億數據量;而後spark做業一運行,時不時的報錯,shuffle file cannot find,executor、task lost,out of memory(內存溢出)
多是說executor的堆外內存不太夠用,致使executor在運行的過程當中,可能會內存溢出;而後可能致使後續的stage的task在運行的時候,可能要從一些executor中去拉取shuffle map output文件,
可是executor可能已經掛掉了,關聯的block manager也沒有了;因此可能會報shuffle output file not found;resubmitting task;executor lost;spark做業完全崩潰。
--conf spark.yarn.executor.memoryOverhead=2048
spark-submit腳本里面,去用--conf的方式,去添加配置; 切記,不是在你的spark做業代碼中,用new SparkConf().set()這種方式去設置,不要這樣去設置,是沒有用的!必定要在spark-submit腳本中去設置。
默認狀況下,這個堆外內存上限大概是300多M;一般項目,真正處理大數據的時候,這裏都會出現問題,致使spark做業反覆崩潰,沒法運行;此時就會去調節這個參數,到至少1G(1024M),甚至說2G、4G
若是Executor遠程從另外一個Executor中拉取數據的時候,那個Executor正好在gc,此時呢,沒法創建網絡鏈接,會卡住;spark默認的網絡鏈接的超時時長,是60s;若是卡住60s都沒法創建鏈接的話,那麼就宣告失敗了。
碰到某某file。一串file id。uuid(dsfsfd-2342vs--sdf--sdfsd)。not found。file lost。頗有多是有那份數據的executor在jvm gc。因此拉取數據的時候,創建不了鏈接。而後超過默認60s之後,直接宣告失敗。
--conf spark.core.connection.ack.wait.timeout=300
spark-submit腳本,切記,不是在new SparkConf().set()這種方式來設置的。一般來講,能夠避免部分的偶爾出現的某某文件拉取失敗,某某文件lost
《北風網Spark項目實戰》
github: https://github.com/yangtong123/StudySpark