Redis數據類型使用場景及有序集合SortedSet底層實現詳解

  Redis經常使用數據類型有字符串String、字典dict、列表List、集合Set、有序集合SortedSet,本文將簡單介紹各數據類型及其使用場景,並重點剖析有序集合SortedSet的實現。redis

  List的底層實現是相似Linked List雙端鏈表的結構,而不是數組,插入速度快,不須要節點的移動,但不支持隨機訪問,須要順序遍歷到索引所在節點。List有兩個主要的使用場景:sql

  1. 記住用戶最新發表的博文,每次用戶發表了文章,將文章id使用LPUSH加入到列表中,用戶訪問本身的主頁時,使用LRANGE 0 9獲取最新10條博文(使用LTRIM 0 9能夠取出最新10條文章的同時,刪除舊的文章),而不用使用order by sql語句去後端數據庫取數據。
  2. 生產者/消費者模式,生產者往List中加入數據,消費者從List中取數據。當List爲空時,消費者rpop返回爲NULL,這是會進行輪詢,等待一段時間繼續去取。輪詢模式有以下缺點:
    1. 客戶端和redis耗費cpu和網絡帶寬等資源執行無效命令。
    2. 取回NULL後,sleep會使有新數據時,客戶端消費不夠及時。

  爲了解決輪詢的問題,Redis提供了brpop和blpop實現Blocking讀,當List爲空時,等待一段時間再返回,當有數據時,按請求順序返回給各客戶端。(當List爲空時,能夠將請求Blocking讀命令的客戶端加入此List的Blocking讀列表中,有數據時按列表序返回)數據庫

  集合Set的底層實現是相似Hash,不過value全爲NULL,set有求並、交、差集及隨機取的功能。使用場景以下:後端

  1. 表示對象之間的聯繫,好比求擁有標籤一、二、10的新聞,使用sinter tag:1:news tag:2:news tag:10:news。
  2. 隨機發牌,使用spop,spop隨機返回集合中的元素,好比給每位玩家發五張牌,每位玩家調用五次spop便可,爲了下次發牌不須要再將牌加入set,能夠在此次發牌前調用sunionstore將牌複製。

  有序集合SortedSet(t_zset.c),集合中的每一個值都帶有分數,按分數排序,底層實現較爲複雜,用到了ziplist、skiplist和dict數據結構,後文將進行詳細介紹。使用場景以下:數組

  1. 排行榜問題,好比遊戲排行榜,按用戶分數排序,並取top N個用戶。

  在redis中,全部數據類型都被封裝在一個redisObject結構中,用於提供統一的接口,結構以下表1:網絡

 表1 redisObject數據結構

