spark 源碼分析之十五 -- Spark內存管理剖析

本篇文章主要剖析Spark的內存管理體系。html

在上篇文章 spark 源碼分析之十四 -- broadcast 是如何實現的?中對存儲相關的內容沒有作過多的剖析,下面計劃先剖析Spark的內存機制,進而進入內存存儲,最後再剖析磁盤存儲。本篇文章主要剖析內存管理機制。java

總體介紹

Spark內存管理相關類都在 spark core 模塊的 org.apache.spark.memory 包下。apache

文檔對這個包的解釋和說明以下:緩存

This package implements Spark's memory management system. This system consists of two main components, a JVM-wide memory manager and a per-task manager:

- org.apache.spark.memory.MemoryManager manages Spark's overall memory usage within a JVM. This component implements the policies for dividing the available memory across tasks and for allocating memory between storage (memory used caching and data transfer) and execution (memory used by computations, such as shuffles, joins, sorts, and aggregations).
- org.apache.spark.memory.TaskMemoryManager manages the memory allocated by individual tasks. Tasks interact with TaskMemoryManager and never directly interact with the JVM-wide MemoryManager. Internally, each of these components have additional abstractions for memory bookkeeping: - org.apache.spark.memory.MemoryConsumers are clients of the TaskMemoryManager and correspond to individual operators and data structures within a task. The TaskMemoryManager receives memory allocation requests from MemoryConsumers and issues callbacks to consumers in order to trigger spilling when running low on memory. - org.apache.spark.memory.MemoryPools are a bookkeeping abstraction used by the MemoryManager to track the division of memory between storage and execution.

 

即內存管理主要涉及了兩個組件:JVM 範圍的內存管理和單個任務的內存管理。數據結構

  1. MemoryManager管理Spark在JVM中的整體內存使用狀況。該組件實現了跨任務劃分可用內存以及在存儲(內存使用緩存和數據傳輸)和執行(計算使用的內存,如shuffle,鏈接,排序和聚合)之間分配內存的策略。
  2. TaskMemoryManager管理由各個任務分配的內存。任務與TaskMemoryManager交互,永遠不會直接與JVM範圍的MemoryManager交互。

 

在TaskMemoryManager內部,每一個組件都有額外的記憶簿來記錄內存使用狀況:app

 

  • MemoryConsumers是TaskMemoryManager的客戶端,對應於任務中的各個運算符和數據結構。TaskMemoryManager接收來自MemoryConsumers的內存分配請求,並向消費者發出回調,以便在內存不足時觸發溢出。
  • MemoryPools是MemoryManager用來跟蹤存儲和執行之間內存劃分的薄記抽象。 

如圖:ide

 

MemoryManager的兩種實現: 源碼分析

There are two implementations of org.apache.spark.memory.MemoryManager which vary in how they handle the sizing of their memory pools: 
- org.apache.spark.memory.UnifiedMemoryManager, the default in Spark 1.6+, enforces soft boundaries between storage and execution memory, allowing requests for memory in one region to be fulfilled by borrowing memory from the other.
- org.apache.spark.memory.StaticMemoryManager enforces hard boundaries between storage and execution memory by statically partitioning Spark's memory and preventing storage and execution from borrowing memory from each other. This mode is retained only for legacy compatibility purposes.

 

org.apache.spark.memory.MemoryManager有兩種實現,它們在處理內存池大小方面有所不一樣:post

  • org.apache.spark.memory.UnifiedMemoryManager,Spark 1.6+中的默認值,強制存儲內存和執行內存之間的軟邊界,容許經過從另外一個區域借用內存來知足一個區域中的內存請求。
  • org.apache.spark.memory.StaticMemoryManager 經過靜態分區Spark的內存,強制存儲內存和執行內存之間的硬邊界並防止存儲和執行從彼此借用內存。 僅爲了傳統兼容性目的而保留此模式。

先來一張本身畫的類圖,對涉及類之間的關係有一個比較直接的認識:測試

 

下面咱們逐一對涉及的類作說明。

MemoryMode

內存模式:主要分堆內內存和堆外內存,MemoryMode是一個枚舉類,從本質上來講,ON_HEAP和OFF_HEAP都是MemoryMode的子類。

MemoryPool

文檔說明以下:

Manages bookkeeping for an adjustable-sized region of memory. This class is internal to the MemoryManager. 

 

