memcached源代碼分析-----set命令處理流程


        轉載請註明出處:http://blog.csdn.net/luotuo44/article/details/44236591git



        前一篇博文以get命令爲樣例把整個處理流程簡單講述了一遍。本篇博文將以set命令具體講述memcached的處理流程。github

具體的命令爲「set tt 3 0 10」。並若是固然memcachedserver沒有名爲tt的item。數組



讀取命令:

        在前一篇博文的最後,conn的狀態被設置爲conn_new_cmd,回到了一開始的狀態。網絡

假設此時conn結構體裏面的buff還有其它命令,或者該client的socket緩衝區裏面還有數據(命令),那麼就會繼續處理命令而不會退出drive_machine函數。處理完後。又會回到conn_new_cmd狀態。app

      《半同步半異步網絡模型》指明瞭memcached是經過worker線程運行client的命令,並且一個worker線程要處理多個client的命令。假設某一個惡意的client發送了大量的get命令,那麼worker線程將不斷地反覆前一篇博文講述的處理流程。換言之,worker線程將困死在drive_machine裏面不能出來。這形成的後果是致使該worker線程負責的其它client處於飢餓狀態,因爲它們的命令得不處處理(要退出drive_machine才幹知道其它client也發送了命令,進而進行處理)。less

        爲了不client發現飢餓現象,memcached的解決方法是:worker線程連續處理某一個client的命令數不能超過一個特定值。異步

這個特定值由全局變量settings.reqs_per_event肯定(默認值是20), 可以在啓動memcached的時候經過命令行參數設置。具體參考《memcached啓動參數具體解釋以及關鍵配置的默認值》。socket


static void drive_machine(conn *c) {
    bool stop = false;
    int nreqs = settings.reqs_per_event;//20

    assert(c != NULL);

	//drive_machine被調用會進行狀態推斷,並進行一些處理。但也可能發生狀態的轉換
	//此時就需要一個循環,當進行狀態轉換時。也能處理
    while (!stop) {

        switch(c->state) {

        case conn_new_cmd:
			
            --nreqs;
            if (nreqs >= 0) {
				//假設該conn的讀緩衝區沒有數據,那麼將狀態改爲conn_waiting
				//假設該conn的讀緩衝區有數據,  那麼將狀態改爲conn_pase_cmd
                reset_cmd_handler(c);
            } else {

                if (c->rbytes > 0) {
                    /* We have already read in data into the input buffer,
                       so libevent will most likely not signal read events
                       on the socket (unless more data is available. As a
                       hack we should just put in a request to write data,
                       because that should be possible ;-)
                    */
                    if (!update_event(c, EV_WRITE | EV_PERSIST)) {
                        if (settings.verbose > 0)
                            fprintf(stderr, "Couldn't update event\n");
                        conn_set_state(c, conn_closing);
                        break;
                    }
                }
                stop = true;
            }
            break;

        }
    }

    return;
}

        從上面代碼可以得知。假設某個client的命令數過多,會被memcached強制退出drive_mahcine。假設該client的socket裏面還有數據並且是libevent是水平觸發的,那麼libevent會本身主動觸發事件,能再次進入drive_mahcine函數。ide

但假設該client的命令都讀進conn結構體的讀緩衝區,那麼就必須等到client再次發送命令,libevent纔會觸發。memcached

但client一直再也不發送命令了呢?爲了解決問題,memcached採用了一種很是巧妙的處理方法:爲這個clientsocket設置可寫事件。除非clientsocket的寫緩衝區已滿,不然libevent都會爲這個client觸發事件。事件一觸發。那麼worker線程就會進入drive_machine函數處理這個client的命令。

 

 

        固然咱們若是nreqs大於0,而後看一下reset_cmd_handler函數。該函數會推斷conn的讀緩衝區是否還有數據。此外,該函數另外一個關鍵的數據:調節conn緩衝區的大小。前一篇博文已經說到,memcached會盡量把clientsocket裏面的數據讀入conn的讀緩衝區。這樣的特性會撐大conn的讀緩衝區。除了讀緩衝區,用於回寫數據的iovec和msghdr數組也會被撐大,這也要收縮。因爲是在處理完一條命令後才進行的收縮,因此收縮不會致使數據的丟失。

        寫緩衝區呢?不需要收縮寫緩衝區嗎。conn結構體也是有寫緩衝區的啊?這是因爲寫緩衝區不會被撐大。從前一篇博文的迴應命令可以知道,迴應命令時並無使用到寫緩衝區。寫緩衝區是在向client返回錯誤信息時纔會用到的,而錯誤信息不會太大。也就不會撐大寫緩衝區了。

struct conn {
    int    sfd;//該conn相應的socket fd
    sasl_conn_t *sasl_conn;
    bool authenticated;
    enum conn_states  state;//當前狀態
    enum bin_substates substate;
    rel_time_t last_cmd_time;
    struct event event;//該conn相應的event
    short  ev_flags;//event當前監聽的事件類型
    short  which;   /** which events were just triggered */ //觸發event回調函數的緣由

	//讀緩衝區
    char   *rbuf;   /** buffer to read commands into */ 
	//有效數據的開始位置。從rbuf到rcurr之間的數據是已經處理的了。變成無效數據了
    char   *rcurr;  /** but if we parsed some already, this is where we stopped */
	//讀緩衝區的長度
    int    rsize;   /** total allocated size of rbuf */
	//有效數據的長度
    int    rbytes;  /** how much data, starting from rcur, do we have unparsed */

    char   *wbuf;
    char   *wcurr;
    int    wsize;
    int    wbytes;
    /** which state to go into after finishing current write */
    enum conn_states  write_and_go;
    void   *write_and_free; /** free this memory after finishing writing */

	//數據直通車
	char   *ritem;  /** when we read in an item's value, it goes here */
    int    rlbytes;

    /* data for the nread state */

    /**
     * item is used to hold an item structure created after reading the command
     * line of set/add/replace commands, but before we finished reading the actual
     * data. The data is read into ITEM_data(item) to avoid extra copying.
     */

    void   *item;     /* for commands set/add/replace  */

    /* data for the swallow state */
    int    sbytes;    /* how many bytes to swallow */

