Redis經常使用數據類型有字符串String、字典dict、列表List、集合Set、有序集合SortedSet,本文將簡單介紹各數據類型及其使用場景,並重點剖析有序集合SortedSet的實現。redis
List的底層實現是相似Linked List雙端鏈表的結構,而不是數組,插入速度快,不須要節點的移動,但不支持隨機訪問,須要順序遍歷到索引所在節點。List有兩個主要的使用場景:sql
爲了解決輪詢的問題,Redis提供了brpop和blpop實現Blocking讀,當List爲空時,等待一段時間再返回,當有數據時,按請求順序返回給各客戶端。(當List爲空時,能夠將請求Blocking讀命令的客戶端加入此List的Blocking讀列表中,有數據時按列表序返回)數據庫
集合Set的底層實現是相似Hash,不過value全爲NULL,set有求並、交、差集及隨機取的功能。使用場景以下:後端
有序集合SortedSet(t_zset.c),集合中的每一個值都帶有分數,按分數排序,底層實現較爲複雜,用到了ziplist、skiplist和dict數據結構,後文將進行詳細介紹。使用場景以下:數組
在redis中,全部數據類型都被封裝在一個redisObject結構中,用於提供統一的接口,結構以下表1:網絡
表1 redisObject數據結構
redisObject源碼(server.h) |
typedef struct redisObject { unsigned type:4;//對象類型,用於分辨字符串、列表、
|
有序列表有壓縮列表ziplist和跳錶skiplist兩種實現方式,經過encoding識別,當數據項數目小於zset_max_ziplist_entries(默認爲128),且保存的全部元素長度不超過zset_max_ziplist_value(默認爲64)時,則用ziplist實現有序集合,不然使用zset結構,zset底層使用skiplist跳錶和dict字典。建立有序集合的關鍵代碼以下表2:app
表2 建立有序集合dom
zaddGenericCommand函數 |
if (server.zset_max_ziplist_entries == 0 || server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr)) { zobj = createZsetObject(); //建立zset } else { zobj = createZsetZiplistObject();//建立ziplist }
|
ziplist是一個內存連續的特殊雙向鏈表LinkList,減小了內存碎片和指針的佔用,用於節省內存,但對ziplist進行操做會致使內存的從新分配,影響性能,故在元素較少時用ziplist。ziplist內存佈局以下:curl
<zlbytes> <zltail> <zllen> <entry> <entry> ... <entry> <zlend>
表3 ziplist在內存中各字節含義
Field |
含義 |
zlbytes(uint32_t) |
ziplist佔用的內存字節數,包括zlbytes自己 |
zltail(uint32_t) |
最後一個entry的offset偏移值 |
zllen(uint16_t) |
數據項entry的個數 |
entry(變長) |
數據項 |
zlend(uint8_t) |
標識ziplist的結束,值爲255 |
數據項entry的內存結構以下:<prevlen> <encoding> <entry-data>,當保存的是小整型數據時,entry沒有entry-data域, encoding自己包含了整型元素值。Entry各字節含義以下表4:
表4 entry各Field含義
Field |
含義 |
prevlen |
上一個數據項entry的長度。當長度小於254字節,則prevlen佔1字節,當長度大於或等於254字節,則prevlen佔5字節,首字節值爲254,剩下4字節表示上一entry長度。 |
encoding |
encoding的值依賴於數據entry-data。首字節的前兩個bit爲00、0一、10,標識entry-data爲字符串,同時表示encoding的長度分別爲一、二、5字節,除前兩個bit,剩下的bit表示字符串長度;前兩個bit爲11,表示entry-data爲整型,接下來的2 bit表示整數類型。entry-data不一樣類型及encoding以下: 1) |00pppppp| - 1 byte,字符串且長度小於等於63字節(6bit) 2) |01pppppp|qqqqqqqq| - 2 bytes,字符串且長度小於等於16383字節(14bit) 3) |10000000|qqqqqqqq|rrrrrrrr|ssssssss|tttttttt| - 5 bytes,字符串且長度大於等於16384(後面四個字節表示長度,首字節的低位6bit設爲0) 4) |11000000| - 1 bytes,len字段爲1字節,後面的entry-data爲整型且類型爲int16_t (2 bytes) 5) |11010000| - 1 bytes, entry-data爲整型且類型爲int32_t (4 bytes) 6) |11100000| - 1 bytes, entry-data爲整型且類型爲int64_t (8 bytes) 7) |11110000| - 1 bytes, entry-data爲整型且佔3 bytes 8) |11111110| - 1 bytes, entry-data爲整型且佔1 bytes 9) |1111xxxx| - (with xxxx between 0000 and 1101),xxxx的值從1到13,可用於表示entry-data(1到12),encoding包含entry-data的值,從而不須要entry-data域 10) |11111111| - 用於標識ziplist的結束 |
entry-data |
具體的數據 |
ziplist在內存中的實例如圖1,zibytes佔4字節(小端存儲),值爲15,表示此ziplist佔用內存15字節;zltail佔4字節,值爲12,表示最後一個數據項entry(這裏是5所在的entry),距離ziplist的開頭offset爲12字節;entries佔2字節,表示數據項數目爲2; "00 f3"表示第一個entry(值爲2),」00」表示前一個entry的長度爲0(prevlen),」f3」對應encoding中的第9種狀況(「11110011」),表示數據爲整型且值爲2;」02 f6」表示第二個entry,」02」表示前一個entry的長度爲2(prevlen),」f6」也對應encoding的第9種狀況(「11110110」),表示數據爲整型且值爲6.
圖1 ziplist在內存中的實例
ziplist在redis中插入數據的源碼及註釋如表5:
表5 ziplist插入數據源碼
ziplist插入邏輯源碼(ziplist.c) |
/* Insert item at "p". */ unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) { size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen; unsigned int prevlensize, prevlen = 0; size_t offset; int nextdiff = 0; unsigned char encoding = 0; long long value = 123456789; /* initialized to avoid warning. Using a value that is easy to see if for some reason we use it uninitialized. */ zlentry tail; /* Find out prevlen for the entry that is inserted. */ //插入位置前面一個entry節點佔用的字節數prevlen if (p[0] != ZIP_END) {//插入節點不在末尾節點,直接從p的前面字節讀 ZIP_DECODE_PREVLEN(p, prevlensize, prevlen); } else {//插入節點在末尾位置,找到末尾節點 unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl); if (ptail[0] != ZIP_END) { prevlen = zipRawEntryLength(ptail); } } /* See if the entry can be encoded */ if (zipTryEncoding(s,slen,&value,&encoding)) {//判斷s是否能夠轉化爲整數,並將整數值和enconding分別存在value和encoding指針 /* 'encoding' is set to the appropriate integer encoding */ reqlen = zipIntSize(encoding);//整數值長度 } else { /* 'encoding' is untouched, however zipStoreEntryEncoding will use the * string length to figure out how to encode it. */ reqlen = slen;//字符串長度 } /* We need space for both the length of the previous entry and * the length of the payload. */ //得出新插入節點佔用的總字節數reqlen reqlen += zipStorePrevEntryLength(NULL,prevlen); reqlen += zipStoreEntryEncoding(NULL,encoding,slen); /* When the insert position is not equal to the tail, we need to * make sure that the next entry can hold this entry's length in * its prevlen field. */ //插入新節點不在末尾位置,則插入位置p所指向的entry節點的prevlen, //值會變成新插入節點的總長度,且prevlen所佔用的字節數可能會變化, //nextdiff表示新插入節點下一節點的prevlen須要空間的變化,負值表示變小, //正值表示擴大 int forcelarge = 0; nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0; if (nextdiff == -4 && reqlen < 4) { nextdiff = 0; forcelarge = 1; } /* Store offset because a realloc may change the address of zl. */ offset = p-zl; zl = ziplistResize(zl,curlen+reqlen+nextdiff);//從新分配空間,並將zl的每字節都填充到新分配的內存中 p = zl+offset; //將p後面的數據項進行移動 /* Apply memory move when necessary and update tail offset. */ if (p[0] != ZIP_END) { /* Subtract one because of the ZIP_END bytes */ memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff); /* Encode this entry's raw length in the next entry. */ if (forcelarge)//設置下一個節點的prevlen zipStorePrevEntryLengthLarge(p+reqlen,reqlen); else zipStorePrevEntryLength(p+reqlen,reqlen); /* Update offset for tail */ ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen); /* When the tail contains more than one entry, we need to take * "nextdiff" in account as well. Otherwise, a change in the * size of prevlen doesn't have an effect on the *tail* offset. */ zipEntry(p+reqlen, &tail); if (p[reqlen+tail.headersize+tail.len] != ZIP_END) { ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff); } } else { /* This element will be the new tail. */ ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl); } /* When nextdiff != 0, the raw length of the next entry has changed, so * we need to cascade the update throughout the ziplist */ if (nextdiff != 0) { offset = p-zl; zl = __ziplistCascadeUpdate(zl,p+reqlen); p = zl+offset; } /* Write the entry */ //將新數據項放入插入位置 p += zipStorePrevEntryLength(p,prevlen); p += zipStoreEntryEncoding(p,encoding,slen); if (ZIP_IS_STR(encoding)) { memcpy(p,s,slen); } else { zipSaveInteger(p,value,encoding); } ZIPLIST_INCR_LENGTH(zl,1); return zl; }
|
zset在redis中的定義如表6:
表6 zset源碼
zset定義(server.h) |
typedef struct zset { dict *dict;//字典 zskiplist *zsl;//跳錶 } zset;
|
zset同時使用dict和zskiplist實現有序集合的功能,dict是爲了快速得到指定元素的分值(zscore命令,時間複雜度爲O(1)),zskiplist是爲了快速範圍查詢(zrank、zrange命令)。本文重點講解跳錶的知識。
skiplist是在有序鏈表的基礎上發展而來,在有序鏈表中進行查找,須要進行順序遍歷,時間複雜度爲O(n),一樣,進行插入也須要順序遍歷到插入位置,時間複雜度也爲O(n)。
圖2 有序鏈表
利用有序的性質,每兩個節點多加一個指針,指向下下個節點,如圖3所示,新增長的指針能夠構成一個新的有序鏈表,新鏈表節點個數只有下層鏈表的一半,當查找元素時,能夠重新鏈表開始向右查找,碰到比查找元素大的節點,則回到下一層鏈表查找,好比查找元素20,查找路徑以下圖中標記爲紅的路徑(head->8->17->23,23比20大,到下一層查找,17->20),因爲新增的指針,查找元素時不須要和每一個節點進行比較,須要比較的節點大概爲原來的一半。
圖3 雙層有序鏈表
能夠在新產生的鏈表之上,每隔兩個節點,再增長一個指針,從而產生第三層鏈表,如圖4所示,紅色箭頭表明查找路徑,從最上層鏈表開始查找,一次能夠跳過四個節點,進一步加快了查找速度。
圖4 多層有向鏈表
skiplist借鑑了多層鏈表的思想,但多層鏈表這種嚴格的2:1關係,會致使插入和刪除節點破壞上下層之間的2:1關係,致使插入位置和刪除位置及後續的全部節點都須要進行調整。skiplist並不採用這種嚴格的2:1對應關係,每一個節點的層數採用隨機生成的方法,節點插入例子以下圖5所示,插入節點不會影響其它節點的層數,且只需調整插入節點先後的指針,不須要對全部節點進行調整,下降了插入的複雜度。
圖5 skiplist插入節點過程
skiplist隨機生成層數level的的代碼如表7:
表7 隨機生成節點層數
zslRandomLevel函數(t_zset.c) |
int zslRandomLevel(void) { //隨機生成節點層數,當第i層節點存在時,第i+1節點存在的機率爲ZSKIPLIST_P = 1/4 //ZSKIPLIST_MAXLEVEL 64,表示節點的最大層數 int level = 1; while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF)) level += 1; return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL; }
|
skiplist時間複雜度爲o(),所佔用空間的大小依賴於插入元素隨機生成的層數,每一個元素level至少爲1,層數越高,生成的機率越低,節點的層數服從必定的機率分佈,以下:
每一個節點的平均層數計算以下:
平均層數表明每一個節點的平均指針數目,在redis中,p=1/4,所以平均指針數目爲1.33。
在redis中skiplist的定義代碼如表8,zskiplist表示跳錶, zskiplistNode表示跳錶中的節點, zskiplistNode包含了分值,每一個節點按分值排序,且節點包含後退指針,用於雙向遍歷。
表8 redis中跳錶結構
zskiplist及zskiplistNode(server.h) |
/* ZSETs use a specialized version of Skiplists */ typedef struct zskiplistNode { sds ele;//實際存儲的數據 double score;//分值 struct zskiplistNode *backward;//後退指針,指向前一個節點 struct zskiplistLevel { struct zskiplistNode *forward;//前進指針,指向下一個節點 unsigned long span;//跨度,表示該層鏈表的這一節點到下一節點跨越的節點數,用於計算rank } level[];//層級數組,每一個層級都有到下一個節點的指針和跨度 } zskiplistNode;//跳錶節點 typedef struct zskiplist { struct zskiplistNode *header, *tail;//跳錶頭節點和尾節點 unsigned long length;//跳錶元素個數 int level;//跳錶的最高層數(不包括頭節點,頭節點實際上並不存儲數據) } zskiplist;
|
redis中,zskiplist插入元素的代碼如表9,在查找插入位置的過程當中,記下每層須要更新的前一節點在update數組中。
表9 跳錶插入節點源代碼
zslInsert(t_zset.c) |
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; unsigned int rank[ZSKIPLIST_MAXLEVEL]; int i, level; serverAssert(!isnan(score)); x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { /* store rank that is crossed to reach the insert position */ //rank[i]初始化爲rank[i+1],因此rank[i]-rank[i+1]表示在i層走過的節點數 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) { rank[i] += x->level[i].span; x = x->level[i].forward; } // 記錄將要和新節點相鏈接的節點,x表示新節點在i層鏈接的上一節點 update[i] = x; } /* we assume the element is not already inside, since we allow duplicated * scores, reinserting the same element should never happen since the * caller of zslInsert() should test in the hash table if the element is * already inside or not. */ level = zslRandomLevel();//隨機生成此節點的層數 if (level > zsl->level) { for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; update[i]->level[i].span = zsl->length; } zsl->level = level; } x = zslCreateNode(level,score,ele); for (i = 0; i < level; i++) { x->level[i].forward = update[i]->level[i].forward; update[i]->level[i].forward = x; /* update span covered by update[i] as x is inserted here */ //rank[0]表示0層鏈表,插入節點x左邊的節點數 //rank[i]表示i層鏈表,插入節點x左邊的節點數 //rank[0] - rank[i]+1表示i層鏈表,x前一節點到x的跨度 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); update[i]->level[i].span = (rank[0] - rank[i]) + 1; } /* increment span for untouched levels */ //在level及之上的每層,update[i]到下一節點的距離因爲插入了x節點而加1 for (i = level; i < zsl->level; i++) { update[i]->level[i].span++; } //更新後退指針 x->backward = (update[0] == zsl->header) ? NULL : update[0]; if (x->level[0].forward) x->level[0].forward->backward = x; else zsl->tail = x; zsl->length++; return x; }
|
與平衡樹(AVL、紅黑樹)比,skiplist有以下優勢,這也是redis使用跳錶作有序集合底層結構而不選用平衡樹的緣由。