Redis源碼剖析(十二)--客戶端和服務器

 客戶端屬性

客戶端的狀態保存在結構體 redisClient 中,下面給出redisClient的部分屬性:redis

typedef struct redisClient{

    // 套接字描述符
    int fd; 
   
    // 客戶端狀態標誌
    int flags;

    // 輸入緩衝區
    sds querybuf;

    // 命令參數
    robj** argv;
    int argc;

    // 命令的實現函數
    struct redisCommand *cmd;

    // 固定輸出緩衝區
    char buf[REDIS_REPLY_CHUNK_BYTES];
    int bufpos;

    // 可變大小輸出緩衝區
    list* reply;

    // ......
};
  •  fd 屬性:客戶端使用的套接字描述符,僞客戶端的fd屬性爲 -1,普通客戶端的fd屬性爲大於0的整數。數據庫

  • flags 屬性:客戶端狀態。在主從服務器複製時,主服務器和從服務器互爲客戶端,REDIS_MASTER 標誌表示客戶端表明的是一個主服務器,REDIS_SLAVE 標誌表示客戶端表明的是一個從服務器。數組

  • querybuf 屬性:保存客戶端發送的命令請求。服務器

  • argv、argc 屬性:對客戶端的命令請求分析,獲得的命令參數及命令參數的個數。dom

  • cmd 屬性:服務器從客戶端發送的命令請求中分析獲得argv、argc參數後,會根據argv[0]的值,去查找該命令對應的實現函數,並使cmd指針指向該實現函數。函數

  • buf、bufpos 屬性:bufpos屬性記錄了buf 數組已使用的字節數量。this

  • reply 屬性:當buf 數組空間不夠用時,服務器會使用 reply 可變大小緩衝區。lua

 


 命令請求

命令的執行過程

  服務器在接收到命令後,會將命令以對象的形式保存在服務器client的參數列表 robj** argv 中,所以服務器執行命令請求時,服務器已經讀入了一套命令參數保存在參數列表中。執行命令的過程對應的函數是processCommand(),部分源碼以下:spa

int processCommand(redisClient *c) {

    // 查找命令,並進行命令合法性檢查,以及命令參數個數檢查
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        // 沒找到指定的命令
        flagTransaction(c);
        addReplyErrorFormat(c,"unknown command '%s'",
            (char*)c->argv[0]->ptr);
        return REDIS_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        // 參數個數錯誤
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return REDIS_OK;
    }
  // ......
     
    /* Exec the command */
    if (c->flags & REDIS_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        // 在事務上下文中
        // 除 EXEC 、 DISCARD 、 MULTI 和 WATCH 命令以外
        // 其餘全部命令都會被入隊到事務隊列中
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        // 執行命令
        call(c,REDIS_CALL_FULL);

        c->woff = server.master_repl_offset;
        // 處理那些解除了阻塞的鍵
        if (listLength(server.ready_keys))
            handleClientsBlockedOnLists();
    }
    return REDIS_OK;
}

 

咱們總結出執行命令的大體過程:unix

  • 查找命令。對應的代碼是:c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr)

  • 執行命令前的準備

  • 執行命令。對應代碼是:call(c,REDIS_CALL_FULL)

 


 

查找命令

  lookupCommand 函數是對 dictFetchValue 函數的封裝。dictFetchValue 函數會從 server.commands 字典中查找 name 命令。這個保存命令表的字典,鍵是命令的名稱,值是命令表的地址。服務器初始化時會建立一張命令表。命令表部分代碼以下:

struct redisCommand redisCommandTable[] = {
    {"get",getCommand,2,"r",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"setnx",setnxCommand,3,"wm",0,NULL,1,1,1,0,0},
    
    // ......
};

 

 


 

 執行命令

執行命令調用了call(c, CMD_CALL_FULL)函數,該函數是執行命令的核心。該函數實際上是對 c->cmd->proc(c) 的封裝, proc 指向命令的實現函數。

