根據之前的一系列分析,我們對 Spark 作業從建立到調度分發、到執行、最後結果回傳 driver 的過程有了一個大概的瞭解。但是在分析源碼的過程中也留下了大量的問題,最主要的就是涉及到的 Spark 中重要的幾個基礎模塊:我們對這些基礎設施的內部細節並不是很瞭解,之前走讀源碼時基本只是大概瞭解每個模塊的作用以及對外的主要接口。這些重要的模塊包括 BlockManager、MemoryManager、ShuffleManager、MapOutputTracker、RPC 模塊 NettyRpcEnv,以及 BroadcastManager。而對於調度系統涉及到的幾個類,包括 DAGScheduler、TaskScheduler(TaskSchedulerImpl)、CoarseGrainedSchedulerBackend、CoarseGrainedExecutorBackend、Executor、TaskRunner,我們之前已經做了較爲詳細的分析,因此這幾個模塊暫告一段落。
最終調用的是:DiskBlockManager 的相關方法,包括 createTempShuffleBlock、getDiskWriter;以及 DiskBlockObjectWriter 的相關方法,包括 write 方法和 commitAndGet 方法。
/**
 * Asks the driver endpoint for the locations of several blocks in one request.
 *
 * Sends a `GetLocationsMultipleBlockIds` message through `driverEndpoint.askSync` and returns
 * the reply as-is.
 *
 * @param blockIds ids of the blocks to look up
 * @return the reply from the driver endpoint, typed as `IndexedSeq[Seq[BlockManagerId]]`
 */
def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
  driverEndpoint.askSync[IndexedSeq[Seq[BlockManagerId]]](
    GetLocationsMultipleBlockIds(blockIds))
}
// Batched location lookup: resolve the locations for all requested block ids, then reply.
case GetLocationsMultipleBlockIds(blockIds) =>
  val locationsPerBlock = getLocationsMultipleBlockIds(blockIds)
  context.reply(locationsPerBlock)
/**
 * Looks up the locations recorded in `blockLocations` for a single block.
 *
 * @param blockId id of the block to look up
 * @return the recorded locations as a `Seq`, or an empty `Seq` when there is no entry
 */
private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  // One `get` instead of containsKey + get: `Option(...)` maps the null returned for a
  // missing key (or a null value) to None, so the map is probed only once.
  Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty)
}
/**
 * Stores a single value as a block by wrapping it in a one-element iterator and delegating
 * to `putIterator`.
 *
 * @param blockId    id of the block to write
 * @param value      the only value the block will contain
 * @param level      storage level passed through to `putIterator`
 * @param tellMaster passed through to `putIterator` unchanged
 * @return the result of `putIterator`
 */
def putSingle[T: ClassTag](
    blockId: BlockId,
    value: T,
    level: StorageLevel,
    tellMaster: Boolean = true): Boolean = {
  val singleValue = Iterator.single(value)
  putIterator(blockId, singleValue, level, tellMaster)
}
/**
 * Stores a block whose content is supplied as an iterator of values, honouring `level`.
 *
 * The work runs as the body handed to `doPut`. That body:
 *  - tries the memory store first when the level allows memory (as values when
 *    `level.deserialized`, as bytes otherwise) and falls back to the disk store when the
 *    memory put fails and the level allows disk;
 *  - writes straight to the disk store when the level allows disk but not memory;
 *  - on success records the size, optionally reports the status, updates task metrics and,
 *    when `level.replication > 1`, replicates the stored bytes.
 *
 * @param blockId      id of the block being written
 * @param iterator     factory for the values; invoked at most once, on the branch that stores them
 * @param level        requested storage level
 * @param classTag     class tag of the values, used for (de)serialization
 * @param tellMaster   whether the new block status may be reported (also requires
 *                     `info.tellMaster`)
 * @param keepReadLock forwarded to `doPut`
 * @return the result of `doPut`; the body itself yields `None` when the block was stored and
 *         `Some(iterator over the values)` when the memory put failed and disk was not allowed
 */
