Spark 動態(統一)內存管理模型

做者編輯:王瑋,胡玉林html

一.回顧

在前面的一篇文章中咱們介紹了spark靜態內存管理模式以及相關知識https://blog.csdn.net/anitinaj/article/details/80901328 在上一篇文章末尾,咱們陳述了傳統spark靜態內存管理模式的侷限性:
(1) 沒有適用於全部應用的默認配置,一般須要開發人員針對不一樣的應用進行不一樣的參數配置。好比根據任務的執行邏輯,調整shuffle和storage內存佔比來適應任務的需求。
(2) 這樣須要開發人員具有較高的spark原理知識。
(3) 那些不cache數據的應用在運行時只佔用一小部分可用內存,由於默認的內存配置中,storage用去了safety內存的60%。
所以,在1.6以後,spark引入了動態(統一)內存管理模式,本文將針對動態內存管理模式的設計理念以及原理進行相關陳述。web

二.整體概覽

spark從1.6版本之後,默認的內存管理方式就調整爲統一內存管理模式。由UnifiedMemoryManager實現。Unified Memory Management模型,重點是打破運行內存和存儲內存之間的界限,使spark在運行時,不一樣用途的內存之間能夠實現互相的拆借。
由下圖可知,spark每一個executor(JVM)內存由一下幾個部分組成:緩存

  1. Reserved Memory: 這部份內存是預留給系統使用, 在1.6.0默認爲300MB, 這一部份內存不計算在spark execution和storage中。可經過spark.testing.reservedMemory進行設置。而後把實際可用內存減去這個reservedMemory獲得usableMemory。ExecutionMemory 和 StorageMemory 會共享usableMemory * spark.memory.fraction(默認0.75)。
  2. User Memory : 分配Spark Memory剩餘的內存,用戶能夠根據須要使用。默認佔(Java Heap - Reserved Memory) * 0.25.
  3. Spark Memory: 計算方式爲(Java Heap – Reserved Memory) spark.memory.fraction,在1.6.0中,默認爲(Java Heap - 300M) 0.75。1. Spark Memory又分爲Storage Memory和Execution Memory兩部分。兩個邊界由spark.memory.storageFraction設定,默認爲0.5

三.設計理念

本節將對第二部分各個內存的分佈以及設計原理進行詳細的闡述
相對於靜態內存模型(即存儲和運行內存相互隔離、彼此不可拆借),動態內存實現了存儲和計算內存的動態拆借。也就是說,當計算內存超了,它會從空閒的存儲內存中借一部份內存使用,存儲內存不夠用的時候,也會向空閒的計算內存中拆借。值得注意的地方是,被借走用來執行運算的內存,在執行完任務以前是不會釋放內存的。通俗的講,運行任務會借存儲的內存,可是它直到執行完之後才能歸還內存。
和動態內存相關的參數以下:數據結構

    • spark.memory.fraction(默認0.75): 這個參數用來配置存儲和計算內存佔整個jvm的比例。這個參數設置的越低,也就是存儲和計算內存佔jvm的比例越低,就越可能頻繁的發生內存的釋放(將內存中的數據寫磁盤或者直接丟棄掉)。反之,若是這個參數越高,發生釋放內存的可能性就越小。這個參數的目的是在jvm中留下一部分空間用來保存spark內部數據,用戶數據結構,而且防止對數據的錯誤預估可能形成OOM的風險。
    • spark.memory.storageFraction(默認 0.5):在spark.memory.fraction中存儲內存所佔的比例,默認是0.5,若是使用的存儲內存超過了這個範圍,緩存的數據會被驅趕。
    • spark.memory.useLegacyMode(default false): 設置是否使用saprk1.5及之前遺留的內存管理模型,即靜態內存模型,上一篇文章咱們介紹過這個,主要是設置如下幾個參數,詳見上一篇文章。
      ○ spark.storage.memoryFraction
      ○ spark.storage.safetyFraction
      ○ spark.storage.unrollFraction
      ○ spark.shuffle.memoryFraction
      ○ spark.shuffle.safetyFraction

下面對動態內存設計原理的一些取捨進行分析:app

1.當內存壓力上升的時候

由於內存能夠被計算和存儲內存拆借,咱們必須明確在這種機制下,當內存壓力上升的時候,咱們如何取捨?接下來會從不一樣維度對下面三個取捨進行分析:
a、傾向於優先釋放計算內存
b、傾向於優先釋放存儲內存
c、不偏不倚,平等競爭jvm

維度一、釋放內存的代價函數

釋放存儲內存的代價取決於storage level. 若是數據的存儲level是MEMORY_ONLY的話代價最高,由於當你釋放在內存中的數據的時候,你下次再複用的話只能從新計算了。若是數據的存儲level是MEMORY_AND_DIS_SER的時候,釋放內存的代價最低。由於這種方式,當內存不夠的時候,它會將數據序列化後放在磁盤上,避免複用的時候再計算,惟一的開銷只是I/O上。
釋放計算內存的代價不是很顯而易見。這裏沒有複用數據重計算的代價,由於計算內存中的任務數據會被移到硬盤,最後再歸併起來。最近的spark版本將計算的中間數據進行壓縮使得序列化的代價降到了最低。
值得注意的是,移到硬盤的數據總會再從新讀回來,從存儲內存移除的數據也許不會被用到,因此當沒有從新計算的風險時,釋放計算的內存要比釋放存儲內存的代價更高。ui

維度二、實現複雜度url

實現釋放存儲內存的策略很簡單:咱們只須要用目前的內存釋放策略釋放掉存儲內存中的數據就行了。
實現釋放計算內存卻相對來講很複雜。這裏有幾個實現該方案的思路:
a、當運行任務要拆借存儲內存的時候,給全部這些任務註冊一個回調函數以便往後調這個函數來回收內存
b、協同投票來進行內存的釋放spa

值得咱們注意的一個地方是,以上不管哪一種方式,都須要考慮一個地方:即若是我要釋放正在運行的任務的內存,同時咱們想要cache到存儲內存的一部分數據恰巧是由這個任務產生的,若是咱們如今釋放掉正在運行的任務的內存,就須要考慮在這種環境下會形成飢餓的狀況:即生成cache的數據的任務沒有足夠的內存空間來跑出cache的數據一直處於飢餓狀態。

此外,咱們還須要考慮,一旦咱們釋放掉計算內存,那麼那些須要cache的數據應該怎麼辦?最簡單的方式就是等待,直到計算內存有足夠的空閒,可是這樣就可能會形成死鎖,尤爲是當新的數據塊依賴於以前的計算內存中的數據塊。另外一個可選的操做就是丟掉那些最新寫入到磁盤中的塊而且一旦當計算內存夠了又立刻加載回來。爲了不老是丟掉那些等待中的塊,咱們能夠設置一個小的內存空間(好比堆內存的5%)去確保內存中至少有必定的比例的的數據塊。

所給的兩種方法都會增長額外的複雜度, 這兩種方式在第一次的實現中都被排除了。綜上目前看來,釋放掉存儲內存中的計算任務在實現上比較繁瑣,目前暫不考慮。

結論:咱們傾向於優先釋放掉存儲內存。即若是存儲內存拆借了計算內存,當計算內存須要進行計算而且內存空間不足的時候,優先把計算內存中這部分被用來存儲的內存釋放掉。

2.可選設計

可選的幾種設計理念:結合咱們前面的描述,針對在內存壓力下釋放存儲內存有如下幾個可選設計。

A: 釋放存儲內存數據塊,徹底平滑: 計算和存儲內存共享一片統一的區域。內存壓力上升,優先釋放掉存儲內存部分中的數據。若是壓力沒有緩解,開始將計算內存中運行的任務數據進行溢寫磁盤。

B:釋放存儲內存數據塊,靜態存儲空間預留:這種設計和A設計很像,不一樣的是會專門劃分一個預留存儲內存區域。在這個內存區域內,存儲內存不會被釋放,只有當存儲內存超出這個預留區域,纔會被釋放。這個參數由spark.memory.storageFraction 配置。

C:釋放存儲內存數據塊,動態存儲空間預留:這種設計於設計B很類似,可是存儲空間的那一部分區域再也不是靜態設置的了,而是動態分配。這樣設置帶來的不一樣是計算內存能夠儘量借走存儲內存中可用的部分。

結論:最終採用的的是設計C。

設計A被拒絕的緣由是:設計A不適合那些對cache內存重度依賴的saprk任務。

設計B被拒絕的緣由是:設計B在不少狀況下須要用戶去設置存儲內存中那部分最小的區域。另外不管咱們設置一個什麼值,只要它非0,那麼計算內存最終也會達到一個上限,好比,若是咱們將其設置爲0.6,那麼有效的執行內存就是堆內存的0.4 * 0.75=0.3,那麼若是用戶沒有cache數據,或是cache的數據達不到設置的0.6,那麼這種狀況就又回到了靜態內存模型那種狀況,並無改善什麼。

設計C:設計C就避免了B中的問題,只要存儲內存有空餘的,那麼計算內存就能夠借用,須要關注的問題是當計算內存已經使用了存儲內存中的全部可用內存可是又須要cache數據的時候應該怎麼處理。最先的版本中直接釋放最新的block來避免引入執行驅趕策略的複雜性