即它負責管理可調大小的內存區域的簿記工做。能夠這樣理解,內存就是一個金庫,它是一個負責記帳的管家,主要負責記錄內存的借出歸還。這個類專門爲MempryManager而設計。

給內存記帳,其實從本質上來講,它不是Spark內存管理部分的核心功能,可是又很重要,它的核心方法都是被MemoryManager來調用的。

理解了這個類,其子類就比較好理解了。記帳的管家有兩種實現,分別是StorageMemoryPool和ExecutionMemoryPool。

StorageMemoryPool

文檔解釋:

Performs bookkeeping for managing an adjustable-size pool of memory that is used for storage (caching).

 

說白了,它就是專門給負責存儲或緩存的內存區域記帳的。

其類結構以下:

它有三種方法:

1. acquireMemory:獲取N個字節的內存給指定的block,若是有必要,即內存不夠用了,能夠將其餘的從內存中驅除。源碼以下:

圖中標記的邏輯,參照下文MemoryStore的剖析。

2. releaseMemory:釋放內存。源碼以下:

很簡單,就只是在統計值_memoryUsed 上面作減法。

3. freeSpaceToShrinkPool:可用空間經過`spaceToFree`字節縮小此存儲內存池的大小。源碼以下:

 

簡單地能夠看出,這個方法是在收縮存儲內存池以前調用的,由於這個方法返回值是要收縮的值。

收縮存儲內存池是爲了擴大執行內存池,即這個方法是在收縮存儲內存,擴大執行內存時用的,這個方法只是爲了縮小存儲內存池做準備的,並無真正的縮小存儲內存池。

實現思路,首先先計算須要驅逐的內存大小,若是須要驅逐內存,則跟 acquireMemory 方法相似,調用MemoryStore 的 evictBlocksToFreeSpace方法,不然直接返回。

總結:這個類是給存儲內存池記帳的,也負責不夠時或內存池不知足縮小條件時,通知MemoryStore驅逐內存。

 

ExecutionMemoryPool

文檔解釋:

Implements policies and bookkeeping for sharing an adjustable-sized pool of memory between tasks. 
Tries to ensure that each task gets a reasonable share of memory,
instead of some task ramping up to a large amount first and then causing others to spill to disk repeatedly.
If there are N tasks, it ensures that each task can acquire at least 1 / 2N of the memory before it has to spill,
and at most 1 / N. Because N varies dynamically, we keep track of the set of active tasks and redo the calculations
of 1 / 2N and 1 / N in waiting tasks whenever this set changes. This is all done by synchronizing access to mutable
state and using wait() and notifyAll() to signal changes to callers. Prior to Spark 1.6, this arbitration of memory
across tasks was performed by the ShuffleMemoryManager.

 

實現策略和簿記,以便在任務之間共享可調大小的內存池。 嘗試確保每一個任務得到合理的內存份額,而不是首先增長大量任務而後致使其餘任務重複溢出到磁盤。

若是有N個任務,它確保每一個任務在溢出以前至少能夠獲取1 / 2N的內存,最多1 / N.

因爲N動態變化,咱們會跟蹤活動任務的集合並在每當任務集合改變時重作等待任務中的1 / 2N和1 / N的計算。

這一切都是經過同步對可變狀態的訪問並使用 wait() 和 notifyAll() 來通知對調用者的更改來完成的。 在Spark 1.6以前,跨任務的內存仲裁由ShuffleMemoryManager執行。 

 
類內部結構以下:

memoryForTask聲明以下:

1 @GuardedBy("lock")
2 private val memoryForTask = new mutable.HashMap[Long, Long]()

其中,key 指的是 taskAttemptId, value 是內存使用狀況(以byte計算)。它用來記錄每個任務內存使用狀況。

它也有三類方法:

1. 獲取總的或每個任務的內存使用大小,源碼以下:

memoryForTask 記錄了每個task使用的內存大小。

 

2. 給一個任務分配內存,源碼以下:

numBytes表示申請的內存大小(in byte),taskAttemptId 表示申請內存的 task id,maybeGrowPool 表示一個可能會增長執行池大小的回調。 它接受一個參數(Long),表示應該擴展此池的所需內存量。computeMaxPoolSize 表示在此給定時刻返回此池的最大容許大小的回調。這不是字段,由於在某些狀況下最大池大小是可變的。 例如,在統一內存管理中,能夠經過驅逐緩存塊來擴展執行池,從而縮小存儲池。