redisObject源碼(server.h)
typedef struct redisObject {
    unsigned type:4;//對象類型,用於分辨字符串、列表、
//集合、有序集合、字典,有序集合爲REDIS_ZSET
unsigned encoding:4;//編碼,標識底層數據結構,
//有序集合有REDIS_ENCODING_ZIPLIST(壓縮列表)、REDIS_ENCODING_SKIPLIST(跳錶)
//記錄鍵最近一次被訪問的時間,長時間不被訪問的對象可被內存回收 unsigned lru:LRU_BITS;

/* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ int refcount;//引用計數,用於對象內存回收,
//當爲0時回收內存,引用計數可實現不一樣鍵相同值的共享,
//事實上,redis會初始化建立0到9999個整數對象用於共享,從而節約內存
void *ptr;//指向底層數據結構實例的指針 } robj;

 

 

   有序列表有壓縮列表ziplist和跳錶skiplist兩種實現方式,經過encoding識別,當數據項數目小於zset_max_ziplist_entries(默認爲128),且保存的全部元素長度不超過zset_max_ziplist_value(默認爲64)時,則用ziplist實現有序集合,不然使用zset結構,zset底層使用skiplist跳錶和dict字典。建立有序集合的關鍵代碼以下表2:app

表2 建立有序集合dom

zaddGenericCommand函數
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
        {
            zobj = createZsetObject(); //建立zset
        } else {
            zobj = createZsetZiplistObject();//建立ziplist
        }

 

  ziplist是一個內存連續的特殊雙向鏈表LinkList,減小了內存碎片和指針的佔用,用於節省內存,但對ziplist進行操做會致使內存的從新分配,影響性能,故在元素較少時用ziplist。ziplist內存佈局以下:curl

<zlbytes> <zltail> <zllen> <entry> <entry> ... <entry> <zlend>

表3 ziplist在內存中各字節含義

Field
含義
zlbytes(uint32_t)
ziplist佔用的內存字節數,包括zlbytes自己
zltail(uint32_t)
最後一個entry的offset偏移值
zllen(uint16_t)
數據項entry的個數
entry(變長)
數據項
zlend(uint8_t)
標識ziplist的結束,值爲255

  數據項entry的內存結構以下:<prevlen> <encoding> <entry-data>,當保存的是小整型數據時,entry沒有entry-data域, encoding自己包含了整型元素值。Entry各字節含義以下表4:

表4 entry各Field含義

Field
含義
prevlen
上一個數據項entry的長度。當長度小於254字節,則prevlen佔1字節,當長度大於或等於254字節,則prevlen佔5字節,首字節值爲254,剩下4字節表示上一entry長度。
encoding
encoding的值依賴於數據entry-data。首字節的前兩個bit爲00、0一、10,標識entry-data爲字符串,同時表示encoding的長度分別爲一、二、5字節,除前兩個bit,剩下的bit表示字符串長度;前兩個bit爲11,表示entry-data爲整型,接下來的2 bit表示整數類型。entry-data不一樣類型及encoding以下:
1)       |00pppppp| - 1 byte,字符串且長度小於等於63字節(6bit)
2)       |01pppppp|qqqqqqqq| - 2 bytes,字符串且長度小於等於16383字節(14bit)
3)       |10000000|qqqqqqqq|rrrrrrrr|ssssssss|tttttttt| - 5 bytes,字符串且長度大於等於16384(後面四個字節表示長度,首字節的低位6bit設爲0)
4)       |11000000| - 1 bytes,len字段爲1字節,後面的entry-data爲整型且類型爲int16_t (2 bytes)
5)       |11010000| - 1 bytes, entry-data爲整型且類型爲int32_t (4 bytes)
6)       |11100000| - 1 bytes, entry-data爲整型且類型爲int64_t (8 bytes)
7)       |11110000| - 1 bytes, entry-data爲整型且佔3 bytes
8)       |11111110| - 1 bytes, entry-data爲整型且佔1 bytes
9)       |1111xxxx| - (with xxxx between 0000 and 1101),xxxx的值從1到13,可用於表示entry-data(1到12),encoding包含entry-data的值,從而不須要entry-data域
10)    |11111111| - 用於標識ziplist的結束
entry-data
具體的數據

  ziplist在內存中的實例如圖1,zibytes佔4字節(小端存儲),值爲15,表示此ziplist佔用內存15字節;zltail佔4字節,值爲12,表示最後一個數據項entry(這裏是5所在的entry),距離ziplist的開頭offset爲12字節;entries佔2字節,表示數據項數目爲2; "00 f3"表示第一個entry(值爲2),」00」表示前一個entry的長度爲0(prevlen),」f3」對應encoding中的第9種狀況(「11110011」),表示數據爲整型且值爲2;」02 f6」表示第二個entry,」02」表示前一個entry的長度爲2(prevlen),」f6」也對應encoding的第9種狀況(「11110110」),表示數據爲整型且值爲6.

圖1 ziplist在內存中的實例

  ziplist在redis中插入數據的源碼及註釋如表5:

表5 ziplist插入數據源碼

ziplist插入邏輯源碼(ziplist.c)

/* Insert item at "p". */

unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {

    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;

    unsigned int prevlensize, prevlen = 0;

    size_t offset;

    int nextdiff = 0;

    unsigned char encoding = 0;

    long long value = 123456789; /* initialized to avoid warning. Using a value

                                    that is easy to see if for some reason

                                    we use it uninitialized. */

    zlentry tail;

 

    /* Find out prevlen for the entry that is inserted. */

    //插入位置前面一個entry節點佔用的字節數prevlen

    if (p[0] != ZIP_END) {//插入節點不在末尾節點,直接從p的前面字節讀

        ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);

    } else {//插入節點在末尾位置,找到末尾節點

        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);

        if (ptail[0] != ZIP_END) {

            prevlen = zipRawEntryLength(ptail);

        }

    }

    /* See if the entry can be encoded */

    if (zipTryEncoding(s,slen,&value,&encoding)) {//判斷s是否能夠轉化爲整數,並將整數值和enconding分別存在value和encoding指針

        /* 'encoding' is set to the appropriate integer encoding */

        reqlen = zipIntSize(encoding);//整數值長度

    } else {

        /* 'encoding' is untouched, however zipStoreEntryEncoding will use the

         * string length to figure out how to encode it. */

        reqlen = slen;//字符串長度

    }

 

    /* We need space for both the length of the previous entry and

     * the length of the payload. */

    //得出新插入節點佔用的總字節數reqlen

    reqlen += zipStorePrevEntryLength(NULL,prevlen);

    reqlen += zipStoreEntryEncoding(NULL,encoding,slen);

 

    /* When the insert position is not equal to the tail, we need to

     * make sure that the next entry can hold this entry's length in

     * its prevlen field. */

    //插入新節點不在末尾位置,則插入位置p所指向的entry節點的prevlen,

    //值會變成新插入節點的總長度,且prevlen所佔用的字節數可能會變化,

    //nextdiff表示新插入節點下一節點的prevlen須要空間的變化,負值表示變小,

    //正值表示擴大

    int forcelarge = 0;

    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;

    if (nextdiff == -4 && reqlen < 4) {

        nextdiff = 0;

        forcelarge = 1;

    }

 

    /* Store offset because a realloc may change the address of zl. */

    offset = p-zl;

    zl = ziplistResize(zl,curlen+reqlen+nextdiff);//從新分配空間,並將zl的每字節都填充到新分配的內存中

    p = zl+offset;

    //將p後面的數據項進行移動

    /* Apply memory move when necessary and update tail offset. */

    if (p[0] != ZIP_END) {

        /* Subtract one because of the ZIP_END bytes */

        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

 

        /* Encode this entry's raw length in the next entry. */

        if (forcelarge)//設置下一個節點的prevlen

            zipStorePrevEntryLengthLarge(p+reqlen,reqlen);

        else

            zipStorePrevEntryLength(p+reqlen,reqlen);

 

        /* Update offset for tail */

        ZIPLIST_TAIL_OFFSET(zl) =

            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

 

        /* When the tail contains more than one entry, we need to take

         * "nextdiff" in account as well. Otherwise, a change in the

         * size of prevlen doesn't have an effect on the *tail* offset. */

        zipEntry(p+reqlen, &tail);

        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {

            ZIPLIST_TAIL_OFFSET(zl) =

                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);

        }

    } else {

        /* This element will be the new tail. */

        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);

    }

 

    /* When nextdiff != 0, the raw length of the next entry has changed, so

     * we need to cascade the update throughout the ziplist */

    if (nextdiff != 0) {

        offset = p-zl;

        zl = __ziplistCascadeUpdate(zl,p+reqlen);

        p = zl+offset;

    }

 

    /* Write the entry */

    //將新數據項放入插入位置

    p += zipStorePrevEntryLength(p,prevlen);

    p += zipStoreEntryEncoding(p,encoding,slen);

    if (ZIP_IS_STR(encoding)) {

        memcpy(p,s,slen);

    } else {

        zipSaveInteger(p,value,encoding);

    }

    ZIPLIST_INCR_LENGTH(zl,1);

    return zl;

}

 

  zset在redis中的定義如表6:

表6 zset源碼

zset定義(server.h)

typedef struct zset {

    dict *dict;//字典

    zskiplist *zsl;//跳錶

} zset;

 

  zset同時使用dict和zskiplist實現有序集合的功能,dict是爲了快速得到指定元素的分值(zscore命令,時間複雜度爲O(1)),zskiplist是爲了快速範圍查詢(zrank、zrange命令)。本文重點講解跳錶的知識。

  skiplist是在有序鏈表的基礎上發展而來,在有序鏈表中進行查找,須要進行順序遍歷,時間複雜度爲O(n),一樣,進行插入也須要順序遍歷到插入位置,時間複雜度也爲O(n)。

 

圖2 有序鏈表

  利用有序的性質,每兩個節點多加一個指針,指向下下個節點,如圖3所示,新增長的指針能夠構成一個新的有序鏈表,新鏈表節點個數只有下層鏈表的一半,當查找元素時,能夠重新鏈表開始向右查找,碰到比查找元素大的節點,則回到下一層鏈表查找,好比查找元素20,查找路徑以下圖中標記爲紅的路徑(head->8->17->23,23比20大,到下一層查找,17->20),因爲新增的指針,查找元素時不須要和每一個節點進行比較,須要比較的節點大概爲原來的一半。

圖3 雙層有序鏈表

  能夠在新產生的鏈表之上,每隔兩個節點,再增長一個指針,從而產生第三層鏈表,如圖4所示,紅色箭頭表明查找路徑,從最上層鏈表開始查找,一次能夠跳過四個節點,進一步加快了查找速度。

