太長不看版node
- 跳躍表是有序集合的底層實現之一, 除此以外它在 Redis 中沒有其餘應用。
- 每一個跳躍表節點的層高都是 1 至 64 之間的隨機數
- 層高越高出現的機率越低,層高爲i的機率爲
。
- 跳躍表中,分值能夠重複, 但對象成員惟一。分值相同時,節點按照成員對象的大小進行排序。
本篇解析基於redis 5.0.0版本,本篇涉及源碼文件爲t_zset.c, server.h。git
跳錶是一個隨機化的數據結構,實質就是一種能夠進行二分查找的有序鏈表。 github
咱們都知道在有序數組中進行查找,可使用二分查找,將時間複雜度降爲O(log n)。可是有序鏈表作不到,是由於有序鏈表獲取某元素複雜度爲O(n),沒法經過二分的思想去跳過一些元素的訪問。redis
例以下圖要查找元素50,就必須 5 -> 6 -> 10 -> 30 -> 49 這樣去找,而不能說先看 中心元素49小於50,則開始從中心右邊開始查找,跳過元素5,6,10, 30的訪問。算法
這時查找元素50變成了 5 -> 49,略過了中間元素6,10, 30。上圖中經過首節點存儲不一樣步長的指針將鏈表完美二分,可是實際上的跳錶卻相似與下面這張圖的結構,大部分狀況喜好不是完美二分的:數組
跳躍表採用了隨機算法(層高越高几率越小)來決定層高,相同層之間經過指針相連。redis實現中某節點層高爲i的機率爲 。數據結構
爲何不採用最完美的二分結構? dom
考慮一下,插入節點的狀況。當中間插入一個節點,此時的二分結構會被打破,因此須要不斷的進行調整。想一想平衡樹,紅黑樹複雜的再平衡操做,而此處的再平衡調整比之有過之而無不及。而使用隨機算法進行層高選擇的方法也能夠實現O(logN)的平均複雜度,並且操做也相對簡化的不少。函數
// 層高最大值限制
#define ZSKIPLIST_MAXLEVEL 64 /* Should be enough for 2^64 elements */
// 層高是否繼續增加的機率
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
// 跳錶節點定義
typedef struct zskiplistNode {
// 存儲內容
sds ele;
// 分值,用於排序
double score;
// 後退指針
struct zskiplistNode *backward;
// 變長數組,記錄層信息。層高越高跳過的節點越多(由於層高越高几率越低)
struct zskiplistLevel {
// 指向當前層下一個節點
struct zskiplistNode *forward;
// 當前節點與forward所指節點中間節點數
unsigned long span;
} level[];
} zskiplistNode;
// 跳錶結構管理節點
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
// 長度
unsigned long length;
// 跳錶高度(全部節點最高層高)
int level;
} zskiplist;
int zslRandomLevel(void) {
// 計算當前插入元素層高的隨機函數
int level = 1;
// (random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF) 機率爲1/4
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
複製代碼
層高爲1機率爲 1-p(不進while)ui
層高爲2的機率爲 p(進一次while) * (1 - p)(不進while)
層高爲3的機率爲 p(進一次while) * p(進一次while) * (1 - p)(不進while)
...
層高爲n的機率爲
層高的指望
在機率論和統計學中,數學指望(mean)(或均值,亦簡稱指望)是試驗中每次可能結果的機率乘以其結果的總和,是最基本的數學特徵之一。它反映隨機變量平均取值的大小
在redis實現中 p=1/4, 層高指望爲E約等於1.33,因此節點的平均層高約等於1.33是個常數,從而得出跳躍表的空間複雜度爲O(n)。
zskiplistNode *zslCreateNode(int level, double score, int ele) {
zskiplistNode *zn =
malloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->ele = ele;
return zn;
}
/* Create a new skiplist. */
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = malloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
// 頭節點層高爲64(層高的最大限制)
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
複製代碼
上述代碼中能夠看到,頭節點的層高數組直接爲最大長度,由於每次查找都要從頭部開始,並且整個跳躍表的高度是動態增長的,初始化時直接按照最大值申請高度,避免後續高度增長時爲頭節點從新分配內存。因此以前的跳躍表圖例應該以下圖所示:
int zslRandomLevel(void) {
// 計算當前插入元素層高的隨機函數
int level = 1;
// (random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF) 機率爲1/4
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
// update存放須要更新的節點
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
// 第一步,收集須要更新的節點與步長信息
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
// score能夠重複,重複時使用ele大小進行排序
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
// 第二步, 獲取隨機層高,補全須要更新的節點
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
// 第三步,建立並分層插入節點,同時更新同層前一節點步長信息
x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
// 第四步,更新新增節點未涉及層節點的步長信息,以及跳錶相關信息
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
複製代碼
插入節點分爲四步(舉個栗子,邊吃邊看):
建立並分層插入節點,同時更新同層前一節點步長信息
更新新增節點未涉及層節點的步長信息,以及跳錶相關信息與節點自身的相關信息
/* Find the rank for an element by both score and key. * Returns 0 when the element cannot be found, rank otherwise. * Note that the rank is 1-based due to the span of zsl->header to the * first element. */
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
}
/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->ele && sdscmp(x->ele,ele) == 0) {
return rank;
}
}
return 0;
}
/* Finds an element by its rank. The rank argument needs to be 1-based. */
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
zskiplistNode *x;
unsigned long traversed = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
{
traversed += x->level[i].span;
x = x->level[i].forward;
}
if (traversed == rank) {
return x;
}
}
return NULL;
}
複製代碼
redis實現中跳躍表和dict共同實現了zset,dict實現O(1)複雜度獲取元素對應score,跳躍表用來處理區間查詢的相關操做,同時由於score能夠重複,因此跳躍表無需實現經過ele獲取score(經過dict查)以及經過score獲取ele(貌似也沒有這個需求)。
通常查詢需求有兩個:
/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
// 被刪除節點在第i層有節點,則update[i]爲被刪除節點的前一個節點
if (update[i]->level[i].forward == x) {
// 步長 = 原步長 + 被刪除節點步長 - 1(被刪除節點)
update[i]->level[i].span += x->level[i].span - 1;
// 指針越過被刪除節點
update[i]->level[i].forward = x->level[i].forward;
} else {
// 被刪除節點在第i層無節點,則 步長 = 原步長 - 1(被刪除節點)
update[i]->level[i].span -= 1;
}
}
if (x->level[0].forward) {
// 更新被刪除節點下一節點的後退指針
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
update[i] = x;
}
/* We may have multiple elements with the same score, what we need * is to find the element with both the right score and object. */
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
zslDeleteNode(zsl, x, update);
if (!node)
zslFreeNode(x);
else
*node = x;
return 1;
}
return 0; /* not found */
}
複製代碼
刪除節點與添加節點步驟相似,分爲三步: