Redis 源碼走讀(一)事件驅動機制與命令處理

eventloop 

從 server.c 的 main 方法看起

/* Excerpt of main() from server.c. The dotted line stands for the code
 * that the article leaves out. */
int main(int argc, char **argv) {
.......

    /* Install beforeSleep/afterSleep as the event loop's before-sleep and
     * after-sleep callbacks. */
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    /* Run the event loop; the cleanup below is reached only after
     * aeMain() returns. */
    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}

ae.c

/* Drive the event loop.
 *
 * Clears eventLoop->stop and then, for as long as it stays zero, runs the
 * optional beforesleep hook followed by aeProcessEvents() with every event
 * kind enabled (time events and file events) plus the aftersleep callback.
 * Per the article, this is invoked from main() in server.c. */
void aeMain(aeEventLoop *eventLoop) {
    for (eventLoop->stop = 0; !eventLoop->stop; ) {
        if (eventLoop->beforesleep)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_CALL_AFTER_SLEEP | AE_ALL_EVENTS);
    }
}
/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 * If flags is 0, the function does nothing and returns.
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 * if flags has AE_FILE_EVENTS set, file events are processed.
 * if flags has AE_TIME_EVENTS set, time events are processed.
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * the events that's possible to process without to wait are processed.
 * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
 *
 * The function returns the number of events processed.
 *
 * NOTE: this is an excerpt; the dotted line below stands for code the
 * article omits (including the declarations of `processed` and
 * `numevents`). */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
........

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    /* In the code shown here the file events are polled and dispatched
     * first; the time events themselves are run at the end of the function.
     * The nearest timer is only used to bound how long the poll may block. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            /* Find the time event that is due soonest. */
            shortest = aeSearchNearestTimer(eventLoop);
        /* Work out how long from now until that time event is due. */
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                /* The timer is already due: poll without blocking. */
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        /* aeApiPoll() fills eventLoop->fired with the ready fds and returns
         * how many entries it wrote. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        /* Walk the fired array and dispatch the handlers of each ready fd. */
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask; /* which events fired: readable/writable */
            int fd = eventLoop->fired[j].fd; /* the fd the event fired on */
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event later. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsynching a file to disk,
             * before replying to a client. */
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                /* Skip the call when the same handler already ran for the
                 * readable side of this fd. */
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* Counted once per ready fd, regardless of how many handlers
             * ran for it. */
            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

標準的事件驅動框架,在死循環中調用 aeProcessEvents 方法

aeProcessEvents 方法比較長,裏面會處理兩種事件:TimeEvent 與 FileEvent,本文關注的重點是 FileEvent

aeProcessEvents 調用 aeApiPoll 方法來查找監聽的 fd 上有哪些是可用的,找到可用的 fd 以後,根據 fd 的事件類型,決定調用 wfileProc 還是 rfileProc 來處理相關的事件。本文裏我們關心的是 client 發來的 command 會被如何處理,那就是 rfileProc 了,rfileProc 的設置過程在後文中會提及

aeApiPoll 在多個文件中被實現,Redis 用條件編譯的手法決定採用哪一種實現,很有意思

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
/* The backend is picked at compile time: the first HAVE_* macro that is
 * defined wins, and ae_select.c is the fallback when none is. */
#if defined(HAVE_EVPORT)
#include "ae_evport.c"
#elif defined(HAVE_EPOLL)
#include "ae_epoll.c"
#elif defined(HAVE_KQUEUE)
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif

就看最經典的 epoll 好了:

/* State of the epoll backend, stored in eventLoop->apidata. */
typedef struct aeApiState {
    int epfd; /* epoll instance descriptor, obtained from epoll_create() */
    struct epoll_event *events; /* buffer of eventLoop->setsize entries that epoll_wait() fills */
} aeApiState;

/* Set up the epoll backend for an event loop.
 *
 * Allocates the backend state, an epoll_event buffer with
 * eventLoop->setsize slots, and the epoll instance itself. On success the
 * state is attached to eventLoop->apidata and 0 is returned. On any
 * failure everything allocated so far is released and -1 is returned. */
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *st = zmalloc(sizeof(*st));

    if (st == NULL) return -1;

    st->events = zmalloc(eventLoop->setsize * sizeof(struct epoll_event));
    if (st->events != NULL) {
        st->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
        if (st->epfd != -1) {
            eventLoop->apidata = st;
            return 0;
        }
        /* epoll_create() failed: fall through to the shared cleanup. */
        zfree(st->events);
    }
    zfree(st);
    return -1;
}

/* Start watching `fd` for the events in `mask` (AE_READABLE and/or
 * AE_WRITABLE) on the loop's epoll instance.
 *
 * The events already recorded for the fd in eventLoop->events[fd].mask are
 * merged into the request, so a modification never drops interest that was
 * registered earlier. Returns 0 on success, -1 if epoll_ctl() fails. */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *st = eventLoop->apidata;
    struct epoll_event ev = {0}; /* avoid valgrind warning */
    int previous = eventLoop->events[fd].mask;
    int op;

    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    if (previous == AE_NONE)
        op = EPOLL_CTL_ADD;
    else
        op = EPOLL_CTL_MOD;

    mask |= previous; /* Merge old events */
    if (mask & AE_READABLE) ev.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ev.events |= EPOLLOUT;
    ev.data.fd = fd;

    return (epoll_ctl(st->epfd, op, fd, &ev) == -1) ? -1 : 0;
}

/* Wait for events on the loop's epoll instance and record them in
 * eventLoop->fired.
 *
 * tvp is the maximum time to wait; a NULL tvp blocks until an event
 * arrives. The timeout is converted to milliseconds rounding UP, so that a
 * non-zero sub-millisecond timeout does not become a zero (non-blocking)
 * wait that would make the caller spin.
 *
 * Returns the number of entries written to eventLoop->fired (0 on timeout
 * or when epoll_wait() reports an error). */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        /* Translate every ready epoll event into an AE_* mask. */
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            /* Error and hang-up conditions are reported as both readable
             * and writable. The dispatcher only calls handlers whose bit is
             * also set in the fd's registered mask, so an fd registered for
             * reading only would otherwise never see the condition while
             * epoll keeps reporting it. */
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;

            /* Publish the ready fd and its event mask to the event loop. */
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

代碼不算複雜,實際上是對系統調用做了一層簡單的封裝

調用 epoll_ctl 方法來註冊監聽 fd

調用 epoll_wait 方法來等待,直到被監聽的 fd 上有事件發生爲止

比較有趣的做法是 aeFileEvent 結構體裏定義了一個 mask 屬性來記錄這個 fd 被監聽的事件:aeApiAddEvent 用它來判斷該用 EPOLL_CTL_ADD 還是 EPOLL_CTL_MOD,aeProcessEvents 也用它來確認觸發的事件仍然有效。

 

 

新 client 創建鏈接

networking.c

/* Excerpt of createClient() from networking.c; the article cuts the
 * function off after the connection setup shown here. */
client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client.
     *
     * For such a client (fd == -1) all of the socket setup below is
     * skipped. */
    if (fd != -1) {
        anetNonBlock(NULL,fd); /* put the fd in non-blocking mode */
        anetEnableTcpNoDelay(NULL,fd); /* per the article: setsockopt() call that disables Nagle's algorithm so data is sent as soon as possible */
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        /* Register readQueryFromClient as the AE_READABLE handler for this
         * client's fd, with the client itself as the callback data. */
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            /* Registration failed: release the socket and the client. */
            close(fd);
            zfree(c);
            return NULL;
        }
    }

調用 aeCreateFileEvent 方法給這個 fd 註冊 read 事件處理函數 readQueryFromClient,也就是設置到這個 fd 的 rfileProc 屬性裏

/* Register `proc` as the handler for the events in `mask` on `fd`.
 *
 * Fails with AE_ERR (errno set to ERANGE) when fd does not fit in the
 * loop's event table, and with AE_ERR when the polling backend rejects the
 * fd. Otherwise the fd's slot records the new mask bits, the handler for
 * each requested direction and the caller's clientData, eventLoop->maxfd
 * is raised if needed, and AE_OK is returned. */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    aeFileEvent *slot;

    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;

    slot = &eventLoop->events[fd];
    slot->mask |= mask;
    if (mask & AE_READABLE)
        slot->rfileProc = proc;
    if (mask & AE_WRITABLE)
        slot->wfileProc = proc;
    slot->clientData = clientData;

    if (eventLoop->maxfd < fd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

 

當 client 發送 command 過來的時候,eventloop 會發現這個 fd 可讀,然後調用 readQueryFromClient 進行處理

 

處理client 發送的 command

/* File-event callback registered for AE_READABLE on a client's fd: it runs
 * when the client's socket has data to read. This is an excerpt; the dotted
 * line stands for code the article omits. */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
.....
    /* Time to process the buffer. If the client is a master we need to
     * compute the difference between the applied offset before and after
     * processing the buffer, to understand how much of the replication stream
     * was actually applied to the master state: this quantity, and its
     * corresponding part of the replication stream, will be propagated to
     * the sub-slaves and to the replication backlog. */
    if (!(c->flags & CLIENT_MASTER)) {
        processInputBuffer(c); /* ordinary client: just process its commands */
    } else {
        /* The client is our master: besides processing the buffered
         * commands, forward the part of the stream that was applied. */
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        size_t applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            /* Drop the forwarded prefix from the pending buffer. */
            sdsrange(c->pending_querybuf,applied,-1);
        }
    }
}

當 fd 可讀時,eventloop 會觸發 readQueryFromClient 這個回調函數,再調用 processInputBuffer 函數

/* This function is called every time, in the client structure 'c', there is
 * more query buffer to process, because we read more data from the socket
 * or because a client was blocked and later reactivated, so there could be
 * pending query buffer, already representing a full command, to process.
 *
 * NOTE: this is an excerpt; the dotted line stands for omitted code, which
 * includes the opening of the loop that the `break` statements and the
 * extra closing brace below belong to. */
void processInputBuffer(client *c) {
.....

        /* Parse the request according to its protocol type; stop on any
         * result other than C_OK. */
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            /* This is the point where the parsed command is executed. */
            if (processCommand(c) == C_OK) {
                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                    /* Update the applied replication offset of our master. */
                    c->reploff = c->read_reploff - sdslen(c->querybuf);
                }

                /* Don't reset the client structure for clients blocked in a
                 * module blocking command, so that the reply callback will
                 * still be able to access the client argv and argc field.
                 * The client will be reset in unblockClientFromModule(). */
                if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
                    resetClient(c);
            }
            /* freeMemoryIfNeeded may flush slave output buffers. This may
             * result into a slave, that may be the active client, to be
             * freed. */
            if (server.current_client == NULL) break;
        }
    }
    server.current_client = NULL;
}

調用processCommand 方法,顧名思義,裏面會對 client 發來的指令作處理

其實現位於server.c 裏

/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() execute the command or prepare the
 * server for a bulk read from the client.
 *
 * If C_OK is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if C_ERR is returned the client was destroyed (i.e. after QUIT).
 *
 * NOTE: this is an excerpt; the dotted lines stand for omitted code. */
int processCommand(client *c) {
......
    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    /* Resolve the command name (argv[0]) to its command entry. */
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    /* Reject unknown commands and calls with the wrong argument count. */
    if (!c->cmd) {
        flagTransaction(c);
        addReplyErrorFormat(c,"unknown command '%s'",
            (char*)c->argv[0]->ptr);
        return C_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        /* A positive arity must match argc exactly; a negative arity -N
         * requires at least N arguments. */
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return C_OK;
    }
.....

    /* Per the article, the omitted code above validates arguments and
     * handles various error conditions. */
    /* Exec the command */
    /* Inside a MULTI transaction every command except EXEC, DISCARD, MULTI
     * and WATCH is queued rather than executed. */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        /* Put the command on the transaction queue. */
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        /* Normal path: execute the command now through call(), which the
         * article says is defined around line 2200 of this same file. */
 call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
    return C_OK;
}

這個方法有兩個關鍵點:

1. 調用 lookupCommand 方法查找 client 提交的 command 對應的實現(redis server 啓動的時候會初始化一個 dict,裏面存放了 command 名稱到實現函數的映射關係,去這個 dict 裏查就行了)

2. 執行函數,我們先不關注事務,只看最簡單的普通命令,那麼會調用 call 方法

其實現位於 server.c 裏

/* Excerpt of call() from server.c: runs the client's current command and
 * measures it. The dotted lines stand for code the article omits. */
void call(client *c, int flags) {
......
    /* Call the command. */
    dirty = server.dirty;
    start = ustime();
    c->cmd->proc(c); /* run the command's implementation through its function pointer */
    duration = ustime()-start; /* elapsed time of the command, in ustime() units */
    dirty = server.dirty-dirty; /* change of server.dirty caused by the command */
    if (dirty < 0) dirty = 0;
....
}

主要是用 cmd 的 proc 屬性(一個函數指針)來完成實際操作

至於 cmd 和它的 proc 屬性,是在上一步的 lookupCommand 方法裏被設置的。

例如最簡單的 get 方法,就對應於getCommand 這個方法:

    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},

其具體實現位於t_string.c 裏,細節暫時就不跟進了。

現在我們就大致上能理解 client 發送的 command 的流轉過程了。

相關文章
相關標籤/搜索