【Redis5源碼學習】淺析redis中的IO多路複用與事件機制

baiyanredis

引入

讀這篇文章以前請先閱讀:淺析服務器併發IO性能提高之路—從網絡編程基礎到epoll,以更好的理解本文的內容,謝謝。
咱們知道,咱們在使用redis的時候,經過客戶端發送一個get命令,就可以獲得redis服務端返回的數據。redis是基於傳統的C/S架構實現的。它經過監聽一個TCP端口(6379)的方式來接收來自客戶端的鏈接,從而進行後續命令的執行,並把執行結果返回給客戶端。編程

redis是一個合格的服務端程序

咱們先思考一個問題:做爲一個合格的服務端程序,咱們在命令行輸入一個get命令以後,redis服務端是怎麼處理這個命令,並把結果返回給客戶端的呢?
要回答這個問題,咱們先回顧上一篇文章中講過的,客戶端與服務器須要分別建立一個套接字代表本身所在的網絡地址與端口號,而後基於TCP協議來進行套接字之間的通訊。一般狀況下,一個服務端程序的socket通訊流程以下:segmentfault

int main(int argc, char *argv[]) {
    listenSocket = socket(); //調用socket()系統調用建立一個監聽套接字描述符
    bind(listenSocket);  //綁定地址與端口
    listen(listenSocket); //由默認的主動套接字轉換爲服務器適用的被動套接字
    while (1) { //不斷循環去監聽是否有客戶端鏈接事件到來
        connSocket = accept($listenSocket); //接受客戶端鏈接
        read(connsocket); //從客戶端讀取數據,只能同時處理一個客戶端
        write(connsocket); //返回給客戶端數據,只能同時處理一個客戶端
    }
    return 0;
}

在redis中,一樣要通過以上幾個步驟。與客戶端創建鏈接以後,就會讀取客戶端發來的命令,而後執行命令,最後經過調用write系統調用,將命令的執行結果返回給客戶端。
可是這樣一個進程只能同時處理一個客戶端的鏈接與讀寫事件。爲了讓單進程的服務端應用同時處理多個客戶端的事件,咱們採用了IO多路複用機制。目前最好的IO多路複用機制就是epoll。回顧咱們上一篇文章中最終使用epoll建立的服務器代碼:api

int main(int argc, char *argv[]) {

    listenSocket = socket(AF_INET, SOCK_STREAM, 0); //同上,建立一個監聽套接字描述符
    
    bind(listenSocket)  //同上,綁定地址與端口
    
    listen(listenSocket) //同上,由默認的主動套接字轉換爲服務器適用的被動套接字
    
    epfd = epoll_create(EPOLL_SIZE); //建立一個epoll實例
    
    ep_events = (epoll_event*)malloc(sizeof(epoll_event) * EPOLL_SIZE); //建立一個epoll_event結構存儲套接字集合
    event.events = EPOLLIN;
    event.data.fd = listenSocket;
    
    epoll_ctl(epfd, EPOLL_CTL_ADD, listenSocket, &event); //將監聽套接字加入到監聽列表中
    
    while (1) {
    
        event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1); //等待返回已經就緒的套接字描述符們
        
        for (int i = 0; i < event_cnt; ++i) { //遍歷全部就緒的套接字描述符
            if (ep_events[i].data.fd == listenSocket) { //若是是監聽套接字描述符就緒了,說明有一個新客戶端鏈接到來
            
                connSocket = accept(listenSocket); //調用accept()創建鏈接
                
                event.events = EPOLLIN;
                event.data.fd = connSocket;
                
                epoll_ctl(epfd, EPOLL_CTL_ADD, connSocket, &event); //添加對新創建的鏈接套接字描述符的監聽,以監聽後續在鏈接描述符上的讀寫事件
                
            } else { //若是是鏈接套接字描述符事件就緒,則能夠進行讀寫
            
                strlen = read(ep_events[i].data.fd, buf, BUF_SIZE); //從鏈接套接字描述符中讀取數據, 此時必定會讀到數據,不會產生阻塞
                if (strlen == 0) { //已經沒法從鏈接套接字中讀到數據,須要移除對該socket的監聽
                
                    epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL); //刪除對這個描述符的監聽
                    
                    close(ep_events[i].data.fd);
                } else {
                    write(ep_events[i].data.fd, buf, str_len); //若是該客戶端可寫 把數據寫回到客戶端
                }
            }
        }
    }
    close(listenSocket);
    close(epfd);
    return 0;
}

