memcached源碼閱讀筆記一

 

從main函數開始,位於memcached.capp

 

 1 int main (int argc, char **argv) {
 2     //.......................
 3     //.......................
 4     //.......................
 5     
 6     
 7     /* handle SIGINT 註冊信號處理函數,目前sig_handler是空函數*/
 8     signal(SIGINT, sig_handler);
 9     
10     settings_init();
11     /* init settings
12          初始化默認設置,其中
13          settings.port = 11211;                     默認端口
14          settings.maxbytes = 64 * 1024 * 1024;      默認使用64MB內存
15          settings.num_threads = 4;                  默認啓動4個工做線程
16          settings.item_size_max = 1024 * 1024;      默認每對 key value的value最大1MB
17      */
18     
19     
20     //.......................
21     //.......................
22     //.......................
23     
24     
25     /* 初始化主線程的libevent實例 */
26     main_base = event_init();
27     
28     
29     //.......................
30     //.......................
31     //.......................
32     
33     
34     /* 啓動工做線程 */
35     thread_init(settings.num_threads, main_base);
36     /*
37          // 主線程是分配線程,分配工做給工做線程
38          dispatcher_thread.base = main_base;
39          dispatcher_thread.thread_id = pthread_self();
40          //設置工做線程的屬性以及它們各自的libevent實例初始化
41          for (i = 0; i < nthreads; i++) {
42          int fds[2];
43          pipe(fds)
44          threads[i].notify_receive_fd = fds[0];
45          threads[i].notify_send_fd = fds[1];
46          setup_thread(&threads[i]);
47          }
48          //啓動線程,線程處理函數爲worker_libevent, 每一個線程有各自的event_base_loop
49          for (i = 0; i < nthreads; i++) {
50          create_worker(worker_libevent, &threads[i]);
51          }
52     */
53     
54     
55     //.......................
56     //.......................
57     //.......................
58     
59     
60     /* 建立服務端的socket */
61     server_sockets(settings.port, tcp_transport, portnumber_file))
62     
63     //.......................
64     //.......................
65     //.......................
66     
67     
68     /* 主線程的libevent消息循環 */
69     if (event_base_loop(main_base, 0) != 0) {
70         retval = EXIT_FAILURE;
71     }
72     
73     
74     //.......................
75     //.......................
76     //.......................
77     
78     
79     return retval;
80 }

 

主線程socket

 1 server_sockets(settings.port, tcp_transport, portnumber_file)
 2 --->
 3 server_socket(settings.inter, port, transport, portnumber_file);
 4 --->
 5 listen(sfd, settings.backlog)
 6 listen_conn_add = conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1, transport, main_base)
 7 --->    
 8 //設置sfd的消息響應函數event_handler
 9 event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
10 event_base_set(base, &c->event);
11 --->
12 void event_handler(const int fd, const short which, void *arg) {
13     conn *c;
14     c = (conn *)arg;
15     
16     //.......................
17     //.......................
18     //.......................
19     
20     drive_machine(c);
21     
22     return;
23 }
24 --->
25 //drive_machine
26 sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)
27 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
28                   DATA_BUFFER_SIZE, tcp_transport);
29 
30 --->
31 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
32                        int read_buffer_size, enum network_transport transport) {
33     char buf[1];
34     int tid = (last_thread + 1) % settings.num_threads;
35     
36     LIBEVENT_THREAD *thread = threads + tid;
37     
38     //往其中一個worker線程的管道寫,分配這個連接給worker
39     buf[0] = 'c';
40     if (write(thread->notify_send_fd, buf, 1) != 1) {
41         perror("Writing to thread notify pipe");
42     }
43 }

 

 

函數補充:tcp

 1 static void setup_thread(LIBEVENT_THREAD *me) {
 2     me->base = event_init();
 3     if (! me->base) {
 4         fprintf(stderr, "Can't allocate event base\n");
 5         exit(1);
 6     }
 7     
 8     // 把管道的讀事件註冊到libevent對象中
 9     // 分配線程會往工做線程的 notify_send_fd 寫數據,而後觸發工做線程的 notify_receive_fd 的讀事件
10     event_set(&me->notify_event, me->notify_receive_fd,
11               EV_READ | EV_PERSIST, thread_libevent_process, me);
12     event_base_set(me->base, &me->notify_event);
13     
14     if (event_add(&me->notify_event, 0) == -1) {
15         fprintf(stderr, "Can't monitor libevent notify pipe\n");
16         exit(1);
17     }
18     
19     //.......................
20     //.......................
21     //.......................
22 }

 

工做線程memcached

thread_libevent_process -> conn_new -> event_set(&c->event, sfd, event_flags, event_handler, (void *)c);event_base_set(base, &c->event);函數

->event_handler -> drive_machineoop

工做線程的消息libevent事件處理就由drive_machine負責ui

 

嘗試去理解它的工做流程spa

drive_machine線程

-->3d

case conn_read:

    case READ_DATA_RECEIVED:
    conn_set_state(c, conn_parse_cmd);

-->

case conn_parse_cmd :
    if (try_read_command(c) == 0) {

-->

static int try_read_command(conn *c) {

    //..............

    if (c->protocol == binary_prot) {

    }

    else {

        //............

        process_command(c, c->rcurr);

    }

-->

static void process_command(conn *c, char *command) { 

    //假設是set    

  } else if ((ntokens == 6 || ntokens == 7) &&
  ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
  (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
  (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
  (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
  (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {

    process_update_command(c, tokens, ntokens, comm, false);

-->

process_update_command(c, tokens, ntokens, comm, false);
{

   //...........
  //其實我不懂爲何要分配一個出來
  it = item_alloc(key, nkey, flags, realtime(exptime), vlen);
  //................
  conn_set_state(c, conn_nread);
}

-->

//drive_machine
case conn_nread:
  if (c->rlbytes == 0) {
    complete_nread(c);

-->

complete_nread_ascii(c);
{
  ret = store_item(it, comm, c);
}

 

不知道理解得對不對

 

 

-----------------------------------------------------------------------------------------------------------------

用戶鏈接memcached成功後,主要邏輯在drive_machine這個函數

我嘗試打日誌,理解它的流程

A:表示memcached  B:表示用戶

A:   ./memcached -m 512 -p 14444 -vv 
B:   telnet 127.0.0.1 14444

A:   ##############################  state = conn_listening  
B:   add id 1 0 4

A:   
##############################  state = conn_new_cmd
##############################  state = conn_waiting
##############################  state = conn_read
##############################  state = conn_parse_cmd 
32: Client using the ascii protocol
<32 add id 1 0 4
##############################  state = conn_nread
 
B:  1234
A:
##############################  state = conn_nread
##############################  state = conn_nread
>32 STORED
##############################  state = conn_write
##############################  state = conn_mwrite
##############################  state = conn_write
##############################  state = conn_mwrite
##############################  state = conn_new_cmd
##############################  state = conn_waiting

B: 收到
STORED  

 

B: get id

A:

############################## state = conn_read
############################## state = conn_parse_cmd
<32 get id
>32 sending key id
>32 END
############################## state = conn_mwrite
############################## state = conn_mwrite
############################## state = conn_new_cmd
############################## state = conn_waiting

B: 收到

VALUE id 1 4
1234
END

 

B: quit

A:

############################## state = conn_read############################## state = conn_parse_cmd<32 quit############################## state = conn_closing<32 connection closed.

相關文章
相關標籤/搜索