Redis 採用事件驅動機制來處理大量的網絡IO。它並無使用 libevent 或者 libev 這樣的成熟開源方案,而是本身實現一個很是簡潔的事件驅動庫 ae_event。程序員
Redis中的事件驅動庫只關注網絡IO,以及定時器。該事件庫處理下面兩類事件:api
事件驅動庫的代碼主要是在src/ae.c中實現的,其示意圖以下所示。數組
aeEventLoop
是整個事件驅動的核心,它管理着文件事件表和時間事件列表, 不斷地循環處理着就緒的文件事件和到期的時間事件。下面咱們就先分別介紹文件事件和時間事件,而後講述相關的aeEventLoop
源碼實現。bash
Redis基於Reactor模式開發了本身的網絡事件處理器,也就是文件事件處理器。文件事件處理器使用IO多路複用技術,同時監聽多個套接字,併爲套接字關聯不一樣的事件處理函數。當套接字的可讀或者可寫事件觸發時,就會調用相應的事件處理函數。服務器
Redis 使用的IO多路複用技術主要有:select
、epoll
、evport
和kqueue
等。每一個IO多路複用函數庫在 Redis 源碼中都對應一個單獨的文件,好比ae_select.c,ae_epoll.c, ae_kqueue.c等。Redis 會根據不一樣的操做系統,按照不一樣的優先級選擇多路複用技術。事件響應框架通常都採用該架構,好比 netty 和 libevent。網絡
以下圖所示,文件事件處理器有四個組成部分,它們分別是套接字、I/O多路複用程序、文件事件分派器以及事件處理器。數據結構
文件事件是對套接字操做的抽象,每當一個套接字準備好執行 accept、read、write和 close 等操做時,就會產生一個文件事件。由於 Redis 一般會鏈接多個套接字,因此多個文件事件有可能併發的出現。架構
I/O多路複用程序負責監聽多個套接字,並向文件事件派發器傳遞那些產生了事件的套接字。併發
儘管多個文件事件可能會併發地出現,但I/O多路複用程序老是會將全部產生的套接字都放到同一個隊列(也就是後文中描述的aeEventLoop
的fired
就緒事件表)裏邊,而後文件事件處理器會以有序、同步、單個套接字的方式處理該隊列中的套接字,也就是處理就緒的文件事件。框架
因此,一次 Redis 客戶端與服務器進行鏈接而且發送命令的過程如上圖所示。
Redis 的時間事件分爲如下兩類:
Redis 的時間事件的具體定義結構以下所示。
typedef struct aeTimeEvent {
/* 全局惟一ID */
long long id; /* time event identifier. */
/* 秒精確的UNIX時間戳,記錄時間事件到達的時間*/
long when_sec; /* seconds */
/* 毫秒精確的UNIX時間戳,記錄時間事件到達的時間*/
long when_ms; /* milliseconds */
/* 時間處理器 */
aeTimeProc *timeProc;
/* 事件結束回調函數,析構一些資源*/
aeEventFinalizerProc *finalizerProc;
/* 私有數據 */
void *clientData;
/* 前驅節點 */
struct aeTimeEvent *prev;
/* 後繼節點 */
struct aeTimeEvent *next;
} aeTimeEvent;
複製代碼
一個時間事件是定時事件仍是週期性事件取決於時間處理器的返回值:
when
屬性進行更新,讓這個事件在一段時間後再次達到。Redis 將全部時間事件都放在一個無序鏈表中,每次 Redis 會遍歷整個鏈表,查找全部已經到達的時間事件,而且調用相應的事件處理器。
介紹完文件事件和時間事件,咱們接下來看一下 aeEventLoop
的具體實現。
Redis 服務端在其初始化函數 initServer
中,會建立事件管理器aeEventLoop
對象。
函數aeCreateEventLoop
將建立一個事件管理器,主要是初始化 aeEventLoop
的各個屬性值,好比events
、fired
、timeEventHead
和apidata
:
aeEventLoop
對象。events
指針指向未就緒文件事件表、fired
指針指向就緒文件事件表。表的內容在後面添加具體事件時進行初變動。timeEventHead
和timeEventNextId
屬性。aeApiCreate
函數建立epoll
實例,並初始化 apidata
。aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
/* 建立事件狀態結構 */
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
/* 建立未就緒事件表、就緒事件表 */
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
/* 設置數組大小 */
eventLoop->setsize = setsize;
/* 初始化執行最近一次執行時間 */
eventLoop->lastTime = time(NULL);
/* 初始化時間事件結構 */
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
/* 將多路複用io與事件管理器關聯起來 */
if (aeApiCreate(eventLoop) == -1) goto err;
/* 初始化監聽事件 */
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
.....
}
複製代碼
aeApiCreate
函數首先建立了aeApiState
對象,初始化了epoll就緒事件表;而後調用epoll_create
建立了epoll
實例,最後將該aeApiState
賦值給apidata
屬性。
aeApiState
對象中epfd
存儲epoll
的標識,events
是一個epoll
就緒事件數組,當有epoll
事件發生時,全部發生的epoll
事件和其描述符將存儲在這個數組中。這個就緒事件數組由應用層開闢空間、內核負責把全部發生的事件填充到該數組。
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
/* 初始化epoll就緒事件表 */
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
/* 建立 epoll 實例 */
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
/* 事件管理器與epoll關聯 */
eventLoop->apidata = state;
return 0;
}
typedef struct aeApiState {
/* epoll_event 實例描述符*/
int epfd;
/* 存儲epoll就緒事件表 */
struct epoll_event *events;
} aeApiState;
複製代碼
aeFileEvent
是文件事件結構,對於每個具體的事件,都有讀處理函數和寫處理函數等。Redis 調用aeCreateFileEvent
函數針對不一樣的套接字的讀寫事件註冊對應的文件事件。
typedef struct aeFileEvent {
/* 監聽事件類型掩碼,值能夠是 AE_READABLE 或 AE_WRITABLE */
int mask;
/* 讀事件處理器 */
aeFileProc *rfileProc;
/* 寫事件處理器 */
aeFileProc *wfileProc;
/* 多路複用庫的私有數據 */
void *clientData;
} aeFileEvent;
/* 使用typedef定義的處理器函數的函數類型 */
typedef void aeFileProc(struct aeEventLoop *eventLoop,
int fd, void *clientData, int mask);
複製代碼
好比說,Redis 進行主從複製時,從服務器須要主服務器創建鏈接,它會發起一個 socekt鏈接,而後調用aeCreateFileEvent
函數針對發起的socket的讀寫事件註冊了對應的事件處理器,也就是syncWithMaster
函數。
aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL);
/* 符合aeFileProc的函數定義 */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {....}
複製代碼
aeCreateFileEvent
的參數fd
指的是具體的socket
套接字,proc
指fd
產生事件時,具體的處理函數,clientData
則是回調處理函數時須要傳入的數據。 aeCreateFileEvent
主要作了三件事情:
fd
爲索引,在events
未就緒事件表中找到對應事件。aeApiAddEvent
函數,該事件註冊到具體的底層 I/O 多路複用中,本例爲epoll。int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
/* 取出 fd 對應的文件事件結構, fd 表明具體的 socket 套接字 */
aeFileEvent *fe = &eventLoop->events[fd];
/* 監聽指定 fd 的指定事件 */
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
/* 置文件事件類型,以及事件的處理器 */
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
/* 私有數據 */
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
複製代碼
如上文所說,Redis 基於的底層 I/O 多路複用庫有多套,因此aeApiAddEvent
也有多套實現,下面的源碼是epoll
下的實現。其核心操做就是調用epoll
的epoll_ctl
函數來向epoll
註冊響應事件。有關epoll
相關的知識能夠看一下《Java NIO源碼分析》
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* 若是 fd 沒有關聯任何事件,那麼這是一個 ADD 操做。若是已經關聯了某個/某些事件,那麼這是一個 MOD 操做。 */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
/* 註冊事件到 epoll */
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
/* 調用epoll_ctl 系統調用,將事件加入epoll中 */
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
複製代碼
由於 Redis 中同時存在文件事件和時間事件兩個事件類型,因此服務器必須對這兩個事件進行調度,決定什麼時候處理文件事件,什麼時候處理時間事件,以及如何調度它們。
aeMain
函數以一個無限循環不斷地調用aeProcessEvents
函數來處理全部的事件。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
/* 若是有須要在事件處理前執行的函數,那麼執行它 */
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
/* 開始處理事件*/
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
複製代碼
下面是aeProcessEvents
的僞代碼,它會首先計算距離當前時間最近的時間事件,以此計算一個超時時間;而後調用aeApiPoll
函數去等待底層的I/O多路複用事件就緒;aeApiPoll
函數返回以後,會處理全部已經產生文件事件和已經達到的時間事件。
/* 僞代碼 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
/* 獲取到達時間距離當前時間最接近的時間事件*/
time_event = aeSearchNearestTimer();
/* 計算最接近的時間事件距離到達還有多少毫秒*/
remaind_ms = time_event.when - unix_ts_now();
/* 若是事件已經到達,那麼remaind_ms爲負數,將其設置爲0 */
if (remaind_ms < 0) remaind_ms = 0;
/* 根據 remaind_ms 的值,建立 timeval 結構*/
timeval = create_timeval_with_ms(remaind_ms);
/* 阻塞並等待文件事件產生,最大阻塞時間由傳入的 timeval 結構決定,若是remaind_ms 的值爲0,則aeApiPoll 調用後馬上返回,不阻塞*/
/* aeApiPoll調用epoll_wait函數,等待I/O事件*/
aeApiPoll(timeval);
/* 處理全部已經產生的文件事件*/
processFileEvents();
/* 處理全部已經到達的時間事件*/
processTimeEvents();
}
複製代碼
與aeApiAddEvent
相似,aeApiPoll
也有多套實現,它其實就作了兩件事情,調用epoll_wait
阻塞等待epoll
的事件就緒,超時時間就是以前根據最快達到時間事件計算而來的超時時間;而後將就緒的epoll
事件轉換到fired就緒事件。aeApiPoll
就是上文所說的I/O多路複用程序。具體過程以下圖所示。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
{
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
// 調用epoll_wait函數,等待時間爲最近達到時間事件的時間計算而來。
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
// 有至少一個事件就緒?
if (retval > 0)
{
int j;
/*爲已就緒事件設置相應的模式,並加入到 eventLoop 的 fired 數組中*/
numevents = retval;
for (j = 0; j < numevents; j++)
{
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN)
mask |= AE_READABLE;
if (e->events & EPOLLOUT)
mask |= AE_WRITABLE;
if (e->events & EPOLLERR)
mask |= AE_WRITABLE;
if (e->events & EPOLLHUP)
mask |= AE_WRITABLE;
/* 設置就緒事件表元素 */
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
// 返回已就緒事件個數
return numevents;
}
複製代碼
processFileEvent
是處理就緒文件事件的僞代碼,也是上文所述的文件事件分派器,它其實就是遍歷fired
就緒事件表,而後根據對應的事件類型來調用事件中註冊的不一樣處理器,讀事件調用rfileProc
,而寫事件調用wfileProc
。
void processFileEvent(int numevents) {
for (j = 0; j < numevents; j++) {
/* 從已就緒數組中獲取事件 */
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0;
int invert = fe->mask & AE_BARRIER;
/* 讀事件 */
if (!invert && fe->mask & mask & AE_READABLE) {
/* 調用讀處理函數 */
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
/* 寫事件. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
if (invert && fe->mask & mask & AE_READABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
}
複製代碼
而processTimeEvents
是處理時間事件的函數,它會遍歷aeEventLoop
的事件事件列表,若是時間事件到達就執行其timeProc
函數,並根據函數的返回值是否等於AE_NOMORE
來決定該時間事件是不是週期性事件,並修改器到達時間。
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);
....
eventLoop->lastTime = now;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
/* 遍歷時間事件鏈表 */
while(te) {
long now_sec, now_ms;
long long id;
/* 刪除須要刪除的時間事件 */
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc)
te->finalizerProc(eventLoop, te->clientData);
zfree(te);
te = next;
continue;
}
/* id 大於最大maxId,是該循環週期生成的時間事件,不處理 */
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
/* 事件已經到達,調用其timeProc函數*/
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
/* 若是返回值不等於 AE_NOMORE,表示是一個週期性事件,修改其when_sec和when_ms屬性*/
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
/* 一次性事件,標記爲需刪除,下次遍歷時會刪除*/
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
}
return processed;
}
複製代碼
當不在須要某個事件時,須要把事件刪除掉。例如: 若是fd同時監聽讀事件、寫事件。當不在須要監聽寫事件時,能夠把該fd的寫事件刪除。
aeDeleteEventLoop
函數的執行過程總結爲如下幾個步驟 一、根據fd
在未就緒表中查找到事件 二、取消該fd
對應的相應事件標識符 三、調用aeApiFree
函數,內核會將epoll監聽紅黑樹上的相應事件監聽取消。
接下來,咱們會繼續學習 Redis 的主從複製相關的原理,歡迎你們持續關注。