長平狐 memcached源代碼閱讀筆記(二):網絡處理部分

Java、PHP、Ruby、iOS、Python 等 JetBrains 開發工具低至 99 元(3折),詳情» html

既然memcached是一個緩存服務器,並且要提供高效的緩存服務,那麼網絡層確定要很是 有效率才行。要能支撐大量的併發鏈接,還要有很優秀的響應速度。除此以外,由於memcached的核心業務並非網絡層,它的核心是緩存機制。那麼就必 須採用一種機制,將網絡層隔離,以避免網絡通訊部分纏繞在系統的各處,擾亂了核心邏輯。 java

在這一點上要感謝基於事件驅動的網絡庫libevent。memcached就是採用這個來做爲它的網絡層,因此對於memcache來講,即便有成千上萬的鏈接處理起來也不是什麼難事。 linux

libevent的事件驅動機制除了能提升網絡處理的效率外,還抽象了各個操做系統上最高效的方式:好比Linux上的epoll, FreeBSD上的kqueue 以及Windows的IOCP。就不說跨平臺了,單單使用好其中一種機制就不是一件容易事。除此以外,使用事件驅動的機制,還能讓你將網絡處理部分與業務 邏輯相分離。  編程

好,今天咱們就來看看memcached是如何利用libevent構建其網絡層的(注意,memcache同時支持TCP和UDP,不過本文只會關注TCP部分)。 windows

爲了更好的理解libevent,咱們先來看看linux上如何使用epoll實現基於事件的網絡編程:  緩存

#define MAX_EVENTS 10
main(){
         int sfd;
         int flags;
         int efd;
         int nfds;
         int i;
         struct epoll_event ev, events[MAX_EVENTS];
         struct addrinfo *ai;
         struct addrinfo hints = {.ai_flags = AI_PASSIVE,
                                .ai_family = AF_UNSPEC};
        hints.ai_socktype = SOCK_STREAM;
         char port_buf[NI_MAXSERV];
        snprintf(port_buf,  sizeof(port_buf),  " %d ", port);
        getaddrinfo(NULL, port_buf, &hints, &ai);
         // create a socket to linsten
        sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
        flags = fcntl(sfd, F_GETFL,  0);
         // set nob block
        fcntl(sfd, F_SETFL, flags | O_NONBLOCK);
         // bind
        bind(sfd, ai->ai_addr, ai->ai_addrlen);
         // listen
        listen(sfd,  15);
         // create epoll
        efd = epoll_create( 10);
        ev.events = EPOLLIN;
        ev.data.fd = sfd;
         // setup epoll
        epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &ev);
         while( true){
                 // wait, block until a new connection is comming
                nfds = epoll_wait(efd, events, MAX_EVENTS, - 1);
                 for(i =  0; i < nfds; ++i){
                   dispatch(events[i]);
                }
        }

服務器

我在上面的代碼里加了少許的註釋。代碼的前半部分沒有什麼,和全部的網絡編程同樣,就是建立套接字,而後bind到地址,開始監聽。注意,這裏咱們 將這個文件描述符設置爲NON_BLOCK的,這一點很關鍵。而後咱們建立epoll,設置感興趣的事件,而後在一個循環裏wait事件的發生。 wait會阻塞,當一個事件發生時會繼續運行,而後根據事件的類型做出不一樣的處理。 網絡

可能有人要問,我沒有發現任何事件觸發的意思啊,這跟.NET裏咱們熟悉的事件處理差異太大了: 數據結構