同時設計C是惟一一個同時知足下列條件的:

  1. 存儲內存沒有上限。
  2. 計算內存沒有上限。
  3. 保障了存儲空間有一個小的保留區域。

四.實現類分析-UnifiedMemoryManager

闡述該類的幾個主要方法:

1. acquireExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode)方法:當前的任務嘗試從executor中獲取numBytes這麼大的內存。該方法直接向ExecutionMemoryPool索要所需內存,索要內存有如下幾個關注點:

  • 當ExecutionMemory 內存充足,則不會觸發向Storage申請內存。
  • 每一個Task可以被使用的內存被限制在 poolSize / (2 numActiveTasks) ~ maxPoolSize / numActiveTasks 之間。val maxMemoryPerTask = maxPoolSize / numActiveTasks和 `val minMemoryPerTask = poolSize / (2numActiveTasks)`其中maxPoolSize= maxMemory(storage+execution的最大內存) - math.min(storageMemoryUsed, storageRegionSize),poolSize= 當前這個pool的大小。而maxPoolSize也表明了execution pool的最大內存。
  • 索要的內存大小:val memoryReclaimableFromStorage =math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)取決於StorageMemoryPool的剩餘內存和 storageMemoryPool 從ExecutionMemory借來的內存哪一個大,取最大的那個,做爲能夠從新歸還的最大內存。用公式表達出來就是這一個樣子:ExecutionMemory 能借到的最大內存= StorageMemory 借的內存 + StorageMemory 空閒內存固然,若是實際須要的小於可以借到的最大值,則以實際須要值爲準。val spaceToReclaim = storageMemoryPool.freeSpaceToShrinkPool( math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
    ExecutionMemoryPool 的acquireMemory方法主要以下:

    程序一直處理該task的請求,直到系統斷定沒法知足該請求或者已經爲該請求分配到足夠的內存爲止。若是當前execution內存池剩餘內存不足以知足這次請求時,會向storage部分請求釋放出被借走的內存以知足這次請求。
    根據此刻execution內存池的總大小maxPoolSize,以及從memoryForTask中統計出的處於active狀態的task的個數計算出每一個task可以獲得的最大內存數maxMemoryPerTask = maxPoolSize / numActiveTasks。每一個task可以獲得的最少內存數minMemoryPerTask = poolSize / (2 * numActiveTasks)。
    根據申請內存的task當前使用的execution內存大小決定分配給該task多少內存,總的內存不能超過maxMemoryPerTask。可是若是execution內存池可以分配的最大內存小於numBytes而且若是把可以分配的內存分配給當前task,可是該task最終獲得的execution內存仍是小於minMemoryPerTask時,該task進入等待狀態,等其餘task申請內存時將其喚醒。若是知足,就會返回可以分配的內存數,而且更新memoryForTask,將該task使用的內存調整爲分配後的值。一個Task最少須要minMemoryPerTask才能開始執行。

2. acquireStorageMemory(blockId: BlockId,numBytes: Long, evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)])方法:

  • 流程和acquireExecutionMemory相似,當storage的內存不足時,一樣會向execution借內存,但區別是當且僅當ExecutionMemory有空閒內存時,StorageMemory 才能借走該內存。能借到的內存數爲:val memoryBorrowedFromExecution = Math.min(onHeapExecutionMemoryPool.memoryFree, numBytes)。因此StorageMemory從ExecutionMemory借走的內存,徹底取決於當時ExecutionMemory是否是有空閒內存。借到內存後,storageMemoryPool增長借到的這部份內存,以後同上同樣,會調用StorageMemoryPool的acquireMemory方法,主要以下:

    在申請內存時,若是numBytes大於此刻storage內存池的剩餘內存,即if (numBytesToFree > 0),那麼須要storage內存池釋放一部份內存以知足申請需求。釋放內存後若是memoryFree >= numBytes,就會把這部份內存分配給申請內存的task,而且更新storage內存池的使用狀況。同時他與ExecutionMemoryPool不一樣的是,他不會像前者那樣分不到資源就進行等待,acquireStorageMemory只會返回一個true或是false,告知內存分配是否成功。

五.總結

結合兩篇文章,咱們對spark的兩種內存管理模型都作了一個簡單的介紹,二者的不一樣之處也作出了說明,但願這兩篇文章對spark的使用者有必定的幫助,也歡迎你們交流。

參考內容:

  1. spark設計文檔《unified-memory-management-spark》
  2. http://blog.csdn.net/dabokele/article/details/51475469
相關文章
相關標籤/搜索