一分鐘瞭解spark的調優

Tuning Spark

因爲大多數 Spark 計算的內存性質, Spark 程序可能由集羣中的任何資源( CPU ,網絡帶寬或內存)致使瓶頸。 一般狀況下,若是數據有合適的內存,瓶頸就是網絡帶寬,但有時您還須要進行一些調整,例如 以序列化形式存儲 RDD 來減小內存的使用。 本指南將涵蓋兩個主要的主題:數據序列化,這對於良好的網絡性能相當重要,而且還能夠減小內存使用和內存優化。 咱們選幾個較小的主題進行展開。html

數據序列化

序列化在任何分佈式應用程序的性能中起着重要的做用。 很慢的將對象序列化或消費大量字節的格式將會大大減慢計算速度。 一般,這多是您優化 Spark 應用程序的第一件事。 Spark 宗旨在於方便和性能之間取得一個平衡(容許您使用操做中的任何 Java 類型)。 它提供了兩種序列化庫:java

  • Java serialization: 默認狀況下,使用 Java ObjectOutputStream 框架的 Spark 序列化對象,而且能夠與您建立的任何實現 java.io.Serializable 的類一塊兒使用。 您還能夠經過擴展 java.io.Externalizable 來更緊密地控制序列化的性能。 Java 序列化是靈活的,但一般至關緩慢,並致使許多類的大型序列化格式。
  • Kryo serialization: Spark 也可使用 Kryo 庫(版本2)來更快地對對象進行序列化。 Kryo 比 Java 序列化(一般高達10x)要快得多,並且更緊湊,但並不支持全部的 Serializable 類型,而且須要先註冊您將在程序中使用的類以得到最佳性能。

您能夠經過使用 SparkConf 初始化做業 並進行調用來切換到使用 Kryo conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")。此設置配置用於不只在工做節點之間進行洗牌數據的串行器,並且還將 RDD 序列化到磁盤。 Kryo 不是默認的惟一緣由是由於自定義註冊要求,但咱們建議您嘗試在任何網絡密集型應用程序。自從 Spark 2.0.0 以來,咱們在使用簡單類型,簡單類型的數組或字符串類型對RDD進行混洗時,內部使用 Kryo serializer 。git

Spark 自動包含 Kryo 序列化器,用於 Twitter chill 中 AllScalaRegistrar 涵蓋的許多經常使用的核心 Scala 類。github

要使用 Kryo 註冊本身的自定義類,請使用該 registerKryoClasses 方法。apache

val conf = new SparkConf().setMaster(...).setAppName(...) conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) val sc = new SparkContext(conf)

所述 Kryo 文檔 描述了更先進的註冊選項,如添加自定義序列的代碼。api

若是您的對象很大,您可能還須要增長 spark.kryoserializer.buffer 配置。該值須要足夠大才能容納您將序列化的最大對象。數組

最後,若是您沒有註冊自定義類, Kryo 仍然能夠工做,但它必須存儲每一個對象的完整類名稱,這是浪費的。緩存

內存調優

有三個方面的考慮在調整內存使用:該的存儲你的對象所使用的(你可能但願你的整個數據集,以適應在內存中),則成本訪問這些對象,而且開銷垃圾收集(若是你有高成交物品條款)。安全

默認狀況下, Java 對象能夠快速訪問,但能夠輕鬆地消耗比其字段中的 「raw」 數據多2-5倍的空間。這是因爲如下幾個緣由:服務器

  • 每一個不一樣的 Java 對象都有一個 「object header」 ,它大約是16個字節,包含一個指向它的類的指針。對於一個數據不多的對象(好比說一個Int字段),這能夠比數據大。
  • Java String 在原始字符串數據上具備大約40字節的開銷(由於它們存儲在 Char 數組中並保留額外的數據,例如長度),而且因爲 UTF-16 的內部使用而將每一個字符存儲爲兩個字節 String 編碼。所以,一個10個字符的字符串能夠容易地消耗60個字節。
  • 公共收集類,例如 HashMap 和 LinkedList ,使用連接的數據結構,其中每一個條目(例如: Map.Entry )存在」包裝器」對象。該對象不只具備 header ,還包括指針(一般爲8個字節)到列表中的下一個對象。
  • 原始類型的集合一般將它們存儲爲」盒裝」對象,例如: java.lang.Integer

本節將從 Spark 的內存管理概述開始,而後討論用戶能夠採起的具體策略,以便在他/她的應用程序中更有效地使用內存。具體來講,咱們將描述如何肯定對象的內存使用狀況,以及如何改進數據結構,或經過以串行格式存儲數據。而後咱們將介紹調整 Spark 的緩存大小和 Java 垃圾回收器。

內存管理概述

Spark 中的內存使用大部分屬於兩類:執行和存儲。執行存儲器是指用於以混洗,鏈接,排序和聚合計算的存儲器,而存儲內存是指用於在集羣中緩存和傳播內部數據的內存。在 Spark 中,執行和存儲共享一個統一的區域(M)。當沒有使用執行存儲器時,存儲器能夠獲取全部可用的存儲器,反之亦然。若是須要,執行能夠驅逐存儲,但只有在總存儲內存使用量低於某個閾值(R)以前。換句話說, R 描述 M 緩存塊永遠不會被驅逐的區域。因爲實施的複雜性,存儲不得驅逐執行。

該設計確保了幾個理想的性能。首先,不使用緩存的應用程序能夠將整個空間用於執行,從而避免沒必要要的磁盤泄漏。第二,使用緩存的應用程序能夠保留最小的存儲空間(R),其中數據塊不受驅逐。最後,這種方法爲各類工做負載提供了合理的開箱即用性能,而不須要用戶內部如何分配內存的專業知識。

雖然有兩種相關配置,但典型用戶不須要調整它們,由於默認值適用於大多數工做負載:

  • spark.memory.fraction 表示大小 M(JVM堆空間 - 300MB)(默認爲0.6)的一小部分。剩餘的空間(40%)保留用於用戶數據結構,Spark中的內部元數據,而且在稀疏和異常大的記錄的狀況下保護OOM錯誤。
  • spark.memory.storageFraction 表示大小 R 爲 M (默認爲0.5)的一小部分。 R 是 M 緩存塊中的緩存被執行驅逐的存儲空間。

spark.memory.fraction 應該設置值,以便在 JVM 的舊版或」終身」版本中溫馨地適應這一堆堆空間。有關詳細信息,請參閱下面高級 GC 調優的討論。

肯定內存消耗

大小數據集所需的內存消耗量的最佳方式是建立 RDD ,將其放入緩存中,並查看 Web UI 中的「存儲」頁面。該頁面將告訴您 RDD 佔用多少內存。

爲了估計特定對象的內存消耗,使用 SizeEstimator 的 estimate 方法這是用於與不一樣的數據佈局試驗修剪內存使用狀況,以及肯定的空間的廣播變量將佔據每一個執行器堆的量是有用的。

調整數據結構

減小內存消耗的第一種方法是避免添加開銷的 Java 功能,例如基於指針的數據結構和包裝對象。有幾種方法能夠作到這一點:

  1. 將數據結構設計爲偏好對象數組和原始類型,而不是標準的 Java 或 Scala 集合類(例如: HashMap )。該 fastutil 庫提供方便的集合類基本類型是與 Java 標準庫兼容。
  2. 儘量避免使用不少小對象和指針的嵌套結構。
  3. 考慮使用數字 ID 或枚舉對象而不是鍵的字符串。
  4. 若是您的 RAM 小於32 GB,請設置 JVM 標誌 -XX:+UseCompressedOops ,使指針爲4個字節而不是8個字節。您能夠添加這些選項 spark-env.sh

序列化 RDD 存儲

當您的對象仍然太大而沒法有效存儲,儘管這種調整,減小內存使用的一個更簡單的方法是以序列化形式存儲它們,使用 RDD 持久性 API 中的序列化 StorageLevel ,例如: MEMORY_ONLY_SER 。 Spark 將會將每一個 RDD 分區存儲爲一個大字節數組。以序列化形式存儲數據的惟一缺點是訪問時間較短,由於必須對每一個對象進行反序列化。若是您想以序列化形式緩存數據,咱們強烈建議使用 Kryo ,由於它致使比 Java 序列化更小的尺寸(並且確定比原 Java 對象)更小。

垃圾收集調整

