ZADD key [NX|XX] [CH] [INCR]score member [score member ...]node
將元素及對應分值添加到一個有序集合中redis
NX:不更新已經存在的key,只增長新元素函數
XX:只更新已經存在的key,不增長新元素oop
CH:abbr:changed.不指定時只返回新增的元素個數,指定時返回新增的和更新的元素個數之和this
INCR:參考zincrby編碼
//經過第二個參數區分是zadd仍是zincrby void zaddCommand(client *c) { zaddGenericCommand(c,ZADD_NONE); }
/* This generic command implements both ZADD and ZINCRBY. */ //zadd和zincrby兩個命令都是調用這個函數 void zaddGenericCommand(client *c, int flags) { static char *nanerr = "resulting score is not a number (NaN)"; robj *key = c->argv[1]; robj *zobj; sds ele; double score = 0, *scores = NULL; int j, elements; int scoreidx = 0; /* The following vars are used in order to track what the command actually * did during the execution, to reply to the client and to trigger the * notification of keyspace change. */ int added = 0; /* Number of new elements added. */ int updated = 0; /* Number of elements with updated score. */ int processed = 0; /* Number of elements processed, may remain zero with options like XX. */ /* Parse options. At the end 'scoreidx' is set to the argument position * of the score of the first score-element pair. */ scoreidx = 2;//從第二個參數開始處理.先處理nx,xx,ch,incr參數 while(scoreidx < c->argc) { char *opt = c->argv[scoreidx]->ptr; if (!strcasecmp(opt,"nx")) flags |= ZADD_NX; else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX; else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH; else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR; else break; scoreidx++; } /* Turn options into simple to check vars. */ //從flag中取出相應的標誌賦給獨立的變量 int incr = (flags & ZADD_INCR) != 0; int nx = (flags & ZADD_NX) != 0; int xx = (flags & ZADD_XX) != 0; int ch = (flags & ZADD_CH) != 0; /* After the options, we expect to have an even number of args, since * we expect any number of score-element pairs. */ //member和score是一一對應的,因此確定是2的倍數.因此若是不是2的倍數或者根本 //沒有member和score,直接返回命令語法錯誤 elements = c->argc-scoreidx; if (elements % 2 || !elements) { addReply(c,shared.syntaxerr); return; } //elements賦值爲有多少對<element,score> elements /= 2; /* Now this holds the number of score-element pairs. */ /* Check for incompatible options. */ //nx和xxflag互斥,兩者不能同時出現 if (nx && xx) { addReplyError(c, "XX and NX options at the same time are not compatible"); return; } //如有incr標誌,則只能有一對<element,score> //爲何不能是多對? if (incr && elements > 1) { addReplyError(c, "INCR option supports a single increment-element pair"); return; } /* Start parsing all the scores, we need to emit any syntax error * before executing additions to the sorted set, as the command should * either execute fully or nothing at all. */ //依次檢查每個分數值 scores = zmalloc(sizeof(double)*elements); for (j = 0; j < elements; j++) { //該函數中會檢查score是不是合法的double類型的值 if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL) != C_OK) goto cleanup; } /* Lookup the key and create the sorted set if does not exist. */ //根據key查找對應的有序集合的value zobj = lookupKeyWrite(c->db,key); //key不存在 if (zobj == NULL) { //若是設置了xx這個flag,直接返回錯誤 if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */ //根據redis的配置,若是有序集合設置了不使用ziplist存儲或者說第一個插入元素的長度大於 //設置的最大ziplist的元素長度值,則使用跳躍表存儲不然使用ziplist if (server.zset_max_ziplist_entries == 0 || server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr)) { zobj = createZsetObject(); } else { zobj = createZsetZiplistObject(); } //把key,zobj插入字典 dbAdd(c->db,key,zobj); //key存在 } else { //若是不是有序集合,直接返回錯誤 if (zobj->type != OBJ_ZSET) { addReply(c,shared.wrongtypeerr); goto cleanup; } } //elements是<member,score>對數 for (j = 0; j < elements; j++) { double newscore; score = scores[j]; //retflags設置爲前文中的flags變量 int retflags = flags; ele = c->argv[scoreidx+1+j*2]->ptr; //每次遍歷,score是分數,ele是member.調用zsetadd插入zobj int retval = zsetAdd(zobj, score, ele, &retflags, &newscore); if (retval == 0) { addReplyError(c,nanerr); goto cleanup; } //根據retflags,即一個元素是更新仍是新加入,仍是未作處理(即member存在,而且 //score值與新設置的一致),更新相應的計數變量(這些變量最後會返回給客戶端) if (retflags & ZADD_ADDED) added++; if (retflags & ZADD_UPDATED) updated++; if (!(retflags & ZADD_NOP)) processed++; score = newscore; } server.dirty += (added+updated); //經過命令中的flag,返回給客戶端不一樣的值 reply_to_client: if (incr) { /* ZINCRBY or INCR option. */ if (processed) addReplyDouble(c,score); else addReply(c,shared.nullbulk); } else { /* ZADD. */ addReplyLongLong(c,ch ? added+updated : added); } //若是有更新或者新加,須要執行相應的watch key的通知及keyspace的通知 cleanup: zfree(scores); if (added || updated) { signalModifiedKey(c->db,key); notifyKeyspaceEvent(NOTIFY_ZSET, incr ? "zincr" : "zadd", key, c->db->id); } }
ZINCRBY key increment memberspa
若是key存在,就給相應member的score增長increment指針
不然直接給key設置分數爲incrementcode
//與zadd調用同一個函數,至關於zadd key incr,把incr flag置位 void zincrbyCommand(client *c) { zaddGenericCommand(c,ZADD_INCR); }
ZCARD keyserver
返回有序集合的元素個數
void zcardCommand(client *c) { robj *key = c->argv[1]; robj *zobj; //查找key對應的value if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL || checkType(c,zobj,OBJ_ZSET)) return; //經過zsetLength獲取zobj中的元素個數 addReplyLongLong(c,zsetLength(zobj)); }
unsigned int zsetLength(const robj *zobj) { int length = -1; //若是是ziplist,經過zzlLength函數獲取長度 //若是長度字段中的值小於UINT16_MAX,直接返回長度。不然須要遍歷獲取長度 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { length = zzlLength(zobj->ptr); //若是是skiplist,直接返回zsl->length } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { length = ((const zset*)zobj->ptr)->zsl->length; } else { serverPanic("Unknown sorted set encoding"); } return length; }
ZCOUNT key min max
返回key中score值在min和max之間的元素個數
其中min和max能夠加(,如 zcount key (5 (10
加左括號表示不包含。不加表示包含
void zcountCommand(client *c) { robj *key = c->argv[1]; robj *zobj; zrangespec range; int count = 0; /* Parse the range arguments */ //斷定範圍.並將最大最小及是否包含寫入range結構體中 if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) { addReplyError(c,"min or max is not a float"); return; } /* Lookup the sorted set */ if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL || checkType(c, zobj, OBJ_ZSET)) return; //判斷zobj底層編碼是ziplist仍是skiplist if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *zl = zobj->ptr; unsigned char *eptr, *sptr; double score; //找出第一個在範圍以內的元素 /* Use the first element in range as the starting point */ eptr = zzlFirstInRange(zl,&range); /* No "first" element */ if (eptr == NULL) { addReply(c, shared.czero); return; } /* First element is in range */ //ziplist中member和score是兩個entry,而且member以後保存着score //總體順序是按score從小到大排列,score相同時,按member的字典序排列 sptr = ziplistNext(zl,eptr); //因此此處從第一個元素的下一個entry處獲取score score = zzlGetScore(sptr); serverAssertWithInfo(c,zobj,zslValueLteMax(score,&range)); /* Iterate over elements in range */ //迭代這個ziplist,若是score知足要求,則count++而且繼續迭代,不然跳出 //最後會返回count while (eptr) { score = zzlGetScore(sptr); /* Abort when the node is no longer in range. */ if (!zslValueLteMax(score,&range)) { break; } else { count++; zzlNext(zl,&eptr,&sptr); } } } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; zskiplistNode *zn; unsigned long rank; //若是是跳錶,也是先取出第一個元素 /* Find first element in range */ zn = zslFirstInRange(zsl, &range); /* Use rank of first element, if any, to determine preliminary count */ if (zn != NULL) { //獲取第一個元素的排名 rank = zslGetRank(zsl, zn->score, zn->ele); count = (zsl->length - (rank - 1)); //若是最大值大於zsl中的最大值,則此count就是要找的個數 /* Find last element in range */ zn = zslLastInRange(zsl, &range); /* Use rank of last element, if any, to determine the actual count */ if (zn != NULL) { //若是最大值小於zsl中的最大值,則首先找到最後一個元素的rank rank = zslGetRank(zsl, zn->score, zn->ele); //從新計算count,與以前的計算公式合併以後爲 //count = (zsl->length-(rankmin-1))-(zsl->length-rankmax)) // = rankmax-rankmin+1 count -= (zsl->length - rank); } } } else { serverPanic("Unknown sorted set encoding"); } //返回count addReplyLongLong(c, count); }
ZRANGEBYSCORE key min max WITHSCORES
獲取有序結合中分值位於 min和max之間的全部元素
withscores:將member 和 score一塊兒返回
limit offset count:從偏移offset開始獲取count個元素
min和max能夠爲 -inf,+inf,分別表示負無窮和正無窮
//入口函數 void zrangebyscoreCommand(client *c) { genericZrangebyscoreCommand(c,0); }
/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */ void genericZrangebyscoreCommand(client *c, int reverse) { zrangespec range; robj *key = c->argv[1]; robj *zobj; long offset = 0, limit = -1; int withscores = 0; unsigned long rangelen = 0; void *replylen = NULL; int minidx, maxidx; //該函數同時用於zrangbyscore和zrevrangebyscore //兩者經過函數中的reverse參數標識 //正序時第二個參數是min,第三個參數是max,逆序反之 /* Parse the range arguments. */ if (reverse) { /* Range is given as [max,min] */ maxidx = 2; minidx = 3; } else { /* Range is given as [min,max] */ minidx = 2; maxidx = 3; } //將參數解析出來賦值到range變量 if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != C_OK) { addReplyError(c,"min or max is not a float"); return; } /* Parse optional extra arguments. Note that ZCOUNT will exactly have * 4 arguments, so we'll never enter the following code path. */ if (c->argc > 4) { int remaining = c->argc - 4; int pos = 4; //解析withscores和limit參數 while (remaining) { if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) { pos++; remaining--; withscores = 1; } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) { if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != C_OK) || (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != C_OK)) { return; } pos += 3; remaining -= 3; } else { addReply(c,shared.syntaxerr); return; } } } /* Ok, lookup the key and get the range */ if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL || checkType(c,zobj,OBJ_ZSET)) return; //按zset底層編碼是ziplist仍是skiplist分別處理 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *zl = zobj->ptr; unsigned char *eptr, *sptr; unsigned char *vstr; unsigned int vlen; long long vlong; double score; /* If reversed, get the last node in range as starting point. */ //按正序仍是逆序分別取最後一個值或者第一個值 if (reverse) { eptr = zzlLastInRange(zl,&range); } else { eptr = zzlFirstInRange(zl,&range); } /* No "first" element in the specified interval. */ if (eptr == NULL) { addReply(c, shared.emptymultibulk); return; } /* Get score pointer for the first element. */ serverAssertWithInfo(c,zobj,eptr != NULL); sptr = ziplistNext(zl,eptr); /* We don't know in advance how many matching elements there are in the * list, so we push this object that will represent the multi-bulk * length in the output buffer, and will "fix" it later */ replylen = addDeferredMultiBulkLength(c); /* If there is an offset, just traverse the number of elements without * checking the score because that is done in the next loop. */ //若是有offset,先偏移相應的元素 //注意此處zzlNext傳入了兩個指針,會一次偏移一個<member,score>對 //注意此處offset初始值是0,若是沒指定則不會進入此處循環 while (eptr && offset--) { if (reverse) { zzlPrev(zl,&eptr,&sptr); } else { zzlNext(zl,&eptr,&sptr); } } //若是有limit,則進入循環.取limit次.limit的初始值爲-1,即便沒指定,也會進入循環 //直到eptr爲null或者循環中break掉 while (eptr && limit--) { score = zzlGetScore(sptr); /* Abort when the node is no longer in range. */ //不在範圍以內時break掉 if (reverse) { if (!zslValueGteMin(score,&range)) break; } else { if (!zslValueLteMax(score,&range)) break; } /* We know the element exists, so ziplistGet should always succeed */ serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); //取出相應的值.可能爲str,賦值給vstr,長度爲vlen,或者爲整型,賦值給vlong rangelen++; if (vstr == NULL) { addReplyBulkLongLong(c,vlong); } else { addReplyBulkCBuffer(c,vstr,vlen); } //若是設置了withscores標誌,則返回分數 if (withscores) { addReplyDouble(c,score); } //開始迭代下一個節點 /* Move to next node */ if (reverse) { zzlPrev(zl,&eptr,&sptr); } else { zzlNext(zl,&eptr,&sptr); } } } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; zskiplistNode *ln; //同ziplist,先查找起始或最終節點 /* If reversed, get the last node in range as starting point. */ if (reverse) { ln = zslLastInRange(zsl,&range); } else { ln = zslFirstInRange(zsl,&range); } /* No "first" element in the specified interval. */ if (ln == NULL) { addReply(c, shared.emptymultibulk); return; } /* We don't know in advance how many matching elements there are in the * list, so we push this object that will represent the multi-bulk * length in the output buffer, and will "fix" it later */ //返回客戶端時先返回元素個數,但此處並不知道須要返回多少個元素,因此先佔個位置 //replylen是存儲len字段的指針 replylen = addDeferredMultiBulkLength(c); /* If there is an offset, just traverse the number of elements without * checking the score because that is done in the next loop. */ //處理offset.向前或向後skip while (ln && offset--) { if (reverse) { ln = ln->backward; } else { ln = ln->level[0].forward; } } //處理limit while (ln && limit--) { /* Abort when the node is no longer in range. */ if (reverse) { if (!zslValueGteMin(ln->score,&range)) break; } else { if (!zslValueLteMax(ln->score,&range)) break; } rangelen++; addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele)); if (withscores) { addReplyDouble(c,ln->score); } /* Move to next node */ if (reverse) { ln = ln->backward; } else { ln = ln->level[0].forward; } } } else { serverPanic("Unknown sorted set encoding"); } //若是有withscores參數,返回給客戶端的字符串數量是2倍 if (withscores) { rangelen *= 2; } //將rangelen放入replylen指向的位置,返回給客戶端 setDeferredMultiBulkLength(c, replylen, rangelen); }
ZRANK key member
返回有序集合中元素member的rank
以0爲起始rank,元素分數從低到高
zrevrank,元素分數從高到低
void zrankGenericCommand(client *c, int reverse) { robj *key = c->argv[1]; robj *ele = c->argv[2]; robj *zobj; long rank; //經過key找出有序集合的value zobj if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL || checkType(c,zobj,OBJ_ZSET)) return; serverAssertWithInfo(c,ele,sdsEncodedObject(ele)); //在zobj中查找ele(第二個參數member) rank = zsetRank(zobj,ele->ptr,reverse); if (rank >= 0) { addReplyLongLong(c,rank); } else { addReply(c,shared.nullbulk); } } void zrankCommand(client *c) { zrankGenericCommand(c, 0); }
long zsetRank(robj *zobj, sds ele, int reverse) { unsigned long llen; unsigned long rank; llen = zsetLength(zobj); //ziplist從前日後遍歷,比較entry中的元素與ele,每次將rank++ if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *zl = zobj->ptr; unsigned char *eptr, *sptr; eptr = ziplistIndex(zl,0); serverAssert(eptr != NULL); sptr = ziplistNext(zl,eptr); serverAssert(sptr != NULL); rank = 1; while(eptr != NULL) { if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele))) break; rank++; zzlNext(zl,&eptr,&sptr); } if (eptr != NULL) { //若是是逆序取,直接將llen-rank就是逆向的rank if (reverse) return llen-rank; else return rank-1; } else { return -1; } //skiplist經過zslGetRank獲取rank,具體過程爲跳錶查找,將相應路過節點的span相加 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; dictEntry *de; double score; de = dictFind(zs->dict,ele); if (de != NULL) { score = *(double*)dictGetVal(de); rank = zslGetRank(zsl,score,ele); /* Existing elements always have a rank. */ serverAssert(rank != 0); //逆向取rank if (reverse) return llen-rank; else return rank-1; } else { return -1; } } else { serverPanic("Unknown sorted set encoding"); } }
ZREM key member [member ...]
從有序集合中刪除相應的member
void zremCommand(client *c) { robj *key = c->argv[1]; robj *zobj; int deleted = 0, keyremoved = 0, j; //根據key找到對應的zobj if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL || checkType(c,zobj,OBJ_ZSET)) return; //依次刪除相應的元素,每次刪除以後檢查zset是否爲空,若是爲空,刪掉該key,而且break for (j = 2; j < c->argc; j++) { if (zsetDel(zobj,c->argv[j]->ptr)) deleted++; if (zsetLength(zobj) == 0) { dbDelete(c->db,key); keyremoved = 1; break; } } //若是確實有member被刪除掉,通知keyspace zrem事件 //若是zset整個都被刪除了,通知keyspace del事件 if (deleted) { notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id); if (keyremoved) notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); signalModifiedKey(c->db,key); server.dirty += deleted; } //返回給客戶端實際刪除的member個數 addReplyLongLong(c,deleted); }
/* Delete the element 'ele' from the sorted set, returning 1 if the element * existed and was deleted, 0 otherwise (the element was not there). */ int zsetDel(robj *zobj, sds ele) { if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *eptr; //ziplist先找到ele所在位置的指針eptr if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) { //將該元素刪除.ziplist刪除時會resize,此處將刪除以後ziplist的指針復值給zobj->ptr zobj->ptr = zzlDelete(zobj->ptr,eptr); return 1; } } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; dictEntry *de; double score; //skiplist現將zobj->ptr->dict相應的ele刪除掉。此處並未真實刪除 //而是將ele所在的dictEntry返回 de = dictUnlink(zs->dict,ele); if (de != NULL) { /* Get the score in order to delete from the skiplist later. */ //經過dictEntry獲取score score = *(double*)dictGetVal(de); /* Delete from the hash table and later from the skiplist. * Note that the order is important: deleting from the skiplist * actually releases the SDS string representing the element, * which is shared between the skiplist and the hash table, so * we need to delete from the skiplist as the final step. */ //此處將dict中的key和value實際free掉 dictFreeUnlinkedEntry(zs->dict,de); /* Delete from skiplist. */ //從skiplist中刪除元素.ele這個sds在hash和skiplist共享.從skiplist中刪除時 //會釋放此sds,因此必須先刪除dict中的元素再刪除skiplist中的元素 int retval = zslDelete(zs->zsl,score,ele,NULL); serverAssert(retval); //若是hash表中元素使用率小於10%,進行dict的resize if (htNeedsResize(zs->dict)) dictResize(zs->dict); return 1; } } else { serverPanic("Unknown sorted set encoding"); } return 0; /* No such element found. */ }