spark存儲模塊以內存存儲--MemeoryStore

MemeoryStore

上一節,咱們對BlockManager的主要寫入方法作了一個整理,知道了BlockMananger的主要寫入邏輯,以及對於塊信息的管理。可是,因爲spark的整個存儲模塊是在是很龐大,並且不少細節的邏輯錯綜複雜,若是對於每一個細節都刨根問底,一來精力有限,二來感受也沒有太大的必要,固然若是時間容許確定是越詳細越好,在這裏,個人分析的主要目的是理清存儲模塊的重點邏輯,但願可以提綱契領地把各個模塊的脈絡領出來,創建起對spark-core中各模塊的總體認知,這樣咱們在遇到一些問題的時候就可以很快地知道應該從何處下手,從哪一個具體的模塊去找問題。
好了廢話很少說,本節接着上一節。上一篇,咱們分析了BlockManager的幾個主要的存儲方法,發現BlockManager主要依靠內部的兩個組件MemoryStore和DiskStore來進行實際的數據寫入和塊的管理。
本節,咱們就來看一下MemoryStore這個組件。java

不過,我仍是延續我一向的風格,從外部對一個類的方法調用爲切入點分析這個類的做用和邏輯。
因此,咱們先來看一下上一節對於MemoryStore的主要的方法調用的總結:api

memoryStore.putIteratorAsValues
memoryStore.putIteratorAsBytes
memoryStore.putBytes

memoryStore.putIteratorAsValues

這個方法主要是用於存儲級別是非序列化的狀況,即直接以java對象的形式將數據存放在jvm堆內存上。咱們都知道,在jvm堆內存上存放大量的對象並非什麼好事,gc壓力大,擠佔內存,可能引發頻繁的gc,可是也有明顯的好處,就是省去了序列化和反序列化耗時,並且直接從堆內存取數據顯然比任何其餘方式(磁盤和直接內存)都要快不少,因此對於內存充足且要緩存的數據量本省不是很大的狀況,這種方式也不失爲一種不錯的選擇。數組

private[storage] def putIteratorAsValues[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {

// 用於存儲java對象的容器
val valuesHolder = new DeserializedValuesHolder[T](classTag)

putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, valuesHolder) match {
    // 存儲成功
  case Right(storedSize) => Right(storedSize)
    // 存儲失敗的狀況
  case Left(unrollMemoryUsedByThisBlock) =>
    // ValuesHolder內部的數組和vector會相互轉換
    // 數據寫入完成後會將vector中的數據轉移到數組中
    val unrolledIterator = if (valuesHolder.vector != null) {
      valuesHolder.vector.iterator
    } else {
      valuesHolder.arrayValues.toIterator
    }

    // 返回寫入一半的迭代器、
    // 外部調用者一半會選擇關閉這個迭代器以釋放被使用的內存
    Left(new PartiallyUnrolledIterator(
      this,
      MemoryMode.ON_HEAP,
      unrollMemoryUsedByThisBlock,
      unrolled = unrolledIterator,
      rest = values))
}
}

這個方法的邏輯很簡單,做用也比較單一,主要是對實際存儲方法putIterator的返回結果作處理,若是失敗的話,就封裝一個PartiallyUnrolledIterator返回給外部調用這個,調用這個通常須要將這個寫入一半的迭代器關閉。緩存

MemoryStore.putIterator

這個方法看似很長,其實邏輯相對簡單,主要作的事就是把數據一條一條往ValuesHolder中寫,並週期性地檢查內存,若是內存不夠就經過內存管理器MemoryManager申請內存,每次申請當前內存量的1.5倍。
最後,將ValuesHolder中的數據轉移到一個數組中(其實數據在SizeTrackingVector中也是以數組的形式存儲,只不過SizeTrackingVector對象內部處理數組還有一些其餘的簿記量,更爲關鍵的是咱們須要將存儲的數據以同一的接口進行包裝,以利於MemoryStore進行同一管理)。最後還有關鍵的一步,就是釋放展開內存,從新申請存儲內存。
此外,這個過程當中有使用到memoryManager,具體的方法調用是:安全

memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)

------------------------------分割線------------------------------app

