Timsort是spark中用做外部排序的機制。一個典型的應用是在spark sql中用來作Order操做的實現。Order時候將行記錄插入到ExternalSorter中,ExternalSorter用timsort排序數組,返回排序後的Iterator。sql
spark sql的物理計劃中,排序Sort屬於agg相關的聚合操做。相關的類有:SortAggregateExec、SortBasedAggregationIterator、SortExec等。express
排序後數據的聚合操做。構造方法和入參以下:apache
case class SortAggregateExec(數組
requiredChildDistributionExpressions: Option[Seq[Expression]],ide
groupingExpressions: Seq[NamedExpression],測試
aggregateExpressions: Seq[AggregateExpression],ui
aggregateAttributes: Seq[Attribute],this
initialInputBufferOffset: Int,spa
resultExpressions: Seq[NamedExpression],orm
child: SparkPlan)
extends UnaryExecNode
物理執行經過doExecute(): RDD[InternalRow]方法。主要代碼:
val outputIter = new SortBasedAggregationIterator(
groupingExpressions,
child.output,
iter,
aggregateExpressions,
aggregateAttributes,
initialInputBufferOffset,
resultExpressions,
(expressions, inputSchema) =>
newMutableProjection(expressions, inputSchema, subexpressionEliminationEnabled),
numOutputRows)
if (!hasInput && groupingExpressions.isEmpty) {
// There is no input and there is no grouping expressions.
// We need to output a single row as the output.
numOutputRows += 1
Iterator[UnsafeRow](outputIter.outputForEmptyGroupingKeyWithoutInput())
} else {
outputIter
}
經過構造SortBasedAggregationIterator迭代器來生成聚合後的數據迭代。將聚合前的數據迭代器做爲入參傳入SortBasedAggregationIterator中。
真正執行外部排序的類。 定義:
case class SortExec(
sortOrder: Seq[SortOrder],
global: Boolean,
child: SparkPlan,
testSpillFrequency: Int = 0)
extends UnaryExecNode with CodegenSupport
child不用說天然是子執行計劃。
testSpillFrequency表示是否階段性的spill數據到磁盤,Int型表示每隔多少條數據就spill到磁盤。通常在測試環境下使用。
sortOrder是排序的字段屬性。
global表示是否全局排序,若是全局排序的話通常須要先將各分區的數據打散shuffle,而後再執行排序。
def createSorter(): UnsafeExternalRowSorter
生成外部排序類,而後對原始數據的每行數據,插入到外部排序類,最後外部排序類返回排序後的迭代器Iterator。
protected override def doExecute(): RDD[InternalRow] = {
val peakMemory = longMetric("peakMemory")
val spillSize = longMetric("spillSize")
val sortTime = longMetric("sortTime")
child.execute().mapPartitionsInternal { iter =>
val sorter = createSorter()
val metrics = TaskContext.get().taskMetrics()
// Remember spill data size of this task before execute this operator so that we can
// figure out how many bytes we spilled for this operator.
val spillSizeBefore = metrics.memoryBytesSpilled
val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
sortTime += sorter.getSortTimeNanos / 1000000
peakMemory += sorter.getPeakMemoryUsage
spillSize += metrics.memoryBytesSpilled - spillSizeBefore
metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)
sortedIterator
}
}
利用UnsafeExternalRowSorter生成排序後的Iterator。
UnsafeExternalRowSorter在Spark-catalyst包裏。路徑sql/execution/ UnsafeExternalRowSorter。
它又使用UnsafeExternalSorter做爲內部排序迭代器。UnsafeExternalRowSorter自己的邏輯不復雜,主要是封裝了UnsafeExternalSorter來排序。它將原始數據插入到UnsafeExternalSorter中,最後獲取UnsafeExternalSorter的排序迭代器。
UnsafeExternalSorter在spark-core中。
路徑:org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter。它是Java類。
最終它是經過Timsort來對內存數據排序的。
包名:org.apache.spark.util.collection;
MIN_MERGE:最小merge長度,若是待排序數組長度小於該值則直接用二分差值法排序,不然引用merge過程。
private final SortDataFormat<K, Buffer> s;
SortDataFormat定義了待排序數據的格式。
少於32個元素的時候用binarySort排序。是簡單實現。
先找出一段已經排序好的數組,lo~hi,而後從hi+1處開始知道數組的最後循環迭代。每次迭代排序lo~hi+index這麼長的數組,這個數組有個特色就是前面的數據已經排序好只有最後一個元素沒有排序。每次迭代過程大體以下:
找到比最後一個原色大的數組的位置start,而後複製start~hi+index到start+1~hi+index+1,而後將hi+index+1(也就是最後一個原色)複製到start位置處,完成整段數組的排序。
若是多於32個元素,則要複雜一點了。
利用SortState來作多於32個元素的排序。
/**
* March over the array once, left to right, finding natural runs,
* extending short natural runs to minRun elements, and merging runs
* to maintain stack invariant.
*/
SortState sortState = new SortState(a, c, hi - lo);
int minRun = minRunLength(nRemaining);
do {
// Identify next run
int runLen = countRunAndMakeAscending(a, lo, hi, c);
// If run is short, extend to min(minRun, nRemaining)
if (runLen < minRun) {
int force = nRemaining <= minRun ? nRemaining : minRun;
binarySort(a, lo, lo + force, lo + runLen, c);
runLen = force;
}
// Push run onto pending-run stack, and maybe merge
sortState.pushRun(lo, runLen);
sortState.mergeCollapse();
// Advance to find next run
lo += runLen;
nRemaining -= runLen;
} while (nRemaining != 0);
// Merge all remaining runs to complete sort
assert lo == hi;
sortState.mergeForceCollapse();
assert sortState.stackSize == 1;
大體過程以下:
對2段局部排序的數組,找出插入點,而後執行Range複製插入過程,一次將多個區間數據移動,這樣對於2段局部排序好的數組,最多執行2-3次批量移動複製過程就能夠完成總體排序。
對於內存不夠放的局部排序數據,保存到多個磁盤文件,每一個磁盤文件都是一個排序好的文件,這裏叫UnsafeSorterSpillReader。
用UnsafeSorterSpillMerger作多個磁盤文件的排序類。每一個磁盤文件做爲一個文件句柄插入到PriorityQueue排序隊列中,將每次取數據時從這多個排序隊列中取出最小的元素,實現排序。