Spark調優

Spark調優

寫在前面

對於調優, 我以爲是沒有放之四海而皆準的辦法.html

不少時候, 調優顯得沒有必要, 即便不進行調優, 程序也可以順利執行.java

在沒有出現問題的時候, 不進行調優, 即便是在大數據中, 這也是我經常採用的原則.node

而且, 針對問題再進行調優, 每每是更爲合適的.git

好比, 明明資源充足, 但程序運行依然很慢, 在這種狀況下, 咱們非得采起 kryo序列化的方式去增長一點點運行速度. 而沒有增長並行度.github

又或者有些時候, 減小沒必要要的 shuffle操做, 是更好地方式.apache

沒有哪一種調優手段是必須的, 且有效的.數組

這只是在幫助咱們去了解, 問題可能會出在哪些地方, 在代碼無可變動的狀況下, 又該如何調整, 以使程序順利運行. 生搬硬套, 萬萬不可取.緩存

官方連接: Tuning Spark安全

中文連接: Spark 調優服務器

因爲大多數 Spark 計算的內存性質,Spark 程序可能由集羣中的任何資源: 如 CPU,網絡帶寬, 內存 致使瓶頸, 一般狀況下,若是數據有合適的內存,瓶頸就是網絡帶寬,但有時您還須要進行一些調整,例如 以序列化形式存儲 RDD 來減小內存的使用。

本指南將涵蓋兩個主要的主題:數據序列化,這對於良好的網絡性能相當重要,而且還能夠減小內存使用和內存優化。咱們選幾個較小的主題進行展開。

數據序列化

序列化在任何分佈式應用程序的性能中起着重要的做用。很慢的將對象序列化或消費大量字節的格式將會大大減慢計算速度。一般,這多是您優化 Spark 應用程序的第一件事。Spark 宗旨在於方便和性能之間取得一個平衡(容許您使用操做中的任何 Java 類型)。它提供了兩種序列化庫:

  • Java serialization: 默認狀況下, Spark序列化對象使用的是 Java的 ObjectOutputStream, 只要你實現了 java.io.Serializable 接口, 就可以正常運行. 你也能夠經過使用 java.io.Externalizable 來控制 對象中的哪一部分屬性不須要進行序列化. Java序列化是至關靈活的, 但一般來講也是很是慢的. 並致使不少類序列化後的值太大.

參考: Serializable和Externalizable淺析

  • Kryo serialization: Spark一樣可使用 kryo(版本4)使得序列化對象變得更快, kryo的方式 通常要比 Java 自身的序列化速度快十倍以上, 但缺點是, 並不支持全部的類型的 序列化, 須要你在準備使用kryo的程序中註冊 classses.

你能夠切換到Kryo, 經過 sparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") 的方式. 經過這個序列化配置, 不只僅可以使 Kryo序列化使用在 shuffling操做中, 也能夠在 序列化 RDDs 到硬盤時使用. 之因此在 Spark中沒有將 Kryo當作默認實現方式的緣由是, 須要自身手動註冊對應的class.

可是建議你在 任何的 網絡密集型 application 中使用它. 從Spark 2.0.0以來, Spark在使用簡單類型, 如 基本類型的數組, 或是String類型, 在這些數據類型作 shuffling操做時, 使用的內部方式都是 Kryo.

Spark 自動包含 Kryo 序列化器,用於 Twitter chill 中 AllScalaRegistrar 涵蓋的許多經常使用的核心 Scala 類, 這意味着, 對於大多數常見類來講, 你並不須要經過以下方式註冊類:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

若是想要知道更多的 kryo細節, 能夠參考kryo的官方, 官方提供了更加先進的註冊方式, 如自定義 序列化 代碼.

Kryo serialization

還有一點須要注意到的地方是:

若是你的對象太大, 你須要增大 spark.kryoserializer.buffer 配置.

可是在看過配置說明以後, 發現實際上須要更改的配置是 spark.kryoserializer.buffer.max 暫時尚未通過驗證.

對於 spark.kryoserializer.buffer 配置而言, 默認值是 64k, 這個是 kryo buffer 的初始化值, 對於每一個worker中的每個核都只有一個 buffer. buffer 在須要的狀況下最終會增加到 spark.kryoserializer.buffer.max.