    /* data for the mwrite state */
	//ensure_iov_space函數會擴大數組長度.如下的msglist數組所使用到的
	//iovec結構體數組就是iov指針所指向的。因此當調用ensure_iov_space
	//分配新的iovec數組後,需要又一次調整msglist數組元素的值。

這個調整 //也是在ensure_iov_space函數裏面完畢的 struct iovec *iov;//iovec數組指針 //數組大小 int iovsize; /* number of elements allocated in iov[] */ //已經使用的數組元素個數 int iovused; /* number of elements used in iov[] */ //因爲msghdr結構體裏面的iovec結構體數組長度是有限制的。因此爲了能 //傳輸不少其它的數據,僅僅能添加msghdr結構體的個數.add_msghdr函數負責添加 struct msghdr *msglist;//msghdr數組指針 //數組大小 int msgsize; /* number of elements allocated in msglist[] */ //已經使用了的msghdr元素個數 int msgused; /* number of elements used in msglist[] */ //正在用sendmsg函數傳輸msghdr數組中的哪個元素 int msgcurr; /* element in msglist[] being transmitted now */ //msgcurr指向的msghdr總共同擁有多少個字節 int msgbytes; /* number of bytes in current msg */ //worker線程需要佔有這個item。直至把item的數據都寫回給client了 //故需要一個item指針數組記錄本conn佔有的item item **ilist; /* list of items to write out */ int isize;//數組的大小 item **icurr;//當前使用到的item(在釋放佔用item時會用到) int ileft;//ilist數組中有多少個item需要釋放 enum protocol protocol; /* which protocol this connection speaks */ enum network_transport transport; /* what transport is used by this connection */ bool noreply; /* True if the reply should not be sent. */ /* current stats command */ ... conn *next; /* Used for generating a list of conn structures */ LIBEVENT_THREAD *thread;//這個conn屬於哪一個worker線程 }; static void reset_cmd_handler(conn *c) { c->cmd = -1; c->substate = bin_no_state; if(c->item != NULL) {//conn_new_cmd狀態下,item爲NULL item_remove(c->item); c->item = NULL; } conn_shrink(c); if (c->rbytes > 0) {//讀緩衝區裏面有數據 conn_set_state(c, conn_parse_cmd);//接着去解析讀到的數據 } else { conn_set_state(c, conn_waiting);//不然等待數據的到來 } } #define DATA_BUFFER_SIZE 2048 /** Initial size of list of items being returned by "get". */ #define ITEM_LIST_INITIAL 200 /** Initial size of list of CAS suffixes appended to "gets" lines. */ #define SUFFIX_LIST_INITIAL 20 /** Initial size of the sendmsg() scatter/gather array. */ #define IOV_LIST_INITIAL 400 /** Initial number of sendmsg() argument structures to allocate. */ #define MSG_LIST_INITIAL 10 /** High water marks for buffer shrinking */ #define READ_BUFFER_HIGHWAT 8192 #define ITEM_LIST_HIGHWAT 400 #define IOV_LIST_HIGHWAT 600 #define MSG_LIST_HIGHWAT 100 //收縮到初始大小 static void conn_shrink(conn *c) { assert(c != NULL); if (IS_UDP(c->transport)) return; //c->rbytes指明瞭當前讀緩衝區有效數據的長度。當其小於DATA_BUFFER_SIZE //才進行讀緩衝區收縮。因此不會致使client命令數據的丟失。

if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) { char *newbuf; if (c->rcurr != c->rbuf) memmove(c->rbuf, c->rcurr, (size_t)c->rbytes); newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE); if (newbuf) { c->rbuf = newbuf; c->rsize = DATA_BUFFER_SIZE; } /* TODO check other branch... */ c->rcurr = c->rbuf; } if (c->isize > ITEM_LIST_HIGHWAT) { item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0])); if (newbuf) { c->ilist = newbuf; c->isize = ITEM_LIST_INITIAL; } /* TODO check error condition? */ } if (c->msgsize > MSG_LIST_HIGHWAT) { struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0])); if (newbuf) { c->msglist = newbuf; c->msgsize = MSG_LIST_INITIAL; } /* TODO check error condition? */ } if (c->iovsize > IOV_LIST_HIGHWAT) { struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0])); if (newbuf) { c->iov = newbuf; c->iovsize = IOV_LIST_INITIAL; } /* TODO check return value */ } }



讀取數據:

        咱們若是conn的讀緩衝區裏面沒有數據,此時conn的狀態被設置爲conn_waiting,等待client發送命令數據。

若是client發送數據過來。libevent將檢測到clientsocket變成可讀,而後進入在libevent的回調函數中調用drive_machine函數,進入有限狀態機。在有限狀態機裏面。conn的狀態會被設置爲conn_read。

接着在conn_read case中,memcached會把client發送的命令數據儘量地讀入到conn的讀緩衝區中。

固然爲了防止沒有惡意的client,memcached也是有限度的:僅僅撐大讀緩衝區4次。這對於正常的client命令來講已是足夠的了。

static void drive_machine(conn *c) {
    bool stop = false;
    int res;
    assert(c != NULL);

	//drive_machine被調用會進行狀態推斷,並進行一些處理。但也可能發生狀態的轉換
	//此時就需要一個循環,當進行狀態轉換時,也能處理
    while (!stop) {

        switch(c->state) {

        case conn_waiting://等待socket變成可讀的
            if (!update_event(c, EV_READ | EV_PERSIST)) {//更新監聽事件失敗
                if (settings.verbose > 0)
                    fprintf(stderr, "Couldn't update event\n");
                conn_set_state(c, conn_closing);
                break;
            }

            conn_set_state(c, conn_read);
            stop = true;//居然stop循環。只是不要緊,因爲event的可讀事件是水平觸發的
            break;

        case conn_read:
            res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);

            switch (res) {
            case READ_NO_DATA_RECEIVED://沒有讀取到數據
                conn_set_state(c, conn_waiting);//等待
                break;
            case READ_DATA_RECEIVED://讀取到了數據,接着就去解析數據
                conn_set_state(c, conn_parse_cmd);
                break;
            case READ_ERROR://read函數的返回值等於0或者-1時。會返回這個值
                conn_set_state(c, conn_closing);//直接關閉這個client
                break;
            case READ_MEMORY_ERROR: /* Failed to allocate more memory */
                /* State already set by try_read_network */
                break;
            }
            break;


        case conn_parse_cmd :
			//返回1表示正在處理讀取的一條命令
			//返回0表示需要繼續讀取socket的數據才幹解析命令
			//假設讀取到了一條完整的命令。那麼函數內部會去解析,
			//並進行調用process_command函數進行一些處理.
			//像set、add、replace這些命令。會在處理的時候調用
			//conn_set_state(c, conn_nread)
            if (try_read_command(c) == 0) {
                /* wee need more data! */
                conn_set_state(c, conn_waiting);
            }

            break;
			
        }
    }

    return;
}


 //儘量把socket的所有數據都讀進c指向的一個緩衝區裏面
static enum try_read_result try_read_network(conn *c) {
    enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
    int res;
    int num_allocs = 0;
    assert(c != NULL);

    if (c->rcurr != c->rbuf) {
		//rcurr 和 rbuf之間是一條已經解析了的命令。現在可以丟棄了
        if (c->rbytes != 0) /* otherwise there's nothing to copy */
            memmove(c->rbuf, c->rcurr, c->rbytes);
        c->rcurr = c->rbuf;
    }

    while (1) {
		//因爲本函數會盡量把socket數據都讀取到rbuf指向的緩衝區裏面,
		//因此可能出現當前緩衝區不夠大的狀況(即rbytes>=rsize)
        if (c->rbytes >= c->rsize) {
			//可能有壞蛋發無窮無盡的數據過來,而本函數又是儘量把所有數據都
			//讀進緩衝區。爲了防止壞蛋耗光server的內存,因此就僅僅分配4次內存
            if (num_allocs == 4) {
                return gotdata;
            }
            ++num_allocs;
            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
            if (!new_rbuf) {
				//儘管分配內存失敗,但realloc保證c->rbuf仍是合法可用的指針
                c->rbytes = 0; /* ignore what we read */

                out_of_memory(c, "SERVER_ERROR out of memory reading request");
                c->write_and_go = conn_closing;//關閉這個conn
                return READ_MEMORY_ERROR;
            }
            c->rcurr = c->rbuf = new_rbuf;
            c->rsize *= 2;
        }

        int avail = c->rsize - c->rbytes;
        res = read(c->sfd, c->rbuf + c->rbytes, avail);
        if (res > 0) {
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.bytes_read += res;//記錄該線程讀取了多少字節
            pthread_mutex_unlock(&c->thread->stats.mutex);
            gotdata = READ_DATA_RECEIVED;
            c->rbytes += res;
            if (res == avail) {//可能還有數據沒有讀出來
                continue;
            } else {
                break;//socket臨時還沒數據了(即已經讀取完)
            }
        }
        if (res == 0) {
            return READ_ERROR;
        }
        if (res == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                break;
            }
            return READ_ERROR;
        }
    }
    return gotdata;
}

        假設conn沒有讀取到clientsocket的數據,那麼conn的狀態又會設置爲conn_waiting(等待數據狀態)。假設讀取到數據後,就會把狀態設置爲conn_parse_cmd,接着就會去解析該數據。由於網絡緣由,可能這一次並無接收到完整的一條命令。在解析命令的時候會發現這樣的狀況。此時將conn的狀態設置爲conn_waiting。再次等待socket數據。



解析命令:

通訊協議:


        在解說memcached怎麼解析命令前,先說一下memcached的通訊協議。平時使用的都是」sett 3 0 10」這種命令形式,還真不知道有什麼通訊協議。事實上memcached同一時候支持文本協議和二進制這兩種協議,memcached贊成client使用二進制和文本兩種通訊協議中的一種。平時咱們使用的是文本協議。之因此咱們不需要顯式地選擇某一種協議,是因爲client選擇哪一種協議,由client第一次發送的命令肯定(一旦肯定就不能更改)。Memcached推斷client選定哪一種協議的方法也很是easy:推斷命令的第一個字符。假設第一個字符等於128,那麼就是二進制協議,不然就是文本協議。

這樣行得通,是因爲文本協議中不論什麼字符(ascii碼)都不會取128這個值。本文僅僅解說文本協議。


推斷命令的完整性:

        在詳細解析client命令的內容以前,還需要作一個工做:推斷是否接收到完整的一條命令。Memcached推斷的方法也簡單:假設接收的數據中包括換行符就說明接收到完整的一條命令,不然就不完整,需要又一次讀取clientsocket(把conn狀態設置爲conn_waiting)。

        由於不一樣的平臺對於行尾有不一樣的處理,有的爲」\r\n」,有的爲」\n」。memcached必須處理這樣的狀況。Memcached的解決方式是:無論它!

直接把命令最後一個字符的後一個字符(the character past the end of the command)改成’\0’,這樣命令數據就變成一個C語言的字符串了。更巧妙的是。memcached還用一個暫時變量指向’\n’字符的下一個字符。

這樣,無論行尾是」\r\n」仍是」\n」都不重要了。

