Redis(八):zset/zadd/zrange/zrembyscore 命令源碼解析

  前面幾篇文章,咱們徹底領略了redis的string,hash,list,set數據類型的實現方法,相信對redis已經再也不神祕。node

  本篇咱們將介紹redis的最後一種數據類型: zset 的相關實現。redis

 

  本篇事後,咱們對redis的各類基礎功能,應該不會再有疑惑。有可能的話,咱們後續將會對redis的高級功能的實現作解析。(如複製、哨兵模式、集羣模式)算法

 

  迴歸本篇主題,zset。zset 又稱有序集合(sorted set),便是序版本的set。通過上篇的介紹,你們能夠看到,redis的讀取功能至關有限,許可能是基於隨機數的方式進行讀取,其緣由就是set是無序的。當set有序以後,查詢能力就會獲得極大的提高。1. 能夠根據下標進行定位元素; 2. 能夠範圍查詢元素; 這是有序帶來的好處。數據結構

  那麼,咱們不妨先思考一下,如何實現有序?兩種方法:1. 根據添加順序定義,一、二、3... ; 2. 自定義排序值; 第1種方法實現簡單,添加時複雜度小,可是功能受限;第2種方法相對自由,對於每次插入均可能涉及重排序問題,可是查詢相對穩定,能夠沒必要徹底受限於系統實現;app

 

  一樣,咱們以功能列表,到數據結構,再功能實現的思路,來解析redis的zset有序集合的實現方式吧。less

 

零、redis zset相關操做方法dom

  zset: Redis 有序集合是string類型元素的集合,且不容許重複的成員。每一個元素都會關聯一個double類型的分數,經過分數來爲集合中的成員進行從小到大的排序。ide

  使用場景如: 保存任務隊列,該隊列由後臺定時掃描; 排行榜;函數

  從官方手冊上查到相關使用方法以下:ui

1> ZADD key score1 member1 [score2 member2]
功能: 向有序集合添加一個或多個成員,或者更新已存在成員的分數
返回值: 添加成功的元素個數(已存在的添加不成功)

2> ZCARD key
功能: 獲取有序集合的成員數
返回值: 元素個數或0

3> ZCOUNT key min max
功能: 計算在有序集合中指定區間分數的成員數
返回值: 區間內的元素個數

4> ZINCRBY key increment member
功能: 有序集合中對指定成員的分數加上增量 increment
返回值: member增長後的分數

5> ZINTERSTORE destination numkeys key [key ...]
功能: 計算給定的一個或多個有序集的交集並將結果集存儲在新的有序集合 key 中
返回值: 交集元素個數

6> ZLEXCOUNT key min max
功能: 在有序集合中計算指定字典區間內成員數量
返回值: 區間內的元素個數

7> ZRANGE key start stop [WITHSCORES]
功能: 經過索引區間返回有序集合指定區間內的成員
返回值: 區間內元素列表

8> ZRANGEBYLEX key min max [LIMIT offset count]
功能: 經過字典區間返回有序集合的成員
返回值: 區間內元素列表

9> ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT]
功能: 經過分數返回有序集合指定區間內的成員
返回值: 區間內元素列表

10> ZRANK key member
功能: 返回有序集合中指定成員的索引
返回值: member的排名或者 nil

11> ZREM key member [member ...]
功能: 移除有序集合中的一個或多個成員
返回值: 成功移除的元素個數

12> ZREMRANGEBYLEX key min max
功能: 移除有序集合中給定的字典區間的全部成員
返回值: 成功移除的元素個數

13> ZREMRANGEBYRANK key start stop
功能: 移除有序集合中給定的排名區間的全部成員
返回值: 成功移除的元素個數

14> ZREMRANGEBYSCORE key min max
功能: 移除有序集合中給定的分數區間的全部成員
返回值: 成功移除的元素個數

15> ZREVRANGE key start stop [WITHSCORES]
功能: 返回有序集中指定區間內的成員,經過索引,分數從高到低
返回值: 區間內元素列表及分數

16> ZREVRANGEBYSCORE key max min [WITHSCORES]
功能: 返回有序集中指定分數區間內的成員,分數從高到低排序
返回值: 區間內元素列表及分數

17> ZREVRANK key member
功能: 返回有序集合中指定成員的排名,有序集成員按分數值遞減(從大到小)排序
返回值: member排名或者 nil

18> ZSCORE key member
功能: 返回有序集中,成員的分數值
返回值: member分數

19> ZUNIONSTORE destination numkeys key [key ...]
功能: 計算給定的一個或多個有序集的並集,並存儲在新的 key 中
返回值: 存儲到新key的元素個數

20> ZSCAN key cursor [MATCH pattern] [COUNT count]
功能: 迭代有序集合中的元素(包括元素成員和元素分值)
返回值: 元素列表

21> ZPOPMAX/ZPOPMIN/BZPOPMAX/BZPOPMIN

 

