下面這些關於Spark的性能調優項,有的是來自官方的,有的是來自別的的工程師,有的則是我本身總結的。node
基本概念和原則程序員
首先,要搞清楚Spark的幾個基本概念和原則,不然系統的性能調優無從談起:算法
每一臺host上面能夠並行N個worker,每個worker下面能夠並行M個executor,task們會被分配到executor上面去執行。Stage指的是一組並行運行的task,stage內部是不能出現shuffle的,由於shuffle的就像籬笆同樣阻止了並行task的運行,遇到shuffle就意味着到了stage的邊界。緩存
CPU的core數量,每一個executor能夠佔用一個或多個core,能夠經過觀察CPU的使用率變化來了解計算資源的使用狀況,例如,很常見的一種浪費是一個executor佔用了多個core,可是總的CPU使用率卻不高(由於一個executor並不總能充分利用多核的能力),這個時候能夠考慮讓麼個executor佔用更少的core,同時worker下面增長更多的executor,或者一臺host上面增長更多的worker來增長並行執行的executor的數量,從而增長CPU利用率。可是增長executor的時候須要考慮好內存消耗的控制,以避免出現Out of Memory的狀況。網絡
partition和parallelism,partition指的就是數據分片的數量,每一次task只能處理一個partition的數據,這個值過小了會致使每片數據量太大,致使內存壓力,或者諸多executor的計算能力沒法利用充分;可是若是太大了則會致使分片太多,執行效率下降。在執行action類型操做的時候(好比各類reduce操做),partition的數量會選擇parent RDD中最大的那一個。而parallelism則指的是在RDD進行reduce類操做的時候,默認返回數據的paritition數量(而在進行map類操做的時候,partition數量一般取自parent RDD中較大的一個,並且也不會涉及shuffle,所以這個parallelism的參數沒有影響)。因此說,這兩個概念密切相關,都是涉及到數據分片的,做用方式實際上是統一的。經過spark.default.parallelism能夠設置默認的分片數量,而不少RDD的操做均可以指定一個partition參數來顯式控制具體的分片數量。數據結構
上面這兩條原理上看起來很簡單,可是卻很是重要,根據硬件和任務的狀況選擇不一樣的取值。想要取一個放之四海而皆準的配置是不現實的。看這樣幾個例子:(1)實踐中跑的EMR Spark job,有的特別慢,查看CPU利用率很低,咱們就嘗試減小每一個executor佔用CPU core的數量,增長並行的executor數量,同時配合增長分片,總體上增長了CPU的利用率,加快數據處理速度。(2)發現某job很容易發生內存溢出,咱們就增大分片數量,從而減小了每片數據的規模,同時還減小並行的executor數量,這樣相同的內存資源分配給數量更少的executor,至關於增長了每一個task的內存分配,這樣運行速度可能慢了些,可是總比OOM強。(3)數據量特別少,有大量的小文件生成,就減小文件分片,不必建立那麼多task,這種狀況,若是隻是最原始的input比較小,通常都能被注意到;可是,若是是在運算過程當中,好比應用某個reduceBy或者某個filter之後,數據大量減小,這種低效狀況就不多被留意到。app
最後再補充一點,隨着參數和配置的變化,性能的瓶頸是變化的,在分析問題的時候不要忘記。例如在每臺機器上部署的executor數量增長的時候,性能一開始是增長的,同時也觀察到CPU的平均使用率在增長;可是隨着單臺機器上的executor愈來愈多,性能降低了,由於隨着executor的數量增長,被分配到每一個executor的內存數量減少,在內存裏直接操做的愈來愈少,spill over到磁盤上的數據愈來愈多,天然性能就變差了。框架
下面給這樣一個直觀的例子,當前總的cpu利用率並不高:ide
可是通過根據上述原則的的調整以後,能夠顯著發現cpu總利用率增長了:工具
其次,涉及性能調優咱們常常要改配置,在Spark裏面有三種常見的配置方式,雖然有些參數的配置是能夠互相替代,可是做爲最佳實踐,仍是須要遵循不一樣的情形下使用不一樣的配置:
設置環境變量,這種方式主要用於和環境、硬件相關的配置;
命令行參數,這種方式主要用於不一樣次的運行會發生變化的參數,用雙橫線開頭;
代碼裏面(好比Scala)顯式設置(SparkConf對象),這種配置一般是application級別的配置,通常不改變。
舉一個配置的具體例子。Node、worker和executor之間的比例調整。咱們常常須要調整並行的executor的數量,那麼簡單說有兩種方式:
一個是調整並行的worker的數量,好比,SPARK_WORKER_INSTANCES能夠設置每一個node的worker的數量,可是在改變這個參數的時候,好比改爲2,必定要相應設置SPARK_WORKER_CORES的值,讓每一個worker使用原有一半的core,這樣才能讓兩個worker一同工做;
另外一個是調整worker內executor的數量,咱們是在YARN框架下采用這個調整來實現executor數量改變的,一種典型辦法是,一個host只跑一個worker,而後配置spark.executor.cores爲host上CPU core的N分之一,同時也設置spark.executor.memory爲host上分配給Spark計算內存的N分之一,這樣這個host上就可以啓動N個executor。
有的配置在不一樣的MR框架/工具下是不同的,好比YARN下有的參數的默認取值就不一樣,這點須要注意。
明確這些基礎的事情之後,再來一項一項看性能調優的要點。
內存
Memory Tuning,Java對象會佔用原始數據2~5倍甚至更多的空間。最好的檢測對象內存消耗的辦法就是建立RDD,而後放到cache裏面去,而後在UI上面看storage的變化;固然也可使用SizeEstimator來估算。使用-XX:+UseCompressedOops選項能夠壓縮指針(8字節變成4字節)。在調用collect等等API的時候也要當心——大塊數據往內存拷貝的時候內心要清楚。內存要留一些給操做系統,好比20%,這裏面也包括了OS的buffer cache,若是預留得太少了,會見到這樣的錯誤:
Required executor memory (235520+23552 MB) is above the max threshold (241664 MB) of this cluster! Please increase the value of ‘yarn.scheduler.maximum-allocation-mb’.
或者乾脆就沒有這樣的錯誤,可是依然有由於內存不足致使的問題,有的會有警告,好比這個:
16/01/13 23:54:48 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
Reduce Task的內存使用。在某些狀況下reduce task特別消耗內存,好比當shuffle出現的時候,好比sortByKey、groupByKey、reduceByKey和join等,要在內存裏面創建一個巨大的hash table。其中一個解決辦法是增大level of parallelism,這樣每一個task的輸入規模就相應減少。
注意原始input的大小,有不少操做始終都是須要某類全集數據在內存裏面完成的,那麼並不是拼命增長parallelism和partition的值就能夠把內存佔用減得很是小的。咱們遇到過某些性能低下甚至OOM的問題,是改變這兩個參數所難以緩解的。可是能夠經過增長每臺機器的內存,或者增長機器的數量均可以直接或間接增長內存總量來解決。
在選擇EC2機器類型的時候,要明確瓶頸(能夠藉由測試來明確),好比咱們遇到的狀況就是使用r3.8 xlarge和c3.8 xlarge選擇的問題,運算能力至關,前者比後者貴50%,可是內存是後者的5倍。
CPU
Level of Parallelism。指定它之後,在進行reduce類型操做的時候,默認partition的數量就被指定了。這個參數在實際工程中一般是必不可少的,通常都要根據input和每一個executor內存的大小來肯定。設置level of parallelism或者屬性spark.default.parallelism來改變並行級別,一般來講,每個CPU核能夠分配2~3個task。
CPU core的訪問模式是共享仍是獨佔。即CPU核是被同一host上的executor共享仍是瓜分並獨佔。好比YARN環境,一臺機器上共有32個CPU core的資源,同時部署了兩個executor,總內存是50G,那麼一種方式是配置spark.executor.cores爲16,spark.executor.memory爲20G,這樣因爲內存的限制,這臺機器上會部署兩個executor,每一個都使用20G內存,而且各使用獨佔的16個CPU core資源;而若是把spark.executor.cores配置爲32,那麼依然會部署兩個executor,可是兩者會共享這32個core。根據個人測試,獨佔模式的性能要略好與共享模式。同時,獨佔模式也是Spark官方文檔上推薦的方式。
GC調優。打印GC信息:-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps。默認60%的executor內存能夠被用來做爲RDD的緩存,所以只有40%的內存能夠被用來做爲對象建立的空間,這一點能夠經過設置spark.storage.memoryFraction改變。若是有不少小對象建立,可是這些對象在不徹底GC的過程當中就能夠回收,那麼增大Eden區會有必定幫助。若是有任務從HDFS拷貝數據,內存消耗有一個簡單的估算公式——好比HDFS的block size是64MB,工做區內有4個task拷貝數據,而解壓縮一個block要增大3倍大小,那麼內存消耗就是:4*3*64MB。另外,工做中遇到過這樣的一個問題:GC默認狀況下有一個限制,默認是GC時間不能超過2%的CPU時間,可是若是大量對象建立(在Spark裏很容易出現,代碼模式就是一個RDD轉下一個RDD),就會致使大量的GC時間,從而出現OutOfMemoryError: GC overhead limit exceeded,能夠經過設置-XX:-UseGCOverheadLimit關掉它。
序列化和傳輸
Data Serialization,默認使用的是Java Serialization,這個程序員最熟悉,可是性能、空間表現都比較差。還有一個選項是Kryo Serialization,更快,壓縮率也更高,可是並不是支持任意類的序列化。在Spark UI上可以看到序列化佔用總時間開銷的比例,若是這個比例高的話能夠考慮優化內存使用和序列化。
Broadcasting Large Variables。在task使用靜態大對象的時候,能夠把它broadcast出去。Spark會打印序列化後的大小,一般來講若是它超過20KB就值得這麼作。有一種常見情形是,一個大表join一個小表,把小表broadcast後,大表的數據就不須要在各個node之間瘋跑,安安靜靜地呆在本地等小表broadcast過來就行了。
Data Locality。數據和代碼要放到一塊兒才能處理,一般代碼總比數據要小一些,所以把代碼送到各處會更快。Data Locality是數據和處理的代碼在屋裏空間上接近的程度:PROCESS_LOCAL(同一個JVM)、NODE_LOCAL(同一個node,好比數據在HDFS上,可是和代碼在同一個node)、NO_PREF、RACK_LOCAL(不在同一個server,但在同一個機架)、ANY。固然優先級從高到低,可是若是在空閒的executor上面沒有未處理數據了,那麼就有兩個選擇:(1)要麼等現在繁忙的CPU閒下來處理儘量「本地」的數據,(1)要麼就不等直接啓動task去處理相對遠程的數據。默認當這種狀況發生Spark會等一下子(spark.locality),即策略(1),若是繁忙的CPU停不下來,就會執行策略(2)。
代碼裏對大對象的引用。在task裏面引用大對象的時候要當心,由於它會隨着task序列化到每一個節點上去,引起性能問題。只要序列化的過程不拋出異常,引用對象序列化的問題事實上不多被人重視。若是,這個大對象確實是須要的,那麼就不如干脆把它變成RDD好了。絕大多數時候,對於大對象的序列化行爲,是不知不覺發生的,或者說是預期以外的,好比在咱們的項目中有這樣一段代碼:
1
2
3
|
rdd.map(r
=
> {
println(BackfillTypeIndex)
})
|
其實呢,它等價於這樣:
1
2
3
|
rdd.map(r
=
> {
println(
this
.BackfillTypeIndex)
})
|
對於這樣的問題,一種最直接的解決方法就是:
1
2
|
val
dereferencedVariable
=
this
.BackfillTypeIndex
rdd.map(r
=
> println(dereferencedVariable))
// "this" is not serialized
|
相關地,註解@transient用來標識某變量不要被序列化,這對於將大對象從序列化的陷阱中排除掉是頗有用的。另外,注意class之間的繼承層級關係,有時候一個小的case class可能來自一棵大樹。
文件讀寫
文件存儲和讀取的優化。好比對於一些case而言,若是隻須要某幾列,使用rcfile和parquet這樣的格式會大大減小文件讀取成本。再有就是存儲文件到S3上或者HDFS上,能夠根據狀況選擇更合適的格式,好比壓縮率更高的格式。另外,特別是對於shuffle特別多的狀況,考慮留下必定量的額外內存給操做系統做爲操做系統的buffer cache,好比總共50G的內存,JVM最多分配到40G多一點。
文件分片。好比在S3上面就支持文件以分片形式存放,後綴是partXX。使用coalesce方法來設置分紅多少片,這個調整成並行級別或者其整數倍能夠提升讀寫性能。可是過高過低都很差,過低了無法充分利用S3並行讀寫的能力,過高了則是小文件太多,預處理、合併、鏈接創建等等都是時間開銷啊,讀寫還容易超過throttle。
任務
Spark的Speculation。經過設置spark.speculation等幾個相關選項,可讓Spark在發現某些task執行特別慢的時候,能夠在不等待完成的狀況下被從新執行,最後相同的task只要有一個執行完了,那麼最快執行完的那個結果就會被採納。
減小Shuffle。其實Spark的計算每每很快,可是大量開銷都花在網絡和IO上面,而shuffle就是一個典型。舉個例子,若是(k, v1) join (k, v2) => (k, v3),那麼,這種狀況其實Spark是優化得很是好的,由於須要join的都在一個node的一個partition裏面,join很快完成,結果也是在同一個node(這一系列操做能夠被放在同一個stage裏面)。可是若是數據結構被設計爲(obj1) join (obj2) => (obj3),而其中的join條件爲obj1.column1 == obj2.column1,這個時候每每就被迫shuffle了,由於再也不有同一個key使得數據在同一個node上的強保證。在必定要shuffle的狀況下,儘量減小shuffle前的數據規模,好比這個避免groupByKey的例子。下面這個比較的圖片來自Spark Summit 2013的一個演講,講的是同一件事情:
Repartition。運算過程當中數據量時大時小,選擇合適的partition數量關係重大,若是太多partition就致使有不少小任務和空任務產生;若是太少則致使運算資源無法充分利用,必要時候可使用repartition來調整,不過它也不是沒有代價的,其中一個最主要代價就是shuffle。再有一個常見問題是數據大小差別太大,這種狀況主要是數據的partition的key其實取值並不均勻形成的(默認使用HashPartitioner),須要改進這一點,好比重寫hash算法。測試的時候想知道partition的數量能夠調用rdd.partitions().size()獲知。
Task時間分佈。關注Spark UI,在Stage的詳情頁面上,能夠看獲得shuffle寫的總開銷,GC時間,當前方法棧,還有task的時間花費。若是你發現task的時間花費分佈太散,就是說有的花費時間很長,有的很短,這就說明計算分佈不均,須要從新審視數據分片、key的hash、task內部的計算邏輯等等,瓶頸出如今耗時長的task上面。
重用資源。有的資源申請開銷巨大,並且每每至關有限,好比創建鏈接,能夠考慮在partition創建的時候就建立好(好比使用mapPartition方法),這樣對於每一個partition內的每一個元素的操做,就只要重用這個鏈接就行了,不須要從新創建鏈接。
可供參考的文檔:官方調優文檔Tuning Spark,Spark配置的官方文檔,Spark Programming Guide,JVMGC調優文檔,JVM性能調優文檔,How-to: Tune Your Apache Spark Jobs part-1 & part-2。