sparkCore源碼解析之partition

job

1. 概念

表示並行計算的一個計算單元html

RDD 內部的數據集合在邏輯上和物理上被劃分紅多個小子集合,這樣的每個子集合咱們將其稱爲分區,分區的個數會決定並行計算的粒度,而每個分區數值的計算都是在一個單獨的任務中進行,所以並行任務的個數,也是由 RDD(其實是一個階段的末 RDD,調度章節會介紹)分區的個數決定的git

2. 獲取分區

RDD的分區數量可經過rdd.getPartitions獲取。 getPartitions方法是在RDD類中定義的,由不一樣的子類進行具體的實現算法

2.1. 接口

獲取分區的定義數組

在RDD類中定義了getPartition方法,返回一個Partition列表,Partition對象只含有一個編碼index字段,不一樣RDD的partition會繼承Partition類,例如JdbcPartition、KafkaRDDPartition,HadoopPartition等。ide

class RDD{
	// 獲取分區定義
	def getPartitions:Array[Partition]
}
 
// Partition類定義
trait Partition {
	def index:Int
}

2.2. 實現

img

transformation類型的RDD分區數量和父RDD的保持一致,Action類型的RDD分區數量,不一樣的數據源默認值不同,獲取的方式也不一樣函數

2.2.1. KafkaRDD

kafkaRDD的partition數量等於compute方法中生成OffsetRange的數量。oop

// DirectKafkaInputDStream類在接收到消息後經過compute方法計算獲得OffsetRange
class OffsetRange( 
	val topic:String, // Kafka topic name
	val partition:Int, // Kafka partition id
	val fromOffset:Long,
	val untilOffset:Long
){...}
 
class KafkaRDD(
		val offsetRages:Array[OffsetRange]
	) extends RDD{
	// 遍歷OffsetRange數組,獲得數組下標和偏移量等信息生成KafkaRDDPartition
	offsetRanges.zipWithIndex.map{
		case(o,i)=>
			new KafkaRDDPartition(
				i,o.topic,o.partition,
				o.fromOffset,o.untilOffset
			)
	}.toArray
}

2.2.2. HadoopRDD

HadoopRDD的分區是基於hadoop的splits方法進行的。每一個partition的大小默認等於hdfs的block的大小this

例如:一個txt文件12800M,則 val rdd1=sc.textFile("/data.txt"); rdd1默認會有12800/128=10個分區。編碼

class HadoopRDD(){
	
	// 生成一個RDD的惟一ID
	val id=Int=sc.newRddId()
 
	def getPartitions:Array[Partition]={
		// 調用hadoop的splits方法進行切割
		val inputSplits=inputFormat.getSplits(
			jobConf,minPartitions
		)
		// 組成spark的partition
		val array=new Array[Partition](inputSplits.size)
		for(i <- 0 until inputSplits.size){
			array(i)=new HadoopPartition(id,i,
				inputSplits(i))
		}
	}
}

hadoop的FileInputFormat類: texfile的分區大小時指定的分區數和block樹中取較大值,因此當指定numPartitions,小於block數時無效,大於則生效spa

2.2.3. JdbcRDD

JDBC的partition劃分是指定開始行和結束行,而後將查詢到的結果分爲3個(默認值)partition。

class JdbcRDD(numPartitions:Int){
	def getPartitions:Array[Partition]={
		(0 until numPartitions).map{
			new JdbcPartition(i,start,end)
		}.toArray
	}
}

2.2.4. MapPartitionsRDD

轉換類的RDD分區數量是由其父類的分區數決定的

// 獲取父RDD列表的第一個RDD
class RDD{
	def firstParent:RDD={
		dependencies.head.rdd.asInstanceOf[RDD]
	}
}
class MapPartitionsRDD(){
	// 獲取父RDD的partitions數量
	def getPartitions: Array[Partition] = 			firstParent[T].partitions
}

3. 分區數量

img

分區數量的原則:儘量的選擇大的分區值

3.1. RDD初始化相關

Spark API partition數量
sc.parallelize(…) sc.defaultParallelism
sc.textFile(…) max(傳參, block數)
sc.newAPIHadoopRDD(…) max(傳參, block數)
new JdbcRDD(…) 傳參

