本文基於Spark 1.6.0以後的版本
Spark 1.6.0引入了對堆外內存的管理並對內存管理模型進行了改進,SPARK-11389。git
從物理上,分爲堆內內存和堆外內存;從邏輯上分爲execution內存和storage內存。
Execution內存主要是用來知足task執行過程當中某些算子對內存的需求,例如shuffle過程當中map端產生的中間結果須要緩存在內存中。
Storage內存主要用來存儲RDD持久化的數據或者廣播變量。github
經過下面的代碼片斷(spark2.1版本),能夠清楚的知道execution內存和storage內存是如何分配Off-heap內存的。apache
protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0) protected[this] val offHeapStorageMemory = (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory) offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
對於on-heap內存的劃分以下圖緩存
總內存
spark2.1中經過下面的代碼獲取
scala val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
安全
系統預留內存數據結構
預留內存在代碼中是一個常量RESERVED_SYSTEM_MEMORY_BYTES
指定爲300M
這裏要求總內存至少是預留內存的1.5倍val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
而且會作以下的檢測
scala if (systemMemory < minSystemMemory) { throw new IllegalArgumentException(s"System memory $systemMemory must " + s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " + s"option or spark.driver.memory in Spark configuration.") } // SPARK-12759 Check executor memory to fail fast if memory is insufficient if (conf.contains("spark.executor.memory")) { val executorMemory = conf.getSizeAsBytes("spark.executor.memory") if (executorMemory < minSystemMemory) { throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " + s"$minSystemMemory. Please increase executor memory using the " + s"--executor-memory option or spark.executor.memory in Spark configuration.") } }
app
Spark可用內存this
Spark可用總內存=(系統內存-預留內存)*spark.memory.fraction
val usableMemory = systemMemory - reservedMemory val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6) (usableMemory * memoryFraction).toLong
spa
Storage內存
Storage內存=Spark可用內存*spark.memory.storageFraction
scala onHeapStorageRegionSize = (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
scala
Execution內存
Execution內存=Spark可用內存-Storage內存
private[spark] class UnifiedMemoryManager private[memory] ( conf: SparkConf, val maxHeapMemory: Long, onHeapStorageRegionSize: Long, numCores: Int) extends MemoryManager( conf, numCores, onHeapStorageRegionSize, maxHeapMemory - onHeapStorageRegionSize)
Storage內存與Execution內存的動態調整
Storage can borrow as much execution memory as is free until execution reclaims its space. When this happens, cached blocks will be evicted from memory until sufficient borrowed memory is released to satisfy the execution memory request.
Similarly, execution can borrow as much storage memory as is free. However, execution memory is never evicted by storage due to the complexities involved in implementing this. The implication is that attempts to cache blocks may fail if execution has already eaten up most of the storage space, in which case the new blocks will be evicted immediately according to their respective storage levels.
上面這段文字是Spark官方對內存調整的註釋,總結有以下幾點
- 當execution內存有空閒的時候,storage能夠借用execution的內存;當execution須要內存的時候, storage會釋放借用的內存。這樣作是安全的,由於storage內存若是不夠能夠溢出到本地磁盤。
- 當storage內存有空閒的時候也能夠借給execution使用,可是當execution沒有使用完的狀況下是沒法歸還給storage的。由於execution是用來在計算過程當中存儲臨時結果的,若是內存被釋放會致使後續的計算失敗。
user可支配內存
這部份內存徹底由用戶來支配,例如存儲用戶自定義的數據結構。
更多更好的文章請關注數客聯盟