而 spark.kryoserializer.buffer.max 則是 kryo buffer 的最大值, 默認是 64m, 必須大於任何一個你須要序列化的對象 同時 要須要 2048m, 當遇到 buffer limit exceeded exception時 說明你須要增長這個值了.

最後須要注意的是, 即便你並不註冊 任何一個 classes, kryo依然能夠工做, 可是 此時不得不 存儲 類的全名稱, 這是很大的浪費.

內存調優

內存調優有三個方面須要考慮:

  • 你的對象所使用的內存總量(你可能想要整個數據集都存儲在內存中.)

  • 訪問對象的成本

  • 垃圾收集的開銷

通常來講, 訪問對象是很快速的, 可是很容易消耗比其字段中的 「raw」 數據多 2-5 倍的空間, 這是因爲如下幾個緣由:

  • 每一個不一樣的 Java 對象都有一個 「object header」,它大約是 16 個字節,包含一個指向它的類的指針。對於一個數據不多的對象(好比說一個Int字段),這能夠比數據大。

  • Java String 在原始字符串數據上具備大約 40 字節的開銷(由於它們存儲在 Char 數組中並保留額外的數據,例如長度),而且因爲 UTF-16 的內部使用而將每一個字符存儲爲 兩個 字節 String 編碼。所以,一個 10 個字符的字符串能夠容易地消耗 60 個字節。

  • 公共收集類,例如 HashMap 和 LinkedList,使用連接的數據結構,其中每一個條目(例如: Map.Entry)存在 「包裝器」 對象。該對象不只具備 header,還包括指針(一般爲 8 個字節)到列表中的下一個對象.

  • 原始類型的集合一般將它們存儲爲 「盒裝」 對象,例如: java.lang.Integer.

本節將從 Spark 的內存管理概述開始,而後討論用戶能夠採起的具體策略,以便在 application 中更有效地使用內存。具體來講,咱們將描述如何肯定對象的內存使用狀況,以及如何改進數據結構,或經過以序列化的格式存儲數據。而後咱們將介紹調整 Spark 的緩存大小和 Java 垃圾回收器。

內存管理

Spark中的內存使用主要能夠分爲兩大類, 執行 和 存儲.

執行內存 主要指 用於計算中的 混洗(shuffle), 合併(join), 聚合(aggregation).

存儲內存主要指 在集羣中 緩存 和 傳播內部數據 使用的內存.

在Spark中, 執行 和 存儲 共享統一區域(M). 當沒有 執行 內存在使用時, 存儲便可獲取全部的可用內存 反之亦然.

若是須要, 執行能夠 驅逐 存儲, 但這必須在 總的存儲 內存的使用量 低於 閾值(R). 換句話說, R 描述了M 緩存塊永遠不會被驅逐 的 區域. 因爲實現的複雜性, 存儲永遠 不會驅逐 執行.

該設計確保了幾個理想的性能.

首先,不使用緩存的應用程序能夠將整個空間用於執行,從而避免沒必要要的磁盤泄漏。

第二,使用緩存的應用程序能夠保留最小的存儲空間(R),其中數據塊不受驅逐。

最後,這種方法爲各類工做負載提供了合理的開箱即用性能,而不須要用戶內部如何分配內存的專業知識。

雖然提供了兩條相關配置, 可是典型 用戶 並不須要調整他們, 默認值已經可以知足大多數狀況下的使用要求了.

  • spark.memory.fraction 描述中提到的 內存 M, 是指佔用的 (JVM堆內存 - 300M) 的比率, 默認是0.6. 剩餘的0.4保留用於用戶數據結構,Spark中的內部元數據,而且在偶爾遇到異常大的記錄的狀況下保護OOM錯誤.

  • spark.memory.storageFraction 描述中所提到的R, 是指佔用 M 的比率, 默認是0.5. R 是 M 緩存塊中的緩存被執行驅逐的存儲空間.

第二個參數 也就已經描述了, 存儲 和 執行 各自須要佔用的比例, 也即, 對於 shuffle join agg 並不存在的 程序中, 徹底能夠將比例調低, 將內存供給 執行器使用. 默認是55開.

