spark 源碼分析之一 -- RDD的四種依賴關係

RDD的四種依賴關係

RDD四種依賴關係,分別是 ShuffleDependency、PrunDependency、RangeDependency和OneToOneDependency四種依賴關係。以下圖所示:org.apache.spark.Dependency有兩個一級子類,分別是 ShuffleDependency 和 NarrowDependency。其中,NarrowDependency 是一個抽象類,它有三個實現類,分別是OneToOneDependency、RangeDependency和 PruneDependency。apache

  

RDD的窄依賴

咱們先來看窄RDD是如何肯定依賴的父RDD的分區的呢?NarrowDependency 定義了一個抽象方法,以下:網絡

/**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

其輸入參數是子RDD 的 分區Id,輸出是子RDD 分區依賴的父RDD 的 partition 的 id 序列。less

下面,分別看三種子類的實現:ide

OneToOneDependency

首先,OneToOneDependency的getParent實現以下:函數

override def getParents(partitionId: Int): List[Int] = List(partitionId)

就一行代碼,實現比較簡單,子RDD對應的partition index 跟父 RDD 的partition 的 index 同樣。至關於父RDD 的 每個partition 複製到 子RDD 的對應分區中,分區的關係是一對一的。RDD的關係也是一對一的。ui

RangeDependency

其次,RangeDependency的 getParent 實現以下:this

  

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

首先解釋三個變量:inStart:父RDD  range 的起始位置;outStart:子RDD range 的起始位置;length:range 的長度。spa

獲取 父RDD 的partition index 的規則是:若是子RDD 的 partition index 在父RDD 的range 內,則返回的 父RDD partition是 子RDD partition index - 父 RDD 分區range 起始 + 子RDD 分區range 起始。其中,(- 父 RDD 分區range 起始 + 子RDD 分區range 起始)即 子RDD 的分區的 range 起始位置和 父RDD 的分區的 range 的起始位置 的相對距離。子RDD 的 parttion index 加上這個相對距離就是 對應父的RDD partition。不然是無依賴的父 RDD 的partition index。父子RDD的分區關係是一對一的。RDD 的關係多是一對一(length 是1 ,就是特殊的 OneToOneDependency),也多是多對一,也多是一對多。code

PruneDependency

最後,PruneDependency的 getParent 實現以下:blog

 1 /**
 2  * Represents a dependency between the PartitionPruningRDD and its parent. In this
 3  * case, the child RDD contains a subset of partitions of the parents'.
 4  */
 5 private[spark] class PruneDependency[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean)
 6   extends NarrowDependency[T](rdd) {
 7 
 8   @transient
 9   val partitions: Array[Partition] = rdd.partitions
10     .filter(s => partitionFilterFunc(s.index)).zipWithIndex
11     .map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }
12 
13   override def getParents(partitionId: Int): List[Int] = {
14     List(partitions(partitionId).asInstanceOf[PartitionPruningRDDPartition].parentSplit.index)
15   }
16 }

首先,解釋三個變量: rdd 是指向父RDD 的實例引用;partitionFilterFunc 是一個回調函數,做用是過濾出符合條件的父 RDD 的 partition 集合;PartitionPruningRDDPartition類聲明以下:

private[spark] class PartitionPruningRDDPartition(idx: Int, val parentSplit: Partition)
  extends Partition {
  override val index = idx
}

partitions的生成過程以下: 先根據父RDD 引用獲取父RDD 對應的 partition集合,而後根據過濾函數和partition index ,過濾出想要的父RDD 的 partition 集合而且從0 開始編號,最後,根據父RDD 的 partition 和 新編號實例化新的PartitionPruningRDDPartition實例,並放入到 partitions 集合中,至關因而先對parent RDD 的分區作Filter 剪枝操做。

在getParent 方法中, 先根據子RDD 的 partition index 獲取 到對應的 parent RDD 的對應分區,而後獲取Partition 的成員函數 index,該index 就是 父RDD 的 partition 在父RDD 的全部分區中的 index。 子RDD partition 和 父RDD partition的關係是 一對一的, 父RDD 和子RDD 的關係是 多對一,也多是一對多,也多是一對一。

簡言之,在窄依賴中,子RDD 的partition 和 父RDD 的 partition 的關係是 一對一的。

RDD的寬依賴

下面重點看 ShuffleDependency,ShuffleDependency表明的是 一個 shuffle stage 的輸出。先來看其構造方法,即其依賴的變量或實例:

1 @DeveloperApi
2 class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
3     @transient private val _rdd: RDD[_ <: Product2[K, V]],
4     val partitioner: Partitioner,
5     val serializer: Serializer = SparkEnv.get.serializer,
6     val keyOrdering: Option[Ordering[K]] = None,
7     val aggregator: Option[Aggregator[K, V, C]] = None,
8     val mapSideCombine: Boolean = false)
9   extends Dependency[Product2[K, V]]