private def putIterator[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T],
  memoryMode: MemoryMode,
  valuesHolder: ValuesHolder[T]): Either[Long, Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")

// Number of elements unrolled so far
var elementsUnrolled = 0
// Whether there is still enough memory for us to continue unrolling this block
var keepUnrolling = true
// Initial per-task memory to request for unrolling blocks (bytes).
// 用於數據在內存展開的初始的內存使用量
val initialMemoryThreshold = unrollMemoryThreshold
// How often to check whether we need to request more memory
// 檢查內存的頻率,每寫這麼多條數據就會檢查一次是否須要申請額外的內存
val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
// Memory currently reserved by this task for this particular unrolling operation
// 內存閾值,開始時等於初始閾值
var memoryThreshold = initialMemoryThreshold
// Memory to request as a multiple of current vector size
// 內存增加因子,每次申請的內存是當前內存的這個倍數
val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
// Keep track of unroll memory used by this particular block / putIterator() operation
// 當前的塊使用的內存大小
var unrollMemoryUsedByThisBlock = 0L

// Request enough memory to begin unrolling
// 首先進行初始的內存申請,向MemoryManager申請內存
keepUnrolling =
  reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)

if (!keepUnrolling) {
  logWarning(s"Failed to reserve initial memory threshold of " +
    s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
} else {
  // 若是成功申請到內存,則累加記錄
  unrollMemoryUsedByThisBlock += initialMemoryThreshold
}

// Unroll this block safely, checking whether we have exceeded our threshold periodically
// 循環將每條數據寫入容器中valuesHolder
while (values.hasNext && keepUnrolling) {
  valuesHolder.storeValue(values.next())
  // 若是寫入數據的條數達到一個週期,那麼就檢查一下是否須要申請額外的內存
  if (elementsUnrolled % memoryCheckPeriod == 0) {
    // 經過valuesHolder獲取已經寫入的數據的評估大小
    // 注意,這裏的數據大小隻是估計值,並非十分準確
    // 具體如何進行估算的能夠看valuesHolder內部實現
    val currentSize = valuesHolder.estimatedSize()
    // If our vector's size has exceeded the threshold, request more memory
    // 若是已寫入的數據大小超過了當前閾值
    if (currentSize >= memoryThreshold) {
      // 這裏每次申請的內存量都是不同的
      // 每次申請的內存是當前已使用內存的1.5倍(默認)
      val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
      keepUnrolling =
        reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
      if (keepUnrolling) {
        // 記錄累積申請的內存量
        unrollMemoryUsedByThisBlock += amountToRequest
      }
      // New threshold is currentSize * memoryGrowthFactor
      // 目前已經向內存管理器申請的內存量
      memoryThreshold += amountToRequest
    }
  }
  // 記錄插入的數據條數
  elementsUnrolled += 1
}

// Make sure that we have enough memory to store the block. By this point, it is possible that
// the block's actual memory usage has exceeded the unroll memory by a small amount, so we
// perform one final call to attempt to allocate additional memory if necessary.
// 若是keepUnrolling爲true,說明順利地將全部數據插入,
// 並未遇到申請內存失敗的狀況
if (keepUnrolling) {
  // 將內部的數據轉移到一個數組中
  val entryBuilder = valuesHolder.getBuilder()
  // 數據在內存中的精確大小
  val size = entryBuilder.preciseSize
  // 實際的大小可能大於申請的內存量
  // 所以根據實際大小還要再申請額外的內存
  if (size > unrollMemoryUsedByThisBlock) {
    val amountToRequest = size - unrollMemoryUsedByThisBlock
    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
    if (keepUnrolling) {
      unrollMemoryUsedByThisBlock += amountToRequest
    }
  }

  if (keepUnrolling) {
    // 獲取MemoryEntry對象,該對象是對插入數據的包裝
    val entry = entryBuilder.build()
    // Synchronize so that transfer is atomic
    memoryManager.synchronized {
      // 這一步主要是釋放申請的展開內存
      // 而後申請存儲內存
      // 這裏須要弄清楚展開內存的概念
      // 展開狀態指的是對象在內存中處於一種比較鬆散的狀態,這樣的狀態方便作一些管理如統計大小等
      // 而隨後將對象轉移到數組中,處於一種比較緊實的狀態,數組相對來講佔用的額外內存是比較小的
      // 一個數組只是一個對象,只有一個對象頭,能夠用來管理大量的對象
      releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
      // 申請存儲內存
      val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
      assert(success, "transferring unroll memory to storage memory failed")
    }

    // 放入map中管理起來
    entries.synchronized {
      entries.put(blockId, entry)
    }

    logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId,
      Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
    Right(entry.size)
  } else {
    // We ran out of space while unrolling the values for this block
    logUnrollFailureMessage(blockId, entryBuilder.preciseSize)
    // 若是失敗,返回已經申請的展開內存
    Left(unrollMemoryUsedByThisBlock)
  }
} else {
  // We ran out of space while unrolling the values for this block
  logUnrollFailureMessage(blockId, valuesHolder.estimatedSize())
  Left(unrollMemoryUsedByThisBlock)
}
}

