Redis之Set命令

0.前言

Redis 對無序集合(set)提供了若干操作命令,本文介紹其中幾個命令的實際執行過程。

1.sadd命令
2.求差集和求並集命令
3.求交集命令

1.sadd命令

/*
 * SADD handler: inserts argv[2..argc-1] into the set stored at argv[1].
 *
 * When the key does not exist a new set is created and registered in the
 * database before any member is inserted. When the key exists but its
 * type is not REDIS_SET, the shared wrong-type error is sent and nothing
 * is modified.
 *
 * The reply is the number of members for which setTypeAdd() reported an
 * insertion; server.dirty grows by the same amount, and the key-modified
 * signal plus the "sadd" keyspace event fire only when that count is
 * non-zero.
 */
void saddCommand(redisClient *c) {
    robj *key = c->argv[1];
    robj *target = lookupKeyWrite(c->db,key);
    int inserted = 0;
    int idx;

    /* Reject keys that hold something other than a set. */
    if (target != NULL && target->type != REDIS_SET) {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    if (target == NULL) {
        /* The first member is handed to setTypeCreate() so it can choose
         * the initial encoding. Per the original author's note: intset
         * when the member is representable as a long long, hash table
         * otherwise (setTypeCreate() itself is not visible here). */
        target = setTypeCreate(c->argv[2]);
        dbAdd(c->db,key,target);
    }

    for (idx = 2; idx < c->argc; idx++) {
        /* Store the possibly re-encoded object back into argv before
         * adding it to the set. */
        c->argv[idx] = tryObjectEncoding(c->argv[idx]);
        if (setTypeAdd(target,c->argv[idx]) != 0) {
            inserted += 1;
        }
    }

    if (inserted != 0) {
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(REDIS_NOTIFY_SET,"sadd",key,c->db->id);
    }
    server.dirty += inserted;
    addReplyLongLong(c,inserted);
}

/*
 * Insert `value` into the set object `subject`.
 *
 * Returns 1 when the element was inserted and 0 when the underlying
 * container reported that nothing was added (dictAdd() did not return
 * DICT_OK, or intsetAdd() left its success flag at zero).
 *
 * Encoding handling:
 *  - REDIS_ENCODING_HT: the value is added to the dict and its reference
 *    count is incremented on success.
 *  - REDIS_ENCODING_INTSET: a value representable as long long goes into
 *    the intset; afterwards the set is converted to a hash table once its
 *    entry count exceeds server.set_max_intset_entries. A value that is
 *    not representable forces the conversion first and is then added to
 *    the resulting dict.
 *  - anything else: redisPanic().
 */
int setTypeAdd(robj *subject, robj *value) {
    long long as_integer;
    uint8_t inserted = 0;

    if (subject->encoding == REDIS_ENCODING_HT) {
        if (dictAdd(subject->ptr,value,NULL) != DICT_OK) {
            return 0;
        }
        incrRefCount(value);
        return 1;
    }

    if (subject->encoding != REDIS_ENCODING_INTSET) {
        redisPanic("Unknown set encoding");
        return 0;
    }

    if (isObjectRepresentableAsLongLong(value,&as_integer) != REDIS_OK) {
        /* Not an integer: switch the container to a hash table first. */
        setTypeConvert(subject,REDIS_ENCODING_HT);

        /* The add is performed inside the assertion expression, exactly
         * as in the original; it is expected to succeed. */
        redisAssertWithInfo(NULL,value,dictAdd(subject->ptr,value,NULL) == DICT_OK);
        incrRefCount(value);
        return 1;
    }

    /* intsetAdd() may hand back a different pointer, so store it back. */
    subject->ptr = intsetAdd(subject->ptr,as_integer,&inserted);
    if (!inserted) {
        return 0;
    }

    /* Keep intsets small: past the configured entry threshold the set is
     * converted to the hash table encoding. */
    if (intsetLen(subject->ptr) > server.set_max_intset_entries) {
        setTypeConvert(subject,REDIS_ENCODING_HT);
    }
    return 1;
}

2.求差集和並集命令(sdiff,sdiffstore,sunion,sunionstore)

sdiff求差集, sdiffstore求差集並保存結果, sunion求並集, sunionstore求並集並保存結果, 這幾種運算都是通過sunionDiffGenericCommand函數完成的, 此處將幾個命令全部列出.

/* SUNION: compute the union of the source sets and reply with its members.
 * argv[0] is skipped; every remaining argument is passed as a source key
 * and no destination key is supplied. */
void sunionCommand(redisClient *c) {
    robj **source_keys = c->argv + 1;
    int source_count = c->argc - 1;

    sunionDiffGenericCommand(c,source_keys,source_count,NULL,REDIS_OP_UNION);
}
/* SUNIONSTORE: compute the union of the source sets and store the result.
 * argv[1] is passed as the destination key; argv[2] onwards are the
 * source keys. */
void sunionstoreCommand(redisClient *c) {
    robj *destination = c->argv[1];
    robj **source_keys = c->argv + 2;
    int source_count = c->argc - 2;

    sunionDiffGenericCommand(c,source_keys,source_count,destination,REDIS_OP_UNION);
}
/* SDIFF: compute the difference between the first source set and all the
 * following ones, and reply with its members. argv[0] is skipped; every
 * remaining argument is passed as a source key and no destination key is
 * supplied. */
void sdiffCommand(redisClient *c) {
    robj **source_keys = c->argv + 1;
    int source_count = c->argc - 1;

    sunionDiffGenericCommand(c,source_keys,source_count,NULL,REDIS_OP_DIFF);
}
/* SDIFFSTORE: compute the difference between the first source set and all
 * the following ones, and store the result. argv[1] is passed as the
 * destination key; argv[2] onwards are the source keys. */
void sdiffstoreCommand(redisClient *c) {
    robj *destination = c->argv[1];
    robj **source_keys = c->argv + 2;
    int source_count = c->argc - 2;

    sunionDiffGenericCommand(c,source_keys,source_count,destination,REDIS_OP_DIFF);
}
/*
 * Shared implementation behind the union and difference commands
 * (called by sunionCommand, sunionstoreCommand, sdiffCommand and
 * sdiffstoreCommand).
 *
 * c        - client issuing the command; replies are appended to it.
 * setkeys  - array of `setnum` key objects naming the source sets.
 * setnum   - number of entries in `setkeys`.
 * dstkey   - key under which the result is stored, or NULL to send the
 *            resulting members back to the client instead.
 * op       - REDIS_OP_UNION or REDIS_OP_DIFF.
 *
 * Source keys that do not exist are treated as empty sets. If checkType()
 * returns non-zero for any source key the function frees its scratch
 * array and returns immediately (checkType() is presumably what sends the
 * error reply -- it is not visible here).
 *
 * The result is always built in a temporary set first, so a destination
 * key that is also one of the source keys is read before it is replaced.
 */
void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *dstkey, int op) {
    robj **sets = zmalloc(sizeof(robj*)*setnum);
    setTypeIterator *si;
    robj *ele, *dstset = NULL;
    int j, cardinality = 0;
    int diff_algo = 1;

    /* Resolve every source key. The write lookup is used when a result
     * will be stored (dstkey != NULL), the read lookup otherwise. */
    for (j = 0; j < setnum; j++) {
        robj *setobj = dstkey ?
            lookupKeyWrite(c->db,setkeys[j]) :
            lookupKeyRead(c->db,setkeys[j]);
        if (!setobj) {
            sets[j] = NULL;
            continue;
        }
        if (checkType(c,setobj,REDIS_SET)) {
            zfree(sets);
            return;
        }
        sets[j] = setobj;
    }

    /*
     * Choose the difference algorithm from the sizes of the input sets.
     * Algorithm 1 is O(N*M): N is the number of elements in the first set
     * and M the number of sets involved.
     * Algorithm 2 is O(N): N is the total number of elements in all sets.
     */
    if (op == REDIS_OP_DIFF && sets[0]) {
        long long algo_one_work = 0, algo_two_work = 0;

        for (j = 0; j < setnum; j++) {
            if (sets[j] == NULL) continue;

            algo_one_work += setTypeSize(sets[0]);
            algo_two_work += setTypeSize(sets[j]);
        }

        /*
         * algo_one_work now holds N*M for algorithm 1 and algo_two_work
         * holds N for algorithm 2 (only existing sets are counted).
         * The two are not compared directly: algorithm 1's estimate is
         * halved first, so algorithm 2 is chosen only when half of
         * algorithm 1's estimated work still exceeds algorithm 2's. The
         * original author's note gives the reason as algorithm 1 behaving
         * more predictably, e.g. when the inputs are intset-encoded.
         */
        algo_one_work /= 2;
        diff_algo = (algo_one_work <= algo_two_work) ? 1 : 2;

        if (diff_algo == 1 && setnum > 1) {
            /* To let algorithm 1 hit a matching element as early as
             * possible, the sets to subtract (sets[1..]) are sorted;
             * judging by the comparator's name the order is decreasing
             * cardinality. sets[0] keeps its position. */
            qsort(sets+1,setnum-1,sizeof(robj*),
                qsortCompareSetsByRevCardinality);
        }
    }

    /* Temporary set that accumulates the result. */
    dstset = createIntsetObject();

    if (op == REDIS_OP_UNION) {
        /* Union: walk every element of every set and add it to dstset;
         * cardinality counts only the additions setTypeAdd() reports. */
        for (j = 0; j < setnum; j++) {
            if (!sets[j]) continue; /* non existing keys are like empty sets */

            si = setTypeInitIterator(sets[j]);
            while((ele = setTypeNextObject(si)) != NULL) {
                if (setTypeAdd(dstset,ele)) cardinality++;
                decrRefCount(ele);
            }
            setTypeReleaseIterator(si);
        }
    } else if (op == REDIS_OP_DIFF && sets[0] && diff_algo == 1) {
        /*
         * Algorithm 1: iterate the first set and, for each element, check
         * whether it appears in any of the other sets. Elements found in
         * none of them are added to dstset as members of the difference.
         */
        si = setTypeInitIterator(sets[0]);
        /*
         * The outer loop walks the elements of the first set; the inner
         * loop walks the other sets taking part in the operation.
         */
        while((ele = setTypeNextObject(si)) != NULL) {
            for (j = 1; j < setnum; j++) {
                if (!sets[j]) continue; /* no key is an empty set. */
                if (sets[j] == sets[0]) break; /* same set! */
                if (setTypeIsMember(sets[j],ele)) break;
            }
            if (j == setnum) {
                /* The inner loop ran to completion, so no other set
                 * contains the element: it belongs to the difference. */
                setTypeAdd(dstset,ele);
                cardinality++;
            }
            decrRefCount(ele);
        }
        setTypeReleaseIterator(si);
    } else if (op == REDIS_OP_DIFF && sets[0] && diff_algo == 2) {
        /*
         * Algorithm 2: copy every element of the first set into dstset,
         * then walk all the other sets and remove from dstset each
         * element they contain. What remains is the difference.
         */
        for (j = 0; j < setnum; j++) {
            if (!sets[j]) continue; /* non existing keys are like empty sets */

            si = setTypeInitIterator(sets[j]);
            while((ele = setTypeNextObject(si)) != NULL) {
                if (j == 0) {
                    /* First set: add its elements to dstset. */
                    if (setTypeAdd(dstset,ele)) cardinality++;
                } else {
                    /* Other sets: drop the element from dstset if it is
                     * present there. */
                    if (setTypeRemove(dstset,ele)) cardinality--;
                }
                decrRefCount(ele);
            }
            setTypeReleaseIterator(si);

            /* The result is already empty, so the remaining sets cannot
             * change it. */
            if (cardinality == 0) break;
        }
    }

    if (!dstkey) {
        /* No destination key: send the resulting members straight to the
         * client and release the temporary set. */
        addReplyMultiBulkLen(c,cardinality);
        si = setTypeInitIterator(dstset);
        while((ele = setTypeNextObject(si)) != NULL) {
            addReplyBulk(c,ele);
            decrRefCount(ele);
        }
        setTypeReleaseIterator(si);
        decrRefCount(dstset);
    } else {
        /* Store the result: first delete whatever may already exist at
         * dstkey. */
        int deleted = dbDelete(c->db,dstkey);
        if (setTypeSize(dstset) > 0) {
            /* Non-empty result: dstset is handed to the database under
             * dstkey and its size is the reply. */
            dbAdd(c->db,dstkey,dstset);
            addReplyLongLong(c,setTypeSize(dstset));
            notifyKeyspaceEvent(REDIS_NOTIFY_SET,
                op == REDIS_OP_UNION ? "sunionstore" : "sdiffstore",
                dstkey,c->db->id);
        } else {
            /* Empty result: nothing is stored, the reply is zero, and a
             * "del" event is emitted only if an old value was removed. */
            decrRefCount(dstset);
            addReply(c,shared.czero);
            if (deleted)
                notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
                    dstkey,c->db->id);
        }
        /* Both outcomes of the store path signal the key as modified and
         * bump the dirty counter by one. */
        signalModifiedKey(c->db,dstkey);
        server.dirty++;
    }
    zfree(sets);
}

