在上一篇文章中《Redis 命令執行過程(上)》中,咱們首先了解 Redis 命令執行的總體流程,而後細緻分析了從 Redis 啓動到創建 socket 鏈接,再到讀取 socket 數據到輸入緩衝區,解析命令,執行命令等過程的原理和實現細節。接下來,咱們來具體看一下 set 和 get 命令的實現細節和如何將命令結果經過輸出緩衝區和 socket 發送給 Redis 客戶端。redis
前文講到 processCommand 方法會從輸入緩衝區中解析出對應的 redisCommand,而後調用 call 方法執行解析出來的 redisCommand的 proc 方法。不一樣命令的的 proc 方法是不一樣的,好比說名爲 set 的 redisCommand 的 proc 是 setCommand 方法,而 get 的則是 getCommand 方法。經過這種形式,實際上實如今Java 中特別常見的多態策略。數據庫
void call(client *c, int flags) { .... c->cmd->proc(c); .... } // redisCommand結構體 struct redisCommand { char *name; // 對應方法的函數範式 redisCommandProc *proc; .... // 其餘定義 }; // 使用 typedef 定義的別名 typedef void redisCommandProc(client *c); // 不一樣的命令,調用不一樣的方法。 struct redisCommand redisCommandTable[] = { {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0}, {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0}, {"hmset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0}, .... // 全部的 redis 命令都有 }
setCommand 會判斷set命令是否攜帶了nx、xx、ex或者px等可選參數,而後調用setGenericCommand命令。咱們直接來看 setGenericCommand 方法。緩存
setGenericCommand 方法的處理邏輯以下所示:異步
// t_string.c void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) { long long milliseconds = 0; /** * 設置了過時時間;expire是robj類型,獲取整數值 */ if (expire) { if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK) return; if (milliseconds <= 0) { addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name); return; } if (unit == UNIT_SECONDS) milliseconds *= 1000; } /** * NX,key存在時直接返回;XX,key不存在時直接返回 * lookupKeyWrite 是在對應的數據庫中尋找鍵值是否存在 */ if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) || (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL)) { addReply(c, abort_reply ? abort_reply : shared.nullbulk); return; } /** * 添加到數據字典 */ setKey(c->db,key,val); server.dirty++; /** * 過時時間添加到過時字典 */ if (expire) setExpire(c,c->db,key,mstime()+milliseconds); /** * 鍵空間通知 */ notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id); if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, "expire",key,c->db->id); /** * 返回值,addReply 在 get 命令時再具體講解 */ addReply(c, ok_reply ? ok_reply : shared.ok); }
具體 setKey 和 setExpire 的方法實現咱們這裏就不細講,其實就是將鍵值添加到db的 dict 數據哈希表中,將鍵和過時時間添加到 expires 哈希表中,以下圖所示。socket
接下來看 getCommand 的具體實現,一樣的,它底層會調用 getGenericCommand 方法。函數
getGenericCommand 方法會調用 lookupKeyReadOrReply 來從 dict 數據哈希表中查找對應的 key值。若是找不到,則直接返回 C_OK;若是找到了,則根據值的類型,調用 addReply 或者 addReplyBulk 方法將值添加到輸出緩衝區中。oop
int getGenericCommand(client *c) { robj *o; // 調用 lookupKeyReadOrReply 從數據字典中查找對應的鍵 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL) return C_OK; // 若是是string類型,調用 addReply 單行返回。若是是其餘對象類型,則調用 addReplyBulk if (o->type != OBJ_STRING) { addReply(c,shared.wrongtypeerr); return C_ERR; } else { addReplyBulk(c,o); return C_OK; } }
lookupKeyReadWithFlags 會從 redisDb 中查找對應的鍵值對,它首先會調用 expireIfNeeded判斷鍵是否過時而且須要刪除,若是爲過時,則調用 lookupKey 方法從 dict 哈希表中查找並返回。具體解釋能夠看代碼中的詳細註釋優化
/* * 查找key的讀操做,若是key找不到或者已經邏輯上過時返回 NULL,有一些反作用 * 1 若是key到達過時時間,它會被設備爲過時,而且刪除 * 2 更新key的最近訪問時間 * 3 更新全局緩存擊中機率 * flags 有兩個值: LOOKUP_NONE 通常都是這個;LOOKUP_NOTOUCH 不修改最近訪問時間 */ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { // db.c robj *val; // 檢查鍵是否過時 if (expireIfNeeded(db,key) == 1) { .... // master和 slave 對這種狀況的特殊處理 } // 查找鍵值字典 val = lookupKey(db,key,flags); // 更新全局緩存命中率 if (val == NULL) server.stat_keyspace_misses++; else server.stat_keyspace_hits++; return val; }
Redis 在調用查找鍵值系列方法前都會先調用 expireIfNeeded 來判斷鍵是否過時,而後根據 Redis 是否配置了懶刪除來進行同步刪除或者異步刪除。關於鍵刪除的細節能夠查看《詳解 Redis 內存管理機制和實現》一文。lua
在判斷鍵釋放過時的邏輯中有兩個特殊狀況:spa
/* * 在調用 lookupKey*系列方法前調用該方法。 * 若是是slave: * slave 並不主動過時刪除key,可是返回值仍然會返回鍵已經被刪除。 * master 若是key過時了,會主動刪除過時鍵,而且觸發 AOF 和同步操做。 * 返回值爲0表示鍵仍然有效,不然返回1 */ int expireIfNeeded(redisDb *db, robj *key) { // db.c // 獲取鍵的過時時間 mstime_t when = getExpire(db,key); mstime_t now; if (when < 0) return 0; /* * 若是當前是在執行lua腳本,根據其原子性,整個執行過時中時間都按照其開始執行的那一刻計算 * 也就是說lua執行時未過時的鍵,在它整個執行過程當中也都不會過時。 */ now = server.lua_caller ? server.lua_time_start : mstime(); // slave 直接返回鍵是否過時 if (server.masterhost != NULL) return now > when; // master時,鍵未過時直接返回 if (now <= when) return 0; // 鍵過時,刪除鍵 server.stat_expiredkeys++; // 觸發命令傳播 propagateExpire(db,key,server.lazyfree_lazy_expire); // 和鍵空間事件 notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired",key,db->id); // 根據是否懶刪除,調用不一樣的函數 return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : dbSyncDelete(db,key); }
lookupKey 方法則是經過 dictFind 方法從 redisDb 的 dict 哈希表中查找鍵值,若是能找到,則根據 redis 的 maxmemory_policy 策略來判斷是更新 lru 的最近訪問時間,仍是調用 updateFU 方法更新其餘指標,這些指標能夠在後續內存不足時對鍵值進行回收。
robj *lookupKey(redisDb *db, robj *key, int flags) { // dictFind 根據 key 獲取字典的entry dictEntry *de = dictFind(db->dict,key->ptr); if (de) { // 獲取 value robj *val = dictGetVal(de); // 當處於 rdb aof 子進程複製階段或者 flags 不是 LOOKUP_NOTOUCH if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && !(flags & LOOKUP_NOTOUCH)) { // 若是是 MAXMEMORY_FLAG_LFU 則進行相應操做 if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { updateLFU(val); } else { // 更新最近訪問時間 val->lru = LRU_CLOCK(); } } return val; } else { return NULL; } }
在全部的 redisCommand 執行的最後,通常都會調用 addReply 方法進行結果返回,咱們的分析也來到了 Redis 命令執行的返回數據階段。
addReply 方法作了兩件事情:
void addReply(client *c, robj *obj) { if (prepareClientToWrite(c) != C_OK) return; if (sdsEncodedObject(obj)) { // 須要將響應內容添加到output buffer中。整體思路是,先嚐試向固定buffer添加,添加失敗的話,在嘗試添加到響應鏈表 if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK) _addReplyObjectToList(c,obj); } else if (obj->encoding == OBJ_ENCODING_INT) { .... // 特殊狀況的優化 } else { serverPanic("Wrong obj->encoding in addReply()"); } }
prepareClientToWrite 首先判斷了當前 client是否須要返回數據:
接着若是這個 client 還未處於延遲等待寫入 (CLIENT_PENDING_WRITE)的狀態,則將其設置爲該狀態,並將其加入到 Redis 的等待寫入返回值客戶端隊列中,也就是 clients_pending_write隊列。
int prepareClientToWrite(client *c) { // 若是是 lua client 則直接OK if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK; // 客戶端發來過 REPLY OFF 或者 SKIP 命令,不須要發送返回值 if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR; // master 做爲client 向 slave 發送命令,不須要接收返回值 if ((c->flags & CLIENT_MASTER) && !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR; // AOF loading 時的假client 不須要返回值 if (c->fd <= 0) return C_ERR; // 將client加入到等待寫入返回值隊列中,下次事件週期會進行返回值寫入。 if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_WRITE) && (c->replstate == REPL_STATE_NONE || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))) { // 設置標誌位而且將client加入到 clients_pending_write 隊列中 c->flags |= CLIENT_PENDING_WRITE; listAddNodeHead(server.clients_pending_write,c); } // 表示已經在排隊,進行返回數據 return C_OK; }
Redis 將存儲等待返回的響應數據的空間,也就是輸出緩衝區分紅兩部分,一個固定大小的 buffer 和一個響應內容數據的鏈表。在鏈表爲空而且 buffer 有足夠空間時,則將響應添加到 buffer 中。若是 buffer 滿了則建立一個節點追加到鏈表上。_addReplyToBuffer 和 _addReplyObjectToList 就是分別向這兩個空間寫數據的方法。
固定buffer和響應鏈表,總體上構成了一個隊列。這麼組織的好處是,既能夠節省內存,不需一開始預先分配大塊內存,而且能夠避免頻繁分配、回收內存。
上面就是響應內容寫入輸出緩衝區的過程,下面看一下將數據從輸出緩衝區寫入 socket 的過程。
prepareClientToWrite 函數,將客戶端加入到了Redis 的等待寫入返回值客戶端隊列中,也就是 clients_pending_write 隊列。請求處理的事件處理邏輯就結束了,等待 Redis 下一次事件循環處理時,將響應從輸出緩衝區寫入到 socket 中。
在 《Redis 事件機制詳解》
一文中咱們知道,Redis 在兩次事件循環之間會調用 beforeSleep 方法處理一些事情,而對 clients_pending_write 列表的處理就在其中。
下面的 aeMain 方法就是 Redis 事件循環的主邏輯,能夠看到每次循環時都會調用 beforesleep 方法。
void aeMain(aeEventLoop *eventLoop) { // ae.c eventLoop->stop = 0; while (!eventLoop->stop) { /* 若是有須要在事件處理前執行的函數,那麼執行它 */ if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); /* 開始處理事件*/ aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); } }
beforeSleep 函數會調用 handleClientsWithPendingWrites 函數來處理 clients_pending_write 列表。
handleClientsWithPendingWrites 方法會遍歷 clients_pending_write 列表,對於每一個 client 都會先調用 writeToClient 方法來嘗試將返回數據從輸出緩存區寫入到 socekt中,若是還未寫完,則只能調用 aeCreateFileEvent 方法來註冊一個寫數據事件處理器 sendReplyToClient,等待 Redis 事件機制的再次調用。
這樣的好處是對於返回數據較少的客戶端,不須要麻煩的註冊寫數據事件,等待事件觸發再寫數據到 socket,而是在下一次事件循環週期就直接將數據寫到 socket中,加快了數據返回的響應速度。
可是從這裏也會發現,若是 clients_pending_write 隊列過長,則處理時間也會好久,阻塞正常的事件響應處理,致使 Redis 後續命令延時增長。
// 直接將返回值寫到client的輸出緩衝區中,不須要進行系統調用,也不須要註冊寫事件處理器 int handleClientsWithPendingWrites(void) { listIter li; listNode *ln; // 獲取系統延遲寫隊列的長度 int processed = listLength(server.clients_pending_write); listRewind(server.clients_pending_write,&li); // 依次處理 while((ln = listNext(&li))) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; listDelNode(server.clients_pending_write,ln); // 將緩衝值寫入client的socket中,若是寫完,則跳過以後的操做。 if (writeToClient(c->fd,c,0) == C_ERR) continue; // 還有數據未寫入,只能註冊寫事件處理器了 if (clientHasPendingReplies(c)) { int ae_flags = AE_WRITABLE; if (server.aof_state == AOF_ON && server.aof_fsync == AOF_FSYNC_ALWAYS) { ae_flags |= AE_BARRIER; } // 註冊寫事件處理器 sendReplyToClient,等待執行 if (aeCreateFileEvent(server.el, c->fd, ae_flags, sendReplyToClient, c) == AE_ERR) { freeClientAsync(c); } } } return processed; }
sendReplyToClient 方法其實也會調用 writeToClient 方法,該方法就是將輸出緩衝區中的 buf 和 reply 列表中的數據都儘量多的寫入到對應的 socket中。
// 將輸出緩衝區中的數據寫入socket,若是還有數據未處理則返回C_OK int writeToClient(int fd, client *c, int handler_installed) { ssize_t nwritten = 0, totwritten = 0; size_t objlen; sds o; // 仍然有數據未寫入 while(clientHasPendingReplies(c)) { // 若是緩衝區有數據 if (c->bufpos > 0) { // 寫入到 fd 表明的 socket 中 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); if (nwritten <= 0) break; c->sentlen += nwritten; // 統計本次一共輸出了多少子節 totwritten += nwritten; // buffer中的數據已經發送,則重置標誌位,讓響應的後續數據寫入buffer if ((int)c->sentlen == c->bufpos) { c->bufpos = 0; c->sentlen = 0; } } else { // 緩衝區沒有數據,從reply隊列中拿 o = listNodeValue(listFirst(c->reply)); objlen = sdslen(o); if (objlen == 0) { listDelNode(c->reply,listFirst(c->reply)); continue; } // 將隊列中的數據寫入 socket nwritten = write(fd, o + c->sentlen, objlen - c->sentlen); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; // 若是寫入成功,則刪除隊列 if (c->sentlen == objlen) { listDelNode(c->reply,listFirst(c->reply)); c->sentlen = 0; c->reply_bytes -= objlen; if (listLength(c->reply) == 0) serverAssert(c->reply_bytes == 0); } } // 若是輸出的字節數量已經超過NET_MAX_WRITES_PER_EVENT限制,break if (totwritten > NET_MAX_WRITES_PER_EVENT && (server.maxmemory == 0 || zmalloc_used_memory() < server.maxmemory) && !(c->flags & CLIENT_SLAVE)) break; } server.stat_net_output_bytes += totwritten; if (nwritten == -1) { if (errno == EAGAIN) { nwritten = 0; } else { serverLog(LL_VERBOSE, "Error writing to client: %s", strerror(errno)); freeClient(c); return C_ERR; } } if (!clientHasPendingReplies(c)) { c->sentlen = 0; //若是內容已經所有輸出,刪除事件處理器 if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); // 數據所有返回,則關閉client和鏈接 if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { freeClient(c); return C_ERR; } } return C_OK; }