1、zset 相關數據結構

  zset 的實現,使用了 ziplist, zskiplist 和 dict 進行實現。

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned int span;
    } level[];
} zskiplistNode;
// 跳躍鏈表
typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;
// zset 主數據結構,dict + zskiplist
typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;
// zset 在合適場景下,將先使用 ziplist 存儲數據
typedef struct zlentry {
    unsigned int prevrawlensize, prevrawlen;
    unsigned int lensize, len;
    unsigned int headersize;
    unsigned char encoding;
    unsigned char *p;
} zlentry;

 

2、zadd 添加成員操做

  從添加實現中,咱們能夠完整領略數據結構的運用。

// 用法: ZADD key score1 member1 [score2 member2]
// t_zset.c
void zaddCommand(client *c) {
    // zadd 的多個參數變形, 使用 flags 進行區分複用
    zaddGenericCommand(c,ZADD_NONE);
}
void zaddGenericCommand(client *c, int flags) {
    static char *nanerr = "resulting score is not a number (NaN)";
    robj *key = c->argv[1];
    robj *zobj;
    sds ele;
    double score = 0, *scores = NULL, curscore = 0.0;
    int j, elements;
    int scoreidx = 0;
    /* The following vars are used in order to track what the command actually
     * did during the execution, to reply to the client and to trigger the
     * notification of keyspace change. */
    int added = 0;      /* Number of new elements added. */
    int updated = 0;    /* Number of elements with updated score. */
    int processed = 0;  /* Number of elements processed, may remain zero with
                           options like XX. */

    /* Parse options. At the end 'scoreidx' is set to the argument position
     * of the score of the first score-element pair. */
    // 從第三位置開始嘗試解析特殊標識(用法規範)
    // 按位與到 flags 中
    scoreidx = 2;
    while(scoreidx < c->argc) {
        char *opt = c->argv[scoreidx]->ptr;
        // NX: 不更新已存在的元素,只作添加操做
        if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
        // XX: 只作更新操做,不作添加操做
        else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
        // CH: 將返回值從添加的新元素數修改成已更改元素的總數。 更改的元素是第添加的新元素以及已爲其更新分數的現有元素。 所以,命令行中指定的具備與過去相同分數的元素將不計算在內。 注意:一般,ZADD的返回值僅計算添加的新元素的數量。
        else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
        // INCR: 使用指定元素增長指定分數, 與 ZINCRBY 相似,此場景下,只容許操做一個元素
        else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
        else break;
        scoreidx++;
    }

    /* Turn options into simple to check vars. */
    int incr = (flags & ZADD_INCR) != 0;
    int nx = (flags & ZADD_NX) != 0;
    int xx = (flags & ZADD_XX) != 0;
    int ch = (flags & ZADD_CH) != 0;

    /* After the options, we expect to have an even number of args, since
     * we expect any number of score-element pairs. */
    // 把特殊標識去除後,剩下的參數列表應該2n數,即 score-element 一一配對的,不然語法錯誤
    elements = c->argc-scoreidx;
    if (elements % 2) {
        addReply(c,shared.syntaxerr);
        return;
    }
    elements /= 2; /* Now this holds the number of score-element pairs. */

    /* Check for incompatible options. */
    // 互斥項
    if (nx && xx) {
        addReplyError(c,
            "XX and NX options at the same time are not compatible");
        return;
    }
    // 語法檢查,INCR 只能針對1個元素操做
    if (incr && elements > 1) {
        addReplyError(c,
            "INCR option supports a single increment-element pair");
        return;
    }

    /* Start parsing all the scores, we need to emit any syntax error
     * before executing additions to the sorted set, as the command should
     * either execute fully or nothing at all. */
    // 解析全部的 score 值爲double類型,賦值到 scores 中
    scores = zmalloc(sizeof(double)*elements);
    for (j = 0; j < elements; j++) {
        if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
            != C_OK) goto cleanup;
    }

    /* Lookup the key and create the sorted set if does not exist. */
    // 語法檢查
    zobj = lookupKeyWrite(c->db,key);
    if (zobj == NULL) {
        if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
        // 建立原始key對象
        // 默認 zset_max_ziplist_entries=OBJ_ZSET_MAX_ZIPLIST_ENTRIES: 128
        // 默認 zset_max_ziplist_value=OBJ_ZSET_MAX_ZIPLIST_VALUE: 64
        // 因此此處默認主要是檢查 第1個member的長度是大於 64
        if (server.zset_max_ziplist_entries == 0 ||
            server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
        {
            // 2. 通用狀況使用 dict+quicklist 型的zset 
            zobj = createZsetObject();
        } else {
            // 1. 元素比較小的狀況下建立 ziplist 型的 zset
            zobj = createZsetZiplistObject();
        }
        // 將對象添加到db中,後續全部操做針對 zobj 操做便是對db的操做 (引用傳遞)
        dbAdd(c->db,key,zobj);
    } else {
        if (zobj->type != OBJ_ZSET) {
            addReply(c,shared.wrongtypeerr);
            goto cleanup;
        }
    }
    // 一個個元素循環添加
    for (j = 0; j < elements; j++) {
        score = scores[j];

        ele = c->argv[scoreidx+1+j*2]->ptr;
        // 分當前zobj的編碼不一樣進行添加 (ziplist, skiplist)
        // 3. ZIPLIST 編碼下的zset添加操做
        if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
            unsigned char *eptr;
            // 3.1. 查找是否存在要添加的元素 (肯定添加或更新)
            if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
                if (nx) continue;
                if (incr) {
                    score += curscore;
                    if (isnan(score)) {
                        addReplyError(c,nanerr);
                        goto cleanup;
                    }
                }

                /* Remove and re-insert when score changed. */
                if (score != curscore) {
                    // 3.2. 元素更新操做,先刪再插入
                    zobj->ptr = zzlDelete(zobj->ptr,eptr);
                    zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                    server.dirty++;
                    updated++;
                }
                processed++;
            } else if (!xx) {
                /* Optimize: check if the element is too large or the list
                 * becomes too long *before* executing zzlInsert. */
                zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                // 5. 超過一條件後,作 ziplist->skiplist 轉換
                // 默認 元素個數>128, 當前元素>64
                // 這兩個判斷不會重複嗎?? 兩個緣由: 1. 轉換函數內部會從新斷定; 2. 下一次循環時不會再走當前邏輯;
                if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
                    zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
                if (sdslen(ele) > server.zset_max_ziplist_value)
                    zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
                server.dirty++;
                added++;
                processed++;
            }
        } 
        // 4. skiplist 下的zset元素添加
        else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
            zset *zs = zobj->ptr;
            zskiplistNode *znode;
            dictEntry *de;
            // 判斷ele是否已存在,使用hash查找,快速
            de = dictFind(zs->dict,ele);
            if (de != NULL) {
                if (nx) continue;
                curscore = *(double*)dictGetVal(de);

                if (incr) {
                    score += curscore;
                    if (isnan(score)) {
                        addReplyError(c,nanerr);
                        /* Don't need to check if the sorted set is empty
                         * because we know it has at least one element. */
                        goto cleanup;
                    }
                }

                /* Remove and re-insert when score changes. */
                // 先刪再插入 skiplist
                if (score != curscore) {
                    zskiplistNode *node;
                    serverAssert(zslDelete(zs->zsl,curscore,ele,&node));
                    znode = zslInsert(zs->zsl,score,node->ele);
                    /* We reused the node->ele SDS string, free the node now
                     * since zslInsert created a new one. */
                    node->ele = NULL;
                    zslFreeNode(node);
                    /* Note that we did not removed the original element from
                     * the hash table representing the sorted set, so we just
                     * update the score. */
                    // 更新dict中的分數引用
                    dictGetVal(de) = &znode->score; /* Update score ptr. */
                    server.dirty++;
                    updated++;
                }
                processed++;
            } else if (!xx) {
                ele = sdsdup(ele);
                znode = zslInsert(zs->zsl,score,ele);
                // 添加skiplist的同時,也往 dict 中添加一份數據,由於hash的查找永遠是最快的
                serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
                server.dirty++;
                added++;
                processed++;
            }
        } else {
            serverPanic("Unknown sorted set encoding");
        }
    }