圖4 多層有向鏈表

 

  skiplist借鑑了多層鏈表的思想,但多層鏈表這種嚴格的2:1關係,會致使插入和刪除節點破壞上下層之間的2:1關係,致使插入位置和刪除位置及後續的全部節點都須要進行調整。skiplist並不採用這種嚴格的2:1對應關係,每一個節點的層數採用隨機生成的方法,節點插入例子以下圖5所示,插入節點不會影響其它節點的層數,且只需調整插入節點先後的指針,不須要對全部節點進行調整,下降了插入的複雜度。

圖5 skiplist插入節點過程

  skiplist隨機生成層數level的的代碼如表7:

  表7 隨機生成節點層數

zslRandomLevel函數(t_zset.c)

int zslRandomLevel(void) {

    //隨機生成節點層數,當第i層節點存在時,第i+1節點存在的機率爲ZSKIPLIST_P = 1/4

    //ZSKIPLIST_MAXLEVEL 64,表示節點的最大層數

    int level = 1;

    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))

        level += 1;

    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;

}

 

  skiplist時間複雜度爲o(),所佔用空間的大小依賴於插入元素隨機生成的層數,每一個元素level至少爲1,層數越高,生成的機率越低,節點的層數服從必定的機率分佈,以下:

  1. 節點剛好只有一層的機率爲1-p
  2. 節點層數大於等於2的機率爲p,剛好等於2的機率爲p(1-p)
  3. 節點層數大於等於k的機率爲pk-1,剛好等於k的機率爲pk-1(1-p)

  每一個節點的平均層數計算以下:

  

  平均層數表明每一個節點的平均指針數目,在redis中,p=1/4,所以平均指針數目爲1.33。

  在redis中skiplist的定義代碼如表8,zskiplist表示跳錶, zskiplistNode表示跳錶中的節點, zskiplistNode包含了分值,每一個節點按分值排序,且節點包含後退指針,用於雙向遍歷。

表8 redis中跳錶結構

zskiplist及zskiplistNode(server.h)

/* ZSETs use a specialized version of Skiplists */

typedef struct zskiplistNode {

    sds ele;//實際存儲的數據

    double score;//分值

    struct zskiplistNode *backward;//後退指針,指向前一個節點

    struct zskiplistLevel {

        struct zskiplistNode *forward;//前進指針,指向下一個節點

        unsigned long span;//跨度,表示該層鏈表的這一節點到下一節點跨越的節點數,用於計算rank

    } level[];//層級數組,每一個層級都有到下一個節點的指針和跨度

} zskiplistNode;//跳錶節點

 

