Spark分區器HashPartitioner

Spark中分區器直接決定了RDD中分區的個數;也決定了RDD中每條數據通過Shuffle過程屬於哪一個分區;也決定了Reduce的個數。這三點看起來是不一樣的方面的,但其深層的含義是一致的。
php

咱們須要注意的是,只有Key-Value類型的RDD纔有分區的,非Key-Value類型的RDD分區的值是None的。
java

注:有的時候,HashPartitioner存在 分區碰撞問題,即不一樣的值可能計算出來的分區是同樣的 key的hashcode % reduce個數.(例如:java.itcast.cn與php.itcast.cn的hashcode值就是同樣的,固然這是小几率事件,可是有的時候還真的是會發生的),因此有的時候須要咱們本身實現分區程序!在spark中實現自定義的分區只須要實現partitioner trait並實現裏面的方法!算法


固然:在Spark中,存在兩類分區函數:HashPartitioner和RangePartitioner,它們都是繼承自Partitioner,主要提供了每一個RDD有幾個分區(numPartitions)以及對於給定的值返回一個分區ID(0~numPartitions-1),也就是決定這個值是屬於那個分區的。apache




HashPartitioner分區

  HashPartitioner分區的原理很簡單,對於給定的key,計算其hashCode,併除於分區的個數取餘,若是餘數小於0,則用餘數+分區的個數,最後返回的值就是這個key所屬的分區ID。實現以下:數組

01 /////////////////////////////////////////////////////////////////////
02  User: 過往記憶
03  Date: 2015-11-10
04  Time: 06:59
05  bolg: http://www.iteblog.com
06  本文地址:http://www.iteblog.com/archives/1522
07  過往記憶博客,專一於hadoop、hive、spark、shark、flume的技術博客,大量的乾貨
08  過往記憶博客微信公共賬號:iteblog_hadoop
09 /////////////////////////////////////////////////////////////////////
10  
11 class HashPartitioner(partitions: Int) extends Partitioner {
12   require(partitions >=0, s"Number of partitions ($partitions) cannot be negative.")
13  
14   def numPartitions: Int = partitions
15  
16   def getPartition(key: Any): Int = key match {
17     case null =0
18     case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
19   }
20  
21   override def equals(other: Any): Boolean = other match {
22     case h: HashPartitioner =>
23       h.numPartitions == numPartitions
24     case _ =>
25       false
26   }
27  
28   override def hashCode: Int = numPartitions
29 }

RangePartitioner分區

  從HashPartitioner分區的實現原理咱們能夠看出,其結果可能致使每一個分區中數據量的不均勻,極端狀況下會致使某些分區擁有RDD的所有數據,這顯然不是咱們須要的。而RangePartitioner分區則儘可能保證每一個分區中數據量的均勻,並且分區與分區之間是有序的,也就是說一個分區中的元素確定都是比另外一個分區內的元素小或者大;可是分區內的元素是不能保證順序的。簡單的說就是將必定範圍內的數映射到某一個分區內。微信

  前面討論過,RangePartitioner分區器的主要做用就是將必定範圍內的數映射到某一個分區內,因此它的實現中分界的算法尤其重要。這個算法對應的函數是rangeBounds。這個函數主要經歷了兩個過程:以Spark 1.1版本爲界,Spark 1.1版本社區對rangeBounds函數進行了一次重大的重構。less

  由於在Spark 1.1版本以前,RangePartitioner分區對整個數據集進行了2次的掃描:一次是計算RDD中元素的個數;一次是進行採樣。具體的代碼以下:dom