private def doPutIterator[T](
    blockId: BlockId,
    iterator: () => Iterator[T],
    level: StorageLevel,
    classTag: ClassTag[T],
    tellMaster: Boolean = true,
    keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
  doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
    val putStartMs = System.currentTimeMillis

    // Writes the block to the disk store through `fill` and returns the stored size.
    def writeToDisk(fill: java.io.OutputStream => Unit): Long = {
      diskStore.put(blockId) { channel =>
        fill(Channels.newOutputStream(channel))
      }
      diskStore.getSize(blockId)
    }

    // Same as writeToDisk, but used when the memory store could not hold the block.
    def spillToDisk(fill: java.io.OutputStream => Unit): Long = {
      logWarning(s"Persisting block $blockId to disk instead.")
      writeToDisk(fill)
    }

    // Set only when the memory put failed and the level does not allow disk.
    var unrollLeftover: Option[PartiallyUnrolledIterator[T]] = None
    // Size of the block in bytes.
    var storedBytes = 0L

    if (level.useMemory) {
      // Memory goes first even when useDisk is also set; disk is only the fallback.
      if (level.deserialized) {
        // Deserialized levels keep the values as objects in the memory store.
        memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
          case Right(s) =>
            storedBytes = s
          case Left(iter) =>
            // Not enough space to unroll this block; drop to disk if applicable.
            if (level.useDisk) {
              // Data written to disk is always serialized.
              storedBytes = spillToDisk { out =>
                serializerManager.dataSerializeStream(blockId, out, iter)(classTag)
              }
            } else {
              unrollLeftover = Some(iter)
            }
        }
      } else {
        // Serialized level: store the values as bytes.
        memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
          case Right(s) =>
            storedBytes = s
          case Left(partiallySerializedValues) =>
            // Not enough space to unroll this block; drop to disk if applicable.
            if (level.useDisk) {
              storedBytes = spillToDisk { out =>
                partiallySerializedValues.finishWritingToStream(out)
              }
            } else {
              unrollLeftover = Some(partiallySerializedValues.valuesIterator)
            }
        }
      }
    } else if (level.useDisk) {
      // The level does not allow memory, so the block goes directly to disk (serialized).
      storedBytes = writeToDisk { out =>
        serializerManager.dataSerializeStream(blockId, out, iterator())(classTag)
      }
    }

    // Status of the block that was just written.
    val putBlockStatus = getCurrentBlockStatus(blockId, info)
    val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid

    if (blockWasSuccessfullyStored) {
      // Now that the block is in either the memory or disk store, tell the master about it.
      info.size = storedBytes
      if (tellMaster && info.tellMaster) {
        reportBlockStatus(blockId, putBlockStatus)
      }
      // Record the new block status in the task metrics.
      addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
      logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(putStartMs)))

      // More than one replica requested: push the stored bytes to other block managers.
      if (level.replication > 1) {
        val replicationStartMs = System.currentTimeMillis
        val bytesToReplicate = doGetLocalBytes(blockId, info)
        // [SPARK-16550] Erase the typed classTag when using default serialization, since
        // NettyBlockRpcServer crashes when deserializing repl-defined classes.
        // TODO(ekl) remove this once the classloader issue on the remote end is fixed.
        val remoteClassTag =
          if (serializerManager.canUseKryo(classTag)) {
            classTag
          } else {
            scala.reflect.classTag[Any]
          }
        try {
          replicate(blockId, bytesToReplicate, level, remoteClassTag)
        } finally {
          bytesToReplicate.dispose()
        }
        logDebug("Put block %s remotely took %s"
          .format(blockId, Utils.getUsedTimeMs(replicationStartMs)))
      }
    }

    // A leftover iterator exists exactly when the block could not be stored.
    assert(blockWasSuccessfullyStored == unrollLeftover.isEmpty)
    unrollLeftover
  }
}
/**
 * Stores a block whose content is already serialized, honouring `level`.
 *
 * The work runs as the body handed to `doPut`. That body:
 *  - starts replication first (on `futureExecutionContext`) when `level.replication > 1`;
 *  - tries the memory store when the level allows memory (deserializing first when
 *    `level.deserialized`) and falls back to the disk store when that fails and disk is allowed;
 *  - writes straight to the disk store when the level allows disk but not memory;
 *  - on success records the size, optionally reports the status and updates task metrics;
 *  - finally waits, without a timeout, for the replication started at the beginning.
 *
 * @param blockId      id of the block being written
 * @param bytes        serialized content of the block; not disposed by this method
 * @param level        requested storage level
 * @param classTag     class tag of the values, used when deserializing and replicating
 * @param tellMaster   whether the new block status may be reported (also requires
 *                     `info.tellMaster`)
 * @param keepReadLock forwarded to `doPut`
 * @return `true` when the value returned by `doPut` is empty; the body yields `None` when the
 *         block was stored and `Some(bytes)` otherwise
 * @throws Exception when waiting for the replication future fails with a non-fatal error
 */