3.求交集命令(sinter,sinterstore)

sinter求交集, sinterstore求交集並保存結果, 兩者都是通過sinterGenericCommand函數進行相應的操作.

/* SINTER: compute the intersection of the source sets and reply with its
 * members. argv[0] is skipped; every remaining argument is passed as a
 * source key and no destination key is supplied. */
void sinterCommand(redisClient *c) {
    robj **source_keys = c->argv + 1;
    unsigned long source_count = c->argc - 1;

    sinterGenericCommand(c,source_keys,source_count,NULL);
}
/* SINTERSTORE: compute the intersection of the source sets and store the
 * result. argv[1] is passed as the destination key; argv[2] onwards are
 * the source keys. */
void sinterstoreCommand(redisClient *c) {
    robj *destination = c->argv[1];
    robj **source_keys = c->argv + 2;
    unsigned long source_count = c->argc - 2;

    sinterGenericCommand(c,source_keys,source_count,destination);
}
/*
 * Shared implementation behind the intersection commands (called by
 * sinterCommand and sinterstoreCommand).
 *
 * c        - client issuing the command; replies are appended to it.
 * setkeys  - array of `setnum` key objects naming the source sets.
 * setnum   - number of entries in `setkeys`.
 * dstkey   - key under which the result is stored, or NULL to send the
 *            resulting members back to the client instead.
 *
 * If any source key does not exist the function returns early with an
 * empty result: in store mode the destination key is deleted (if present)
 * and zero is replied; otherwise the shared empty multi-bulk reply is
 * sent. If checkType() returns non-zero for a source key the function
 * frees its scratch array and returns (checkType() is presumably what
 * sends the error reply -- it is not visible here).
 */
