baiyanc++
首先看一張咱們很是熟悉的redis命令執行圖:
那麼思考這樣一個問題,當咱們鏈接了redis服務端以後,而後輸入並執行某條redis命令:如set key1 value1。這條命令到底是如何被髮送到redis服務端的,redis服務端又是如何解析,並做出相應處理,並返回執行成功的呢?redis
redis在TCP協議基礎之上,封裝了本身的一套協議規範,方便服務端與客戶端去接收與解析數據,劃清命令參數之間的邊界,方便最終對以TCP字節流傳輸的數據進行處理。下面咱們使用tcpdump來捕獲redis-cli發送命令時的數據包:算法
tcpdump port 6379 -i lo -X
這時,咱們在客戶端中輸入set key1 value1命令。在tcpdump中捕獲的數據包以下:
第一個是客戶端發送命令到redis服務端時的數據包,而第二個是redis服務端響應給客戶端的數據包。咱們首先只看第一個數據包,它從客戶端43856端口發送到redis服務端的6379端口。首先前20個字節是IP頭部,後32字節是TCP頭部(因爲TCP頭部後面存在可選項)。
咱們主要關注從「2a33」開始的數據信息,從這裏開始就是redis具體的數據格式了。從右邊對數據的一個ASCII碼翻譯也能夠看到set、key一、value1的字樣,中間還有一些用.表示的字符,那麼這裏,咱們根據抓包結果分析一下redis數據傳輸的協議格式。segmentfault
看到這裏,咱們是否可以發現如下規律:數組
- redis以"*"做爲標誌,表示命令的開始。在*後面緊跟的數字表明參數的個數(set key1 value1有3個參數因此爲3)
- redis以"$"做爲命令參數的開始,後面緊跟的數字表明參數的長度(如key1的長度爲4因此爲$4)
- redis以"rn"做爲參數之間的分隔符,方便解析TCP字節流數據時定位邊界位置
綜合來看,客戶端向服務端發送的redis數據包格式以下:緩存
*3 \r\n set \r\n $4 \r\n key1 \r\n $6 \r\n value1 \r\n
相比FastCGI協議,redis僅僅使用幾個分隔符和特殊字符,就完成了對命令的傳輸語法及數據格式的規範化,同時服務端經過其中定義好的分隔符,也可以方便高效地從字節流數據中解析並讀取出正確的數據。這種通訊協議簡單高效,可以知足redis對高性能的要求。安全
既然命令已經經過redis數據傳輸協議安全地送達到了服務端,那麼,服務端就要開始對傳輸過來的字節流數據進行處理啦。因爲咱們在協議中清晰地定義了每一個參數的邊界(\r\n),因此,redis服務端解析起來也很是輕鬆。網絡
redis是典型事件驅動程序。爲了提升單進程的redis的性能,redis採用IO多路複用技術來處理客戶端的命令請求。redis會在建立客戶端實例的時,指定服務端接收到客戶端命令請求的事件時,所要執行的事件處理函數:app
client *createClient(int fd) { client *c = zmalloc(sizeof(client)); if (fd != -1) { anetNonBlock(NULL,fd); //設置非阻塞 anetEnableTcpNoDelay(NULL,fd); //設置不採用Nagle算法,避免半包與粘包現象 if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); //設置keep-alive //注意這裏建立了一個文件事件。當客戶端讀事件就緒的時候,回調readQueryFromClient()函數 if (aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } } ... }
爲了暫存客戶端請求到服務端的字節流數據,redis封裝了一個接收緩衝區,來緩存從套接字中讀取的數據。後續的命令處理流程從緩衝區中讀取命令數據並處理便可。緩衝區的好處在於不用一直維持讀寫套接字。在後續的流程中,咱們只須要從緩衝區中讀取數據,而不是仍從套接字中讀取。這樣就能夠提早釋放套接字,節省資源。緩衝區的創建與使用就是在以前講過的客戶端回調函數readQueryFromClient()中完成的:tcp
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { ... qblen = sdslen(c->querybuf); //獲取緩衝區長度 if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); //建立一個sds結構做爲緩衝區 nread = read(fd, c->querybuf+qblen, readlen); //從套接字中讀取數據到緩衝區中暫存 ... //真正地處理命令 processInputBufferAndReplicate(c); }
這段代碼建立並往緩衝區中寫入了字節流數據,而後調用processInputBufferAndReplicate()去真正地處理命令。processInputBufferAndReplicate()函數中只是簡單的調用了==processInputBuffer()函數。因爲咱們以前的緩衝區中已經有了客戶端發給服務端的字節流數據,因此咱們須要在這一層進行數據初步的篩選與處理:
void processInputBuffer(client *c) { // 若是緩衝區尚未處理完,繼續循環處理 while(c->qb_pos < sdslen(c->querybuf)) { ... // 對字節流數據進行定製化分發處理 if (c->reqtype == PROTO_REQ_INLINE) { //若是是INLINE類型的請求 if (processInlineBuffer(c) != C_OK) break; //調用processInlineBuffer解析緩衝區數據 } else if (c->reqtype == PROTO_REQ_MULTIBULK) {//若是是MULTIBULK類型的請求 if (processMultibulkBuffer(c) != C_OK) break; //調用processMultibulkBuffer解析緩衝區數據 } else { serverPanic("Unknown request type"); } // 開始處理具體的命令 if (c->argc == 0) { //命令參數爲0個,非法 resetClient(c); } else { //命令參數不爲0,合法 // 調用processCommand()真正處理命令 if (processCommand(c) == C_OK) { // ... } } } }
看到這裏,讀者可能會有些疑惑。什麼是INLINE、什麼是MULTIBULK?在redis中,有兩種請求命令類型:
- INLINE類型:簡單字符串格式,如ping命令
- MULTIBULK類型:字符串數組格式。如set、get等等大部分命令都是這種類型
這個函數其實就是一個分發器。因爲底層的字節流數據是無規則的,因此咱們須要根據客戶端的reqtype字段,去區分請求字節流數據屬於那種請求類型,進而分發到對應的函數中進行處理。因爲咱們常常執行的命令都是MULTIBULK類型,咱們也以MULTIBULK類型爲例。對於set、get這種MULTIBULK請求類型,會被分發到processMultibulkBuffer()函數中進行處理。
在開啓TCP的Nagle算法時,TCP會將多個redis命令請求的數據包合併或者拆分發送。這樣就會出如今一個數據包中命令不完整、或者一個數據包中包含多個命令的狀況。爲了解決這個問題,processMultibulkBuffer()函數保證,當只有在緩衝區中包含一個完整請求時,這個函數纔會成功解析完字節流中的命令參數,並返回成功狀態碼。不然,會break出外部的while循環,等待下一次事件循環再從套接字中讀取剩餘的數據,再進行對命令的解析。這樣就保證了redis協議中的數據的完整性,也保證了實際命令參數的完整性。
int processMultibulkBuffer(client *c) { while(c->multibulklen) { ... /* 讀取命令參數字節流 */ if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) { //若是$後面表明參數長度的數字與實際命令長度不匹配(+2的位置是\r\n),說明數據不完整,直接跳出循環,等待下一次讀取剩餘數據 break; } else { //命令完整,進行一些執行命令以前的初始化工做 if (c->qb_pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG && sdslen(c->querybuf) == (size_t)(c->bulklen+2)) { c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf); sdsIncrLen(c->querybuf,-2); c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2); sdsclear(c->querybuf); } else { c->argv[c->argc++] = createStringObject(c->querybuf+c->qb_pos,c->bulklen); c->qb_pos += c->bulklen+2; } c->bulklen = -1; c->multibulklen--; //處理下一個命令參數 } } }
咱們回到外層。當咱們成功執行processMultibulkBuffer()函數以後,說明當前命令已經完整,能夠對命令進行處理了。咱們想一下,加入要咱們去設計根據不一樣的命令,調用不一樣的處理函數,從而完成不一樣的功能,咱們應該怎麼作呢?想了想,咱們能夠簡單寫出如下代碼:
if (command == "get") { doGetCommand(); //get命令處理函數 } else if (command == "set") { doSetCommand(); //set命令處理函數 } else { printf("非法命令") }
以上代碼很是簡單,只是根據咱們獲得的不一樣命令請求,分發到不一樣的命令處理函數中進行定製化處理。那麼redis其實也是一樣的道理,那究竟redis是怎麼作的呢:
int processCommand(client *c) { //若是是退出命令直接返回 if (!strcasecmp(c->argv[0]->ptr,"quit")) { addReply(c,shared.ok); c->flags |= CLIENT_CLOSE_AFTER_REPLY; return C_ERR; } //去字典裏查找命令,並把要執行的命令處理函數賦值到c結構體中的cmd字段 c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); // 返回值校驗 if (!c->cmd) { //沒有找到該命令 flagTransaction(c); sds args = sdsempty(); int i; for (i=1; i < c->argc && sdslen(args) < 128; i++) args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr); addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s", (char*)c->argv[0]->ptr, args); sdsfree(args); return C_OK; } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || //命令參數不匹配 (c->argc < -c->cmd->arity)) { flagTransaction(c); addReplyErrorFormat(c,"wrong number of arguments for '%s' command", c->cmd->name); return C_OK; } // 真正執行命令 if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) { queueMultiCommand(c); addReply(c,shared.queued); } else { //真正執行命令 call(c,CMD_CALL_FULL); //核心函數 c->woff = server.master_repl_offset; if (listLength(server.ready_keys)) handleClientsBlockedOnKeys(); } return C_OK; }
在這個函數中,最重要的就是lookupCommand()函數和call()函數的調用了。在redis中,全部命令都存儲在一個字典中,這個字典長這樣:
struct redisCommand redisCommandTable[] = { {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0}, {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0}, {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0}, {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0}, {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0}, {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0}, {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0}, {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0}, {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0}, {"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0}, {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0}, {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0}, {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0}, {"bitfield",bitfieldCommand,-2,"wm",0,NULL,1,1,1,0,0}, {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0}, {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0}, {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0}, {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0}, {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0}, {"mget",mgetCommand,-2,"rF",0,NULL,1,-1,1,0,0}, {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0}, {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0}, {"rpushx",rpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0}, {"lpushx",lpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0}, {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0}, {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0}, {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0}, {"brpop",brpopCommand,-3,"ws",0,NULL,1,-2,1,0,0}, {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0}, {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0}, {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0}, {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0}, {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0}, {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0}, {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0}, {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0}, ... };
咱們能夠看到,這個字典是全部命令的集合,咱們調用lookupCommand就是從這裏獲取命令及命令的相關信息的。它是一個結構體數組,包含全部命令名稱、命令處理函數、參數個數、以及種種標記。其實這裏就至關於一個配置信息的維護,以及命令道處理函數名稱的映射關係,從而很好的解決了咱們一開始使用if-else來分發命令處理函數的難以維護、可擴展性差的問題。
在咱們成功在字典中找到一個命令的處理函數以後,咱們只須要去調用相應的命令處理函數就好啦。上面最後的call()函數中就對相應的命令處理函數進行了調用,並返回調用結果給客戶端。好比,setCommand()就是set命令的實際處理函數:
void setCommand(client *c) { int j; robj *expire = NULL; int unit = UNIT_SECONDS; int flags = OBJ_SET_NO_FLAGS; for (j = 3; j < c->argc; j++) { char *a = c->argv[j]->ptr; robj *next = (j == c->argc-1) ? NULL : c->argv[j+1]; if ((a[0] == 'n' || a[0] == 'N') && (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && !(flags & OBJ_SET_XX)) { flags |= OBJ_SET_NX; } else if ((a[0] == 'x' || a[0] == 'X') && (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && !(flags & OBJ_SET_NX)) { flags |= OBJ_SET_XX; } else if ((a[0] == 'e' || a[0] == 'E') && (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && !(flags & OBJ_SET_PX) && next) { flags |= OBJ_SET_EX; unit = UNIT_SECONDS; expire = next; j++; } else if ((a[0] == 'p' || a[0] == 'P') && (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && !(flags & OBJ_SET_EX) && next) { flags |= OBJ_SET_PX; unit = UNIT_MILLISECONDS; expire = next; j++; } else { addReply(c,shared.syntaxerr); return; } } c->argv[2] = tryObjectEncoding(c->argv[2]); setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL); }
這個函數首先對NX、EX參數進行了判斷及處理,最終調用了setGenericCommand(),來執行set命令的通用邏輯部分:
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) { long long milliseconds = 0; /* initialized to avoid any harmness warning */ if (expire) { if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK) return; if (milliseconds <= 0) { addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name); return; } if (unit == UNIT_SECONDS) milliseconds *= 1000; } if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) || (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL)) { addReply(c, abort_reply ? abort_reply : shared.nullbulk); return; } setKey(c->db,key,val); server.dirty++; if (expire) setExpire(c,c->db,key,mstime()+milliseconds); notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id); if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, "expire",key,c->db->id); addReply(c, ok_reply ? ok_reply : shared.ok); }
最終會調用addReply()通用返回函數,應該是要把執行結果返回給客戶端了。咱們看看該函數裏面作了些什麼:
void addReply(client *c, robj *obj) { if (prepareClientToWrite(c) != C_OK) return; if (sdsEncodedObject(obj)) { if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK) _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr)); } else if (obj->encoding == OBJ_ENCODING_INT) { char buf[32]; size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr); if (_addReplyToBuffer(c,buf,len) != C_OK) _addReplyStringToList(c,buf,len); } else { serverPanic("Wrong obj->encoding in addReply()"); } }
咱們仔細閱讀這段代碼,好像並無找到執行結果是何時返回給客戶端的。在這個函數中,只是將返回結果添加到了輸出緩衝區中,一個命令就執行完了。那麼到底是何時返回的呢?是否還記得在介紹開啓事件循環時,提到函數beforesleep()會在每次事件循環阻塞等待文件事件以前執行,主要執行一些不是很費時的操做,好比過時鍵刪除操做,向客戶端返回命令回覆等。這樣,就能夠減節省返回執行結果時的網絡通訊開銷,將同一個客戶端上的多個命令的屢次返回,對多個命令作一個緩存,最終一次性統一返回,減小了返回的次數,提升了性能。
執行完set key1 value1命令以後,咱們獲得了一個"OK"的返回,表明命令執行成功。其實咱們仔細觀察上面返回的第二個數據包,其實底層是一個"+OK"的返回值。那麼爲何要有一個+號呢?由於除了咱們上面講過的set命令,還有get命令、lpush命令等等,他們的返回值都是不同的。get會返回數據集合、lpush會返回一個整數,表明列表的長度等等。一個字符串的表示是遠遠不能知足須要的。因此在redis通訊協議中,一共定義了五種返回值結構。客戶端經過每種返回結構的第一個字符,來判斷是哪一種類型的返回值:
- 狀態回覆:第一個字符是「+」;例如,SET命令執行完畢會向客戶端返回「+OK\r\n」。
- 錯誤回覆:第一個字符是「-」;例如,當客戶端請求命令不存在時,會向客戶端返回「-ERR unknown command 'testcmd'」。
- 整數回覆:第一個字符是「:」;例如,INCR命令執行完畢向客戶端返回「:100\r\n」。
- 批量回復:第一個字符是「$」;例如,GET命令查找鍵向客戶端返回結果「$5\r\nhello\r\n」,其中$5表示返回字符串長度。
- 多條批量回復:第一個字符是「」;例如,LRANGE命令可能會返回多個多個值,格式爲「3\r\n$6\r\nvalue1\r\n$6rnvalue2rn$6\r\nvalue3\r\n」,與命令請求協議格式相同,「\*3」表示返回值數目,「$6」表示當前返回值字符串長度,多個返回值用「\r\n」分隔開。
咱們執行set命令就是第一種類型,即狀態回覆。客戶端經過+號,就可以知道這是狀態回覆,從而就知道該如何讀取後面的字節流內容了。
至此,咱們就走完了一個redis命令的完整生命週期,同時也瞭解了redis通訊協議的格式與規範。接下來,我將會深刻每個命令的實現,你們加油。