Redis 漸進式 rehash 源碼分析

rehash的做用

隨着咱們的redis操做不斷執行,哈希表保存的鍵值對會逐漸地增多或者減小,當字典內數據過大時,會致使更多的鍵衝突,形成查詢數據的成本增長。當數據減小時,已經分配的內存還在佔用,會形成內存浪費。爲了讓哈希表的負載因子維持在一個合理的範圍以內,程序須要對哈希表的大小進行相應的擴展或者收縮redis

rehash的原理

  • rehash:首先咱們看下字典、跟哈希表的結構定義
/* * 字典 */
typedef struct dict {

    // 類型特定函數
    dictType *type;

    // 私有數據
    void *privdata;

    // 哈希表
    dictht ht[2];

    // rehash 索引
    // 當 rehash 不在進行時,值爲 -1
    int rehashidx; /* rehashing not in progress if rehashidx == -1 */

    // 目前正在運行的安全迭代器的數量
    int iterators; /* number of iterators currently running */

} dict;
複製代碼
// 哈希表
typedef struct dictht {
    
    // 哈希表數組
    dictEntry **table;

    // 哈希表大小
    unsigned long size;
    
    // 哈希表大小掩碼,用於計算索引值
    // 老是等於 size - 1
    unsigned long sizemask;

    // 該哈希表已有節點的數量
    unsigned long used;

} dictht;
複製代碼

經過看結構定義咱們先大概瞭解到,判斷一個字段是否正在rehash經過判斷if rehashidx == -1, rehash是在ht[1]上重新分配內存,將ht[0]的數據遷移到ht[1]數據庫

  • 漸進式: rehash的過程不是一次完成的,而是在字典的讀寫操做,以及定時事件中每次完成必定量的遷移

擴容流程源碼分析

所以是字典相關的操做,而且擴容通常存在於須要設置鍵值的時候,所以咱們先直奔dict.c文件看下是否有Add或者Set字符串內容的函數,經過搜索跟代碼查看,咱們發現有一個函數的邏輯仍是挺像的數組

int dictAdd(dict *d, void *key, void *val) {
    // 嘗試添加鍵到字典,並返回包含了這個鍵的新哈希節點
    // T = O(N)
    dictEntry *entry = dictAddRaw(d,key);

    // todo ...
}
複製代碼

這個函數會調用dictAddRaw(...)方法給dict分配內存繼續查看dictAddRaw(...)的代碼安全

dictEntry *dictAddRaw(dict *d, void *key) {
    int index;
    dictEntry *entry;
    dictht *ht;

    // 若是條件容許的話,進行單步 rehash
    // T = O(1)
    if (dictIsRehashing(d)) _dictRehashStep(d);

    /* Get the index of the new element, or -1 if * the element already exists. */
    // 計算鍵在哈希表中的索引值
    // 若是值爲 -1 ,那麼表示鍵已經存在
    // T = O(N)
    if ((index = _dictKeyIndex(d, key)) == -1)
        return NULL;

    // T = O(1)
    /* Allocate the memory and store the new entry */
    // 若是字典正在 rehash ,那麼將新鍵添加到 1 號哈希表
    // 不然,將新鍵添加到 0 號哈希表
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    // 爲新節點分配空間
    entry = zmalloc(sizeof(*entry));
    // 將新節點插入到鏈表表頭
    entry->next = ht->table[index];
    ht->table[index] = entry;
    // 更新哈希表已使用節點數量
    ht->used++;

    /* Set the hash entry fields. */
    // 設置新節點的鍵
    // T = O(1)
    dictSetKey(d, entry, key);

    return entry;
}
複製代碼

繼續查看_dictKeyIndex內的代碼函數

static int _dictKeyIndex(dict *d, const void *key)
{
    unsigned int h, idx, table;
    dictEntry *he;

    /* Expand the hash table if needed */
    // 單步 rehash
    // T = O(N)
    if (_dictExpandIfNeeded(d) == DICT_ERR)
        return -1;

        /* Compute the key hash value */
    // 計算 key 的哈希值
    h = dictHashKey(d, key);
    // T = O(1)
    for (table = 0; table <= 1; table++) {

        // 計算索引值
        idx = h & d->ht[table].sizemask;

        /* Search if this slot does not already contain the given key */
        // 查找 key 是否存在
        // T = O(1)
        he = d->ht[table].table[idx];
        while(he) {
            if (dictCompareKeys(d, key, he->key))
                return -1;
            he = he->next;
        }

        // 若是運行到這裏時,說明 0 號哈希表中全部節點都不包含 key
        // 若是這時 rehahs 正在進行,那麼繼續對 1 號哈希表進行 rehash
        if (!dictIsRehashing(d)) break;
    }

    // 返回索引值
    return idx;
}
複製代碼

經過查看上述代碼,咱們能夠發現幾處關鍵點,字典內索引值的計算時經過oop

// 計算 key 的哈希值
    h = dictHashKey(d, key);
    idx = h & d->ht[table].sizemask;
複製代碼

算出來的,同時咱們也能看到,當存在鍵衝突時,查找鍵的成本源碼分析

he = d->ht[table].table[idx];
        while(he) {
            if (dictCompareKeys(d, key, he->key))
                return -1;
            he = he->next;
        }
複製代碼

最關鍵的是_dictExpandIfNeeded經過函數名咱們就以爲這個跟擴容有關測試