當您的程序存儲的 RDD 有很大的」流失」時, JVM 垃圾收集多是一個問題。(程序中一般沒有問題,只讀一次 RDD ,而後在其上運行許多操做)。 當 Java 須要驅逐舊對象爲新的對象騰出空間時,須要跟蹤全部 Java 對象並找到未使用的。要記住的要點是,垃圾收集的成本與 Java 對象的數量成正比,所以使用較少對象的數據結構(例如: Ints數組,而不是 LinkedList )大大下降了此成本。 一個更好的方法是如上所述以序列化形式持久化對象:如今每一個 RDD 分區只有一個對象(一個字節數組)。 在嘗試其餘技術以前,若是 GC 是一個問題,首先要使用序列化緩存

因爲任務的工做記憶(運行任務所需的空間)和緩存在節點上的 RDD 之間的干擾, GC 也多是一個問題。咱們將討論如何控制分配給RDD緩存的空間來減輕這一點。

測量 GC 的影響

GC 調整的第一步是收集關於垃圾收集發生頻率和GC花費的時間的統計信息。這能夠經過添加 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 到 Java 選項來完成。(有關將 Java 選項傳遞給 Spark 做業的信息,請參閱配置指南)下次運行 Spark 做業時,每當發生垃圾回收時,都會看到在工做日誌中打印的消息。請注意,這些日誌將在您的羣集的工做節點上( stdout 在其工做目錄中的文件中),而不是您的驅動程序。

高級 GC 優化

爲了進一步調整垃圾收集,咱們首先須要瞭解一些關於 JVM 內存管理的基本信息:

  • Java堆空間分爲兩個區域 Young 和 Old 。 Young 一代的目的是持有短命的物體,而 Old 一代的目標是使用壽命更長的物體。

  • Young 一代進一步分爲三個區域[ Eden , Survivor1 , Survivor2 ]。

  • 垃圾收集過程的簡化說明:當 Eden 已滿時, Eden 上運行了一個小型 GC ,並將 Eden 和 Survivor1 中存在的對象複製到 Survivor2 。倖存者地區被交換。若是一個對象足夠老,或者 Survivor2 已滿,則會移動到 Old 。最後,當 Old 接近滿時,一個完整的 GC 被調用。

Spark 中 GC 調優的目的是確保只有長壽命的 RDD 存儲在 Old 版本中,而且 Young 版本的大小足夠存儲短命期的對象。這將有助於避免使用完整的 GC 來收集任務執行期間建立的臨時對象。可能有用的一些步驟是:

  • 經過收集 GC 統計信息來檢查垃圾收集是否太多。若是在任務完成以前屢次調用完整的 GC ,這意味着沒有足夠的可用於執行任務的內存。

  • 若是過小的集合太多,而不是不少主要的 GC ,爲 Eden 分配更多的內存將會有所幫助。您能夠將 Eden 的大小設置爲對每一個任務須要多少內存的估計。若是肯定 Eden 的大小 E ,那麼您可使用該選項設置年輕一代的大小 -Xmn=4/3*E 。(按比例增長4/3是考慮倖存者地區使用的空間。)

  • 在打印的 GC 統計信息中,若是 OldGen 接近於滿,則經過下降減小用於緩存的內存量 spark.memory.fraction; 緩存較少的對象比減慢任務執行更好。或者,考慮減小年輕一代的大小。這意味着 -Xmn 若是您將其設置爲如上所述下降。若是沒有,請嘗試更改 JVM NewRatio 參數的值。許多 JVM 默認爲2,這意味着 Old 版本佔據堆棧的2/3。它應該足夠大,使得該分數超過 spark.memory.fraction

  • 嘗試使用 G1GC 垃圾回收器 -XX:+UseG1GC。在垃圾收集是瓶頸的一些狀況下,它能夠提升性能. 請注意,對於大型 excutor 的堆大小,經過設置 -XX:G1HeapRegionSize 參數來增長 G1 區域的大小 是很是重要的

  • 例如,若是您的任務是從 HDFS 讀取數據,則可使用從 HDFS 讀取的數據塊的大小來估計任務使用的內存量。請注意,解壓縮塊的大小一般是塊大小的2或3倍。因此若是咱們但願有3或4個任務的工做空間,而 HDFS 塊的大小是128MB,咱們能夠估計 Eden 的大小4*3*128MB

  • 監控垃圾收集的頻率和時間如何隨着新設置的變化而變化。

咱們的經驗代表, GC 調整的效果取決於您的應用程序和可用的內存量。有更多的優化選項 在線描述,但在較高的水平,管理完整的 GC 如何常常發生能夠減小開銷幫助。

