Spark統一內存管理

Spark1.6 之後,增長統一內存管理機制內存管理模塊包括堆內內存(On-heap Memory),堆外內存(Off-heap Memory)兩大區域。html

1.堆內內存

clipboard.png
Executor Memory:主要用於存放 Shuffle、Join、Sort、Aggregation 等計算過程當中的臨時數據
Storage Memory:主要用於存儲 spark 的 cache 數據,例如RDD的緩存、unroll數據
User Memory:主要用於存儲 RDD 轉換操做所須要的數據,例如 RDD 依賴等信息
Reserved Memory:系統預留內存,會用來存儲Spark內部對象apache

private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
val reservedMemory = conf.getLong("spark.testing.reservedMemory",
      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)

systemMemory:緩存

val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)

Runtime.getRuntime.maxMemory就是JVM運行時的堆內存,在Java程序中經過-Xmx -Xms配置,spark中經過spark.executor.memory--executor-memory 配置的。
useableMemory:spark可用內存app

val usableMemory = systemMemory - reservedMemory

補充:oop

val minSystemMemory = (reservedMemory * 1.5).ceil.toLong

execution Memory不得小於reservedMemory 的1.5倍。ui

2.堆外內存

Spark 1.6 開始引入了Off-heap memory,調用Java的Unsafe類API申請堆外的內存資源,這種方式不進行Java內存管理,可避免頻繁GC,但須要本身實現內存申請和釋放的邏輯。this

clipboard.png

clipboard.png

3.堆內內存動態調整

初始化:程序提交時,execution和storage各佔0.5(經過spark.memory.storageFraction配置)spa

onHeapStorageRegionSize =
    (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong

這意味着code

  • 在程序運行時,若是雙方的空間都不足時,則存儲到硬盤;將內存中的塊存儲到磁盤的策略是按照 LRU 規則進行的。若己方空間不足而對方空餘時,可借用對方的空間;(存儲空間不足是指不足以放下一個完整的 Block)
  • Execution 內存的空間被對方佔用後,可以讓對方將佔用的部分轉存到硬盤,而後"歸還"借用的空間
  • Storage 內存的空間被對方佔用後,目前的實現是沒法讓對方"歸還",由於須要考慮 Shuffle 過程當中的不少因素,實現起來較爲複雜;並且 Shuffle 過程產生的文件在後面必定會被使用到,而 Cache 在內存的數據不必定在後面使用。

注意,上面說的借用對方的內存須要借用方和被借用方的內存類型都同樣,都是堆內內存或者都是堆外內存,不存在堆內內存不夠去借用堆外內存的空間。htm

4.Task內存分配

/**
 * Try to acquire up to `numBytes` of memory for the given task and return the number of bytes
 * obtained, or 0 if none can be allocated.
 *  * This call may block until there is enough free memory in some situations, to make sure each
 * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
 * active tasks) before it is forced to spill. This can happen if the number of tasks increase
 * but an older task had a lot of memory already.
 *  * @param numBytes number of bytes to acquire
 * @param taskAttemptId the task attempt acquiring memory
 * @param maybeGrowPool a callback that potentially grows the size of this pool. It takes in
 *                      one parameter (Long) that represents the desired amount of memory by
 *                      which this pool should be expanded.
 * @param computeMaxPoolSize a callback that returns the maximum allowable size of this pool
 *                           at this given moment. This is not a field because the max pool
 *                           size is variable in certain cases. For instance, in unified
 *                           memory management, the execution pool can be expanded by evicting
 *                           cached blocks, thereby shrinking the storage pool.
 *  * @return the number of bytes granted to the task.
   */
  private[memory] def acquireMemory(
      numBytes: Long,
      taskAttemptId: Long,
      maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
      computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
    assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

    // TODO: clean up this clunky method signature

    // Add this task to the taskMemory map just so we can keep an accurate count of the number
    // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
    if (!memoryForTask.contains(taskAttemptId)) {
      memoryForTask(taskAttemptId) = 0L
      // This will later cause waiting tasks to wake up and check numTasks again
      lock.notifyAll()
    }

    // Keep looping until we're either sure that we don't want to grant this request (because this
    // task would have more than 1 / numActiveTasks of the memory) or we have enough free
    // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
    // TODO: simplify this to limit each task to its own slot
    while (true) {
      val numActiveTasks = memoryForTask.keys.size
      val curMem = memoryForTask(taskAttemptId)

      // In every iteration of this loop, we should first try to reclaim any borrowed execution
      // space from storage. This is necessary because of the potential race condition where new
      // storage blocks may steal the free execution memory that this task was waiting for.
      maybeGrowPool(numBytes - memoryFree)

      // Maximum size the pool would have after potentially growing the pool.
      // This is used to compute the upper bound of how much memory each task can occupy. This
      // must take into account potential free memory as well as the amount this pool currently
      // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
      // we did not take into account space that could have been freed by evicting cached blocks.
      val maxPoolSize = computeMaxPoolSize()
      val maxMemoryPerTask = maxPoolSize / numActiveTasks
      val minMemoryPerTask = poolSize / (2 * numActiveTasks)

      // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
      val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
      // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
      val toGrant = math.min(maxToGrant, memoryFree)

      // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
      // if we can't give it this much now, wait for other tasks to free up memory
      // (this happens if older tasks allocated lots of memory before N grew)
      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
        lock.wait()
      } else {
        memoryForTask(taskAttemptId) += toGrant
        return toGrant
      }
    }
    0L  // Never reached
  }
  • 每一個task=>memory都存放在memoryForTask這個mutable.HashMap裏
  • 若是當前要分配的task id不存在,就設爲0L。
  • 申請不到足夠的內存,方法阻塞,直到有足夠的內存(val minMemoryPerTask = poolSize / (2 * numActiveTasks)
  • 若是隻有一個task,可使用所有execution內存

5.Spark UI 數據解釋

內存分配池的堆部分分爲 Eden,Survivor 和 Tenured 三部分空間,而這裏面一共包含了兩個 Survivor 區域,而這兩個 Survivor 區域在任什麼時候候咱們只能用到其中一個,因此咱們可使用下面的公式進行描述:

ExecutorMemory = Eden + 2 * Survivor + Tenured
Runtime.getRuntime.maxMemory = Eden + Survivor + Tenured

Runtime.getRuntime.maxMemory的差別取決於GC配置
spark.executor.memory設爲1g,如圖

clipboard.png

384.1MB = (Runtime.getRuntime.maxMemory (910.5MB) - ReservedMemory (300MB)) × spark.memory.fraction (0.6) × 頁面以1000爲換算單位(1000/1024 × 1000/1024)

clipboard.png

366.3MB = (Runtime.getRuntime.maxMemory (910.5MB) - ReservedMemory (300MB)) × spark.memory.fraction (0.6)

加上1g堆外內存:

spark.memory.offHeap.enabled    true
spark.memory.offHeap.size       1G

clipboard.png

1390.3MB = (Runtime.getRuntime.maxMemory (910.5MB) - ReservedMemory (300MB)) × spark.memory.fraction (0.6) + 1 × 1024MB

clipboard.png

1.5G ≈ 1457.8MB = ((Runtime.getRuntime.maxMemory (910.5MB) - ReservedMemory (300MB)) × spark.memory.fraction (0.6) + 1 × 1024MB) × 頁面以1000爲換算單位(1000/1024 × 1000/1024)

參考文章:
https://www.iteblog.com/archi...(過往記憶)
http://spark.apache.org/docs/...

相關文章
相關標籤/搜索