3.2. 通用transformation

  • filter(),map(),flatMap(),distinct():和父RDD相同
  • union: 兩個RDD的和rdd.union(otherRDD):rdd.partitions.size + otherRDD. partitions.size
  • intersection:取較大的rdd.intersection(otherRDD):max(rdd.partitions.size, otherRDD. partitions.size)
  • rdd.subtract(otherRDD) :rdd.partitions.size
  • cartesian:兩個RDD數量的乘積rdd.cartesian(otherRDD): rdd.partitions.size * otherRDD. partitions.size

3.3. Key-based Transformations

reduceByKey(),foldByKey(),combineByKey(), groupByKey(),sortByKey(),mapValues(),flatMapValues() 和父RDD相同

cogroup(), join(), ,leftOuterJoin(), rightOuterJoin(): 全部父RDD按照其partition數降序排列,從partition數最大的RDD開始查找是否存在partitioner,存在則partition數由此partitioner肯定,不然,全部RDD不存在partitioner,由spark.default.parallelism肯定,若還沒設置,最後partition數爲全部RDD中partition數的最大值

4. 分區器

注意:只有Key-Value類型的RDD纔有分區的,非Key-Value類型的RDD分區的值是None的

abstract class Partitioner extends Serializable {
  def numPartitions: Int // 分區數量
  def getPartition(key: Any): Int // 分區編號
}

4.1. 做用

partitioner分區器做用:

  1. 決定Shuffle過程當中Reducer個數(其實是子RDD的分區個數)以及Map端一條數據記錄應該分配給那幾個Reducer
  2. 決定RDD的分區數量,例如執行groupByKey(new HashPartitioner(2))所生成的ShuffledRDD中,分區數目等於2
  3. 決定CoGroupedRDD與父RDD之間的依賴關係

4.2. 種類

img

分區器的選擇:

  1. 若是RDD已經有了分區器,則在已有分區器裏面挑選分區數量最多的一個分區器。
  2. 若是RDD沒有指定分區器,則默認使用HashPartitioner分區器。
  3. 用戶能夠本身聲明RangePartitioner分區器
object Partitioner{
	def defaultPartitioner(rdd):Partitioner={
		val hasPartitioner=
			rdds.filter(
				_.partitioner.exists(_numPartitions>0))
			}
		// 若是RDD已經有分區則選取其分區數最多的
		if(hasPartitioner.nonEmpty){
			hasPartitioner.maxBy(_.partitions.length).
				partitioner.get
		}else{
			if(rdd.context.conf.contains(
				"spark.default.parallelism"
			)){
				// 若是在conf中配置了分區數則用之
				new HashPartitioner(
					rdd.context.defaultParallelism
				)
			}else{
				// 若是沒有配置parallelism則和父RDD中最大的保持一致
				new HashPartitioner(rdds.map(
					_.partitions.length
				).max)
			}
		}
}

4.2.1. HashPartitioner

HashPartitioner分區的原理很簡單,對於給定的key,計算其hashCode,併除於分區的個數取餘,若是餘數小於0,則用餘數+分區的個數,最後返回的值就是這個key所屬的分區ID

class HashPartitioner(partitions:Int) {
	def getPartition(key:Any):Int=key match{
		case null=> 0
		case _=> nonNegativeMod(key.hashCode, numPartitions)
	}
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }	
	// 判斷兩個RDD分區方式是否同樣
	def equals(other:Any):Boolean= other match{
		case h:HashPartitioner => 
			h.numPartitions==numPartitions
		case  _ => false
	}
}

4.2.2. RangePartitioner

img

HashPartitioner分區可能致使每一個分區中數據量的不均勻。而RangePartitioner分區則儘可能保證每一個分區中數據量的均勻,將必定範圍內的數映射到某一個分區內。分區與分區之間數據是有序的,但分區內的元素是不能保證順序的。

  RangePartitioner分區執行原理:

  1. 計算整體的數據抽樣大小sampleSize,計算規則是:至少每一個分區抽取20個數據或者最多1M的數據量。
  2. 根據sampleSize和分區數量計算每一個分區的數據抽樣樣本數量最大值sampleSizePrePartition
  3. 根據以上兩個值進行水塘抽樣,返回RDD的總數據量,分區ID和每一個分區的採樣數據。
  4. 計算出數據量較大的分區經過RDD.sample進行從新抽樣。
  5. 經過抽樣數組 candidates: ArrayBuffer[(K, wiegth)]計算出分區邊界的數組BoundsArray
  6. 在取數據時,若是分區數小於128則直接獲取,若是大於128則經過二分法,獲取當前Key屬於那個區間,返回對應的BoundsArray下標即爲partitionsID

一句話歸納:就是遍歷每一個paritiion,對裏面的數據進行抽樣,把抽樣的數據進行排序,並按照對應的權重肯定邊界

4.2.2.1. 獲取區間數組

img

4.2.2.1.1. 給定樣本總數

給定總的數據抽樣大小,最多1M的數據量(10^6),最少20倍的RDD分區數量,也就是每一個RDD分區至少抽取20條數據 ​

class RangePartitioner(partitions,rdd) {
// 1. 計算樣本大小
 val sampleSize =math.min(20.0 * partitions, 1e6)
}
4.2.2.1.2. 計算樣本最大值

RDD各分區中的數據量可能會出現傾斜的狀況,乘於3的目的就是保證數據量小的分區可以採樣到足夠的數據,而對於數據量大的分區會進行第二次採樣

class RangePartitioner(partitions,rdd) {
​	
// 1. 計算樣本大小
 val sampleSize = 
​			math.min(20.0 * partitions, 1e6)
 
// 2. 計算樣本最大值
val sampleSizePerPartition = 
​	math.ceil(
​		3.0 * sampleSize / rdd.partitions.length
​	).toInt
}
4.2.2.1.3. 水塘抽樣

根據以上兩個值進行水塘抽樣,返回RDD的總數據量,分區ID和每一個分區的採樣數據。其中總數據量經過遍歷RDD全部partition的key累加獲得的,不是經過rdd.count計算獲得的

class RangePartitioner(partitions,rdd) {
	
// 1. 計算樣本大小
 val sampleSize =math.min(20.0 * partitions, 1e6)
 
// 2. 計算樣本最大值
val sampleSizePerPartition = 
	math.ceil(
		3.0 * sampleSize / rdd.partitions.length
	).toInt
}
// 3. 進行抽樣,返回總數據量,分區ID和樣本數據
val (numItems, sketched) = RangePartitioner.sketch(
		rdd.map(_._1), sampleSizePerPartition)
4.2.2.1.4. 是否須要二次採樣

若是有較大RDD存在,則按照平均值去採樣的話數據量太少,容易形成數據傾斜,因此須要進行二次採樣

判斷是否須要從新採樣方法: 樣本數量佔比乘以當前RDD的總行數大於預設的每一個RDD最大抽取數量,說明這個RDD的數據量比較大,須要採樣更多的數據:eg: 0.2100=20<60;0.220000=2000>60

class RangePartitioner(partitions,rdd) {
	
// 1. 計算樣本大小
 val sampleSize =math.min(20.0 * partitions, 1e6)
 
// 2. 計算樣本最大值
val sampleSizePerPartition = 
	math.ceil(
		3.0 * sampleSize / rdd.partitions.length
	).toInt
}
// 3. 進行抽樣,返回總數據量,分區ID和樣本數據
val (numItems, sketched) = RangePartitioner.sketch(
		rdd.map(_._1), sampleSizePerPartition)
 
// 4. 是否須要二次採樣
val imbalancedPartitions = 	mutable.Set.empty[Int]
 if (fraction * n > sampleSizePerPartition) {
	// 記錄須要從新採樣的RDD的ID
	imbalancedPartitions += idx 
}
4.2.2.1.5. 計算樣本權重

計算每一個採樣數據的權重佔比,根據採樣數據的ID和權重生成出RDD分區邊界數組

權重計算方法:總數據量/當前RDD的採樣數據量

class RangePartitioner(partitions,rdd) {
​	
// 1. 計算樣本大小
 val sampleSize = 
​			math.min(20.0 * partitions, 1e6)
 
// 2. 計算樣本最大值
val sampleSizePerPartition = 
​	math.ceil(
​		3.0 * sampleSize / rdd.partitions.length
​	).toInt
}
// 3. 進行抽樣,返回總數據量,分區ID和樣本數據
val (numItems, sketched) = 		RangePartitioner.sketch(
​		rdd.map(_._1), sampleSizePerPartition)
 
// 4. 是否須要二次採樣
val imbalancedPartitions = 	mutable.Set.empty[Int]
 
//  5. 保存樣本數據的集合buffer:包含數據和權重
val candidates = ArrayBuffer.empty[(K, Float)]
 
 if (fraction * n > sampleSizePerPartition) {
​	// 記錄須要從新採樣的RDD的ID
​	imbalancedPartitions += idx 
}else{
// 5. 計算樣本權重
​	val weight = (
​	  // 採樣數據的佔比
​		n.toDouble / sample.length).toFloat 
​            for (key <- sample) {
​			// 記錄採樣數據key和權重
​              candidates += ((key, weight))
​            }
​	}
}
4.2.2.1.6. 二次抽樣