reply_to_client:
    if (incr) { /* ZINCRBY or INCR option. */
        if (processed)
            addReplyDouble(c,score);
        else
            addReply(c,shared.nullbulk);
    } else { /* ZADD. */
        addReplyLongLong(c,ch ? added+updated : added);
    }

cleanup:
    zfree(scores);
    if (added || updated) {
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(NOTIFY_ZSET,
            incr ? "zincr" : "zadd", key, c->db->id);
    }
}

// 1. 元素比較小的狀況下建立 ziplist 型的 zset
// object.c, 建立ziplist 的zset
robj *createZsetZiplistObject(void) {
    unsigned char *zl = ziplistNew();
    robj *o = createObject(OBJ_ZSET,zl);
    o->encoding = OBJ_ENCODING_ZIPLIST;
    return o;
}
// 2. 建立通用的 zset 實例
// object.c
robj *createZsetObject(void) {
    zset *zs = zmalloc(sizeof(*zs));
    robj *o;
    // zsetDictType 稍有不一樣
    zs->dict = dictCreate(&zsetDictType,NULL);
    // 首次遇到 skiplist, 咱去瞅瞅是如何建立的
    zs->zsl = zslCreate();
    o = createObject(OBJ_ZSET,zs);
    o->encoding = OBJ_ENCODING_SKIPLIST;
    return o;
}
// server.c, zset建立時使用的dict類型,與hash有不一樣
/* Sorted sets hash (note: a skiplist is used in addition to the hash table) */
dictType zsetDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    NULL,                      /* Note: SDS string shared & freed by skiplist */
    NULL                       /* val destructor */
};
// 建立 skiplist 對象
/* Create a new skiplist. */
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;

    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
    // 建立header節點,ZSKIPLIST_MAXLEVEL 32
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    // 初始化header
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}
/* Create a skiplist node with the specified number of levels.
 * The SDS string 'ele' is referenced by the node after the call. */
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
    zskiplistNode *zn =
        zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    zn->score = score;
    zn->ele = ele;
    return zn;
}


// 3. ZIPLIST 編碼下的zset添加操做
// 3.1. 查找是否存在要添加的元素 (肯定添加或更新)
// t_zset.c, 查找指定ele
unsigned char *zzlFind(unsigned char *zl, sds ele, double *score) {
    unsigned char *eptr = ziplistIndex(zl,0), *sptr;
    // 遍歷全部ziplist
    // 可見,此時的ziplist並無表現出有序啊
    while (eptr != NULL) {
        // eptr 至關因而 key
        // sptr 至關於score
        sptr = ziplistNext(zl,eptr);
        serverAssert(sptr != NULL);

        if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele))) {
            /* Matching element, pull out score. */
            // 找到相應的 key 後,解析下一值,即 score
            if (score != NULL) *score = zzlGetScore(sptr);
            return eptr;
        }
        /* Move to next element. */
        // 移動兩次對象,纔會到下一元素(由於存儲是 key-score 相鄰存儲)
        eptr = ziplistNext(zl,sptr);
    }
    return NULL;
}
// t_zset.c, 獲取元素的score
double zzlGetScore(unsigned char *sptr) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    char buf[128];
    double score;

    serverAssert(sptr != NULL);
    serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
    // 帶小數點不帶小數點
    if (vstr) {
        memcpy(buf,vstr,vlen);
        buf[vlen] = '\0';
        // 作類型轉換
        score = strtod(buf,NULL);
    } else {
        score = vlong;
    }

    return score;
}

// 3.2. 元素更新操做,先刪再插入
// t_zset.c
/* Delete (element,score) pair from ziplist. Use local copy of eptr because we
 * don't want to modify the one given as argument. */
unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
    unsigned char *p = eptr;

    /* TODO: add function to ziplist API to delete N elements from offset. */
    zl = ziplistDelete(zl,&p);
    zl = ziplistDelete(zl,&p);
    return zl;
}
// 添加 ele-score 到 ziplist 中
/* Insert (element,score) pair in ziplist. This function assumes the element is
 * not yet present in the list. */
unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) {
    unsigned char *eptr = ziplistIndex(zl,0), *sptr;
    double s;
    // 在上面查找時,咱們看到ziplist也是遍歷,覺得是無序的ziplist
    // 然而實際上,插入時是維護了順序的喲
    while (eptr != NULL) {
        sptr = ziplistNext(zl,eptr);
        serverAssert(sptr != NULL);
        s = zzlGetScore(sptr);
        // 找到第一個比score大的位置,在其前面插入 ele-score
        if (s > score) {
            /* First element with score larger than score for element to be
             * inserted. This means we should take its spot in the list to
             * maintain ordering. */
            zl = zzlInsertAt(zl,eptr,ele,score);
            break;
        } else if (s == score) {
            /* Ensure lexicographical ordering for elements. */
            // 當分數相同時,按字典順序排列
            if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) {
                zl = zzlInsertAt(zl,eptr,ele,score);
                break;
            }
        }

        /* Move to next element. */
        eptr = ziplistNext(zl,sptr);
    }

    /* Push on tail of list when it was not yet inserted. */
    // 以上遍歷完成都沒有找到相應位置,說明當前score是最大值,將其插入尾部
    if (eptr == NULL)
        zl = zzlInsertAt(zl,NULL,ele,score);
    return zl;
}
// 在eptr的前面插入 ele-score
unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, double score) {
    unsigned char *sptr;
    char scorebuf[128];
    int scorelen;
    size_t offset;

    scorelen = d2string(scorebuf,sizeof(scorebuf),score);
    if (eptr == NULL) {
        // 直接插入到尾部
        zl = ziplistPush(zl,(unsigned char*)ele,sdslen(ele),ZIPLIST_TAIL);
        zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
    } else {
        /* Keep offset relative to zl, as it might be re-allocated. */
        offset = eptr-zl;
        // 直接在 eptr 位置添加 ele, 其餘元素後移
        zl = ziplistInsert(zl,eptr,(unsigned char*)ele,sdslen(ele));
        eptr = zl+offset;

        /* Insert score after the element. */
        // 此時的 eptr 已經插入ele以後的位置,後移一位後,就能夠找到 score 的存儲位置
        serverAssert((sptr = ziplistNext(zl,eptr)) != NULL);
        zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
    }
    return zl;
}

