Spark Learning 04 - RDD Dependencies

Dependencies

Basic Concepts

RDD dependencies are somewhat like contextual links: they exist between the RDDs produced by successive operators. The relationship between two adjacent RDDs is called a dependency, while the chain of relationships across multiple consecutive RDDs is called lineage.
Every RDD keeps a record of its lineage, as if it knew who its parent is, and who its parent's parent is.

An RDD does not store its data. So, to improve fault tolerance, when an operator fails Spark uses the dependencies between operators to trace back to the data source and then re-executes the chain in order, recomputing the lost data.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object RDD_Dependence_01 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)

    // print the lineage (toDebugString) after each transformation
    val lines: RDD[String] = sc.makeRDD(List("hello world", "hello spark"))
    println(lines.toDebugString)
    println("*************************")
    val words: RDD[String] = lines.flatMap(_.split(" "))
    println(words.toDebugString)
    println("*************************")
    val wordToOne = words.map(word => (word, 1))
    println(wordToOne.toDebugString)
    println("*************************")
    val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
    println(wordToSum.toDebugString)
    println("*************************")
    val array: Array[(String, Int)] = wordToSum.collect()
    array.foreach(println)
    sc.stop()
  }
}

The lineage log output is as follows:

(1) ParallelCollectionRDD[0] at makeRDD at RDD_Dependence_01.scala:13 []
*************************
(1) MapPartitionsRDD[1] at flatMap at RDD_Dependence_01.scala:16 []
 |  ParallelCollectionRDD[0] at makeRDD at RDD_Dependence_01.scala:13 []
*************************
(1) MapPartitionsRDD[2] at map at RDD_Dependence_01.scala:19 []
 |  MapPartitionsRDD[1] at flatMap at RDD_Dependence_01.scala:16 []
 |  ParallelCollectionRDD[0] at makeRDD at RDD_Dependence_01.scala:13 []
*************************
(1) ShuffledRDD[3] at reduceByKey at RDD_Dependence_01.scala:22 []
 +-(1) MapPartitionsRDD[2] at map at RDD_Dependence_01.scala:19 []
    |  MapPartitionsRDD[1] at flatMap at RDD_Dependence_01.scala:16 []
    |  ParallelCollectionRDD[0] at makeRDD at RDD_Dependence_01.scala:13 []
*************************

Wide Dependencies and Narrow Dependencies

Narrow Dependency

A narrow dependency means that each partition of the parent RDD is used by at most one partition of the child RDD.

Wide Dependency

A wide dependency means that a partition of the parent RDD is used by multiple partitions of the child RDD. Whenever the parent RDD is involved in a shuffle operation, the dependency between parent and child is necessarily wide, which is why it is also called a shuffle dependency.
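
Both kinds of dependency can be inspected at runtime through RDD.dependencies. Below is a minimal sketch (the object name and local-mode settings are my own assumptions) that prints the dependency class behind a map (narrow) and a reduceByKey (wide):

import org.apache.spark.{SparkConf, SparkContext}

object DependencyKind {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("DependencyKind"))
    val words = sc.makeRDD(List("hello world", "hello spark")).flatMap(_.split(" "))
    val wordToOne = words.map((_, 1))
    val wordToSum = wordToOne.reduceByKey(_ + _)

    // map: each parent partition feeds exactly one child partition
    wordToOne.dependencies.foreach(d => println(d.getClass.getSimpleName)) // OneToOneDependency
    // reduceByKey: parent partitions are redistributed across child partitions
    wordToSum.dependencies.foreach(d => println(d.getClass.getSimpleName)) // ShuffleDependency
    sc.stop()
  }
}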

Stage Division

A DAG (Directed Acyclic Graph) is a topological graph made of vertices and edges; it is directed and contains no cycles. In Spark, the DAG records the chain of RDD transformations and the stages of a job.

Part of the DAGScheduler source code explains how a job is divided into stages:

  1. The handleJobSubmitted method receives the final RDD as a parameter. From finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) we can see that, no matter how many RDDs a job contains, a single ResultStage is always created from the final RDD.
  2. createResultStage then calls getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage], which uses getShuffleDependencies(rdd: RDD[_]) to collect the immediate parent shuffle dependencies of the RDD into a HashSet[ShuffleDependency]; for a chain such as A <-- B <-- C, calling it with C returns only the B <-- C dependency.
  3. The scheduler iterates over that set of ShuffleDependencies and creates a stage for each via getOrCreateShuffleMapStage(shuffleDep, firstJobId); firstJobId links the stages to the job, and created stages are cached in shuffleIdToMapStage.
  /**
   * Create a ResultStage associated with the provided jobId.
   */
  private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    checkBarrierStageWithDynamicAllocation(rdd)
    checkBarrierStageWithNumSlots(rdd)
    checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
    val parents = getOrCreateParentStages(rdd, jobId) // parent stages are resolved here
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

  /**
   * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
   * the provided firstJobId.
   */
  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }
  
  /**
   * Returns shuffle dependencies that are immediate parents of the given RDD.
   *
   * This function will not return more distant ancestors.  For example, if C has a shuffle
   * dependency on B which has a shuffle dependency on A:
   *
   * A <-- B <-- C
   *
   * calling this function with rdd C will only return the B <-- C dependency.
   *
   * This function is scheduler-visible for the purpose of unit testing.
   */
  private[scheduler] def getShuffleDependencies(
      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new ListBuffer[RDD[_]]
    waitingForVisit += rdd
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.remove(0)
      if (!visited(toVisit)) {
        visited += toVisit
        toVisit.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          case dependency =>
            waitingForVisit.prepend(dependency.rdd)
        }
      }
    }
    parents
  }
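
As the comment above notes, getShuffleDependencies returns only the immediate parents, so a chain with two shuffles produces three stages. A quick way to observe this from user code (a sketch assuming an existing SparkContext sc; the variable names mirror the A <-- B <-- C example) is toDebugString, where each "+-" marks a ShuffleDependency, i.e. a stage boundary:

val a = sc.makeRDD(List(("hello", 1), ("spark", 1), ("hello", 1)), 2)
val b = a.reduceByKey(_ + _)                        // shuffle #1: A <-- B
val c = b.map { case (k, v) => (v, k) }.sortByKey() // shuffle #2: B <-- C
println(c.toDebugString)                            // two "+-" markers => three stages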

Task Division

RDD work is divided into: Application, Job, Stage, and Task.

  • Application: initializing one SparkContext creates one Application;
  • Job: each action operator generates one Job;
  • Stage: the number of Stages equals the number of wide (Shuffle) dependencies plus 1;
  • Task: within a Stage, the number of partitions of the last RDD is the number of Tasks.

Note: each level of Application -> Job -> Stage -> Task has a 1-to-n relationship with the level below it, as the sketch below illustrates.
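
A minimal sketch (the object name, data, and comments are my own; the counts simply apply the rules above) tying the four levels together:

import org.apache.spark.{SparkConf, SparkContext}

object TaskDivision {
  def main(args: Array[String]): Unit = {
    // one SparkContext => one Application
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("TaskDivision"))

    val wordToSum = sc.makeRDD(List("a", "b", "a", "c"), 2) // 2 partitions
      .map((_, 1))          // narrow dependency, stays in the same stage
      .reduceByKey(_ + _)   // one ShuffleDependency

    wordToSum.collect()     // action #1 => Job 0: 1 shuffle + 1 = 2 stages,
                            // each stage runs 2 tasks (its last RDD has 2 partitions)
    wordToSum.count()       // action #2 => Job 1 (Spark may skip the map-side stage
                            // because its shuffle output already exists)
    sc.stop()
  }
}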
