Redis有序集合ZSet能夠按分數進行排序, 存儲結構可能使用ziplist, 或者skiplist加hash表. zset_max_ziplist_entries和zset_max_ziplist_value兩個配置字段控制zset採用何種存儲方式: zset_max_ziplist_entries表示ziplist中最多能存儲的元素(member和score對)個數, 元素個數超過該值, 則存儲結構會轉變爲skiplist和hash表; zset_max_ziplist_value表示ziplist中單個member值允許佔用的最大字節數, 某個member值的長度超過該值, 則存儲結構會轉變爲skiplist和hash表. 存儲使用ziplist時, ziplist存儲格式爲[member, score, member, score, ...], 以score值升序進行排序. 存儲使用skiplist時, 需要hash表配合使用, hash表以member爲key, 以(指向skiplist節點中的)score爲值, 加快通過member檢索score的速度; skiplist存儲score和member, 並以score值進行升序排序.
添加元素到有序集合中, 命令格式: ZADD key score member [score member ...], 入口函數爲zaddCommand.
/* ZADD entry point: add (not increment) one or more score/member pairs. */
void zaddCommand(redisClient *c) {
    zaddGenericCommand(c,0);
}

/* Add score/member pairs to the sorted set stored at c->argv[1], creating the
 * key if it does not exist.
 *
 * Arguments after the key come in pairs: argv[2+2j] is a score and
 * argv[3+2j] is the corresponding member.
 *
 * When 'incr' is non-zero the given score is added to the member's current
 * score instead of replacing it (ZINCRBY-style), and the reply is the
 * resulting score; otherwise the reply is the number of newly added members.
 * If an increment produces NaN, an error is replied and processing stops. */
void zaddGenericCommand(redisClient *c, int incr) {
    static char *nanerr = "resulting score is not a number (NaN)";
    robj *key = c->argv[1];
    robj *ele;
    robj *zobj;
    robj *curobj;
    double score = 0, *scores = NULL, curscore = 0.0;
    int j, elements = (c->argc-2)/2;
    int added = 0, updated = 0;

    /* Arguments after the key must form complete score/member pairs; an odd
     * argc means a dangling score or member. */
    if (c->argc % 2) {
        addReply(c,shared.syntaxerr);
        return;
    }

    /* Parse every score before touching the keyspace: a non-numeric score in
     * any pair rejects the whole command with nothing written. */
    scores = zmalloc(sizeof(double)*elements);
    for (j = 0; j < elements; j++) {
        if (getDoubleFromObjectOrReply(c,c->argv[2+j*2],&scores[j],NULL)
            != REDIS_OK) goto cleanup;
    }

    /* Look up the sorted set, creating it if the key does not exist. */
    zobj = lookupKeyWrite(c->db,key);
    if (zobj == NULL) {
        /* Pick the initial encoding: skiplist+dict when ziplists are disabled
         * (entries limit 0) or the first member is already longer than the
         * ziplist value limit; otherwise start with a ziplist. */
        if (server.zset_max_ziplist_entries == 0 ||
            server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
        {
            zobj = createZsetObject();
        } else {
            zobj = createZsetZiplistObject();
        }
        dbAdd(c->db,key,zobj);
    } else {
        if (zobj->type != REDIS_ZSET) {
            addReply(c,shared.wrongtypeerr);
            goto cleanup;
        }
    }

    for (j = 0; j < elements; j++) {
        score = scores[j];

        if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
            unsigned char *eptr;

            /* Look the member up in the ziplist: if present, delete it and
             * re-insert it with the new score; otherwise insert it. */
            ele = c->argv[3+j*2];
            if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
                /* Increment mode: add the given value to the current score. */
                if (incr) {
                    score += curscore;
                    if (isnan(score)) {
                        addReplyError(c,nanerr);
                        goto cleanup;
                    }
                }

                /* Only rewrite the entry when the score actually changed. The
                 * ziplist is ordered by score, so a new score means delete and
                 * re-insert at the new position. */
                if (score != curscore) {
                    zobj->ptr = zzlDelete(zobj->ptr,eptr);
                    zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                    server.dirty++;
                    updated++;
                }
            } else {
                /* New member: insert it, then convert to skiplist+dict if the
                 * ziplist now exceeds the entry-count or member-length limit.
                 * NOTE(review): if the first check already converted, the
                 * second zsetConvert is called on a skiplist-encoded object;
                 * presumably zsetConvert treats that as a no-op - confirm. */
                zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
                    zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
                if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
                    zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
                server.dirty++;
                added++;
            }
        } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
            zset *zs = zobj->ptr;
            zskiplistNode *znode;
            dictEntry *de;

            /* Skiplist encoding: find the member's current score through the
             * dict; if found, remove and re-insert it in the skiplist,
             * otherwise insert it. The possibly re-encoded member object is
             * stored back into argv. */
            ele = c->argv[3+j*2] = tryObjectEncoding(c->argv[3+j*2]);
            de = dictFind(zs->dict,ele);
            if (de != NULL) {
                curobj = dictGetKey(de);
                curscore = *(double*)dictGetVal(de);

                if (incr) {
                    score += curscore;
                    if (isnan(score)) {
                        addReplyError(c,nanerr);
                        goto cleanup;
                    }
                }

                /* Nothing to do when the score is unchanged. */
                if (score != curscore) {
                    redisAssertWithInfo(c,curobj,zslDelete(zs->zsl,curscore,curobj));
                    znode = zslInsert(zs->zsl,score,curobj);
                    incrRefCount(curobj); /* Re-inserted in skiplist. */
                    /* The dict value points at the score stored inside the
                     * skiplist node, so repoint it at the new node. */
                    dictGetVal(de) = &znode->score; /* Update score ptr. */
                    server.dirty++;
                    updated++;
                }
            } else {
                znode = zslInsert(zs->zsl,score,ele);
                incrRefCount(ele); /* Inserted in skiplist. */
                redisAssertWithInfo(c,NULL,dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
                incrRefCount(ele); /* Added to dictionary. */
                server.dirty++;
                added++;
            }
        } else {
            redisPanic("Unknown sorted set encoding");
        }
    }
    if (incr) /* ZINCRBY */
        addReplyDouble(c,score);
    else /* ZADD */
        addReplyLongLong(c,added);

