redis用的人比較多,其中zset你們都熟悉,主要用於排名場景。
zset數據結構,分紅兩部分,一部分是用於排序,一部分用於緩存鍵值。
先看看結構:node
typedef struct zset { dict *dict; //緩存 zskiplist *zsl; //排序結構 } zset;
上面,跳躍表用於排序結構,能夠按照名次,積分查找對應鍵, 時間複雜度: log(n)。
按照名次,積分範圍查找一系列鍵時, 先查詢知足條件的第一個鍵,而後當前鍵查找後續鍵, 時間複雜度: log(n) + o(m), n=總鍵數, m=查詢結果鍵數。
跳躍表結構:redis
typedef struct zskiplist { struct zskiplistNode *header, *tail; //頭結點:用於順序查詢,經常使用方式; 尾結點:用於倒序簡單查詢。 unsigned long length; //結點數 int level; //跳躍層級 } zskiplist;
結點結構:緩存
typedef struct zskiplistNode { robj *obj; //鍵 double score; //積分 struct zskiplistNode *backward; //前一個結點, 和level[0]可看做雙鏈表 struct zskiplistLevel { //跳躍層關係, 每層都是單鏈表 struct zskiplistNode *forward; //此層下一個結點 unsigned int span; //此層下一個結點和當前結點距離(二者隔了多少結點) } level[]; //最多32層 } zskiplistNode;
查詢:
根據名次範圍查詢數據結構
void zrangeGenericCommand(client *c, int reverse) { ...... zset *zs = zobj->ptr; //zset結構變量 zskiplist *zsl = zs->zsl; //跳躍表 zskiplistNode *ln; robj *ele; /* Check if starting point is trivial, before doing log(N) lookup. */ if (reverse) { //是否倒序查詢 ln = zsl->tail; //默認取尾結點 if (start > 0) ln = zslGetElementByRank(zsl,llen-start); //若是start>0, 則取對應結點 } else { ln = zsl->header->level[0].forward; //默認取第一個結點 if (start > 0) ln = zslGetElementByRank(zsl,start+1); } while(rangelen--) { //取rangelen個結點 serverAssertWithInfo(c,zobj,ln != NULL); ele = ln->obj; addReplyBulk(c,ele); //響應鍵名 if (withscores) addReplyDouble(c,ln->score); //響應鍵值 ln = reverse ? ln->backward : ln->level[0].forward; //設置下一個結點 } ...... } /* Finds an element by its rank. The rank argument needs to be 1-based. */ zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) { zskiplistNode *x; unsigned long traversed = 0; //當前名次 int i; x = zsl->header; //頭結點, 從頭結點的下一個結點遍歷 for (i = zsl->level-1; i >= 0; i--) { //從高層到低層鏈表遍歷 while (x->level[i].forward && (traversed + x->level[i].span) <= rank) //若是有下一個結點,且下一個結點的名次<=rank { traversed += x->level[i].span; x = x->level[i].forward; } if (traversed == rank) { //找到對應名次的結點 return x; } } return NULL; }
zslGetElementByRank()時間複雜度理想值 = log(n)this
若是有刪除,添加操做,和查詢相似,須要額外維護跳躍表關係。spa
/* Delete all the elements with score between min and max from the skiplist. * Min and max are inclusive, so a score >= min || score <= max is deleted. * Note that this function takes the reference to the hash table view of the * sorted set, in order to remove the elements from the hash table too. * 根據積分範圍刪除結點
*/ unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; //update維護跳躍表層級關係,用於zslDeleteNode() unsigned long removed = 0; int i; x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { while (x->level[i].forward && (range->minex ? //不満足積分條件時,循環 x->level[i].forward->score <= range->min : x->level[i].forward->score < range->min)) x = x->level[i].forward; update[i] = x; //此層最接近於條件的結點 } /* Current node is the last with score < or <= min. */ x = x->level[0].forward; //第一個最可能知足條件的結點 /* Delete nodes while in range. */ while (x && (range->maxex ? x->score < range->max : x->score <= range->max)) //知足條件的結點 { zskiplistNode *next = x->level[0].forward; zslDeleteNode(zsl,x,update); //更新層級關係 dictDelete(dict,x->obj); //刪除緩存 zslFreeNode(x); removed++; x = next; } return removed; } /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */ void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) { int i; for (i = 0; i < zsl->level; i++) { //更新層級關係 if (update[i]->level[i].forward == x) { update[i]->level[i].span += x->level[i].span - 1; update[i]->level[i].forward = x->level[i].forward; } else { update[i]->level[i].span -= 1; } } if (x->level[0].forward) { //維護當前結點的下一個結點 x->level[0].forward->backward = x->backward; } else { zsl->tail = x->backward; //維護尾結點 } while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL) zsl->level--; //維護層數 zsl->length--; //維護結點數 }
根據鍵名查找積分, 有了zset->dict這個鍵值緩存,只須要時間複雜度0(1)code
int zsetScore(robj *zobj, robj *member, double *score) { if (!zobj || !member) return C_ERR; if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { //ziplit實現 if (zzlFind(zobj->ptr, member, score) == NULL) return C_ERR; } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; dictEntry *de = dictFind(zs->dict, member); //找到緩存entry if (de == NULL) return C_ERR; *score = *(double*)dictGetVal(de); //獲取對應積分 } else { serverPanic("Unknown sorted set encoding"); } return C_OK; }
若是zset對象用ziplist實現,則查詢和刪除操做時間複雜度 = o(n)server
...對象