若是以前該任務沒有申請過,則將(taskAttemptId <- 0) 放入到 memoryForTask map 中, 而後釋放鎖並喚醒lock鎖等待區的線程。

被喚醒的由於synchronized實現的是一個互斥鎖,因此當前僅當只有一個線程執行while循環。

首先根據 (須要的內存大小 - 池總空閒內存大小)來確認是否須要擴大池,因爲存儲池可能會偷執行池的內存,因此須要執行 maybeGrowPool 方法。

computeMaxPoolSize計算出此時該池容許的最大內存大小。而後分別算出每一個任務最大分配內存和最小分配內存。進而計算出分配給該任務的最大分配大小(maxToGrant)和實際分配大小(toGrant)。

若是實際分配大小 小於須要分配的內存大小 而且 當前任務佔有內存 + 實際分配內存 < 每一個任務最小分配內存,則該線程進入鎖wait區等待,等待內存可用時喚醒,不然將內存分配給任務。

能夠看到這個方法中的wait和notify方法並非成對的,由於新添加的taskAttemptId不能知足內存可用的條件。由於這個鎖是從外部傳過來的,即MemoryManager也可能對其作了操做,使內存空餘下來,可供任務分配。

3. 釋放task內存,源碼以下:

它有兩個方法,分別是釋放當前任務已經使用的全部內存空間 releaseAllMemoryForTask 和釋放當前任務的指定大小的內存空間 releaseMemory。

思路:

releaseAllMemoryForTask 先計算好當前任務使用的所有內存,而後調用 releaseMemory 方法釋放內存。

releaseMemory 方法則會比對當前使用內存和要釋放的內存,若是要釋放的內存大小小於 當前使用的 ,作減法便可。釋放以後的任務內存若是小於等於0,則移除task便可,最後通知lock鎖等待區的對象,讓其從新分配內存。

在這個記帳的實現裏,每個來的task不必定是能夠分配到內存的,因此,鎖在其中起了很大的資源協調的做用,也防止了內存的溢出。

 

MemoryManager

文檔說明:

An abstract memory manager that enforces how memory is shared between execution and storage. In this context, execution memory refers to that used for computation in shuffles, joins, sorts and aggregations, while storage memory refers to that used for caching and propagating internal data across the cluster. There exists one MemoryManager per JVM.

一種抽象內存管理器,用於強制執行和存儲之間共享內存的方式。在這個上下文下,執行內存是指用於在shuffle,join,sort和aggregation中進行計算的內存,而存儲內存是指用於在羣集中緩存和傳播內部數據的內存。 每一個JVM都有一個MemoryManager。

先來講一下其依賴的MemoryPool,源碼以下:

MemoryPool中的lock對象就是MemoryManager對象

存儲內存池和執行內存池分別有兩個:堆內和堆外。

onHeapStorageMemory和onHeapExecutionMemory 是從構造方法傳過來的,先不予考慮。

maxOffHeapMemory 默認是 0, 能夠根據 spark.memory.offHeap.size 參數設置,文檔對這個參數的說明以下:

The absolute amount of memory in bytes which can be used for off-heap allocation. 
This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit 
then be sure to shrink your JVM heap size  accordingly. This must be set to a positive value when spark.memory.offHeap.enabled=true.

 

存儲堆外內存 = 最大堆外內存(offHeapStorageMemory) X 堆外存儲內存佔比,這個佔比默認是0.5,能夠根據 spark.memory.storageFraction 來調節

執行堆外內存 = 最大堆外內存 - 存儲堆外內存

還有跟 Tungsten 管理內存有關的常量:

這三個常量分別定義了tungsten的內存形式、內存頁大小和內存分配器。

 

其方法解釋以下:

1. 獲取存儲池最大使用內存,抽象方法,待子類實現。

 

2. 獲取已使用內存

3. 獲取內存,這也是抽象方法,待子類實現

 

4. 釋放內存

這些請求都委託給對應的MemoryPool來作了

1.6 以前 使用MemoryManager子類 StaticMemoryManager 來作內存管理。

StaticMemoryManager

這個靜態內存管理中的執行池和存儲池之間有嚴格的界限,兩個池的大小永不改變。

注意:若是想使用這個內存管理方式,設置 spark.memory.useLegacyMode 爲 true便可(默認是false)

 

下面咱們重點看1.6 以後的默認使用的MemoryManager子類 -- UnifiedMemoryManager

UnifiedMemoryManager

