Redis Source Code Reading Notes: Events and the Event Loop

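Redis handles client requests with a single thread plus I/O multiplexing. It supports four multiplexing backends, in order of preference: evport, epoll, kqueue, and select.

evport is the Solaris event-port API, epoll is Linux-specific, and kqueue comes from FreeBSD (and other BSD-derived systems). select is the portable fallback that most systems support.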

// ae.c

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

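Redis wraps all four backends behind a common interface, and aeEventLoop calls into whichever one was compiled in.

At the end of Redis's main() function, aeMain() is called to enter the event loop, which runs until the server stops: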
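The compile-time selection above can be illustrated with a small sketch. This is a simplification and not Redis's code: the DEMO_HAVE_* macros and both function names are hypothetical, and the platform checks are only an approximation of how Redis's own HAVE_* macros get defined. What it shows is the priority order (evport, epoll, kqueue, select) being resolved once by the preprocessor, so that the rest of the program sees a single backend.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical feature macros derived from compiler-predefined platform
 * macros. The real conditions used by Redis differ in detail. */
#if defined(__sun)
#define DEMO_HAVE_EVPORT 1
#elif defined(__linux__)
#define DEMO_HAVE_EPOLL 1
#elif defined(__APPLE__) || defined(__FreeBSD__) || \
      defined(__OpenBSD__) || defined(__NetBSD__)
#define DEMO_HAVE_KQUEUE 1
#endif

/* Return the name of the backend that would be compiled in, using the
 * same priority order as ae.c: evport, epoll, kqueue, then select. */
const char *demo_backend_name(void) {
#if defined(DEMO_HAVE_EVPORT)
    return "evport";
#elif defined(DEMO_HAVE_EPOLL)
    return "epoll";
#elif defined(DEMO_HAVE_KQUEUE)
    return "kqueue";
#else
    return "select";
#endif
}

/* Return 1 if name is one of the four backends discussed above. */
int demo_backend_is_known(const char *name) {
    assert(name != NULL);
    return strcmp(name, "evport") == 0 || strcmp(name, "epoll") == 0 ||
           strcmp(name, "kqueue") == 0 || strcmp(name, "select") == 0;
}
```

Because the choice is made at compile time, there is no run-time dispatch cost; aeGetApiName() in the real code plays a role similar to demo_backend_name() here.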

// server.c

int main(int argc, char **argv) {
    ......
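    // Set the hook that runs before each blocking wait of the event loop
    aeSetBeforeSleepProc(server.el,beforeSleep);
    // Set the hook that runs after each blocking wait of the event loop
    aeSetAfterSleepProc(server.el,afterSleep);
    // Enter the event loop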
    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}
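To make the role of the two hooks concrete, here is a minimal sketch of a loop that calls a before-sleep hook, a stand-in for the blocking poll, and an after-sleep hook on every iteration. All names (MiniLoop, mini_main, demo_run, and so on) are hypothetical, and the structure is simplified: in Redis the after-sleep hook is invoked from inside aeProcessEvents() right after the poll returns, not directly from aeMain().

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TRACE_MAX 64

typedef struct MiniLoop MiniLoop;
typedef void MiniSleepProc(MiniLoop *loop);

struct MiniLoop {
    int stop;                   /* set to 1 to leave the loop */
    int iterations;             /* iterations completed so far */
    int max_iterations;         /* demo only: stop after this many */
    MiniSleepProc *beforesleep; /* called before the (fake) blocking call */
    MiniSleepProc *aftersleep;  /* called after the (fake) blocking call */
    char trace[TRACE_MAX];      /* records the order of calls */
    size_t trace_len;
};

/* Append one character to the trace, keeping it NUL-terminated. */
void mini_trace(MiniLoop *loop, char c) {
    assert(loop != NULL);
    if (loop->trace_len + 1 < TRACE_MAX) {
        loop->trace[loop->trace_len++] = c;
        loop->trace[loop->trace_len] = '\0';
    }
}

void demo_before(MiniLoop *loop) { mini_trace(loop, 'B'); }
void demo_after(MiniLoop *loop)  { mini_trace(loop, 'A'); }

/* Stand-in for the blocking multiplexing call. */
void mini_poll(MiniLoop *loop) {
    mini_trace(loop, 'P');
    if (++loop->iterations >= loop->max_iterations) loop->stop = 1;
}

/* Same shape as aeMain(): loop until the stop flag is set. */
void mini_main(MiniLoop *loop) {
    loop->stop = 0;
    while (!loop->stop) {
        if (loop->beforesleep != NULL) loop->beforesleep(loop);
        mini_poll(loop);
        if (loop->aftersleep != NULL) loop->aftersleep(loop);
    }
}

/* Run n iterations and return the recorded call order. */
const char *demo_run(MiniLoop *loop, int n) {
    memset(loop, 0, sizeof(*loop));
    loop->max_iterations = n;
    loop->beforesleep = demo_before;
    loop->aftersleep = demo_after;
    mini_main(loop);
    return loop->trace;
}
```

Running two iterations records B, P, A twice, which is the before/poll/after ordering the two setters establish.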

The event loop: aeEventLoop

The aeEventLoop struct

// ae.h

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
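} aeEventLoop;

  • maxfd: the highest file descriptor currently registered in the event loop.
  • setsize: the maximum number of file descriptors the loop can track.
  • timeEventNextId: the ID that will be assigned to the next time event. aeCreateTimeEvent() increments it each time a time event is added, and it is never decremented. processTimeEvents() only reads it: timeEventNextId-1 is the largest ID in use, so time events created during the current pass are skipped until the next pass.
  • lastTime: used to detect system clock skew.
  • events: an array of aeFileEvent of length setsize, indexed by file descriptor, i.e. events[fd] (see aeCreateFileEvent()). Each aeFileEvent stores the events registered for that fd and the corresponding read/write handlers (structure described below).
  • fired: an array of aeFiredEvent of length setsize. It records which fds fired during the current loop iteration and which events they fired (structure described below).
  • timeEventHead: the head of the linked list in which time events are stored (structure described below).
  • stop: the flag that tells the event loop to stop; the loop exits once stop is 1.
  • apidata: backend-specific data used by the wrapper around the multiplexing API (evport, epoll, kqueue, select).
  • beforesleep: the function called before the event loop blocks (see aeSetBeforeSleepProc()).
  • aftersleep: the function called after the event loop wakes up (see aeSetAfterSleepProc()).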
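The relationship between events, setsize and maxfd can be sketched as follows. This is a stripped-down model and not the Redis implementation: DemoLoop, DemoFileEvent and the demo_* functions are hypothetical names, and handlers and backend calls are omitted. It shows the fd being used directly as the array index, and maxfd being raised on registration and recomputed by a downward scan when the highest fd loses its last event.

```c
#include <assert.h>
#include <stdlib.h>

#define DEMO_NONE     0
#define DEMO_READABLE 1
#define DEMO_WRITABLE 2

typedef struct DemoFileEvent {
    int mask;               /* events registered for this fd */
} DemoFileEvent;

typedef struct DemoLoop {
    int maxfd;              /* highest registered fd, -1 if none */
    int setsize;            /* capacity of the events array */
    DemoFileEvent *events;  /* indexed by fd */
} DemoLoop;

/* Allocate a loop able to track fds in the range [0, setsize). */
DemoLoop *demo_create(int setsize) {
    assert(setsize > 0);
    DemoLoop *loop = malloc(sizeof(*loop));
    if (loop == NULL) return NULL;
    /* calloc zeroes the array, so every mask starts as DEMO_NONE */
    loop->events = calloc((size_t)setsize, sizeof(DemoFileEvent));
    if (loop->events == NULL) {
        free(loop);
        return NULL;
    }
    loop->setsize = setsize;
    loop->maxfd = -1;
    return loop;
}

void demo_free(DemoLoop *loop) {
    if (loop == NULL) return;
    free(loop->events);
    free(loop);
}

/* Register mask for fd. Returns 0 on success, -1 if fd is out of range. */
int demo_register(DemoLoop *loop, int fd, int mask) {
    if (fd < 0 || fd >= loop->setsize) return -1;
    loop->events[fd].mask |= mask;
    if (fd > loop->maxfd) loop->maxfd = fd;
    return 0;
}

/* Remove mask from fd; recompute maxfd if the highest fd became unused. */
void demo_unregister(DemoLoop *loop, int fd, int mask) {
    if (fd < 0 || fd >= loop->setsize) return;
    loop->events[fd].mask &= ~mask;
    if (fd == loop->maxfd && loop->events[fd].mask == DEMO_NONE) {
        int j;
        for (j = loop->maxfd - 1; j >= 0; j--)
            if (loop->events[j].mask != DEMO_NONE) break;
        loop->maxfd = j; /* -1 when nothing is registered any more */
    }
}
```

Indexing by fd makes lookup O(1) at the cost of allocating setsize slots up front, which is why setsize bounds the number of descriptors the loop can track.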

The aeFileEvent struct

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
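} aeFileEvent;

  • mask: records which events are registered for this file descriptor
    • READABLE: the "readable" event
    • WRITABLE: the "writable" event
    • BARRIER: inverts the usual order, so that within one loop iteration the "writable" handler runs before the "readable" handler
  • rfileProc: function pointer to the read handler
  • wfileProc: function pointer to the write handler
  • clientData: private data passed to the handlers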
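How the three mask bits combine can be sketched with a small helper that reports the order in which the handlers would be called for one fd. This is an illustration only: demo_call_order and the DEMO_* constants are hypothetical names (the numeric values mirror the usual 1/2/4 bit layout), and the sketch assumes the read and write handlers are different functions.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_READABLE 1
#define DEMO_WRITABLE 2
#define DEMO_BARRIER  4

/* Write into out (at least 3 bytes) the order in which handlers would
 * run for one fd: 'R' for the read handler, 'W' for the write handler.
 * registered is the mask stored for the fd, fired is what the poll
 * reported. Assumes the two handlers are distinct functions. */
void demo_call_order(int registered, int fired, char out[3]) {
    assert(out != NULL);
    size_t n = 0;
    int invert = registered & DEMO_BARRIER;

    /* Normal case: readable first */
    if (!invert && (registered & fired & DEMO_READABLE)) out[n++] = 'R';
    /* Writable runs in both cases */
    if (registered & fired & DEMO_WRITABLE) out[n++] = 'W';
    /* Barrier case: readable runs after writable */
    if (invert && (registered & fired & DEMO_READABLE)) out[n++] = 'R';
    out[n] = '\0';
}
```

Note the "registered & fired" test: an event only counts if it both fired and is still registered, which protects against a handler that unregistered another event earlier in the same iteration.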

The aeFiredEvent struct

/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;

  • fd: the file descriptor on which an event fired
  • mask: flags recording which events fired on that file descriptor

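The flow of aeMain() and aeProcessEvents()

aeMain() is the main function of aeEventLoop: once called, it keeps calling aeProcessEvents() until stop is set to 1.

aeProcessEvents() does the real work of the event loop: it calls the multiplexing function, blocking for at most the time remaining until the next time event is due, then dispatches the file events that fired and runs the time events that are due.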
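The "block for at most the time until the next time event" step comes down to a small piece of arithmetic. The following sketch isolates it; demo_timeout_until is a hypothetical helper name, and times are given as a (seconds, milliseconds) pair, matching the when_sec/when_ms fields that appear in the listing in these notes.

```c
#include <assert.h>
#include <sys/time.h>

/* Given the due time of the nearest time event and the current time,
 * both as (seconds, milliseconds) pairs, return how long the poll call
 * may block. An event that is already due yields a zero timeout. */
struct timeval demo_timeout_until(long when_sec, long when_ms,
                                  long now_sec, long now_ms) {
    assert(when_ms >= 0 && when_ms < 1000);
    assert(now_ms >= 0 && now_ms < 1000);

    struct timeval tv;
    long long ms = (long long)(when_sec - now_sec) * 1000 + when_ms - now_ms;

    if (ms > 0) {
        tv.tv_sec = ms / 1000;
        tv.tv_usec = (ms % 1000) * 1000; /* milliseconds to microseconds */
    } else {
        tv.tv_sec = 0;
        tv.tv_usec = 0;
    }
    return tv;
}
```

For example, an event due at 10.500 s with the clock at 9.700 s leaves 800 ms, i.e. tv_sec = 0 and tv_usec = 800000.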

Code

// ae.c

// Main routine of the event loop
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {

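        // Run the before-sleep hook, i.e. right before epoll_wait() blocks
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // This is where epoll is actually called. AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP means:
        // process every type of event, and run the after-sleep hook once epoll_wait() returns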
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

// ae.c

/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 * If flags is 0, the function does nothing and returns.
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 * if flags has AE_FILE_EVENTS set, file events are processed.
 * if flags has AE_TIME_EVENTS set, time events are processed.
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
 * the events that's possible to process without to wait are processed.
 *
 * The function returns the number of events processed. */
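// Processes time events, calls the read-ready/write-ready handlers of each file descriptor,
// and triggers AfterSleepProc.
// flags selects what the function does:
// AE_ALL_EVENTS: process all events
// AE_FILE_EVENTS: process file descriptor events
// AE_TIME_EVENTS: process time events
// AE_DONT_WAIT: do not block (i.e. aeApiPoll() will not block)
// AE_CALL_AFTER_SLEEP: trigger AfterSleepProc
// As far as the current code goes, only ae.c and networking.c call this function:
// ae.c passes AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP,
// while the call in networking.c adds AE_DONT_WAIT so that it never blocks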
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    // Return immediately if neither time events nor file (descriptor) events need to be checked
    // In practice no caller currently does this
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    // Enter this branch if at least one file descriptor is registered,
    // or if flags asks for time events and AE_DONT_WAIT is not set
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            // Find the time event that is due soonest in the linked list
            // PS: according to its comments, aeSearchNearestTimer() walks the whole list, so each call is O(N)
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // There is a pending time event

            long now_sec, now_ms;
            // Get the current time
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            // Compute the timeout for epoll_wait() so that it returns when the next time event is due
            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                // With AE_DONT_WAIT the timeout passed to epoll_wait() is 0, so the call does not block
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                // As the comment below says, here epoll_wait() may block until an event fires
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        // Blocks for at most the time in tvp (no blocking when AE_DONT_WAIT is set)
        // numevents is the number of fired events; their types and file descriptors are stored in eventLoop->fired
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        // Call the after-sleep hook
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        // Handle every event in eventLoop->fired
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event laster. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsynching a file to disk,
             * before replying to a client. */
            // AE_BARRIER means the "writable" event is handled first; by default Redis handles the "readable" event first
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
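            // The file descriptor is readable
            if (!invert && fe->mask & mask & AE_READABLE) {
                // rfileProc is the read handler registered for this file descriptor.
                // For the listening server sockets, for example, it is registered in
                // initServer() in server.c by calling aeCreateFileEvent()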
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* Fire the writable event. */
            // The file descriptor is writable
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    // wfileProc is the write handler registered for this file descriptor,
                    // also set through aeCreateFileEvent(), in the same way as the read handler
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }

    // Check the time events and run those that are due
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
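The comments in aeProcessEvents() note that finding the nearest timer is an O(N) scan. A minimal sketch of such a scan over a singly linked list looks like this. DemoTimeEvent and demo_nearest are hypothetical names, and only the fields needed for the comparison are modelled.

```c
#include <assert.h>
#include <stddef.h>

typedef struct DemoTimeEvent {
    long long id;                /* identifier of the time event */
    long when_sec;               /* due time: seconds part */
    long when_ms;                /* due time: milliseconds part, 0..999 */
    struct DemoTimeEvent *next;  /* next event in the unsorted list */
} DemoTimeEvent;

/* Walk the whole list and return the event with the earliest due time,
 * or NULL if the list is empty. The list is unsorted, so every node has
 * to be visited: O(N) per call. */
DemoTimeEvent *demo_nearest(DemoTimeEvent *head) {
    DemoTimeEvent *te = head;
    DemoTimeEvent *nearest = NULL;

    while (te != NULL) {
        assert(te->when_ms >= 0 && te->when_ms < 1000);
        if (nearest == NULL ||
            te->when_sec < nearest->when_sec ||
            (te->when_sec == nearest->when_sec &&
             te->when_ms < nearest->when_ms))
            nearest = te;
        te = te->next;
    }
    return nearest;
}
```

A linear scan is acceptable when the list is very short; a loop with many timers would usually prefer a sorted structure such as a heap or skip list.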

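General flow

  1. aeMain() calls beforesleep (if set), then aeProcessEvents().
  2. aeProcessEvents() looks up the nearest time event and derives the poll timeout from it (zero with AE_DONT_WAIT, unlimited when there is no time event).
  3. aeApiPoll() blocks until a file event fires or the timeout expires, and fills eventLoop->fired.
  4. aftersleep is called (if set and AE_CALL_AFTER_SLEEP was passed).
  5. For each fired fd, the read and write handlers are called (write first when AE_BARRIER is set).
  6. processTimeEvents() runs the time events that are due.
  7. The loop repeats from step 1 until stop is 1.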

Other related functions

Function | Purpose
--- | ---
aeEventLoop *aeCreateEventLoop(int setsize) | Creates and initializes an aeEventLoop
void aeDeleteEventLoop(aeEventLoop *eventLoop) | Deletes and frees an aeEventLoop
void aeStop(aeEventLoop *eventLoop) | Stops the event loop
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) | Registers events for file descriptor fd and records them in eventLoop->events
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) | Unregisters events for file descriptor fd and removes them from eventLoop->events
int aeGetFileEvents(aeEventLoop *eventLoop, int fd) | Returns the mask registered for fd in eventLoop->events
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) | Registers a time event
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id) | Deletes a time event
int aeProcessEvents(aeEventLoop *eventLoop, int flags) | The main function of the event loop
int aeWait(int fd, int mask, long long milliseconds) | Blocks until fd is ready for reading/writing or has an error, or until the timeout expires
void aeMain(aeEventLoop *eventLoop) | The main loop function
char *aeGetApiName(void) | Returns the name of the multiplexing backend in use (evport, epoll, kqueue, select)
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) | Sets the before-sleep handler
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) | Sets the after-sleep handler
int aeGetSetSize(aeEventLoop *eventLoop) | Returns the setsize of eventLoop
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) | Changes the setsize of eventLoop
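Of the functions listed, aeWait() is the easiest to picture in isolation, since it waits on a single descriptor outside the event loop. The sketch below shows one way such a helper can be written with poll(2). It is an illustration under assumptions and not the Redis source: demo_wait, demo_pipe_roundtrip and the DEMO_* constants are hypothetical names, and error/hang-up conditions are not mapped to any event bit here.

```c
#include <assert.h>
#include <poll.h>
#include <string.h>
#include <unistd.h>

#define DEMO_READABLE 1
#define DEMO_WRITABLE 2

/* Wait up to milliseconds for fd to become ready for the events in mask.
 * Returns the mask of ready events, 0 on timeout, -1 on error.
 * Sketch limitation: POLLERR/POLLHUP are not translated, so such a
 * condition can also show up as a 0 return value. */
int demo_wait(int fd, int mask, int milliseconds) {
    struct pollfd pfd;
    int retmask = 0, retval;

    assert(fd >= 0);
    memset(&pfd, 0, sizeof(pfd));
    pfd.fd = fd;
    if (mask & DEMO_READABLE) pfd.events |= POLLIN;
    if (mask & DEMO_WRITABLE) pfd.events |= POLLOUT;

    retval = poll(&pfd, 1, milliseconds);
    if (retval != 1) return retval; /* 0 = timeout, -1 = error */

    if (pfd.revents & POLLIN) retmask |= DEMO_READABLE;
    if (pfd.revents & POLLOUT) retmask |= DEMO_WRITABLE;
    return retmask;
}

/* Demo on a pipe: the read end is not readable until a byte is written.
 * Returns 1 if all three observations match, 0 if not, -1 on setup error. */
int demo_pipe_roundtrip(void) {
    int fds[2];
    int before, writable, after;

    if (pipe(fds) == -1) return -1;
    before = demo_wait(fds[0], DEMO_READABLE, 0);   /* empty pipe */
    writable = demo_wait(fds[1], DEMO_WRITABLE, 0); /* room to write */
    if (write(fds[1], "x", 1) != 1) {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }
    after = demo_wait(fds[0], DEMO_READABLE, 100);  /* data available */
    close(fds[0]);
    close(fds[1]);
    return before == 0 && writable == DEMO_WRITABLE && after == DEMO_READABLE;
}
```

A helper like this is useful for the few places where code must wait synchronously on one descriptor without going through the registered-event machinery.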

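Events

As shown above, the event loop handles three broad categories of events:

  • The BeforeSleepProc and AfterSleepProc hooks that run before and after blocking
  • File descriptor readiness events
  • Time events

Let's look at what each of them does.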

BeforeSleepProc

In server.c, beforeSleep() is set as the handler that runs before blocking:

/* This function gets called every time Redis is entering the
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);

    /* Call the Redis Cluster before sleep function. Note that this function
     * may change the state of Redis Cluster (from ok to fail or vice versa),
     * so it's a good idea to call it before serving the unblocked clients
     * later in this function. */
    // Cluster housekeeping before sleeping (only when cluster mode is enabled)
    if (server.cluster_enabled) clusterBeforeSleep();

    /* Run a fast expire cycle (the called function will return
     * ASAP if a fast cycle is not needed). */
    // Reclaim expired keys (fast cycle, only when this instance is not a replica)
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

    /* Send all the slaves an ACK request if at least one client blocked
     * during the previous event loop iteration. */
    // Ask the replicas for an ACK (REPLCONF GETACK)
    if (server.get_ack_from_slaves) {
        robj *argv[3];

        argv[0] = createStringObject("REPLCONF",8);
        argv[1] = createStringObject("GETACK",6);
        argv[2] = createStringObject("*",1); /* Not used argument. */
        replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
        decrRefCount(argv[0]);
        decrRefCount(argv[1]);
        decrRefCount(argv[2]);
        server.get_ack_from_slaves = 0;
    }

    /* Unblock all the clients blocked for synchronous replication
     * in WAIT. */
    if (listLength(server.clients_waiting_acks))
        processClientsWaitingReplicas();

    /* Check if there are clients unblocked by modules that implement
     * blocking commands. */
    moduleHandleBlockedClients();

    /* Try to process pending commands for clients that were just unblocked. */
    if (listLength(server.unblocked_clients))
        processUnblockedClients();

    /* Write the AOF buffer on disk */
    // Write the AOF buffer
    flushAppendOnlyFile(0);

    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();

    /* Before we are going to sleep, let the threads access the dataset by
     * releasing the GIL. Redis main thread will not touch anything at this
     * time. */
    // Release the GIL
    if (moduleCount()) moduleReleaseGIL();
}

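As the code shows, its main jobs are:

  • Cluster housekeeping before blocking
  • Running a fast expired-key collection cycle
  • Requesting ACKs from the replicas
  • Writing the AOF buffer to disk
  • Releasing the GIL

So while the main thread is blocked in the poll call, other threads may in fact be working on Redis's dataset (module threads, since the GIL is only released when moduleCount() is non-zero).

While client requests are being served, however, only the main thread touches the dataset, so requests are processed serially.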

AfterSleepProc

In server.c, afterSleep() is set as the handler that runs after blocking:

/* This function is called immadiately after the event loop multiplexing
 * API returned, and the control is going to soon return to Redis by invoking
 * the different events callbacks. */
void afterSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);
    if (moduleCount()) moduleAcquireGIL();
}

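The first thing done after the event loop wakes up is to re-acquire the GIL, so that only the main thread operates on the dataset.

File events

File events are registered in the following places:

  1. Readable events on the server's own endpoints (listening TCP socket, Unix socket, pipe), e.g. to accept client connections (in initServer() in server.c)
  2. Read/write events on each client connection socket (in networking.c)
  3. Read/write events on the AOF pipes (in aof.c)
  4. Read/write events on Cluster connections (in cluster.c)
  5. Read/write events on replication connections (in replication.c)
  6. Read/write events on Redis Sentinel connections (in sentinel.c)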

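Time events

In server.c, serverCron() is registered as a time event.

Judging from its comments, serverCron() does quite a lot:

  • Active expired-key collection
  • Software watchdog
  • Updating statistics
  • Incremental rehashing of the DB hash tables
  • Triggering BGSAVE / AOF rewrite
  • Handling client timeouts of various kinds
  • Replication reconnection
  • Other tasks

serverCron() makes neat use of run_with_period() so that each task runs only once per given time interval, which avoids doing the work too often.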
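The idea behind run_with_period() can be sketched as a plain function. This is my reading of the technique and not a copy of the macro: it assumes the periodic function is called hz times per second and keeps a counter of how many times it has run (cronloops), and that a task with a period of period_ms should run on every tick whose counter is a multiple of period_ms divided by the tick length. The names demo_should_run and demo_count_runs are hypothetical.

```c
#include <assert.h>

/* Decide whether a task with the given period should run on this tick.
 * hz is how many times per second the cron function is called, and
 * cronloops is the number of cron calls so far. */
int demo_should_run(int period_ms, int hz, int cronloops) {
    assert(hz > 0 && hz <= 1000);
    assert(period_ms > 0);

    int tick_ms = 1000 / hz;
    /* A period no longer than one tick means "run on every tick" */
    if (period_ms <= tick_ms) return 1;
    return (cronloops % (period_ms / tick_ms)) == 0;
}

/* Count how often a task would run over the first `loops` cron calls. */
int demo_count_runs(int period_ms, int hz, int loops) {
    int runs = 0;
    for (int i = 0; i < loops; i++)
        if (demo_should_run(period_ms, hz, i)) runs++;
    return runs;
}
```

With hz = 10 (one tick every 100 ms), a 1000 ms task runs on every tenth call, so 100 cron calls give 10 runs, while a 100 ms task runs on all 100 calls. No per-task timestamp is needed, only the shared loop counter.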

The code of serverCron() is not reproduced here: it touches many other features, so it does not belong in a write-up about the event loop.
