Spark分析之DAGScheduler

DAGScheduler概述:是一個面向Stage層面的調度器;數組

主要入參有:緩存

dagScheduler.runJob(rddcleanedFunc, partitions, callSite, allowLocal,resultHandler, localProperties.get)app

rdd: final RDD;函數

cleanedFunc: 計算每一個分區的函數;spa

resultHander: 結果偵聽器;code

 

主要功能以下:blog

一、接收用戶提交的job;遞歸

二、將job根據類型劃分爲不一樣的stage,記錄哪些RDD、Stage被物化,並在每個stage內產生一系列的task,並封裝成TaskSet;ci

三、決定每一個Task的最佳位置(任務在數據所在的節點上運行),並結合當前的緩存狀況;將TaskSet提交給TaskScheduler;rem

四、從新提交Shuffle輸出丟失的Stage給TaskScheduler;

  注:一個Stage內部的錯誤不是由shuffle輸出丟失形成的,DAGScheduler是無論的,由TaskScheduler負責嘗試從新提交task執行;

 

以以下示例描述Job提交過程:

val sc = new SparkContext("local[2]", "WordCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))
val textFile = sc.textFile("xxx")
val result = textFile.flatMap(line => line.split("\t")).map(word => (word, 1)).reduceByKey(_ + _)
result.collect

RDD.collect

  ==>sc.runJob                  #####至此完成了將RDD提交DAGScheduler#####

    val results = new Array[U](partitions.size) //result存放的是返回值,數組大小爲最後一個RDD的partition的個數

    ==>dagScheduler.runJob(rdd, func, partitions, resultHandler......)     //DAGScheduler的輸入:RDD and partitions to compute

      ==>dagScheduler.submitJob

        ==>eventProcessActor ! JobSubmitted

def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal...) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal...)
}


//完成job到stage的轉換,生成finalStage並提交
private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      allowLocal: Boolean...){
     //注意:該RDD是final RDD,而不是一系列的RDD,用finalRDD來建立finalStage
     //newStage操做對應會生成新的result stage或者shuffle stage:內部有一個isShuffleMap變量來標識該stage是shuffle or result
     var finalStage: Stage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
 
    //使用finalStage來構建job
    val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)

    //對於簡單的job,沒有依賴關係而且只有一個partition,該類job會使用local thread處理而並不是提交到TaskScheduler上處理
    if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
        runLocally(job)
    } else {
        submitStage(finalStage) //提交finalStage
    }
}

handleJobSubmitted方法完成了job到stage的轉換,生成finalStage;每一個job都有一個finalStage。

 

newStage()方法分析:根據finalRDD生成finalStage

private def newStage(
      rdd: RDD[_],  numTasks: Int,     //task個數就是partitions個數
      shuffleDep: Option[ShuffleDependency[_,_]],
      jobId: Int, callSite: Option[String] = None) : Stage = {
    val id = nextStageId.getAndIncrement() 
    val stage = new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
   ......
}


private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
    val parents = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_,_] =>
              parents += getShuffleMapStage(shufDep, jobId)
            case _ =>
              visit(dep.rdd)
          }
        }
      }
    }
    visit(rdd)
    parents.toList
}

private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], jobId: Int): Stage = {
    shuffleToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) => stage
      case None =>
        val stage =
          newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId)
        shuffleToMapStage(shuffleDep.shuffleId) = stage
        stage
    }

newStage()後產生的finalStage中已經包含了該stage的全部依賴的父Stage;

經過getParentStages()方法構建該stage的依賴關係;反向visit RDD DAG圖,遇到窄依賴就將依賴的RDD加入到stage,遇到寬依賴就切開並遞歸寬依賴的stage;

生成stage實例,stage的id經過nextStageId的值加一獲得,task的個數就是partitions的個數;

有兩種類型的Stage:ShuffleStage和ResultStage;

Stage內部有一個isShuffleMap變量標識該Stage是shuffle仍是result類型;

Spark對stage的劃分是按照寬依賴來進行區分的:根據RDD的依賴關係,若是遇到寬依賴則建立ShuffleStage;

 

submitStage()方法分析:計算stage之間的依賴關係(Stage DAG)並對依賴關係進行處理

private def submitStage(stage: Stage) { 
 if (!waiting(stage) && !running(stage) && !failed(stage)) {
  val missing = getMissingParentStages(stage).sortBy(_.id)  //根據final stage發現是否有parent stage
  if (missing == Nil) { // 若是計算中發現當前的stage沒有任何依賴或者全部的依賴都已經準備完畢,則提交task
 submitMissingTasks(stage, jobId.get)
   running += stage //設置當前的stage爲running,由於當前的stage沒有未處理完的依賴的stage
  } else { //若是有parent stage,須要先submit parent, 由於stage之間須要順序執行
   for (parent <- missing) {
    submitStage(parent)
   }
   waiting += stage   //當前stage放入到waiting列表中,表示該stage須要等待parent先執行完成
  }
 }
}

