Spark RDD概念學習系列之RDD的依賴關係(寬依賴和窄依賴)(三)

RDD的依賴關係?node

 

     RDD和它依賴的parent RDD(s)的關係有兩種不一樣的類型,即窄依賴(narrow dependency)寬依賴(wide dependency)apache

     1)窄依賴指的是每個parent RDD的Partition最多被子RDD的一個Partition使用,如圖1所示。數據結構

     2)寬依賴指的是多個子RDD的Partition會依賴同一個parent RDD的Partition,如圖2所示。ide

 

  RDD做爲數據結構,本質上是一個只讀的分區記錄集合。一個RDD能夠包含多個分區,每一個分區就是一個dataset片斷。RDD能夠相互依賴。this

  1)若是RDD的每一個分區最多隻能被一個Child RDD的一個分區使用,則稱之爲narrow dependency。spa

  2)若是多個Child RDD分區均可以依賴,則稱之爲wide / shuffle dependency 。code

 

 

Spark之因此將依賴分爲narrow 和 shuffle / wide 。基於兩點緣由排序

  一、首先,narrow dependencies能夠支持在同一個cluster node上,以pipeline形式執行多條命令,例如在執行了map後,緊接着執行filter。ip

    相反,shuffle / wide dependencies 須要全部的父分區都是可用的,可能還須要調用相似MapReduce之類的操做進行跨節點傳遞。ci

     二、其次,則是從失敗恢復的角度考慮。 narrow dependencies的失敗恢復更有效,由於它只須要從新計算丟失的parent partition便可,並且能夠並行地在不一樣節點進行重計算。

       相反,shuffle / wide dependencies 牽涉RDD各級的多個parent partition。

 

 

 

                          

        圖 1  RDD的窄依賴                  圖 2 RDD的寬依賴

 

     接下來能夠從不一樣類型的轉換來進一步理解RDD的窄依賴和寬依賴的區別,如圖3所示。

     對於map和filter形式的轉換來講,它們只是將Partition的數據根據轉換的規則進行轉化,並不涉及其餘的處理,能夠簡單地認爲只是將數據從一個形式轉換到另外一個形式。對於union,只是將多個RDD合併成一個,parent RDD的Partition(s)不會有任何的變化,能夠認爲只是把parent RDD的Partition(s)簡單進行復制與合併。對於join,若是每一個Partition僅僅和已知的、特定的Partition進行join,那麼這個依賴關係也是窄依賴。對於這種有規則的數據的join,並不會引入昂貴的Shuffle。對於窄依賴,因爲RDD每一個Partition依賴固定數量的parent RDD(s)的Partition(s),所以能夠經過一個計算任務來處理這些Partition,而且這些Partition相互獨立,這些計算任務也就能夠並行執行了。

   對於groupByKey,子RDD的全部Partition(s)會依賴於parent RDD的全部Partition(s),子RDD的Partition是parent RDD的全部Partition Shuffle的結果,所以這兩個RDD是不能經過一個計算任務來完成的。一樣,對於須要parent RDD的全部Partition進行join的轉換,也是須要Shuffle,這類join的依賴就是寬依賴而不是前面提到的窄依賴了。

 

 

                  

  不一樣的操做依據其特性,可能會產生不一樣的依賴。例如map、filter操做會產生 narrow dependency 。reduceBykey操做會產生 wide / shuffle dependency。

 

 

 

  通俗點來講,RDD的每一個Partition,僅僅依賴於父RDD中的一個Partition,這纔是窄。 就這麼簡單!

   反正子Rdd的partition和父Rdd的Partition若是是一對一就是窄依賴,這樣理解就好區分了 !!!
 

 

             

     我之前總感受這是窄依賴,其實 Rdd1的partition0依賴父Rdd0的 partition0和partition1,因此是寬依賴

 

 

 

    

全部的依賴都要實現trait Dependency[T]:

abstract class Dependency[T] extends Serializable {
    def rdd: RDD[T]
}

 

  其中rdd就是依賴的parent RDD。

 

 

 

 

 

 對於窄依賴的實現(有兩種)

 

abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
    //返回子RDD的partitionId依賴的全部的parent RDD的Partition(s)
    def getParents(partitionId: Int): Seq[Int]
    override def rdd: RDD[T] = _rdd
}

  窄依賴是有兩種具體實現,分別以下:

 

 

 

·一種是一對一的依賴,即OneToOneDependency

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
    override def getParents(partitionId: Int) = List(partitionId)

    經過getParents的實現不難看出,RDD僅僅依賴於parent RDD相同ID的Partition。

 

 


  還有一個是範圍的依賴,即RangeDependency,它僅僅被org.apache.spark.rdd.UnionRDD使用。UnionRDD是把多個RDD合成一個RDD,這些RDD是被拼接而成,即每一個parent RDD的Partition的相對順序不會變,只不過每一個parent RDD在UnionRDD中的Partition的起始位置不一樣。所以它的getPartents以下:

複製代碼

override def getParents(partitionId: Int) = {
    if(partitionId >= outStart && partitionId < outStart + length) {
       List(partitionId - outStart + inStart)
    } else {
       Nil
    }
}

複製代碼

  其中,inStart是parent RDD中Partition的起始位置,outStart是在UnionRDD中的起始位置,length就是parent RDD中Partition的數量。

 

 

 

 

 

對於寬依賴的實現(只有一種)

  寬依賴的實現只有一種:ShuffleDependency。子RDD依賴於parent RDD的全部Partition,所以須要Shuffle過程:

複製代碼

class ShuffleDependency[K, V, C](
    @transient _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] {
 
override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]
//獲取新的shuffleId
val shuffleId: Int = _rdd.context.newShuffleId()
//向ShuffleManager註冊Shuffle的信息
val shuffleHandle: ShuffleHandle =
_rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.size, this)
 
    _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

複製代碼

  注意:寬依賴支持兩種Shuffle Manager。

  即org.apache.spark.shuffle.hash.HashShuffleManager(基於Hash的Shuffle機制)org.apache.spark.shuffle.sort.SortShuffleManager(基於排序的Shuffle機制)。

相關文章
相關標籤/搜索