【Spark】sortBy[T]和sortByKey[T]排序詳解

問題導讀:

1. 排序算子是如何做排序的?
2. 完整的排序流程是?


解決方案:

1 前言

在前面一系列博客中,特別在Shuffle博客系列中,曾描述過在生成ShuffleWrite的文件的時候,對每個partition會先進行排序並spill到文件中,最後合併成ShuffleWrite的文件,也就是每個Partition裏的內容已經進行了排序,在最後的action操作的時候需要對每個executor生成的shuffle文件相同的Partition進行合併,完成Action的操作。

排序算子和常見的reduce算子算法有何區別?
常見的一些聚合、reduce算子,不需要排序
  • 將相同的hashcode分配到同一個partition,哪怕是不同的executor
  • 在做最後的合併的時候,只需要合併不同的executor裏相同的partition就可以了
  • 對每個partition進行排序,考慮內存因數,解決相同的Partition多文件合併的問題,使用外排序進行相同的key合併


2 排序

下面是一個常見的排序的小例子:


[Scala]  純文本查看  複製代碼
?
01
02
03
04
05
06
07
08
09
10
11
12
13
14
package spark.sort
 
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
 
object sortsample {
   def main(args : Array[String]) {
     
     val conf = new SparkConf().setAppName( "sortsample" )
     val sc = new SparkContext(conf)
     var pairs = sc.parallelize(Array(( "a" , 0 ),( "b" , 0 ),( "c" , 3 ),( "d" , 6 ),( "e" , 0 ),( "f" , 0 ),( "g" , 3 ),( "h" , 6 )), 2 );
     pairs.sortByKey( true , 3 ).collect().foreach(println);
   }
}


核心代碼:OrderedRDDFunctions.scala

會很奇怪麼?RDD裏面並沒有sortByKey的方法?在這裏和前面博客裏提到的PairRDDFunctions一樣,隱式轉換:


[Scala]  純文本查看  複製代碼
?
1
2
3
4
implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V : ClassTag](rdd : RDD[(K, V)])
   : OrderedRDDFunctions[K, V, (K, V)] = {
   new OrderedRDDFunctions[K, V, (K, V)](rdd)
}


調用的是OrderedRDDFunctions.scala裏的方法

[Scala]  純文本查看  複製代碼
?
1
2
3
4
5
6
7
def sortByKey(ascending : Boolean = true , numPartitions : Int = self.partitions.length)
      : RDD[(K, V)] = self.withScope
  {
    val part = new RangePartitioner(numPartitions, self, ascending)
    new ShuffledRDD[K, V, V](self, part)
      .setKeyOrdering( if (ascending) ordering else ordering.reverse)
  }


對Partition採用了範圍分配的策略,爲何要使用範圍分配的策略?
  • 對其它非排序類型的算子,使用散列算法,只要保證相同的key是分配在相同的partition就可以了,並不會影響相同的key的合併,計算。
  • 對排序來說,如果只是保證相同的key在相同的Partition並不足夠,最後還是需要合併所有的Partition進行排序合併,如果這發生在Driver端做這件事,將會非常可怕,那麼我們可以做一些策略改變,制定一些Range,使排序相近的key分配到同一個Range上,在把Range擴大化,比如:一個Partition管理一個Range



 

2.1 分配Range
Range的分配不合理,會影響數據的不均衡,導致executor在做同Partition排序的時候會不均衡,並行計算的整體性能往往會被單個最糟糕的運行節點所拖累,如果提高運算的速度,需要考慮數據分配的均衡性。

2.1.1 每個區塊採樣大小
獲取所有的key,依據所有的Key制定區間,這顯然是不明智的,後果變成一個全量數據的排序。我們可以採用部分採樣的策略,基於採樣數據進行區間劃分,首先我們需要評估一個簡單的採樣大小的閾值。
Partitioner.scala rangeBounds
代碼如下:


[Scala]  純文本查看  複製代碼
?
1
2
3
4
val sampleSize = math.min( 20.0 * partitions, 1 e 6 )
       // Assume the input partitions are roughly balanced and over-sample a little bit.
       val sampleSizePerPartition = math.ceil( 3.0 * sampleSize / rdd.partitions.length).toInt
       val (numItems, sketched) = RangePartitioner.sketch(rdd.map( _ . _ 1 ), sampleSizePerPartition)


partitions: 參數在指定sortByKey的時候設置的區塊大小:3

[Scala]  純文本查看  複製代碼
?
1
pairs.sortByKey( true , 3 )


rdd.partitions: 指的是在數據的分區塊大小:2

