RDD四種依賴關係,分別是 ShuffleDependency、PrunDependency、RangeDependency和OneToOneDependency四種依賴關係。以下圖所示:org.apache.spark.Dependency有兩個一級子類,分別是 ShuffleDependency 和 NarrowDependency。其中,NarrowDependency 是一個抽象類,它有三個實現類,分別是OneToOneDependency、RangeDependency和 PruneDependency。apache
咱們先來看窄RDD是如何肯定依賴的父RDD的分區的呢?NarrowDependency 定義了一個抽象方法,以下:網絡
/** * Get the parent partitions for a child partition. * @param partitionId a partition of the child RDD * @return the partitions of the parent RDD that the child partition depends upon */ def getParents(partitionId: Int): Seq[Int]
其輸入參數是子RDD 的 分區Id,輸出是子RDD 分區依賴的父RDD 的 partition 的 id 序列。less
下面,分別看三種子類的實現:ide
首先,OneToOneDependency的getParent實現以下:函數
override def getParents(partitionId: Int): List[Int] = List(partitionId)
就一行代碼,實現比較簡單,子RDD對應的partition index 跟父 RDD 的partition 的 index 同樣。至關於父RDD 的 每個partition 複製到 子RDD 的對應分區中,分區的關係是一對一的。RDD的關係也是一對一的。ui
其次,RangeDependency的 getParent 實現以下:this
/** * :: DeveloperApi :: * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs. * @param rdd the parent RDD * @param inStart the start of the range in the parent RDD * @param outStart the start of the range in the child RDD * @param length the length of the range */ @DeveloperApi class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int) extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int): List[Int] = { if (partitionId >= outStart && partitionId < outStart + length) { List(partitionId - outStart + inStart) } else { Nil } } }
首先解釋三個變量:inStart:父RDD range 的起始位置;outStart:子RDD range 的起始位置;length:range 的長度。spa
獲取 父RDD 的partition index 的規則是:若是子RDD 的 partition index 在父RDD 的range 內,則返回的 父RDD partition是 子RDD partition index - 父 RDD 分區range 起始 + 子RDD 分區range 起始。其中,(- 父 RDD 分區range 起始 + 子RDD 分區range 起始)即 子RDD 的分區的 range 起始位置和 父RDD 的分區的 range 的起始位置 的相對距離。子RDD 的 parttion index 加上這個相對距離就是 對應父的RDD partition。不然是無依賴的父 RDD 的partition index。父子RDD的分區關係是一對一的。RDD 的關係多是一對一(length 是1 ,就是特殊的 OneToOneDependency),也多是多對一,也多是一對多。code
最後,PruneDependency的 getParent 實現以下:blog
1 /** 2 * Represents a dependency between the PartitionPruningRDD and its parent. In this 3 * case, the child RDD contains a subset of partitions of the parents'. 4 */ 5 private[spark] class PruneDependency[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean) 6 extends NarrowDependency[T](rdd) { 7 8 @transient 9 val partitions: Array[Partition] = rdd.partitions 10 .filter(s => partitionFilterFunc(s.index)).zipWithIndex 11 .map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition } 12 13 override def getParents(partitionId: Int): List[Int] = { 14 List(partitions(partitionId).asInstanceOf[PartitionPruningRDDPartition].parentSplit.index) 15 } 16 }
首先,解釋三個變量: rdd 是指向父RDD 的實例引用;partitionFilterFunc 是一個回調函數,做用是過濾出符合條件的父 RDD 的 partition 集合;PartitionPruningRDDPartition類聲明以下:
private[spark] class PartitionPruningRDDPartition(idx: Int, val parentSplit: Partition) extends Partition { override val index = idx }
partitions的生成過程以下: 先根據父RDD 引用獲取父RDD 對應的 partition集合,而後根據過濾函數和partition index ,過濾出想要的父RDD 的 partition 集合而且從0 開始編號,最後,根據父RDD 的 partition 和 新編號實例化新的PartitionPruningRDDPartition實例,並放入到 partitions 集合中,至關因而先對parent RDD 的分區作Filter 剪枝操做。
在getParent 方法中, 先根據子RDD 的 partition index 獲取 到對應的 parent RDD 的對應分區,而後獲取Partition 的成員函數 index,該index 就是 父RDD 的 partition 在父RDD 的全部分區中的 index。 子RDD partition 和 父RDD partition的關係是 一對一的, 父RDD 和子RDD 的關係是 多對一,也多是一對多,也多是一對一。
簡言之,在窄依賴中,子RDD 的partition 和 父RDD 的 partition 的關係是 一對一的。
下面重點看 ShuffleDependency,ShuffleDependency表明的是 一個 shuffle stage 的輸出。先來看其構造方法,即其依賴的變量或實例:
1 @DeveloperApi 2 class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( 3 @transient private val _rdd: RDD[_ <: Product2[K, V]], 4 val partitioner: Partitioner, 5 val serializer: Serializer = SparkEnv.get.serializer, 6 val keyOrdering: Option[Ordering[K]] = None, 7 val aggregator: Option[Aggregator[K, V, C]] = None, 8 val mapSideCombine: Boolean = false) 9 extends Dependency[Product2[K, V]]
其中,_rdd 代指父RDD實例;partitioner是用於給shuffle的輸出分區的分區器;serializer,主要用於序列化,默認是org.apache.spark.serializer.JavaSerializer,能夠經過`spark.serializer` 參數指定;keyOrdering RDD shuffle的key 的順序。aggregator,map或reduce 端用於RDD shuffle的combine聚合器;mapSideCombine 是否執行部分的聚合(即 map端的預聚合,能夠提升網絡傳輸效率和reduce 端的執行效率),默認是false。由於並非全部的都適合這樣作。好比求全局平均值,均值,平方差等,但像全局最大值,最小值等是適合用mapSideCombine 的。注意,當mapSideCombine 爲 true時, 必須設置combine聚合器,由於 shuffle 前須要使用聚合器作 map-combine 操做。
partitioner 定義了 RDD 裏的key-value 對 是如何按 key 來分區的。映射每個 key 到一個分區 id,從 0 到 分區數 - 1; 注意,分區器必須是肯定性的,即給定同一個 key,必須返回同一個分區,便於任務失敗時,追溯分區數據,確保了每個要參與計算的分區數據的一致性。即 partition 肯定了 shuffle 過程當中 數據是要流向哪一個具體的分區的。
org.apache.spark.Partition的 7 個實現類以下:
咱們先來看Partitioner 的方法定義:
1 abstract class Partitioner extends Serializable { 2 def numPartitions: Int 3 def getPartition(key: Any): Int 4 }
其中,numPartitions 是返回 子RDD 的 partition 數量;getPartition 會根據指定的 key 返回 子RDD 的 partition index。
HashPartitioner 的 getPartition 的 實現以下,思路是 key.hashcode() mod 子RDD的 partition 數量:
1 def getPartition(key: Any): Int = key match { 2 case null => 0 3 case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) 4 }
RangePartitioner 的 getPartition 的實現以下:
1 def getPartition(key: Any): Int = { 2 val k = key.asInstanceOf[K] 3 var partition = 0 4 if (rangeBounds.length <= 128) { // 不大於 128 分區 5 // If we have less than 128 partitions naive search 6 while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) { 7 partition += 1 8 } 9 } else { // 大於 128 個分區數量 10 // Determine which binary search method to use only once. 11 partition = binarySearch(rangeBounds, k) // 二分查找 12 // binarySearch either returns the match location or -[insertion point]-1 13 if (partition < 0) { 14 partition = -partition-1 15 } 16 if (partition > rangeBounds.length) { 17 partition = rangeBounds.length 18 } 19 } 20 if (ascending) { 21 partition 22 } else { 23 rangeBounds.length - partition 24 } 25 }
PythonPartitioner 的 getPartition 以下,跟hash 很類似:
1 override def getPartition(key: Any): Int = key match { 2 case null => 0 3 // we don't trust the Python partition function to return valid partition ID's so 4 // let's do a modulo numPartitions in any case 5 case key: Long => Utils.nonNegativeMod(key.toInt, numPartitions) 6 case _ => Utils.nonNegativeMod(key.hashCode(), numPartitions) 7 }
PartitionIdPassthrough 的 getPartition 以下:
1 override def getPartition(key: Any): Int = key.asInstanceOf[Int]
GridPartitioner 的 getPartition 以下,思想,二元組定位到網格的partition:
1 override val numPartitions: Int = rowPartitions * colPartitions 2 3 /** 4 * Returns the index of the partition the input coordinate belongs to. 5 * 6 * @param key The partition id i (calculated through this method for coordinate (i, j) in 7 * `simulateMultiply`, the coordinate (i, j) or a tuple (i, j, k), where k is 8 * the inner index used in multiplication. k is ignored in computing partitions. 9 * @return The index of the partition, which the coordinate belongs to. 10 */ 11 override def getPartition(key: Any): Int = { 12 key match { 13 case i: Int => i 14 case (i: Int, j: Int) => 15 getPartitionId(i, j) 16 case (i: Int, j: Int, _: Int) => 17 getPartitionId(i, j) 18 case _ => 19 throw new IllegalArgumentException(s"Unrecognized key: $key.") 20 } 21 } 22 23 /** Partitions sub-matrices as blocks with neighboring sub-matrices. */ 24 private def getPartitionId(i: Int, j: Int): Int = { 25 require(0 <= i && i < rows, s"Row index $i out of range [0, $rows).") 26 require(0 <= j && j < cols, s"Column index $j out of range [0, $cols).") 27 i / rowsPerPart + j / colsPerPart * rowPartitions 28 }
包括匿名類,還有好多種,就不一一介紹了。總而言之,寬依賴是根據partitioner 肯定 分區內的數據具體到哪一個分區。
至此,RDD 的窄依賴和寬依賴都介紹清楚了。