static int try_read_command(conn *c) {
    assert(c != NULL);
    assert(c->rcurr <= (c->rbuf + c->rsize));
    assert(c->rbytes > 0);

	//memcached支持文本和二進制兩種協議。

對於TCP這種有鏈接協議,memcached爲該 //fd分配conn的時候,並不指明其是用哪一種協議的。此時用negotiating_prot表明待 //協商的意思(negotiate是談判、協商)。而是在client第一次發送數據給 //memcached的時候用第一個字節來指明.以後的通訊都是使用指明的這種協議。 //對於UDP這種無鏈接協議,指明每次都指明使用哪一種協議了 if (c->protocol == negotiating_prot || c->transport == udp_transport) { //對於TCP僅僅會進入該推斷體裏面一次,而UDP就要次次都進入了 //PROTOCOL_BINARY_REQ爲0x80,即128。對於ascii的文原本說。是不會取這個值的 if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) { c->protocol = binary_prot; } else { c->protocol = ascii_prot; } } if (c->protocol == binary_prot) { ...//二進制協議。這裏不展開解說 } else {//文本協議 char *el, *cont; if (c->rbytes == 0)//讀緩衝區裏面沒有數據,被耍啦 return 0;//返回0表示需要繼續讀取socket的數據才幹解析命令 el = memchr(c->rcurr, '\n', c->rbytes); if (!el) {//沒有找到\n,說明沒有讀取到一條完整的命令 if (c->rbytes > 1024) {//接收了1024個字符都沒有回車符,值得懷疑 /* * We didn't have a '\n' in the first k. This _has_ to be a * large multiget, if not we should just nuke the connection. */ char *ptr = c->rcurr; while (*ptr == ' ') { /* ignore leading whitespaces */ ++ptr; } if (ptr - c->rcurr > 100 || //太多的空格符 (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {//是get或者gets命令,但一次獲取太多信息了 conn_set_state(c, conn_closing);//必須幹掉這種扯蛋的connclient return 1; } } return 0;//返回0表示需要繼續讀取socket的數據才幹解析命令 } //來到這裏,說明已經讀取到至少一條完整的命令 cont = el + 1;//用cont指向下一行的開始。無論行尾是\n仍是\r\n //不一樣的平臺對於行尾有不一樣的處理,有的爲\r\n有的則是\n。

因此memcached //還要推斷一下\n前面的一個字符是否爲\r if ((el - c->rcurr) > 1 && *(el - 1) == '\r') { el--;//指向行尾的開始字符 } //'\0',C語言字符串結尾符號。結合c->rcurr這個開始位置,就可以肯定 //這個命令(現在被看做一個字符串)的開始和結束位置。rcurr指向了一個字符串。

//注意,下一條命令的開始位置由前面的cont指明瞭 *el = '\0'; assert(cont <= (c->rcurr + c->rbytes)); c->last_cmd_time = current_time; //處理這個命令 process_command(c, c->rcurr);//命令字符串由c->rcurr指向 //cont指明下一條命令的開始位置 //更新curr指針和剩餘字節數 c->rbytes -= (cont - c->rcurr); c->rcurr = cont; assert(c->rcurr <= (c->rbuf + c->rsize)); } return 1;//返回1表示正在處理讀取的一條命令 }



符號化命令內容:

        爲了能運行命令。必須能識別出client發送的詳細是什麼命令以及有什麼參數。

爲了作到這一步,就得先命令字符串(try_read_command函數中已經把命令數據看成一個C語言的字符串了)裏面的每一個詞切割出來。

比方將字符串"set tt 3 0 10"切割爲」set」、」tt」、」3」、」0」和」10」這個5個詞,在memcached裏面用一個專門的名稱token表示這些詞。

Memcached在判別詳細的命令前,要作的一步就是將命令內容進行符號化。

        在process_command函數中,memcached會調用tokenize_command函數把命令字符串符號化。

process_command函數還定義了一個局部數組tokens用於指明命令字符串裏面每一個token。如下是tokenize_command函數的詳細實現。

#define MAX_TOKENS 8

typedef struct token_s {
    char *value;
    size_t length;
} token_t;


static void process_command(conn *c, char *command) {

    token_t tokens[MAX_TOKENS];
	size_t ntokens;

	ntokens = tokenize_command(command, tokens, MAX_TOKENS);
	...
}


//將一條命令切割成一個個的token,並用tokens數組一一相應的指向
//比方命令"set tt 3 0 10",將被切割成"set"、"tt"、"3"、"0"、"10"
//並用tokens數組的5個元素相應指向。token_t類型的value成員指向相應token
//在command裏面的位置。length則指明該token的長度
//返回token的數目,最後一個token是無心義的
static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
    char *s, *e;
    size_t ntokens = 0;
    size_t len = strlen(command);
    unsigned int i = 0;

    assert(command != NULL && tokens != NULL && max_tokens > 1);

    s = e = command;
    for (i = 0; i < len; i++) {
        if (*e == ' ') {//假設有連續多個空格符。那麼需要跳過
            if (s != e) {//s此時指向非空格符,並且是某個token的第一個字符
                tokens[ntokens].value = s;//指向token的開始位置
                tokens[ntokens].length = e - s;//這個token的長度
                ntokens++;
                *e = '\0';//賦值爲'\0',這樣這個token就是s開始的一個字符串
                if (ntokens == max_tokens - 1) {
					//這條命令至少有max_tokens-2個token
                    e++;
                    s = e; /* so we don't add an extra token */
                    break;
                }
            }
            s = e + 1;//最後s會指向第一個非空格符
        }
        e++;
    }

	//當這條命令是以空格符結尾的。那麼上面那個循環結束後,s等於e。

//不然s 不等於 e。此時s指向最後一個token的開始位置,e則指向token //最後一個字符的下一個字符(the first element past the end) if (s != e) {//處理最後一個token tokens[ntokens].value = s; tokens[ntokens].length = e - s; ntokens++; } /* * If we scanned the whole string, the terminal value pointer is null, * otherwise it is the first unprocessed character. */ //最多僅僅處理max_tokens-1(等於7)個token。剩下的不處理 tokens[ntokens].value = *e == '\0' ?

NULL : e; tokens[ntokens].length = 0; ntokens++; return ntokens; }


        通過命令符號化後,使用起來就會很是easy的了。

比方依據tokens[0]的內容可以推斷這個命令是什麼命令,假設是set命令(tokens[0]的內容等於」get」),天然tokens[1]就是鍵值了。接下來的tokens[2]、tokens[3]、tokens[4]就是鍵值的三個參數了。



運行命令:

依據token推斷命令和提取參數:


        把命令符號化後,很是easy就能提取出命令和相應的參數。

typedef struct token_s {
    char *value;
    size_t length;
} token_t;

#define COMMAND_TOKEN 0
#define KEY_TOKEN 1

#define MAX_TOKENS 8



