redis爲了優化內存操做, 封裝了一層內存操做接口. 默認狀況下, 其底層實現就是最簡樸的libc中的malloc
系列接口. 若是有定製化需求, 能夠經過配置方式, 將底層內存操做的實現更換爲tcmalloc
或jemalloc
庫.c++
redis封裝的這一層接口, 其接口定義與默認實如今zmalloc.h
與zmalloc.c
中. 其默認實現支持在O(1)複雜度的狀況下返回內存塊的大小. 具體實現上的思路也十分簡樸: 就是在內存塊頭部多分配一個long
的空間, 將內存塊的大小記錄在此.程序員
在zmalloc.c
中能夠看到, 底層接口的具體實現可選的有tcmalloc
與jemalloc
兩種:redis
/* Explicitly override malloc/free etc when using tcmalloc. */ #if defined(USE_TCMALLOC) #define malloc(size) tc_malloc(size) #define calloc(count,size) tc_calloc(count,size) #define realloc(ptr,size) tc_realloc(ptr,size) #define free(ptr) tc_free(ptr) #elif defined(USE_JEMALLOC) #define malloc(size) je_malloc(size) #define calloc(count,size) je_calloc(count,size) #define realloc(ptr,size) je_realloc(ptr,size) #define free(ptr) je_free(ptr) #define mallocx(size,flags) je_mallocx(size,flags) #define dallocx(ptr,flags) je_dallocx(ptr,flags) #endif
內存分配接口的實現以下, 從代碼中能夠看出, 其頭部多分配了PREFIX_SIZE
個字節用於存儲數據區的長度.數據庫
void *zmalloc(size_t size) { void *ptr = malloc(size+PREFIX_SIZE); if (!ptr) zmalloc_oom_handler(size); #ifdef HAVE_MALLOC_SIZE update_zmalloc_stat_alloc(zmalloc_size(ptr)); return ptr; #else *((size_t*)ptr) = size; update_zmalloc_stat_alloc(size+PREFIX_SIZE); return (char*)ptr+PREFIX_SIZE; #endif }
redis中的事件處理機制和內存分配同樣, 也是糊了一層接口. 其底層實現是可選的. 默認狀況下, 會自動選取當前OS平臺上速度最快的多路IO接口, 好比在Linux平臺上就是epoll
, 在Sun/Solaris系列平臺上會選擇evport
, 在BSD/Mac OS平臺上會選擇kqueue
, 實再沒得選了, 會使用POSIX標準的select
接口. 保證起碼能跑起來api
事件處理對各個不一樣底層實現的包裹, 分別在ae_epoll.c
, ae_evport.c
, ae_kqueue
, ae_select.c
中, 事件處理器的定義與實如今ae.h
與ae.c
中.數組
ae.h
中, 定義了bash
ae***Proc
函數指針類型別名aeFileEvent
與aeTimeEvent
aeFiredEvent
, 表明被觸發的事件aeEventLoop
, 一個事件處理器. 包含一個事件循環.分別以下:服務器
#define AE_NONE 0 /* No events registered. */ #define AE_READABLE 1 /* Fire when descriptor is readable. */ #define AE_WRITABLE 2 /* Fire when descriptor is writable. */ #define AE_BARRIER 4 /* With WRITABLE, never fire the event if the READABLE event already fired in the same event loop iteration. Useful when you want to persist things to disk before sending replies, and want to do that in a group fashion. */ // ..... /* Types and data structures */ typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData); typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop); /* File event structure */ typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ aeFileProc *rfileProc; // 可讀回調 aeFileProc *wfileProc; // 可寫回調 void *clientData; // 自定義數據 } aeFileEvent; /* Time event structure */ typedef struct aeTimeEvent { long long id; /* time event identifier. */ // 定時事件的編號 long when_sec; /* seconds */ // 觸發時間(單位爲秒) long when_ms; /* milliseconds */ // 觸發時間(單位爲毫秒) aeTimeProc *timeProc; // 定時回調 aeEventFinalizerProc *finalizerProc; // 事件自殺析構回調 void *clientData; // 自定義數據 struct aeTimeEvent *prev; // 鏈表的前向指針與後向指針 struct aeTimeEvent *next; } aeTimeEvent; /* A fired event */ typedef struct aeFiredEvent { // 描述了一個被觸發的事件 int fd; // 文件描述符 int mask; // 觸發的類型 } aeFiredEvent; /* State of an event based program */ typedef struct aeEventLoop { // 描述了一個熱火朝天不停循環的事件處理器 int maxfd; // 當前監控的文件事件中, 文件描述符的最大值 int setsize; // 事件處理器的最大容量(能監聽的文件事件的個數) long long timeEventNextId; // 下一個到期要執行的定時事件的編號 time_t lastTime; // 上一次有回調函數執行的時間戳, 用以防止服務器時間抖動或溢出 aeFileEvent *events; // 全部註冊的文件事件 aeFiredEvent *fired; // 全部被觸發的文件事件 aeTimeEvent *timeEventHead; // 定時事件鏈表 int stop; // 急停按鈕 void *apidata; // 底層多路IO接口所需的額外數據 aeBeforeSleepProc *beforesleep; // 一次循環結束, 全部事件處理結束後, 要執行的回調函數 aeBeforeSleepProc *aftersleep; // 一次循環開始, 在執行任何事件以前, 要執行的回調函數 } aeEventLoop;
對於文件事件來講, 除了可讀AE_READABLE
與可寫AE_WRITABLE
兩種監聽觸發方式外, 還有一種額外的AE_BARRIER
觸發方式. 若監聽文件事件時, 將AE_BARRIER
與AE_WRITABLE
組合時, 保證若當前文件若是正在處理可讀(被可讀觸發), 就再也不同時觸發可寫. 在一些特殊場景下這個特性是比較有用的.網絡
事件處理器aeEventLoop
中, 對於文件事件的處理, 走的都是老套路. 這裏須要注意的是事件處理器中, 定時事件的設計:數據結構
事件處理器的核心接口有如下幾個:
// 建立一個事件處理器實例 aeEventLoop *aeCreateEventLoop(int setsize); // 銷燬一個事件處理器實例 void aeDeleteEventLoop(aeEventLoop *eventLoop); // 事件處理器急停 void aeStop(aeEventLoop *eventLoop); // 建立一個文件事件, 並添加進事件處理器中 int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData); // 從事件處理器中刪除一個文件事件. void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); // 經過文件描述符, 獲取事件處理器中, 該文件事件註冊的觸發方式(可讀|可寫|BARRIER) int aeGetFileEvents(aeEventLoop *eventLoop, int fd); // 建立一個定時事件, 並掛在事件處理器的時間事件鏈表中. 返回的是建立好的定時事件的編號 long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc); // 刪除事件處理器中的某個時間事件 int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id); // 調用多路IO接口, 處理全部被觸發的文件事件, 與全部到期的定時事件 int aeProcessEvents(aeEventLoop *eventLoop, int flags); // tricky接口.內部調用poll接口, 等待milliseconds毫秒, 或直至指定fd上的指定事件被觸發 int aeWait(int fd, int mask, long long milliseconds); // 事件處理器啓動器 void aeMain(aeEventLoop *eventLoop); // 獲取底層路IO接口的名稱 char *aeGetApiName(void); void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep); void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep); int aeGetSetSize(aeEventLoop *eventLoop); int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
核心代碼以下:
首先是啓動器, 啓動器負責三件事:
beforesleep
回調, 若是設置了該回調的值, 那麼在每次事件循環以前, 該函數會被執行aeProcessEvents
, 即事件循環
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); } }
而後是事件循環
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; // 無事快速退朝 if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { // 在 "事件處理器中至少有一個文件事件" 或 "以阻塞形式執行事件循環, 且處理定時事件" // 時, 進入該分支 // 這裏有一點繞, 其實就是, 若是 // "事件處理器中沒有監放任何文件事件", 且, // "執行事件循環時指明不執行定時事件, 或雖然指明瞭執行定時事件, 可是要求以非阻塞形式運行事件循環" // 都不會進入該分支, 不進入該分支時, 分兩種狀況: // 1. 沒監放任何文件事件, 且不執行定時事件, 至關於直接返回了 // 2. 沒監放任何文件事件, 執行定時事件, 但要求以非阻塞形式執行. 則當即執行全部到期的時間事件, 若是沒有, 則至關於直接返回 // 第一步: 計算距離下一個到期的定時事件, 還有多長時間. 計該時間爲tvp // 即使本次事件循環沒有指明要處理定時事件, 也計算這個時間 int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); if (shortest) { long now_sec, now_ms; aeGetTime(&now_sec, &now_ms); tvp = &tv; /* How many milliseconds we need to wait for the next * time event to fire? */ long long ms = (shortest->when_sec - now_sec)*1000 + shortest->when_ms - now_ms; if (ms > 0) { tvp->tv_sec = ms/1000; tvp->tv_usec = (ms % 1000)*1000; } else { tvp->tv_sec = 0; tvp->tv_usec = 0; } } else { /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ tvp = NULL; /* wait forever */ } } // 第二步: 以tvp爲超時時間, 調用多路IO接口, 獲取被觸發的文件事件, 並處理文件事件 // 若是tvp爲null, 即當前事件處理器中沒有定時事件, 則調用aeApiPoll的超時時間是無限的 numevents = aeApiPoll(eventLoop, tvp); /* After sleep callback. */ if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop); for (j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int fired = 0; /* Number of events fired for current fd. */ // 一般狀況下, 若是一個文件事件, 同時被可讀與可寫同時觸發 // 都是先執行可讀回調, 再執行可寫回調 // 但若是事件掩碼中帶了AE_BARRIER, 就會扭轉這個行爲, 先執行可寫回調, 再執行可讀加高 int invert = fe->mask & AE_BARRIER; if (!invert && fe->mask & mask & AE_READABLE) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->wfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } if (invert && fe->mask & mask & AE_READABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } processed++; } } // 按需執行時間事件 if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */ }
在anet.h
與anet.c
中, 對*nix Tcp Socket相關接口進行了一層薄封裝, 基本上就是在原生接口的基礎上作了一些錯誤處理, 並整合了一些接口調用. 這部分代碼很簡單, 甚至很無聊.
在Redis進程中, 有一個全局變量 struct redisServer server
, 這個變量描述的就是Redis服務端. (結構體定義位於server.h
中, 全局變量的定義位於server.c
中), 結構體的定義以下:
struct redisServer { // ... aeEventLoop * el; // 事件處理器 // ... redisDb * db; // 數據庫數組. // ... int ipfd[CONFIG_BINDADDR_MAX]; // listening tcp socket fd int ipfd_count; int sofd; // listening unix socket fd // ... list * clients; // 全部活動客戶端 list * clients_to_close; // 待關閉的客戶端 list * clients_pending_write; // 待寫入的客戶端 // ... client * current_clients; // 當前客戶端, 僅在crash報告時才使用的客戶端 int clients_paused; // 暫停標識 // ... list * clients_waiting_acks; // 執行WAIT命令的客戶端 // ... uint64_t next_client_id; // 下一個鏈接至服務端的客戶端的編號, 自增編號 }
這個結構體中的字段實再是太多了, 這裏僅列出一些與單機數據庫相關的字段
結構體中定義了一個指針字段, 指向struct redisDb
數組類型. 這個結構的定義以下(也位於server.h
中)
typedef struct redisDb { dict *dict; /* 數據庫的鍵空間 */ dict *expires; /* 有時效的鍵, 以及對應的過時時間 */ dict *blocking_keys; /* 隊列: 與阻塞操做有關*/ dict *ready_keys; /* 隊列: 與阻塞操做有關 */ dict *watched_keys; /* 與事務相關 */ int id; /* 數據庫編號 */ long long avg_ttl; /* 統計值 */ } redisDb;
在介紹阻塞操做與事務以前, 咱們只須要關心三個字段:
dict
數據庫中的全部k-v對都存儲在這裏, 這就是數據庫的鍵空間expires
數據庫中全部有時效的鍵都存儲在這裏id
數據庫的編號從數據結構上也能看出來, 單機數據庫的啓動其實須要作如下的事情:
server
, 並初始化server.db
數組, 數組中的每個元素就是一個數據庫在server.c
文件中的main
函數跟下去, 就能看到上面兩步, 這裏的代碼很繁雜, 下面只選取與單機數據庫啓動相關的代碼進行展現:
int main(int argc, char **argv) { // 初始化基礎設計 // ... server.sentinel_mode = checkForSentinelMode(argc,argv); // 從命令行參數中解析, 是否以sentinel模式啓動server initServerConfig(); // 初始化server配置 moduleInitModulesSystem(); // 初始化Module system // ... if (server.sentinel_mode) { // sentinel相關邏輯 initSentinelConfig(); initSentinel(); } // 若是以 redis-check-rdb 或 redis-check-aof 模式啓動server, 則就是調用相應的 xxx_main()函數, 而後終止進程就好了 if (strstr(argv[0],"redis-check-rdb") != NULL) redis_check_rdb_main(argc,argv,NULL); else if (strstr(argv[0],"redis-check-aof") != NULL) redis_check_aof_main(argc,argv); // 處理配置文件與命令行參數, 並輸出歡迎LOGO... // ... // 判斷運行模式, 若是是後臺運行, 則進入daemonize()中, 使進程守護化 server.supervised = redisIsSupervised(server.supervised_mode); int background = server.daemonize && !server.supervised; if (background) daemonize(); // 單機數據庫啓動的核心操做: 初始化server initServer(); // 雜事: 建立pid文件, 設置進程名, 輸出ASCII LOGO, 檢查TCP backlog隊列等 //... if(!server.sentinel_mode) { // 加載自定義Module // ... } else { sentinelIsRunning(); } // 檢查內存狀態, 若有必要, 輸出警告日誌 // ... // 向事件處理器註冊事前回調與過後回調, 並開啓事件循環 // 至此, Redis服務端已經啓動 aeSetBeforeSleepProc(server.el,beforeSleep); aeSetAfterSleepProc(server.el,afterSleep); aeMain(server.el) // 服務端關閉的善後工做 aeDeleteEventLoop(server.el); return 0; }
從main
函數中能大體瞭解啓動流程, 也基本能猜出來, 在最核心的initServer
函數的調用中, 至少要作兩件事:
server.el
server.db
以下:
void initSetver(void) { // 處理 SIG_XXX 信號 // ... // 初始化server.xxx下的一部分字段, 值得關注的有: server.pid = getpid() server.current_client = NULL; server.clients = listCreate(); server.clients_to_close = listCreate(); //... server.clients_pending_write = listCreate(); //... server.unblocked_clients = listCreate(); server.ready_keys = listCreate(); server.clients_waiting_acks = listCreate(); //... server.clients_paused = 0 //... // 建立全局共享對象 createSharedObjects(); // 嘗試根據最大支持的客戶端鏈接數, 調整最大打開文件數 adjustOpenFilesLimit(void) // 建立事件處理器 server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR) if (server.el == NULL) { // ... exit(1); } // 初始化數據庫數組 server.db = zmalloc(sizeof(redisDb) * server.dbnum); // 監聽tcp端口, tcp 監聽文件描述符存儲在 server.ipfd 中 if (server.port != 0 && listenToPort(server.port, server.ipfd, &server.ipfd_count) == C_ERR) exit(1); // 監聽unix端口 if (server.unixsocket != NULL) { //... } // 若是沒有任何tcp監聽fd存在, abort if (server.ipfd_count == 0 && server.sofd < 0) { //... exit(1); } // 初始化數據庫數組中的各個數據庫 for (j = 0; j < server.dbnum; j++) { server.db[j].dict = dictCreate(&dbDictType, NULL); // 初始化鍵空間 server.db[j].expires = dictCreate(&keyptrDictType, NULL); // 初始化有時效的鍵字典. key == 鍵, value == 過時時間戳 server.db[j].blocking_keys = dictCreate(&keylistDictType, NULL);// 初始化阻塞鍵字典. server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType, NULL); // 初始化解除阻塞鍵字典 server.db[j].watched_keys = dictCreate(&keylistDictType, NULL); // 初始化與事務相關的一個字典 server.db[j].id = j; // 初始化數據庫的編號 server.db[j].avg_ttl = 0; // 初始化統計數據 } // 其它一大堆細節操做 // ... // 向事件處理器中加一個定時回調, 這個回調中處理了不少瑣事. 好比清理過時客戶端鏈接, 過時key等 if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { //... exit(1); } // 將tcp與unix監聽fd添加到事件處理器中 // 回調函數分別爲: acceptTcpHandler和acceptUnixHandler for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler, NULL) == AE_ERR) { serverPanic("Unrecoverable error creating server.ipfd file event"); } } if (server.sofd > 0 && aeCreateFileEvent(server.el, server.sofd, AE_READABLE, acceptUnixHandler, NULL) == AE_ERR) { serverPanic("Unrecoverable error creating server.sofd file event."); } // 註冊一個額外的事件回調: 用於喚醒一個被module操做阻塞的客戶端 // ... // 若有須要, 打開AOF文件 // ... // 對32位機器, 且沒有配置maxmemory時, 自動設置爲3GB // ... // 其它與單機數據庫無關的操做 // ... }
以上就是單機數據庫的啓動流程
總結: 0. Redis服務端進程中, 以server
這個全局變量來描述服務端 0. server
全局變量中, 持有着全部數據庫的指針(server.db
), 一個事件處理器(server.el
), 與全部與其鏈接的客戶端(server.clients
), 監聽的tcp/unix socket fd(server.ipfd, server.sofd
) 0. 在Redis服務端啓動過程當中, 會初始化監聽的tcp/unix socket fd, 並把它們加入到事件處理器中, 相應的事件回調分別爲acceptTcpHandler
與acceptUnixHandler
上面已經講了Redis單機數據庫服務端的啓動過程, 下面來看一看, 當一個客戶端經過tcp鏈接至服務端時, 服務端會作些什麼. 顯然要從listening tcp socket fd在server.el
中的事件回調開始看起.
邏輯上來說, Redis服務端須要作如下幾件事:
這裏咱們先只看創建鏈接部分, 下面是acceptTcpHandler
函數, 即listening tcp socket fd的可讀事件回調:
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; UNUSED(el); UNUSED(mask); UNUSED(privdata); while(max--) { cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); // 創建數據鏈接 if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr); return; } serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); acceptCommonHandler(cfd,0,cip); // 處理請求 } }
這個函數中只作了兩件事:
acceptCommonHandler
下面是acceptCommonHandler
#define MAX_ACCEPTS_PER_CALL 1000 static void acceptCommonHandler(int fd, int flags, char *ip) { client *c; // 建立一個client實例 if ((c = createClient(fd)) == NULL) { // ... return; } // 若是設置了鏈接上限, 而且當前數據鏈接數已達上限, 則關閉這個數據鏈接, 退出, 什麼也不作 if (listLength(server.clients) > server.maxclients) { //... return; } // 若是服務端運行在保護模式下, 而且沒有登陸密碼機制, 那麼不接受外部tcp請求 if (server.protected_mode && server.bindaddr_count == 0 && server.requirepass == NULL && !(flags & CLIENT_UNIX_SOCKET) && ip != NULL) { if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) { // ... return; } } server.stat_numconnections++; c->flags |= flags; }
這裏能夠看到, 對每個數據鏈接, Redis中將其抽象成了一個 struct client
結構, 全部建立struct client
實例的細節被隱藏在createClient
函數中, 下面是createClient
的實現:
client *createClient(int fd) { client * c = zmalloc(sizeof(client)) if (fd != -1) { anetNonBlock(NULL, fd); // 設置非阻塞 anetEnableTcpNoDelay(NULL, fd); // 設置no delay if (server.tcpkeepalive) anetKeepAlive(NULL, fd, server.tcpkeepalive); // 按需設置keep alive if (aeCreateFileEvent(server.el, fd, AE_READABLE, readQueryFromClient, c) == AE_ERR) { // 註冊事件回調 close(fd); zfree(c); return NULL; } } selectDb(c, 0) // 默認選擇第一個數據庫 uint64_t client_id; atomicGetIncr(server.next_client_id, client_id, 1); // 原子自增 server.next_client_id // 其它字段初始化 c->id = client_id; //... // 把這個client實例添加到 server.clients尾巴上去 if (fd != -1) listAddNodeTail(server.clients,c); return c; }
客戶端與服務器之間的通訊方式有兩種
在單機數據庫範疇中, 咱們先不討論Redis的發佈訂閱模式, 即在本文的討論範圍內, 客戶端與服務器的通訊始終是一問一答式的. 這種狀況下, 還分爲兩種狀況:
後者在官方文檔中被稱爲 pipelining
自上向下描述協議:
數據
數據
有五種類型. 分別是短字符串
, 長字符串
, 錯誤信息
, 數值
, 數組
. 其中數組
是複合類型. 協議中, 數據與數據之間以\r\n
兩個字節做爲定界符短字符串
: +Hello World\r\n
以+
開頭, 末尾的\r\n
是定界符, 因此顯然, 短字符串自己不支持表示\r\n
, 會和定界符衝突錯誤信息
: -Error message\r\n
以-
開頭, 末尾的\r\n
是定界符. 顯然, 錯誤信息裏也不支持包含\r\n
數值
: :9527\r\n
以:
開頭, ASCII字符表示的數值, 僅支持表示整數長字符串
: $8\r\nfuck you\r\n
先以$
開頭, 後面跟着字符串的總長度, 以ASCII字符表示. 再跟一個定界符, 接下來的8個字節就是長字符串的內容, 隨後再是一個定界符.數組
: 這是一個複合類型, 以*
開頭, 後面跟着數組的容量, 以ASCII字符表示, 再跟一個定界符. 而後後面跟着多個其它數據. 好比 *2\r\n$3\r\nfoo\r\n+Hello World\r\n
表示的是一個容量爲2的數組, 第一個數組元素是一個長字符串, 第二個數組元素是一個短字符串.注意: 0. 通常狀況下, 協議體是一個數組 0. 長字符串在表示長度爲0的字符串時, 是$0\r\n\r\n
0. 還有一個特殊值, $-1\r\n
, 它用來表示語義中的null
協議自己是支持二進制數據傳輸的, 只須要將二進制數據做爲長字符串傳輸就能夠. 因此這並不能算是一個嚴格意義上的字符協議. 但若是傳輸的數據多數是可閱讀的數據的話, 協議自己的可讀性是很強的
客戶端向服務端發送請求時, 語法格式基本以下:
單命令請求體, 就是一個數組, 數組中將命令與參數各做爲數組元素存儲, 以下是一個 LLEN mylist
的請求
*2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n
這個數組容量爲2, 兩個元素均爲長字符串, 內容分別是LLEN
與mylist
多命令請求體, 直接把多個單命令請求體拼起來就好了, 每一個命令是一個數組, 即請求體中有多個數組
若是按協議向服務端發送請求, 能夠看到, 請求體的第一字節始終要爲*
. 爲了方便一些弱客戶端與服務器交互, REDIS支持所謂的內聯命令. 內聯命令僅是請求體的另外一種表達方式.
內聯命令的使用場合是:
redis-cli
這種狀況下, 直接把適用於redis-cli
的字符命令, 發送給服務端就好了. 內聯命令支持經過telnet發送.
Redis服務端接收到任何一個有效的命令時, 都會給服務端寫回應. 請求-迴應的最小單位是命令, 因此對於多命令請求, 服務端會把這多個請求的對應的迴應, 按順序返回給客戶端.
迴應體依然遵照通訊協議規約.
單命令請求
[root@localhost ~]# echo -e '*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n' | nc localhost 6379 :2 [root@localhost ~]#
多命令請求
[root@localhost ~]# echo -e '*1\r\n$4\r\nPING\r\n*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n' | nc localhost 6379 +PONG :4 :5 :6 [root@localhost ~]#
內聯命令請求
[root@localhost ~]# echo -e 'PING' | nc localhost 6379 +PONG [root@localhost ~]# echo -e 'INCR counter' | nc localhost 6379 :7 [root@localhost ~]# echo -e 'INCR counter' | nc localhost 6379 :8 [root@localhost ~]#
經過telnet發送內聯命令請求
[root@localhost ~]# telnet localhost 6379 Trying ::1... Connected to localhost. Escape character is '^]'. PING +PONG INCR counter :9 INCR counter :10 INCR counter :11 ^] telnet> quit Connection closed. [root@localhost ~]#
優勢:
\r\n
這種數據, 不支持多命令請求), 但對於運維人員來講, 可讀性更高, 對閱讀更友好注意事項:
當客戶端與服務端創建鏈接後, 雙方就要進行協議交互. 簡單來講就是如下幾步:
顯然, 從三層(tcp或unix)上接收到的數據, 首先要解析成協議數據, 再進一步解析成命令, 服務端才能進行處理. 三層上的協議是流式協議, 一個請求體可能要分屢次才能完整接收, 這必然要涉及到一個接收緩衝機制. 先來看一看struct client
結構中與接收緩衝機制相關的一些字段:
type struct client { uint64_t id; /* Client incremental unique ID. */ // 客戶端ID, 自增, 惟一 int fd; /* Client socket. */ // 客戶端數據鏈接的底層文件描述符 redisDb *db; /* Pointer to currently SELECTed DB. */ // 指向客戶端當前選定的DB robj *name; /* As set by CLIENT SETNAME. */ // 客戶端的名稱. 由命令 CLIENT SETNAME 設定 sds querybuf; /* Buffer we use to accumulate client queries. */ // 請求體接收緩衝區 sds pending_querybuf; /* If this is a master, this buffer represents the // cluster特性相關的一個緩衝區, 暫忽視 yet not applied replication stream that we are receiving from the master. */ size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */ // 近期(100ms或者更長時間中)接收緩衝區大小的峯值 int argc; /* Num of arguments of current command. */ // 當前處理的命令的參數個數 robj **argv; /* Arguments of current command. */ // 當前處理的命令的參數列表 struct redisCommand *cmd, *lastcmd; /* Last command executed. */ // 當前正在處理的命令字, 以及上一次最後執行的命令字 int reqtype; /* Request protocol type: PROTO_REQ_* */ // 區分客戶端是否之內聯形式上載請求 int multibulklen; /* Number of multi bulk arguments left to read. */ // 命令字剩餘要讀取的參數個數 long bulklen; /* Length of bulk argument in multi bulk request. */ // 當前讀取到的參數的長度 } client;
咱們先從宏觀上看一下, 服務端是如何解析請求, 並進行邏輯處理的. 先從服務端數據鏈接的處理回調readQueryFromClient
看起:
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { client *c = (client*) privdata; int nread, readlen; size_t qblen; UNUSED(el); UNUSED(mask); readlen = PROTO_IOBUF_LEN; // 對於大參數的優化 if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) { // 若是讀取到了一個超大參數, 那麼先把這個超大參數讀進來 // 保證緩衝區中超大參數以後沒有其它數據 ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf); if (remaining < readlen) readlen = remaining; } // 接收數據 qblen = sdslen(c->querybuf); if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); nread = read(fd, c->querybuf+qblen, readlen); // ... sdsIncrLen(c->querybuf,nread); c->lastinteraction = server.unixtime; if (c->flags & CLIENT_MASTER) c->read_reploff += nread; server.stat_net_input_bytes += nread; if (sdslen(c->querybuf) > server.client_max_querybuf_len) { // client的緩衝區中數據若是超長了, 最有可能的緣由就是服務端處理能力不夠, 數據堆積了 // 或者是受惡意攻擊了 // 這種狀況下, 幹掉這個客戶端 // ... return; } // 處理緩衝區的二進制數據 if (!(c->flags & CLIENT_MASTER)) { processInputBuffer(c); } else { // cluster特性相關的操做, 暫時忽略 // ... } }
這裏有一個優化寫法, 是針對特大參數的讀取的. 在介紹這個優化方法以前, 先大體的看一下做爲單機數據庫, 服務端請求客戶端請求的概覽流程(你可能須要在閱讀完如下幾節以後再回頭來看這裏):
通常狀況下, 請求體的長度不會超過16kb(即便包括多個命令), 那麼處理流程(假設請求是協議數據, 而非內聯命令)是這樣的:
而正常狀況下, 一個請求體的長度不該該超過16kb, 而若是請求體的長度超過16kb的話, 有下面兩種可能:
SET
單命令請求, 但其值參數是一個長度爲40kb的二進制數據在第一種狀況下, 若是知足如下條件的話, 這個請求會被服務端看成兩個請求去處理:
初次數據鏈接的可讀回調接收了16kb的數據, 這16kb的數據剛好是多個請求. 沒有請求被截斷! 便是一個20kb的請求體, 被自然的拆分紅了兩個請求, 第一個請求剛好16kb, 一字節很少, 一字節很多, 第二個請求是剩餘的4kb. 那麼服務端看來, 這就是兩個多命令請求
這種狀況極其罕見, Redis中沒有對這種狀況作處理.
通常狀況下, 若是發送一個20kb的多命令請求, 且請求中沒有超大參數的話(意味着命令特別多, 上百個), 那麼總會有一個命令被截斷, 若是被階段, 第一次的處理流程就會在處理最後一個被截斷的半截命令時, 在如圖紅線處流程終止, 而僅在第二次處理流程, 即處理剩餘的4kb數據時, 纔會走到藍線處. 即兩次收包, 一次寫回包:
在第二種狀況下, 因爲某幾個參數特別巨大(大於PROTO_MBULK_BIG_ARG
宏的值, 即32kb), 致使請求體一次不能接收完畢. 勢必也要進行屢次接收. 原本, 樸素的思想上, 只須要在遇到超大參數的時候, 調整16kb這個閾值便可. 好比一個參數爲40kb的二進制數據, 那麼在接收到這個參數時, 破例調用read
時一次性把這個參數讀取完整便可. 除此外不須要什麼特殊處理.
便是, 處理流程應當以下:
但這樣會有一個問題: 當在processMultiBulkBuffer
中, 將超大參數以string對象的形式放在client->argv
中時, 建立這個string對象, 底層是拷貝操做:
int processMultibulkBuffer(client * c) { // .... c->argv[c->argc++] = createStringObject(c->querybuf+pos, c->bulklen); // .... }
對於超大參數來講, 這種string對象的建立過程, 涉及到的內存分配與數據拷貝開銷是很昂貴的. 爲了不這種數據拷貝, Redis把對於超大參數的處理優化成了以下:
從傳輸層獲取到數據以後, 下一步就是傳遞給processInputBuffer
函數, 來進行協議解析, 以下:
void processInputBuffer(client * c) { server.current_client = c; // 每次循環, 解析並執行出一個命令字(及其參數), 並執行相應的命令 // 直到解析失敗, 或緩衝區數據耗盡爲止 while(sdslen(c->querybuf)) { // 在某些場合下, 不作處理 if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break; if (c->flags & CLIENT_BLOCKED) break; if (c->flag & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break; // 若是client的reqtype字段中沒有標明該客戶端傳遞給服務端的請求類型(是內聯命令, 仍是協議數據) // 那麼分析緩衝區中的第一個字節來決定. 這通常發生在客戶端向服務端發送第一個請求的時候 if (!c->reqtype) { if (c->querybuf[0] == '*') c->reqtype = PROTO_REQ_MULTIBULK; else c->reqtype = PROTO_REQ_INLINE; } // 按類型解析出一個命令, 及其參數 if (c->reqtype == PROTO_REQ_INLINE) { if (processInlineBuffer(c) != C_OK) break; } else if (c->reqtype == PROTO_REQ_MULTIBULK) { if (processMultibulkBuffer(c) != C_OK) break; } else { serverPanic("Unknow request type") } // 執行命令邏輯 if (c->argc == 0) { // 當argc的值爲0時, 表明本次解析沒有解析出任何東西, 數據已耗盡 resetClient(c); } else { // 執行邏輯 if(processCommand(c) == C_OK) { if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { c->reploff = c->read_reploff - sdslen(c->querybuf); } if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE) { resetClient(c); } } if (server.current_client == NULL) break; } } server.current_client = NULL; }
這一步作了分流處理, 若是是內聯命令, 則經過processInlineBuffer
進行協議解析, 若是不是內聯命令, 則經過processMultibulkBuffer
進行協議解析. 協議解析完成後, 客戶端上載的數據就被轉換成了command, 而後接下來調用processCommand
來執行服務端邏輯.
咱們略過內聯命令的解析, 直接看非內聯命令請求的協議解析過程
int processMultibulkBuffer(client * c) { //... // 解析一個請求的頭部時, 即剛開始解析一個請求時, multibulklen表明着當前命令的命令+參數個數, 該字段初始值爲0 // 如下條件分支將讀取一個命令的參數個數, 好比 '*1\r\n$4\r\nPING\r\n' // 如下分支完成後, 將有: // c->multibulklen = 1 // c->argv = zmalloc(sizeof(robj*)) // pos = <指向$> if (c->multibulklen == 0) { // ... // 讀取當前命令的參數個數, 即賦值c->multibulklen newline = strchr(c->querybuf, '\r'); if (newline == NULL) { // 請求體不知足協議格式 // ... return C_ERR: } // ... ok = string2ll(c->querybuf+1, newline-(c->querybuf+1), &ll) if (!ok || ll > 1024 * 1024) { // 解析數值出錯, 或數值大於1MB return C_ERR; } // 如今pos指向的是命令字 pos = (newline-c->querybuf) + 2 if (ll <= 0) { // 解出來一個空命令字, 什麼也不作, 把緩衝區頭部移除掉, 而且返回成功 sdsrange(c->querybuf, pos, -1); return C_OK; } c->multibulklen = ll; // 爲命令字及其參數準備存儲空間: 將它們都存儲在c->argv中, 且是以redisObject的形式存儲着 if (c->argv) zfree(c->argv); c->argv = zmalloc(sizeof(robj*)*c->multibulklen); } // ... // 讀取解析命令字自己與其全部參數 while(c->multibulklen) { // 讀取當前長字符串的長度 if (c->bulklen == -1) { // ... // 若是當前指針指向的參數, 是一個超大參數, 則把接收緩衝區修剪爲該參數 pos += newline-(c->querybuf+pos) + 2; // 如今pos指向的是參數, 或命令字的起始位置 if (ll >= PROTO_MBULK_BIG_ARG) { size_t qblen; sdsrange(c->querybuf, pos, -1); // 將接收緩衝區修剪爲該參數 pos = 0; qblen = sdslen(c->querybuf); if (qblen < (size_t)ll + 2) { // 若是這個超大參數還未接收完畢, 則擴充接收緩衝區, 爲其預留空間 c->querybuf = sdsMakeRoomFor(c->querybuf, ll+2-qblen); } } c->bulklen == ll; } // 讀取長字符串內容, 便是命令字自己, 或某個參數 if (sdslen(c->querybuf) - pos < (size_t)(c->bulklen+2)) { // 當前緩衝區中, 該長字符串的內容還沒有接收完畢, 跳出解析 break; } else { if (pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG && sdslen(c->querybuf) == (size_t)(c->bulklen+2)) { // 對於超大參數, 爲了不數據複製的開銷, 直接以接收緩衝區爲底料, 建立一個字符串對象 // 而對於接收緩衝區, 爲其從新分配內存 // 注意, 若是當前指針指向的是一個超大參數, 接收機制保證了, 這個超大參數以後, 緩衝區沒有其它數據 c->argv[c->argc++] = createObject(OBJ_STRING, c->querybuf) sdsIncrLen(c->querybuf, -2); // 移除掉超大參數末尾的\r\n c->querybuf = sdsnewlen(NULL, c->bulklen + 2); // 爲接收緩衝區從新分配內存 sdsclear(c->querybuf); pos = 0; } else { // 對於普通參數或命令字, 以拷貝形式新建字符串對象 c->argv[c->argc++] = createStringObject(c->querybuf+pos, c->bulklen); pos += c->bulklen+2; // pos指向下一個參數 } c->bulklen = -1; c->multibulklen--; } } // 至此, 一個命令, 及其全部參數, 均被存儲在 c->argv中, 以redisObject形式存儲着, 對象的類型是string // 扔掉全部已經解析過的二進制數據 if (pos) sdsrange(c->querybuf, pos, -1); if (c->multibulklen == 0) return C_OK return C_ERR; }
一個命令及其全部參數若是被成功解析, 則processMultibulkBuffer
函數會返回C_OK
, 並在隨後調用processCommand
執行這個命令. processCommand
內部包含了將命令的迴應寫入c->buf
的動做(封裝在函數addReply
中. 若是命令或命令中的參數在解析過程出錯, 或參數還沒有接收完畢, 則會返回C_ERR
, 退棧, 從新返回, 直至再次從傳輸層讀取數據, 補齊全部參數, 再次回到這裏.
接下來咱們來看執行命令並寫回包的過程, processCommand
中的關鍵代碼以下:
int processCommand(client * c) { if (!strcasecmp(c->argv[0]->ptr, "quit")) { // 對 quit 命令作單獨處理, 調用addReply後, 返回 addReply(c, shared.ok); c->flags |= CLIENT_CLOSE_AFTER_REPLY; return C_ERR; } // 拿出c->argv[0]中存儲的string對象中的ptr字段(實際上是個sds字符串) // 與server.commands中的命令字表進行比對, 查找出對應的命令, 即 struct redisCommand* 句柄 c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); if (!c->cmd) { // 查無該命令, 進行錯誤處理 flagTransaction(c); // 若是這是一個在事務中的命令, 則將 CLIENT_DIRTY_EXEC 標誌位貼到 c-flags 中, 指示着事務出錯 addReplyErrorFormat(c, "unknown command '%s'", (char *)c->argv[0]->ptr) return C_OK; } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) { // 對比命令所須要的參數數量, 是否和argc中的數量一致, 若是不一致, 說明錯誤 flagTransaction(c); // 依然同上, 事務出錯, 則打 CLIENT_DIRTY_EXEC 標誌位 addReplyErrorFormat(c, "wrong number of arguments for '%s' command", c->cmd->name); return C_OK; } // 檢查用戶的登陸態, 即在服務端要求登陸認證時, 若當前用戶還未認證, 且當前命令不是認證命令的話, 則報錯 if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) { flagTransaction(c); addReply(c, shared.noautherr); return C_OK; } // 與集羣相關的代碼 // 若是集羣模式開啓, 則一般狀況下須要將命令重定向至其它實例, 但在如下兩種狀況下不須要重定向: // 1. 命令的發送者是master // 2. 命令沒有鍵相關的參數 if (server.cluster_enabled && !(c->flags & CLIENT_MASTER) && !(c->flags & CLIENT_LUA && server.lua_caller->flags & CLIENT_MASTER) && !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 && c->cmd->proc != execCommand)) { int hashslot, error_code; clusterNode * n = getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code); if (n == NULL || n != server.cluster->myseld) { if (c->cmd->proc == execCommand) { discardTransaction(c); } else { flagTransaction(c); } clusterRedirectClient(c, n, hashslot, error_code); // 重定向 return C_OK; } } // 在正式執行命令以前, 首先先嚐試的放個屁, 擠點內存出來 if (server.maxmemory) { int retval = freeMemoryIfNeeded(); if (server.current_client == NULL) return C_ERR; if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR ) { flagTransaction(c); addReply(c, shared.oomerr); return C_OK; } } // 在持久化出錯的狀況下, master不接受寫操做 if ( ( (server.stop_writes_on_bgsave_err && server.saveparamslen > 0 && server.lastbgsave_status == C_ERR) || server.aof_last_write_status == C_ERR ) && server.masterhost == NULL && (c->cmdd->flags & CMD_WRITE || c->cmd->proc == pingCommand) ) { flagTransaction(c); if (server.aof_last_write_status == C_OK) { addReply(c, shared.bgsaveerr); } else { addReplySds(c, sdscatprintf(sdsempty(), "-MISCONF Errors writing to the AOF file: %s\r\n", strerror(server.aof_last_write_errno))); } return C_OK; } // 還有幾種不接受寫操做的場景, 這裏代碼省略掉, 這幾種場景包括: // 1. 用戶配置了 min_slaves-to_write配置項, 而符合要求的slave數量不達標 // 2. 當前是一個只讀slave // ... // 在發佈-訂閱模式中, 僅支持 (P)SUBSCRIBE/(P)UNSUBSCRIBE/PING/QUIT 命令 if (c->flags & CLIENT_PUBSUB && c->cmd->proc != pingCommand && c->cmd->proc != subscribeCommand && c->cmd->proc != unsubscribeCommand && c->cmd->proc != psubscribeCommand && c->cmd->proc != punsubscribeCommand ) { addReplyError(c, "only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"); return C_OK; } // 當用戶配置slave-serve-stale-data爲no, 且當前是一個slave, 且與master中斷鏈接的狀況下, 僅支持 INFO 和 SLAVEOF 命令 if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED && server.repl_server_stale_data == 0 && !(c->cmd->flags & CMD_STALE)) { flagTransaction(c); addReply(c, shared.masterdownerr); return C_OK; } // 若是當前正在進行loading操做, 則僅接受 LOADING 相關命令 if (server.loading && !(c->cmd->flags & CMD_LOADING)) { addReply(c, shared.loadingerr); return C_OK; } // 若是是lua腳本操做, 則支持受限數量的命令 // ... // 終於到了終點: 執行命令 // 對於事務中的命令: 進隊列, 但不執行 // 對於事務結束標記EXEC: 調用call, 執行隊列中的全部命令 // 對於非事務命令: 也是調用call, 就地執行 if ( c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) { // 若是是事務, 則將命令先排進隊列 queueMultiCommand(c); addReply(c, shared.queued); } else { // 若是是非事務執行, 則直接執行 call (c, CMD_CALL_FULL); c->woff = server.master_repl_offset; if (listLength(server.ready_keys)) { handleClientsBlockedOnLists(); } } return C_OK; }
須要注意的點有:
server
中, 持有着一個命令表, 它自己是一個dict結構, 其key是爲字符串, 即Redis命令的字符串, value則是爲redisCommand
結構, 描述了一個Redis中的命令. 能夠看到processCommand
一開始作的第一件事, 就是用c->argv[0]->ptr
去查詢對應的redisCommand
句柄MULTI
命令標記開始, 由EXEC
命令標記結束的. 若是一個命令處於事務之中, 那麼processCommand
內部並不會真正執行這個命令, 僅當最終EXEC
命令來臨時, 將事務中的全部命令所有執行掉. 而實際執行命令的函數, 是爲call
, 這是Redis命令執行流程中最核心的一個函數.MULTI
和EXEC
中, 那麼這多個命令實際上是一個個就地執行的.有關事務的更詳細細節咱們會在稍後討論, 但目前, 咱們先來看一看這個最核心的call
函數, 如下代碼隱藏了無關細節
void call(client * c, int flags) { // ... // 與主從複製相關的代碼: 將命令分發給全部MONITOR模式下的從實例 // ... // 調用命令處理函數 // ... c->cmd->proc(c); // ... // AOF, 主從複製相關代碼 // ... }
撥開其它特性無論, 單機數據庫在這一步其實很簡單, 就是調用了對應命令字的回調函數來處理
咱們挑其中一個命令SET
來看一下, 命令處理回調中都幹了些什麼:
void setCommand(client * c) { int j; robj * expire = NULL; int uint = UINT_SECONDS; int flags = OBJ_SET_NO_FLAGS; for (j = 3; i < c->argc; j++) { // 處理SET命令中的額外參數 // ... } c->argv[2] = tryObjectEncoding(c->argv[2]); setGenericCommand(c, flags, c->argv[1], c->argv[2], expire, unit, NULL, NULL) } void setGenericCommand(client * c, int flags, robj * key, robj, * val, robj * expire, int unit, robj * ok_reply, robj * abort_reply) { long long milliseconds = 0; if (expire) { // 從string對象中讀取數值化的過時時間 if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK) return; if (milliseconds <= 0){ addReplyErrorFormat(c, "invalid expire time in %s", c->cmd->name); return; } if (unit == UNIT_SECONDS) milliseconds *= 1000; } if ( (flags & OBJ_SET_NX && lookupKeyWrite(c->db, key) != NULL) || (flags & OBJ_SET_XX && lookupKeyWrite(c->db, key) == NULL) ) { // NX 與 XX 時, 判斷鍵是否存在 addReply(c, abort_reply ? abort_reply : shared.nullbulk); return; } // 執行所謂的set操做, 即把鍵值對插入至數據庫鍵空間去, 若是鍵已存在, 則替換值 setKey(c->db, key, val); server.dirty++; // 統計數據更新 // 設置過時時間 if(expire) setExpire(c, c->db, key, kstime() + milliseconds); notifyKeyspaceEvent(NOTIFY_STRING, "set", key, c->db->id); if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, "expire", key, c->db->id); addReply(c, ok_reply ? ok_reply : shared.ok); } void setKey(redisDb * db, robj * key, robj * val) { if (lookupKeyWrite(db, key) == NULL) { dbAdd(db, key, val); // 對 dictAdd 的封裝 } else { dbOverwrite(db, key, val); // 對 dictReplace的封裝 } incrRefCount(val); removeExpire(db, key); signalModifiedKey(db, key); }
從 setCommand
->setGenericCommand
->setKey
->dictXXX
這樣一路追下來, 能夠看到最終的操做, 是對於某個數據庫實例中, 鍵空間的操做. 邏輯比較清晰. 其它的命令也基本相似, 鑑於Redis中的命令衆多, 這裏沒有篇幅去一個一個的介紹, 也沒有必要.
至此, 用戶的命令經由服務端接收, 解析, 以及執行在了某個數據庫實例上. 咱們在一路跟代碼的過程當中, 隨處可見的錯誤處理中都有addReplyXXX
系列的函數調用, 在SET
命令的最終執行中, 在setGenericCommand
中, setKey
操做成功後, 也會調用addReply
. 顯然, 這個函數是用於將服務端的迴應數據寫入到某個地方去的.
從邏輯上來說, 服務端應當將每一個命令的迴應寫入到一個緩衝區中, 而後在適宜的時刻, 序列化爲迴應體(符合協議規約的二進制數據), 經由網絡IO回發給客戶端. 接下來, 咱們就從addReply
入手, 看一看回包的流程.
addReply
系列函數衆多, 有十幾個, 其它函數諸如addReplyBulk
, addReplyError
, addReplyErrorFormat
等, 只是一些簡單的變體. 這一系列函數完成的功能都是相似的: 總結起來就是兩點:
addReply
以下:
void addReply(client * c, robj * obj) { // 檢查客戶端是否能夠寫入回包數據, 這個函數在每一個 addReplyXXX 函數中都會被首先調用 // 它的職責有: // 0. 多數狀況下, 對於正常的客戶端, 它會返回 C_OK. 而且若是該客戶端的可寫事件沒有在事件處理器中註冊回調的話, 它會將這個客戶端先掛在 server.clients_pending_write 這個鏈表上 // 0. 在特殊狀況下, 對於不能接收回包的客戶端(好比這是一個假客戶端, 沒有數據鏈接sockfd, 這種客戶端在AOF特性中有有到), 返回 C_ERR if (prepareClientToWrite(c) != C_OK) return; if (sdsEncodedObject(obj)) { // 若是迴應數據, 是RAW或EMBSTR編碼的string對象 if (_addReplyToBuffer(c, obj->ptr, sdslen(obj->ptr)) != C_OK) { // 先嚐試調用 _addReplyToBuffer, 將回應數據寫進 client->buf 緩衝區中 // 若是失敗了, 則把迴應數據掛到 client->reply 這個鏈表上去 _addReplyObjectToList(c, obj); } } else if (obj->encoding == OBJ_ENCODING_INT) // 若是迴應數據是INT編碼的string對象, 這裏有優化操做 if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) { char buf[32]; int len; // 因爲INT編碼的string對象內部是使用 redisObject->ptr 指針直接存儲的數值 // 因此直接讀這個指針的值, 將其轉換爲字符表示, 嘗試存進 client->buf 緩衝區中 len = ll2string(buf, sizeof(buf), (long)obj->ptr); if (_addReplyToBuffer(c, buf, len) == C_OK) return; } // 若是存進 client->buf 失敗, 就把對應的數值轉換爲 EMBSTR 或RAW 編碼的string對象 // 掛在 client->reply 鏈表上 obj = getDecodedObject(obj); if (_addReplyToBuffer(c, obj->ptr, sdslen(obj->ptr)) != C_OK) _addReplyObjectToList(c, obj); decrRefCount(obj); } else { serverPanic("Wrong obj->encoding in addReply()") } }
這裏會看到, 迴應緩衝區有兩個:
client->buf
是一個二進制的迴應緩衝區, 但它的長度是有限的. 默認長度也是16KB, 受宏PROTO_REPLY_CHUNK_BYTES
的值限定client->reply
是一個對象緩衝區, 它是一個鏈表, 無容量限制, 該鏈表中掛的應該都是以 RAW 或 EMBSTR 編碼的 string 對象.咱們在addReply
中會看到, 老是試圖先把迴應數據先經過_addReplyToBuffer
添加到client->buf
中去, 若是不成功的話, 再把數據經過_addReplyObjectToList
掛在client->reply
這個鏈表中去.
這個也很好理解, 當client->buf
寫滿的時候, 再寫不下下一個addReply
要添加的數據時, 就會退而求其次, 將這個迴應數據先掛在client->reply
中去. client->buf
就是每次調用網絡IO時, 一次性寫入的數據量. 若是超過這個量了, 會分屢次寫回應.
如今問題來了: addReplyXXX
系列函數只是將回應數據寫向二進制緩衝區或鏈表隊列上, 並把客戶端添加到server->clients_pending_write
列表中去. 那麼, 究竟是什麼時機, 將數據回寫給客戶端呢?
這個實現就很是苟了!!
按常理思惟, 應該在整個 收包->解析->執行命令->寫回包至緩衝區或隊列
這個流程中, 在整個請求體被處理結束後, 給客戶端的數據鏈接的可寫事件綁定一個事件回調, 用於寫回包. 但Redis的實現, 並非這樣.
還記得, 在服務端啓動流程中, 開啓服務端事件循環的時候, 有這麼一段代碼:
int main(int argc, char ** argv) { //... aeSetBeforeSleepProc(server.el, beforeSleep); // 就是這裏 aeSetAfterSleepProc(server.el, afterSleep); aeMain(server.el); aeDeleteEventLoop(server.el); return 0; }
服務端的事件處理器, 有一個事前回調, 也有一個過後回調, 而處理向客戶端寫回包的代碼, 就在事前回調中. 來看這個beforeSleep
函數:
void beforeSleep(struct aeEventLoop * eventLoop) { // ... handleClientsWithPendingWrites(); // 就是在這裏, 處理全部客戶端的寫回包 // ... }
這是什麼操做? 這意味着服務端的事件處理器, 每次處理一個事件以前, 都要檢查一遍是否還有迴應須要給客戶端寫. 而爲何Redis在寫回包的時候, 不使用傳統的註冊可寫事件->在可寫事件回調中寫回包->寫成功後註銷可寫事件
這種經典套路, Redis在handleClientsWithPendingWrites
函數實現的註釋中, 給出了理由: 大意就是, 不使用事件註冊這種套路, 就避免了事件處理機制中會產生的系統調用開銷.
咱們從handleClientsWithPendingWrites
函數開始, 簡單的捋一下寫回包是怎麼實現的
int handleClientsWithPendingWrites(void) { // ... while((ln = listNext(&li))) { client * c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; listDelNode(server.clients_pending_write, ln); // 將client->buf及client->reply中的數據寫至客戶端 if (writeToClient(c->fd, c, 0) == C_ERR) continue; // 若是client->reply中還有數據, 則藉助事件處理器來寫後續回包 // 正常狀況下, 在上面的writeToClient中, 該寫的數據已經全寫到客戶端了 // 僅有在網絡異常的狀況下, writeToClient處理寫回包失敗, 致使數據沒寫完時, 纔會進入這個分支 // 這個邏輯是正確的: 網絡有異常的狀況下, 經過註冊事件的形式, 待稍後再重試寫回包 // 網絡正常的狀況下, 一次性把全部數據都寫完 if (clienetHasPendingReplies(c)) { int ae_flags = AE_WRITABLE; if (server.aof_state == AOF_ON && server.aof_fsync == AOF_FSYNC_ALWAYS) { ae_flags |= AE_BARRIER; } if (aeCreateFileEvent(server.el, c->fd, ae_flags, sendReplyToClient, c) == AE_ERR) { freeClientAsync(c); } } } // ... }
writeToClient
函數中就寫的很直白了, 就是調用write
向客戶端寫回包. 若是client->reply
上還有數據, 也一個個的寫給客戶端. 這裏就不展現這個函數的實現了, 很簡單, 沒什麼可說的. sendReplyToClient
幾乎等同於writeToClient
, 只是將後者包裝成了符合事件處理回調函數簽名的形式.
寫回包過程當中有兩點須要注意:
writeToClient
因爲一些異常緣由(特別是網絡波動), 致使在寫client->reply
中的某個結點的數據失敗(write
報EAGAIN
時), writeToClient
是會返回 C_OK
的, 但實際上數據未發送完畢. 這種狀況下, 後續會再經過事件處理器, 來執行重試.writeToClient
中, 把數據發送結束了, 在函數末尾, 是會主動刪除掉對於client->fd
的WRITEABLE
事件的監聽的.至此, 從客戶端發送請求, 到服務端接收二進制數據, 解析二進制數據, 執行命令, 再寫回包的整個流程就結束了.
xxxCommand
一系列命令處理函數. 這部分命令處理函數分佈在t_hash.c
, t_list.c
, t_set.c
, t_string.c
, t_zset.c
與server.c
中, 分別是各類Redis Value Type相關的命令實現.server
. 就單機範疇的討論來講, clients
字段及其它相關字段, 持有着客戶端. el
字段持有着事件處理器, db
字段持有着數據庫el
與數據庫db
字段. 其中db
字段就是初始化了一坨dict
, 而後監聽服務端口.createSharedObjects()
函數. 這個函數中建立了大量的string對象, 用於全局複用