對於數據分佈不均衡的RDD分區,從新進行二次抽樣。 二次抽樣採用的是RDD的採樣方法:RDD.sample

class RangePartitioner(partitions,rdd) {
​	
// 1. 計算樣本大小
 val sampleSize = 
​			math.min(20.0 * partitions, 1e6)
 
// 2. 計算樣本最大值
val sampleSizePerPartition = 
​	math.ceil(
​		3.0 * sampleSize / rdd.partitions.length
​	).toInt
}
// 3. 進行抽樣,返回總數據量,分區ID和樣本數據
val (numItems, sketched) = 		RangePartitioner.sketch(
​		rdd.map(_._1), sampleSizePerPartition)
 
// 4. 是否須要二次採樣
val imbalancedPartitions = 	mutable.Set.empty[Int]
 
//  5. 保存樣本數據的集合buffer:包含數據和權重
val candidates = ArrayBuffer.empty[(K, Float)]
 
 if (fraction * n > sampleSizePerPartition) {
​	// 記錄須要從新採樣的RDD的ID
​	imbalancedPartitions += idx 
}else{
// 5. 計算樣本權重
​	val weight = (
​	  // 採樣數據的佔比
​		n.toDouble / sample.length).toFloat 
​            for (key <- sample) {
​			// 記錄採樣數據key和權重
​              candidates += ((key, weight))
​            }
​	}
// 6. 對於數據分佈不均衡的RDD分區,從新數據抽樣
if (imbalancedPartitions.nonEmpty) {
​	// 利用rdd的sample抽樣函數API進行數據抽樣
​          val reSampled = imbalanced.sample(
​				withReplacement = 
​					false, fraction, seed).collect()
}
 
}
4.2.2.1.7. 生成邊界數組

將最終的抽樣數據計算出分區邊界數組返回,邊界數組裏面存放的是RDD裏面數據的key值, 好比最終返回的數組是:array[0,10,20,30..] 其中0,10,20,30是採樣數據中的key值,對於每一條數據都會判斷其在此數組的那個區間中間,例若有一條數據key值是3則其在0到10之間,屬於第一個分區,同理Key值爲15的數據在第二個分區

class RangePartitioner(partitions,rdd) {
​	
// 1. 計算樣本大小
 val sampleSize = 
​			math.min(20.0 * partitions, 1e6)
 
// 2. 計算樣本最大值
val sampleSizePerPartition = 
​	math.ceil(
​		3.0 * sampleSize / rdd.partitions.length
​	).toInt
}
// 3. 進行抽樣,返回總數據量,分區ID和樣本數據
val (numItems, sketched) = 		RangePartitioner.sketch(
​		rdd.map(_._1), sampleSizePerPartition)
 
// 4. 是否須要二次採樣
val imbalancedPartitions = 	mutable.Set.empty[Int]
 
//  5. 保存樣本數據的集合buffer:包含數據和權重
val candidates = ArrayBuffer.empty[(K, Float)]
 
 if (fraction * n > sampleSizePerPartition) {
​	// 記錄須要從新採樣的RDD的ID
​	imbalancedPartitions += idx 
}else{
// 5. 計算樣本權重
​	val weight = (
​	  // 採樣數據的佔比
​		n.toDouble / sample.length).toFloat 
​            for (key <- sample) {
​			// 記錄採樣數據key和權重
​              candidates += ((key, weight))
​            }
​	}
// 6. 對於數據分佈不均衡的RDD分區,從新數據抽樣
if (imbalancedPartitions.nonEmpty) {
​	// 利用rdd的sample抽樣函數API進行數據抽樣
​          val reSampled = imbalanced.sample(
​				withReplacement = 
​					false, fraction, seed).collect()
}
// 7. 生成邊界數組
RangePartitioner.determineBounds(
​	candidates, partitions)
}

4.2.2.2. 水塘抽樣算法

水塘抽樣概念: 它是一系列的隨機算法,其目的在於從包含n個項目的集合S中選取k個樣本,使得每條數據抽中的機率是k/n。其中n爲一很大或未知的數量,尤爲適用於不能把全部n個項目都存放到主內存的狀況

