順風車運營研發團隊 譚淼
跳躍表(skiplist)是一種有序的數據結構,它經過在每一個節點中維持多個指向其餘節點的指針,從而達到指向其餘節點的目的。在Redis中,有序集合是經過跳躍表和hash實現的。html
1、跳躍表
爲了更好的閱讀下面的文章,建議先對跳躍表的基本概念進行學習,連接以下:https://www.cnblogs.com/a8457...node
2、數據結構
先看一下與跳躍表有關的數據結構。數組
一、zskiplistNode
zskiplistNode是跳躍表節點,用於存儲跳躍表節點。數據結構
typedef struct zskiplistNode { sds ele; //zset元素 double score; //zset分值 struct zskiplistNode *backward; //前一個節點 struct zskiplistLevel { struct zskiplistNode *forward; //後一個節點 unsigned int span; //後一個節點的跨度 } level[]; //zskiplistLevel結構體的數組 } zskiplistNode;
該數據結構如圖所示:dom
ele保存的是zset元素,score存儲的是zset元素的分值,backward是指向該zskiplistNode的前一個節點,level是一個存儲zskiplistLevel結構體的數組,其中zskiplistLevel的數據結構爲:函數
其中forward是是指向該zskiplistNode的下一個節點,span是到下一個節點的步長。學習
二、zskiplistspa
zskiplist是跳躍表,用於存儲跳躍表的關鍵信息。3d
typedef struct zskiplist { struct zskiplistNode *header, *tail; unsigned long length; int level; } zskiplist;
該數據結構如圖所示:指針
header指針指向了跳躍表的頭結點,tail指針指向了跳躍表的尾節點,length記錄了跳躍表的長度,level記錄了跳躍表的層數。
3、跳躍表的初始化
跳躍表的初始化使用的是zslCreate()函數,函數的代碼以下所示:
zskiplist *zslCreate(void) { int j; zskiplist *zsl; zsl = zmalloc(sizeof(*zsl)); zsl->level = 1; zsl->length = 0; zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL); for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) { zsl->header->level[j].forward = NULL; zsl->header->level[j].span = 0; } zsl->header->backward = NULL; zsl->tail = NULL; return zsl; }
在zslCreate()函數中,首先使用zmalloc()函數進行內存分配,
void *zmalloc(size_t size) { void *ptr = malloc(size+PREFIX_SIZE); //申請size+PREFIX_SIZE大小的空間,PREFIX_SIZE是size_t或者long long的大小 if (!ptr) zmalloc_oom_handler(size);//異常處理 *((size_t*)ptr) = size;//在多申請的空間記錄申請空間的大小 update_zmalloc_stat_alloc(size+PREFIX_SIZE);//更新總的使用空間 return (char*)ptr+PREFIX_SIZE; }
zmalloc()是malloc()的封裝,是在malloc()的基礎上多分配一個size_t或者long long大小的內存,用來存儲申請的空間的大小。
後續代碼是對zskiplist進行初始化操做,值得一提的是zslCreateNode()函數。
zskiplistNode *zslCreateNode(int level, double score, sds ele) { zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel)); zn->score = score; zn->ele = ele; return zn; }
根據函數名能夠看出,這個函數是在建立第一個zskiplistNode節點,並對其進行初始化。通過上述初始化後,能夠得到zskiplist結果以下圖所示。
4、跳躍表的插入
跳躍表的插入使用的是zslInsert()函數,該函數以下:
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; unsigned int rank[ZSKIPLIST_MAXLEVEL]; int i, level; serverAssert(!isnan(score)); x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) { rank[i] += x->level[i].span; x = x->level[i].forward; } update[i] = x; } level = zslRandomLevel(); if (level > zsl->level) { for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; update[i]->level[i].span = zsl->length; } zsl->level = level; } x = zslCreateNode(level,score,ele); for (i = 0; i < level; i++) { x->level[i].forward = update[i]->level[i].forward; update[i]->level[i].forward = x; x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); update[i]->level[i].span = (rank[0] - rank[i]) + 1; } for (i = level; i < zsl->level; i++) { update[i]->level[i].span++; } x->backward = (update[0] == zsl->header) ? NULL : update[0]; if (x->level[0].forward) x->level[0].forward->backward = x; else zsl->tail = x; zsl->length++; return x; }
該函數有一個難點,即zslRandomLevel()函數。該函數的定義爲:
int zslRandomLevel(void) { int level = 1; while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF)) level += 1; return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL; }
首先,該函數中ZSKIPLIST_P的值爲0.25,所以表達式ZSKIPLIST_P 0xFFFF的值約爲0.25 65535,而表達式random()&0xFFFF是對random()返回的隨機數進行對0xFFFF的取餘數。所以(random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF) 返回true的機率約爲0.25。因爲是while循環,所以返回的level值的機率狀況以下表所示:
解決了這個函數,zslInsert()函數便容易許多,下面能夠以插入score = 1的節點爲例,來進行一次插入的流程。
x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) { rank[i] += x->level[i].span; x = x->level[i].forward; } update[i] = x; }
x = zsl->header,如今將x賦值爲header;
i = zsl->level-1,因爲level的值爲1,因此這個for循環能夠進入一次;
i =0, zsl->level-1 = 0,兩個值相等. 因此rank[0] = 0;
x->level[0]->forward ,因爲level[0]->forward的值爲null,因此這個while進不去;
update[0] = x,因此如今update[0]的值爲header指向的節點。
level = zslRandomLevel(); if (level > zsl->level) { for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; update[i]->level[i].span = zsl->length; } zsl->level = level; }
如今假設由zslRandomLevel()返回的level = 1;
zsl->level = 1, 因此這個if進不去。
x = zslCreateNode(level,score,ele); for (i = 0; i < level; i++) { x->level[i].forward = update[i]->level[i].forward; update[i]->level[i].forward = x; x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); update[i]->level[i].span = (rank[0] - rank[i]) + 1; }
新建一個node節點x, level = 1, score = 1;
for循環能夠進入一次,for循環內部的代碼爲更新update和新節點x的值,更新後如圖所示。
for (i = level; i < zsl->level; i++) { update[i]->level[i].span++; } x->backward = (update[0] == zsl->header) ? NULL : update[0];
for條件 i < zsl->level 不知足,所以沒法進入for循環,隨後修改 x->backward 的值爲null。
if (x->level[0].forward) x->level[0].forward->backward = x; else zsl->tail = x; zsl->length++; return x;
if條件不知足, 進入else,將x賦值給zsl->tail;
隨後zsl->length自增1。