上一節,咱們對BlockManager的主要寫入方法作了一個整理,知道了BlockMananger的主要寫入邏輯,以及對於塊信息的管理。可是,因爲spark的整個存儲模塊是在是很龐大,並且不少細節的邏輯錯綜複雜,若是對於每一個細節都刨根問底,一來精力有限,二來感受也沒有太大的必要,固然若是時間容許確定是越詳細越好,在這裏,個人分析的主要目的是理清存儲模塊的重點邏輯,但願可以提綱契領地把各個模塊的脈絡領出來,創建起對spark-core中各模塊的總體認知,這樣咱們在遇到一些問題的時候就可以很快地知道應該從何處下手,從哪一個具體的模塊去找問題。
好了廢話很少說,本節接着上一節。上一篇,咱們分析了BlockManager的幾個主要的存儲方法,發現BlockManager主要依靠內部的兩個組件MemoryStore和DiskStore來進行實際的數據寫入和塊的管理。
本節,咱們就來看一下MemoryStore這個組件。java
不過,我仍是延續我一向的風格,從外部對一個類的方法調用爲切入點分析這個類的做用和邏輯。
因此,咱們先來看一下上一節對於MemoryStore的主要的方法調用的總結:api
memoryStore.putIteratorAsValues memoryStore.putIteratorAsBytes memoryStore.putBytes
這個方法主要是用於存儲級別是非序列化的狀況,即直接以java對象的形式將數據存放在jvm堆內存上。咱們都知道,在jvm堆內存上存放大量的對象並非什麼好事,gc壓力大,擠佔內存,可能引發頻繁的gc,可是也有明顯的好處,就是省去了序列化和反序列化耗時,並且直接從堆內存取數據顯然比任何其餘方式(磁盤和直接內存)都要快不少,因此對於內存充足且要緩存的數據量本省不是很大的狀況,這種方式也不失爲一種不錯的選擇。數組
private[storage] def putIteratorAsValues[T]( blockId: BlockId, values: Iterator[T], classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { // 用於存儲java對象的容器 val valuesHolder = new DeserializedValuesHolder[T](classTag) putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, valuesHolder) match { // 存儲成功 case Right(storedSize) => Right(storedSize) // 存儲失敗的狀況 case Left(unrollMemoryUsedByThisBlock) => // ValuesHolder內部的數組和vector會相互轉換 // 數據寫入完成後會將vector中的數據轉移到數組中 val unrolledIterator = if (valuesHolder.vector != null) { valuesHolder.vector.iterator } else { valuesHolder.arrayValues.toIterator } // 返回寫入一半的迭代器、 // 外部調用者一半會選擇關閉這個迭代器以釋放被使用的內存 Left(new PartiallyUnrolledIterator( this, MemoryMode.ON_HEAP, unrollMemoryUsedByThisBlock, unrolled = unrolledIterator, rest = values)) } }
這個方法的邏輯很簡單,做用也比較單一,主要是對實際存儲方法putIterator的返回結果作處理,若是失敗的話,就封裝一個PartiallyUnrolledIterator返回給外部調用這個,調用這個通常須要將這個寫入一半的迭代器關閉。緩存
這個方法看似很長,其實邏輯相對簡單,主要作的事就是把數據一條一條往ValuesHolder中寫,並週期性地檢查內存,若是內存不夠就經過內存管理器MemoryManager申請內存,每次申請當前內存量的1.5倍。
最後,將ValuesHolder中的數據轉移到一個數組中(其實數據在SizeTrackingVector中也是以數組的形式存儲,只不過SizeTrackingVector對象內部處理數組還有一些其餘的簿記量,更爲關鍵的是咱們須要將存儲的數據以同一的接口進行包裝,以利於MemoryStore進行同一管理)。最後還有關鍵的一步,就是釋放展開內存,從新申請存儲內存。
此外,這個過程當中有使用到memoryManager,具體的方法調用是:安全
memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
------------------------------分割線------------------------------app
private def putIterator[T]( blockId: BlockId, values: Iterator[T], classTag: ClassTag[T], memoryMode: MemoryMode, valuesHolder: ValuesHolder[T]): Either[Long, Long] = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") // Number of elements unrolled so far var elementsUnrolled = 0 // Whether there is still enough memory for us to continue unrolling this block var keepUnrolling = true // Initial per-task memory to request for unrolling blocks (bytes). // 用於數據在內存展開的初始的內存使用量 val initialMemoryThreshold = unrollMemoryThreshold // How often to check whether we need to request more memory // 檢查內存的頻率,每寫這麼多條數據就會檢查一次是否須要申請額外的內存 val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD) // Memory currently reserved by this task for this particular unrolling operation // 內存閾值,開始時等於初始閾值 var memoryThreshold = initialMemoryThreshold // Memory to request as a multiple of current vector size // 內存增加因子,每次申請的內存是當前內存的這個倍數 val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR) // Keep track of unroll memory used by this particular block / putIterator() operation // 當前的塊使用的內存大小 var unrollMemoryUsedByThisBlock = 0L // Request enough memory to begin unrolling // 首先進行初始的內存申請,向MemoryManager申請內存 keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode) if (!keepUnrolling) { logWarning(s"Failed to reserve initial memory threshold of " + s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") } else { // 若是成功申請到內存,則累加記錄 unrollMemoryUsedByThisBlock += initialMemoryThreshold } // Unroll this block safely, checking whether we have exceeded our threshold periodically // 循環將每條數據寫入容器中valuesHolder while (values.hasNext && keepUnrolling) { valuesHolder.storeValue(values.next()) // 若是寫入數據的條數達到一個週期,那麼就檢查一下是否須要申請額外的內存 if (elementsUnrolled % memoryCheckPeriod == 0) { // 經過valuesHolder獲取已經寫入的數據的評估大小 // 注意,這裏的數據大小隻是估計值,並非十分準確 // 具體如何進行估算的能夠看valuesHolder內部實現 val currentSize = valuesHolder.estimatedSize() // If our vector's size has exceeded the threshold, request more memory // 若是已寫入的數據大小超過了當前閾值 if (currentSize >= memoryThreshold) { // 這裏每次申請的內存量都是不同的 // 每次申請的內存是當前已使用內存的1.5倍(默認) val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) if (keepUnrolling) { // 記錄累積申請的內存量 unrollMemoryUsedByThisBlock += amountToRequest } // New threshold is currentSize * memoryGrowthFactor // 目前已經向內存管理器申請的內存量 memoryThreshold += amountToRequest } } // 記錄插入的數據條數 elementsUnrolled += 1 } // Make sure that we have enough memory to store the block. By this point, it is possible that // the block's actual memory usage has exceeded the unroll memory by a small amount, so we // perform one final call to attempt to allocate additional memory if necessary. // 若是keepUnrolling爲true,說明順利地將全部數據插入, // 並未遇到申請內存失敗的狀況 if (keepUnrolling) { // 將內部的數據轉移到一個數組中 val entryBuilder = valuesHolder.getBuilder() // 數據在內存中的精確大小 val size = entryBuilder.preciseSize // 實際的大小可能大於申請的內存量 // 所以根據實際大小還要再申請額外的內存 if (size > unrollMemoryUsedByThisBlock) { val amountToRequest = size - unrollMemoryUsedByThisBlock keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) if (keepUnrolling) { unrollMemoryUsedByThisBlock += amountToRequest } } if (keepUnrolling) { // 獲取MemoryEntry對象,該對象是對插入數據的包裝 val entry = entryBuilder.build() // Synchronize so that transfer is atomic memoryManager.synchronized { // 這一步主要是釋放申請的展開內存 // 而後申請存儲內存 // 這裏須要弄清楚展開內存的概念 // 展開狀態指的是對象在內存中處於一種比較鬆散的狀態,這樣的狀態方便作一些管理如統計大小等 // 而隨後將對象轉移到數組中,處於一種比較緊實的狀態,數組相對來講佔用的額外內存是比較小的 // 一個數組只是一個對象,只有一個對象頭,能夠用來管理大量的對象 releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock) // 申請存儲內存 val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode) assert(success, "transferring unroll memory to storage memory failed") } // 放入map中管理起來 entries.synchronized { entries.put(blockId, entry) } logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId, Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) Right(entry.size) } else { // We ran out of space while unrolling the values for this block logUnrollFailureMessage(blockId, entryBuilder.preciseSize) // 若是失敗,返回已經申請的展開內存 Left(unrollMemoryUsedByThisBlock) } } else { // We ran out of space while unrolling the values for this block logUnrollFailureMessage(blockId, valuesHolder.estimatedSize()) Left(unrollMemoryUsedByThisBlock) } }
咱們再看另外一個方法。套路基本和putIteratorAsValues是同樣同樣的。
最大的區別在於ValuesHolder類型不一樣。非序列化形式存儲使用的是DeserializedMemoryEntry,而序列化形式存儲使用的是SerializedMemoryEntry。jvm
private[storage] def putIteratorAsBytes[T]( blockId: BlockId, values: Iterator[T], classTag: ClassTag[T], memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long] = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") // Initial per-task memory to request for unrolling blocks (bytes). val initialMemoryThreshold = unrollMemoryThreshold // 字節數組的塊大小,默認是1m val chunkSize = if (initialMemoryThreshold > Int.MaxValue) { logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " + s"is too large to be set as chunk size. Chunk size has been capped to " + s"${Utils.bytesToString(Int.MaxValue)}") Int.MaxValue } else { initialMemoryThreshold.toInt } // 字節數組的容器 val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag, memoryMode, serializerManager) putIterator(blockId, values, classTag, memoryMode, valuesHolder) match { case Right(storedSize) => Right(storedSize) case Left(unrollMemoryUsedByThisBlock) => // 部分展開,部分以序列化形式存儲的block Left(new PartiallySerializedBlock( this, serializerManager, blockId, valuesHolder.serializationStream, valuesHolder.redirectableStream, unrollMemoryUsedByThisBlock, memoryMode, valuesHolder.bbos, values, classTag)) } }
咱們再來看另外一個被外部調用用來插入數據的方法。很簡單,不說了。ide
def putBytes[T: ClassTag]( blockId: BlockId, size: Long, memoryMode: MemoryMode, _bytes: () => ChunkedByteBuffer): Boolean = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") // 首先向內存管理器申請內存 // 這裏申請的是存儲內存,由於要插入的字節數組, // 因此不須要再展開,也就不須要申請展開內存 if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) { // We acquired enough memory for the block, so go ahead and put it val bytes = _bytes() assert(bytes.size == size) // 這裏直接構建了一個SerializedMemoryEntry // 並放到map中管理起來 val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]]) entries.synchronized { entries.put(blockId, entry) } logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format( blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) true } else { false } }
經過對上面的三個方法,其實主要是前兩個方法的分析,咱們發現,除了對內存進行簿記管理以外,以及經過內存管理器申請內存以外,插入數據最主要的工做其實都是有ValuesHolder對象來完成的。
ValuesHolder特質有兩個實現類:DeserializedValuesHolder和SerializedValuesHolder。ui
DeserializedValuesHolder對象內部有兩個成員:vector,是一個SizeTrackingVector;arrayValues,是一個存放值的數組,用於在全部數據插入後,將主句轉移到一個數組中,方便包裝成一個MemoryEntry對象。大部分工做是有SizeTrackingVector完成的。this
private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends ValuesHolder[T] { // Underlying vector for unrolling the block var vector = new SizeTrackingVector[T]()(classTag) var arrayValues: Array[T] = null override def storeValue(value: T): Unit = { vector += value } override def estimatedSize(): Long = { vector.estimateSize() } override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] { // We successfully unrolled the entirety of this block arrayValues = vector.toArray vector = null override val preciseSize: Long = SizeEstimator.estimate(arrayValues) override def build(): MemoryEntry[T] = DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag) } }
上面提到的SizeTrackingVector繼承了這個特質,除了這個特質,還集成了PrimitiveVector類,可是PrimitiveVector類基本上就是對一個數組的簡單包裝。
SizeTrackingVector最重要的功能:追蹤對象的大小,就是在SizeTracker特之中實現的。
我大體說一下這個特質是如何實現對象大小跟蹤和估算的,代碼實現也並不複雜,感興趣的能夠看一看,限於篇幅這裏就不貼了。
可見這麼作並非什麼精確,可是因爲是抽樣,並且抽樣週期越日後面越長,因此對於數據插入的效率影響很小,並且這種不精確性其實在後續的內存檢查過程當中是有考慮到的。在全部數據插入完的收尾工做中,會對對象大小作一次精確計算。此外,熟悉spark內存管理的同窗應該知道,其實spark通常會配置一個安全因子(通常是0.9),也就是說只是用配置的內存大小的90%,就是爲了儘量地減小這種不精確的內存估算形成OOM的可能性。
private class SerializedValuesHolder[T]( blockId: BlockId, chunkSize: Int, classTag: ClassTag[T], memoryMode: MemoryMode, serializerManager: SerializerManager) extends ValuesHolder[T] { val allocator = memoryMode match { case MemoryMode.ON_HEAP => ByteBuffer.allocate _ // 調用unsafe的本地方法申請直接內存 // 這個方法之因此沒有調用ByteBuffer.allocateDirect方法 // 是由於這個方法分配的直接內存大小收到參數MaxDirectMemorySize限制 // 因此這裏繞過ByteBuffer.allocateDirect方法,經過反射和unsafe類建立直接內存對象 case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _ } val redirectableStream = new RedirectableOutputStream val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator) redirectableStream.setOutputStream(bbos) val serializationStream: SerializationStream = { val autoPick = !blockId.isInstanceOf[StreamBlockId] val ser = serializerManager.getSerializer(classTag, autoPick).newInstance() // 包裝壓縮流和序列化流 ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream)) } // 寫入方法,寫入的對象通過序列化,壓縮, // 而後通過ChunkedByteBufferOutputStream被分割成一個個的字節數組塊 override def storeValue(value: T): Unit = { serializationStream.writeObject(value)(classTag) } override def estimatedSize(): Long = { bbos.size } override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] { // We successfully unrolled the entirety of this block serializationStream.close() override def preciseSize(): Long = bbos.size override def build(): MemoryEntry[T] = SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag) } }
大概看一下,主要的邏輯很簡單,這裏面有幾個注意點:
MemoryStore.scala這個文件中乍看代碼有八百多行,可是其實很大部分代碼是一些輔助類,比較核心的寫入邏輯也就是前面提到的幾個方法,再加上核心的兩個類DeserializedValuesHolder和SerializedValuesHolder實現了以對象或字節數組的形式存儲數據。