redis基於原有的select、poll與epoll機制,結合本身獨特的業務需求,封裝了本身的一套事件處理函數,咱們把它叫作ae(a simple event-driven programming library)。而redis具體使用select、epoll仍是mac上的kqueue技術,redis會首先進行判斷,而後選擇性能最優的那個:數組

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

由於 select 函數是做爲 POSIX 標準中的系統調用,在不一樣版本的操做系統上都會實現,因此將其做爲兜底方案。爲了講述方便,後面的文章均使用epoll機制來說解。服務器

redis中的IO多路複用

當咱們在命令行中啓動一個redis-server的時候,redis其實作了和咱們以前寫的epoll服務器相似的操做,重點的函數調用有如下三個:網絡

int main(int argc, char **argv) {
    ...
    initServerConfig(); //初始化存儲服務端信息的結構體
    ...
    initServer(); //初始化redis事件循環並調用epoll_create與epoll_ctl。建立socket、bind、listen、accept都在這個函數中進行調用,並註冊調用後返回的監聽描述符和鏈接描述符
    ...
    aeMain(); //執行while(1)事件循環,並調用epoll_wait獲取已就緒的描述符,並調用對應的handler
    ...
}

接下來咱們一個一個來看:架構

initServerConfig()

redis服務端的全部信息都存儲在一個redisServer結構體中,這個結構體字段很是多,好比服務端的套接字信息(如地址和端口),還有不少支持redis其餘功能如集羣、持久化等的配置信息都存儲在這個結構體中。這個函數調用就是對redisServer結構體的全部字段進行初始化並賦一個初始值。因爲咱們此次講解的是事件與IO多路複用機制在redis中的應用,因此咱們只關注其中的幾個字段便可。併發

initServer()

這個函數調用是咱們的重中之重。初始化完服務器的相關信息以後,就須要進行套接字的建立、綁定、監聽並與客戶端創建鏈接了。在這個函數中,進行了咱們常說的建立socket、bind、listen、accept、epoll_create、epoll_ctl調用,咱們能夠對照上文的epoll服務器,逐步瞭解redis的事件機制。initServer()的主要函數調用以下:socket

void initServer(void) {
    ...
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); 
    ...

    if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);
    ...

    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR){
                serverPanic("Unrecoverable error creating server.ipfd file event.");
       }
    }
    ...
}

咱們按照從上到下的順序解讀這幾行關鍵代碼:

aeCreateEventLoop()

在redis中,有一個aeEventLoop的概念,它來管理全部相關的事件描述字段、存儲已註冊的事件、已就緒的事件:

typedef struct aeEventLoop {
    int stop; //標識事件循環(即while(1))是否結束
    
    aeFileEvent *events;  //存儲已經註冊的文件事件(文件事件即客戶端鏈接與讀寫事件)
    aeFiredEvent *fired;  //存儲已就緒的文件事件
    aeTimeEvent *timeEventHead; //存儲時間事件(時間事件後面再講)
    
    void *apidata; /* 存儲epoll相關信息 */
    
    aeBeforeSleepProc *beforesleep; //事件發生前須要調用的函數
    aeBeforeSleepProc *aftersleep; //事件發生後須要調用的函數
} aeEventLoop;

redis將全部經過epoll_wait()返回的就緒描述符都存儲在fired數組中,而後遍歷這個數組,並調用對應的事件處理函數,一次性處理完全部事件。在aeCreateEventLoop()函數中,對這個管理全部事件信息的結構體字段進行了初始化,這裏面也包括調用epoll_create(),對epoll的epfd進行初始化:

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    if (aeApiCreate(eventLoop) == -1) goto err; //調用aeApiCreate(),內部會調用epoll_create()
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;
}

在aeApiCreate()函數中,調用了epoll_create(),並將建立好的epfd放到eventLoop結構體的apidata字段保管:

typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 調用epoll_create初始化epoll的epfd */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state; //將建立好的epfd放到eventLoop結構體的apidata字段保管
    return 0;
}

listenToPort()

在建立完epfd以後,咱們就要進行socket建立、綁定、監聽的操做了,這幾步在listenToPort()函數來進行:

int listenToPort(int port, int *fds, int *count) {
    if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
    for (j = 0; j < server.bindaddr_count || j == 0; j++) { //遍歷全部的ip地址
        if (server.bindaddr[j] == NULL) { //尚未綁定地址
           ...
        } else if (strchr(server.bindaddr[j],':')) { //綁定IPv6地址
            ...
        } else { //綁定IPv4地址,通常會進到這個if分支中
            fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j], server.tcp_backlog);  //真正的綁定邏輯
        }
        ...
    }
    return C_OK;
}

redis會先進行綁定ip地址類型的判斷,咱們通常是IPv4,因此通常會走到第三個分支,調用anetTcpServer()函數來進行具體的綁定邏輯:

static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
   ...
    if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {
        anetSetError(err, "%s", gai_strerror(rv));
        return ANET_ERR;
    }
    for (p = servinfo; p != NULL; p = p->ai_next) {
        if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1) //調用socket()建立一個監聽套接字
            continue;

        if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
        if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
        if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) s = ANET_ERR; //調用bind()與listen()綁定端口並轉化爲服務端被動套接字
        goto end;
    }
}

在調用socket()系統調用建立了套接字以後,須要進一步調用bind()與listen(),這兩步是在anetListen()函數內部實現的:

static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
    if (bind(s,sa,len) == -1) { //調用bind()綁定端口
        anetSetError(err, "bind: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }

    if (listen(s, backlog) == -1) { //調用listen()將主動套接字轉換爲被動監聽套接字
        anetSetError(err, "listen: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    return ANET_OK;
}

看到這裏,咱們知道redis和咱們寫過的epoll服務器同樣,都是須要進行套接字建立、綁定、監聽的過程。

aeCreateFileEvent

在redis中,把客戶端鏈接事件、讀寫事件統稱爲文件事件。咱們剛纔完成了socket建立、bind、listen的過程。目前咱們已經有了一個監聽描述符,那麼咱們須要首先將監聽描述符添加到epoll的監聽列表,以監聽客戶端的鏈接事件。在initServer()中,經過調用aeCreateFileEvent(),同時指定了它的事件處理函數acceptTcpHandler()來實現對客戶端鏈接事件的處理:

for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR){
                serverPanic("Unrecoverable error creating server.ipfd file event.");
        }
    }

跟進aeCreateFileEvent()函數,發現其內部進一步調用了aeApiAddEvent()函數:

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) {
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; 
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; //調用epoll_ctl添加客戶端鏈接事件
    return 0;
}

aeApiAddEvent函數會調用epoll_ctl(),將客戶端鏈接事件添加到監聽列表。同時,redis會將該事件的處理函數放到aeFileEvent結構體中進行存儲:

typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc; //讀事件處理程序
    aeFileProc *wfileProc; //寫事件處理程序
    void *clientData;  //客戶端數據
} aeFileEvent;

對照以前咱們寫過的epoll服務端程序,咱們已經實現瞭如下幾個步驟:

int main(int argc, char *argv[]) {

    listenSocket = socket(AF_INET, SOCK_STREAM, 0); //建立一個監聽套接字描述符
    
    bind(listenSocket)  //綁定地址與端口
    
    listen(listenSocket) //由默認的主動套接字轉換爲服務器適用的被動套接字
    
    epfd = epoll_create(EPOLL_SIZE); //建立一個epoll實例
    
    ep_events = (epoll_event*)malloc(sizeof(epoll_event) * EPOLL_SIZE); //建立一個epoll_event結構存儲套接字集合
    event.events = EPOLLIN;
    event.data.fd = listenSocket;
    
    epoll_ctl(epfd, EPOLL_CTL_ADD, listenSocket, &event); //將監聽套接字加入到監聽列表中
   ...
}