void call(redisClient *c, int flags) {
    // start 記錄命令開始執行的時間
    long long dirty, start, duration;
    // 記錄命令開始執行前的 FLAG
    int client_old_flags = c->flags;

    // 若是能夠的話,將命令發送到 MONITOR
    if (listLength(server.monitors) &&
        !server.loading &&
        !(c->cmd->flags & REDIS_CMD_SKIP_MONITOR))
    {
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }

    /* Call the command. */
    c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
    redisOpArrayInit(&server.also_propagate);
    // 保留舊 dirty 計數器值
    dirty = server.dirty;
    // 計算命令開始執行的時間
    start = ustime();
    // 執行實現函數
    c->cmd->proc(c);
    // 計算命令執行耗費的時間
    duration = ustime()-start;
    // 計算命令執行以後的 dirty 值
    dirty = server.dirty-dirty;

    // 不將從 Lua 中發出的命令放入 SLOWLOG ,也不進行統計
    if (server.loading && c->flags & REDIS_LUA_CLIENT)
        flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS);

    // 若是調用者是 Lua ,那麼根據命令 FLAG 和客戶端 FLAG
    // 打開傳播(propagate)標誌
    if (c->flags & REDIS_LUA_CLIENT && server.lua_caller) {
        if (c->flags & REDIS_FORCE_REPL)
            server.lua_caller->flags |= REDIS_FORCE_REPL;
        if (c->flags & REDIS_FORCE_AOF)
            server.lua_caller->flags |= REDIS_FORCE_AOF;
    }

    // 若是有須要,將命令放到 SLOWLOG 裏面
    if (flags & REDIS_CALL_SLOWLOG && c->cmd->proc != execCommand)
        slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
    // 更新命令的統計信息
    if (flags & REDIS_CALL_STATS) {
        c->cmd->microseconds += duration;
        c->cmd->calls++;
    }

    // 將命令複製到 AOF 和 slave 節點
    if (flags & REDIS_CALL_PROPAGATE) {
        int flags = REDIS_PROPAGATE_NONE;

        // 強制 REPL 傳播
        if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;

        // 強制 AOF 傳播
        if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;

        // 若是數據庫有被修改,那麼啓用 REPL 和 AOF 傳播
        if (dirty)
            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);

        if (flags != REDIS_PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
    }

    // 將客戶端的 FLAG 恢復到命令執行以前
    // 由於 call 可能會遞歸執行
    c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
    c->flags |= client_old_flags & (REDIS_FORCE_AOF|REDIS_FORCE_REPL);

    // 傳播額外的命令
    if (server.also_propagate.numops) {
        int j;
        redisOp *rop;

        for (j = 0; j < server.also_propagate.numops; j++) {
            rop = &server.also_propagate.ops[j];
            propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target);
        }
        redisOpArrayFree(&server.also_propagate);
    }
    server.stat_numcommands++;
}

 

 執行命令 c->cmd->proc(c) 就至關於執行了命令實現的函數,而後會在執行完成後,由這些函數產生相應的命令回覆,根據回覆的大小,會將回復保存在輸出緩衝區 buf 或可變輸出緩衝區鏈表 reply 中。

 


 

 maxmemory策略

  Redis 服務器對內存使用會有一個server.maxmemory的限制,若是超過這個限制,就要經過刪除一些鍵空間來釋放一些內存,具體函數對應freeMemoryIfNeeded()。釋放內存時,能夠指定不一樣的策略。策略保存在maxmemory_policy中,能夠指定如下的幾個值:

#define MAXMEMORY_VOLATILE_LRU      0
#define MAXMEMORY_VOLATILE_TTL      1
#define MAXMEMORY_VOLATILE_RANDOM   2
#define MAXMEMORY_ALLKEYS_LRU       3
#define MAXMEMORY_ALLKEYS_RANDOM    4
#define MAXMEMORY_NO_EVICTION       5

能夠看出主要分爲三種:

  • LRU:優先刪除最近最少使用的鍵。
  • TTL:優先刪除生存時間最短的鍵。
  • RANDOM:隨機刪除。

而ALLKEYS和VOLATILE的不一樣之處就是要肯定是從數據庫的鍵值對字典仍是過時鍵字典中刪除。

