Memcached學習(二)--命令解析

總體流程

1. 當客戶端和Memcached創建TCP鏈接後,Memcached會基於Libevent的event事件來監聽客戶端是否有能夠讀取的數據。數組

2. 當客戶端有命令數據報文上報的時候,就會觸發drive_machine方法中的conn_read這個Case。緩存

3. memcached經過try_read_network方法讀取客戶端的報文。若是讀取失敗,則返回conn_closing,去關閉客戶端的鏈接;若是沒有讀取到任何數據,則會返回conn_waiting,繼續等待客戶端的事件到來,而且退出drive_machine的循環;若是數據讀取成功,則會將狀態轉交給conn_parse_cmd處理,讀取到的數據會存儲在c->rbuf容器中。網絡

4. conn_parse_cmd主要的工做就是用來解析命令。主要經過try_read_command這個方法來讀取c->rbuf中的命令數據,經過\n來分隔數據報文的命令。若是c->buf內存塊中的數據匹配不到\n,則返回繼續等待客戶端的命令數據報文到來conn_waiting;不然就會轉交給process_command方法,來處理具體的命令(命令解析會經過\0符號來分隔)。app

5. process_command主要用來處理具體的命令。其中tokenize_command這個方法很是重要,將命令拆解成多個元素(KEY的最大長度250)。例如咱們以get命令爲例,最終會跳轉到process_get_command這個命令 process_*_command這一系列就是處理具體的命令邏輯的。socket

6. 咱們進入process_get_command,當獲取數據處理完畢以後,會轉交到conn_mwrite這個狀態。若是獲取數據失敗,則關閉鏈接。memcached

7. 進入conn_mwrite後,主要是經過transmit方法來向客戶端提交數據。若是寫數據失敗,則關閉鏈接或退出drive_machine循環;若是寫入成功,則又轉交到conn_new_cmd這個狀態。函數

8. conn_new_cmd這個狀態主要是處理c->rbuf中剩餘的命令。主要看一下reset_cmd_handler這個方法,這個方法回去判斷c->rbytes中是否還有剩餘的報文沒處理,若是未處理,則轉交到conn_parse_cmd(第四步)繼續解析剩餘命令;若是已經處理了,則轉交到conn_waiting,等待新的事件到來。在轉交以前,每次都會執行一次conn_shrink方法。ui

9. conn_shrink方法主要用來處理命令報文容器c->rbuf和輸出內容的容器是否數據滿了?是否須要擴大buffer的大小,是否須要移動內存塊。接受命令報文的初始化內存塊大小2048,最大8192。spa

 

狀態變遷

狀態機drive_machine函數是worker線程網絡請求進行業務邏輯處理的核心。命令行

它的實現方式是:

  一個while循環裏面有一個巨大的switch case,根據鏈接對象 conn當前的鏈接狀態conn_state,進入不一樣的case,而每一個case可能會改變conn的鏈接狀態,也就是說在這個while+switch中,conn會不斷的發生狀態轉移,最後被分發到合適的case上做處理。能夠理解爲,這裏是一個有向圖,每一個case是一個頂點,有些case經過改變conn對象的鏈接狀態讓程序在下一次循環中進入另外一個case,幾回循環後程序最終進入到「無出度的頂點」而後結束狀態機,這裏的無出度的頂點就是帶設置stop=true的case分支。

看下大概的代碼結構:

 1 static void drive_machine(conn *c) {
 2     while (!stop) {
 3         switch(c->state) {
 4            case conn_listening: 
 5                  //......
 6            case conn_waiting:
 7                 //......
 8                 stop = true; break;
 9                //......
10         }
11    }
12 }

 

  主線程狀態機的行爲咱們已經知道了,永遠只會是conn_listening狀態,永遠只會進入drive_machine的conn_listening分支,accept鏈接把client fd 經過dispatch_conn_new函數分發給worker線程。

  下面咱們來看一下worker線程執行狀態機:

  當主線程調用dispatch_conn_new的時候,worker線程建立conn對象,初始狀態爲conn_new_cmd。因此當有worker線程監聽的client fd有請求過來時,例如客戶端發了一行命令(set xxx\r\n)會進入conn_new_cmd分支:

 1 case conn_new_cmd:
 2             /*
 3              這裏的reqs是請求的意思,其實叫「命令」更準確。一次event發生,有可能包含多個命令,
 4              從client fd裏面read到的一次數據,不能保證這個數據只是包含一個命令,有多是多個
 5              命令數據堆在一塊兒的一次事件通知。這個nreqs是用來控制一次event最多能處理多少個命令。
 6             */
 7             --nreqs;
 8             if (nreqs >= 0) {
 9                 /**
10                 準備執行命令。爲何叫reset cmd,reset_cmd_handler其實作了一些解析執行命令以前
11                 的初始化動下一個,都會從新進入這個case做。而像上面說的,一次event有可能有多個命令,每執行一個命令,若是還有
12                  conn_new_cmd,reset一下再執行下一個命令。
13                 */
14                 reset_cmd_handler(c);
15             } else {
16                //......
17             }
18             break;

 

  當client fd第一次有請求過來的時候,會進入reset_cmd_handler函數:

 1 static void reset_cmd_handler(conn *c) {
 2     c->cmd = -1;
 3     c->substate = bin_no_state;
 4     if(c->item != NULL) {
 5         item_remove(c->item);
 6         c->item = NULL;
 7     }
 8     conn_shrink(c); 
 9  
10  //第一次有請求過來觸發到此函數時,c->rbytes爲0
11     if (c->rbytes > 0) {
12         conn_set_state(c, conn_parse_cmd);
13     } else {
14         conn_set_state(c, c
15             onn_waiting);  //第一次請求進入此分支
16     }
17 }

 

  咱們在conn_new函數裏面把c->rbytes被始化爲0,而直至此咱們也沒有看到這個c->rbytes有被從新賦新值,因此其實第一次有請求過來,這個值仍是0,因此進入else分支,即執行conn_set_state(c,conn_waiting);而後從新回到狀態機執行下一次循環,進入conn_waiting分支:

 