typedef struct zskiplist {

    struct zskiplistNode *header, *tail;//跳錶頭節點和尾節點

    unsigned long length;//跳錶元素個數

    int level;//跳錶的最高層數(不包括頭節點,頭節點實際上並不存儲數據)

} zskiplist;

 

   redis中,zskiplist插入元素的代碼如表9,在查找插入位置的過程當中,記下每層須要更新的前一節點在update數組中。

表9 跳錶插入節點源代碼

zslInsert(t_zset.c)

zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {

    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;

    unsigned int rank[ZSKIPLIST_MAXLEVEL];

    int i, level;

 

    serverAssert(!isnan(score));

    x = zsl->header;

    for (i = zsl->level-1; i >= 0; i--) {

        /* store rank that is crossed to reach the insert position */

        //rank[i]初始化爲rank[i+1],因此rank[i]-rank[i+1]表示在i層走過的節點數

        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];

        while (x->level[i].forward &&

                (x->level[i].forward->score < score ||

                    (x->level[i].forward->score == score &&

                    sdscmp(x->level[i].forward->ele,ele) < 0)))

        {

            rank[i] += x->level[i].span;

            x = x->level[i].forward;

        }

        // 記錄將要和新節點相鏈接的節點,x表示新節點在i層鏈接的上一節點

        update[i] = x;

    }

    /* we assume the element is not already inside, since we allow duplicated

     * scores, reinserting the same element should never happen since the

     * caller of zslInsert() should test in the hash table if the element is

     * already inside or not. */

    level = zslRandomLevel();//隨機生成此節點的層數

    if (level > zsl->level) {

        for (i = zsl->level; i < level; i++) {

            rank[i] = 0;

            update[i] = zsl->header;

            update[i]->level[i].span = zsl->length;

        }

        zsl->level = level;

    }

    x = zslCreateNode(level,score,ele);

    for (i = 0; i < level; i++) {

        x->level[i].forward = update[i]->level[i].forward;

        update[i]->level[i].forward = x;

 

        /* update span covered by update[i] as x is inserted here */

        //rank[0]表示0層鏈表,插入節點x左邊的節點數

        //rank[i]表示i層鏈表,插入節點x左邊的節點數

        //rank[0] - rank[i]+1表示i層鏈表,x前一節點到x的跨度

        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);

        update[i]->level[i].span = (rank[0] - rank[i]) + 1;

    }

 

    /* increment span for untouched levels */

    //在level及之上的每層,update[i]到下一節點的距離因爲插入了x節點而加1

    for (i = level; i < zsl->level; i++) {

        update[i]->level[i].span++;

    }

    //更新後退指針

    x->backward = (update[0] == zsl->header) ? NULL : update[0];

    if (x->level[0].forward)

        x->level[0].forward->backward = x;

    else

        zsl->tail = x;

    zsl->length++;

    return x;

}

 

  與平衡樹(AVL、紅黑樹)比,skiplist有以下優勢,這也是redis使用跳錶作有序集合底層結構而不選用平衡樹的緣由。

  1. 佔用內存少。經過調節機率p,可使每一個節點的平均指針數發生變化,redis中爲1.33,而二叉樹每一個節點都有兩個指針。
  2. ZRANGE or ZREVRANGE等範圍查詢更簡單。Skiplist能夠看做特殊的雙向鏈表,只需找到範圍中的最小節點,順序遍歷便可,而平衡樹找到範圍中的最小節點,仍需中序遍歷。
  3. 和紅黑樹等比,skiplist實現和調試簡單。

參考文獻

  1. An introduction to Redis data types and abstractions
  2. Redis內部數據結構詳解(4)——ziplist
  3. Pugh W. Skip lists: a probabilistic alternative to balanced trees[J]. Communications of the ACM, 1990, 33(6): 668-677.
  4. Redis爲何用跳錶而不用平衡樹?
  5. Is there any particular reason you chose skip list instead of btrees except for simplicity? 
相關文章
相關標籤/搜索