[Scala]  純文本查看  複製代碼
?
1
sc.parallelize(Array(( "a" , 0 ),( "b" , 0 ),( "c" , 3 ),( "d" , 6 ),( "e" , 0 ),( "f" , 0 ),( "g" , 3 ),( "h" , 6 )), 2 )


每個區塊需要採樣的數量是通過幾個固定參數來計算

[Scala]  純文本查看  複製代碼
?
1
val sampleSizePerPartition = math.ceil( 3.0 * sampleSize / rdd.partitions.length).toInt



2.1.2 Sketch採樣(蓄水池採樣法)

[Scala]  純文本查看  複製代碼
?
01
02
03
04
05
06
07
08
09
10
11
12
13
14
def sketch[K : ClassTag](
     rdd : RDD[K],
     sampleSizePerPartition : Int) : (Long, Array[(Int, Long, Array[K])]) = {
   val shift = rdd.id
   // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
   val sketched = rdd.mapPartitionsWithIndex { (idx, iter) = >
     val seed = byteswap 32 (idx ^ (shift << 16 ))
     val (sample, n) = SamplingUtils.reservoirSampleAndCount(
       iter, sampleSizePerPartition, seed)
     Iterator((idx, n, sample))
   }.collect()
   val numItems = sketched.map( _ . _ 2 ).sum
   (numItems, sketched)
}


mapPartitionsWithIndex, collection 這些都是RDD ,都是需要在提交job進行運算的,也就是採樣的過程中,是通過executor執行了一次job

[Scala]  純文本查看  複製代碼
?
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def reservoirSampleAndCount[T : ClassTag](
     input : Iterator[T],
     k : Int,
     seed : Long = Random.nextLong())
   : (Array[T], Long) = {
   val reservoir = new Array[T](k)
   // Put the first k elements in the reservoir.
   var i = 0
   while (i < k && input.hasNext) {
     val item = input.next()
     reservoir(i) = item
     i + = 1
   }
 
   // If we have consumed all the elements, return them. Otherwise do the replacement.
   if (i < k) {
     // If input size < k, trim the array to return only an array of input size.
     val trimReservoir = new Array[T](i)
     System.arraycopy(reservoir, 0 , trimReservoir, 0 , i)
     (trimReservoir, i)
   } else {
     // If input size > k, continue the sampling process.
     var l = i.toLong
     val rand = new XORShiftRandom(seed)
     while (input.hasNext) {
       val item = input.next()
       l + = 1
       // There are k elements in the reservoir, and the l-th element has been
       // consumed. It should be chosen with probability k/l. The expression
       // below is a random long chosen uniformly from [0,l)
       val replacementIndex = (rand.nextDouble() * l).toLong
       if (replacementIndex < k) {
         reservoir(replacementIndex.toInt) = item
       }
     }
     (reservoir, l)
   }
}


函數reservoirSampleAndCount採樣
  • 當數據小於要採樣的集合的時候,可以使用數據爲樣本
  • 當數據集合超過需要採樣數目的時候會繼續遍歷整個數據集合,通過隨機數進行位置的隨機替換,保證採樣數據的隨機性

返回的結果裏包含了總數據集,區塊編號,區塊的數量,每個區塊的採樣集


2.1.3 重新採樣
爲了避免某些區塊的數據量過大,設置了一個閾值:


[Scala]  純文本查看  複製代碼
?
1
val fraction = math.min(sampleSize / math.max(numItems, 1 L), 1.0 )


閾值=採樣數除於總數據量,當某個區塊的數據量*閾值大於每個區的採樣率的時候,認爲這個區塊的採樣率是不足的,需要重新採樣

[Scala]  純文本查看  複製代碼
?
1
2
3
4
5
val imbalanced = new PartitionPruningRDD(rdd.map( _ . _ 1 ), imbalancedPartitions.contains)
           val seed = byteswap 32 (-rdd.id - 1 )
           val reSampled = imbalanced.sample(withReplacement = false , fraction, seed).collect()
           val weight = ( 1.0 / fraction).toFloat
           candidates ++ = reSampled.map(x = > (x, weight))



2.1.4 採樣集key的權重
我們在前面對每個區進行了相同數量的採樣(不包含重新採樣),但是每個區的數量有可能是不均衡的,爲了避免不均衡性需要對每個區採樣的key進行權重設置,儘量分配高權重給數據量多的區
權重因子:


[Scala]  純文本查看  複製代碼
?
1
val weight = (n.toDouble / sample.length).toFloat


n 是區的數據數量
sample 是採樣的數量
這裏權重的最小值是1,因爲採樣的數量肯定是小於等於數據

