Redis中單機數據庫的實現

1. 內存操做層 zmalloc 系接口

redis爲了優化內存操做, 封裝了一層內存操做接口. 默認狀況下, 其底層實現就是最簡樸的libc中的malloc系列接口. 若是有定製化需求, 能夠經過配置方式, 將底層內存操做的實現更換爲tcmallocjemalloc庫.c++

redis封裝的這一層接口, 其接口定義與默認實如今zmalloc.hzmalloc.c中. 其默認實現支持在O(1)複雜度的狀況下返回內存塊的大小. 具體實現上的思路也十分簡樸: 就是在內存塊頭部多分配一個long的空間, 將內存塊的大小記錄在此.程序員

zmalloc

zmalloc.c中能夠看到, 底層接口的具體實現可選的有tcmallocjemalloc兩種:redis

/* Explicitly override malloc/free etc when using tcmalloc. */
#if defined(USE_TCMALLOC)
#define malloc(size) tc_malloc(size)
#define calloc(count,size) tc_calloc(count,size)
#define realloc(ptr,size) tc_realloc(ptr,size)
#define free(ptr) tc_free(ptr)
#elif defined(USE_JEMALLOC)
#define malloc(size) je_malloc(size)
#define calloc(count,size) je_calloc(count,size)
#define realloc(ptr,size) je_realloc(ptr,size)
#define free(ptr) je_free(ptr)
#define mallocx(size,flags) je_mallocx(size,flags)
#define dallocx(ptr,flags) je_dallocx(ptr,flags)
#endif

內存分配接口的實現以下, 從代碼中能夠看出, 其頭部多分配了PREFIX_SIZE個字節用於存儲數據區的長度.數據庫

void *zmalloc(size_t size) {
    void *ptr = malloc(size+PREFIX_SIZE);

    if (!ptr) zmalloc_oom_handler(size);
#ifdef HAVE_MALLOC_SIZE
    update_zmalloc_stat_alloc(zmalloc_size(ptr));
    return ptr;
#else
    *((size_t*)ptr) = size;
    update_zmalloc_stat_alloc(size+PREFIX_SIZE);
    return (char*)ptr+PREFIX_SIZE;
#endif
}

2. 事件與IO多路複用

redis中的事件處理機制和內存分配同樣, 也是糊了一層接口. 其底層實現是可選的. 默認狀況下, 會自動選取當前OS平臺上速度最快的多路IO接口, 好比在Linux平臺上就是epoll, 在Sun/Solaris系列平臺上會選擇evport, 在BSD/Mac OS平臺上會選擇kqueue, 實再沒得選了, 會使用POSIX標準的select接口. 保證起碼能跑起來api

事件處理對各個不一樣底層實現的包裹, 分別在ae_epoll.c, ae_evport.c, ae_kqueue, ae_select.c中, 事件處理器的定義與實如今ae.hae.c中.數組

ae.h中, 定義了bash

  1. 事件處理回調函數的別名: ae***Proc函數指針類型別名
  2. 文件事件與定時事件兩種結構: aeFileEventaeTimeEvent
  3. 結構aeFiredEvent, 表明被觸發的事件
  4. 結構aeEventLoop, 一個事件處理器. 包含一個事件循環.

分別以下:服務器

#define AE_NONE 0       /* No events registered. */
#define AE_READABLE 1   /* Fire when descriptor is readable. */
#define AE_WRITABLE 2   /* Fire when descriptor is writable. */
#define AE_BARRIER 4    /* With WRITABLE, never fire the event if the
                           READABLE event already fired in the same event
                           loop iteration. Useful when you want to persist
                           things to disk before sending replies, and want
                           to do that in a group fashion. */

// .....

/* Types and data structures */
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;  // 可讀回調
    aeFileProc *wfileProc;  // 可寫回調
    void *clientData;       // 自定義數據
} aeFileEvent;

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */      // 定時事件的編號
    long when_sec; /* seconds */                    // 觸發時間(單位爲秒)
    long when_ms; /* milliseconds */                // 觸發時間(單位爲毫秒)
    aeTimeProc *timeProc;                           // 定時回調
    aeEventFinalizerProc *finalizerProc;            // 事件自殺析構回調
    void *clientData;                               // 自定義數據
    struct aeTimeEvent *prev;                       // 鏈表的前向指針與後向指針
    struct aeTimeEvent *next;
} aeTimeEvent;

/* A fired event */
typedef struct aeFiredEvent {   // 描述了一個被觸發的事件
    int fd;                     // 文件描述符
    int mask;                   // 觸發的類型
} aeFiredEvent;

/* State of an event based program */
typedef struct aeEventLoop {  // 描述了一個熱火朝天不停循環的事件處理器
    int maxfd;      // 當前監控的文件事件中, 文件描述符的最大值
    int setsize;    // 事件處理器的最大容量(能監聽的文件事件的個數)
    long long timeEventNextId;  // 下一個到期要執行的定時事件的編號
    time_t lastTime;    // 上一次有回調函數執行的時間戳, 用以防止服務器時間抖動或溢出
    aeFileEvent *events;    // 全部註冊的文件事件
    aeFiredEvent *fired;    // 全部被觸發的文件事件
    aeTimeEvent *timeEventHead; // 定時事件鏈表
    int stop;               // 急停按鈕
    void *apidata;          // 底層多路IO接口所需的額外數據
    aeBeforeSleepProc *beforesleep; // 一次循環結束, 全部事件處理結束後, 要執行的回調函數
    aeBeforeSleepProc *aftersleep;  // 一次循環開始, 在執行任何事件以前, 要執行的回調函數
} aeEventLoop;

對於文件事件來講, 除了可讀AE_READABLE與可寫AE_WRITABLE兩種監聽觸發方式外, 還有一種額外的AE_BARRIER觸發方式. 若監聽文件事件時, 將AE_BARRIERAE_WRITABLE組合時, 保證若當前文件若是正在處理可讀(被可讀觸發), 就再也不同時觸發可寫. 在一些特殊場景下這個特性是比較有用的.網絡

事件處理器aeEventLoop中, 對於文件事件的處理, 走的都是老套路. 這裏須要注意的是事件處理器中, 定時事件的設計:數據結構

  1. 全部定時事件, 像糖葫蘆同樣, 串成一個鏈表
  2. 定時事件不是嚴格定時的, 定時事件中不適宜執行耗時操做

事件處理器的核心接口有如下幾個:

// 建立一個事件處理器實例
aeEventLoop *aeCreateEventLoop(int setsize);
// 銷燬一個事件處理器實例
void aeDeleteEventLoop(aeEventLoop *eventLoop);
// 事件處理器急停
void aeStop(aeEventLoop *eventLoop);
// 建立一個文件事件, 並添加進事件處理器中
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData);
// 從事件處理器中刪除一個文件事件. 
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
// 經過文件描述符, 獲取事件處理器中, 該文件事件註冊的觸發方式(可讀|可寫|BARRIER)
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
// 建立一個定時事件, 並掛在事件處理器的時間事件鏈表中. 返回的是建立好的定時事件的編號
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc);
// 刪除事件處理器中的某個時間事件
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
// 調用多路IO接口, 處理全部被觸發的文件事件, 與全部到期的定時事件
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
// tricky接口.內部調用poll接口, 等待milliseconds毫秒, 或直至指定fd上的指定事件被觸發
int aeWait(int fd, int mask, long long milliseconds);
// 事件處理器啓動器
void aeMain(aeEventLoop *eventLoop);
// 獲取底層路IO接口的名稱
char *aeGetApiName(void);

