Redis會爲監聽端口的Server Socket的fd在事件循環中註冊讀就緒事件,並添加相應的handler進行處理。
/* Excerpt of initServer(): registers the accept handler for every listening
 * TCP socket. The "......" markers stand for code elided by the article. */
void initServer(void) {
    ......
    // Register the event and callback for the fds of the listening ports
    // (TCP sockets).
    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                /* Failing to register a listening socket is treated as fatal. */
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
    ......
}
同時也會爲客戶端連接的Client Socket的fd在事件循環中註冊相應的讀寫事件,並添加與之相對的handler進行處理。
比如接收到一個客戶端連接後,創建client並註冊讀就緒事件:
/* Excerpt of createClient(): shows only the part that configures the socket
 * and registers the read handler. The "......" markers stand for code elided
 * by the article. */
client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));
    ......
    if (fd != -1) {
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        // Register the read-ready event; readQueryFromClient receives 'c'
        // as its private data.
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            /* Registration failed: close the socket and release the client. */
            close(fd);
            zfree(c);
            return NULL;
        }
    }
    ......
}
下面會分開兩個方向寫:
會以TCP連接爲主。
// networking.c // 當Server Socket接收到客戶端鏈接,就會有AE_READABLE的事件,而後就會調用該Handler void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; UNUSED(el); UNUSED(mask); UNUSED(privdata); // 一個限制,每次事件循環,只接受最多MAX_ACCEPTS_PER_CALL(1000)個Client進行處理 // 防止短期內要處理過多的Client while(max--) { // anetTcpAccept()是一個Socket操做的封裝,裏面調用了accept()將Client Socket的fd返回, // 並返回遠端的IP和端口號 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr); return; } serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); // 獲取到的Client Socket的fd會進入acceptCommonHandler()進行處理 // 會進行一些判斷Redis是否已經超過了最大鏈接數等處理 // 若是沒錯誤的話,會將其封裝成client結構體,放入server.clients中 acceptCommonHandler(cfd,0,cip); } } static void acceptCommonHandler(int fd, int flags, char *ip) { client *c; // 將Client Socket的fd封裝建立個一個client結構 // 會在createClient()中將fd註冊事件循環的讀就緒事件 if ((c = createClient(fd)) == NULL) { serverLog(LL_WARNING, "Error registering fd event for the new client: %s (fd=%d)", strerror(errno),fd); close(fd); /* May be already closed, just ignore errors */ return; } // 若是超過最大鏈接數,將會發送錯誤給客戶端(忽略錯誤),並斷開鏈接 /* If maxclient directive is set and this is one client more... close the * connection. Note that we create the client instead to check before * for this condition, since now the socket is already set in non-blocking * mode and we can send an error for free using the Kernel I/O */ if (listLength(server.clients) > server.maxclients) { char *err = "-ERR max number of clients reached\r\n"; // 忽略發送數據的結果 /* That's a best effort error message, don't check write errors */ if (write(c->fd,err,strlen(err)) == -1) { /* Nothing to do, Just to avoid the warning... 
*/ } server.stat_rejected_conn++; // 斷開client c的鏈接 // 會關閉socket,並註銷全部事件循環的事件 freeClient(c); return; } /* If the server is running in protected mode (the default) and there * is no password set, nor a specific interface is bound, we don't accept * requests from non loopback interfaces. Instead we try to explain the * user what to do to fix it if needed. */ // 若是Redis是在 protected mode // 且 沒有綁定固定端口 // 且 沒有設置訪問密碼 // 且 不是來之Unix Socket的鏈接 // 且 ip 不爲空 if (server.protected_mode && server.bindaddr_count == 0 && server.requirepass == NULL && !(flags & CLIENT_UNIX_SOCKET) && ip != NULL) { // 那麼就會嘗試判斷ip是否爲本地鏈接,若是不是就斷開鏈接(由於不安全啊~) if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) { char *err = "-DENIED Redis is running in protected mode because protected " "mode is enabled, no bind address was specified, no " "authentication password is requested to clients. In this mode " "connections are only accepted from the loopback interface. " "If you want to connect from external computers to Redis you " "may adopt one of the following solutions: " "1) Just disable protected mode sending the command " "'CONFIG SET protected-mode no' from the loopback interface " "by connecting to Redis from the same host the server is " "running, however MAKE SURE Redis is not publicly accessible " "from internet if you do so. Use CONFIG REWRITE to make this " "change permanent. " "2) Alternatively you can just disable the protected mode by " "editing the Redis configuration file, and setting the protected " "mode option to 'no', and then restarting the server. " "3) If you started the server manually just for testing, restart " "it with the '--protected-mode no' option. " "4) Setup a bind address or an authentication password. " "NOTE: You only need to do one of the above things in order for " "the server to start accepting connections from the outside.\r\n"; if (write(c->fd,err,strlen(err)) == -1) { /* Nothing to do, Just to avoid the warning... 
*/ } server.stat_rejected_conn++; freeClient(c); return; } } // 鏈接數+1 server.stat_numconnections++; c->flags |= flags; }
client
的結構
/* With multiplexing we need to take per-client state.
 * Clients are taken in a linked list.
 *
 * One instance describes one connection (or a non connected client when it
 * is created with fd == -1, see createClient()). */
