【Redis5源碼學習】淺析redis命令執行的生命週期

baiyanc++

引入

首先看一張咱們很是熟悉的redis命令執行圖:

那麼思考這樣一個問題,當咱們鏈接了redis服務端以後,而後輸入並執行某條redis命令:如set key1 value1。這條命令到底是如何被髮送到redis服務端的,redis服務端又是如何解析,並做出相應處理,並返回執行成功的呢?redis

客戶端到服務端的命令傳輸(請求)

redis在TCP協議基礎之上,封裝了本身的一套協議規範,方便服務端與客戶端去接收與解析數據,劃清命令參數之間的邊界,方便最終對以TCP字節流傳輸的數據進行處理。下面咱們使用tcpdump來捕獲redis-cli發送命令時的數據包:算法

tcpdump port 6379 -i lo -X

這時,咱們在客戶端中輸入set key1 value1命令。在tcpdump中捕獲的數據包以下:

第一個是客戶端發送命令到redis服務端時的數據包,而第二個是redis服務端響應給客戶端的數據包。咱們首先只看第一個數據包,它從客戶端43856端口發送到redis服務端的6379端口。首先前20個字節是IP頭部,後32字節是TCP頭部(因爲TCP頭部後面存在可選項)。
咱們主要關注從「2a33」開始的數據信息,從這裏開始就是redis具體的數據格式了。從右邊對數據的一個ASCII碼翻譯也能夠看到set、key一、value1的字樣,中間還有一些用.表示的字符,那麼這裏,咱們根據抓包結果分析一下redis數據傳輸的協議格式。segmentfault

  • 2a33:0x2a是字符"*"的ASCII碼值,0x33是"3"的ASCII碼值(十進制值是51)
  • 0d0a:0d是"r"的ASCII碼值,0a是"n"的ASCII碼值
  • 7365:是"s"和"e"的ASCII碼值
  • 740d:是"t"和"r"的ASCII碼值
  • 0a24:是"n"和"$"的ASCII碼值
  • 340d:是"4"和"r"的ASCII碼值
  • 0a6b:是"n"和"k"的ASCII碼值
  • 6579:是"e"和"y"的ASCII碼值
  • 310d:是"1"和"r"的ASCII碼值
  • 0a24:是"n"和"$"的ASCII碼值
  • 360d:是」6"和"r"的ASCII碼值
  • 0a76:是"n"和"v"的ASCII碼值
  • 616c:是"a"和"l"的ASCII碼值
  • 7565:是"u"和"e"的ASCII碼值
  • 310d:是"1"和"r"的ASCII碼值
  • 0a: 是"n"的ASCII碼值

看到這裏,咱們是否可以發現如下規律:數組

  • redis以"*"做爲標誌,表示命令的開始。在*後面緊跟的數字表明參數的個數(set key1 value1有3個參數因此爲3)
  • redis以"$"做爲命令參數的開始,後面緊跟的數字表明參數的長度(如key1的長度爲4因此爲$4)
  • redis以"rn"做爲參數之間的分隔符,方便解析TCP字節流數據時定位邊界位置

綜合來看,客戶端向服務端發送的redis數據包格式以下:緩存

*3 \r\n set \r\n $4 \r\n key1 \r\n $6 \r\n value1 \r\n

相比FastCGI協議,redis僅僅使用幾個分隔符和特殊字符,就完成了對命令的傳輸語法及數據格式的規範化,同時服務端經過其中定義好的分隔符,也可以方便高效地從字節流數據中解析並讀取出正確的數據。這種通訊協議簡單高效,可以知足redis對高性能的要求。安全

服務端對命令的處理

既然命令已經經過redis數據傳輸協議安全地送達到了服務端,那麼,服務端就要開始對傳輸過來的字節流數據進行處理啦。因爲咱們在協議中清晰地定義了每一個參數的邊界(\r\n),因此,redis服務端解析起來也很是輕鬆。網絡

第一步:回調函數的使用

redis是典型事件驅動程序。爲了提升單進程的redis的性能,redis採用IO多路複用技術來處理客戶端的命令請求。redis會在建立客戶端實例的時,指定服務端接收到客戶端命令請求的事件時,所要執行的事件處理函數:app