其中,_rdd 代指父RDD實例;partitioner是用於給shuffle的輸出分區的分區器;serializer,主要用於序列化,默認是org.apache.spark.serializer.JavaSerializer,能夠經過`spark.serializer` 參數指定;keyOrdering RDD shuffle的key 的順序。aggregator,map或reduce 端用於RDD shuffle的combine聚合器;mapSideCombine 是否執行部分的聚合(即 map端的預聚合,能夠提升網絡傳輸效率和reduce 端的執行效率),默認是false。由於並非全部的都適合這樣作。好比求全局平均值,均值,平方差等,但像全局最大值,最小值等是適合用mapSideCombine 的。注意,當mapSideCombine 爲 true時, 必須設置combine聚合器,由於 shuffle 前須要使用聚合器作 map-combine 操做。

partitioner的7種實現

partitioner 定義了 RDD 裏的key-value 對 是如何按 key 來分區的。映射每個 key 到一個分區 id,從 0 到 分區數 - 1; 注意,分區器必須是肯定性的,即給定同一個 key,必須返回同一個分區,便於任務失敗時,追溯分區數據,確保了每個要參與計算的分區數據的一致性。即 partition 肯定了 shuffle 過程當中 數據是要流向哪一個具體的分區的。

org.apache.spark.Partition的 7 個實現類以下:

  

咱們先來看Partitioner 的方法定義:

1 abstract class Partitioner extends Serializable {
2   def numPartitions: Int
3   def getPartition(key: Any): Int
4 }

其中,numPartitions 是返回 子RDD 的 partition 數量;getPartition 會根據指定的 key 返回 子RDD 的 partition index。

HashPartitioner 的 getPartition 的 實現以下,思路是 key.hashcode() mod 子RDD的 partition 數量:

1 def getPartition(key: Any): Int = key match {
2     case null => 0
3     case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
4   }

RangePartitioner 的 getPartition 的實現以下:

 1 def getPartition(key: Any): Int = {
 2     val k = key.asInstanceOf[K]
 3     var partition = 0
 4     if (rangeBounds.length <= 128) { // 不大於 128 分區
 5       // If we have less than 128 partitions naive search
 6       while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
 7         partition += 1
 8       }
 9     } else { // 大於 128 個分區數量
10       // Determine which binary search method to use only once.
11       partition = binarySearch(rangeBounds, k) // 二分查找
12       // binarySearch either returns the match location or -[insertion point]-1
13       if (partition < 0) {
14         partition = -partition-1
15       }
16       if (partition > rangeBounds.length) {
17         partition = rangeBounds.length
18       }
19     }
20     if (ascending) {
21       partition
22     } else {
23       rangeBounds.length - partition
24     }
25   }

PythonPartitioner 的 getPartition 以下,跟hash 很類似:

1 override def getPartition(key: Any): Int = key match {
2     case null => 0
3     // we don't trust the Python partition function to return valid partition ID's so
4     // let's do a modulo numPartitions in any case
5     case key: Long => Utils.nonNegativeMod(key.toInt, numPartitions)
6     case _ => Utils.nonNegativeMod(key.hashCode(), numPartitions)
7   }

PartitionIdPassthrough 的 getPartition 以下:

1 override def getPartition(key: Any): Int = key.asInstanceOf[Int]

GridPartitioner 的 getPartition 以下,思想,二元組定位到網格的partition:

 1 override val numPartitions: Int = rowPartitions * colPartitions
 2 
 3   /**
 4    * Returns the index of the partition the input coordinate belongs to.
 5    *
 6    * @param key The partition id i (calculated through this method for coordinate (i, j) in
 7    *            `simulateMultiply`, the coordinate (i, j) or a tuple (i, j, k), where k is
 8    *            the inner index used in multiplication. k is ignored in computing partitions.
 9    * @return The index of the partition, which the coordinate belongs to.
10    */
11   override def getPartition(key: Any): Int = {
12     key match {
13       case i: Int => i
14       case (i: Int, j: Int) =>
15         getPartitionId(i, j)
16       case (i: Int, j: Int, _: Int) =>
17         getPartitionId(i, j)
18       case _ =>
19         throw new IllegalArgumentException(s"Unrecognized key: $key.")
20     }
21   }
22 
23   /** Partitions sub-matrices as blocks with neighboring sub-matrices. */
24   private def getPartitionId(i: Int, j: Int): Int = {
25     require(0 <= i && i < rows, s"Row index $i out of range [0, $rows).")
26     require(0 <= j && j < cols, s"Column index $j out of range [0, $cols).")
27     i / rowsPerPart + j / colsPerPart * rowPartitions
28   }

包括匿名類,還有好多種,就不一一介紹了。總而言之,寬依賴是根據partitioner 肯定 分區內的數據具體到哪一個分區。

至此,RDD 的窄依賴和寬依賴都介紹清楚了。

相關文章
相關標籤/搜索