// 4. skiplist 下的zset元素添加
// 4.1. 添加元素
// t_zset.c, 添加 ele-score 到 skiplist 中
/* Insert a new node in the skiplist. Assumes the element does not already
 * exist (up to the caller to enforce that). The skiplist takes ownership
 * of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    // ZSKIPLIST_MAXLEVEL 32
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    // 初始 zsl->level = 1
    // 從header的最高層開始遍歷
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        // 計算出每層能夠插入的位置
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        // 當前level的score小於須要添加的元素時,往前推動skiplist
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
    // 獲得一隨機的level, 決定要寫的節點數
    // 若是當前的level太小,則變動level, 從新初始化大的level
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    // 構建新的 skiplist 節點,爲每一層節點添加一樣的數據
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        // 讓i層的節點與x關聯
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    // 若是當前level較小,則存在有的level未賦值狀況,須要主動+1
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }
    // 關聯好header後,設置backward指針
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        // 同有後繼節點,說明是尾節點,賦值tail
        zsl->tail = x;
    zsl->length++;
    return x;
}

  ziplist添加沒啥好說的,skiplist能夠稍微提提,大致步驟爲四步: 

    1. 找位置, 從最高層開始, 判斷是否後繼節點小,若是小則直接在本層迭代,不然轉到下一層迭代; (每一層都要迭代至相應的位置)
    2. 計算獲得一新的隨機level,用於決定當前節點的層級;
    3. 依次對每一層與原跳錶作關聯;
    4. 設置backward指針;(雙向鏈表)

  相對說,skiplist 仍是有點抽象,咱們畫個圖來描述下上面的操做:

// 補充,咱們看一下隨機level的計算算法
// t_zset.c
/* Returns a random level for the new skiplist node we are going to create.
 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
 * (both inclusive), with a powerlaw-alike distribution where higher
 * levels are less likely to be returned. */
int zslRandomLevel(void) {
    int level = 1;
    // n次隨機值獲得 level, ZSKIPLIST_P:0.25
    // 按隨機機率,應該是有1/4的命中機率(若是不是呢??)
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

  先看插入過程的目的,主要是爲了先理解 skiplist 的構造過程。而在zset的更新過程,是先刪除原節點,再進行插入的這麼個過程。因此我們仍是有必要再來看看 skiplist 的刪除節點過程。

// t_zset.c, 刪除skiplist的指定節點
/* Delete an element with matching score/element from the skiplist.
 * The function returns 1 if the node was found and deleted, otherwise
 * 0 is returned.
 *
 * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
 * it is not freed (but just unlinked) and *node is set to the node pointer,
 * so that it is possible for the caller to reuse the node (including the
 * referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    // 與添加時查找對應位置同樣,先進行遍歷,找到最每一個層級最接近 node 的位置
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                     sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    // 進行精確比對,相同才進行刪除
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
        // 執行刪除動做
        zslDeleteNode(zsl, x, update);
        if (!node)
            zslFreeNode(x);
        else
            *node = x;
        return 1;
    }
    return 0; /* not found */
}
// 刪除 x對應的節點
// update 是node的每一層級對應的前驅節點
/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {
            update[i]->level[i].span += x->level[i].span - 1;
            update[i]->level[i].forward = x->level[i].forward;
        } else {
            // 不相等說明該層不存在指向 x 的引用
            update[i]->level[i].span -= 1;
        }
    }
    // 更新第0層尾節點指針
    if (x->level[0].forward) {
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }
    // 下降 skiplist 的層級,直到第一個非空的節點爲止
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
    zsl->length--;
}

  skiplist 刪除過程的示意圖以下:

 

   最後,咱們再來看另外一種狀況,即zset發生編碼轉換時,是如何作的。即如何從 ziplist 轉換到 skiplist 中呢?

// t_zset.c, 編碼類型轉換
void zsetConvert(robj *zobj, int encoding) {
    zset *zs;
    zskiplistNode *node, *next;
    sds ele;
    double score;
    // 編碼相同,直接返回
    if (zobj->encoding == encoding) return;
    // ziplist -> skiplist 轉換
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;

        if (encoding != OBJ_ENCODING_SKIPLIST)
            serverPanic("Unknown target encoding");

        zs = zmalloc(sizeof(*zs));
        zs->dict = dictCreate(&zsetDictType,NULL);
        zs->zsl = zslCreate();

        eptr = ziplistIndex(zl,0);
        serverAssertWithInfo(NULL,zobj,eptr != NULL);
        sptr = ziplistNext(zl,eptr);
        serverAssertWithInfo(NULL,zobj,sptr != NULL);

        while (eptr != NULL) {
            score = zzlGetScore(sptr);
            serverAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
            if (vstr == NULL)
                ele = sdsfromlonglong(vlong);
            else
                ele = sdsnewlen((char*)vstr,vlen);
            // 依次插入 skiplist 和 dict 中便可
            node = zslInsert(zs->zsl,score,ele);
            serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
            // zzlNext 封裝了同時迭代 eptr 和 sptr 方法
            zzlNext(zl,&eptr,&sptr);
        }

        zfree(zobj->ptr);
        zobj->ptr = zs;
        zobj->encoding = OBJ_ENCODING_SKIPLIST;
    }
    // skiplist -> ziplist 逆向轉換
    else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        unsigned char *zl = ziplistNew();
        if (encoding != OBJ_ENCODING_ZIPLIST)
            serverPanic("Unknown target encoding");

        /* Approach similar to zslFree(), since we want to free the skiplist at
         * the same time as creating the ziplist. */
        zs = zobj->ptr;
        dictRelease(zs->dict);
        node = zs->zsl->header->level[0].forward;
        zfree(zs->zsl->header);
        zfree(zs->zsl);
        // 正向迭代轉換
        while (node) {
            zl = zzlInsertAt(zl,NULL,node->ele,node->score);
            next = node->level[0].forward;
            zslFreeNode(node);
            node = next;
        }

        zfree(zs);
        zobj->ptr = zl;
        zobj->encoding = OBJ_ENCODING_ZIPLIST;
    } else {
        serverPanic("Unknown sorted set encoding");
    }
}
// 基於ziplist, 同時迭代 ele-score
/* Move to next entry based on the values in eptr and sptr. Both are set to
 * NULL when there is no next entry. */
void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
    unsigned char *_eptr, *_sptr;
    serverAssert(*eptr != NULL && *sptr != NULL);

    _eptr = ziplistNext(zl,*sptr);
    if (_eptr != NULL) {
        _sptr = ziplistNext(zl,_eptr);
        serverAssert(_sptr != NULL);
    } else {
        /* No next entry. */
        _sptr = NULL;
    }

    *eptr = _eptr;
    *sptr = _sptr;
}

  至此,整個添加過程結束。自己是不太複雜的,主要針對 ziplist 和 skiplist 的分別處理(注意有逆向編碼)。但爲了講清總體關係,稍顯雜亂。

 

3、zrange 範圍查詢

  範圍查詢功能,redis提供了好幾個,zrange/zrangebyscore/zrangebylex... 應該說查詢方式都不太同樣,不過咱們也沒必要糾結這些,只管理會大概就行。就挑一個以 下標進行範圍查詢的實現講解下就行。

// 用法: ZRANGE key start stop [WITHSCORES]
// t_zset.c
void zrangeCommand(client *c) {
    zrangeGenericCommand(c,0);
}

void zrangeGenericCommand(client *c, int reverse) {
    robj *key = c->argv[1];
    robj *zobj;
    int withscores = 0;
    long start;
    long end;
    int llen;
    int rangelen;

    if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
        (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;

    if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
        withscores = 1;
    } else if (c->argc >= 5) {
        addReply(c,shared.syntaxerr);
        return;
    }

    if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
         || checkType(c,zobj,OBJ_ZSET)) return;

    /* Sanitize indexes. */
    // 小於0,則表明反向查詢,但實際的輸出順序不是按此值運算的(提供了 reverse 方法)
    llen = zsetLength(zobj);
    if (start < 0) start = llen+start;
    if (end < 0) end = llen+end;
    if (start < 0) start = 0;

    /* Invariant: start >= 0, so this test will be true when end < 0.
     * The range is empty when start > end or start >= length. */
    if (start > end || start >= llen) {
        addReply(c,shared.emptymultibulk);
        return;
    }
    if (end >= llen) end = llen-1;
    rangelen = (end-start)+1;

    /* Return the result in form of a multi-bulk reply */
    addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
    // 一樣,分 ZIPLIST 和 SKIPLIST 編碼分別實現
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;
        // ziplist 以 ele-score 方式存儲,因此步長是 2
        if (reverse)
            eptr = ziplistIndex(zl,-2-(2*start));
        else
            eptr = ziplistIndex(zl,2*start);

        serverAssertWithInfo(c,zobj,eptr != NULL);
        sptr = ziplistNext(zl,eptr);
        // 依次迭代輸出
        while (rangelen--) {
            serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
            serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
            if (vstr == NULL)
                addReplyBulkLongLong(c,vlong);
            else
                addReplyBulkCBuffer(c,vstr,vlen);

            if (withscores)
                addReplyDouble(c,zzlGetScore(sptr));
            // ziplist 提供正向迭代,返回迭代功能,其實就是 offset的加減問題
            if (reverse)
                zzlPrev(zl,&eptr,&sptr);
            else
                zzlNext(zl,&eptr,&sptr);
        }

    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *ln;
        sds ele;

        /* Check if starting point is trivial, before doing log(N) lookup. */
        // 反向使用 tail 迭代,不然使用header迭代
        if (reverse) {
            ln = zsl->tail;
            if (start > 0)
                // 獲取下標元素應該只是一個迭代循環問題,不過仍是稍微細看一下skiplist實現
                ln = zslGetElementByRank(zsl,llen-start);
        } else {
            ln = zsl->header->level[0].forward;
            if (start > 0)
                ln = zslGetElementByRank(zsl,start+1);
        }

        while(rangelen--) {
            serverAssertWithInfo(c,zobj,ln != NULL);
            ele = ln->ele;
            addReplyBulkCBuffer(c,ele,sdslen(ele));
            if (withscores)
                addReplyDouble(c,ln->score);
            // 直接正向或反向迭代便可
            ln = reverse ? ln->backward : ln->level[0].forward;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
}
// 根據排名查找元素
/* Finds an element by its rank. The rank argument needs to be 1-based. */
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
    zskiplistNode *x;
    unsigned long traversed = 0;
    int i;

    x = zsl->header;
    // 好像沒有相像中的簡單哦
    // 請仔細品
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
        {
            // span 的做用??
            traversed += x->level[i].span;
            x = x->level[i].forward;
        }
        if (traversed == rank) {
            return x;
        }
    }
    return NULL;
}

  根據範圍查找元素,總體是比較簡單,迭代輸出而已。只是 skiplist 的span維護,得好好想一想。

 