client *createClient(int fd) {
    
    client *c = zmalloc(sizeof(client));

    if (fd != -1) {
        anetNonBlock(NULL,fd); //設置非阻塞
        anetEnableTcpNoDelay(NULL,fd); //設置不採用Nagle算法,避免半包與粘包現象
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive); //設置keep-alive
        //注意這裏建立了一個文件事件。當客戶端讀事件就緒的時候,回調readQueryFromClient()函數
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c) == AE_ERR) {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
    ... 
}

爲了暫存客戶端請求到服務端的字節流數據,redis封裝了一個接收緩衝區,來緩存從套接字中讀取的數據。後續的命令處理流程從緩衝區中讀取命令數據並處理便可。緩衝區的好處在於不用一直維持讀寫套接字。在後續的流程中,咱們只須要從緩衝區中讀取數據,而不是仍從套接字中讀取。這樣就能夠提早釋放套接字,節省資源。緩衝區的創建與使用就是在以前講過的客戶端回調函數readQueryFromClient()中完成的:tcp

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
   ...
    qblen = sdslen(c->querybuf); //獲取緩衝區長度
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; 
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); //建立一個sds結構做爲緩衝區
    nread = read(fd, c->querybuf+qblen, readlen); //從套接字中讀取數據到緩衝區中暫存
    ...
    //真正地處理命令
    processInputBufferAndReplicate(c);
}

第二步:分發器的使用

這段代碼建立並往緩衝區中寫入了字節流數據,而後調用processInputBufferAndReplicate()去真正地處理命令。processInputBufferAndReplicate()函數中只是簡單的調用了==processInputBuffer()函數。因爲咱們以前的緩衝區中已經有了客戶端發給服務端的字節流數據,因此咱們須要在這一層進行數據初步的篩選與處理:

void processInputBuffer(client *c) {
    // 若是緩衝區尚未處理完,繼續循環處理
    while(c->qb_pos < sdslen(c->querybuf)) {
         ...
        // 對字節流數據進行定製化分發處理
        if (c->reqtype == PROTO_REQ_INLINE) { //若是是INLINE類型的請求
            if (processInlineBuffer(c) != C_OK) break;  //調用processInlineBuffer解析緩衝區數據
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {//若是是MULTIBULK類型的請求
            if (processMultibulkBuffer(c) != C_OK) break; //調用processMultibulkBuffer解析緩衝區數據
        } else { 
            serverPanic("Unknown request type");
        }

       // 開始處理具體的命令
        if (c->argc == 0) { //命令參數爲0個,非法
            resetClient(c);
        } else { //命令參數不爲0,合法
            // 調用processCommand()真正處理命令
            if (processCommand(c) == C_OK) { //
                ...
            }
        }
    }
}

看到這裏,讀者可能會有些疑惑。什麼是INLINE、什麼是MULTIBULK?在redis中,有兩種請求命令類型:

  • INLINE類型:簡單字符串格式,如ping命令
  • MULTIBULK類型:字符串數組格式。如set、get等等大部分命令都是這種類型

這個函數其實就是一個分發器。因爲底層的字節流數據是無規則的,因此咱們須要根據客戶端的reqtype字段,去區分請求字節流數據屬於那種請求類型,進而分發到對應的函數中進行處理。因爲咱們常常執行的命令都是MULTIBULK類型,咱們也以MULTIBULK類型爲例。對於set、get這種MULTIBULK請求類型,會被分發到processMultibulkBuffer()函數中進行處理。

第三步:檢查接收緩衝區的數據完整性

在開啓TCP的Nagle算法時,TCP會將多個redis命令請求的數據包合併或者拆分發送。這樣就會出如今一個數據包中命令不完整、或者一個數據包中包含多個命令的狀況。爲了解決這個問題,processMultibulkBuffer()函數保證,當只有在緩衝區中包含一個完整請求時,這個函數纔會成功解析完字節流中的命令參數,並返回成功狀態碼。不然,會break出外部的while循環,等待下一次事件循環再從套接字中讀取剩餘的數據,再進行對命令的解析。這樣就保證了redis協議中的數據的完整性,也保證了實際命令參數的完整性。

int processMultibulkBuffer(client *c) {
    while(c->multibulklen) {
        ...
        /* 讀取命令參數字節流 */
        if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) { //若是$後面表明參數長度的數字與實際命令長度不匹配(+2的位置是\r\n),說明數據不完整,直接跳出循環,等待下一次讀取剩餘數據
            break;
        } else { //命令完整,進行一些執行命令以前的初始化工做
            if (c->qb_pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG && sdslen(c->querybuf) == (size_t)(c->bulklen+2)) {
                c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf); 
                sdsIncrLen(c->querybuf,-2); 
                c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
                sdsclear(c->querybuf);
            } else {
                c->argv[c->argc++] =
                    createStringObject(c->querybuf+c->qb_pos,c->bulklen);
                c->qb_pos += c->bulklen+2;
            }
            c->bulklen = -1;
            c->multibulklen--; //處理下一個命令參數
        }
    }
}

