依賴關係
基本概念
RDD的依賴關係有一種相似於上下文之間的聯繫,這種關係也是存在於各個RDD算子間的,相鄰兩個RDD間的關係被稱做依賴關係,多個連續的RDD之間的關係,被稱做血緣關係。
每一個RDD都會保存血緣關係,就像是知道本身的父親是誰,本身的父親的父親是誰同樣。 緩存
RDD不會保存數據,所以當一個算子出錯的時候,爲了可以提升容錯性,須要經過算子間的依賴關係找到數據源頭,再按順序執行,從而從新讀取計算。ide
def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local").setAppName("WordCount") val sc = new SparkContext(sparConf) val lines: RDD[String] = sc.makeRDD(List("hello world","hello spark")) println(lines.toDebugString) println("*************************") val words: RDD[String] = lines.flatMap(_.split(" ")) println(words.toDebugString) println("*************************") val wordToOne = words.map(word=>(word,1)) println(wordToOne.toDebugString) println("*************************") val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_+_) println(wordToSum.toDebugString) println("*************************") val array: Array[(String, Int)] = wordToSum.collect() array.foreach(println) sc.stop() }
輸出的血緣關係日誌以下:this
(1) ParallelCollectionRDD[0] at makeRDD at RDD_Dependence_01.scala:13 [] ************************* (1) MapPartitionsRDD[1] at flatMap at RDD_Dependence_01.scala:16 [] | ParallelCollectionRDD[0] at makeRDD at RDD_Dependence_01.scala:13 [] ************************* (1) MapPartitionsRDD[2] at map at RDD_Dependence_01.scala:19 [] | MapPartitionsRDD[1] at flatMap at RDD_Dependence_01.scala:16 [] | ParallelCollectionRDD[0] at makeRDD at RDD_Dependence_01.scala:13 [] ************************* (1) ShuffledRDD[3] at reduceByKey at RDD_Dependence_01.scala:22 [] +-(1) MapPartitionsRDD[2] at map at RDD_Dependence_01.scala:19 [] | MapPartitionsRDD[1] at flatMap at RDD_Dependence_01.scala:16 [] | ParallelCollectionRDD[0] at makeRDD at RDD_Dependence_01.scala:13 [] *************************
寬依賴和窄依賴
窄依賴
窄依賴指的是父RDD的分區數據只提供給一個對應的子RDD的分區spa
寬依賴
寬依賴指的是父RDD的分區數據提供給多個對應的子RDD的分區,當父RDD有Shuffle操做的時候,父RDD與子RDD的依賴關係一定是寬依賴,所以其也被稱爲Shuffle依賴。scala
階段劃分
DAG(Directed Acyclic Graph)有向無環圖是由點和線組成的拓撲圖形,該圖形具備方向, 不會閉環。例如,DAG 記錄了 RDD 的轉換過程和任務的階段。3d
DAGScheduler部分源碼解釋了任務的階段劃分過程:日誌
- 在handleJobSubmitted方法有一個傳入參數爲finalRDD,經過
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
方法,能夠看出不管有多少個RDD,都會默認經過最終的RDD去建立一個resultStage。 - 以後createResultStage調用了
getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage]
方法,經過getShuffleDependencies( rdd: RDD[_])
返回依賴關係的鏈式結構(ShuffleDependency的存儲map),如: A <-- B <-- C - 遍歷ShuffleDependency的存儲map,經過
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
去建立階段,這裏經過firstJobId去作關聯,緩存的stage在shuffleIdToMapStage中。
/** * Create a ResultStage associated with the provided jobId. */ private def createResultStage( rdd: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = { checkBarrierStageWithDynamicAllocation(rdd) checkBarrierStageWithNumSlots(rdd) checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size) val parents = getOrCreateParentStages(rdd, jobId) //這裏調用 val id = nextStageId.getAndIncrement() val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) stageIdToStage(id) = stage updateJobIdStageIdMaps(jobId, stage) stage } /** * Get or create the list of parent stages for a given RDD. The new Stages will be created with * the provided firstJobId. */ private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = { getShuffleDependencies(rdd).map { shuffleDep => getOrCreateShuffleMapStage(shuffleDep, firstJobId) }.toList } /** * Returns shuffle dependencies that are immediate parents of the given RDD. * * This function will not return more distant ancestors. For example, if C has a shuffle * dependency on B which has a shuffle dependency on A: * * A <-- B <-- C * * calling this function with rdd C will only return the B <-- C dependency. * * This function is scheduler-visible for the purpose of unit testing. */ private[scheduler] def getShuffleDependencies( rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = { val parents = new HashSet[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] val waitingForVisit = new ListBuffer[RDD[_]] waitingForVisit += rdd while (waitingForVisit.nonEmpty) { val toVisit = waitingForVisit.remove(0) if (!visited(toVisit)) { visited += toVisit toVisit.dependencies.foreach { case shuffleDep: ShuffleDependency[_, _, _] => parents += shuffleDep case dependency => waitingForVisit.prepend(dependency.rdd) } } } parents }
任務劃分
RDD 任務切分爲:Application、Job、Stage 和 Taskcode
- Application:初始化一個 SparkContext 即生成一個 Application;
- Job:一個 Action 算子就會生成一個 Job;
- Stage:Stage 等於寬依賴(ShuffleDependency)的個數加 1;
- Task:一個 Stage 階段中,最後一個 RDD 的分區個數就是 Task 的個數。
注意:Application->Job->Stage->Task 每一層都是 1 對 n 的關係。blog