4、zrembyscore 根據分數刪除元素

  zrembyscore, 首先這是個刪除命令,其實它是根據分數查詢,咱們能夠同時解析這兩種狀況。

// t_zset.c, 
void zremrangebyscoreCommand(client *c) {
    // 幾個範圍刪除,都複用 zremrangeGenericCommand
    // ZRANGE_RANK/ZRANGE_SCORE/ZRANGE_LEX
    zremrangeGenericCommand(c,ZRANGE_SCORE);
}
void zremrangeGenericCommand(client *c, int rangetype) {
    robj *key = c->argv[1];
    robj *zobj;
    int keyremoved = 0;
    unsigned long deleted = 0;
    // score 存儲使用另外的數據結構
    zrangespec range;
    zlexrangespec lexrange;
    long start, end, llen;

    /* Step 1: Parse the range. */
    // 解析參數,除了 rank 方式的查詢,其餘兩個都使用 另外的專門數據結構存儲參數
    if (rangetype == ZRANGE_RANK) {
        if ((getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK) ||
            (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK))
            return;
    } else if (rangetype == ZRANGE_SCORE) {
        if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) {
            addReplyError(c,"min or max is not a float");
            return;
        }
    } else if (rangetype == ZRANGE_LEX) {
        if (zslParseLexRange(c->argv[2],c->argv[3],&lexrange) != C_OK) {
            addReplyError(c,"min or max not valid string range item");
            return;
        }
    }

    /* Step 2: Lookup & range sanity checks if needed. */
    if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
        checkType(c,zobj,OBJ_ZSET)) goto cleanup;

    if (rangetype == ZRANGE_RANK) {
        /* Sanitize indexes. */
        llen = zsetLength(zobj);
        if (start < 0) start = llen+start;
        if (end < 0) end = llen+end;
        if (start < 0) start = 0;

        /* Invariant: start >= 0, so this test will be true when end < 0.
         * The range is empty when start > end or start >= length. */
        if (start > end || start >= llen) {
            addReply(c,shared.czero);
            goto cleanup;
        }
        if (end >= llen) end = llen-1;
    }

    /* Step 3: Perform the range deletion operation. */
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        // 針對不一樣的刪除類型,使用不一樣的刪除方法
        // 因此,這段代碼的複用體如今哪裏呢???
        switch(rangetype) {
        case ZRANGE_RANK:
            zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
            break;
        case ZRANGE_SCORE:
            // 3.1. 咱們只看 score 的刪除 --ziplist
            zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,&range,&deleted);
            break;
        case ZRANGE_LEX:
            zobj->ptr = zzlDeleteRangeByLex(zobj->ptr,&lexrange,&deleted);
            break;
        }
        if (zzlLength(zobj->ptr) == 0) {
            dbDelete(c->db,key);
            keyremoved = 1;
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        switch(rangetype) {
        case ZRANGE_RANK:
            deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
            break;
        case ZRANGE_SCORE:
            // 3.2. skiplist 的刪除rangeByScore 方法
            deleted = zslDeleteRangeByScore(zs->zsl,&range,zs->dict);
            break;
        case ZRANGE_LEX:
            deleted = zslDeleteRangeByLex(zs->zsl,&lexrange,zs->dict);
            break;
        }
        if (htNeedsResize(zs->dict)) dictResize(zs->dict);
        if (dictSize(zs->dict) == 0) {
            dbDelete(c->db,key);
            keyremoved = 1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }

    /* Step 4: Notifications and reply. */
    if (deleted) {
        char *event[3] = {"zremrangebyrank","zremrangebyscore","zremrangebylex"};
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(NOTIFY_ZSET,event[rangetype],key,c->db->id);
        if (keyremoved)
            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
    }
    server.dirty += deleted;
    addReplyLongLong(c,deleted);

cleanup:
    if (rangetype == ZRANGE_LEX) zslFreeLexRange(&lexrange);
}
// server.h, 範圍查詢參數存儲
/* Struct to hold a inclusive/exclusive range spec by score comparison. */
typedef struct {
    double min, max;
    int minex, maxex; /* are min or max exclusive? */
} zrangespec;