typedef struct client {
    uint64_t id;            /* Client incremental unique ID. */
    int fd;                 /* Client socket. */
    redisDb *db;            /* Pointer to currently SELECTed DB. */
    robj *name;             /* As set by CLIENT SETNAME. */
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    size_t qb_pos;          /* The position we have read in querybuf. */
    sds pending_querybuf;   /* If this client is flagged as master, this buffer
                               represents the yet not applied portion of the
                               replication stream that we are receiving from
                               the master. */
    size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. */
    int argc;               /* Num of arguments of current command. */
    robj **argv;            /* Arguments of current command. */
    struct redisCommand *cmd, *lastcmd;  /* Last command executed. */
    int reqtype;            /* Request protocol type: PROTO_REQ_* */
    int multibulklen;       /* Number of multi bulk arguments left to read. */
    long bulklen;           /* Length of bulk argument in multi bulk request. */
    list *reply;            /* List of reply objects to send to the client. */
    unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
    size_t sentlen;         /* Amount of bytes already sent in the current
                               buffer or object being sent. */
    time_t ctime;           /* Client creation time. */
    time_t lastinteraction; /* Time of the last interaction, used for timeout */
    time_t obuf_soft_limit_reached_time;
    int flags;              /* Client flags: CLIENT_* macros. */
    int authenticated;      /* When requirepass is non-NULL. */
    int replstate;          /* Replication state if this is a slave. */
    int repl_put_online_on_ack; /* Install slave write handler on ACK. */
    int repldbfd;           /* Replication DB file descriptor. */
    off_t repldboff;        /* Replication DB file offset. */
    off_t repldbsize;       /* Replication DB file size. */
    sds replpreamble;       /* Replication DB preamble. */
    long long read_reploff; /* Read replication offset if this is a master. */
    long long reploff;      /* Applied replication offset if this is a master. */
    long long repl_ack_off; /* Replication ack offset, if this is a slave. */
    long long repl_ack_time;/* Replication ack time, if this is a slave. */
    long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
                                       copying this slave output buffer
                                       should use. */
    char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
    int slave_listening_port; /* As configured with: SLAVECONF listening-port */
    char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
    int slave_capa;         /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
    multiState mstate;      /* MULTI/EXEC state */
    int btype;              /* Type of blocking op if CLIENT_BLOCKED. */
    blockingState bpop;     /* blocking state */
    long long woff;         /* Last write global replication offset. */
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
    dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
    list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
    sds peerid;             /* Cached peer ID. */
    listNode *client_list_node; /* list node in client list */

    /* Response buffer */
    int bufpos;
    char buf[PROTO_REPLY_CHUNK_BYTES];
} client;
PS: 有註釋就懶得寫了,而且部分屬性我還沒細看
client *createClient(int fd)
createClient()
的主要功能是傳入Client Socket的fd,用來初始化創建一個client,client中記錄着該連接的一些操作數據,比如WATCH KEY
的列表等。
// networking.c
/* Allocate and initialize a client structure for the socket 'fd'.
 *
 * fd: the client socket, or -1 to create a non connected client.
 *
 * Returns the new client, or NULL when the read event for fd could not be
 * registered (in that case fd has been closed and the memory released). */
