memcached網絡通訊模塊設計與實現分析

接上一篇redis網絡通訊模塊,memcached的網絡通訊藉助了libevent庫,簡化了底層的網絡讀寫事件的封裝,可是memcached多了多線程處理html

核心流程:客戶端 -> 服務端memcached master -> master fd -> master accept(fd) -> client fd -> worker線程處理client fd全部讀寫事件git

master處理流程

master處理流程在main()函數中,雖然main函數共有6892-8334共計1442行代碼,6892->8104共計1212行都是在解析memcached的配置、和作一些主流程無關的小操做。github

main中的核心代碼不到230行(8104->8334),仍是很是容易分析的redis

master作的核心工做有一下幾點數組

建立libevent事件循環-event_base_new_with_config()

這個是libevent事件循環的核心共享結構體,後面的事件註冊,啓動等都須要這個結構體安全

這裏須要注意的是memcached使用了帶參的event_base_new()函數,參數是libevent配置服務器

ev_config = event_config_new();
event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
main_base = event_base_new_with_config(ev_config);
複製代碼

不爲event_base分配鎖,這樣能夠提升一些libevent的性能,雖然這樣可能致使多線程下使用event_base變得不安全,可是memcached的event_base是一個線程一個event_base,因此不存在這個問題網絡

啓動tcp服務器

memcached.c::server_socket()函數中使用socket()和bind()方法啓動了tcp服務器,並獲得這個表明memcached server的fd多線程

核心代碼以下:socket

static int server_socket(const char *interface, int port, enum network_transport transport, FILE *portnumber_file, bool ssl_enabled) {
    // ...

    for (next= ai; next; next= next->ai_next) {
        conn *listen_conn_add;
        if ((sfd = new_socket(next)) == -1) {
            // ...
            continue;
        }

        // 省略ipv6邏輯...
        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
            // ...
            close(sfd);
            continue;
        } else {
            success++;
            if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
                // ...
            }
            // ...
        }

        // 省略udp邏輯...
            if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base, NULL))) {
            // ...
        }

    // ...
}
複製代碼

註冊libevent事件循環事件

雖然memcached的main()函數一開始就建立了master的libevent事件循環,可是使用是很是靠後的,在worker線程都初始化完成後,才建立tcp服務器,併爲master的event_base註冊回調函數

master的event_handler回調函數以下:

void event_handler(const int fd, const short which, void *arg) {
    conn *c;

    c = (conn *)arg;
    assert(c != NULL);

    c->which = which;

    /* sanity */
    if (fd != c->sfd) {
        if (settings.verbose > 0)
            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
        conn_close(c);
        return;
    }

    drive_machine(c);

    /* wait for next event */
    return;
}
複製代碼

能夠看到這個event_handler裏面只是作了一些錯誤檢查,而後就當即交給drive_machine函數進行處理了

這個drive_machine主要是對c->state變量進行邏輯判斷處理,對於master的c變量的state屬性,它的值在memcached.c:653被設置了,值是memcached.c::6242行傳遞進去的,爲:conn_listening,drive_machine對應的邏輯以下

addrlen = sizeof(addr);
#ifdef HAVE_ACCEPT4
    if (use_accept4) {
        sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);
    } else {
        sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
    }
#else
    sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
#endif
    // 省略流程無關代碼...

#ifdef TLS
        // ssl相關邏輯省略...
#endif

        dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                             DATA_BUFFER_SIZE, c->transport, ssl_v);
    }

    stop = true;
複製代碼

能夠看到master線程的tcp服務器有新客戶端鏈接進入後,drive_machine處理邏輯會當即獲取這個memcached客戶端鏈接fd爲sfd,並交給了dispatch_conn_new函數進行處理

dispatch_conn_new函數核心邏輯以下

void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport, void *ssl) {
    CQ_ITEM *item = cqi_new();
    char buf[1];
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let's try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return ;
    }

    int tid = (last_thread + 1) % settings.num_threads;

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;
    item->mode = queue_new_conn;
    item->ssl = ssl;

    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    buf[0] = 'c';
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}
複製代碼

這裏面主要是

  1. 選擇一個worker線程用於處理這個客戶端鏈接後續的讀寫操做
  2. 聲明瞭一個CQ_ITEM結構體表明這個客戶端的鏈接,並保存到worker線程的new_conn_queue屬性上,這個屬性是個數組,而且被當成隊列使用了
  3. 向worker線程的notify_send_fd屬性發送一個字符'c'通知worker線程有新客戶端鏈接分配給它了

後續worker線程的event_base監聽到notify_receive_fd的可讀事件就開始工做了, 由於這裏使用pipe線程通訊技術,因此notify_send_fd寫數據,觸發notify_receive_fd的可讀事件,pipe線程通訊技術參考:blog.csdn.net/robertkun/a…

至此,memcached master線程,網絡相關的主流程就走完了,剩下的事情就交給worker進程處理了

libevent啓動時機

當客戶端向memcached服務器寫入輸入時,可使用clion進行斷點測試,好比在event_handler上進行斷點

conn_new()

memcached.c::conn_new()

註冊事件回調

event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
複製代碼

event_handler 就是設置的回調函數

worker處理流程

worker線程的回調在master裏就指定好了,參考:thread.c::setup_thread(),worker的回調函數被指定爲了thread_libevent_process,結合master的處理流程,這個worker的回調函數只有在master獲得新鏈接時被觸發

worker獲取客戶端鏈接

參考thread.c::thread_libevent_process()函數,核心代碼以下

item = cq_pop(me->new_conn_queue);