咱們能夠:定義取出的行號爲choice,第一次直接以第一行做爲取出行 choice ,然後第二次以二分之一律率決定是否用第二行替換 choice ,第三次以三分之一的機率決定是否以第三行替換 choice ……,以此類推。由上面的分析咱們能夠得出結論,在取第n個數據的時候,咱們生成一個0到1的隨機數p,若是p小於1/n,保留第n個數。大於1/n,繼續保留前面的數。直到數據流結束,返回此數,算法結束。

詳見: https://www.iteblog.com/archives/1525.html https://my.oschina.net/freelili/blog/2987667

實現:

  1. 獲取到須要抽樣RDD分區的樣本大小k和分區的全部KEY數組input
  2. 初始化抽樣結果集reservoir爲分區前K個KEY值
  3. 若是分區的總數小於預計樣本大小k,則將當前分區的全部數據做爲樣本數據,不然到第四步
  4. 遍歷分區裏全部Key組成的數組input
  5. 生成隨機須要替換input數組的下標,若是下標小於K則替換
  6. 返回抽取的key值數組和當前分區的總數據量: (reservoir, l)

難點:

// 計算出須要替換的數組下標
// 選取第n個數的機率是:n/l; 若是隨機替換數組值的機率是p=rand.nextDouble,
// 則若是p<k/l;則替換池中任意一個數,即: p*l < k 則進行替換,用p*l做爲隨機替換的下標
 val replacementIndex = (rand.nextDouble() * l).toLong
if (replacementIndex < k) {
// 替換reservoir[隨機抽取的下標]的值爲input[l]的值item
          reservoir(replacementIndex.toInt) = item
 }

4.2.2.3. 定位分區ID

img

若是分區邊界數組的大小小於或等於128的時候直接變量數組,不然採用二分查找法肯定key屬於某個分區。

4.2.2.3.1. 數組直接獲取

遍歷數組,判斷當前key值是否屬於當前區間

// 根據RDD的key值返回對應的分區id。從0開始
def getPartition(key: Any): Int = {
​    // 強制轉換key類型爲RDD中本來的數據類型
​    val k = key.asInstanceOf[K]
​    var partition = 0
​    if (rangeBounds.length <= 128) {
​      // 若是分區數據小於等於128個,那麼直接本地循環尋找當前k所屬的分區下標
​      // ordering.gt(x,y):若是x>y,則返回true
​      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
​        partition += 1
​      }
4.2.2.3.2. 二分法查找

對於分區數大於128的狀況,採樣二分法查找

// 根據RDD的key值返回對應的分區id。從0開始
 def getPartition(key: Any): Int = {
// 若是分區數量大於128個,那麼使用二分查找方法尋找對應k所屬的下標;
​      // 可是若是k在rangeBounds中沒有出現,實質上返回的是一個負數(範圍)或者是一個超過rangeBounds大小的數(最後一個分區,比全部數據都大)
​      // Determine which binary search method to use only once.
​      partition = binarySearch(rangeBounds, k)
​      // binarySearch either returns the match location or -[insertion point]-1
​      if (partition < 0) {
​        partition = -partition-1
​      }
​      if (partition > rangeBounds.length) {
​        partition = rangeBounds.length
​      }

5. 自定義分區器

自定義:

  1. 繼承Partitioner方法,
  2. 重寫getPartition、numPartitions、equals等方法。
public class MyPartioner extends Partitioner { 
    @Override 
    public int numPartitions() { 
        return 1000; 
    } 

    @Override 
    public int getPartition(Object key) { 
        String k = (String) key; 
        int code = k.hashCode() % 1000; 
        System.out.println(k+":"+code); 
        return  code < 0?code+1000:code; 
    } 

    @Override 
    public boolean equals(Object obj) { 
        if(obj instanceof MyPartioner){ 
            if(this.numPartitions()==((MyPartioner) obj).numPartitions()){ 
                return true; 
            } 
            return false; 
        } 
        return super.equals(obj); 
    } 
}

調用:pairRdd.groupbykey(new MyPartitioner())

參考連接:https://ihainan.gitbooks.io/spark-source-code/content/section1/rddPartitions.html

sparkCore源碼解析系列:

  1. sparkCore源碼解析之block
  2. sparkCore源碼解析之partition
  3. sparkCore源碼解析之Job
  4. sparkCore源碼解析之shuffle
  5. sparkCore源碼解析之完整腦圖地址
相關文章
相關標籤/搜索