private def doPutBytes[T](
    blockId: BlockId,
    bytes: ChunkedByteBuffer,
    level: StorageLevel,
    classTag: ClassTag[T],
    tellMaster: Boolean = true,
    keepReadLock: Boolean = false): Boolean = {
  doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
    val startTimeMs = System.currentTimeMillis

    // Since we're storing bytes, initiate the replication before storing them locally.
    // This is faster as data is already serialized and ready to send.
    // Modelled as an Option (instead of a null sentinel) so that the later wait is tied to
    // the future actually existing.
    val replicationFuture =
      if (level.replication > 1) {
        Some(Future {
          // This is a blocking action and should run in futureExecutionContext which is a
          // cached thread pool. The ByteBufferBlockData wrapper is not disposed of to avoid
          // releasing buffers that are owned by the caller.
          replicate(blockId, new ByteBufferBlockData(bytes, false), level, classTag)
        }(futureExecutionContext))
      } else {
        None
      }

    val size = bytes.size

    if (level.useMemory) {
      // Put it in memory first, even if it also has useDisk set to true;
      // we will drop it to disk later if the memory store can't hold it.
      val putSucceeded = if (level.deserialized) {
        // Deserialized level: turn the bytes back into values before storing them.
        val values =
          serializerManager.dataDeserializeStream(blockId, bytes.toInputStream())(classTag)
        memoryStore.putIteratorAsValues(blockId, values, classTag) match {
          case Right(_) => true
          case Left(iter) =>
            // If putting deserialized values in memory failed, we will put the bytes directly
            // to disk, so we don't need this iterator and can close it to free resources
            // earlier.
            iter.close()
            false
        }
      } else {
        // Serialized level: the bytes can be stored without deserializing them.
        val memoryMode = level.memoryMode
        memoryStore.putBytes(blockId, size, memoryMode, () => {
          // Off-heap storage with at least one non-direct chunk: copy into direct buffers.
          if (memoryMode == MemoryMode.OFF_HEAP &&
              bytes.chunks.exists(buffer => !buffer.isDirect)) {
            bytes.copy(Platform.allocateDirectBuffer)
          } else {
            bytes
          }
        })
      }
      // The memory put failed (typically for lack of memory): fall back to disk when allowed.
      if (!putSucceeded && level.useDisk) {
        logWarning(s"Persisting block $blockId to disk instead.")
        diskStore.putBytes(blockId, bytes)
      }
    } else if (level.useDisk) {
      // The level does not allow memory, so the bytes go directly to disk.
      diskStore.putBytes(blockId, bytes)
    }

    // Status of the block that was just written.
    val putBlockStatus = getCurrentBlockStatus(blockId, info)
    val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
    if (blockWasSuccessfullyStored) {
      // Now that the block is in either the memory or disk store,
      // tell the master about it.
      info.size = size
      if (tellMaster && info.tellMaster) {
        reportBlockStatus(blockId, putBlockStatus)
      }
      // Record the new block status in the task metrics.
      addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
    }
    logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))

    // Wait for asynchronous replication to finish; note that the timeout is infinite.
    replicationFuture.foreach { pendingReplication =>
      try {
        ThreadUtils.awaitReady(pendingReplication, Duration.Inf)
      } catch {
        case NonFatal(t) =>
          throw new Exception("Error occurred while waiting for replication to finish", t)
      }
    }

    if (blockWasSuccessfullyStored) {
      None
    } else {
      Some(bytes)
    }
  }.isEmpty
}
