Continuing from the previous article on Redis's network communication module, this one looks at memcached. memcached's network communication relies on the libevent library, which simplifies the wrapping of low-level network read/write events, but memcached adds multi-threaded processing on top.
Core flow: client -> memcached master (server side) -> master fd -> master accept(fd) -> client fd -> a worker thread handles all read/write events on the client fd
The master's processing flow lives in the main() function. Although main() spans lines 6892-8334, 1442 lines in total, lines 6892-8104 (1212 lines) are spent parsing memcached's configuration and doing small chores unrelated to the main flow.
The core code in main() is less than 230 lines (8104-8334), so it is still quite easy to analyze.
The master's core work consists of the following points.
The first is creating main_base, the core shared structure of the libevent event loop; the later event registration, loop startup and so on all depend on this structure.
Note that memcached uses the parameterized variant of event_base_new(); the parameter is a libevent configuration:
ev_config = event_config_new();
event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
main_base = event_base_new_with_config(ev_config);
EVENT_BASE_FLAG_NOLOCK means no locks are allocated for the event_base, which improves libevent's performance a bit. Although this can make using an event_base across multiple threads unsafe, memcached gives every thread its own event_base, so the problem does not arise.
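The worker threads create their own event_base the same way; roughly, condensed from thread.c::setup_thread() (the exact preprocessor guard may differ between versions):
#if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= 0x02000101
    struct event_config *ev_config;
    ev_config = event_config_new();
    event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
    /* each worker has its own base, so the lock-free flag is safe here too */
    me->base = event_base_new_with_config(ev_config);
    event_config_free(ev_config);
#else
    me->base = event_init();   /* fallback for old libevent versions */
#endif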
The memcached.c::server_socket() function uses socket() and bind() to start the TCP server and obtain the fd that represents the memcached server.
The core code is as follows:
static int server_socket(const char *interface, int port, enum network_transport transport, FILE *portnumber_file, bool ssl_enabled) {
    // ...
    for (next = ai; next; next = next->ai_next) {
        conn *listen_conn_add;
        if ((sfd = new_socket(next)) == -1) {
            // ...
            continue;
        }
        // IPv6 logic omitted...
        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
            // ...
            close(sfd);
            continue;
        } else {
            success++;
            if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
                // ...
            }
            // ...
        }
        // UDP logic omitted...
        // register the listening fd with the master's event_base, in the conn_listening state
        if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                         EV_READ | EV_PERSIST, 1,
                                         transport, main_base, NULL))) {
            // ...
        }
        // ...
    }
Although memcached's main() creates the master's libevent event loop right at the start, it is put to use much later: only after all worker threads have been initialized does main() create the TCP server and register the callback on the master's event_base.
The master's event_handler callback is as follows:
void event_handler(const int fd, const short which, void *arg) {
    conn *c;

    c = (conn *)arg;
    assert(c != NULL);

    c->which = which;

    /* sanity */
    if (fd != c->sfd) {
        if (settings.verbose > 0)
            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
        conn_close(c);
        return;
    }

    drive_machine(c);

    /* wait for next event */
    return;
}
As you can see, event_handler only does some sanity checking and then immediately hands the connection off to drive_machine for processing.
drive_machine mainly branches on the c->state variable. For the master's conn, the state field is set at memcached.c:653, and the value passed in at memcached.c::6242 is conn_listening. The corresponding logic in drive_machine is as follows:
case conn_listening:
    addrlen = sizeof(addr);
#ifdef HAVE_ACCEPT4
    if (use_accept4) {
        sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);
    } else {
        sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
    }
#else
    sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
#endif
    // error handling and code unrelated to the flow omitted...
#ifdef TLS
    // SSL-related logic omitted...
#endif
    // hand the freshly accepted client fd over to a worker thread
    dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                      DATA_BUFFER_SIZE, c->transport, ssl_v);

    stop = true;
    break;
As soon as a new client connects to the master thread's TCP server, drive_machine accepts it, obtains the client connection fd as sfd, and hands it to dispatch_conn_new.
The core logic of dispatch_conn_new is as follows:
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport, void *ssl) {
    CQ_ITEM *item = cqi_new();
    char buf[1];
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let's try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return;
    }

    /* round-robin selection of the next worker thread */
    int tid = (last_thread + 1) % settings.num_threads;
    LIBEVENT_THREAD *thread = threads + tid;
    last_thread = tid;

    /* describe the new connection in a CQ_ITEM and queue it for that worker */
    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;
    item->mode = queue_new_conn;
    item->ssl = ssl;
    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    /* wake the worker: write one byte to its notify pipe */
    buf[0] = 'c';
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}
The main things happening here are: the next worker thread is picked round-robin, a CQ_ITEM describing the new connection is pushed onto that worker's new_conn_queue, and one byte is written to the worker's notify_send_fd.
Afterwards, once the worker thread's event_base sees the readable event on notify_receive_fd, it starts working. Because a pipe is used for inter-thread communication here, writing to notify_send_fd is what triggers the readable event on notify_receive_fd; for background on inter-thread communication via pipes see: blog.csdn.net/robertkun/a…
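As an illustration of the technique (this is not memcached code; notify_cb and worker_main are made-up names and error handling is omitted), the sketch below has one thread run a libevent loop watching the read end of a pipe while another thread wakes it by writing a single byte, exactly the pattern dispatch_conn_new relies on:
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <event2/event.h>

static int notify_receive_fd;   /* read end of the pipe, watched by the worker */
static int notify_send_fd;      /* write end of the pipe, written by the "master" */

/* called by libevent in the worker thread when the pipe becomes readable */
static void notify_cb(evutil_socket_t fd, short which, void *arg) {
    char buf[1];
    if (read(fd, buf, 1) == 1)
        printf("worker: woken up by '%c'\n", buf[0]);
    event_base_loopbreak((struct event_base *)arg);  /* end the demo loop */
}

static void *worker_main(void *arg) {
    struct event_base *base = event_base_new();
    struct event *ev = event_new(base, notify_receive_fd,
                                 EV_READ | EV_PERSIST, notify_cb, base);
    event_add(ev, NULL);
    event_base_dispatch(base);        /* blocks until loopbreak is called */
    event_free(ev);
    event_base_free(base);
    return NULL;
}

int main(void) {
    int fds[2];
    pthread_t worker;

    pipe(fds);
    notify_receive_fd = fds[0];
    notify_send_fd = fds[1];

    pthread_create(&worker, NULL, worker_main, NULL);

    /* the "master": wake the worker with one byte, like dispatch_conn_new does */
    write(notify_send_fd, "c", 1);

    pthread_join(worker, NULL);
    return 0;
}
Something like gcc demo.c -levent -lpthread should be enough to build and try it.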
At this point, the network-related main flow of the memcached master thread is complete; everything else is handed over to the worker threads.
When a client writes input to the memcached server, you can verify this with breakpoints in CLion, for example by setting a breakpoint on event_handler.
memcached.c::conn_new() registers the event callback:
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
event_handler is the callback being set here.
The worker threads' callback is also specified by the master; see thread.c::setup_thread(), where the worker's callback is set to thread_libevent_process. Combined with the master's processing flow above, this worker callback is only triggered when the master receives a new connection.
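The registration part of setup_thread looks roughly like this (condensed; unrelated initialization omitted):
    /* listen for notifications from other threads (the master writes to the pipe) */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }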
See thread.c::thread_libevent_process(); its core code is as follows:
    item = cq_pop(me->new_conn_queue);

    // wrap the new client fd in a conn bound to this worker's event_base
    c = conn_new(item->sfd, item->init_state, item->event_flags,
                 item->read_buffer_size, item->transport,
                 me->base, item->ssl);
    if (c == NULL) {
        // error handling omitted...
    } else {
        c->thread = me;
#ifdef TLS
        // SSL logic omitted...
#endif
    }
After the worker calls cq_pop to take the new client connection off the queue, it hands it to conn_new for processing.
conn_new hands the client's read/write events to the event_handler function; the core code is as follows:
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
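conn_new then arms the event with event_add; roughly (condensed, and the exact error handling may differ between versions):
    if (event_add(&c->event, 0) == -1) {
        perror("event_add");
        return NULL;
    }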
The master uses event_handler to handle its events, and the workers use event_handler as well; this does not conflict, because inside event_handler, drive_machine branches on the state field of the conn structure passed in as the event argument. A worker's conn takes its state from the CQ_ITEM, and that value is conn_new_cmd.
When the client sends data to the memcached server, event_handler fires; at this point the state is conn_new_cmd, and the corresponding core code in drive_machine is as follows:
    while (!stop) {
        switch (c->state) {
        // ...
        case conn_new_cmd:
            /* Only process nreqs at a time to avoid starving other connections */
            --nreqs;
            if (nreqs >= 0) {
                reset_cmd_handler(c);
            } else {
                // error handling omitted...
            }
            break;
        // ...
        }
    }
The outermost layer is an "infinite" loop that only ends once stop is set to true; the request is handed to reset_cmd_handler() for processing, whose core code is as follows:
static void reset_cmd_handler(conn *c) {
    // ...
    if (c->rbytes > 0) {
        conn_set_state(c, conn_parse_cmd);
    } else {
        conn_set_state(c, conn_waiting);
    }
}
If there is still data to read in the buffer, the state is set to conn_parse_cmd; otherwise the connection enters the conn_waiting state.
    case conn_parse_cmd:
        if (c->try_read_command(c) == 0) {
            /* wee need more data! */
            conn_set_state(c, conn_waiting);
        }
        break;
If try_read_command() returns 0, more data is needed and the state goes back to conn_waiting; otherwise try_read_command has already handled the client's input and returned the result to the client. See try_read_command_ascii:
static int try_read_command_ascii(conn *c) {
    char *el, *cont;

    if (c->rbytes == 0)
        return 0;

    /* find the end of the command line */
    el = memchr(c->rcurr, '\n', c->rbytes);
    if (!el) {
        // error handling omitted...
        return 0;
    }
    cont = el + 1;
    /* strip a trailing \r and terminate the command line */
    if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
        el--;
    }
    *el = '\0';

    assert(cont <= (c->rcurr + c->rbytes));

    c->last_cmd_time = current_time;
    process_command(c, c->rcurr);

    /* advance the read cursor past the command line */
    c->rbytes -= (cont - c->rcurr);
    c->rcurr = cont;

    assert(c->rcurr <= (c->rbuf + c->rsize));

    return 1;
}
process_command is called to handle the request, and that is where memcached's network handling ends; see memcached.c::process_command().
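For intuition, the standalone snippet below (not memcached code) shows what the buffer handling above does for a typical ascii request such as set foo 0 0 3:
#include <assert.h>
#include <stdio.h>
#include <string.h>

int main(void) {
    /* what the client sent: a command line terminated by \r\n, then the data block */
    char rbuf[] = "set foo 0 0 3\r\nbar\r\n";
    char *rcurr = rbuf;
    size_t rbytes = sizeof(rbuf) - 1;

    /* locate the end of the command line, as try_read_command_ascii does */
    char *el = memchr(rcurr, '\n', rbytes);
    assert(el != NULL);
    char *cont = el + 1;                      /* where the remaining bytes start */
    if ((el - rcurr) > 1 && *(el - 1) == '\r')
        el--;
    *el = '\0';                               /* rcurr is now a C string */

    size_t remaining = rbytes - (size_t)(cont - rcurr);
    printf("command line: \"%s\"\n", rcurr);  /* -> set foo 0 0 3 */
    printf("bytes left:   %zu\n", remaining); /* the 5-byte "bar\r\n" data block */
    return 0;
}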
This article is based on memcached's master branch; see: github.com/memcached/m…
The first version of memcached had no multi-threading; the multi-threaded worker handling was added later. See the early 1.2.0 version on GitHub: github.com/memcached/m…
The pthread_create function is used to create the worker threads.
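A minimal sketch of that pattern (an illustration, not memcached's exact create_worker; worker_libevent here is a stand-in entry point that would run the worker's event loop):
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* stand-in worker entry point: in memcached this runs the worker's libevent loop */
static void *worker_libevent(void *arg) {
    /* LIBEVENT_THREAD *me = arg; event_base_loop(me->base, 0); */
    (void)arg;
    return NULL;
}

/* create one worker thread running func(arg) */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t      thread;
    pthread_attr_t attr;
    int            ret;

    pthread_attr_init(&attr);
    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
        exit(1);
    }
}

int main(void) {
    create_worker(worker_libevent, NULL);   /* memcached does this once per worker */
    pthread_exit(NULL);                     /* let the worker finish */
}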
memcached uses a multi-threaded model rather than a multi-process model. This point matters: it means the master thread and the worker threads can share structures, such as the LIBEVENT_THREAD structure that represents a worker thread. Its source is as follows:
typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
#ifdef EXTSTORE
    cache_t *io_cache;          /* IO objects */
    void *storage;              /* data object for storage system */
#endif
    logger *l;                  /* logger buffer */
    void *lru_bump_buf;         /* async LRU bump buffer */
#ifdef TLS
    char *ssl_wbuf;
#endif
} LIBEVENT_THREAD;
This is exactly how memcached does it: the master writes to the shared notify_send_fd field, which triggers the worker thread into handling the new client connection fd:
thread.c::dispatch_conn_new() -> cq_push(thread->new_conn_queue, item) -> write(thread->notify_send_fd, buf, 1)
write(thread->notify_send_fd, buf, 1) is what triggers the readable event on notify_receive_fd in the worker thread's event_base.
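The two pipe ends are created per worker during thread initialization in thread.c; roughly (condensed, error handling and unrelated fields omitted):
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        /* one pipe per worker: the worker reads fds[0], the master writes fds[1] */
        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        setup_thread(&threads[i]);
    }
For reference, the full set of connection states that drive_machine dispatches on: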
enum conn_states {
    conn_listening,  /**< the socket which listens for connections */
    conn_new_cmd,    /**< Prepare connection for next command */
    conn_waiting,    /**< waiting for a readable socket */
    conn_read,       /**< reading in a command line */
    conn_parse_cmd,  /**< try to parse a command from the input buffer */
    conn_write,      /**< writing out a simple response */
    conn_nread,      /**< reading in a fixed number of bytes */
    conn_swallow,    /**< swallowing unnecessary bytes w/o storing */
    conn_closing,    /**< closing this connection */
    conn_mwrite,     /**< writing out many items sequentially */
    conn_closed,     /**< connection is closed */
    conn_watch,      /**< held by the logger thread as a watcher */
    conn_max_state   /**< Max state value (used for assertion) */
};
The CQ_ITEM's init_state field is set to conn_new_cmd; that value comes from the dispatch_conn_new call shown earlier, at memcached.c::5725.
Both the TCP server the master creates and the memcached client connections it accepts are fds; take care to distinguish which role each fd plays.