static void process_command(conn *c, char *command) {

    token_t tokens[MAX_TOKENS];
    size_t ntokens;
    int comm;

    assert(c != NULL);

    ntokens = tokenize_command(command, tokens, MAX_TOKENS);//將命令記號化
    if (ntokens >= 3 &&
        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {

		...//get命令

    } else if ((ntokens == 6 || ntokens == 7) &&
               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {

		//set命令
        process_update_command(c, tokens, ntokens, comm, false);

    } 
	...
}


#define KEY_MAX_LENGTH 250


static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
    char *key;
    size_t nkey;
    unsigned int flags;
    int32_t exptime_int = 0;
    time_t exptime;
    int vlen;

    assert(c != NULL);

	//server不需要回覆信息給client,這可以下降網絡IO進而提快速度
	//這樣的設置是一次性的。不影響下一條命令
    set_noreply_maybe(c, tokens, ntokens);//處理用戶命令裏面的noreply

	//鍵值的長度太長了。KEY_MAX_LENGTH爲250
    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    key = tokens[KEY_TOKEN].value;
    nkey = tokens[KEY_TOKEN].length;

	//將字符串轉成unsigned long,獲取flags、exptime_int、vlen。
	//它們的字符串形式必須是純數字,不然轉換失敗,返回false
    if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
           && safe_strtol(tokens[3].value, &exptime_int)
           && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    /* Ubuntu 8.04 breaks when I pass exptime to safe_strtol */
    exptime = exptime_int;


	...
}



static inline bool set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
{
    int noreply_index = ntokens - 2;

    /*
      NOTE: this function is not the first place where we are going to
      send the reply.  We could send it instead from process_command()
      if the request line has wrong number of tokens.  However parsing
      malformed line for "noreply" option is not reliable anyway, so
      it can't be helped.
    */
    if (tokens[noreply_index].value
        && strcmp(tokens[noreply_index].value, "noreply") == 0) {
        c->noreply = true;
    }
    return c->noreply;
}


分配item:

        好了,現在已經知道是set命令,並且鍵值和相應的參數都已經提取出來了。接下來可以真正處理set命令了。

set命令是:鍵值已存在則更新,不存在則加入。但在這裏不管那麼多,直接調用item_alloc申請一個item。事實上process_update_command函數處理的命令不不過set,還包含replace、add、append等等。這些命令也是直接申請一個新的item。

        item_alloc函數會直接調用do_item_alloc函數申請一個item。前面的很是多博文一直在部分介紹do_item_alloc函數,但都沒有給出過完整版。現在就給出神奇函數的全部代碼。對於這個函數一些討論參數前面的一些博文吧。

static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
    char *key;//鍵值
    size_t nkey;//鍵值長度
    unsigned int flags;//item的flags
    time_t exptime;//item的超時
    int vlen;//item數據域的長度
    uint64_t req_cas_id=0;
    item *it;

  
    /* Negative exptimes can underflow and end up immortal. realtime() will
       immediately expire values that are greater than REALTIME_MAXDELTA, but less
       than process_started, so lets aim for that. */
    if (exptime < 0)//此時會立刻過時失效
        exptime = REALTIME_MAXDELTA + 1;//REALTIME_MAXDELTA等於30天

	//在存儲item數據的時候,都會本身主動在數據的最後加上"\r\n"
    vlen += 2;//+2是因爲data後面還要加上"\r\n"這兩個字符
    if (vlen < 0 || vlen - 2 < 0) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }


	//依據所需的大小分配相應的item,並給這個item賦值。
	//除了time和refcount成員外,其它的都賦值了。並把鍵值、flag這些值都拷貝
	//到item後面的buff裏面了,至於data。因爲現在都還沒拿到因此還沒賦值
	//realtime(exptime)是直接賦值給item的exptime成員
    it = item_alloc(key, nkey, flags, realtime(exptime), vlen);

    if (it == 0) {
        if (! item_size_ok(nkey, flags, vlen))
            out_string(c, "SERVER_ERROR object too large for cache");
        else
            out_of_memory(c, "SERVER_ERROR out of memory storing object");
        /* swallow the data line */
        c->write_and_go = conn_swallow;
        c->sbytes = vlen;

        /* Avoid stale data persisting in cache because we failed alloc.
         * Unacceptable for SET. Anywhere else too? */
        if (comm == NREAD_SET) {
            it = item_get(key, nkey);
            if (it) {
                item_unlink(it);
                item_remove(it);
            }
        }

        return;
    }
    ITEM_set_cas(it, req_cas_id);

	//本函數並不會把item插入到哈希表和LRU隊列。這個插入工做由
	//complete_nread_ascii函數完畢。
    c->item = it;
    c->ritem = ITEM_data(it); //數據直通車
    c->rlbytes = it->nbytes;//等於vlen(要比用戶輸入的長度大2,因爲要加上\r\n)
    c->cmd = comm;
    conn_set_state(c, conn_nread);
}


item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
    item *it;
    /* do_item_alloc handles its own locks */
    it = do_item_alloc(key, nkey, flags, exptime, nbytes, 0);
    return it;
}



/*@null@*/
//key、flags、exptime三個參數是用戶在使用set、add命令存儲一條數據時輸入的參數。
//nkey是key字符串的長度。nbytes則是用戶要存儲的data長度+2,因爲在data的結尾處還要加上"\r\n"
//cur_hv則是依據鍵值key計算獲得的哈希值。

