Memcached學習(三)--消息迴應

conn結構主要是存儲單個客戶端的鏈接詳情信息。每個客戶端鏈接到Memcached都會有這麼一個數據結構。數組

typedef struct conn conn;
struct conn {
    //....
    /* data for the mwrite state */
    //iov主要存儲iov的數據結構
    //iov數據結構會在conn_new中初始化,初始化的時候,系統會分配400個iovec的結構,最高水位600個
    struct iovec *iov;
    //iov的長度
    int    iovsize;   /* number of elements allocated in iov[] */
    //iovused 這個主要記錄iov使用了多少
    int    iovused;   /* number of elements used in iov[] */
 
    //msglist主要存儲msghdr的列表數據結構
    //msglist數據結構在conn_new中初始化的時候,系統會分配10個結構
    struct msghdr *msglist;
    //msglist的長度,初始化爲10個,最高水位100,不夠用的時候會realloc,每次擴容都會擴容一倍
    int    msgsize;   /* number of elements allocated in msglist[] */
    //msglist已經使用的長度
    int    msgused;   /* number of elements used in msglist[] */
    //這個參數主要幫助記錄那些msglist已經發送過了,哪些沒有發送過。
    int    msgcurr;   /* element in msglist[] being transmitted now */
    int    msgbytes;  /* number of bytes in current msg */
}

 咱們能夠看一下conn_new這個方法,在這個方法裏面會對iov和msglist兩個參數進行初始化。緩存

