Redis 設計與實現 10:五大數據類型之有序集合

有序集合 sorted set (下面咱們叫zset 吧) 有兩種編碼方式:壓縮列表 ziplist 和跳錶 skiplisthtml

編碼一:ziplist

zsetziplist 中,成員(member)和分數(score)是挨在一塊兒的,元素按照分數從小到大存儲。redis

舉個例子,咱們用如下命令建立一個zset源碼分析

redis> ZADD key 26.1 z 1 a 2 b
(integer) 3

那麼這個zset的結構大體以下:
性能


下面咱們來分析一下 zscore 命令的源碼,進一步瞭解 zset 是如何利用 ziplist 存儲的編碼

int zsetScore(robj *zobj, sds member, double *score) {
    // ...
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        if (zzlFind(zobj->ptr, member, score) == NULL) return C_ERR;
    }
    // ...
    return C_OK;
}

unsigned char *zzlFind(unsigned char *zl, sds ele, double *score) {
    // eptr 是 member 的指針,sptr 是 score 的指針
    unsigned char *eptr = ziplistIndex(zl,0), *sptr;

    // 遍歷 ziplist
    while (eptr != NULL) {
        // 由於 member 和 score 是挨着存儲的,因此獲取 member 的下一個節點就是 score 啦
        sptr = ziplistNext(zl,eptr);
        serverAssert(sptr != NULL);

        // 對比當前的 member 和要查詢的 member 是否相等
        if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele))) {
            // 若是相等,則獲取分數
            if (score != NULL) *score = zzlGetScore(sptr);
            return eptr;
        }

        // 不相等則繼續往下遍歷
        eptr = ziplistNext(zl,sptr);
    }
    return NULL;
}

// 獲取分數
double zzlGetScore(unsigned char *sptr) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    char buf[128];
    double score;

    serverAssert(sptr != NULL);
    // ziplistGet 經過 sptr 指針獲取值。根據節點的編碼(前文有說到ziplist節點的編碼) 對參數賦值
    // 若是是字符串,則賦值到 vstr; 若是是整數,則賦值到 vlong。
    serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));

    if (vstr) {
        // 若是是字符串,那麼存的就是浮點數
        memcpy(buf,vstr,vlen);
        buf[vlen] = '\0';
        // 字符串轉換成浮點數
        score = strtod(buf,NULL);
    } else {
        // 整數類型就直接賦值
        score = vlong;
    }

    return score;
}

編碼二:skiplist

跳錶的實現

skiplist 編碼的底層實現是跳錶。spa

下面是跳錶的結構圖 (圖片來自 《Redis 設計與實現》圖片集 )
設計

  1. 圖中最左部分就是 zskiplist 結構,其代碼實現以下(server.h):
typedef struct zskiplist {
    // 頭指針和尾指針,指向頭尾節點
    struct zskiplistNode *header, *tail;
    // 跳錶的節點數(不包含頭結點,空跳錶也會包含頭結點)
    unsigned long length;
    // 全部節點中,最大的層數
    int level;
} zskiplist;
  1. 圖中右邊的四個節點,就是跳錶節點 zskiplistNode,其代碼實現以下(server.h):
typedef struct zskiplistNode {
    // 成員
    sds ele;
    // 分數
    double score;
    // 後退指針,指向前一個節點
    struct zskiplistNode *backward;
    // 層,每一個節點可能有不少層,每一個層可能指向不一樣的節點
    struct zskiplistLevel {
        // 前進指針,指向下一個節點
        struct zskiplistNode *forward;
        // 跟下一個節點之間的跨度
        unsigned long span;
    } level[];
} zskiplistNode;

跳錶最重要的一個地方就是層 level,爲何這麼說呢?3d

假設zset 用鏈表有序存儲,若是咱們要查找數據,只能從頭至尾遍歷,時間複雜度是 \(O(n)\),效率很低。
鏈表指針

有什麼辦法提升效率呢?咱們能夠在上面添加一層索引。
鏈表加索引
能夠看出,咱們遍歷的性能變高了。例如咱們想找到 6,先遍歷第一層,5 到 7 之間,再往下探,就能找到 6 了!
有讀者就發現了,若是數據量很大,那找起來也很慢。
是的,那麼怎麼解決呢?再往上加索引唄!
鏈表再加幾層索引
這不,鏈表就變成了跳錶了!而上面說的層,就是這些索引啦!最終跳錶的查找時間複雜度是 \(O(logn)\)code


咱們來看看 zrange 命令的核心實現,來感覺一下跳錶的遍歷吧

zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
    zskiplistNode *x;
    unsigned long traversed = 0;
    int i;
    // 層頭結點開始
    x = zsl->header;
    // 層從高到低
    for (i = zsl->level-1; i >= 0; i--) {
        // 只要遍歷的數沒有達到 rank,就一直遍歷
        while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
        {
            // 每次加上層的跨度
            traversed += x->level[i].span;
            // 往前走
            x = x->level[i].forward;
        }
        // 若是這一層走完還沒到 rank,那就往下層走,若是仍是找不到就繼續走,直到走到最底層
        if (traversed == rank) {
            return x;
        }
    }
    return NULL;
}

zset 的結構

skiplist 編碼的 zset 的結構定義以下:

typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;

結構中包含了一個字典和一個跳錶,爲何用了跳錶還須要字典呢?
命令zscore這種單找一個值的,若是隻用跳錶的話,那麼查找的時間複雜度是 \(O(logn)\),加上一個字典能夠把時間複雜度縮減爲 \(O(n)\)

那麼確定有同窗就會說,加一個字典會浪費了不少空間。
的確,多加一個字典確定會多佔用必定的空間,空間換時間是一種常見的作法。不過字典的值指向的對象跟跳錶的對象是共用的。

下圖是一個 zset 的示例,爲了方便,把他們指向的字符串對象都分別畫出來了,其實是共享的。(圖片來自 《Redis 設計與實現》圖片集 )

源碼分析

咱們來看看 skiplist 編碼下的 zscore 如何實現吧。

int zsetScore(robj *zobj, sds member, double *score) {
    // 前面其餘 ziplist 編碼的就省略了...
    // if ...
    else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        // 直接經過 dict 查找,時間複雜度複雜度 O(1)
        dictEntry *de = dictFind(zs->dict, member);
        if (de == NULL) return C_ERR;
        *score = *(double*)dictGetVal(de);
    }
    
    // ...
    return C_OK;
}

編碼轉換

當有序集合對象能夠同時知足如下兩個條件時,對象使用 ziplist 編碼:

  • 有序集合保存的元素數量小於128個(可經過 zset-max-ziplist-entries 修改配置);
  • 有序集合保存的全部元素成員的長度都小於64字節(可經過 zset-max-ziplist-value 修改配置);

不能知足以上兩個條件的有序集合對象將使用 skiplist 編碼。

相關文章
相關標籤/搜索