sparkCore源碼解析之RangePartitioner

img

HashPartitioner分區可能致使每一個分區中數據量的不均勻。而RangePartitioner分區則儘可能保證每一個分區中數據量的均勻,將必定範圍內的數映射到某一個分區內。分區與分區之間數據是有序的,但分區內的元素是不能保證順序的。html

  RangePartitioner分區執行原理:算法

  1. 計算整體的數據抽樣大小sampleSize,計算規則是:至少每一個分區抽取20個數據或者最多1M的數據量。數組

  2. 根據sampleSize和分區數量計算每一個分區的數據抽樣樣本數量最大值sampleSizePrePartition函數

  3. 根據以上兩個值進行水塘抽樣,返回RDD的總數據量,分區ID和每一個分區的採樣數據。.net

  4. 計算出數據量較大的分區經過RDD.sample進行從新抽樣。scala

  5. 經過抽樣數組 candidates: ArrayBuffer[(K, wiegth)]計算出分區邊界的數組BoundsArray3d

  6. 在取數據時,若是分區數小於128則直接獲取,若是大於128則經過二分法,獲取當前Key屬於那個區間,返回對應的BoundsArray下標即爲partitionsIDcode

1. 獲取區間數組

1.1. 給定樣本總數

給定總的數據抽樣大小,最多1M的數據量(10^6),最少20倍的RDD分區數量,也就是每一個RDD分區至少抽取20條數據htm

class RangePartitioner(partitions,rdd) {

// 1. 計算樣本大小
 val sampleSize =  math.min(20.0 * partitions, 1e6) 
}

1.2. 計算樣本最大值

RDD各分區中的數據量可能會出現傾斜的狀況,乘於3的目的就是保證數據量小的分區可以採樣到足夠的數據,而對於數據量大的分區會進行第二次採樣blog

class RangePartitioner(partitions,rdd) {
	

// 1. 計算樣本大小
 val sampleSize =  math.min(20.0 * partitions, 1e6)
// 2. 計算樣本最大值
val sampleSizePerPartition = 
	math.ceil(

		3.0 * sampleSize / rdd.partitions.length

	).toInt

}

1.3. 水塘抽樣

根據以上兩個值進行水塘抽樣,返回RDD的總數據量,分區ID和每一個分區的採樣數據。其中總數據量是估計值,不是經過rdd.count計算獲得的

class RangePartitioner(partitions,rdd) {
// 1. 計算樣本大小
 val sampleSize =  math.min(20.0 * partitions, 1e6)
// 2. 計算樣本最大值
val sampleSizePerPartition = 
	math.ceil(

		3.0 * sampleSize / rdd.partitions.length

	).toInt

    
// 3. 進行抽樣,返回總數據量,分區ID和樣本數據
val (numItems, sketched) = RangePartitioner.sketch(
    rdd.map(_._1), sampleSizePerPartition)

}

1.4. 是否須要二次採樣

若是有較大RDD存在,則按照平均值去採樣的話數據量太少,容易形成數據傾斜,因此須要進行二次採樣

判斷是否須要從新採樣方法:

樣本數量佔比乘以當前RDD的總行數大於預設的每一個RDD最大抽取數量,說明這個RDD的數據量比較大,須要採樣更多的數據:eg: 0.2100=20<60;0.220000=2000>60