void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);

核心代碼以下:

首先是啓動器, 啓動器負責三件事:

  1. 響應急停
  2. 執行beforesleep回調, 若是設置了該回調的值, 那麼在每次事件循環以前, 該函數會被執行
  3. 一遍一遍的輪大米, 調用aeProcessEvents, 即事件循環
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

而後是事件循環

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    // 無事快速退朝
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        // 在 "事件處理器中至少有一個文件事件" 或 "以阻塞形式執行事件循環, 且處理定時事件"
        // 時, 進入該分支
		// 這裏有一點繞, 其實就是, 若是
		//	"事件處理器中沒有監放任何文件事件", 且, 
		//  "執行事件循環時指明不執行定時事件, 或雖然指明瞭執行定時事件, 可是要求以非阻塞形式運行事件循環"
		// 都不會進入該分支, 不進入該分支時, 分兩種狀況:
		//	1. 沒監放任何文件事件, 且不執行定時事件, 至關於直接返回了
		//	2. 沒監放任何文件事件, 執行定時事件, 但要求以非阻塞形式執行. 則當即執行全部到期的時間事件, 若是沒有, 則至關於直接返回


		// 第一步: 計算距離下一個到期的定時事件, 還有多長時間. 計該時間爲tvp
		// 即使本次事件循環沒有指明要處理定時事件, 也計算這個時間
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

		// 第二步: 以tvp爲超時時間, 調用多路IO接口, 獲取被觸發的文件事件, 並處理文件事件
		// 若是tvp爲null, 即當前事件處理器中沒有定時事件, 則調用aeApiPoll的超時時間是無限的
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

             // 一般狀況下, 若是一個文件事件, 同時被可讀與可寫同時觸發
             // 都是先執行可讀回調, 再執行可寫回調
             // 但若是事件掩碼中帶了AE_BARRIER, 就會扭轉這個行爲, 先執行可寫回調, 再執行可讀加高
            int invert = fe->mask & AE_BARRIER;

            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }

	// 按需執行時間事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

3. 網絡庫

anet.hanet.c中, 對*nix Tcp Socket相關接口進行了一層薄封裝, 基本上就是在原生接口的基礎上作了一些錯誤處理, 並整合了一些接口調用. 這部分代碼很簡單, 甚至很無聊.

4. 單機數據庫的啓動

在Redis進程中, 有一個全局變量 struct redisServer server, 這個變量描述的就是Redis服務端. (結構體定義位於server.h中, 全局變量的定義位於server.c中), 結構體的定義以下:

struct redisServer {
    // ...
    aeEventLoop * el;   // 事件處理器
    // ...
    redisDb * db;       // 數據庫數組.
    // ...
    int ipfd[CONFIG_BINDADDR_MAX];  // listening tcp socket fd
    int ipfd_count;
    int sofd;                       // listening unix socket fd
    // ...
    list * clients;                     // 全部活動客戶端
    list * clients_to_close;            // 待關閉的客戶端
    list * clients_pending_write;       // 待寫入的客戶端
    // ...
    client * current_clients;           // 當前客戶端, 僅在crash報告時才使用的客戶端
    int clients_paused;                 // 暫停標識
    // ...
    list * clients_waiting_acks;        // 執行WAIT命令的客戶端
    // ...
    uint64_t next_client_id;            // 下一個鏈接至服務端的客戶端的編號, 自增編號
}

這個結構體中的字段實再是太多了, 這裏僅列出一些與單機數據庫相關的字段

結構體中定義了一個指針字段, 指向struct redisDb數組類型. 這個結構的定義以下(也位於server.h中)

typedef struct redisDb {
    dict *dict;                 /* 數據庫的鍵空間 */
    dict *expires;              /* 有時效的鍵, 以及對應的過時時間 */
    dict *blocking_keys;        /* 隊列: 與阻塞操做有關*/
    dict *ready_keys;           /* 隊列: 與阻塞操做有關 */
    dict *watched_keys;         /* 與事務相關 */
    int id;                     /* 數據庫編號 */
    long long avg_ttl;          /* 統計值 */
} redisDb;

在介紹阻塞操做與事務以前, 咱們只須要關心三個字段:

  1. dict 數據庫中的全部k-v對都存儲在這裏, 這就是數據庫的鍵空間
  2. expires 數據庫中全部有時效的鍵都存儲在這裏
  3. id 數據庫的編號

從數據結構上也能看出來, 單機數據庫的啓動其實須要作如下的事情:

  1. 初始化全局變量server, 並初始化server.db數組, 數組中的每個元素就是一個數據庫
  2. 建立一個事件處理器, 用於監聽來自用戶的命令, 並進行處理

server.c文件中的main函數跟下去, 就能看到上面兩步, 這裏的代碼很繁雜, 下面只選取與單機數據庫啓動相關的代碼進行展現:

int main(int argc, char **argv) {
    // 初始化基礎設計
    // ...

    server.sentinel_mode = checkForSentinelMode(argc,argv);	// 從命令行參數中解析, 是否以sentinel模式啓動server

    initServerConfig();				// 初始化server配置
    moduleInitModulesSystem();		// 初始化Module system

    // ...

    if (server.sentinel_mode) {     // sentinel相關邏輯
        initSentinelConfig();
        initSentinel();
    }

    // 若是以 redis-check-rdb 或 redis-check-aof 模式啓動server, 則就是調用相應的 xxx_main()函數, 而後終止進程就好了
    if (strstr(argv[0],"redis-check-rdb") != NULL)
        redis_check_rdb_main(argc,argv,NULL);
    else if (strstr(argv[0],"redis-check-aof") != NULL)
        redis_check_aof_main(argc,argv);
    
    // 處理配置文件與命令行參數, 並輸出歡迎LOGO...
    // ...

    // 判斷運行模式, 若是是後臺運行, 則進入daemonize()中, 使進程守護化
    server.supervised = redisIsSupervised(server.supervised_mode);
    int background = server.daemonize && !server.supervised;
    if (background) daemonize();

    // 單機數據庫啓動的核心操做: 初始化server
    initServer();

    // 雜事: 建立pid文件, 設置進程名, 輸出ASCII LOGO, 檢查TCP backlog隊列等
    //...

    if(!server.sentinel_mode) {
        // 加載自定義Module
        // ...
    } else {
        sentinelIsRunning();
    }

    // 檢查內存狀態, 若有必要, 輸出警告日誌
    // ...

    // 向事件處理器註冊事前回調與過後回調, 並開啓事件循環
    // 至此, Redis服務端已經啓動
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    aeMain(server.el)


    // 服務端關閉的善後工做
    aeDeleteEventLoop(server.el);
    return 0;
}

main函數中能大體瞭解啓動流程, 也基本能猜出來, 在最核心的initServer函數的調用中, 至少要作兩件事:

  1. 初始化事件處理器server.el
  2. 初始化服務端的數據庫server.db
  3. 初始化監聽socket, 並將監聽socket fd加入事件處理回調

以下:

void initSetver(void) {
    // 處理 SIG_XXX 信號
    // ...

    // 初始化server.xxx下的一部分字段, 值得關注的有:
    server.pid = getpid()
    server.current_client = NULL;
    server.clients = listCreate();
    server.clients_to_close = listCreate();
    //...
    server.clients_pending_write = listCreate();
    //...
    server.unblocked_clients = listCreate();
    server.ready_keys = listCreate();
    server.clients_waiting_acks = listCreate();
    //...
    server.clients_paused = 0
    //...

    // 建立全局共享對象
    createSharedObjects();

    // 嘗試根據最大支持的客戶端鏈接數, 調整最大打開文件數
    adjustOpenFilesLimit(void)

    // 建立事件處理器
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR)
    if (server.el == NULL) {
        // ...
        exit(1);
    }

    // 初始化數據庫數組
    server.db = zmalloc(sizeof(redisDb) * server.dbnum);

    // 監聽tcp端口, tcp 監聽文件描述符存儲在 server.ipfd 中
    if (server.port != 0 && listenToPort(server.port, server.ipfd, &server.ipfd_count) == C_ERR) exit(1);

    // 監聽unix端口
    if (server.unixsocket != NULL) {
        //...
    }

    // 若是沒有任何tcp監聽fd存在, abort
    if (server.ipfd_count == 0 && server.sofd < 0) {
        //...
        exit(1);
    }

    // 初始化數據庫數組中的各個數據庫
    for (j = 0; j < server.dbnum; j++) {
        server.db[j].dict = dictCreate(&dbDictType, NULL);              // 初始化鍵空間
        server.db[j].expires = dictCreate(&keyptrDictType, NULL);       // 初始化有時效的鍵字典. key == 鍵, value == 過時時間戳
        server.db[j].blocking_keys = dictCreate(&keylistDictType, NULL);// 初始化阻塞鍵字典.
        server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType, NULL); // 初始化解除阻塞鍵字典
        server.db[j].watched_keys = dictCreate(&keylistDictType, NULL); // 初始化與事務相關的一個字典
        server.db[j].id = j;                                            // 初始化數據庫的編號
        server.db[j].avg_ttl = 0;                                       // 初始化統計數據
    }

    // 其它一大堆細節操做
    // ...

    // 向事件處理器中加一個定時回調, 這個回調中處理了不少瑣事. 好比清理過時客戶端鏈接, 過時key等
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        //...
        exit(1);
    }

    // 將tcp與unix監聽fd添加到事件處理器中
    // 回調函數分別爲: acceptTcpHandler和acceptUnixHandler
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler, NULL) == AE_ERR) {
            serverPanic("Unrecoverable error creating server.ipfd file event");
        }
    }
    if (server.sofd > 0 && aeCreateFileEvent(server.el, server.sofd, AE_READABLE, acceptUnixHandler, NULL) == AE_ERR) {
        serverPanic("Unrecoverable error creating server.sofd file event.");
    }

    // 註冊一個額外的事件回調: 用於喚醒一個被module操做阻塞的客戶端
    // ...

    // 若有須要, 打開AOF文件
    // ...

    // 對32位機器, 且沒有配置maxmemory時, 自動設置爲3GB
    // ...

    // 其它與單機數據庫無關的操做
    // ...
}

以上就是單機數據庫的啓動流程

總結: 0. Redis服務端進程中, 以server這個全局變量來描述服務端 0. server全局變量中, 持有着全部數據庫的指針(server.db), 一個事件處理器(server.el), 與全部與其鏈接的客戶端(server.clients), 監聽的tcp/unix socket fd(server.ipfd, server.sofd) 0. 在Redis服務端啓動過程當中, 會初始化監聽的tcp/unix socket fd, 並把它們加入到事件處理器中, 相應的事件回調分別爲acceptTcpHandleracceptUnixHandler

5. 客戶端與Redis服務端創建鏈接的過程

上面已經講了Redis單機數據庫服務端的啓動過程, 下面來看一看, 當一個客戶端經過tcp鏈接至服務端時, 服務端會作些什麼. 顯然要從listening tcp socket fd在server.el中的事件回調開始看起.

邏輯上來說, Redis服務端須要作如下幾件事:

  1. 創建鏈接, 接收請求
  2. 協議交互, 命令處理

這裏咱們先只看創建鏈接部分, 下面是acceptTcpHandler函數, 即listening tcp socket fd的可讀事件回調:

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);	// 創建數據鏈接
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(cfd,0,cip);										// 處理請求
    }
}

這個函數中只作了兩件事:

  1. 創建tcp數據鏈接
  2. 調用acceptCommonHandler

下面是acceptCommonHandler

#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;

	// 建立一個client實例
    if ((c = createClient(fd)) == NULL) {
        // ...
        return;
    }
    // 若是設置了鏈接上限, 而且當前數據鏈接數已達上限, 則關閉這個數據鏈接, 退出, 什麼也不作
    if (listLength(server.clients) > server.maxclients) {
        //...
        return;
    }

    // 若是服務端運行在保護模式下, 而且沒有登陸密碼機制, 那麼不接受外部tcp請求
    if (server.protected_mode &&
        server.bindaddr_count == 0 &&
        server.requirepass == NULL &&
        !(flags & CLIENT_UNIX_SOCKET) &&
        ip != NULL)
    {
        if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {
            // ...
            return;
        }
    }

    server.stat_numconnections++;
    c->flags |= flags;
}

這裏能夠看到, 對每個數據鏈接, Redis中將其抽象成了一個 struct client 結構, 全部建立struct client實例的細節被隱藏在createClient函數中, 下面是createClient的實現:

client *createClient(int fd) {
    client * c = zmalloc(sizeof(client))

    if (fd != -1) {
        anetNonBlock(NULL, fd);             // 設置非阻塞
        anetEnableTcpNoDelay(NULL, fd);     // 設置no delay
        if (server.tcpkeepalive) anetKeepAlive(NULL, fd, server.tcpkeepalive);      // 按需設置keep alive
        if (aeCreateFileEvent(server.el, fd, AE_READABLE, readQueryFromClient, c) == AE_ERR) {  // 註冊事件回調
            close(fd); zfree(c); return NULL;
        }
    }

    selectDb(c, 0)  // 默認選擇第一個數據庫

    uint64_t client_id;
    atomicGetIncr(server.next_client_id, client_id, 1); // 原子自增 server.next_client_id

    // 其它字段初始化
    c->id = client_id;
    //...

    // 把這個client實例添加到 server.clients尾巴上去
    if (fd != -1) listAddNodeTail(server.clients,c);

    return c;
}

6 Redis通訊協議

6.1 通訊方式

客戶端與服務器之間的通訊方式有兩種

  1. 一問一答式的(request-response pattern). 由客戶端發起請求, 服務端處理邏輯, 而後回發回應
  2. 若客戶端訂閱了服務端的某個channel, 這時通訊方式爲單向的傳輸: 服務端會主動向客戶端推送消息. 客戶端無需發起請求

在單機數據庫範疇中, 咱們先不討論Redis的發佈訂閱模式, 即在本文的討論範圍內, 客戶端與服務器的通訊始終是一問一答式的. 這種狀況下, 還分爲兩種狀況:

  1. 客戶端每次請求中, 僅包含一個命令. 服務端在迴應中, 也僅給出這一個命令的執行結果
  2. 客戶端每次請求中, 包含多個有序的命令. 服務端在迴應中, 按序給出這多個命令的執行結果

後者在官方文檔中被稱爲 pipelining

6.2 通訊協議

自上向下描述協議:

  1. 協議描述的是數據
  2. 數據有五種類型. 分別是短字符串, 長字符串, 錯誤信息, 數值, 數組. 其中數組是複合類型. 協議中, 數據與數據之間以\r\n兩個字節做爲定界符
  3. 短字符串: +Hello World\r\n+開頭, 末尾的\r\n是定界符, 因此顯然, 短字符串自己不支持表示\r\n, 會和定界符衝突
  4. 錯誤信息: -Error message\r\n-開頭, 末尾的\r\n是定界符. 顯然, 錯誤信息裏也不支持包含\r\n
  5. 數值: :9527\r\n:開頭, ASCII字符表示的數值, 僅支持表示整數
  6. 長字符串: $8\r\nfuck you\r\n 先以$開頭, 後面跟着字符串的總長度, 以ASCII字符表示. 再跟一個定界符, 接下來的8個字節就是長字符串的內容, 隨後再是一個定界符.
  7. 數組: 這是一個複合類型, 以*開頭, 後面跟着數組的容量, 以ASCII字符表示, 再跟一個定界符. 而後後面跟着多個其它數據. 好比 *2\r\n$3\r\nfoo\r\n+Hello World\r\n表示的是一個容量爲2的數組, 第一個數組元素是一個長字符串, 第二個數組元素是一個短字符串.

注意: 0. 通常狀況下, 協議體是一個數組 0. 長字符串在表示長度爲0的字符串時, 是$0\r\n\r\n 0. 還有一個特殊值, $-1\r\n, 它用來表示語義中的null

協議自己是支持二進制數據傳輸的, 只須要將二進制數據做爲長字符串傳輸就能夠. 因此這並不能算是一個嚴格意義上的字符協議. 但若是傳輸的數據多數是可閱讀的數據的話, 協議自己的可讀性是很強的

6.3 請求體

客戶端向服務端發送請求時, 語法格式基本以下:

6.3.1 單命令請求

單命令請求體, 就是一個數組, 數組中將命令與參數各做爲數組元素存儲, 以下是一個 LLEN mylist 的請求

*2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n

這個數組容量爲2, 兩個元素均爲長字符串, 內容分別是LLENmylist

6.3.2 多命令請求(pipelining request)

多命令請求體, 直接把多個單命令請求體拼起來就好了, 每一個命令是一個數組, 即請求體中有多個數組

6.3.3 內聯命令

若是按協議向服務端發送請求, 能夠看到, 請求體的第一字節始終要爲*. 爲了方便一些弱客戶端與服務器交互, REDIS支持所謂的內聯命令. 內聯命令僅是請求體的另外一種表達方式.

內聯命令的使用場合是:

  1. 手頭沒有redis-cli
  2. 手寫請求體太麻煩
  3. 只想作一些簡單操做

這種狀況下, 直接把適用於redis-cli的字符命令, 發送給服務端就好了. 內聯命令支持經過telnet發送.

6.4 迴應體

Redis服務端接收到任何一個有效的命令時, 都會給服務端寫回應. 請求-迴應的最小單位是命令, 因此對於多命令請求, 服務端會把這多個請求的對應的迴應, 按順序返回給客戶端.

迴應體依然遵照通訊協議規約.

6.5 手擼協議示例

單命令請求

[root@localhost ~]# echo -e '*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n' | nc localhost 6379
:2
[root@localhost ~]#

多命令請求

[root@localhost ~]# echo -e '*1\r\n$4\r\nPING\r\n*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n' | nc localhost 6379
+PONG
:4
:5
:6
[root@localhost ~]#

內聯命令請求

[root@localhost ~]# echo -e 'PING' | nc localhost 6379
+PONG
[root@localhost ~]# echo -e 'INCR counter' | nc localhost 6379
:7
[root@localhost ~]# echo -e 'INCR counter' | nc localhost 6379
:8
[root@localhost ~]#

經過telnet發送內聯命令請求

[root@localhost ~]# telnet localhost 6379
Trying ::1...
Connected to localhost.
Escape character is '^]'.



PING
+PONG


INCR counter
:9

INCR counter
:10

INCR counter
:11

^]
telnet> quit
Connection closed.
[root@localhost ~]#

6.6 協議總結及注意事項

優勢:

  1. 可讀性很高, 也能夠容納二進制數據(長字符串類型)
  2. 協議簡單, 解析容易, 解析性能也很高. 得益於各類數據類型頭字節的設計, 以及長字符串類型自帶長度字段, 因此解析起來基本飛快, 就算是二流程序員寫出來的協議解析器, 跑的也飛快
  3. 支持多命令請求, 特定場合下充分發揮這個特性, 能夠提升命令的吞吐量
  4. 支持內聯命令請求. 這種請求雖然限制頗多(不支持二進制數據類型, 不支持\r\n這種數據, 不支持多命令請求), 但對於運維人員來講, 可讀性更高, 對閱讀更友好

注意事項:

  1. 非內聯命令請求, 請求體中的每一個數據都是長字符串類型. 即使是在表達數值, 也須要寫成長字符串形式(數值的ASCII表示). 這給服務端解析協議, 以及區分請求命令是否爲內聯命令帶來了很大的便利
  2. 短字符串, 數值, 錯誤信息這三種數據類型, 僅出如今迴應體中

7. 服務端處理請求及寫回應的過程

當客戶端與服務端創建鏈接後, 雙方就要進行協議交互. 簡單來講就是如下幾步:

  1. 客戶端發送請求
  2. 服務端解析請求, 進行邏輯處理
  3. 服務端寫回包

顯然, 從三層(tcp或unix)上接收到的數據, 首先要解析成協議數據, 再進一步解析成命令, 服務端才能進行處理. 三層上的協議是流式協議, 一個請求體可能要分屢次才能完整接收, 這必然要涉及到一個接收緩衝機制. 先來看一看struct client結構中與接收緩衝機制相關的一些字段:

type struct client {
    uint64_t id;            /* Client incremental unique ID. */							// 客戶端ID, 自增, 惟一
    int fd;                 /* Client socket. */										// 客戶端數據鏈接的底層文件描述符
    redisDb *db;            /* Pointer to currently SELECTed DB. */						// 指向客戶端當前選定的DB
    robj *name;             /* As set by CLIENT SETNAME. */								// 客戶端的名稱. 由命令 CLIENT SETNAME 設定
    sds querybuf;           /* Buffer we use to accumulate client queries. */			// 請求體接收緩衝區
    sds pending_querybuf;   /* If this is a master, this buffer represents the			// cluster特性相關的一個緩衝區, 暫忽視
                               yet not applied replication stream that we
                               are receiving from the master. */
    size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. */			// 近期(100ms或者更長時間中)接收緩衝區大小的峯值
    int argc;               /* Num of arguments of current command. */					// 當前處理的命令的參數個數
    robj **argv;            /* Arguments of current command. */							// 當前處理的命令的參數列表
    struct redisCommand *cmd, *lastcmd;  /* Last command executed. */					// 當前正在處理的命令字, 以及上一次最後執行的命令字
    int reqtype;            /* Request protocol type: PROTO_REQ_* */					// 區分客戶端是否之內聯形式上載請求
    int multibulklen;       /* Number of multi bulk arguments left to read. */			// 命令字剩餘要讀取的參數個數
    long bulklen;           /* Length of bulk argument in multi bulk request. */		// 當前讀取到的參數的長度
} client;

7.1 經過事件處理器, 在傳輸層獲取客戶端發送的數據

咱們先從宏觀上看一下, 服務端是如何解析請求, 並進行邏輯處理的. 先從服務端數據鏈接的處理回調readQueryFromClient看起:

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    readlen = PROTO_IOBUF_LEN;
    // 對於大參數的優化
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        // 若是讀取到了一個超大參數, 那麼先把這個超大參數讀進來
        // 保證緩衝區中超大參數以後沒有其它數據
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
        if (remaining < readlen) readlen = remaining;
    }

    // 接收數據
    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen);
    // ...

    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread;
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        // client的緩衝區中數據若是超長了, 最有可能的緣由就是服務端處理能力不夠, 數據堆積了
        // 或者是受惡意攻擊了
        // 這種狀況下, 幹掉這個客戶端
        // ...
        return;
    }

    // 處理緩衝區的二進制數據
    if (!(c->flags & CLIENT_MASTER)) {
        processInputBuffer(c);
    } else {
        // cluster特性相關的操做, 暫時忽略
        // ...
    }
}

這裏有一個優化寫法, 是針對特大參數的讀取的. 在介紹這個優化方法以前, 先大體的看一下做爲單機數據庫, 服務端請求客戶端請求的概覽流程(你可能須要在閱讀完如下幾節以後再回頭來看這裏):

通常狀況下, 請求體的長度不會超過16kb(即便包括多個命令), 那麼處理流程(假設請求是協議數據, 而非內聯命令)是這樣的:

handle_request_1

而正常狀況下, 一個請求體的長度不該該超過16kb, 而若是請求體的長度超過16kb的話, 有下面兩種可能:

  1. 請求中的命令數量太多, 好比平均一個命令及其參數, 佔用100字節, 而這個請求體中包含了200個命令
  2. 請求中命令數量很少, 可是有一個命令攜帶了超大參數, 好比, 只是一個SET單命令請求, 但其值參數是一個長度爲40kb的二進制數據

附帶多個命令, 致使請求體超過16kb時

在第一種狀況下, 若是知足如下條件的話, 這個請求會被服務端看成兩個請求去處理:

初次數據鏈接的可讀回調接收了16kb的數據, 這16kb的數據剛好是多個請求. 沒有請求被截斷! 便是一個20kb的請求體, 被自然的拆分紅了兩個請求, 第一個請求剛好16kb, 一字節很少, 一字節很多, 第二個請求是剩餘的4kb. 那麼服務端看來, 這就是兩個多命令請求

這種狀況極其罕見, Redis中沒有對這種狀況作處理.

通常狀況下, 若是發送一個20kb的多命令請求, 且請求中沒有超大參數的話(意味着命令特別多, 上百個), 那麼總會有一個命令被截斷, 若是被階段, 第一次的處理流程就會在處理最後一個被截斷的半截命令時, 在如圖紅線處流程終止, 而僅在第二次處理流程, 即處理剩餘的4kb數據時, 纔會走到藍線處. 即兩次收包, 一次寫回包:

handle_request_2

附帶超大參數, 致使請求體超過16kb

在第二種狀況下, 因爲某幾個參數特別巨大(大於PROTO_MBULK_BIG_ARG宏的值, 即32kb), 致使請求體一次不能接收完畢. 勢必也要進行屢次接收. 原本, 樸素的思想上, 只須要在遇到超大參數的時候, 調整16kb這個閾值便可. 好比一個參數爲40kb的二進制數據, 那麼在接收到這個參數時, 破例調用read時一次性把這個參數讀取完整便可. 除此外不須要什麼特殊處理.

便是, 處理流程應當以下:

handle_request_3

但這樣會有一個問題: 當在processMultiBulkBuffer中, 將超大參數以string對象的形式放在client->argv中時, 建立這個string對象, 底層是拷貝操做:

int processMultibulkBuffer(client * c) {
    // ....
            c->argv[c->argc++] = createStringObject(c->querybuf+pos, c->bulklen);
    // ....
}

對於超大參數來講, 這種string對象的建立過程, 涉及到的內存分配與數據拷貝開銷是很昂貴的. 爲了不這種數據拷貝, Redis把對於超大參數的處理優化成了以下:

handle_request_4

7.2 對二進制數據進行協議解析

從傳輸層獲取到數據以後, 下一步就是傳遞給processInputBuffer函數, 來進行協議解析, 以下:

void processInputBuffer(client * c) {
    server.current_client = c;
    // 每次循環, 解析並執行出一個命令字(及其參數), 並執行相應的命令
    // 直到解析失敗, 或緩衝區數據耗盡爲止
    while(sdslen(c->querybuf)) {
        // 在某些場合下, 不作處理
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
        if (c->flags & CLIENT_BLOCKED) break;
        if (c->flag & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        // 若是client的reqtype字段中沒有標明該客戶端傳遞給服務端的請求類型(是內聯命令, 仍是協議數據)
        // 那麼分析緩衝區中的第一個字節來決定. 這通常發生在客戶端向服務端發送第一個請求的時候
        if (!c->reqtype) {
            if (c->querybuf[0] == '*') c->reqtype = PROTO_REQ_MULTIBULK;
            else c->reqtype = PROTO_REQ_INLINE;
        }

        // 按類型解析出一個命令, 及其參數
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknow request type")
        }

        // 執行命令邏輯
        if (c->argc == 0) {
            // 當argc的值爲0時, 表明本次解析沒有解析出任何東西, 數據已耗盡
            resetClient(c);
        } else {
            // 執行邏輯
            if(processCommand(c) == C_OK) {
                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                    c->reploff = c->read_reploff - sdslen(c->querybuf);
                }

                if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE) {
                    resetClient(c);
                }
            }
            if (server.current_client == NULL) break;
        }
    }

    server.current_client = NULL;
}

這一步作了分流處理, 若是是內聯命令, 則經過processInlineBuffer進行協議解析, 若是不是內聯命令, 則經過processMultibulkBuffer進行協議解析. 協議解析完成後, 客戶端上載的數據就被轉換成了command, 而後接下來調用processCommand來執行服務端邏輯.

咱們略過內聯命令的解析, 直接看非內聯命令請求的協議解析過程

int processMultibulkBuffer(client * c) {
    //...

    // 解析一個請求的頭部時, 即剛開始解析一個請求時, multibulklen表明着當前命令的命令+參數個數, 該字段初始值爲0
    // 如下條件分支將讀取一個命令的參數個數, 好比 '*1\r\n$4\r\nPING\r\n'
    // 如下分支完成後, 將有:
    //      c->multibulklen = 1
    //      c->argv = zmalloc(sizeof(robj*))
    //      pos = <指向$>
    if (c->multibulklen == 0) {
        // ...

        // 讀取當前命令的參數個數, 即賦值c->multibulklen
        newline = strchr(c->querybuf, '\r');
        if (newline == NULL) {
            // 請求體不知足協議格式
            // ...
            return C_ERR:
        }

        // ...

        
        ok = string2ll(c->querybuf+1, newline-(c->querybuf+1), &ll)
        if (!ok || ll > 1024 * 1024) {
            // 解析數值出錯, 或數值大於1MB
            return C_ERR;
        }
        // 如今pos指向的是命令字
        pos = (newline-c->querybuf) + 2
        if (ll <= 0) {
            // 解出來一個空命令字, 什麼也不作, 把緩衝區頭部移除掉, 而且返回成功
            sdsrange(c->querybuf, pos, -1);
            return C_OK;
        }

        c->multibulklen = ll;

        // 爲命令字及其參數準備存儲空間: 將它們都存儲在c->argv中, 且是以redisObject的形式存儲着
        if (c->argv) zfree(c->argv);
        c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
    }

    // ...

    // 讀取解析命令字自己與其全部參數
    while(c->multibulklen) {
        // 讀取當前長字符串的長度
        if (c->bulklen == -1) {
            // ...

            // 若是當前指針指向的參數, 是一個超大參數, 則把接收緩衝區修剪爲該參數
            pos += newline-(c->querybuf+pos) + 2; // 如今pos指向的是參數, 或命令字的起始位置
            if (ll >= PROTO_MBULK_BIG_ARG) {
                size_t qblen;
                sdsrange(c->querybuf, pos, -1); // 將接收緩衝區修剪爲該參數
                pos = 0;
                qblen = sdslen(c->querybuf);
                if (qblen < (size_t)ll + 2) {
                    // 若是這個超大參數還未接收完畢, 則擴充接收緩衝區, 爲其預留空間
                    c->querybuf = sdsMakeRoomFor(c->querybuf, ll+2-qblen);
                }
            }

            c->bulklen == ll;
        }

        // 讀取長字符串內容, 便是命令字自己, 或某個參數
        if (sdslen(c->querybuf) - pos < (size_t)(c->bulklen+2)) {
            // 當前緩衝區中, 該長字符串的內容還沒有接收完畢, 跳出解析
            break;
        } else {
            if (pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG && sdslen(c->querybuf) == (size_t)(c->bulklen+2)) {
                // 對於超大參數, 爲了不數據複製的開銷, 直接以接收緩衝區爲底料, 建立一個字符串對象
                // 而對於接收緩衝區, 爲其從新分配內存
                // 注意, 若是當前指針指向的是一個超大參數, 接收機制保證了, 這個超大參數以後, 緩衝區沒有其它數據
                c->argv[c->argc++] = createObject(OBJ_STRING, c->querybuf)
                sdsIncrLen(c->querybuf, -2);    // 移除掉超大參數末尾的\r\n
                c->querybuf = sdsnewlen(NULL, c->bulklen + 2);  // 爲接收緩衝區從新分配內存
                sdsclear(c->querybuf);
                pos = 0;
            } else {
                // 對於普通參數或命令字, 以拷貝形式新建字符串對象
                c->argv[c->argc++] = createStringObject(c->querybuf+pos, c->bulklen);
                pos += c->bulklen+2; // pos指向下一個參數
            }

            c->bulklen = -1;
            c->multibulklen--;
        }
    }

    // 至此, 一個命令, 及其全部參數, 均被存儲在 c->argv中, 以redisObject形式存儲着, 對象的類型是string
    // 扔掉全部已經解析過的二進制數據
    if (pos) sdsrange(c->querybuf, pos, -1);

    if (c->multibulklen == 0) return C_OK

    return C_ERR;
}

7.3 命令的執行

一個命令及其全部參數若是被成功解析, 則processMultibulkBuffer函數會返回C_OK, 並在隨後調用processCommand執行這個命令. processCommand內部包含了將命令的迴應寫入c->buf的動做(封裝在函數addReply中. 若是命令或命令中的參數在解析過程出錯, 或參數還沒有接收完畢, 則會返回C_ERR, 退棧, 從新返回, 直至再次從傳輸層讀取數據, 補齊全部參數, 再次回到這裏.

接下來咱們來看執行命令並寫回包的過程, processCommand中的關鍵代碼以下:

int processCommand(client * c) {
    if (!strcasecmp(c->argv[0]->ptr, "quit")) {
        // 對 quit 命令作單獨處理, 調用addReply後, 返回
        addReply(c, shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }

    // 拿出c->argv[0]中存儲的string對象中的ptr字段(實際上是個sds字符串)
    // 與server.commands中的命令字表進行比對, 查找出對應的命令, 即 struct redisCommand* 句柄
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        // 查無該命令, 進行錯誤處理
        flagTransaction(c);     // 若是這是一個在事務中的命令, 則將 CLIENT_DIRTY_EXEC 標誌位貼到 c-flags 中, 指示着事務出錯
        addReplyErrorFormat(c, "unknown command '%s'", (char *)c->argv[0]->ptr)
        return C_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) {
        // 對比命令所須要的參數數量, 是否和argc中的數量一致, 若是不一致, 說明錯誤
        flagTransaction(c);     // 依然同上, 事務出錯, 則打 CLIENT_DIRTY_EXEC 標誌位
        addReplyErrorFormat(c, "wrong number of arguments for '%s' command", c->cmd->name);
        return C_OK;
    }

    // 檢查用戶的登陸態, 即在服務端要求登陸認證時, 若當前用戶還未認證, 且當前命令不是認證命令的話, 則報錯
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) {
        flagTransaction(c);
        addReply(c, shared.noautherr);
        return C_OK;
    }

    // 與集羣相關的代碼
    // 若是集羣模式開啓, 則一般狀況下須要將命令重定向至其它實例, 但在如下兩種狀況下不須要重定向:
    //  1. 命令的發送者是master
    //  2. 命令沒有鍵相關的參數
    if (server.cluster_enabled && 
        !(c->flags & CLIENT_MASTER) && 
        !(c->flags & CLIENT_LUA && server.lua_caller->flags & CLIENT_MASTER) && 
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 && c->cmd->proc != execCommand)) {
        
        int hashslot, error_code;
        clusterNode * n = getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code);

        if (n == NULL || n != server.cluster->myseld) {
            if (c->cmd->proc == execCommand) {
                discardTransaction(c);
            } else {
                flagTransaction(c);
            }

            clusterRedirectClient(c, n, hashslot, error_code); // 重定向
            return C_OK;
        }
    }

    // 在正式執行命令以前, 首先先嚐試的放個屁, 擠點內存出來
    if (server.maxmemory) {
        int retval = freeMemoryIfNeeded();

        if (server.current_client == NULL) return C_ERR;
        if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR ) {
            flagTransaction(c);
            addReply(c, shared.oomerr);
            return C_OK;
        }
    }

    // 在持久化出錯的狀況下, master不接受寫操做
    if  (    
            (
                (server.stop_writes_on_bgsave_err && server.saveparamslen > 0 && server.lastbgsave_status == C_ERR) || 
                server.aof_last_write_status == C_ERR
            ) && 
            server.masterhost == NULL &&
            (c->cmdd->flags & CMD_WRITE || c->cmd->proc == pingCommand)
        ) {

        flagTransaction(c);
        if (server.aof_last_write_status == C_OK) {
            addReply(c, shared.bgsaveerr);
        } else {
            addReplySds(c, sdscatprintf(sdsempty(), "-MISCONF Errors writing to the AOF file: %s\r\n", strerror(server.aof_last_write_errno)));
        }
        return C_OK;
    }

    // 還有幾種不接受寫操做的場景, 這裏代碼省略掉, 這幾種場景包括:
    //      1. 用戶配置了 min_slaves-to_write配置項, 而符合要求的slave數量不達標
    //      2. 當前是一個只讀slave
    // ...

    // 在發佈-訂閱模式中, 僅支持 (P)SUBSCRIBE/(P)UNSUBSCRIBE/PING/QUIT 命令

    if (c->flags & CLIENT_PUBSUB &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand
    ) {
        addReplyError(c, "only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
        return C_OK;
    }

    // 當用戶配置slave-serve-stale-data爲no, 且當前是一個slave, 且與master中斷鏈接的狀況下, 僅支持 INFO 和 SLAVEOF 命令
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED && server.repl_server_stale_data == 0 && !(c->cmd->flags & CMD_STALE)) {
        flagTransaction(c);
        addReply(c, shared.masterdownerr);
        return C_OK;
    }

    // 若是當前正在進行loading操做, 則僅接受 LOADING 相關命令
    if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
        addReply(c, shared.loadingerr);
        return C_OK;
    }

    // 若是是lua腳本操做, 則支持受限數量的命令
    // ...

    // 終於到了終點: 執行命令
    // 對於事務中的命令: 進隊列, 但不執行
    // 對於事務結束標記EXEC: 調用call, 執行隊列中的全部命令
    // 對於非事務命令: 也是調用call, 就地執行
    if (
        c->flags & CLIENT_MULTI && 
        c->cmd->proc != execCommand && 
        c->cmd->proc != discardCommand && 
        c->cmd->proc != multiCommand && 
        c->cmd->proc != watchCommand) {
        // 若是是事務, 則將命令先排進隊列
        queueMultiCommand(c);
        addReply(c, shared.queued);
    } else {
        // 若是是非事務執行, 則直接執行
        call (c, CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys)) {
            handleClientsBlockedOnLists();
        }
    }

    return C_OK;
}

