深刻剖析 redis 事件驅動

概述

redis 內部有一個小型的事件驅動,它和 libevent 網絡庫的事件驅動同樣,都是依託 I/O 多路複用技術支撐起來的。linux

利用 I/O 多路複用技術,監聽感興趣的文件 I/O 事件,例如讀事件,寫事件等,同時也要維護一個以文件描述符爲主鍵,數據爲某個預設函數的事件表,這裏其實就是一個數組或者鏈表 。當事件觸發時,好比某個文件描述符可讀,系統會返回文件描述符值,用這個值在事件表中找到相應的數據項,從而實現回調。一樣的,定時事件也是能夠實現的,由於系統提供的 I/O 多路複用技術中的函數容許咱們設定時間值。redis

redis_event_summary

上面一段話比較綜合,可能須要一些 linux 系統編程和網絡編程的基礎,但你會看到多數事件驅動程序都是這麼實現的(?)。編程

redis 事件驅動數據結構

redis 事件驅動內部有四個主要的數據結構,分別是:事件循環結構體,文件事件結構體,時間事件結構體和觸發事件結構體。api

// 文件事件結構體
/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE) */

    // 回調函數指針
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;

    // clientData 參數通常是指向 redisClient 的指針
    void *clientData;
} aeFileEvent;

// 時間事件結構體
/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */

    // 定時回調函數指針
    aeTimeProc *timeProc;

    // 定時事件清理函數,當刪除定時事件的時候會被調用
    aeEventFinalizerProc *finalizerProc;

    // clientData 參數通常是指向 redisClient 的指針
    void *clientData;

    // 定時事件表採用鏈表來維護
    struct aeTimeEvent *next;
} aeTimeEvent;

// 觸發事件
/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;

// 事件循環結構體
/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */

    // 記錄最大的定時事件 id + 1
    long long timeEventNextId;

    // 用於系統時間的矯正
    time_t lastTime;     /* Used to detect system clock skew */

    // I/O 事件表
    aeFileEvent *events; /* Registered events */

    // 被觸發的事件
    aeFiredEvent *fired; /* Fired events */

    // 定時事件表
    aeTimeEvent *timeEventHead;

    // 事件循環結束標識
    int stop;

    // 對於不一樣的 I/O 多路複用技術,有不一樣的數據,詳見各自實現
    void *apidata; /* This is used for polling API specific data */

    // 新的循環前須要執行的操做
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;

上面的數據結構能給咱們很好的提示:事件循環結構體維護 I/O 事件表,定時事件表和觸發事件表。數組

事件循環中心

redis 的主函數中調用 initServer() 函數從而初始化事件循環中心(EventLoop),它的主要工做是在 aeCreateEventLoop() 中完成的。網絡

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    // 分配空間
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;

    // 分配文件事件結構體空間
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);

    // 分配已觸發事件結構體空間
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;

    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);

    // 時間事件鏈表頭
    eventLoop->timeEventHead = NULL;

    // 後續提到
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;

    // 進入事件循環前須要執行的操做,此項會在 redis main() 函數中設置
    eventLoop->beforesleep = NULL;

    // 在這裏,aeApiCreate() 函數對於每一個 IO 多路複用模型的實現都有不一樣,具體參見源代碼,由於每種 IO 多路複用模型的初始化都不一樣
    if (aeApiCreate(eventLoop) == -1) goto err;

    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    // 初始化事件類型掩碼爲無事件狀態
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

有上面初始化工做只是完成了一個空空的事件中心而已。要想驅動事件循環,還須要下面的工做。數據結構

事件註冊詳解

文件 I/O 事件註冊主要操做在 aeCreateFileEvent() 中完成。aeCreateFileEvent() 會根據文件描述符的數值大小在事件循環結構體的 I/O 事件表中取一個數據空間,利用系統提供的 I/O 多路複用技術監聽感興趣的 I/O 事件,並設置回調函數。dom

