讀懂纔會用 : 帶你見識 Redis 的 zset

快餐車

本文從代碼角度分析Redis 的 zset 結構,但願經過本文掌握以下內容:java

  1. Redis 中 zset 不是單一結構完成,是跳錶和哈希表共同完成node

  2. 跳錶的實現原理,跳錶升維全靠隨機算法

  3. 跳錶中查找、插入、刪除的三個口訣數據庫

  4. 使用場景(簡單延時隊列、排行榜、簡單限流)編程

若是您還不能瞭然於胸,請繼續閱讀本文。數組

場景案例

假設咱們有某個班級全部學生的語文成績,想統計、查詢區間範圍、查詢單個學生成績、知足高性能讀取這些需求,Redis 的 zset 結構無疑是最好的選擇。Redis 提供了豐富的 API。示例:bash

ZADD yuwen 90 s01 89 s03 99 s02 74 s04 97 s05微信

以 yuwen 爲 key 分別存儲了 s01 到 s06 共計 6 名學生的分數,咱們能夠查詢任一學生的成績數據結構

ZSCORE yuwen s03app

能夠按照排序返回指定區間內的全部元素

ZRANGE yuwen 1 2 withscores

能夠訪問指定分數區間內的全部元素

ZRANGEBYSCORE yuwen 90 100 withscores

能夠統計指定區間內的個數

ZCOUNT yuwen 80 90

實現

zset 結構中,既支持按單個元素查詢,又支持範圍查詢,是如何實現的呢?咱們深刻代碼分析,在 Redis 的 t_zset.c 的註釋中,提到:

/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view").複製代碼

翻譯過來是 Redis 中有兩種數據結構來支持 zset 的功能,一個是 hash table ,一個是 skip list。先來看一下 zset 在代碼中的定義:

typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;複製代碼

dict 是一個hash table ,各類編程語言中都有實現。能夠保證 O(1) 的時間複雜度,不作過多解釋。咱們繼續看 zskiplist 的定義:

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;複製代碼

zskiplist 是 Redis 對 skiplist 作了變種,skiplist 就是咱們常說的跳錶。

跳錶

跳錶由 William Pugh 於1990年發表的論文 Skip lists: a probabilistic alternative to balanced trees 中被首次提出,查找時間複雜度爲平均 O(logN),最差 O(N),在大部分狀況下效率可與平衡樹相媲美,但實現比平衡樹簡單的多,跳錶是一種典型的以空間換時間的數據結構。

跳錶具備如下幾個特色:

  • 由許多層結構組成。

  • 每一層都是一個有序的鏈表。

  • 最底層 (Level 1) 的鏈表包含全部元素。

  • 若是一個元素出如今 Level i 的鏈表中,則它在 Level i 之下的鏈表也都會出現。

  • 每一個節點包含兩個指針,一個指向同一鏈表中的下一個元素,一個指向下面一層的元素。

跳錶的查找會從頂層鏈表的頭部元素開始,而後遍歷該鏈表,直到找到元素大於或等於目標元素的節點,若是當前元素正好等於目標,那麼就直接返回它。若是當前元素小於目標元素,那麼就垂直降低到下一層繼續搜索,若是當前元素大於目標或到達鏈表尾部,則移動到前一個節點的位置,而後垂直降低到下一層。正由於 Skiplist 的搜索過程會不斷地從一層跳躍到下一層的,因此被稱爲跳躍表。

仍是舉個例子,假設連接包含1-10,共10個元素。咱們要找到第9個,須要從 header 遍歷,共 9 次才能找到



一次只能比較一個數,最壞的狀況下時間複雜度是O(n),若是咱們一次能夠比較2個元素就行了:



一次查找2個的話,咱們只找了5次就找到了。因此就有了相似下面的結構,在鏈表上增長一層減小了元素個數的「鏈表」:



若是增長兩層「鏈表」,只查找3次就能夠找到:


即使是咱們找元素8,也只須要遍歷 1->4->7->8,共4次查詢。

跳錶就是這樣的一種數據結構,結點是跳過一部分的,從而加快了查詢的速度。以前講HashMap 中咱們提到,java 8 中當哈希衝突個數大於 7 個的時候,轉換爲紅黑樹。跳錶跟紅黑樹二者的算法複雜度差很少,爲何Redis要使用跳錶而不使用紅黑樹呢?跳錶相對於紅黑樹,代碼簡單。若是咱們要查詢一個區間裏面的值,用平衡樹實現可能會麻煩。刪除一段區間時,若是是平衡樹,就會至關困難,畢竟涉及到樹的平衡問題,而跳錶則沒有這種煩惱。

整個查詢過程,能夠簡化理解爲 if (下一個是否大於結果) 下一個 else 下一層


增強版跳錶

Redis 中的對 skiplist 作了些改造:

  • 增長了後驅指針(*backward

  • 同時記錄value 和 score,且 score 能夠重複

  • 第一層維護了雙向鏈表

zset 結構整個類圖以下:


zskiplist 中保存的 zskiplistNode 節點定義:

typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward; //  指向上一個節點
    struct zskiplistLevel {
        struct zskiplistNode *forward;  // 指向下一個節點
        unsigned long span; // 節點以前的跨度
    } level[];  // 該節點的各層信息
} zskiplistNode;複製代碼