cleanup:
    /* Reached on success and on error: free the parsed scores and, if any
     * member was added or updated, signal the key and fire the event. */
    zfree(scores);
    if (added || updated) {
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(REDIS_NOTIFY_ZSET,
            incr ? "zincr" : "zadd", key, c->db->id);
    }
}
統計score值在一個範圍內的元素數量, 命令格式: ZCOUNT key min max. zcount操作其實很簡單: 對於ziplist存儲結構, 只需要從第一個落入範圍的元素開始依次遍歷, 比較score值是否仍在範圍內, 並記錄滿足條件的元素個數即可. skiplist可以對score值進行快速檢索, 因此可以找到落入範圍內的開始元素和結束元素的排名, 通過簡單運算即可得出滿足條件的元素數量.
/* ZCOUNT key min max
 *
 * Reply with the number of members whose score falls inside the range parsed
 * from min/max by zslParseRange. A missing key replies 0; a key holding
 * another type replies a type error (via checkType). */
void zcountCommand(redisClient *c) {
    robj *key = c->argv[1];
    robj *zobj;
    zrangespec range;
    /* unsigned long, like the skiplist length it is computed from: an int
     * would be truncated for sets with more than INT_MAX members. */
    unsigned long count = 0;

    /* Parse min and max into 'range'. */
    if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
        addReplyError(c,"min or max is not a float");
        return;
    }

    /* Look up the sorted set; a missing key replies 0. */
    if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
        checkType(c, zobj, REDIS_ZSET)) return;

    if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        double score;

        /* Start from the first element whose score is in range. */
        eptr = zzlFirstInRange(zl,&range);

        /* No element in range: the count is zero. */
        if (eptr == NULL) {
            addReply(c, shared.czero);
            return;
        }

        /* The first element found must satisfy the max bound. */
        sptr = ziplistNext(zl,eptr);
        score = zzlGetScore(sptr);
        redisAssertWithInfo(c,zobj,zslValueLteMax(score,&range));

        /* Walk forward, counting until a score exceeds max. */
        while (eptr) {
            score = zzlGetScore(sptr);

            if (!zslValueLteMax(score,&range)) {
                break;
            } else {
                count++;
                zzlNext(zl,&eptr,&sptr);
            }
        }
    } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *zn;
        unsigned long rank;

        /* Find the first element in range. */
        zn = zslFirstInRange(zsl, &range);

        if (zn != NULL) {
            /* Count the elements from the first in-range one up to the tail.
             * With 1-based ranks that is length - (rank - 1). */
            rank = zslGetRank(zsl, zn->score, zn->obj);
            count = (zsl->length - (rank - 1));

            /* Find the last element in range ... */
            zn = zslLastInRange(zsl, &range);

            /* ... and subtract everything after it (scores above max). */
            if (zn != NULL) {
                rank = zslGetRank(zsl, zn->score, zn->obj);
                count -= (zsl->length - rank);
            }
        }
    } else {
        redisPanic("Unknown sorted set encoding");
    }

    addReplyLongLong(c, count);
}
獲取一個位置範圍內的元素, 命令格式: ZRANGE key start stop [WITHSCORES], start和stop表示位置下標, 從0開始, 也可以是負數(例如-1表示最後一個元素). 這裏只對zrange操作進行講述, 其餘的range操作大同小異, 只是增加了一些判斷條件和參數, 不再一一展開說明.
/* ZRANGE key start stop [WITHSCORES] */
void zrangeCommand(redisClient *c) {
    zrangeGenericCommand(c,0);
}