client *createClient(int fd) {
    // Allocate the structure.
    client *c = zmalloc(sizeof(client));

    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (fd != -1) {
        // Put the client socket in non-blocking mode.
        anetNonBlock(NULL,fd);
        // Enable TCP_NODELAY (per the original note: turns off Nagle's
        // algorithm so replies reach the client sooner).
        anetEnableTcpNoDelay(NULL,fd);
        // Turn on keepalive when it is configured.
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        // Register the read-ready event with readQueryFromClient as handler;
        // the client itself is passed as the handler's private data.
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    // Initialize every field of the client; DB 0 is selected by default.
    selectDb(c,0);
    uint64_t client_id;
    atomicGetIncr(server.next_client_id,client_id,1);
    c->id = client_id;
    c->fd = fd;
    c->name = NULL;
    c->bufpos = 0;
    c->qb_pos = 0;
    c->querybuf = sdsempty();
    c->pending_querybuf = sdsempty();
    c->querybuf_peak = 0;
    c->reqtype = 0; /* 0 = request type not determined yet. */
    c->argc = 0;
    c->argv = NULL;
    c->cmd = c->lastcmd = NULL;
    c->multibulklen = 0;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->ctime = c->lastinteraction = server.unixtime;
    c->authenticated = 0;
    c->replstate = REPL_STATE_NONE;
    c->repl_put_online_on_ack = 0;
    c->reploff = 0;
    c->read_reploff = 0;
    c->repl_ack_off = 0;
    c->repl_ack_time = 0;
    c->slave_listening_port = 0;
    c->slave_ip[0] = '\0';
    c->slave_capa = SLAVE_CAPA_NONE;
    c->reply = listCreate();
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    listSetFreeMethod(c->reply,freeClientReplyValue);
    listSetDupMethod(c->reply,dupClientReplyValue);
    c->btype = BLOCKED_NONE;
    c->bpop.timeout = 0;
    c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
    c->bpop.target = NULL;
    c->bpop.xread_group = NULL;
    c->bpop.xread_consumer = NULL;
    c->bpop.xread_group_noack = 0;
    c->bpop.numreplicas = 0;
    c->bpop.reploffset = 0;
    c->woff = 0;
    c->watched_keys = listCreate();
    c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
    c->pubsub_patterns = listCreate();
    c->peerid = NULL;
    c->client_list_node = NULL;
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    /* Only connected clients are linked (see linkClient()). */
    if (fd != -1) linkClient(c);
    initClientMultiState(c);
    return c;
}
void freeClient(client *c)
freeClient()
是釋放client,斷開連接,釋放緩存等操作。
// networking.c
/* Release a client: free its buffers and per-client structures, unlink it
 * (which, per the comment below, closes the socket and removes the I/O
 * handlers), perform replication related cleanup and free the structure.
 *
 * Two cases return early without freeing: a CLIENT_PROTECTED client is only
 * scheduled for asynchronous freeing, and a master client in a sane state is
 * handed to replicationCacheMaster(). */
void freeClient(client *c) {
    listNode *ln;

    /* If a client is protected, yet we need to free it right now, make sure
     * to at least use asynchronous freeing. */
    if (c->flags & CLIENT_PROTECTED) {
        // Per the original note: the client is queued in
        // server.clients_to_close and released later from serverCron via
        // freeClientsInAsyncFreeQueue().
        freeClientAsync(c);
        return;
    }

    /* If it is our master that's beging disconnected we should make sure
     * to cache the state to try a partial resynchronization later.
     *
     * Note that before doing this we make sure that the client is not in
     * some unexpected state, by checking its flags. */
    // Handling of a lost connection with our master.
    if (server.master && c->flags & CLIENT_MASTER) {
        serverLog(LL_WARNING,"Connection with master lost.");
        if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY|
                          CLIENT_CLOSE_ASAP|
                          CLIENT_BLOCKED)))
        {
            replicationCacheMaster(c);
            return;
        }
    }

    // Handling of a lost connection with a slave.
    /* Log link disconnection with slave */
    if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
        serverLog(LL_WARNING,"Connection with replica %s lost.",
            replicationGetSlaveName(c));
    }

    /* From here on the various resources owned by the client are released. */

    /* Free the query buffer */
    sdsfree(c->querybuf);
    sdsfree(c->pending_querybuf);
    c->querybuf = NULL;

    /* Deallocate structures used to block on blocking ops. */
    if (c->flags & CLIENT_BLOCKED) unblockClient(c);
    dictRelease(c->bpop.keys);

    /* UNWATCH all the keys */
    unwatchAllKeys(c);
    listRelease(c->watched_keys);

    /* Unsubscribe from all the pubsub channels */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeAllPatterns(c,0);
    dictRelease(c->pubsub_channels);
    listRelease(c->pubsub_patterns);

    /* Free data structures. */
    listRelease(c->reply);
    freeClientArgv(c);

    /* Unlink the client: this will close the socket, remove the I/O
     * handlers, and remove references of the client from different
     * places where active clients may be referenced. */
    // This is where the socket is actually disconnected.
    unlinkClient(c);

    /* Master/slave cleanup Case 1:
     * we lost the connection with a slave. */
    if (c->flags & CLIENT_SLAVE) {
        if (c->replstate == SLAVE_STATE_SEND_BULK) {
            if (c->repldbfd != -1) close(c->repldbfd);
            if (c->replpreamble) sdsfree(c->replpreamble);
        }
        /* Monitors and slaves are kept in different lists. */
        list *l = (c->flags & CLIENT_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        serverAssert(ln != NULL);
        listDelNode(l,ln);
        /* We need to remember the time when we started to have zero
         * attached slaves, as after some time we'll free the replication
         * backlog. */
        if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0)
            server.repl_no_slaves_since = server.unixtime;
        refreshGoodSlavesCount();
    }

    /* Master/slave cleanup Case 2:
     * we lost the connection with the master. */
    if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();

    /* If this client was scheduled for async freeing we need to remove it
     * from the queue. */
    if (c->flags & CLIENT_CLOSE_ASAP) {
        ln = listSearchKey(server.clients_to_close,c);
        serverAssert(ln != NULL);
        listDelNode(server.clients_to_close,ln);
    }

    /* Release other dynamically allocated client structure fields,
     * and finally release the client structure itself. */
    if (c->name) decrRefCount(c->name);
    zfree(c->argv);
    freeClientMultiState(c);
    sdsfree(c->peerid);
    zfree(c);
}
這裏主要是看Client連接上之後的主要處理流程。
主要是從createClient()
中,爲客戶端註冊讀就緒事件的readQueryFromClient()
這個Handler開始。
這裏涉及到了Redis的通訊協議,配合https://redis.io/topics/protocol食用纔是正道。
從上面連接可以得知,請求可以分爲2種類型:
- INLINE:單行的內聯命令;
- MULTIBULK:以 `*` 開頭、帶有多個參數的批量請求。

整個調用流程大概爲:
1. `readQueryFromClient()`:客戶端Client讀就緒後,從Socket讀取數據,並存入client的`querybuf`中,然後調用`processInputBufferAndReplicate()`。
2. `processInputBufferAndReplicate()`:會根據client的類型(Master的Client 和 其餘Client)調用`processInputBuffer()`對收到的數據進行處理;兩種Client都調用`processInputBuffer()`,但是Master的Client需要額外處理。
3. `processInputBuffer()`:會對數據進行一定處理,取出client的`querybuf`中未處理的數據,並判斷請求的類型(INLINE/MULTIBULK),將參數的數量和值保存進`client->argc`和`client->argv`中,然後調用`processCommand()`(在`server.c`中)執行。
4. `processCommand()`:是真正檢查和執行命令的函數。

PS: `processCommand()`後面單獨寫,主要是懶
readQueryFromClient()
// 客戶端Client讀就緒後,調用的Handler void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { client *c = (client*) privdata; int nread, readlen; size_t qblen; UNUSED(el); UNUSED(mask); // 默認的讀取緩存大小 readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query * buffer contains exactly the SDS string representing the object, even * at the risk of requiring more read(2) calls. This way the function * processMultiBulkBuffer() can avoid copying buffers to create the * Redis Object representing the argument. */ // 若是是一個批量請求 if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) { ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf); /* Note that the 'remaining' variable may be zero in some edge case, * for example once we resume a blocked client after CLIENT PAUSE. */ if (remaining > 0 && remaining < readlen) readlen = remaining; } qblen = sdslen(c->querybuf); // 查詢query的長度最大值 if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; // 擴大c->querybuf的SDS字符串到相應的長度 c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); // socket中讀取數據 nread = read(fd, c->querybuf+qblen, readlen); if (nread == -1) { if (errno == EAGAIN) { return; } else { serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno)); freeClient(c); return; } } else if (nread == 0) { serverLog(LL_VERBOSE, "Client closed connection"); freeClient(c); return; } else if (c->flags & CLIENT_MASTER) { /* Append the query buffer to the pending (not applied) buffer * of the master. We'll use this buffer later in order to have a * copy of the string applied by the last command executed. 
*/ // 複製拼接字符串,將`c->querybuf+qblen`後的數據,複製拼接到`c->pending_querybuf`後 // master的鏈接是處理`c->pending_querybuf`的 c->pending_querybuf = sdscatlen(c->pending_querybuf, c->querybuf+qblen,nread); } // 更新sds的長度 sdsIncrLen(c->querybuf,nread); c->lastinteraction = server.unixtime; if (c->flags & CLIENT_MASTER) c->read_reploff += nread; server.stat_net_input_bytes += nread; // 超過最大長度限制 if (sdslen(c->querybuf) > server.client_max_querybuf_len) { sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); bytes = sdscatrepr(bytes,c->querybuf,64); serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); freeClient(c); return; } /* Time to process the buffer. If the client is a master we need to * compute the difference between the applied offset before and after * processing the buffer, to understand how much of the replication stream * was actually applied to the master state: this quantity, and its * corresponding part of the replication stream, will be propagated to * the sub-slaves and to the replication backlog. */ // 處理接收到的數據 processInputBufferAndReplicate(c); }
可以看到readQueryFromClient()
的主要工作其實很簡單,主要是從socket中讀取數據,並將其存入client->querybuf
中
processInputBufferAndReplicate()
/* This is a wrapper for processInputBuffer that also cares about handling * the replication forwarding to the sub-slaves, in case the client 'c' * is flagged as master. Usually you want to call this instead of the * raw processInputBuffer(). */ // 對processInputBuffer()的封裝,主要是master鏈接的處理差別 void processInputBufferAndReplicate(client *c) { if (!(c->flags & CLIENT_MASTER)) { // 若是這個不是master的client鏈接 processInputBuffer(c); } else { // master的client鏈接 size_t prev_offset = c->reploff; processInputBuffer(c); size_t applied = c->reploff - prev_offset; // applied是計算c->pending_querybuf處理了哪些數據,而後在使用sdsrange()清除已經被處理的 if (applied) { // 用戶將數據代理到該Redis的slaves中 replicationFeedSlavesFromMasterStream(server.slaves, c->pending_querybuf, applied); sdsrange(c->pending_querybuf,applied,-1); } } }
processInputBufferAndReplicate()
主要是對processInputBuffer()
進行一個封裝,其實直接寫在readQueryFromClient()
也可以,但現在這麼做,對於以後添加c->flags
不同的處理時,更加直觀。
processInputBuffer()
/* This function is called every time, in the client structure 'c', there is
 * more query buffer to process, because we read more data from the socket
 * or because a client was blocked and later reactivated, so there could be
 * pending query buffer, already representing a full command, to process.
 *
 * It parses the unprocessed part of c->querybuf into commands (argc/argv)
 * and runs each complete command through processCommand(). */