item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes, const uint32_t cur_hv) { uint8_t nsuffix; item *it = NULL; char suffix[40]; //要存儲這個item需要的總空間 size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix); if (settings.use_cas) { ntotal += sizeof(uint64_t); } //依據大小推斷從屬於哪一個slab unsigned int id = slabs_clsid(ntotal); if (id == 0)//0表示不屬於不論什麼一個slab return 0; mutex_lock(&cache_lock); /* do a quick check if we have any expired items in the tail.. */ int tries = 5; /* Avoid hangs if a slab has nothing but refcounted stuff in it. */ int tries_lrutail_reflocked = 1000; int tried_alloc = 0; item *search; item *next_it; void *hold_lock = NULL; rel_time_t oldest_live = settings.oldest_live; search = tails[id]; /* We walk up *only* for locked items. Never searching for expired. * Waste of CPU for almost all deployments */ //第一次看這個for循環,直接以爲search等於NULL,直接看for循環後面的代碼 //這個循環裏面會在相應LRU隊列中查找過時失效的item。最多嘗試tries個item。 //從LRU的隊尾開始嘗試。

假設item被其它worker線程引用了,那麼就嘗試下一 //個。假設沒有的被其它worker線程所引用,那麼就測試該item是否過時失效。 //假設過時失效了。那麼就可以使用這個item(終於會返回這個item)。假設沒有 //過時失效,那麼再也不嘗試其它item了(因爲是從LRU隊列的隊尾開始嘗試的), //直接調用slabs_alloc申請一個新的內存存儲item。假設申請新內存都失敗, //那麼在贊成LRU淘汰的狀況下就會啓動踢人機制。 for (; tries > 0 && search != NULL; tries--, search=next_it) { /* we might relink search mid-loop, so search->prev isn't reliable */ next_it = search->prev; if (search->nbytes == 0 && search->nkey == 0 && search->it_flags == 1) { /* We are a crawler, ignore it. */ //這是一個爬蟲item,直接跳過 tries++;//爬蟲item不計入嘗試的item數中 continue; } uint32_t hv = hash(ITEM_key(search), search->nkey); /* Attempt to hash item lock the "search" item. If locked, no * other callers can incr the refcount */ /* Don't accidentally grab ourselves, or bail if we can't quicklock */ //嘗試搶佔鎖,搶不了就走人,不等待鎖。 if (hv == cur_hv || (hold_lock = item_trylock(hv)) == NULL) continue; /* Now see if the item is refcount locked */ if (refcount_incr(&search->refcount) != 2) {//引用數>=3 /* Avoid pathological case with ref'ed items in tail */ //刷新這個item的訪問時間以及在LRU隊列中的位置 do_item_update_nolock(search); tries_lrutail_reflocked--; tries++; refcount_decr(&search->refcount); //此時引用數>=2 itemstats[id].lrutail_reflocked++; /* Old rare bug could cause a refcount leak. We haven't seen * it in years, but we leave this code in to prevent failures * just in case */ //考慮這種狀況:某一個worker線程經過refcount_incr添加了一個 //item的引用數。

但因爲某種緣由(多是內核出了問題),這個worker //線程還沒來得及調用refcount_decr就掛了。此時這個item的引用數 //就確定不會等於0。也就是總有worker線程佔用着它.但實際上這個 //worker線程早就掛了。

因此對於這種狀況需要修復。直接把這個item //的引用計數賦值爲1。 //依據什麼推斷某一個worker線程掛了呢?首先在memcached裏面,通常 //來講,不論什麼函數都的調用都不會耗時太大的。即便這個函數需要加鎖 //因此假設這個item的最後一次訪問時間距離現在都比較遙遠了,但它 //卻還被一個worker所引用。那麼就差點兒可以推斷這個worker線程掛了. //在1.4.16版本號以前,這個時間距離都是固定的爲3個小時。

從1.4.16開 //就使用settings.tail_repair_time存儲時間距離。可以在啓動memcached //的時候設置,默認時間距離爲1個小時。現在這個版本號1.4.21默認都不 //進行這個修復了,settings.tail_repair_time的默認值爲0。因爲 //memcached的做者很是少看到這個bug了。預計是因爲操做系統的進一步穩定 //http://brionas.github.io/2014/01/06/memcached-manage/ //http://www.oschina.net/news/46787/memcached-1-4-16 if (settings.tail_repair_time && search->time + settings.tail_repair_time < current_time) { itemstats[id].tailrepairs++; search->refcount = 1; do_item_unlink_nolock(search, hv); } if (hold_lock) item_trylock_unlock(hold_lock); if (tries_lrutail_reflocked < 1) break; continue; } //search指向的item的refcount等於2,這說明此時這個item除了本worker //線程外,沒有其它不論什麼worker線程索引其。可以放心釋放並重用這個item //因爲這個循環是從lru鏈表的後面開始遍歷的。因此一開始search就指向 //了最不常用的item。假設這個item都沒有過時。那麼其它的比其更常用 //的item就不要刪除了(即便它們過時了)。此時僅僅能向slabs申請內存 /* Expired or flushed */ if ((search->exptime != 0 && search->exptime < current_time) || (search->time <= oldest_live && oldest_live <= current_time)) { //search指向的item是一個過時失效的item,可以使用之 itemstats[id].reclaimed++; if ((search->it_flags & ITEM_FETCHED) == 0) { itemstats[id].expired_unfetched++; } it = search; //又一次計算一下這個slabclass_t分配出去的內存大小 //直接霸佔舊的item就需要又一次計算 slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal); do_item_unlink_nolock(it, hv);//從哈希表和lru鏈表中刪除 /* Initialize the item block: */ it->slabs_clsid = 0; } else if ((it = slabs_alloc(ntotal, id)) == NULL) {//申請內存失敗 //此刻,過時失效的item沒有找到。申請內存又失敗了。看來僅僅能使用 //LRU淘汰一個item(即便這個item並無過時失效) tried_alloc = 1;//標誌嘗試過了alloc if (settings.evict_to_free == 0) {//設置了不進行LRU淘汰item //此時僅僅能向client回覆錯誤了 itemstats[id].outofmemory++; } else { itemstats[id].evicted++;//添加被踢的item數 itemstats[id].evicted_time = current_time - search->time; //即便一個item的exptime成員設置爲永不超時(0)。仍是會被踢的 if (search->exptime != 0) itemstats[id].evicted_nonzero++; if ((search->it_flags & ITEM_FETCHED) == 0) { itemstats[id].evicted_unfetched++; } it = search; //又一次計算一下這個slabclass_t分配出去的內存大小 //直接霸佔舊的item就需要又一次計算 slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal); do_item_unlink_nolock(it, hv);//從哈希表和lru鏈表中刪除 /* Initialize the item block: */ it->slabs_clsid = 0; /* If we've just evicted an item, and the automover is set to * angry bird mode, attempt to rip memory into this slab class. * TODO: Move valid object detection into a function, and on a * "successful" memory pull, look behind and see if the next alloc * would be an eviction. Then kick off the slab mover before the * eviction happens. */ //一旦發現有item被踢,那麼就啓動內存頁重分配操做 //這個太頻繁了,不推薦 if (settings.slab_automove == 2) slabs_reassign(-1, id); } } //引用計數減一。此時該item已經沒有不論什麼worker線程索引其,並且哈希表也 //再也不索引其 refcount_decr(&search->refcount); /* If hash values were equal, we don't grab a second lock */ if (hold_lock) item_trylock_unlock(hold_lock); break; } //沒有嘗試過alloc。並且在查找特定次數後仍是沒有找到可用的item if (!tried_alloc && (tries == 0 || search == NULL)) it = slabs_alloc(ntotal, id); if (it == NULL) { itemstats[id].outofmemory++; mutex_unlock(&cache_lock); return NULL; } assert(it->slabs_clsid == 0); assert(it != heads[id]); /* Item initialization can happen outside of the lock; the item's already * been removed from the slab LRU. */ it->refcount = 1; /* the caller will have a reference */ mutex_unlock(&cache_lock); //脫離以前的先後關係 it->next = it->prev = it->h_next = 0; it->slabs_clsid = id; //此時這個item沒有插入不論什麼LRU隊列和沒有插入到哈希表中 DEBUG_REFCNT(it, '*'); //默認狀況下memcached是支持CAS的,假設想取消可以在啓動memcached的時候添加 //參數C(大寫的c) it->it_flags = settings.use_cas ? ITEM_CAS : 0; it->nkey = nkey; it->nbytes = nbytes; memcpy(ITEM_key(it), key, nkey); it->exptime = exptime; memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix); it->nsuffix = nsuffix; return it; }



        process_update_command函數申請分配一個item後。並無直接直接把這個item插入到LRU隊列和哈希表中,而不過用conn結構體的item成員指向這個申請獲得的item,並且用ritem成員指向item結構體的數據域(這爲了方便寫入數據)。最後conn的狀態改動爲conn_nread。就這樣process_update_command函數曳然而止了。



填充item數據域:

        值得注意的是,前面的命令處理過程是沒有把item的數據寫入到item結構體中。

現在要退出到有限本身主動機drive_machine函數中,查看memcached是怎麼處理conn_nread狀態的。儘管process_update_command留下了手尾,但它也用conn的成員變量記錄了一些重要值,用於填充item的數據域。

比方rlbytes表示需要用多少字節填充item。rbytes表示讀緩衝區還有多少字節可以使用;ritem指向數據填充地點。

static void drive_machine(conn *c) {
    bool stop = false;
	int res;

    while (!stop) {

        switch(c->state) {
	case conn_nread:
			//對於set、add、replace這種命令會將state設置成conn_nread
			//因爲在conn_read。它僅僅讀取了一行的數據。就去解析。但數據是
			//在第二行輸入的(client輸入進行操做的時候),此時,rlbytes
			//等於data的長度。

本case裏面會從conn的讀緩衝區、socket讀緩衝區 //讀取數據到item裏面。 //rlbytes標識還有多少字節需要讀取到item裏面。僅僅要沒有讀取足夠的 //數據,conn的狀態都是保持爲conn_nread。

即便讀取到足夠的數據 //狀態仍是不變,但此時rlbytes等於0。此刻會進入如下的這個if裏面 if (c->rlbytes == 0) { //處理完畢後會調用out_string函數。假設用戶明白要求不需要回復 //那麼conn的狀態變成conn_new_cmd。

假設需要回復,那麼狀態改成 //conn_write,並且write_and_go成員賦值爲conn_new_cmd complete_nread(c);//完畢對一個item的操做 break; } /* first check if we have leftovers in the conn_read buffer */ if (c->rbytes > 0) {//conn讀緩衝區裏面還有數據,那麼把數據直接賦值到item裏面 //rlbytes是需要讀取的字節數, rbytes是讀緩衝區擁有的字節數 int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes; if (c->ritem != c->rcurr) { memmove(c->ritem, c->rcurr, tocopy); } c->ritem += tocopy; c->rlbytes -= tocopy; c->rcurr += tocopy; c->rbytes -= tocopy; if (c->rlbytes == 0) {//conn讀緩衝區的數據能知足item的所需數據,無需從socket中讀取 break; } } //如下的代碼中,僅僅要不發生socket錯誤,那麼無論是否讀取到足夠的數據 //都不會改變conn的狀態,也就是說。下一次進入狀態機仍是爲conn_nread狀態 /* now try reading from the socket */ res = read(c->sfd, c->ritem, c->rlbytes);//直接從socket中讀取數據 if (res > 0) { if (c->rcurr == c->ritem) { c->rcurr += res; } c->ritem += res; c->rlbytes -= res; break; } if (res == 0) { /* end of stream */ conn_set_state(c, conn_closing); break; } if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {//socket裏面沒有數據 if (!update_event(c, EV_READ | EV_PERSIST)) { conn_set_state(c, conn_closing); break; } stop = true;//此時就不要再讀了。中止狀態機,等待libevent通知有數據可讀 break; } /* otherwise we have a real error, on which we close the connection */ conn_set_state(c, conn_closing); break; } } }



存儲item:

        填充數據仍是比較簡單的。填充數據後這個item就是完整的了,此時需要把item插入到LRU隊列和哈希表中。

Memcached是調用complete_nread函數完畢這操做。

complete_nread內部會間接調用函數do_store_item,後者會先調用do_item_get函數查詢當前memcachedserver是否已經存在一樣鍵值的item,而後依據不一樣的命令(add、replace、set)進行不一樣的處理。

static void complete_nread(conn *c) {
    assert(c != NULL);
    assert(c->protocol == ascii_prot
           || c->protocol == binary_prot);

    if (c->protocol == ascii_prot) {//文本協議
        complete_nread_ascii(c);
    } else if (c->protocol == binary_prot) {//二進制協議
        complete_nread_binary(c);
    }
}


/*
 * we get here after reading the value in set/add/replace commands. The command
 * has been stored in c->cmd, and the item is ready in c->item.
 */
static void complete_nread_ascii(conn *c) {
    assert(c != NULL);

	//此時這個item不在LRU隊列,也不在哈希表中
	//並且引用數等於1(就是本worker線程在引用它)
	
    item *it = c->item;
    int comm = c->cmd;
    enum store_item_type ret;

    pthread_mutex_lock(&c->thread->stats.mutex);
    c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
    pthread_mutex_unlock(&c->thread->stats.mutex);

	//保證最後的兩個字符是"\r\n",不然就是錯誤數據
    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
        out_string(c, "CLIENT_ERROR bad data chunk");
    } else {
      ret = store_item(it, comm, c);//將這個item存放到LRU對和哈希表中

	  //輸出迴應信息
      switch (ret) {
      case STORED:
          out_string(c, "STORED");
          break;
      case EXISTS:
          out_string(c, "EXISTS");
          break;
      case NOT_FOUND:
          out_string(c, "NOT_FOUND");
          break;
      case NOT_STORED:
          out_string(c, "NOT_STORED");
          break;
      default:
          out_string(c, "SERVER_ERROR Unhandled storage type.");
      }

    }

	//本worker線程取消對這個item的引用
    item_remove(c->item);       /* release the c->item reference */
    c->item = 0;
}


enum store_item_type store_item(item *item, int comm, conn* c) {
    enum store_item_type ret;
    uint32_t hv;

    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    ret = do_store_item(item, comm, c, hv);
    item_unlock(hv);
    return ret;
}


 //主調函數store_item會加item_lock(hv)鎖
 //set、add、replace命令終於都會調用本函數進行存儲的
 //comm參數保存了詳細是哪一個命令
enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) {
    char *key = ITEM_key(it);
    item *old_it = do_item_get(key, it->nkey, hv);//查詢舊值
    enum store_item_type stored = NOT_STORED;

    item *new_it = NULL;
    int flags;

    if (old_it != NULL && comm == NREAD_ADD) {
        /* add only adds a nonexistent item, but promote to head of LRU */
		//因爲已經有一樣鍵值的舊item了。因此add命令使用失敗。但
		//仍是會刷新舊item的訪問時間以及LRU隊列中的位置
        do_item_update(old_it);
    } else if (!old_it && (comm == NREAD_REPLACE
        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
    {
        /* replace only replaces an existing value; don't store */
    } else if (comm == NREAD_CAS) {
        /* validate cas operation */
        if(old_it == NULL) {
            // LRU expired
            stored = NOT_FOUND;
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.cas_misses++;
            pthread_mutex_unlock(&c->thread->stats.mutex);
        }
        else if (ITEM_get_cas(it) == ITEM_get_cas(old_it)) {
            // cas validates
            // it and old_it may belong to different classes.
            // I'm updating the stats for the one that's getting pushed out
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.slab_stats[old_it->slabs_clsid].cas_hits++;
            pthread_mutex_unlock(&c->thread->stats.mutex);

            item_replace(old_it, it, hv);
            stored = STORED;
        } else {
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.slab_stats[old_it->slabs_clsid].cas_badval++;
            pthread_mutex_unlock(&c->thread->stats.mutex);

            if(settings.verbose > 1) {
                fprintf(stderr, "CAS:  failure: expected %llu, got %llu\n",
                        (unsigned long long)ITEM_get_cas(old_it),
                        (unsigned long long)ITEM_get_cas(it));
            }
            stored = EXISTS;
        }
    } else {
        /*
         * Append - combine new and old record into single one. Here it's
         * atomic and thread-safe.
         */
        if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
            /*
             * Validate CAS
             */
            if (ITEM_get_cas(it) != 0) {
                // CAS much be equal
                if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) {
                    stored = EXISTS;
                }
            }

            if (stored == NOT_STORED) {
                /* we have it and old_it here - alloc memory to hold both */
                /* flags was already lost - so recover them from ITEM_suffix(it) */

                flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);

				//因爲是追加數據,先前分配的item可能不夠大,因此要又一次申請item
                new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */, hv);

                if (new_it == NULL) {
                    /* SERVER_ERROR out of memory */
                    if (old_it != NULL)
                        do_item_remove(old_it);

                    return NOT_STORED;
                }

                /* copy data from it and old_it to new_it */

                if (comm == NREAD_APPEND) {
                    memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
                    memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
                } else {
                    /* NREAD_PREPEND */
                    memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
                    memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
                }

                it = new_it;
            }
        }

		//add、set、replace命令還沒處理,但以前已經處理了不合理的狀況
		//即add命令已經確保了眼下哈希表還沒存儲相應鍵值的item,replace命令
		//已經保證哈希表已經存儲了相應鍵值的item
        if (stored == NOT_STORED) {
            if (old_it != NULL)//replace和set命令會進入這裏
                item_replace(old_it, it, hv);//刪除舊item,插入新item
            else//add和set命令會進入這裏     
                do_item_link(it, hv);//對於一個沒有存在的key,使用set命令會來到這裏

            c->cas = ITEM_get_cas(it);

            stored = STORED;
        }
    }

    if (old_it != NULL)
        do_item_remove(old_it);         /* release our reference */
    if (new_it != NULL)
        do_item_remove(new_it);

    if (stored == STORED) {
        c->cas = ITEM_get_cas(it);
    }

    return stored;
}


