In the same way, mounting an fd on the EPFD requires a call to the system API epoll_ctl, so let's search for that function name. In the file ae_epoll.c we find the aeApiAddEvent function:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}
When an fd is bound to the EPFD, the function first checks the eventLoop (of type aeEventLoop) to see whether any event types are already registered for that fd. If there are, the epoll_ctl call modifies the event types already bound to the fd (EPOLL_CTL_MOD); otherwise it adds the fd to the EPFD (EPOLL_CTL_ADD).
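Stripped of the ae abstractions, that ADD-or-MOD decision can be exercised in isolation. A minimal sketch, assuming a Linux host; watch_fd and the pipe fds are hypothetical stand-ins, only the epoll calls themselves are real API:

#include <stdio.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Register fd for read events the first time, or switch it to
 * read+write on later calls -- the same EPOLL_CTL_ADD/EPOLL_CTL_MOD
 * choice aeApiAddEvent makes with its AE_NONE check. */
static int watch_fd(int epfd, int fd, int already_added) {
    struct epoll_event ee = {0};
    ee.events = already_added ? (EPOLLIN | EPOLLOUT) : EPOLLIN;
    ee.data.fd = fd;
    return epoll_ctl(epfd, already_added ? EPOLL_CTL_MOD : EPOLL_CTL_ADD,
                     fd, &ee);
}

int main(void) {
    int epfd = epoll_create1(0);
    int fds[2];
    if (pipe(fds) == -1) return 1;
    if (watch_fd(epfd, fds[0], 0) == -1) perror("ADD");  /* first time: ADD */
    if (watch_fd(epfd, fds[0], 1) == -1) perror("MOD");  /* second time: MOD */
    close(fds[0]); close(fds[1]); close(epfd);
    return 0;
}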
Set a breakpoint on aeApiAddEvent and restart redis-server. When the breakpoint is hit, the call stack looks like this:
#0  aeCreateFileEvent (eventLoop=0x7ffff083a0a0, fd=15, mask=mask@entry=1,
    proc=0x437f50 <acceptTcpHandler>, clientData=clientData@entry=0x0) at ae.c:145
#1  0x000000000042f83b in initServer () at server.c:1927
#2  0x0000000000423803 in main (argc=<optimized out>, argv=0x7fffffffe588) at server.c:3857
We are in initServer again. Combining this with the creation of the listening fd analyzed above and stripping out the unrelated code, we can extract the main flow of this function as the following pseudocode:
void initServer(void) {
    // record the process ID
    server.pid = getpid();

    // create the program's aeEventLoop object and the epfd
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

    // create the listening fd
    listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR

    // make the listening fd non-blocking
    anetNonBlock(NULL,server.sofd);

    // create the Redis timer that runs the periodic cron tasks
    /* Create the timer callback, this is our way to process many background
     * operations incrementally, like clients timeout, eviction of unaccessed
     * expired keys and so forth. */
    aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR

    // bind the listening fd to the epfd
    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR

    // create a pipe used to wake the event loop out of epoll_wait when needed
    /* Register a readable event for the pipe used to awake the event loop
     * when a blocked client in a module needs attention. */
    aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
        moduleBlockedClientPipeReadable,NULL) == AE_ERR
}
Note: the "main flow" here means the main flow of the network communication we are interested in; it does not imply that the rest of the code in this function is unimportant.
How do we verify that the fd mounted on the EPFD at this breakpoint really is the listening fd? Easy: when the listening fd is created, record its value with GDB. For example, in one run on my machine the listening fd was 15 (the debugger shown in the original screenshots is CGDB):
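The original screenshots are not reproduced here; in plain GDB, recording the fd at creation time might look roughly like this (breaking on anetTcpServer is my assumption about where the listening fd is produced; finish prints the returned value):

(gdb) b anetTcpServer
(gdb) r
Breakpoint 1, anetTcpServer (...) at anet.c:...
(gdb) finish
Value returned is $1 = 15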
Then run the program on to the place where the fd is bound and confirm the value of the fd being attached to the EPFD:
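Again approximating the missing screenshot, inspecting the fd parameter at the binding site might look like:

(gdb) b aeApiAddEvent
(gdb) c
Breakpoint 2, aeApiAddEvent (eventLoop=..., fd=15, mask=1) at ae_epoll.c:...
(gdb) p fd
$2 = 15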
The fd here is also 15, so the fd being bound is indeed the listening fd. Note that when binding the listening fd, we also specified that only readable events are of interest and set the event callback to acceptTcpHandler. For a listening fd, watching readable events is generally sufficient: a readable event means a new connection has arrived.
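To make "readable on a listening fd means a pending connection" concrete outside of Redis, here is a minimal hand-rolled loop in the same spirit (a sketch with error handling trimmed; the port number is arbitrary):

#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdio.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void) {
    /* create, bind and listen on a TCP socket */
    int lfd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(6400);            /* arbitrary test port */
    bind(lfd, (struct sockaddr*)&addr, sizeof(addr));
    listen(lfd, 128);

    /* register it with epoll for readable events only */
    int epfd = epoll_create1(0);
    struct epoll_event ee = {0};
    ee.events = EPOLLIN;
    ee.data.fd = lfd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ee);

    for (;;) {
        struct epoll_event fired[16];
        int n = epoll_wait(epfd, fired, 16, -1);
        for (int i = 0; i < n; i++) {
            if (fired[i].data.fd == lfd) {
                /* the listening fd became readable: a connection is waiting */
                int cfd = accept(lfd, NULL, NULL);
                printf("accepted fd %d\n", cfd);
                close(cfd);
            }
        }
    }
}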
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR
The acceptTcpHandler function is defined as follows (in networking.c):
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(cfd,0,cip);
    }
}
anetTcpAccept in turn calls the anetGenericAccept function we discussed above.
int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
    int fd;
    struct sockaddr_storage sa;
    socklen_t salen = sizeof(sa);
    if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)
        return ANET_ERR;

    if (sa.ss_family == AF_INET) {
        struct sockaddr_in *s = (struct sockaddr_in *)&sa;
        if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin_port);
    } else {
        struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;
        if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin6_port);
    }
    return fd;
}
At this point the whole flow finally links up. Set a breakpoint on acceptTcpHandler, restart redis-server, and then start a redis-cli to connect to it. If the breakpoint fires, our analysis is correct.
And indeed, the breakpoint fires.
Once acceptTcpHandler successfully accepts the new connection, a client fd is produced and acceptCommonHandler is called, which in turn calls createClient. In createClient the client fd is first made non-blocking, then associated with the EPFD, and at the same time recorded on the program-wide aeEventLoop object.
client *createClient(int fd) {
    // make the client fd non-blocking
    anetNonBlock(NULL,fd);
    // enable the TCP nodelay option
    anetEnableTcpNoDelay(NULL,fd);
    // enable TCP keepalive, depending on the configuration
    if (server.tcpkeepalive)
        anetKeepAlive(NULL,fd,server.tcpkeepalive);
    // bind the client fd to the epfd and record it on the aeEventLoop;
    // the event of interest is AE_READABLE, the callback is readQueryFromClient
    aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR

    return c;
}
When the client fd triggers a readable event, the callback is readQueryFromClient, implemented as follows (in networking.c):
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
        if (remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread;
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    /* Time to process the buffer. If the client is a master we need to
     * compute the difference between the applied offset before and after
     * processing the buffer, to understand how much of the replication stream
     * was actually applied to the master state: this quantity, and its
     * corresponding part of the replication stream, will be propagated to
     * the sub-slaves and to the replication backlog. */
    if (!(c->flags & CLIENT_MASTER)) {
        processInputBuffer(c);
    } else {
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        size_t applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf,applied,-1);
        }
    }
}
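The three-way branch on read()'s return value in the function above is the standard non-blocking TCP idiom, worth isolating. A condensed sketch (handle_readable is a hypothetical name, not a Redis function):

#include <errno.h>
#include <unistd.h>

/* Returns 1 if the connection should be kept, 0 if it should be
 * closed -- the same decision structure readQueryFromClient uses. */
static int handle_readable(int fd, char *buf, size_t cap) {
    ssize_t n = read(fd, buf, cap);
    if (n == -1)
        return errno == EAGAIN;  /* EAGAIN: no data yet, keep; anything else: fatal */
    if (n == 0)
        return 0;                /* read() == 0 means the peer closed the socket */
    /* n > 0: data arrived; append it to the query buffer and parse */
    return 1;
}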
Set a breakpoint on this function, restart redis-server, start a client, and try sending the server the command "set hello world". During actual debugging, however, we find that GDB hits the breakpoint as soon as redis-cli connects, before our command is ever sent. Single-stepping through readQueryFromClient and printing the received data gives the following string:
(gdb) p c->querybuf
$8 = (sds) 0x7ffff09b8685 "*1\r\n$7\r\nCOMMAND\r\n"
This "*1\r\n$7\r\nCOMMAND\r\n" is the RESP encoding of the one-element command COMMAND (*1 announces an array of one element, $7 a bulk string of seven bytes), which redis-cli sends automatically right after connecting. And what is c->querybuf? Here c is of type client: it is the object that was bound together with the callback when the new client fd was produced by the accepted connection, and it reaches readQueryFromClient as the privdata argument. We can find its definition in server.h:
/* With multiplexing we need to take per-client state.
 * Clients are taken in a linked list. */
typedef struct client {
    uint64_t id;            /* Client incremental unique ID. */
    int fd;                 /* Client socket. */
    redisDb *db;            /* Pointer to currently SELECTed DB. */
    robj *name;             /* As set by CLIENT SETNAME. */
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    /* ...some fields omitted... */
} client;
client is in fact the object that stores the state of each client connection: its fd field is the connection's fd, and its querybuf field is the connection's receive buffer. In other words, every new client connection produces one such object, and the data read from the fd is stored in its querybuf field.
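The pattern generalizes well beyond Redis: one heap object per connection holding the fd and a growable receive buffer. A stripped-down sketch of the same idea (conn, conn_create and conn_read are hypothetical names; Redis uses sds rather than a raw malloc'd buffer):

#include <stdlib.h>
#include <unistd.h>

/* Per-connection state, mirroring the role of 'client':
 * the fd plus a buffer that accumulates whatever read() returns. */
typedef struct conn {
    int    fd;
    char  *buf;        /* receive buffer, playing the role of querybuf */
    size_t len, cap;   /* bytes used / bytes allocated */
} conn;

static conn *conn_create(int fd) {
    conn *c = malloc(sizeof(*c));
    c->fd = fd;
    c->len = 0;
    c->cap = 4096;
    c->buf = malloc(c->cap);
    return c;
}

/* Called when fd turns readable: append to the buffer, growing it
 * first if little headroom is left (compare sdsMakeRoomFor). */
static ssize_t conn_read(conn *c) {
    if (c->cap - c->len < 1024) {
        c->cap *= 2;
        c->buf = realloc(c->buf, c->cap);
    }
    ssize_t n = read(c->fd, c->buf + c->len, c->cap - c->len);
    if (n > 0) c->len += (size_t)n;
    return n;
}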
For completeness, here is the full createClient function:
client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (fd != -1) {
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    selectDb(c,0);
    uint64_t client_id;
    atomicGetIncr(server.next_client_id,client_id,1);
    c->id = client_id;
    c->fd = fd;
    c->name = NULL;
    c->bufpos = 0;
    c->querybuf = sdsempty();
    c->pending_querybuf = sdsempty();
    c->querybuf_peak = 0;
    c->reqtype = 0;
    c->argc = 0;
    c->argv = NULL;
    c->cmd = c->lastcmd = NULL;
    c->multibulklen = 0;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->ctime = c->lastinteraction = server.unixtime;
    c->authenticated = 0;
    c->replstate = REPL_STATE_NONE;
    c->repl_put_online_on_ack = 0;
    c->reploff = 0;
    c->read_reploff = 0;
    c->repl_ack_off = 0;
    c->repl_ack_time = 0;
    c->slave_listening_port = 0;
    c->slave_ip[0] = '\0';
    c->slave_capa = SLAVE_CAPA_NONE;
    c->reply = listCreate();
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    listSetFreeMethod(c->reply,freeClientReplyValue);
    listSetDupMethod(c->reply,dupClientReplyValue);
    c->btype = BLOCKED_NONE;
    c->bpop.timeout = 0;
    c->bpop.keys = dictCreate(&objectKeyPointerValueDictType,NULL);
    c->bpop.target = NULL;
    c->bpop.numreplicas = 0;
    c->bpop.reploffset = 0;
    c->woff = 0;
    c->watched_keys = listCreate();
    c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
    c->pubsub_patterns = listCreate();
    c->peerid = NULL;
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    if (fd != -1) listAddNodeTail(server.clients,c);
    initClientMultiState(c);
    return c;
}