memcached源碼分析三-libevent與命令解析

  轉載請註明出處http://www.javashuo.com/article/p-kbglwwgv-e.html,謝謝合做!html

  前面已經分析了memcached中的slabs內存管理及緩存對象如何利用item表達並存儲在slabs管理的空間中,並分析瞭如何實現LRU策略實現緩存對象的釋放。但memcached是一個client-server結構的緩存系統,服務端須要接收客戶端的指令,而後在服務端作相應的操做,好比新建緩存對象,讀取緩存對象。memcached使用libevent實現這些網絡通訊操做,接下來將會分析memcached如何使用libevent處理網絡事件。git

1. threads in memcached

  一般的網絡編程模型中,服務端都會有一個處於listening狀態的socket,當客戶端嘗試與服務端創建鏈接時,服務端會創建一個新的socket與該客戶端通訊,並將該通訊任務交給其它線程完成。事實上,memcached也採用了這樣的模型,但它經過libevent實現這個模型。github

  簡單地說libevent的工做方式就是:一個event_base,一些加入event_base中的events,每個event都有一個回調函數,event_bse不停地循環等待這些events發生,事件發生後調用對應event的回調函數。編程

  memcached中有2類event_base:main_base,位於主線程中,處理listening socket;normal event_base,位於工做線程中,處理accepted socket。它們的關係能夠簡單地以圖1-1表示,數組

 

圖1-1 memcached中的工做線程模型緩存

(ps: 工做線程的數量能夠在啓動memcached時設置)網絡

 memcached使用struct LIBEVENT_THREAD結構保存工做線程相關的信息,其定義以下,數據結構

typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
#ifdef EXTSTORE
    cache_t *io_cache;          /* IO objects */
    void *storage;              /* data object for storage system */
#endif
    logger *l;                  /* logger buffer */
    void *lru_bump_buf;         /* async LRU bump buffer */
#ifdef TLS
    char   *ssl_wbuf;
#endif

} LIBEVENT_THREAD;

thread_id它保存了線程的線程id,base,保存了屬於該線程的event_base結構。socket

  notify_event, notify_received_fd, notify_send_fd三個成員一塊兒用於完成喚醒線程的任務,notify_event被加入到它的event_base中,監聽nofity_received_fd上的讀事件,當有讀事件發生時,喚醒線程。外部想要喚醒該線程時,向notify_send_fd發送數據,便可引起nofity_received_fd上的讀事件。這兩個fd的關聯經過pipe通道完成。async

  另外有一個new_conn_queue成員,這個成員用來協助完成主線程的任務分發工做。當主線程接受新的鏈接後,創建一個struct conn_queue_item結構,並將該結構push到某個工做線程的new_conn_queue指向的隊列上。worker thread被喚醒後從該隊列上pop新的鏈接創建新的事件加入它的event_base,開始監聽它的網絡通訊。如下是new_conn_queue的及其元素CQ_ITEM的定義

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
};

typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int               sfd;
    enum conn_states  init_state;
    int               event_flags;
    int               read_buffer_size;
    enum network_transport     transport;
    enum conn_queue_item_modes mode;
    conn *c;
    void    *ssl;
    CQ_ITEM          *next;
};

CQ_ITEM中sfd包含了相應的socket,transport表示傳輸層協議類型,read_buffer_size表明讀數據的buffer初始大小,init_state表示這個conn的初始狀態,用於struct conn結構,conn結構是memcached中用於讀寫數據的一個關鍵數據結構,後面將進行介紹。相關的隊列操做函數是cq_pop與cq_push。

  不管是listening socekt的事件回調函數仍是accepted socket的事件回調函數,在memcached中都是event_handler,該函數會調用一個至關複雜的函數drive_mathine,它會根據狀態進行不一樣的處理。如main thread中的listening socket有鏈接事件到來時,回調函數將調用accept函數接受鏈接請求,並將新創建的socket交給某個worker thread處理。worker thread中的socket的事件發生時,處理工做相對較複雜,包括接收數據,創建item,並進行存儲,或者返回響應信息等。

