Redis有序集合指令學習

ZADD

ZADD key [NX|XX] [CH] [INCR]score member [score member ...]node

將元素及對應分值添加到一個有序集合中redis

NX:不更新已經存在的key,只增長新元素函數

XX:只更新已經存在的key,不增長新元素oop

CH:abbr:changed.不指定時只返回新增的元素個數,指定時返回新增的和更新的元素個數之和this

INCR:參考zincrby編碼

//經過第二個參數區分是zadd仍是zincrby
void zaddCommand(client *c) {
    zaddGenericCommand(c,ZADD_NONE);
}
/* This generic command implements both ZADD and ZINCRBY. */
//zadd和zincrby兩個命令都是調用這個函數
void zaddGenericCommand(client *c, int flags) {
    static char *nanerr = "resulting score is not a number (NaN)";
    robj *key = c->argv[1];
    robj *zobj;
    sds ele;
    double score = 0, *scores = NULL;
    int j, elements;
    int scoreidx = 0;
    /* The following vars are used in order to track what the command actually
     * did during the execution, to reply to the client and to trigger the
     * notification of keyspace change. */
    int added = 0;      /* Number of new elements added. */
    int updated = 0;    /* Number of elements with updated score. */
    int processed = 0;  /* Number of elements processed, may remain zero with
                           options like XX. */

    /* Parse options. At the end 'scoreidx' is set to the argument position
     * of the score of the first score-element pair. */
    scoreidx = 2;//從第二個參數開始處理.先處理nx,xx,ch,incr參數
    while(scoreidx < c->argc) {
        char *opt = c->argv[scoreidx]->ptr;
        if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
        else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
        else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
        else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
        else break;
        scoreidx++;
    }

    /* Turn options into simple to check vars. */
    //從flag中取出相應的標誌賦給獨立的變量
    int incr = (flags & ZADD_INCR) != 0;
    int nx = (flags & ZADD_NX) != 0;
    int xx = (flags & ZADD_XX) != 0;
    int ch = (flags & ZADD_CH) != 0;

    /* After the options, we expect to have an even number of args, since
     * we expect any number of score-element pairs. */
    //member和score是一一對應的,因此確定是2的倍數.因此若是不是2的倍數或者根本
    //沒有member和score,直接返回命令語法錯誤
    elements = c->argc-scoreidx;
    if (elements % 2 || !elements) {
        addReply(c,shared.syntaxerr);
        return;
    }
    //elements賦值爲有多少對<element,score>
    elements /= 2; /* Now this holds the number of score-element pairs. */

    /* Check for incompatible options. */
    //nx和xxflag互斥,兩者不能同時出現
    if (nx && xx) {
        addReplyError(c,
            "XX and NX options at the same time are not compatible");
        return;
    }
    //如有incr標誌,則只能有一對<element,score>
    //爲何不能是多對?
    if (incr && elements > 1) {
        addReplyError(c,
            "INCR option supports a single increment-element pair");
        return;
    }

    /* Start parsing all the scores, we need to emit any syntax error
     * before executing additions to the sorted set, as the command should
     * either execute fully or nothing at all. */
    //依次檢查每個分數值
    scores = zmalloc(sizeof(double)*elements);
    for (j = 0; j < elements; j++) {
        //該函數中會檢查score是不是合法的double類型的值
        if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
            != C_OK) goto cleanup;
    }

    /* Lookup the key and create the sorted set if does not exist. */
    //根據key查找對應的有序集合的value
    zobj = lookupKeyWrite(c->db,key);
    //key不存在
    if (zobj == NULL) {
        //若是設置了xx這個flag,直接返回錯誤
        if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
        //根據redis的配置,若是有序集合設置了不使用ziplist存儲或者說第一個插入元素的長度大於
        //設置的最大ziplist的元素長度值,則使用跳躍表存儲不然使用ziplist
        if (server.zset_max_ziplist_entries == 0 ||
            server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
        {
            zobj = createZsetObject();
        } else {
            zobj = createZsetZiplistObject();
        }
        //把key,zobj插入字典
        dbAdd(c->db,key,zobj);
    //key存在
    } else {
        //若是不是有序集合,直接返回錯誤
        if (zobj->type != OBJ_ZSET) {
            addReply(c,shared.wrongtypeerr);
            goto cleanup;
        }
    }

    //elements是<member,score>對數
    for (j = 0; j < elements; j++) {
        double newscore;
        score = scores[j];
        //retflags設置爲前文中的flags變量
        int retflags = flags;

        ele = c->argv[scoreidx+1+j*2]->ptr;
        //每次遍歷,score是分數,ele是member.調用zsetadd插入zobj
        int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
        if (retval == 0) {
            addReplyError(c,nanerr);
            goto cleanup;
        }
        //根據retflags,即一個元素是更新仍是新加入,仍是未作處理(即member存在,而且
        //score值與新設置的一致),更新相應的計數變量(這些變量最後會返回給客戶端)
        if (retflags & ZADD_ADDED) added++;
        if (retflags & ZADD_UPDATED) updated++;
        if (!(retflags & ZADD_NOP)) processed++;
        score = newscore;
    }
    server.dirty += (added+updated);
//經過命令中的flag,返回給客戶端不一樣的值
reply_to_client:
    if (incr) { /* ZINCRBY or INCR option. */
        if (processed)
            addReplyDouble(c,score);
        else
            addReply(c,shared.nullbulk);
    } else { /* ZADD. */
        addReplyLongLong(c,ch ? added+updated : added);
    }

//若是有更新或者新加,須要執行相應的watch key的通知及keyspace的通知
cleanup:
    zfree(scores);
    if (added || updated) {
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(NOTIFY_ZSET,
            incr ? "zincr" : "zadd", key, c->db->id);
    }
}

