事件庫之Redis本身的事件模型-ae

#Redis本身的事件模型 aec++

##1.Redis的事件模型庫vim

你們到網上Google「Redis libevent」就能夠搜到Redis爲何沒有選擇libevent以及libev爲其事件模型庫,而是本身寫了一個事件模型。 從代碼中能夠看到它主要支持了epoll、select、kqueue、以及基於Solaris的event ports。主要提供了對兩種類型的事件驅動:api

  1. IO事件(文件事件),包括有IO的讀事件和寫事件。
  2. 定時器事件,包括有一次性定時器和循環定時器。

##2.使用示例數組

這裏寫了一個由標準輸入的讀事件驅動的echo服務例子,同時用一個5秒的循環定時器每一個5秒打印一次服務器狀態。這裏用了epoll爲底層 事件接口。具體的代碼抽取能夠從Redis的源碼中抽取"ae.c"、「ae.h」、"ae_select.c"、「ae_epoll.c」、"ae_evport.c"這幾個文件,經過 ae.c中的宏::服務器

#define HAVE_EPOLL 1    // illustrate to use epoll
#ifdef HAVE_EVPORT
#   include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#    include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#    include "ae_kqueue.c"
#else
#    include "ae_select.c"
#endif
#endif
#endif

這裏主要是分析Redis的事件模型的封裝,所以對於其對socket的包裝以及內存管理都不作分析。故採用標準輸入,同時須要將這些文件中 的內存管理接口"zmalloc()"以及"zfree()"替換成C庫中的「malloc()」還有"free()"。可使用sed或者vim的%s作替換操做。數據結構

將主程序貼在這裏::app

#include "ae.h"

#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <sys/time.h>


#define MAXFD 5


void loop_init(struct aeEventLoop *l) 
{
        puts("I'm loop_init!!! \n");
}

void file_cb(struct aeEventLoop *l,int fd,void *data,int mask)
{
        char buf[51] ={0};
        read(fd,buf,51);
        printf("I'm file_cb ,here [EventLoop: %p],[fd : %d],[data: %p],[mask: %d] \n",l,fd,data,mask);
        printf("get %s",buf);
}

int time_cb(struct aeEventLoop *l,long long id,void *data)
{
        printf("now is %ld\n",time(NULL));
        printf("I'm time_cb,here [EventLoop: %p],[id : %lld],[data: %p] \n",l,id,data);
        return 5*1000;

}

void fin_cb(struct aeEventLoop *l,void *data)
{
        puts("call the unknow final function \n");
}

int main(int argc,char *argv[])
{
        aeEventLoop *l; 
        char *msg = "Here std say:";
        char *user_data = malloc(50*sizeof(char));
        if(! user_data)
                assert( ("user_data malloc error",user_data) );
        memset(user_data,'\0',50);
        memcpy(user_data,msg,sizeof(msg));

        l = aeCreateEventLoop(MAXFD);
        aeSetBeforeSleepProc(l,loop_init);
        int res;
        res = aeCreateFileEvent(l,STDIN_FILENO,AE_READABLE,file_cb,user_data);
        printf("create file event is ok? [%d]\n",res);
        res = aeCreateTimeEvent(l,5*1000,time_cb,NULL,fin_cb);
        printf("create time event is ok? [%d]\n",!res);

        aeMain(l);

        puts("Everything is ok !!!\n");
return 0;
}

沒有什麼邏輯,就是註冊一個標準輸入的讀事件,和一個定時器事件。這裏要說明的就是在ae.h中定義了讀、寫、定時器等回調函數的類型::less

typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); 
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData); 
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);

按這個類型定義回調函數就能夠。其中asFileProc和aeTimeProc比較容易理解,就是IO讀寫事件和定時器事件的回調函數。這裏要注意了,若是 定義的定時器回調函數返回值爲正數,那麼表示該定時器是一個循環定時器,即在第一次執行完後添加定時器事件時給定的延遲後不刪除定時器, 在延遲該返回值時間(單位是毫秒)後再次執行該定時器。因此就要注意,好比要每5秒執行一個操做,那麼在添加定時器時要給定其定時時間爲 5000毫秒,同時在該定時器的回調函數中也要返回5000.dom

而後aeBeforeSleepProc回調函數比較的撲朔迷離,從Sleep上不容易理解,其實想到select和epoll這些機制的做用就能夠想到了,這個函數是在 poll以前執行,從源碼中看到就是在每一個處理事件的循環開始出執行的。而aeEventFinalizerProc單從名字就更難理解,從源碼中看到它是在刪除 定時器事件時候執行的。socket

clientData比較好理解,就和在epoll中的ptr指針的做用同樣。主要能夠存放用戶對每一個事件上附加的數據。

事件循環的入口函數是aeMain(),將建立好的aeEventLoop傳入就能夠了。

使用起來很簡單,對於不是很複雜或者對接入層要求不高的應用能夠一試。

##3.ae.c分析

