Redis源碼解析-基礎數據-qulicklist(快速列表)

太長不看版node

  • 快速列表是一個元素爲壓縮列表的雙向鏈表。
  • 快速列表是列表對象list的底層實現之一。
  • 快速列表是在Redis3.2版本中引入的。
  • 快速列表節點中壓縮列表的最大字節長度(配置項爲負數時)或最多元素個數(配置項爲正數時)由配置項 list-max-ziplist-size 決定,默認約束爲最大長度8Kb。
  • 快速列表提供了選項能夠使用LZF壓縮算法對中間的節點中的ziplist進行壓縮,列表兩邊多少節點不被壓縮由配置項 list-compress-depth 決定,默認對全部節點不進行壓縮。

本篇解析基於redis 5.0.0版本,本篇涉及源碼文件爲quicklist.c, quicklist.h, redic.conf。git

什麼是快速列表

快速列表是一個元素爲壓縮列表的雙向鏈表。 github

qulicklist是列表對象(list)的底層實現之一,是在Redis3.2中爲了兼顧空間效率與時間效率而引入的。 redis

壓縮列表很是的節省空間,可是當節點過多或者過大以後,由於修改元素時須要從新分配存儲空間致使效率會很是低下,因此只在節點較少且較小時使用。 壓縮列表相關戳這裏瞭解

雙向鏈表修改節點效率比較高(複雜度O(1)),可是數據結構元數據佔用的空間相對比較大(每一個節點有兩個指針佔用16個字節)。算法

快速列表在時間效率和空間效率之間作了折中,由雙向鏈表將若干個壓縮列表鏈接在一塊兒組成,相比於雙向鏈表更加節省空間,相比於壓縮列表操做更加的高效。數據結構

快速列表的結構定義

