轉載:時延軍.http://shiyanjun.cnhtml
Spark在Map階段調度運行的ShuffleMapTask,最後會生成.data和.index文件,能夠經過個人這篇文章 Spark Shuffle過程分析:Map階段處理流程 瞭解具體流程和詳情。同時,在Executor上運行一個ShuffleMapTask,返回了一個MapStatus對象,下面是ShuffleMapTask執行後返回結果的相關代碼片斷:數組
var writer: ShuffleWriter[Any, Any] = null try { val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get } catch { case e: Exception => try { if (writer != null) { writer.stop(success = false) } } catch { case e: Exception => log.debug("Could not stop writer", e) } throw e }
若是ShuffleMapTask執行過程沒有發生異常,則最後執行的調用爲:網絡
writer.stop(success = true).get
這裏返回了一個MapStatus類型的對象,MapStatus的定義以下所示:app
private[spark] sealed trait MapStatus { def location: BlockManagerId def getSizeForBlock(reduceId: Int): Long }
其中包含了運行ShuffleMapTask所在的BlockManager的地址,以及後續Reduce階段每一個ResultTask計算須要Map輸出的大小(Size)。咱們能夠看下MapStatus如何建立的,在SortShuffleWriter的write()方法中,能夠看到MapStatus的建立,以下代碼所示:ide
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
繼續跟蹤能夠看到,調用了MapStatus的伴生對象的apply()方法:函數
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = { if (uncompressedSizes.length > 2000) { HighlyCompressedMapStatus(loc, uncompressedSizes) } else { new CompressedMapStatus(loc, uncompressedSizes) } }
uncompressedSizes表示Partition的個數,若是大於2000則建立HighlyCompressedMapStatus對象,不然建立CompressedMapStatus對象,他們具體的實現能夠參考源碼。oop
含有Shuffle過程的Spark Application示例優化
咱們先給出一個簡單的Spark Application程序代碼,以下所示:ui
al rdd = sc.textFile("/temp/*.h") val finalRdd = rdd.flatMap(line => line.split("\\s+")).map(w => (w, 1)).reduceByKey(_ + _) finalRdd.toDebugString finalRdd.saveAsTextFile("/temp/output")
經過RDD的toDebugString()方法,打印調試信息:this
scala> finalRdd.toDebugString res0: String = (133) ShuffledRDD[6] at reduceByKey at <console>:30 [] +-(133) MapPartitionsRDD[5] at map at <console>:30 [] | MapPartitionsRDD[4] at flatMap at <console>:30 [] | /temp/*.h MapPartitionsRDD[3] at textFile at <console>:29 [] | /temp/*.h HadoopRDD[2] at textFile at <console>:29 []
能夠看到這個過程當中,調用了reduceByKey(),建立了一個ShuffledRDD,這在計算過程當中會執行Shuffle操做。
ShuffleMapTask執行結果上報處理流程
Spark Application提交之後,會生成ShuffleMapStage和/或ResultStage,而一個ShuffleMapStage對應一組實際須要運行的ShuffleMapTask,ResultStage對應一組實際須要運行ResultTask,每組Task都是有TaskSetManager來管理的,而且只有ShuffleMapStage對應的一組ShuffleMapTask都運行成功結束之後,纔會調度ResultStage。 因此,咱們這裏關注的是,當ShuffleMapStage中最後一個ShuffleMapTask運行成功後,如何將Map階段的信息上報給調度器(Driver上的TaskScheduler和DAGScheduler),瞭解這個處理流程對理解後續的Reduce階段處理相當重要,這個過程的詳細處理流程,以下圖所示: 咱們將整個流程按照順序分爲以下幾個過程來描述:
ShuffleMapTask完成後處理結果 Executor會啓動一個TaskRunner線程來運行ShuffleMapTask,ShuffleMapTask完成後,會對結果進行序列化處理,代碼以下所示:
val directResult = new DirectTaskResult(valueBytes, accumUpdates) val serializedDirectResult = ser.serialize(directResult) val resultSize = serializedDirectResult.limit
根據序列化後結果serializedDirectResult的大小resultSize,會進行一些優化,代碼以下所示:
val serializedResult: ByteBuffer = { if (maxResultSize > 0 && resultSize > maxResultSize) { logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " + s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " + s"dropping it.") ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize)) } else if (resultSize > maxDirectResultSize) { val blockId = TaskResultBlockId(taskId) env.blockManager.putBytes( blockId, new ChunkedByteBuffer(serializedDirectResult.duplicate()), StorageLevel.MEMORY_AND_DISK_SER) logInfo( s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)") ser.serialize(new IndirectTaskResult[Any](blockId, resultSize)) } else { logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver") serializedDirectResult } }
若是結果大小沒有超過指定的DirectTaskResult的最大限制值maxDirectResultSize,就直接將上面的DirectTaskResult的序列化結果發送給Driver;若是結果大小超過了Task結果的最大限制值maxResultSize,則直接丟棄結果;不然,當結果大小介於maxDirectResultSize與maxResultSize之間時,會基於Task ID建立一個TaskResultBlockId,而後經過BlockManager將結果暫時保存在Executor上(DiskStore或MemoryStore),以便後續計算直接請求獲取該數據。 最後,結果會調用CoarseGrainedExecutorBackend的statusUpdate方法,以下所示:
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
將Task對應的運行狀態、運行結果發送給Driver。
Driver獲取Task運行結果 集羣模式下,Driver端負責接收Task運行結果的是CoarseGrainedSchedulerBackend,它內部有一個DriverEndpoint來負責實際網絡通訊,以及接收Task狀態及其結果,代碼以下所示:
case StatusUpdate(executorId, taskId, state, data) => scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { executorDataMap.get(executorId) match { case Some(executorInfo) => executorInfo.freeCores += scheduler.CPUS_PER_TASK makeOffers(executorId) case None => // Ignoring the update since we don't know about the executor. logWarning(s"Ignored task status update ($taskId state $state) " + s"from unknown executor with ID $executorId") } }
若是消息類型爲StatusUpdate,則首先直接調用了TaskSchedulerImpl的statusUpdate()方法,來獲取Task的運行狀態及其結果,代碼以下所示:
case StatusUpdate(executorId, taskId, state, data) => scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { executorDataMap.get(executorId) match { case Some(executorInfo) => executorInfo.freeCores += scheduler.CPUS_PER_TASK makeOffers(executorId) case None => // Ignoring the update since we don't know about the executor. logWarning(s"Ignored task status update ($taskId state $state) " + s"from unknown executor with ID $executorId") } }
若是Task狀態是TaskState.FINISHED,則經過TaskResultGetter來獲取Task運行返回的結果,這裏存在DirectTaskResult和IndirectTaskResult兩種類型的結果,他們的處理方式不一樣:對於DirectTaskResult類型的結果,以下所示:
case directResult: DirectTaskResult[_] => if (!taskSetManager.canFetchMoreResults(serializedData.limit())) { return } // deserialize "value" without holding any lock so that it won't block other threads. directResult.value(taskResultSerializer.get())
直接從DirectTaskResult中就能夠經過反序列化獲得結果,而對於IndirectTaskResult類型的結果,邏輯相對複雜一些,以下所示:
case directResult: DirectTaskResult[_] => if (!taskSetManager.canFetchMoreResults(serializedData.limit())) { return } // deserialize "value" without holding any lock so that it won't block other threads. directResult.value(taskResultSerializer.get())
結果大小超過指定的限制值,在ShuffleMapTask運行過程當中會直接經過BlockManager存儲到Executor的內存/磁盤上,這裏就會根據結果Block ID,經過BlockManager來獲取到結果對應的Block數據。
更新Driver端Task、Stage狀態,並調度Stage運行 獲取到ShuffleMapTask運行的結果數據後,須要更新TaskSetManager中對應的狀態信息,以便爲後續調度Task運行提供決策支持,代碼以下所示:
scheduler.handleSuccessfulTask(taskSetManager, tid, result)
上面代碼調用了TaskSetManager的handleSuccessfulTask()方法,更新相關狀態,同時繼續更新DAGScheduler中對應的狀態,代碼片斷以下所示:
sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info) maybeFinishTaskSet()
調用DAGScheduler的taskEnded()方法,更新Stage信息。若是一個ShuffleMapTask運行完成後,並且是對應的ShuffleMapStage中最後一個ShuffleMapTask,則該ShuffleMapStage也完成了,則會註冊該ShuffleMapStage運行獲得的全部Map輸出結果,代碼以下所示:
mapOutputTracker.registerMapOutputs( shuffleStage.shuffleDep.shuffleId, shuffleStage.outputLocInMapOutputTrackerFormat(), changeEpoch = true)
上面MapOutputTracker維護了一個ConcurrentHashMap[Int, Array[MapStatus]]內存結構,用來管理每一個ShuffleMapTask運行完成返回的結果數據,其中Key是Shuffle ID,Value使用數組記錄每一個Map ID對應的輸出結果信息。 下面代碼判斷ShuffleMapStage是否可用,從而進行相應的處理:
if (!shuffleStage.isAvailable) { // Some tasks had failed; let's resubmit this shuffleStage. // TODO: Lower-level scheduler should also deal with this logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name + ") because some of its tasks had failed: " + shuffleStage.findMissingPartitions().mkString(", ")) submitStage(shuffleStage) } else { // Mark any map-stage jobs waiting on this stage as finished if (shuffleStage.mapStageJobs.nonEmpty) { val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep) for (job <- shuffleStage.mapStageJobs) { markMapStageJobAsFinished(job, stats) } } submitWaitingChildStages(shuffleStage) }
若是ShuffleMapStage不可用,說明還有某些Partition對應的結果沒有計算(或者某些計算失敗),Spark會從新提交該ShuffleMapStage;若是可用,則說明當前ShuffleMapStage已經運行完成,更新對應的狀態和結果信息:標記ShuffleMapStage已經完成,同時提交Stage依賴關係鏈中相鄰下游的Stage運行。若是後面是ResultStage,則會提交該ResultStage運行。
釋放資源、從新調度Task運行 一個ShuffleMapTask運行完成,要釋放掉對應的Executor佔用的資源,在Driver端會增長對應的資源列表,同時調度Task到該釋放的Executor上運行,可見CoarseGrainedSchedulerBackend.DriverEndpoint中對應的處理邏,代碼以下所示:
if (TaskState.isFinished(state)) { executorDataMap.get(executorId) match { case Some(executorInfo) => executorInfo.freeCores += scheduler.CPUS_PER_TASK makeOffers(executorId)
上面makeOffers()方法,會調度一個Task到該executorId標識的Executor上運行。若是ShuffleMapStage已經完成,那麼這裏可能會調度ResultStage階段的ResultTask運行。
Reduce階段處理流程
上面咱們給出的例子中,執行reduceByKey後,因爲上游的RDD沒有按照key執行分區操做,因此一定會建立一個ShuffledRDD,能夠在PairRDDFunctions類的源碼中看到combineByKeyWithClassTag方法,實現代碼以下所示:
def combineByKeyWithClassTag[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope { require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0 if (keyClass.isArray) { if (mapSideCombine) { throw new SparkException("Cannot use map-side combining with array keys.") } if (partitioner.isInstanceOf[HashPartitioner]) { throw new SparkException("Default partitioner cannot partition array keys.") } } val aggregator = new Aggregator[K, V, C]( self.context.clean(createCombiner), self.context.clean(mergeValue), self.context.clean(mergeCombiners)) if (self.partitioner == Some(partitioner)) { self.mapPartitions(iter => { val context = TaskContext.get() new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } else { new ShuffledRDD[K, V, C](self, partitioner) .setSerializer(serializer) .setAggregator(aggregator) .setMapSideCombine(mapSideCombine) } }
這裏,由於咱們給出的例子的上下文中,self.partitioner == Some(partitioner)不成立,因此最終建立了一個ShuffledRDD對象。因此,對於Reduce階段的處理流程,咱們基於ShuffledRDD的處理過程來進行分析。 咱們從ResultTask類開始,該類中實現了runTask()方法,代碼以下所示:
override def runTask(context: TaskContext): U = { // Deserialize the RDD and the func using the broadcast variables. val deserializeStartTime = System.currentTimeMillis() val ser = SparkEnv.get.closureSerializer.newInstance() val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime func(context, rdd.iterator(partition, context)) }
其中,最核心的就是上面的rdd.iterator()調用,具體處理過程,以下圖所示: 最終,它用來計算一個RDD,即對應ShuffledRDD的計算。iterator()方法是在RDD類中給出的,以下所示:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { getOrCompute(split, context) } else { computeOrReadCheckpoint(split, context) } }
跟蹤getOrCompute()方法,最終應該是在ShuffledRDD類的compute()方法中定義。
ShuffledRDD計算 ShuffledRDD對應的compute方法的實現代碼,以下所示:
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context) .read() .asInstanceOf[Iterator[(K, C)]] }
上面主要是經過BlockStoreShuffleReader的read()方法,來實現ShuffledRDD的計算,咱們經過下面的序列圖來看一下詳細的執行流程: 跟蹤Map的輸出結果,是基於Executor端的MapOutputTracker與Driver端的MapOutputTrackerMaster來實現的,其中MapOutputTrackerMaster做爲Server端,MapOutputTracker做爲Client端。Driver端管理了一個Spark Application計算程序的ShuffleMapStage中全部ShuffleMapTask的輸出,因此在Reduce過程當中Executor會經過MapOutputTracker與Driver的MapOutputTrackerMaster進行通訊獲取。 調用BlockStoreShuffleReader的read()方法,最終獲得了Reduce過程當中須要的輸入,即ShuffleMapTask的輸出結果所在的位置。一般,爲了可以使計算在數據本地進行,每一個ResultTask運行所在的Executor節點會存在對應的Map輸出,是經過BlockManager來管理這些數據的,經過Block ID來標識。因此,上圖中最後返回了一個BlockManager ID及受其管理的一個Block ID列表,而後Executor上的ResultTask就可以根據BlockManager ID來獲取到對應的Map輸出數據,從而進行數據的計算。 ResultTask運行完成後,最終返回一個記錄的迭代器,此時計算獲得的最終結果數據,是在各個ResultTask運行所在的Executor上的,而數據又是按Block來存儲的,是經過BlockManager來管理的。
保存結果RDD 根據前面的程序示例,最後調用了RDD的saveAsTextFile(),這會又生成一個ResultStage,進而對應着一組ResultTask。保存結果RDD的處理流程,以下圖所示: 上面整個流程,會執行設置RDD輸出到HDFS的Writer(一個寫文件的函數)、提交ResultStage、構建包含ResultTask的TaskSet、調度ResultTask到指定Executor上執行這幾個核心的過程。實際上,在每一個Executor上運行的ResultTask的核心處理邏輯,主要是下面這段函數代碼:
val writer = new SparkHadoopWriter(hadoopConf) writer.preSetup() val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => { val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt val (outputMetrics, callback) = SparkHadoopWriterUtils.initHadoopOutputMetrics(context) writer.setup(context.stageId, context.partitionId, taskAttemptId) writer.open() var recordsWritten = 0L Utils.tryWithSafeFinallyAndFailureCallbacks { while (iter.hasNext) { val record = iter.next() writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef]) // Update bytes written metric every few records SparkHadoopWriterUtils.maybeUpdateOutputMetrics(outputMetrics, callback, recordsWritten) recordsWritten += 1 } }(finallyBlock = writer.close()) writer.commit() outputMetrics.setBytesWritten(callback()) outputMetrics.setRecordsWritten(recordsWritten) }
還記得咱們在計算ShuffledRDD的過程當中,最終的ResultTask生成了一個結果的迭代器。當調用saveAsTextFile()時,ResultStage對應的一組ResultTask會在Executor上運行,將每一個迭代器對應的結果數據保存到HDFS上。