因爲大部分Spark計算都是在內存中完成的,因此Spark程序的瓶頸可能由集羣中任意一種資源致使,如:CPU、網絡帶寬、或者內存等。最多見的狀況是,數據能裝進內存,而瓶頸是網絡帶寬;固然,有時候咱們也須要作一些優化調整來減小內存佔用,例如將RDD以序列化格式保存(storing RDDs in serialized form)。本文將主要涵蓋兩個主題:1.數據序列化(這對於優化網絡性能極爲重要);2.減小內存佔用以及內存調優。同時,咱們也會說起其餘幾個比較小的主題。html
序列化在任何一種分佈式應用性能優化時都扮演幾位重要的角色。若是序列化格式序列化過程緩慢,或者須要佔用字節不少,都會大大拖慢總體的計算效率。一般,序列化都是Spark應用優化時首先須要關注的地方。Spark着眼於要達到便利性(容許你在計算過程當中使用任何Java類型)和性能的一個平衡。Spark主要提供了兩個序列化庫:java
java.io.Serializable 接口的對象,都能被序列化。同時,你還能夠經過擴展
java.io.Externalizable 來控制序列化性能。Java序列化很靈活但性能較差,同時序列化後佔用的字節數也較多。
Serializable
接口的類型,它須要你在程序中 register 須要序列化的類型,以獲得最佳性能。要切換到使用 Kryo,你能夠在 SparkConf 初始化的時候調用 conf.set(「spark.serializer」, 「org.apache.spark.serializer.KryoSerializer」)。這個設置不只控制各個worker節點之間的混洗數據序列化格式,同時還控制RDD存到磁盤上的序列化格式。目前,Kryo不是默認的序列化格式,由於它須要你在使用前註冊須要序列化的類型,不過咱們仍是建議在對網絡敏感的應用場景下使用Kryo。git
Spark對一些經常使用的Scala核心類型(包括在Twitter chill 庫的AllScalaRegistrar中)自動使用Kryo序列化格式。github
若是你的自定義類型須要使用Kryo序列化,能夠用 registerKryoClasses 方法先註冊:web
val conf = new SparkConf().setMaster(...).setAppName(...) conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) val sc = new SparkContext(conf)
Kryo的文檔(Kryo documentation )中有詳細描述了更多的高級選項,如:自定義序列化代碼等。apache
若是你的對象很大,你可能須要增大 spark.kryoserializer.buffer 配置項(config)。其值至少須要大於最大對象的序列化長度。api
最後,若是你不註冊須要序列化的自定義類型,Kryo也能工做,不過每個對象實例的序列化結果都會包含一份完整的類名,這有點浪費空間。數組
內存佔用調優主要須要考慮3點:1.數據佔用的總內存(你多半會但願整個數據集都能裝進內存吧);2.訪問數據集中每一個對象的開銷;3.垃圾回收的開銷(若是你的數據集中對象週轉速度很快的話)。緩存
通常,Java對象的訪問時很快的,但同時Java對象會比原始數據(僅包含各個字段值)佔用的空間多2~5倍。主要緣由有:安全
本節只是Spark內存管理的一個概要,下面咱們會更詳細地討論各類Spark內存調優的具體策略。特別地,咱們會討論如何評估數據的內存使用量,以及如何改進 – 要麼改變你的數據結構,要麼以某種序列化格式存儲數據。最後,咱們還會討論如何調整Spark的緩存大小,以及如何調優Java的垃圾回收器。
Spark中內存主要用於兩類目的:執行計算和數據存儲。執行計算的內存主要用於混洗(Shuffle)、關聯(join)、排序(sort)以及聚合(aggregation),而數據存儲的內存主要用於緩存和集羣內部數據傳播。Spark中執行計算和數據存儲都是共享同一個內存區域(M)。若是執行計算沒有佔用內存,那麼數據存儲能夠申請佔用全部可用的內存,反之亦然。執行計算可能會搶佔數據存儲使用的內存,並將存儲於內存的數據逐出內存,直到數據存儲佔用的內存比例下降到一個指定的比例(R)。換句話說,R是M基礎上的一個子區域,這個區域的內存數據永遠不會被逐出內存。然而,數據存儲不會搶佔執行計算的內存(不然實現太複雜了)。
這樣設計主要有這麼幾個須要考慮的點。首先,不須要緩存數據的應用能夠把整個空間用來執行計算,從而避免頻繁地把數據吐到磁盤上。其次,須要緩存數據的應用可以有一個數據存儲比例(R)的最低保證,也避免這部分緩存數據被所有逐出內存。最後,這個實現方式可以在默認狀況下,爲大多數使用場景提供合理的性能,而不須要專家級用戶來設置內存使用如何劃分。
雖然有兩個內存劃分相關的配置參數,但通常來講,用戶不須要設置,由於默認值已經可以適用於絕大部分的使用場景:
肯定一個數據集佔用內存總量最好的辦法就是,建立一個RDD,並緩存到內存中,而後再到web UI上」Storage」頁面查看。頁面上會展現這個RDD總共佔用了多少內存。
要評估一個特定對象的內存佔用量,能夠用 SizeEstimator.estimate 方法。這個方法對試驗哪一種數據結構可以裁剪內存佔用量比較有用,同時,也能夠幫助用戶瞭解廣播變量在每一個執行器堆上佔用的內存量。
減小內存消耗的首要方法就是避免過多的Java封裝(減小對象頭和額外輔助字段),好比基於指針的數據結構和包裝對象等。如下有幾條建議:
spark-env.sh 中設置這個參數。
若是通過上面的調整後,存儲的數據對象仍是太大,那麼你能夠試試將這些對象以序列化格式存儲,所須要作的只是經過 RDD persistence API 設置好存儲級別,如:MEMORY_ONLY_SER。Spark會將RDD的每一個分區以一個巨大的字節數組形式存儲起來。以序列化格式存儲的惟一缺點就是訪問數據會變慢一點,由於Spark須要反序列化每一個被訪問的對象。若是你須要序列化緩存數據,咱們強烈建議你使用Kryo(using Kryo),和Java序列化相比,Kryo能大大減小序列化對象佔用的空間(固然也比原始Java對象小不少)。
JVM的垃圾回收在某些狀況下可能會形成瓶頸,好比,你的RDD存儲常常須要「換入換出」(新RDD搶佔了老RDD內存,不過若是你的程序沒有這種狀況的話那JVM垃圾回收通常不是問題,好比,你的RDD只是載入一次,後續只是在這一個RDD上作操做)。當Java須要把老對象逐出內存的時候,JVM須要跟蹤全部的Java對象,並找出那些對象已經沒有用了。歸納起來就是,垃圾回收的開銷和對象個數成正比,因此減小對象的個數(好比用 Int數組取代 LinkedList),就能大大減小垃圾回收的開銷。固然,一個更好的方法就如前面所說的,以序列化形式存儲數據,這時每一個RDD分區都只包含有一個對象了(一個巨大的字節數組)。在嘗試其餘技術方案前,首先能夠試試用序列化RDD的方式(serialized caching)評估一下GC是否是一個瓶頸。
若是你的做業中各個任務須要的工做內存和節點上存儲的RDD緩存佔用的內存產生衝突,那麼GC極可能會出現問題。下面咱們將討論一下如何控制好RDD緩存使用的內存空間,以減小這種衝突。
衡量GC的影響
GC調優的第一步是統計一下,垃圾回收啓動的頻率以及GC所使用的總時間。給JVM設置一下這幾個參數(參考Spark配置指南 – configuration guide,查看Spark做業中的Java選項參數):-verbose:gc -XX:+PrintGCDetails,就能夠在後續Spark做業的worker日誌中看到每次GC花費的時間。注意,這些日誌是在集羣worker節點上(在各節點的工做目錄下stdout文件中),而不是你的驅動器所在節點。
高級GC調優
爲了進一步調優GC,咱們就須要對JVM內存管理有一個基本的瞭解:
Spark GC調優的目標就是確保老生代(Old generation )只保存長生命週期RDD,而同時新生代(Young generation )的空間又能足夠保存短生命週期的對象。這樣就能在任務執行期間,避免啓動full GC。如下是GC調優的主要步驟:
咱們的不少經驗代表,GC調優的效果和你的程序代碼以及可用的總內存相關。網上還有很多調優的選項說明(many more tuning options),但整體來講,就是控制好full GC的啓動頻率,就能有效減小垃圾回收開銷。
通常來講集羣並不會滿負荷運轉,除非你吧每一個操做的並行度都設得足夠大。Spark會自動根據對應的輸入文件大小來設置「map」類算子的並行度(固然你能夠經過一個SparkContext.textFile等函數的可選參數來控制並行度),而對於想 groupByKey 或reduceByKey這類 「reduce」 算子,會使用其各父RDD分區數的最大值。你能夠將並行度做爲構建RDD第二個參數(參考spark.PairRDDFunctions
),或者設置 spark.default.parallelism 這個默認值。通常來講,評估並行度的時候,咱們建議2~3個任務共享一個CPU。
若是RDD比內存要大,有時候你可能收到一個OutOfMemoryError,但其實這是由於你的任務集中的某個任務太大了,如reduce任務groupByKey。Spark的混洗(Shuffle)算子(sortByKey,groupByKey,reduceByKey,join等)會在每一個任務中構建一個哈希表,以便在任務中對數據分組,這個哈希表有時會很大。最簡單的修復辦法就是增大並行度,以減少單個任務的輸入集。Spark對於200ms之內的短任務支持很是好,由於Spark能夠跨任務複用執行器JVM,任務的啓動開銷很小,所以把並行度增長到比集羣中總CPU核數還可能是沒有任何問題的。
使用SparkContext中的廣播變量相關功能(broadcast functionality)能大大減小每一個任務自己序列化的大小,以及集羣中啓動做業的開銷。若是你的Spark任務正在使用驅動器(driver)程序中定義的巨大對象(好比:靜態查詢表),請考慮使用廣播變量替代之。Spark會在master上將各個任務的序列化後大小打印出來,因此你能夠檢查一下各個任務是否過大;一般來講,大於20KB的任務就值得優化一下。
數據本地性對Spark做業每每會有較大的影響。若是代碼和其所操做的數據在統一節點上,那麼計算速度確定會更快一些。但若是兩者不在一塊兒,那必然須要挪動其中之一。通常來講,挪動序列化好的代碼確定比挪動一大堆數據要快。Spark就是基於這個通常性原則來構建數據本地性的調度。
數據本地性是指代碼和其所處理的數據的距離。基於數據當前的位置,數據本地性能夠劃分紅如下幾個層次(按從近到遠排序):
Spark傾向於讓全部任務都具備最佳的數據本地性,但這並不是老是可行的。某些狀況下,可能會出現一些空閒的執行器(executor)沒有待處理的數據,那麼Spark可能就會犧牲一些數據本地性。有兩種可能的選項:a)等待已經有任務的CPU,待其釋放後當即在同一臺機器上啓動一個任務;b)當即在其餘節點上啓動新任務,並把所須要的數據複製過去。
而一般,Spark會等待一小會,看看是否有CPU會被釋放出來。一旦等待超時,則當即在其餘節點上啓動並將所需的數據複製過去。數據本地性各個級別之間的回落超時能夠單獨配置,也能夠在統一參數內一塊兒設定;詳細請參考 configuration page 中的 spark.locality 相關參數。若是你的任務執行時間比較長而且數據本地性不好,你就應該試試調大這幾個參數,不過默認值通常都能適用於大多數場景了。
本文是一個簡短的Spark調優指南,列舉了Spark應用調優一些比較重要的考慮點 – 最重要的就是,數據序列化和內存調優。對於絕大多數應用來講,用Kryo格式序列化數據可以解決大多數的性能問題。若是您有其餘關於性能調優最佳實踐的問題,歡迎郵件諮詢(Spark mailing list )。