int item_replace(item *old_it, item *new_it, const uint32_t hv) {
    return do_item_replace(old_it, new_it, hv);
}


//把舊的刪除,插入新的。replace命令會調用本函數.
//無論舊item是否有其它worker線程在引用,都是直接將之從哈希表和LRU隊列中刪除
int do_item_replace(item *it, item *new_it, const uint32_t hv) {
    MEMCACHED_ITEM_REPLACE(ITEM_key(it), it->nkey, it->nbytes,
                           ITEM_key(new_it), new_it->nkey, new_it->nbytes);
    assert((it->it_flags & ITEM_SLABBED) == 0);

    do_item_unlink(it, hv);//直接丟棄舊item
    return do_item_link(new_it, hv);//插入新item。做爲替換
}

        關於do_item_unlink和do_item_link函數可以參考《插入和刪除item》。

至此已經完畢了item的存儲。



迴應命令:

        在complete_nread_ascii函數中,無論是存儲成功仍是失敗都會調用out_string函數迴應client。

static void out_string(conn *c, const char *str) {
    size_t len;

    assert(c != NULL);

    if (c->noreply) {//不需要回覆信息給client
        if (settings.verbose > 1)
            fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
        c->noreply = false; //重置
        conn_set_state(c, conn_new_cmd);
        return;
    }


    /* Nuke a partial output... */
    c->msgcurr = 0;
    c->msgused = 0;
    c->iovused = 0;
    add_msghdr(c);

    len = strlen(str);
    if ((len + 2) > c->wsize) {///2是後面的\r\n
        /* ought to be always enough. just fail for simplicity */
        str = "SERVER_ERROR output line too long";
        len = strlen(str);
    }

    memcpy(c->wbuf, str, len);
    memcpy(c->wbuf + len, "\r\n", 2);
    c->wbytes = len + 2;
    c->wcurr = c->wbuf;

    conn_set_state(c, conn_write);//寫狀態
    c->write_and_go = conn_new_cmd;//寫完後的下一個狀態
    return;
}


static void drive_machine(conn *c) {
    bool stop = false;
    int res;


    assert(c != NULL);

	//drive_machine被調用會進行狀態推斷,並進行一些處理。但也可能發生狀態的轉換
	//此時就需要一個循環,當進行狀態轉換時。也能處理
    while (!stop) {

        switch(c->state) {

        case conn_write:

            if (c->iovused == 0 || (IS_UDP(c->transport) && c->iovused == 1)) {
                if (add_iov(c, c->wcurr, c->wbytes) != 0) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Couldn't build response\n");
                    conn_set_state(c, conn_closing);
                    break;
                }
            }

            /* fall through... */

        case conn_mwrite:
			...
		}
	}

}

        對於狀態conn_mwrite的詳細處理,可以參考前一篇博文的《迴應命令》。需要注意的是,當memcached迴應完client後。還需要釋放conn對保存item的佔有。這和前一篇博文是同樣的,參考前一篇博文就能夠。

相關文章
相關標籤/搜索