當數據量大於採樣數量的時候,每個區的採樣數量是相同的,那麼意味着區的數據量越大,該區塊的key的權重也就越大



2.1.5 分配每個區塊的range
樣本已經採集好了,現在需要對依據樣本進行區塊的range進行分配
  • 先對樣本進行排序
  • 依據每個樣本的權重計算每個區塊平均所分配的權重
  • 最後通過每個區分配的權重按照順序來決定獲取哪些樣本用作range,一個區分配一個樣本區間



[Scala]  純文本查看  複製代碼
?
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def determineBounds[K : Ordering : ClassTag](
     candidates : ArrayBuffer[(K, Float)],
     partitions : Int) : Array[K] = {
   val ordering = implicitly[Ordering[K]]
   val ordered = candidates.sortBy( _ . _ 1 )
   val numCandidates = ordered.size
   val sumWeights = ordered.map( _ . _ 2 .toDouble).sum
   val step = sumWeights / partitions
   var cumWeight = 0.0
   var target = step
   val bounds = ArrayBuffer.empty[K]
   var i = 0
   var j = 0
   var previousBound = Option.empty[K]
   while ((i < numCandidates) && (j < partitions - 1 )) {
     val (key, weight) = ordered(i)
     cumWeight + = weight
     if (cumWeight > = target) {
       // Skip duplicate values.
       if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
         bounds + = key
         target + = step
         j + = 1
         previousBound = Some(key)
       }
     }
     i + = 1
   }
   bounds.toArray
}


2.2 ShuffleWriter
在以前的博客裏介紹了SortShuffleWrite,在sortByKey的排序情況下使用了BypassMergeSortShuffleWriter,把焦點聚焦到key如何分配到Partitioner和每個Partition的文件將會如何寫入key,value生成Shuffle文件,在這兩點上BypassMergeSortShuffleWriter將明顯的不同於SortShuffleWrite


[Scala]  純文本查看  複製代碼
?
1
2
3
4
5
while (records.hasNext()) {
       final Product 2 <K, V> record = records.next();
       final K key = record. _ 1 ();
       partitionWriters[partitioner.getPartition(key)].write(key, record. _ 2 ());
     }



2.2.1 分配key到Partition
在函數調用了partitioner.getPartition方法,還是回到RangePartitioner類中


[Scala]  純文本查看  複製代碼
?
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def getPartition(key : Any) : Int = {
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length < = 128 ) {
      // If we have less than 128 partitions naive search
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition + = 1
      }
    } else {
      // Determine which binary search method to use only once.
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0 ) {
        partition = -partition- 1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }


  • 當Partition的分配數小於128的時候,輪訓的查找每個Partition
  • 當Partition大於128的時候,使用二分法查找Partition


2.2.2 生成shuffle文件

  • 基於前面對key進行排序的partition的分配,寫到對應的partition文件中
  • 合併Partition文件生成index和data文件(shuffle_shuffleid_mapid_0.index)(shuffle_shuffleid_mapid_0.data)因爲Partition已經合併了,最後一位reduceID都是爲0
 



注意:在這裏並沒有象SortShuffleWrite 對每個Partition進行排序,Spill 文件,最後合併文件,而是直接寫到了Partition文件中。

2.3 Shuffle Read讀取Shuffle文件
在BlockStoreShuffleReader的read函數裏


[Scala]  純文本查看  複製代碼
?
01
02
03
04
05
06
07
08
09
10
11
12
13
14
dep.keyOrdering match {
     case Some(keyOrd : Ordering[K]) = >
       // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
       // the ExternalSorter won't spill to disk.
       val sorter =
         new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
       sorter.insertAll(aggregatedIter)
       context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
       context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
       context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
       CompletionIterator[Product 2 [K, C], Iterator[Product 2 [K, C]]](sorter.iterator, sorter.stop())
     case None = >
       aggregatedIter
   }


ExternalSorter.insertAll函數

[Scala]  純文本查看  複製代碼
?
1
2
3
4
5
6
while (records.hasNext) {
        addElementsRead()
        val kv = records.next()
        buffer.insert(getPartition(kv. _ 1 ), kv. _ 1 , kv. _ 2 .asInstanceOf[C])
        maybeSpillCollection(usingMap = false )
      }


ExternalSorter函數,這個函數在前面的這篇博客裏介紹的比較清楚,這裏使用了buffer結構體

[Scala]  純文本查看  複製代碼
?
1
2
@ volatile private var map = new PartitionedAppendOnlyMap[K, C]
  @ volatile private var buffer = new PartitionedPairBuffer[K, C]