在 Jvm 的舊版本 或 長期支持的版本中, 應該指定 spark.memory.fraction 以適應 堆內存的大小.

肯定內存消耗

肯定數據集須要佔用內存的大小的最佳方式是, 建立RDD, 放入緩存中, 而後查看 Web UI 的 Storage 頁面.

爲了估算特定對象的內存佔用, 應該使用 SizeEstimator 的 estimate 方法, 這對於嘗試使用不一樣的數據佈局以減小內存使用量 以及 肯定廣播變量將在每一個執行程序堆上佔用的空間量頗有用.

調整數據結構

減小內存消耗的第一種方法是避免添加開銷的 Java 功能,例如基於指針的數據結構和包裝對象。有幾種方法能夠作到這一點:

  • 將數據結構設計爲偏好對象數組和原始類型,而不是標準的 Java 或 Scala 集合類(例如: HashMap)。該 fastutil 庫提供方便的集合類基本類型是與 Java 標準庫兼容。

    <dependency>
          <groupId>it.unimi.dsi</groupId>
          <artifactId>fastutil</artifactId>
          <version>8.3.0</version>
      </dependency>

fastUtil 官方連接

  • 儘量避免使用不少小對象和指針的嵌套結構. 好比Integer.

  • 考慮使用數字 ID 或枚舉對象而不是 字符串形式的 鍵.

  • 若是您的 RAM 小於32 GB,請設置 JVM 標誌 -XX:+UseCompressedOops,使指針爲4個字節而不是8個字節. 你能夠在 spark-env.sh中添加這個選項.

序列化RDD

儘管已經作了些許調整, 可是的對象仍然太大而沒法有效存儲,減小內存使用的一個更簡單的方法是以序列化形式存儲它們,使用 RDD 持久性 API 中的序列化 StorageLevel,例如: MEMORY_ONLY_SER, Spark 將會將每一個 RDD 分區存儲爲一個大字節數組。以序列化形式存儲數據的惟一缺點是訪問變得更慢,由於必須對每一個對象進行反序列化。若是你想以序列化形式緩存數據,強烈建議使用 Kryo,由於它致使比 Java 序列化更小的尺寸(並且確定比原 Java 對象)更小。

GC調優

當您的程序存儲的 RDD 有很大的」流失」時,JVM 垃圾收集多是一個問題。(程序中一般沒有問題,只讀一次 RDD,而後在其上運行許多操做)。當 Java 須要驅逐舊對象爲新的對象騰出空間時,須要跟蹤全部 Java 對象並找到未使用的。要記住的要點是,垃圾收集的成本與 Java 對象的數量成正比,所以使用較少對象的數據結構(例如: Ints 數組,而不是 LinkedList)大大下降了此成本。一個更好的方法是如上所述以序列化形式持久化對象:如今每一個 RDD 分區只有一個對象(一個字節數組)。在嘗試其餘技術以前,若是 GC 是一個問題,首先要使用序列化緩存。

因爲任務的工做內存(運行任務所需的空間)和緩存在節點上的 RDD 之間的干擾,GC 也多是一個問題。咱們將討論如何控制分配給RDD緩存的空間來減輕這一點。

測量 GC 的影響

GC 調整的第一步是收集關於垃圾收集發生頻率和GC花費的時間的統計信息。這能夠經過添加 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 到 Java 選項來完成.

至於如何添加到 Java選項, 能夠參考: Spark集羣-Standalone 模式

下次運行 Spark 做業時,每當發生垃圾回收時,都會看到在工做日誌中打印的消息。請注意,這些日誌將在你的集羣的工做節點上(stdout 在其工做目錄中的文件中),而不是你的 driver節點。

高級 GC 優化

GC並不是只是在 Spark項目中獨有的, 而是放之Java而皆準的道理.

只要網上搜下, Java GC調優, 相信你會找到許許多多的資料, 我就不在這裏詳細描述了.

由於經過以前的描述, 已經懂得了如何估算使用內存, 查看特定對象的內存, 以及打印GC日誌.

最後, 能夠經過配置:

spark.executor.extraJavaOptions 來指定執行器的 GC 調整參數.