memoryStore.putIteratorAsBytes

咱們再看另外一個方法。套路基本和putIteratorAsValues是同樣同樣的。
最大的區別在於ValuesHolder類型不一樣。非序列化形式存儲使用的是DeserializedMemoryEntry,而序列化形式存儲使用的是SerializedMemoryEntry。jvm

private[storage] def putIteratorAsBytes[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T],
  memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long] = {

require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")

// Initial per-task memory to request for unrolling blocks (bytes).
val initialMemoryThreshold = unrollMemoryThreshold
// 字節數組的塊大小,默認是1m
val chunkSize = if (initialMemoryThreshold > Int.MaxValue) {
  logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " +
    s"is too large to be set as chunk size. Chunk size has been capped to " +
    s"${Utils.bytesToString(Int.MaxValue)}")
  Int.MaxValue
} else {
  initialMemoryThreshold.toInt
}

// 字節數組的容器
val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag,
  memoryMode, serializerManager)

putIterator(blockId, values, classTag, memoryMode, valuesHolder) match {
  case Right(storedSize) => Right(storedSize)
  case Left(unrollMemoryUsedByThisBlock) =>
    // 部分展開,部分以序列化形式存儲的block
    Left(new PartiallySerializedBlock(
      this,
      serializerManager,
      blockId,
      valuesHolder.serializationStream,
      valuesHolder.redirectableStream,
      unrollMemoryUsedByThisBlock,
      memoryMode,
      valuesHolder.bbos,
      values,
      classTag))
}
}

memoryStore.putBytes

咱們再來看另外一個被外部調用用來插入數據的方法。很簡單,不說了。ide

def putBytes[T: ClassTag](
  blockId: BlockId,
  size: Long,
  memoryMode: MemoryMode,
  _bytes: () => ChunkedByteBuffer): Boolean = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// 首先向內存管理器申請內存
// 這裏申請的是存儲內存,由於要插入的字節數組,
// 因此不須要再展開,也就不須要申請展開內存
if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
  // We acquired enough memory for the block, so go ahead and put it
  val bytes = _bytes()
  assert(bytes.size == size)
  // 這裏直接構建了一個SerializedMemoryEntry
  // 並放到map中管理起來
  val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
  entries.synchronized {
    entries.put(blockId, entry)
  }
  logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
    blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
  true
} else {
  false
}
}

小結

經過對上面的三個方法,其實主要是前兩個方法的分析,咱們發現,除了對內存進行簿記管理以外,以及經過內存管理器申請內存以外,插入數據最主要的工做其實都是有ValuesHolder對象來完成的。
ValuesHolder特質有兩個實現類:DeserializedValuesHolder和SerializedValuesHolder。ui

DeserializedValuesHolder

DeserializedValuesHolder對象內部有兩個成員:vector,是一個SizeTrackingVector;arrayValues,是一個存放值的數組,用於在全部數據插入後,將主句轉移到一個數組中,方便包裝成一個MemoryEntry對象。大部分工做是有SizeTrackingVector完成的。this

private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends ValuesHolder[T] {
  // Underlying vector for unrolling the block
  var vector = new SizeTrackingVector[T]()(classTag)
  var arrayValues: Array[T] = null

  override def storeValue(value: T): Unit = {
    vector += value
  }

  override def estimatedSize(): Long = {
    vector.estimateSize()
  }

  override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
    // We successfully unrolled the entirety of this block
    arrayValues = vector.toArray
    vector = null

    override val preciseSize: Long = SizeEstimator.estimate(arrayValues)

    override def build(): MemoryEntry[T] =
      DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
  }
}

