Starting from main() in server.c:
```c
int main(int argc, char **argv) {
    .......
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}
```
ae.c
```c
/* Call aeProcessEvents in an endless loop, processing every runnable
 * time event and file event. Invoked from main() in server.c. */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
```
```c
/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 * If flags is 0, the function does nothing and returns.
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 * if flags has AE_FILE_EVENTS set, file events are processed.
 * if flags has AE_TIME_EVENTS set, time events are processed.
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
 * the events that's possible to process without to wait are processed.
 *
 * The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    ........

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    /* Work out how long we may block, based on the nearest time event. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            /* Find the nearest time event in the time event list. */
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            /* Compute how long from now until that time event fires. */
            long now_sec, now_ms;
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        /* The I/O multiplexing call: find the file events that are ready. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        /* Walk the fds recorded in the event loop's fired array. */
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask; /* event type: read/write */
            int fd = eventLoop->fired[j].fd;     /* the event's fd */
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event laster. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsynching a file to disk,
             * before replying to a client. */
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event
             * now after the writable one. */
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
```
This is a textbook event-driven framework: an endless loop that keeps calling aeProcessEvents.
aeProcessEvents is fairly long. It handles two kinds of events, time events and file events; file events are what this article focuses on.
aeProcessEvents calls aeApiPoll to find out which of the monitored fds are ready. For each ready fd it looks at the event type and decides whether to invoke wfileProc or rfileProc. Since what we care about here is how a command sent by a client gets handled, rfileProc is the one to follow; how rfileProc gets set is covered later in this article.
aeApiPoll is implemented in several files, and Redis uses conditional compilation to pick an implementation, which is a neat trick:
```c
/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
/* Compile-time "overloading" done with macros -- solid. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif
```
Let's look at the classic one, epoll (each of these files defines the same set of static aeApi* functions, so exactly one backend gets compiled into ae.c):
```c
typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

/* Create the epoll instance that backs the event loop. */
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    /* Possible values of epoll_ctl's op argument: EPOLL_CTL_ADD (register),
     * EPOLL_CTL_MOD (modify), EPOLL_CTL_DEL (remove). */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    /* Update both the event's mask inside the event loop and the set of
     * events monitored on the associated epoll fd. */
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

/* tvp is the epoll timeout; if tvp is NULL, block forever. */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        /* Walk the fds that are ready for reading/writing. */
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            /* Fill the eventLoop->fired array: these entries describe
             * the ready fds. */
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}
```
The code is not complicated; it is essentially a thin wrapper around the system calls. (Note how EPOLLERR and EPOLLHUP are mapped to AE_WRITABLE, presumably so that errors surface through the write handler.)
It calls epoll_ctl to register the fds to be monitored.
It calls epoll_wait to block until some event occurs on one of the monitored fds.
An interesting detail is that the aeFileEvent struct keeps a mask field recording which events the fd is monitored for, presumably to make later lookups cheap.
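As a refresher on the two system calls themselves, here is a minimal, self-contained sketch, independent of Redis, that watches a single fd (stdin here, purely for illustration) for readability. It mirrors aeApiPoll's structure: register with epoll_ctl, block in epoll_wait, then walk the ready events:

```c
#include <stdio.h>
#include <unistd.h>
#include <sys/epoll.h>

int main(void) {
    int epfd = epoll_create(1024);       /* the size argument is only a hint */
    if (epfd == -1) { perror("epoll_create"); return 1; }

    /* Register interest in readability on stdin, like aeApiAddEvent does
     * for a client socket. */
    struct epoll_event ee = {0};
    ee.events = EPOLLIN;
    ee.data.fd = STDIN_FILENO;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ee) == -1) {
        perror("epoll_ctl");
        return 1;
    }

    /* Block for up to 5000 ms -- aeApiPoll converts tvp to ms the same way. */
    struct epoll_event fired[16];
    int n = epoll_wait(epfd, fired, 16, 5000);
    for (int i = 0; i < n; i++) {
        if (fired[i].events & EPOLLIN)
            printf("fd %d is readable\n", fired[i].data.fd);
    }
    close(epfd);
    return 0;
}
```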
networking.c
```c
client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    /* fd == -1 means this is a connectionless pseudo client used to run
     * Lua scripts, which saves some overhead. */
    if (fd != -1) {
        anetNonBlock(NULL,fd);         /* put the fd into non-blocking mode */
        anetEnableTcpNoDelay(NULL,fd); /* setsockopt: disable Nagle's algorithm
                                        * so packets go out as fast as possible */
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        /* Register the read handler for this client's fd:
         * readQueryFromClient, defined near the end of this file. */
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
    .....
}
```
createClient calls aeCreateFileEvent to register readQueryFromClient as the read handler for this fd; in other words, the handler ends up in the fd's rfileProc field:
```c
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
```
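To see how these pieces compose, here is a toy sketch built only on the ae functions quoted so far (aeCreateEventLoop, aeCreateFileEvent, aeMain, aeStop, aeDeleteEventLoop). It assumes the file is compiled inside the Redis source tree so ae.h/ae.c and their zmalloc dependency are available; echoProc is hypothetical, with the same aeFileProc signature as readQueryFromClient:

```c
#include <unistd.h>
#include "ae.h"   /* Redis's event-loop API */

/* Hypothetical read handler: copy whatever arrives on the fd to stdout. */
static void echoProc(aeEventLoop *el, int fd, void *clientData, int mask) {
    char buf[256];
    ssize_t n = read(fd, buf, sizeof(buf));
    if (n > 0)
        write(STDOUT_FILENO, buf, (size_t)n);
    else
        aeStop(el);   /* EOF or error: ask aeMain's loop to exit */
}

int main(void) {
    aeEventLoop *el = aeCreateEventLoop(64);   /* capacity for 64 fds */

    /* The same call createClient() makes, with echoProc in place of
     * readQueryFromClient; the proc lands in the fd's rfileProc field. */
    aeCreateFileEvent(el, STDIN_FILENO, AE_READABLE, echoProc, NULL);
    aeMain(el);               /* loops in aeProcessEvents until aeStop */
    aeDeleteEventLoop(el);
    return 0;
}
```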
When a client sends a command over, the event loop notices the fd has become readable and invokes readQueryFromClient:
```c
/* Callback: invoked when a read event fires on a client's fd. */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    .....
    /* Time to process the buffer. If the client is a master we need to
     * compute the difference between the applied offset before and after
     * processing the buffer, to understand how much of the replication stream
     * was actually applied to the master state: this quantity, and its
     * corresponding part of the replication stream, will be propagated to
     * the sub-slaves and to the replication backlog. */
    if (!(c->flags & CLIENT_MASTER)) {
        /* An ordinary client: just process the buffered commands. */
        processInputBuffer(c);
    } else {
        /* This connection is from our master: besides executing the buffered
         * commands, we must also feed the replication stream onwards. */
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        size_t applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf,applied,-1);
        }
    }
}
```
So when the fd turns readable, the event loop fires the readQueryFromClient callback, which in turn calls processInputBuffer:
```c
/* This function is called every time, in the client structure 'c', there is
 * more query buffer to process, because we read more data from the socket
 * or because a client was blocked and later reactivated, so there could be
 * pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c) {
    .....
        /* (inside a loop over the input buffer, elided above) */
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            /* Here, at last, the command gets executed. */
            if (processCommand(c) == C_OK) {
                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                    /* Update the applied replication offset of our master. */
                    c->reploff = c->read_reploff - sdslen(c->querybuf);
                }

                /* Don't reset the client structure for clients blocked in a
                 * module blocking command, so that the reply callback will
                 * still be able to access the client argv and argc field.
                 * The client will be reset in unblockClientFromModule(). */
                if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
                    resetClient(c);
            }
            /* freeMemoryIfNeeded may flush slave output buffers. This may
             * result into a slave, that may be the active client, to be
             * freed. */
            if (server.current_client == NULL) break;
        }
    }
    server.current_client = NULL;
}
```
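A quick aside on the two request types. An inline request is just a bare line such as "SET foo bar\r\n" (handy when talking to the server via telnet), while a normal client speaks RESP and sends the command as a multibulk array. For example, SET foo bar arrives on the wire as:

```
*3\r\n          an array of 3 bulk strings
$3\r\nSET\r\n   a bulk string of length 3
$3\r\nfoo\r\n
$3\r\nbar\r\n
```

processMultibulkBuffer parses this into c->argc == 3 and c->argv == {"SET", "foo", "bar"}, which is exactly the shape processCommand expects.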
processInputBuffer then calls processCommand which, as the name suggests, handles the command the client sent.
Its implementation lives in server.c:
```c
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() execute the command or prepare the
 * server for a bulk read from the client.
 *
 * If C_OK is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
int processCommand(client *c) {
    ......
    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    /* Look up the command's implementation in the command dict. */
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    /* Check that the command exists and the number of arguments is right. */
    if (!c->cmd) {
        flagTransaction(c);
        addReplyErrorFormat(c,"unknown command '%s'",
            (char*)c->argv[0]->ptr);
        return C_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return C_OK;
    }
    .....
    /* The code above validates arguments and handles various error cases. */

    /* Exec the command */
    /* Are we inside a transaction opened with MULTI? */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        /* Queue the command instead of executing it right away. */
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        /* A plain, non-transactional command: execute it via call(),
         * implemented a bit past line 2200 of this file. */
        call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
    return C_OK;
}
```
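Incidentally, the MULTI branch above is what produces the QUEUED replies you see in a transaction; only EXEC (handled by execCommand) drains the queue. For example, in redis-cli:

```
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> SET foo bar
QUEUED
127.0.0.1:6379> EXEC
1) OK
```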
There are two key points in this function:
1. lookupCommand finds the implementation of the command the client submitted. When the redis server starts it initializes a dict mapping command names to their implementing functions, so the lookup is just a query against that dict.
2. Executing the command. Ignoring transactions for now and looking only at plain commands, this means invoking call().
call() is also implemented in server.c:
```c
void call(client *c, int flags) {
    ......
    /* Call the command. */
    dirty = server.dirty;
    start = ustime();
    c->cmd->proc(c);                /* run the command */
    duration = ustime()-start;      /* measure how long it took */
    dirty = server.dirty-dirty;
    if (dirty < 0) dirty = 0;
    ....
}
```
The actual work is done through the cmd's proc field, a function pointer. (In the elided code, the measured duration feeds the slow log, and the dirty delta tells call() whether the command changed the dataset.)
As for cmd and its proc field, they were filled in by lookupCommand in the previous step.
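The whole lookup-and-dispatch mechanism boils down to a table of function pointers. Here is a minimal sketch of the idea, with hypothetical names and a plain array scan standing in for the dict the real server builds at startup (populateCommandTable in server.c):

```c
#include <stdio.h>
#include <string.h>
#include <strings.h>

typedef struct client client;           /* stand-in for the real struct */
typedef void redisCommandProc(client *c);

struct command {
    const char *name;
    redisCommandProc *proc;
    int arity;                          /* negative means "at least -arity" */
};

static void getCommandStub(client *c) { printf("running GET\n"); }
static void setCommandStub(client *c) { printf("running SET\n"); }

static struct command table[] = {
    {"get", getCommandStub, 2},
    {"set", setCommandStub, -3},
};

/* A linear scan keeps the sketch short; lookupCommand uses a dict. */
static struct command *lookup(const char *name) {
    for (size_t i = 0; i < sizeof(table)/sizeof(table[0]); i++)
        if (strcasecmp(name, table[i].name) == 0) return &table[i];
    return NULL;
}

int main(void) {
    struct command *cmd = lookup("GET");
    if (cmd) cmd->proc(NULL);   /* dispatch through the function pointer */
    return 0;
}
```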
For example, the simplest command, get, maps to getCommand:
```c
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
```
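Reading that entry against the redisCommand struct in server.h (field order as in the 4.x source this walkthrough follows; treat the annotation as a best-effort reading):

```c
{"get",          /* name: the command name used by lookupCommand */
 getCommand,     /* proc: the function pointer that call() invokes */
 2,              /* arity: GET takes exactly 2 args, as in "GET key" */
 "rF",           /* sflags: "r" read-only, "F" fast (constant-ish time) */
 0,              /* flags: bitmask computed from sflags at startup */
 NULL,           /* getkeys_proc: no special key-extraction function */
 1, 1, 1,        /* first key arg, last key arg, key step */
 0, 0},          /* microseconds, calls: runtime stats, start at zero */
```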
Its concrete implementation lives in t_string.c; we won't chase those details for now.
At this point we have a rough picture of how a command sent by a client flows through the server: aeMain → aeProcessEvents → aeApiPoll → rfileProc (readQueryFromClient) → processInputBuffer → processCommand → call → cmd->proc (e.g. getCommand).