第四步:真正地處理命令

咱們回到外層。當咱們成功執行processMultibulkBuffer()函數以後,說明當前命令已經完整,能夠對命令進行處理了。咱們想一下,加入要咱們去設計根據不一樣的命令,調用不一樣的處理函數,從而完成不一樣的功能,咱們應該怎麼作呢?想了想,咱們能夠簡單寫出如下代碼:

if (command == "get") {
    doGetCommand(); //get命令處理函數
} else if (command == "set") {
    doSetCommand(); //set命令處理函數
} else {
    printf("非法命令")
}

以上代碼很是簡單,只是根據咱們獲得的不一樣命令請求,分發到不一樣的命令處理函數中進行定製化處理。那麼redis其實也是一樣的道理,那究竟redis是怎麼作的呢:

int processCommand(client *c) {
    //若是是退出命令直接返回
    if (!strcasecmp(c->argv[0]->ptr,"quit")) { 
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }
    //去字典裏查找命令,並把要執行的命令處理函數賦值到c結構體中的cmd字段
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    // 返回值校驗
    if (!c->cmd) { //沒有找到該命令
        flagTransaction(c);
        sds args = sdsempty();
        int i;
        for (i=1; i < c->argc && sdslen(args) < 128; i++)
            args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
        addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
            (char*)c->argv[0]->ptr, args);
        sdsfree(args);
        return C_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || //命令參數不匹配
               (c->argc < -c->cmd->arity)) {
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return C_OK;
    }
   // 真正執行命令
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else { //真正執行命令
        call(c,CMD_CALL_FULL); //核心函數
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
    return C_OK;
}

在這個函數中,最重要的就是lookupCommand()函數和call()函數的調用了。在redis中,全部命令都存儲在一個字典中,這個字典長這樣:

struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
    {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
    {"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0},
    {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
    {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"bitfield",bitfieldCommand,-2,"wm",0,NULL,1,1,1,0,0},
    {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0},
    {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0},
    {"mget",mgetCommand,-2,"rF",0,NULL,1,-1,1,0,0},
    {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"rpushx",rpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"lpushx",lpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
    {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
    {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
    {"brpop",brpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
    {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
    {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
    {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
    {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
    {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
    ...
};

咱們能夠看到,這個字典是全部命令的集合,咱們調用lookupCommand就是從這裏獲取命令及命令的相關信息的。它是一個結構體數組,包含全部命令名稱、命令處理函數、參數個數、以及種種標記。其實這裏就至關於一個配置信息的維護,以及命令道處理函數名稱的映射關係,從而很好的解決了咱們一開始使用if-else來分發命令處理函數的難以維護、可擴展性差的問題。
在咱們成功在字典中找到一個命令的處理函數以後,咱們只須要去調用相應的命令處理函數就好啦。上面最後的call()函數中就對相應的命令處理函數進行了調用,並返回調用結果給客戶端。好比,setCommand()就是set命令的實際處理函數:

void setCommand(client *c) {
    int j;
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = OBJ_SET_NO_FLAGS;

    for (j = 3; j < c->argc; j++) {
        char *a = c->argv[j]->ptr;
        robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];

        if ((a[0] == 'n' || a[0] == 'N') &&
            (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
            !(flags & OBJ_SET_XX))
        {
            flags |= OBJ_SET_NX;
        } else if ((a[0] == 'x' || a[0] == 'X') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_NX))
        {
            flags |= OBJ_SET_XX;
        } else if ((a[0] == 'e' || a[0] == 'E') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_PX) && next)
        {
            flags |= OBJ_SET_EX;
            unit = UNIT_SECONDS;
            expire = next;
            j++;
        } else if ((a[0] == 'p' || a[0] == 'P') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_EX) && next)
        {
            flags |= OBJ_SET_PX;
            unit = UNIT_MILLISECONDS;
            expire = next;
            j++;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    c->argv[2] = tryObjectEncoding(c->argv[2]);
    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}

這個函數首先對NX、EX參數進行了判斷及處理,最終調用了setGenericCommand(),來執行set命令的通用邏輯部分:

void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    long long milliseconds = 0; /* initialized to avoid any harmness warning */

    if (expire) {
        if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
            return;
        if (milliseconds <= 0) {
            addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
            return;
        }
        if (unit == UNIT_SECONDS) milliseconds *= 1000;
    }

    if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
        (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
    {
        addReply(c, abort_reply ? abort_reply : shared.nullbulk);
        return;
    }
    setKey(c->db,key,val);
    server.dirty++;
    if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
    notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
    if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
        "expire",key,c->db->id);
    addReply(c, ok_reply ? ok_reply : shared.ok);
}

最終會調用addReply()通用返回函數,應該是要把執行結果返回給客戶端了。咱們看看該函數裏面作了些什麼:

void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyStringToList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

咱們仔細閱讀這段代碼,好像並無找到執行結果是何時返回給客戶端的。在這個函數中,只是將返回結果添加到了輸出緩衝區中,一個命令就執行完了。那麼到底是何時返回的呢?是否還記得在介紹開啓事件循環時,提到函數beforesleep()會在每次事件循環阻塞等待文件事件以前執行,主要執行一些不是很費時的操做,好比過時鍵刪除操做,向客戶端返回命令回覆等。這樣,就能夠減節省返回執行結果時的網絡通訊開銷,將同一個客戶端上的多個命令的屢次返回,對多個命令作一個緩存,最終一次性統一返回,減小了返回的次數,提升了性能。

客戶端到服務端的命令傳輸(響應)

執行完set key1 value1命令以後,咱們獲得了一個"OK"的返回,表明命令執行成功。其實咱們仔細觀察上面返回的第二個數據包,其實底層是一個"+OK"的返回值。那麼爲何要有一個+號呢?由於除了咱們上面講過的set命令,還有get命令、lpush命令等等,他們的返回值都是不同的。get會返回數據集合、lpush會返回一個整數,表明列表的長度等等。一個字符串的表示是遠遠不能知足須要的。因此在redis通訊協議中,一共定義了五種返回值結構。客戶端經過每種返回結構的第一個字符,來判斷是哪一種類型的返回值:

  • 狀態回覆:第一個字符是「+」;例如,SET命令執行完畢會向客戶端返回「+OK\r\n」。
  • 錯誤回覆:第一個字符是「-」;例如,當客戶端請求命令不存在時,會向客戶端返回「-ERR unknown command 'testcmd'」。
  • 整數回覆:第一個字符是「:」;例如,INCR命令執行完畢向客戶端返回「:100\r\n」。
  • 批量回復:第一個字符是「$」;例如,GET命令查找鍵向客戶端返回結果「$5\r\nhello\r\n」,其中$5表示返回字符串長度。
  • 多條批量回復:第一個字符是「」;例如,LRANGE命令可能會返回多個多個值,格式爲「3\r\n$6\r\nvalue1\r\n$6rnvalue2rn$6\r\nvalue3\r\n」,與命令請求協議格式相同,「\*3」表示返回值數目,「$6」表示當前返回值字符串長度,多個返回值用「\r\n」分隔開。

咱們執行set命令就是第一種類型,即狀態回覆。客戶端經過+號,就可以知道這是狀態回覆,從而就知道該如何讀取後面的字節流內容了。

總結

至此,咱們就走完了一個redis命令的完整生命週期,同時也瞭解了redis通訊協議的格式與規範。接下來,我將會深刻每個命令的實現,你們加油。

參考資料

相關文章
相關標籤/搜索