Spark任務調度-Stage的劃分與提交

本文介紹Spark任務調度框架中Stage的原理,並分析其實現機制。網絡

Stage的基本概念

用戶提交的計算任務是一個由RDD依賴構成的DAG,Spark會把RDD的依賴以shuffle依賴爲邊界劃分紅多個Stage,這些Stage之間也相互依賴,造成了Stage的DAG。而後,DAGScheduler會按依賴關係順序執行這些Stage。併發

要是把RDD依賴構成的DAG當作是邏輯執行計劃(logic plan),那麼,能夠把Stage當作物理執行計劃,爲了更好的理解這個概念,咱們來看一個例子。框架

下面的代碼用來對README.md文件中包含整數值的單詞進行計數,並打印RDD之間的依賴關係(Lineage):ide

 scala> val counts = sc.textFile("README.md")
 .flatMap(x=>x.split("\\W+"))
 .filter(_.matches(".*\\d.*"))
 .map(x=>(x,1))
 .reduceByKey(_+_)
 // 調用一個action函數,用來觸發任務的提交和執行
 scala> counts.collect()
 
 // 打印RDD的依賴關係(Lineage)
 scala> counts.toDebugString
 res7String =
 (2ShuffledRDD[17at reduceByKey at <console>:24 []
  +-(2MapPartitionsRDD[16at map at <console>:24 []
     |  MapPartitionsRDD[15at filter at <console>:24 []
     |  MapPartitionsRDD[14at flatMap at <console>:24 []
     |  README.md MapPartitionsRDD[13at textFile at <console>:24[]
     |  README.md HadoopRDD[12at textFile at <console>:24 []

如圖1所示,從圖中能夠看到從RDD依賴關係到Stage的轉換:函數

image.png

圖1 RDD依賴關係到Stage的轉換
oop

經過Spark UI的DAG Visualization頁面能夠證明這一點:post

image.png

圖2 Spark UI的Stage查看
性能

從圖1和圖2中能夠看到,因爲flatMap、filter、map這些操做都不會產生shuffle,因此被劃分到了一個Stage中,而reduceByKey會產生shuffle操做,因此被劃分到了一個新的Stage中。spa

從概念上講,Stage是一個並行執行的計算任務集,該任務集中的每一個計算任務都會有相同的shuffle依賴,並會執行相同的處理函數,它們做爲Spark Job執行的一部分。scala

Stage的劃分

從上面的分析可知,DAGScheduler會根據RDD的依賴關係來劃分Stage。那麼,劃分Stage的依據是什麼呢?實際上,Spark是根據RDD的窄依賴和寬依賴來劃分Stage的。

在《RDD轉換操做原理》一節中分析過:在計算RDD時,寬依賴會產生Shuffle。爲了最大程度提升計算性能,減小網絡數據傳輸,Spark會把窄依賴劃分到一個Stage,直到出現了寬依賴。

也就是說,每一個Stage包含一系列RDD的窄轉換(narrow transformations),這些轉換操做能夠在不進行Shuffling的狀況下完成計算,這些Stage在Shuffle的邊界(例如:shuffle發生的地方)處被分開,所以,能夠說Stage是RDD依賴關係(RDD Lineage)在Shuffle階段分裂的結果。

在每一個Stage中,RDD的窄轉換(例如:map()或filter()等)操做造成流水線(pipeline),中間結果能夠保存在內存中,以便加快計算性能,這些任務造成一個任務集(TaskSet),可是Shuffle操做卻須要依賴一個或多個Stage。

image.png圖3 Stage的劃分示意圖

圖3是一個如何根據RDD的依賴來劃分Stage的示意圖。爲了更好的理解Stage的劃分,下面分析一下圖3的Stage的劃分過程:

從rddF開始,rddF的計算依賴rddE,因爲他們是窄依賴(map操做),不會產生Shuffle。因此rddF和rddE會被劃分到一個Stage中,咱們繼續看rddE對其父RDD的依賴。

rddE依賴於兩個RDD:rddB和rddD。因爲rddE對這兩個RDD的依賴都是寬依賴(intersection操做會產生Shuffle),此時就要做爲Stage的分界點,因此會把rddE和rddF劃分到一個Stage中(即:Stage 4)。

而因爲rddE和rddB,rddD都是寬依賴,因此將不會和rddE劃分到一個Stage中。也就是說,rddD和rddB分別在一個新的Stage中。下面繼續分析rddB和rddD的依賴。

rddB依賴rddA,而rddB和rddA是寬依賴,由於在計算rddB時會調用rddA的reduceByKey函數,此時可能會產生Shuffle,因此,rddB會被單獨劃分紅一個Stage(即:Stage3),而rddA不依賴任何其餘的RDD,也會被劃分到一個新的Stage中(即:Stage1)。

在看rddD,因爲rddD是rddC經過map轉換獲得的,這個過程沒有Shuffle產生,因此會把rddD和rddC的依賴劃分到一個Stage中,而rddC再也不和其餘的RDD產生依賴,因此,rddC和rddD的依賴也就產生了Stage2。

另外,從圖3中能夠看出,Stage1和Stage2是相互獨立的,能夠併發執行,而Stage3必需要等到Stage1完成後才能執行,而Stage4必需要Stage3和Stage2都完成後才能執行。

Stage的分類和實現

在Spark中有兩類Stage:

  • ResultStage:執行action操做的函數,獲得最終的結果。

  • ShuffleMapStage:用來計算RDD中間結果。

這兩類Stage分別由ResultStage和ShuffleMapStage類來實現,另外,這兩種類型的Stage須要遵循Stage的實現合約(Stage抽象類)。類圖關係以下:

image.png

Stage抽象類聲明以下:

 private[scheduler] abstract class Stage

爲了可以更好的理解後面的兩種具體的Stage,下面對Stage的抽象類中重要的成員作一個說明:

成員名 說明
Id Stage的惟一標識。這是一個整數,每一個Stage對象都是惟一的。
rdd 運行該Stage的RDD。
numTasks 該Stage中的任務總數。要注意的是:有些ResultStages可能不須要計算全部分區,例如:first(),lookup(),take()等。
parents 這是一個List[Stage],它是該Stage依賴的Stage列表。
firstJobId 對於FIFO調度,此變量是此Stage屬於的第一個Job的ID。
numPartitions RDD的分區數。
jobIds 該Stage屬於的Job集。
findMissingPartitions 返回須要計算(missing)但還沒計算的分區id集合

ResultStage

ResultStage是Job的最後一個Stage,該Stage是基於執行action函數的rdd來建立的。該Stage用來計算一個action操做的結果。該類的聲明以下:

 private[sparkclass ResultStage(
     idInt,
     rddRDD[_],
     val func: (TaskContextIterator[_]) => _,
     val partitionsArray[Int],
     parentsList[Stage], //依賴的父Stage
     firstJobIdInt,
     callSiteCallSite)
   extends Stage(idrddpartitions.lengthparentsfirstJobId,callSite) {

爲了計算action操做的結果,ResultStage會在目標RDD的一個或多個分區上使用函數:func。須要計算的分區id集合保存在成員變量:partitions中。但對於有些action操做,好比:first(),take()等,函數:func可能不會在全部分區上使用。

另外,在提交Job時,會先建立ResultStage。但在提交Stage時,會先遞歸找到該Stage依賴的父級Stage,並先提交父級Stage。以下圖所示:

image.png

ShuffleMapStage

ShuffleMapStages是在DAG執行過程當中產生的Stage,用來爲Shuffle產生數據。ShuffleMapStages發生在每一個Shuffle操做以前,在Shuffle以前可能有多個窄轉換操做,好比:map,filter,這些操做能夠造成流水線(pipeline)。當執行ShuffleMapStages時,會產生Map的輸出文件,這些文件會被隨後的Reduce任務使用。

ShuffleMapStages也能夠做爲Jobs,經過DAGScheduler.submitMapStage函數單獨進行提交。對於這樣的Stages,會在變量mapStageJobs中跟蹤提交它們的ActiveJobs。要注意的是,可能有多個ActiveJob嘗試計算相同的ShuffleMapStages。

它爲一個shuffle過程產生map操做的輸出文件。它也多是自適應查詢規劃/自適應調度工做的最後階段。以下圖所示:

image.png

Stage提交的實現

經過前面的分析咱們知道,RDD的每一個Action操做都會提交一個Job,而DAGScheduler會根據RDD的依賴關係把這個Job劃分紅一個或多個相互依賴的Stage,從而造成一個DAG。而後,根據Stage的DAG來執行Stage。那麼,這個過程是如何實現的呢?下面就經過代碼層面來分析一下實現過程:

Stage提交流程的函數調用順序

 RDD.count()// 經過RDD的action操做來提交Job
 SparkContext#runJob(...)  // 調用SparkContext中的runJob函數
 DAGScheduler#runJob(...)  // 調用DAGScheduler中的runJob函數
 DAGScheduler#submitJob(...) // 提交Job。其實是向事件總線提交JobSubmitted事件
 DAGSchedulerEventProcessLoop#post(JobSubmitted(...)) //發送Job提交事件
 DAGSchedulerEventProcessLoop#doOnReceive(event:DAGSchedulerEvent//獲取DAGSchedulerEvent
 DAGScheduler#handleJobSubmitted(...)  //處理Job提交事件
 DAGScheduler#submitStage(stageStage)  //提交Stage

提交過程的代碼實現

從以上的函數調用流程來看,事件的處理是在handleJobSubmitted函數中進行的,咱們來看一下該函數是如何處理Job提交事件的。代碼以下(省去不相關代碼):

 private[schedulerdef handleJobSubmitted(...) {
     var finalStageResultStage = null
  ...
     // 首先,建立ResultStage和他依賴的Stage,造成一個Stage的DAG
     finalStage = createResultStage(finalRDDfuncpartitionsjobId,callSite)
    ...
     // 提交finalStage
     submitStage(finalStage)  
 }

Stage DAG的構建

Stage(包括父Stage)的建立,都是在createResultStage函數中完成的。下面分析createResultStage的實現,該函數的代碼以下:

   private def createResultStage(...): ResultStage = {
    ...
     // 遞歸建立Stage的父Stage,及其祖先Stage,最終造成Stage的DAG
     val parents = getOrCreateParentStages(rddjobId)
     val id = nextStageId.getAndIncrement()
     // 建立ResultStage
     val stage = new ResultStage(idrddfuncpartitionsparents,jobIdcallSite)
 ...
     stage
  }

createResultStage函數的基本邏輯以下圖所示:

image.png

如上圖所示:createResultStage函數建立Stage的過程是經過遞歸調用實現的。

在getOrCreateParentStages函數中,會基於stage的rdd的依賴關係向上查找其父rdd,如果窄依賴則繼續向上查找;如果寬依賴(shuffle依賴)則會調用getOrCreateShuffleMapStage函數來建立一個ShuffleMapStage。

Stage的提交

經過函數createResultStage建立了一個Stage的DAG,並能夠經過finalStage(它是一個ResultStage)來獲取Stage的依賴關係(DAG)。執行到這裏全部的Stage(包括Stage的依賴關係)就建立完成了,此時就能夠開始提交Stage了。

Stage的提交時經過submitStage函數來實現的。該函數的主要實現邏輯以下圖所示:

image.png

可見在提交Stage時,也是經過遞歸提交最早依賴的Stage,最後提交ResultStage。其實現代碼以下:

 /** Submits stage, but first recursively submits any missing parents. */
   private def submitStage(stageStage) {
     val jobId = activeJobForStage(stage)
     
     if (jobId.isDefined) {
      ...
       if (!waitingStages(stage&& !runningStages(stage&&!failedStages(stage)) {
         // 查找並獲取依賴的父Stage
         val missing = getMissingParentStages(stage).sortBy(_.id)
        ...
         if (missing.isEmpty) {
            ...
           // 已經找到所有的依賴Stage並已提交,最後提交最後一個Stage
           submitMissingTasks(stagejobId.get)
        } else {
           // 先提交依賴的父Stage
           for (parent <- missing) {
             submitStage(parent)
          }
           waitingStages += stage
        }
      }
    } else {
       abortStage(stage"No active job for stage " + stage.idNone)
    }
  }

Stage和RDD

在建立Stage時,只會記錄以shuffle依賴爲邊界的最後一個RDD。以下:

  Stage(
    val id: Int,
    val rdd: RDD[_], // rdd是每一個Stage的最後一個RDD
    ...
  )

在建立任務時,會調用該RDD的計算函數,因爲在每一個Stage中RDD都是相互依賴的,並且同一個Stage中的RDD都是窄依賴,這意味着同一個Stage中RDD的分區計算的中間過程能夠造成pipeline(不須要持久化到磁盤)。因此,當計算某個Stage中的最後一個RDD分區數據時,會根據依賴關係計算其依賴的父RDD的分區數據,而且會以pipeline的方式執行。

總結

本文說明了Stage的實現原理,並對Stage的提交過程進行了分析。

c

相關文章
相關標籤/搜索