redis 內部有一個小型的事件驅動,它和 libevent 網絡庫的事件驅動同樣,都是依託 I/O 多路複用技術支撐起來的。linux
利用 I/O 多路複用技術,監聽感興趣的文件 I/O 事件,例如讀事件,寫事件等,同時也要維護一個以文件描述符爲主鍵,數據爲某個預設函數的事件表,這裏其實就是一個數組或者鏈表 。當事件觸發時,好比某個文件描述符可讀,系統會返回文件描述符值,用這個值在事件表中找到相應的數據項,從而實現回調。一樣的,定時事件也是能夠實現的,由於系統提供的 I/O 多路複用技術中的函數容許咱們設定時間值。redis
上面一段話比較綜合,可能須要一些 linux 系統編程和網絡編程的基礎,但你會看到多數事件驅動程序都是這麼實現的(?)。編程
redis 事件驅動內部有四個主要的數據結構,分別是:事件循環結構體,文件事件結構體,時間事件結構體和觸發事件結構體。api
// 文件事件結構體 /* File event structure */ typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE) */ // 回調函數指針 aeFileProc *rfileProc; aeFileProc *wfileProc; // clientData 參數通常是指向 redisClient 的指針 void *clientData; } aeFileEvent; // 時間事件結構體 /* Time event structure */ typedef struct aeTimeEvent { long long id; /* time event identifier. */ long when_sec; /* seconds */ long when_ms; /* milliseconds */ // 定時回調函數指針 aeTimeProc *timeProc; // 定時事件清理函數,當刪除定時事件的時候會被調用 aeEventFinalizerProc *finalizerProc; // clientData 參數通常是指向 redisClient 的指針 void *clientData; // 定時事件表採用鏈表來維護 struct aeTimeEvent *next; } aeTimeEvent; // 觸發事件 /* A fired event */ typedef struct aeFiredEvent { int fd; int mask; } aeFiredEvent; // 事件循環結構體 /* State of an event based program */ typedef struct aeEventLoop { int maxfd; /* highest file descriptor currently registered */ int setsize; /* max number of file descriptors tracked */ // 記錄最大的定時事件 id + 1 long long timeEventNextId; // 用於系統時間的矯正 time_t lastTime; /* Used to detect system clock skew */ // I/O 事件表 aeFileEvent *events; /* Registered events */ // 被觸發的事件 aeFiredEvent *fired; /* Fired events */ // 定時事件表 aeTimeEvent *timeEventHead; // 事件循環結束標識 int stop; // 對於不一樣的 I/O 多路複用技術,有不一樣的數據,詳見各自實現 void *apidata; /* This is used for polling API specific data */ // 新的循環前須要執行的操做 aeBeforeSleepProc *beforesleep; } aeEventLoop;
上面的數據結構能給咱們很好的提示:事件循環結構體維護 I/O 事件表,定時事件表和觸發事件表。數組
redis 的主函數中調用 initServer() 函數從而初始化事件循環中心(EventLoop),它的主要工做是在 aeCreateEventLoop() 中完成的。網絡
aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; int i; // 分配空間 if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; // 分配文件事件結構體空間 eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); // 分配已觸發事件結構體空間 eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; eventLoop->setsize = setsize; eventLoop->lastTime = time(NULL); // 時間事件鏈表頭 eventLoop->timeEventHead = NULL; // 後續提到 eventLoop->timeEventNextId = 0; eventLoop->stop = 0; eventLoop->maxfd = -1; // 進入事件循環前須要執行的操做,此項會在 redis main() 函數中設置 eventLoop->beforesleep = NULL; // 在這裏,aeApiCreate() 函數對於每一個 IO 多路複用模型的實現都有不一樣,具體參見源代碼,由於每種 IO 多路複用模型的初始化都不一樣 if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the * vector with it. */ // 初始化事件類型掩碼爲無事件狀態 for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return eventLoop; err: if (eventLoop) { zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); } return NULL; }
有上面初始化工做只是完成了一個空空的事件中心而已。要想驅動事件循環,還須要下面的工做。數據結構
文件 I/O 事件註冊主要操做在 aeCreateFileEvent() 中完成。aeCreateFileEvent() 會根據文件描述符的數值大小在事件循環結構體的 I/O 事件表中取一個數據空間,利用系統提供的 I/O 多路複用技術監聽感興趣的 I/O 事件,並設置回調函數。dom
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } // 在 I/O 事件表中選擇一個空間 aeFileEvent *fe = &eventLoop->events[fd]; // aeApiAddEvent() 只在此函數中調用,對於不一樣 IO 多路複用實現,會有所不一樣 if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; fe->mask |= mask; // 設置回調函數 if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; }
對於不一樣版本的 I/O 多路複用,好比 epoll,select,kqueue 等,redis 有各自的版本,但接口統一,譬如 aeApiAddEvent()。socket
之於定時事件,在事件循環結構體中用鏈表來維護。定時事件操做在 aeCreateTimeEvent() 中完成:分配定時事件結構體,設置觸發時間和回調函數,插入到定時事件表中。tcp
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { /* 自增 timeEventNextId 會在處理執行定時事件時會用到,用於防止出現死循環。 若是超過了最大 id,則跳過這個定時事件,爲的是避免死循環,即: 若是事件一執行的時候註冊了事件二,事件一執行完畢後事件二獲得執行,緊接着若是事件一有獲得執行就會成爲循環,所以維護了 timeEventNextId 。*/ long long id = eventLoop->timeEventNextId++; aeTimeEvent *te; // 分配空間 te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; // 填充時間事件結構體 te->id = id; // 計算超時時間 aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms); // proc == serverCorn te->timeProc = proc; te->finalizerProc = finalizerProc; te->clientData = clientData; // 頭插法 te->next = eventLoop->timeEventHead; eventLoop->timeEventHead = te; return id; }
initServer() 中調用了 aeCreateEventLoop() 完成了事件中心的初始化,initServer() 還作了監聽的準備。
/* Open the TCP listening socket for the user commands. */ // listenToPort() 中有調用 listen() if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR) exit(1); // UNIX 域套接字 /* Open the listening Unix domain socket. */ if (server.unixsocket != NULL) { unlink(server.unixsocket); /* don't care if this fails */ server.sofd = anetUnixServer(server.neterr,server.unixsocket,server.unixsocketperm); if (server.sofd == ANET_ERR) { redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr); exit(1); } }
從上面能夠看出,redis 提供了 TCP 和 UNIX 域套接字兩種工做方式。以 TCP 工做方式爲例,listenPort() 建立綁定了套接字並啓動了監聽。
在進入事件循環前還須要作一些準備工做。緊接着,initServer() 爲全部的監聽套接字註冊了讀事件,響應函數爲 acceptTcpHandler() 或者 acceptUnixHandler()。
// 建立接收 TCP 或者 UNIX 域套接字的事件處理 // TCP /* Create an event handler for accepting new connections in TCP and Unix * domain sockets. */ for (j = 0; j < server.ipfd_count; j++) { // acceptTcpHandler() tcp 鏈接接受處理函數 if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { redisPanic( "Unrecoverable error creating server.ipfd file event."); } } // UNIX 域套接字 if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
來看看acceptTcpHandler() 作了什麼:
// 用於 TCP 接收請求的處理函數 void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd; char cip[REDIS_IP_STR_LEN]; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); REDIS_NOTUSED(privdata); // 接收客戶端請求 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); // 出錯 if (cfd == AE_ERR) { redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr); return; } // 記錄 redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport); // 真正有意思的地方 acceptCommonHandler(cfd,0); }
接收套接字與客戶端創建鏈接後,調用 acceptCommonHandler()。acceptCommonHandler() 主要工做就是:
以上作好了準備工做,能夠進入事件循環。跳出 initServer() 回到 main() 中,main() 會調用 aeMain()。進入事件循環發生在 aeProcessEvents() 中:
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { // 進入事件循環可能會進入睡眠狀態。在睡眠以前,執行預設置的函數 aeSetBeforeSleepProc()。 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); // AE_ALL_EVENTS 表示處理全部的事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS); } } // 先處理定時事件,而後處理套接字事件 int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; /* Note that we want call select() even if there are no * file events to process as long as we want to process time * events, in order to sleep until the next time event is ready * to fire. */ if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; // tvp 會在 IO 多路複用的函數調用中用到,表示超時時間 struct timeval tv, *tvp; // 獲得最短未來會發生的定時事件 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); // 計算睡眠的最短期 if (shortest) { // 存在定時事件 long now_sec, now_ms; /* Calculate the time missing for the nearest * timer to fire. */ // 獲得當前時間 aeGetTime(&now_sec, &now_ms); tvp = &tv; tvp->tv_sec = shortest->when_sec - now_sec; if (shortest->when_ms < now_ms) { // 須要借位 // 減法中的借位,毫秒向秒借位 tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; tvp->tv_sec --; } else { // 不須要借位,直接減 tvp->tv_usec = (shortest->when_ms - now_ms)*1000; } // 當前系統時間已經超過定時事件設定的時間 if (tvp->tv_sec < 0) tvp->tv_sec = 0; if (tvp->tv_usec < 0) tvp->tv_usec = 0; } else { /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ // 若是沒有定時事件,見機行事 if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ tvp = NULL; /* wait forever */ } } // 調用 IO 多路複用函數阻塞監聽 numevents = aeApiPoll(eventLoop, tvp); // 處理已經觸發的事件 for (j = 0; j < numevents; j++) { // 找到 I/O 事件表中存儲的數據 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; /* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn't * processed, so we check if the event is still valid. */ // 讀事件 if (fe->mask & mask & AE_READABLE) { rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } // 寫事件 if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; } } // 處理定時事件 /* Check time events */ if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */ }
這裏以 select 版本的 redis api 實現做爲講解,aeApiPoll() 調用了 select() 進入了監聽輪詢。aeApiPoll() 的 tvp 參數是最小等待時間,它會被預先計算出來,它主要完成:
接下來的操做即是執行相應的回調函數,代碼在上一段中已經貼出:先處理 I/O 事件,再處理定時事件。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, j, numevents = 0; /* 真有意思,在 aeApiState 結構中: typedef struct aeApiState { fd_set rfds, wfds; fd_set _rfds, _wfds; } aeApiState; 在調用 select() 的時候傳入的是 _rfds 和 _wfds,全部監聽的數據在 rfds 和 wfds 中。 在下次須要調用 selec() 的時候,會將 rfds 和 wfds 中的數據拷貝進 _rfds 和 _wfds 中。*/ memcpy(&state->_rfds,&state->rfds,sizeof(fd_set)); memcpy(&state->_wfds,&state->wfds,sizeof(fd_set)); retval = select(eventLoop->maxfd+1, &state->_rfds,&state->_wfds,NULL,tvp); if (retval > 0) { // 輪詢 for (j = 0; j <= eventLoop->maxfd; j++) { int mask = 0; aeFileEvent *fe = &eventLoop->events[j]; if (fe->mask == AE_NONE) continue; if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds)) mask |= AE_READABLE; if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds)) mask |= AE_WRITABLE; // 添加到觸發事件表中 eventLoop->fired[numevents].fd = j; eventLoop->fired[numevents].mask = mask; numevents++; } } return numevents; }
redis 的事件驅動總結以下:
後續分享更多內容。
----
搗亂 2014-3-9
http://daoluan.net