1 case conn_waiting:
2     if (!update_event(c, EV_READ | EV_PERSIST)) {
3         //。。。
4     }
5     conn_set_state(c, conn_read);
6     stop = true;
7     break;

 

 在conn_waiting分支你會發現,這裏的代碼僅僅是把狀態改變conn_read而後就stop=true,結束狀態機了!沒錯,退出while循環了!此次事件觸發就此結束了!你會以爲很奇怪,我客戶端明明發了一個請求,(set xxx\r\n),你什麼都沒處理就只是把鏈接狀態改爲conn_read就完事了?!沒錯,至少這一次狀態機的執行行爲是這樣!

  究竟是怎麼回事?其實這裏是利用了一點:libevent的epoll默認是「水平觸發」!也就是說,客戶端發來一個set xxx\r\n,我這邊一天沒有read,epoll還會有下一次通知,也就是說,這個請求有兩次事件通知!第一次通知的做用僅是爲了把鏈接狀態改成conn_read! 當worker線程由於同一個client fd同一個請求收到第二次通知的時候,再次執行狀態機,而後進入conn_read分支。

 

 1 //讀取事件
 2 //例若有用戶提交數據過來的時候,工做線程監聽到事件後,最終會調用這塊代碼
 3  //讀取數據的事件,當客戶端有數據報文上傳的時候,就會觸發libevent的讀事件
 4  case conn_read:
 5        //try_read_network 主要讀取TCP數據
 6        //返回try_read_result的枚舉類型結構,經過這個枚舉類型,來判斷是否已經讀取到數據,是否讀取失敗等狀況
 7         res = IS_UDP(c->transport) ? try_read_udp(c) :try_read_network(c);
 8 
 9          switch (res) {
10             //沒有讀取到數據,那麼繼續將事件設置爲等待。
11              //while(stop)會繼續循環,去調用conn_waiting這個case
12              case READ_NO_DATA_RECEIVED:
13                  conn_set_state(c, conn_waiting);
14                  break;
15                  //若是有數據讀取到了,這個時候就須要調用conn_parse_cmd邏輯
16                  //conn_parse_cmd:主要用來解析讀取到的命令
17              case READ_DATA_RECEIVED:
18                  conn_set_state(c, conn_parse_cmd);
19                  break;
20                  //讀取失敗的狀態,則直接調用conn_closing 關閉客戶端的鏈接
21              case READ_ERROR:
22                  conn_set_state(c, conn_closing);
23                  break;
24              case READ_MEMORY_ERROR: /* Failed to allocate more memory */
25                  /* State already set by try_read_network */
26                  break;
27              }
28              break;

 

 

  進入conn_read此時才調用函數try_read_network函數讀出請求(set xxx\r\n)。讀取到的數據會放進c->rbuf的buf中。若是buf沒有空間存儲更多數據的時候,就會觸發內存塊的從新分配。從新分配,memcached限制了4次,估計是擔心客戶端的惡意攻擊致使存儲命令行數據報文的buf不斷的realloc。

 1 //這個方法是經過TCP的方式讀取客戶端傳遞過來的命令數據
 2 static enum try_read_result try_read_network(conn *c) {
 3     //這個方法會最終返回try_read_result的枚舉類型
 4     //默認設置READ_NO_DATA_RECEIVED:沒有接受到數據
 5     enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
 6     int res;
 7     int num_allocs = 0;
 8     assert(c != NULL);
 9  
10     //c->rcurr 存放未解析命令內容指針   c->rbytes 還有多少沒解析過的數據
11     //c->rbuf 用於讀取命令的buf,存儲命令字符串的指針  c->rsize rbuf的size
12     //這邊每次都會將前一次剩餘的命令報文,移動到c->rbuf的頭部。
13     if (c->rcurr != c->rbuf) {
14         if (c->rbytes != 0) /* otherwise there's nothing to copy */
15             memmove(c->rbuf, c->rcurr, c->rbytes);
16         c->rcurr = c->rbuf;
17     }
18     //循環從fd中讀取數據
19     while (1) {
20         //若是buf滿了,則須要從新分配一塊更大的內存
21         //當未解析的數據size 大於等於 buf塊的size,則須要從新分配
22         if (c->rbytes >= c->rsize) {
23             //最多分配4次
24             if (num_allocs == 4) {
25                 return gotdata;
26             }
27             ++num_allocs;
28             //重新分配一塊新的內存塊,內存大小爲rsize的兩倍
29             char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
30             if (!new_rbuf) {
31                 STATS_LOCK();
32                 stats.malloc_fails++;
33                 STATS_UNLOCK();
34                 if (settings.verbose > 0) {
35                     fprintf(stderr, "Couldn't realloc input buffer\n");
36                 }
37                 c->rbytes = 0; /* ignore what we read */
38                 out_of_memory(c, "SERVER_ERROR out of memory reading request");
39                 c->write_and_go = conn_closing;
40                 return READ_MEMORY_ERROR;
41             }
42             //c->rcurr和c->rbuf指向到新的buf塊
43             c->rcurr = c->rbuf = new_rbuf;
44             c->rsize *= 2; //rsize則乘以2
45         }
46  
47         //avail能夠計算出buf塊中剩餘的空間多大
48         int avail = c->rsize - c->rbytes;
49  
50         //這邊咱們能夠看到Socket的讀取方法
51         //c->sfd爲Socket的ID
52         //c->rbuf + c->rbytes 意思是從buf塊中空餘的內存地址開始存放新讀取到的數據
53         //avail 每次接收最大能讀取多大的數據
54         res = read(c->sfd, c->rbuf + c->rbytes, avail);
55  
56         //若是接受到的結果res大於0,則說明Socket中讀取到了數據
57         //設置成READ_DATA_RECEIVED枚舉類型,代表讀取到了數據
58         if (res > 0) {
59             pthread_mutex_lock(&c->thread->stats.mutex); //線程鎖
60             c->thread->stats.bytes_read += res;
61             pthread_mutex_unlock(&c->thread->stats.mutex);
62             gotdata = READ_DATA_RECEIVED;
63             c->rbytes += res; //未處理的數據量 + 當前讀取到的命令size
64             if (res == avail) {
65                 continue;
66             } else {
67                 break;
68             }
69         }
70         //判斷讀取失敗的兩種狀況
71         if (res == 0) {
72             return READ_ERROR;
73         }
74         if (res == -1) {
75             if (errno == EAGAIN || errno == EWOULDBLOCK) {
76                 break;
77             }
78             return READ_ERROR;
79         }
80     }
81     return gotdata;
82 }

 

 

  try_read_network函數就是從socket中把數據讀到c->rbuf中去而已,同時初始化一些變量例如rbytes等,讀取數據成功則返回READ_DATA_RECEIVED,狀態機 conn_set_state(c, conn_parse_cmd);進入conn_parse_cmd狀態:

 

 1  case conn_parse_cmd :
 2       /**
 3       try_read_network後,到達conn_parse_cmd狀態,但try_read_network並不確保每次到達
 4       的數據都足夠一個完整的cmd(ascii協議狀況下每每是沒有"\r\n",即回車換行),
 5       因此下面的try_read_command之因此叫try就是這個緣由,
 6       當讀到的數據還不夠成爲一個cmd的時候,返回0,conn繼續進入conn_waiting狀態等待更多的數據到達。
 7       */
 8       if (try_read_command(c) == 0) {
 9           /* wee need more data! */
10           conn_set_state(c, conn_waiting);
11       }
12       break;

 

  進行conn_parse_cmd主要是調用try_read_command函數讀取命令,上面註釋也說明了數據不夠一個cmd的狀況,下面咱們進入try_read_command,看看try_read_command不返回0時,也就是足夠一個cmd後是怎麼解析這個cmd的。

 

