Redis源碼分析-壓縮列表ziplist

// 文中引用的代碼來源於Redis3.2redis

前言

Redis是基於內存的nosql,有些場景下爲了節省內存redis會用「時間」換「空間」。
ziplist就是很典型的例子。sql

介紹

ziplist是list鍵、hash鍵以及zset鍵的底層實現之一(3.0以後list鍵已經不直接用ziplist和linkedlist做爲底層實現了,取而代之的是quicklist
這些鍵的常規底層實現以下:數組

  • list鍵:雙向鏈表
  • hash鍵:字典dict
  • zset鍵:跳躍表zskiplist

可是當list鍵裏包含的元素較少、而且每一個元素要麼是小整數要麼是長度較小的字符串時,redis將會用ziplist做爲list鍵的底層實現。同理hash和zset在這種場景下也會使用ziplist。數據結構

既然已有底層結構能夠實現list、hash、zset鍵,爲何還要用ziplist呢?
固然是爲了節省內存空間
咱們先來看看ziplist是如何壓縮的nosql

原理

總體佈局

ziplist是由一系列特殊編碼的連續內存塊組成的順序存儲結構,相似於數組,ziplist在內存中是連續存儲的,可是不一樣於數組,爲了節省內存 ziplist的每一個元素所佔的內存大小能夠不一樣(數組中叫元素,ziplist叫節點entry,下文都用「節點」),每一個節點能夠用來存儲一個整數或者一個字符串。
下圖是ziplist在內存中的佈局函數

圖1 總體佈局

  • zlbytes: ziplist的長度(單位: 字節),是一個32位無符號整數
  • zltail: ziplist最後一個節點的偏移量,反向遍歷ziplist或者pop尾部節點的時候有用。
  • zllen: ziplist的節點(entry)個數
  • entry: 節點
  • zlend: 值爲0xFF,用於標記ziplist的結尾

普通數組的遍歷是根據數組裏存儲的數據類型 找到下一個元素的,例如int類型的數組訪問下一個元素時每次只須要移動一個sizeof(int)就行(實際上開發者只需讓指針p+1就行,在這裏引入sizeof(int)只是爲了說明區別)。
上文說了,ziplist的每一個節點的長度是能夠不同的,而咱們面對不一樣長度的節點又不可能直接sizeof(entry),那麼它是怎麼訪問下一個節點呢?
ziplist將一些必要的偏移量信息記錄在了每個節點裏,使之能跳到上一個節點或下一個節點。
接下來咱們看看節點的佈局佈局

節點的佈局(entry)

每一個節點由三部分組成:prevlength、encoding、data性能

  • prevlengh: 記錄上一個節點的長度,爲了方便反向遍歷ziplist
  • encoding: 當前節點的編碼規則,下文會詳細說
  • data: 當前節點的值,能夠是數字或字符串

爲了節省內存,根據上一個節點的長度prevlength 能夠將ziplist節點分爲兩類:
圖2 entry佈局ui

  • entry的前8位小於254,則這8位就表示上一個節點的長度
  • entry的前8位等於254,則意味着上一個節點的長度沒法用8位表示,後面32位纔是真實的prevlength。用254 不用255(11111111)做爲分界是由於255是zlend的值,它用於判斷ziplist是否到達尾部。

根據當前節點存儲的數據類型及長度,能夠將ziplist節點分爲9類
其中整數節點分爲6類:
圖3 整數節點encoding部分this

整數節點的encoding的長度爲8位,其中高2位用來區分整數節點和字符串節點(高2位爲11時是整數節點),低6位用來區分整數節點的類型,定義以下:

#define ZIP_INT_16B (0xc0 | 0<<4)//整數data,佔16位(2字節)
#define ZIP_INT_32B (0xc0 | 1<<4)//整數data,佔32位(4字節)
#define ZIP_INT_64B (0xc0 | 2<<4)//整數data,佔64位(8字節)
#define ZIP_INT_24B (0xc0 | 3<<4)//整數data,佔24位(3字節)
#define ZIP_INT_8B 0xfe //整數data,佔8位(1字節)
/* 4 bit integer immediate encoding */
//整數值1~13的節點沒有data,encoding的低四位用來表示data
#define ZIP_INT_IMM_MASK 0x0f
#define ZIP_INT_IMM_MIN 0xf1    /* 11110001 */
#define ZIP_INT_IMM_MAX 0xfd    /* 11111101 */

值得注意的是 最後一種encoding是存儲整數0~12的節點的encoding,它沒有額外的data部分,encoding的高4位表示這個類型,低4位就是它的data。這種類型的節點的encoding大小介於ZIP_INT_24B與ZIP_INT_8B之間(1~13),可是爲了表示整數0,取出低四位xxxx以後會將其-1做爲實際的data值(0~12)。在函數zipLoadInteger中,咱們能夠看到這種類型節點的取值方法:

...
 } else if (encoding >= ZIP_INT_IMM_MIN && encoding <= ZIP_INT_IMM_MAX) {
        ret = (encoding & ZIP_INT_IMM_MASK)-1;
 }