能夠經過spark.executor.extraJavaOptions在做業的配置中設置來指定執行器的 GC 調整標誌。

其餘注意事項

並行度水平

集羣不會被充分利用,除非您將每一個操做的並行級別設置得足夠高。自動星火設置的 「地圖」 任務的數量根據其大小對每一個文件運行(儘管你能夠經過可選的參數來控制它 SparkContext.textFile ,等等),以及用於分佈式」減小」操做,如: groupByKey 和 reduceByKey ,它採用了最大父 RDD 的分區數。您能夠將並行級別做爲第二個參數傳遞(請參閱 spark.PairRDDFunctions 文檔),或者將 config 屬性設置 spark.default.parallelism 爲更改默認值。通常來講,咱們建議您的羣集中每一個 CPU 內核有2-3個任務。

減小任務的內存使用

有時,您將獲得一個 OutOfMemoryError ,由於您的 RDD 不適合內存,而是由於您的其中一個任務的工做集(如其中一個 reduce 任務groupByKey)太大。 Spark 的 shuffle 操做(sortByKey, groupByKey, reduceByKey, join,等)創建每一個任務中的哈希表來進行分組,而這每每是很大的。這裏最簡單的解決方案是增長並行級別,以便每一個任務的輸入集都更小。 Spark 能夠有效地支持短達200 ms 的任務,由於它能夠將多個任務中的一個執行者JVM重用,而且任務啓動成本低,所以您能夠將並行級別安全地提升到集羣中的核心數量。

廣播大的變量

使用 可用的廣播功能 SparkContext 能夠大大減小每一個序列化任務的大小,以及在羣集上啓動做業的成本。若是您的任務使用其中的驅動程序中的任何大對象(例如:靜態查找表),請考慮將其變爲廣播變量。 Spark 打印主機上每一個任務的序列化大小,所以您能夠查看該任務以決定您的任務是否過大; 通常任務大於20 KB大概值得優化。

數據本地化

數據本地化可能會對 Spark job 的性能產生重大影響。若是數據和在其上操做的代碼在一塊兒,則計算每每是快速的。但若是代碼和數據分開,則必須移動到另外一個。一般,代碼大小遠小於數據,所以將數據代碼從一個地方寄送到另外一個地方比一大塊數據更快。 Spark 圍繞數據局部性的通常原則構建其調度。

數據本地化是指數據和代碼處理有多近。根據數據的當前位置有幾個地方級別。從最近到最遠的順序:

  • PROCESS_LOCAL 數據與運行代碼在同一個 JVM 中。這是可能的最好的地方
  • NODE_LOCAL 數據在同一個節點上。示例可能在同一節點上的 HDFS 或同一節點上的另外一個執行程序中。這比 PROCESS_LOCAL 由於數據必須在進程之間移動慢一些
  • NO_PREF 數據從任何地方一樣快速訪問,而且沒有本地偏好
  • RACK_LOCAL 數據位於同一機架上的服務器上。數據位於同一機架上的不一樣服務器上,所以須要經過網絡發送,一般經過單個交換機發送
  • ANY 數據在網絡上的其餘地方,而不在同一個機架中

Spark 喜歡將全部 task 安排在最佳的本地級別,但這並不老是可能的。在任何空閒 executor 中沒有未處理數據的狀況下, Spark 將切換到較低的本地級別。有兩個選項: a )等待一個繁忙的 CPU 釋放在相同服務器上的數據上啓動任務,或者 b )當即在更遠的地方啓動一個新的任務,須要在那裏移動數據。

Spark 一般作的是等待一個繁忙的 CPU 釋放的但願。一旦超時,它將開始將數據從遠處移動到可用的 CPU 。每一個級別之間的回退等待超時能夠在一個參數中單獨配置或所有配置; 有關詳細信息,請參閱配置頁面 spark.locality 上的 參數。若是您的 task 很長,而且本地化差,您應該增長這些設置,但默認值一般會很好。

概要

這是一個簡短的指南,指出調整 Spark 應用程序時應該瞭解的主要問題 - 最重要的是數據序列化和內存調整。對於大多數程序,以序列化形式切換到 Kryo 序列化和持久化數據將會解決大多數常見的性能問題。隨時在 Spark 郵件列表中詢問有關其餘調優最佳作法的信息。

相關文章
相關標籤/搜索