Memcached的網絡模型是基於Libevent網絡庫開發的,同時Memcached採用多線程的工做方式,工做線程和主線程之間採用pipe進行通訊。Memcached的網絡線程模型主要涉及兩個主要文件:memcached.c 和thread.c文件。redis
Memcached的網絡模型流程大體以下:網絡
一、memcached會在main函數中建立主線程的event_base,將監聽端口的socket註冊到主線程的event_base,由主線程來監聽和接受客戶端鏈接。數據結構
二、main函數建立主線程的同時,也會建立N個工做線程,每一個工做線程都擁有各自的event_base 和LIBEVENT_THREAD數據結構來存儲線程的信息(線程基本信息、線程隊列、pipe文件描述符)。工做線程會將pipe管道的接收端 fd 註冊到本身的event_base。多線程
三、當有新鏈接創建時,主線程會經過accept 函數來與客戶端創建新鏈接,同時將新鏈接相關的信息填入CQ_ITEM結構並放入工做線程的conn_queue隊列,同時向選定的工做線程的管道寫入字符,以此觸發工做線程的libevent事件。框架
四、主線程是經過求餘數的方式來選擇線程池中的一個工做線程,工做線程獲得通知後,會從conn_queue隊列中取出CQ_ITEM,並將fd註冊到工做線程的Libevent實例上,從而由工做線程來處理該鏈接的全部後續事件。socket
總體框架圖:tcp
主線程的主要工做就是監聽端口和初始化工做線程。下面代碼值列出一部分相關內容。memcached
int main (int argc, char **argv) { //這個方法主要用來建立工做線程 memcached_thread_init(settings.num_threads, NULL); errno = 0; if (settings.port && server_sockets(settings.port,tcp_transport,portnumber_file)) { vperror("failed to listen on TCP port %d", settings.port); exit(EX_OSERR); } /* enter the event loop */ //這邊開始進行主線程的事件循環 if (event_base_loop(main_base, 0) != 0) { retval = EXIT_FAILURE; } }
memcached會經過memcached_thread_init 方法來建立工做線程。函數
void memcached_thread_init(int nthreads, void *arg) { //......省略部分代碼 for (i = 0; i < nthreads; i++) { int fds[2]; //這邊會建立pipe,主要用於主線程和工做線程之間的通訊 if (pipe(fds)) { perror("Can't create notify pipe"); exit(1); } // threads是每一個線程都擁有的基本結構:LIBEVENT_THREAD threads[i].notify_receive_fd = fds[0]; threads[i].notify_send_fd = fds[1]; //這個方法很是重要,主要是建立每一個線程本身的libevent的event_base //監聽本身的通訊管道接收端,同時初始化工做隊列 setup_thread(&threads[i]); /* Reserve three fds for the libevent base, and two for the pipe */ stats_state.reserved_fds += 5; } /* Create threads after we've done all the libevent setup. */ //這裏是循環建立線程 //線程建立的回調函數是worker_libevent for (i = 0; i < nthreads; i++) { create_worker(worker_libevent, &threads[i]); } /* Wait for all the threads to set themselves up before returning. */ pthread_mutex_lock(&init_lock); wait_for_thread_registration(nthreads); pthread_mutex_unlock(&init_lock); }
setup_thread 方法建立線程本身的event_base,工做線程在初始化時會將pipe的寫事件註冊到event_base,其寫事件回調函數爲 thread_libevent_process。當主線程接受到客戶端鏈接時,向工做線程的pipe寫字符,就會觸發工做線程的thread_libevent_process 回調函數。oop
static void setup_thread(LIBEVENT_THREAD *me) { //.......省略部分代碼 //每一個獨立的線程都應該有本身獨立的event_base me->base = event_init(); if (! me->base) { fprintf(stderr, "Can't allocate event base\n"); exit(1); } /* Listen for notifications from other threads */ //這邊很是重要,這邊主要建立pipe的讀事件EV_READ的監聽 //當pipe中有寫入事件的時候,libevent就會回調thread_libevent_process方法 event_set(&me->notify_event, me->notify_receive_fd, EV_READ | EV_PERSIST, thread_libevent_process, me); event_base_set(me->base, &me->notify_event); //添加事件操做 if (event_add(&me->notify_event, 0) == -1) { fprintf(stderr, "Can't monitor libevent notify pipe\n"); exit(1); } //初始化一個工做隊列 me->new_conn_queue = malloc(sizeof(struct conn_queue)); if (me->new_conn_queue == NULL) { perror("Failed to allocate memory for connection queue"); exit(EXIT_FAILURE); } cq_init(me->new_conn_queue); //初始化線程鎖 if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) { perror("Failed to initialize mutex"); exit(EXIT_FAILURE); } me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*), NULL, NULL); if (me->suffix_cache == NULL) { fprintf(stderr, "Failed to create suffix cache\n"); exit(EXIT_FAILURE); } }
以上是工做線程建立時初始化event_base的部分,真正建立線程的方法是 memcached_thread_init 中的create_work方法。
//真正建立工做線程 static void create_worker(void *(*func)(void *), void *arg) { pthread_attr_t attr; int ret; pthread_attr_init(&attr); if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) { fprintf(stderr, "Can't create thread: %s\n", strerror(ret)); exit(1); } }
create_worker方法在建立線程時,指定了線程的運行函數爲worker_libevent,工做線程的運行函數其實就是進入事件循環,等待監聽的事件觸發。
//工做線程運行函數 static void *worker_libevent(void *arg) { //......省略部分代碼 register_thread_initialized(); //這個方法主要是開啓事件的循環 //每一個線程中都會有本身獨立的event_base和事件的循環機制 //memcache的每一個工做線程都會獨立處理本身接管的鏈接 event_base_loop(me->base, 0); //銷燬event_base event_base_free(me->base); return NULL; }
到目前爲止,每一個工做線程的初始化工做已經完成,每一個工做線程只監聽了pipe的寫事件,其回調函數爲thread_libevent_process。
//管道有數據寫入時的回調函數 static void thread_libevent_process(int fd, short which, void *arg) { LIBEVENT_THREAD *me = arg; CQ_ITEM *item; char buf[1]; conn *c; unsigned int timeout_fd; //回調函數中回去讀取pipe中的信息 //主線程中若是有新的鏈接,會向其中一個線程的pipe中寫入1 //這邊讀取pipe中的數據,若是爲1,則說明從pipe中獲取的數據是正確的 if (read(fd, buf, 1) != 1) { if (settings.verbose > 0) fprintf(stderr, "Can't read from libevent pipe\n"); return; } switch (buf[0]) { case 'c': //從工做線程的隊列中獲取一個CQ_ITEM鏈接信息 item = cq_pop(me->new_conn_queue); //若是item不爲空,則須要進行鏈接的接管 if (NULL == item) { break; } switch (item->mode) { case queue_new_conn: //conn_new這個方法很是重要,主要是建立socket的讀寫等監聽事件。 //init_state 爲初始化的類型,主要在drive_machine中經過這個狀態類判斷處理類型 c = conn_new(item->sfd, item->init_state, item->event_flags, item->read_buffer_size, item->transport, me->base); if (c == NULL) { if (IS_UDP(item->transport)) { fprintf(stderr, "Can't listen for events on UDP socket\n"); exit(1); } else { if (settings.verbose > 0) { fprintf(stderr, "Can't listen for events on fd %d\n", item->sfd); } close(item->sfd); } } else { c->thread = me; } break; case queue_redispatch: conn_worker_readd(item->c); break; } cqi_free(item); break; /* we were told to pause and report in */ case 'p': register_thread_initialized(); break; /* a client socket timed out */ case 't': if (read(fd, &timeout_fd, sizeof(timeout_fd)) != sizeof(timeout_fd)) { if (settings.verbose > 0) fprintf(stderr, "Can't read timeout fd from libevent pipe\n"); return; } conn_close_idle(conns[timeout_fd]); break; } }
在新鏈接到來時,會調用conn_new 函數,監聽新鏈接的讀寫事件。而且讀寫事件的回調函數爲event_handler,event_handler方法的核心是 drive_machine,在這個函數中,memcached會根據鏈接的不一樣狀態來進行不一樣的操做。
//主線程主要是監聽用戶的socket鏈接事件;工做線程主要監聽socket的讀寫事件 //當用戶socket的鏈接有數據傳遞過來的時候,就會調用event_handler這個回調函數 conn *conn_new(){ //......省略部分代碼 event_set(&c->event, sfd, event_flags, event_handler, (void *)c); event_base_set(base, &c->event); }
static void drive_machine(conn *c) { //......省略部分代碼 while (!stop) { //這邊經過state來處理不一樣類型的事件 switch(c->state) { //這邊主要處理tcp鏈接,只有在主線程的下,纔會執行listening監聽操做 //監聽狀態 case conn_listening: //...... //等待狀態,等待客戶端的數據報文到來 case conn_waiting: //...... //讀取事件 //例若有用戶提交數據過來的時候,工做線程監聽到事件後,最終會調用這塊代碼 //讀取數據的事件,當客戶端有數據報文上傳的時候,就會觸發libevent的讀事件 case conn_read: //...... } } return; }
drive_machine方法也是主線程event_base 回調函數的核心,主線程的socket是經過main函數中server_sockets方法建立的,而server_sockets中主要調用了server_socket這個方法,咱們能夠看下server_socket這個方法:
static int server_socket(const char *interface, int port, enum network_transport transport,FILE *portnumber_file) { //建立一個新的事件 //咱們發現上面的工做線程也是調用這個方法,可是區別是這個方法指定了state的類型爲:conn_listening //注意這邊有一個conn_listening,這個參數主要是指定調用drive_machine這個方法中的conn_listen代碼塊。 if (!(listen_conn_add = conn_new(sfd, conn_listening,EV_READ | EV_PERSIST, 1,transport, main_base))) { fprintf(stderr, "failed to create listening connection\n"); exit(EXIT_FAILURE); } listen_conn_add->next = listen_conn; listen_conn = listen_conn_add; }
conn_new 方法已經介紹過了,該方法最終會進入drive_machine方法,而且鏈接狀態爲 conn_listening。memcached在 conn_listening的狀態時,會調用dispath_conn_new來將新鏈接的相關信息push到工做線程的隊列中。
case conn_listening: addrlen = sizeof(addr); sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen); if (sfd == -1) { if (use_accept4 && errno == ENOSYS) { use_accept4 = 0; continue; } perror(use_accept4 ? "accept4()" : "accept()"); if (errno == EAGAIN || errno == EWOULDBLOCK) { /* these are transient, so don't log anything */ stop = true; } else if (errno == EMFILE) { if (settings.verbose > 0) fprintf(stderr, "Too many open connections\n"); accept_new_conns(false); stop = true; } else { perror("accept()"); stop = true; } break; } if (!use_accept4) { if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) { perror("setting O_NONBLOCK"); close(sfd); break; } } if (settings.maxconns_fast && stats_state.curr_conns + stats_state.reserved_fds >= settings.maxconns - 1) { str = "ERROR Too many open connections\r\n"; res = write(sfd, str, strlen(str)); close(sfd); STATS_LOCK(); stats.rejected_conns++; STATS_UNLOCK(); } else { //若是客戶端用socket鏈接上來,則會調用這個分發邏輯的函數 //這個函數會將鏈接信息分發到某一個工做線程中,而後工做線程接管具體的讀寫操做 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, c->transport); } stop = true; break;
dispath_conn_new 方法其實就是申請CQ_ITEM結構來保存鏈接信息,並將該結構PUSH到選定線程的隊列中,同時向該線程的pipe寫入字符,觸發該工做線程的libevent網絡時間,從源碼也能夠發現,memcached選擇工做線程的方式是經過取餘數來實現的。
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport) { //每一個鏈接連上來的時候,都會申請一塊CQ_ITEM的內存塊,用於存儲鏈接的基本信息 CQ_ITEM *item = cqi_new(); char buf[1]; //若是item建立失敗,則關閉鏈接 if (item == NULL) { close(sfd); /* given that malloc failed this may also fail, but let's try */ fprintf(stderr, "Failed to allocate memory for connection object\n"); return ; } //這個方法很是重要。主要是經過求餘數的方法來獲得當前的鏈接須要哪一個線程來接管 //並且last_thread會記錄每次最後一次使用的工做線程,每次記錄以後就可讓工做線程進入一個輪詢,保證了每一個工做線程處理的鏈接數的平衡 int tid = (last_thread + 1) % settings.num_threads; //獲取線程的基本結構 LIBEVENT_THREAD *thread = threads + tid; last_thread = tid; item->sfd = sfd; item->init_state = init_state; item->event_flags = event_flags; item->read_buffer_size = read_buffer_size; item->transport = transport; item->mode = queue_new_conn; //向工做線程的隊列中放入CQ_ITEM cq_push(thread->new_conn_queue, item); MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id); buf[0] = 'c'; //向工做線程的pipe中寫入1 //工做線程監聽到pipe中有寫入數據,工做線程接收到通知後,就會向thread->new_conn_queue隊列中pop出一個item,而後進行鏈接的接管操做 if (write(thread->notify_send_fd, buf, 1) != 1) { perror("Writing to thread notify pipe"); } }
如下是儲存鏈接信息的CQ_ITEM結構以及每一個線程的處理隊列結構。處理隊列結構其實是鏈表實現的。
//儲存鏈接信息的CQ_ITEM結構 typedef struct conn_queue_item CQ_ITEM; struct conn_queue_item { int sfd; //socket的fd enum conn_states init_state; //事件類型 int event_flags; //libevent的flags int read_buffer_size; //讀取的buffer的size enum network_transport transport; CQ_ITEM *next; //下一個item的地址 }; //每一個線程的處理隊列結構。 typedef struct conn_queue CQ; struct conn_queue { CQ_ITEM *head; CQ_ITEM *tail; pthread_mutex_t lock; }