ZINCRBY

ZINCRBY key increment memberspa

若是key存在,就給相應member的score增長increment指針

不然直接給key設置分數爲incrementcode

//與zadd調用同一個函數,至關於zadd key incr,把incr flag置位
void zincrbyCommand(client *c) {
    zaddGenericCommand(c,ZADD_INCR);
}

ZCARD

ZCARD keyserver

返回有序集合的元素個數

void zcardCommand(client *c) {
    robj *key = c->argv[1];
    robj *zobj;
    //查找key對應的value
    if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
        checkType(c,zobj,OBJ_ZSET)) return;
    //經過zsetLength獲取zobj中的元素個數
    addReplyLongLong(c,zsetLength(zobj));
}
unsigned int zsetLength(const robj *zobj) {
    int length = -1;
    //若是是ziplist,經過zzlLength函數獲取長度
    //若是長度字段中的值小於UINT16_MAX,直接返回長度。不然須要遍歷獲取長度
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        length = zzlLength(zobj->ptr);
    //若是是skiplist,直接返回zsl->length
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        length = ((const zset*)zobj->ptr)->zsl->length;
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    return length;
}

ZCOUNT

ZCOUNT key min max

返回key中score值在min和max之間的元素個數

其中min和max能夠加(,如 zcount key (5 (10

加左括號表示不包含。不加表示包含

void zcountCommand(client *c) {
    robj *key = c->argv[1];
    robj *zobj;
    zrangespec range;
    int count = 0;

    /* Parse the range arguments */
    //斷定範圍.並將最大最小及是否包含寫入range結構體中
    if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) {
        addReplyError(c,"min or max is not a float");
        return;
    }

    /* Lookup the sorted set */
    if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
        checkType(c, zobj, OBJ_ZSET)) return;
    //判斷zobj底層編碼是ziplist仍是skiplist
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        double score;
        //找出第一個在範圍以內的元素
        /* Use the first element in range as the starting point */
        eptr = zzlFirstInRange(zl,&range);

        /* No "first" element */
        if (eptr == NULL) {
            addReply(c, shared.czero);
            return;
        }

        /* First element is in range */
        //ziplist中member和score是兩個entry,而且member以後保存着score
        //總體順序是按score從小到大排列,score相同時,按member的字典序排列
        sptr = ziplistNext(zl,eptr);
        //因此此處從第一個元素的下一個entry處獲取score
        score = zzlGetScore(sptr);
        serverAssertWithInfo(c,zobj,zslValueLteMax(score,&range));

        /* Iterate over elements in range */
        //迭代這個ziplist,若是score知足要求,則count++而且繼續迭代,不然跳出
        //最後會返回count
        while (eptr) {
            score = zzlGetScore(sptr);

            /* Abort when the node is no longer in range. */
            if (!zslValueLteMax(score,&range)) {
                break;
            } else {
                count++;
                zzlNext(zl,&eptr,&sptr);
            }
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *zn;
        unsigned long rank;
        //若是是跳錶,也是先取出第一個元素
        /* Find first element in range */
        zn = zslFirstInRange(zsl, &range);

        /* Use rank of first element, if any, to determine preliminary count */
        if (zn != NULL) {
            //獲取第一個元素的排名
            rank = zslGetRank(zsl, zn->score, zn->ele);
            count = (zsl->length - (rank - 1));
            //若是最大值大於zsl中的最大值,則此count就是要找的個數
            /* Find last element in range */
            zn = zslLastInRange(zsl, &range);

            /* Use rank of last element, if any, to determine the actual count */
            if (zn != NULL) {
                //若是最大值小於zsl中的最大值,則首先找到最後一個元素的rank
                rank = zslGetRank(zsl, zn->score, zn->ele);
                //從新計算count,與以前的計算公式合併以後爲
                //count = (zsl->length-(rankmin-1))-(zsl->length-rankmax))
                //      = rankmax-rankmin+1
                count -= (zsl->length - rank);
            }
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    //返回count
    addReplyLongLong(c, count);
}

ZRANGEBYSCORE

ZRANGEBYSCORE key min max WITHSCORES

獲取有序結合中分值位於 min和max之間的全部元素

withscores:將member 和 score一塊兒返回

limit offset count:從偏移offset開始獲取count個元素

min和max能夠爲 -inf,+inf,分別表示負無窮和正無窮

//入口函數
void zrangebyscoreCommand(client *c) {
    genericZrangebyscoreCommand(c,0);
}
/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
void genericZrangebyscoreCommand(client *c, int reverse) {
    zrangespec range;
    robj *key = c->argv[1];
    robj *zobj;
    long offset = 0, limit = -1;
    int withscores = 0;
    unsigned long rangelen = 0;
    void *replylen = NULL;
    int minidx, maxidx;
    //該函數同時用於zrangbyscore和zrevrangebyscore
    //兩者經過函數中的reverse參數標識
    //正序時第二個參數是min,第三個參數是max,逆序反之
    /* Parse the range arguments. */
    if (reverse) {
        /* Range is given as [max,min] */
        maxidx = 2; minidx = 3;
    } else {
        /* Range is given as [min,max] */
        minidx = 2; maxidx = 3;
    }
    //將參數解析出來賦值到range變量
    if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != C_OK) {
        addReplyError(c,"min or max is not a float");
        return;
    }

    /* Parse optional extra arguments. Note that ZCOUNT will exactly have
     * 4 arguments, so we'll never enter the following code path. */
    if (c->argc > 4) {
        int remaining = c->argc - 4;
        int pos = 4;
        //解析withscores和limit參數
        while (remaining) {
            if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
                pos++; remaining--;
                withscores = 1;
            } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
                if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL)
                        != C_OK) ||
                    (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL)
                        != C_OK))
                {
                    return;
                }
                pos += 3; remaining -= 3;
            } else {
                addReply(c,shared.syntaxerr);
                return;
            }
        }
    }

    /* Ok, lookup the key and get the range */
    if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
        checkType(c,zobj,OBJ_ZSET)) return;
    //按zset底層編碼是ziplist仍是skiplist分別處理
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;
        double score;

        /* If reversed, get the last node in range as starting point. */
        //按正序仍是逆序分別取最後一個值或者第一個值
        if (reverse) {
            eptr = zzlLastInRange(zl,&range);
        } else {
            eptr = zzlFirstInRange(zl,&range);
        }

        /* No "first" element in the specified interval. */
        if (eptr == NULL) {
            addReply(c, shared.emptymultibulk);
            return;
        }

        /* Get score pointer for the first element. */
        serverAssertWithInfo(c,zobj,eptr != NULL);
        sptr = ziplistNext(zl,eptr);

        /* We don't know in advance how many matching elements there are in the
         * list, so we push this object that will represent the multi-bulk
         * length in the output buffer, and will "fix" it later */
        replylen = addDeferredMultiBulkLength(c);

        /* If there is an offset, just traverse the number of elements without
         * checking the score because that is done in the next loop. */
        //若是有offset,先偏移相應的元素
        //注意此處zzlNext傳入了兩個指針,會一次偏移一個<member,score>對
        //注意此處offset初始值是0,若是沒指定則不會進入此處循環
        while (eptr && offset--) {
            if (reverse) {
                zzlPrev(zl,&eptr,&sptr);
            } else {
                zzlNext(zl,&eptr,&sptr);
            }
        }

        //若是有limit,則進入循環.取limit次.limit的初始值爲-1,即便沒指定,也會進入循環
        //直到eptr爲null或者循環中break掉
        while (eptr && limit--) {
            score = zzlGetScore(sptr);

            /* Abort when the node is no longer in range. */
            //不在範圍以內時break掉
            if (reverse) {
                if (!zslValueGteMin(score,&range)) break;
            } else {
                if (!zslValueLteMax(score,&range)) break;
            }

            /* We know the element exists, so ziplistGet should always succeed */
            serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
            //取出相應的值.可能爲str,賦值給vstr,長度爲vlen,或者爲整型,賦值給vlong
            rangelen++;
            if (vstr == NULL) {
                addReplyBulkLongLong(c,vlong);
            } else {
                addReplyBulkCBuffer(c,vstr,vlen);
            }
            //若是設置了withscores標誌,則返回分數
            if (withscores) {
                addReplyDouble(c,score);
            }
            //開始迭代下一個節點
            /* Move to next node */
            if (reverse) {
                zzlPrev(zl,&eptr,&sptr);
            } else {
                zzlNext(zl,&eptr,&sptr);
            }
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *ln;

        //同ziplist,先查找起始或最終節點
        /* If reversed, get the last node in range as starting point. */
        if (reverse) {
            ln = zslLastInRange(zsl,&range);
        } else {
            ln = zslFirstInRange(zsl,&range);
        }

        /* No "first" element in the specified interval. */
        if (ln == NULL) {
            addReply(c, shared.emptymultibulk);
            return;
        }

        /* We don't know in advance how many matching elements there are in the
         * list, so we push this object that will represent the multi-bulk
         * length in the output buffer, and will "fix" it later */
        //返回客戶端時先返回元素個數,但此處並不知道須要返回多少個元素,因此先佔個位置
        //replylen是存儲len字段的指針
        replylen = addDeferredMultiBulkLength(c);

        /* If there is an offset, just traverse the number of elements without
         * checking the score because that is done in the next loop. */
        //處理offset.向前或向後skip
        while (ln && offset--) {
            if (reverse) {
                ln = ln->backward;
            } else {
                ln = ln->level[0].forward;
            }
        }

        //處理limit
        while (ln && limit--) {
            /* Abort when the node is no longer in range. */
            if (reverse) {
                if (!zslValueGteMin(ln->score,&range)) break;
            } else {
                if (!zslValueLteMax(ln->score,&range)) break;
            }

            rangelen++;
            addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele));

            if (withscores) {
                addReplyDouble(c,ln->score);
            }

            /* Move to next node */
            if (reverse) {
                ln = ln->backward;
            } else {
                ln = ln->level[0].forward;
            }
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    //若是有withscores參數,返回給客戶端的字符串數量是2倍
    if (withscores) {
        rangelen *= 2;
    }
    //將rangelen放入replylen指向的位置,返回給客戶端
    setDeferredMultiBulkLength(c, replylen, rangelen);
}