class RangePartitioner(partitions,rdd) {
// 1. 計算樣本大小
 val sampleSize =  math.min(20.0 * partitions, 1e6)
// 2. 計算樣本最大值
val sampleSizePerPartition = 
	math.ceil(

		3.0 * sampleSize / rdd.partitions.length

	).toInt  
// 3. 進行抽樣,返回總數據量,分區ID和樣本數據
val (numItems, sketched) = RangePartitioner.sketch(
    rdd.map(_._1), sampleSizePerPartition)
    
    
// 4. 是否須要二次採樣
val imbalancedPartitions = 	mutable.Set.empty[Int]
 if (fraction * n > sampleSizePerPartition) {
	// 記錄須要從新採樣的RDD的ID
	imbalancedPartitions += idx 

}

1.5. 計算樣本權重

計算每一個採樣數據的權重佔比,根據採樣數據的ID和權重生成出RDD分區邊界數組

權重計算方法:總數據量/當前RDD的採樣數據量

class RangePartitioner(partitions,rdd) {
// 1. 計算樣本大小
 val sampleSize =  math.min(20.0 * partitions, 1e6)
// 2. 計算樣本最大值
val sampleSizePerPartition = 
	math.ceil(

		3.0 * sampleSize / rdd.partitions.length

	).toInt  
// 3. 進行抽樣,返回總數據量,分區ID和樣本數據
val (numItems, sketched) = RangePartitioner.sketch(
    rdd.map(_._1), sampleSizePerPartition) 
// 4. 是否須要二次採樣
val imbalancedPartitions = 	mutable.Set.empty[Int]
 if (fraction * n > sampleSizePerPartition) {
	// 記錄須要從新採樣的RDD的ID
	imbalancedPartitions += idx

}else{

     
// 5. 計算樣本權重
	val weight = (
	  // 採樣數據的佔比
		n.toDouble / sample.length).toFloat 
            for (key <- sample) {
			// 記錄採樣數據key和權重
              candidates += ((key, weight))
            }
	}
}

1.6. 二次抽樣

對於數據分佈不均衡的RDD分區,從新進行二次抽樣。

二次抽樣採用的是RDD的採樣方法:RDD.sample

class RangePartitioner(partitions,rdd) {
// 1. 計算樣本大小
 val sampleSize =  math.min(20.0 * partitions, 1e6)
// 2. 計算樣本最大值
val sampleSizePerPartition = 
	math.ceil(

		3.0 * sampleSize / rdd.partitions.length

	).toInt  
// 3. 進行抽樣,返回總數據量,分區ID和樣本數據
val (numItems, sketched) = RangePartitioner.sketch(
    rdd.map(_._1), sampleSizePerPartition) 
// 4. 是否須要二次採樣
val imbalancedPartitions = 	mutable.Set.empty[Int]
 if (fraction * n > sampleSizePerPartition) {
	// 記錄須要從新採樣的RDD的ID
	imbalancedPartitions += idx

}else{  
// 5. 計算樣本權重
	val weight = (
	  // 採樣數據的佔比
		n.toDouble / sample.length).toFloat 
            for (key <- sample) {
			// 記錄採樣數據key和權重
              candidates += ((key, weight))
            }
	}

    
// 6. 對於數據分佈不均衡的RDD分區,從新數據抽樣
if (imbalancedPartitions.nonEmpty) {
	// 利用rdd的sample抽樣函數API進行數據抽樣
   val reSampled = imbalanced.sample(
    withReplacement = false, fraction, seed).collect()
}

1.7. 生成邊界數組

將最終的抽樣數據計算出分區邊界數組返回,邊界數組裏面存放的是RDD裏面數據的key值,

好比最終返回的數組是:array[0,10,20,30..]

其中0,10,20,30是採樣數據中的key值,對於每一條數據都會判斷其在此數組的那個區間中間,例若有一條數據key值是3則其在0到10之間,屬於第一個分區,同理Key值爲15的數據在第二個分區

class RangePartitioner(partitions,rdd) {
// 1. 計算樣本大小
 val sampleSize =  math.min(20.0 * partitions, 1e6)
// 2. 計算樣本最大值
val sampleSizePerPartition = 
	math.ceil(

		3.0 * sampleSize / rdd.partitions.length

	).toInt  
// 3. 進行抽樣,返回總數據量,分區ID和樣本數據
val (numItems, sketched) = RangePartitioner.sketch(
    rdd.map(_._1), sampleSizePerPartition) 
// 4. 是否須要二次採樣
val imbalancedPartitions = 	mutable.Set.empty[Int]
 if (fraction * n > sampleSizePerPartition) {
	// 記錄須要從新採樣的RDD的ID
	imbalancedPartitions += idx

}else{  
// 5. 計算樣本權重
	val weight = (
	  // 採樣數據的佔比
		n.toDouble / sample.length).toFloat 
            for (key <- sample) {
			// 記錄採樣數據key和權重
              candidates += ((key, weight))
            }
	}
// 6. 對於數據分佈不均衡的RDD分區,從新數據抽樣
if (imbalancedPartitions.nonEmpty) {
	// 利用rdd的sample抽樣函數API進行數據抽樣
   val reSampled = imbalanced.sample(
    withReplacement = false, fraction, seed).collect()
}

    
// 7. 生成邊界數組
RangePartitioner.determineBounds(candidates, partitions)

}

2. 水塘抽樣算法

水塘抽樣概念:

它是一系列的隨機算法,其目的在於從包含n個項目的集合S中選取k個樣本,使得每條數據抽中的機率是k/n。其中n爲一很大或未知的數量,尤爲適用於不能把全部n個項目都存放到主內存的狀況

咱們能夠:定義取出的行號爲choice,第一次直接以第一行做爲取出行 choice ,然後第二次以二分之一律率決定是否用第二行替換 choice ,第三次以三分之一的機率決定是否以第三行替換 choice ……,以此類推。由上面的分析咱們能夠得出結論,在取第n個數據的時候,咱們生成一個0到1的隨機數p,若是p小於1/n,保留第n個數。大於1/n,繼續保留前面的數。直到數據流結束,返回此數,算法結束。

詳見:https://www.iteblog.com/archives/1525.html

3. 定位分區ID

若是分區邊界數組的大小小於或等於128的時候直接變量數組,不然採用二分查找法肯定key屬於某個分區。

3.1. 數組直接獲取

遍歷數組,判斷當前key值是否屬於當前區間

// 根據RDD的key值返回對應的分區id。從0開始

def getPartition(key: Any): Int = {

​    // 強制轉換key類型爲RDD中本來的數據類型

​    val k = key.asInstanceOf[K]

​    var partition = 0

​    if (rangeBounds.length <= 128) {

​      // 若是分區數據小於等於128個,那麼直接本地循環尋找當前k所屬的分區下標

​      // ordering.gt(x,y):若是x>y,則返回true

​      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {

​        partition += 1

​      }

3.2. 二分法查找

對於分區數大於128的狀況,採樣二分法查找

// 根據RDD的key值返回對應的分區id。從0開始

 def getPartition(key: Any): Int = {

// 若是分區數量大於128個,那麼使用二分查找方法尋找對應k所屬的下標;

     // 可是若是k在rangeBounds中沒有出現,實質上返回的是一個負數(範圍)或者是一個超過rangeBounds大小的數(最後一個分區,比全部數據都大)

     // Determine which binary search method to use only once.

     partition = binarySearch(rangeBounds, k)

     // binarySearch either returns the match location or -[insertion point]-1

     if (partition < 0) {

       partition = -partition-1

     }

     if (partition > rangeBounds.length) {

       partition = rangeBounds.length

     }

查看完整源碼...

相關文章
相關標籤/搜索