須要注意的點有:

  1. 服務端全局變量server中, 持有着一個命令表, 它自己是一個dict結構, 其key是爲字符串, 即Redis命令的字符串, value則是爲redisCommand結構, 描述了一個Redis中的命令. 能夠看到processCommand一開始作的第一件事, 就是用c->argv[0]->ptr去查詢對應的redisCommand句柄
  2. Redis中的事務是以一個MULTI命令標記開始, 由EXEC命令標記結束的. 若是一個命令處於事務之中, 那麼processCommand內部並不會真正執行這個命令, 僅當最終EXEC命令來臨時, 將事務中的全部命令所有執行掉. 而實際執行命令的函數, 是爲call, 這是Redis命令執行流程中最核心的一個函數.
  3. 客戶端能夠以pipelining的形式, 經過一次請求發送多個命令, 但這不是事務, 若是沒有將命令包裹在MULTIEXEC中, 那麼這多個命令實際上是一個個就地執行的.

有關事務的更詳細細節咱們會在稍後討論, 但目前, 咱們先來看一看這個最核心的call函數, 如下代碼隱藏了無關細節

void call(client * c, int flags) {
    // ...

    // 與主從複製相關的代碼: 將命令分發給全部MONITOR模式下的從實例
    // ...

    // 調用命令處理函數
    // ...
    c->cmd->proc(c);
    // ...


    // AOF, 主從複製相關代碼
    // ...
}

撥開其它特性無論, 單機數據庫在這一步其實很簡單, 就是調用了對應命令字的回調函數來處理

咱們挑其中一個命令SET來看一下, 命令處理回調中都幹了些什麼:

void setCommand(client * c) {
    int j;
    robj * expire = NULL;
    int uint = UINT_SECONDS;
    int flags = OBJ_SET_NO_FLAGS;

    for (j = 3; i < c->argc; j++) {
        // 處理SET命令中的額外參數
        // ...
    }

    c->argv[2] = tryObjectEncoding(c->argv[2]);
    setGenericCommand(c, flags, c->argv[1], c->argv[2], expire, unit, NULL, NULL)
}

void setGenericCommand(client * c, int flags, robj * key, robj, * val, robj * expire, int unit, robj * ok_reply, robj * abort_reply) {
    long long milliseconds = 0;

    if (expire) {
        // 從string對象中讀取數值化的過時時間
        if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK) return;
        
        if (milliseconds <= 0){
            addReplyErrorFormat(c, "invalid expire time in %s", c->cmd->name);
            return;
        }

        if (unit == UNIT_SECONDS) milliseconds *= 1000;
    }

    if (
        (flags & OBJ_SET_NX && lookupKeyWrite(c->db, key) != NULL) ||
        (flags & OBJ_SET_XX && lookupKeyWrite(c->db, key) == NULL)
    ) {
        // NX 與 XX 時, 判斷鍵是否存在
        addReply(c, abort_reply ? abort_reply : shared.nullbulk);
        return;
    }

    // 執行所謂的set操做, 即把鍵值對插入至數據庫鍵空間去, 若是鍵已存在, 則替換值
    setKey(c->db, key, val);

    server.dirty++; // 統計數據更新

    // 設置過時時間
    if(expire) setExpire(c, c->db, key, kstime() + milliseconds);
    notifyKeyspaceEvent(NOTIFY_STRING, "set", key, c->db->id);
    if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, "expire", key, c->db->id);
    addReply(c, ok_reply ? ok_reply : shared.ok);
}

void setKey(redisDb * db, robj * key, robj * val) {
    if (lookupKeyWrite(db, key) == NULL) {
        dbAdd(db, key, val);    // 對 dictAdd 的封裝
    } else {
        dbOverwrite(db, key, val); // 對 dictReplace的封裝
    }

    incrRefCount(val);
    removeExpire(db, key);
    signalModifiedKey(db, key);
}

setCommand->setGenericCommand->setKey->dictXXX這樣一路追下來, 能夠看到最終的操做, 是對於某個數據庫實例中, 鍵空間的操做. 邏輯比較清晰. 其它的命令也基本相似, 鑑於Redis中的命令衆多, 這裏沒有篇幅去一個一個的介紹, 也沒有必要.

至此, 用戶的命令經由服務端接收, 解析, 以及執行在了某個數據庫實例上. 咱們在一路跟代碼的過程當中, 隨處可見的錯誤處理中都有addReplyXXX系列的函數調用, 在SET命令的最終執行中, 在setGenericCommand中, setKey操做成功後, 也會調用addReply. 顯然, 這個函數是用於將服務端的迴應數據寫入到某個地方去的.

從邏輯上來說, 服務端應當將每一個命令的迴應寫入到一個緩衝區中, 而後在適宜的時刻, 序列化爲迴應體(符合協議規約的二進制數據), 經由網絡IO回發給客戶端. 接下來, 咱們就從addReply入手, 看一看回包的流程.

7.4 寫回包流程

addReply系列函數衆多, 有十幾個, 其它函數諸如addReplyBulk, addReplyError, addReplyErrorFormat等, 只是一些簡單的變體. 這一系列函數完成的功能都是相似的: 總結起來就是兩點:

  1. 將回包信息寫入回包緩衝區

addReply以下:

void addReply(client * c, robj * obj) {
    // 檢查客戶端是否能夠寫入回包數據, 這個函數在每一個 addReplyXXX 函數中都會被首先調用
    // 它的職責有:
    //    0. 多數狀況下, 對於正常的客戶端, 它會返回 C_OK. 而且若是該客戶端的可寫事件沒有在事件處理器中註冊回調的話, 它會將這個客戶端先掛在 server.clients_pending_write 這個鏈表上
    //    0. 在特殊狀況下, 對於不能接收回包的客戶端(好比這是一個假客戶端, 沒有數據鏈接sockfd, 這種客戶端在AOF特性中有有到), 返回 C_ERR
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        // 若是迴應數據, 是RAW或EMBSTR編碼的string對象
        if (_addReplyToBuffer(c, obj->ptr, sdslen(obj->ptr)) != C_OK) {
            // 先嚐試調用 _addReplyToBuffer, 將回應數據寫進 client->buf 緩衝區中
            // 若是失敗了, 則把迴應數據掛到 client->reply 這個鏈表上去
            _addReplyObjectToList(c, obj);
        }
    } else if (obj->encoding == OBJ_ENCODING_INT) 
        // 若是迴應數據是INT編碼的string對象, 這裏有優化操做
        if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) {
            char buf[32];
            int len;

            // 因爲INT編碼的string對象內部是使用 redisObject->ptr 指針直接存儲的數值
            // 因此直接讀這個指針的值, 將其轉換爲字符表示, 嘗試存進 client->buf 緩衝區中
            len = ll2string(buf, sizeof(buf), (long)obj->ptr);

            if (_addReplyToBuffer(c, buf, len) == C_OK) return;
        }

        // 若是存進 client->buf 失敗, 就把對應的數值轉換爲 EMBSTR 或RAW 編碼的string對象
        // 掛在 client->reply 鏈表上
        obj = getDecodedObject(obj);
        if (_addReplyToBuffer(c, obj->ptr, sdslen(obj->ptr)) != C_OK)
            _addReplyObjectToList(c, obj);
        
        decrRefCount(obj);
    } else {
        serverPanic("Wrong obj->encoding in addReply()")
    }
}