其餘

並行度

相信只要稍微瞭解過Spark調優, 就會看到這個觀點.

除非您爲每一個操做設置足夠高的並行度,不然羣集將沒法充分利用. Spark根據文件的大小自動設置要在每一個文件上運行的 「映射」 任務的數量(儘管您能夠經過可選的參數來控制它SparkContext.textFile,等等),而且對於分佈式的 reduce 操做(例如groupByKey和reduceByKey),它使用最大的父RDD的分區數。您能夠將並行性級別做爲第二個參數傳遞,或將config屬性設置spark.default.parallelism爲更改默認值。一般,咱們建議集羣中每一個CPU內核執行2-3個任務.

減小任務的內存使用

有時並不是由於RDD沒有足夠的內存致使內存溢出, 而是由於 而是由於 某一個 任務的 工做集, 如在 groupByKey 的 reduce任務 太大了, Spark的 shuffle操做 (sortByKey, groupByKey, reduceByKey, join 等操做) 創建每一個任務中的哈希表來進行分組,而這每每是很大的. 這裏最簡單的解決方案是 增長並行級別, 以便每一個任務的輸入集都更小。Spark 能夠有效地支持短達 200ms 的任務,由於它能夠將多個任務中的一個執行者 JVM 重用,而且任務啓動成本低,所以您能夠將並行級別安全地提升到比集羣中的核心數量更多。

廣播大的變量

使用 可用的廣播功能 SparkContext 能夠大大減小每一個序列化任務的大小,以及在羣集上啓動做業的成本。若是您的任務使用其中的驅動程序中的任何大對象(例如:靜態查找表),請考慮將其變爲廣播變量。Spark 打印主機上每一個任務的序列化大小,所以您能夠查看該任務以決定您的任務是否過大; 通常任務大於 20KB 大概值得優化。

數據本地化

數據本地化可能會對 Spark job 的性能產生重大影響。若是數據和在其上操做的代碼在一塊兒,則計算每每是快速的。但若是代碼和數據分開,則必須移動到另外一個。一般,代碼大小遠小於數據,所以將數據代碼從一個地方寄送到另外一個地方比一大塊數據更快。Spark 圍繞數據局部性的通常原則構建其調度。

數據本地化是指數據和代碼處理有多近。根據數據的當前位置有幾個地方級別。從最近到最遠的順序:

  • PROCESS_LOCAL 數據與運行代碼在同一個 JVM 中。這是可能的最好的地方

  • NODE_LOCAL 數據在同一個節點上。示例可能在同一節點上的 HDFS 或同一節點上的另外一個執行程序中。這比 PROCESS_LOCAL 由於數據必須在進程之間移動慢一些

  • NO_PREF 數據從任何地方一樣快速訪問,而且沒有本地偏好

  • RACK_LOCAL 數據位於同一機架上的服務器上。數據位於同一機架上的不一樣服務器上,所以須要經過網絡發送,一般經過單個交換機發送

  • ANY 數據在網絡上的其餘地方,而不在同一個機架中

Spark 喜歡將全部 task 安排在最佳的本地級別,但並不能作到永遠如願以償。在任何空閒 executor 中沒有未處理數據的狀況下,Spark 將切換到較低的本地級別。

有兩個選項:

  1. 等待繁忙的CPU釋放以在同一服務器上的數據上啓動任務

  2. 當即將數據移動到更遠的地方啓動新任務

Spark一般要作的是稍等一下,以期釋放繁忙的CPU。

一旦超時到期,它將開始將數據從很遠的地方移到空閒的CPU中。每一個級別之間的回退等待超時能夠單獨配置,也能夠一塊兒配置在一個參數中。

有關詳細信息,請參見配置頁面spark.locality上的 參數。若是您的任務很長而且位置不佳,則應該增長這些設置,可是默認設置一般效果很好。

參數有:

spark.locality.wait 默認值3秒, 級別會逐漸從 PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL 逐漸過渡. 採起的時間都是相同的.

能夠分別指定三種類型對應的數據. spark.locality.wait.node, spark.locality.wait.process, spark.locality.wait.rack.

相關文章
相關標籤/搜索