Redis中。處理網絡IO時,採用的是事件驅動機制。但它沒有使用libevent或者libev這種庫,而是本身實現了一個很easy明瞭的事件驅動庫ae_event,主要代碼只400行左右。node
沒有選擇libevent或libev的緣由大概在於。這些庫爲了迎合通用性形成代碼龐大,而且當中的很是多功能,比方監控子進程,複雜的定時器等。這些都不是Redis所需要的。git
Redis中的事件驅動庫僅僅關注網絡IO,以及定時器。該事件庫處理如下兩類事件:github
a:文件事件(file event):用於處理Redisserver和client之間的網絡IO。redis
b:時間事件(time eveat):Redis服務器中的一些操做(比方serverCron函數)需要在給定的時間點運行,而時間事件就是處理這類定時操做的。算法
事件驅動庫的代碼主要是在src/ae.c中實現的。api
一:文件事件數組
Redis基於Reactor模式開發了本身的網絡事件處理器,也就是文件事件處理器。服務器
文件事件處理器使用IO多路複用技術。同一時候監聽多個套接字,併爲套接字關聯不一樣的事件處理函數。網絡
當套接字的可讀或者可寫事件觸發時,就會調用對應的事件處理函數。數據結構
Redis使用的IO多路複用技術主要有:select、epoll、evport和kqueue等。每個IO多路複用函數庫在Redis源代碼中都相應一個單獨的文件,比方ae_select.c,ae_epoll.c, ae_kqueue.c等。
這些多路複用技術,依據不一樣的操做系統,Redis依照必定的優先級。選擇當中的一種使用。在ae.c中。是這樣實現的:
#ifdef HAVE_EVPORT #include "ae_evport.c" #else #ifdef HAVE_EPOLL #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" #else #include "ae_select.c" #endif #endif #endif
注意這裏是include的.c文件,所以。使用哪一種多路複用技術。是在編譯階段就決定了的。
文件事件由結構體aeFileEvent表示,它的定義例如如下:
/* File event structure */ typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE) */ aeFileProc *rfileProc; aeFileProc *wfileProc; void *clientData; } aeFileEvent;
當中mask表示描寫敘述符註冊的事件。可以是AE_READABLE。AE_WRITABLE或者是AE_READABLE|AE_WRITABLE。
rfileProc和wfileProc分別表示可讀和可寫事件的回調函數。
clientData是用戶提供的數據,在調用回調函數時被當作參數。注意。該數據是可讀和可寫事件共用的。
二:時間事件
Redis的時間事件主要有一次性事件和週期性事件兩種。一次性時間事件僅觸發一次。而週期性事件每隔一段時間就觸發一次。
時間事件由aeTimeEvent結構體表示,它的定義例如如下:
/* Time event structure */ typedef struct aeTimeEvent { long long id; /* time event identifier. */ long when_sec; /* seconds */ long when_ms; /* milliseconds */ aeTimeProc *timeProc; aeEventFinalizerProc *finalizerProc; void *clientData; struct aeTimeEvent *next; } aeTimeEvent;
id用於標識時間事件。id號依照從小到大的順序遞增,新時間事件的id號比舊時間事件的id號要大。
when_sec和when_ms表示時間事件的下次觸發時間,實際上就是一個Unix時間戳,when_sec記錄它的秒數,when_ms記錄它的毫秒數。所以觸發時間是一個絕對值,而非相對值。
timeProc是時間事件處理器,也就是時間事件觸發時的回調函數;
finalizerProc是刪除該時間事件時要調用的函數;
clientData是用戶提供的數據。在調用timeProc和finalizerProc時。做爲參數;
所有的時間事件aeTimeEvent結構被組織成一個鏈表。next指針就運行鏈表中,當前aeTimeEvent結構的後繼結點。
aeTimeEvent結構鏈表是一個無序鏈表。也就是說它並不依照事件的觸發時間而排序。
每當建立一個新的時間事件aeTimeEvent結構時,該結構就插入鏈表的頭部。所以。當監控時間事件時,需要遍歷整個鏈表。查找所有已到達的時間事件,並調用對應的事件處理器。
在眼下版本號中,正常模式下的Redisserver僅僅使用serverCron一個時間事件。而在benchmark模式下,server也僅僅使用兩個時間事件。
所以。時間事件鏈表的這樣的設計儘管簡單粗暴,但是也能知足性能需求。
三:事件循環結構
在事件驅動的實現中,需要有一個事件循環結構來監控調度所有的事件,比方Libevent庫中的event_base,libev中的ev_loop等。
在Redis中的事件驅動庫中,事件循環結構是由aeEventLoop結構體實現的。aeEventLoop結構是Redis中事件驅動機制的主要數據結構。它的定義例如如下:
typedef struct aeEventLoop { int maxfd; /* highest file descriptor currently registered */ int setsize; /* max number of file descriptors tracked */ long long timeEventNextId; time_t lastTime; /* Used to detect system clock skew */ aeFileEvent *events; /* Registered events */ aeFiredEvent *fired; /* Fired events */ aeTimeEvent *timeEventHead; int stop; void *apidata; /* This is used for polling API specific data */ aeBeforeSleepProc *beforesleep; } aeEventLoop;
events是aeFileEvent結構的數組,每個aeFileEvent結構表示一個註冊的文件事件。events數組以描寫敘述符的值爲下標。
fired是aeFiredEvent結構的數組,aeFiredEvent結構表示一個觸發的文件事件。
結構中包括了描寫敘述符,以及其上已經觸發的事件。該數組不是以描寫敘述符的值爲下標。而是依次保存所有觸發的文件事件。當處理事件時,輪訓fired數組中的每個元素。而後依次處理。
setsize表示eventLoop->events和eventLoop->fired數組的大小。所以。setsize- 1就表示所能處理的最大的描寫敘述符的值。
lastTime:爲了處理時間事件而記錄的Unix時間戳。主要爲了在系統時間被調整時能夠儘快的處理時間事件;
timeEventHead:時間事件aeTimeEvent結構組成的鏈表的頭指針。
timeEventNextId:下個時間事件的ID,該ID依次遞增,所以當前時間事件的最大ID爲timeEventNextId-1;
stop:是否中止事件監控;
maxfd:當前處理的最大的描寫敘述符的值,主要是在select中使用。
beforesleep:每次監控事件觸發以前。需要調用的函數。
apidata表示詳細的底層多路複用所使用的數據結構,比方對於select來講。該結構中保存了讀寫描寫敘述符數組;對於epoll來講。該結構中保存了epoll描寫敘述符,以及epoll_event結構數組;
四:監控調度時間事件
監控調度時間事件是由函數processTimeEvents實現的,它的代碼例如如下:
static int processTimeEvents(aeEventLoop *eventLoop) { int processed = 0; aeTimeEvent *te; long long maxId; time_t now = time(NULL); /* If the system clock is moved to the future, and then set back to the * right value, time events may be delayed in a random way. Often this * means that scheduled operations will not be performed soon enough. * * Here we try to detect system clock skews, and force all the time * events to be processed ASAP when this happens: the idea is that * processing events earlier is less dangerous than delaying them * indefinitely, and practice suggests it is. */ if (now < eventLoop->lastTime) { te = eventLoop->timeEventHead; while(te) { te->when_sec = 0; te = te->next; } } eventLoop->lastTime = now; te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId-1; while(te) { long now_sec, now_ms; long long id; if (te->id > maxId) { te = te->next; continue; } aeGetTime(&now_sec, &now_ms); if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) { int retval; id = te->id; retval = te->timeProc(eventLoop, id, te->clientData); processed++; /* After an event is processed our time event list may * no longer be the same, so we restart from head. * Still we make sure to don't process events registered * by event handlers itself in order to don't loop forever. * To do so we saved the max ID we want to handle. * * FUTURE OPTIMIZATIONS: * Note that this is NOT great algorithmically. Redis uses * a single time event so it's not a problem but the right * way to do this is to add the new elements on head, and * to flag deleted elements in a special way for later * deletion (putting references to the nodes to delete into * another linked list). */ if (retval != AE_NOMORE) { aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); } else { aeDeleteTimeEvent(eventLoop, id); } te = eventLoop->timeEventHead; } else { te = te->next; } } return processed; }
首先推斷系統時間是否被調整了。將當前時間now,與上次記錄的時間戳eventLoop->lastTime相比較。假設now小於eventLoop->lastTime。說明系統時間被調整到過去了,比方由201603312030調整到了201603312000了。這樣的狀況下,直接將所有事件的觸發時間的秒數清0,這意味着所有的時間事件都會立刻觸發。
之因此這麼作。是因爲提早處理比延後處理的危急性要小;
而後更新eventLoop->lastTime爲now;
接下來,先記錄當前的maxId。之因此這麼作,是因爲有時間事件觸發後。要又一次回到鏈表頭結點開始處理。而在時間事件的觸發回調函數中。有可能註冊了新的時間事件,成爲新的鏈表頭結點,這就可能致使會無限處理下去。爲了防止這樣的狀況發生。記錄當前的maxId,僅僅處理當前的時間事件;
輪訓鏈表eventLoop->timeEventHead,針對當中的每一個事件節點te,假設te的id大於maxId。說明該事件,是在以前已經觸發的時間事件的回調函數中註冊的。不處理這種事件,直接處理下一個;
而後獲得當前時間,推斷當前時間是否已經超過了te的觸發時間,如果。說明該事件需要觸發,調用觸發回調函數te->timeProc,該函數的返回值爲retval;
假設retval是AE_NOMORE,說明觸發的時間事件是一次性事件,直接從鏈表中刪除;不然。說明該事件是週期性事件。將其觸發時間更改成當前時間加上retval;
事件觸發後,鏈表已經被改動了,要又一次回到鏈表頭結點開始處理。因爲Redis中僅僅有一個時間事件,所以採用了這樣的簡單粗暴的算法。更好的處理方式是處理完當前事件後。標記該節點需要刪除(比方在還有一個鏈表中保存該節點的指針),而後接着處理下一個節點,所有節點處理完以後,將標記爲刪除的節點統一刪除就能夠。
最後返回觸發的事件總數。
五:監控調度所有事件
監控調度所有事件是由函數aeProcessEvents實現的,它的代碼例如如下:
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; /* Note that we want call select() even if there are no * file events to process as long as we want to process time * events, in order to sleep until the next time event is ready * to fire. */ if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); if (shortest) { long now_sec, now_ms; /* Calculate the time missing for the nearest * timer to fire. */ aeGetTime(&now_sec, &now_ms); tvp = &tv; tvp->tv_sec = shortest->when_sec - now_sec; if (shortest->when_ms < now_ms) { tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; tvp->tv_sec --; } else { tvp->tv_usec = (shortest->when_ms - now_ms)*1000; } if (tvp->tv_sec < 0) tvp->tv_sec = 0; if (tvp->tv_usec < 0) tvp->tv_usec = 0; } else { /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ tvp = NULL; /* wait forever */ } } numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; /* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn't * processed, so we check if the event is still valid. */ if (fe->mask & mask & AE_READABLE) { rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; } } /* Check time events */ if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */ }
依據flags處理不一樣的事件:
假設flags爲0。則該函數直接返回。
假設flags中設置了AE_ALL_EVENTS,則處理所有的文件事件和時間事件;
假設flags中設置了AE_FILE_EVENTS。則處理所有的文件事件。
假設flags中設置了AE_TIME_EVENTS,則處理所有的時間事件;
假設flags中設置了AE_DONT_WAIT,則調用多路複用函數時。不會堵塞等
待事件的觸發。將所有已觸發的事件處理完後立刻返回。
眼下在Redis中,調用aeProcessEvents時設置的flags僅僅有AE_ALL_EVENTS和
AE_FILE_EVENTS|AE_DONT_WAIT兩種。
函數中,首先假設flags中既沒有設置AE_TIME_EVENTS。也沒有設置AE_FILE_EVENTS。則該函數直接返回0.
接下來,假設已經註冊過文件事件,或者需要處理時間事件且不是AE_DONT_WAIT。則需要調用底層多路複用函數aeApiPoll。所以需要計算調用aeApiPoll函數時,最長堵塞時間tvp。該值是由最先要觸發的時間事件(假設有的話)決定的。
假設需要處理時間事件且不是AE_DONT_WAIT。這樣的狀況下,不管有沒有文件事件,都要堵塞一段時間。堵塞的時間依據shortest獲得,shortest是經過調用aeSearchNearestTimer獲得的最先要觸發的時間事件。獲得shortest後,計算得出其觸發時間距離當前時間的差值,該差值就是堵塞時間tvp。
不然。假設註冊過文件事件,並且flags中設置了AE_DONT_WAIT。則將tvp中的值設置爲0。表示全然不堵塞;
假設註冊過文件事件,但是flags中沒有設置AE_DONT_WAIT,則將tvp置爲NULL,表示一直堵塞,直到有文件事件觸發;
獲得最長堵塞時間tvp以後。以tvp爲參數調用aeApiPoll等待文件事件的觸發。該函數由不一樣的底層多路複用函數實現。終於都返回觸發的文件事件總數numevents,並將觸發的事件和描寫敘述符,依次記錄到eventLoop->fired中。
接下來。依次輪訓eventLoop->fired中的前numevents個元素。調用對應的事件回調函數。注意。假設一個套接字又可讀又可寫的話,那麼server將先處理可讀事件,而後在處理可寫事件。
觸發的文件事件是依次處理的,假設某個文件事件的處理時間過長,就會影響到下一個事件的處理。在事件驅動的實現中,要由用戶保證事件回調函數能夠高速返回,而不堵塞。
注意。有這樣一種狀況。比方描寫敘述符3和4都有事件觸發了,在3的事件回調函數中,調用aeDeleteFileEvent將4的註冊事件刪除了。這樣在處理描寫敘述符4時,就不該該再次調用4的回調函數了。
因此,每次調用事件回調函數以前,都推斷該描寫敘述符上的註冊事件是否還有效。而且假設可讀和可寫事件的回調函數一樣的話,僅僅能調用一次該函數。
處理完文件事件以後(或者沒有文件事件。而只堵塞了tvp的時間),假設flags中設置了AE_TIME_EVENTS。則調用processTimeEvents處理時間事件,因已經堵塞了tvp的時間,所以此時確定有觸發的時間事件。最後。返回所有觸發的事件總數。
因爲時間事件在文件事件以後處理,並且事件之間不會出現搶佔,因此時間事件的實際處理時間。通常會比時間事件設定的到達時間稍晚一些。
再次強調一點:對文件事件和時間事件的處理都是同步、有序、原子地運行的,server不會中途中斷事件處理,也不會對事件進行搶佔。
所以。不管是文件事件的回調函數,仍是時間事件的回調函數。都需要儘可地下降程序的堵塞時間,從而下降形成事件飢餓的可能性。比方,在命令回覆回調函數中,將一個命令回覆寫入到client套接字時。假設寫人字節數超過了一個預設常量的話。命令回覆函數就會主動用break跳出寫人循環。將餘下的數據留到下次再寫。
另外,時間事件也會將很耗時的持久化操做放到子線程或者子進程運行。
六:事件循環監控
事件循環監控是由函數aeMain實現的,它的代碼例如如下:
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS); } }
僅僅要eventLoop->stop不爲1。則持續調用aeProcessEvents監控調度所有事件的觸發。正常狀況下,在Redisserver中,eventLoop->stop永遠不可能爲1。
在Redisserver的主函數中,所有初始化工做完畢以後,就會調用該函數。監控所有事件的觸發。
七:樣例:ECHOserver
如下是使用Redis的事件驅動庫,實現的一個簡單echoserver:
#define SERVER_PORT 9998 typedef struct { char clientaddr[INET_ADDRSTRLEN]; int port; char buf[1024]; }Userbuf; void setunblock(int fd) { int flags; if ((flags = fcntl(fd, F_GETFL)) == -1) { perror("fcntl(F_GETFL) error"); return; } flags |= O_NONBLOCK; if (fcntl(fd, F_SETFL, flags) == -1) { perror("fcntl(F_SETFL) error"); return; } return; } void acceptfun(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) { int acceptfd = -1; struct sockaddr_in cliaddr; socklen_t addrlen = sizeof(cliaddr); acceptfd = accept(fd, (struct sockaddr *)&cliaddr, &addrlen); if (acceptfd < 0) { perror("accept error\n"); return; } Userbuf *usrbuf = calloc(1, sizeof(Userbuf)); printf("calloc %p\n", usrbuf); inet_ntop(AF_INET, &cliaddr.sin_addr, usrbuf->clientaddr, INET_ADDRSTRLEN), usrbuf->port = ntohs(cliaddr.sin_port); printf("\naccept from <%s:%d>\n", usrbuf->clientaddr, usrbuf->port); setunblock(acceptfd); if (aeCreateFileEvent(eventLoop, acceptfd, AE_READABLE, readfun, usrbuf) != AE_OK) { perror("aeCreateFileEvent error"); close(acceptfd); printf("free %p\n", usrbuf); free(usrbuf); return; } return; } void readfun(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) { char readbuf[1024] = {}; int len = -1; Userbuf *usrbuf = (Userbuf *)clientData; if ((len = read(fd, readbuf, 1024)) > 0) { printf("read from <%s:%d>: %s\n", usrbuf->clientaddr, usrbuf->port, readbuf); memcpy(usrbuf->buf, readbuf, 1024); if (aeCreateFileEvent(eventLoop, fd, AE_WRITABLE, writefun, clientData) != AE_OK) { printf("aeCreateFileEvent error\n"); goto END; } else return; } else if (len == 0) { printf("close link from %s\n", usrbuf->buf); goto END; } else { printf("read error from %s\n", usrbuf->buf); goto END; } END: close(fd); aeDeleteFileEvent(eventLoop, fd, AE_READABLE); aeDeleteFileEvent(eventLoop, fd, AE_WRITABLE); printf("free %p\n", clientData); free(clientData); return; } void writefun(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) { int len = 0; char *buf = ((Userbuf *)clientData)->buf; len = strlen(buf); printf("write to client: %s\n", buf); if(write(fd, buf, len) != len) { perror("write error"); close(fd); aeDeleteFileEvent(eventLoop, fd, AE_READABLE); aeDeleteFileEvent(eventLoop, fd, AE_WRITABLE); printf("free %p\n", clientData); free(clientData); } aeDeleteFileEvent(eventLoop, fd, AE_WRITABLE); } int main() { int listenfd; aeEventLoop *eventloop = NULL; struct sockaddr_in seraddr; listenfd = socket(AF_INET, SOCK_STREAM, 0); if (listenfd < 0) { perror("socket error"); return -1; } seraddr.sin_family = AF_INET; seraddr.sin_addr.s_addr = htonl(INADDR_ANY); seraddr.sin_port = htons(SERVER_PORT); if (bind(listenfd, (struct sockaddr *)&seraddr, sizeof(seraddr)) < 0) { perror("bind error"); close(listenfd); return -1; } if (listen(listenfd, 5) < 0) { perror("listen error"); close(listenfd); return -1; } eventloop = aeCreateEventLoop(1024); if (eventloop == NULL) { printf("aeCreateEventLoop error\n"); close(listenfd); return -1; } if (aeCreateFileEvent(eventloop, listenfd, AE_READABLE, acceptfun, NULL) != AE_OK) { perror("aeCreateFileEvent error"); close(listenfd); aeDeleteEventLoop(eventloop); return -1; } aeMain(eventloop); return 0; }
這裏要注意的是,對於同一個acceptfd,調用aeCreateFileEvent函數。分別註冊可讀事件和可寫事件時。其clientData是共享的。
假設在註冊可寫事件時,改動了clientData,則可讀事件的clientData也對應改變,這是因爲一個描寫敘述符僅僅有一個aeFileEvent結構。
client的代碼依據Webbench改寫。詳細代碼見:
https://github.com/gqtc/redis-3.0.5/blob/master/redis-3.0.5/tests/hhunittest/test_ae_client.c
其它有關事件驅動的代碼實現。可以參考:
https://github.com/gqtc/redis-3.0.5/blob/master/redis-3.0.5/src/ae.c
https://github.com/gqtc/redis-3.0.5/blob/master/redis-3.0.5/src/ae_epoll.c
https://github.com/gqtc/redis-3.0.5/blob/master/redis-3.0.5/src/ae_select.c