// 3.1. ziplist 的刪除range方法
// t_zset.c
unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec *range, unsigned long *deleted) {
    unsigned char *eptr, *sptr;
    double score;
    unsigned long num = 0;
    if (deleted != NULL) *deleted = 0;
    // 找到首個在範圍內的指針,進行迭代
    eptr = zzlFirstInRange(zl,range);
    if (eptr == NULL) return zl;

    /* When the tail of the ziplist is deleted, eptr will point to the sentinel
     * byte and ziplistNext will return NULL. */
    while ((sptr = ziplistNext(zl,eptr)) != NULL) {
        score = zzlGetScore(sptr);
        // 確定是比 min 大的,因此只需確認比 max 小便可
        if (zslValueLteMax(score,range)) {
            /* Delete both the element and the score. */
            zl = ziplistDelete(zl,&eptr);
            zl = ziplistDelete(zl,&eptr);
            num++;
        } else {
            /* No longer in range. */
            break;
        }
    }

    if (deleted != NULL) *deleted = num;
    return zl;
}

/* Find pointer to the first element contained in the specified range.
 * Returns NULL when no element is contained in the range. */
unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range) {
    unsigned char *eptr = ziplistIndex(zl,0), *sptr;
    double score;

    /* If everything is out of range, return early. */
    // 比較第1個元素和最後 一個元素,便可確認是否在範圍內
    if (!zzlIsInRange(zl,range)) return NULL;

    while (eptr != NULL) {
        sptr = ziplistNext(zl,eptr);
        serverAssert(sptr != NULL);

        score = zzlGetScore(sptr);
        // score >= min
        if (zslValueGteMin(score,range)) {
            /* Check if score <= max. */
            if (zslValueLteMax(score,range))
                return eptr;
            return NULL;
        }

        /* Move to next element. */
        eptr = ziplistNext(zl,sptr);
    }

    return NULL;
}
// 檢查zl是否在range範圍內
// 檢查第1個分數和最後一個數便可
/* Returns if there is a part of the zset is in range. Should only be used
 * internally by zzlFirstInRange and zzlLastInRange. */
int zzlIsInRange(unsigned char *zl, zrangespec *range) {
    unsigned char *p;
    double score;

    /* Test for ranges that will always be empty. */
    if (range->min > range->max ||
            (range->min == range->max && (range->minex || range->maxex)))
        return 0;

    p = ziplistIndex(zl,-1); /* Last score. */
    if (p == NULL) return 0; /* Empty sorted set */
    score = zzlGetScore(p);
    // scoreMax >= min
    if (!zslValueGteMin(score,range))
        return 0;

    p = ziplistIndex(zl,1); /* First score. */
    serverAssert(p != NULL);
    score = zzlGetScore(p);
    // scoreMin <= max
    if (!zslValueLteMax(score,range))
        return 0;

    return 1;
}

// 3.2. 刪除 skiplist 中的range元素
/* Delete all the elements with score between min and max from the skiplist.
 * Min and max are inclusive, so a score >= min || score <= max is deleted.
 * Note that this function takes the reference to the hash table view of the
 * sorted set, in order to remove the elements from the hash table too. */
unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned long removed = 0;
    int i;

    x = zsl->header;
    // 找出每層小於 range->min 的元素
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward && (range->minex ?
            x->level[i].forward->score <= range->min :
            x->level[i].forward->score < range->min))
                x = x->level[i].forward;
        update[i] = x;
    }

    /* Current node is the last with score < or <= min. */
    x = x->level[0].forward;
    // 從第0層開始,依次刪除引用,刪除元素
    // 同有找到符合條件的元素時,一次循環也不會成立
    /* Delete nodes while in range. */
    while (x &&
           (range->maxex ? x->score < range->max : x->score <= range->max))
    {
        // 保留下一次迭代
        zskiplistNode *next = x->level[0].forward;
        zslDeleteNode(zsl,x,update);
        // 同步刪除 dict 數據
        dictDelete(dict,x->ele);
        zslFreeNode(x); /* Here is where x->ele is actually released. */
        removed++;
        x = next;
    }
    return removed;
}

  刪除的邏輯比較清晰,ziplist和skiplist分開處理。大致思路相同是:找到第一個符合條件的元素,而後迭代,直到第一個不符合條件的元素爲止。

 

  set雖然從定義上與zset有不少相通之處,然而在實現上倒是大相徑庭的。因爲不少東西和以前介紹的知識有重合的地方,也沒啥好特別說的。zset 的解析差很少就到這裏了。

  你以爲zset還有什麼有意思的實現呢?歡迎討論。

相關文章
相關標籤/搜索