io_event_table

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    // 在 I/O 事件表中選擇一個空間
    aeFileEvent *fe = &eventLoop->events[fd];

    // aeApiAddEvent() 只在此函數中調用,對於不一樣 IO 多路複用實現,會有所不一樣
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;

    fe->mask |= mask;

    // 設置回調函數
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

對於不一樣版本的 I/O 多路複用,好比 epoll,select,kqueue 等,redis 有各自的版本,但接口統一,譬如 aeApiAddEvent()。socket

redis_event_api

之於定時事件,在事件循環結構體中用鏈表來維護。定時事件操做在 aeCreateTimeEvent() 中完成:分配定時事件結構體,設置觸發時間和回調函數,插入到定時事件表中。tcp

time_event_table

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
/*    自增
    timeEventNextId 會在處理執行定時事件時會用到,用於防止出現死循環。
    若是超過了最大 id,則跳過這個定時事件,爲的是避免死循環,即:
    若是事件一執行的時候註冊了事件二,事件一執行完畢後事件二獲得執行,緊接着若是事件一有獲得執行就會成爲循環,所以維護了 timeEventNextId 。*/
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    // 分配空間
    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;

    // 填充時間事件結構體
    te->id = id;

    // 計算超時時間
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);

    // proc == serverCorn
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;

    // 頭插法
    te->next = eventLoop->timeEventHead;
    eventLoop->timeEventHead = te;
    return id;
}

準備監聽工做

initServer() 中調用了 aeCreateEventLoop() 完成了事件中心的初始化,initServer() 還作了監聽的準備。

    /* Open the TCP listening socket for the user commands. */
    // listenToPort() 中有調用 listen()
    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
        exit(1);

    // UNIX 域套接字
    /* Open the listening Unix domain socket. */
    if (server.unixsocket != NULL) {
        unlink(server.unixsocket); /* don't care if this fails */
        server.sofd = anetUnixServer(server.neterr,server.unixsocket,server.unixsocketperm);
        if (server.sofd == ANET_ERR) {
            redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
            exit(1);
        }
    }

從上面能夠看出,redis 提供了 TCP 和 UNIX 域套接字兩種工做方式。以 TCP 工做方式爲例,listenPort() 建立綁定了套接字並啓動了監聽。

爲監聽套接字註冊事件

在進入事件循環前還須要作一些準備工做。緊接着,initServer() 爲全部的監聽套接字註冊了讀事件,響應函數爲 acceptTcpHandler() 或者 acceptUnixHandler()。

    // 建立接收 TCP 或者 UNIX 域套接字的事件處理
    // TCP
    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    for (j = 0; j < server.ipfd_count; j++) {

        // acceptTcpHandler() tcp 鏈接接受處理函數
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                redisPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }

    // UNIX 域套接字
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");

來看看acceptTcpHandler() 作了什麼:

// 用於 TCP 接收請求的處理函數
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    char cip[REDIS_IP_STR_LEN];
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

    // 接收客戶端請求
    cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);

    // 出錯
    if (cfd == AE_ERR) {
        redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
        return;
    }

    // 記錄
    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);

    // 真正有意思的地方
    acceptCommonHandler(cfd,0);
}

接收套接字與客戶端創建鏈接後,調用 acceptCommonHandler()。acceptCommonHandler() 主要工做就是:

  1. 創建並保存服務端與客戶端的鏈接信息,這些信息保存在一個 struct redisClient 結構體中;
  2. 爲與客戶端鏈接的套接字註冊讀事件,相應的回調函數爲 readQueryFromClient(),readQueryFromClient() 做用是從套接字讀取數據,執行相應操做並回復客戶端。

redis 事件循環