conn *conn_new(const int sfd, enum conn_states init_state,const int event_flags, const int read_buffer_size,
        enum network_transport transport, struct event_base *base) 
{
//...省略部分代碼 c->iov = (struct iovec *) malloc(sizeof(struct iovec) * c->iovsize); //初始化iov c->msglist = (struct msghdr *) malloc(sizeof(struct msghdr) * c->msgsize); //初始化msglist }

數據結構關係圖(iov和msglist之間的關係):數據結構

 

從process_get_command開始

咱們繼續從process_get_command,獲取memcached的緩存數據這個方法開始。memcached

在這個方法中,咱們主要看add_iov這個方法。Memcached主要是經過add_iov方法,將須要發送給客戶端的數據裝到iov和msglist結構中去的。oop

 

/* ntokens is overwritten here... shrug.. */
//處理GET請求的命令
static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens,
        bool return_cas) {
    //處理GET命令
    char *key;
    size_t nkey;
    int i = 0;
    item *it;
    //&tokens[0] 是操做的方法
    //&tokens[1] 爲key
    //token_t 存儲了value和length
    token_t *key_token = &tokens[KEY_TOKEN];
    char *suffix;
    assert(c != NULL);
 
    do {
        //若是key的長度不爲0
        while (key_token->length != 0) {
 
            key = key_token->value;
            nkey = key_token->length;
 
            //判斷key的長度是否超過了最大的長度,memcache key的最大長度爲250
            //這個地方須要很是注意,咱們在日常的使用中,仍是要注意key的字節長度的
            if (nkey > KEY_MAX_LENGTH) {
                //out_string 向外部輸出數據
                out_string(c, "CLIENT_ERROR bad command line format");
                while (i-- > 0) {
                    item_remove(*(c->ilist + i));
                }
                return;
            }
            //這邊是從Memcached的內存存儲快中去取數據
            it = item_get(key, nkey);
            if (settings.detail_enabled) {
                //狀態記錄,key的記錄數的方法
                stats_prefix_record_get(key, nkey, NULL != it);
            }
            //若是獲取到了數據
            if (it) {
                //c->ilist 存放用於向外部寫數據的buf
                //若是ilist過小,則從新分配一塊內存
                if (i >= c->isize) {
                    item **new_list = realloc(c->ilist,
                            sizeof(item *) * c->isize * 2);
                    if (new_list) {
                        //存放須要向客戶端寫數據的item的列表的長度
                        c->isize *= 2;
                        //存放須要向客戶端寫數據的item的列表,這邊支持
                        c->ilist = new_list;
                    } else {
                        STATS_LOCK();
                        stats.malloc_fails++;
                        STATS_UNLOCK();
                        item_remove(it);
                        break;
                    }
                }
 
                /*
                 * Construct the response. Each hit adds three elements to the
                 * outgoing data list:
                 *   "VALUE "
                 *   key
                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
                 */
                //初始化返回出去的數據結構
                if (return_cas) {
                    //......
                } else {
                    MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
                            it->nbytes, ITEM_get_cas(it));
                    //將須要返回的數據填充到IOV結構中
                    //命令:get userId
                    //返回的結構:
                    //VALUE userId 0 5
                    //55555
                    //END
                    if (<strong><span style="color:#FF0000;">add_iov</span></strong>(c, "VALUE ", 6) != 0
                            || <strong><span style="color:#FF0000;">add_iov</span></strong>(c, ITEM_key(it), it->nkey) != 0
                            || <strong><span style="color:#FF0000;">add_iov</span></strong>(c, ITEM_suffix(it),
                                    it->nsuffix + it->nbytes) != 0) {
                        item_remove(it);
                        break;
                    }
                }
 
                if (settings.verbose > 1) {
                    int ii;
                    fprintf(stderr, ">%d sending key ", c->sfd);
                    for (ii = 0; ii < it->nkey; ++ii) {
                        fprintf(stderr, "%c", key[ii]);
                    }
                    fprintf(stderr, "\n");
                }
 
                /* item_get() has incremented it->refcount for us */
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.slab_stats[it->slabs_clsid].get_hits++;
                c->thread->stats.get_cmds++;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                item_update(it);
                *(c->ilist + i) = it;
                i++;
 
            } else {
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.get_misses++;
                c->thread->stats.get_cmds++;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
            }
 
            key_token++;
        }
 
        /*
         * If the command string hasn't been fully processed, get the next set
         * of tokens.
         */
        //若是命令行中的命令沒有所有被處理,則繼續下一個命令
        //一個命令行中,能夠get多個元素
        if (key_token->value != NULL) {
            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
            key_token = tokens;
        }
 
    } while (key_token->value != NULL);
 
    c->icurr = c->ilist;
    c->ileft = i;
    if (return_cas) {
        c->suffixcurr = c->suffixlist;
        c->suffixleft = i;
    }
 
    if (settings.verbose > 1)
        fprintf(stderr, ">%d END\n", c->sfd);
 
    /*
     If the loop was terminated because of out-of-memory, it is not
     reliable to add END\r\n to the buffer, because it might not end
     in \r\n. So we send SERVER_ERROR instead.
     */
    //添加結束標誌符號
    if (key_token->value != NULL || <strong><span style="color:#FF0000;">add_iov</span></strong>(c, "END\r\n", 5) != 0
            || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
        out_of_memory(c, "SERVER_ERROR out of memory writing get response");
    } else {
        //將狀態修改成寫,這邊讀取到item的數據後,又開始須要往客戶端寫數據了。
        conn_set_state(c, conn_mwrite);
        c->msgcurr = 0;
    }
}

add_iov 方法

add_iov方法,主要做用:ui

1. 將Memcached須要發送的數據,分紅N多個IOV的塊this

2. 將IOV塊添加到msghdr的結構中去。spa

