1.前言
在執行Spark的應用程序時,Spark集羣會啓動Driver和Executor兩種JVM進程,前者爲主控進程,負責建立Spark上下文,提交Spark做業(Job),並將做業轉化爲計算任務(Task),在各個Executor進程間協調任務的調度,後者負責在工做節點上執行具體的計算任務,並將結果返回給Driver,同時爲須要持久化的RDD提供存儲功能。因爲Driver的內存管理相對來講較爲簡單,本文主要對Executor的內存管理進行分析,下文中的Spark內存均特指Executor的內存。apache
2.堆內和堆外內存
做爲一個JVM進程,Executor的內存管理創建在JVM的內存管理之上,Spark對JVM的堆內(On-heap)空間進行了更爲詳細的分配,以充分利用內存。同時,Spark引入了堆外(Off-heap)內存,使之能夠直接在工做節點的系統內存中開闢空間,進一步優化了內存的使用。緩存
2.1 堆內內存(On-heap Memory)
堆內內存的大小,由Spark應用程序啓動時的–executor-memory或spark.executor.memory參數配置。Executor內運行的併發任務共享JVM堆內內存,這些任務在緩存RDD和廣播(Broadcast)數據時佔用的內存被規劃爲存儲(Storage)內存,而這些任務在執行Shuffle時佔用的內存被規劃爲執行(Execution)內存,剩餘的部分不作特殊規劃,那些Spark內部的對象實例,或者用戶定義的Spark應用程序中的對象實例,均佔用剩餘的空間。不一樣的管理模式下,這三部分佔用的空間大小各不相同(下面第2小節介紹)。併發
2.1.1 堆內內存的申請與釋放性能
Spark對堆內內存的管理是一種邏輯上的「規劃式」的管理,由於對象實例佔用內存的申請和釋放都由JVM完成,Spark只能在申請後和釋放前記錄這些內存:測試
申請內存優化
Spark在代碼中new一個對象實例
JVM從堆內內存分配空間,建立對象並返回對象引用
Spark保存該對象的引用,記錄該對象佔用的內存
釋放內存ui
Spark記錄該對象釋放的內存,刪除該對象的引用
等待JVM的垃圾回收機制釋放該對象佔用的堆內內存
2.1.2 堆內內存優缺點分析spa
咱們知道,堆內內存採用JVM來進行管理。而JVM的對象能夠以序列化的方式存儲,序列化的過程是將對象轉換爲二進制字節流,本質上能夠理解爲將非連續空間的鏈式存儲轉化爲連續空間或塊存儲,在訪問時則須要進行序列化的逆過程——反序列化,將字節流轉化爲對象,序列化的方式能夠節省存儲空間,但增長了存儲和讀取時候的計算開銷。操作系統
對於Spark中序列化的對象,因爲是字節流的形式,其佔用的內存大小可直接計算。
對於Spark中非序列化的對象,其佔用的內存是經過週期性地採樣近似估算而得,即並非每次新增的數據項都會計算一次佔用的內存大小。這種方法:code
- 下降了時間開銷可是有可能偏差較大,致使某一時刻的實際內存有可能遠遠超出預期
- 此外,在被Spark標記爲釋放的對象實例,頗有可能在實際上並無被JVM回收,致使實際可用的內存小於Spark記錄的可用內存。因此Spark並不能準確記錄實際可用的堆內內存,從而也就沒法徹底避免內存溢出(OOM, Out of Memory)的異常。
雖然不能精準控制堆內內存的申請和釋放,但Spark經過對存儲內存和執行內存各自獨立的規劃管理,能夠決定是否要在存儲內存裏緩存新的RDD,以及是否爲新的任務分配執行內存,在必定程度上能夠提高內存的利用率,減小異常的出現。
2.1.3 堆內內存分區(靜態方式,棄)
在靜態內存管理機制下,存儲內存、執行內存和其餘內存三部分的大小在Spark應用程序運行期間是固定的,但用戶能夠在應用程序啓動前進行配置,堆內內存的分配如圖所示:
能夠看到,可用的堆內內存的大小須要按照下面的方式計算:
可用的存儲內存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction 可用的執行內存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction
其中systemMaxMemory取決於當前JVM堆內內存的大小,最後可用的執行內存或者存儲內存要在此基礎上與各自的memoryFraction參數和safetyFraction參數相乘得出。上述計算公式中的兩個safetyFraction參數,其意義在於在邏輯上預留出1-safetyFraction這麼一塊保險區域,下降因實際內存超出當前預設範圍而致使OOM的風險(上文提到,對於非序列化對象的內存採樣估算會產生偏差)。值得注意的是,這個預留的保險區域僅僅是一種邏輯上的規劃,在具體使用時Spark並無區別對待,和「其它內存」同樣交給了JVM去管理。
2.1.4 堆內內存分區(統一方式,現)
默認狀況下,Spark 僅僅使用了堆內內存。Executor 端的堆內內存區域大體能夠分爲如下四大塊:
分區 | 說明 |
Execution 內存 | 主要用於存放 Shuffle、Join、Sort、Aggregation 等計算過程當中的臨時數據 |
Storage 內存 | 主要用於存儲 spark 的 cache 數據,例如RDD的緩存、unroll數據 |
用戶內存(User Memory) | 主要用於存儲 RDD 轉換操做所須要的數據,例如 RDD 依賴等信息 |
預留內存(Reserved Memory) | 系統預留內存,會用來存儲Spark內部對象 |
整個 Executor 端堆內內存若是用圖來表示的話,能夠歸納以下:
咱們對上圖進行如下說明:
- systemMemory = Runtime.getRuntime.maxMemory,其實就是經過參數 spark.executor.memory 或 –executor-memory 配置的。
- reservedMemory 在 Spark 2.2.1 中是寫死的,其值等於 300MB,這個值是不能修改的(若是在測試環境下,咱們能夠經過 spark.testing.reservedMemory 參數進行修改);
- usableMemory = systemMemory – reservedMemory,這個就是 Spark 可用內存;
- 關於動態佔用機制,因爲統一內存管理方式中堆內堆外內存的管理均基於此機制,因此單獨提出來說解。參見文本第三節。
2.2 堆外內存(Off-heap Memory)
爲了進一步優化內存的使用以及提升Shuffle時排序的效率,Spark引入了堆外(Off-heap)內存,使之能夠直接在工做節點的系統內存中開闢空間,存儲通過序列化的二進制數據。除了沒有other空間,堆外內存與堆內內存的劃分方式相同,全部運行中的併發任務共享存儲內存和執行內存。
Spark 1.6 開始引入了Off-heap memory(詳見SPARK-11389)。這種模式不在 JVM 內申請內存,而是調用 Java 的 unsafe 相關 API 進行諸如 C 語言裏面的 malloc() 直接向操做系統申請內存。因爲這種方式不通過 JVM 內存管理,因此能夠避免頻繁的 GC,這種內存申請的缺點是必須本身編寫內存申請和釋放的邏輯。
2.2.2 堆外內存的優缺點
利用JDK Unsafe API(從Spark 2.0開始,在管理堆外的存儲內存時再也不基於Tachyon,而是與堆外的執行內存同樣,基於JDK Unsafe API實現[3]),Spark能夠直接操做系統堆外內存,減小了沒必要要的內存開銷,以及頻繁的GC掃描和回收,提高了處理性能。堆外內存能夠被精確地申請和釋放,並且序列化的數據佔用的空間能夠被精確計算,因此相比堆內內存來講下降了管理的難度,也下降了偏差。
2.2.3 堆外內存分區(靜態方式,棄)
堆外的空間分配較爲簡單,存儲內存、執行內存的大小一樣是固定的
可用的執行內存和存儲內存佔用的空間大小直接由參數spark.memory.storageFraction決定,因爲堆外內存佔用的空間能夠被精確計算,因此無需再設定保險區域。
靜態內存管理機制實現起來較爲簡單,但若是用戶不熟悉Spark的存儲機制,或沒有根據具體的數據規模和計算任務或作相應的配置,很容易形成「一半海水,一半火焰」的局面,即存儲內存和執行內存中的一方剩餘大量的空間,而另外一方卻早早被佔滿,不得不淘汰或移出舊的內容以存儲新的內容。因爲新的內存管理機制的出現,這種方式目前已經不多有開發者使用,出於兼容舊版本的應用程序的目的,Spark仍然保留了它的實現。
2.2.4 堆外內存分區(統一方式,現)
相比堆內內存,堆外內存只區分 Execution 內存和 Storage 內存,其內存分佈以下圖所示:
關於動態佔用機制,因爲統一內存管理方式中堆內堆外內存的管理均基於此機制,因此單獨提出來說解。參見文本第三節。
3. 動態佔用機制–Execution&&Storage
細心的同窗確定看到上面兩張圖中的 Execution 內存和 Storage 內存之間存在一條虛線,這是爲何呢?
在 Spark 1.5 以前,Execution 內存和 Storage 內存分配是靜態的,換句話說就是若是 Execution 內存不足,即便 Storage 內存有很大空閒程序也是沒法利用到的;反之亦然。這就致使咱們很難進行內存的調優工做,咱們必須很是清楚地瞭解 Execution 和 Storage 兩塊區域的內存分佈。
而目前 Execution 內存和 Storage 內存能夠互相共享的。也就是說,若是 Execution 內存不足,而 Storage 內存有空閒,那麼 Execution 能夠從 Storage 中申請空間;反之亦然。因此上圖中的虛線表明 Execution 內存和 Storage 內存是能夠隨着運做動態調整的,這樣能夠有效地利用內存資源。Execution 內存和 Storage 內存之間的動態調整能夠歸納以下:
3.1 動態調整策略
具體的實現邏輯以下:
- 程序提交的時候咱們都會設定基本的 Execution 內存和 Storage 內存區域(經過 spark.memory.storageFraction 參數設置);
- 在程序運行時,雙方的空間都不足時,則存儲到硬盤;將內存中的塊存儲到磁盤的策略是按照 LRU 規則(Least Recently Used)進行的。若己方空間不足而對方空餘時,可借用對方的空間;(存儲空間不足是指不足以放下一個完整的 Block)
- Execution 內存的空間被對方佔用後,可以讓對方將佔用的部分轉存到硬盤,而後」歸還」借用的空間
- Storage 內存的空間被對方佔用後,目前的實現是沒法讓對方」歸還」,由於須要考慮 Shuffle 過程當中的不少因素,實現起來較爲複雜;並且 Shuffle 過程產生的文件在後面必定會被使用到,而 Cache 在內存的數據不必定在後面使用。
注意,上面說的借用對方的內存須要借用方和被借用方的內存類型都同樣,都是堆內內存或者都是堆外內存,不存在堆內內存不夠去借用堆外內存的空間。
4. Task內存申請流程
爲了更好地使用使用內存,Executor 內運行的 Task 之間共享着 Execution 內存。具體的,Spark 內部維護了一個 HashMap 用於記錄每一個 Task 佔用的內存:
- 當 Task 須要在 Execution 內存區域申請 numBytes 內存,其先判斷 HashMap 裏面是否維護着這個 Task 的內存使用狀況,若是沒有,則將這個 Task 內存使用置爲0,而且以 TaskId 爲 key,內存使用爲 value 加入到 HashMap 裏面。
- 以後爲這個 Task 申請 numBytes 內存,若是 Execution 內存區域正好有大於 numBytes 的空閒內存,則在 HashMap 裏面將當前 Task 使用的內存加上 numBytes,而後返回;若是當前 Execution 內存區域沒法申請到每一個 Task 最小可申請的內存,則當前 Task 被阻塞,直到有其餘任務釋放了足夠的執行內存,該任務才能夠被喚醒。
- 每一個 Task 可使用 Execution 內存大小範圍爲 1/2N ~ 1/N,其中 N 爲當前 Executor 內正在運行的 Task 個數。
- 一個 Task 可以運行必須申請到最小內存爲 (1/2N * Execution 內存);當 N = 1 的時候,Task 可使用所有的 Execution 內存。好比若是 Execution 內存大小爲 10GB,當前 Executor 內正在運行的 Task 個數爲5,則該 Task 能夠申請的內存範圍爲 10 / (2 * 5) ~ 10 / 5,也就是 1GB ~ 2GB的範圍。
5. 內存分配示例
爲了更好的理解上面堆內內存和堆外內存的使用狀況,這裏給出一個簡單的例子
5.1 只用了堆內內存
如今咱們提交的 Spark 做業關於內存的配置以下:
--executor-memory 18g
因爲沒有設置 spark.memory.fraction
和 spark.memory.storageFraction
參數,咱們能夠看到 Spark UI 關於 Storage Memory 的顯示以下:
上圖很清楚地看到 Storage Memory 的可用內存是 10.1GB,這個數是咋來的呢?根據前面的規則,咱們能夠得出如下的計算:
systemMemory = spark.executor.memory reservedMemory = 300MB usableMemory = systemMemory - reservedMemory StorageMemory= usableMemory * spark.memory.fraction * spark.memory.storageFraction
若是咱們把數據代進去,得出如下的結果:
systemMemory = 18Gb = 19327352832 字節 reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800 usableMemory = systemMemory - reservedMemory = 19327352832 - 314572800 = 19012780032 StorageMemory= usableMemory * spark.memory.fraction * spark.memory.storageFraction = 19012780032 * 0.6 * 0.5 = 5703834009.6 = 5.312109375GB
不對啊,和上面的 10.1GB 對不上啊。爲何呢?這是由於 Spark UI 上面顯示的 Storage Memory 可用內存其實等於 Execution 內存和 Storage 內存之和,也就是 usableMemory * spark.memory.fraction:
StorageMemory= usableMemory * spark.memory.fraction = 19012780032 * 0.6 = 11407668019.2 = 10.62421GB
仍是不對,這是由於咱們雖然設置了 –executor-memory 18g,可是 Spark 的 Executor 端經過 Runtime.getRuntime.maxMemory 拿到的內存其實沒這麼大,只有 17179869184 字節,因此 systemMemory = 17179869184,而後計算的數據以下:
systemMemory = 17179869184 字節 reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800 usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384 StorageMemory= usableMemory * spark.memory.fraction = 16865296384 * 0.6 = 9.42421875 GB
咱們經過將上面的 16865296384 * 0.6 字節除於 1024 * 1024 * 1024 轉換成 9.42421875 GB,和 UI 上顯示的仍是對不上,這是由於 Spark UI 是經過除於 1000 * 1000 * 1000 將字節轉換成 GB,以下:
systemMemory = 17179869184 字節 reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800 usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384 StorageMemory= usableMemory * spark.memory.fraction = 16865296384 * 0.6 字節 = 16865296384 * 0.6 / (1000 * 1000 * 1000) = 10.1GB
如今終於對上了。
具體將字節轉換成 GB 的計算邏輯以下(core 模塊下面的 /core/src/main/resources/org/apache/spark/ui/static/utils.js):
function formatBytes(bytes, type) { if (type !== 'display') return bytes; if (bytes == 0) return '0.0 B'; var k = 1000; var dm = 1; var sizes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']; var i = Math.floor(Math.log(bytes) / Math.log(k)); return parseFloat((bytes / Math.pow(k, i)).toFixed(dm)) + ' ' + sizes[i]; }
咱們設置了 –executor-memory 18g,可是 Spark 的 Executor 端經過 Runtime.getRuntime.maxMemory 拿到的內存其實沒這麼大,只有 17179869184 字節,這個數據是怎麼計算的?
Runtime.getRuntime.maxMemory 是程序可以使用的最大內存,其值會比實際配置的執行器內存的值小。這是由於內存分配池的堆部分分爲 Eden,Survivor 和 Tenured 三部分空間,而這裏面一共包含了兩個 Survivor 區域,而這兩個 Survivor 區域在任什麼時候候咱們只能用到其中一個,因此咱們可使用下面的公式進行描述:
ExecutorMemory = Eden + 2 * Survivor + Tenured Runtime.getRuntime.maxMemory = Eden + Survivor + Tenured
上面的 17179869184 字節可能由於你的 GC 配置不同獲得的數據不同,可是上面的計算公式是同樣的。
5.2 堆內內存+堆外內存
如今若是咱們啓用了堆外內存,狀況咋樣呢?咱們的內存相關配置以下:
spark.executor.memory 18g spark.memory.offHeap.enabled true spark.memory.offHeap.size 10737418240
從上面能夠看出,堆外內存爲 10GB,如今 Spark UI 上面顯示的 Storage Memory 可用內存爲 20.9GB,以下:
其實 Spark UI 上面顯示的 Storage Memory 可用內存等於堆內內存和堆外內存之和,計算公式以下:
堆內:
systemMemory = 17179869184 字節 reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800 usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384 totalOnHeapStorageMemory = usableMemory * spark.memory.fraction = 16865296384 * 0.6 = 10119177830
堆外
totalOffHeapStorageMemory = spark.memory.offHeap.size = 10737418240 StorageMemory = totalOnHeapStorageMemory + totalOffHeapStorageMemory = (10119177830 + 10737418240) 字節 = (20856596070 / (1000 * 1000 * 1000)) GB = 20.9 GB
幾個問題
1. 再也不細分unroll,統一爲storage
MemoryManager在storage內存中細分了unroll,靜態內存管理的實現劃分了unroll這部份內存,並設置了比例。統一內存管理再也不細分unroll,統一爲storage。
2. 爲何設置300M預留內存
統一內存管理最第一版本other這部份內存沒有固定值300M設置,而是和靜態內存管理類似,設置的百分比,最第一版本佔25%。百分比設置在實際使用中出現了問題,若給定的內存較低時,例如1G,會致使OOM,具體討論參考這裏Make unified memory management work with small heaps,所以,other這部份內存作了修改,先劃出300M內存。
spark.memory.fraction由0.75 降至 0.6
spark.memory.fraction最第一版本的值是0.75,不少分析統一內存管理這塊的文章也是這麼介紹的,一樣的,在使用中發現這個值設置的偏高,致使了gc時間過長,spark 2.0版本將其調整爲0.6,詳細談論參見Reduce spark.memory.fraction default to avoid overrunning old gen in JVM default config。
總結
以下表:
內存類別 | 區域劃分 | 管理方式 | 優缺點 |
on-heap | – Execution Memory – Storage Memory – User Memory – Reserved Memory |
使用JVM管理 | |
off-heap | – Execution Memory – Storage Memory |
手動管理,不通過JVM | 能夠避免頻繁的 GC 可是必須本身編寫內存申請和釋放的邏輯 |