有序集SortedSet算是redis中一個頗有特點的數據結構,經過這篇文章來總結一下這塊知識點。redis
原文地址:http://www.jianshu.com/p/75ca...數組
redis中的有序集,容許用戶使用指定值對放進去的元素進行排序,而且基於該已排序的集合提供了一系列豐富的操做集合的API。
舉例以下:數據結構
//添加元素,table1爲有序集的名字,100爲用於排序字段(redis把它叫作score),a爲咱們要存儲的元素 127.0.0.1:6379> zadd table1 100 a (integer) 1 127.0.0.1:6379> zadd table1 200 b (integer) 1 127.0.0.1:6379> zadd table1 300 c (integer) 1 //按照元素索引返回有序集中的元素,索引從0開始 127.0.0.1:6379> zrange table1 0 1 1) "a" 2) "b" //按照元素排序範圍返回有序集中的元素,這裏用於排序的字段在redis中叫作score 127.0.0.1:6379> zrangebyscore table1 150 400 1) "b" 2) "c" //刪除元素 127.0.0.1:6379> zrem table1 b (integer) 1
在有序集中,用於排序的值叫作score,實際存儲的值叫作member。函數
因爲有序集中提供的API較多,這裏只舉了幾個常見的,具體能夠參考redis文檔。源碼分析
關於有序集,咱們有一個十分常見的使用場景就是用戶評論。在APP或者網站上發佈一條消息,下面會有不少評論,一般展現是按照發布時間倒序排列,這個需求就可使用有序集,以發佈評論的時間戳做爲score,而後按照展現評論的數量倒序查找有序集。網站
老規矩,咱們仍是從server.c文件中的命令表中找到相關命令的處理函數,而後一一分析。
依舊從添加元素開始,zaddCommand函數:spa
void zaddCommand(client *c) { zaddGenericCommand(c,ZADD_NONE); }
這裏能夠看到流程轉向了zaddGenericCommand,而且傳入了一個模式標記。
關於SortedSet的操做模式這裏簡單說明一下,先來看一條完整的zadd命令:code
zadd key [NX|XX] [CH] [INCR] score member [score member ...]
其中的可選項咱們依次看下:server
NX表示若是元素存在,則不執行替換操做直接返回。blog
XX表示只操做已存在的元素。
CH表示返回修改(包括添加,更新)元素的數量,只能被ZADD命令使用。
INCR表示在原來的score基礎上加上新的score,而不是替換。
上面代碼片斷中的ZADD_NONE表示普通操做。
接下來看下zaddGenericCommand函數的源碼,很長,耐心一點點看:
void zaddGenericCommand(client *c, int flags) { //一條錯誤提示信息 static char *nanerr = "resulting score is not a number (NaN)"; //有序集名字 robj *key = c->argv[1]; robj *zobj; sds ele; double score = 0, *scores = NULL; int j, elements; int scoreidx = 0; //記錄元素操做個數 int added = 0; int updated = 0; int processed = 0; //查找score的位置,默認score在位置2上,但因爲有各類模式,因此須要判斷 scoreidx = 2; while(scoreidx < c->argc) { char *opt = c->argv[scoreidx]->ptr; //判斷命令中是否設置了各類模式 if (!strcasecmp(opt,"nx")) flags |= ZADD_NX; else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX; else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH; else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR; else break; scoreidx++; } //設置模式 int incr = (flags & ZADD_INCR) != 0; int nx = (flags & ZADD_NX) != 0; int xx = (flags & ZADD_XX) != 0; int ch = (flags & ZADD_CH) != 0; //經過上面的解析,scoreidx爲真實的初始score的索引位置 //這裏客戶端參數數量減去scoreidx就是剩餘全部元素的數量 elements = c->argc - scoreidx; //因爲有序集中score,member成對出現,因此加一層判斷 if (elements % 2 || !elements) { addReply(c,shared.syntaxerr); return; } //這裏計算score,member有多少對 elements /= 2; //參數合法性校驗 if (nx && xx) { addReplyError(c, "XX and NX options at the same time are not compatible"); return; } //參數合法性校驗 if (incr && elements > 1) { addReplyError(c, "INCR option supports a single increment-element pair"); return; } //這裏開始解析score,先初始化scores數組 scores = zmalloc(sizeof(double)*elements); for (j = 0; j < elements; j++) { //填充數組,這裏注意元素是成對出現,因此各個score之間要隔一個member if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL) != C_OK) goto cleanup; } //這裏首先在client對應的db中查找該key,即有序集 zobj = lookupKeyWrite(c->db,key); if (zobj == NULL) { //沒有指定有序集且模式爲XX(只操做已存在的元素),直接返回 if (xx) goto reply_to_client; //根據元素數量選擇不一樣的存儲結構初始化有序集 if (server.zset_max_ziplist_entries == 0 || server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr)) { //哈希表 + 跳錶的組合模式 zobj = createZsetObject(); } else { //ziplist(壓縮鏈表)模式 zobj = createZsetZiplistObject(); } //加入db中 dbAdd(c->db,key,zobj); } else { //若是ZADD操做的集合類型不對,則返回 if (zobj->type != OBJ_ZSET) { addReply(c,shared.wrongtypeerr); goto cleanup; } } //這裏開始往有序集中添加元素 for (j = 0; j < elements; j++) { double newscore; //取出client傳過來的score score = scores[j]; int retflags = flags; //取出與之對應的member ele = c->argv[scoreidx+1+j*2]->ptr; //向有序集中添加元素,參數依次是有序集,要添加的元素的score,要添加的元素,操做模式,新的score int retval = zsetAdd(zobj, score, ele, &retflags, &newscore); //添加失敗則返回 if (retval == 0) { addReplyError(c,nanerr); goto cleanup; } //記錄操做 if (retflags & ZADD_ADDED) added++; if (retflags & ZADD_UPDATED) updated++; if (!(retflags & ZADD_NOP)) processed++; //設置新score值 score = newscore; } //操做記錄 server.dirty += (added+updated); //返回邏輯 reply_to_client: if (incr) { if (processed) addReplyDouble(c,score); else addReply(c,shared.nullbulk); } else { addReplyLongLong(c,ch ? added+updated : added); } //清理邏輯 cleanup: zfree(scores); if (added || updated) { signalModifiedKey(c->db,key); notifyKeyspaceEvent(NOTIFY_ZSET, incr ? "zincr" : "zadd", key, c->db->id); } }
代碼有點長,來張圖看一下存儲結構:
注:每一個entry都是由score+member組成
有了上面的結構圖之後,能夠想到刪除操做應該就是根據不一樣的存儲結構進行,若是是ziplist就執行鏈表刪除,若是是哈希表+跳錶結構,那就要把兩個集合都進行刪除。真實邏輯是什麼呢?
咱們來看下刪除函數zremCommand的源碼,相對短一點:
void zremCommand(client *c) { //獲取有序集名 robj *key = c->argv[1]; robj *zobj; int deleted = 0, keyremoved = 0, j; //作校驗 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL || checkType(c,zobj,OBJ_ZSET)) return; for (j = 2; j < c->argc; j++) { //一次刪除指定元素 if (zsetDel(zobj,c->argv[j]->ptr)) deleted++; //若是有序集中所有元素都被刪除,則回收有序表 if (zsetLength(zobj) == 0) { dbDelete(c->db,key); keyremoved = 1; break; } } //同步操做 if (deleted) { notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id); if (keyremoved) notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); signalModifiedKey(c->db,key); server.dirty += deleted; } //返回 addReplyLongLong(c,deleted); }
看下具體的刪除操做源碼:
//參數zobj爲有序集,ele爲要刪除的元素 int zsetDel(robj *zobj, sds ele) { //與添加元素相同,根據不一樣的存儲結構執行不一樣的刪除邏輯 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *eptr; //ziplist是一個簡單的鏈表刪除節點操做 if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) { zobj->ptr = zzlDelete(zobj->ptr,eptr); return 1; } } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; dictEntry *de; double score; de = dictUnlink(zs->dict,ele); if (de != NULL) { //查詢該元素的score score = *(double*)dictGetVal(de); //從哈希表中刪除元素 dictFreeUnlinkedEntry(zs->dict,de); //從跳錶中刪除元素 int retval = zslDelete(zs->zsl,score,ele,NULL); serverAssert(retval); //若是有須要則對哈希表進行resize操做 if (htNeedsResize(zs->dict)) dictResize(zs->dict); return 1; } } else { serverPanic("Unknown sorted set encoding"); } //沒有找到指定元素返回0 return 0; }
最後看一個查詢函數zrangeCommand源碼,也是很長,汗~~~,不過放心,有了上面的基礎,大體也能猜到查詢邏輯應該是什麼樣子的:
void zrangeCommand(client *c) { //第二個參數,0表示順序,1表示倒序 zrangeGenericCommand(c,0); } void zrangeGenericCommand(client *c, int reverse) { //有序集名 robj *key = c->argv[1]; robj *zobj; int withscores = 0; long start; long end; int llen; int rangelen; //參數校驗 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) || (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return; //根據參數附加信息判斷是否須要返回score if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) { withscores = 1; } else if (c->argc >= 5) { addReply(c,shared.syntaxerr); return; } //有序集校驗 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL || checkType(c,zobj,OBJ_ZSET)) return; //索引值重置 llen = zsetLength(zobj); if (start < 0) start = llen+start; if (end < 0) end = llen+end; if (start < 0) start = 0; //返回空集 if (start > end || start >= llen) { addReply(c,shared.emptymultibulk); return; } if (end >= llen) end = llen-1; rangelen = (end-start)+1; //返回給客戶端結果長度 addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen); //一樣是根據有序集的不一樣結構執行不一樣的查詢邏輯 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *zl = zobj->ptr; unsigned char *eptr, *sptr; unsigned char *vstr; unsigned int vlen; long long vlong; //根據正序仍是倒序計算起始索引 if (reverse) eptr = ziplistIndex(zl,-2-(2*start)); else eptr = ziplistIndex(zl,2*start); serverAssertWithInfo(c,zobj,eptr != NULL); sptr = ziplistNext(zl,eptr); while (rangelen--) { serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL); //注意嵌套的ziplistGet方法就是把eptr索引的值讀出來保存在後面三個參數中 serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); //返回value if (vstr == NULL) addReplyBulkLongLong(c,vlong); else addReplyBulkCBuffer(c,vstr,vlen); //若是須要則返回score if (withscores) addReplyDouble(c,zzlGetScore(sptr)); //倒序從後往前,正序從前日後 if (reverse) zzlPrev(zl,&eptr,&sptr); else zzlNext(zl,&eptr,&sptr); } } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; zskiplistNode *ln; sds ele; //找到起始節點 if (reverse) { ln = zsl->tail; if (start > 0) ln = zslGetElementByRank(zsl,llen-start); } else { ln = zsl->header->level[0].forward; if (start > 0) ln = zslGetElementByRank(zsl,start+1); } //遍歷並返回給客戶端 while(rangelen--) { serverAssertWithInfo(c,zobj,ln != NULL); ele = ln->ele; addReplyBulkCBuffer(c,ele,sdslen(ele)); if (withscores) addReplyDouble(c,ln->score); ln = reverse ? ln->backward : ln->level[0].forward; } } else { serverPanic("Unknown sorted set encoding"); } }
上面就是關於有序集SortedSet的添加,刪除,查找的源碼。能夠看出SortedSet會根據存放元素的數量選擇ziplist或者哈希表+跳錶兩種數據結構進行實現,之因此源碼看上去很長,主要緣由也就是要根據不一樣的數據結構進行不一樣的代碼實現。只要掌握了這個核心思路,再看源碼就不會太難。
有序集的邏輯不難,就是代碼有點長,涉及到ziplist,skiplist,dict三套數據結構,其中除了常規的dict以外,另外兩個數據結構內容都很多,準備專門寫文章進行總結,就不在這裏贅述了。本文主要目的是總結一下有序集SortedSet的實現原理。