static int add_iov(conn *c, const void *buf, int len) {
    struct msghdr *m;
    int leftover;
    bool limit_to_mtu;
 
    assert(c != NULL);
 
    do {
        //消息數組 msglist 存儲msghdr結構
        //這邊是獲取最新的msghdr數據結構指針
        m = &c->msglist[c->msgused - 1];
 
        /*
         * Limit UDP packets, and the first payloads of TCP replies, to
         * UDP_MAX_PAYLOAD_SIZE bytes.
         */
        limit_to_mtu = IS_UDP(c->transport) || (1 == c->msgused);
 
        /* We may need to start a new msghdr if this one is full. */
        //若是msghdr結構中的iov滿了,則須要使用更新的msghdr數據結構
        if (m->msg_iovlen == IOV_MAX
                || (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
            //添加msghdr,這個方法中回去判斷初始化的時候10個msghdr結構是否夠用,不夠用的話會擴容
            add_msghdr(c);
            //指向下一個新的msghdr數據結構
            m = &c->msglist[c->msgused - 1];
        }
 
        //確認IOV的空間大小,初始化默認是400個,水位600
        //若是IOV也不夠用了,就會去擴容
        if (ensure_iov_space(c) != 0)
            return -1;
 
        /* If the fragment is too big to fit in the datagram, split it up */
        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
            len -= leftover;
        } else {
            leftover = 0;
        }
 
        m = &c->msglist[c->msgused - 1];
        //m->msg_iov參數指向c->iov這個結構。
        //具體m->msg_iov如何指向到c->iov這個結構的,須要看一下add_msghdr這個方法
        //向IOV中填充BUF
        m->msg_iov[m->msg_iovlen].iov_base = (void *) buf;
        //buf的長度
        m->msg_iov[m->msg_iovlen].iov_len = len; //填充長度
 
        c->msgbytes += len;
        c->iovused++;
        m->msg_iovlen++; //msg_iovlen + 1
 
        buf = ((char *) buf) + len;
        len = leftover;
    } while (leftover > 0);
 
    return 0;
}

 

 

add_msghdr 方法 msghdr擴容

 

在add_iov方法中,咱們能夠看到,當IOV塊添加滿了以後,會調用這個方法擴容msgdhr的個數。命令行

這個方法主要兩個做用:指針

1. 檢查c->msglist列表長度是否夠用。

2. 使用最新的c->msglist中的一個msghdr元素,而且將msghdr->msg_iov指向c->iov最新未使用的那個iov的指針地址。

static int add_msghdr(conn *c) {
    //c->msglist 這個列表用來存儲msghdr結構
    struct msghdr *msg;
 
    assert(c != NULL);
 
    //若是msglist的長度和已經使用的長度相等的時候,說明msglist已經用完了,須要擴容
    if (c->msgsize == c->msgused) {
        //擴容兩倍
        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
        if (!msg) {
            STATS_LOCK();
            stats.malloc_fails++;
            STATS_UNLOCK();
            return -1;
        }
        c->msglist = msg; //將c->msglist指向當前新的列表
        c->msgsize *= 2; //size也會跟着增長
    }
 
    //msg從新指向未使用的msghdr指針位置
    msg = c->msglist + c->msgused;
 
    /* this wipes msg_iovlen, msg_control, msg_controllen, and
     msg_flags, the last 3 of which aren't defined on solaris: */
    //將新的msghdr塊初始化設置爲0
    memset(msg, 0, sizeof(struct msghdr));
 
    //新的msghdr的msg_iov指向 struct iovec *iov結構
    msg->msg_iov = &c->iov[c->iovused];
 
    if (IS_UDP(c->transport) && c->request_addr_size > 0) {
        msg->msg_name = &c->request_addr;
        msg->msg_namelen = c->request_addr_size;
    }
 
    c->msgbytes = 0;
    c->msgused++;
 
    if (IS_UDP(c->transport)) {
        /* Leave room for the UDP header, which we'll fill in later. */
        return add_iov(c, NULL, UDP_HEADER_SIZE);
    }
 
    return 0;
}

ensure_iov_space 方法 IOV擴容

這個方法主要檢查c->iov是否還有剩餘空間,若是不夠用了,則擴容2倍。

 