static int _dictExpandIfNeeded(dict *d)
{
    /* Incremental rehashing already in progress. Return. */
    // 漸進式 rehash 已經在進行了,直接返回
    if (dictIsRehashing(d)) return DICT_OK;

    /* If the hash table is empty expand it to the initial size. */
    // 若是字典(的 0 號哈希表)爲空,那麼建立並返回初始化大小的 0 號哈希表
    // T = O(1)
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /* If we reached the 1:1 ratio, and we are allowed to resize the hash * table (global setting) or we should avoid it but the ratio between * elements/buckets is over the "safe" threshold, we resize doubling * the number of buckets. */
    // 一下兩個條件之一爲真時,對字典進行擴展
    // 1)字典已使用節點數和字典大小之間的比率接近 1:1
    // 而且 dict_can_resize 爲真
    // 2)已使用節點數和字典大小之間的比率超過 dict_force_resize_ratio
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize ||
         d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
    {
        // 新哈希表的大小至少是目前已使用節點數的兩倍
        // T = O(N)
        return dictExpand(d, d->ht[0].used*2);
    }

    return DICT_OK;
}
複製代碼

經過上述代碼能夠看到,最根本的內存分配操做是在_dictExpandIfNeeded(...)函數內執行的。該函數會判斷當哈希表上已使用鍵值數比分配內存大dict_force_resize_ratio(表明常量5)倍時,會從新分配內存,內存大小時原來已使用數的2倍this

總結

  • 整個源碼流程看完,咱們發如今執行dictAdd(...)向字典內增長鍵值時,會調用_dictExpandIfNeeded(...)查看ht[0].used/ht[0].size > 5是否爲true,若是是則從新分配內存,大小爲ht[0].used * 2
  • 在查看dictAddRaw(...)函數代碼時,有一處命令
// 若是條件容許的話,進行單步 rehash
    // T = O(1)
if (dictIsRehashing(d)) _dictRehashStep(d);
複製代碼

_dictRehashStep的做用是,執行一個鍵值從h[0]h[1]的遷移,在dict.c內搜索該函數,會發現跟dict相關的讀寫操做都會調用該函數,這也驗證rehahs的過程不是一步完成的,是漸進式的spa

收縮流程源碼分析

字典內存的收縮主要是在定時事件內,定時檢查,判斷,相關代碼以下

void databasesCron(void) {

    // todo ...
    
    // 在沒有 BGSAVE 或者 BGREWRITEAOF 執行時,對哈希表進行 rehash
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
        /* We use global counters so if we stop the computation at a given * DB we'll be able to start from the successive in the next * cron loop iteration. */
        static unsigned int resize_db = 0;
        static unsigned int rehash_db = 0;
        unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL;
        unsigned int j;

        /* Don't test more DBs than we have. */
        // 設定要測試的數據庫數量
        if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;

        /* Resize */
        // 調整字典的大小
        for (j = 0; j < dbs_per_call; j++) {
            tryResizeHashTables(resize_db % server.dbnum);
            resize_db++;
        }

        /* Rehash */
        // 對字典進行漸進式 rehash
        if (server.activerehashing) {
            for (j = 0; j < dbs_per_call; j++) {
                int work_done = incrementallyRehash(rehash_db % server.dbnum);
                rehash_db++;
                if (work_done) {
                    /* If the function did some work, stop here, we'll do * more at the next cron loop. */
                    break;
                }
            }
        }
    }
}
複製代碼

上述代碼除了循環和判斷外,有兩個比較特別的函數

  1. tryResizHashTables,相關源碼
void tryResizeHashTables(int dbid) {
    if (htNeedsResize(server.db[dbid].dict))
        dictResize(server.db[dbid].dict);
    if (htNeedsResize(server.db[dbid].expires))
        dictResize(server.db[dbid].expires);
}

//htNeedsResize
int htNeedsResize(dict *dict) {
    long long size, used;

    size = dictSlots(dict);
    used = dictSize(dict);
    return (size && used && size > DICT_HT_INITIAL_SIZE &&
            (used*100/size < REDIS_HT_MINFILL));
}
複製代碼

經過分析源碼,咱們能夠看到該函數會首先調用htNeedsResize,判斷used* 100 / size < REDIS_HT_MINFILL若是是true則會調用dictResize從新分配內存

  1. incrementallyRehash,相關源碼
int incrementallyRehash(int dbid) {

    /* Keys dictionary */
    if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict,1);
        return 1; /* already used our millisecond for this loop... */
    }

    /* Expires */
    if (dictIsRehashing(server.db[dbid].expires)) {
        dictRehashMilliseconds(server.db[dbid].expires,1);
        return 1; /* already used our millisecond for this loop... */
    }

    return 0;
}


//dictRehashMillisecnods
 /* 在給定毫秒數內,以 100 步爲單位,對字典進行 rehash 。 * * T = O(N) */
int dictRehashMilliseconds(dict *d, int ms) {
    // 記錄開始時間
    long long start = timeInMilliseconds();
    int rehashes = 0;

    while(dictRehash(d,100)) {
        rehashes += 100;
        // 若是時間已過,跳出
        if (timeInMilliseconds()-start > ms) break;
    }

    return rehashes;
}
複製代碼

經過分析,能夠看出該函數的做用,是對正在rehash的字典,每次執行1毫秒,每次循環100次的哈希表數據遷移。

總結

  • 哈希表的收縮主要是在定時事件內執行
  • 當已使用的鍵值佔比不到分配內存的0.1時,就進行內存收縮
  • 除了在上面擴容中說到的在進行讀寫時會進行數據遷移,在定時件事內也會進行數據遷移。同時能夠避免,長時間沒有讀寫操做,數據一直沒法遷移的問題
相關文章
相關標籤/搜索