在reduceByKey的這些算子相同的Key是需要合併的,所以需要使用Map結構處理相同的Key的值的合併問題,而對排序來說,並不需要相同的值合併,使用Array結構就可以了。
注:在Spark上實現Map、Array都使用了數組的結構,並沒有用鏈表結構


 

在上圖的PartitionPairBuffer結構中,有以下幾點要注意:
插入KV結構的時候,不進行排序,也就是在處理相同的Partition的時候直接讀取插入Array
會存在當內存不夠Spill到磁盤的情況,關於Spill請具體參考博客鏈接


2.3.1 排序
當ExternalSorter.insertAll函數完成後,纔會構建一個排序的迭代器


[Scala]  純文本查看  複製代碼
?
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
def partitionedIterator : Iterator[(Int, Iterator[Product 2 [K, C]])] = {
val collection : WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
   val usingMap = aggregator.isDefined
   if (spills.isEmpty) {
     // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
     // we don't even need to sort by anything other than partition ID
     if (!ordering.isDefined) {
       // The user hasn't requested sorted keys, so only sort by partition ID, not key
       groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
     } else {
       // We do need to sort by both partition ID and key
       groupByPartition(destructiveIterator(
         collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
     }
   } else {
     // Merge spilled and in-memory data
     merge(spills, destructiveIterator(
       collection.partitionedDestructiveSortedIterator(comparator)))
   }
}


這裏分成兩種情況:
還在內存裏沒有Spill到文件中去,這時候構建一個內存裏的PartitionedDestructiveSortedIterator迭代器,在迭代器中已經排序好了PartitionPairBuffer裏的內容


[Scala]  純文本查看  複製代碼
?
1
2
3
4
5
6
7
/** Iterate through the data in a given order. For this class this is not really destructive. */
override def partitionedDestructiveSortedIterator(keyComparator : Option[Comparator[K]])
   : Iterator[((Int, K), V)] = {
   val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
   new Sorter( new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0 , curSize, comparator)
   iterator
}


Spill到文件裏的,文件裏的已經排好序了,需要對內存裏的PartitionPairBuffer進行排序(和前面一種情況相同的處理),最後對文件和內存進行外排序(外排序可參考博客)

2.4 最後的歸併
在Driver端Dag-scheduler-event-loop 線程中會處理每個executor返回的結果(剛纔Partition排序後的結果)


[Scala]  純文本查看  複製代碼
?
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
   private [scheduler] def handleTaskCompletion(event : CompletionEvent) {
....
   case Success = >
         stage.pendingPartitions - = task.partitionId
         task match {
           case rt : ResultTask[ _ , _ ] = >
             // Cast to ResultStage here because it's part of the ResultTask
             // TODO Refactor this out to a function that accepts a ResultStage
             val resultStage = stage.asInstanceOf[ResultStage]
             resultStage.activeJob match {
               case Some(job) = >
                 if (!job.finished(rt.outputId)) {
                   updateAccumulators(event)
                   job.finished(rt.outputId) = true
                   job.numFinished + = 1
                   // If the whole job has finished, remove it
                   if (job.numFinished == job.numPartitions) {
                     markStageAsFinished(resultStage)
                     cleanupStateForJobAndIndependentStages(job)
                     listenerBus.post(
                       SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                   }
 
                   // taskSucceeded runs some user code that might throw an exception. Make sure
                   // we are resilient against that.
                   try {
                     job.listener.taskSucceeded(rt.outputId, event.result)
                   } catch {
                     case e : Exception = >
                       // TODO: Perhaps we want to mark the resultStage as failed?
                       job.listener.jobFailed( new SparkDriverExecutionException(e))
                   }
                 }
}



通過方法taskSucceeded的方法進行不同的Partition的合併

[Scala]  純文本查看  複製代碼
?
1
job.listener.taskSucceeded(rt.outputId, event.result)


[Scala]  純文本查看  複製代碼
?
1
2
3
4
5
6
7
8
9
override def taskSucceeded(index : Int, result : Any) : Unit = {
   // resultHandler call must be synchronized in case resultHandler itself is not thread safe.
   synchronized {
     resultHandler(index, result.asInstanceOf[T])
   }
   if (finishedTasks.incrementAndGet() == totalTasks) {
     jobPromise.success(())
   }
}


實際上是調用了resultHandler方法,我們來看看resultHandler是怎樣定義的

[Scala]  純文本查看  複製代碼
?
1
2
3
4
5
6
7
8
def runJob[T, U : ClassTag](
     rdd
相關文章
相關標籤/搜索