ZRANK

ZRANK key member

返回有序集合中元素member的rank

以0爲起始rank,元素分數從低到高

zrevrank,元素分數從高到低

void zrankGenericCommand(client *c, int reverse) {
    robj *key = c->argv[1];
    robj *ele = c->argv[2];
    robj *zobj;
    long rank;
    //經過key找出有序集合的value zobj
    if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
        checkType(c,zobj,OBJ_ZSET)) return;

    serverAssertWithInfo(c,ele,sdsEncodedObject(ele));
    //在zobj中查找ele(第二個參數member)
    rank = zsetRank(zobj,ele->ptr,reverse);
    if (rank >= 0) {
        addReplyLongLong(c,rank);
    } else {
        addReply(c,shared.nullbulk);
    }
}

void zrankCommand(client *c) {
    zrankGenericCommand(c, 0);
}
long zsetRank(robj *zobj, sds ele, int reverse) {
    unsigned long llen;
    unsigned long rank;

    llen = zsetLength(zobj);
    //ziplist從前日後遍歷,比較entry中的元素與ele,每次將rank++
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;

        eptr = ziplistIndex(zl,0);
        serverAssert(eptr != NULL);
        sptr = ziplistNext(zl,eptr);
        serverAssert(sptr != NULL);

        rank = 1;
        while(eptr != NULL) {
            if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele)))
                break;
            rank++;
            zzlNext(zl,&eptr,&sptr);
        }

        if (eptr != NULL) {
            //若是是逆序取,直接將llen-rank就是逆向的rank
            if (reverse)
                return llen-rank;
            else
                return rank-1;
        } else {
            return -1;
        }
    //skiplist經過zslGetRank獲取rank,具體過程爲跳錶查找,將相應路過節點的span相加
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        dictEntry *de;
        double score;

        de = dictFind(zs->dict,ele);
        if (de != NULL) {
            score = *(double*)dictGetVal(de);
            rank = zslGetRank(zsl,score,ele);
            /* Existing elements always have a rank. */
            serverAssert(rank != 0);
            //逆向取rank
            if (reverse)
                return llen-rank;
            else
                return rank-1;
        } else {
            return -1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
}

