本文介紹Spark任務調度框架中Stage的原理,並分析其實現機制。網絡
用戶提交的計算任務是一個由RDD依賴構成的DAG,Spark會把RDD的依賴以shuffle依賴爲邊界劃分紅多個Stage,這些Stage之間也相互依賴,造成了Stage的DAG。而後,DAGScheduler會按依賴關係順序執行這些Stage。併發
要是把RDD依賴構成的DAG當作是邏輯執行計劃(logic plan),那麼,能夠把Stage當作物理執行計劃,爲了更好的理解這個概念,咱們來看一個例子。框架
下面的代碼用來對README.md文件中包含整數值的單詞進行計數,並打印RDD之間的依賴關係(Lineage):ide
scala> val counts = sc.textFile("README.md")
.flatMap(x=>x.split("\\W+"))
.filter(_.matches(".*\\d.*"))
.map(x=>(x,1))
.reduceByKey(_+_)
// 調用一個action函數,用來觸發任務的提交和執行
scala> counts.collect()
// 打印RDD的依賴關係(Lineage)
scala> counts.toDebugString
res7: String =
(2) ShuffledRDD[17] at reduceByKey at <console>:24 []
+-(2) MapPartitionsRDD[16] at map at <console>:24 []
| MapPartitionsRDD[15] at filter at <console>:24 []
| MapPartitionsRDD[14] at flatMap at <console>:24 []
| README.md MapPartitionsRDD[13] at textFile at <console>:24[]
| README.md HadoopRDD[12] at textFile at <console>:24 []
如圖1所示,從圖中能夠看到從RDD依賴關係到Stage的轉換:函數
圖1 RDD依賴關係到Stage的轉換
oop
經過Spark UI的DAG Visualization頁面能夠證明這一點:post
圖2 Spark UI的Stage查看
性能
從圖1和圖2中能夠看到,因爲flatMap、filter、map這些操做都不會產生shuffle,因此被劃分到了一個Stage中,而reduceByKey會產生shuffle操做,因此被劃分到了一個新的Stage中。spa
從概念上講,Stage是一個並行執行的計算任務集,該任務集中的每一個計算任務都會有相同的shuffle依賴,並會執行相同的處理函數,它們做爲Spark Job執行的一部分。scala
從上面的分析可知,DAGScheduler會根據RDD的依賴關係來劃分Stage。那麼,劃分Stage的依據是什麼呢?實際上,Spark是根據RDD的窄依賴和寬依賴來劃分Stage的。
在《RDD轉換操做原理》一節中分析過:在計算RDD時,寬依賴會產生Shuffle。爲了最大程度提升計算性能,減小網絡數據傳輸,Spark會把窄依賴劃分到一個Stage,直到出現了寬依賴。
也就是說,每一個Stage包含一系列RDD的窄轉換(narrow transformations),這些轉換操做能夠在不進行Shuffling的狀況下完成計算,這些Stage在Shuffle的邊界(例如:shuffle發生的地方)處被分開,所以,能夠說Stage是RDD依賴關係(RDD Lineage)在Shuffle階段分裂的結果。
在每一個Stage中,RDD的窄轉換(例如:map()或filter()等)操做造成流水線(pipeline),中間結果能夠保存在內存中,以便加快計算性能,這些任務造成一個任務集(TaskSet),可是Shuffle操做卻須要依賴一個或多個Stage。
圖3 Stage的劃分示意圖
圖3是一個如何根據RDD的依賴來劃分Stage的示意圖。爲了更好的理解Stage的劃分,下面分析一下圖3的Stage的劃分過程:
從rddF開始,rddF的計算依賴rddE,因爲他們是窄依賴(map操做),不會產生Shuffle。因此rddF和rddE會被劃分到一個Stage中,咱們繼續看rddE對其父RDD的依賴。
rddE依賴於兩個RDD:rddB和rddD。因爲rddE對這兩個RDD的依賴都是寬依賴(intersection操做會產生Shuffle),此時就要做爲Stage的分界點,因此會把rddE和rddF劃分到一個Stage中(即:Stage 4)。
而因爲rddE和rddB,rddD都是寬依賴,因此將不會和rddE劃分到一個Stage中。也就是說,rddD和rddB分別在一個新的Stage中。下面繼續分析rddB和rddD的依賴。
rddB依賴rddA,而rddB和rddA是寬依賴,由於在計算rddB時會調用rddA的reduceByKey函數,此時可能會產生Shuffle,因此,rddB會被單獨劃分紅一個Stage(即:Stage3),而rddA不依賴任何其餘的RDD,也會被劃分到一個新的Stage中(即:Stage1)。
在看rddD,因爲rddD是rddC經過map轉換獲得的,這個過程沒有Shuffle產生,因此會把rddD和rddC的依賴劃分到一個Stage中,而rddC再也不和其餘的RDD產生依賴,因此,rddC和rddD的依賴也就產生了Stage2。
另外,從圖3中能夠看出,Stage1和Stage2是相互獨立的,能夠併發執行,而Stage3必需要等到Stage1完成後才能執行,而Stage4必需要Stage3和Stage2都完成後才能執行。
在Spark中有兩類Stage:
ResultStage:執行action操做的函數,獲得最終的結果。
ShuffleMapStage:用來計算RDD中間結果。
這兩類Stage分別由ResultStage和ShuffleMapStage類來實現,另外,這兩種類型的Stage須要遵循Stage的實現合約(Stage抽象類)。類圖關係以下:
Stage抽象類聲明以下:
private[scheduler] abstract class Stage
爲了可以更好的理解後面的兩種具體的Stage,下面對Stage的抽象類中重要的成員作一個說明:
成員名 | 說明 |
---|---|
Id | Stage的惟一標識。這是一個整數,每一個Stage對象都是惟一的。 |
rdd | 運行該Stage的RDD。 |
numTasks | 該Stage中的任務總數。要注意的是:有些ResultStages可能不須要計算全部分區,例如:first(),lookup(),take()等。 |
parents | 這是一個List[Stage],它是該Stage依賴的Stage列表。 |
firstJobId | 對於FIFO調度,此變量是此Stage屬於的第一個Job的ID。 |
numPartitions | RDD的分區數。 |
jobIds | 該Stage屬於的Job集。 |
findMissingPartitions | 返回須要計算(missing)但還沒計算的分區id集合 |
ResultStage是Job的最後一個Stage,該Stage是基於執行action函數的rdd來建立的。該Stage用來計算一個action操做的結果。該類的聲明以下:
private[spark] class ResultStage(
id: Int,
rdd: RDD[_],
val func: (TaskContext, Iterator[_]) => _,
val partitions: Array[Int],
parents: List[Stage], //依賴的父Stage
firstJobId: Int,
callSite: CallSite)
extends Stage(id, rdd, partitions.length, parents, firstJobId,callSite) {
爲了計算action操做的結果,ResultStage會在目標RDD的一個或多個分區上使用函數:func
。須要計算的分區id集合保存在成員變量:partitions
中。但對於有些action操做,好比:first(),take()等,函數:func
可能不會在全部分區上使用。
另外,在提交Job時,會先建立ResultStage。但在提交Stage時,會先遞歸找到該Stage依賴的父級Stage,並先提交父級Stage。以下圖所示:
ShuffleMapStages是在DAG執行過程當中產生的Stage,用來爲Shuffle產生數據。ShuffleMapStages發生在每一個Shuffle操做以前,在Shuffle以前可能有多個窄轉換操做,好比:map,filter,這些操做能夠造成流水線(pipeline)。當執行ShuffleMapStages時,會產生Map的輸出文件,這些文件會被隨後的Reduce任務使用。
ShuffleMapStages也能夠做爲Jobs,經過DAGScheduler.submitMapStage函數單獨進行提交。對於這樣的Stages,會在變量mapStageJobs
中跟蹤提交它們的ActiveJobs。要注意的是,可能有多個ActiveJob嘗試計算相同的ShuffleMapStages。
它爲一個shuffle過程產生map操做的輸出文件。它也多是自適應查詢規劃/自適應調度工做的最後階段。以下圖所示:
經過前面的分析咱們知道,RDD的每一個Action操做都會提交一個Job,而DAGScheduler會根據RDD的依賴關係把這個Job劃分紅一個或多個相互依賴的Stage,從而造成一個DAG。而後,根據Stage的DAG來執行Stage。那麼,這個過程是如何實現的呢?下面就經過代碼層面來分析一下實現過程:
RDD.count()// 經過RDD的action操做來提交Job
SparkContext#runJob(...) // 調用SparkContext中的runJob函數
DAGScheduler#runJob(...) // 調用DAGScheduler中的runJob函數
DAGScheduler#submitJob(...) // 提交Job。其實是向事件總線提交JobSubmitted事件
DAGSchedulerEventProcessLoop#post(JobSubmitted(...)) //發送Job提交事件
DAGSchedulerEventProcessLoop#doOnReceive(event:DAGSchedulerEvent) //獲取DAGSchedulerEvent
DAGScheduler#handleJobSubmitted(...) //處理Job提交事件
DAGScheduler#submitStage(stage: Stage) //提交Stage
從以上的函數調用流程來看,事件的處理是在handleJobSubmitted函數中進行的,咱們來看一下該函數是如何處理Job提交事件的。代碼以下(省去不相關代碼):
private[scheduler] def handleJobSubmitted(...) {
var finalStage: ResultStage = null
...
// 首先,建立ResultStage和他依賴的Stage,造成一個Stage的DAG
finalStage = createResultStage(finalRDD, func, partitions, jobId,callSite)
...
// 提交finalStage
submitStage(finalStage)
}
Stage(包括父Stage)的建立,都是在createResultStage函數中完成的。下面分析createResultStage的實現,該函數的代碼以下:
private def createResultStage(...): ResultStage = {
...
// 遞歸建立Stage的父Stage,及其祖先Stage,最終造成Stage的DAG
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
// 建立ResultStage
val stage = new ResultStage(id, rdd, func, partitions, parents,jobId, callSite)
...
stage
}
createResultStage函數的基本邏輯以下圖所示:
如上圖所示:createResultStage函數建立Stage的過程是經過遞歸調用實現的。
在getOrCreateParentStages函數中,會基於stage的rdd的依賴關係向上查找其父rdd,如果窄依賴則繼續向上查找;如果寬依賴(shuffle依賴)則會調用getOrCreateShuffleMapStage函數來建立一個ShuffleMapStage。
經過函數createResultStage建立了一個Stage的DAG,並能夠經過finalStage(它是一個ResultStage)來獲取Stage的依賴關係(DAG)。執行到這裏全部的Stage(包括Stage的依賴關係)就建立完成了,此時就能夠開始提交Stage了。
Stage的提交時經過submitStage
函數來實現的。該函數的主要實現邏輯以下圖所示:
可見在提交Stage時,也是經過遞歸提交最早依賴的Stage,最後提交ResultStage。其實現代碼以下:
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
...
if (!waitingStages(stage) && !runningStages(stage) &&!failedStages(stage)) {
// 查找並獲取依賴的父Stage
val missing = getMissingParentStages(stage).sortBy(_.id)
...
if (missing.isEmpty) {
...
// 已經找到所有的依賴Stage並已提交,最後提交最後一個Stage
submitMissingTasks(stage, jobId.get)
} else {
// 先提交依賴的父Stage
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
在建立Stage時,只會記錄以shuffle依賴爲邊界的最後一個RDD。以下:
Stage(
val id: Int,
val rdd: RDD[_], // rdd是每一個Stage的最後一個RDD
...
)
在建立任務時,會調用該RDD的計算函數,因爲在每一個Stage中RDD都是相互依賴的,並且同一個Stage中的RDD都是窄依賴,這意味着同一個Stage中RDD的分區計算的中間過程能夠造成pipeline(不須要持久化到磁盤)。因此,當計算某個Stage中的最後一個RDD分區數據時,會根據依賴關係計算其依賴的父RDD的分區數據,而且會以pipeline的方式執行。
本文說明了Stage的實現原理,並對Stage的提交過程進行了分析。
c