TimSort源碼詳解

Python的排序算法由Peter Tim提出,所以稱爲TimSort。它最早被使用於Python語言,後被多種語言做爲默認的排序算法。TimSort實際上能夠看做是mergeSort+binarySort,它主要是針對歸併排序作了一系列優化。若是想看Python的TimSort源碼,在Cpython的Github倉庫能找到,這裏面還包含一個List對象的PyList_Sort函數。這篇文章爲了方便借用JAVA對TimSort的實現源碼來講明其原理。html

一.binarySort函數python

TimSort很是適合大量數據的排序,對於少許數據的排序,TimSort選擇使用binarySort來實現,所以我想先介紹一下binarySort的過程。git

咱們知道插入排序的思路是經過交換元素位置的方式依次插入元素(若是不太瞭解插入排序能夠先去熟悉一下),當要插入元素時,從已排序的部分的最後一位開始,依次比較其與待插入的元素的值,這樣來找到待插入元素的位置。顯然,在插入排序的過程當中,始終是有一個在增加的有序部分和在縮短的無序部分。排序過程見下圖(圖源自博客):github

 

 可是插入排序有個很明顯的問題,在找當前元素的位置時它是一步一步地在有序部分往前推動的,而有序列表的插入能夠經過二分法來減小比較次數,這和二分查找的目的不一樣可是思路相同(能夠本身嘗試一下實現它),咱們稱其爲二分插入,經過二分插入實現的排序就是二分排序(binarySort)。咱們能夠看一下它的Java源碼:算法

//a是數組,lo是待排序部分(有序部分+無序部分)的最低位(包含),hi是最高位(不包含),start是無序部分的最低位,c是比較函數即排序的依據
private static <T> void binarySort(T[] a, int lo, int hi, int start, Comparator<? super T> c) {
    assert lo <= start && start <= hi;
    if (start == lo)
        start++;
    for ( ; start < hi; start++) {//接下來就是二分插入的過程
        T pivot = a[start];
        int left = lo;
        int right = start;
        assert left <= right;
        while (left < right) {
            int mid = (left + right) >>> 1;
            if (c.compare(pivot, a[mid]) < 0)
                right = mid;
            else
                left = mid + 1;
        }
        assert left == right;
        int n = start - left;//n表示要移動的元素數量
        //優化插入過程,當要移動的元素數量爲1或2時,能夠直接交換元素位置;
        //不然將left後的元素日後挪一位再插入,方式是經過arraycopy函數複製
        switch (n) {
            case 2:  a[left + 2] = a[left + 1];
            case 1:  a[left + 1] = a[left];
                break;
            default: System.arraycopy(a, left, a, left + 1, n);
        }
        a[left] = pivot;
    }
}

二.runc#

這是TimSort中最重要的一個概念,實在找不到合適的翻譯(無奈臉)。run實際上就是一個連續上升(包含相等)或者降低(不包含相等)的子串。好比對於數組[1,3,2,4,6,4,7,7,3,2],其中有四個run,第一個是[1,3],第二個是[2,4,6],第三個是[4,7,7],第四個是[3,2],在函數中對於單調遞減的run會被反轉成遞增的序列。源碼中經過countRunAndMakeAscending()函數來獲得run:數組

private static <T> int countRunAndMakeAscending(T[] a, int lo, int hi, Comparator<? super T> c) {
    assert lo < hi;
    int runHi = lo + 1;
    if (runHi == hi)
        return 1;

    //找到run的結束位置,若是是降低的序列將其反轉
    if (c.compare(a[runHi++], a[lo]) < 0) {
        while (runHi < hi && c.compare(a[runHi], a[runHi - 1]) < 0)
            runHi++;
        reverseRange(a, lo, runHi);
    } else {
        while (runHi < hi && c.compare(a[runHi], a[runHi - 1]) >= 0)
            runHi++;
    }

    return runHi - lo;//返回值爲run的長度
}

 三.TimSort排序過程函數

直接上源碼分析,能夠參考代碼註釋和下面的解釋來閱讀:源碼分析

static <T> void sort(T[] a, int lo, int hi, Comparator<? super T> c, T[] work, int workBase, int workLen) {
    assert c != null && a != null && lo >= 0 && lo <= hi && hi <= a.length;

    int nRemaining  = hi - lo;//待排序的數組長度
    if (nRemaining < 2)
        return;  //長度爲0或1的數組無需排序

    // 若是數組長度小於32(即MIN_MERGE,TimSort的Python版本里這個值爲64),直接用binarySort排序
    if (nRemaining < MIN_MERGE) {
        int initRunLen = countRunAndMakeAscending(a, lo, hi, c);//找到第一個run,返回其長度
        binarySort(a, lo, hi, lo + initRunLen, c);//第一個run已排好序,所以binarySort的參數start=lo+initRunLen
        return;
    }

    TimSort<T> ts = new TimSort<>(a, c, work, workBase, workLen);
    int minRun = minRunLength(nRemaining);//最小run長度,看法釋A
    do {
        // 找run
        int runLen = countRunAndMakeAscending(a, lo, hi, c);

        // 若是run長度小於minRun,將其擴展爲min(nRemaining,minRun)
        if (runLen < minRun) {
            int force = nRemaining <= minRun ? nRemaining : minRun;
            binarySort(a, lo, lo + force, lo + runLen, c);//擴展run到長度force
            runLen = force;
        }

        ts.pushRun(lo, runLen);// 將run保存到棧中,看法釋B
        ts.mergeCollapse();// 根據規則合併相鄰的run,看法釋C

        // 繼續尋找run
        lo += runLen;
        nRemaining -= runLen;
    } while (nRemaining != 0);

    // Merge all remaining runs to complete sort
    assert lo == hi;
    ts.mergeForceCollapse();//最後收尾,將棧中全部run從棧頂開始依次鄰近合併,獲得一個run
    assert ts.stackSize == 1;
}

 