static int ensure_iov_space(conn *c) {
    assert(c != NULL);
 
    //若是IOV也使用完了....IOV,分配新的IOV
    if (c->iovused >= c->iovsize) {
        int i, iovnum;
        struct iovec *new_iov = (struct iovec *) realloc(c->iov,
                (c->iovsize * 2) * sizeof(struct iovec));
        if (!new_iov) {
            STATS_LOCK();
            stats.malloc_fails++;
            STATS_UNLOCK();
            return -1;
        }
        c->iov = new_iov;
        c->iovsize *= 2; //擴容兩倍
 
        /* Point all the msghdr structures at the new list. */
        for (i = 0, iovnum = 0; i < c->msgused; i++) {
            c->msglist[i].msg_iov = &c->iov[iovnum];
            iovnum += c->msglist[i].msg_iovlen;
        }
    }
 
    return 0;
}

 

 

conn_mwrite

conn_mwrite狀態在drive_machine這個方法中。主要就是向客戶端寫數據了。

從上面的add_iov方法中,咱們知道Memcached會將須要待發送的數據寫入c->msglist結構中。

真正寫數據的方法是transmit。

//drive_machine方法
        //這個conn_mwrite是向客戶端寫數據
        case conn_mwrite:
            if (IS_UDP(c->transport) && c->msgcurr == 0
                    && build_udp_headers(c) != 0) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Failed to build UDP headers\n");
                conn_set_state(c, conn_closing);
                break;
            }
            //transmit這個方法很是重要,主要向客戶端寫數據的操做都在這個方法中進行
            //返回transmit_result枚舉類型,用於判斷是否寫成功,若是失敗,則關閉鏈接
            switch (transmit(c)) {
 
            //若是向客戶端發送數據成功
            case TRANSMIT_COMPLETE:
                if (c->state == conn_mwrite) {
                    conn_release_items(c);
                    /* XXX:  I don't know why this wasn't the general case */
                    if (c->protocol == binary_prot) {
                        conn_set_state(c, c->write_and_go);
                    } else {
                        //這邊是TCP的狀態
                        //狀態又會切回到conn_new_cmd這個狀態
                        //conn_new_cmd主要是繼續解析c->rbuf容器中剩餘的命令參數
                        conn_set_state(c, conn_new_cmd);
                    }
                } else if (c->state == conn_write) {
                    if (c->write_and_free) {
                        free(c->write_and_free);
                        c->write_and_free = 0;
                    }
                    conn_set_state(c, c->write_and_go);
                } else {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Unexpected state %d\n", c->state);
                    conn_set_state(c, conn_closing);
                }
                break;

transmit 方法

 

//這個方法主要向客戶端寫數據
//若是數據沒有發送完,則會一直循環conn_mwrite這個狀態,直到數據發送完成爲止
static enum transmit_result transmit(conn *c) {
    assert(c != NULL);
 
    //每次發送以前,都會來校驗前一次的數據是否發送完了
    //若是前一次的msghdr結構體內的數據已經發送完了,則c->msgcurr指針就會日後移動一位,
    //移動到下一個等待發送的msghdr結構體指針上
    //c->msgcurr初始值爲:0
    if (c->msgcurr < c->msgused && c->msglist[c->msgcurr].msg_iovlen == 0) {
        /* Finished writing the current msg; advance to the next. */
        c->msgcurr++;
    }
 
    //若是c->msgcurr(已發送)小於c->msgused(已使用),則就能夠知道還沒發送完,則須要繼續發送
    //若是c->msgcurr(已發送)等於c->msgused(已使用),則說明已經發送完了,返回TRANSMIT_COMPLETE狀態
    if (c->msgcurr < c->msgused) {
        ssize_t res;
 
        //從c->msglist取出一個待發送的msghdr結構
        struct msghdr *m = &c->msglist[c->msgcurr];
        //向客戶端發送數據
        res = sendmsg(c->sfd, m, 0);
        //發送成功的狀況
        if (res > 0) {
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.bytes_written += res;
            pthread_mutex_unlock(&c->thread->stats.mutex);
 
            /* We've written some of the data. Remove the completed
             iovec entries from the list of pending writes. */
            //這邊會檢查發送了多少
            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
                res -= m->msg_iov->iov_len;
                m->msg_iovlen--;
                m->msg_iov++;
            }
 
            /* Might have written just part of the last iovec entry;
             adjust it so the next write will do the rest. */
            if (res > 0) {
                m->msg_iov->iov_base = (caddr_t) m->msg_iov->iov_base + res;
                m->msg_iov->iov_len -= res;
            }
            return TRANSMIT_INCOMPLETE;
        }
        //發送失敗的狀況
        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Couldn't update event\n");
                conn_set_state(c, conn_closing);
                return TRANSMIT_HARD_ERROR;
            }
            return TRANSMIT_SOFT_ERROR;
        }
        /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
         we have a real error, on which we close the connection */
        if (settings.verbose > 0)
            perror("Failed to write, and not due to blocking");
 
        if (IS_UDP(c->transport))
            conn_set_state(c, conn_read);
        else
            conn_set_state(c, conn_closing);
        return TRANSMIT_HARD_ERROR;
    } else {
        return TRANSMIT_COMPLETE;
    }
}