c = conn_new(item->sfd, item->init_state, item->event_flags,
                   item->read_buffer_size, item->transport,
                   me->base, item->ssl);
if (c == NULL) {
    // 省略錯誤處理...
} else {
    c->thread = me;
#ifdef TLS
    // 省略ssl邏輯...
#endif
}
複製代碼

worker調用cq_pop從隊列中獲取客戶端鏈接後,而後就交給了conn_new函數進行處理

conn_new函數把客戶端的讀寫事件交給了event_handler函數進行處理,核心代碼以下

event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
複製代碼

master也使用event_handler函數來處理master的事件,worker也使用event_handler處理事件,是不會衝突的,由於在event_handler裏的drive_machine執行體中,會經過這個事件參數conn結構體的state屬性的進行分支判斷, worker的conn的state從CQ_ITEM獲取,值爲conn_new_cmd

客戶端讀寫事件分發

客戶端往memcached服務器發送數據,觸發event_handler函數執行,此時state值爲conn_new_cmd 對應drive_machine核心代碼以下

while (!stop) {
    case conn_new_cmd:
        /* Only process nreqs at a time to avoid starving other connections */

        --nreqs;
        if (nreqs >= 0) {
            reset_cmd_handler(c);
        } else {
            // 異常處理邏輯...
        }
        break;
}
複製代碼

最上層是個無限循環,裏面確定有個出口,把請求交給了reset_cmd_handler()函數進行處理,核心代碼以下

static void reset_cmd_handler(conn *c) {
    // ...
    if (c->rbytes > 0) {
        conn_set_state(c, conn_parse_cmd);
    } else {
        conn_set_state(c, conn_waiting);
    }
}
複製代碼

若是還有數據能夠讀取,就設置state爲conn_parse_cmd狀態,不然,進入conn_waiting狀態

conn_parse_cmd

case conn_parse_cmd :
    if (c->try_read_command(c) == 0) {
        /* wee need more data! */
        conn_set_state(c, conn_waiting);
    }

    break;
複製代碼

進行數據讀取,而後將狀態設置爲conn_waiting, 這個try_read_command裏面就對客戶端的輸入進行了處理,並把結果返回了客戶端,參考: try_read_command_ascii

static int try_read_command_ascii(conn *c) {
    char *el, *cont;

    if (c->rbytes == 0)
        return 0;

    el = memchr(c->rcurr, '\n', c->rbytes);
    if (!el) {
        // 略過異常處理邏輯...
        return 0;
    }
    cont = el + 1;
    if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
        el--;
    }
    *el = '\0';

    assert(cont <= (c->rcurr + c->rbytes));

    c->last_cmd_time = current_time;
    process_command(c, c->rcurr);

    c->rbytes -= (cont - c->rcurr);
    c->rcurr = cont;

    assert(c->rcurr <= (c->rbuf + c->rsize));

    return 1;
}
複製代碼

調用process_command進行請求處理了,至此,memcached的網絡處理到這裏就結束了,參考memcached.c::process_command()

一些注意的點

本文基於memcached master分支寫成,參考:github.com/memcached/m…

memcached第一版沒有多線程,多線程的worker處理是在後續加上的,參考github上的1.2.0第一版:github.com/memcached/m…

pthread_create函數用來建立線程

核心共享結構體LIBEVENT_THREAD

memcached使用的是多線程模型,而不是多進程模型,這一點很是重要,這意味着memcached的master線程和worker線程能夠共享一些結構體變量,好比表明worker線程的LIBEVENT_THREAD結構體,源碼以下

typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
#ifdef EXTSTORE
    cache_t *io_cache;          /* IO objects */
    void *storage;              /* data object for storage system */
#endif
    logger *l;                  /* logger buffer */
    void *lru_bump_buf;         /* async LRU bump buffer */
#ifdef TLS
    char   *ssl_wbuf;
#endif

} LIBEVENT_THREAD;
複製代碼

memcached就是master經過往共享的notify_send_fd字段上寫入數據,觸發worker線程處理新的客戶端鏈接fd的

master線程獲取客戶端鏈接fd,經過隊列傳給worker進行處理

thread.c::dispatch_conn_new() -> cq_push(thread->new_conn_queue, item) -> write(thread->notify_send_fd, buf, 1)

write(thread->notify_send_fd, buf, 1)就是觸發worker線程的event_base,notify_send_fd的可讀事件

memcached客戶端鏈接狀態枚舉

enum conn_states {
    conn_listening,  /**< the socket which listens for connections */
    conn_new_cmd,    /**< Prepare connection for next command */
    conn_waiting,    /**< waiting for a readable socket */
    conn_read,       /**< reading in a command line */
    conn_parse_cmd,  /**< try to parse a command from the input buffer */
    conn_write,      /**< writing out a simple response */
    conn_nread,      /**< reading in a fixed number of bytes */
    conn_swallow,    /**< swallowing unnecessary bytes w/o storing */
    conn_closing,    /**< closing this connection */
    conn_mwrite,     /**< writing out many items sequentially */
    conn_closed,     /**< connection is closed */
    conn_watch,      /**< held by the logger thread as a watcher */
    conn_max_state   /**< Max state value (used for assertion) */
};
複製代碼

CQ_ITEM的state屬性的值在memcached.c::5725被指定成了conn_new_cmd

master創建的tcp服務器,接收的memcached客戶端鏈接均是fd,注意區分fd表明的角色

參考資料

  1. blog.csdn.net/ta893115871…
  2. www.cnblogs.com/gqtcgq/p/72…
相關文章
相關標籤/搜索