void processInputBuffer(client *c) {
    server.current_client = c;

    /* Keep processing while there is something in the input buffer */
    // c->qb_pos is the cursor of what was already consumed from c->querybuf,
    // so once it reaches the length of c->querybuf there is nothing left.
    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        // Applies to every client that is not a slave.
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        /* Immediately abort if the client is in the middle of something. */
        // TODO (original author): find out when a client is CLIENT_BLOCKED.
        if (c->flags & CLIENT_BLOCKED) break;

        /* Don't process input from the master while there is a busy script
         * condition on the slave. We want just to accumulate the replication
         * stream (instead of replying -BUSY like we do with other clients) and
         * later resume the processing. */
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands).
         *
         * The same applies for clients we want to terminate ASAP. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        /* Determine request type when unknown. */
        if (!c->reqtype) {
            // c->reqtype is 0 on a freshly created client (see
            // createClient()). The first byte of the request decides the
            // type: '*' means MULTIBULK, anything else INLINE.
            // Protocol details: https://redis.io/topics/protocol
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        if (c->reqtype == PROTO_REQ_INLINE) {
            // Parse an INLINE request; per the original note this fills
            // client->argc and client->argv. Anything but C_OK stops the loop.
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            // Parse a MULTIBULK request: same idea as processInlineBuffer(),
            // only the wire format differs.
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            // processCommand() (server.c) executes the parsed command.
            if (processCommand(c) == C_OK) {
                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                    /* Update the applied replication offset of our master. */
                    c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
                }

                /* Don't reset the client structure for clients blocked in a
                 * module blocking command, so that the reply callback will
                 * still be able to access the client argv and argc field.
                 * The client will be reset in unblockClientFromModule(). */
                if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
                    resetClient(c);
            }
            /* freeMemoryIfNeeded may flush slave output buffers. This may
             * result into a slave, that may be the active client, to be
             * freed. */
            if (server.current_client == NULL) break;
        }
    }

    /* Trim to pos */
    if (c->qb_pos) {
        // Drop the already processed prefix of c->querybuf and rewind the
        // cursor.
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }

    server.current_client = NULL;
}
processInputBuffer()
最主要的工作就是對連接傳入的數據,進行一定的格式化(並沒有檢查內容),方便processCommand()
調用,同時通過首字符,判斷請求的類型(INLINE、MULTIBULK)。