...

字符串節點分爲3類:

圖4 字符串節點encoding部分

  • 當data小於63字節時(2^6),節點存爲上圖的第一種類型,高2位爲00,低6位表示data的長度。
  • 當data小於16383字節時(2^14),節點存爲上圖的第二種類型,高2位爲01,後續14位表示data的長度。
  • 當data小於4294967296字節時(2^32),節點存爲上圖的第二種類型,高2位爲10,下一字節起連續32位表示data的長度。

上圖能夠看出:
不一樣於整數節點encoding永遠是8位,字符串節點的encoding能夠有8位、16位、40位三種長度
相同encoding類型的整數節點 data長度是固定的,可是相同encoding類型的字符串節點,data長度取決於encoding後半部分的值。

#define ZIP_STR_06B (0 << 6)//字符串data,最多有2^6字節(encoding後半部分的length有6位,length決定data有多少字節)
#define ZIP_STR_14B (1 << 6)//字符串data,最多有2^14字節
#define ZIP_STR_32B (2 << 6)//字符串data,最多有2^32字節

上文介紹了ziplist節點(entry)的分類,知道了節點能夠細分爲9種類型,那麼當遍歷一個ziplist時,指針到達某個節點時 如何判斷出節點的類型從而找到data呢?

已知節點的位置,求data的值

根據圖2 entry佈局 能夠看出,若要算出data的偏移量,得先計算出prevlength所佔內存大小(1字節和5字節):

//根據ptr指向的entry,返回這個entry的prevlensize
#define ZIP_DECODE_PREVLENSIZE(ptr, prevlensize) do {                          \
    if ((ptr)[0] < ZIP_BIGLEN) {                                               \
        (prevlensize) = 1;                                                     \
    } else {                                                                   \
        (prevlensize) = 5;                                                     \
    }                                                                          \
} while(0);

接着再用ZIP_DECODE_LENGTH(ptr + prevlensize, encoding, lensize, len)算出encoding所佔的字節,返回給lensize;data所佔的字節返回給len

//根據ptr指向的entry求出該entry的len(encoding裏存的 data所佔字節)和lensize(encoding所佔的字節)
#define ZIP_DECODE_LENGTH(ptr, encoding, lensize, len) do {                    \
    ZIP_ENTRY_ENCODING((ptr), (encoding));                                     \
    if ((encoding) < ZIP_STR_MASK) {                                           \
        if ((encoding) == ZIP_STR_06B) {                                       \
            (lensize) = 1;                                                     \
            (len) = (ptr)[0] & 0x3f;                                           \
        } else if ((encoding) == ZIP_STR_14B) {                                \
            (lensize) = 2;                                                     \
            (len) = (((ptr)[0] & 0x3f) << 8) | (ptr)[1];                       \
        } else if (encoding == ZIP_STR_32B) {                                  \
            (lensize) = 5;                                                     \
            (len) = ((ptr)[1] << 24) |                                         \
                    ((ptr)[2] << 16) |                                         \
                    ((ptr)[3] <<  8) |                                         \
                    ((ptr)[4]);                                                \
        } else {                                                               \
            assert(NULL);                                                      \
        }                                                                      \
    } else {                                                                   \
        (lensize) = 1;                                                         \
        (len) = zipIntSize(encoding);                                          \
    }                                                                          \
} while(0);

//將ptr的encoding解析成1個字節:00000000、01000000、10000000(字符串類型)和11??????(整數類型)
//若是是整數類型,encoding直接照抄ptr的;若是是字符串類型,encoding被截斷成一個字節並清零後6位
#define ZIP_ENTRY_ENCODING(ptr, encoding) do {  \
    (encoding) = (ptr[0]); \
    if ((encoding) < ZIP_STR_MASK) (encoding) &= ZIP_STR_MASK; \
} while(0)