先來看文檔說明:

這個MemoryManager保證了存儲池和執行池之間的軟邊界,便可以互相借用內存來知足彼此動態的內存需求變化。執行和存儲的佔比由 spark.memory.storageFraction 配置,默認是0.6,即偏向於存儲池。其中存儲池的默認佔比是由 spark.memory.storageFraction 參數決定,默認是 0.5 ,即 存儲池默認佔比 = 0.6 * 0.5 = 0.3 ,即存儲池默認佔比爲0.3。存儲池能夠儘量多的向執行池借用空閒內存。可是當執行池須要它的內存的時候,會把一部份內存池的內存對象從內存中驅逐出,直到知足執行池的內存需求。相似地,執行池也能夠儘量地借用存儲池中的空閒內存,不一樣的是,執行內存不會被存儲池驅逐出內存,也就是說,緩存block時可能會由於執行池佔用了大量的內存池不能釋放致使緩存block失敗,在這種狀況下,新的block會根據StorageLevel作相應處理。

 

咱們主要來看其實現的父類MemoryManager 的方法:

1. 獲取存儲池最大使用內存:

其中,maxHeapMemory 是從構造方法傳進來的成員變量,maxOffHeapMemory 是根據參數 spark.memory.offHeap.size 配置生成的。

能夠看出,存儲池的容許的最大使用內存是實時變化的,由於總內存不變,執行池內存使用狀況隨任務執行狀況變化而變化。

 

2. 獲取內存,逐一來看:

實現思路:先根據存儲方式(堆內仍是堆外)肯定存儲池,執行池,存儲區域內存大小和最大總內存。

而後調用執行池的 acquireMemory 方法申請內存,computeMaxExecutionPoolSize是隨存儲的實時變化而變化的,增大ExecutionPool的回調也被調用來確保有足夠空間可供執行池分配。

acquireUnrollMemory 直接調用 acquireStorageMemory 方法。

acquireStorageMemory實現思路:先根據存儲方式(堆內仍是堆外)肯定存儲池,執行池,存儲區域內存大小和最大總內存。

存儲內存若是大於最大內存,直接存儲失敗,不然,繼續查看所需內存大小是否大於內存池最大空閒內存,若是大於,則從執行池中申請足夠的空閒空間,注意,真正申請的空間大小在0 和numBytes - storagePool.memoryFree 之間,繼續調用storagePool的acquireMemory 方法去申請內存,若是不夠申請,則會驅逐出舊或空的block塊。

最後,咱們來看一下其伴生對象:

首先 apply 方法就相似於工廠方法的創造方法。咱們對比下面的一張圖,來講明一下Spark內存結構:

系統內存:能夠根據 spark.testing.memory 參數來配置(主要用於測試),默認是JVM 的可使用的最大內存。

保留內存:能夠根據 spark.testing.reservedMemory 參數來配置(主要用於測試), 默認是 300M

最小系統內存:保留內存 * 1.5 後,再向下取整

系統內存的約束:系統內存必須大於最小保留內存,即 系統可用內存必須大於 450M, 能夠經過 --driver-memory 或  spark.driver.memory 或 --executor-memory 或spark.executor.memory 來調節

可用內存 = 系統內存 - 保留內存

堆內內存佔比默認是0.6, 能夠根據 spark.memory.fraction 參數來調節

最大堆內內存 = 堆內可用內存 * 堆內內存佔比

堆內內存存儲池佔比默認是 0.5 ,能夠根據spark.memory.storageFraction 來調節。

默認堆內存儲內存大小 = 最大堆內內存 * 堆內內存存儲池佔比。即堆內存儲池內存大小默認是 (系統JVM最大可用內存 -  300M)* 0.6 * 0.5, 即約等於JVM最大可用內存的三分之一。

注意: 下圖中的spark.memory.fraction是0.75,是Spark 1.6 的默認配置。在Spark 2.4.3 中默認是0.6。

 圖片來源:https://0x0fff.com/spark-memory-management/

至此,Saprk 的內存管理模塊基本上剖析完畢。

總結:先介紹了內存的管理池,即MemoryPool的實現,而後重點分析了Spark 1.6 之後的內存管理機制,着重說明Spark內部的內存是如何劃分以及如何動態調整內存的。

 

注,關於堆內內存和堆外內存的介紹,可參照:https://www.jianshu.com/p/50be08b54bee

相關文章
相關標籤/搜索