maxOnHeapStorageMemory maxOffHeapStorageMemory setMemoryStore acquireStorageMemory acquireUnrollMemory acquireExecutionMemory releaseExecutionMemory releaseAllExecutionMemoryForTask releaseStorageMemory releaseAllStorageMemory releaseUnrollMemory executionMemoryUsed storageMemoryUsed getExecutionMemoryUsageForTask
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP) protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP) protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP) protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory) onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory) offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory) offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
private[memory] def releaseExecutionMemory( numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Unit = synchronized { memoryMode match { case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId) case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId) } }
def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized { // 從內部的簿記量中獲取該任務使用的內存 val curMem = memoryForTask.getOrElse(taskAttemptId, 0L) // 檢查要釋放的內存是否超過了該任務實際使用的內存,並打印告警日誌 var memoryToFree = if (curMem < numBytes) { logWarning( s"Internal error: release called on $numBytes bytes but task only has $curMem bytes " + s"of memory from the $poolName pool") curMem } else { numBytes } if (memoryForTask.contains(taskAttemptId)) { // 更新簿記量 memoryForTask(taskAttemptId) -= memoryToFree // 若是該任務的內存使用量小於等於0,那麼從簿記量中移除該任務 if (memoryForTask(taskAttemptId) <= 0) { memoryForTask.remove(taskAttemptId) } } // 最後通知其餘等待的線程 // 由於可能會有其餘的任務在等待獲取執行內存 lock.notifyAll() // Notify waiters in acquireMemory() that memory has been freed }
private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized { onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) + offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) }
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized { memoryMode match { case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes) case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes) } }
final def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized { releaseStorageMemory(numBytes, memoryMode) }
override private[memory] def acquireExecutionMemory( numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long = synchronized { // 檢查內存大小是否正確 assertInvariants() assert(numBytes >= 0) // 根據堆內存仍是直接內存決定使用不一樣的內存池和內存大小 val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match { case MemoryMode.ON_HEAP => ( onHeapExecutionMemoryPool, onHeapStorageMemoryPool, onHeapStorageRegionSize, maxHeapMemory) case MemoryMode.OFF_HEAP => ( offHeapExecutionMemoryPool, offHeapStorageMemoryPool, offHeapStorageMemory, maxOffHeapMemory) } /** * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool. * * When acquiring memory for a task, the execution pool may need to make multiple * attempts. Each attempt must be able to evict storage in case another task jumps in * and caches a large block between the attempts. This is called once per attempt. */ // 經過擠佔存儲內存來擴張執行內存, // 經過將緩存的塊溢寫到磁盤上,從而爲執行內存騰出空間 def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = { if (extraMemoryNeeded > 0) { // There is not enough free memory in the execution pool, so try to reclaim memory from // storage. We can reclaim any free memory from the storage pool. If the storage pool // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim // the memory that storage has borrowed from execution. // 咱們能夠將剩餘的存儲內存都借過來用做執行內存 // 另外,若是存儲內存向執行內存借用了一部份內存,也就是說此時存儲內存的實際大小大於配置的值 // 那麼咱們就將全部的借用的存儲內存都還回來 val memoryReclaimableFromStorage = math.max( storagePool.memoryFree, storagePool.poolSize - storageRegionSize) if (memoryReclaimableFromStorage > 0) { // Only reclaim as much space as is necessary and available: // 只騰出必要大小的內存空間,這個方法會將內存中的block擠到磁盤中 val spaceToReclaim = storagePool.freeSpaceToShrinkPool( math.min(extraMemoryNeeded, memoryReclaimableFromStorage)) // 更新一些簿記量,存儲內存少了這麼多內存,相應的執行內存增長了這麼多內存 storagePool.decrementPoolSize(spaceToReclaim) executionPool.incrementPoolSize(spaceToReclaim) } } } /** * The size the execution pool would have after evicting storage memory. * * The execution memory pool divides this quantity among the active tasks evenly to cap * the execution memory allocation for each task. It is important to keep this greater * than the execution pool size, which doesn't take into account potential memory that * could be freed by evicting storage. Otherwise we may hit SPARK-12155. * * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness * in execution memory allocation across tasks, Otherwise, a task may occupy more than * its fair share of execution memory, mistakenly thinking that other tasks can acquire * the portion of storage memory that cannot be evicted. */ def computeMaxExecutionPoolSize(): Long = { maxMemory - math.min(storagePool.memoryUsed, storageRegionSize) } executionPool.acquireMemory( numBytes, taskAttemptId, maybeGrowExecutionPool, () => computeMaxExecutionPoolSize) }
def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized { val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree) val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory if (remainingSpaceToFree > 0) { // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks: val spaceFreedByEviction = memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode) // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do // not need to decrement _memoryUsed here. However, we do need to decrement the pool size. spaceFreedByReleasingUnusedMemory + spaceFreedByEviction } else { spaceFreedByReleasingUnusedMemory } }
private[spark] def evictBlocksToFreeSpace( blockId: Option[BlockId], space: Long, memoryMode: MemoryMode): Long = { assert(space > 0) memoryManager.synchronized { var freedMemory = 0L val rddToAdd = blockId.flatMap(getRddId) val selectedBlocks = new ArrayBuffer[BlockId] def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = { entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) } // This is synchronized to ensure that the set of entries is not changed // (because of getValue or getBytes) while traversing the iterator, as that // can lead to exceptions. entries.synchronized { val iterator = entries.entrySet().iterator() while (freedMemory < space && iterator.hasNext) { val pair = iterator.next() val blockId = pair.getKey val entry = pair.getValue if (blockIsEvictable(blockId, entry)) { // We don't want to evict blocks which are currently being read, so we need to obtain // an exclusive write lock on blocks which are candidates for eviction. We perform a // non-blocking "tryLock" here in order to ignore blocks which are locked for reading: // 這裏之因此要獲取寫鎖是爲了防止在塊正在被讀取或寫入的時候將其擠出去 if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) { selectedBlocks += blockId freedMemory += pair.getValue.size } } } } def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = { val data = entry match { case DeserializedMemoryEntry(values, _, _) => Left(values) case SerializedMemoryEntry(buffer, _, _) => Right(buffer) } // 這裏的調用將塊擠出內存,若是容許寫到磁盤則溢寫到磁盤上 // 注意blockEvictionHandler的實現類就是BlockManager val newEffectiveStorageLevel = blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag) if (newEffectiveStorageLevel.isValid) { // The block is still present in at least one store, so release the lock // but don't delete the block info // 由於前面獲取了這些塊的寫鎖,尚未釋放, // 因此在這裏釋放這些塊的寫鎖 blockInfoManager.unlock(blockId) } else { // The block isn't present in any store, so delete the block info so that the // block can be stored again // 由於塊因爲從內存中移除又沒有寫到磁盤上,因此直接從內部的簿記量中移除該塊的信息 blockInfoManager.removeBlock(blockId) } } // 若是騰出的內存足夠多,比申請的量要大,這時纔會真正釋放相應的塊 if (freedMemory >= space) { var lastSuccessfulBlock = -1 try { logInfo(s"${selectedBlocks.size} blocks selected for dropping " + s"(${Utils.bytesToString(freedMemory)} bytes)") (0 until selectedBlocks.size).foreach { idx => val blockId = selectedBlocks(idx) val entry = entries.synchronized { entries.get(blockId) } // This should never be null as only one task should be dropping // blocks and removing entries. However the check is still here for // future safety. if (entry != null) { dropBlock(blockId, entry) // 這時爲測試留的一個鉤子方法 afterDropAction(blockId) } lastSuccessfulBlock = idx } logInfo(s"After dropping ${selectedBlocks.size} blocks, " + s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}") freedMemory } finally { // like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal // with InterruptedException // 若是不是全部的塊都轉移成功,那麼必然有的塊的寫鎖可能沒有釋放 // 因此在這裏將這些沒有移除成功的塊的寫鎖釋放掉 if (lastSuccessfulBlock != selectedBlocks.size - 1) { // the blocks we didn't process successfully are still locked, so we have to unlock them (lastSuccessfulBlock + 1 until selectedBlocks.size).foreach { idx => val blockId = selectedBlocks(idx) blockInfoManager.unlock(blockId) } } } } else {// 若是不能騰出足夠多的內存,那麼取消此次行動,釋放全部已經持有的塊的寫鎖 blockId.foreach { id => logInfo(s"Will not store $id") } selectedBlocks.foreach { id => blockInfoManager.unlock(id) } 0L } } }
因此,七繞八繞,饒了這麼一大圈,其實所謂的內存擠佔,其實就是把引用設爲null ^_^固然確定不是這麼簡單啦,其實在整個分析的過程當中咱們也能發現,所謂的內存管理大部分工做就是對任務使用內存一些簿記量的管理維護,這裏面有一些比較複雜的邏輯,例如給每一個任務分配多少內存的計算邏輯就比較複雜。
private[storage] override def dropFromMemory[T: ClassTag]( blockId: BlockId, data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = { logInfo(s"Dropping block $blockId from memory") val info = blockInfoManager.assertBlockIsLockedForWriting(blockId) var blockIsUpdated = false val level = info.level // Drop to disk, if storage level requires // 若是存儲級別容許存到磁盤,那麼先溢寫到磁盤上 if (level.useDisk && !diskStore.contains(blockId)) { logInfo(s"Writing block $blockId to disk") data() match { case Left(elements) => diskStore.put(blockId) { channel => val out = Channels.newOutputStream(channel) serializerManager.dataSerializeStream( blockId, out, elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]]) } case Right(bytes) => diskStore.putBytes(blockId, bytes) } blockIsUpdated = true } // Actually drop from memory store val droppedMemorySize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L val blockIsRemoved = memoryStore.remove(blockId) if (blockIsRemoved) { blockIsUpdated = true } else { logWarning(s"Block $blockId could not be dropped from memory as it does not exist") } val status = getCurrentBlockStatus(blockId, info) if (info.tellMaster) { reportBlockStatus(blockId, status, droppedMemorySize) } // 向任務度量系統彙報塊更新的統計信息 if (blockIsUpdated) { addUpdatedBlockStatusToTaskMetrics(blockId, status) } status.storageLevel }
其中,存儲內存向執行內存借用 的邏輯相對簡單,僅僅是將兩個內存池的大小改一下,執行內存池減小必定的大小,存儲內存池則增長相應的大小。
override def acquireStorageMemory( blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean = synchronized { assertInvariants() assert(numBytes >= 0) val (executionPool, storagePool, maxMemory) = memoryMode match { case MemoryMode.ON_HEAP => ( onHeapExecutionMemoryPool, onHeapStorageMemoryPool, maxOnHeapStorageMemory) case MemoryMode.OFF_HEAP => ( offHeapExecutionMemoryPool, offHeapStorageMemoryPool, maxOffHeapStorageMemory) } // 由於執行內存擠佔不了,因此這裏若是申請的內存超過如今可用的內存,那麼就申請不了了 if (numBytes > maxMemory) { // Fail fast if the block simply won't fit logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " + s"memory limit ($maxMemory bytes)") return false } // 若是大於存儲內存的可用內存,那麼就須要向執行內存借用一部份內存 if (numBytes > storagePool.memoryFree) { // There is not enough free memory in the storage pool, so try to borrow free memory from // the execution pool. val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes - storagePool.memoryFree) // 存儲內存向執行內存借用的邏輯很簡單, // 僅僅是將兩個內存池的大小改一下, // 執行內存池減小必定的大小,存儲內存池則增長相應的大小 executionPool.decrementPoolSize(memoryBorrowedFromExecution) storagePool.incrementPoolSize(memoryBorrowedFromExecution) } // 經過storagePool申請必定量的內存 storagePool.acquireMemory(blockId, numBytes) }
def acquireMemory( blockId: BlockId, numBytesToAcquire: Long, numBytesToFree: Long): Boolean = lock.synchronized { assert(numBytesToAcquire >= 0) assert(numBytesToFree >= 0) assert(memoryUsed <= poolSize) // 首先調用MemoryStore的相關方法擠出一些塊以釋放內存 if (numBytesToFree > 0) { memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode) } // NOTE: If the memory store evicts blocks, then those evictions will synchronously call // back into this StorageMemoryPool in order to free memory. Therefore, these variables // should have been updated. // 由於前面擠出一些塊後釋放內存時,BlockManager會經過MemoryManager相關方法更新內部的簿記量, // 因此這裏的memoryFree就會變化,會變大 val enoughMemory = numBytesToAcquire <= memoryFree if (enoughMemory) { _memoryUsed += numBytesToAcquire } enoughMemory }
override def acquireUnrollMemory( blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean = synchronized { acquireStorageMemory(blockId, numBytes, memoryMode) }