typedef struct quicklistNode {
    // 向前指針
    struct quicklistNode *prev;
    // 向後指針
    struct quicklistNode *next;
    // ziplist或者是被壓縮以後的ziplist(quicklistLZF)
    unsigned char *zl;
    // ziplist的字節長度,不受壓縮影響
    unsigned int sz;
    // ziplist中包含的元素個數
    unsigned int count : 16;     /* count of items in ziplist */
    // ziplist編碼標示,1爲原始數據,2爲被壓縮後數據
    unsigned int encoding : 2;   /* RAW==1 or LZF==2 */
    // 數據存儲方式(當前只有ziplist)
    unsigned int container : 2;  /* NONE==1 or ZIPLIST==2 */
    // 須要再次被壓縮標記,壓縮節點被解壓縮讀取後會置爲1
    unsigned int recompress : 1; /* was this node previous compressed? */
    // 測試用例使用
    unsigned int attempted_compress : 1; /* node can't compress; too small */
    // 預備字段
    unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;

typedef struct quicklistLZF {
    // ziplist被壓縮以後的字節長度
    unsigned int sz; /* LZF size in bytes*/
    // ziplist被壓縮後的內容
    char compressed[];
} quicklistLZF;

typedef struct quicklist {
    // 頭節點
    quicklistNode *head;
    // 尾節點
    quicklistNode *tail;
    // 快速列表中全部元素的個數
    unsigned long count; 
    // 快速列表節點(quicklistNode)個數
    unsigned long len;
    // 節點中ziplist長度的大小
    int fill : 16; 
    // 頭部開始或尾部開始 分別有幾個節點不被壓縮
    unsigned int compress : 16; 
} quicklist;
複製代碼

由於快速列表的常見的應用場景大可能是訪問兩邊的節點(例如:lpush, lpop, rpush, rpop等),爲了進一步的節省空間,快速列表提供了選項能夠使用LZF壓縮算法對中間的節點中的ziplist進行壓縮,節點壓縮後本來指向ziplist的sz指針元素指向quicklistLZF結構體。 快速列表兩邊分別有幾個節點不被壓縮有配置項 list-compress-depth 決定,存儲在qulicklist的compress元素中,默認爲0(不進行節點壓縮)。post

快速列表節點中壓縮列表的最大字節長度或最多元素個數由配置項 list-max-ziplist-size 決定,存儲在qulicklist的fill元素中。配置項爲正數時表示節點中壓縮列表最多元素個數,爲負數時表示節點中壓縮列表最大字節長度,默認約束爲快速列表節點中壓縮列表最大長度爲8Kb。測試

節點ziplist最大長度
-5 64Kb
-4 32Kb
-3 16Kb
-2 8Kb
-1 4Kb

在快速列表的結構體定義中,使用告終構體位域(即 unsigned int count : 16; 這種寫法),其實就是爲了節省空間某些元素只使用幾個bit位長度的空間。詳細的解釋戳這裏優化

快速列表相關操做

push操做

#ifdef __GNUC__
# define likely(x) __builtin_expect(!!(x), 1)
# define unlikely(x) __builtin_expect(!!(x), 0)
#else
# define likely(x) !!(x)
# define unlikely(x) !!(x)
#endif

int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
    quicklistNode *orig_head = quicklist->head;
    // 快速列表節點是否容許插入 大機率容許插入
    // 內部根據fill值進行ziplist字節長度或個數的約束檢測
    if (likely(
            _quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
        // 容許插入,插入元素到快速列表節點的ziplist頭部
        quicklist->head->zl =
            ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
        // 更新ziplist字節長度
        quicklistNodeUpdateSz(quicklist->head);
    } else {
        // ziplist字節長度或個數超限,則從新建立一個節點
        quicklistNode *node = quicklistCreateNode();
        // 元素插入新節點ziplist頭部
        node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
        // 更新ziplist字節長度
        quicklistNodeUpdateSz(node);
        // 將新建節點插入頭節點以前
        _quicklistInsertNodeBefore(quicklist, quicklist->head, node);
    }
    quicklist->count++;
    quicklist->head->count++;
    return (orig_head != quicklist->head);
}

int quicklistPushTail(quicklist *quicklist, void *value, size_t sz) {
    quicklistNode *orig_tail = quicklist->tail;
    if (likely(
            _quicklistNodeAllowInsert(quicklist->tail, quicklist->fill, sz))) {
        // 容許插入,插入元素到快速列表節點的ziplist尾部
        quicklist->tail->zl =
            ziplistPush(quicklist->tail->zl, value, sz, ZIPLIST_TAIL);
        // 更新ziplist字節長度
        quicklistNodeUpdateSz(quicklist->tail);
    } else {
        quicklistNode *node = quicklistCreateNode();
        // 元素插入新節點ziplist尾部
        node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_TAIL);
        // 更新ziplist字節長度
        quicklistNodeUpdateSz(node);
        // 將新建節點插入尾節點以後
        _quicklistInsertNodeAfter(quicklist, quicklist->tail, node);
    }
    quicklist->count++;
    quicklist->tail->count++;
    return (orig_tail != quicklist->tail);
}
複製代碼

push操做頭插與尾插的步驟一致,都是根據fill元素信息判斷是否要新增快速列表節點,而後調用ziplistPush將value經過頭插或尾插方法插入到ziplist的頭部或尾部。ui

likely()和unlikely()宏是用來進行編譯優化引導的,lickly(a條件)就是告訴編譯器a條件大機率是真的,編譯器針對這一信息進行優化編譯提升CPU預取指令的正確率。unlikely()是大機率爲假。

likely()與unlikely()宏詳細解釋戳這裏瞭解

pop操做

int quicklistPop(quicklist *quicklist, int where, unsigned char **data,
                 unsigned int *sz, long long *slong) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    if (quicklist->count == 0)
        return 0;
    int ret = quicklistPopCustom(quicklist, where, &vstr, &vlen, &vlong,
                                 _quicklistSaver);
    if (data)
        *data = vstr;
    if (slong)
        *slong = vlong;
    if (sz)
        *sz = vlen;
    return ret;
}

