順風車運營研發團隊 閆昌
一. smove算法
使用方式: smove source destination member
使用描述: 將member元素從source集合移動到destination集合
smove是原子性操做
若是 source 集合不存在或不包含指定的 member 元素,則 SMOVE 命令不執行任何操做,僅返回 0 。不然, member 元素從 source 集合中被移除,並添加到 destination 集合中去
當 destination 集合已經包含 member 元素時, SMOVE 命令只是簡單地將 source 集合中的 member 元素刪除。
當 source 或 destination 不是集合類型時,返回一個錯誤。
源碼分析:數組
void smoveCommand(client *c) { robj *srcset, *dstset, *ele; srcset = lookupKeyWrite(c->db,c->argv[1]); dstset = lookupKeyWrite(c->db,c->argv[2]); ele = c->argv[3]; if (srcset == NULL) {//若是source不存在, 則直接返回 addReply(c,shared.czero); return; } if (checkType(c,srcset,OBJ_SET) || (dstset && checkType(c,dstset,OBJ_SET))) return;//source和dest必須所有爲set類型, 不然退出 if (srcset == dstset) {//若是source和dest相同, 則直接返回 addReply(c,setTypeIsMember(srcset,ele->ptr) ? shared.cone : shared.czero); return; } if (!setTypeRemove(srcset,ele->ptr)) {//若是source移除失敗, 則直接返回 addReply(c,shared.czero); return; } notifyKeyspaceEvent(NOTIFY_SET,"srem",c->argv[1],c->db->id); if (setTypeSize(srcset) == 0) {//若是移除一個元素以後, source已經爲空, 則將source刪除 dbDelete(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id); } if (!dstset) {//若是dest爲空, 須要先將dest建立出來 dstset = setTypeCreate(ele->ptr); dbAdd(c->db,c->argv[2],dstset); } signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[2]); server.dirty++; if (setTypeAdd(dstset,ele->ptr)) {//將元素添加到dest當中 server.dirty++; notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[2],c->db->id); } addReply(c,shared.cone); }
二. spopdom
使用方式spop key [count]
從設置值存儲中移除並返回一個或多個隨機key
源碼分析: count=0的狀況ide
void spopCommand(client *c) { robj *set, *ele, *aux; sds sdsele; int64_t llele; int encoding; if (c->argc == 3) { spopWithCountCommand(c); return; } else if (c->argc > 3) { addReply(c,shared.syntaxerr); return; } if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,set,OBJ_SET)) return; /** 獲取隨機元素算法: 1. 若是是hash類型, 獲取hash key 1.1 正在rehash: 則獲取 rehashidx + random()%(ht[0]+ht[1]-rehashidx) //此時key可能在ht[0]也可能在ht[1]中, 但一但在rehashidx以後 1.2 沒有在rehash, 直接: random() % ht[0].length 2. 若是是整數集合類型: rand()%length */ encoding = setTypeRandomElement(set,&sdsele,&llele);//獲取隨機的一個元素 if (encoding == OBJ_ENCODING_INTSET) { ele = createStringObjectFromLongLong(llele); set->ptr = intsetRemove(set->ptr,llele,NULL);//刪除這個元素 } else { ele = createStringObject(sdsele,sdslen(sdsele)); setTypeRemove(set,ele->ptr);//刪除這個元素 } notifyKeyspaceEvent(NOTIFY_SET,"spop",c->argv[1],c->db->id); aux = createStringObject("SREM",4); rewriteClientCommandVector(c,3,aux,c->argv[1],ele); decrRefCount(aux); addReplyBulk(c,ele); decrRefCount(ele); if (setTypeSize(set) == 0) {//若是set已經爲空, 則須要刪除set集合 dbDelete(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id); } signalModifiedKey(c->db,c->argv[1]); server.dirty++; }
count不爲0的狀況: 函數
當count數量大於set的個數時, 直接返回全部的值, 並刪除set
size-count=remining, 當remining*5>count時, 循環count次, 每次隨機彈出一個key
當remining<=5時, 循環remining次, 隨機彈出一個值並壓入到一個新的set中, 最後將這個新的set覆蓋原來的set源碼分析
void spopWithCountCommand(client *c) { long l; unsigned long count, size; robj *set; if (getLongFromObjectOrReply(c,c->argv[2],&l,NULL) != C_OK) return; if (l >= 0) { count = (unsigned long) l; } else { addReply(c,shared.outofrangeerr); return; } if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL || checkType(c,set,OBJ_SET)) return; if (count == 0) { addReply(c,shared.emptymultibulk); return; } size = setTypeSize(set); notifyKeyspaceEvent(NOTIFY_SET,"spop",c->argv[1],c->db->id); server.dirty += count; /* CASE 1: * 當count數量大於set的數量, 則直接將set的全部元素都返回, 並刪除set */ if (count >= size) { sunionDiffGenericCommand(c,c->argv+1,1,NULL,SET_OP_UNION);//獲取集合中的全部值 dbDelete(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id); rewriteClientCommandVector(c,2,shared.del,c->argv[1]); signalModifiedKey(c->db,c->argv[1]); server.dirty++; return; } robj *propargv[3]; propargv[0] = createStringObject("SREM",4); propargv[1] = c->argv[1]; addReplyMultiBulkLen(c,count); /* Common iteration vars. */ sds sdsele; robj *objele; int encoding; int64_t llele; unsigned long remaining = size-count; /* Elements left after SPOP. */ /*當刪除count個元素後, 剩下的元素個數*5大於count時, 須要循環count次, 每次隨機pop出一個元素*/ if (remaining*SPOP_MOVE_STRATEGY_MUL > count) { while(count--) { /* Emit and remove. */ encoding = setTypeRandomElement(set,&sdsele,&llele);//獲取隨機的一個元素 if (encoding == OBJ_ENCODING_INTSET) { addReplyBulkLongLong(c,llele); objele = createStringObjectFromLongLong(llele); set->ptr = intsetRemove(set->ptr,llele,NULL);//刪除隨機出來的元素 } else { addReplyBulkCBuffer(c,sdsele,sdslen(sdsele)); objele = createStringObject(sdsele,sdslen(sdsele)); setTypeRemove(set,sdsele); } /* Replicate/AOF this command as an SREM operation */ propargv[2] = objele; alsoPropagate(server.sremCommand,c->db->id,propargv,3, PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(objele); } } else { /* 不然循環remining次, 將pop出來的元素賦值給新的set, 最後將新的set覆蓋原來的set值 */ robj *newset = NULL; /* Create a new set with just the remaining elements. */ while(remaining--) { encoding = setTypeRandomElement(set,&sdsele,&llele); if (encoding == OBJ_ENCODING_INTSET) { sdsele = sdsfromlonglong(llele); } else { sdsele = sdsdup(sdsele); } if (!newset) newset = setTypeCreate(sdsele); setTypeAdd(newset,sdsele); setTypeRemove(set,sdsele); sdsfree(sdsele); } incrRefCount(set); /* Protect the old set value. */ dbOverwrite(c->db,c->argv[1],newset);//將舊的set用新的set來覆蓋 /* Tranfer the old set to the client and release it. */ setTypeIterator *si; si = setTypeInitIterator(set); while((encoding = setTypeNext(si,&sdsele,&llele)) != -1) { if (encoding == OBJ_ENCODING_INTSET) { addReplyBulkLongLong(c,llele); objele = createStringObjectFromLongLong(llele); } else { addReplyBulkCBuffer(c,sdsele,sdslen(sdsele)); objele = createStringObject(sdsele,sdslen(sdsele)); } /* Replicate/AOF this command as an SREM operation */ propargv[2] = objele; alsoPropagate(server.sremCommand,c->db->id,propargv,3, PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(objele); } setTypeReleaseIterator(si); decrRefCount(set); } decrRefCount(propargv[0]); preventCommandPropagation(c); signalModifiedKey(c->db,c->argv[1]); server.dirty++; }
三. sismemberui
使用方式: sismember key member
判斷member元素是不是集合key的成員
源碼分析:this
int setTypeIsMember(robj *subject, sds value) { long long llval; if (subject->encoding == OBJ_ENCODING_HT) { return dictFind((dict*)subject->ptr,value) != NULL;//從字典中查找 } else if (subject->encoding == OBJ_ENCODING_INTSET) { if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) { return intsetFind((intset*)subject->ptr,llval);//從整數集合中查找 } } else { serverPanic("Unknown set encoding"); } return 0; }
由於整數集合中的元素是按從小到大有序排列的, 因此用二分法查找便可找出spa
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) { int min = 0, max = intrev32ifbe(is->length)-1, mid = -1; int64_t cur = -1; if (intrev32ifbe(is->length) == 0) {//若是集合長度爲0, 則直接返回 if (pos) *pos = 0; return 0; } else { if (value > _intsetGet(is,intrev32ifbe(is->length)-1)) {//若是value大於整數集合的最大值, 則直接返回 if (pos) *pos = intrev32ifbe(is->length); return 0; } else if (value < _intsetGet(is,0)) {//若是value小於整數集合的最小值, 則直接返回 if (pos) *pos = 0; return 0; } } while(max >= min) {//二分法查找 mid = ((unsigned int)min + (unsigned int)max) >> 1; cur = _intsetGet(is,mid);//整數集合中的數組contents是以char來存儲的值, 但整數集合有int16, int32, int64三種值, 因此要找值時, 須要根據偏移量和長度來獲取正確的值 if (value > cur) { min = mid+1; } else if (value < cur) { max = mid-1; } else { break; } } if (value == cur) { if (pos) *pos = mid; return 1; } else { if (pos) *pos = min; return 0; } } static int64_t _intsetGet(intset *is, int pos) { return _intsetGetEncoded(is,pos,intrev32ifbe(is->encoding)); } static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) { int64_t v64; int32_t v32; int16_t v16; if (enc == INTSET_ENC_INT64) { memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64));//須要根據偏移量(pos), 和長度來獲取到正確的值 memrev64ifbe(&v64); return v64; } else if (enc == INTSET_ENC_INT32) { memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32)); memrev32ifbe(&v32); return v32; } else { memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16)); memrev16ifbe(&v16); return v16; } }
四. smemberscode
執行方式: smembers key
返回集合key中的全部成員
不存在的key被視爲空集合
smembers底層調用sinterGenericCommand函數, 即將全部集合(如今只有一個)取交集
void sinterGenericCommand(client *c, robj **setkeys, unsigned long setnum, robj *dstkey) { robj **sets = zmalloc(sizeof(robj*)*setnum); setTypeIterator *si; robj *dstset = NULL; sds elesds; int64_t intobj; void *replylen = NULL; unsigned long j, cardinality = 0; int encoding; for (j = 0; j < setnum; j++) {//setnum的值爲1, 這個for循環只會執行一次 robj *setobj = dstkey ? lookupKeyWrite(c->db,setkeys[j]) : lookupKeyRead(c->db,setkeys[j]); if (!setobj) { zfree(sets); if (dstkey) { if (dbDelete(c->db,dstkey)) { signalModifiedKey(c->db,dstkey); server.dirty++; } addReply(c,shared.czero); } else { addReply(c,shared.emptymultibulk); } return; } if (checkType(c,setobj,OBJ_SET)) { zfree(sets); return; } sets[j] = setobj;//將集合賦值給新的集合sets[0] } qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality); if (!dstkey) { replylen = addDeferredMultiBulkLength(c); } else { /* If we have a target key where to store the resulting set * create this key with an empty set inside */ dstset = createIntsetObject(); } si = setTypeInitIterator(sets[0]); while((encoding = setTypeNext(si,&elesds,&intobj)) != -1) { ... ... /* Only take action when all sets contain the member */ if (j == setnum) { if (!dstkey) {//while循環set, 將每次遍歷取出的值返回給客戶端 if (encoding == OBJ_ENCODING_HT) addReplyBulkCBuffer(c,elesds,sdslen(elesds)); else addReplyBulkLongLong(c,intobj); cardinality++; } else { ... } } } setTypeReleaseIterator(si); if (dstkey) { ...... } else { setDeferredMultiBulkLength(c,replylen,cardinality); } zfree(sets); }
五. srandmember
同spop命令, 一樣能夠接受多個count, 它與spop的區別爲srandmember不會刪除元素值, spop會刪除元素值
六. srem
使用方式 srem key member [member ....]
移除集合中的一個或多個member元素, 不存在的member元素會被忽略
源碼分析:
void sremCommand(client *c) { robj *set; int j, deleted = 0, keyremoved = 0; if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,set,OBJ_SET)) return; for (j = 2; j < c->argc; j++) {//能夠刪除多個元素 if (setTypeRemove(set,c->argv[j]->ptr)) { deleted++; if (setTypeSize(set) == 0) { dbDelete(c->db,c->argv[1]); keyremoved = 1; break; } } } if (deleted) { signalModifiedKey(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_SET,"srem",c->argv[1],c->db->id); if (keyremoved) notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1], c->db->id); server.dirty += deleted; } addReplyLongLong(c,deleted); } int setTypeRemove(robj *setobj, sds value) { long long llval; if (setobj->encoding == OBJ_ENCODING_HT) { if (dictDelete(setobj->ptr,value) == DICT_OK) {//從字典中刪除 if (htNeedsResize(setobj->ptr)) dictResize(setobj->ptr); return 1; } } else if (setobj->encoding == OBJ_ENCODING_INTSET) { if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) { int success; setobj->ptr = intsetRemove(setobj->ptr,llval,&success);//從整數集合中刪除 if (success) return 1; } } else { serverPanic("Unknown set encoding"); } return 0; }