從main函數開始,位於memcached.c
1 int main (int argc, char **argv) { 2 //....................... 3 //....................... 4 //....................... 5 6 7 /* handle SIGINT 註冊信號處理函數,目前sig_handler是空函數*/ 8 signal(SIGINT, sig_handler); 9 10 settings_init(); 11 /* init settings 12 初始化默認設置,其中 13 settings.port = 11211; 默認端口 14 settings.maxbytes = 64 * 1024 * 1024; 默認使用64MB內存 15 settings.num_threads = 4; 默認啓動4個工做線程 16 settings.item_size_max = 1024 * 1024; 默認每對 key value的value最大1MB 17 */ 18 19 20 //....................... 21 //....................... 22 //....................... 23 24 25 /* 初始化主線程的libevent實例 */ 26 main_base = event_init(); 27 28 29 //....................... 30 //....................... 31 //....................... 32 33 34 /* 啓動工做線程 */ 35 thread_init(settings.num_threads, main_base); 36 /* 37 // 主線程是分配線程,分配工做給工做線程 38 dispatcher_thread.base = main_base; 39 dispatcher_thread.thread_id = pthread_self(); 40 //設置工做線程的屬性以及它們各自的libevent實例初始化 41 for (i = 0; i < nthreads; i++) { 42 int fds[2]; 43 pipe(fds) 44 threads[i].notify_receive_fd = fds[0]; 45 threads[i].notify_send_fd = fds[1]; 46 setup_thread(&threads[i]); 47 } 48 //啓動線程,線程處理函數爲worker_libevent, 每一個線程有各自的event_base_loop 49 for (i = 0; i < nthreads; i++) { 50 create_worker(worker_libevent, &threads[i]); 51 } 52 */ 53 54 55 //....................... 56 //....................... 57 //....................... 58 59 60 /* 建立服務端的socket */ 61 server_sockets(settings.port, tcp_transport, portnumber_file)) 62 63 //....................... 64 //....................... 65 //....................... 66 67 68 /* 主線程的libevent消息循環 */ 69 if (event_base_loop(main_base, 0) != 0) { 70 retval = EXIT_FAILURE; 71 } 72 73 74 //....................... 75 //....................... 76 //....................... 77 78 79 return retval; 80 }
主線程
1 server_sockets(settings.port, tcp_transport, portnumber_file) 2 ---> 3 server_socket(settings.inter, port, transport, portnumber_file); 4 ---> 5 listen(sfd, settings.backlog) 6 listen_conn_add = conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1, transport, main_base) 7 ---> 8 //設置sfd的消息響應函數event_handler 9 event_set(&c->event, sfd, event_flags, event_handler, (void *)c); 10 event_base_set(base, &c->event); 11 ---> 12 void event_handler(const int fd, const short which, void *arg) { 13 conn *c; 14 c = (conn *)arg; 15 16 //....................... 17 //....................... 18 //....................... 19 20 drive_machine(c); 21 22 return; 23 } 24 ---> 25 //drive_machine 26 sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen) 27 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, 28 DATA_BUFFER_SIZE, tcp_transport); 29 30 ---> 31 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, 32 int read_buffer_size, enum network_transport transport) { 33 char buf[1]; 34 int tid = (last_thread + 1) % settings.num_threads; 35 36 LIBEVENT_THREAD *thread = threads + tid; 37 38 //往其中一個worker線程的管道寫,分配這個連接給worker 39 buf[0] = 'c'; 40 if (write(thread->notify_send_fd, buf, 1) != 1) { 41 perror("Writing to thread notify pipe"); 42 } 43 }
函數補充:
1 static void setup_thread(LIBEVENT_THREAD *me) { 2 me->base = event_init(); 3 if (! me->base) { 4 fprintf(stderr, "Can't allocate event base\n"); 5 exit(1); 6 } 7 8 // 把管道的讀事件註冊到libevent對象中 9 // 分配線程會往工做線程的 notify_send_fd 寫數據,而後觸發工做線程的 notify_receive_fd 的讀事件 10 event_set(&me->notify_event, me->notify_receive_fd, 11 EV_READ | EV_PERSIST, thread_libevent_process, me); 12 event_base_set(me->base, &me->notify_event); 13 14 if (event_add(&me->notify_event, 0) == -1) { 15 fprintf(stderr, "Can't monitor libevent notify pipe\n"); 16 exit(1); 17 } 18 19 //....................... 20 //....................... 21 //....................... 22 }
工作線程
thread_libevent_process -> conn_new -> event_set(&c->event, sfd, event_flags, event_handler, (void *)c);event_base_set(base, &c->event);
->event_handler -> drive_machine
工作線程的消息libevent事件處理就由drive_machine負責
嘗試去理解它的工作流程
drive_machine
-->
case conn_read:
case READ_DATA_RECEIVED:
conn_set_state(c, conn_parse_cmd);
-->
case conn_parse_cmd :
if (try_read_command(c) == 0) {
-->
static int try_read_command(conn *c) {
//..............
if (c->protocol == binary_prot) {
}
else {
//............
process_command(c, c->rcurr);
}
-->
static void process_command(conn *c, char *command) {
//假設是set
} else if ((ntokens == 6 || ntokens == 7) &&
((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
process_update_command(c, tokens, ntokens, comm, false);
-->
process_update_command(c, tokens, ntokens, comm, false);
{
//...........
//其實我不懂爲何要分配一個出來
it = item_alloc(key, nkey, flags, realtime(exptime), vlen);
//................
conn_set_state(c, conn_nread);
}
-->
//drive_machine
case conn_nread:
if (c->rlbytes == 0) {
complete_nread(c);
-->
complete_nread_ascii(c);
{
ret = store_item(it, comm, c);
}
不知道理解得對不對
-----------------------------------------------------------------------------------------------------------------
用戶連接memcached成功後,主要邏輯在drive_machine這個函數
我嘗試打日誌,理解它的流程
A:表示memcached B:表示用戶
A: ./memcached -m 512 -p 14444 -vv
B: telnet 127.0.0.1 14444
A: ############################## state = conn_listening
B: add id 1 0 4
A:
############################## state = conn_new_cmd
############################## state = conn_waiting
############################## state = conn_read
############################## state = conn_parse_cmd
32: Client using the ascii protocol
<32 add id 1 0 4
############################## state = conn_nread
B: 1234
A:
############################## state = conn_nread
############################## state = conn_nread
>32 STORED
############################## state = conn_write
############################## state = conn_mwrite
############################## state = conn_write
############################## state = conn_mwrite
############################## state = conn_new_cmd
############################## state = conn_waiting
B: 收到
STORED
B: get id
A:
############################## state = conn_read
############################## state = conn_parse_cmd
<32 get id
>32 sending key id
>32 END
############################## state = conn_mwrite
############################## state = conn_mwrite
############################## state = conn_new_cmd
############################## state = conn_waiting
B: 收到
VALUE id 1 4
1234
END
B: quit
A:
############################## state = conn_read
############################## state = conn_parse_cmd
<32 quit
############################## state = conn_closing
<32 connection closed.