Redis的ae(姑且這麼稱呼Redis用的事件模型庫的名字)主要邏輯在文件「ae.c」中,其中根據使用的系統事件接口分別選擇包含"ae_epoll.c"或其餘 文件。用到的主要數據結構在文件「ae.h」中定義。下面用一個不規範的UML類圖表示了幾個主要數據結構之間的關係,其中連在一塊兒的表示一個數組或者 箭頭表示的鏈表。這麼畫主要是幫助理解。

類圖

下面根據上面的示例程序一一作說明。

###3.1 主要數據結構的建立

####3.1.1 aeCreateEventLoop

首先要建立一個aeCreateEventLoop對象。該對象須要一個最大文件描述符做爲參數setSize,這個參數的意義須要瞭解ae的數據存放結構。從上面的圖能夠看到 在aeEventLoop結構中有兩個數組(其實就是服務器程序慣用提早分配好內存而後用index映射到相應位置的作法),這兩個數組的大小就是這裏的參數值。 ae會建立一個 setSize*sizeof(aeFileEvent) 以及一個 setSize*siezeof(aeFiredEvent) 大小的內存,用文件描述符做爲其索引。這一能夠達到0(1)的速度找到事件數據所在位置。那麼這個大小定位多少合適呢?在Linux箇中,文件描述符是個有限的資源,當打開一個文件時就會消耗一個文件描述符,當關閉該文件描述符或者程序結束時會釋放該文件描述符資源,從而供其餘文件打開操做使用。當文件描述符超過最大值後,打開文件就會出錯。那麼這個最大值是多少呢?能夠經過/proc/sys/fs/file-max看到系統支持的最大的文件描述符數。經過 ulimit -n 能夠看到當前用戶能打開的最大的文件描述符。在我這裏的一臺8g內存的機器上,系統支持最大的文件描述是365146。而在這臺64bit的機器上 sizeof(aeFiredEvent) + sizeof(aeFileEvent) 大小爲40byte。按系統最大支持的文件描述符來算,固定消耗內存爲14.6M。這樣以文件描述符做爲數組的下標來索引,雖然這樣的哈希在接入量不大的狀況下會有大量的浪費。可是最多也就浪費14M 的內存,所以這樣的設計是可取的。

在作好這些準備後還要準備系統提供的事件模型接口。這裏以epoll爲例,其餘的能夠自行查閱源碼瞭解。ae首先提供了一個統一的結構名aeApiState,能夠想象成c++中接口。在包裝epoll的aeApiState中有一個epfd表示epoll佔用的fd,一個epoll_event *events,其實也是一個aeApiState數組::

其和aeFiredEvent相對應,當epoll_wait()返回時,會將pending的文件描述符的信息放在aeFiredEvent數組中,包括有fd,以及mask事件類型,此時的aeFiredEvent不是以fd做爲下標的,而是把這個數組當成一個緩衝區,存放一次epoll_wait()返回的全部fd,同時用epoll_event數組存放了epoll_wait()返回中的epoll_data數據,用其數據能夠填充aeFiredEvent數組的內容供ae使用找到pending d的aeFileEvent對象。並在下一次進入epoll_wait()前處理完。這樣完成了對epoll數據封裝。

typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

3.1.2 aeCreateFileEvent

建立IO事件時須要指定要要註冊的文件的文件描述符fd,以及要監聽的事件類型mask。ae會先經過fd找到其對應的aeCreateFileEvent對象所在內存位置::

typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE) */
    aeFileProc *rfileProc;  
    aeFileProc *wfileProc;  
    void *clientData;   
} aeFileEvent;

而後添加其要監聽的事件類型mask fe->mask |= mask;,接着回根據要監聽的類型添加其讀事件或者寫事件的回調函數,即aeFileProc。同時更新maxfd以備後用,如在select中的最大fd的指定。

在建立文件事件的過程當中還要經過宏判斷後include進來的底層事件模型接口來註冊IO事件。這裏和上面同樣以epoll爲例,其餘的事件模型也相似。經過aeApiAddEvent將文件描述符fd和事件類型mask傳給epoll操做。首先經過fd爲下標找到aeCreateFileEvent對應的位置,而後取得epoll的epfd.經過EPOLL_CTL_ADD和EPOLL_CTL_MOD來加入或者修改epoll在該fd上事件的類型。

####3.1.3 aeCreateTimeEvent

ae的定時器是用一個單鏈表來管理的,將定時器依次從head插入到單鏈表中。插入的過程當中會取得將來的牆上時間做爲其超時的時刻。即將當前時間加上添加定時器時給定的延遲時間。定時器結構以下。並設置超時以及註銷定時器時的回調函數還用clientData::

typedef struct aeTimeEvent {
    long long id; /* time event identifier. */ 
    long when_sec; /* seconds */                 
    long when_ms; /* milliseconds */           
    aeTimeProc *timeProc;                      
    aeEventFinalizerProc *finalizerProc;        
    void *clientData;                           
    struct aeTimeEvent *next;                   
} aeTimeEvent;

###3.2 事件循環

####3.2.1 aeMain入口函數

ae事件循環的基本機構就是用一個無限循環,而後再循環中去檢測各個事件的發生。固然這裏不是徹底意義上的輪詢,由於循環裏面封裝了epoll,select等事件驅動機制::

