redis源碼解析-基礎數據-skiplist(跳躍表)

太長不看版node

  • 跳躍表是有序集合的底層實現之一, 除此以外它在 Redis 中沒有其餘應用。
  • 每一個跳躍表節點的層高都是 1 至 64 之間的隨機數
  • 層高越高出現的機率越低,層高爲i的機率爲(1-p) * p^{i-1}, (p=1/4)
  • 跳躍表中,分值能夠重複, 但對象成員惟一。分值相同時,節點按照成員對象的大小進行排序。

本篇解析基於redis 5.0.0版本,本篇涉及源碼文件爲t_zset.c, server.h。git

什麼是跳躍表

跳錶是一個隨機化的數據結構,實質就是一種能夠進行二分查找的有序鏈表。 github

咱們都知道在有序數組中進行查找,可使用二分查找,將時間複雜度降爲O(log n)。可是有序鏈表作不到,是由於有序鏈表獲取某元素複雜度爲O(n),沒法經過二分的思想去跳過一些元素的訪問。redis

例以下圖要查找元素50,就必須 5 -> 6 -> 10 -> 30 -> 49 這樣去找,而不能說先看 中心元素49小於50,則開始從中心右邊開始查找,跳過元素5,6,10, 30的訪問。算法

而跳躍表則是經過在節點中提取索引的方式,實現有序鏈表的快速查找。本質上是一個空間(額外的步進指針)換時間的操做。例以下圖:

這時查找元素50變成了 5 -> 49,略過了中間元素6,10, 30。上圖中經過首節點存儲不一樣步長的指針將鏈表完美二分,可是實際上的跳錶卻相似與下面這張圖的結構,大部分狀況喜好不是完美二分的:數組

跳躍表採用了隨機算法(層高越高几率越小)來決定層高,相同層之間經過指針相連。redis實現中某節點層高爲i的機率爲 (1-p) * p^{i-1}數據結構

爲何不採用最完美的二分結構? dom

考慮一下,插入節點的狀況。當中間插入一個節點,此時的二分結構會被打破,因此須要不斷的進行調整。想一想平衡樹,紅黑樹複雜的再平衡操做,而此處的再平衡調整比之有過之而無不及。而使用隨機算法進行層高選擇的方法也能夠實現O(logN)的平均複雜度,並且操做也相對簡化的不少。函數

跳躍表(redis實現)的空間複雜度

相關定義

// 層高最大值限制
#define ZSKIPLIST_MAXLEVEL 64 /* Should be enough for 2^64 elements */
// 層高是否繼續增加的機率
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
// 跳錶節點定義
typedef struct zskiplistNode {
    // 存儲內容
    sds ele;
    // 分值,用於排序
    double score;
    // 後退指針
    struct zskiplistNode *backward;
    // 變長數組,記錄層信息。層高越高跳過的節點越多(由於層高越高几率越低)
    struct zskiplistLevel {
        // 指向當前層下一個節點
        struct zskiplistNode *forward;
        // 當前節點與forward所指節點中間節點數
        unsigned long span;
    } level[];
} zskiplistNode;
// 跳錶結構管理節點
typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    // 長度
    unsigned long length;
    // 跳錶高度(全部節點最高層高)
    int level;
} zskiplist;

int zslRandomLevel(void) {
    // 計算當前插入元素層高的隨機函數
    int level = 1;
    // (random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF) 機率爲1/4
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
複製代碼

層高爲1機率爲 1-p(不進while)ui

層高爲2的機率爲 p(進一次while) * (1 - p)(不進while)

層高爲3的機率爲 p(進一次while) * p(進一次while) * (1 - p)(不進while)

...

層高爲n的機率爲 (1-p) * p^{n-1}

層高的指望 E = 1/(1-p)

在機率論和統計學中,數學指望(mean)(或均值,亦簡稱指望)是試驗中每次可能結果的機率乘以其結果的總和,是最基本的數學特徵之一。它反映隨機變量平均取值的大小

在redis實現中 p=1/4, 層高指望爲E約等於1.33,因此節點的平均層高約等於1.33是個常數,從而得出跳躍表的空間複雜度爲O(n)。

跳躍表(redis實現)相關操做

建立跳躍表

zskiplistNode *zslCreateNode(int level, double score, int ele) {
    zskiplistNode *zn =
        malloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    zn->score = score;
    zn->ele = ele;
    return zn;
}

/* Create a new skiplist. */
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;

    zsl = malloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
    // 頭節點層高爲64(層高的最大限制)
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}
複製代碼

上述代碼中能夠看到,頭節點的層高數組直接爲最大長度,由於每次查找都要從頭部開始,並且整個跳躍表的高度是動態增長的,初始化時直接按照最大值申請高度,避免後續高度增長時爲頭節點從新分配內存。因此以前的跳躍表圖例應該以下圖所示:

由於有backward指針的存在,因此第一層能夠看做是一個雙向鏈表。

插入節點