//根據final stage的parents找出全部的parent stage
private def getMissingParentStages(stage: Stage): List[Stage] = { 
 ......
 dep match {
  //若是是ShuffleDependency,則新建一個shuffle map stage,且該stage是可用的話則加入missing中
  case shufDep: ShuffleDependency[_,_] =>  //ShuffleDependecy
   val mapStage = getShuffleMapStage(shufDep, stage.jobId)
   if (!mapStage.isAvailable) {
    missing += mapStage
   }
  case narrowDep: NarrowDependency[_] =>  //NarrowDependecy
   visit(narrowDep.rdd)
 }
}

getMissParentStages(stage)處理步驟:

一、根據該stage獲得該stage的parent,也就是RDD的依賴關係,生成parentStage是經過RDD的dependencies;

二、若是依賴關係是寬依賴,則生成一個mapStage來做爲finalStage的parent;也就是說對於須要shuffle操做的job,會生成mapStage和finalStage進行處理

三、若是依賴關係是窄依賴,不會生成新的stage。也就是說對於不須要shuffle的job只須要一個finalStage;

注意:getMissParentStages(stage)獲得的結果集是按照stageid的降序排列的

 

submitStage()處理步驟:

一、計算該stage的getMissParentStages(),若是當前stage沒有任何依賴或者全部的依賴都已執行完,則提交該stage;

二、若是發現該stage有依賴的stage未執行,則先執行完全部依賴的父stage(根據getMissParentStages()方法獲得的結果集降序來執行stage);

 

submitMissingTasks()方法分析:把stage根據parition拆分紅task(決定每一個Task的最佳位置)生成TaskSet,並提交到TaskScheduler

private def submitMissingTasks(stage: Stage, jobId: Int) {
 //首先根據stage所依賴的RDD的partition的分佈,會產生出與partition數量相等的task
 var tasks = ArrayBuffer[Task[_]]()

 //對於finalStage或是mapStage會產生不一樣的task。
 //檢查該stage時是否ShuffleMap,若是是則生成ShuffleMapTask
 if (stage.isShuffleMap) { //mapStage:表示還有其餘stage依賴此stage
  for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
   //task根據partition的locality進行分佈
   val locs = getPreferredLocs(stage.rdd, p)
   tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
  }
 } else { //finalStage:該類型stage直接輸出結果生成ResultTask
  val job = resultStageToJob(stage)
  for (id <- 0 until job.numPartitions if !job.finished(id)) {
   val partition = job.partitions(id)
   val locs = getPreferredLocs(stage.rdd, partition)
   //因爲是ResultTask,所以須要傳入定義的func,也就是若是處理結果返回
   tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
  }
 }
 //向TaskSchuduler提交任務,以stage爲單位,一個stage對應一個TaskSet
 taskSched.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
}

 submitMissingTask()方法的處理步驟:

一、經過stage.isShuffleMap來決定生成的是ShuffleMapTask仍是ResultTask;

二、若是是ShuffleMapTask則根據stage所依賴的RDD的partition分佈,產生和partition數量相同的task,這些task根據partition的locality進行分佈’

三、把stage對應生成全部的task封裝到一個TaskSet中,提交給TaskScheduler的submitTasks()方法進行調度;

 

從新提交shuffle輸出丟失的stage

case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()

private[scheduler] def resubmitFailedStages() {
    if (failedStages.size > 0) {
      logInfo("Resubmitting failed stages")
      clearCacheLocs()
      val failedStagesCopy = failedStages.toArray
      failedStages.clear()
      for (stage <- failedStagesCopy.sortBy(_.jobId)) {
        submitStage(stage)
      }
    }
    submitWaitingStages()
}

####至此完成了DAGScheduler提交TaskSet到TaskSchuduler#####

 

Job的生成:

一旦driver程序中出現action,就會生成一個job,好比:count等,向DAGScheduler提交job;若是driver程序後面還有action,那麼其餘action也會對應生成相應的job;

因此:driver有多少個action就會生成多少個job。爲何spark將driver程序稱爲application而不是job的緣由,估計就是這吧。

每個job可能會包含多個stage,最後一個stage產生result。在提交job過程當中,DAGScheduler會首先劃分stage,而後先提交無parent stage的stages,並在提交過程當中計算該stage的task數目和類型,並提交具體的task;無parent stage的stage提交完後,依賴該stage的stage才能提交。

相關文章
相關標籤/搜索