/* Reply with the members whose positions fall in [start, stop]. Optionally
 * each member is followed by its score (WITHSCORES).
 *
 * Positions are 0-based. Negative positions count from the end (-1 is the
 * last element). Out-of-range bounds are clamped, and an empty range replies
 * an empty multi-bulk.
 *
 * When 'reverse' is non-zero, positions are counted from the end of the set
 * and elements are emitted in reverse order. */
void zrangeGenericCommand(redisClient *c, int reverse) {
    robj *key = c->argv[1];
    robj *zobj;
    int withscores = 0;
    long start;
    long end;
    /* long rather than int: rangelen*2 (the WITHSCORES reply length) would
     * overflow a signed int, which is undefined behavior, for ranges above
     * 2^30 elements. */
    long llen;
    long rangelen;

    /* Parse start and stop. */
    if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
        (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;

    /* The only optional argument is WITHSCORES. */
    if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
        withscores = 1;
    } else if (c->argc >= 5) {
        addReply(c,shared.syntaxerr);
        return;
    }

    if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
         || checkType(c,zobj,REDIS_ZSET)) return;

    /* Convert negative indexes (relative to the end) to absolute ones. */
    llen = zsetLength(zobj);
    if (start < 0) start = llen+start;
    if (end < 0) end = llen+end;
    if (start < 0) start = 0;

    /* start >= 0 here, so this also catches end < 0. The range is empty when
     * start > end or start is past the last element. */
    if (start > end || start >= llen) {
        addReply(c,shared.emptymultibulk);
        return;
    }
    /* Clamp end to the last element. */
    if (end >= llen) end = llen-1;
    rangelen = (end-start)+1;

    /* Return the result in form of a multi-bulk reply */
    addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);

    if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;

        /* Ziplist entries alternate member, score. Position the cursor on the
         * member at 'start' (counted from the tail when reversing), then emit
         * rangelen members. */
        if (reverse)
            eptr = ziplistIndex(zl,-2-(2*start));
        else
            eptr = ziplistIndex(zl,2*start);

        redisAssertWithInfo(c,zobj,eptr != NULL);
        sptr = ziplistNext(zl,eptr);

        while (rangelen--) {
            redisAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
            redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
            /* A member is stored either as an integer or as a string. */
            if (vstr == NULL)
                addReplyBulkLongLong(c,vlong);
            else
                addReplyBulkCBuffer(c,vstr,vlen);

            if (withscores)
                addReplyDouble(c,zzlGetScore(sptr));

            if (reverse)
                zzlPrev(zl,&eptr,&sptr);
            else
                zzlNext(zl,&eptr,&sptr);
        }

    } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *ln;
        robj *ele;

        /* Locate the node at 'start' (ranks are 1-based), then walk
         * rangelen nodes forward, or backward when reversing. */
        if (reverse) {
            ln = zsl->tail;
            if (start > 0)
                ln = zslGetElementByRank(zsl,llen-start);
        } else {
            ln = zsl->header->level[0].forward;
            if (start > 0)
                ln = zslGetElementByRank(zsl,start+1);
        }

        while(rangelen--) {
            redisAssertWithInfo(c,zobj,ln != NULL);
            ele = ln->obj;
            addReplyBulk(c,ele);
            if (withscores)
                addReplyDouble(c,ln->score);
            ln = reverse ? ln->backward : ln->level[0].forward;
        }
    } else {
        redisPanic("Unknown sorted set encoding");
    }
}
求交集zinterstore和求並集zunionstore兩個命令的操作相對比較複雜, 兩者使用的是同一個函數, 命令格式如下, 非常相似.
zinterstore命令格式: ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]
zunionstore命令格式: ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]
ui
/* ZUNIONSTORE entry point: store the union of the input sets at argv[1]. */
void zunionstoreCommand(redisClient *c) {
    zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
}

