一、DagScheduler分析git
DagScheduler功能主要是負責RDD的各個stage的分解和任務提交。Stage分解是從觸發任務調度過程的finalStage開始倒推尋找父stage,若是父stage沒有提交任務則循環提交缺失的父stage。每一個stage有一個父RDD的概念,根據分區數的多少建立多個任務(Task)。github
Task的調度實際是經過TaskSchedulerImp來完成的,TaskSchedulerImp里根據環境部署的不一樣又會使用不一樣的Backend,好比Yarn集羣、獨立集羣等其Backend是不同的,這裏先有個概念,先不深究Backend。apache
這裏先看看DagScheduler的核心邏輯把。裏面首先要研究的一個方法:app
def submitMissingTasks(stage: Stage, jobId: Int)ide
該方法就是提交stage執行,爲何叫這個名稱呢?說明這裏的stage是需先須要提交執行的,沒有其餘依賴的stage還未執行了。函數
submitMissingTasks方法會根據RDD的依賴關係建立兩種task,ResultTask和ShuffleMapTask。spa
一步步來,只看關鍵代碼,由於總體代碼太多了不利於理解關鍵邏輯。scala
1.1 生成序列化的taskBinarycode
taskBinaryBytes = stage match { case stage: ShuffleMapStage => JavaUtils.bufferToArray( closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) case stage: ResultStage => JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) }
taskBinaryBytes待會是要封裝成對像分發到遠端Executor上執行的,因此必須是可序列化的。部署
二者最主要區別就是:ShuffleMapStage的入參是依賴的shuffleDep;而ResultStage的入參是函數的定義func。
1.2 生成task
如今有了taskBinaryBytes,下一步就是生成Task了。
val tasks: Seq[Task[_]] = try { stage match { case stage: ShuffleMapStage => stage.pendingPartitions.clear() partitionsToCompute.map { id => val locs = taskIdToLocations(id) val part = partitions(id) stage.pendingPartitions += id new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier()) } case stage: ResultStage => partitionsToCompute.map { id => val p: Int = stage.partitions(id) val part = partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, id, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier()) } } } catch { case NonFatal(e) => abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e)) runningStages -= stage return }
兩種Task類型:ShuffleMapTask和ResultTask。這裏要主要的是對Task而言,有多少分區(partition)就會生成多少個Task,Task是到分區維度的,而不是到RDD維度的,這個概念必定要明確。
1.3 提交Task
最後一步就是提交任務執行。這裏就要用到taskScheduler了,固然了,這裏的taskScheduler目前就是指TaskSchedulerImp。
taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
DagScheduler裏還有一個方法這裏能夠提一下,就是:
submitWaitingChildStages(stage)
這個方法是提交等待當前stage執行的等待stage,這樣DAG的整個調度過程就完整了。
二、Task執行
兩種Task類型:ShuffleMapTask和ResultTask。
2.1 ResultTask
咱們先看ResultTask的執行,它相對比較簡單,核心方式是runTask,核心代碼:
override def runTask(context: TaskContext): U = { val ser = SparkEnv.get.closureSerializer.newInstance() val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) func(context, rdd.iterator(partition, context)) }
反序列化出來RDD和func,而後執行rdd的iterator方法獲取數據集,並在此數據集上執行func函數,要注意實際上這是一次迭代過程而不是屢次迭代過程。
2.2 ShuffleMapTask
ShuffleMapTask任務的執行相對複雜些。
核心方法仍是runTask,核心代碼:
override def runTask(context: TaskContext): MapStatus = { val ser = SparkEnv.get.closureSerializer.newInstance() val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) val rdd = rddAndDep._1 val dep = rddAndDep._2 dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition) }
首先反序列化出RDD和依賴項ShuffleDependency。而後用ShuffleWriterProcessor寫數據到RDD。
這裏的dep其實沒太大意義,主要就是來判斷是否要進行合併使用的,不影響理解整個shuffle流程,因此咱們能夠先不要管dep:
dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)
這裏的rdd實際就是ShuffleMapTask所要生成的數據集。這句代碼究竟是什麼意思呢? ShuffleWriterProcessor其實是將數據集寫到了BlockManager上去的,先看看ShuffleWriterProcessor的含義。
2.3 ShuffleWriterProcessor
ShuffleWriterProcessor的關鍵方法的定義先看一下。
def write(rdd: RDD[_],dep: ShuffleDependency[_, _, _], partitionId: Int, context: TaskContext,partition: Partition): MapStatus = { var writer: ShuffleWriter[Any, Any] = null val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any]( dep.shuffleHandle, partitionId, context, createMetricsReporter(context)) writer.write( rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get }
ShuffleManager實際上就是BlockManager,管理塊空間的。
Write是Shuffle寫入器,寫到BlockManager去;rdd.iterator(partition, context)就是當前Shuffle類型的RDD定義的數據集,dep是rdd計算數據集時依賴的RDD(這裏的dep沒多大意思先無論)。
這段代碼的做用就是將shuffle rdd數據集輸出到BlockManager上,在讀取RDD的數據時,若是該RDD是shuffle類型,則須要到BlockManager上去讀取,這裏就是這個做用。
2.4 Shuffle RDD的相關概念
Shuffle類的RDD是指這類RDD的compute方法是依賴於其餘RDD的,這裏的其餘RDD能夠是多個。執行shuffle的RDD的計算過程的時候,是將一到多個依賴RDD的迭代器的輸出做爲數據源迭代器,在此之上執行本身的操做。因此shuffle RDD的compute方法裏必定會用到依賴RDD的iterator方法。
能夠看看CoGroupedRDD的源碼,就能很快的理解shuffle的含義。
附錄:github源碼地址:
https://github.com/apache/spark/tree/master/core/src/main/scala/org/apache/spark