//根據encoding返回數據(整數)所佔字節數
unsigned int zipIntSize(unsigned char encoding) {
    switch(encoding) {
    case ZIP_INT_8B:  return 1;
    case ZIP_INT_16B: return 2;
    case ZIP_INT_24B: return 3;
    case ZIP_INT_32B: return 4;
    case ZIP_INT_64B: return 8;
    default: return 0; /* 4 bit immediate */
    }
    assert(NULL);
    return 0;
}

完成以上步驟以後,便可算出data的位置:ptr+prevlensize+lensize,以及data的長度len

ziplist接口

上文已經闡述了ziplist的底層內存佈局,接下來看看一些基本的增刪改查操做在ziplist中是如何執行的。

ziplistNew 建立一個ziplist O(1)

/* Create a new empty ziplist. */
unsigned char *ziplistNew(void) {
    unsigned int bytes = ZIPLIST_HEADER_SIZE+1;//<zlbytes>4字節<zltail>4字節<zllen>2字節<zlend>1字節,沒有entry節點
    unsigned char *zl = zmalloc(bytes);
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);//<zlbytes>賦值
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);//<zltail>
    ZIPLIST_LENGTH(zl) = 0;//<zllen>
    zl[bytes-1] = ZIP_END;//<zlend>
    return zl;
}
#define ZIPLIST_HEADER_SIZE     (sizeof(uint32_t)*2+sizeof(uint16_t))//空ziplist除了<zlend>的大小
#define ZIPLIST_BYTES(zl)       (*((uint32_t*)(zl)))//<zlbyte>的指針的值,可讀可寫
#define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t))))//<zltail>的指針的值
#define ZIPLIST_HEADER_SIZE     (sizeof(uint32_t)*2+sizeof(uint16_t))//空ziplist除了<zlend>的大小
#define ZIPLIST_LENGTH(zl)      (*((uint16_t*)((zl)+sizeof(uint32_t)*2)))//<zllen>的指針的值

參照着圖1理解會直觀些,分配了一塊內存並初始化<zlbytes><zltail><zllen><zlend>,沒有entry。

ziplistFind 從ziplist裏找出一個entry O(n)

//返回p節點以後data與vstr(長度是vlen)相等的節點,只找p節點以後每隔skip的節點
//時間複雜度 O(n)
unsigned char *ziplistFind(unsigned char *p, unsigned char *vstr, unsigned int vlen, unsigned int skip) {
    int skipcnt = 0;
    unsigned char vencoding = 0;
    long long vll = 0;

    while (p[0] != ZIP_END) {
        unsigned int prevlensize, encoding, lensize, len;
        unsigned char *q;

        ZIP_DECODE_PREVLENSIZE(p, prevlensize);
        ZIP_DECODE_LENGTH(p + prevlensize, encoding, lensize, len);
        q = p + prevlensize + lensize;//當前節點的data

        if (skipcnt == 0) {
            /* Compare current entry with specified entry */
            if (ZIP_IS_STR(encoding)) {//判斷當前節點是否是字符串節點
                if (len == vlen && memcmp(q, vstr, vlen) == 0) {
                    return p;
                }
            } else {
                /* Find out if the searched field can be encoded. Note that
                 * we do it only the first time, once done vencoding is set
                 * to non-zero and vll is set to the integer value. */
                if (vencoding == 0) {//這個代碼塊只會執行一次,計算vstr的整數表示
                    if (!zipTryEncoding(vstr, vlen, &vll, &vencoding)) {
                        //將參數給的節點vstr當作整數節點轉換;將data值返回給vll,節點編碼返回給vencoding
                        //進入這個代碼塊說明將vstr轉換成整數失敗,vencoding不變,下次判斷當前節點是整數節點以後能夠跳過這個節點
                        /* If the entry can't be encoded we set it to
                         * UCHAR_MAX so that we don't retry again the next
                         * time. */
                        vencoding = UCHAR_MAX;//當前節點是整數節點,可是vstr是字符串節點,跳過不用比較了
                    }
                    /* Must be non-zero by now */
                    assert(vencoding);
                }

                /* Compare current entry with specified entry, do it only
                 * if vencoding != UCHAR_MAX because if there is no encoding
                 * possible for the field it can't be a valid integer. */
                if (vencoding != UCHAR_MAX) {
                    long long ll = zipLoadInteger(q, encoding);//算出當前節點的data
                    if (ll == vll) {
                        return p;
                    }
                }
            }

            /* Reset skip count */
            skipcnt = skip;
        } else {
            /* Skip entry */
            skipcnt--;
        }

        /* Move to next entry */
        p = q + len;
    }

    return NULL;
}