/* ZINTERSTORE entry point: store the intersection of the input sets at argv[1]. */
void zinterstoreCommand(redisClient *c) {
    zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
}

#define REDIS_AGGR_SUM 1 /* add the scores together */
#define REDIS_AGGR_MIN 2 /* keep the smallest score */
#define REDIS_AGGR_MAX 3 /* keep the largest score */
/* Score of a dict entry: 1.0 when the entry has no value (NULL), otherwise
 * the double its value points to. (Not used in the code shown here.) */
#define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))

/* Fold 'val' into '*target' according to the AGGREGATE mode: SUM adds
 * (mapping a NaN result to 0.0), MIN keeps the smaller value, and MAX keeps
 * the larger one. Any other mode panics. */
inline static void zunionInterAggregate(double *target, double val, int aggregate) {
    if (aggregate == REDIS_AGGR_SUM) {
        *target = *target + val;
        /* The result of adding two doubles is NaN when one variable
         * is +inf and the other is -inf. When these numbers are added,
         * we maintain the convention of the result being 0.0. */
        if (isnan(*target)) *target = 0.0;
    } else if (aggregate == REDIS_AGGR_MIN) {
        *target = val < *target ? val : *target;
    } else if (aggregate == REDIS_AGGR_MAX) {
        *target = val > *target ? val : *target;
    } else {
        /* safety net */
        redisPanic("Unknown ZUNION/INTER aggregate type");
    }
}

/* Shared implementation of ZUNIONSTORE / ZINTERSTORE.
 *
 * Arguments: dstkey numkeys key [key ...] [WEIGHTS w [w ...]]
 *            [AGGREGATE SUM|MIN|MAX]
 *
 * Each input may be a sorted set or a plain set. Each input's scores are
 * multiplied by its weight (default 1.0) and combined across inputs with the
 * chosen aggregate (default SUM).
 *
 * The result replaces whatever was stored at dstkey. If the result is empty,
 * dstkey is left deleted. Replies with the number of members in the result.
 * 'op' is REDIS_OP_UNION or REDIS_OP_INTER. */