conn_shrink 方法

 

  當數據發送成功後,會跳轉到conn_new_cmd這個狀態繼續處理,而後進入reset_cmd_handler方法,而後進入conn_shrink方法。

  conn_shrink主要是用於檢查讀取和發送的buf的大小,是否超過了預約的水位,若是超過了,則須要從新realloc。

 

//從新設置命令handler
static void reset_cmd_handler(conn *c) {
    c->cmd = -1;
    c->substate = bin_no_state;
    if (c->item != NULL) {
        item_remove(c->item);
        c->item = NULL;
    }
    conn_shrink(c); //這個方法是檢查c->rbuf容器的大小
    //若是剩餘未解析的命令 > 0的話,繼續跳轉到conn_parse_cmd解析命令
    if (c->rbytes > 0) {
        conn_set_state(c, conn_parse_cmd);
    } else {
        //若是命令都解析完成了,則繼續等待新的數據到來
        conn_set_state(c, conn_waiting);
    }
}

 

 

//檢查rbuf的大小
static void conn_shrink(conn *c) {
    assert(c != NULL);
 
    if (IS_UDP(c->transport))
        return;
 
    //若是bufsize大於READ_BUFFER_HIGHWAT(8192)的時候須要從新處理
    //DATA_BUFFER_SIZE等於2048,因此咱們能夠看到以前的代碼中對rbuf最多隻能進行4次recalloc
    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
        char *newbuf;
 
        if (c->rcurr != c->rbuf)
            memmove(c->rbuf, c->rcurr, (size_t) c->rbytes); //內存移動
 
        newbuf = (char *) realloc((void *) c->rbuf, DATA_BUFFER_SIZE);
 
        if (newbuf) {
            c->rbuf = newbuf;
            c->rsize = DATA_BUFFER_SIZE;
        }
        /* TODO check other branch... */
        c->rcurr = c->rbuf;
    }
 
    if (c->isize > ITEM_LIST_HIGHWAT) {
        item **newbuf = (item**) realloc((void *) c->ilist,
                ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
        if (newbuf) {
            c->ilist = newbuf;
            c->isize = ITEM_LIST_INITIAL;
        }
        /* TODO check error condition? */
    }
 
    //若是大於c->msglist的水位了,則從新realloc
    if (c->msgsize > MSG_LIST_HIGHWAT) {
        struct msghdr *newbuf = (struct msghdr *) realloc((void *) c->msglist,
                MSG_LIST_INITIAL * sizeof(c->msglist[0]));
        if (newbuf) {
            c->msglist = newbuf;
            c->msgsize = MSG_LIST_INITIAL;
        }
        /* TODO check error condition? */
    }
 
    //若是大於c->iovsize的水位了,則從新realloc
    if (c->iovsize > IOV_LIST_HIGHWAT) {
        struct iovec *newbuf = (struct iovec *) realloc((void *) c->iov,
                IOV_LIST_INITIAL * sizeof(c->iov[0]));
        if (newbuf) {
            c->iov = newbuf;
            c->iovsize = IOV_LIST_INITIAL;
        }
        /* TODO check return value */
    }
}
相關文章
相關標籤/搜索