RangePartitioner分區執行原理:數組
計算整體的數據抽樣大小sampleSize,計算規則是:至少每一個分區抽取20個數據或者最多1M的數據量。app
RangePartitionerless
class RangePartitioner(partitions,rdd) { // 1. 計算樣本大小 val sampleSize = math.min(20.0 * partitions, 1e6) // 2. 計算樣本最大值 val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt // 3. 進行抽樣,返回總數據量,分區ID和樣本數據 val (numItems, sketched) = RangePartitioner.sketch( rdd.map(_._1), sampleSizePerPartition) // 4. 是否須要二次採樣 val imbalancedPartitions = mutable.Set.empty[Int] // 5. 保存樣本數據的集合buffer:包含數據和權重 val candidates = ArrayBuffer.empty[(K, Float)] if (fraction * n > sampleSizePerPartition) { // 記錄須要從新採樣的RDD的ID imbalancedPartitions += idx }else{ // 5. 計算樣本權重 val weight = ( // 採樣數據的佔比 n.toDouble / sample.length).toFloat for (key <- sample) { // 記錄採樣數據key和權重 candidates += ((key, weight)) } } // 6. 對於數據分佈不均衡的RDD分區,從新數據抽樣 if (imbalancedPartitions.nonEmpty) { // 利用rdd的sample抽樣函數API進行數據抽樣 val reSampled = imbalanced.sample( withReplacement = false, fraction, seed).collect() } // 7. 生成邊界數組 RangePartitioner.determineBounds(candidates, partitions) }
// An array of upper bounds for the first (partitions - 1) partitions private var rangeBounds: Array[K] = { if (partitions <= 1) { Array.empty } else { // This is the sample size we need to have roughly balanced output partitions, capped at 1M. // 給定總的數據抽樣大小,最多1M的數據量(10^6),最少20倍的RDD分區數量,也就是每一個RDD分區至少抽取20條數據 val sampleSize = math.min(20.0 * partitions, 1e6) // Assume the input partitions are roughly balanced and over-sample a little bit. // RDD各分區中的數據量可能會出現傾斜的狀況,乘於3的目的就是保證數據量小的分區可以採樣到足夠的數據,而對於數據量大的分區會進行第二次採樣 val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt // 從rdd中抽樣獲得的數據,返回值:(總數據量, Array[分區id,當前分區的數據量,當前分區抽取的數據]) val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition) if (numItems == 0L) { // 若是總的數據量爲0(RDD爲空),那麼直接返回一個空的數組 Array.empty } else { // If a partition contains much more than the average number of items, we re-sample from it // to ensure that enough items are collected from that partition. // 計算是否須要從新採樣:若是分區包含的數據量遠遠大於平均採樣的數據量則從新進行分區 // 樣本佔比:計算總樣本數量和總記錄數的佔比,佔比最大爲1.0 val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0) // 保存樣本數據的集合buffer:包含數據和權重 val candidates = ArrayBuffer.empty[(K, Float)] // 保存數據分佈不均衡的分區id(數據量超過fraction比率的分區) val imbalancedPartitions = mutable.Set.empty[Int] // 遍歷抽樣數據 sketched.foreach { case (idx, n, sample) => if (fraction * n > sampleSizePerPartition) { // 樣本數量佔比乘以當前RDD的總行數大於預設的每一個RDD最大抽取數量,說明這個RDD的數據量比較大,須要採樣更多的數據:eg: 0.2*100=20<60;0.2*20000=2000>60 // 若是樣本佔比乘以當前分區中的數據量大於以前計算的每一個分區的抽象數據大小,那麼表示當前分區抽取的數據太少了,該分區數據分佈不均衡,須要從新抽取 imbalancedPartitions += idx // 記錄須要從新採樣的RDD的ID } else { // The weight is 1 over the sampling probability. val weight = (n.toDouble / sample.length).toFloat // 採樣數據的佔比,RDD越大,權重越大 for (key <- sample) { candidates += ((key, weight)) } } } // 對於數據分佈不均衡的RDD分區,從新進行數據抽樣 if (imbalancedPartitions.nonEmpty) { // Re-sample imbalanced partitions with the desired sampling probability. val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains) val seed = byteswap32(-rdd.id - 1) // 利用rdd的sample抽樣函數API進行數據抽樣 val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect() val weight = (1.0 / fraction).toFloat candidates ++= reSampled.map(x => (x, weight)) } // 將最終的抽樣數據計算出分區邊界數組返回,邊界數組裏面存放的是RDD裏面數據的key值, // 好比array[0,10,20,30..]代表:key值在0到10的在第一個RDD,key值在10到20的在第二個RDD RangePartitioner.determineBounds(candidates, partitions) } } }
def sketch[K : ClassTag]( rdd: RDD[K], sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = { val shift = rdd.id // val classTagK = classTag[K] // to avoid serializing the entire partitioner object val sketched = rdd.mapPartitionsWithIndex { (idx, iter) => val seed = byteswap32(idx ^ (shift << 16)) /*水塘抽樣:返回抽樣數據和RDD的總數據量*/ val (sample, n) = SamplingUtils.reservoirSampleAndCount( iter, sampleSizePerPartition, seed) Iterator((idx, n, sample)) }.collect() // 計算全部RDD的總數據量 val numItems = sketched.map(_._2).sum (numItems, sketched) }
/** 依據候選中的權重劃分分區,權重值能夠理解爲該Key值所表明的元素數目 返回一個數組,長度爲partitions - 1,第i個元素做爲第i個分區內元素key值的上界 * Determines the bounds for range partitioning from candidates with weights indicating how many * items each represents. Usually this is 1 over the probability used to sample this candidate. * * @param candidates unordered candidates with weights 抽樣數據,包含了每一個樣本的權重 * @param partitions number of partitions 分區數量 * @return selected bounds */ def determineBounds[K : Ordering : ClassTag]( candidates: ArrayBuffer[(K, Float)], partitions: Int): Array[K] = { val ordering = implicitly[Ordering[K]] //依據Key進行排序,升序,因此按區間分區後,各個分區是有序的 val ordered = candidates.sortBy(_._1) // 採樣數據總數 val numCandidates = ordered.size // //計算出權重和 val sumWeights = ordered.map(_._2.toDouble).sum // 計算出步長:權重總數至關於預計數據總量,除以分區數就是每一個分區的數量,獲得的值便是按區間分割的區間步長 val step = sumWeights / partitions var cumWeight = 0.0 // 初始化target值爲區間大小 var target = step val bounds = ArrayBuffer.empty[K] var i = 0 var j = 0 var previousBound = Option.empty[K] // 遍歷採樣數據 while ((i < numCandidates) && (j < partitions - 1)) { val (key, weight) = ordered(i) // 計算採樣數據在當前RDD中的位置,若是大於區間大小則:記錄邊界KEY值 cumWeight += weight if (cumWeight >= target) { // Skip duplicate values. // 相同key值處於相同的Partition中,key值不一樣能夠進行分割 if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) { bounds += key //記錄邊界 target += step j += 1 previousBound = Some(key) } } i += 1 } bounds.toArray }
getPartition函數
// 根據RDD的key值返回對應的分區id。從0開始 def getPartition(key: Any): Int = { // 強制轉換key類型爲RDD中本來的數據類型 val k = key.asInstanceOf[K] var partition = 0 if (rangeBounds.length <= 128) { // If we have less than 128 partitions naive search // 若是分區數據小於等於128個,那麼直接本地循環尋找當前k所屬的分區下標 // ordering.gt(x,y):若是x>y,則返回true while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) { partition += 1 } } else { // 若是分區數量大於128個,那麼使用二分查找方法尋找對應k所屬的下標; // 可是若是k在rangeBounds中沒有出現,實質上返回的是一個負數(範圍)或者是一個超過rangeBounds大小的數(最後一個分區,比全部數據都大) // Determine which binary search method to use only once. partition = binarySearch(rangeBounds, k) // binarySearch either returns the match location or -[insertion point]-1 if (partition < 0) { partition = -partition-1 } if (partition > rangeBounds.length) { partition = rangeBounds.length } } // 根據數據排序是升序仍是降序進行數據的排列,默認爲升序 if (ascending) { partition } else { rangeBounds.length - partition } }