void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
    int i, j;
    long setnum;
    int aggregate = REDIS_AGGR_SUM;
    zsetopsrc *src;
    zsetopval zval;
    robj *tmp;
    unsigned int maxelelen = 0;
    robj *dstobj;
    zset *dstzset;
    zskiplistNode *znode;
    int touched = 0;

    /* Parse numkeys, the number of input keys that follow. */
    if ((getLongFromObjectOrReply(c, c->argv[2], &setnum, NULL) != REDIS_OK))
        return;

    if (setnum < 1) {
        addReplyError(c,
            "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
        return;
    }

    /* numkeys larger than the number of remaining arguments is a syntax error. */
    if (setnum > c->argc-3) {
        addReply(c,shared.syntaxerr);
        return;
    }

    /* Look up every input key. A missing key is kept as a NULL subject
     * (presumably treated as an empty input by the zui* helpers). Any type
     * other than zset/set aborts with a type error. */
    src = zcalloc(sizeof(zsetopsrc) * setnum);
    for (i = 0, j = 3; i < setnum; i++, j++) {
        robj *obj = lookupKeyWrite(c->db,c->argv[j]);
        if (obj != NULL) {
            if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) {
                zfree(src);
                addReply(c,shared.wrongtypeerr);
                return;
            }

            src[i].subject = obj;
            src[i].type = obj->type;
            src[i].encoding = obj->encoding;
        } else {
            src[i].subject = NULL;
        }

        /* Every input's weight defaults to 1. */
        src[i].weight = 1.0;
    }

    /* Parse the optional WEIGHTS and AGGREGATE clauses after the keys. */
    if (j < c->argc) {
        int remaining = c->argc - j;

        while (remaining) {
            if (remaining >= (setnum + 1) &&
                !strcasecmp(c->argv[j]->ptr,"weights"))
            {
                /* WEIGHTS is followed by one value per input key. */
                j++; remaining--;
                for (i = 0; i < setnum; i++, j++, remaining--) {
                    if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
                            "weight value is not a float") != REDIS_OK)
                    {
                        zfree(src);
                        return;
                    }
                }
            } else if (remaining >= 2 &&
                       !strcasecmp(c->argv[j]->ptr,"aggregate"))
            {
                j++; remaining--;
                if (!strcasecmp(c->argv[j]->ptr,"sum")) {
                    aggregate = REDIS_AGGR_SUM;
                } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
                    aggregate = REDIS_AGGR_MIN;
                } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
                    aggregate = REDIS_AGGR_MAX;
                } else {
                    zfree(src);
                    addReply(c,shared.syntaxerr);
                    return;
                }
                j++; remaining--;
            } else {
                zfree(src);
                addReply(c,shared.syntaxerr);
                return;
            }
        }
    }

    /* Sort inputs by cardinality using zuiCompareByCardinality, presumably
     * smallest first. The intersection below iterates src[0], and the union
     * sizes its accumulator from src[setnum-1]. */
    qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);

    /* The result is always built as a skiplist-encoded zset; it may be
     * converted to a ziplist at the end. */
    dstobj = createZsetObject();
    dstzset = dstobj->ptr;
    memset(&zval, 0, sizeof(zval));

    if (op == REDIS_OP_INTER) {
        /* Nothing to do if the first (smallest) input is empty. */
        if (zuiLength(&src[0]) > 0) {
            /* Like a plain set intersection: walk src[0] and look each member
             * up in every other input. A member missing from any input is
             * skipped; otherwise it is added to dstzset. */
            zuiInitIterator(&src[0]);
            while (zuiNext(&src[0],&zval)) {
                double score, value;

                score = src[0].weight * zval.score;
                /* A weighted score can be NaN; use 0 instead. */
                if (isnan(score)) score = 0;

                for (j = 1; j < setnum; j++) {
                    /* The same key can be passed more than once. Looking up
                     * the set that is currently being iterated is not safe, so
                     * when src[j] is src[0] reuse the current score directly. */
                    if (src[j].subject == src[0].subject) {
                        value = zval.score*src[j].weight;
                        zunionInterAggregate(&score,value,aggregate);
                    } else if (zuiFind(&src[j],&zval,&value)) {
                        /* Found: weight the score, then aggregate it. */
                        value *= src[j].weight;
                        zunionInterAggregate(&score,value,aggregate);
                    } else {
                        break;
                    }
                }

                /* Add the member only if it was found in every input. */
                if (j == setnum) {
                    tmp = zuiObjectFromValue(&zval);
                    znode = zslInsert(dstzset->zsl,score,tmp);
                    incrRefCount(tmp); /* added to skiplist */
                    dictAdd(dstzset->dict,tmp,&znode->score);
                    incrRefCount(tmp); /* added to dictionary */

                    /* Track the longest RAW-encoded member for the ziplist
                     * conversion check at the end. */
                    if (tmp->encoding == REDIS_ENCODING_RAW)
                        if (sdslen(tmp->ptr) > maxelelen)
                            maxelelen = sdslen(tmp->ptr);
                }
            }
            zuiClearIterator(&src[0]);
        }
    } else if (op == REDIS_OP_UNION) {
        dict *accumulator = dictCreate(&setDictType,NULL);
        dictIterator *di;
        dictEntry *de;
        double score;

        if (setnum) {
            /* Pre-size the accumulator to the last input's length (the
             * largest, per the sort above) to reduce rehashing while it
             * grows. */
            dictExpand(accumulator,zuiLength(&src[setnum-1]));
        }

        /* Walk every input. Members already in the accumulator get their
         * scores aggregated; new members are inserted. */
        for (i = 0; i < setnum; i++) {
            if (zuiLength(&src[i]) == 0) continue;

            zuiInitIterator(&src[i]);
            while (zuiNext(&src[i],&zval)) {
                /* Initialize value */
                score = src[i].weight * zval.score;
                if (isnan(score)) score = 0;

                /* Is the member already in the accumulator? */
                de = dictFind(accumulator,zuiObjectFromValue(&zval));
                if (de == NULL) {
                    tmp = zuiObjectFromValue(&zval);
                    /* Track the longest RAW-encoded member for the ziplist
                     * conversion check at the end. */
                    if (tmp->encoding == REDIS_ENCODING_RAW) {
                        if (sdslen(tmp->ptr) > maxelelen)
                            maxelelen = sdslen(tmp->ptr);
                    }
                    /* New member: add it with its weighted score. */
                    de = dictAddRaw(accumulator,tmp);
                    incrRefCount(tmp);
                    dictSetDoubleVal(de,score);
                } else {
                    /* Existing member: combine the scores per AGGREGATE. */
                    zunionInterAggregate(&de->v.d,score,aggregate);
                }
            }
            zuiClearIterator(&src[i]);
        }

        /* Move the accumulated members and scores into the result zset. */
        di = dictGetIterator(accumulator);

        /* The final size is now known: size the result's dict once to avoid
         * rehashing during the inserts below. */
        dictExpand(dstzset->dict,dictSize(accumulator));

        while((de = dictNext(di)) != NULL) {
            robj *ele = dictGetKey(de);
            score = dictGetDoubleVal(de);
            znode = zslInsert(dstzset->zsl,score,ele);
            incrRefCount(ele); /* added to skiplist */
            dictAdd(dstzset->dict,ele,&znode->score);
            incrRefCount(ele); /* added to dictionary */
        }
        dictReleaseIterator(di);

        /* We can free the accumulator dictionary now. */
        dictRelease(accumulator);
    } else {
        redisPanic("Unknown operator");
    }

    /* Drop any existing value at dstkey; the new result replaces it. */
    if (dbDelete(c->db,dstkey)) {
        signalModifiedKey(c->db,dstkey);
        touched = 1;
        server.dirty++;
    }
    if (dstzset->zsl->length) {
        /* Convert to a ziplist when both the member count and the longest
         * tracked member are within the ziplist limits. */
        if (dstzset->zsl->length <= server.zset_max_ziplist_entries &&
            maxelelen <= server.zset_max_ziplist_value)
                zsetConvert(dstobj,REDIS_ENCODING_ZIPLIST);

        dbAdd(c->db,dstkey,dstobj);
        addReplyLongLong(c,zsetLength(dstobj));
        if (!touched) signalModifiedKey(c->db,dstkey);
        notifyKeyspaceEvent(REDIS_NOTIFY_ZSET,
            (op == REDIS_OP_UNION) ? "zunionstore" : "zinterstore",
            dstkey,c->db->id);
        server.dirty++;
    } else {
        /* Empty result: nothing is stored. If an old value was deleted,
         * report a "del" event for it. */
        decrRefCount(dstobj);
        addReply(c,shared.czero);
        if (touched)
            notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",dstkey,c->db->id);
    }
    zfree(src);
}