2. drive_machine

  首先,爲了服務端與客戶端的溝通,memcached定義了它的信息的格式協議。協議有兩類,一類是text protocol,一類是binary protocol,具體可參考https://github.com/memcached/memcached/wiki/Protocols。事件回調函數中根據協議對消息進行解析,並進行響應。

  前面提到,memcached中全部的libevent網絡事件的回調函數都是event_handler,而這個函數又簡單地調用了drive_machine完成工做,drive_mathine根據conn_states值作不一樣的操做。每個鏈接在memcached中都使用一個conn結構進行描述,其中包含了它的socket, event及讀寫數據使用到的相關buffer描述,而conn_states正是描述struct conn目前所處的狀態的成員,其取值範圍以下,  

/**
 * Possible states of a connection.
 */
enum conn_states {
    conn_listening,  /**< the socket which listens for connections */
    conn_new_cmd,    /**< Prepare connection for next command */
    conn_waiting,    /**< waiting for a readable socket */
    conn_read,       /**< reading in a command line */
    conn_parse_cmd,  /**< try to parse a command from the input buffer */
    conn_write,      /**< writing out a simple response */
    conn_nread,      /**< reading in a fixed number of bytes */
    conn_swallow,    /**< swallowing unnecessary bytes w/o storing */
    conn_closing,    /**< closing this connection */
    conn_mwrite,     /**< writing out many items sequentially */
    conn_closed,     /**< connection is closed */
    conn_watch,      /**< held by the logger thread as a watcher */
    conn_max_state   /**< Max state value (used for assertion) */
};

conn_listening狀態下drive_machine會從該listening socket上接受客戶端鏈接,創建新的鏈接,分發給工做線程。

具體的狀態轉移及所作的相關操做見圖2-1,因爲其中使用了許多struct conn的成員,在此以前先前conn的定義列出:

/**
 * The structure representing a connection into memcached.
 */
struct conn {
    int    sfd;
#ifdef TLS
    SSL    *ssl;
    char   *ssl_wbuf;
    bool ssl_enabled;
#endif
    sasl_conn_t *sasl_conn;
    bool sasl_started;
    bool authenticated;
    enum conn_states  state;
    enum bin_substates substate;
    rel_time_t last_cmd_time;
    struct event event;
    short  ev_flags;
    short  which;   /** which events were just triggered */

    char   *rbuf;   /** buffer to read commands into */
    char   *rcurr;  /** but if we parsed some already, this is where we stopped */
    int    rsize;   /** total allocated size of rbuf */
    int    rbytes;  /** how much data, starting from rcur, do we have unparsed */

    char   *wbuf;
    char   *wcurr;
    int    wsize;
    int    wbytes;
    /** which state to go into after finishing current write */
    enum conn_states  write_and_go;
    void   *write_and_free; /** free this memory after finishing writing */

    char   *ritem;  /** when we read in an item's value, it goes here */
    int    rlbytes;

    /* data for the nread state */

    /**
     * item is used to hold an item structure created after reading the command
     * line of set/add/replace commands, but before we finished reading the actual
     * data. The data is read into ITEM_data(item) to avoid extra copying.
     */

    void   *item;     /* for commands set/add/replace  */

    /* data for the swallow state */
    int    sbytes;    /* how many bytes to swallow */

    /* data for the mwrite state */
    struct iovec *iov;
    int    iovsize;   /* number of elements allocated in iov[] */
    int    iovused;   /* number of elements used in iov[] */

    struct msghdr *msglist;
    int    msgsize;   /* number of elements allocated in msglist[] */
    int    msgused;   /* number of elements used in msglist[] */
    int    msgcurr;   /* element in msglist[] being transmitted now */
    int    msgbytes;  /* number of bytes in current msg */

    item   **ilist;   /* list of items to write out */
    int    isize;
    item   **icurr;
    int    ileft;

    char   **suffixlist;
    int    suffixsize;
    char   **suffixcurr;
    int    suffixleft;
#ifdef EXTSTORE
    int io_wrapleft;
    unsigned int recache_counter;
    io_wrap *io_wraplist; /* linked list of io_wraps */
    bool io_queued; /* FIXME: debugging flag */
#endif
    enum protocol protocol;   /* which protocol this connection speaks */
    enum network_transport transport; /* what transport is used by this connection */

    /* data for UDP clients */
    int    request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
    struct sockaddr_in6 request_addr; /* udp: Who sent the most recent request */
    socklen_t request_addr_size;
    unsigned char *hdrbuf; /* udp packet headers */
    int    hdrsize;   /* number of headers' worth of space is allocated */

    bool   noreply;   /* True if the reply should not be sent. */
    /* current stats command */
    struct {
        char *buffer;
        size_t size;
        size_t offset;
    } stats;

    /* Binary protocol stuff */
    /* This is where the binary header goes */
    protocol_binary_request_header binary_header;
    uint64_t cas; /* the cas to return */
    short cmd; /* current command being processed */
    int opaque;
    int keylen;
    conn   *next;     /* Used for generating a list of conn structures */
    LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
    int (*try_read_command)(conn *c); /* pointer for top level input parser */
    ssize_t (*read)(conn  *c, void *buf, size_t count);
    ssize_t (*sendmsg)(conn *c, struct msghdr *msg, int flags);
    ssize_t (*write)(conn *c, void *buf, size_t count);
};

圖2-1 struct conn的狀態轉移圖

  如今結合struct conn定義與狀態轉移圖,對一個簡單的客戶端命令開始到客戶端收到返回消息的過程進行描述,

  1. 首先,main_base中的listening socket對應的鏈接的conn_states爲conn_listening.
  2. 當新的鏈接到達後,drive_machine調用accept函數接受新的鏈接,它(conns[id])被分發給工做線程,並處於conn_new_cmd的狀態
  3. 處於conn_new_cmd狀態的conns[id]檢查它的成員rbytes,查看緩存rbuf中是否有數據,無數據則進入conn_waiting狀態.
  4. 處於conn_waiting狀態的conns[id]的成員event在event_base中等待觸發,令conns[id]的下一個狀態爲conn_read
  5. conns[id]的讀事件觸發,處於conn_read狀態的它從網絡上讀數據存儲conns[id].rbuf中,並進入conn_parse_cmd狀態
  6. 處於conn_parse_cmd狀態的conns[id]從conns[id].buf中解析命令頭,以binary protocol爲例,解析出來的頭部信息放入conns[id]. binary_header中,包含了key的長度,extended信息的長度,body的長度,命令等關鍵信息,這些工做調用conns. try_read_command函數完成。
  7. try_read_command根據解析的命令作不一樣的處理:信息已經足夠,不須要讀取更多信息,直接處理而後返回結果,如查詢版本的命令、noop命令等,處理後進入conn_mwrite狀態返回結果,而後再次進入conn_new_cmd狀態;命令還須要讀取更多信息,那麼首先讀取extras部分的信息,長度由conns[id].binary_header. extlen與conns[id].binary_header. keylen決定,這個長度信息放入conns[id].rlbytes中,存放數據的buf起始點記錄在conns[id].ritem中,進入conn_nread狀態(調用bin_read_key函數完成)。
  8. 處於conn_nread狀態的conns[id],或者它的rbuf中已經有須要的rlbytes數據,或者仍然須要從網絡上讀取數據放入ritem起始的位置(此時ritem指向rbuf可用空間),數據足夠後調用complete_nread函數進行處理。
  9. 若是前一次conn_nread讀取的是extras部分的信息,complete_nread會根據命令類型進行處理:不須要數據信息,如delete命令、touch命令,處理後進入conn_mwrite狀態返回結果,而後進入conn_new_cmd狀態;須要繼續讀取數據信息,如add、set命令,首先它會根據binary_head中的body長度計算出須要的存儲空間,預分配item進行存儲,即conns[id].item,令conns[id].ritem指向conns[id].item的數據區,conns[id].rlbytes記錄須要讀取的數據長度,再次進入conn_nread狀態。
  10. 再次進入conn_nread狀態的conns[id]讀取完數據後,調用complete_nread進行處理,將item加入到hashtable與lru的鏈表上,完成存儲。進入conn_new_cmd狀態。

conn_mwrite狀態下,drive_machine會向鏈接的客戶端發送消息(調用transmit函數),若是順利完成則進入下一次的conn_new_cmd狀態。

conn_write狀態表示要發送的數據在wbuf中,須要先加入到msglist中,再進入conn_mwrite狀態。

若是處理過程當中出現了錯誤,致使鏈接發生混亂或者鏈接中斷了,那麼進入conn_closing,關閉鏈接。

memcached中向客戶端返回消息使用了庫函數sendmsg,所以須要返回給客戶端的消息都存儲在結構struct conn中的msglist中,其存儲結構能夠大體表示如圖2-2:

圖2-2 struct conn結構中msglist的結構

msglist是一個動態擴展的msghdr數組, msgused是數組中已被使用的部分的計數, msgsize是數組長度,msgcurr是目前發送的消息。

iov也是一個動態擴展的iovec數組,iovsize表示數據長度,iovused表示數組中已使用的部分的計數。

buf是存儲須要發送的數據的空間,它的起始地址與長度記錄在一個iovec結構中,這些buf通常是con中的wbuf, write_and_free,ilist中的item的數據區。

struct conn結構中有一個substate成員對於complete_nread處理binary protocol很重要,complete根據該值決定後續須要作的處理。substate的取值範圍定義以下:

enum bin_substates {
    bin_no_state,
    bin_reading_set_header,
    bin_reading_cas_header,
    bin_read_set_value,
    bin_reading_get_key,
    bin_reading_stat,
    bin_reading_del_header,
    bin_reading_incr_header,
    bin_read_flush_exptime,
    bin_reading_sasl_auth,
    bin_reading_sasl_auth_data,
    bin_reading_touch_key,
};

這些狀態是與binary protocol的命令對應的,如

set命令,substate首先會設置爲bin_reading_set_header狀態,表示conns[id]在第一次conn_nread狀態讀取了它須要的額外信息,但它還有data須要讀取,須要預分配conns[id].item,後續的conn_nread狀態讀數據直接讀入item中。而後它會設置爲bin_read_set_value狀態,表示第二次conn_nread讀取了data數據並已存入item中,須要將該item放入hashtable與lru鏈表中。

delete命令,substate會設置爲bin_reading_del_header狀態,它僅須要一次conn_nread狀態讀取所須要的額外信息,而後從hashtable與lru鏈表中刪除item,返回刪除結果。

struct conn中部分其它成員的做用

ilist是一個指向item的指針的鏈表,存儲的gets命令須要返回的item的指針,它也是一個動態擴展的數據,由於text協議的gets命令可能請求多個緩存對象。

protocol表明鏈接使用的協議類型,text或者binary

transport表明UDP或者TCP傳輸層協議

thread存儲這個鏈接歸屬的工做線程的指針

suffixlist是一個char指針的動態擴展數組,存儲着客戶端的flags的字符串形式。

3.部分源碼函數功能說明

void event_handler(const int fd, const short which, void *arg) 

網絡事件的回調函數,事實上drive_machine纔是核心函數,event_handler簡單地調用了drive_machine。

static void drive_machine(conn *c) 

根據c->state的值進行不一樣的處理。

void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport, void *ssl) 

conn_listening狀態的conn在接受到新的鏈接後,調用該函數構造一個conn結構,並將其分發給工做線程處理。

static enum try_read_result try_read_network(conn *c)

conn_read狀態的鏈接c調用該函數從網絡上讀取數據。

static int try_read_command_binary(conn *c) 

在將數據讀到c.rbuf後,調用該函數對binary protocol的命令進行解析,解析結果存儲c.binary_head中。

static void dispatch_bin_command(conn *c)

在try_read_command_binary中調用,根據命令執行不一樣的操做:命令不須要額外數據了,處理並返回結果;命令仍須要數據,調用bin_read_key設置好c.ritem、c.rlbytes及c.substate,函數返回,進入conn_nread狀態。

static void complete_nread_binary(conn *c) 

binaray命令須要的extras數據已經放到了緩衝區,根據c->substate決定完成命令並返回或者繼續讀data部分。

static int try_read_command_ascii(conn *c) 

text協議使用的解析命令的函數,實際上調用process_command完成工做。

static void process_command(conn *c, char *command) 

根據命令行中的命令關鍵字進行相應的處理,如get命令調用process_get_command;add/set命令調用process_update_command;incr/decr調用process_arithmetic_command…

static enum transmit_result transmit(conn *c) 

將鏈接的c->msglist中的消息發送回客戶端。核心操做爲庫函數sendmsg。

相關文章
相關標籤/搜索