前面分析了worker線程的初始化,以及主線程建立socket並監聽的過程。本節會分析鏈接如何創建與分發。數組
A,能夠摸清楚master線程的大體邏輯:網絡
1)初始化各個worker線程socket
2)執行socket,bind,listen...主線程進行監聽tcp
3)一旦有新的鏈接創建,則調用event_handlermemcached
B,woker線程被建立以後的邏輯:函數
1)監聽管道recv端的fd,一旦有數據過來,則調用thread_libevent_processoop
注意,worker線程其實也是利用event_base_loop將本身進行阻塞。主線程阻塞在監聽的fd上,而worker線程則阻塞在監聽管道recv端的fd上。回憶一下前文,memcached_thread_init函數中創建了管道,用於master線程和worker線程間的通訊。只有master線程接受了新的請求以後,纔會利用管道告知worker線程,而worker線程只有等管道有數據傳輸來的時候,纔會被喚醒。this
用圖展示初始狀態下的master和worker線程:spa
圖中假設主線程socket函數返回的fd是26。能夠看到master線程以及4條worker線程,都由於沒有任何事件而處於阻塞狀態。線程
另外,前文提到一個很重要的結構體conn,conns數組由conn指針組成。memcahed的每一個鏈接都對應有一個conn實例,能夠根據fd在conns數組裏找到。因爲master線程的套接口是26,因此conns[26]的指向的conn就表示master線程正監聽的socket以及一些附加信息。
咱們來模擬一下有鏈接過來時,memcached內部的執行。前文提到,一旦有鏈接過來,則master線程會被觸發執行event_handler函數。
void event_handler(const int fd, const short which, void *arg) { conn *c; c = (conn *)arg; assert(c != NULL); c->which = which; /* sanity */ if (fd != c->sfd) { if (settings.verbose > 0) fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n"); conn_close(c); return; } // 主要邏輯所有封裝在drive_machine中 drive_machine(c); /* wait for next event */ return; }
event_handler中傳入的conn,就是conns[26]。
在event_handler裏會繼續將這個conn傳遞給drive_machine。
static void drive_machine(conn *c) { bool stop = false; int sfd; socklen_t addrlen; struct sockaddr_storage addr; int nreqs = settings.reqs_per_event; int res; const char *str; #ifdef HAVE_ACCEPT4 static int use_accept4 = 1; #else static int use_accept4 = 0; #endif assert(c != NULL); // 一個大的while循環,維護了一個狀態機,根據conn的當前狀態作出處理,跳到下一狀態 while (!stop) { switch(c->state) { // 初始狀態爲conn_listening case conn_listening: addrlen = sizeof(addr); // accept鏈接,會產生一個新的fd,該鏈接以後的讀寫均經過新fd完成 #ifdef HAVE_ACCEPT4 if (use_accept4) { sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK); } else { sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen); } #else sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen); #endif // accept失敗 if (sfd == -1) { // 若是是accept4未被實現,則換用accept繼續嘗試接受鏈接 if (use_accept4 && errno == ENOSYS) { use_accept4 = 0; continue; } perror(use_accept4 ? "accept4()" : "accept()"); // 若是鏈接隊列已經沒有未處理的鏈接,則終止循環 if (errno == EAGAIN || errno == EWOULDBLOCK) { /* these are transient, so don't log anything */ stop = true; } // 鏈接打滿,accept_new_conns(false)會終止event繼續觸發 else if (errno == EMFILE) { if (settings.verbose > 0) fprintf(stderr, "Too many open connections\n"); accept_new_conns(false); stop = true; } else { perror("accept()"); stop = true; } break; } // 設置新的套接字爲非阻塞 if (!use_accept4) { if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) { perror("setting O_NONBLOCK"); close(sfd); break; } } if (settings.maxconns_fast && stats_state.curr_conns + stats_state.reserved_fds >= settings.maxconns - 1) { str = "ERROR Too many open connections\r\n"; res = write(sfd, str, strlen(str)); close(sfd); STATS_LOCK(); stats.rejected_conns++; STATS_UNLOCK(); } else { // !!!分發並通知worker線程有一個新的鏈接 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, c->transport); } stop = true; break; case conn_waiting: ... case conn_read: ... case conn_parse_cmd : ... case conn_new_cmd: ... case conn_nread: ... case conn_swallow: ... case conn_write: ... case conn_mwrite: ... case conn_closing: ... case conn_closed: ... case conn_watch: ... case conn_max_state: ... } } return; }
前文曾提到drive_machine是一個大的狀態機。上面的代碼只保留了對conn_listening的處理,由於master線程接受新鏈接時,就是這個狀態。
代碼裏調用accept4或者accept函數產生新的fd,爲27。accept以後,主要就是調用dispatch_conn_new來對鏈接作分發,而且通知worker線程。
咱們來看dispatch_conn_new的實現:
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport) { // CQ_ITEM用於封裝新鏈接的一些信息 CQ_ITEM *item = cqi_new(); char buf[1]; if (item == NULL) { close(sfd); /* given that malloc failed this may also fail, but let's try */ fprintf(stderr, "Failed to allocate memory for connection object\n"); return ; } // 挑選worker線程,採用輪循機制 int tid = (last_thread + 1) % settings.num_threads; LIBEVENT_THREAD *thread = threads + tid; last_thread = tid; // 設置item item->sfd = sfd; item->init_state = init_state; item->event_flags = event_flags; item->read_buffer_size = read_buffer_size; item->transport = transport; // 將item放入線程的new_conn_queue隊列 cq_push(thread->new_conn_queue, item); MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id); // 經過管道,寫入一字節的c,用來達到通知子線程的目的 buf[0] = 'c'; if (write(thread->notify_send_fd, buf, 1) != 1) { perror("Writing to thread notify pipe"); } }
能夠很明顯的看到,worker線程的分發是採用輪循機制的,每次選出來的都是threads數組中的下一個。
CQ_ITEM結構體用於封裝新accept的鏈接的一些相關信息,每一個線程內部都維護着一個CQ_ITEM隊列。當主線程經過管道寫入字符c以後,子線程會被通知到,有一個新的鏈接來了。因而,子線程隨後當即從CQ_ITEM隊列中取出CQ_ITEM,並對這個新的鏈接設置監聽事件等等。子線程具體的實現,後面會分析到,這裏先看下CQ_ITEM:
typedef struct conn_queue_item CQ_ITEM; struct conn_queue_item { int sfd; // accept產生的新fd enum conn_states init_state; // 子線程拿到新的鏈接以後,鏈接對應的狀態 int event_flags; // 子線程對新鏈接設置的事件 int read_buffer_size; // 通常2M enum network_transport transport; // tcp or udp conn *c; CQ_ITEM *next; };
好,至此咱們已經看完了主線程所作的工做。
只要不斷的有新鏈接進來,主線程就會不斷調用event_handler,並在drive_machine狀態機中accept & dispatch鏈接。至於接下來,接收客戶端發來的命令並作出響應等等,都是在worker線程裏完成。
這張圖畫出了子線程中的CQ_ITEM隊列,以及主線程經過管道告知子線程。
注意worker1線的鏈接,fd分別是27,31,35...由於中間其餘fd對應的鏈接會分別被worker2,worker3,worker4處理。
另外借用一張其餘博客的圖,也挺清楚的:
本小節開始看worker線程獲取通知以後,所作的一些處理。前文說到,marster線程會向管道寫入一個字符'c',用來告知worker線程有新的鏈接了。因而worker線程監聽管道的事件被觸發,worker線程會進入thread_libevent_process函數:
static void thread_libevent_process(int fd, short which, void *arg) { LIBEVENT_THREAD *me = arg; CQ_ITEM *item; char buf[1]; unsigned int timeout_fd; // 從管道里讀取一個字節 if (read(fd, buf, 1) != 1) { if (settings.verbose > 0) fprintf(stderr, "Can't read from libevent pipe\n"); return; } switch (buf[0]) { // 字符c case 'c': // item是從new_conn_queue隊列中取出的一個CQ_ITEM對象 item = cq_pop(me->new_conn_queue); if (NULL != item) { // 調用conn_new來建立conn,而且設置監聽事件 conn *c = conn_new(item->sfd, item->init_state, item->event_flags, item->read_buffer_size, item->transport, me->base); if (c == NULL) { if (IS_UDP(item->transport)) { fprintf(stderr, "Can't listen for events on UDP socket\n"); exit(1); } else { if (settings.verbose > 0) { fprintf(stderr, "Can't listen for events on fd %d\n", item->sfd); } close(item->sfd); } } else { c->thread = me; } // 釋放item cqi_free(item); } break; case 'r': ... case 'p': ... case 't': ... } }
注意conn_new函數,conn_new在前文出現過(server_socket函數中用conn_new建立了conns[26])。本例中,conn_new執行完以後,會新建立一個conn對象,而且將conns[27]指向它,今後之後,線程worker1就利用該conn來與客戶端交互。
值得一提的是,新的conn中,state被置爲conn_new_cmd,代表該鏈接已經創建,等待接受client發送的命令。而主線程所使用的conns[26],state永遠爲conn_listening,代表主線程一直在等待新的鏈接。
在conn_new中,設置事件的語句爲:
event_set(&c->event, sfd, event_flags, event_handler, (void *)c); event_base_set(base, &c->event);
可見,子線程的事件被觸發以後,也是調用event_handler函數,和主線程的事件觸發的函數同樣。
回憶下event_handler中的狀態機,worker線程的conn初始狀態爲conn_new_cmd,因此一旦worker線程接受到client的命令,便會進入drive_machine中的case conn_new_cmd分支。固然這涉及到後續具體的命令處理,已經不在本文的探討範疇了。
最後來看一下worker線程thread_libevent_process處理完畢以後的狀態: