【Redis學習筆記】2018-06-27 incr、unlink命令

順風車運營研發團隊 閆昌
一. 樂觀鎖與悲觀鎖
悲觀鎖: 數據被外界修改保守態度(悲觀), 所以, 在整個數據處理過程當中, 將數據處理鎖定狀態. 實現方式: 在對任意記錄修改前, 先嚐試爲該記錄加上排他鎖, 若是加鎖失敗, 說明該記錄正在被修改, 當前查詢可能要等待或拋出異常, 若是成功加鎖, 那麼就能夠對記錄作修改
樂觀鎖: 樂觀鎖假設認爲數據通常狀況下不會形成衝突, 因此在數據進行提交更新的時候, 纔會正式對數據的衝突進行檢測, 若是發現衝突了, 則返回錯誤信息
二. incr命令是否使用了樂觀鎖或悲觀鎖
假設redis數據庫裏如今有一個key a的值爲10, 同一時刻有兩個redis客戶端(客戶端1, 客戶端2)對a進行了incr操做, 那麼a的值應該爲多少呢?
假設使用了樂觀鎖, 客戶端1和客戶端2同時獲取到了a, 且a此時先將值修改成了11, 那麼客戶端2此時設置值爲11時, 發現值已經成了11, 會報錯
假設使用了悲觀鎖, 客戶端1搶到a時會鎖定住, 客戶端2此時會搶不到鎖, 這樣能夠保證原子性
redis代碼中的incr實現:redis

void incrCommand(client *c) {
    incrDecrCommand(c,1);
}
 
void incrDecrCommand(client *c, long long incr) {
    long long value, oldvalue;
    robj *o, *new;
 
    o = lookupKeyWrite(c->db,c->argv[1]); //從數據庫中尋找須要修改的key
    if (o != NULL && checkType(c,o,OBJ_STRING)) return; //若是key的類型爲lis, set, zset, hash等, 則直接返回, 只有當key的類型爲string時才能夠繼續
    if (getLongLongFromObjectOrReply(c,o,&value,NULL) != C_OK) return;
 
    oldvalue = value;
    if ((incr < 0 && oldvalue < 0 && incr < (LLONG_MIN-oldvalue)) ||
        (incr > 0 && oldvalue > 0 && incr > (LLONG_MAX-oldvalue))) {//防止數據越界, incr > (LLONG_MAX-oldvalue) ===== incr+oldvalue>LLONG_MAX
        addReplyError(c,"increment or decrement would overflow");
        return;
    }
    value += incr; //直接將值相加
 
    if (o && o->refcount == 1 && o->encoding == OBJ_ENCODING_INT &&
        (value < 0 || value >= OBJ_SHARED_INTEGERS) &&
        value >= LONG_MIN && value <= LONG_MAX)
    {
        new = o;
        o->ptr = (void*)((long)value);
    } else {
        new = createStringObjectFromLongLong(value);
        if (o) {//直接寫入數據庫
            dbOverwrite(c->db,c->argv[1],new);
        } else {
            dbAdd(c->db,c->argv[1],new);
        }
    }
    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id);
    server.dirty++;
    addReply(c,shared.colon);
    addReply(c,new);
    addReply(c,shared.crlf);
}

結論: 從代碼中能夠看到, incr不用鎖來實現, 不保證原子性, 命令操做時, 直接對key加1操做數據庫

三. redis 4.0的線程異步

redis4.0啓動會默認起四個線程: 其中一個主線程, 其它三個子進程爲後臺線程, 須要主線程發信號給子進程, 子進程再處理相應的邏輯函數

root 43529 1 43529 0 4 16:10 ? 00:00:11 ./redis-server *:7777
root 43529 1 43530 0 4 16:10 ? 00:00:00 ./redis-server *:7777
root 43529 1 43531 0 4 16:10 ? 00:00:00 ./redis-server *:7777
root 43529 1 43532 0 4 16:10 ? 00:00:00 ./redis-server *:7777

四. unlink命令ui

unlink命令爲redis4.0新加的命令, 對一個key進行unlink操做, redis主進程會將此key交給子線程去異步刪除atom

//del和unlink命令底層調用的都爲delGenericCommand函數, 只是第二個參數不一樣
void delCommand(client *c) {
    delGenericCommand(c,0);
}
 
