順風車運營研發團隊 閆昌
一. 樂觀鎖與悲觀鎖
悲觀鎖: 數據被外界修改保守態度(悲觀), 所以, 在整個數據處理過程當中, 將數據處理鎖定狀態. 實現方式: 在對任意記錄修改前, 先嚐試爲該記錄加上排他鎖, 若是加鎖失敗, 說明該記錄正在被修改, 當前查詢可能要等待或拋出異常, 若是成功加鎖, 那麼就能夠對記錄作修改
樂觀鎖: 樂觀鎖假設認爲數據通常狀況下不會形成衝突, 因此在數據進行提交更新的時候, 纔會正式對數據的衝突進行檢測, 若是發現衝突了, 則返回錯誤信息
二. incr命令是否使用了樂觀鎖或悲觀鎖
假設redis數據庫裏如今有一個key a的值爲10, 同一時刻有兩個redis客戶端(客戶端1, 客戶端2)對a進行了incr操做, 那麼a的值應該爲多少呢?
假設使用了樂觀鎖, 客戶端1和客戶端2同時獲取到了a, 且a此時先將值修改成了11, 那麼客戶端2此時設置值爲11時, 發現值已經成了11, 會報錯
假設使用了悲觀鎖, 客戶端1搶到a時會鎖定住, 客戶端2此時會搶不到鎖, 這樣能夠保證原子性
redis代碼中的incr實現:redis
void incrCommand(client *c) { incrDecrCommand(c,1); } void incrDecrCommand(client *c, long long incr) { long long value, oldvalue; robj *o, *new; o = lookupKeyWrite(c->db,c->argv[1]); //從數據庫中尋找須要修改的key if (o != NULL && checkType(c,o,OBJ_STRING)) return; //若是key的類型爲lis, set, zset, hash等, 則直接返回, 只有當key的類型爲string時才能夠繼續 if (getLongLongFromObjectOrReply(c,o,&value,NULL) != C_OK) return; oldvalue = value; if ((incr < 0 && oldvalue < 0 && incr < (LLONG_MIN-oldvalue)) || (incr > 0 && oldvalue > 0 && incr > (LLONG_MAX-oldvalue))) {//防止數據越界, incr > (LLONG_MAX-oldvalue) ===== incr+oldvalue>LLONG_MAX addReplyError(c,"increment or decrement would overflow"); return; } value += incr; //直接將值相加 if (o && o->refcount == 1 && o->encoding == OBJ_ENCODING_INT && (value < 0 || value >= OBJ_SHARED_INTEGERS) && value >= LONG_MIN && value <= LONG_MAX) { new = o; o->ptr = (void*)((long)value); } else { new = createStringObjectFromLongLong(value); if (o) {//直接寫入數據庫 dbOverwrite(c->db,c->argv[1],new); } else { dbAdd(c->db,c->argv[1],new); } } signalModifiedKey(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id); server.dirty++; addReply(c,shared.colon); addReply(c,new); addReply(c,shared.crlf); }
結論: 從代碼中能夠看到, incr不用鎖來實現, 不保證原子性, 命令操做時, 直接對key加1操做數據庫
三. redis 4.0的線程異步
redis4.0啓動會默認起四個線程: 其中一個主線程, 其它三個子進程爲後臺線程, 須要主線程發信號給子進程, 子進程再處理相應的邏輯函數
root 43529 1 43529 0 4 16:10 ? 00:00:11 ./redis-server *:7777 root 43529 1 43530 0 4 16:10 ? 00:00:00 ./redis-server *:7777 root 43529 1 43531 0 4 16:10 ? 00:00:00 ./redis-server *:7777 root 43529 1 43532 0 4 16:10 ? 00:00:00 ./redis-server *:7777
四. unlink命令ui
unlink命令爲redis4.0新加的命令, 對一個key進行unlink操做, redis主進程會將此key交給子線程去異步刪除atom
//del和unlink命令底層調用的都爲delGenericCommand函數, 只是第二個參數不一樣 void delCommand(client *c) { delGenericCommand(c,0); } void unlinkCommand(client *c) { delGenericCommand(c,1); }
//若是是unlink, 則調用dbAsyncDelete異步刪除. 若是是del, 則調用dbAsyncDelete同步刪除 void delGenericCommand(client *c, int lazy) { int numdel = 0, j; for (j = 1; j < c->argc; j++) { expireIfNeeded(c->db,c->argv[j]); int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) : dbSyncDelete(c->db,c->argv[j]); if (deleted) { signalModifiedKey(c->db,c->argv[j]); notifyKeyspaceEvent(NOTIFY_GENERIC, "del",c->argv[j],c->db->id); server.dirty++; numdel++; } } addReplyLongLong(c,numdel); }
int dbAsyncDelete(redisDb *db, robj *key) { if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);//若是key已通過期, 則直接從字典中將此key刪除 dictEntry *de = dictUnlink(db->dict,key->ptr);//獲取要刪除的字典key=>val if (de) { robj *val = dictGetVal(de); size_t free_effort = lazyfreeGetFreeEffort(val); if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) { atomicIncr(lazyfree_objects,1); bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);//會將要刪除的key發送給後臺子線程去處理 dictSetVal(db->dict,de,NULL);//這裏將val設置爲了null } } if (de) { dictFreeUnlinkedEntry(db->dict,de);//刪除key和val, 由於val已經被前一步設置爲了null, 因此這一步至關於只刪除key if (server.cluster_enabled) slotToKeyDel(key);//若是是集羣模式, 須要將slot上的key刪除 return 1; } else { return 0; } }
dictEntry *dictUnlink(dict *ht, const void *key) { return dictGenericDelete(ht,key,1); } static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) { uint64_t h, idx; dictEntry *he, *prevHe; int table; if (d->ht[0].used == 0 && d->ht[1].used == 0) return NULL; if (dictIsRehashing(d)) _dictRehashStep(d); h = dictHashKey(d, key); for (table = 0; table <= 1; table++) {//防止正在rehash, 因此要遍歷兩個hash idx = h & d->ht[table].sizemask; he = d->ht[table].table[idx]; prevHe = NULL; while(he) {//獲取字典中的key, val if (key==he->key || dictCompareKeys(d, key, he->key)) { /* Unlink the element from the list */ if (prevHe) prevHe->next = he->next; else d->ht[table].table[idx] = he->next; if (!nofree) { dictFreeKey(d, he); dictFreeVal(d, he); zfree(he); } d->ht[table].used--; return he; } prevHe = he; he = he->next;//hash裏key可能會衝突, 因此要日後遍歷 } if (!dictIsRehashing(d)) break; } return NULL; /* not found */ }
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { struct bio_job *job = zmalloc(sizeof(*job)); job->time = time(NULL); job->arg1 = arg1; job->arg2 = arg2; job->arg3 = arg3; pthread_mutex_lock(&bio_mutex[type]);//開啓線程鎖 listAddNodeTail(bio_jobs[type],job);//在要刪除的隊列中添加元素 bio_pending[type]++; pthread_cond_signal(&bio_newjob_cond[type]);//向刪除元素的線程發送信號, 使其開始處理 pthread_mutex_unlock(&bio_mutex[type]);//釋放線程鎖 }
整體流程圖:spa
五. 關於slotToKeyDel函數線程
在集羣模式中, 一共有16384個slot, 每一個redis的key必定在一個slot裏, 當對key進行操做時, 經過CRC(key)&16383來肯定這個key屬於哪一個slot
在clusterState.slots_to_keys跳躍表每一個節點的分值都是一個槽號, 而每一個節點成員都是一個數據庫鍵
當往數據庫添加一個鍵時, 節點會將這個鍵以及鍵對應的槽號關聯到slots_to_keys中, 當刪除時, slots_to_keys會刪除健值的對應關係
因此在刪除一個key時, 若是是集羣模式, 會調用slotToKeyDel來刪除跳躍表中的對應關係code