while (!eventLoop->stop) {
    if (eventLoop->beforesleep != NULL)
        eventLoop->beforesleep(eventLoop);
    aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}

這裏的beforesleep就是上文中敘述過的,進入一次循環以前作的操做。後面會說到定時的過程其實也就是一個epoll或者select模擬的sleep過程,而等待事件到來也是「sleep」在epoll或者select上。因此這個叫名字感受也算貼切。固然這裏是YY一下。不過能夠幫助理解。

####3.2.2 aeProcessEvents

ae中最主要的邏輯應該也就是事件的處理了。從上面知道aeProcessEvents是處理事件的入口。在進入事件處理函數時,首先若沒有任何事件則當即返回::

/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

這裏註釋中說的ASAP我不太理解表示的啥意思,望高人指點。

而後判斷是否有定時器事件,若是有那麼就去取得最近的一個將超時定時器的時間減去當前時間做爲epoll或者select等事件接口的超時時間。該尋找過程就是經過遍歷單鏈表得來的。這樣指定超時時間,在有IO事件pending時能夠處理IO事件,同時若沒有則能夠保證從epoll或者select中返回去處理定時器事件。不過這裏也能夠不註冊定時器事件而後將事件的flags與上AE_DONT_WAIT,那麼就會在poll中一直等待IO時間的到來。

在得到事件接口的超時時間後,用其調用封裝事件接口的函數aeApiPoll。這裏依舊以epoll做爲示例。其將首先得到apidata,而後從中得到epoll的文件描述符epfd,並用events指針指向的數組內存以及超時時間調用epoll的epoll_wait().在上面已經描述了,epoll()返回時會將結果放在epoll_event數組中同時返回新的文件描述符。經過對返回數據的事件類型作判斷能夠填充到aeFiredEvent中fd和事件類型信息。

而後返回到ae的邏輯中,經過遍歷aeFiredEvent數組取得fd能夠找到pending事件的aeFileEvent,而後根據事件的類型去調用用戶定義的IO回調函數。

當epoll或者select超時返回時並註冊了定時器事件時,經過processTimeEvents進入去處理超時事件::

/* If the system clock is moved to the future, and then set back to the
 * right value, time events may be delayed in a random way. Often this
 * means that scheduled operations will not be performed soon enough.
 *
 * Here we try to detect system clock skews, and force all the time
 * events to be processed ASAP when this happens: the idea is that
 * processing events earlier is less dangerous than delaying them
 * indefinitely, and practice suggests it is. */  
if (now < eventLoop->lastTime) {  
    te = eventLoop->timeEventHead;
    while(te) {
        te->when_sec = 0;
        te = te->next;
    }
}

這裏的註釋說明了這麼作的意義,其實就是若是系統事件變動了,就將全部的定時器時間設爲0,讓他在本次循環中超時並被執行。

當一個定時器被處理的時候,此時可能會加入新的定時,好比在定時器處理函數中加入新的定時器。而此時僅應該處理上一個時間段的狀態,不該該在本次循環中去處理新的定時器。所以ae在EventLoop中加入了一個timeEventNextId的成員表示這次循環中最大的定時器id+1,這樣在遍歷定時器列表時,先保存最大的定時器id,而後遍歷過程過濾掉定時器列表可能加入新的定時器便可::

if (te->id > maxId) {
        te = te->next;   
        continue;
    }

這裏定時器的邏輯是若單鏈表中的定時器時間比當前時間晚就執行定時器註冊的回調函數。若是該回調函數返回正值,那麼就更新定時器時間爲該值以後,從而能夠循環執行定時器。若是該回調函數返回AE_NOMORE,那麼在執行完回調函數後註銷該定時器。

###3.3 清理工做

####3.3.1 註銷IO事件

註銷IO事件不是以aeFileEvent爲單位而是該IO事件加上其監聽的事件類型爲對象,所以其接口爲aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)。其首先經過fd找到去掉aeFileEvent對象,而後得到已有的mask,對其進行減操做後,構成fd上新的mask事件類型。經過修改epoll或者select中註冊的IO事件來完成。這裏以epoll爲例,會根據該文件描述符上是否還有待等待的事件類型分別調用epoll_ctr的EPOLL_CTL_MOD或者EPOLL_CTL_DEL命令。

####3.3.2註銷Timer時間

註銷定時器事件的操做比較暴力,直接遍歷鏈表,找到定時器id匹配的項,使用單鏈表刪除操做進行刪除。這裏再刪除以前會調用定時器上的finalizerProc。

####3.3.3註銷aeEventLooop

最後註銷aeEventLooop就是對相關內存的釋放。

##4.總結

分析到這就結束了。感受ae比較的直觀。主要提供了一個IO事件和定時器事件的事件驅動模型。定時器的單鏈表邏輯能夠再改進,好比用最小堆或者Timing-Wheel等著名的定時器解決方法。這樣的一個模型用select能夠跨到Windows上。所以用這套東西寫的server再客戶端測試的時候,也能夠複用接入層。

相關文章
相關標籤/搜索