[Spark性能調優] Spark Shuffle 中 JVM 內存使用及配置詳情
本課主題
- JVM 內存使用架構剖析
- Spark 1.6.x 和 Spark 2.x 的 JVM 剖析
- Spark 1.6.x 之前 on Yarn 計算內存使用案例
- Spark Unified Memory 的運行原理和機制
|
引言
Spark 從1.6.x 開始對 JVM 的內存使用做出了一種全新的改變,Spark 1.6.x 之前是基於靜態固定的JVM內存使用架構和運行機制,若是你不知道 Spark 到底對 JVM 是怎麼使用,你怎麼能夠頗有信心地或者是徹底肯定地掌握和控制數據的緩存空間呢,因此掌握Spark對JVM的內存使用內幕是相當重要的。不少人對 Spark 的印象是:它是基於內存的,並且能夠緩存一大堆數據,顯現 Spark 是基於內存的觀點是錯的,Spark 只是優先充分地利用內存而已。若是你不知道 Spark 能夠緩存多少數據,你就誤亂地緩存數據的話,確定會有問題。html
在數據規模已經肯定的狀況下,你有多少 Executor 和每一個 Executor 可分配多少內存 (在這個物理硬件已經肯定的狀況下),你必須清楚知道你的內存最多可以緩存多少數據;在 Shuffle 的過程當中又使用了多少比例的緩存,這樣對於算法的編寫以及業務實現是相當重要的!!!算法
文章的後部份會介紹 Spark 2.x 版本 JVM 的內存使用比例,它被稱之爲 Spark Unified Memory,這是統一或者聯合的意思,可是 Spark 沒有用 Shared 這個字,由於 A 和 B 進行 Unified 和 A 和 B 進行 Shared 實際上是兩個不一樣的概念, Spark 在運行的時候會有不一樣類型的 OOM,你必須搞清楚這個 OOM 背後是由什麼致使的。 好比說咱們使用算子 mapPartition 的時候,通常會建立一些臨時對象或者是中間數據,你這個時候使用的臨時對象和中間數據,是存儲在一個叫 UserSpace 裏面的用戶操做空間,那你有沒有想過這個空間的大小會致使應用程序出現 OOM 的狀況,在 Spark 2.x 中 Broadcast 的數據是存儲在什麼地方;ShuffleMapTask 的數據又存儲在什麼地方,可能你會認爲 ShuffleMapTask 的數據是緩存在 Cache 中。這篇文章會介紹 JVM 在 Spark 1.6.X 之前和 2.X 版本對 Java 堆的使用,還會逐一解密上述幾個疑問,也會簡單介紹 Spark 1.6.x 之前版本在 Spark On Yarn 上內存的使用案例,但願這篇文章能爲讀者帶出如下的啓發: 緩存
- 瞭解 JVM 內存使用架構剖析
- 瞭解 JVM 在 Spark 1.6.x 之前和 Spark 2.x 中能夠緩存多少數據
- 瞭解 Spark Unified Memory 的原理與機制還有它三大核心空間的用途
- 瞭解 Shuffle 在 Spark 1.6.x 之前和 Spark 2.x 中可使用多少緩存
- 瞭解 Spark1.6.x 之前 on Yarn 對內存的使用
- 瞭解 在 Spark 1.6.x 之前和 Spark 2.x Shuffle 的參數配置
JVM 內存使用架構剖析
JVM 有不少不一樣的區,最開始的時候,它會經過類裝載器把類加載進來,在運行期數據區中有 "本地方法棧","程序計數器","Java 棧"、"Java 堆"和"方法區"以及本地方法接口和它的本地庫。從 Spark 的角度來談代碼的運行和數據的處理,主要是談 Java 堆 (Heap) 空間的運用。安全
[下圖是JVM 內存架構圖]
數據結構
- 本地方法棧:這個是在迭歸的時候確定是相當重要的;
- 程序計數器:這是一個全區計數器,對於線程切換是相當重要的;
- Java 棧 (Stack):Stack 區屬於線程私有,高效的程序通常都是併發的,每一個線程都會包含一個 Stack 區域,Stack 區域中含有基本的數據類型以及對象的引用,其它線程均不能直接訪問該區域;Java 棧分爲三大部份:基本數據類型區域、操做指令區域、上下文等;
- Java 堆 (Heap):存儲的所有都是 Object 對象實例,對象實例中通常都包含了其數據成員以及與該對象對應類的信息,它會指向類的引用一個,不一樣線程確定要操做這個對象;一個 JVM 實例在運行的時候只有一個 Heap 區域,並且該區域被全部的線程共享;補充說明:垃圾回收是回收堆 (heap) 中內容,堆上纔有咱們的對象。
- 方法區:又名靜態成員區域,包含整個程序的 class、static 成員等,類自己的字節碼是靜態的;它會被全部的線程共享和是全區級別的;
Spark 1.6.x 和 2.x 的 JVM 剖析
Spark JVM 到底能夠緩存多少數據架構
下圖顯示的是Spark 1.6.x 之前版本對 Java 堆 (heap) 的使用狀況,左則是 Storage 對內存的使用,右則是 Shuffle 對內存的使用,這叫 StaticMemoryManagement,數據處理以及類的實體對象都存放在 JVM 堆 (heap) 中。併發
[下圖是 Spark 1.6x 之前版本對 JVM 堆 Storage 和 Shuffle 的使用分佈]
app
Spark 1.6.x 版本對 JVM 堆的使用框架
JVM Heap 默認狀況下是 512MB,這是取決於 spark.executor.memory 的參數,在回答 Spark JVM 到底能夠緩存多少數據這個問題以前,首先了解一下 JVM Heap 在 Spark 中是如何分配內存比例的。不管你定義了 spark.executor.memory 的內存空間有多大,Spark 必然會定義一個安全空間,在默認狀況下只會使用 Java 堆上的 90% 做爲安全空間,在單個 Executor 的角度來說,就是 Heap Size x 90%。oop
埸景一:假設說在一個Executor,它可用的 Java Heap 大小是 10G,實際上 Spark 只能使用 90%,這個安全空間的比例是由spark.storage.safetyFaction 來控制的。(若是你內存的 Heap 很是大的話,能夠嘗試調高爲 95%),在安全空間中也會劃分三個不一樣的空間:一個是 Storage 空間、一個是 Unroll 空間和一個是 Shuffle 空間。
- 安全空間 (safe):計算公式是 spark.executor.memory x spark.storage.safetyFraction。也就是 Heap Size x 90%,在埸景一的例子中是 10 x 0.9 = 9G;
- 緩存空間 (Storage):計算公式是 spark.executor.memory x spark.storage.safetyFraction x spark.storage.memoryFraction。也就是 Heap Size x 90% x 60%;Heap Size x 54%,在埸景一的例子中是 10 x 0.9 x 0.6 = 5.4G;一個應用程序能夠緩存多少數據是由 safetyFraction 和 memoryFraction 這兩個參數共同決定的。
[下圖是 StaticMemoryManager.scala 中的 getMaxStorageMemory 方法]
- Unroll 空間:Shuffle 空間:計算公式是 spark.executor.memory x spark.shuffle.memoryFraction x spark.shuffle.safteyFraction。在 Shuffle 空間中也會有一個默認 80% 的安全空間比例,因此應該是 Heap Size x 20% x 80%;Heap Size x 16%,在埸景一的例子中是 10 x 0.2 x 0.8 = 1.6G;從內存的角度講,你須要從遠程抓取數據,抓取數據是一個 Shuffle 的過程,好比說你須要對數據進行排序,顯如今這個過程當中須要內存空間。
- 計算公式是 spark.executor.memory x spark.storage.safetyFraction x spark.storage.memoryFraction x spark.storage.unrollFraction
也就是 Heap Size x 90% x 60% x 20%;Heap Size x 10.8%,在埸景一的例子中是 10 x 0.9 x 0.6 x 0.2 = 1.8G,你可能把序例化後的數據放在內存中,當你使用數據時,你須要把序例化的數據進行反序例化。
[下圖是 StaticMemoryManager.scala 中的 maxUnrollMemory 變量]
- 對 cache 緩存數據的影響是因爲 Unroll 是一個優先級較高的操做,進行 Unroll 操做的時候會佔用 cache 的空間,並且又能夠擠掉緩存在內存中的數據 (若是該數據的緩存級別是 MEMORY_ONLY 的話,不然該數據會丟失)。
[下圖是 StaticMemoryManager.scala 中的 getMaxExecutionMemory 方法]
Spark Unified Memory 原理和運行機制
下圖是一種叫聯合內存 (Spark Unified Memeory),數據緩存與數據執行之間的內存能夠相互移動,這是一種更彈性的方式,下圖顯示的是 Spark 2.x 版本對 Java 堆 (heap) 的使用狀況,數據處理以及類的實體對象存放在 JVM 堆 (heap) 中。
[下圖是 Spark 2.x 版本對 JVM 堆 Storage 和 Execution 的使用分佈]
Spark 2.x 版本對 JVM 堆的使用
Spark 2.1.0 新型 JVM Heap 分紅三個部份:Reserved Memory、User Memory 和 Spark Memory。
- Reserved Memory:默認都是300MB,這個數字通常都是固定不變的,在系統運行的時候 Java Heap 的大小至少爲 Heap Reserved Memory x 1.5. e.g. 300MB x 1.5 = 450MB 的 JVM配置。通常本地開發例如說在 Windows 系統上,建義系統至少 2G 的大小。
[下圖是 UnifiedMemoryManager.scala 中 UnifiedMemoryManager 伴生對象裏的 RESERVED_SYSTEM_MEMORY_BYTES 參數]
SparkMemory空間默認是佔可用 HeapSize 的 60%,與上圖顯示的75%有點不一樣,固然這個參數是可配置的!!
[下圖是 UnifiedMemoryManager.scala 中 UnifiedMemoryManager 伴生對象裏的 getMaxMemory 方法]
- User Memory:寫 Spark 程序中產生的臨時數據或者是本身維護的一些數據結構也須要給予它一部份的存儲空間,你能夠這麼認爲,這是程序運行時用戶能夠主導的空間,叫用戶操做空間。它佔用的空間是 (Java Heap - Reserved Memory) x 25%(默認是25%,能夠有參數供調優),這樣設計可讓用戶操做時所須要的空間與系統框架運行時所須要的空間分離開。假設 Executor 有 4G 的大小,那麼在默認狀況下 User Memory 大小是:(4G - 300MB) x 25% = 949MB,也就是說一個 Stage 內部展開後 Task 的算子在運行時最大的大小不可以超過 949MB。例如工程師使用 mapPartition 等,一個 Task 內部全部算子使用的數據空間的大小若是大於 949MB 的話,那麼就會出現 OOM。思考題:有 100個 Executors 每一個 4G 大小,如今要處理 100G 的數據,假設這 100G 分配給 100個 Executors,每一個 Executor 分配 1G 的數據,這 1G 的數據遠遠少於 4G Executor 內存的大小,爲何還會出現 OOM 的狀況呢?那是由於在你的代碼中(e.g.你寫的應用程序算子)超過用戶空間的限制 (e.g. 949MB),而不是 RDD 自己的數據超過了限制。
- Spark Memeory:系統框架運行時須要使用的空間,這是從兩部份構成的,分別是 Storage Memeory 和 Execution Memory。如今 Storage 和 Execution (Shuffle) 採用了 Unified 的方式共同使用了 (Heap Size - 300MB) x 75%,默認狀況下 Storage 和 Execution 各佔該空間的 50%。你能夠從圖中能夠看出,Storgae 和 Execution 的存儲空間能夠往上和往下移動。
定義:所謂 Unified 的意思是 Storgae 和 Execution 在適當時候能夠借用彼此的 Memory,須要注意的是,當 Execution 空間不足並且 Storage 空間也不足的狀況下,Storage 空間若是曾經使用了超過 Unified 默認的 50% 空間的話則超過部份會被強制 drop 掉一部份數據來解決 Execution 空間不足的問題 (注意:drop 後數據會不會丟失主要是看你在程序設置的 storage_level 來決定你是 Drop 到那裏,可能 Drop 到磁盤上),這是由於執行(Execution) 比緩存 (Storage) 是更重要的事情。
[下圖是 UnifiedMemoryManager.scala 中 UnifiedMemoryManager 伴生對象裏的 apply 方法]
可是也有它的基本條件限制,Execution 向 Storage 借空間有兩種狀況:具體代碼實現能夠參考源碼補充 : Spark 2.1.X 中 Unified 和 Static MemoryManager
[下圖是 Execution 向 Storage 借空間的第一種狀況]
第一種狀況:Storage 曾經向 Execution 借了空間,它緩存的數據多是很是的多,而後 Execution 又不須要那麼大的空間 (默認狀況下各佔 50%),假設如今 Storage 佔了 80%,Execution 佔了 20%,而後 Execution 說本身空間不足,Execution 會向內存管理器發信號把 Storgae 曾經佔用的超過 50%數據的那部份強制擠掉,在這個例子中擠掉了 30%;
[下圖是 Execution 向 Storage 借空間的第二種狀況]
第二種狀況:Execution 能夠向 Storage Memory 借空間,在 Storage Memory 不足 50% 的狀況下,Storgae Memory 會很樂意地把剩餘空間借給 Execution。相反當 Execution 有剩餘空間的時候,Storgae 也能夠找 Execution 借空間。
- Storage Memeory:至關於舊版本的 Storage 空間,在舊版本中 Storage 佔了 54% 的 Heap 空間,這個空間會負責存儲 Persist、Unroll 以及 Broadcast 的數據。假設 Executor 有 4G 的大小,那麼 Storage 空間是:(4G - 300MB) x 75% x 50% = 1423.5MB 的空間,也就是說若是你的內存夠大的話,你能夠擴播足夠大的變量,擴播對於性能提高是一件很重要的事情,由於它全部的線程都是共享的。從算子運行的角度來說,Spark 會傾向於數據直接從 Storgae Memeory 中抓取過來,這也就所謂的內存計算。
- Execution Memeory:至關於舊版本的 Shuffle 空間,這個空間會負責存儲 ShuffleMapTask 的數據。好比說從上一個 Stage 抓取數據和一些聚合的操做、等等。在舊版本中 Shuffle 佔了 16% 的 Heap 空間。Execution 若是空間不足的狀況下,除了選擇向 Storage Memory 借空間之外,也能夠把一部份數據 Spill 到磁盤上,但不少時候基於性能調優方面的考慮都不想把數據 Spill 到磁盤上。思考題:你以爲是 Storgae 空間或者是 Execution 空間比較重要呢?
Spark1.6.x 之前 on Yarn 計算內存使用案例
這是一張 Spark 運行在 Yarn 上的架構圖,它有 Driver 和 Executor 部份,在 Driver 部份有一個內存控制參數,Spark 1.6.x 之前是spark.driver.memory,在實際生產環境下建義配置成 2G。若是 Driver 比較繁忙或者是常常把某些數據收集到 Driver 上的話,建義把這個參數調大一點。
圖的左邊是 Executor 部份,它是被 Yarn 管理的,每臺機制上都有一個 Node Manager;Node Manager 是被 Resources Manager 管理的,Resources Manager 的工做主要是管理全區級別的計算資源,計算資源核心就是內存和 CPU,每臺機器上都有一個 Node Manager 來管理當前內存和 CPU 等資源。Yarn 通常跟 Hadoop 藕合,它底層會有 HDFS Node Manager,主要是負責管理當前機器進程上的數據而且與HDFS Name Node 進行通訊。
[下圖是 Spark on Yarn 的架構圖]
在每一個節點上至少有兩個進程,一個是 HDFS Data Node,負責管理磁盤上的數據,另一個是 Yarn Node Manager,負責管理執行進程,在這兩個 Node 的下面有兩個 Executors,每一個 Executor 裏面運行的都是 Tasks。從 Yarn 的角度來說,會配置每一個 Executor 所佔用的空間,以防止資源競爭,Yarn 裏有一個叫 Node Memory Pool 的概念,能夠配置 64G 或者是 128G,Node Memory Pool 是當前節點上總共可以使用的內存大小。
圖中這兩個 Executors 在兩個不一樣的進程中 (JVM#1 和 JVM#2),裏面的 Task 是並行運行的,Task 是運行在線程中,但你能夠配置 Task 使用線程的數量,e.g. 2條線程或者是4條線程,但默認狀況下都是1條線程去處理一個Task,你也能夠用 spark.executor.cores 去配置可用的 Core 以及 spark.executor.memory 去配置可用的 RAM 的大小。
在 Yarn 上啓動 Spark Application 的時候能夠經過如下參數來調優:
- -num-executor 或者 spark.executor.instances 來指定運行時所須要的 Executor 的個數;
- -executor-memory 或者 spark.executor.memory 來指定每一個 Executor 在運行時所須要的內存空間;
- -executor-cores 或者是 spark.executor.cores 來指定每一個 Executor 在運行時所須要的 Cores 的個數;
- -driver-memory 或者是 spark.driver.memory 來指定 Driver 內存的大小;
- spark.task.cpus 來指定每一個 Task 運行時所須要的 Cores 的個數;
場景一:例如 Yarn 集羣上有 32 個 Node 來運行的 Node Manager,每一個 Node 的內存是 64G,每一個 Node 的 Cores 是 32 Cores,假如說每一個 Node 咱們要分配兩個 Executors,那麼能夠把每一個 Executor 分配 28G,Cores 分配爲 12 個 Cores,每一個 Spark Task 在運行的時候只須要一個 Core 就行啦,那麼咱們 32 個 Nodes 同時能夠運行: 32 個 Node x 2 個 Executors x (12 個 Cores / 1) = 768 個 Task Slots,也就是說這個集羣能夠並行運行 768 個 Task,若是 Job 超過了 Task 能夠並行運行的數量 (e.g. 768) 則須要排隊。那麼這個集羣模能夠緩存多少數據呢?從理論上:32 個 Node x 2 個 Executors x 28g x 90% 安全空間 x 60%緩存空間 = 967.68G,這個緩存數量對於普通的 Spark Job 而言是徹底夠用的,而實際上在運行中可能只能緩存 900G 的數據,900G 的數據從磁盤儲存的角度數據有多大呢?仍是 900G 嗎?不是的,數據通常都會膨脹好幾倍,這是和壓縮、序列化和反序列化框架有關,因此在磁盤上可能也就 300G 的樣子的數據。
總結
瞭解 Spark Shuffle 中的 JVM 內存使用空間對一個Spark應用程序的內存調優是相當重要的。跟據不一樣的內存控制原理分別對存儲和執行空間進行參數調優:spark.executor.memory, spark.storage.safetyFraction, spark.storage.memoryFraction, spark.storage.unrollFraction, spark.shuffle.memoryFraction, spark.shuffle.safteyFraction。
Spark 1.6 之前的版本是使用固定的內存分配策略,把 JVM Heap 中的 90% 分配爲安全空間,而後從這90%的安全空間中的 60% 做爲存儲空間,例如進行 Persist、Unroll 以及 Broadcast 的數據。而後再把這60%的20%做爲支持一些序列化和反序列化的數據工做。其次當程序運行時,JVM Heap 會把其中的 80% 做爲運行過程當中的安全空間,這80%的其中20%是用來負責 Shuffle 數據傳輸的空間。
Spark 2.0 中推出了聯合內存的概念,最主要的改變是存儲和運行的空間能夠動態移動。須要注意的是執行比存儲有更大的優先值,當空間不夠時,能夠向對方借空間,但前提是對方有足夠的空間或者是 Execution 能夠強制把 Storage 一部份空間擠掉。Excution 向 Storage 借空間有兩種方式:第一種方式是 Storage 曾經向 Execution 借了空間,它緩存的數據多是很是的多,當 Execution 須要空間時能夠強制拿回來;第二種方式是 Storage Memory 不足 50% 的狀況下,Storgae Memory 會很樂意地把剩餘空間借給 Execution。
若是是你的計算比較複雜的狀況,使用新型的內存管理 (Unified Memory Management) 會取得更好的效率,可是若是說計算的業務邏輯須要更大的緩存空間,此時使用老版本的固定內存管理 (StaticMemoryManagement) 效果會更好。
參考資料
資料來源來至
DT大數據夢工廠 大數據商業案例以及性能調優
[1] 第29課:完全解密Spark 1.6.X之前Shuffle中JVM內存使用及配置內幕詳情:Spark到底可以緩存多少數據、Shuffle到底佔用了多少數據、磁盤的數據遠遠比內存小卻仍是報告內存不足?
[2] 第30課:完全解密Spark 2.1.X中Shuffle中JVM Unified Memory內幕詳情:Spark Unified Memory的運行原理和機制是什麼?Spark JVM最小配置是什麼?用戶空間何時會出現OOM?Spark中的Broadcast究竟是存儲在什麼空間的?ShuffleMapTask的使用的數據到底在什麼地方?
[3] Spark Architecture
[4] Spark new memory management
Spark源碼圖片取自於 Spark 2.1.0版本
參考:http://www.cnblogs.com/jcchoiling/p/6494652.html