ZREM

ZREM key member [member ...]

從有序集合中刪除相應的member

void zremCommand(client *c) {
    robj *key = c->argv[1];
    robj *zobj;
    int deleted = 0, keyremoved = 0, j;
    //根據key找到對應的zobj
    if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
        checkType(c,zobj,OBJ_ZSET)) return;
    //依次刪除相應的元素,每次刪除以後檢查zset是否爲空,若是爲空,刪掉該key,而且break
    for (j = 2; j < c->argc; j++) {
        if (zsetDel(zobj,c->argv[j]->ptr)) deleted++;
        if (zsetLength(zobj) == 0) {
            dbDelete(c->db,key);
            keyremoved = 1;
            break;
        }
    }
    //若是確實有member被刪除掉,通知keyspace zrem事件
    //若是zset整個都被刪除了,通知keyspace del事件
    if (deleted) {
        notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id);
        if (keyremoved)
            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
        signalModifiedKey(c->db,key);
        server.dirty += deleted;
    }
    //返回給客戶端實際刪除的member個數
    addReplyLongLong(c,deleted);
}
/* Delete the element 'ele' from the sorted set, returning 1 if the element
 * existed and was deleted, 0 otherwise (the element was not there). */
int zsetDel(robj *zobj, sds ele) {
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *eptr;
        //ziplist先找到ele所在位置的指針eptr
        if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
            //將該元素刪除.ziplist刪除時會resize,此處將刪除以後ziplist的指針復值給zobj->ptr
            zobj->ptr = zzlDelete(zobj->ptr,eptr);
            return 1;
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        dictEntry *de;
        double score;
        //skiplist現將zobj->ptr->dict相應的ele刪除掉。此處並未真實刪除
        //而是將ele所在的dictEntry返回
        de = dictUnlink(zs->dict,ele);
        if (de != NULL) {
            /* Get the score in order to delete from the skiplist later. */
            //經過dictEntry獲取score
            score = *(double*)dictGetVal(de);

            /* Delete from the hash table and later from the skiplist.
             * Note that the order is important: deleting from the skiplist
             * actually releases the SDS string representing the element,
             * which is shared between the skiplist and the hash table, so
             * we need to delete from the skiplist as the final step. */
            //此處將dict中的key和value實際free掉
            dictFreeUnlinkedEntry(zs->dict,de);

            /* Delete from skiplist. */
            //從skiplist中刪除元素.ele這個sds在hash和skiplist共享.從skiplist中刪除時
            //會釋放此sds,因此必須先刪除dict中的元素再刪除skiplist中的元素
            int retval = zslDelete(zs->zsl,score,ele,NULL);
            serverAssert(retval);
            //若是hash表中元素使用率小於10%,進行dict的resize
            if (htNeedsResize(zs->dict)) dictResize(zs->dict);
            return 1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    return 0; /* No such element found. */
}
相關文章
相關標籤/搜索