咱們已經實現了對套接字的建立、bind、listen,已經過epoll_create()實現了epfd的建立,並將初始的監聽套接字描述符事件添加到了epoll的監聽列表中,併爲他指定了事件處理函數。下一步,就應該到了while(1)循環調用epoll_wait()的階段了。經過阻塞調用epoll_wait(),返回全部已經就緒的套接字描述符,觸發相應事件,而後對事件進行處理。

aeMain()

最後就是經過while(1)循環,等待客戶端鏈接事件的到來啦:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

在eventLoop中,採用stop標誌來斷定循環是否結束。若是沒有結束,那麼循環調用aeProcessEvents()。咱們猜想,這裏面就調用了epoll_wait(),阻塞等待事件的到來,而後遍歷全部就緒的套接字描述符,而後調用對應的事件處理函數便可:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
        numevents = aeApiPoll(eventLoop, tvp); //調用epoll_wait()
        ...
}

咱們跟進aeApiPoll,來看看epoll_wait()是如何調用的:

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata; //
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;
        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

首先從eventLoop中拿出以前在aeApiCreate()中建立的epfd與已經註冊的事件集合,調用epoll_wait()等待事件們的到來,並返回全部就緒事件的描述符集合。隨後,遍歷全部就緒的描述符集合,判斷它是什麼類型的描述符,是可讀仍是可寫的,隨後將全部就緒可處理的事件存儲到eventLoop中的fired數組中,並把相應數組位置上的可讀仍是可寫標記也一併存儲。
回到外部調用處,咱們如今已經把全部可以處理的事件都放到了fired數組中,那麼咱們就能夠經過遍歷這個數組,拿到全部能夠處理的事件,而後調用對應的事件處理函數:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
        numevents = aeApiPoll(eventLoop, tvp); //調用epoll_wait()

        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; //循環拿出全部就緒的事件
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; 

            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask); //若是該事件是讀事件,調用讀事件處理函數
                fired++;
            }

            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask); //若是該事件是寫事件,調用寫事件處理函數
                    fired++;
                }
            }
        }
    }
    ...
}

至於如何區分是客戶端鏈接事件以及讀寫事件,redis經過指定不一樣的事件處理函數(如accept事件是acceptTcpHandler事件處理函數),讀或寫事件又是其餘的事件處理函數。經過這層封裝,免去了判斷套接字描述符類型的步驟,直接調用以前註冊的事件處理函數便可、
回顧咱們以前寫過的的epoll服務器,是否是和這一段代碼很類似呢?

while (1) {
    
        event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1); //等待返回已經就緒的套接字描述符們
        
        for (int i = 0; i < event_cnt; ++i) { //遍歷全部就緒的套接字描述符
            if (ep_events[i].data.fd == listenSocket) { //若是是監聽套接字描述符就緒了,說明有一個新客戶端鏈接到來
            
                connSocket = accept(listenSocket); //調用accept()創建鏈接
                
                event.events = EPOLLIN;
                event.data.fd = connSocket;
                
                epoll_ctl(epfd, EPOLL_CTL_ADD, connSocket, &event); //添加對新創建的鏈接套接字描述符的監聽,以監聽後續在鏈接描述符上的讀寫事件
                
            } else { //若是是鏈接套接字描述符事件就緒,則能夠進行讀寫
            
                strlen = read(ep_events[i].data.fd, buf, BUF_SIZE); //從鏈接套接字描述符中讀取數據, 此時必定會讀到數據,不會產生阻塞
                if (strlen == 0) { //已經沒法從鏈接套接字中讀到數據,須要移除對該socket的監聽
                
                    epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL); //刪除對這個描述符的監聽
                    
                    close(ep_events[i].data.fd);
                } else {
                    write(ep_events[i].data.fd, buf, str_len); //若是該客戶端可寫 把數據寫回到客戶端
                }
            }
        }
    }

總結

至此,咱們就掌握了redis中的IO多路複用場景。redis把全部鏈接與讀寫事件、還有咱們沒提到的時間事件一塊兒集中管理,並對底層IO多路複用機制進行了封裝,最終實現了單進程可以處理多個鏈接以及讀寫事件。這就是IO多路複用在redis中的應用。

相關文章
相關標籤/搜索