RDD dependencies fall into two kinds: narrow dependencies (Narrow Dependency) and wide dependencies (Shuffle Dependency). A narrow dependency means that each Partition of the parent RDD is used by at most one Partition of the child RDD; a wide dependency means that a Partition of the parent RDD may be used by multiple Partitions of the child RDD.
A narrow dependency (Narrow Dependency) is the most common dependency between RDDs; it means that each Partition of the parent RDD is used by at most one Partition of the child RDD. As shown in the figure below, the parent RDDs have two to three Partitions, and each Partition corresponds to exactly one Partition of the child RDD (join with inputs co-partitioned: the numeric values for the same key are added together).
Narrow dependencies come in two flavors. The first is a one-to-one dependency, represented in Spark by OneToOneDependency, where each child Partition depends on exactly one parent Partition, as in map, filter, and join with inputs co-partitioned. The second is a range dependency, represented by RangeDependency, where a range of child Partitions depends one-to-one on a range of parent Partitions, as in union. The source of OneToOneDependency in Dependency.scala is as follows.
/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
The overridden getParents method of OneToOneDependency takes a partitionId parameter and uses it directly in its body, which means that when the child RDD calls getParents it looks up the Partition with the same partitionId. In other words, a child Partition depends only on the parent Partition with the same partitionId.
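To see this from user code, here is a minimal sketch (it assumes a local SparkContext named sc; the variable names are illustrative only): map produces a child RDD whose single dependency is a OneToOneDependency, so asking for the parents of child Partition 2 simply returns List(2).

import org.apache.spark.OneToOneDependency

val parentRdd = sc.parallelize(1 to 100, 4)
val childRdd = parentRdd.map(_ + 1)
// the only dependency of the mapped RDD is a OneToOneDependency on parentRdd
val dep = childRdd.dependencies.head.asInstanceOf[OneToOneDependency[Int]]
println(dep.getParents(2))   // expected: List(2) -- the same partition id as the child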
The second kind of narrow dependency in Spark is RangeDependency. The source of RangeDependency in Dependency.scala is as follows.
/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}
The biggest difference between RangeDependency and OneToOneDependency is that the implementation uses outStart, length and inStart: when the child RDD looks up its parent Partition through getParents, it subtracts the child-side start id (outStart) from the partitionId and then adds the parent-side start id (inStart). In other words, the parent RDD's Partitions are appended into the child RDD in partitionId order.
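As a concrete illustration (a minimal sketch assuming a local SparkContext named sc; variable names are illustrative only), take the union of two RDDs with 3 Partitions each: the child RDD has 6 Partitions, and the second RangeDependency covers child Partitions 3 to 5 with inStart = 0, outStart = 3 and length = 3, so the parent of child Partition 4 is 4 - 3 + 0 = 1, i.e. Partition 1 of the second parent.

import org.apache.spark.RangeDependency

val rdd1 = sc.parallelize(Seq("a", "b", "c"), 3)
val rdd2 = sc.parallelize(Seq("x", "y", "z"), 3)
val unioned = rdd1.union(rdd2)   // 6 partitions: 0..2 from rdd1, 3..5 from rdd2
// the second dependency maps child partitions 3..5 back to rdd2's partitions 0..2
val rangeDep = unioned.dependencies(1).asInstanceOf[RangeDependency[String]]
println(rangeDep.getParents(4))  // expected: List(1), i.e. partition 1 of rdd2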
Having analyzed the source, the next two examples look at narrow dependencies from a practical angle. For OneToOneDependency, the experiment uses a map operation; the experiment code is shown below.
val sparkSession = SparkSession.builder().master("local").appName("wordcount").getOrCreate()
val sc = sparkSession.sparkContext
sc.setLogLevel("WARN")
// val people = sparkSession.read.parquet("...").as[Person]
val num = Array(100, 80, 70)
val rddnum1 = sc.parallelize(num)
val mapRdd = rddnum1.map(_ * 2)
mapRdd.collect().foreach(println)
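With the input Array(100, 80, 70), running this prints the doubled values 200, 160 and 140; each output Partition is computed from exactly one input Partition, which is the OneToOneDependency at work.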
For RangeDependency, the experiment uses a union operation; the experiment code is shown below.
val sparkSession = SparkSession.builder().master("local").appName("wordcount").getOrCreate()
val sc = sparkSession.sparkContext
sc.setLogLevel("WARN")
// create array 1
val data1 = Array("spark", "scala", "hadoop")
// create array 2
val data2 = Array("SPARK", "SCALA", "HADOOP")
// turn array 1 into RDD1
val rdd1 = sc.parallelize(data1)
// turn array 2 into RDD2
val rdd2 = sc.parallelize(data2)
// union RDD1 with RDD2
val unionRdd = rdd1.union(rdd2)
// collect and print the result
unionRdd.collect().foreach(println)
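Collecting the union preserves the order of the two parents, so the output is spark, scala, hadoop followed by SPARK, SCALA, HADOOP; each Partition of the union comes from exactly one parent Partition through a RangeDependency.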
A wide dependency (Shuffle Dependency) is the kind of RDD dependency that causes a Shuffle during computation; it means that a Partition of the parent RDD is used by multiple Partitions of the child RDD. As the groupByKey operation in the figure below shows, the parent RDD has 3 Partitions, and the data in each Partition is used by two Partitions of the child RDD.
The wide-dependency source lives in the ShuffleDependency class in Dependency.scala. newShuffleId() allocates a new shuffleId, which shows that a wide dependency involves a Shuffle operation; the code that follows registers the Shuffle with the shuffleManager. The source of ShuffleDependency in Dependency.scala is as follows.
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
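The fields above can be inspected from user code. Here is a minimal sketch (it assumes a local SparkContext named sc; variable names are illustrative only): groupByKey produces an RDD whose only dependency is a ShuffleDependency, carrying the freshly allocated shuffleId and the partitioner used by the Shuffle.

import org.apache.spark.ShuffleDependency

val kv = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
val grouped = kv.groupByKey()
// the single dependency of the grouped RDD is a ShuffleDependency
val shuffleDep = grouped.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
println(shuffleDep.shuffleId)     // the id allocated by newShuffleId(), e.g. 0
println(shuffleDep.partitioner)   // the partitioner the shuffle writes against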
Wide dependencies are very common in Spark. A classic example is groupByKey, which groups the input key-value records and merges the values that share the same key into one tuple2 of the key and its values. The code and its behavior are shown below: 5 tuple2 records go in, and 3 tuple2 records come out.
val sparkSession = SparkSession.builder().master("local").appName("wordcount").getOrCreate()
val sc = sparkSession.sparkContext
sc.setLogLevel("WARN")
val data = Array(Tuple2("spark", 100), Tuple2("spark", 95), Tuple2("hadoop", 99), Tuple2("hadoop", 80), Tuple2("scala", 75))
val rdd = sc.parallelize(data)
val rddGroup = rdd.groupByKey()
rddGroup.collect().foreach(println)
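Running this prints three groups, such as (spark,CompactBuffer(100, 95)), (hadoop,CompactBuffer(99, 80)) and (scala,CompactBuffer(75)); the order of the three tuples may differ, since it depends on how the keys hash across the output Partitions.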
In graph theory, a directed graph in which no vertex can be reached again from itself by following a sequence of edges is a directed acyclic graph (DAG). In Spark, computations usually have an ordering: some tasks must run before others, so the tasks have to be arranged into an ordered collection. That collection is the DAG, in which every vertex is a task and every edge represents a constraint (a dependency in Spark).
Through the DAG, Spark can optimize the computation flow: operations that can run on a single node are merged, intermediate data is read and written efficiently in memory, and steps that require a Shuffle are split into separate Stages. This makes the use of compute resources more efficient and reasonable, reduces waiting, and avoids the time wasted on reading and writing intermediate data (thanks to efficient in-memory I/O).
The core of DAG generation in Spark is the division into Stages, and the criterion for the division is the RDD dependency relationship; the high-level scheduler handles the different kinds of dependencies differently. For narrow dependencies, no Shuffle is needed between RDDs and multiple processing steps can be completed in memory on the same machine, so narrowly dependent RDDs are placed in the same Stage. For wide dependencies, because of the Shuffle, the following computation cannot start until the parent RDD's Shuffle has finished, so the Stage is cut at that point.
The key to DAG generation in Spark is backtracking: after the program is submitted, the high-level scheduler treats all the RDDs as one Stage and then walks backwards from the end; whenever it meets a Shuffle it cuts the Stage, and whenever it meets a narrow dependency it merges the RDD into the current Stage. When the backtracking is finished, the DAG has been generated.
The DAG-generation source lives in Spark's DAGScheduler.scala. getParentStages gets or creates the list of parent Stages for a given RDD; getParentStages calls getShuffleMapStage, which in turn calls getAncestorShuffleDependencies, which returns the direct Shuffle dependencies among the ancestors of the given RDD. The source of getParentStages in DAGScheduler.scala is as follows.
/**
 * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
 * the provided firstJobId.
 */
private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  val parents = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(r: RDD[_]) {
    if (!visited(r)) {
      visited += r
      // Kind of ugly: need to register RDDs with the cache here since
      // we can't do it in its constructor because # of partitions is unknown
      for (dep <- r.dependencies) {
        dep match {
          case shufDep: ShuffleDependency[_, _, _] =>
            parents += getShuffleMapStage(shufDep, firstJobId)
          case _ =>
            waitingForVisit.push(dep.rdd)
        }
      }
    }
  }
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  parents.toList
}
The source of getShuffleMapStage in DAGScheduler.scala is as follows.
/**
 * Get or create a shuffle map stage for the given shuffle dependency's map side.
 */
private def getShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) => stage
    case None =>
      // We are going to register ancestor shuffle dependencies
      getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        if (!shuffleToMapStage.contains(dep.shuffleId)) {
          shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
        }
      }
      // Then register current shuffleDep
      val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
      shuffleToMapStage(shuffleDep.shuffleId) = stage
      stage
  }
}
The source of getAncestorShuffleDependencies in DAGScheduler.scala is as follows.
/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
  val parents = new Stack[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(r: RDD[_]) {
    if (!visited(r)) {
      visited += r
      for (dep <- r.dependencies) {
        dep match {
          case shufDep: ShuffleDependency[_, _, _] =>
            if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
              parents.push(shufDep)
            }
          case _ =>
        }
        waitingForVisit.push(dep.rdd)
      }
    }
  }
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  parents
}
The following simple word-count example walks through how the DAG is generated and how the pieces relate. The example code is as follows.
val conf = new SparkConf()
conf.setAppName("My first spark app").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val lines = sc.textFile("./src/test3/words.txt")
// step 1: flatMap produces a new MapPartitionsRDD
val words = lines.flatMap(lines => lines.split(" "))
// step 2: map produces a new MapPartitionsRDD
val pairs = words.map(word => (word, 1))
// step 3: reduceByKey (two reduce steps)
// this step produces a MapPartitionsRDD and a ShuffledRDD
val WordCounts = pairs.reduceByKey(_ + _)
WordCounts.collect().foreach(println)
println(pairs.toDebugString)  // inspect the RDD lineage with toDebugString
println("====================================================")
println(WordCounts.toDebugString)
println("====================================================")
sc.stop()
Concretely: before the program actually runs, Spark's DAG scheduler treats the whole flow as one Stage. This Stage contains 3 operations and 5 RDDs: a MapPartitionsRDD (from reading the file), a MapPartitionsRDD (from the flatMap operation), a MapPartitionsRDD (from the map operation), a MapPartitionsRDD (the local, map-side part of reduceByKey) and a ShuffledRDD (the Shuffle part of reduceByKey).
(1) Backtracking over the whole flow, there is a Shuffle between the ShuffledRDD and the MapPartitionsRDD (the local part of reduceByKey), so the lineage is first cut there, forming two Stages.
(2) Continuing backwards, there is no Shuffle between the MapPartitionsRDD (the local part of reduceByKey) and the MapPartitionsRDD (from the map operation), i.e. the dependency between the two RDDs is narrow, so they belong to the same Stage.
(3) Continuing backwards, none of the earlier RDDs involve a Shuffle, so they all belong to the same Stage.
(4) The backtracking is complete and the DAG is formed, consisting of two Stages:
The first Stage consists of the MapPartitionsRDD (from reading the file), the MapPartitionsRDD (from the flatMap operation), the MapPartitionsRDD (from the map operation) and the MapPartitionsRDD (the local part of reduceByKey); the second Stage consists of the ShuffledRDD (the reduceByKey Shuffle operation).
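As a quick sanity check (a sketch that reuses the pairs and WordCounts RDDs from the example above; run it before sc.stop()), the dependency classes mark exactly where the cut happens: the reduceByKey result hangs off a ShuffleDependency, while the map result hangs off a OneToOneDependency and therefore stays in the same Stage as its parent.

println(pairs.dependencies.head.getClass.getSimpleName)       // OneToOneDependency -> same Stage
println(WordCounts.dependencies.head.getClass.getSimpleName)  // ShuffleDependency  -> Stage boundary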