轉自:http://www.javashuo.com/article/p-fdfyirll-gs.html算法
在Spark中,RDD(Resilient Distributed Dataset)是其最基本的抽象數據集,其中每一個RDD是由若干個Partition組成。在Job運行期間,參與運算的Partition數據分佈在多臺機器的內存當中。這裏可將RDD當作一個很是大的數組,其中Partition是數組中的每一個元素,而且這些元素分佈在多臺機器中。圖一中,RDD1包含了5個Partition,RDD2包含了3個Partition,這些Partition分佈在4個節點中。數組
Spark包含兩種數據分區方式:HashPartitioner(哈希分區)和RangePartitioner(範圍分區)。通常而言,對於初始讀入的數據是不具備任何的數據分區方式的。數據分區方式只做用於<Key,Value>形式的數據。所以,當一個Job包含Shuffle操做類型的算子時,如groupByKey,reduceByKey etc,此時就會使用數據分區方式來對數據進行分區,即肯定某一個Key對應的鍵值對數據分配到哪個Partition中。在Spark Shuffle階段中,共分爲Shuffle Write階段和Shuffle Read階段,其中在Shuffle Write階段中,Shuffle Map Task對數據進行處理產生中間數據,而後再根據數據分區方式對中間數據進行分區。最終Shffle Read階段中的Shuffle Read Task會拉取Shuffle Write階段中產生的並已經分好區的中間數據。圖2中描述了Shuffle階段與Partition關係。下面則分別介紹Spark中存在的兩種數據分區方式。less
一、HashPartitioner原理簡介 ide
HashPartitioner採用哈希的方式對<Key,Value>鍵值對數據進行分區。其數據分區規則爲 partitionId = Key.hashCode % numPartitions,其中partitionId表明該Key對應的鍵值對數據應當分配到的Partition標識,Key.hashCode表示該Key的哈希值,numPartitions表示包含的Partition個數。圖3簡單描述了HashPartitioner的數據分區過程。ui
二、HashPartitioner源碼詳解spa
class HashPartitioner(partitions: Int) extends Partitioner { require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.") /** * 包含的分區個數 */ def numPartitions: Int = partitions /** * 得到Key對應的partitionId */ def getPartition(key: Any): Int = key match { case null => 0 case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) } override def equals(other: Any): Boolean = other match { case h: HashPartitioner => h.numPartitions == numPartitions case _ => false } override def hashCode: Int = numPartitions } def nonNegativeMod(x: Int, mod: Int): Int = { val rawMod = x % mod rawMod + (if (rawMod < 0) mod else 0) }
1、RangePartitioner原理簡介
Spark引入RangePartitioner的目的是爲了解決HashPartitioner所帶來的分區傾斜問題,也即分區中包含的數據量不均衡問題。HashPartitioner採用哈希的方式將同一類型的Key分配到同一個Partition中,所以當某一或某幾種類型數據量較多時,就會形成若干Partition中包含的數據過大問題,而在Job執行過程當中,一個Partition對應一個Task,此時就會使得某幾個Task運行過慢。RangePartitioner基於抽樣的思想來對數據進行分區。圖4簡單描述了RangePartitioner的數據分區過程。.net
二、RangePartitioner源碼詳解
① 肯定採樣數據的規模:RangePartitioner默認對生成的子RDD中的每一個Partition採集20條數據,樣本數據最大爲1e6條。3d
// 總共須要採集的樣本數據個數,其中partitions表明最終子RDD中包含的Partition個數 val sampleSize = math.min(20.0 * partitions, 1e6)
② 肯定父RDD中每一個Partition中應當採集的數據量:這裏注意的是,對父RDD中每一個Partition採集的數據量會在平均值上乘以3,這裏是爲了後繼在進行判斷一個Partition是否發生了傾斜,當一個Partition包含的數據量超過了平均值的三倍,此時會認爲該Partition發生了數據傾斜,會對該Partition調用sample算子進行從新採樣。code
// 被採樣的RDD中每一個partition應該被採集的數據,這裏將平均採集每一個partition中數據的3倍 val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
③ 調用sketch方法進行數據採樣:sketch方法返回的結果爲<採樣RDD的數據量,<partitionId, 分區數據量,分區採樣的數據量>>。在sketch方法中會使用水塘抽樣算法對待採樣的各個分區進行數據採樣,這裏採用水塘抽樣算法是因爲實現沒法知道每一個Partition中包含的數據量,而水塘抽樣算法能夠保證在不知道總體的數據量下仍然能夠等機率地抽取出每條數據。圖4簡單描述了水塘抽樣過程。blog
// 使用sketch方法進行數據抽樣 val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition) /** * @param rdd 須要採集數據的RDD * @param sampleSizePerPartition 每一個partition採集的數據量 * @return <採樣RDD數據總量,<partitionId, 當前分區的數據量,當前分區採集的數據量>> */ def sketch[K : ClassTag]( rdd: RDD[K], sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = { val shift = rdd.id val sketched = rdd.mapPartitionsWithIndex { (idx, iter) => val seed = byteswap32(idx ^ (shift << 16)) // 使用水塘抽樣算法進行抽樣,抽樣結果是個二元組<Partition中抽取的樣本量,Partition中包含的數據量> val (sample, n) = SamplingUtils.reservoirSampleAndCount( iter, sampleSizePerPartition, seed) Iterator((idx, n, sample)) }.collect() val numItems = sketched.map(_._2).sum (numItems, sketched) }
④ 數據抽樣完成後,須要對不均衡的Partition從新進行抽樣,默認當Partition中包含的數據量大於平均值的三倍時,該Partition是不均衡的。當採樣完成後,利用樣本容量和RDD中包含的數據總量,能夠獲得總體的一個數據採樣率fraction。利用此採樣率對不均衡的Partition調用sample算子從新進行抽樣。
// 計算數據採樣率 val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0) // 存放採樣Key以及採樣權重 val candidates = ArrayBuffer.empty[(K, Float)] // 存放不均衡的Partition val imbalancedPartitions = mutable.Set.empty[Int] //(idx, n, sample)=> (partition id, 當前分區數據個數,當前partition的採樣數據) sketched.foreach { case (idx, n, sample) => // 當一個分區中的數據量大於平均分區數據量的3倍時,認爲該分區是傾斜的 if (fraction * n > sampleSizePerPartition) { imbalancedPartitions += idx } // 在三倍以內的認爲沒有發生數據傾斜 else { // 每條數據的採樣間隔 = 1/採樣率 = 1/(sample.size/n.toDouble) = n.toDouble/sample.size val weight = (n.toDouble / sample.length).toFloat // 對當前分區中的採樣數據,對每一個key造成一個二元組<key, weight> for (key <- sample) { candidates += ((key, weight)) } } } // 對於非均衡的partition,從新採用sample算子進行抽樣 if (imbalancedPartitions.nonEmpty) { val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains) val seed = byteswap32(-rdd.id - 1) val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect() val weight = (1.0 / fraction).toFloat candidates ++= reSampled.map(x => (x, weight)) }
⑤ 肯定各個Partition的Key範圍:使用determineBounds方法來肯定每一個Partition中包含的Key範圍,先對採樣的Key進行排序,而後計算每一個Partition平均包含的Key權重,而後採用平均分配原則來肯定各個Partition包含的Key範圍。如當前採樣Key以及權重爲:<1, 0.2>, <2, 0.1>, <3, 0.1>, <4, 0.3>, <5, 0.1>, <6, 0.3>,如今將其分配到3個Partition中,則每一個Partition的平均權重爲:(0.2 + 0.1 + 0.1 + 0.3 + 0.1 + 0.3) / 3 = 0.36。此時Partition1 ~ 3分配的Key以及總權重爲<Partition1, {1, 2, 3}, 0.4> <Partition2, {4, 5}, 0.4> <Partition1, {6}, 0.3>。
/** * @param candidates 未按採樣間隔排序的抽樣數據 * @param partitions 最終生成的RDD包含的分區個數 * @return 分區邊界 */ def determineBounds[K : Ordering : ClassTag]( candidates: ArrayBuffer[(K, Float)], partitions: Int): Array[K] = { val ordering = implicitly[Ordering[K]] // 對樣本按照key進行排序 val ordered = candidates.sortBy(_._1) // 抽取的樣本容量 val numCandidates = ordered.size // 抽取的樣本對應的採樣間隔之和 val sumWeights = ordered.map(_._2.toDouble).sum // 平均每一個分區的步長 val step = sumWeights / partitions var cumWeight = 0.0 var target = step // 分區邊界值 val bounds = ArrayBuffer.empty[K] var i = 0 var j = 0 var previousBound = Option.empty[K] while ((i < numCandidates) && (j < partitions - 1)) { val (key, weight) = ordered(i) cumWeight += weight // 當前的採樣間隔小於target,繼續迭代,也即這些key應該放在同一個partition中 if (cumWeight >= target) { // Skip duplicate values. if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) { bounds += key target += step j += 1 previousBound = Some(key) } } i += 1 } bounds.toArray }
⑥ 計算每一個Key所在Partition:當分區範圍長度在128之內,使用順序搜索來肯定Key所在的Partition,不然使用二分查找算法來肯定Key所在的Partition。
/** * 得到每一個Key所在的partitionId */ def getPartition(key: Any): Int = { val k = key.asInstanceOf[K] var partition = 0 // 若是獲得的範圍不大於128,則進行順序搜索 if (rangeBounds.length <= 128) { // If we have less than 128 partitions naive search while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) { partition += 1 } } // 範圍大於128,則進行二分搜索該key所在範圍,便可獲得該key所在的partitionId else { // Determine which binary search method to use only once. partition = binarySearch(rangeBounds, k) // binarySearch either returns the match location or -[insertion point]-1 if (partition < 0) { partition = -partition-1 } if (partition > rangeBounds.length) { partition = rangeBounds.length } } if (ascending) { partition } else { rangeBounds.length - partition } }
//只須要繼承Partitioner,重寫兩個方法 class MyPartitioner(val num: Int) extends Partitioner { //這裏定義partitioner個數 override def numPartitions: Int = ??? //這裏定義分區規則 override def getPartition(key: Any): Int = ??? }
class MyPartitioner(val num:Int) extends Partitioner { override def numPartitions: Int = num override def getPartition(key: Any): Int = { val len = key.toString.length //根據單詞長度對分區個數取模 len % num } }
bject testMyPartitioner { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("test").setMaster("local[*]") val sc = new SparkContext(conf) val rdd1 = sc.parallelize(List("lijie hello lisi", "zhangsan wangwu mazi", "hehe haha nihaoa heihei lure hehe hello word")) val rdd2=rdd1.flatMap(_.split(" ")).map(x=>{ (x,1) }) //這裏指定自定義分區,而後輸出 val rdd3 =rdd2.sortBy(_._2).partitionBy(new MyPartitioner(4)).mapPartitions(x=>x).saveAsTextFile("file:///f:/out") println(rdd2.collect().toBuffer) sc.stop() } }
由於這裏定義的是4個partition 因此最後產生4個文件
其中part-00000 和 part-00001以下:
其中part-00002 和 part-00003以下:
其中part-00000中zhangsan的長度對4取模爲0和這個文件中其餘較短的單詞同樣,因此在一個分區, part-00003沒有內容,說明上面的單詞的長度對4取模結果沒有爲3的
參考:https://blog.csdn.net/qq_20641565/article/details/76130724