int zslRandomLevel(void) {
    // 計算當前插入元素層高的隨機函數
    int level = 1;
    // (random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF) 機率爲1/4
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    // update存放須要更新的節點
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    // 第一步,收集須要更新的節點與步長信息
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        // score能夠重複,重複時使用ele大小進行排序
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    // 第二步, 獲取隨機層高,補全須要更新的節點
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    // 第三步,建立並分層插入節點,同時更新同層前一節點步長信息
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }
    // 第四步,更新新增節點未涉及層節點的步長信息,以及跳錶相關信息
    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}
複製代碼

插入節點分爲四步(舉個栗子,邊吃邊看):

假設如今我須要插入元素80,且獲取到隨機的層高爲5( 爲了全部狀況都覆蓋到)。

  1. 收集須要更新的節點與步長信息
    • 將插入新增節點後每層受影響節點存在update數組中,update[i]爲第i + 1層會受影響節點(紅框框出來的就是例子中可能會受影響的節點)。
    • 將每層頭節點與會受影響的節點中間存在節點數存在rank數組中,rank[i]爲頭節點與第i + 1層會受影響節點中間存在的節點數(rank爲[6, 5, 3, 3])。

  1. 獲取隨機層高,補全須要更新的節點,同時可能更新跳錶高度
    • 經過zslRandomLevel函數計算當前插入節點側層高,層高越高出現的概率越小(咱們指定了是5,實際是隨機的)。
    • 由於搜索須要更新節點是從跳躍表當前高度的那一層開始的,若是新插入的節點的層高比當前表高還高,那麼高出的這幾層的頭節點也是須要更新信息的(第五層的頭節點後繼有人了,因此它也須要被更新)。
    • 若是當前層高高於表高,則更新表高(表高從4變成5)。

  1. 建立並分層插入節點,同時更新同層前一節點步長信息

    • 建立節點,而後根據當前節點的層高,在每一層進行節點插入(和簡單鏈表插入同樣)。
    • 更新下每層前一個節點(update[i]對應節點)與自身節點的步長信息。
  2. 更新新增節點未涉及層節點的步長信息,以及跳錶相關信息與節點自身的相關信息

    • 若是當前節點的層高比跳錶高度低,那麼高於當前節點層高的那些層中排在當前節點以後的節點步長信息都須要+1(由於在它和它的前一個節點之間插入了新元素)。
    • 更新跳錶長度與當前節點與第一層下一節點的後退指針(後退指針能夠理解爲只有底層鏈表有)。

查找節點

/* Find the rank for an element by both score and key. * Returns 0 when the element cannot be found, rank otherwise. * Note that the rank is 1-based due to the span of zsl->header to the * first element. */
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *x;
    unsigned long rank = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                sdscmp(x->level[i].forward->ele,ele) <= 0))) {
            rank += x->level[i].span;
            x = x->level[i].forward;
        }

        /* x might be equal to zsl->header, so test if obj is non-NULL */
        if (x->ele && sdscmp(x->ele,ele) == 0) {
            return rank;
        }
    }
    return 0;
}

/* Finds an element by its rank. The rank argument needs to be 1-based. */
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
    zskiplistNode *x;
    unsigned long traversed = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
        {
            traversed += x->level[i].span;
            x = x->level[i].forward;
        }
        if (traversed == rank) {
            return x;
        }
    }
    return NULL;
}
複製代碼

redis實現中跳躍表和dict共同實現了zset,dict實現O(1)複雜度獲取元素對應score,跳躍表用來處理區間查詢的相關操做,同時由於score能夠重複,因此跳躍表無需實現經過ele獲取score(經過dict查)以及經過score獲取ele(貌似也沒有這個需求)。

通常查詢需求有兩個:

  • 根據rank查詢節點,主要是爲了經過該節點指針進行遍歷獲取某個區間的節點數據。
  • 根據score與ele(score可能重複,因此須要ele)獲取節點的rank,進行count之類的數值計算。

大致的流程都是按照從左上方開始向右下方搜索的路線進行查詢( 如上圖紅線標記路徑)。

刪除節點

/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {
        // 被刪除節點在第i層有節點,則update[i]爲被刪除節點的前一個節點
        if (update[i]->level[i].forward == x) {
            // 步長 = 原步長 + 被刪除節點步長 - 1(被刪除節點)
            update[i]->level[i].span += x->level[i].span - 1;
            // 指針越過被刪除節點
            update[i]->level[i].forward = x->level[i].forward;
        } else {
            // 被刪除節點在第i層無節點,則 步長 = 原步長 - 1(被刪除節點)
            update[i]->level[i].span -= 1;
        }
    }
    if (x->level[0].forward) {
        // 更新被刪除節點下一節點的後退指針
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
    zsl->length--;
}

int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                     sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need * is to find the element with both the right score and object. */
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
        zslDeleteNode(zsl, x, update);
        if (!node)
            zslFreeNode(x);
        else
            *node = x;
        return 1;
    }
    return 0; /* not found */
}
複製代碼

刪除節點與添加節點步驟相似,分爲三步:

  1. 收集須要更新的節點。
  2. 刪除節點所在的層鏈表移除節點(和簡單鏈表移除節點同樣),並更新前一節點的步長信息(update[i]所存節點)。
  3. 更新跳躍表高度與長度。
相關文章
相關標籤/搜索