Redis(四):del/unlink 命令源碼解析

  上一篇文章從根本上理解了set/get的處理過程,至關於理解了 增、改、查的過程,如今就差一個刪了。本篇咱們來看一下刪除過程。node

  對於客戶端來講,刪除操做無需區分何種數據類型,只管進行 del 操做便可。redis

 

零、刪除命令 del 的定義算法

  主要有兩個: del/unlink, 差異是 unlink 速度會更快, 由於其使用了異步刪除優化模式, 其定義以下:安全

    // 標識只有一個 w, 說明就是一個普通的寫操做,沒啥好說的
    {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0}
    // 標識爲 wF, 說明它是一個快速寫的操做,其實就是有一個異步優化的過程,稍後詳解
    {"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0}

 

1、delCommand數據結構

  delCommand 的做用就是直接刪除某個 key 的數據,釋放內存便可。框架

// db.c, del 命令處理    
void delCommand(client *c) {
    // 同步刪除
    delGenericCommand(c,0);
}

/* This command implements DEL and LAZYDEL. */
void delGenericCommand(client *c, int lazy) {
    int numdel = 0, j;

    for (j = 1; j < c->argc; j++) {
        // 自動過時數據清理
        expireIfNeeded(c->db,c->argv[j]);
        // 此處分同步刪除和異步刪除, 主要差異在於對於複雜數據類型的刪除方面,如hash,list,set...
        // 針對 string 的刪除是徹底同樣的
        int deleted  = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
                              dbSyncDelete(c->db,c->argv[j]);
        // 寫命令的傳播問題
        if (deleted) {
            signalModifiedKey(c->db,c->argv[j]);
            notifyKeyspaceEvent(NOTIFY_GENERIC,
                "del",c->argv[j],c->db->id);
            server.dirty++;
            numdel++;
        }
    }
    // 響應刪除數據量, 粒度到 key 級別
    addReplyLongLong(c,numdel);
}

  框架代碼一看即明,只是相比於咱們普通的刪除是多了很多事情。不然也不存在設計了。less

 

2、unlinkCommand異步

  以下,其實和del是一毛同樣的,僅是變化了一個 lazy 標識而已。函數

// db.c, unlink 刪除處理
void unlinkCommand(client *c) {
    // 與 del 一致,只是 lazy 標識不同
    delGenericCommand(c,1);
}

 

3、刪除數據過程詳解工具

  刪除數據分同步和異步兩種實現方式,道理都差很少,只是一個是後臺刪一個是前臺刪。咱們分別來看看。

1. 同步刪除 dbSyncDelete

  同步刪除很簡單,只要把對應的key刪除,val刪除就好了,若是有內層引用,則進行遞歸刪除便可。

// db.c, 同步刪除數據
/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(redisDb *db, robj *key) {
    /* Deleting an entry from the expires dict will not free the sds of
     * the key, because it is shared with the main dictionary. */
    // 首先從 expires 隊列刪除,而後再從 db->dict 中刪除
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
    if (dictDelete(db->dict,key->ptr) == DICT_OK) {
        if (server.cluster_enabled) slotToKeyDel(key);
        return 1;
    } else {
        return 0;
    }
}
// dict.c, 如上, 僅僅是 dictDelete() 就能夠了,因此真正的刪除動做是在 dict 中實現的。
int dictDelete(dict *ht, const void *key) {
    // nofree: 0, 即要求釋放內存
    return dictGenericDelete(ht,key,0);
}
// dict.c, nofree: 0:要釋放相應的val內存, 1:不釋放相應val內存只刪除key
/* Search and remove an element */
static int dictGenericDelete(dict *d, const void *key, int nofree)
{
    unsigned int h, idx;
    dictEntry *he, *prevHe;
    int table;

    if (d->ht[0].size == 0) return DICT_ERR; /* d->ht[0].table is NULL */
    if (dictIsRehashing(d)) _dictRehashStep(d);
    h = dictHashKey(d, key);
    // ht[0] 和 ht[1] 若有可能都進行掃描
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        he = d->ht[table].table[idx];
        prevHe = NULL;
        while(he) {
            if (dictCompareKeys(d, key, he->key)) {
                /* Unlink the element from the list */
                if (prevHe)
                    prevHe->next = he->next;
                else
                    d->ht[table].table[idx] = he->next;
                // no nofree, 就是要 free 內存咯
                if (!nofree) {
                    // 看起來 key/value 須要單獨釋放內存哦
                    dictFreeKey(d, he);
                    dictFreeVal(d, he);
                }
                zfree(he);
                d->ht[table].used--;
                return DICT_OK;
            }
            prevHe = he;
            he = he->next;
        }
        // 若是沒有進行 rehashing, 只需掃描0就好了
        if (!dictIsRehashing(d)) break;
    }
    return DICT_ERR; /* not found */
}

  其實對於有GC收集器的語言來講,根本不用關注內存的釋放問題,自有後臺工具處理,然而對於 c 語言這種級別語言,則是須要自行關注內存的。這也是本文存在的意義,否則對於一個 hash 表的元素刪除操做,如上很難嗎?並無。

  下面,咱們就來看看 redis 是如何具體釋放內存的吧。

