The memory manager is arguably one of the most fundamental modules in the Spark core. Sorting during shuffle, RDD caching, unroll memory, broadcast variables, storing task results — wherever memory is needed, a specific amount has to be requested from the memory manager. In my view, the memory manager's main job is to minimize out-of-memory errors while keeping memory utilization as high as possible. Older versions of Spark used the static memory manager, StaticMemoryManager, while newer versions (starting from Spark 1.6) switched to the unified memory manager, UnifiedMemoryManager. The biggest difference of the unified memory manager compared to the static one is that there is no hard boundary between execution memory and storage memory: the two can borrow from each other, but execution memory has the higher priority. If execution memory runs short it can squeeze out storage memory, in which case some cached RDD blocks are spilled to disk until enough space is freed. Execution memory, on the other hand, is never evicted under any circumstances. This makes sense if you think about it: execution memory is used for sorting during shuffle, which can only happen in memory, whereas RDD caching has no such strict requirement.
A handful of configuration parameters control the proportions of memory assigned to each part; the main ones are shown in the sketch below.
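For reference, these are the properties that control the split in the unified memory manager. The values shown are the Spark 2.x defaults and should be checked against the documentation of the version you are actually running:

val conf = new org.apache.spark.SparkConf()
  .set("spark.memory.fraction", "0.6")         // (heap - 300 MB reserved) * 0.6 = unified execution + storage region
  .set("spark.memory.storageFraction", "0.5")  // share of that region below which cached blocks are never evicted
  .set("spark.memory.offHeap.enabled", "true") // enable off-heap execution/storage memory
  .set("spark.memory.offHeap.size", "1g")      // size of the off-heap region
// In the 2.x line, spark.memory.useLegacyMode=true switches back to the old StaticMemoryManager.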
Let's first take an overall look at the MemoryManager class and its methods:
maxOnHeapStorageMemory
maxOffHeapStorageMemory
setMemoryStore
acquireStorageMemory
acquireUnrollMemory
acquireExecutionMemory
releaseExecutionMemory
releaseAllExecutionMemoryForTask
releaseStorageMemory
releaseAllStorageMemory
releaseUnrollMemory
executionMemoryUsed
storageMemoryUsed
getExecutionMemoryUsageForTask
As you can see, MemoryManager has relatively few methods and they follow a clear pattern. Functionally it divides memory into three kinds: storage memory, unroll memory, and execution memory.
For each of the three kinds there is an acquire method and a release method, and the three acquire methods are all abstract, implemented by subclasses.
In addition, let's look at the member variables inside MemoryManager:
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
These four members represent the four memory pools. Note that the MemoryPool constructor takes an Object parameter used as a synchronization lock; several of MemoryPool's methods acquire this object's monitor for synchronization.
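As a rough sketch (simplified, not the verbatim Spark source), the MemoryPool base class looks roughly like this. The lock passed in is the MemoryManager itself (the this in the vals above), so all four pools synchronize on one and the same object and their bookkeeping stays mutually consistent:

abstract class MemoryPoolSketch(lock: Object) {
  private var _poolSize: Long = 0L

  // All reads and writes of the pool size go through the shared lock
  def poolSize: Long = lock.synchronized { _poolSize }
  def memoryUsed: Long // implemented by the storage/execution subclasses
  def memoryFree: Long = lock.synchronized { _poolSize - memoryUsed }

  def incrementPoolSize(delta: Long): Unit = lock.synchronized {
    require(delta >= 0)
    _poolSize += delta
  }

  def decrementPoolSize(delta: Long): Unit = lock.synchronized {
    require(delta >= 0 && delta <= _poolSize && _poolSize - delta >= memoryUsed)
    _poolSize -= delta
  }
}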
Let's look at how they are initialized:
onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
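To make the numbers concrete, here is a back-of-the-envelope sketch (assuming the Spark 2.x UnifiedMemoryManager with default settings) of where the sizes passed to incrementPoolSize come from, for an executor heap of 4 GB:

val systemMemory        = 4L * 1024 * 1024 * 1024             // executor JVM heap (-Xmx)
val reservedMemory      = 300L * 1024 * 1024                   // fixed 300 MB reserved for Spark internals
val usableMemory        = systemMemory - reservedMemory        // ~3.7 GB
val maxHeapMemory       = (usableMemory * 0.6).toLong          // spark.memory.fraction = 0.6 -> ~2.2 GB unified region
val onHeapStorageRegion = (maxHeapMemory * 0.5).toLong         // spark.memory.storageFraction = 0.5 -> ~1.1 GB
val onHeapExecution     = maxHeapMemory - onHeapStorageRegion  // the remainder becomes the execution pool
// The off-heap region (spark.memory.offHeap.size) is split with the same storageFraction.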
The release methods in MemoryManager simply delegate to the corresponding pools. Take releaseExecutionMemory, which just calls into the right ExecutionMemoryPool:
private[memory] def releaseExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Unit = synchronized {
  memoryMode match {
    case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
    case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
  }
}
The logic is straightforward, so there is not much to say about it.
In fact this method already hints at what Spark memory management really means: at its core it is the recording and management of memory usage amounts, not the actual allocation and reclamation of memory the way an operating system or the JVM does it.
def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized {
  // Look up how much memory this task holds according to the internal bookkeeping
  val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)
  // If the caller asks to release more than the task actually holds, clamp the amount and log a warning
  val memoryToFree = if (curMem < numBytes) {
    logWarning(
      s"Internal error: release called on $numBytes bytes but task only has $curMem bytes " +
        s"of memory from the $poolName pool")
    curMem
  } else {
    numBytes
  }
  if (memoryForTask.contains(taskAttemptId)) {
    // Update the bookkeeping
    memoryForTask(taskAttemptId) -= memoryToFree
    // If the task's usage drops to zero or below, remove it from the bookkeeping map
    if (memoryForTask(taskAttemptId) <= 0) {
      memoryForTask.remove(taskAttemptId)
    }
  }
  // Finally notify the other waiting threads,
  // since other tasks may be waiting to acquire execution memory
  lock.notifyAll() // Notify waiters in acquireMemory() that memory has been freed
}
releaseAllExecutionMemoryForTask releases all the memory a task holds in both the on-heap execution pool and the off-heap (direct memory) execution pool.
onHeapExecutionMemoryPool and offHeapExecutionMemoryPool are instances of the same class; one tracks execution memory's use of direct memory, the other its use of heap memory.
private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
  onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
    offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
}
Bookkeeping for storage memory is not as fine-grained as for execution memory: it does not record how much memory each RDD uses.
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
  memoryMode match {
    case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
    case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
  }
}
Here, let's look at the method for releasing unroll memory; it turns out that unroll memory is simply storage memory. Recall from the BlockManager part: unroll memory is requested mainly when data is being stored as a block through MemoryStore and has to be held in memory temporarily — that is when unroll memory must be acquired.
final def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
  releaseStorageMemory(numBytes, memoryMode)
}
From the release methods analyzed above it is easy to see that "releasing memory" is really just an update to some bookkeeping counters inside the memory manager. This requires external callers to make sure they really did release that much memory, otherwise the accounting will drift far away from actual memory usage. Fortunately, the memory manager is an internal Spark module and is not exposed to users, so user code never calls into it.
As mentioned at the beginning, Spark has two memory managers, and newer versions use the unified memory manager UnifiedMemoryManager by default, with the static memory manager gradually being phased out. So here we focus on the unified memory manager.
Above, we analyzed the release methods in the parent class MemoryManager. The acquire methods are all abstract and are implemented in the subclass, that is, in UnifiedMemoryManager.
The following method acquires execution memory. It defines two local helpers: maybeGrowExecutionPool, which squeezes out storage memory in order to grow the execution region, and computeMaxExecutionPoolSize, which computes the maximum possible size of the execution pool. Finally it calls executionPool.acquireMemory to actually acquire the execution memory.
override private[memory] def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Long = synchronized {
  // Sanity-check the internal invariants and the requested size
  assertInvariants()
  assert(numBytes >= 0)
  // Choose the pools and sizes to use depending on whether this is heap or direct (off-heap) memory
  val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      onHeapStorageRegionSize,
      maxHeapMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      offHeapStorageMemory,
      maxOffHeapMemory)
  }

  /**
   * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
   *
   * When acquiring memory for a task, the execution pool may need to make multiple
   * attempts. Each attempt must be able to evict storage in case another task jumps in
   * and caches a large block between the attempts. This is called once per attempt.
   */
  // Grow the execution pool at the expense of the storage pool:
  // cached blocks are spilled to disk to make room for execution memory
  def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
    if (extraMemoryNeeded > 0) {
      // There is not enough free memory in the execution pool, so try to reclaim memory from
      // storage. We can reclaim any free memory from the storage pool. If the storage pool
      // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
      // the memory that storage has borrowed from execution.
      // We can take all of the storage pool's free memory for execution;
      // in addition, if storage has borrowed memory from execution (i.e. the storage pool is
      // currently larger than its configured size), we can take back everything it borrowed
      val memoryReclaimableFromStorage = math.max(
        storagePool.memoryFree,
        storagePool.poolSize - storageRegionSize)
      if (memoryReclaimableFromStorage > 0) {
        // Only reclaim as much space as is necessary and available:
        // only free as much as is actually needed; this call pushes blocks from memory to disk
        val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
          math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
        // Update the bookkeeping: the storage pool shrinks by this amount
        // and the execution pool grows by the same amount
        storagePool.decrementPoolSize(spaceToReclaim)
        executionPool.incrementPoolSize(spaceToReclaim)
      }
    }
  }

  /**
   * The size the execution pool would have after evicting storage memory.
   *
   * The execution memory pool divides this quantity among the active tasks evenly to cap
   * the execution memory allocation for each task. It is important to keep this greater
   * than the execution pool size, which doesn't take into account potential memory that
   * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
   *
   * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
   * in execution memory allocation across tasks, Otherwise, a task may occupy more than
   * its fair share of execution memory, mistakenly thinking that other tasks can acquire
   * the portion of storage memory that cannot be evicted.
   */
  def computeMaxExecutionPoolSize(): Long = {
    maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
  }

  executionPool.acquireMemory(
    numBytes, taskAttemptId, maybeGrowExecutionPool, () => computeMaxExecutionPoolSize)
}
I will not paste the code of executionPool.acquireMemory here; it mostly consists of fairly involved rules for computing how much memory a task may be granted, plus maintenance of the internal bookkeeping. In addition, if the currently available memory is too small, the method waits (on the object lock) until other tasks release some memory.
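The core of that calculation is a fairness rule: with N tasks currently active on the executor, each task may grow to at most 1/N of the pool and will block until it can hold at least 1/(2N) of it, so no task starves and no task monopolizes the pool. Below is a simplified, self-contained sketch of that rule, not the verbatim Spark source (in the real code the 1/N cap is computed against the maximum pool size after potential eviction of storage, while the 1/(2N) floor uses the current pool size):

import scala.collection.mutable

class SimpleExecutionPool(poolSize: Long) {
  private val lock = new Object
  private val memoryForTask = mutable.HashMap[Long, Long]()

  private def memoryUsed: Long = memoryForTask.values.sum
  private def memoryFree: Long = poolSize - memoryUsed

  def acquireMemory(numBytes: Long, taskId: Long): Long = lock.synchronized {
    if (!memoryForTask.contains(taskId)) {
      memoryForTask(taskId) = 0L
      lock.notifyAll() // a new task changes N for everyone currently waiting
    }
    while (true) {
      val numActiveTasks = memoryForTask.keys.size
      val curMem = memoryForTask(taskId)
      val maxPerTask = poolSize / numActiveTasks       // cap: 1/N of the pool
      val minPerTask = poolSize / (2 * numActiveTasks) // floor: 1/(2N) of the pool
      val toGrant = math.min(math.min(numBytes, math.max(0L, maxPerTask - curMem)), memoryFree)
      if (toGrant < numBytes && curMem + toGrant < minPerTask) {
        // Not even the minimum fair share is free yet; wait until releaseMemory calls notifyAll()
        lock.wait()
      } else {
        memoryForTask(taskId) = curMem + toGrant
        return toGrant
      }
    }
    0L // unreachable, keeps the compiler happy
  }

  def releaseMemory(numBytes: Long, taskId: Long): Unit = lock.synchronized {
    val curMem = memoryForTask.getOrElse(taskId, 0L)
    memoryForTask(taskId) = math.max(0L, curMem - numBytes)
    if (memoryForTask(taskId) == 0L) memoryForTask.remove(taskId)
    lock.notifyAll()
  }
}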
Apart from that, the most important part is the call to the maybeGrowExecutionPool method mentioned above, so let's focus on that method.
Since this method was already shown above with fairly detailed comments, I will skip the line-by-line walkthrough. The key call inside it is storagePool.freeSpaceToShrinkPool, which implements the logic of pushing blocks out of memory.
Looking at freeSpaceToShrinkPool, we find that it calls memoryStore.evictBlocksToFreeSpace:
def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
  val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
  val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
  if (remainingSpaceToFree > 0) {
    // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
    val spaceFreedByEviction =
      memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
    // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
    // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
    spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
  } else {
    spaceFreedByReleasingUnusedMemory
  }
}
evictBlocksToFreeSpace looks long, but it can roughly be summed up in one point.
Because MemoryStore holds the actual data of all blocks in memory, it knows the real size of every block and can therefore work out which blocks need to be evicted. Of course there are details along the way, such as acquiring and releasing the blocks' write locks.
The code that actually releases a block from memory (essentially setting the reference to the block's MemoryEntry to null so that the GC can reclaim it) lives in blockEvictionHandler.dropFromMemory, which is BlockManager.dropFromMemory.
private[spark] def evictBlocksToFreeSpace(
    blockId: Option[BlockId],
    space: Long,
    memoryMode: MemoryMode): Long = {
  assert(space > 0)
  memoryManager.synchronized {
    var freedMemory = 0L
    val rddToAdd = blockId.flatMap(getRddId)
    val selectedBlocks = new ArrayBuffer[BlockId]
    // A block may be evicted only if it lives in the same memory mode and
    // does not belong to the RDD that we are currently trying to store
    def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
      entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
    }
    // This is synchronized to ensure that the set of entries is not changed
    // (because of getValue or getBytes) while traversing the iterator, as that
    // can lead to exceptions.
    entries.synchronized {
      val iterator = entries.entrySet().iterator()
      while (freedMemory < space && iterator.hasNext) {
        val pair = iterator.next()
        val blockId = pair.getKey
        val entry = pair.getValue
        if (blockIsEvictable(blockId, entry)) {
          // We don't want to evict blocks which are currently being read, so we need to obtain
          // an exclusive write lock on blocks which are candidates for eviction. We perform a
          // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
          // the write lock is taken to prevent a block from being evicted
          // while it is still being read or written
          if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
            selectedBlocks += blockId
            freedMemory += pair.getValue.size
          }
        }
      }
    }

    def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
      val data = entry match {
        case DeserializedMemoryEntry(values, _, _) => Left(values)
        case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
      }
      // This call evicts the block from memory, spilling it to disk if the storage level allows;
      // note that the implementing class of blockEvictionHandler is BlockManager
      val newEffectiveStorageLevel =
        blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
      if (newEffectiveStorageLevel.isValid) {
        // The block is still present in at least one store, so release the lock
        // but don't delete the block info
        // The write lock taken earlier has not been released yet, so release it here
        blockInfoManager.unlock(blockId)
      } else {
        // The block isn't present in any store, so delete the block info so that the
        // block can be stored again
        // The block was removed from memory and not written to disk,
        // so drop it from the internal bookkeeping entirely
        blockInfoManager.removeBlock(blockId)
      }
    }

    // Only if enough memory can be freed, i.e. at least as much as requested,
    // do we actually drop the selected blocks
    if (freedMemory >= space) {
      var lastSuccessfulBlock = -1
      try {
        logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
          s"(${Utils.bytesToString(freedMemory)} bytes)")
        (0 until selectedBlocks.size).foreach { idx =>
          val blockId = selectedBlocks(idx)
          val entry = entries.synchronized { entries.get(blockId) }
          // This should never be null as only one task should be dropping
          // blocks and removing entries. However the check is still here for
          // future safety.
          if (entry != null) {
            dropBlock(blockId, entry)
            // A hook left in for tests
            afterDropAction(blockId)
          }
          lastSuccessfulBlock = idx
        }
        logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
          s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
        freedMemory
      } finally {
        // like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal
        // with InterruptedException
        // If not every block was dropped successfully, some of them may still hold their
        // write locks, so release the write locks of the blocks that were not processed
        if (lastSuccessfulBlock != selectedBlocks.size - 1) {
          // the blocks we didn't process successfully are still locked, so we have to unlock them
          (lastSuccessfulBlock + 1 until selectedBlocks.size).foreach { idx =>
            val blockId = selectedBlocks(idx)
            blockInfoManager.unlock(blockId)
          }
        }
      }
    } else {
      // Not enough memory can be freed, so abort the whole attempt
      // and release the write locks of all the blocks selected so far
      blockId.foreach { id =>
        logInfo(s"Will not store $id")
      }
      selectedBlocks.foreach { id =>
        blockInfoManager.unlock(id)
      }
      0L
    }
  }
}
To summarize the main logic of this method: it scans the entries in MemoryStore and selects, among the blocks of the same memory mode that do not belong to the RDD currently being stored, those whose write lock can be taken without blocking, accumulating their sizes until the requested amount is covered. If the selected blocks add up to at least the requested space, each of them is dropped via dropFromMemory (spilled to disk if the storage level allows) and its lock is released or its block info removed; otherwise the whole attempt is abandoned, all write locks taken so far are released, and 0 is returned.
So, after going around in this big circle, the so-called memory eviction is really just setting a reference to null ^_^ Of course it is not quite that simple. As the whole analysis shows, most of the work of memory management is maintaining the bookkeeping of how much memory each task uses, and some of that logic is genuinely involved, for example the rules for computing how much memory to grant each task. For completeness, here is BlockManager.dropFromMemory, which writes the block to disk if the storage level allows and then removes it from the MemoryStore:
private[storage] override def dropFromMemory[T: ClassTag](
    blockId: BlockId,
    data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
  logInfo(s"Dropping block $blockId from memory")
  val info = blockInfoManager.assertBlockIsLockedForWriting(blockId)
  var blockIsUpdated = false
  val level = info.level

  // Drop to disk, if storage level requires
  // If the storage level allows disk, spill the block to disk first
  if (level.useDisk && !diskStore.contains(blockId)) {
    logInfo(s"Writing block $blockId to disk")
    data() match {
      case Left(elements) =>
        diskStore.put(blockId) { channel =>
          val out = Channels.newOutputStream(channel)
          serializerManager.dataSerializeStream(
            blockId, out, elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]])
        }
      case Right(bytes) =>
        diskStore.putBytes(blockId, bytes)
    }
    blockIsUpdated = true
  }

  // Actually drop from memory store
  val droppedMemorySize =
    if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
  val blockIsRemoved = memoryStore.remove(blockId)
  if (blockIsRemoved) {
    blockIsUpdated = true
  } else {
    logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
  }

  val status = getCurrentBlockStatus(blockId, info)
  if (info.tellMaster) {
    reportBlockStatus(blockId, status, droppedMemorySize)
  }
  // Report the block update to the task metrics system
  if (blockIsUpdated) {
    addUpdatedBlockStatusToTaskMetrics(blockId, status)
  }
  status.storageLevel
}
Now let's look at how storage memory is acquired.
The logic by which storage memory borrows from execution memory is relatively simple: it merely resizes the two pools, shrinking the execution pool by a certain amount and growing the storage pool by the same amount.
override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  val (executionPool, storagePool, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      maxOnHeapStorageMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      maxOffHeapStorageMemory)
  }
  // Since storage cannot evict execution memory, a request that exceeds the maximum
  // possible storage memory can never be satisfied, so fail fast
  if (numBytes > maxMemory) {
    // Fail fast if the block simply won't fit
    logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
      s"memory limit ($maxMemory bytes)")
    return false
  }
  // If the request is larger than the storage pool's free memory,
  // borrow some memory from the execution pool
  if (numBytes > storagePool.memoryFree) {
    // There is not enough free memory in the storage pool, so try to borrow free memory from
    // the execution pool.
    val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree,
      numBytes - storagePool.memoryFree)
    // Borrowing from execution is simple: just resize the two pools,
    // shrinking the execution pool and growing the storage pool by the same amount
    executionPool.decrementPoolSize(memoryBorrowedFromExecution)
    storagePool.incrementPoolSize(memoryBorrowedFromExecution)
  }
  // Finally request the memory from the storage pool
  storagePool.acquireMemory(blockId, numBytes)
}
def acquireMemory(
    blockId: BlockId,
    numBytesToAcquire: Long,
    numBytesToFree: Long): Boolean = lock.synchronized {
  assert(numBytesToAcquire >= 0)
  assert(numBytesToFree >= 0)
  assert(memoryUsed <= poolSize)
  // First ask the MemoryStore to evict some blocks so that memory is freed up
  if (numBytesToFree > 0) {
    memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
  }
  // NOTE: If the memory store evicts blocks, then those evictions will synchronously call
  // back into this StorageMemoryPool in order to free memory. Therefore, these variables
  // should have been updated.
  // When blocks are evicted above, the BlockManager calls back into the MemoryManager,
  // which updates the internal bookkeeping, so memoryFree here will already have grown
  val enoughMemory = numBytesToAcquire <= memoryFree
  if (enoughMemory) {
    _memoryUsed += numBytesToAcquire
  }
  enoughMemory
}
As you can see, memoryStore.evictBlocksToFreeSpace is called here as well, to evict some blocks from memory and make room for the new block.
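The call made from acquireStorageMemory uses the two-argument overload of acquireMemory; paraphrasing the Spark 2.x source, it simply computes how much has to be evicted and delegates to the three-argument version shown above:

def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
  // How much must be evicted so that numBytes fits into the pool
  val numBytesToFree = math.max(0, numBytes - memoryFree)
  acquireMemory(blockId, numBytes, numBytesToFree)
}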
There is also the request for unroll memory, which is in fact just a request for storage memory:
override def acquireUnrollMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  acquireStorageMemory(blockId, numBytes, memoryMode)
}
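To see where unroll memory fits in, here is an illustrative sketch of how a MemoryStore-like caller uses it while materializing an iterator into memory. The names, the 1 MB initial reservation, the check period of 16 elements, and the 1.5x growth step are assumptions for illustration, not the exact Spark implementation:

import scala.collection.mutable.ArrayBuffer

// Minimal stand-in for the unroll-memory part of the MemoryManager API
trait UnrollMemoryManager {
  def acquireUnrollMemory(numBytes: Long): Boolean // under the hood this is just storage memory
  def releaseUnrollMemory(numBytes: Long): Unit
}

def unrollIterator[T](values: Iterator[T], mm: UnrollMemoryManager, sizeOf: T => Long): Option[ArrayBuffer[T]] = {
  val initialReservation = 1L << 20 // assumed 1 MB initial ask
  val checkPeriod = 16              // re-check the size every 16 elements
  if (!mm.acquireUnrollMemory(initialReservation)) return None
  var reserved = initialReservation
  var used = 0L
  val buffer = new ArrayBuffer[T]
  var count = 0
  while (values.hasNext) {
    val v = values.next()
    buffer += v
    used += sizeOf(v)
    count += 1
    // Periodically make sure the reservation still covers what has been unrolled so far;
    // if more unroll (i.e. storage) memory cannot be acquired, give up so the block can
    // fall back to disk instead of blowing up the heap
    if (count % checkPeriod == 0 && used > reserved) {
      val extra = used + used / 2 - reserved // assumed 1.5x growth step
      if (!mm.acquireUnrollMemory(extra)) {
        mm.releaseUnrollMemory(reserved)
        return None
      }
      reserved += extra
    }
  }
  Some(buffer) // on success the caller later converts the unroll reservation into storage for the block
}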
Memory management, in essence, is the bookkeeping of the memory used for sorting during shuffle and for RDD caching. By recording and managing memory usage in a detailed and precise way, it avoids OOM as far as possible while keeping memory utilization high.