int freeMemoryIfNeeded(void) {
    size_t mem_used, mem_tofree, mem_freed;
    int slaves = listLength(server.slaves);
    // 計算出 Redis 目前佔用的內存總數,但有兩個方面的內存不會計算在內:
    // 1)從服務器的輸出緩衝區的內存
    // 2)AOF 緩衝區的內存
    mem_used = zmalloc_used_memory();
    if (slaves) {
        listIter li;
        listNode *ln;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = listNodeValue(ln);
            unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
            if (obuf_bytes > mem_used)
                mem_used = 0;
            else
                mem_used -= obuf_bytes;
        }
    }
    if (server.aof_state != REDIS_AOF_OFF) {
        mem_used -= sdslen(server.aof_buf);
        mem_used -= aofRewriteBufferSize();
    }

    // 若是目前使用的內存大小比設置的 maxmemory 要小,那麼無須執行進一步操做
    if (mem_used <= server.maxmemory) return REDIS_OK;

    // 若是佔用內存比 maxmemory 要大,可是 maxmemory 策略爲不淘汰,那麼直接返回
    if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION)
        return REDIS_ERR; /* We need to free memory, but policy forbids. */
    // 計算須要釋放多少字節的內存
    mem_tofree = mem_used - server.maxmemory;

    // 初始化已釋放內存的字節數爲 0
    mem_freed = 0;

    // 根據 maxmemory 策略,
    // 遍歷字典,釋放內存並記錄被釋放內存的字節數
    while (mem_freed < mem_tofree) {
        int j, k, keys_freed = 0;

        // 遍歷全部字典
        for (j = 0; j < server.dbnum; j++) {
            long bestval = 0; /* just to prevent warning */
            sds bestkey = NULL;
            dictEntry *de;
            redisDb *db = server.db+j;
            dict *dict;
            if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||
                server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM)
            {
                // 若是策略是 allkeys-lru 或者 allkeys-random 
                // 那麼淘汰的目標爲全部數據庫鍵
                dict = server.db[j].dict;
            } else {
                // 若是策略是 volatile-lru 、 volatile-random 或者 volatile-ttl 
                // 那麼淘汰的目標爲帶過時時間的數據庫鍵
                dict = server.db[j].expires;
            }

            // 跳過空字典
            if (dictSize(dict) == 0) continue;

            /* volatile-random and allkeys-random policy */
            // 若是使用的是隨機策略,那麼從目標字典中隨機選出鍵
            if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM ||
                server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_RANDOM)
            {
                de = dictGetRandomKey(dict);
                bestkey = dictGetKey(de);
            }

            // 若是使用的是 LRU 策略,
            // 那麼從一集 sample 鍵中選出 IDLE 時間最長的那個鍵
            else if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||
                server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU)
            {
                struct evictionPoolEntry *pool = db->eviction_pool;

                while(bestkey == NULL) {
                    evictionPoolPopulate(dict, db->dict, db->eviction_pool);
                    /* Go backward from best to worst element to evict. */
                    for (k = REDIS_EVICTION_POOL_SIZE-1; k >= 0; k--) {
                        if (pool[k].key == NULL) continue;
                        de = dictFind(dict,pool[k].key);

                        /* Remove the entry from the pool. */
                        sdsfree(pool[k].key);
                        /* Shift all elements on its right to left. */
                        memmove(pool+k,pool+k+1,
                            sizeof(pool[0])*(REDIS_EVICTION_POOL_SIZE-k-1));
                        /* Clear the element on the right which is empty
                         * since we shifted one position to the left.  */
                        pool[REDIS_EVICTION_POOL_SIZE-1].key = NULL;
                        pool[REDIS_EVICTION_POOL_SIZE-1].idle = 0;

                        /* If the key exists, is our pick. Otherwise it is
                         * a ghost and we need to try the next element. */
                        if (de) {
                            bestkey = dictGetKey(de);
                            break;
                        } else {
                            /* Ghost... */
                            continue;
                        }
                    }
                }
            }
            // 策略爲 volatile-ttl ,從一集 sample 鍵中選出過時時間距離當前時間最接近的鍵
            else if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_TTL) {
                for (k = 0; k < server.maxmemory_samples; k++) {
                    sds thiskey;
                    long thisval;

                    de = dictGetRandomKey(dict);
                    thiskey = dictGetKey(de);
                    thisval = (long) dictGetVal(de);

                    /* Expire sooner (minor expire unix timestamp) is better
                     * candidate for deletion */
                    if (bestkey == NULL || thisval < bestval) {
                        bestkey = thiskey;
                        bestval = thisval;
                    }
                }
            }

            // 刪除被選中的鍵
            if (bestkey) {
                long long delta;

                robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
                propagateExpire(db,keyobj);
                // 計算刪除鍵所釋放的內存數量
                delta = (long long) zmalloc_used_memory();
                dbDelete(db,keyobj);
                delta -= (long long) zmalloc_used_memory();
                mem_freed += delta;
                
                // 對淘汰鍵的計數器增一
                server.stat_evictedkeys++;

                notifyKeyspaceEvent(REDIS_NOTIFY_EVICTED, "evicted",
                    keyobj, db->id);
                decrRefCount(keyobj);
                keys_freed++;

                if (slaves) flushSlavesOutputBuffers();
            }
        }

        if (!keys_freed) return REDIS_ERR; /* nothing to free... */
    }
    return REDIS_OK;
}
相關文章
相關標籤/搜索