void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) {
    robj **sets = zmalloc(sizeof(robj*)*setnum);
    setTypeIterator *si;
    robj *eleobj, *dstset = NULL;
    int64_t intobj;
    void *replylen = NULL;
    unsigned long j, cardinality = 0;
    int encoding;

    /* Walk all keys and fetch every set that was passed in. The write
     * lookup is used in store mode, the read lookup otherwise. */
    for (j = 0; j < setnum; j++) {
        robj *setobj = dstkey ?
            lookupKeyWrite(c->db,setkeys[j]) :
            lookupKeyRead(c->db,setkeys[j]);
        if (!setobj) {
            /* A missing key ends the command here with an empty result. */
            zfree(sets);
            if (dstkey) {
                if (dbDelete(c->db,dstkey)) {
                    signalModifiedKey(c->db,dstkey);
                    server.dirty++;
                }
                addReply(c,shared.czero);
            } else {
                addReply(c,shared.emptymultibulk);
            }
            return;
        }
        if (checkType(c,setobj,REDIS_SET)) {
            zfree(sets);
            return;
        }
        sets[j] = setobj;
    }
    /* Sort the sets (by increasing cardinality, judging by the
     * comparator's name) so the loop below iterates the smallest set and
     * can decide quickly whether an element belongs to the intersection. */
    qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality);

    /* The first thing we should output is the total number of elements...
     * since this is a multi-bulk write, but at this stage we don't know
     * the intersection set size, so we use a trick, append an empty object
     * to the output list and save the pointer to later modify it with the
     * right length */
    if (!dstkey) {
        replylen = addDeferredMultiBulkLength(c);
    } else {
        /* If we have a target key where to store the resulting set
         * create this key with an empty set inside */
        dstset = createIntsetObject();
    }

    /* Iterate all the elements of the first (smallest) set, and test
     * the element against all the other sets, if at least one set does
     * not include the element it is discarded */
    si = setTypeInitIterator(sets[0]);
    while((encoding = setTypeNext(si,&eleobj,&intobj)) != -1) {
        for (j = 1; j < setnum; j++) {
            /* The very same set object trivially contains the element. */
            if (sets[j] == sets[0]) continue;
            /*
             * The membership test depends on the encoding of the element
             * being iterated and on the encoding of the set being probed.
             */
            if (encoding == REDIS_ENCODING_INTSET) {
                /* Both sides are intsets: look the integer up directly. */
                if (sets[j]->encoding == REDIS_ENCODING_INTSET &&
                    !intsetFind((intset*)sets[j]->ptr,intobj))
                {
                    break;
                /* The probed set is a hash table: build a temporary
                 * object from the integer for the generic lookup, and
                 * release it on both outcomes. */
                } else if (sets[j]->encoding == REDIS_ENCODING_HT) {
                    eleobj = createStringObjectFromLongLong(intobj);
                    if (!setTypeIsMember(sets[j],eleobj)) {
                        decrRefCount(eleobj);
                        break;
                    }
                    decrRefCount(eleobj);
                }
            } else if (encoding == REDIS_ENCODING_HT) {
                /* If the element is integer-encoded and the probed set is
                 * an intset, it can be looked up directly as a long;
                 * otherwise the generic object lookup is used.
                 * Note: when the direct lookup finds the element, the
                 * first condition is false and control still reaches the
                 * setTypeIsMember() check in the else-if below. */
                if (eleobj->encoding == REDIS_ENCODING_INT &&
                    sets[j]->encoding == REDIS_ENCODING_INTSET &&
                    !intsetFind((intset*)sets[j]->ptr,(long)eleobj->ptr))
                {
                    break;
                } else if (!setTypeIsMember(sets[j],eleobj)) {
                    break;
                }
            }
        }

        /* Reaching the end of the set list without a break means the
         * element is present in every set: it is part of the result. */
        if (j == setnum) {
            if (!dstkey) {
                /* Reply mode: emit the element now and count it for the
                 * deferred multi-bulk length. */
                if (encoding == REDIS_ENCODING_HT)
                    addReplyBulk(c,eleobj);
                else
                    addReplyBulkLongLong(c,intobj);
                cardinality++;
            } else {
                /* Store mode: add the element to the temporary set. An
                 * intset element is wrapped in a temporary object that
                 * is released right after the add. */
                if (encoding == REDIS_ENCODING_INTSET) {
                    eleobj = createStringObjectFromLongLong(intobj);
                    setTypeAdd(dstset,eleobj);
                    decrRefCount(eleobj);
                } else {
                    setTypeAdd(dstset,eleobj);
                }
            }
        }
    }
    setTypeReleaseIterator(si);

    /* Finish according to whether the intersection must be stored. */
    if (dstkey) {
        /* Replace whatever was stored at dstkey. */
        int deleted = dbDelete(c->db,dstkey);
        if (setTypeSize(dstset) > 0) {
            dbAdd(c->db,dstkey,dstset);
            addReplyLongLong(c,setTypeSize(dstset));
            notifyKeyspaceEvent(REDIS_NOTIFY_SET,"sinterstore",
                dstkey,c->db->id);
        } else {
            /* Empty result: nothing is stored, the reply is zero, and a
             * "del" event is emitted only if an old value was removed. */
            decrRefCount(dstset);
            addReply(c,shared.czero);
            if (deleted)
                notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
                    dstkey,c->db->id);
        }
        signalModifiedKey(c->db,dstkey);
        server.dirty++;
    } else {
        /* Reply mode: fill in the length placeholder reserved earlier. */
        setDeferredMultiBulkLength(c,replylen,cardinality);
    }
    zfree(sets);
}

總結

集合的這幾種操作都比較耗時, 對特別龐大的集合進行運算時需要謹慎, 否則可能影響整體性能.

相關文章
相關標籤/搜索