DAGScheduler概述:是一個面向Stage層面的調度器;數組
主要入參有:緩存
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,resultHandler, localProperties.get)app
rdd: final RDD;函數
cleanedFunc: 計算每一個分區的函數;spa
resultHander: 結果偵聽器;code
主要功能以下:blog
一、接收用戶提交的job;遞歸
二、將job根據類型劃分爲不一樣的stage,記錄哪些RDD、Stage被物化,並在每個stage內產生一系列的task,並封裝成TaskSet;ci
三、決定每一個Task的最佳位置(任務在數據所在的節點上運行),並結合當前的緩存狀況;將TaskSet提交給TaskScheduler;rem
四、從新提交Shuffle輸出丟失的Stage給TaskScheduler;
注:一個Stage內部的錯誤不是由shuffle輸出丟失形成的,DAGScheduler是無論的,由TaskScheduler負責嘗試從新提交task執行;
以以下示例描述Job提交過程:
val sc = new SparkContext("local[2]", "WordCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val textFile = sc.textFile("xxx") val result = textFile.flatMap(line => line.split("\t")).map(word => (word, 1)).reduceByKey(_ + _) result.collect
RDD.collect
==>sc.runJob #####至此完成了將RDD提交DAGScheduler#####
val results = new Array[U](partitions.size) //result存放的是返回值,數組大小爲最後一個RDD的partition的個數
==>dagScheduler.runJob(rdd, func, partitions, resultHandler......) //DAGScheduler的輸入:RDD and partitions to compute
==>dagScheduler.submitJob
==>eventProcessActor ! JobSubmitted
def receive = { case JobSubmitted(jobId, rdd, func, partitions, allowLocal...) => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal...) } //完成job到stage的轉換,生成finalStage並提交 private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], allowLocal: Boolean...){ //注意:該RDD是final RDD,而不是一系列的RDD,用finalRDD來建立finalStage //newStage操做對應會生成新的result stage或者shuffle stage:內部有一個isShuffleMap變量來標識該stage是shuffle or result var finalStage: Stage = newStage(rdd, partitions.size, None, jobId, Some(callSite)) //使用finalStage來構建job val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) //對於簡單的job,沒有依賴關係而且只有一個partition,該類job會使用local thread處理而並不是提交到TaskScheduler上處理 if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { runLocally(job) } else { submitStage(finalStage) //提交finalStage } }
handleJobSubmitted方法完成了job到stage的轉換,生成finalStage;每一個job都有一個finalStage。
newStage()方法分析:根據finalRDD生成finalStage
private def newStage( rdd: RDD[_], numTasks: Int, //task個數就是partitions個數 shuffleDep: Option[ShuffleDependency[_,_]], jobId: Int, callSite: Option[String] = None) : Stage = { val id = nextStageId.getAndIncrement() val stage = new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite) ...... } private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = { val parents = new HashSet[Stage] val visited = new HashSet[RDD[_]] def visit(r: RDD[_]) { if (!visited(r)) { visited += r for (dep <- r.dependencies) { dep match { case shufDep: ShuffleDependency[_,_] => parents += getShuffleMapStage(shufDep, jobId) case _ => visit(dep.rdd) } } } } visit(rdd) parents.toList } private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], jobId: Int): Stage = { shuffleToMapStage.get(shuffleDep.shuffleId) match { case Some(stage) => stage case None => val stage = newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId) shuffleToMapStage(shuffleDep.shuffleId) = stage stage }
newStage()後產生的finalStage中已經包含了該stage的全部依賴的父Stage;
經過getParentStages()方法構建該stage的依賴關係;反向visit RDD DAG圖,遇到窄依賴就將依賴的RDD加入到stage,遇到寬依賴就切開並遞歸寬依賴的stage;
生成stage實例,stage的id經過nextStageId的值加一獲得,task的個數就是partitions的個數;
有兩種類型的Stage:ShuffleStage和ResultStage;
Stage內部有一個isShuffleMap變量標識該Stage是shuffle仍是result類型;
Spark對stage的劃分是按照寬依賴來進行區分的:根據RDD的依賴關係,若是遇到寬依賴則建立ShuffleStage;
submitStage()方法分析:計算stage之間的依賴關係(Stage DAG)並對依賴關係進行處理
private def submitStage(stage: Stage) { if (!waiting(stage) && !running(stage) && !failed(stage)) { val missing = getMissingParentStages(stage).sortBy(_.id) //根據final stage發現是否有parent stage if (missing == Nil) { // 若是計算中發現當前的stage沒有任何依賴或者全部的依賴都已經準備完畢,則提交task submitMissingTasks(stage, jobId.get) running += stage //設置當前的stage爲running,由於當前的stage沒有未處理完的依賴的stage } else { //若是有parent stage,須要先submit parent, 由於stage之間須要順序執行 for (parent <- missing) { submitStage(parent) } waiting += stage //當前stage放入到waiting列表中,表示該stage須要等待parent先執行完成 } } } //根據final stage的parents找出全部的parent stage private def getMissingParentStages(stage: Stage): List[Stage] = { ...... dep match { //若是是ShuffleDependency,則新建一個shuffle map stage,且該stage是可用的話則加入missing中 case shufDep: ShuffleDependency[_,_] => //ShuffleDependecy val mapStage = getShuffleMapStage(shufDep, stage.jobId) if (!mapStage.isAvailable) { missing += mapStage } case narrowDep: NarrowDependency[_] => //NarrowDependecy visit(narrowDep.rdd) } }
getMissParentStages(stage)處理步驟:
一、根據該stage獲得該stage的parent,也就是RDD的依賴關係,生成parentStage是經過RDD的dependencies;
二、若是依賴關係是寬依賴,則生成一個mapStage來做爲finalStage的parent;也就是說對於須要shuffle操做的job,會生成mapStage和finalStage進行處理
三、若是依賴關係是窄依賴,不會生成新的stage。也就是說對於不須要shuffle的job只須要一個finalStage;
注意:getMissParentStages(stage)獲得的結果集是按照stageid的降序排列的
submitStage()處理步驟:
一、計算該stage的getMissParentStages(),若是當前stage沒有任何依賴或者全部的依賴都已執行完,則提交該stage;
二、若是發現該stage有依賴的stage未執行,則先執行完全部依賴的父stage(根據getMissParentStages()方法獲得的結果集降序來執行stage);
submitMissingTasks()方法分析:把stage根據parition拆分紅task(決定每一個Task的最佳位置)生成TaskSet,並提交到TaskScheduler
private def submitMissingTasks(stage: Stage, jobId: Int) { //首先根據stage所依賴的RDD的partition的分佈,會產生出與partition數量相等的task var tasks = ArrayBuffer[Task[_]]() //對於finalStage或是mapStage會產生不一樣的task。 //檢查該stage時是否ShuffleMap,若是是則生成ShuffleMapTask if (stage.isShuffleMap) { //mapStage:表示還有其餘stage依賴此stage for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) { //task根據partition的locality進行分佈 val locs = getPreferredLocs(stage.rdd, p) tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs) } } else { //finalStage:該類型stage直接輸出結果生成ResultTask val job = resultStageToJob(stage) for (id <- 0 until job.numPartitions if !job.finished(id)) { val partition = job.partitions(id) val locs = getPreferredLocs(stage.rdd, partition) //因爲是ResultTask,所以須要傳入定義的func,也就是若是處理結果返回 tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id) } } //向TaskSchuduler提交任務,以stage爲單位,一個stage對應一個TaskSet taskSched.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties)) }
submitMissingTask()方法的處理步驟:
一、經過stage.isShuffleMap來決定生成的是ShuffleMapTask仍是ResultTask;
二、若是是ShuffleMapTask則根據stage所依賴的RDD的partition分佈,產生和partition數量相同的task,這些task根據partition的locality進行分佈’
三、把stage對應生成全部的task封裝到一個TaskSet中,提交給TaskScheduler的submitTasks()方法進行調度;
從新提交shuffle輸出丟失的stage
case ResubmitFailedStages => dagScheduler.resubmitFailedStages() private[scheduler] def resubmitFailedStages() { if (failedStages.size > 0) { logInfo("Resubmitting failed stages") clearCacheLocs() val failedStagesCopy = failedStages.toArray failedStages.clear() for (stage <- failedStagesCopy.sortBy(_.jobId)) { submitStage(stage) } } submitWaitingStages() }
####至此完成了DAGScheduler提交TaskSet到TaskSchuduler#####
Job的生成:
一旦driver程序中出現action,就會生成一個job,好比:count等,向DAGScheduler提交job;若是driver程序後面還有action,那麼其餘action也會對應生成相應的job;
因此:driver有多少個action就會生成多少個job。爲何spark將driver程序稱爲application而不是job的緣由,估計就是這吧。
每個job可能會包含多個stage,最後一個stage產生result。在提交job過程當中,DAGScheduler會首先劃分stage,而後先提交無parent stage的stages,並在提交過程當中計算該stage的task數目和類型,並提交具體的task;無parent stage的stage提交完後,依賴該stage的stage才能提交。