Spark Memory Management

This article applies to Spark 1.6.0 and later.
Spark 1.6.0 introduced management of off-heap memory and reworked the memory management model (SPARK-11389).

Physically, memory is divided into on-heap and off-heap memory; logically, it is divided into execution memory and storage memory.
Execution memory satisfies the memory needs of certain operators during task execution; for example, the map-side intermediate results produced during a shuffle are buffered in memory.
Storage memory is mainly used to hold persisted RDD data and broadcast variables.
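
As a concrete illustration, the sketch below (assuming a spark-shell session with an existing SparkContext named sc; the data and variable names are made up) shows typical consumers of each logical region:

    import org.apache.spark.storage.StorageLevel

    val pairs = sc.parallelize(1 to 1000000).map(i => (i % 100, i))

    // Caching an RDD and creating a broadcast variable consume storage memory.
    pairs.persist(StorageLevel.MEMORY_ONLY)
    val lookup = sc.broadcast((1 to 100).toSet)

    // The shuffle triggered by reduceByKey buffers map-side intermediate
    // results in execution memory.
    pairs.reduceByKey(_ + _).count()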

Off-heap memory

The following code snippet (from Spark 2.1) shows clearly how off-heap memory is allocated between the execution and storage pools.

protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
protected[this] val offHeapStorageMemory =
  (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong

offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)

(Figure: off-heap memory allocation)
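
Note that off-heap memory is disabled by default; to actually use these pools it must be enabled explicitly. A minimal configuration sketch (the 2g size is an arbitrary example):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.memory.offHeap.enabled", "true")   // off by default
      .set("spark.memory.offHeap.size", "2g")        // arbitrary example size

    // With the default spark.memory.storageFraction = 0.5, the 2 GB above is
    // split evenly: 1 GB to the off-heap storage pool and 1 GB to the
    // off-heap execution pool, per the snippet shown earlier.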

On-heap memory

On-heap memory is divided as shown in the figure below.

(Figure: on-heap memory allocation)

  • Total memory

    In Spark 2.1 it is obtained with the following code:

    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)

  • Reserved system memory

    The reserved memory is defined in the code as a constant, RESERVED_SYSTEM_MEMORY_BYTES, fixed at 300 MB.
    Total memory is required to be at least 1.5 times the reserved memory:
    val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
    and the following check is performed:

    if (systemMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"System memory $systemMemory must " +
        s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
        s"option or spark.driver.memory in Spark configuration.")
    }
    // SPARK-12759 Check executor memory to fail fast if memory is insufficient
    if (conf.contains("spark.executor.memory")) {
      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
      if (executorMemory < minSystemMemory) {
        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
          s"$minSystemMemory. Please increase executor memory using the " +
          s"--executor-memory option or spark.executor.memory in Spark configuration.")
      }
    }

  • Usable Spark memory

    Usable Spark memory = (system memory - reserved memory) * spark.memory.fraction

    val usableMemory = systemMemory - reservedMemory
    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
    (usableMemory * memoryFraction).toLong

    (A worked example combining these steps follows this list.)

  • Storage memory

    Storage memory = usable Spark memory * spark.memory.storageFraction

    onHeapStorageRegionSize = (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong

  • Execution memory

    Execution memory = usable Spark memory - storage memory

    private[spark] class UnifiedMemoryManager private[memory] (
      conf: SparkConf,
      val maxHeapMemory: Long,
      onHeapStorageRegionSize: Long,
      numCores: Int)
    extends MemoryManager(
      conf,
      numCores,
      onHeapStorageRegionSize,
      maxHeapMemory - onHeapStorageRegionSize)
  • Dynamic adjustment of storage and execution memory

    Storage can borrow as much execution memory as is free until execution reclaims its space. When this happens, cached blocks will be evicted from memory until sufficient borrowed memory is released to satisfy the execution memory request.

    Similarly, execution can borrow as much storage memory as is free. However, execution memory is never evicted by storage due to the complexities involved in implementing this. The implication is that attempts to cache blocks may fail if execution has already eaten up most of the storage space, in which case the new blocks will be evicted immediately according to their respective storage levels.

The passage above is the official Spark comment on this adjustment; it boils down to the following points (a toy sketch of the policy follows this list):
- When execution memory is free, storage can borrow it; when execution needs the memory back, storage releases what it borrowed by evicting cached blocks. This is safe, because storage blocks that no longer fit in memory can spill to local disk.

- When storage memory is free, it can likewise be lent to execution, but it cannot be returned to storage while execution is still using it. Execution memory holds the intermediate results of running computations, and releasing it would cause subsequent computation to fail.
  • User memory

    This portion of memory is entirely at the user's disposal, for example for storing user-defined data structures.
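
Putting the bullets above together, here is a worked example of the full on-heap breakdown in plain Scala (the 1 GiB heap size is an assumed value; the fractions are the defaults quoted above):

    // Assumed heap size; in Spark this would come from Runtime.getRuntime.maxMemory.
    val systemMemory    = 1024L * 1024 * 1024                 // 1 GiB
    val reservedMemory  = 300L * 1024 * 1024                  // RESERVED_SYSTEM_MEMORY_BYTES
    val minSystemMemory = (reservedMemory * 1.5).ceil.toLong  // 450 MB fail-fast threshold
    require(systemMemory >= minSystemMemory)                  // the check shown above passes

    val usableMemory    = systemMemory - reservedMemory       // 724 MB
    val maxMemory       = (usableMemory * 0.6).toLong         // spark.memory.fraction = 0.6, ~434 MB
    val storageRegion   = (maxMemory * 0.5).toLong            // spark.memory.storageFraction = 0.5, ~217 MB
    val executionRegion = maxMemory - storageRegion           // ~217 MB
    // The remainder, usableMemory - maxMemory (~290 MB), is user memory.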
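
The borrowing policy itself can also be sketched as a toy model. This is plain Scala, not Spark's actual classes, and all names and sizes are made up for illustration:

    object BorrowingSketch {
      val total = 100L                 // unified pool size, made-up units
      val storageRegion = 50L          // the spark.memory.storageFraction share
      var storageUsed = 0L             // memory held by cached blocks
      var executionUsed = 0L           // memory held by running tasks
      def free: Long = total - storageUsed - executionUsed

      // Storage may grow into any free memory, even beyond its own region,
      // but it can never evict execution memory.
      def acquireStorage(bytes: Long): Boolean =
        if (bytes <= free) { storageUsed += bytes; true } else false

      // Execution may reclaim what storage borrowed beyond its region by
      // evicting cached blocks (which can spill to disk), but no more.
      def acquireExecution(bytes: Long): Long = {
        if (bytes > free) {
          val borrowed = math.max(0L, storageUsed - storageRegion)
          val evicted  = math.min(bytes - free, borrowed)
          storageUsed -= evicted       // cached blocks are dropped or spilled
        }
        val granted = math.min(bytes, free)
        executionUsed += granted       // once granted, never evicted
        granted
      }
    }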