//若是咱們已經在c->rbuf中有能夠處理的命令行了,則就能夠調用此函數來處理命令解析
static int try_read_command(conn *c) {
    //......省略部分代碼
    //有兩種模式,是不是二進制模式仍是ascii模式
    if (c->protocol == binary_prot) {
        //更多代碼
    } else {
        //這邊主要處理非二進制模式的命令解析
        char *el, *cont;
 
        //若是c->rbytes==0 表示buf容器中沒有能夠處理的命令報文,則返回0
        //0 是讓程序繼續等待接收新的客戶端報文
        if (c->rbytes == 0)
            return 0;
        //查找命令中是否有\n,memcache的命令經過\n來分割
        //當客戶端的數據報文過來的時候,Memcached經過查找接收到的數據中是否有\n換行符來判斷收到的命令數據包是否完整
        //例如命令:set username 10234344 \n get username \n
        //這個命令就能夠分割成兩個命令,分別是set和get的命令
        //el返回\n的字符指針地址
        el = memchr(c->rcurr, '\n', c->rbytes);
 
        //若是沒有找到\n,說明命令不完整,則返回0,繼續等待接收新的客戶端數據報文
        if (!el) {
            //c->rbytes是接收到的數據包的長度
            //這邊很是有趣,若是一次接收的數據報文大於了1K,則Memcached回去判斷這個請求是否太大了,是否有問題?
            //而後會關閉這個客戶端的連接
            if (c->rbytes > 1024) {
                /*
                 * We didn't have a '\n' in the first k. This _has_ to be a
                 * large multiget, if not we should just nuke the connection.
                 */
                char *ptr = c->rcurr;
                while (*ptr == ' ') { /* ignore leading whitespaces */
                    ++ptr;
                } 
                if (ptr - c->rcurr > 100
                        || (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {
 
                    conn_set_state(c, conn_closing);
                    return 1;
                }
            }
            return 0;
        }
        //若是找到了\n,說明c->rcurr中有完整的命令了
        cont = el + 1; //下一個命令開始的指針節點
        //這邊判斷是不是\r\n,若是是\r\n,則el往前移一位
        if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
            el--;
        }
        //而後將命令的最後一個字符用 \0(字符串結束符號)來分隔
        *el = '\0';
        assert(cont <= (c->rcurr + c->rbytes));
        c->last_cmd_time = current_time; //最後命令時間
        //處理命令,c->rcurr就是命令
        process_command(c, c->rcurr);
        c->rbytes -= (cont - c->rcurr); //這個地方爲什麼不這樣寫?c->rbytes = c->rcurr - cont
        c->rcurr = cont; //將c->rcurr指向到下一個命令的指針節點
 
        assert(c->rcurr <= (c->rbuf + c->rsize));
    }
    return 1;
}

 

  

  上面try_read_command把命令讀出(其實只是簡單地找出一個完整的命令,在後面加個\0而已)。

  在這裏插一下memcached的SET命令的協議,或者你能夠看memcached/doc/protocol.txt中的說明:

  完成一個SET命令,其實須要兩行,也就是須要按兩次回車換行「\r\n」,第一行叫「命令行」,格式是SET key flags exptime bytes\r\n,如SET name 0 0 5\r\n, 鍵爲name,flags標誌位可暫時無論,超時設爲0,value的字節長度是4。而後纔有第二行叫「數據行」,格式爲:value\r\n,例如:calix\r\n。這兩行分別敲下去,SET命令纔算完成。

  因此處理SET命令時上面的try_read_command首先處理的是SET name 0 0 5\r\n這個「命令行」。

  看看進入process_command函數如何執行:

 

 1 /**
 2 這裏就是對命令的解析和執行了
 3 (其實準確來講,這裏只是執行了命令的一半(例如若是是SET命令,則是「命令行」部分),
 4 而後根據命令類型再次改變conn_state使程序再次進入狀態機,完成命令的
 5 另外一半工做,後面詳說)
 6 command此時的指針值等於conn的rcurr
 7 */
 8 static void process_command(conn *c, char *command) {
 9     token_t tokens[MAX_TOKENS];
10     size_t ntokens;
11     int comm; //命令類型
12     c->msgcurr = 0;
13     c->msgused = 0;
14     c->iovused = 0;
15     if (add_msghdr(c) != 0) {
16         out_of_memory(c, "SERVER_ERROR out of memory preparing response");
17         return;
18     }
19     /**
20     下面這個tokenize_command是一個詞法分析,把command分解成一個個token
21     */
22     ntokens = tokenize_command(command, tokens, MAX_TOKENS);
23     //下面是對上面分解出來的token再進行語法分析,解析命令,下面的comm變量爲最終解析出來命令類型
24     if (ntokens >= 3 &&
25         ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
26          (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
27         process_get_command(c, tokens, ntokens, false);
28     } else if ((ntokens == 6 || ntokens == 7) &&
29                ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
30                 (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
31                 (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
32                 (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
33                 (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
34         //add/set/replace/prepend/append爲「更新」命令,調用同一個函數執行命令。詳見process_update_command定義處
35         process_update_command(c, tokens, ntokens, comm, false);
36     }
37    //...... 
38 }

 

  process_command 方法中調用了tokenize_command方法來分解命令。例如:set username zhuli\n 則會分解成三個元素:set和username和zhuli這三個元素。

 

//拆分命令方法
static size_t tokenize_command(char *command, token_t *tokens,
        const size_t max_tokens) {
    char *s, *e;
    size_t ntokens = 0; //命令參數遊標
    size_t len = strlen(command); //命令長度
    unsigned int i = 0;
 
    assert(command != NULL && tokens != NULL && max_tokens > 1);
 
    s = e = command;
    for (i = 0; i < len; i++) {
        //指針不停往前走,若是遇到空格,則會停下來,將命令元素拆分出來,放進tokens這個數組中
        if (*e == ' ') {
            if (s != e) {
                tokens[ntokens].value = s;
                tokens[ntokens].length = e - s;
                ntokens++;
                //這邊將空格替換成\0
                //Memcached這邊的代碼寫的很是的好,這邊的命令進行切割的時候,並無將內存塊進行拷貝,而是在原來的內存塊上進行切割
                *e = '\0';
                //最多8個元素
                if (ntokens == max_tokens - 1) {
                    e++;
                    s = e; /* so we don't add an extra token */
                    break;
                }
            }
            s = e + 1;
        }
        e++;
    }
 
    if (s != e) {
        tokens[ntokens].value = s;
        tokens[ntokens].length = e - s;
        ntokens++;
    }
 
    /*
     * If we scanned the whole string, the terminal value pointer is null,
     * otherwise it is the first unprocessed character.
     */
    tokens[ntokens].value = *e == '\0' ? NULL : e;
    tokens[ntokens].length = 0;
    ntokens++;
    //返回值爲參數個數,例如分解出3個元素,則返回3
    return ntokens;
}

 

 

  上面的代碼能夠看出首先咱們要對命令進行「解析」,詞法語法分析等等,最終咱們的set name 0 0 5\r\n命令會進入process_update_command函數中執行:

static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
        out_string(c, "CLIENT_ERROR bad command line format"); //key過長,out_string函數的做用是輸出響應,
        //詳見out_string定義處
        return;
    }
    key = tokens[KEY_TOKEN].value; //鍵名
    nkey = tokens[KEY_TOKEN].length; //鍵長度
    //下面這個if同時把命令相應的參數(如緩存超時時間等)賦值給相應變量:exptime_int等
    if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
           && safe_strtol(tokens[3].value, &exptime_int)
           && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }
    exptime = exptime_int;
    if (exptime < 0)
        exptime = REALTIME_MAXDELTA + 1;
    //在這裏執行內存分配工做。詳見內存管理篇
    it = item_alloc(key, nkey, flags, realtime(exptime), vlen); 
    ITEM_set_cas(it, req_cas_id);
    c->item = it; //將item指針指向分配的item空間
    c->ritem = ITEM_data(it); //將 ritem 指向 it->data中要存放 value 的空間地址
    c->rlbytes = it->nbytes; //data的大小
    c->cmd = comm; //命令類型
    conn_set_state(c, conn_nread); //繼續調用狀態機,執行命令的另外一半工做。
}

 

  process_update_command函數最終執行了item_alloc爲咱們要set的數據(稱爲item)分配了內存。同時,爲c對象賦了相應的一些值。

  可是其實這裏僅僅是爲item分配了空間,尚未把value塞進去,由於咱們僅僅執行了SET命令的「命令行「部分,根據「命令行」部分的信息分配空間。代碼最後一行看到在這裏,咱們又把c的狀態變成了conn_nread,等「數據行」達到,epoll事件觸發狀態機下一次循環進入conn_nread分支,其實就是完成SET命令的第二部分,讀出「數據行」:

case conn_nread:
     /**
    由process_update_command執行後進入此狀態,process_update_command函數只執行了add/set/replace 等命令的一半,
    剩下的一半由這裏完成。
    例如若是是上面的set命令,process_update_command只完成了「命令行」部分,分配了item空間,
    但尚未把value塞到對應的 item中去。所以,在這一半要完成的動做就是把value的數據從socket中讀出來,
    塞到剛拿到的item空間中去
    */
    /*
    下面的rlbytes字段表示要讀的「value數據」還剩下多少字節 (注意與"rbytes"的區別)
    若是是第一次由process_update_command進入到此,rlbytes此時在process_update_command中被初始化爲item->nbytes, 
    即value的總字節數,SET name 0 0 5\r\n中的5。
    */
    if (c->rlbytes == 0) {
        /**
        注意rlbytes爲0纔讀完,不然狀態機一直會進來這個conn_nread分支繼續讀value數據,
        讀完就調用complete_nread完成收尾工做,程序會跟着complete_nread進入下一個
        狀態。因此執行完complete_nread會break; 
        */
        complete_nread(c);
        break;
    }
    //若是還有數據沒讀完,繼續往下執行。可知,下面的動做就是繼續從buffer中讀value數據往item中的data的value位置塞。

    if (c->rbytes > 0) { 
        /**
         進入到這個if,是由於有可能先前讀到的buffer已經有「數據行」部分,由於一次事件通知,
         不保證socket可讀數據只有一個\r\n。
       */
        /**
        取rbytes與rlbytes中最小的值。
        爲啥?
        由於這裏咱們的目的是剩下的還沒讀的value的字節,而rlbytes表明的是還剩下的字節數
        若是rlbytes比rbytes小,只讀rlbytes長度就夠了,rbytes中多出來的部分不是咱們這個時候想要的
        若是rbytes比rlbytes小,即便你要rlbytes這麼多,但buffer中沒有這麼多給你讀。
        */
        int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
        if (c->ritem != c->rcurr) {
            memmove(c->ritem, c->rcurr, tocopy); //往分配的item中塞,即爲key設置value的過程
        }
        c->ritem += tocopy;
        c->rlbytes -= tocopy;
        c->rcurr += tocopy;
        c->rbytes -= tocopy;
        if (c->rlbytes == 0) {
            break;
        }
    }
    //這裏每每是咱們先前讀到buffer的數據還沒足夠的狀況下,從socket中讀。
    res = read(c->sfd, c->ritem, c->rlbytes);//往分配的item中塞,即爲key設置value的過程
    if (res > 0) {
        if (c->rcurr == c->ritem) {
            c->rcurr += res;
        }
        c->ritem += res;
        c->rlbytes -= res;
        break;
    }

 

 

  上面主要經過這一行 res = read(c->sfd, c->ritem, c->rlbytes); 把value塞到剛分配出來的item空間,完成「數據行」部分的工做,邏輯上就是對key「賦值」。賦值結束後,調用complete_nread作一些收尾的工做。

static void complete_nread(conn *c) {
//......
        complete_nread_ascii(c);
//......
}
 
static void complete_nread_ascii(conn *c) {
     ret = store_item(it, comm, c);
     switch (ret)
     {
      case STORED:
          out_string(c, "STORED");
          break;
      //......
      }
    //......
}
 
static void out_string(conn *c, const char *str) {
    size_t len;
    c->msgcurr = 0;
    c->msgused = 0;
    c->iovused = 0;
    add_msghdr(c);
    len = strlen(str);
 
    memcpy(c->wbuf, str, len);
    memcpy(c->wbuf + len, "\r\n", 2);
    c->wbytes = len + 2;
    c->wcurr = c->wbuf;
 
    conn_set_state(c, conn_write);
    c->write_and_go = conn_new_cmd;
    return;
}

 

進入狀態機conn_write狀態進行輸出:

 case conn_write:
           //......
           /* fall through... */
        case conn_mwrite:
              transmit(c);
           //......
 
 
static enum transmit_result transmit(conn *c) {
    //......
    res = sendmsg(c->sfd, m, 0);
   //......
}

 

最後經過調用sendmsg把咱們的」STORED」字符串響應給客戶端。

附上處理 SET 命令狀態機的狀態轉換圖

 

相關文章
相關標籤/搜索