接着上一篇《Redis源碼閱讀筆記-命令的接收和執行過程(一)》,本篇主要寫 processCommand() 的調用過程,以及 call() 是如何調用各個命令的執行函數的。
processCommand()
processCommand() 的主要作用是判斷命令的類型並簡單檢查參數數量,再根據機器狀態(內存限制、磁盤、主從狀態等)和命令類型判斷是否能調用該命令,最後調用 call() 來真正調用各命令的執行函數。
// server.c /* If this function gets called we already read a whole * command, arguments are in the client argv/argc fields. * processCommand() execute the command or prepare the * server for a bulk read from the client. * * If C_OK is returned the client is still alive and valid and * other operations can be performed by the caller. Otherwise * if C_ERR is returned the client was destroyed (i.e. after QUIT). */ int processCommand(client *c) { /* The QUIT command is handled separately. Normal command procs will * go through checking for replication and QUIT will cause trouble * when FORCE_REPLICATION is enabled and would be implemented in * a regular command proc. */ // 檢查是不是退出命令 if (!strcasecmp(c->argv[0]->ptr,"quit")) { addReply(c,shared.ok); c->flags |= CLIENT_CLOSE_AFTER_REPLY; return C_ERR; } /* Now lookup the command and check ASAP about trivial error conditions * such as wrong arity, bad command name and so forth. */ // 從命令字典中判斷c->argv[0]->ptr中是不是正確的命令 // 命令集保存在server.commands中,這是一個redis的字典結構 // 在initServerConfig()的時候經過populateCommandTable()函數將命令集添加進字典中 c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); // c->cmd 是一個redisCommand的結構,其中proc是一個函數指針,指向對應命令的操做 if (!c->cmd) { // 命令不在命令集中,返回錯誤信息給客戶端 flagTransaction(c); sds args = sdsempty(); int i; for (i=1; i < c->argc && sdslen(args) < 128; i++) args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr); addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s", (char*)c->argv[0]->ptr, args); sdsfree(args); return C_OK; } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) { // 命令在命令集中 // 可是命令須要參數,且argc中的參數數量不等於命令所需的,或者少於所須要 // 返回錯誤信息給客戶端 flagTransaction(c); addReplyErrorFormat(c,"wrong number of arguments for '%s' command", c->cmd->name); return C_OK; } /* Check if the user is authenticated */ // 檢查客戶端鏈接是否已經登陸驗證(設置了須要驗證的話) if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) { flagTransaction(c); 
addReply(c,shared.noautherr); return C_OK; } /* If cluster is enabled perform the cluster redirection here. * However we don't perform the redirection if: * 1) The sender of this command is our master. * 2) The command has no key arguments. */ // 若是啓動了集羣,則進行集羣的重定向 // 下面兩種狀況不重定向: // 1. 命令時Master節點發送過來的 // 2. 命令沒有關鍵參數 if (server.cluster_enabled && // 集羣是否啓動 !(c->flags & CLIENT_MASTER) && // 非Master節點的命令 !(c->flags & CLIENT_LUA && server.lua_caller->flags & CLIENT_MASTER) && // 非Mater節點的lua函數 !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 && c->cmd->proc != execCommand)) // 命令有關鍵參數 { int hashslot; int error_code; // getNodeByQuery獲取爲該命令服務的節點,並獲取錯誤代碼和哈希槽 clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc, &hashslot,&error_code); if (n == NULL || n != server.cluster->myself) { if (c->cmd->proc == execCommand) { discardTransaction(c); } else { flagTransaction(c); } // 命令重定向到節點 clusterRedirectClient(c,n,hashslot,error_code); return C_OK; } } /* Handle the maxmemory directive. * * First we try to free some memory if possible (if there are volatile * keys in the dataset). If there are not the only thing we can do * is returning an error. */ // 若是有限定最大使用內存 // 則先嚐試釋放內存,若是內存不足則返回OOM錯誤 if (server.maxmemory) { // 檢查和嘗試釋放內存 // 原理是,zmalloc.c中會有一個used_memory的全局變量,來記錄redis申請了多少內存 // PS: AOF的緩衝區和slaves的輸出緩衝並不會算入已使用內存中 // 當內存不夠時,freeMemoryIfNeeded()會根據設置的最大內存規則區嘗試釋放內存 int retval = freeMemoryIfNeeded(); /* freeMemoryIfNeeded may flush slave output buffers. This may result * into a slave, that may be the active client, to be freed. */ if (server.current_client == NULL) return C_ERR; /* It was impossible to free enough memory, and the command the client * is trying to execute is denied during OOM conditions? Error. */ if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) { flagTransaction(c); addReply(c, shared.oomerr); return C_OK; } } /* Don't accept write commands if there are problems persisting on disk * and if this is a master instance. */ // 同時知足3個條件: // 1. 
磁盤存在問題,aof等寫操做失敗 // 2. 本機是master主機 // 3. 命令屬於寫操做 // 則會返回錯誤 if (((server.stop_writes_on_bgsave_err && server.saveparamslen > 0 && server.lastbgsave_status == C_ERR) || server.aof_last_write_status == C_ERR) && server.masterhost == NULL && (c->cmd->flags & CMD_WRITE || c->cmd->proc == pingCommand)) { flagTransaction(c); if (server.aof_last_write_status == C_OK) addReply(c, shared.bgsaveerr); else addReplySds(c, sdscatprintf(sdsempty(), "-MISCONF Errors writing to the AOF file: %s\r\n", strerror(server.aof_last_write_errno))); return C_OK; } /* Don't accept write commands if there are not enough good slaves and * user configured the min-slaves-to-write option. */ // 若是同時知足 // 1. 本機是master // 2. 用戶配置了 min-slaves-to-write 的選項 // 3. 命令屬於寫操做 // 4. 可用的 slaves 數量少於 repl_min_slaves_to_write 所須要的數量 // 則返回錯誤 if (server.masterhost == NULL && server.repl_min_slaves_to_write && server.repl_min_slaves_max_lag && c->cmd->flags & CMD_WRITE && server.repl_good_slaves_count < server.repl_min_slaves_to_write) { flagTransaction(c); addReply(c, shared.noreplicaserr); return C_OK; } /* Don't accept write commands if this is a read only slave. But * accept write commands if this is our master. */ // 若是同時知足 // 1. 本機是 slave // 2. 本機是 只讀 的slave // 3. client不是來自 master // 4. 
client的命令屬於寫操做 // 則返回錯誤 if (server.masterhost && server.repl_slave_ro && !(c->flags & CLIENT_MASTER) && c->cmd->flags & CMD_WRITE) { addReply(c, shared.roslaveerr); return C_OK; } /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */ // 若是client是消息訂閱的鏈接 // 只容許如下操做: // * ping // * subscribe // * unsubscribe // * psubscribe // * punsubscribe if (c->flags & CLIENT_PUBSUB && c->cmd->proc != pingCommand && c->cmd->proc != subscribeCommand && c->cmd->proc != unsubscribeCommand && c->cmd->proc != psubscribeCommand && c->cmd->proc != punsubscribeCommand) { addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"); return C_OK; } /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and * we are a slave with a broken link with master. */ if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED && server.repl_serve_stale_data == 0 && !(c->cmd->flags & CMD_STALE)) { // // 若是本機是與master斷開的slave時 flagTransaction(c); addReply(c, shared.masterdownerr); return C_OK; } /* Loading DB? Return an error if the command has not the * CMD_LOADING flag. */ if (server.loading && !(c->cmd->flags & CMD_LOADING)) { addReply(c, shared.loadingerr); return C_OK; } /* Lua script too slow? Only allow a limited number of commands. 
*/ // 當lua腳本很慢的時候,只容許部分操做 if (server.lua_timedout && c->cmd->proc != authCommand && c->cmd->proc != replconfCommand && !(c->cmd->proc == shutdownCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == 'n') && !(c->cmd->proc == scriptCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == 'k')) { flagTransaction(c); addReply(c, shared.slowscripterr); return C_OK; } /* Exec the command */ // 執行各個命令 if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) { // 若是是client屬於multi,且命令不是(exec,discard,multi,watch),則將命令加入到隊列中 queueMultiCommand(c); addReply(c,shared.queued); } else { // 執行命令 call(c,CMD_CALL_FULL); c->woff = server.master_repl_offset; if (listLength(server.ready_keys)) handleClientsBlockedOnLists(); } return C_OK; }
call()
call() 是 Redis 中執行命令的核心函數。
// server.c /* Call() is the core of Redis execution of a command. * * The following flags can be passed: * CMD_CALL_NONE No flags. * CMD_CALL_SLOWLOG Check command speed and log in the slow log if needed. * CMD_CALL_STATS Populate command stats. * CMD_CALL_PROPAGATE_AOF Append command to AOF if it modified the dataset * or if the client flags are forcing propagation. * CMD_CALL_PROPAGATE_REPL Send command to salves if it modified the dataset * or if the client flags are forcing propagation. * CMD_CALL_PROPAGATE Alias for PROPAGATE_AOF|PROPAGATE_REPL. * CMD_CALL_FULL Alias for SLOWLOG|STATS|PROPAGATE. * * The exact propagation behavior depends on the client flags. * Specifically: * * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set * and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set * in the call flags, then the command is propagated even if the * dataset was not affected by the command. * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP * are set, the propagation into AOF or to slaves is not performed even * if the command modified the dataset. * * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or * slaves propagation will never occur. * * Client flags are modified by the implementation of a given command * using the following API: * * forceCommandPropagation(client *c, int flags); * preventCommandPropagation(client *c); * preventCommandAOF(client *c); * preventCommandReplication(client *c); * */ void call(client *c, int flags) { long long dirty, start, duration; int client_old_flags = c->flags; /* Sent the command to clients in MONITOR mode, only if the commands are * not generated from reading an AOF. 
*/ // 將命令發送到monitor的客戶端(當server正在讀AOF的數據或者客戶端的命令設置了CMD_SKIP_MONITOR或CMD_ADMIN) // PS: MONITOR, 實時打印出 Redis 服務器接收到的命令,調試用 if (listLength(server.monitors) && !server.loading && !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) { replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); } /* Initialization: clear the flags that must be set by the command on * demand, and initialize the array for additional commands propagation. */ c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); // 初始化also_propagate,部分命令會調用also_propagate()將一些AOF的傳播函數添加進server.also_propagate中, // 而後在call()的末尾處調用 redisOpArray prev_also_propagate = server.also_propagate; redisOpArrayInit(&server.also_propagate); /* Call the command. */ dirty = server.dirty; start = ustime(); // 執行命令 c->cmd->proc(c); duration = ustime()-start; // 命令的執行時間 dirty = server.dirty-dirty; // 用來記錄DB是否被修改了 if (dirty < 0) dirty = 0; /* When EVAL is called loading the AOF we don't want commands called * from Lua to go into the slowlog or to populate statistics. */ // 當正在加載AOF,且是執行lua時 // 不進入slow log 和 不統計數據 if (server.loading && c->flags & CLIENT_LUA) flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS); /* If the caller is Lua, we want to force the EVAL caller to propagate * the script if the command flag or client flag are forcing the * propagation. */ // 若是是lua腳本,強制repl 和 aof if (c->flags & CLIENT_LUA && server.lua_caller) { if (c->flags & CLIENT_FORCE_REPL) server.lua_caller->flags |= CLIENT_FORCE_REPL; if (c->flags & CLIENT_FORCE_AOF) server.lua_caller->flags |= CLIENT_FORCE_AOF; } /* Log the command into the Slow log if needed, and populate the * per-command statistics that we show in INFO commandstats. */ // 記錄slow log if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) { char *latency_event = (c->cmd->flags & CMD_FAST) ? 
"fast-command" : "command"; latencyAddSampleIfNeeded(latency_event,duration/1000); slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration); } if (flags & CMD_CALL_STATS) { c->lastcmd->microseconds += duration; c->lastcmd->calls++; } /* Propagate the command into the AOF and replication link */ // 傳播數據庫的變更到AOF或者主從複製中 if (flags & CMD_CALL_PROPAGATE && (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP) { int propagate_flags = PROPAGATE_NONE; /* Check if the command operated changes in the data set. If so * set for replication / AOF propagation. */ // 若是有數據庫的變更,這設置傳播AOF和主從 if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL); /* If the client forced AOF / replication of the command, set * the flags regardless of the command effects on the data set. */ // 判斷是否須要強制傳播 if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL; if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF; /* However prevent AOF / replication propagation if the command * implementatino called preventCommandPropagation() or similar, * or if we don't have the call() flags to do so. */ // 判斷client是否阻止AOF傳播或者主從複製傳播 if (c->flags & CLIENT_PREVENT_REPL_PROP || !(flags & CMD_CALL_PROPAGATE_REPL)) propagate_flags &= ~PROPAGATE_REPL; if (c->flags & CLIENT_PREVENT_AOF_PROP || !(flags & CMD_CALL_PROPAGATE_AOF)) propagate_flags &= ~PROPAGATE_AOF; /* Call propagate() only if at least one of AOF / replication * propagation is needed. Note that modules commands handle replication * in an explicit way, so we never replicate them automatically. */ if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE)) // 傳播 propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags); } /* Restore the old replication flags, since call() can be executed * recursively. 
*/ c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); c->flags |= client_old_flags & (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); /* Handle the alsoPropagate() API to handle commands that want to propagate * multiple separated commands. Note that alsoPropagate() is not affected * by CLIENT_PREVENT_PROP flag. */ // 調用使用alsoPropagate()添加的方法,alsoPropagate()會將方法添加到server.also_propagate中 if (server.also_propagate.numops) { int j; redisOp *rop; if (flags & CMD_CALL_PROPAGATE) { for (j = 0; j < server.also_propagate.numops; j++) { rop = &server.also_propagate.ops[j]; int target = rop->target; /* Whatever the command wish is, we honor the call() flags. */ if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF; if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL; if (target) propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target); } } redisOpArrayFree(&server.also_propagate); } server.also_propagate = prev_also_propagate; server.stat_numcommands++; }