解釋A:在執行排序算法以前,會計算minRun的值,minRun會從[16,32]區間中選擇一個數字,使得數組的長度除以minRun等於或者略小於2的冪次方。好比長度是65,那麼minrun的值就是17;若是長度是174minrun就是22。minRunLength()函數代碼以下:post

private static int minRunLength(int n) {
    assert n >= 0;
    int r = 0;      // 若是n的低位有任何一位爲1,r就會置1
    while (n >= 32) {
        r |= (n & 1);
        n >>= 1;
    }
    return n + r;
}

解釋B:存run是經過兩個棧,分別保存run的起始位置和長度,能夠看pushRun()函數代碼:

private int stackSize = 0;  // 棧中run的數量
private final int[] runBase;
private final int[] runLen;

private void pushRun(int runBase, int runLen) {
     this.runBase[stackSize] = runBase;
     this.runLen[stackSize] = runLen;
     stackSize++;
}

解釋C:這裏的合併規則以下:假設棧頂三個run依次爲X,Y,Z,X爲棧頂run,要求它們的長度知足X+Y<Z及X<Y兩個條件。其實這就是TimSort算法的精髓所在了,它經過這樣的方式盡力保證合併的平衡性,即讓待合併的兩個數組儘量長度接近,從而提升合併的效率。經過這兩個條件限制,保證了棧中的run從棧底到棧頂是從大到小排列的,而且合併的收斂速度與斐波那契數列同樣。能夠看mergeCollapse()函數代碼:

private void mergeCollapse() {
    while (stackSize > 1) {
        int n = stackSize - 2;
        if (n > 0 && runLen[n-1] <= runLen[n] + runLen[n+1]) {//條件一不知足的話,Y就會和X、Z中較小的run合併
            if (runLen[n - 1] < runLen[n + 1])
                n--;
            mergeAt(n);
        } else if (runLen[n] <= runLen[n + 1]) {//條件二不知足的話,Y就和X合併
            mergeAt(n);
        } else {
            break; // Invariant is established
        }
    }
}

四.合併的方式

到這裏咱們就把整個流程講完了,還有最後一個問題沒有講--如何合併run?合併兩個run須要額外空間(能夠不用,可是效率過低),額外空間大小咱們能夠設爲較小的run的長度。假設咱們有先後X、Y兩個run須要合併,X較小,那麼X能夠放入臨時內存中,而後從小到大合併;若是Y較小,那麼把Y放入臨時內存,而後從大到小排序。這個流程其實也比較簡單(圖源自佛西先森博客):

 

 

 而且,因爲兩個run都是已經排好序的序列,咱們能夠在run合併以前計算A中最後一個元素在B中的位置i,那麼B中i以後的元素都不須要參與合併;同理,咱們也能夠計算B中第一個元素在A中位置j,A中j以前的元素都不須要參與合併。

在歸併排序算法中合併兩個數組就是一一比較每一個元素,把較小的放到相應的位置,而後比較下一個,這樣有一個缺點就是若是A中若是有大量的元素A[i...j]是小於B中某一個元素B[k]的,程序仍然會持續的比較A[i...j]中的每個元素和B[k],增長合併過程當中的時間消耗。

爲了優化合並的過程,TimSort設定了一個閾值MIN_GALLOP,若是A中連續MIN_GALLOP個元素比B中某一個元素要小,則經過二分搜索找到A[0]B中的位置i0,把Bi0以前的元素直接放入合併的空間中,而後再在A中找到B[i0]所在的位置j0,把Aj0以前的元素直接放入合併空間中,如此循環直至在AB中每次找到的新的位置和原位置的差值是小於MIN_GALLOP的,這才中止而後繼續進行一對一的比較。

五.總結

總結一下上面的排序的過程:

  1. 若是長度小於32直接進行二分插入排序
  2. 遍歷數組組成一個run
  3. 獲得一個run以後會把他放入棧中
  4. 若是棧頂部幾個的run符合合併條件,就會合並相鄰的兩個run
  5. 合併會使用盡可能小的內存空間和GALLOP模式來加速合併

參考資料:1.世界上最快的排序算法——Timsort

      2.JDK8官方源碼

相關文章
相關標籤/搜索