Questions this post addresses:
1. How does a sort operator actually perform its sort?
2. What does the complete sorting flow look like?

1 Introduction

In earlier posts, particularly the Shuffle series, we saw that while the shuffle-write file is being produced, each partition is first sorted and spilled to disk, and the spill files are then merged into the final shuffle-write file; in other words, the contents of every partition are already sorted. At the final action, the same-numbered partitions from the shuffle files produced by the different executors are merged to complete the action.

How does a sort operator differ from the usual reduce-style operators? Common aggregation and reduce operators do not need a key-ordered (range-based) sort:
- Keys with the same hash code are assigned to the same partition, even when they come from different executors.
- At the final merge, only the same-numbered partitions from the different executors need to be combined.
- Within the shuffle write, each partition is still sorted with memory limits in mind, and an external sort merges identical keys, which solves the problem of combining the multiple files that belong to the same partition.
2 Sorting

Here is a small, typical sorting example:
```scala
package spark.sort

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object sortsample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("sortsample")
    val sc = new SparkContext(conf)
    var pairs = sc.parallelize(Array(("a", 0), ("b", 0), ("c", 3), ("d", 6),
      ("e", 0), ("f", 0), ("g", 3), ("h", 6)), 2);
    pairs.sortByKey(true, 3).collect().foreach(println);
  }
}
```
The core code is in OrderedRDDFunctions.scala. Surprised? RDD itself has no sortByKey method. As with the PairRDDFunctions discussed in an earlier post, an implicit conversion supplies it:
```scala
implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)])
  : OrderedRDDFunctions[K, V, (K, V)] = {
  new OrderedRDDFunctions[K, V, (K, V)](rdd)
}
```
The call resolves to this method in OrderedRDDFunctions.scala:
```scala
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
```
The partitioner assigns keys to partitions by range. Why use a range-based strategy?
- For the other, non-sorting operators, a hash-based scheme is enough: as long as identical keys land in the same partition, merging and computing over identical keys is unaffected.
- For sorting, keeping identical keys in the same partition is not enough; in the end all partitions would still have to be merged and sorted, and doing that on the driver would be disastrous. So the strategy changes: define ranges so that keys that are close in sort order fall into the same range, then widen the idea so that one partition manages exactly one range, as the sketch after this list illustrates.
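To see the effect of range partitioning, the following sketch reuses `pairs` from the example above (it assumes a running SparkContext and is only an illustration, not part of Spark) and prints which keys end up in which output partition after sortByKey. Each partition holds one contiguous, already-sorted key range, so the partitions can simply be concatenated in order at the end.

```scala
// Minimal sketch: glom() gathers each partition into an array so the per-partition
// key ranges produced by the RangePartitioner become visible.
val sorted = pairs.sortByKey(true, 3)
sorted.glom().collect().zipWithIndex.foreach { case (part, idx) =>
  // Each partition holds a contiguous range of keys, already in sorted order.
  println(s"partition $idx: " + part.map(_._1).mkString(", "))
}
// With this data the bounds typically come out as ("c", "f"),
// giving partitions (a, b, c), (d, e, f) and (g, h).
```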
2.1 Assigning ranges

If the ranges are assigned badly, the data becomes skewed, the executors do uneven amounts of per-partition sorting, and the overall performance of a parallel computation is usually dragged down by its single worst-performing node. To speed the computation up, the balance of the data distribution has to be taken into account.

2.1.1 Sample size per partition

Fetching every key and deriving the boundaries from all of them would clearly be unwise; it would amount to sorting the full data set. Instead, a sampling strategy is used: the boundaries are derived from a sample, and the first step is to estimate a simple sample-size threshold. The code lives in rangeBounds in Partitioner.scala:
```scala
val sampleSize = math.min(20.0 * partitions, 1e6)
// Assume the input partitions are roughly balanced and over-sample a little bit.
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
```
partitions is the number of partitions requested when calling sortByKey, 3 in our example:
```scala
pairs.sortByKey(true, 3)
```
rdd.partitions refers to the number of partitions of the input data, 2 in our example:
```scala
sc.parallelize(Array(("a", 0), ("b", 0), ("c", 3), ("d", 6),
  ("e", 0), ("f", 0), ("g", 3), ("h", 6)), 2)
```
The number of keys to sample from each partition is computed from a few fixed constants:
```scala
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
```
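Plugging this post's example into the two formulas gives concrete numbers (a small arithmetic sketch, nothing Spark-specific):

```scala
val partitions = 3                    // numPartitions passed to sortByKey(true, 3)
val rddPartitionCount = 2             // rdd.partitions.length from parallelize(..., 2)
val sampleSize = math.min(20.0 * partitions, 1e6)                                   // 60.0
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rddPartitionCount).toInt  // 90
```

Each of the two input partitions is therefore asked for up to 90 sampled keys, far more than the 8 keys that exist here, so the entire data set ends up in the sample.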
2.1.2 Sketch sampling (reservoir sampling)
```scala
def sketch[K : ClassTag](
    rdd: RDD[K],
    sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
  val shift = rdd.id
  // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
    val seed = byteswap32(idx ^ (shift << 16))
    val (sample, n) = SamplingUtils.reservoirSampleAndCount(
      iter, sampleSizePerPartition, seed)
    Iterator((idx, n, sample))
  }.collect()
  val numItems = sketched.map(_._2).sum
  (numItems, sketched)
}
```
mapPartitionsWithIndex and collect are RDD operations, which means a job has to be submitted to run them; in other words, the sampling step itself runs a job on the executors.
```scala
def reservoirSampleAndCount[T : ClassTag](
    input: Iterator[T],
    k: Int,
    seed: Long = Random.nextLong()): (Array[T], Long) = {
  val reservoir = new Array[T](k)
  // Put the first k elements in the reservoir.
  var i = 0
  while (i < k && input.hasNext) {
    val item = input.next()
    reservoir(i) = item
    i += 1
  }

  // If we have consumed all the elements, return them. Otherwise do the replacement.
  if (i < k) {
    // If input size < k, trim the array to return only an array of input size.
    val trimReservoir = new Array[T](i)
    System.arraycopy(reservoir, 0, trimReservoir, 0, i)
    (trimReservoir, i)
  } else {
    // If input size > k, continue the sampling process.
    var l = i.toLong
    val rand = new XORShiftRandom(seed)
    while (input.hasNext) {
      val item = input.next()
      l += 1
      // There are k elements in the reservoir, and the l-th element has been
      // consumed. It should be chosen with probability k/l. The expression
      // below is a random long chosen uniformly from [0,l)
      val replacementIndex = (rand.nextDouble() * l).toLong
      if (replacementIndex < k) {
        reservoir(replacementIndex.toInt) = item
      }
    }
    (reservoir, l)
  }
}
```
The reservoirSampleAndCount function samples as follows:
- When the data is smaller than the requested sample size, the data itself is used as the sample.
- When the data exceeds the requested sample size, it keeps iterating over the rest of the data and randomly replaces positions in the reservoir, which keeps the sample uniformly random; a minimal standalone sketch of this replacement scheme follows below.
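The replacement scheme can be exercised outside Spark. The sketch below is a minimal, self-contained re-implementation of the same logic that uses scala.util.Random instead of Spark's XORShiftRandom; it only illustrates the two branches described above and is not Spark's code.

```scala
import scala.util.Random

// Minimal reservoir sampler: keep k items from an iterator of unknown length.
def sampleAndCount[T: scala.reflect.ClassTag](
    input: Iterator[T], k: Int, seed: Long = 42L): (Array[T], Long) = {
  val reservoir = new Array[T](k)
  var i = 0
  while (i < k && input.hasNext) { reservoir(i) = input.next(); i += 1 }
  if (i < k) {
    (reservoir.take(i), i.toLong)        // fewer items than k: the data itself is the sample
  } else {
    var l = i.toLong
    val rand = new Random(seed)
    while (input.hasNext) {
      val item = input.next()
      l += 1
      // The l-th element should survive with probability k/l.
      val replacementIndex = (rand.nextDouble() * l).toLong
      if (replacementIndex < k) reservoir(replacementIndex.toInt) = item
    }
    (reservoir, l)
  }
}

val (sample, n) = sampleAndCount((1 to 1000).iterator, 5)
println(s"saw $n items, sampled: ${sample.mkString(", ")}")
```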
The returned result contains the total number of items, plus, for every partition, its index, its item count, and its sample.

2.1.3 Re-sampling

To guard against partitions that hold far too much data, a threshold is defined:
```scala
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
```
The fraction is the sample size divided by the total number of items. When a partition's item count multiplied by this fraction exceeds the per-partition sample size, that partition is considered under-sampled and is sampled again. Substituting the formulas above, this condition fires roughly when a partition holds more than three times the average number of items per partition.
```scala
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
val seed = byteswap32(-rdd.id - 1)
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
```
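To get a feel for these numbers, assume (hypothetically) that the RDD held ten million keys instead of eight; the values below are illustrative only and are not taken from the example job:

```scala
// Hypothetical numbers, purely for illustration.
val numItems = 10000000L
val sampleSize = 60.0                                              // 20.0 * 3 partitions
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)  // 6.0e-6
val weight = (1.0 / fraction).toFloat                              // ~166666.67f
// Each re-sampled key then stands in for roughly 1/fraction ~ 166,667 real keys.
```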
2.1.4 Weights of the sampled keys

Earlier we drew the same number of samples from every partition (re-sampling aside), but the partitions may hold very different amounts of data. To correct for this imbalance, every sampled key is given a weight, so that partitions with more data carry proportionally more weight. The weight factor is:
```scala
val weight = (n.toDouble / sample.length).toFloat
```
Here n is the number of items in the partition and sample.length is the number of samples drawn from it. The weight is at least 1, because the sample can never be larger than the data. When there is more data than samples, every partition contributes the same number of samples, so the more data a partition holds, the higher the weight of its sampled keys.

2.1.5 Assigning a range to each partition

With the samples collected, the partition ranges can now be derived from them:
- First, sort the samples.
- From the weights of the samples, compute the average weight each output partition should receive.
- Then walk the sorted samples in order, emitting a boundary key each time the accumulated weight reaches the per-partition share, so that each output partition is assigned one contiguous sample range.
```scala
def determineBounds[K : Ordering : ClassTag](
    candidates: ArrayBuffer[(K, Float)],
    partitions: Int): Array[K] = {
  val ordering = implicitly[Ordering[K]]
  val ordered = candidates.sortBy(_._1)
  val numCandidates = ordered.size
  val sumWeights = ordered.map(_._2.toDouble).sum
  val step = sumWeights / partitions
  var cumWeight = 0.0
  var target = step
  val bounds = ArrayBuffer.empty[K]
  var i = 0
  var j = 0
  var previousBound = Option.empty[K]
  while ((i < numCandidates) && (j < partitions - 1)) {
    val (key, weight) = ordered(i)
    cumWeight += weight
    if (cumWeight >= target) {
      // Skip duplicate values.
      if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
        bounds += key
        target += step
        j += 1
        previousBound = Some(key)
      }
    }
    i += 1
  }
  bounds.toArray
}
```
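Running the same loop by hand on this post's example data makes the bounds concrete. The standalone sketch below copies the loop logic above (it does not call Spark's private determineBounds) and assumes all eight keys were sampled with weight 1.0, which is what happens when the data set is smaller than the sample size:

```scala
import scala.collection.mutable.ArrayBuffer

val candidates = ArrayBuffer("a", "b", "c", "d", "e", "f", "g", "h").map(k => (k, 1.0f))
val partitions = 3
val ordered = candidates.sortBy(_._1)
val step = ordered.map(_._2.toDouble).sum / partitions   // 8 / 3 ~ 2.67
var cumWeight = 0.0
var target = step
val bounds = ArrayBuffer.empty[String]
var i = 0
while (i < ordered.size && bounds.size < partitions - 1) {
  cumWeight += ordered(i)._2
  // Emit a boundary each time the accumulated weight crosses the per-partition share.
  if (cumWeight >= target && (bounds.isEmpty || ordered(i)._1 > bounds.last)) {
    bounds += ordered(i)._1
    target += step
  }
  i += 1
}
println(bounds)   // ArrayBuffer(c, f): partition 0 gets keys <= "c", 1 gets ("c","f"], 2 gets > "f"
```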
2.2 ShuffleWriter

An earlier post introduced SortShuffleWriter. In the sortByKey case, BypassMergeSortShuffleWriter is used instead. Let's focus on two points, on both of which BypassMergeSortShuffleWriter clearly differs from SortShuffleWriter: how keys are assigned to partitions, and how each partition file is written with its keys and values to produce the shuffle file.
```java
while (records.hasNext()) {
  final Product2<K, V> record = records.next();
  final K key = record._1();
  partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}
```
2.2.1 Assigning keys to partitions

The loop above calls partitioner.getPartition, which takes us back to the RangePartitioner class:
```scala
def getPartition(key: Any): Int = {
  val k = key.asInstanceOf[K]
  var partition = 0
  if (rangeBounds.length <= 128) {
    // If we have less than 128 partitions naive search
    while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
      partition += 1
    }
  } else {
    // Determine which binary search method to use only once.
    partition = binarySearch(rangeBounds, k)
    // binarySearch either returns the match location or -[insertion point]-1
    if (partition < 0) {
      partition = -partition-1
    }
    if (partition > rangeBounds.length) {
      partition = rangeBounds.length
    }
  }
  if (ascending) {
    partition
  } else {
    rangeBounds.length - partition
  }
}
```
|
- When there are at most 128 range bounds, a simple linear scan finds the partition.
- When there are more than 128, a binary search over the bounds is used.
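Tying this back to the example: assuming the bounds came out as Array("c", "f") as in the earlier sketch, the linear scan assigns keys as follows (a toy stand-in for getPartition, not Spark's code):

```scala
val rangeBounds = Array("c", "f")        // assumed bounds from the earlier sketch

def toPartition(k: String): Int = {
  var partition = 0
  // Same linear scan as the <= 128 branch above: advance while the key is greater than the bound.
  while (partition < rangeBounds.length && k > rangeBounds(partition)) partition += 1
  partition
}

Seq("a", "c", "d", "f", "g", "h").foreach(k => println(s"$k -> ${toPartition(k)}"))
// a -> 0, c -> 0, d -> 1, f -> 1, g -> 2, h -> 2
```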
2.2.2 Generating the shuffle files
- Based on the key-to-partition assignment described above, each record is written into the file of its partition.
- The per-partition files are then merged into an index file and a data file (shuffle_shuffleid_mapid_0.index and shuffle_shuffleid_mapid_0.data); since the partitions have already been merged, the trailing reduce id is always 0.

Note that, unlike SortShuffleWriter, there is no per-partition sorting, spilling, and final merging of spill files here; records are written straight into the partition files.

2.3 Shuffle read: reading the shuffle files

The reading happens in the read function of BlockStoreShuffleReader:
```scala
dep.keyOrdering match {
  case Some(keyOrd: Ordering[K]) =>
    // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
    // the ExternalSorter won't spill to disk.
    val sorter =
      new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
    sorter.insertAll(aggregatedIter)
    context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
    context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
    context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
  case None =>
    aggregatedIter
}
```
The ExternalSorter.insertAll function:
```scala
while (records.hasNext) {
  addElementsRead()
  val kv = records.next()
  buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
  maybeSpillCollection(usingMap = false)
}
```
ExternalSorter itself was covered in some detail in an earlier post; here the buffer structure is used:
```scala
@volatile private var map = new PartitionedAppendOnlyMap[K, C]
@volatile private var buffer = new PartitionedPairBuffer[K, C]
```
For operators such as reduceByKey, identical keys must be combined, so a Map structure is needed to merge the values of identical keys. For sorting, no such combining is required, and an Array structure is enough. (Note: in Spark both the Map and the Array variants are implemented on top of plain arrays, not linked lists.)

A few things to note about the PartitionedPairBuffer structure:
- Inserting a key/value pair does not sort anything; records are simply appended to the array, even when they belong to the same partition.
- When memory runs short, the buffer can still be spilled to disk; see the earlier post on spilling for the details.

2.3.1 Sorting

Only after ExternalSorter.insertAll has finished is a sorting iterator constructed:
```scala
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
  val usingMap = aggregator.isDefined
  val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
  if (spills.isEmpty) {
    // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
    // we don't even need to sort by anything other than partition ID
    if (!ordering.isDefined) {
      // The user hasn't requested sorted keys, so only sort by partition ID, not key
      groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
    } else {
      // We do need to sort by both partition ID and key
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
    }
  } else {
    // Merge spilled and in-memory data
    merge(spills, destructiveIterator(
      collection.partitionedDestructiveSortedIterator(comparator)))
  }
}
```
Two cases arise here. If nothing has been spilled to a file yet, an in-memory PartitionedDestructiveSortedIterator is built, and the contents of the PartitionedPairBuffer are sorted inside that iterator:
```scala
/** Iterate through the data in a given order. For this class this is not really destructive. */
override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
  : Iterator[((Int, K), V)] = {
  val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
  new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator)
  iterator
}
```
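As a toy model of what this in-memory sort produces (not Spark's actual classes): the buffer is conceptually an array of ((partitionId, key), value) entries, and sorting it by partition id first and key second yields exactly the order in which records can be handed out partition by partition.

```scala
// Toy stand-in for the buffer contents after insertAll: ((partitionId, key), value).
val buffered = Array(((1, "e"), 0), ((0, "b"), 0), ((2, "g"), 3), ((0, "a"), 0), ((1, "d"), 6))

// Sort by partition id, then by key within the partition -- the same ordering the
// partition-and-key comparator above establishes.
val sortedBuffer = buffered.sortBy { case ((pid, key), _) => (pid, key) }
sortedBuffer.foreach(println)
// ((0,a),0), ((0,b),0), ((1,d),6), ((1,e),0), ((2,g),3)
```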
If data has already been spilled to files, the files themselves are already sorted; the in-memory PartitionedPairBuffer still has to be sorted (handled exactly as in the first case), and the files and the in-memory data are then combined with an external merge sort (see the earlier post on external sorting).

2.4 The final merge

On the driver, the dag-scheduler-event-loop thread handles the result returned by each executor (the per-partition sorted results just produced):
```scala
private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
  ....
  case Success =>
    stage.pendingPartitions -= task.partitionId
    task match {
      case rt: ResultTask[_, _] =>
        // Cast to ResultStage here because it's part of the ResultTask
        // TODO Refactor this out to a function that accepts a ResultStage
        val resultStage = stage.asInstanceOf[ResultStage]
        resultStage.activeJob match {
          case Some(job) =>
            if (!job.finished(rt.outputId)) {
              updateAccumulators(event)
              job.finished(rt.outputId) = true
              job.numFinished += 1
              // If the whole job has finished, remove it
              if (job.numFinished == job.numPartitions) {
                markStageAsFinished(resultStage)
                cleanupStateForJobAndIndependentStages(job)
                listenerBus.post(
                  SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
              }
              // taskSucceeded runs some user code that might throw an exception. Make sure
              // we are resilient against that.
              try {
                job.listener.taskSucceeded(rt.outputId, event.result)
              } catch {
                case e: Exception =>
                  // TODO: Perhaps we want to mark the resultStage as failed?
                  job.listener.jobFailed(new SparkDriverExecutionException(e))
              }
            }
        }
```
The merging of the different partitions goes through the taskSucceeded method:
```scala
job.listener.taskSucceeded(rt.outputId, event.result)
```
```scala
override def taskSucceeded(index: Int, result: Any): Unit = {
  // resultHandler call must be synchronized in case resultHandler itself is not thread safe.
  synchronized {
    resultHandler(index, result.asInstanceOf[T])
  }
  if (finishedTasks.incrementAndGet() == totalTasks) {
    jobPromise.success(())
  }
}
```
This in turn calls the resultHandler method; let's look at how resultHandler is defined:
```scala
def runJob[T, U: ClassTag](
    rdd
```
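The listing above is cut off, but the essence of the final merge can be sketched conceptually. For an action such as collect, the resultHandler that runJob receives stores each partition's (already sorted) result at that partition's index, and the results are then concatenated in partition order. Because the RangePartitioner guaranteed that every key in partition i sorts before every key in partition i+1, this concatenation already is the globally sorted output, with no further sorting on the driver. The sketch below is a conceptual model, not Spark's exact code:

```scala
// Conceptual model of the driver-side merge for a 3-partition sortByKey job.
val numPartitions = 3
val results = new Array[Array[(String, Int)]](numPartitions)

// What the resultHandler conceptually does for each finished ResultTask:
def resultHandler(index: Int, taskResult: Array[(String, Int)]): Unit =
  results(index) = taskResult

// Pretend the three tasks returned their sorted partitions (ranges from the earlier sketches),
// arriving in arbitrary order.
resultHandler(0, Array(("a", 0), ("b", 0), ("c", 3)))
resultHandler(2, Array(("g", 3), ("h", 6)))
resultHandler(1, Array(("d", 6), ("e", 0), ("f", 0)))

// Concatenating in partition order gives the globally sorted output -- no extra sort needed.
val finalOutput = results.flatten
finalOutput.foreach(println)
```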