表示並行計算的一個計算單元html
RDD 內部的數據集合在邏輯上和物理上被劃分紅多個小子集合,這樣的每個子集合咱們將其稱爲分區,分區的個數會決定並行計算的粒度,而每個分區數值的計算都是在一個單獨的任務中進行,所以並行任務的個數,也是由 RDD(其實是一個階段的末 RDD,調度章節會介紹)分區的個數決定的git
RDD的分區數量可經過rdd.getPartitions獲取。 getPartitions方法是在RDD類中定義的,由不一樣的子類進行具體的實現算法
獲取分區的定義數組
在RDD類中定義了getPartition方法,返回一個Partition列表,Partition對象只含有一個編碼index字段,不一樣RDD的partition會繼承Partition類,例如JdbcPartition、KafkaRDDPartition,HadoopPartition等。ide
class RDD{ // 獲取分區定義 def getPartitions:Array[Partition] } // Partition類定義 trait Partition { def index:Int }
transformation類型的RDD分區數量和父RDD的保持一致,Action類型的RDD分區數量,不一樣的數據源默認值不同,獲取的方式也不一樣函數
kafkaRDD的partition數量等於compute方法中生成OffsetRange的數量。oop
// DirectKafkaInputDStream類在接收到消息後經過compute方法計算獲得OffsetRange class OffsetRange( val topic:String, // Kafka topic name val partition:Int, // Kafka partition id val fromOffset:Long, val untilOffset:Long ){...} class KafkaRDD( val offsetRages:Array[OffsetRange] ) extends RDD{ // 遍歷OffsetRange數組,獲得數組下標和偏移量等信息生成KafkaRDDPartition offsetRanges.zipWithIndex.map{ case(o,i)=> new KafkaRDDPartition( i,o.topic,o.partition, o.fromOffset,o.untilOffset ) }.toArray }
HadoopRDD的分區是基於hadoop的splits方法進行的。每一個partition的大小默認等於hdfs的block的大小this
例如:一個txt文件12800M,則 val rdd1=sc.textFile("/data.txt");
rdd1默認會有12800/128=10個分區。編碼
class HadoopRDD(){ // 生成一個RDD的惟一ID val id=Int=sc.newRddId() def getPartitions:Array[Partition]={ // 調用hadoop的splits方法進行切割 val inputSplits=inputFormat.getSplits( jobConf,minPartitions ) // 組成spark的partition val array=new Array[Partition](inputSplits.size) for(i <- 0 until inputSplits.size){ array(i)=new HadoopPartition(id,i, inputSplits(i)) } } }
hadoop的FileInputFormat類: texfile的分區大小時指定的分區數和block樹中取較大值,因此當指定numPartitions,小於block數時無效,大於則生效spa
JDBC的partition劃分是指定開始行和結束行,而後將查詢到的結果分爲3個(默認值)partition。
class JdbcRDD(numPartitions:Int){ def getPartitions:Array[Partition]={ (0 until numPartitions).map{ new JdbcPartition(i,start,end) }.toArray } }
轉換類的RDD分區數量是由其父類的分區數決定的
// 獲取父RDD列表的第一個RDD class RDD{ def firstParent:RDD={ dependencies.head.rdd.asInstanceOf[RDD] } } class MapPartitionsRDD(){ // 獲取父RDD的partitions數量 def getPartitions: Array[Partition] = firstParent[T].partitions }
分區數量的原則:儘量的選擇大的分區值
Spark API | partition數量 |
---|---|
sc.parallelize(…) | sc.defaultParallelism |
sc.textFile(…) | max(傳參, block數) |
sc.newAPIHadoopRDD(…) | max(傳參, block數) |
new JdbcRDD(…) | 傳參 |
reduceByKey(),foldByKey(),combineByKey(), groupByKey(),sortByKey(),mapValues(),flatMapValues() 和父RDD相同
cogroup(), join(), ,leftOuterJoin(), rightOuterJoin(): 全部父RDD按照其partition數降序排列,從partition數最大的RDD開始查找是否存在partitioner,存在則partition數由此partitioner肯定,不然,全部RDD不存在partitioner,由spark.default.parallelism肯定,若還沒設置,最後partition數爲全部RDD中partition數的最大值
注意:只有Key-Value類型的RDD纔有分區的,非Key-Value類型的RDD分區的值是None的
abstract class Partitioner extends Serializable { def numPartitions: Int // 分區數量 def getPartition(key: Any): Int // 分區編號 }
partitioner分區器做用:
分區器的選擇:
object Partitioner{ def defaultPartitioner(rdd):Partitioner={ val hasPartitioner= rdds.filter( _.partitioner.exists(_numPartitions>0)) } // 若是RDD已經有分區則選取其分區數最多的 if(hasPartitioner.nonEmpty){ hasPartitioner.maxBy(_.partitions.length). partitioner.get }else{ if(rdd.context.conf.contains( "spark.default.parallelism" )){ // 若是在conf中配置了分區數則用之 new HashPartitioner( rdd.context.defaultParallelism ) }else{ // 若是沒有配置parallelism則和父RDD中最大的保持一致 new HashPartitioner(rdds.map( _.partitions.length ).max) } } }
HashPartitioner分區的原理很簡單,對於給定的key,計算其hashCode,併除於分區的個數取餘,若是餘數小於0,則用餘數+分區的個數,最後返回的值就是這個key所屬的分區ID
class HashPartitioner(partitions:Int) { def getPartition(key:Any):Int=key match{ case null=> 0 case _=> nonNegativeMod(key.hashCode, numPartitions) } def nonNegativeMod(x: Int, mod: Int): Int = { val rawMod = x % mod rawMod + (if (rawMod < 0) mod else 0) } // 判斷兩個RDD分區方式是否同樣 def equals(other:Any):Boolean= other match{ case h:HashPartitioner => h.numPartitions==numPartitions case _ => false } }
HashPartitioner分區可能致使每一個分區中數據量的不均勻。而RangePartitioner分區則儘可能保證每一個分區中數據量的均勻,將必定範圍內的數映射到某一個分區內。分區與分區之間數據是有序的,但分區內的元素是不能保證順序的。
RangePartitioner分區執行原理:
一句話歸納:就是遍歷每一個paritiion,對裏面的數據進行抽樣,把抽樣的數據進行排序,並按照對應的權重肯定邊界
給定總的數據抽樣大小,最多1M的數據量(10^6),最少20倍的RDD分區數量,也就是每一個RDD分區至少抽取20條數據
class RangePartitioner(partitions,rdd) { // 1. 計算樣本大小 val sampleSize =math.min(20.0 * partitions, 1e6) }
RDD各分區中的數據量可能會出現傾斜的狀況,乘於3的目的就是保證數據量小的分區可以採樣到足夠的數據,而對於數據量大的分區會進行第二次採樣
class RangePartitioner(partitions,rdd) { // 1. 計算樣本大小 val sampleSize = math.min(20.0 * partitions, 1e6) // 2. 計算樣本最大值 val sampleSizePerPartition = math.ceil( 3.0 * sampleSize / rdd.partitions.length ).toInt }
根據以上兩個值進行水塘抽樣,返回RDD的總數據量,分區ID和每一個分區的採樣數據。其中總數據量經過遍歷RDD全部partition的key累加獲得的,不是經過rdd.count計算獲得的
class RangePartitioner(partitions,rdd) { // 1. 計算樣本大小 val sampleSize =math.min(20.0 * partitions, 1e6) // 2. 計算樣本最大值 val sampleSizePerPartition = math.ceil( 3.0 * sampleSize / rdd.partitions.length ).toInt } // 3. 進行抽樣,返回總數據量,分區ID和樣本數據 val (numItems, sketched) = RangePartitioner.sketch( rdd.map(_._1), sampleSizePerPartition)
若是有較大RDD存在,則按照平均值去採樣的話數據量太少,容易形成數據傾斜,因此須要進行二次採樣
判斷是否須要從新採樣方法: 樣本數量佔比乘以當前RDD的總行數大於預設的每一個RDD最大抽取數量,說明這個RDD的數據量比較大,須要採樣更多的數據:eg: 0.2100=20<60;0.220000=2000>60
class RangePartitioner(partitions,rdd) { // 1. 計算樣本大小 val sampleSize =math.min(20.0 * partitions, 1e6) // 2. 計算樣本最大值 val sampleSizePerPartition = math.ceil( 3.0 * sampleSize / rdd.partitions.length ).toInt } // 3. 進行抽樣,返回總數據量,分區ID和樣本數據 val (numItems, sketched) = RangePartitioner.sketch( rdd.map(_._1), sampleSizePerPartition) // 4. 是否須要二次採樣 val imbalancedPartitions = mutable.Set.empty[Int] if (fraction * n > sampleSizePerPartition) { // 記錄須要從新採樣的RDD的ID imbalancedPartitions += idx }
計算每一個採樣數據的權重佔比,根據採樣數據的ID和權重生成出RDD分區邊界數組
權重計算方法:總數據量/當前RDD的採樣數據量
class RangePartitioner(partitions,rdd) { // 1. 計算樣本大小 val sampleSize = math.min(20.0 * partitions, 1e6) // 2. 計算樣本最大值 val sampleSizePerPartition = math.ceil( 3.0 * sampleSize / rdd.partitions.length ).toInt } // 3. 進行抽樣,返回總數據量,分區ID和樣本數據 val (numItems, sketched) = RangePartitioner.sketch( rdd.map(_._1), sampleSizePerPartition) // 4. 是否須要二次採樣 val imbalancedPartitions = mutable.Set.empty[Int] // 5. 保存樣本數據的集合buffer:包含數據和權重 val candidates = ArrayBuffer.empty[(K, Float)] if (fraction * n > sampleSizePerPartition) { // 記錄須要從新採樣的RDD的ID imbalancedPartitions += idx }else{ // 5. 計算樣本權重 val weight = ( // 採樣數據的佔比 n.toDouble / sample.length).toFloat for (key <- sample) { // 記錄採樣數據key和權重 candidates += ((key, weight)) } } }
對於數據分佈不均衡的RDD分區,從新進行二次抽樣。 二次抽樣採用的是RDD的採樣方法:RDD.sample
class RangePartitioner(partitions,rdd) { // 1. 計算樣本大小 val sampleSize = math.min(20.0 * partitions, 1e6) // 2. 計算樣本最大值 val sampleSizePerPartition = math.ceil( 3.0 * sampleSize / rdd.partitions.length ).toInt } // 3. 進行抽樣,返回總數據量,分區ID和樣本數據 val (numItems, sketched) = RangePartitioner.sketch( rdd.map(_._1), sampleSizePerPartition) // 4. 是否須要二次採樣 val imbalancedPartitions = mutable.Set.empty[Int] // 5. 保存樣本數據的集合buffer:包含數據和權重 val candidates = ArrayBuffer.empty[(K, Float)] if (fraction * n > sampleSizePerPartition) { // 記錄須要從新採樣的RDD的ID imbalancedPartitions += idx }else{ // 5. 計算樣本權重 val weight = ( // 採樣數據的佔比 n.toDouble / sample.length).toFloat for (key <- sample) { // 記錄採樣數據key和權重 candidates += ((key, weight)) } } // 6. 對於數據分佈不均衡的RDD分區,從新數據抽樣 if (imbalancedPartitions.nonEmpty) { // 利用rdd的sample抽樣函數API進行數據抽樣 val reSampled = imbalanced.sample( withReplacement = false, fraction, seed).collect() } }
將最終的抽樣數據計算出分區邊界數組返回,邊界數組裏面存放的是RDD裏面數據的key值, 好比最終返回的數組是:array[0,10,20,30..] 其中0,10,20,30是採樣數據中的key值,對於每一條數據都會判斷其在此數組的那個區間中間,例若有一條數據key值是3則其在0到10之間,屬於第一個分區,同理Key值爲15的數據在第二個分區
class RangePartitioner(partitions,rdd) { // 1. 計算樣本大小 val sampleSize = math.min(20.0 * partitions, 1e6) // 2. 計算樣本最大值 val sampleSizePerPartition = math.ceil( 3.0 * sampleSize / rdd.partitions.length ).toInt } // 3. 進行抽樣,返回總數據量,分區ID和樣本數據 val (numItems, sketched) = RangePartitioner.sketch( rdd.map(_._1), sampleSizePerPartition) // 4. 是否須要二次採樣 val imbalancedPartitions = mutable.Set.empty[Int] // 5. 保存樣本數據的集合buffer:包含數據和權重 val candidates = ArrayBuffer.empty[(K, Float)] if (fraction * n > sampleSizePerPartition) { // 記錄須要從新採樣的RDD的ID imbalancedPartitions += idx }else{ // 5. 計算樣本權重 val weight = ( // 採樣數據的佔比 n.toDouble / sample.length).toFloat for (key <- sample) { // 記錄採樣數據key和權重 candidates += ((key, weight)) } } // 6. 對於數據分佈不均衡的RDD分區,從新數據抽樣 if (imbalancedPartitions.nonEmpty) { // 利用rdd的sample抽樣函數API進行數據抽樣 val reSampled = imbalanced.sample( withReplacement = false, fraction, seed).collect() } // 7. 生成邊界數組 RangePartitioner.determineBounds( candidates, partitions) }
水塘抽樣概念: 它是一系列的隨機算法,其目的在於從包含n個項目的集合S中選取k個樣本,使得每條數據抽中的機率是k/n。其中n爲一很大或未知的數量,尤爲適用於不能把全部n個項目都存放到主內存的狀況
咱們能夠:定義取出的行號爲choice,第一次直接以第一行做爲取出行 choice ,然後第二次以二分之一律率決定是否用第二行替換 choice ,第三次以三分之一的機率決定是否以第三行替換 choice ……,以此類推。由上面的分析咱們能夠得出結論,在取第n個數據的時候,咱們生成一個0到1的隨機數p,若是p小於1/n,保留第n個數。大於1/n,繼續保留前面的數。直到數據流結束,返回此數,算法結束。
詳見: https://www.iteblog.com/archives/1525.html https://my.oschina.net/freelili/blog/2987667
實現:
難點:
// 計算出須要替換的數組下標 // 選取第n個數的機率是:n/l; 若是隨機替換數組值的機率是p=rand.nextDouble, // 則若是p<k/l;則替換池中任意一個數,即: p*l < k 則進行替換,用p*l做爲隨機替換的下標 val replacementIndex = (rand.nextDouble() * l).toLong if (replacementIndex < k) { // 替換reservoir[隨機抽取的下標]的值爲input[l]的值item reservoir(replacementIndex.toInt) = item }
若是分區邊界數組的大小小於或等於128的時候直接變量數組,不然採用二分查找法肯定key屬於某個分區。
遍歷數組,判斷當前key值是否屬於當前區間
// 根據RDD的key值返回對應的分區id。從0開始 def getPartition(key: Any): Int = { // 強制轉換key類型爲RDD中本來的數據類型 val k = key.asInstanceOf[K] var partition = 0 if (rangeBounds.length <= 128) { // 若是分區數據小於等於128個,那麼直接本地循環尋找當前k所屬的分區下標 // ordering.gt(x,y):若是x>y,則返回true while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) { partition += 1 }
對於分區數大於128的狀況,採樣二分法查找
// 根據RDD的key值返回對應的分區id。從0開始 def getPartition(key: Any): Int = { // 若是分區數量大於128個,那麼使用二分查找方法尋找對應k所屬的下標; // 可是若是k在rangeBounds中沒有出現,實質上返回的是一個負數(範圍)或者是一個超過rangeBounds大小的數(最後一個分區,比全部數據都大) // Determine which binary search method to use only once. partition = binarySearch(rangeBounds, k) // binarySearch either returns the match location or -[insertion point]-1 if (partition < 0) { partition = -partition-1 } if (partition > rangeBounds.length) { partition = rangeBounds.length }
自定義:
public class MyPartioner extends Partitioner { @Override public int numPartitions() { return 1000; } @Override public int getPartition(Object key) { String k = (String) key; int code = k.hashCode() % 1000; System.out.println(k+":"+code); return code < 0?code+1000:code; } @Override public boolean equals(Object obj) { if(obj instanceof MyPartioner){ if(this.numPartitions()==((MyPartioner) obj).numPartitions()){ return true; } return false; } return super.equals(obj); } }
調用:pairRdd.groupbykey(new MyPartitioner())
參考連接:https://ihainan.gitbooks.io/spark-source-code/content/section1/rddPartitions.html
sparkCore源碼解析系列: