Redis 服務端程序實現原理

上篇咱們簡單介紹了 redis 客戶端的一些基本概念,包括其 client 數據結構中對應的相關字段的含義,本篇咱們結合這些,來分析分析 redis 服務端程序是如何運行的。一條命令請求的完成,客戶端服務端都經歷了什麼?服務端程序中定時函數 serverCron 都有哪些邏輯?java

1、redis 客戶端如何鏈接服務端

咱們日常最簡單的一個 redis 客戶端命令,redis-cli,這個命令會致使咱們的客戶端向服務端發起一個 connect 鏈接操做,具體就是如下幾個步驟。git

一、網絡鏈接程序員

第一步是網絡鏈接,也就是咱們的客戶端會與服務端進行 TCP 三次握手,並指明使用 socket 通訊協議。github

接着服務端 redis 使用 epoll 事件機制監聽端口的讀事件,一旦事件可讀則斷定是有客戶端嘗試創建鏈接,服務端會檢查最大容許鏈接數是否到達,若是達到則拒絕創建鏈接,不然服務端會建立一個 fd 文件描述符並返回給客戶端,表明鏈接成功創建。redis

二、更新客戶端鏈接信息數據庫

以前介紹 redis 客戶端的時候,咱們說過 redisServer 中有這麼一個字段:api

struct redisServer {
    ........
    list *clients;              /* List of active clients */
    ........
}
複製代碼

clients 字段是一個雙端鏈表結構,保存了全部成功創建鏈接的客戶端 client 信息,那麼咱們第二步就是建立一個 client 結構的客戶端抽象實例並添加到 redisServer 結構 clients 鏈表中。數組

三、爲新客戶端註冊讀事件bash

每個客戶端鏈接都對應一個 fd 文件描述符,咱們只須要監聽這個文件描述符的讀事件,便可判斷該套接字上是否有信息發送過來。服務器

這裏也同樣,咱們經過註冊該 fd 的讀事件,當該客戶端發送信息給服務端時,咱們無需去輪詢便可發現該客戶端在請求服務端的動做,繼而服務端程序解析命令。

2、redis 如何執行一條命令

redis 服務端程序啓動後,會初始化一些字段變量,爲 redisServer 中的一些字段賦默認值,還會讀取用戶指定的配置文件內容並加載配置,反應到具體數據結構內,最後會調用 asMain 函數進行事件循環監聽。

每當客戶端發起鏈接請求,或者發送命令過來,這裏的事件分發器就會監聽到套接字的可讀事件,因而找到可讀事件所綁定的事件處理器 readQueryFromClient,並調用它。

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    ........
    //讀取客戶端輸入緩衝區大小
    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    //從 fd 文件描述符對應的 socket 中讀取命令數據
    //保存進 querybuf 輸入緩衝區
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            //異常返回
            return;
        } else {
            //異常釋放客戶端鏈接
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        //客戶端已經關閉、釋放客戶端
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
       c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }
    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread;
    //若是輸入緩衝區長度超過系統設置最大長度,釋放客戶端
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }
    if (!(c->flags & CLIENT_MASTER)) {
        processInputBuffer(c);
    } else {
        size_t prev_offset = c->reploff;
        //這裏會讀取緩衝區寫入的命令
        processInputBuffer(c);
        size_t applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf,applied,-1);
        }
    }
}

複製代碼

總的來講,readQueryFromClient 主要完成的就是將 socket 中發來的命令讀取到客戶端輸入緩衝區,而後調用 processInputBuffer 處理緩衝區中的命令。

void processInputBuffer(client *c) {
    server.current_client = c;
    while(sdslen(c->querybuf)) {
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
        if (c->flags & CLIENT_BLOCKED) break;
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
        //判斷請求類型
        if (!c->reqtype) {
            if (c->querybuf[0] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }
        //根據不一樣的請求類型,執行命令解析
        //實際上就是把命令的名稱、參數解析存入 argc 數組中
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }
        if (c->argc == 0) {
            resetClient(c);
        } else {
            //查找執行命令
            if (processCommand(c) == C_OK) {
                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                    c->reploff = c->read_reploff - sdslen(c->querybuf);
                }
                if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
                    resetClient(c);
            }
            if (server.current_client == NULL) break;
        }
    }
    server.current_client = NULL;
}
複製代碼

processCommand 函數會從客戶端實例命令參數字段中拿到命令的名稱、參數類型、參數值等等信息。redisServer 在成功啓動後,會調用 populateCommandTable 方法初始化 redisCommandTable,存入一個字典集合。

每個 redisCommand 是這麼一個數據結構:

struct redisCommand {
    //命令名稱
    char *name;
    //函數指針,指向一個具體實現
    redisCommandProc *proc;
    //參數個數
    int arity;
    //命令的類型,寫命令?讀命令?等
    char *sflags;
    int flags;    
    redisGetKeysProc *getkeys_proc;
    int firstkey; 
    int lastkey;  
    int keystep;
    //服務器啓動後共調用該命令次數
    //服務器啓動後執行該命令耗時總
    long long microseconds, calls;
};
複製代碼

processCommand 最後會找到命令,進而執行命令,並將命令執行的結果寫入客戶端輸出緩衝區,並將響應寫回客戶端。以上就是 redis 對於一條命令請求的執行過程,隨着咱們的不斷學習,以上內容會不斷深刻,如今你能夠理解的大概就好。

3、週期系統函數 serverCron

redis 能夠說是事件驅動中間件,它主要有兩種事件,文件事件和時間事件,文件事件咱們就很少說,時間事件主要分爲兩種,一種是定時事件,另外一種週期事件。

定時事件指的是,預約的程序將會在某個具體的時間節點執行。週期事件是指,預約程序每隔某個時間間隔就會被調用執行。

而咱們的 serverCron 顯然是一個週期時間事件,在正式分析其源碼實現以前,咱們先來看看它的前世今身,在哪裏被註冊,又是如何被調用的。

void initServer(void) {

    。。。。。
    
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
    。。。。。
    
}
複製代碼

咱們 redis 服務器啓動初始化時,會調用 aeCreateTimeEvent 綁定一個 serverCron 的時間事件。

這是 redis 中事件循環結構

typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;
複製代碼

其中指針 timeEventHead 是一個雙端鏈表,全部的時間事件都會以鏈表的形式存儲在這裏,具體指向的結構是 aeTimeEvent。

typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    //下一次何時被執行(單位秒)
    long when_sec; /* seconds */
    //下一次何時被執行(單位毫秒)
    long when_ms; /* milliseconds */
    //時間事件處理函數
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    //先後鏈表指針
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
} aeTimeEvent;
複製代碼

serverCron 在這裏會被建立並添加到時間事件鏈表中,並設置它下一次執行時間爲當前時間,具體你能夠自行深刻查看調用棧,那麼下一次時間事件檢查的時候,serverCron 就必定會被執行。

好了,至此 serverCron 已經註冊進 redis 的時間事件結構中,那麼何時檢查並調用呢?

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
複製代碼

還記的咱們 redis 成功啓動後,會進入主事件循環中嗎?aeProcessEvents 裏面具體不一行行帶你們分析了,咱們挑相關的進行分析。

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    。。。。。
    //遍歷整個時間事件鏈表,找到最快要被執行的任務
    //計算與當前時間的差值
    if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;
            //記錄差值保存進變量 tvp
            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                //已經錯過執行該時間事件,tvp 賦零
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                tvp = NULL; /* wait forever */
            }
        }
        //aeApiPoll 會處理文件事件,最長 tvp 時間就要返回
        numevents = aeApiPoll(eventLoop, tvp);
        。。。。。
        //檢查處理時間事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
}
複製代碼

你看,實際上儘管咱們對週期時間事件指定了嚴格的執行間隔,但實際上大多數狀況下,時間事件會晚於咱們既定時間節點。

processTimeEvents 函數檢查全部時間事件函數,若是有符合條件應該獲得執行的,會當即執行該事件處理器,並根據事件處理器返回的狀態,刪除時間事件或設置下一次執行時間。

static int processTimeEvents(aeEventLoop *eventLoop) {
    。。。。。。
    //獲取當前時間
    aeGetTime(&now_sec, &now_ms);
    if (now_sec > te->when_sec ||
    (now_sec == te->when_sec && now_ms >= te->when_ms))
    {
        int retval;

        id = te->id;
        retval = te->timeProc(eventLoop, id, te->clientData);
        processed++;
        if (retval != AE_NOMORE) {
            //這是一個週期執行的時間事件,設置下次執行時間
            aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
        } else {
            //刪除事件
            te->id = AE_DELETED_EVENT_ID;
        }
    }
    te = te->next;
}
複製代碼

以上,你應該瞭解到 serverCron 什麼時候註冊的,什麼時候被執行,通過了哪些過程。下面咱們具體看 serverCron 的內容。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    int j;
    UNUSED(eventLoop);
    UNUSED(id);
    UNUSED(clientData);

     if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);

    //更新 server.unixtime 和 server.mxtime
    updateCachedTime();

    //每間隔 100 毫秒,統計一次這段時間內命令的執行狀況
    run_with_period(100) {
        trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
        trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
                server.stat_net_input_bytes);
        trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
                server.stat_net_output_bytes);
    }
    。。。。。。
}
複製代碼

其中 run_with_period 爲何能作到顯式控制 100 毫秒內只執行一次呢?

其實 run_with_period 的宏定義以下:

#define run_with_period(_ms_)
    if ((_ms_ <= 1000/server.hz) ||
    !(server.cronloops%((_ms_)/(1000/server.hz))))
複製代碼

server.hz 是 redisServer 結構中的一個字段,能夠容許咱們經過配置文件進行調節,它是一個整數,描述服務 serverCron 在一秒內執行 N 次。server.cronloops 描述服務器自啓動以來,共執行 serverCron 次數。

那麼,1000/server.hz 描述的就是 serverCron 每間隔多少毫秒就須要被執行,若是咱們傳入的 ms 小於這個間隔,返回 1 並立馬執行後續函數體。或者根據 serverCron 已經執行的次數,計算間隔時間是否達到,返回 0 或 1。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    。。。。。
    //更新全局 lru 時鐘,這個用於每一個 redis 對象最長未訪問淘汰策略
    unsigned long lruclock = getLRUClock();
    atomicSet(server.lruclock,lruclock);

    //不斷比較當前內存使用量,存儲最高峯值內存使用量
    if (zmalloc_used_memory() > server.stat_peak_memory)
        server.stat_peak_memory = zmalloc_used_memory();

    server.resident_set_size = zmalloc_get_rss();

    // 若是收到了SIGTERM信號,嘗試退出
    if (server.shutdown_asap) {
        if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
        serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
        server.shutdown_asap = 0;
    }
    。。。。。。
}
複製代碼

lru 後面咱們會繼續說的,redis 維護一個全局 lru 時鐘參照,每一個 redisObject 結構中也會有一個本身的 lru 時鐘,它記錄的是上一次訪問該對象時的時鐘,這些信息會用於鍵值淘汰策略。因此,服務器會定時的更新這個全局 lru 時鐘,保證它準確。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    。。。。。
    //每間隔五秒,輸出非空數據庫中的相關屬性信息
    run_with_period(5000) {
        for (j = 0; j < server.dbnum; j++) {
            long long size, used, vkeys;

            size = dictSlots(server.db[j].dict);
            used = dictSize(server.db[j].dict);
            vkeys = dictSize(server.db[j].expires);
            if (used || vkeys) {
                serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
                /* dictPrintStats(server.dict); */
            }
        }
    }

    //若是不是sentinel模式,則每5秒輸出一個connected的client的信息到log
    if (!server.sentinel_mode) {
        run_with_period(5000) {
            serverLog(LL_VERBOSE,
                "%lu clients connected (%lu slaves), %zu bytes in use",
                listLength(server.clients)-listLength(server.slaves),
                listLength(server.slaves),
                zmalloc_used_memory());
        }
    }
    。。。。。。
}
複製代碼
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    。。。。。
    clientsCron();

    databasesCron();
    。。。。。。
}
複製代碼

clientsCron 會檢查有哪些客戶端鏈接超時並將他們釋放,還會檢查客戶端的輸入緩衝區 querybuff 是否太大,或者該客戶端不是很活躍,那麼會釋放掉該客戶端的輸入緩衝區並從新建立一個默認大小的。

databasesCron 會首先隨機遍歷全部的數據庫並抽取 expired 集合中部分鍵,判斷是否過時並執行相應的刪除操做。除此以外,該函數還會隨機訪問部分數據庫,並根據其狀態觸發 rehash。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {

    。。。。
    //若是服務沒有在執行 rdb 備份生成,也沒有在 aof 備份生成 
    //而且有被延遲的 aof rewrite,那麼這裏會執行
    //當服務器正在進行 BGSAVE 備份的期間,全部的 rewrite 請求都會被延遲
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }
    //若是有 rdb 子進程或 aof 子進程
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
        ldbPendingChildren())
    {
        int statloc;
        pid_t pid;

        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;

            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
            //子進程 id 等於負一,說明子進程退出或異常,記錄日誌
            if (pid == -1) {
                serverLog(LL_WARNING,"wait3() returned an error: %s. "
                    "rdb_child_pid = %d, aof_child_pid = %d",
                    strerror(errno),
                    (int) server.rdb_child_pid,
                    (int) server.aof_child_pid);
            } else if (pid == server.rdb_child_pid) {
                //pid 指向 rdb 子進程 id
                //判斷若是子進程退出了,進行一些後續的 rdb 操做
                //更新 dirty,lastsave 時間等等
                backgroundSaveDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } else if (pid == server.aof_child_pid) {
                //pid 指向 aof 子進程 id
                //aof 子進程退出,處理其後續的一些收尾
                backgroundRewriteDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } else {
                if (!ldbRemoveChild(pid)) {
                    serverLog(LL_WARNING,
                        "Warning, detected child with unmatched pid: %ld",
                        (long)pid);
                }
            }
            updateDictResizePolicy();
            closeChildInfoPipe();
        }
    } else {
        //這部分咱們前面的文章介紹過
        //saveparams 保存了 save 全部的配置項,是一個數組
        //這裏校驗是否達到條件
         for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 CONFIG_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == C_OK))
            {
                serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                rdbSaveInfo rsi, *rsiptr;
                rsiptr = rdbPopulateSaveInfo(&rsi);
                rdbSaveBackground(server.rdb_filename,rsiptr);
                break;
            }
         }
    。。。。
}
複製代碼
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {

    。。。。
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);

    //每一秒檢查一次上一輪aof的寫入是否發生了錯誤,若是有錯誤則嘗試從新寫一次
    run_with_period(1000) {
        if (server.aof_last_write_status == C_ERR)
            flushAppendOnlyFile(0);
    }

    freeClientsInAsyncFreeQueue();

    clientsArePaused();

    run_with_period(1000) replicationCron();

    run_with_period(100) {
        if (server.cluster_enabled) clusterCron();
    }

    run_with_period(100) {
        if (server.sentinel_mode) sentinelTimer();
    }

    run_with_period(1000) {
        migrateCloseTimedoutSockets();
    }

    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.rdb_bgsave_scheduled &&
        (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
         server.lastbgsave_status == C_OK))
    {
        rdbSaveInfo rsi, *rsiptr;
        rsiptr = rdbPopulateSaveInfo(&rsi);
        if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK)
            server.rdb_bgsave_scheduled = 0;
    }

    //增長 serverCron 執行次數
    server.cronloops++;
    return 1000/server.hz;
    。。。。
}
複製代碼

以上,咱們分析了 serverCron 的內部邏輯,雖說咱們配置上能夠指定它執行間隔,可是實際上取決於具體的執行時間,內部邏輯也很多,但願你能瞭解了個大概。

好了,這是咱們對於 redis 服務端程序的一點點了解,若是以爲我有說不對的地方或者你有更深的理解,也歡迎你加我微信一塊兒探討。

接下來,咱們的 redis 之旅從單擊開始步入多機模式,下一篇多機數據庫的理~


關注公衆不迷路,一個愛分享的程序員。 公衆號回覆「1024」加做者微信一塊兒探討學習! 每篇文章用到的全部案例代碼素材都會上傳我我的 github github.com/SingleYam/o… 歡迎來踩!

YangAM 公衆號
相關文章
相關標籤/搜索