int quicklistPopCustom(quicklist *quicklist, int where, unsigned char **data,
                       unsigned int *sz, long long *sval,
                       void *(*saver)(unsigned char *data, unsigned int sz)) {
    unsigned char *p;
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    int pos = (where == QUICKLIST_HEAD) ? 0 : -1;

    if (quicklist->count == 0)
        return 0;

    if (data)
        *data = NULL;
    if (sz)
        *sz = 0;
    if (sval)
        *sval = -123456789;

    quicklistNode *node;
    if (where == QUICKLIST_HEAD && quicklist->head) {
        node = quicklist->head;
    } else if (where == QUICKLIST_TAIL && quicklist->tail) {
        node = quicklist->tail;
    } else {
        return 0;
    }
    // 根據下標從ziplist中獲取元素
    p = ziplistIndex(node->zl, pos);
    if (ziplistGet(p, &vstr, &vlen, &vlong)) {
        if (vstr) {
            if (data)
                *data = saver(vstr, vlen);
            if (sz)
                *sz = vlen;
        } else {
            if (data)
                *data = NULL;
            if (sval)
                *sval = vlong;
        }
        刪除該元素
        quicklistDelIndex(quicklist, node, &p);
        return 1;
    }
    return 0;
}
複製代碼

pop操做相對比較簡單,由於存儲了頭節點和尾節點,因此不管是頭部pop仍是尾部pop均可以直接獲取到對應的ziplist,而後取出ziplist頭部value或者尾部value後進行元素刪除。

根據下標獲取元素

int quicklistIndex(const quicklist *quicklist, const long long idx, quicklistEntry *entry) {
    quicklistNode *n;
    unsigned long long accum = 0;
    unsigned long long index;
    int forward = idx < 0 ? 0 : 1; /* < 0 -> reverse, 0+ -> forward */

    initEntry(entry);
    entry->quicklist = quicklist;

    if (!forward) {
        index = (-idx) - 1;
        n = quicklist->tail;
    } else {
        index = idx;
        n = quicklist->head;
    }

    if (index >= quicklist->count)
        return 0;

    while (likely(n)) {
        // 遍歷快速列表節點,找到目標下標元素所處的節點
        if ((accum + n->count) > index) {
            break;
        } else {
            D("Skipping over (%p) %u at accum %lld", (void *)n, n->count,
              accum);
            accum += n->count;
            n = forward ? n->next : n->prev;
        }
    }

    if (!n)
        return 0;

    D("Found node: %p at accum %llu, idx %llu, sub+ %llu, sub- %llu", (void *)n,
      accum, index, index - accum, (-index) - 1 + accum);

    entry->node = n;
    // 計算目標元素 ziplist內的偏移量
    if (forward) {
        /* forward = normal head-to-tail offset. */
        // 正向遍歷
        entry->offset = index - accum;
    } else {
        /* reverse = need negative offset for tail-to-head, so undo * the result of the original if (index < 0) above. */
        // 反向遍歷
        entry->offset = (-index) - 1 + accum;
    }
    // 如有須要則進行節點解壓
    quicklistDecompressNodeForUse(entry->node);
    // 根據元素在ziplist中的下標獲取元素
    entry->zi = ziplistIndex(entry->node->zl, entry->offset);
    ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval);
    /* The caller will use our result, so we don't re-compress here. * The caller can recompress or delete the node as needed. */
    return 1;
}
複製代碼

根據下標獲取元素,經過遍歷快速列表節點找到元素所屬的節點,拿到對應的ziplist,若是節點被壓縮則進行解壓,而後計算出元素在ziplist中的下標拿到元素。假設快速列表有N個節點,每一個節點的ziplist有M個元素,則根據下標獲取元素的複雜度爲O(N + M)。

相關文章
相關標籤/搜索