btnOK.OnClick +=  delegate(sender, e){
    // ...

併發

不過若是你熟悉一點Win32編程的話就不那麼認爲了。不熟悉的話確定據說過消息循環吧。想一想看,上面的代碼是否是跟Win32裏的消息循環很是類 似。一個loop,而後接受事件,翻譯事件,而後根據事件的類型分發給事件處理句柄。只不過在Win32裏編寫窗體程序時,事件的來源是鼠標點擊按鈕或者 鍵盤操做,而這裏的事件來源是網絡:多是一個新的鏈接到來,也多是讀緩衝區裏數據已經準備好(要了解Win32消息循環的更多細節能夠參看我這篇文 章:點擊這裏)。至於爲什麼基於事件的這種機制對於大併發的系統頗有用不在本文的範圍內,若是要闡述清楚這個可能須要介紹如下IO模型等問題,不過你也能夠經過閱讀我以前寫的並行和異步瞭解一些概念。

不過要讓應用開發人員都親力親爲的處理消息循環來作事件驅動的編程太過於麻煩了,不論是後來的MFC,仍是VB抑或是WinForm中,那個消息循 環再也沒出現過。取而代之的是如今好用的事件。網絡編程也同樣,要親力親爲的處理這種細節,太麻煩了,並且太容易出錯。最麻煩的還有各個平臺提供的機制還 不同,如是相似libevent這樣的類庫就應運而生。 

如今咱們來看看memcache如何使用libevent進行高效的網絡編程:

//event_base 是libevent的核心數據結構

staticstruct event_base *main_base;
main_base = event_init();
//而後就是建立scoket等
fd = socket(...)
bind()
listen()
//調用conn_new方法
conn_new(sfd, conn_listening,EV_READ | EV_PERSIST, 1,
                                             transport, main_base);

conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
                struct event_base *base) {
    c->sfd = sfd;
    c->state = init_state;
    //設置感興趣的事件,事件句柄是event_handler
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    event_add(&c->event0);
}
void event_handler(const int fd, const short which, void *arg) {
    drive_machine(c);
}
static void drive_machine(conn *c) {
 while (!stop) {
        switch(c->state) {
        case conn_listening:
            sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
            flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
                fcntl(sfd, F_SETFL, flags | O_NONBLOCK);
           dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, tcp_transport);

}

爲了代碼更清晰,我刪除了錯誤處理等代碼。上面的代碼從構建libevent的核心數據結構event_base開始,而後建立socket,給該 socket設置感興趣的事件,而且設置了事件處理句柄event_handler,而後咱們根據conn數據結構的state字段判斷如今是什麼事件並 做出相應的處理(conn數據結構是memcache裏處理網絡鏈接的核心數據結構之一)。第一步固然是一個新的鏈接的到來:鏈接到了memcache服 務器,這是第一個事件。而後咱們經過dispatch_conn_new函數分發這個新到來的鏈接。在鏈接到來以後咱們就要進入下一步處理了:好比從緩衝 區讀取數據,在這裏就是接收memcache客戶端發過來的各類命令,或者向緩衝區寫出數據,好比咱們接到一個get命令,而後咱們就從緩存裏讀取相應的 key對應的值,而後將該值寫到緩衝區(關於memcache協議(命令)處理的內容後面的文章會有介紹,本文咱們只關注網絡處理部分)。

不知道剛纔你閱讀過我那篇Win32消息循環的文章沒,在窗體編程中,爲了在作耗時操做時不讓界面假死咱們經常使用的作法就是建立一個有別於消息循環的 主線程,而後在這個線程裏處理這些耗時操做。這裏也同樣,當鏈接到來後,咱們要讀命令,解析命令,處理命令。爲了避免讓這些操做阻塞了主線程,即監聽鏈接到 來的線程(想一想若是阻塞告終果會是怎樣?),memcache也是相似的處理方法:主線程接收新鏈接,而後建立一些線程(可配置的)來處理命令。好,咱們 仍是來看代碼吧:

thread_init(settings.num_threads, main_base);
void thread_init( int nthreads,  struct event_base *main_base) {
    threads = calloc(nthreads,  sizeof(LIBEVENT_THREAD));

    dispatcher_thread. base = main_base;
    dispatcher_thread.thread_id = pthread_self();
    //這裏很巧妙
     for (i =  0; i < nthreads; i++) {

        //建立一個管道,管道對應兩個描述符,一個用於寫,一個用於讀。 

        int fds[2];
        pipe(fds);

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        setup_thread(&threads[i]);
        stats.reserved_fds += 5;
    }

    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }
   //只有全部的線程都初始化完畢後,這裏纔會繼續執行,不然阻塞
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
}
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();

    //在這裏,咱們在管道的讀端註冊感興趣的事件 

    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);
    event_add(&me->notify_event, 0);
}
//事件處理函數,這裏又調用了conn_new函數
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
        CQ_ITEM *item;

        //從隊列裏取出要處理的鏈接

        item = cq_pop(me->new_conn_queue);
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
      
}
//建立worker線程用來處理命令等
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t       thread;
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    ret = pthread_create(&thread, &attr, func, arg);

 上面的代碼會在memcache初始化的時候就會執行,會建立一堆線程等待事件處理。那麼是用什麼方式將事件傳遞給這些線程呢,這裏實現有點巧妙。注意到這幾行代碼:

pipe(fds); 

threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];

這裏建立了一個管道(pipe),管道對應兩個描述符,一個用於寫,一個讀。再來看看前面一段代碼裏咱們沒有貼出的dispatch_conn_new函數:

void  dispatch_conn_new( int  sfd,  enum  conn_states init_state,  int  event_flags, int read_buffer_size,  enum network_transport transport) {

    CQ_ITEM *item = cqi_new();
    int tid = (last_thread + 1) % settings.num_threads;

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;
    //將鏈接放到一個隊列裏
    cq_push(thread->new_conn_queue, item); 

    //向管道的寫端寫一個字節的數字,人爲的觸發事件 

    write(thread->notify_send_fd, ""1);

有意思的代碼就在最後一行,在接收到一個鏈接以後,從剛纔建立的一些LIBEVENT_THREAD裏,選一個。這個結構裏就有剛纔建立的那個管道 對應的兩個描述符,而後往管道的寫端寫如一個字節,由於上面的setup_thread裏爲管道的讀端設置了感興趣的事件,這個時候事件就觸發了(人爲的 觸發一個事件,而且將剛纔的鏈接放到一個隊列裏(這裏是每一個線程一個隊列),這樣就將一個同步的事件轉換爲異步的了)。事件觸發後,thread_libevent_process函數就會執行。 而後又進入到conn_new函數,進入到drive_machine函數,又根據conn的state進行事件處理。

PS:我不知道這種利用管道,而後人爲的觸發事件的機制是否是一種什麼模式或慣用法。除了在memcache這裏我見到了這種方式外,在java的 NIO裏也有相似的使用。在java NIO裏建立一個Selector後,會建立一個管道(在Linux上,而在windows上會建立一對socket),咱們能夠經過向管道的寫端寫入一 個字節來喚醒已經阻塞的selector。

在介紹完memcache的網絡處理部分後,下一篇咱們就能夠看看memcache是如何從網絡上讀取內容,解析命令的。 


原文連接: http://www.cnblogs.com/yuyijq/archive/2012/01/02/2291025.html
相關文章
相關標籤/搜索