這裏會看到, 迴應緩衝區有兩個:

  1. client->buf是一個二進制的迴應緩衝區, 但它的長度是有限的. 默認長度也是16KB, 受宏PROTO_REPLY_CHUNK_BYTES的值限定
  2. client->reply是一個對象緩衝區, 它是一個鏈表, 無容量限制, 該鏈表中掛的應該都是以 RAW 或 EMBSTR 編碼的 string 對象.

咱們在addReply中會看到, 老是試圖先把迴應數據先經過_addReplyToBuffer添加到client->buf中去, 若是不成功的話, 再把數據經過_addReplyObjectToList掛在client->reply這個鏈表中去.

這個也很好理解, 當client->buf寫滿的時候, 再寫不下下一個addReply要添加的數據時, 就會退而求其次, 將這個迴應數據先掛在client->reply中去. client->buf就是每次調用網絡IO時, 一次性寫入的數據量. 若是超過這個量了, 會分屢次寫回應.

如今問題來了: addReplyXXX系列函數只是將回應數據寫向二進制緩衝區或鏈表隊列上, 並把客戶端添加到server->clients_pending_write列表中去. 那麼, 究竟是什麼時機, 將數據回寫給客戶端呢?

這個實現就很是苟了!!

按常理思惟, 應該在整個 收包->解析->執行命令->寫回包至緩衝區或隊列這個流程中, 在整個請求體被處理結束後, 給客戶端的數據鏈接的可寫事件綁定一個事件回調, 用於寫回包. 但Redis的實現, 並非這樣.

還記得, 在服務端啓動流程中, 開啓服務端事件循環的時候, 有這麼一段代碼:

int main(int argc, char ** argv) {
    //...

    aeSetBeforeSleepProc(server.el, beforeSleep);   // 就是這裏
    aeSetAfterSleepProc(server.el, afterSleep);
    aeMain(server.el);

    aeDeleteEventLoop(server.el);

    return 0;
}

服務端的事件處理器, 有一個事前回調, 也有一個過後回調, 而處理向客戶端寫回包的代碼, 就在事前回調中. 來看這個beforeSleep函數:

void beforeSleep(struct aeEventLoop * eventLoop) {
    // ...

    handleClientsWithPendingWrites();   // 就是在這裏, 處理全部客戶端的寫回包

    // ...
}

這是什麼操做? 這意味着服務端的事件處理器, 每次處理一個事件以前, 都要檢查一遍是否還有迴應須要給客戶端寫. 而爲何Redis在寫回包的時候, 不使用傳統的註冊可寫事件->在可寫事件回調中寫回包->寫成功後註銷可寫事件這種經典套路, Redis在handleClientsWithPendingWrites函數實現的註釋中, 給出了理由: 大意就是, 不使用事件註冊這種套路, 就避免了事件處理機制中會產生的系統調用開銷.

咱們從handleClientsWithPendingWrites函數開始, 簡單的捋一下寫回包是怎麼實現的

int handleClientsWithPendingWrites(void) {
    // ...

    while((ln = listNext(&li))) {
        client * c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write, ln);

        // 將client->buf及client->reply中的數據寫至客戶端
        if (writeToClient(c->fd, c, 0) == C_ERR) continue;

        // 若是client->reply中還有數據, 則藉助事件處理器來寫後續回包
        // 正常狀況下, 在上面的writeToClient中, 該寫的數據已經全寫到客戶端了
        // 僅有在網絡異常的狀況下, writeToClient處理寫回包失敗, 致使數據沒寫完時, 纔會進入這個分支
        // 這個邏輯是正確的: 網絡有異常的狀況下, 經過註冊事件的形式, 待稍後再重試寫回包
        // 網絡正常的狀況下, 一次性把全部數據都寫完
        if (clienetHasPendingReplies(c)) {
            int ae_flags = AE_WRITABLE;

            if (server.aof_state == AOF_ON && server.aof_fsync == AOF_FSYNC_ALWAYS) {
                ae_flags |= AE_BARRIER;
            }

            if (aeCreateFileEvent(server.el, c->fd, ae_flags, sendReplyToClient, c) == AE_ERR) {
                freeClientAsync(c);
            }
        }
    }

    // ...
}

writeToClient函數中就寫的很直白了, 就是調用write向客戶端寫回包. 若是client->reply上還有數據, 也一個個的寫給客戶端. 這裏就不展現這個函數的實現了, 很簡單, 沒什麼可說的. sendReplyToClient幾乎等同於writeToClient, 只是將後者包裝成了符合事件處理回調函數簽名的形式.

寫回包過程當中有兩點須要注意:

  1. 若是writeToClient因爲一些異常緣由(特別是網絡波動), 致使在寫client->reply中的某個結點的數據失敗(writeEAGAIN時), writeToClient是會返回 C_OK的, 但實際上數據未發送完畢. 這種狀況下, 後續會再經過事件處理器, 來執行重試.
  2. 若是在writeToClient中, 把數據發送結束了, 在函數末尾, 是會主動刪除掉對於client->fdWRITEABLE事件的監聽的.

至此, 從客戶端發送請求, 到服務端接收二進制數據, 解析二進制數據, 執行命令, 再寫回包的整個流程就結束了.

8. 總結

  1. Redis服務端的運行機制的核心在於事件處理器. 其實現至關樸素, 不難讀懂. 事件處理器會自動選擇當前操做系統平臺上最優的多路IO複用接口, 這就是是爲何Redis服務端雖然是單進程單線程在跑, 但依然快成狗的緣由
  2. Redis服務端的核心代碼是xxxCommand一系列命令處理函數. 這部分命令處理函數分佈在t_hash.c, t_list.c, t_set.c, t_string.c, t_zset.cserver.c中, 分別是各類Redis Value Type相關的命令實現.
  3. Redis服務端的核心數據結構實例是全局變量server. 就單機範疇的討論來講, clients字段及其它相關字段, 持有着客戶端. el字段持有着事件處理器, db字段持有着數據庫
  4. Redis協議是一個設計簡單, 且閱讀友好的協議
  5. Redis服務端的啓動過程其實十分簡單, 在單機數據庫範疇裏, 主要是初始化事件處理器el與數據庫db字段. 其中db字段就是初始化了一坨dict, 而後監聽服務端口.
  6. Redis服務端有一個優化點, 本文沒有涉及, 便是服務端啓動過程當中會調用一個名爲createSharedObjects()函數. 這個函數中建立了大量的string對象, 用於全局複用
  7. 客戶端與服務端的交互, 主要涉及二進制數據讀取, 協議解析, 命令執行, 與寫回包. 其中對於超大參數的讀取有特殊優化
  8. 服務端寫回包沒有使用事件處理器監聽數據鏈接的可寫事件, 而是以一種輪詢的方式, 在每次事件處理器運行以前, 處理全部clients的待回包數據, 將其回寫. 僅在網絡波動等異常狀況下致使須要重試發送的場合下, 寫回包才涉及事件處理器.
相關文章
相關標籤/搜索