// dict.h, 釋放key, value 的邏輯也是很是簡單,用一個宏就定義好了
// 釋放依賴於 keyDestructor, valDestructor
#define dictFreeKey(d, entry) \
    if ((d)->type->keyDestructor) \
        (d)->type->keyDestructor((d)->privdata, (entry)->key)
#define dictFreeVal(d, entry) \
    if ((d)->type->valDestructor) \
        (d)->type->valDestructor((d)->privdata, (entry)->v.val)
// 因此,咱們有必要回去看看 key,value 的析構方法
// 而這,又依賴於具體的數據類型,也就是你在 setXXX 的時候用到的數據類型
// 咱們看一下這個 keyDestructor,valDestructor 初始化的樣子
// server.c  kv的析構函數定義
/* Db->dict, keys are sds strings, vals are Redis objects. */
dictType dbDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictSdsDestructor,          /* key destructor */
    dictObjectDestructor   /* val destructor */
};

// 1. 先看看 key destructor, key 的釋放
// server.c, 直接調用 sds 提供的服務便可
void dictSdsDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    // sds 直接釋放key就好了
    sdsfree(val);
}
// sds.c, 真正釋放 value 內存
/* Free an sds string. No operation is performed if 's' is NULL. */
void sdsfree(sds s) {
    if (s == NULL) return;
    // zfree, 確實很簡單嘛, 由於 sds 是連續的內存空間,直接使用系統提供的方法便可刪除
    s_free((char*)s-sdsHdrSize(s[-1]));
}

// 2. value destructor 對value的釋放, 若是說 key 必定是string格式的話,value可主不必定了,由於 redis提供豐富的數據類型呢
// server.c
void dictObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);

    if (val == NULL) return; /* Lazy freeing will set value to NULL. */
    decrRefCount(val);
}
// 減小 value 的引用計數
void decrRefCount(robj *o) {
    if (o->refcount == 1) {
        switch(o->type) {
            // string 類型
            case OBJ_STRING: freeStringObject(o); break;
            // list 類型
            case OBJ_LIST: freeListObject(o); break;
            // set 類型
            case OBJ_SET: freeSetObject(o); break;
            // zset 類型
            case OBJ_ZSET: freeZsetObject(o); break;
            // hash 類型
            case OBJ_HASH: freeHashObject(o); break;
            default: serverPanic("Unknown object type"); break;
        }
        zfree(o);
    } else {
        if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
        if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--;
    }
}

  額,能夠看出,對key的釋放天然是簡單之極。而對 value 則謹慎許多,首先它表面上只對引用作減操做。只有發只剩下1個引用即只有當前引用的狀況下,本次釋放就是最後一次釋放,因此纔會回收內存。

// 在介紹不一樣數據類型的內存釋放前,咱們能夠先來看下每一個元素的數據結構
// dict.h
typedef struct dictEntry {
    // 存儲 key 字段內容
    void *key;
    // 用一個聯合體存儲value
    union {
        // 存儲數據時使用 *val 存儲
        void *val;
        uint64_t u64;
        // 存儲過時時間時使用該字段
        int64_t s64;
        // 存儲 score 時使用
        double d;
    } v;
    // 存在hash衝突時,做鏈表使用
    struct dictEntry *next;
} dictEntry;

// 1. string 類型的釋放
// object.c
void freeStringObject(robj *o) {
    // 直接調用 sds服務釋放
    if (o->encoding == OBJ_ENCODING_RAW) {
        sdsfree(o->ptr);
    }
}

// 2. list 類型的釋放
// object.c
void freeListObject(robj *o) {
    switch (o->encoding) {
    case OBJ_ENCODING_QUICKLIST:
        quicklistRelease(o->ptr);
        break;
    default:
        serverPanic("Unknown list encoding type");
    }
}
// quicklist.c
/* Free entire quicklist. */
void quicklistRelease(quicklist *quicklist) {
    unsigned long len;
    quicklistNode *current, *next;

    current = quicklist->head;
    len = quicklist->len;
    // 鏈表依次迭代就能夠釋放完成了
    while (len--) {
        next = current->next;
        // 釋放list具體值
        zfree(current->zl);
        quicklist->count -= current->count;
        // 釋放list對象
        zfree(current);

        quicklist->len--;
        current = next;
    }
    zfree(quicklist);
}

