今天咱們來了解一下 Redis 命令執行的過程。在以前的文章中《當 Redis 發生高延遲時,到底發生了什麼》咱們曾簡單的描述了一條命令的執行過程,本篇文章展現深刻說明一下,加深讀者對 Redis 的瞭解。node
以下圖所示,一條命令執行完成而且返回數據一共涉及三部分,第一步是創建鏈接階段,響應了socket的創建,而且建立了client對象;第二步是處理階段,從socket讀取數據到輸入緩衝區,而後解析並得到命令,執行命令並將返回值存儲到輸出緩衝區中;第三步是數據返回階段,將返回值從輸出緩衝區寫到socket中,返回給客戶端,最後關閉client。 redis
這三個階段之間是經過事件機制串聯了,在 Redis 啓動階段首先要註冊socket鏈接創建事件處理器:算法
接下來,咱們分別來看一下各個步驟的具體原理和代碼實現。數據庫
Redis 服務器啓動時,會調用 initServer 方法,首先會創建 Redis 本身的事件機制 eventLoop,而後在其上註冊週期時間事件處理器,最後在所監聽的 socket 上 建立文件事件處理器,監聽 socket 創建鏈接的事件,其處理函數爲 acceptTcpHandler。緩存
void initServer(void) { // server.c
....
/** * 建立eventLoop */
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
/* Open the TCP listening socket for the user commands. */
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
/** * 註冊週期時間事件,處理後臺操做,好比說客戶端操做、過時鍵等 */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
/** * 爲全部監聽的socket建立文件事件,監聽可讀事件;事件處理函數爲acceptTcpHandler * */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
....
}
複製代碼
在《Redis 事件機制詳解》一文中,咱們曾詳細介紹過 Redis 的事件機制,能夠說,Redis 命令執行過程當中都是由事件機制協調管理的,也就是 initServer 方法中生成的 aeEventLoop。當socket發生對應的事件時,aeEventLoop 對調用已經註冊的對應的事件處理器。bash
當客戶端向 Redis 創建 socket時,aeEventLoop 會調用 acceptTcpHandler 處理函數,服務器會爲每一個連接建立一個 Client 對象,並建立相應文件事件來監聽socket的可讀事件,並指定事件處理函數。服務器
acceptTcpHandler 函數會首先調用 anetTcpAccept
方法,它底層會調用 socket 的 accept 方法,也就是接受客戶端來的創建鏈接請求,而後調用 acceptCommonHandler
方法,繼續後續的邏輯處理。併發
// 當客戶端創建連接時進行的eventloop處理函數 networking.c
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
....
// 層層調用,最後在anet.c 中 anetGenericAccept 方法中調用 socket 的 accept 方法
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
/**
* 進行socket 創建鏈接後的處理
*/
acceptCommonHandler(cfd,0,cip);
}
複製代碼
acceptCommonHandler 則首先調用 createClient 建立 client,接着判斷當前 client 的數量是否超出了配置的 maxclients,若是超過,則給客戶端發送錯誤信息,而且釋放 client。app
static void acceptCommonHandler(int fd, int flags, char *ip) { //networking.c
client *c;
// 建立redisClient
c = createClient(fd)
// 當 maxClient 屬性被設置,而且client數量已經超出時,給client發送error,而後釋放鏈接
if (listLength(server.clients) > server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";
if (write(c->fd,err,strlen(err)) == -1) {
}
server.stat_rejected_conn++;
freeClient(c);
return;
}
.... // 處理爲設置密碼時默認保護狀態的客戶端鏈接
// 統計鏈接數
server.stat_numconnections++;
c->flags |= flags;
}
複製代碼
createClient 方法用於建立 client,它表明着鏈接到 Redis 客戶端,每一個客戶端都有各自的輸入緩衝區和輸出緩衝區,輸入緩衝區存儲客戶端經過 socket 發送過來的數據,輸出緩衝區則存儲着 Redis 對客戶端的響應數據。client一共有三種類型,不一樣類型的對應緩衝區的大小都不一樣。socket
createClient 方法除了建立 client 結構體並設置其屬性值外,還會對 socket進行配置並註冊讀事件處理器
設置 socket 爲 非阻塞 socket、設置 NO_DELAY 和 SO_KEEPALIVE標誌位來關閉 Nagle 算法而且啓動 socket 存活檢查機制。
設置讀事件處理器,當客戶端經過 socket 發送來數據後,Redis 會調用 readQueryFromClient 方法。
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
// fd 爲 -1,表示其餘特殊狀況建立的client,redis在進行好比lua腳本執行之類的狀況下也會建立client
if (fd != -1) {
// 配置socket爲非阻塞、NO_DELAY不開啓Nagle算法和SO_KEEPALIVE
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
/**
* 向 eventLoop 中註冊了 readQueryFromClient。
* readQueryFromClient 的做用就是從client中讀取客戶端的查詢緩衝區內容。
* 綁定讀事件到事件 loop (開始接收命令請求)
*/
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
// 默認選擇數據庫
selectDb(c,0);
uint64_t client_id;
atomicGetIncr(server.next_client_id,client_id,1);
c->id = client_id;
c->fd = fd;
.... // 設置client的屬性
return c;
}
複製代碼
client 的屬性中有不少屬性,好比後邊會看到的輸入緩衝區 querybuf 和輸出緩衝區 buf,這裏由於代碼過長作了省略,感興趣的同窗能夠自行閱讀源碼。
readQueryFromClient 方法會調用 read 方法從 socket 中讀取數據到輸入緩衝區中,而後判斷其大小是否大於系統設置的 client_max_querybuf_len,若是大於,則向 Redis返回錯誤信息,並關閉 client。
將數據讀取到輸入緩衝區後,readQueryFromClient 方法會根據 client 的類型來作不一樣的處理,若是是普通類型,則直接調用 processInputBuffer 來處理;若是是主從客戶端,還須要將命令同步到本身的從服務器中。也就是說,Redis實例將主實例傳來的命令執行後,繼續將命令同步給本身的從實例。
// 處理從client中讀取客戶端的輸入緩衝區內容。
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata;
....
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
// 從 fd 對應的socket中讀取到 client 中的 querybuf 輸入緩衝區
nread = read(fd, c->querybuf+qblen, readlen);
if (nread == -1) {
.... // 出錯釋放 client
} else if (nread == 0) {
// 客戶端主動關閉 connection
serverLog(LL_VERBOSE, "Client closed connection");
freeClient(c);
return;
} else if (c->flags & CLIENT_MASTER) {
/*
* 當這個client表明主從的master節點時,將query buffer和 pending_querybuf結合
* 用於主從複製中的命令傳播????
*/
c->pending_querybuf = sdscatlen(c->pending_querybuf,
c->querybuf+qblen,nread);
}
// 增長已經讀取的字節數
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
server.stat_net_input_bytes += nread;
// 若是大於系統配置的最大客戶端緩存區大小,也就是配置文件中的client-query-buffer-limit
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
// 返回錯誤信息,而且關閉client
bytes = sdscatrepr(bytes,c->querybuf,64);
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClient(c);
return;
}
if (!(c->flags & CLIENT_MASTER)) {
// processInputBuffer 處理輸入緩衝區
processInputBuffer(c);
} else {
// 若是client是master的鏈接
size_t prev_offset = c->reploff;
processInputBuffer(c);
// 判斷是否同步偏移量發生變化,則通知到後續的slave
size_t applied = c->reploff - prev_offset;
if (applied) {
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
}
複製代碼
processInputBuffer 主要是將輸入緩衝區中的數據解析成對應的命令,根據命令類型是 PROTO_REQ_MULTIBULK 仍是 PROTO_REQ_INLINE,來分別調用 processInlineBuffer 和 processMultibulkBuffer 方法來解析命令。
而後調用 processCommand 方法來執行命令。執行成功後,若是是主從客戶端,還須要更新同步偏移量 reploff 屬性,而後重置 client,讓client能夠接收一條命令。
void processInputBuffer(client *c) { // networking.c
server.current_client = c;
/* 當緩衝區中還有數據時就一直處理 */
while(sdslen(c->querybuf)) {
.... // 處理 client 的各類狀態
/* 判斷命令請求類型 telnet發送的命令和redis-cli發送的命令請求格式不一樣 */
if (!c->reqtype) {
if (c->querybuf[0] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
c->reqtype = PROTO_REQ_INLINE;
}
}
/**
* 從緩衝區解析命令
*/
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}
/* 參數個數爲0時重置client,能夠接受下一個命令 */
if (c->argc == 0) {
resetClient(c);
} else {
// 執行命令
if (processCommand(c) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
// 若是是master的client發來的命令,則 更新 reploff
c->reploff = c->read_reploff - sdslen(c->querybuf);
}
// 若是不是阻塞狀態,則重置client,能夠接受下一個命令
if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
resetClient(c);
}
}
}
server.current_client = NULL;
}
複製代碼
解析命令暫時不看,就是將 redis 命令文本信息,記錄到client的argv/argc屬性中
processCommand 方法會處理不少邏輯,不過大體能夠分爲三個部分:首先是調用 lookupCommand 方法得到對應的 redisCommand;接着是檢測當前 Redis 是否能夠執行該命令;最後是調用 call 方法真正執行命令。
processCommand會作以下邏輯處理:
int processCommand(client *c) {
// 1 處理 quit 命令
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}
/** * 根據 argv[0] 查找對應的 command * 2 命令字典查找指定命令;全部的命令都存儲在命令字典中 struct redisCommand redisCommandTable[]={} */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
// 處理未知命令
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
// 處理參數錯誤
}
// 3 檢查用戶驗證
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
{
flagTransaction(c);
addReply(c,shared.noautherr);
return C_OK;
}
/** * 4 若是是集羣模式,處理集羣重定向。當命令發送者是master或者 命令沒有任何key的參數時能夠不重定向 */
if (server.cluster_enabled &&
!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA &&
server.lua_caller->flags & CLIENT_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
c->cmd->proc != execCommand))
{
int hashslot;
int error_code;
// 查詢能夠執行的node信息
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
&hashslot,&error_code);
if (n == NULL || n != server.cluster->myself) {
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
flagTransaction(c);
}
clusterRedirectClient(c,n,hashslot,error_code);
return C_OK;
}
}
// 5 處理maxmemory請求,先嚐試回收一下,若是不行,則返回異常
if (server.maxmemory) {
int retval = freeMemoryIfNeeded();
....
}
/** * 6 當此服務器是master時:aof持久化失敗時,或上一次bgsave執行錯誤, * 且配置bgsave參數和stop_writes_on_bgsave_err;禁止執行寫命令 */
if (((server.stop_writes_on_bgsave_err &&
server.saveparamslen > 0 &&
server.lastbgsave_status == C_ERR) ||
server.aof_last_write_status == C_ERR) &&
server.masterhost == NULL &&
(c->cmd->flags & CMD_WRITE ||
c->cmd->proc == pingCommand)) { .... }
/** * 7 當此服務器時master時:若是配置了repl_min_slaves_to_write, * 當slave數目小於時,禁止執行寫命令 */
if (server.masterhost == NULL &&
server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_lag &&
c->cmd->flags & CMD_WRITE &&
server.repl_good_slaves_count < server.repl_min_slaves_to_write) { .... }
/** * 8 當時只讀slave時,除了master的不接受其餘寫命令 */
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
c->cmd->flags & CMD_WRITE) { .... }
/** * 9 當客戶端正在訂閱頻道時,只會執行如下命令 */
if (c->flags & CLIENT_PUBSUB &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) { .... }
/** * 10 服務器爲slave,但沒有正確鏈接master時,只會執行帶有CMD_STALE標誌的命令,如info等 */
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
server.repl_serve_stale_data == 0 &&
!(c->cmd->flags & CMD_STALE)) {...}
/** * 11 正在加載數據庫時,只會執行帶有CMD_LOADING標誌的命令,其他都會被拒絕 */
if (server.loading && !(c->cmd->flags & CMD_LOADING)) { .... }
/** * 12 當服務器由於執行lua腳本阻塞時,只會執行如下幾個命令,其他都會拒絕 */
if (server.lua_timedout &&
c->cmd->proc != authCommand &&
c->cmd->proc != replconfCommand &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
!(c->cmd->proc == scriptCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k')) {....}
/** * 13 開始執行命令 */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
/** * 開啓了事務,命令只會入隊列 */
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
/** * 直接執行命令 */
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return C_OK;
}
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
{"hmset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
.... // 全部的 redis 命令都有
}
複製代碼
call 方法是 Redis 中執行命令的通用方法,它會處理通用的執行命令的前置和後續操做。
命令傳播就是將命令寫入 repl-backlog-buffer 緩衝中,併發送給各個從服務器中。
// 執行client中持有的 redisCommand 命令
void call(client *c, int flags) {
/** * dirty記錄數據庫修改次數;start記錄命令開始執行時間us;duration記錄命令執行花費時間 */
long long dirty, start, duration;
int client_old_flags = c->flags;
/** * 有監視器的話,須要將不是從AOF獲取的命令會發送給監視器。固然,這裏會消耗時間 */
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
{
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
....
/* Call the command. */
dirty = server.dirty;
start = ustime();
// 處理命令,調用命令處理函數
c->cmd->proc(c);
duration = ustime()-start;
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;
.... // Lua 腳本的一些特殊處理
/** * CMD_CALL_SLOWLOG 表示要記錄慢查詢日誌 */
if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
char *latency_event = (c->cmd->flags & CMD_FAST) ?
"fast-command" : "command";
latencyAddSampleIfNeeded(latency_event,duration/1000);
slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
}
/** * CMD_CALL_STATS 表示要統計 */
if (flags & CMD_CALL_STATS) {
c->lastcmd->microseconds += duration;
c->lastcmd->calls++;
}
/** * CMD_CALL_PROPAGATE表示要進行廣播命令 */
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
int propagate_flags = PROPAGATE_NONE;
/** * dirty大於0時,須要廣播命令給slave和aof */
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
....
/** * 廣播命令,寫如aof,發送命令到slave * 也就是傳說中的傳播命令 */
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
....
}
複製代碼
因爲文章篇幅問題,本篇文章就先講到這裏,後半部分在接下來的文章中進行講解,歡迎你們繼續關注。