跳躍表是Redis zset的底層實現之一,zset在member較多時會採用跳躍表做爲底層實現,它在添加、刪除、查找節點上都擁有與紅黑樹至關的性能,它其實說白了就是一種特殊的鏈表,鏈表的每一個節點存了不一樣的「層」信息,用這種分層存節點的方式在查找節點時能跳過些節點,從而使添加、刪除、查找操做都擁有了O(logn)的平均時間複雜度。
下面簡單介紹一下跳躍表:
跳躍表最低層(第一層)是一個擁有跳躍表全部節點的普通鏈表,每次在往跳躍表插入鏈表節點時必定會插入到這個最低層,至因而否插入到上層 就由拋硬幣決定(這麼說不是很準確,redis裏這個機率是1/4而非1/2,爲了表述方便先這麼說),什麼意思呢?假設已經有一個跳躍表,其高度只有一層:
node
往表中插入節點「7」時,假設插入7時拋硬幣的結果是正,則在第二層中插入「7」節點,繼續拋一次看看還能不能上到第三層,爲反則中止插入,上層再也不插入「7」節點了:redis
同理插入「4」節點假設連續拋兩次都拋了正面,第三次拋了反面,則「4」節點會插入到二、3層:spring
這就是跳躍表最基本的樣子。數組
查找一個節點時,咱們只需從高層到低層,一個個鏈表查找,每次找到該層鏈表中小於等於目標節點的最大節點,直到找到爲止。因爲高層的鏈表迭代時會「跳過」低層的部分節點,因此跳躍表會比正常的鏈表查找少查部分節點,這也是skiplist名字的由來。緩存
假如咱們須要查找節點「5」:
先遍歷最高層,發現第三層頭結點的下一個節點是「4」,4<5,因此遊標定位到「4」節點,可是「4」節點的下一個節點是空,得繼續往低層走;第二層也從「4」節點開始,「4」節點在第二層的下一個節點是「7」,7>5,公交車作過頭了,回來依舊定位在「4」節點;繼續往低層走,第一層「4」節點的下一個節點是「5」,這就找到了。安全
事實上插入前也須要進行跳躍表查找操做,上文演示的插入流程只是直接用了鏈表而省略了這一步。
跳躍表有了個大概的瞭解,接下來咱們細說redis裏的skiplist數據結構
redis的skiplist有兩個主要的數據結構,併發
typedef struct zskiplistNode { robj *obj;//zset的member double score;//zset的score struct zskiplistNode *backward;//指向上一個節點,用於zrevrange命令 struct zskiplistLevel { struct zskiplistNode *forward;//指向下一個節點 unsigned int span;//到達後一個節點的跨度(兩個相鄰節點span爲1) } level[];//該節點在各層的信息,柔性數組成員 } zskiplistNode;
backward變量是特地爲zrevrange*系列命令準備的,目的是爲了使跳躍表實現反向遍歷,普通跳躍表的實現裏是非必要的。app
level變量記錄了此節點各層的信息:less
typedef struct zskiplist { struct zskiplistNode *header, *tail;//跳躍表頭尾節點 unsigned long length;//節點個數 int level;//除頭結點外最大的層數 } zskiplist;
zskiplist的頭結點不是一個有效的節點,它有ZSKIPLIST_MAXLEVEL層(32層),每層的forward指向該層跳躍表的第一個節點,若沒有則爲null。
btw:跳躍表節點的層數限制在了32,若想超過32層得連續32次拋硬幣都獲得正面,這得有足夠多的節點,redis限定了拋硬幣正面的機率爲1/4,因此到達32層的機率爲(1/2)^64,通常一臺64位的計算機能擁有的最大內存也沒法存儲這麼多zskiplistNode,因此對於基本使用 32層的上限已經足夠高了,再高也不必 浪費頭節點的內存。
最終zskiplist內存佈局以下:
也能夠去掉沒必要要的部分(backward、obj),做出以下抽象後的圖:
下文咱們都經過簡化圖來分析跳躍表的基本操做流程。
建立一個zskiplist就是建立一個高度爲ZSKIPLIST_MAXLEVEL(32)的頭結點,score爲0,member爲null。
代碼以下:
zskiplist *zslCreate(void) { int j; zskiplist *zsl; zsl = zmalloc(sizeof(*zsl)); zsl->level = 1; zsl->length = 0; zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);//畫圖,32level的頭結點 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) { //頭結點每一個level的下一個節點都初始化爲null,跨度爲0 zsl->header->level[j].forward = NULL; zsl->header->level[j].span = 0; } zsl->header->backward = NULL; zsl->tail = NULL; return zsl; } //爲指定高度的節點分配空間並賦值,insert操做也要用到 zskiplistNode *zslCreateNode(int level, double score, robj *obj) { zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));//柔性數組成員 zn->score = score; zn->obj = obj; return zn; }
執行完zslCreate後會獲得以下佈局:
插入一個節點時須要作如下工做
要找到新節點的插入位置,只需像上文介紹的那樣,從高層向低層找便可。在找的過程當中用update[]數組記錄每一層插入位置的上一個節點,用rank[]數組記錄每一層插入位置的上一個節點在跳躍表中的排名。根據update[]插入新節點,插入完成後再根據rank[]更新跨度信息便可。
ps:redis容許節點有重複的score,當score相同時根據member(代碼裏的obj指向的字符串)的字典序來排名。
/* Returns a random level for the new skiplist node we are going to create. * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL * (both inclusive), with a powerlaw-alike distribution where higher * levels are less likely to be returned. */ int zslRandomLevel(void) {//返回一個隨機的層數,不是level的索引是層數 int level = 1; while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))//有1/4的機率加入到上一層中 level += 1; return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL; } //根據score、member插入一個節點到某個zskiplist中 //調用這個函數前須要確認obj(member)還不在跳躍表裏,避免重複插入 zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;//每一層的最後一個小於score的節點,由於插入節點須要修改每一層這個節點的上一個節點的信息(跨度),因此得保留一下,更重要的,這是保留插入位置 unsigned int rank[ZSKIPLIST_MAXLEVEL];//記錄每一層插入節點的上一個節點在skiplist中的排名 int i, level;//變量i做爲zslInsert函數裏循環的索引值,變量level爲插入節點的層數(不是層數的索引) serverAssert(!isnan(score));//isnan是一個macro,用於判斷參數是不是NAN(非數字) x = zsl->header;//x用於迭代zskiplistNode for (i = zsl->level-1; i >= 0; i--) {//從最高層向最底層查詢,找到合適的插入位置 /* store rank that is crossed to reach the insert position */ rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];//記錄每一層插入節點的上一個節點的排名 while (x->level[i].forward &&//當前層的下一個節點存在 (x->level[i].forward->score < score ||//下一個節點的分數小於須要插入的分數 (x->level[i].forward->score == score &&//score相同的狀況下,根據member字符串的大小來比較(二進制安全的memcmp) compareStringObjects(x->level[i].forward->obj,obj) < 0))) { rank[i] += x->level[i].span;//每層的跨度 x = x->level[i].forward;//下一個節點 } update[i] = x;//當前層的最後一個小於score的節點 } /* we assume the key is not already inside, since we allow duplicated * scores, and the re-insertion of score and redis object should never * happen since the caller of zslInsert() should test in the hash table * if the element is already inside or not. */ level = zslRandomLevel(); if (level > zsl->level) {//大於以前跳躍表的高度因此沒有記錄update[i],由於插入的節點有這麼高因此要修改這些頭結點的信息 for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; update[i]->level[i].span = zsl->length;//高出部分的頭結點在還沒插入當前節點時跨度應該是整張表,插入以後會從新更新這個值 } zsl->level = level; } x = zslCreateNode(level,score,obj); for (i = 0; i < level; i++) { x->level[i].forward = update[i]->level[i].forward; update[i]->level[i].forward = x; /* update span covered by update[i] as x is inserted here */ //rank[0]是x在第0層的上一個節點的實際排名,rank[i]是x在第i層的上一個節點的實際排名,它們倆的差值爲x在第i層的上一個節點與x之間的距離 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); update[i]->level[i].span = (rank[0] - rank[i]) + 1; } /* increment span for untouched levels */ for (i = level; i < zsl->level; i++) { update[i]->level[i].span++; } x->backward = (update[0] == zsl->header) ? NULL : update[0]; if (x->level[0].forward) x->level[0].forward->backward = x; else zsl->tail = x;//尾節點 zsl->length++; return x; }
假如往圖2插入一個score爲7的節點,則會按照下圖方式所示進行:
找到插入位置(藍色的線表示查找路徑):
void zslFreeNode(zskiplistNode *node) { decrRefCount(node->obj);//member的引用計數-1,防止內存泄漏 zfree(node); }
void zslFree(zskiplist *zsl) { //任何一個節點必定有level[0],因此迭代level[0]來刪除全部節點 zskiplistNode *node = zsl->header->level[0].forward, *next; zfree(zsl->header); while(node) { next = node->level[0].forward; zslFreeNode(node); node = next; } zfree(zsl); }
主要分爲如下3個步驟:
/* Delete an element with matching score/object from the skiplist. */ //從skiplist邏輯上刪除一個節點並釋放該節點的內存 int zslDelete(zskiplist *zsl, double score, robj *obj) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; int i; x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && compareStringObjects(x->level[i].forward->obj,obj) < 0))) x = x->level[i].forward; update[i] = x; } /* We may have multiple elements with the same score, what we need * is to find the element with both the right score and object. */ x = x->level[0].forward;//要刪除的節點 if (x && score == x->score && equalStringObjects(x->obj,obj)) { zslDeleteNode(zsl, x, update); zslFreeNode(x);//obj的引用計數-1並釋放節點內存 return 1; } return 0; /* not found */ } /* Internal function used by zslDelete, zslDeleteRangeByScore and zslDeleteRangeByRank and zslDeleteRangeByLex*/ //從skiplist邏輯上刪除一個節點(不釋放內存,僅改變節點位置關係) //x爲要刪除的節點 //update爲每一層x的上一個節點(爲了更新x上一個節點的forward和span屬性) void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) { int i; for (i = 0; i < zsl->level; i++) { if (update[i]->level[i].forward == x) {//當前層有x節點 update[i]->level[i].span += x->level[i].span - 1;//...=x->level[i].span; update[i]->level[i].forward = x->level[i].forward;//跨過x節點 } else {//當前層沒有x節點 update[i]->level[i].span -= 1; } } if (x->level[0].forward) {//是不是tail節點 x->level[0].forward->backward = x->backward; } else { zsl->tail = x->backward; } while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)//刪除了最高層數的節點 zsl->level--; zsl->length--; }
代碼懶得貼了
根據查找範圍的類型 zskiplist查找能夠分爲三類:
功能:給定一個zero-based排名範圍(start,end),從zskiplist中找出知足該範圍的全部節點。
zrangeGenericCommand函數考慮了一些與客戶端交互的內容,學zskiplist的時候不必細看,它主要分爲如下兩步:
下面是zslGetElementByRank的代碼:
/* Finds an element by its rank. The rank argument needs to be 1-based. */ //O(logn) zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) { zskiplistNode *x; unsigned long traversed = 0; int i; x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { //從高層向底層累加span直到累加的值等於rank while (x->level[i].forward && (traversed + x->level[i].span) <= rank) { traversed += x->level[i].span; x = x->level[i].forward; } if (traversed == rank) { return x; } } return NULL; }
功能:給定一個score的範圍,從zskiplist中找出知足該score範圍的全部節點。
爲了方便的表示score範圍的開閉區間,redis在server.h裏聲明瞭一個表示zset分數區間的類型zrangespec,關於score範圍查找的相關函數都會用到它:
/* Struct to hold a inclusive/exclusive range spec by score comparison. */ typedef struct { double min, max; //是不是開閉區間,1爲開,0位閉 int minex, maxex; /* are min or max exclusive? */ } zrangespec;
判斷一個score與zrangespec區間內最小值、最大值的關係:
//給定value是否>(>=)此範圍的下界 //gte(greater than or equal to) static int zslValueGteMin(double value, zrangespec *spec) { return spec->minex ? (value > spec->min) : (value >= spec->min); } //給定value是否<(<=)此範圍的上界 //lte(less than or equal to) int zslValueLteMax(double value, zrangespec *spec) { return spec->maxex ? (value < spec->max) : (value <= spec->max); }
根據上述兩個函數,就能夠用O(1)時間複雜度判斷一個zskiplist的score是否與zrangespec分數區間有交集:
/* Returns if there is a part of the zset is in range. */ //用O(1)的時間複雜度判斷zset(zsl)的分數範圍是否與給定分數範圍(range)有交集。 // //range(6,9] zsl{1,2,3,4,5} 或zsl{10,12,13} 都是不在範圍內 // int zslIsInRange(zskiplist *zsl, zrangespec *range) { zskiplistNode *x; /* Test for ranges that will always be empty. */ if (range->min > range->max || (range->min == range->max && (range->minex || range->maxex))) return 0; x = zsl->tail; if (x == NULL || !zslValueGteMin(x->score,range))//尾節點小於範圍下界 return 0; x = zsl->header->level[0].forward; if (x == NULL || !zslValueLteMax(x->score,range))//頭節點大於範圍上界 return 0; return 1;//在 }
genericZrangebyscoreCommand函數也考慮了不少與客戶端交互的內容,就學習底層跳躍表實現時不必細看,咱們只須要知道底層是如何作到的便可,主要執行以下步驟:
放一下核心的代碼zslFirstInRange:
/* Find the first node that is contained in the specified range. * Returns NULL when no element is contained in the range. */ //知足range條件最小的那個 O(logn) zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) { zskiplistNode *x; int i; /* If everything is out of range, return early. */ if (!zslIsInRange(zsl,range)) return NULL;//保證下面的邏輯必定能找到範圍內的節點 x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { /* Go forward while *OUT* of range. */ while (x->level[i].forward && !zslValueGteMin(x->level[i].forward->score,range)) x = x->level[i].forward; } /* This is an inner range, so the next node cannot be NULL. */ x = x->level[0].forward; serverAssert(x != NULL); /* Check if score <= max. */ if (!zslValueLteMax(x->score,range)) return NULL; return x; }
功能:在一個全部節點的score都相同的zskiplist中,找到知足member字符串字典序範圍的全部節點。
底層用memcmp比較兩個字符串的大小。實現的流程與genericZrangebyscoreCommand很像,有興趣再看。
zslgetrank 根據member和score得到節點在該skiplist中的rank
zslParseRange 把客戶端傳過來的範圍min、max轉換成zrangespec區間類型 返回給range參數。
...
其實做者Antirez已經給出了答覆:
https://news.ycombinator.com/item?id=1171423
劃重點:They are not very memory intensive. It's up to you basically.
既然取決於本身,skiplist實現簡單就選它了。至於可能的好處和壞處大概整理了一下有這些:
缺點:
優勢:
Skip Lists: A Probabilistic Alternative to Balanced Trees
Skip Lists: Done Right