RDD的依賴關係?node
RDD和它依賴的parent RDD(s)的關係有兩種不一樣的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。apache
1)窄依賴指的是每個parent RDD的Partition最多被子RDD的一個Partition使用,如圖1所示。數據結構
2)寬依賴指的是多個子RDD的Partition會依賴同一個parent RDD的Partition,如圖2所示。ide
RDD做爲數據結構,本質上是一個只讀的分區記錄集合。一個RDD能夠包含多個分區,每一個分區就是一個dataset片斷。RDD能夠相互依賴。this
1)若是RDD的每一個分區最多隻能被一個Child RDD的一個分區使用,則稱之爲narrow dependency。spa
2)若是多個Child RDD分區均可以依賴,則稱之爲wide / shuffle dependency 。code
Spark之因此將依賴分爲narrow 和 shuffle / wide 。基於兩點緣由排序
一、首先,narrow dependencies能夠支持在同一個cluster node上,以pipeline形式執行多條命令,例如在執行了map後,緊接着執行filter。ip
相反,shuffle / wide dependencies 須要全部的父分區都是可用的,可能還須要調用相似MapReduce之類的操做進行跨節點傳遞。ci
二、其次,則是從失敗恢復的角度考慮。 narrow dependencies的失敗恢復更有效,由於它只須要從新計算丟失的parent partition便可,並且能夠並行地在不一樣節點進行重計算。
相反,shuffle / wide dependencies 牽涉RDD各級的多個parent partition。
圖 1 RDD的窄依賴 圖 2 RDD的寬依賴
接下來能夠從不一樣類型的轉換來進一步理解RDD的窄依賴和寬依賴的區別,如圖3所示。
對於map和filter形式的轉換來講,它們只是將Partition的數據根據轉換的規則進行轉化,並不涉及其餘的處理,能夠簡單地認爲只是將數據從一個形式轉換到另外一個形式。對於union,只是將多個RDD合併成一個,parent RDD的Partition(s)不會有任何的變化,能夠認爲只是把parent RDD的Partition(s)簡單進行復制與合併。對於join,若是每一個Partition僅僅和已知的、特定的Partition進行join,那麼這個依賴關係也是窄依賴。對於這種有規則的數據的join,並不會引入昂貴的Shuffle。對於窄依賴,因爲RDD每一個Partition依賴固定數量的parent RDD(s)的Partition(s),所以能夠經過一個計算任務來處理這些Partition,而且這些Partition相互獨立,這些計算任務也就能夠並行執行了。
對於groupByKey,子RDD的全部Partition(s)會依賴於parent RDD的全部Partition(s),子RDD的Partition是parent RDD的全部Partition Shuffle的結果,所以這兩個RDD是不能經過一個計算任務來完成的。一樣,對於須要parent RDD的全部Partition進行join的轉換,也是須要Shuffle,這類join的依賴就是寬依賴而不是前面提到的窄依賴了。
不一樣的操做依據其特性,可能會產生不一樣的依賴。例如map、filter操做會產生 narrow dependency 。reduceBykey操做會產生 wide / shuffle dependency。
通俗點來講,RDD的每一個Partition,僅僅依賴於父RDD中的一個Partition,這纔是窄。 就這麼簡單!
反正子Rdd的partition和父Rdd的Partition若是是一對一就是窄依賴,這樣理解就好區分了 !!!
我之前總感受這是窄依賴,其實 Rdd1的partition0依賴父Rdd0的 partition0和partition1,因此是寬依賴!
全部的依賴都要實現trait Dependency[T]:
abstract class Dependency[T] extends Serializable { def rdd: RDD[T] }
其中rdd就是依賴的parent RDD。
對於窄依賴的實現(有兩種)
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] { //返回子RDD的partitionId依賴的全部的parent RDD的Partition(s) def getParents(partitionId: Int): Seq[Int] override def rdd: RDD[T] = _rdd }
窄依賴是有兩種具體實現,分別以下:
·一種是一對一的依賴,即OneToOneDependency:
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int) = List(partitionId)
經過getParents的實現不難看出,RDD僅僅依賴於parent RDD相同ID的Partition。
還有一個是範圍的依賴,即RangeDependency,它僅僅被org.apache.spark.rdd.UnionRDD使用。UnionRDD是把多個RDD合成一個RDD,這些RDD是被拼接而成,即每一個parent RDD的Partition的相對順序不會變,只不過每一個parent RDD在UnionRDD中的Partition的起始位置不一樣。所以它的getPartents以下:
override def getParents(partitionId: Int) = { if(partitionId >= outStart && partitionId < outStart + length) { List(partitionId - outStart + inStart) } else { Nil } }
其中,inStart是parent RDD中Partition的起始位置,outStart是在UnionRDD中的起始位置,length就是parent RDD中Partition的數量。
對於寬依賴的實現(只有一種)
寬依賴的實現只有一種:ShuffleDependency。子RDD依賴於parent RDD的全部Partition,所以須要Shuffle過程:
class ShuffleDependency[K, V, C]( @transient _rdd: RDD[_ <: Product2[K, V]], val partitioner: Partitioner, val serializer: Option[Serializer] = None, val keyOrdering: Option[Ordering[K]] = None, val aggregator: Option[Aggregator[K, V, C]] = None, val mapSideCombine: Boolean = false) extends Dependency[Product2[K, V]] { override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]] //獲取新的shuffleId val shuffleId: Int = _rdd.context.newShuffleId() //向ShuffleManager註冊Shuffle的信息 val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle( shuffleId, _rdd.partitions.size, this) _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) }
注意:寬依賴支持兩種Shuffle Manager。
即org.apache.spark.shuffle.hash.HashShuffleManager(基於Hash的Shuffle機制)和org.apache.spark.shuffle.sort.SortShuffleManager(基於排序的Shuffle機制)。