其餘更多java基礎文章:
java基礎學習(目錄)java
Spark的性能調優主要有如下幾個方向:node
按照優化效果簡單排序:算法
本系列主要講解:shell
性能調優的王道,就是增長和分配更多的資源,性能和速度上的提高,是顯而易見的;基本上,在必定範圍以內,增長資源與性能的提高,是成正比的;寫完了一個複雜的spark做業以後,進行性能調優的時候,首先第一步,我以爲,就是要來調節最優的資源配置;在這個基礎之上,若是說你的spark做業,可以分配的資源達到了你的能力範圍的頂端以後,沒法再分配更多的資源了,公司資源有限;那麼纔是考慮去作後面的這些性能調優的點。apache
在咱們在生產環境中,提交spark做業時,用的spark-submit shell腳本,裏面調整對應的參數數組
/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \ 配置executor的數量
--driver-memory 100m \ 配置driver的內存(影響不大)
--executor-memory 100m \ 配置每一個executor的內存大小
--executor-cores 3 \ 配置每一個executor的cpu core數量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
複製代碼
若是executor數量比較少,那麼,可以並行執行的task數量就比較少,就意味着,咱們的Application的並行執行的能力就很弱。
好比有3個executor,每一個executor有2個cpu core,那麼同時可以並行執行的task,就是6個。6個執行完之後,再換下一批6個task。 增長了executor數量之後,那麼,就意味着,可以並行執行的task數量,也就變多了。好比原先是6個,如今可能能夠並行執行10個,甚至20個,100個。那麼並行能力就比以前提高了數倍,數十倍。 相應的,性能(執行的速度),也能提高數倍~數十倍。緩存
增長每一個executor的cpu core,也是增長了執行的並行能力。本來20個executor,每一個才2個cpu core。可以並行執行的task數量,就是40個task。
如今每一個executor的cpu core,增長到了5個。可以並行執行的task數量,就是100個task。 執行的速度,提高了2.5倍。bash
增長了內存量之後,對性能的提高,有三點:網絡
並行度:其實就是指的是,Spark做業中,各個stage的task數量,也就表明了Spark做業的在各個階段(stage)的並行度。架構
假設,如今已經在spark-submit腳本里面,給咱們的spark做業分配了足夠多的資源,好比50個executor,每一個executor有10G內存,每一個executor有3個cpu core。基本已經達到了集羣或者yarn隊列的資源上限。
task沒有設置,或者設置的不多,好比就設置了,100個task。50個executor,每一個executor有3個cpu core,也就是說,你的Application任何一個stage運行的時候,都有總數在150個cpu core,能夠並行運行。可是你如今,只有100個task,平均分配一下,每一個executor分配到2個task,ok,那麼同時在運行的task,只有100個,每一個executor只會並行運行2個task。每一個executor剩下的一個cpu core,就浪費掉了。
你的資源雖然分配足夠了,可是問題是,並行度沒有與資源相匹配,致使你分配下去的資源都浪費掉了。
應該是要設置的足夠大,大到能夠徹底合理的利用你的集羣資源;好比上面的例子,總共集羣有150個cpu core,能夠並行運行150個task。那麼就應該將你的Application的並行度,至少設置成150,才能徹底有效的利用你的集羣資源,讓150個task,並行執行;並且task增長到150個之後,便可以同時並行運行,還可讓每一個task要處理的數據量變少;好比總共150G的數據要處理,若是是100個task,每一個task計算1.5G的數據;如今增長到150個task,能夠並行運行,並且每一個task主要處理1G的數據就能夠。
很簡單的道理,只要合理設置並行度,就能夠徹底充分利用你的集羣計算資源,而且減小每一個task要處理的數據量,最終,就是提高你的整個Spark做業的性能和運行速度。
實際狀況,與理想狀況不一樣的,有些task會運行的快一點,好比50s就完了,有些task,可能會慢一點,要1分半才運行完,因此若是你的task數量,恰好設置的跟cpu core數量相同,可能仍是會致使資源的浪費,由於,好比150個task,10個先運行完了,剩餘140個還在運行,可是這個時候,有10個cpu core就空閒出來了,就致使了浪費。那若是task數量設置成cpu core總數的2~3倍,那麼一個task運行完了之後,另外一個task立刻能夠補上來,就儘可能讓cpu core不要空閒,同時也是儘可能提高spark做業運行的效率和速度,提高性能。
spark.default.parallelism
SparkConf conf = new SparkConf()
.set("spark.default.parallelism", "500")
複製代碼
如上圖第一條DAG,默認狀況下,屢次對一個RDD執行算子,去獲取不一樣的RDD;都會對這個RDD以及以前的父RDD,所有從新計算一次;因此在計算RDD3和RDD4的時候,前面的讀取HDFS文件,而後對RDD1執行算子,獲取 到RDD2會計算兩遍。
這種狀況,是絕對絕對,必定要避免的,一旦出現一個RDD重複計算的狀況,就會致使性能急劇下降。好比,HDFS->RDD1-RDD2的時間是15分鐘,那麼此時就要走兩遍,變成30分鐘。
另一種狀況,在上圖第二條DAG中國,從一個RDD到幾個不一樣的RDD,算子和計算邏輯實際上是徹底同樣的,結果由於人爲的疏忽,計算了屢次,獲取到了多個RDD。這個也是儘可能要避免的。
1. RDD架構重構與優化
儘可能去複用RDD,差很少的RDD,能夠抽取稱爲一個共同的RDD,供後面的RDD計算時,反覆使用。
2. 公共RDD必定要實現持久化
對於要屢次計算和使用的公共RDD,必定要進行持久化。
持久化,也就是說,將RDD的數據緩存到內存中/磁盤中,(BlockManager),之後不管對這個RDD作多少次計算,那麼都是直接取這個RDD的持久化的數據,好比從內存中或者磁盤中,直接提取一份數據。
3. 持久化,是能夠進行序列化的
若是正常將數據持久化在內存中,那麼可能會致使內存的佔用過大,這樣的話,也許,會致使OOM內存溢出。
當純內存沒法支撐公共RDD數據徹底存放的時候,就優先考慮,使用序列化的方式在純內存中存儲。將RDD的每一個partition的數據,序列化成一個大的字節數組,就一個對象;序列化後,大大減小內存的空間佔用。
序列化的方式,惟一的缺點就是,在獲取數據的時候,須要反序列化。
4. 爲了數據的高可靠性,並且內存充足,可使用雙副本機制,進行持久化
持久化的雙副本機制,持久化後的一個副本,由於機器宕機了,副本丟了,就仍是得從新計算一次;持久化的每一個數據單元,存儲一份副本,放在其餘節點上面;從而進行容錯;一個副本丟了,不用從新計算,還可使用另一份副本。
這種方式,僅僅針對你的內存資源極度充足
關於廣播能夠閱讀 Spark學習(二)——RDD基礎 中的共享變量。
task執行的算子中,使用了外部的變量,而後driver會把變量以task的形式發送到excutor端,每一個task都會獲取一份變量的副本。若是有不少個task,就會有不少給excutor端攜帶不少個變量,若是這個變量很是大的時候,就可能會形成內存溢出。
好比,外部變量map是1M。總共,你前面調優都調的特好,資源給的到位,配合着資源,並行度調節的絕對到位,1000個task。大量task的確都在並行運行。
這些task裏面都用到了佔用1M內存的map,那麼首先,map會拷貝1000份副本,經過網絡傳輸到各個task中去,給task使用。總計有1G的數據,會經過網絡傳輸。網絡傳輸的開銷,不容樂觀啊!!!網絡傳輸,也許就會消耗掉你的spark做業運行的總時間的一小部分。
map副本,傳輸到了各個task上以後,是要佔用內存的。1個map的確不大,1M;1000個map分佈在你的集羣中,一會兒就耗費掉1G的內存。對性能會有什麼影響呢?
沒必要要的內存的消耗和佔用,就致使了,你在進行RDD持久化到內存,也許就無法徹底在內存中放下;就只能寫入磁盤,最後致使後續的操做在磁盤IO上消耗性能;
你的task在建立對象的時候,也許會發現堆內存放不下全部對象,也許就會致使頻繁的垃圾回收器的回收,GC。GC的時候,必定是會致使工做線程中止,也就是致使Spark暫停工做那麼一點時間。頻繁GC的話,對Spark做業的運行的速度會有至關可觀的影響。
Kryo序列化機制,一旦啓用之後,會生效的幾個地方:
SparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
複製代碼
Kryo之因此沒有被做爲默認的序列化類庫的緣由,就要出現了:主要是由於Kryo要求,若是要達到它的最佳性能的話,那麼就必定要註冊你自定義的類(好比,你的算子函數中使用到了外部自定義類型的對象變量,這時,就要求必須註冊你的類,不然Kryo達不到最佳性能)。
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{CategorySortKey.class})
複製代碼
fastutil是擴展了Java標準集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的類庫,提供了特殊類型的map、set、list和queue;
fastutil可以提供更小的內存佔用,更快的存取速度;咱們使用fastutil提供的集合類,來替代本身平時使用的JDK的原生的Map、List、Set,好處在於,fastutil集合類,能夠減少內存的佔用,而且在進行集合的遍歷、根據索引(或者key)獲取元素的值和設置元素的值的時候,提供更快的存取速度;
fastutil也提供了64位的array、set和list,以及高性能快速的,以及實用的IO類,來處理二進制和文本類型的文件; fastutil最新版本要求Java 7以及以上版本;
Spark中應用fastutil的場景:
第一步:在pom.xml中引用fastutil的包
<dependency>
<groupId>fastutil</groupId>
<artifactId>fastutil</artifactId>
<version>5.0.9</version>
</dependency>
複製代碼
第二步:List => IntList
IntList fastutilExtractList = new IntArrayList();
複製代碼
可是呢,一般來講,有時,事與願違,可能task沒有機會分配到它的數據所在的節點,爲何呢,可能那個節點的計算資源和計算能力都滿了;因此呢,這種時候,一般來講,Spark會等待一段時間,默認狀況下是3s鍾(不是絕對的,還有不少種狀況,對不一樣的本地化級別,都會去等待),到最後,實在是等待不了了,就會選擇一個比較差的本地化級別,好比說,將task分配到靠它要計算的數據所在節點,比較近的一個節點,而後進行計算。
可是對於第二種狀況,一般來講,確定是要發生數據傳輸,task會經過其所在節點的BlockManager來獲取數據,BlockManager發現本身本地沒有數據,會經過一個getRemote()方法,經過TransferService(網絡數據傳輸組件)從數據所在節點的BlockManager中,獲取數據,經過網絡傳輸回task所在節點。
對於咱們來講,固然不但願是相似於第二種狀況的了。最好的,固然是task和數據在一個節點上,直接從本地executor的BlockManager中獲取數據,純內存,或者帶一點磁盤IO;若是要經過網絡傳輸數據的話,那麼實在是,性能確定會降低的,大量網絡傳輸,以及磁盤IO,都是性能的殺手。
觀察日誌,spark做業的運行日誌,推薦你們在測試的時候,先用client模式,在本地就直接能夠看到比較全的日誌。 日誌裏面會顯示,starting task。。。,PROCESS LOCAL、NODE LOCAL 觀察大部分task的數據本地化級別
若是大多都是PROCESS_LOCAL,那就不用調節了 若是是發現,好多的級別都是NODE_LOCAL、ANY,那麼最好就去調節一下數據本地化的等待時長 調節完,應該是要反覆調節,每次調節完之後,再來運行,觀察日誌 看看大部分的task的本地化級別有沒有提高;看看,整個spark做業的運行時間有沒有縮短
你別本末倒置,本地化級別卻是提高了,可是由於大量的等待時長,spark做業的運行時間反而增長了,那就仍是不要調節了
new SparkConf()
.set("spark.locality.wait", "10")
複製代碼
默認狀況下,下面3個的等待時長,都是跟上面那個是同樣的,都是3s