01 // An array of upper bounds for the first (partitions - 1) partitions
02 private val rangeBounds: Array[K] = {
03     if (partitions == 1) {
04       Array()
05     else {
06       val rddSize = rdd.count()
07       val maxSampleSize = partitions * 20.0
08       val frac =math.min(maxSampleSize / math.max(rddSize, 1), 1.0)
09       val rddSample =rdd.sample(false, frac, 1).map(_._1).collect().sorted
10       if (rddSample.length == 0) {
11         Array()
12       else {
13         val bounds = new Array[K](partitions - 1)
14         for (i <- 0 until partitions - 1) {
15           val index = (rddSample.length - 1) * (i + 1) / partitions
16           bounds(i) = rddSample(index)
17         }
18         bounds
19       }
20     }
21 }

  注意看裏面的rddSize的計算和rdd.sample的計算。因此若是你進行一次sortByKey操做就會對RDD掃描三次!而咱們都知道,分區函數性能對整個Spark做業的性能是有直接的影響,並且影響很大,直接影響做業運行的總時間,因此社區不得不對RangePartitioner中的rangeBounds算法進行重構。ide

  在閱讀新版本的RangePartitioner以前,建議先去了解一下Reservoir sampling(水塘抽樣),由於其中的實現用到了Reservoir sampling算法進行採樣。
採樣總數函數

  在新的rangeBounds算法總,採樣總數作了一個限制,也就是最大隻採樣1e6的樣本(也就是1000000):

1 val sampleSize = math.min(20.0 * partitions, 1e6)

因此若是你的分區個數爲5,則採樣樣本數量爲100.0

父RDD中每一個分區採樣樣本數

  按照咱們的思路,正常狀況下,父RDD每一個分區須要採樣的數據量應該是sampleSize/rdd.partitions.size,可是咱們看代碼的時候發現父RDD每一個分區須要採樣的數據量是正常數的3倍。

1 val sampleSizePerPartition = math.ceil(3.0* sampleSize / rdd.partitions.size).toInt

這是由於父RDD各分區中的數據量可能會出現傾斜的狀況,乘於3的目的就是保證數據量小的分區可以採樣到足夠的數據,而對於數據量大的分區會進行第二次採樣。

採樣算法

  這個地方就是RangePartitioner分區的核心了,其內部使用的就是水塘抽樣,而這個抽樣特別適合那種總數很大並且未知,並沒有法將全部的數據所有存放到主內存中的狀況。也就是咱們不須要事先知道RDD中元素的個數(不須要調用rdd.count()了!)。其主要實現以下:

01 /////////////////////////////////////////////////////////////////////
02  User: 過往記憶
03  Date: 2015-11-10
04  Time: 06:59
05  bolg: http://www.iteblog.com
06  本文地址:http://www.iteblog.com/archives/1522
07  過往記憶博客,專一於hadoop、hive、spark、shark、flume的技術博客,大量的乾貨
08  過往記憶博客微信公共賬號:iteblog_hadoop
09 /////////////////////////////////////////////////////////////////////
10  
11 val (numItems, sketched) =RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
12  
13 def sketch[K : ClassTag](
14       rdd: RDD[K],
15       sampleSizePerPartition: Int):(Long, Array[(Int, Int, Array[K])]) = {
16     val shift = rdd.id
17     // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
18     val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
19       val seed = byteswap32(idx ^ (shift << 16))
20       val (sample, n) = SamplingUtils.reservoirSampleAndCount(
21         iter, sampleSizePerPartition, seed)
22       Iterator((idx, n, sample))
23     }.collect()
24     val numItems = sketched.map(_._2.toLong).sum
25     (numItems, sketched)
26 }
27  
28 def reservoirSampleAndCount[T: ClassTag](
29       input: Iterator[T],
30       k: Int,
31       seed: Long = Random.nextLong())
32     : (Array[T], Int) = {
33     val reservoir = new Array[T](k)
34     // Put the first k elements in the reservoir.
35     var = 0
36     while (i < k && input.hasNext) {
37       val item = input.next()
38       reservoir(i) = item
39       i += 1
40     }
41  
42     // If we have consumed all the elements, return them. Otherwise do the replacement.
43     if (i < k) {
44       // If input size < k, trim the array to return only an array of input size.
45       val trimReservoir = new Array[T](i)
46       System.arraycopy(reservoir, 0, trimReservoir, 0, i)
47       (trimReservoir, i)
48     else {
49       // If input size > k, continue the sampling process.
50       val rand = new XORShiftRandom(seed)
51       while (input.hasNext) {
52         val item = input.next()
53         val replacementIndex = rand.nextInt(i)
54         if (replacementIndex < k) {
55           reservoir(replacementIndex) = item
56         }
57         i += 1
58       }
59       (reservoir, i)
60 }
61 }

  RangePartitioner.sketch的第一個參數是rdd.map(_._1),也就是把父RDD的key傳進來,由於分區只須要對Key進行操做便可。該函數返回值是val (numItems, sketched) ,其中numItems至關於記錄rdd元素的總數;而sketched的類型是Array[(Int, Int, Array[K])],記錄的是分區的編號、該分區中總元素的個數以及從父RDD中每一個分區採樣的數據。

  sketch函數對父RDD中的每一個分區進行採樣,並記錄下分區的ID和分區中數據總和。

  reservoirSampleAndCount函數就是典型的水塘抽樣實現,惟一不一樣的是該算法還記錄下i的值,這個就是該分區中元素的總和。

  咱們以前討論過,父RDD各分區中的數據量可能不均勻,在極端狀況下,有些分區內的數據量會佔有整個RDD的絕大多數的數據,若是按照水塘抽樣進行採樣,會致使該分區所採樣的數據量不足,因此咱們須要對該分區再一次進行採樣,而此次採樣使用的就是rdd的sample函數。實現以下:

01 val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
02 val candidates = ArrayBuffer.empty[(K, Float)]
03 val imbalancedPartitions = mutable.Set.empty[Int]
04 sketched.foreach { case (idx, n, sample) =>
05   if (fraction * n > sampleSizePerPartition) {
06     imbalancedPartitions += idx
07   else {
08     // The weight is 1 over the sampling probability.
09     val weight = (n.toDouble / sample.size).toFloat
10     for (key <- sample) {
11       candidates += ((key, weight))
12     }
13   }
14 }
15 if (imbalancedPartitions.nonEmpty) {
16   // Re-sample imbalanced partitions with the desired sampling probability.
17   val imbalanced = newPartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
18   val seed = byteswap32(-rdd.id - 1)
19   val reSampled = imbalanced.sample(withReplacement =false, fraction, seed).collect()
20   val weight = (1.0 / fraction).toFloat
21   candidates ++= reSampled.map(x => (x, weight))
22 }

咱們能夠看到,從新採樣的採樣因子和Spark 1.1以前的採樣因子一致。對於知足於fraction * n > sampleSizePerPartition條件的分區,咱們對其再一次採樣。全部採樣完的數據所有存放在candidates 中。

確認邊界

從上面的採樣算法能夠看出,對於不一樣的分區weight的值是不同的,這個值對應的就是每一個分區的採樣間隔。

01 def determineBounds[K : Ordering : ClassTag](
02     candidates: ArrayBuffer[(K, Float)],
03     partitions: Int): Array[K] = {
04   val ordering = implicitly[Ordering[K]]
05   val ordered = candidates.sortBy(_._1)
06   val numCandidates = ordered.size
07   val sumWeights = ordered.map(_._2.toDouble).sum
08   val step = sumWeights / partitions
09   var cumWeight = 0.0
10   var target = step
11   val bounds = ArrayBuffer.empty[K]
12   var = 0
13   var = 0
14   var previousBound = Option.empty[K]
15   while ((i < numCandidates) && (j < partitions - 1)) {
16     val (key, weight) = ordered(i)
17     cumWeight += weight
18     if (cumWeight > target) {
19       // Skip duplicate values.
20       if(previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
21         bounds += key
22         target += step
23         j += 1
24         previousBound = Some(key)
25       }
26     }
27     i += 1
28
相關文章
相關標籤/搜索