一:Spark的性能優化,主要手段包括:
一、使用高性能序列化類庫
二、優化數據結構
三、對屢次使用的RDD進行持久化 / Checkpoint
四、使用序列化的持久化級別
五、Java虛擬機垃圾回收調優
六、提升並行度
七、廣播共享數據
八、數據本地化
九、reduceByKey和groupByKey的合理使用
十、Shuffle調優(核心中的核心,重中之重)java
二:spark診斷內存消耗node
java主要的內存消耗算法
1、每一個Java對象,都有一個對象頭,會佔用16個字節,主要是包括了一些對象的元信息,好比指向它的類的指針。若是一個對象自己很小,好比就包括了一個int類型的field,那麼它的對象頭實際上比對象本身還要大。 2、Java的String對象,會比它內部的原始數據,要多出40個字節。由於它內部使用char數組來保存內部的字符序列的,而且還得保存諸如數組長度之類的信息。並且由於String使用的是UTF-16編碼,因此每一個字符會佔用2個字節。好比,包含10個字符的String,會佔用60個字節。 3、Java中的集合類型,好比HashMap和LinkedList,內部使用的是鏈表數據結構,因此對鏈表中的每個數據,都使用了Entry對象來包裝。Entry對象不光有對象頭,還有指向下一個Entry的指針,一般佔用8個字節。 4、元素類型爲原始數據類型(好比int)的集合,內部一般會使用原始數據類型的包裝類型,好比Integer,來存儲元素。
怎麼判斷程序消耗的內存:apache
1、首先,本身設置RDD的並行度,有兩種方式:要否則,在parallelize()、textFile()等方法中,傳入第二個參數,設置RDD的task / partition的數量;要否則,用SparkConf.set()方法,設置一個參數,spark.default.parallelism,能夠統一設置這個application全部RDD的partition數量。 2、其次,在程序中將RDD cache到內存中,調用RDD.cache()方法便可。 3、最後,觀察Driver的log,你會發現相似於:「INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)」的日誌信息。這就顯示了每一個partition佔用了多少內存。 4、將這個內存信息乘以partition數量,便可得出RDD的內存佔用量。
三:spark高性能序列化庫編程
兩種序列化機制json
spark默認使用了第一種序列化機制: 1、Java序列化機制:默認狀況下,Spark使用Java自身的ObjectInputStream和ObjectOutputStream機制進行對象的序列化。只要你的類實現了Serializable接口,那麼都是能夠序列化的。並且Java序列化機制是提供了自定義序列化支持的,只要你實現Externalizable接口便可實現本身的更高性能的序列化算法。Java序列化機制的速度比較慢,並且序列化後的數據佔用的內存空間比較大。 2、Kryo序列化機制:Spark也支持使用Kryo類庫來進行序列化。Kryo序列化機制比Java序列化機制更快,並且序列化後的數據佔用的空間更小,一般比Java序列化的數據佔用的空間要小10倍。Kryo序列化機制之因此不是默認序列化機制的緣由是,有些類型雖然實現了Seriralizable接口,可是它也不必定可以進行序列化;此外,若是你要獲得最佳的性能,Kryo還要求你在Spark應用程序中,對全部你須要序列化的類型都進行註冊。
如何使用Kroyo序列數組
方式一:SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
方式二:若是要註冊自定義的類型,那麼就使用以下的代碼,便可:
Scala版本:
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[Counter] ))
val sc = new SparkContext(conf)
Java版本:
SparkConf conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Counter.class)
JavaSparkContext sc = new JavaSparkContext(conf)
使用Kroyo序列的建議:緩存
1、優化緩存大小 若是註冊的要序列化的自定義的類型,自己特別大,好比包含了超過100個field。那麼就會致使要序列化的對象過大。此時就須要對Kryo自己進行優化。由於Kryo內部的緩存可能不夠存放那麼大的class對象。此時就須要調用SparkConf.set()方法,設置spark.kryoserializer.buffer.mb參數的值,將其調大。 默認狀況下它的值是2,就是說最大能緩存2M的對象,而後進行序列化。能夠在必要時將其調大。好比設置爲10。 2、預先註冊自定義類型 雖然不註冊自定義類型,Kryo類庫也能正常工做,可是那樣的話,對於它要序列化的每一個對象,都會保存一份它的全限定類名。此時反而會耗費大量內存。所以一般都建議預先註冊號要序列化的自定義的類。
使用場景:性能優化
首先,這裏討論的都是Spark的一些普通的場景,一些特殊的場景,好比RDD的持久化,在後面會講解。這裏先不說。
那麼,這裏針對的Kryo序列化類庫的使用場景,就是算子函數使用到了外部的大數據的狀況。好比說吧,咱們在外部定義了一個封裝了應用全部配置的對象,好比自定義了一個MyConfiguration對象,裏面包含了100m的數據。而後,在算子函數裏面,使用到了這個外部的大對象。
此時呢,若是默認狀況下,讓Spark用java序列化機制來序列化這種外部的大對象,那麼就會致使,序列化速度緩慢,而且序列化之後的數據仍是比較大,比較佔用內存空間。
所以,在這種狀況下,比較適合,切換到Kryo序列化類庫,來對外部的大對象進行序列化操做。一是,序列化速度會變快;二是,會減小序列化後的數據佔用的內存空間。
四:Spark優化數據結構網絡
目的:使用數據結構是爲了減小數據的佔用量,從而減小內存的開銷。
優化的對象:主要就是優化你的算子函數,內部使用到的局部數據,或者是算子函數外部的數據。均可以進行數據結構的優化。優化以後,都會減小其對內存的消耗和佔用。
優化方式:
1、優先使用數組以及字符串,而不是集合類。也就是說,優先用array,而不是ArrayList、LinkedList、HashMap等集合。 好比,有個List<Integer> list = new ArrayList<Integer>(),將其替換爲int[] arr = new int[]。這樣的話,array既比List少了額外信息的存儲開銷,還能使用原始數據類型(int)來存儲數據,比List中用Integer這種包裝類型存儲數據,要節省內存的多。 還好比,一般企業級應用中的作法是,對於HashMap、List這種數據,統一用String拼接成特殊格式的字符串,好比Map<Integer, Person> persons = new HashMap<Integer, Person>()。能夠優化爲,特殊的字符串格式:id:name,address|id:name,address...。 2、避免使用多層嵌套的對象結構。好比說,public class Teacher { private List<Student> students = new ArrayList<Student>() }。就是很是很差的例子。由於Teacher類的內部又嵌套了大量的小Student對象。 好比說,對於上述例子,也徹底可使用特殊的字符串來進行數據的存儲。好比,用json字符串來存儲數據,就是一個很好的選擇。 {"teacherId": 1, "teacherName": "leo", students:[{"studentId": 1, "studentName": "tom"},{"studentId":2, "studentName":"marry"}]} 3、對於有些可以避免的場景,儘可能使用int替代String。由於String雖然比ArrayList、HashMap等數據結構高效多了,佔用內存量少多了,可是以前分析過,仍是有額外信息的消耗。好比以前用String表示id,那麼如今徹底能夠用數字類型的int,來進行替代。 這裏提醒,在spark應用中,id就不要用經常使用的uuid了,由於沒法轉成int,就用自增的int類型的id便可。(sdfsdfdf-234242342-sdfsfsfdfd)
五:對屢次使用的RDD進行持久化操做 或 CheckPoint
對屢次運算的RDD進行持久化或放到內存,能夠減小對重複計算的代價;
若是要保證在RDD的持久化數據可能丟失的狀況下,還要保證高性能,那麼能夠對RDD進行Checkpoint操做。
對數據的持久化有多重級別:
除了對屢次使用的RDD進行持久化操做以外,還能夠進一步優化其性能。由於頗有可能,RDD的數據是持久化到內存,或者磁盤中的。那麼,此時,若是內存大小不是特別充足,徹底可使用序列化的持久化級別,好比MEMORY_ONLY_SER、MEMORY_AND_DISK_SER等。使用RDD.persist(StorageLevel.MEMORY_ONLY_SER)這樣的語法便可。
這樣的話,將數據序列化以後,再持久化,能夠大大減少對內存的消耗。此外,數據量小了以後,若是要寫入磁盤,那麼磁盤io性能消耗也比較小。
對RDD持久化序列化後,RDD的每一個partition的數據,都是序列化爲一個巨大的字節數組。這樣,對於內存的消耗就小的多了。可是惟一的缺點就是,獲取RDD數據時,須要對其進行反序列化,會增大其性能開銷。
所以,對於序列化的持久化級別,還能夠進一步優化,也就是說,使用Kryo序列化類庫,這樣,能夠得到更快的序列化速度,而且佔用更小的內存空間。可是要記住,若是RDD的元素(RDD<T>的泛型類型),是自定義類型的話,在Kryo中提早註冊自定義類型。
六:JVM虛擬機垃圾回收
主要是建立少許的對象,以及建立對象的大小。編程中避免大對象。
還有一些jvm的通用方法。都是通用的,能夠參考一些通用方法。
七:提升並行度
實際上Spark集羣的資源並不必定會被充分利用到,因此要儘可能設置合理的並行度,來充分地利用集羣的資源。才能充分提升Spark應用程序的性能。
Spark會自動設置以文件做爲輸入源的RDD的並行度,依據其大小,好比HDFS,就會給每個block建立一個partition,也依據這個設置並行度。對於reduceByKey等會發生shuffle的操做,就使用並行度最大的父RDD的並行度便可。
能夠手動使用textFile()、parallelize()等方法的第二個參數來設置並行度;也可使用spark.default.parallelism參數,來設置統一的並行度。
好比說,spark-submit設置了executor數量是10個,每一個executor要求分配2個core,那麼application總共會有20個core。此時能夠設置new SparkConf().set("spark.default.parallelism", "60")來設置合理的並行度,從而充分利用資源。
官方建議設置的並行數量爲2-3倍的cpu cores的數量,這樣可使一些計算能力較弱的cpu少計算一些數據。能力好的cpu計算多一些數據。
八:廣播共享文件
若是你的算子函數中,使用到了特別大的數據,那麼,這個時候,推薦將該數據進行廣播。這樣的話,就不至於將一個大數據拷貝到每個task上去。而是給每一個節點拷貝一份,而後節點上的task共享該數據。
這樣的話,就能夠減小大數據在節點上的內存消耗。而且能夠減小數據到節點的網絡傳輸消耗。
九:數據本地化
基於移動計算的成本要遠遠小於移動數據的原則。
數據本地化級別:
數據本地化,指的是,數據離計算它的代碼有多近。基於數據距離代碼的距離,有幾種數據本地化級別: 1、PROCESS_LOCAL:數據和計算它的代碼在同一個JVM進程中。 2、NODE_LOCAL:數據和計算它的代碼在一個節點上,可是不在一個進程中,好比在不一樣的executor進程中,或者是數據在HDFS文件的block中。 3、NO_PREF:數據從哪裏過來,性能都是同樣的。 4、RACK_LOCAL:數據和計算它的代碼在一個機架上。 5、ANY:數據可能在任意地方,好比其餘網絡環境內,或者其餘機架上。
優化方案:
Spark傾向於使用最好的本地化級別來調度task,可是這是不可能的。若是沒有任何未處理的數據在空閒的executor上,那麼Spark就會放低本地化級別。這時有兩個選擇:第一,等待,直到executor上的cpu釋放出來,那麼就分配task過去;第二,當即在任意一個executor上啓動一個task。
Spark默認會等待一下子,來指望task要處理的數據所在的節點上的executor空閒出一個cpu,從而將task分配過去。只要超過了時間,那麼Spark就會將task分配到其餘任意一個空閒的executor上。
能夠設置參數,spark.locality系列參數,來調節Spark等待task能夠進行數據本地化的時間。spark.locality.wait(3000毫秒)、spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack。
十: groupByKey 和 ReduceByKey
若是能用reduceByKey,那就用reduceByKey,由於它會在map端,先進行本地combine,能夠大大減小要傳輸到reduce端的數據量,減少網絡傳輸的開銷。
只有在reduceByKey處理不了時,才用groupByKey().map()來替代。
十一: shuffle優化
瞭解下shuffle的過程:
優化參數:
new SparkConf().set("spark.shuffle.consolidateFiles", "true") spark.shuffle.consolidateFiles:是否開啓shuffle block file的合併,默認爲false spark.reducer.maxSizeInFlight:reduce task的拉取緩存,默認48m spark.shuffle.file.buffer:map task的寫磁盤緩存,默認32k spark.shuffle.io.maxRetries:拉取失敗的最大重試次數,默認3次 spark.shuffle.io.retryWait:拉取失敗的重試間隔,默認5s spark.shuffle.memoryFraction:用於reduce端聚合的內存比例,默認0.2,超過比例就會溢出到磁盤上
1- spark.shuffle.consolidateFiles參數優化
沒有開啓consolidation機制的時候,shuffle write的性能是比較低下的,並且會影響到shuffle read的性能,也會比較低下。
由於在shuffle map端建立的磁盤文件太多了,致使shuffle write要耗費大量的性能到磁盤文件的建立,以及磁盤的io上。對於shuffle read,也是同樣的,每一個result task可能都須要經過磁盤io讀取多個文件的數據,都只shuffle read,性能可能也會受到影響。作主要的仍是shuffle write,由於要寫的磁盤文件太多。
好比每一個節點有100個shuffle map task,10個CPU core是,總共有1000個result task。因此,每一個節點上的磁盤文件爲100*1000個。
設置爲true時,每一個cpu爲每一個result task寫一個文件(文件內容是以前的數據進行合併的結果),每一個節點上的磁盤文件爲10*1000個。
2- spark.reducer.maxSizeInFlight
若是內存足夠的話,這個量應該增大,這樣,result task拉取的次數會減小(每次拉取數據量增長)。
3- spark.shuffle.file.buffer
能夠適量增大,這樣每次寫入到文件的數據量減小,從而減小寫文件的次數。
4- spark.shuffle.io.maxRetries
拉取數據的時候,可能jvm在full GC。
5- spark.shuffle.io.retryWait
能夠適當增長時間。爲了應對jvm 的full GC。
6- spark.shuffle.memoryFraction
能夠適當的調大。
執行reduce task的Excetor中,有一部份內存是用來匯聚各個reduce task拉取的數據,放到map集合中,進行聚合。
當該數據超過總緩存*比例時,會把該內存的數據寫入到磁盤上。
7- 若是jvm GC沒有調優好,會致使每次gc都須要1min。那麼拉取的最大默認時間爲3*5s=15s。就會致使頻繁的不少文件拉取失敗。會報shuffle output file lost。而後DAGScheduler會重試task和stage。最後甚至致使application掛掉。
以上觀點基本都借鑑自:中華石杉--spark從入門到精通的觀點。