zskiplistNode 中定義了 zskiplistLevel 的數組,用來保存該 node 在每一層的指針。查詢跟咱們模擬的例子相似,不在詳細描述。重點看一下插入操做:

/* Insert a new node in the skiplist. Assumes the element does not already
 * exist (up to the caller to enforce that). The skiplist takes ownership
 * of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        // 計算 span 信息,表示從該節點到下一個節點,須要跳躍多少次
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
    // 隨機函數
    level = zslRandomLevel();
    // 若是須要上升層次記錄好位置
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}複製代碼

插入的時候,首先要進行查詢,而後從最底層開始,插入待插入的元素。而後看看從下而上,是否須要逐層插入。但是到底要不要插入上一層呢?咱們要想每層的跳躍都很是高效,那就越是平衡越好(第一層1級跳,第二層2級跳,第3層4級跳,第4層8級跳)。可是用算法實現起來,確實很是地複雜的,而且要嚴格地按照2地指數次冪,咱們還要對原有地結構進行調整。因此跳錶的思路是拋硬幣,聽天由命,產生一個隨機數。Redis 中 25%機率再向上擴展。這樣子,每個元素可以有X層的機率爲0.25^(X-1)次方。在 Redis 中level初始化時就定義好了,爲 32 層。那麼,第32層有多少個元素的機率你們能夠算一下。

整個插入過程,能夠簡化理解爲:先插入最底層 if (隨機機率) 擴展上一層

隨機函數:

/* Returns a random level for the new skiplist node we are going to create.
 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
 * (both inclusive), with a powerlaw-alike distribution where higher
 * levels are less likely to be returned. */
int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}複製代碼


對於 zskiplist 的刪除操做,能夠分爲3個步驟:

  • 根據member(obj)和score找到節點的位置(代碼裏變量x即爲該節點,update記錄每層x的上一個節點)

  • 調動zslDeleteNode把x節點從skiplist邏輯上刪除

  • 釋放x節點內存

/* Delete an element with matching score/element from the skiplist.
 * The function returns 1 if the node was found and deleted, otherwise
 * 0 is returned.
 *
 * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
 * it is not freed (but just unlinked) and *node is set to the node pointer,
 * so that it is possible for the caller to reuse the node (including the
 * referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                     sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
        zslDeleteNode(zsl, x, update);
        if (!node)
            zslFreeNode(x);
        else
            *node = x;
        return 1;
    }
    return 0; /* not found */
}複製代碼

整個刪除過程,能夠簡化理解爲:先找到,斷關聯,刪內存

在 zset 的建立中(zaddGenericCommand 方法)隱藏這一個邏輯分支:

if (server.zset_max_ziplist_entries == 0 ||
    server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{
    zobj = createZsetObject();
} else {
    zobj = createZsetZiplistObject();
}複製代碼

Redis 初始化的時候,只判斷存儲的元素長度是否大於64個字節(server.zset_max_ziplist_entries默認128)。大於64個字節選擇zkiplist,不然ziplist。當執行增刪改查的方法,根據是ziplist 仍是 zkiplist 選擇不一樣的實現。關於zkiplist 本文不詳細敘述。只須要記住zset 在兩種狀況下使用 ziplist :

  1. 保存的元素個數不足 128 個

  2. 單個元素的大小超過 64 bytes

執行增刪改查的方法,根據是ziplist 仍是 zkiplist 選擇不一樣的實現。zkiplist 本文不詳細敘述。

結論

至此,咱們介紹了Redis 中 zset 最複雜的跳錶部分,結合代碼和理解,請思考這4個命令背後都是依賴於什麼數據結構的支撐。

ZSCORE yuwen s03 基於哈希表 O(1)複雜度

ZRANGE yuwen 1 2 withscores 基於skiplist和span查找

ZRANGEBYSCORE yuwen 90 100 withscores 基於skiplist和score查找

ZREVRANGE yuwen 1 2 withscores 基於skiplist和score 和 * backward 查找

使用場景

1. 延時隊列

zset 會按 score 進行排序,若是 score 表明想要執行時間的時間戳。在某個時間將它插入zset集合中,它變會按照時間戳大小進行排序,也就是對執行時間先後進行排序。

起一個死循環線程不斷地進行取第一個key值,若是當前時間戳大於等於該key值的socre就將它取出來進行消費刪除,能夠達到延時執行的目的。

2. 排行榜

常常瀏覽技術社區的話,應該對 「1小時最熱門」 這類榜單不陌生。如何實現呢?若是記錄在數據庫中,不太容易對實時統計數據作區分。咱們以當前小時的時間戳做爲 zset 的 key,把貼子ID做爲 member ,點擊數評論數等做爲 score,當 score 發生變化時更新 score。利用 ZREVRANGE 或者 ZRANGE 查到對應數量的記錄。

3. 限流

滑動窗口是限流常見的一直策略。若是咱們把一個用戶的 ID 做爲key 來定義一個 zset ,member 或者 score 都爲訪問時的時間戳。咱們只需統計某個 key 下在指定時間戳區間內的個數,就能獲得這個用戶滑動窗口內訪問頻次,與最大經過次數比較,來決定是否容許經過。

以上三種場景的示例代碼,在下一篇給出。也歡迎你們思考是否還有其餘應用場景。


關注我

若是您在微信閱讀,請您點擊連接 關注我 ,若是您在 PC 上閱讀請掃碼關注我,歡迎與我交流隨時指出錯誤

相關文章
相關標籤/搜索