// 3. set 類型的釋放
// object.c, set 分兩種類型, ht, intset
void freeSetObject(robj *o) {
    switch (o->encoding) {
    case OBJ_ENCODING_HT:
        // hash 類型則須要刪除每一個 hash 的 kv
        dictRelease((dict*) o->ptr);
        break;
    case OBJ_ENCODING_INTSET:
        // intset 直接釋放
        zfree(o->ptr);
        break;
    default:
        serverPanic("Unknown set encoding type");
    }
}
// dict.c, 
/* Clear & Release the hash table */
void dictRelease(dict *d)
{
    // ht[0],ht[1] 依次清理
    _dictClear(d,&d->ht[0],NULL);
    _dictClear(d,&d->ht[1],NULL);
    zfree(d);
}
// dict.c, 
/* Destroy an entire dictionary */
int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
    unsigned long i;

    /* Free all the elements */
    for (i = 0; i < ht->size && ht->used > 0; i++) {
        dictEntry *he, *nextHe;

        if (callback && (i & 65535) == 0) callback(d->privdata);
        // 元素爲空,hash未命中,但只要 used > 0, 表明就還有須要刪除的元素存在
        // 其實對於只有少數幾個元素的狀況下,這個效率就呵呵了
        if ((he = ht->table[i]) == NULL) continue;
        while(he) {
            nextHe = he->next;
            // 這裏的釋放 kv 邏輯和前面是一致的
            // 看起來像是遞歸,其實否則,由於redis不存在數據類型嵌套問題,好比 hash下存儲hash, 因此不會存在遞歸
            // 具體結構會在後續解讀到
            dictFreeKey(d, he);
            dictFreeVal(d, he);
            zfree(he);
            ht->used--;
            he = nextHe;
        }
    }
    /* Free the table and the allocated cache structure */
    zfree(ht->table);
    /* Re-initialize the table */
    _dictReset(ht);
    return DICT_OK; /* never fails */
}

// 4. hash 類型的釋放
// object.c, hash和set實際上是很類似的,代碼也作了大量的複用
void freeHashObject(robj *o) {
    switch (o->encoding) {
    case OBJ_ENCODING_HT:
        // ht 形式與set一致
        dictRelease((dict*) o->ptr);
        break;
    case OBJ_ENCODING_ZIPLIST:
        // ziplist 直接釋放
        zfree(o->ptr);
        break;
    default:
        serverPanic("Unknown hash encoding type");
        break;
    }
}

// 5. zset 類型的釋放
// object.c, zset 的存儲形式與其餘幾個
void freeZsetObject(robj *o) {
    zset *zs;
    switch (o->encoding) {
    case OBJ_ENCODING_SKIPLIST:
        zs = o->ptr;
        // 釋放dict 數據, ht 0,1 的釋放
        dictRelease(zs->dict);
        // 釋放skiplist 數據, 主要看下這個
        zslFree(zs->zsl);
        zfree(zs);
        break;
    case OBJ_ENCODING_ZIPLIST:
        zfree(o->ptr);
        break;
    default:
        serverPanic("Unknown sorted set encoding");
    }
}
// t_zset.c, 釋放跳錶數據
/* Free a whole skiplist. */
void zslFree(zskiplist *zsl) {
    zskiplistNode *node = zsl->header->level[0].forward, *next;

    zfree(zsl->header);
    while(node) {
        // 基於第0層數據釋放,也基於第0層作迭代,直到刪除完成
        // 由於其餘層數據都是引用的第0層的數據,因此釋放時無需關注
        next = node->level[0].forward;
        zslFreeNode(node);
        node = next;
    }
    zfree(zsl);
}
// t_zset 也很簡單,只是把 node.ele 釋放掉,再把自身釋放到便可
// 這樣的刪除方式依賴於其存儲結構,我們後續再聊
/* Free the specified skiplist node. The referenced SDS string representation
 * of the element is freed too, unless node->ele is set to NULL before calling
 * this function. */
void zslFreeNode(zskiplistNode *node) {
    sdsfree(node->ele);
    zfree(node);
}

 

2. 異步刪除過程

  異步刪除按理說會更復雜,更有意思些。只不過咱們前面已經把核心的東西擼了個遍,這剩下的也很少了。