//嘗試將entry地址的內容轉換成整數,並根據這個整數算出一個合適的encoding返回給encoding參數。
//若沒法轉換成整數,則encoding不變,返回0,等到下次調用zipEncodeLength時再計算一個該字符串的encoding
int zipTryEncoding(unsigned char *entry, unsigned int entrylen, long long *v, unsigned char *encoding) {
    long long value;

    if (entrylen >= 32 || entrylen == 0) return 0;
    if (string2ll((char*)entry,entrylen,&value)) {
        /* Great, the string can be encoded. Check what's the smallest
         * of our encoding types that can hold this value. */
        if (value >= 0 && value <= 12) {
            *encoding = ZIP_INT_IMM_MIN+value;
        } else if (value >= INT8_MIN && value <= INT8_MAX) {
            *encoding = ZIP_INT_8B;
        } else if (value >= INT16_MIN && value <= INT16_MAX) {
            *encoding = ZIP_INT_16B;
        } else if (value >= INT24_MIN && value <= INT24_MAX) {
            *encoding = ZIP_INT_24B;
        } else if (value >= INT32_MIN && value <= INT32_MAX) {
            *encoding = ZIP_INT_32B;
        } else {
            *encoding = ZIP_INT_64B;
        }
        *v = value;
        return 1;
    }
    return 0;
}

/* Read integer encoded as 'encoding' from 'p' */
int64_t zipLoadInteger(unsigned char *p, unsigned char encoding) {
    int16_t i16;
    int32_t i32;
    int64_t i64, ret = 0;
    if (encoding == ZIP_INT_8B) {
        ret = ((int8_t*)p)[0];
    } else if (encoding == ZIP_INT_16B) {
        memcpy(&i16,p,sizeof(i16));
        memrev16ifbe(&i16);
        ret = i16;
    } else if (encoding == ZIP_INT_32B) {
        memcpy(&i32,p,sizeof(i32));
        memrev32ifbe(&i32);
        ret = i32;
    } else if (encoding == ZIP_INT_24B) {
        i32 = 0;
        memcpy(((uint8_t*)&i32)+1,p,sizeof(i32)-sizeof(uint8_t));
        memrev32ifbe(&i32);
        ret = i32>>8;
    } else if (encoding == ZIP_INT_64B) {
        memcpy(&i64,p,sizeof(i64));
        memrev64ifbe(&i64);
        ret = i64;
    } else if (encoding >= ZIP_INT_IMM_MIN && encoding <= ZIP_INT_IMM_MAX) {
        ret = (encoding & ZIP_INT_IMM_MASK)-1;
    } else {
        assert(NULL);
    }
    return ret;
}

其餘接口

  • ziplistInsert 往ziplist裏插入一個entry 時間複雜度 平均:O(n), 最壞:O(n²)
  • ziplistDelete 從siplist裏刪除一個entry 時間複雜度 平均:O(n), 最壞:O(n²)

爲何插入節點和刪除節點兩個接口的最壞時間複雜度會是O(n²)呢?這是因爲ziplist的「連鎖更新」致使的,連鎖更新在最壞狀況下須要對ziplist執行n次空間重分配操做,並且每次空間重分配的最壞時間複雜度爲O(n) ----《Redis設計與實現》
可是出現「連鎖更新」的狀況並很少見,因此這裏基本不會形成性能問題。
篇幅有限這裏不能細說連鎖更新,感興趣能夠閱讀《Redis設計與實現》的相關章節以及ziplist.c裏的__ziplistCascadeUpdate()函數。

總結

  • ziplist是爲節省內存空間而生的。
  • ziplist是一個爲Redis專門提供的底層數據結構之一,自己能夠有序也能夠無序。看成爲listhash的底層實現時,節點之間沒有順序;看成爲zset的底層實現時,節點之間會按照大小順序排列。
相關文章
相關標籤/搜索