In the preceding series of posts we traced the life of a Spark job from creation, through scheduling and dispatch, to execution, and finally to shipping results back to the driver. That walkthrough left a number of open questions, chiefly around several of Spark's core infrastructure modules: so far we have only noted roughly what each module does and what its main external interfaces are, without examining their internals. These modules include BlockManager, MemoryManager, ShuffleManager, MapOutputTracker, the RPC module NettyRpcEnv, and BroadcastManager. The scheduling classes, namely DAGScheduler, TaskScheduler, CoarseGrainedSchedulerBackend, CoarseGrainedExecutorBackend, Executor, and TaskRunner, have already been analyzed in some detail, so we set them aside for now.
In this post we look at the internal implementation of one of Spark's most fundamental modules: the storage system, BlockManager.
First, let's catalog the places where BlockManager gets used during the run of a job.
DAGScheduler.getCacheLocs. This method is called when a stage is submitted and the preferred locations of its partitions are needed. Recall that RDDs can be cached, and RDD caching is managed through the BlockManager; a dedicated RDDBlockId uniquely identifies each cached RDD block.
The method ultimately called is blockManagerMaster.getLocations(blockIds), as the sketch below shows.
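For reference, the relevant part of DAGScheduler.getCacheLocs looks roughly like the following, abridged from the Spark 2.x source (details vary slightly across versions): it builds one RDDBlockId per partition and asks the BlockManagerMaster where those blocks live.

val blockIds =
  rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
// one Seq[BlockManagerId] per partition, converted into task locations
val locs = blockManagerMaster.getLocations(blockIds).map { bms =>
  bms.map(bm => TaskLocation(bm.host, bm.executorId))
}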
Broadcast variables. When DAGScheduler submits a stage, it serializes the RDD together with the ShuffleDependency (or, for a ResultStage, the result function) for network transport. The serialized byte array is actually shipped via the BroadcastManager component, and BroadcastManager in turn relies on BlockManager to store the broadcast data as blocks; on the executor side, an RPC request is sent to BlockManagerMaster to fetch the data. Each broadcast variable corresponds to a TorrentBroadcast object, whose writeBlocks and readBlocks methods write and read the broadcast data.
The methods ultimately called are blockManager.putSingle and blockManager.putBytes.
Shuffle map-stage output. If the external shuffle service (ExternalShuffleService) is not enabled, Spark's built-in shuffle machinery is used, and the files produced by the map-stage output are managed through the BlockManager. This part of the shuffle mainly uses the DiskBlockManager component.
The calls ultimately go to DiskBlockManager methods, including createTempShuffleBlock and getDiskWriter, and to DiskBlockObjectWriter methods, including write and commitAndGet; see the sketch after this item.
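A hedged sketch of that write pattern, modeled on what the shuffle writers (e.g. ExternalSorter) do; records, serializerInstance, bufferSize, and writeMetrics are assumed to be in scope:

// allocate a temp shuffle block and a disk writer for it
val (tempBlockId, file) = diskBlockManager.createTempShuffleBlock()
val writer = blockManager.getDiskWriter(
  tempBlockId, file, serializerInstance, bufferSize, writeMetrics)
// stream the records through the writer, then commit the written segment
records.foreach { case (key, value) => writer.write(key, value) }
val segment = writer.commitAndGet() // FileSegment describing the bytes just written
writer.close()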
Task results serialized and sent back to the driver. There are two cases here: if the serialized result is small, below maxDirectResultSize, it is returned directly over the RPC interface; if it is large, it is first written to the executor node's memory and disk through the BlockManager, and the driver then pulls it from there.
The method ultimately called is blockManager.putBytes, as in the sketch after this item.
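For context, the large-result branch in Executor.TaskRunner looks roughly like this (abridged from the Spark 2.x source; surrounding error handling omitted): the serialized result is stored under a TaskResultBlockId, and only a small IndirectTaskResult pointing at that block travels over RPC.

val blockId = TaskResultBlockId(taskId)
// store the serialized result in the executor's BlockManager
env.blockManager.putBytes(
  blockId,
  new ChunkedByteBuffer(serializedDirectResult.duplicate()),
  StorageLevel.MEMORY_AND_DISK_SER)
// send back only a pointer; the driver fetches the block later
ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))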
Note, incidentally, that each of the scenarios above uses a different kind of BlockId; see the definitions of the various BlockId types in BlockId.scala, excerpted below.
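An abridged excerpt from BlockId.scala in Spark 2.x, showing how each scenario gets its own id type and naming scheme:

case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
  override def name: String = "rdd_" + rddId + "_" + splitIndex
}

case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}

case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
  override def name: String =
    "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
}

case class TaskResultBlockId(taskId: Long) extends BlockId {
  override def name: String = "taskresult_" + taskId
}

case class TempShuffleBlockId(id: UUID) extends BlockId {
  override def name: String = "temp_shuffle_" + id
}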
With that, our plan is clear: we analyze the BlockManager using the method calls listed above as entry points.
We start with BlockManagerMaster.getLocations, which returns the storage locations of the blocks identified by the given block ids.
def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
  driverEndpoint.askSync[IndexedSeq[Seq[BlockManagerId]]](
    GetLocationsMultipleBlockIds(blockIds))
}
This sends a GetLocationsMultipleBlockIds message to driverEndpoint. Note that despite the name, driverEndpoint is not a reference to the DriverEndpoint; as the construction of SparkEnv shows, it is a reference to a BlockManagerMasterEndpoint. So we need to look in BlockManagerMasterEndpoint for the handling of this message. Also, since askSync is used here, on the server side the message is handled and answered in the receiveAndReply method.
Here is the part of that code that handles GetLocationsMultipleBlockIds:
case GetLocationsMultipleBlockIds(blockIds) =>
  context.reply(getLocationsMultipleBlockIds(blockIds))
getLocationsMultipleBlockIds in turn calls getLocations for each block id:
private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
}
This method is simple: it looks the blockId up in an in-memory map, with each location encapsulated as a BlockManagerId. So when does information get into that map? When a new block is written and its location is reported, which we will analyze below.
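For reference, blockLocations is a plain hash map inside BlockManagerMasterEndpoint, populated when executors report block updates. The declaration below is from the Spark 2.x source; addLocation is a hypothetical helper that sketches the essence of the real update path (which lives in updateBlockInfo, handling the UpdateBlockInfo message):

// blockId -> set of BlockManagerIds currently holding a replica of the block
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

// hypothetical helper: what happens, in essence, when an executor reports a block
private def addLocation(blockId: BlockId, blockManagerId: BlockManagerId): Unit = {
  var locations = blockLocations.get(blockId)
  if (locations == null) {
    locations = new mutable.HashSet[BlockManagerId]
    blockLocations.put(blockId, locations)
  }
  locations.add(blockManagerId)
}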
Next, blockManager.putSingle. This method writes a block consisting of a single object:
def putSingle[T: ClassTag](
    blockId: BlockId,
    value: T,
    level: StorageLevel,
    tellMaster: Boolean = true): Boolean = {
  putIterator(blockId, Iterator(value), level, tellMaster)
}
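As a usage example, this is exactly how TorrentBroadcast.writeBlocks stores the broadcast value on the driver (abridged from the Spark 2.x source):

// keep a copy of the value on the driver, so that tasks run on the driver
// can read the broadcast block directly
if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
  throw new SparkException(s"Failed to store $broadcastId in BlockManager")
}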
Inside putSingle, the object is wrapped in a single-element iterator and handed to putIterator, which in turn ends up in doPutIterator, where the real work happens:
private def doPutIterator[T](
    blockId: BlockId,
    iterator: () => Iterator[T],
    level: StorageLevel,
    classTag: ClassTag[T],
    tellMaster: Boolean = true,
    keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
  doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
    val startTimeMs = System.currentTimeMillis
    var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None
    // Size of the block in bytes
    var size = 0L
    // If the storage level includes memory, try the memory store first
    if (level.useMemory) {
      // Put it in memory first, even if it also has useDisk set to true;
      // We will drop it to disk later if the memory store can't hold it.
      // For the non-serialized case, store the values in memory as objects
      if (level.deserialized) {
        memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
          case Right(s) =>
            size = s
          case Left(iter) =>
            // Not enough space to unroll this block; drop to disk if applicable
            if (level.useDisk) {
              logWarning(s"Persisting block $blockId to disk instead.")
              diskStore.put(blockId) { channel =>
                val out = Channels.newOutputStream(channel)
                // Note: data written to disk is always serialized
                serializerManager.dataSerializeStream(blockId, out, iter)(classTag)
              }
              size = diskStore.getSize(blockId)
            } else {
              iteratorFromFailedMemoryStorePut = Some(iter)
            }
        }
      } else { // !level.deserialized
        // Store in serialized form
        memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
          case Right(s) =>
            size = s
          case Left(partiallySerializedValues) =>
            // Not enough space to unroll this block; drop to disk if applicable
            if (level.useDisk) {
              logWarning(s"Persisting block $blockId to disk instead.")
              diskStore.put(blockId) { channel =>
                val out = Channels.newOutputStream(channel)
                partiallySerializedValues.finishWritingToStream(out)
              }
              size = diskStore.getSize(blockId)
            } else {
              iteratorFromFailedMemoryStorePut =
                Some(partiallySerializedValues.valuesIterator)
            }
        }
      }
    } else if (level.useDisk) {
      // The storage level does not allow memory, so disk is the only option
      diskStore.put(blockId) { channel =>
        val out = Channels.newOutputStream(channel)
        // Data written to disk is always serialized
        serializerManager.dataSerializeStream(blockId, out, iterator())(classTag)
      }
      size = diskStore.getSize(blockId)
    }
    // Get the status of the block we just wrote
    val putBlockStatus = getCurrentBlockStatus(blockId, info)
    val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
    // If the block was stored successfully, carry out the follow-up actions
    if (blockWasSuccessfullyStored) {
      // Now that the block is in either the memory or disk store, tell the master about it.
      info.size = size
      // Report the block status to the driver
      if (tellMaster && info.tellMaster) {
        reportBlockStatus(blockId, putBlockStatus)
      }
      // Update the block-related statistics in the task metrics
      addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
      logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
      // If the replication factor is greater than 1, the block must also be replicated
      if (level.replication > 1) {
        val remoteStartTime = System.currentTimeMillis
        val bytesToReplicate = doGetLocalBytes(blockId, info)
        // [SPARK-16550] Erase the typed classTag when using default serialization, since
        // NettyBlockRpcServer crashes when deserializing repl-defined classes.
        // TODO(ekl) remove this once the classloader issue on the remote end is fixed.
        val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) {
          scala.reflect.classTag[Any]
        } else {
          classTag
        }
        try {
          replicate(blockId, bytesToReplicate, level, remoteClassTag)
        } finally {
          bytesToReplicate.dispose()
        }
        logDebug("Put block %s remotely took %s"
          .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
      }
    }
    assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
    iteratorFromFailedMemoryStorePut
  }
}
To summarize the main logic of this code:

1. If the storage level allows memory, try the memory store first: as deserialized objects (putIteratorAsValues) or in serialized form (putIteratorAsBytes), depending on level.deserialized.
2. If the memory store cannot hold the block and the level allows disk, serialize the data and fall back to the disk store; otherwise, return the partially unrolled iterator to the caller.
3. If the level allows only disk, serialize the data and write it straight to the disk store.
4. On success, report the block status to the driver and update the task metrics.
5. If the replication factor is greater than 1, fetch the local bytes and replicate the block to other nodes.
As these steps show, once the data is written, the block's status is reported to BlockManagerMaster over RPC. That also answers our earlier question about where the in-memory map queried by blockManagerMaster.getLocations gets its entries.
As far as storing the data itself goes, the key components are clearly the memory store MemoryStore and the disk store DiskStore.
The MemoryStore and DiskStore methods invoked for storage are:
memoryStore.putIteratorAsValues
memoryStore.putIteratorAsBytes
diskStore.put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit
diskStore.getSize(blockId)
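The return types of the two MemoryStore methods explain the Right/Left pattern matches we saw above: Right carries the stored size, while Left carries the partially unrolled or partially serialized data, so the caller can spill it to disk. Signatures abridged from the Spark 2.x MemoryStore:

private[storage] def putIteratorAsValues[T](
    blockId: BlockId,
    values: Iterator[T],
    classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long]

private[storage] def putIteratorAsBytes[T](
    blockId: BlockId,
    values: Iterator[T],
    classTag: ClassTag[T],
    memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long]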
Let's move on to the other write method, putBytes, which writes byte data. Its actual write logic lives in doPutBytes; let's take a look at this method.
Its main steps are much the same as in doPutIterator. The difference is that doPutIterator inserts Java objects, which still have to be serialized when the storage level calls for serialized storage or when the data is written to disk, whereas doPutBytes starts from already serialized bytes.
private def doPutBytes[T](
    blockId: BlockId,
    bytes: ChunkedByteBuffer,
    level: StorageLevel,
    classTag: ClassTag[T],
    tellMaster: Boolean = true,
    keepReadLock: Boolean = false): Boolean = {
  doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
    val startTimeMs = System.currentTimeMillis
    // Since we're storing bytes, initiate the replication before storing them locally.
    // This is faster as data is already serialized and ready to send.
    // Kick off replication asynchronously
    val replicationFuture = if (level.replication > 1) {
      Future {
        // This is a blocking action and should run in futureExecutionContext which is a cached
        // thread pool. The ByteBufferBlockData wrapper is not disposed of to avoid releasing
        // buffers that are owned by the caller.
        replicate(blockId, new ByteBufferBlockData(bytes, false), level, classTag)
      }(futureExecutionContext)
    } else {
      null
    }
    val size = bytes.size
    // If the storage level includes memory, try the memory store first
    if (level.useMemory) {
      // Put it in memory first, even if it also has useDisk set to true;
      // We will drop it to disk later if the memory store can't hold it.
      // Store as deserialized values or as serialized bytes?
      val putSucceeded = if (level.deserialized) {
        val values =
          serializerManager.dataDeserializeStream(blockId, bytes.toInputStream())(classTag)
        memoryStore.putIteratorAsValues(blockId, values, classTag) match {
          case Right(_) => true
          case Left(iter) =>
            // If putting deserialized values in memory failed, we will put the bytes directly to
            // disk, so we don't need this iterator and can close it to free resources earlier.
            iter.close()
            false
        }
      } else {
        // Storing in serialized form, so no deserialization is needed
        val memoryMode = level.memoryMode
        memoryStore.putBytes(blockId, size, memoryMode, () => {
          // For off-heap storage, any non-direct buffers must be copied into direct memory
          if (memoryMode == MemoryMode.OFF_HEAP &&
              bytes.chunks.exists(buffer => !buffer.isDirect)) {
            bytes.copy(Platform.allocateDirectBuffer)
          } else {
            bytes
          }
        })
      }
      // If the memory put failed (usually for lack of space) and the level allows disk,
      // write the bytes to disk instead
      if (!putSucceeded && level.useDisk) {
        logWarning(s"Persisting block $blockId to disk instead.")
        diskStore.putBytes(blockId, bytes)
      }
    } else if (level.useDisk) {
      // The level allows only disk; data stored on disk is always serialized
      diskStore.putBytes(blockId, bytes)
    }
    // Status of the block we just wrote
    val putBlockStatus = getCurrentBlockStatus(blockId, info)
    val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
    if (blockWasSuccessfullyStored) {
      // Now that the block is in either the memory or disk store,
      // tell the master about it.
      info.size = size
      // Report the block status to the BlockManagerMaster on the driver
      if (tellMaster && info.tellMaster) {
        reportBlockStatus(blockId, putBlockStatus)
      }
      // Update the task metrics
      addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
    }
    logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
    if (level.replication > 1) {
      // Wait for asynchronous replication to finish
      // Note that the timeout here is set to infinity
      try {
        ThreadUtils.awaitReady(replicationFuture, Duration.Inf)
      } catch {
        case NonFatal(t) =>
          throw new Exception("Error occurred while waiting for replication to finish", t)
      }
    }
    if (blockWasSuccessfullyStored) {
      None
    } else {
      Some(bytes)
    }
  }.isEmpty
}
The MemoryStore and DiskStore methods invoked here are:
memoryStore.putBytes
diskStore.putBytes(blockId, bytes)
To sum up, we have surveyed the points during a Spark job at which the BlockManager is invoked, together with the BlockManager write methods involved. BlockManager manages writes to memory and disk through its two internal components, MemoryStore and DiskStore. In addition, the DiskBlockManager component maintains the mapping between blocks and disk files, allocates file paths, and manages local file-system directories. The main MemoryStore and DiskStore calls are:
memoryStore.putIteratorAsValues
memoryStore.putIteratorAsBytes
diskStore.put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit
diskStore.getSize(blockId)
memoryStore.putBytes
diskStore.putBytes(blockId, bytes)
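Finally, to tie this back to user code, here is a minimal sketch of how these paths get triggered from an application. This assumes a SparkContext named sc is in scope; the comments describe the expected path in Spark 2.x, not a definitive trace.

import org.apache.spark.storage.StorageLevel

// caching an RDD: the first action computes each partition and hands its iterator
// to the BlockManager, which ends up in doPutIterator (here via
// memoryStore.putIteratorAsBytes, since the level asks for serialized storage)
val rdd = sc.parallelize(1 to 1000000).persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd.count()

// a broadcast variable: the driver stores the value via putSingle/putBytes,
// and executors fetch the blocks on first access to table.value
val table = sc.broadcast(Map(1 -> "a", 2 -> "b"))
rdd.filter(x => table.value.contains(x % 3)).count()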