SizeTracker

上面提到的SizeTrackingVector繼承了這個特質,除了這個特質,還集成了PrimitiveVector類,可是PrimitiveVector類基本上就是對一個數組的簡單包裝。
SizeTrackingVector最重要的功能:追蹤對象的大小,就是在SizeTracker特之中實現的。

我大體說一下這個特質是如何實現對象大小跟蹤和估算的,代碼實現也並不複雜,感興趣的能夠看一看,限於篇幅這裏就不貼了。

  • 每插入必定數量的數據(姑且稱之爲週期),就會對當前的對象進行一次取樣,而這個取樣的週期會愈來愈長,以1.1倍的速率增加;
  • 取樣就是計算對象大小,並與前一次取樣做比較,並且只會保留最近兩次的取樣數據;
  • 每次取樣其實就是獲取兩個數據,當前對象大小,當前插入的數據條數;
  • 這樣與上一次取樣一比較,就可以計算出每條數據的大小了;
  • 最後,在返回整個對象大小時,是拿最近一次取樣時記錄下的對象大小,以及根據最近的狀況估算的每條數據的大小乘以自從上次取樣以來新插入的數據量,兩者相加做爲對象大小的估算值,

可見這麼作並非什麼精確,可是因爲是抽樣,並且抽樣週期越日後面越長,因此對於數據插入的效率影響很小,並且這種不精確性其實在後續的內存檢查過程當中是有考慮到的。在全部數據插入完的收尾工做中,會對對象大小作一次精確計算。此外,熟悉spark內存管理的同窗應該知道,其實spark通常會配置一個安全因子(通常是0.9),也就是說只是用配置的內存大小的90%,就是爲了儘量地減小這種不精確的內存估算形成OOM的可能性。

SerializedValuesHolder

private class SerializedValuesHolder[T](
    blockId: BlockId,
    chunkSize: Int,
    classTag: ClassTag[T],
    memoryMode: MemoryMode,
    serializerManager: SerializerManager) extends ValuesHolder[T] {
  val allocator = memoryMode match {
    case MemoryMode.ON_HEAP => ByteBuffer.allocate _
      // 調用unsafe的本地方法申請直接內存
      // 這個方法之因此沒有調用ByteBuffer.allocateDirect方法
      // 是由於這個方法分配的直接內存大小收到參數MaxDirectMemorySize限制
      // 因此這裏繞過ByteBuffer.allocateDirect方法,經過反射和unsafe類建立直接內存對象
    case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
  }

  val redirectableStream = new RedirectableOutputStream
  val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
  redirectableStream.setOutputStream(bbos)
  val serializationStream: SerializationStream = {
    val autoPick = !blockId.isInstanceOf[StreamBlockId]
    val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
    // 包裝壓縮流和序列化流
    ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
  }

  // 寫入方法,寫入的對象通過序列化,壓縮,
  // 而後通過ChunkedByteBufferOutputStream被分割成一個個的字節數組塊
  override def storeValue(value: T): Unit = {
    serializationStream.writeObject(value)(classTag)
  }

  override def estimatedSize(): Long = {
    bbos.size
  }

  override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
    // We successfully unrolled the entirety of this block
    serializationStream.close()

    override def preciseSize(): Long = bbos.size

    override def build(): MemoryEntry[T] =
      SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
  }
}

大概看一下,主要的邏輯很簡單,這裏面有幾個注意點:

  • 對於直接內存分配,spark並無使用jdk的高級api,而是反射配合unsafe類分配直接內存,這樣能夠繞過jvm參數MaxDirectMemorySize的限制,這也體現了spark的做者儘量的下降用戶使用難度
  • 另外,咱們看到序列化流其實通過了層層包裝(典型的裝飾器模式),序列化和壓縮以及分塊是比較重要的幾個點,感興趣的話能夠深究,序列化和壓縮若是深刻了解都是很大的課題,因此這裏也僅僅是走馬觀花,不深究了。

總結

MemoryStore.scala這個文件中乍看代碼有八百多行,可是其實很大部分代碼是一些輔助類,比較核心的寫入邏輯也就是前面提到的幾個方法,再加上核心的兩個類DeserializedValuesHolder和SerializedValuesHolder實現了以對象或字節數組的形式存儲數據。

相關文章
相關標籤/搜索