// lazyfree.c, 
int dbAsyncDelete(redisDb *db, robj *key) {
    /* Deleting an entry from the expires dict will not free the sds of
     * the key, because it is shared with the main dictionary. */
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

    /* If the value is composed of a few allocations, to free in a lazy way
     * is actually just slower... So under a certain limit we just free
     * the object synchronously. */
    dictEntry *de = dictFind(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);
        size_t free_effort = lazyfreeGetFreeEffort(val);

        /* If releasing the object is too much work, let's put it into the
         * lazy free list. */
        // 其實異步方法與同步方法的差異在這,即要求 刪除的元素影響須大於某閥值(64)
        // 不然按照同步方式直接刪除,由於那樣代價更小
        if (free_effort > LAZYFREE_THRESHOLD) {
            // 異步釋放+1,原子操做
            atomicIncr(lazyfree_objects,1,&lazyfree_objects_mutex);
            // 將 value 的釋放添加到異步線程隊列中去,後臺處理, 任務類型爲 異步釋放內存
            bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
            // 設置val爲NULL, 以便在外部進行刪除時忽略釋放value相關內存
            dictSetVal(db->dict,de,NULL);
        }
    }

    /* Release the key-val pair, or just the key if we set the val
     * field to NULL in order to lazy free it later. */
    if (dictDelete(db->dict,key->ptr) == DICT_OK) {
        if (server.cluster_enabled) slotToKeyDel(key);
        return 1;
    } else {
        return 0;
    }
}
// bio.c, 添加異步任務到線程中, 類型由type決定,線程安全地添加
// 而後嘛,後臺線程就不會停地運行了任務了
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));

    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    // 上鎖操做
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
    // 喚醒任務線程
    pthread_cond_signal(&bio_newjob_cond[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}
// bio.c, 後臺線程任務框架,總之仍是有事情可作了。
void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;

    /* Check that the type is within the right interval. */
    if (type >= BIO_NUM_OPS) {
        serverLog(LL_WARNING,
            "Warning: bio thread started with wrong type %lu",type);
        return NULL;
    }

    /* Make the thread killable at any time, so that bioKillThreads()
     * can work reliably. */
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

    pthread_mutex_lock(&bio_mutex[type]);
    /* Block SIGALRM so we are sure that only the main thread will
     * receive the watchdog signal. */
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        serverLog(LL_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
    // 任務一直運行
    while(1) {
        listNode *ln;

        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            // 注意此處將會釋放鎖喲,以便外部能夠添加任務進來
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
        /* Pop the job from the queue. */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]);

        /* Process the job accordingly to its type. */
        if (type == BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == BIO_AOF_FSYNC) {
            aof_fsync((long)job->arg1);
        } 
        // 也就是這玩意了,會去處理提交過來的任務
        else if (type == BIO_LAZY_FREE) {
            /* What we free changes depending on what arguments are set:
             * arg1 -> free the object at pointer.
             * arg2 & arg3 -> free two dictionaries (a Redis DB).
             * only arg3 -> free the skiplist. */
            // 本文介紹的刪除value形式,用第一種狀況
            if (job->arg1)
                lazyfreeFreeObjectFromBioThread(job->arg1);
            else if (job->arg2 && job->arg3)
                lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
            else if (job->arg3)
                lazyfreeFreeSlotsMapFromBioThread(job->arg3);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);

        /* Unblock threads blocked on bioWaitStepOfType() if any. */
        // 喚醒全部相關等待線程
        pthread_cond_broadcast(&bio_step_cond[type]);

        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;
    }
}

// lazyfree.c, 和同步刪除一致了
/* Release objects from the lazyfree thread. It's just decrRefCount()
 * updating the count of objects to release. */
void lazyfreeFreeObjectFromBioThread(robj *o) {
    decrRefCount(o);
    atomicDecr(lazyfree_objects,1,&lazyfree_objects_mutex);
}

  今後處redis異步處理過程,咱們能夠看到,redis並非每次進入異步時都進行異步操做,而是在必要的時候纔會進行。這也提示咱們,不要爲了異步而異步,而是應該計算利弊。

 

  如此,整個 del/unlink 的過程就完成了。整體來講,刪除都是基於hash的簡單remove而已,惟一有點難度是對內存的回收問題,這其實就是一個簡單的使用引用計數器算法實現的垃圾回收器應該作的事而已。勿須多言。具體過程需依賴於數據的存儲結構,主要目的天然是遞歸釋放空間,避免內存泄漏了。

相關文章
相關標籤/搜索