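Redis handles client requests with a single thread plus I/O multiplexing. It uses one of four multiplexing backends: evport, epoll, kqueue, and select (listed in order of preference).
Of these, evport should be the Solaris mechanism, epoll is for Linux, kqueue is for FreeBSD, and select is the compatibility fallback that most systems support.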
```c
// ae.c
/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif
```
Redis wraps all four backends behind a common interface and drives them through `aeEventLoop`.
At the end of Redis's `main()` function, `aeMain()` is called to enter the event loop, which runs until it is stopped:
```c
// server.c
int main(int argc, char **argv) {
    ......
    // Set the hook that runs before each event-loop wait
    aeSetBeforeSleepProc(server.el,beforeSleep);
    // Set the hook that runs after each event-loop wait
    aeSetAfterSleepProc(server.el,afterSleep);
    // Enter the event loop
    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}
```
aeEventLoop
The `aeEventLoop` struct:

```c
// ae.h
/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;
```
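- `maxfd`: the highest file descriptor currently registered in the event loop.
- `setsize`: the maximum number of file descriptors the loop can track.
- `timeEventNextId`: the id to assign to the next time event. It is incremented each time a time event is added in `aeCreateTimeEvent()`. It is not decremented when events are processed: `processTimeEvents()` only reads it, to skip time events that were created during the current pass (see both functions in `ae.c`).
- `lastTime`: used to detect system clock skew.
- `events`: an array of `aeFileEvent` with `setsize` entries, indexed by file descriptor, e.g. `events[fd]` (see `aeCreateFileEvent()`). Each `aeFileEvent` stores the events registered for that `fd` and the corresponding read/write handlers (structure shown below).
- `fired`: an array of `aeFiredEvent` with `setsize` entries. It records which `fd`s triggered events in the current loop iteration, and which events they triggered (structure shown below).
- `timeEventHead`: the head of the time event list. Time events are kept in a linked list (structure shown below).
- `stop`: flag indicating whether the event loop has been stopped. A value of 1 means it has stopped.
- `apidata`: backend-specific state used by the four multiplexing wrappers (`evport`, `epoll`, `kqueue`, `select`).
- `beforesleep`: the function called before the event loop blocks (see `aeSetBeforeSleepProc()`).
- `aftersleep`: the function called after the event loop returns from blocking (see `aeSetAfterSleepProc()`).

The `aeFileEvent` struct:

```c
/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;
```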
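- `mask`: records which events are registered for the file descriptor `fd`:
  - `READABLE`: the "readable" event
  - `WRITABLE`: the "writable" event
  - `BARRIER`: inverts the usual order, so that in a given loop iteration the "writable" handler runs before the "readable" handler and is never fired after it
- `rfileProc`: function pointer for the read handler
- `wfileProc`: function pointer for the write handler
- `clientData`: private data passed to the handlers

The `aeFiredEvent` struct:

```c
/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;
```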
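- `fd`: the file descriptor on which an event fired
- `mask`: flags recording which events fired on that file descriptor

Flow of `aeMain()` and `aeProcessEvents()`

`aeMain()` is the main function of `aeEventLoop` (obviously). Once called, it keeps calling `aeProcessEvents()` in a loop until `stop` becomes 1.

`aeProcessEvents()` does the real work of the event loop: it calls the multiplexing function, blocks for the appropriate amount of time, and works out which time events are due and fires them.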
```c
// ae.c
// Main routine of the event loop
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        // Run the before-sleep hook, i.e. just before epoll_wait() blocks
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // This is where epoll is actually called.
        // AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP means: process every type of event,
        // and run the after-sleep hook once epoll_wait() returns
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
```
```c
// ae.c
/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 * If flags is 0, the function does nothing and returns.
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 * if flags has AE_FILE_EVENTS set, file events are processed.
 * if flags has AE_TIME_EVENTS set, time events are processed.
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
 * the events that's possible to process without to wait are processed.
 *
 * The function returns the number of events processed. */
// Processes time events, calls the read-ready/write-ready handlers of each
// file descriptor, and triggers AfterSleepProc.
// flags selects what the function does:
//   AE_ALL_EVENTS:       process all events
//   AE_FILE_EVENTS:      process file descriptor events
//   AE_TIME_EVENTS:      process time events
//   AE_DONT_WAIT:        do not block (aeApiPoll() will not block)
//   AE_CALL_AFTER_SLEEP: trigger AfterSleepProc
// In the current code, only ae.c and networking.c call this function.
// Both essentially pass AE_ALL_EVENTS; the only difference is that
// ae.c adds AE_CALL_AFTER_SLEEP and networking.c adds AE_DONT_WAIT.
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    // If neither time events nor file (descriptor) events need checking,
    // return immediately. In practice this case does not occur at present.
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    // If there are registered file descriptors,
    // or
    // flags asks for time events and does not forbid waiting
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            // Find the time event that is due soonest.
            // PS: judging from its comments, aeSearchNearestTimer() walks the
            // whole list, so every call is O(N)
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // There is a time event
            long now_sec, now_ms;

            // Get the current time
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            // Compute the timeout for epoll_wait() so that it returns when the
            // next time event is due
            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                // With AE_DONT_WAIT the timeout of epoll_wait() is set to 0,
                // so it does not block
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                // As the comment says, here epoll_wait() may block until an
                // event arrives
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        // Blocks for the time in tvp (unless AE_DONT_WAIT is set).
        // numevents is the number of fired events; their types and file
        // descriptors are stored in eventLoop->fired
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        // Run the after-sleep hook
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        // Handle every event in eventLoop->fired
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event laster. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsynching a file to disk,
             * before replying to a client. */
            // AE_BARRIER means the "writable" event is handled first;
            // by default Redis handles the "readable" event first
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            // Readable file descriptor
            if (!invert && fe->mask & mask & AE_READABLE) {
                // rfileProc is the read handler registered for the descriptor.
                // For the listening server socket, for example, it is
                // registered by calling aeCreateFileEvent() in initServer()
                // in server.c
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* Fire the writable event. */
            // Writable file descriptor
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    // wfileProc is the write handler registered for the
                    // descriptor, also through aeCreateFileEvent()
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    // Check and run time events (if any are due)
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
```
| Function | Purpose |
|---|---|
| `aeEventLoop *aeCreateEventLoop(int setsize)` | Creates and initializes an `aeEventLoop` |
| `void aeDeleteEventLoop(aeEventLoop *eventLoop)` | Deletes and frees an `aeEventLoop` |
| `void aeStop(aeEventLoop *eventLoop)` | Stops the event loop |
| `int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData)` | Registers events for file descriptor `fd` and adds it to `eventLoop->events` |
| `void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)` | Unregisters events for file descriptor `fd` and removes it from `eventLoop->events` |
| `int aeGetFileEvents(aeEventLoop *eventLoop, int fd)` | Returns the `mask` that `fd` is registered with in `eventLoop->events` |
| `long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,aeTimeProc *proc, void *clientData,aeEventFinalizerProc *finalizerProc)` | Registers a time event |
| `int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)` | Deletes a time event |
| `int aeProcessEvents(aeEventLoop *eventLoop, int flags)` | The core function of the event loop |
| `int aeWait(int fd, int mask, long long milliseconds)` | Blocks until file descriptor `fd` is ready for reading or writing, or reports an error |
| `void aeMain(aeEventLoop *eventLoop)` | The main function of the event loop |
| `char *aeGetApiName(void)` | Returns the name of the multiplexing backend in use (evport, epoll, kqueue, select) |
| `void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep)` | Sets the pre-blocking handler |
| `void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep)` | Sets the post-blocking handler |
| `int aeGetSetSize(aeEventLoop *eventLoop)` | Returns the `setsize` of `eventLoop` |
| `int aeResizeSetSize(aeEventLoop *eventLoop, int setsize)` | Changes the `setsize` of `eventLoop` |
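As the code above shows, the event loop handles three broad categories of events:

- the pre- and post-blocking processing done by `BeforeSleepProc` and `AfterSleepProc`
- file events
- time events

Let's look at what each of them does.

BeforeSleepProc

In `server.c`, `beforeSleep()` is set as the pre-blocking handler.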
```c
/* This function gets called every time Redis is entering the
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);

    /* Call the Redis Cluster before sleep function. Note that this function
     * may change the state of Redis Cluster (from ok to fail or vice versa),
     * so it's a good idea to call it before serving the unblocked clients
     * later in this function. */
    // Cluster housekeeping, only when cluster mode is enabled
    if (server.cluster_enabled) clusterBeforeSleep();

    /* Run a fast expire cycle (the called function will return
     * ASAP if a fast cycle is not needed). */
    // Reclaim expired keys
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

    /* Send all the slaves an ACK request if at least one client blocked
     * during the previous event loop iteration. */
    // Ask the replicas for an acknowledgement
    if (server.get_ack_from_slaves) {
        robj *argv[3];

        argv[0] = createStringObject("REPLCONF",8);
        argv[1] = createStringObject("GETACK",6);
        argv[2] = createStringObject("*",1); /* Not used argument. */
        replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
        decrRefCount(argv[0]);
        decrRefCount(argv[1]);
        decrRefCount(argv[2]);
        server.get_ack_from_slaves = 0;
    }

    /* Unblock all the clients blocked for synchronous replication
     * in WAIT. */
    if (listLength(server.clients_waiting_acks))
        processClientsWaitingReplicas();

    /* Check if there are clients unblocked by modules that implement
     * blocking commands. */
    moduleHandleBlockedClients();

    /* Try to process pending commands for clients that were just unblocked. */
    if (listLength(server.unblocked_clients))
        processUnblockedClients();

    /* Write the AOF buffer on disk */
    // Write the AOF
    flushAppendOnlyFile(0);

    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();

    /* Before we are going to sleep, let the threads access the dataset by
     * releasing the GIL. Redis main thread will not touch anything at this
     * time. */
    // Release the GIL
    if (moduleCount()) moduleReleaseGIL();
}
```
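As the code shows, its main tasks are:

- running the Redis Cluster before-sleep step (when cluster mode is enabled)
- running a fast cycle to reclaim expired keys
- asking the replicas for an ACK
- unblocking clients that were waiting in WAIT or were blocked by modules, and processing their pending commands
- flushing the AOF buffer to disk
- handling clients with pending output
- releasing the GIL

and so on.

So while the loop is blocked, other threads may in fact be working on Redis's dataset.
While client requests are being served, however, only the main thread operates on the dataset, which is why requests are processed serially.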
AfterSleepProc

In `server.c`, `afterSleep()` is set as the post-blocking handler.
```c
/* This function is called immadiately after the event loop multiplexing
 * API returned, and the control is going to soon return to Redis by invoking
 * the different events callbacks. */
void afterSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);
    if (moduleCount()) moduleAcquireGIL();
}
```
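Once the event loop returns from blocking, the first thing it does is reacquire the GIL, so that only the main thread operates on the dataset.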
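File events are registered in the following places:

- in the `initServer()` function in `server.c`
- in `networking.c`
- in `aof.c`
- in `cluster.c`
- in `replication.c`
- in `sentinel.c`

As for time events, `serverCron()` is registered as a time event in `server.c`.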
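Judging from its comments, `serverCron()` does quite a lot.

Inside `serverCron()`, `run_with_period()` is used rather neatly so that each job runs only once per given time interval, which keeps the work from being done too often.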
The full code of `serverCron()` is not reproduced here: it is tied up with many other features, so it falls outside this write-up on the event loop.