A Study of Spark RDD Narrow Dependencies

1. Introduction

In terms of RDD dependencies, Spark distinguishes between narrow dependencies and wide dependencies.

The two can be told apart as follows: if one partition of the parent RDD is referenced by multiple partitions of the child RDD, the dependency is wide; otherwise it is narrow.

A wide dependency triggers a shuffle, and wide dependencies also mark the boundaries between the different stages of a job.
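To make the rule concrete, here is a small Scala sketch that applies it to a dependency described as a function from a child partition ID to the parent partition IDs it reads. DependencyRule and isNarrow are hypothetical names used only for illustration. Spark itself does not compute this at runtime; as far as we understand, it tells the two kinds apart by the Dependency subclass (NarrowDependency versus ShuffleDependency).

```scala
// Illustration only: a hypothetical helper (not a Spark API) that applies the
// rule above to a dependency given as "child partition -> parent partitions".
object DependencyRule {
  def isNarrow(numChildPartitions: Int, parentsOf: Int => Seq[Int]): Boolean = {
    // Collect every child -> parent reference, ignoring duplicates within one child.
    val referenced: Seq[Int] =
      (0 until numChildPartitions).flatMap(child => parentsOf(child).distinct)
    // Narrow iff no parent partition is referenced by more than one child partition.
    referenced.groupBy(identity).values.forall(_.size == 1)
  }

  def main(args: Array[String]): Unit = {
    // map/filter style: child partition i reads only parent partition i
    println(isNarrow(4, i => Seq(i)))    // true
    // shuffle style: every child partition reads every parent partition
    println(isNarrow(4, _ => 0 until 4)) // false
  }
}
```

Note that a many-to-one mapping, where each child reads a few parent partitions that no other child reads, also counts as narrow under this rule.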

This article focuses on narrow dependencies.

 

2. How dependencies are established

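A child RDD internally maintains its dependencies on its parent RDDs. Below is the abstract base class for dependencies; its member rdd is the parent RDD.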

/**
 * :: DeveloperApi ::
 * Base class for dependencies.
 */
@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}
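The relationship can be sketched with a few simplified stand-in classes. ToyRdd, ToyDependency, ParentLink and lineage are hypothetical names, not Spark classes; the point is only that a child holds its dependencies and each dependency's rdd member leads back to the parent, so following those links recovers the lineage. In real Spark the same information is exposed through rdd.dependencies, and rdd.toDebugString prints the lineage.

```scala
// Simplified stand-ins (hypothetical, not Spark's classes): a child RDD keeps
// its dependencies, and each dependency exposes the parent through `rdd`.
object LineageDemo {
  abstract class ToyDependency {
    def rdd: ToyRdd // the parent RDD, like Dependency.rdd in Spark
  }

  class ToyRdd(val name: String, val dependencies: Seq[ToyDependency])

  // A trivial concrete dependency used only for this illustration.
  class ParentLink(parent: ToyRdd) extends ToyDependency {
    override def rdd: ToyRdd = parent
  }

  // Follow dependencies -> rdd recursively, listing the child first.
  def lineage(r: ToyRdd): List[String] =
    r.name :: r.dependencies.toList.flatMap(dep => lineage(dep.rdd))

  val source = new ToyRdd("textFile", Nil)
  val mapped = new ToyRdd("map", Seq(new ParentLink(source)))
  val filtered = new ToyRdd("filter", Seq(new ParentLink(mapped)))

  def main(args: Array[String]): Unit =
    println(lineage(filtered).mkString(" <- ")) // filter <- map <- textFile
}
```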

  

3. Three forms of narrow dependency:

The abstract class for narrow dependencies is as follows:

/**
 * :: DeveloperApi ::
 * Base class for dependencies where each partition of the child RDD depends on a small number
 * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
 */
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}
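The contract of getParents can be illustrated without Spark at all. The sketch below assumes a stripped-down copy of the abstract class that works on partition IDs only; ToyNarrowDependency and GroupedDependency are hypothetical, and the grouping rule (each child partition reads groupSize consecutive parent partitions) is chosen only to show a child depending on a small number of parent partitions, roughly in the spirit of coalescing without a shuffle.

```scala
object GetParentsDemo {
  // Stripped-down copy of NarrowDependency that only deals with partition IDs.
  abstract class ToyNarrowDependency {
    // Parent partition IDs that child partition `partitionId` depends on.
    def getParents(partitionId: Int): Seq[Int]
  }

  // Hypothetical rule: child partition i reads parent partitions
  // [i * groupSize, (i + 1) * groupSize), clipped to the parent's partition count.
  class GroupedDependency(numParentPartitions: Int, groupSize: Int)
      extends ToyNarrowDependency {
    override def getParents(partitionId: Int): Seq[Int] = {
      val start = partitionId * groupSize
      start until math.min(start + groupSize, numParentPartitions)
    }
  }

  def main(args: Array[String]): Unit = {
    val dep = new GroupedDependency(numParentPartitions = 5, groupSize = 2)
    for (child <- 0 until 3)
      println(s"child $child <- parents ${dep.getParents(child).mkString(",")}")
  }
}
```

With 5 parent partitions and groupSize = 2, child partitions 0, 1 and 2 read parents {0, 1}, {2, 3} and {4}. No parent partition is read by two children, so by the rule in section 1 this dependency is still narrow.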

 

Narrow dependency form one: map, filter, ...

 

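As shown above, when two RDDs are related by map, filter, or a similar transformation, their partitions correspond one to one, so they can be computed in parallel at execution time.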

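Each child partition depends on the parent partition with the same ID, and this mapping never changes. The implementation class for this kind of narrow dependency is: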

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId) // a child partition has the same ID as the parent partition it depends on
}
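A minimal sketch of why this allows per-partition, pipelined execution, assuming partitions are modeled as plain Scala sequences (a hypothetical model, not Spark's execution engine). ToyOneToOne copies the getParents logic above; computeChild builds one child partition of parent.map(f).filter(p) by reading only the parent partitions that getParents returns.

```scala
object OneToOneDemo {
  // Mirrors OneToOneDependency.getParents: child partition i reads parent partition i.
  class ToyOneToOne {
    def getParents(partitionId: Int): List[Int] = List(partitionId)
  }

  // Hypothetical parent data already split into 3 partitions.
  val parent: Vector[Seq[Int]] = Vector(Seq(1, 2, 3), Seq(4, 5), Seq(6, 7, 8, 9))

  // Compute child partition `id` of parent.map(f).filter(p) from the parent
  // partitions named by getParents. The iterator chains map and filter so each
  // record flows through both in a single pass.
  def computeChild(id: Int, dep: ToyOneToOne, f: Int => Int, p: Int => Boolean): Seq[Int] =
    dep.getParents(id).iterator.flatMap(i => parent(i)).map(f).filter(p).toList

  def main(args: Array[String]): Unit = {
    val dep = new ToyOneToOne
    val child = parent.indices.map(id => computeChild(id, dep, _ * 10, _ > 20))
    println(child.map(_.mkString("[", ",", "]")).mkString(" ")) // [30] [40,50] [60,70,80,90]
  }
}
```

Each child partition is computed independently from its own parent partition, which is the property the NarrowDependency comment calls pipelined execution.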

 

  

Narrow dependency form two: union

First, let's look at its implementation class:

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

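To see how the parameters are used, the sketch below lays out one RangeDependency per parent of a union. It is modeled on what, as far as we can tell from the Spark source, UnionRDD.getDependencies does: inStart is 0 and outStart is the running count of partitions contributed by the earlier parents. ToyRange, unionDeps and UnionLayoutDemo are hypothetical names; ToyRange keeps only the partition arithmetic of RangeDependency.

```scala
object UnionLayoutDemo {
  // Keeps only the partition arithmetic of RangeDependency (hypothetical copy).
  class ToyRange(val parentName: String, inStart: Int, outStart: Int, length: Int) {
    def getParents(partitionId: Int): List[Int] =
      if (partitionId >= outStart && partitionId < outStart + length)
        List(partitionId - outStart + inStart)
      else Nil
  }

  // One ToyRange per parent: inStart = 0, outStart = partitions of earlier parents.
  def unionDeps(parents: Seq[(String, Int)]): Seq[ToyRange] = {
    val offsets = parents.scanLeft(0)(_ + _._2) // 0, n1, n1 + n2, ...
    parents.zip(offsets).map { case ((name, n), start) => new ToyRange(name, 0, start, n) }
  }

  def main(args: Array[String]): Unit = {
    val deps = unionDeps(Seq("A" -> 3, "B" -> 2)) // union has 3 + 2 = 5 partitions
    for (child <- 0 until 5; dep <- deps; p <- dep.getParents(child))
      println(s"child $child <- ${dep.parentName} partition $p")
  }
}
```

For parents A (3 partitions) and B (2 partitions), child partitions 0 to 2 map to partitions 0 to 2 of A, and child partitions 3 and 4 map to partitions 0 and 1 of B; every other (child, parent) pair returns Nil.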
 

The code above is not easy to understand at first. Refer to the figure below, which illustrates what each parameter means:

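So the offset between a partition's position (partitionId) in the child RDD and its position (partitionId) in the parent RDD is (outStart - inStart).

The check if (partitionId >= outStart && partitionId < outStart + length) tests whether the current child partition ID falls within the range of child partitions covered by this parent RDD. If it does not, getParents returns Nil, meaning this parent contributes no partitions to that child partition.
The expression partitionId - outStart + inStart subtracts the offset from the current child partition ID to get the corresponding partition ID in the parent RDD; it is simply partitionId - (outStart - inStart).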
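As a worked example with hypothetical values inStart = 2, outStart = 5 and length = 3 (child partitions 5, 6 and 7 cover parent partitions 2, 3 and 4), the lookup for child partition 6 and child partition 8 goes as follows:

```latex
\begin{aligned}
\text{offset} &= \text{outStart} - \text{inStart} = 5 - 2 = 3 \\
\text{child } 6:&\quad 5 \le 6 < 5 + 3 = 8 \;\Rightarrow\; \text{parent} = 6 - 3 = 3 \\
\text{child } 8:&\quad 8 \not< 8 \;\Rightarrow\; \texttt{Nil}
\end{aligned}
```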


Narrow dependency form three: join with co-partitioned inputs

This case uses the same one-to-one dependency as form one.
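The reason is that when both inputs are already partitioned by the same partitioner into the same number of partitions, all records for a given key sit at the same partition index on both sides, so child partition i only needs partition i of each parent. In Spark, as far as we understand, CoGroupedRDD (which join builds on) then uses a OneToOneDependency for each such parent instead of a ShuffleDependency. The sketch below demonstrates the idea with plain collections; partitionOf is a hypothetical stand-in similar in spirit to Spark's HashPartitioner (a non-negative key.hashCode modulo the partition count), and the other helpers are hypothetical as well.

```scala
object CoPartitionedJoinDemo {
  // Hypothetical hash partitioner: non-negative hashCode modulo numPartitions.
  def partitionOf(key: Any, numPartitions: Int): Int =
    java.lang.Math.floorMod(key.hashCode, numPartitions)

  // Split key-value pairs into n partitions by key.
  def hashPartition[V](data: Seq[(String, V)], n: Int): Vector[Seq[(String, V)]] =
    Vector.tabulate(n)(i => data.filter { case (k, _) => partitionOf(k, n) == i })

  // Join two partitions locally by matching keys (nested loop).
  def joinPartition[A, B](left: Seq[(String, A)], right: Seq[(String, B)]): Seq[(String, (A, B))] =
    for ((k, a) <- left; (k2, b) <- right if k == k2) yield (k, (a, b))

  // Co-partition both inputs, then join partition i with partition i only.
  def coPartitionedJoin[A, B](left: Seq[(String, A)], right: Seq[(String, B)], n: Int): Seq[(String, (A, B))] = {
    val l = hashPartition(left, n)
    val r = hashPartition(right, n)
    (0 until n).flatMap(i => joinPartition(l(i), r(i)))
  }

  def main(args: Array[String]): Unit = {
    val users = Seq("ann" -> 1, "bob" -> 2, "cy" -> 3)
    val orders = Seq("ann" -> "book", "cy" -> "pen", "cy" -> "ink")
    println(coPartitionedJoin(users, orders, 3).mkString(", "))
  }
}
```

Because each output partition reads exactly one partition from each input, the partition-by-partition result is the same set of pairs as joining the full inputs directly, with no records moved between partitions.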