void unlinkCommand(client *c) {
    delGenericCommand(c,1);
}
//若是是unlink, 則調用dbAsyncDelete異步刪除.  若是是del, 則調用dbAsyncDelete同步刪除
void delGenericCommand(client *c, int lazy) {
    int numdel = 0, j;
 
    for (j = 1; j < c->argc; j++) {
        expireIfNeeded(c->db,c->argv[j]);
        int deleted  = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
                              dbSyncDelete(c->db,c->argv[j]);
        if (deleted) {
            signalModifiedKey(c->db,c->argv[j]);
            notifyKeyspaceEvent(NOTIFY_GENERIC,
                "del",c->argv[j],c->db->id);
            server.dirty++;
            numdel++;
        }
    }
    addReplyLongLong(c,numdel);
}
int dbAsyncDelete(redisDb *db, robj *key) {
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);//若是key已通過期, 則直接從字典中將此key刪除
 
    dictEntry *de = dictUnlink(db->dict,key->ptr);//獲取要刪除的字典key=>val
    if (de) {
        robj *val = dictGetVal(de);
        size_t free_effort = lazyfreeGetFreeEffort(val);
 
        if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
            atomicIncr(lazyfree_objects,1);
            bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);//會將要刪除的key發送給後臺子線程去處理
            dictSetVal(db->dict,de,NULL);//這裏將val設置爲了null
        }
    }
 
    if (de) {
        dictFreeUnlinkedEntry(db->dict,de);//刪除key和val, 由於val已經被前一步設置爲了null, 因此這一步至關於只刪除key
        if (server.cluster_enabled) slotToKeyDel(key);//若是是集羣模式, 須要將slot上的key刪除
        return 1;
    } else {
        return 0;
    }
}
dictEntry *dictUnlink(dict *ht, const void *key) {
    return dictGenericDelete(ht,key,1);
}
  
static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
    uint64_t h, idx;
    dictEntry *he, *prevHe;
    int table;
 
    if (d->ht[0].used == 0 && d->ht[1].used == 0) return NULL;
 
    if (dictIsRehashing(d)) _dictRehashStep(d);
    h = dictHashKey(d, key);
 
    for (table = 0; table <= 1; table++) {//防止正在rehash, 因此要遍歷兩個hash
        idx = h & d->ht[table].sizemask;
        he = d->ht[table].table[idx];
        prevHe = NULL;
        while(he) {//獲取字典中的key, val
            if (key==he->key || dictCompareKeys(d, key, he->key)) {
                /* Unlink the element from the list */
                if (prevHe)
                    prevHe->next = he->next;
                else
                    d->ht[table].table[idx] = he->next;
                if (!nofree) {
                    dictFreeKey(d, he);
                    dictFreeVal(d, he);
                    zfree(he);
                }
                d->ht[table].used--;
                return he;
            }
            prevHe = he;
            he = he->next;//hash裏key可能會衝突, 因此要日後遍歷
        }
        if (!dictIsRehashing(d)) break;
    }
    return NULL; /* not found */
}
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));
 
    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    pthread_mutex_lock(&bio_mutex[type]);//開啓線程鎖
    listAddNodeTail(bio_jobs[type],job);//在要刪除的隊列中添加元素
    bio_pending[type]++;
    pthread_cond_signal(&bio_newjob_cond[type]);//向刪除元素的線程發送信號, 使其開始處理
    pthread_mutex_unlock(&bio_mutex[type]);//釋放線程鎖
}

整體流程圖:spa

clipboard.png

五. 關於slotToKeyDel函數線程

在集羣模式中, 一共有16384個slot, 每一個redis的key必定在一個slot裏, 當對key進行操做時, 經過CRC(key)&16383來肯定這個key屬於哪一個slot
在clusterState.slots_to_keys跳躍表每一個節點的分值都是一個槽號, 而每一個節點成員都是一個數據庫鍵
當往數據庫添加一個鍵時, 節點會將這個鍵以及鍵對應的槽號關聯到slots_to_keys中, 當刪除時, slots_to_keys會刪除健值的對應關係
因此在刪除一個key時, 若是是集羣模式, 會調用slotToKeyDel來刪除跳躍表中的對應關係code

相關文章
相關標籤/搜索