以上作好了準備工做,能夠進入事件循環。跳出 initServer() 回到 main() 中,main() 會調用 aeMain()。進入事件循環發生在 aeProcessEvents() 中:

  1. 根據定時事件表計算須要等待的最短期;
  2. 調用 redis api aeApiPoll() 進入監聽輪詢,若是沒有事件發生就會進入睡眠狀態,其實就是 I/O 多路複用 select() epoll() 等的調用;
  3. 有事件發生會被喚醒,處理已觸發的 I/O 事件和定時事件。
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {

        // 進入事件循環可能會進入睡眠狀態。在睡眠以前,執行預設置的函數 aeSetBeforeSleepProc()。
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);

        // AE_ALL_EVENTS 表示處理全部的事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

// 先處理定時事件,而後處理套接字事件
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {

        int j;
        aeTimeEvent *shortest = NULL;
        // tvp 會在 IO 多路複用的函數調用中用到,表示超時時間
        struct timeval tv, *tvp;

        // 獲得最短未來會發生的定時事件
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);

        // 計算睡眠的最短期
        if (shortest) { // 存在定時事件
            long now_sec, now_ms;

            /* Calculate the time missing for the nearest
             * timer to fire. */
            // 獲得當前時間
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) { // 須要借位
                // 減法中的借位,毫秒向秒借位
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else { // 不須要借位,直接減
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }

            // 當前系統時間已經超過定時事件設定的時間
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            // 若是沒有定時事件,見機行事
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        // 調用 IO 多路複用函數阻塞監聽
        numevents = aeApiPoll(eventLoop, tvp);

        // 處理已經觸發的事件
        for (j = 0; j < numevents; j++) {
            // 找到 I/O 事件表中存儲的數據
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

         /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            // 讀事件
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 寫事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }

    // 處理定時事件
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

事件觸發

這裏以 select 版本的 redis api 實現做爲講解,aeApiPoll() 調用了 select() 進入了監聽輪詢。aeApiPoll() 的 tvp 參數是最小等待時間,它會被預先計算出來,它主要完成:

  1. 拷貝讀寫的 fdset。select() 的調用會破壞傳入的 fdset,實際上有兩份 fdset,一份做爲備份,另外一份用做調用。每次調用 select() 以前都從備份中直接拷貝一份;
  2. 調用 select();
  3. 被喚醒後,檢查 fdset 中的每個文件描述符,並將可讀或者可寫的描述符記錄到觸發表當中。

接下來的操做即是執行相應的回調函數,代碼在上一段中已經貼出:先處理 I/O 事件,再處理定時事件。

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, j, numevents = 0;

    /*
    真有意思,在 aeApiState 結構中:
    typedef struct aeApiState {
        fd_set rfds, wfds;
        fd_set _rfds, _wfds;
    } aeApiState;
    在調用 select() 的時候傳入的是 _rfds 和 _wfds,全部監聽的數據在 rfds 和 wfds 中。
    在下次須要調用 selec() 的時候,會將 rfds 和 wfds 中的數據拷貝進 _rfds 和 _wfds 中。*/
    memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
    memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));

    retval = select(eventLoop->maxfd+1,
                &state->_rfds,&state->_wfds,NULL,tvp);
    if (retval > 0) {
        // 輪詢
        for (j = 0; j <= eventLoop->maxfd; j++) {
            int mask = 0;
            aeFileEvent *fe = &eventLoop->events[j];

            if (fe->mask == AE_NONE) continue;
            if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
                mask |= AE_READABLE;
            if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
                mask |= AE_WRITABLE;

            // 添加到觸發事件表中
            eventLoop->fired[numevents].fd = j;
            eventLoop->fired[numevents].mask = mask;
            numevents++;
        }
    }
    return numevents;
}

總結

redis 的事件驅動總結以下:

  1. 初始化事件循環結構體
  2. 註冊監聽套接字的讀事件
  3. 註冊定時事件
  4. 進入事件循環
  5. 若是監聽套接字變爲可讀,會接收客戶端請求,併爲對應的套接字註冊讀事件
  6. 若是與客戶端鏈接的套接字變爲可讀,執行相應的操做

redis_event_summary

後續分享更多內容。

----

搗亂 2014-3-9

http://daoluan.net

相關文章
相關標籤/搜索