Redis Source Code Reading Notes: Receiving and Executing Commands (Part 1)

Redis registers a read-ready (AE_READABLE) event in the event loop for the fd of the server socket listening on its port, and attaches a handler to process it.

void initServer(void) {
    ......
    // Register the epoll event and callback for each listening port's fd (TCP sockets)
    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
    ......
}

Redis likewise registers the read/write events for the fd of each connected client socket in the event loop, together with the corresponding handlers.

For example, when a client connection is accepted, a client is created and a read-ready event is registered:

client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    ......

    if (fd != -1) {
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        // Register the read-ready event
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    ......
}

The rest of this post covers two threads:

  • How the server socket handles a newly accepted client
  • How a client's commands are received, executed, and answered

The focus is on TCP connections.

Accepting client connections

// networking.c

// When the server socket receives a client connection, an AE_READABLE event fires and this handler is invoked
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    // Limit: accept at most MAX_ACCEPTS_PER_CALL (1000) clients per call,
    // so that a burst of connections cannot monopolize the event loop
    while(max--) {
        // anetTcpAccept() is a socket wrapper that calls accept(), returning the client socket fd
        // along with the peer's IP address and port
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        // The accepted client socket fd is handed to acceptCommonHandler(),
        // which checks, among other things, whether Redis has exceeded its maximum number of connections;
        // if everything is fine, the fd is wrapped in a client struct and added to server.clients
        acceptCommonHandler(cfd,0,cip);
    }
}
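
The comment above only describes anetTcpAccept() at a high level. As a rough sketch of what such an accept wrapper boils down to (this is my own illustration, not the actual anet.c implementation, which also handles retries, error-message formatting, and other details; the function name tcp_accept_sketch is made up), it is essentially accept() plus extracting the peer address:

/* Illustrative sketch only -- NOT the real anetTcpAccept() from anet.c */
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

static int tcp_accept_sketch(int server_fd, char *ip, size_t ip_len, int *port) {
    struct sockaddr_storage sa;
    socklen_t salen = sizeof(sa);

    // Take one pending connection off the listening socket
    int fd = accept(server_fd, (struct sockaddr *)&sa, &salen);
    if (fd == -1) return -1;

    // Extract the peer IP and port, which acceptTcpHandler() uses for logging
    if (sa.ss_family == AF_INET) {
        struct sockaddr_in *s = (struct sockaddr_in *)&sa;
        inet_ntop(AF_INET, &s->sin_addr, ip, ip_len);
        *port = ntohs(s->sin_port);
    } else { /* AF_INET6 */
        struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;
        inet_ntop(AF_INET6, &s->sin6_addr, ip, ip_len);
        *port = ntohs(s->sin6_port);
    }
    return fd; /* the caller registers this fd in the event loop */
}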

static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;
    // Wrap the client socket fd in a newly created client struct;
    // createClient() also registers the fd's read-ready event in the event loop
    if ((c = createClient(fd)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }

    // If the maximum number of clients is exceeded, send an error to the client (ignoring write errors) and close the connection
    /* If maxclient directive is set and this is one client more... close the
     * connection. Note that we create the client instead to check before
     * for this condition, since now the socket is already set in non-blocking
     * mode and we can send an error for free using the Kernel I/O */
    if (listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";

        // Ignore the result of the write
        /* That's a best effort error message, don't check write errors */
        if (write(c->fd,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        server.stat_rejected_conn++;
        // Disconnect client c:
        // this closes the socket and unregisters all of its event loop events
        freeClient(c);
        return;
    }

    /* If the server is running in protected mode (the default) and there
     * is no password set, nor a specific interface is bound, we don't accept
     * requests from non loopback interfaces. Instead we try to explain the
     * user what to do to fix it if needed. */
    // If Redis is running in protected mode,
    // and no bind address has been configured,
    // and no password has been set,
    // and the connection is not from a Unix socket,
    // and ip is not NULL

    if (server.protected_mode &&
        server.bindaddr_count == 0 &&
        server.requirepass == NULL &&
        !(flags & CLIENT_UNIX_SOCKET) &&
        ip != NULL)
    {
        // then check whether the connection comes from the loopback interface; if not, reject it (it would be unsafe)
        if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {
            char *err =
                "-DENIED Redis is running in protected mode because protected "
                "mode is enabled, no bind address was specified, no "
                "authentication password is requested to clients. In this mode "
                "connections are only accepted from the loopback interface. "
                "If you want to connect from external computers to Redis you "
                "may adopt one of the following solutions: "
                "1) Just disable protected mode sending the command "
                "'CONFIG SET protected-mode no' from the loopback interface "
                "by connecting to Redis from the same host the server is "
                "running, however MAKE SURE Redis is not publicly accessible "
                "from internet if you do so. Use CONFIG REWRITE to make this "
                "change permanent. "
                "2) Alternatively you can just disable the protected mode by "
                "editing the Redis configuration file, and setting the protected "
                "mode option to 'no', and then restarting the server. "
                "3) If you started the server manually just for testing, restart "
                "it with the '--protected-mode no' option. "
                "4) Setup a bind address or an authentication password. "
                "NOTE: You only need to do one of the above things in order for "
                "the server to start accepting connections from the outside.\r\n";
            if (write(c->fd,err,strlen(err)) == -1) {
                /* Nothing to do, Just to avoid the warning... */
            }
            server.stat_rejected_conn++;
            freeClient(c);
            return;
        }
    }

    // Increment the connection counter
    server.stat_numconnections++;
    c->flags |= flags;
}

Flow

To summarize the accept path: the AE_READABLE event on the listening socket triggers acceptTcpHandler(), which calls anetTcpAccept() (and thus accept()) to obtain the client fd; acceptCommonHandler() then creates the client via createClient(), which registers readQueryFromClient() as the read handler for that fd.

The client struct

/* With multiplexing we need to take per-client state.
 * Clients are taken in a linked list. */
typedef struct client {
    uint64_t id;            /* Client incremental unique ID. */
    int fd;                 /* Client socket. */
    redisDb *db;            /* Pointer to currently SELECTed DB. */
    robj *name;             /* As set by CLIENT SETNAME. */
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    size_t qb_pos;          /* The position we have read in querybuf. */
    sds pending_querybuf;   /* If this client is flagged as master, this buffer
                               represents the yet not applied portion of the
                               replication stream that we are receiving from
                               the master. */
    size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. */
    int argc;               /* Num of arguments of current command. */
    robj **argv;            /* Arguments of current command. */
    struct redisCommand *cmd, *lastcmd;  /* Last command executed. */
    int reqtype;            /* Request protocol type: PROTO_REQ_* */
    int multibulklen;       /* Number of multi bulk arguments left to read. */
    long bulklen;           /* Length of bulk argument in multi bulk request. */
    list *reply;            /* List of reply objects to send to the client. */
    unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
    size_t sentlen;         /* Amount of bytes already sent in the current
                               buffer or object being sent. */
    time_t ctime;           /* Client creation time. */
    time_t lastinteraction; /* Time of the last interaction, used for timeout */
    time_t obuf_soft_limit_reached_time;
    int flags;              /* Client flags: CLIENT_* macros. */
    int authenticated;      /* When requirepass is non-NULL. */
    int replstate;          /* Replication state if this is a slave. */
    int repl_put_online_on_ack; /* Install slave write handler on ACK. */
    int repldbfd;           /* Replication DB file descriptor. */
    off_t repldboff;        /* Replication DB file offset. */
    off_t repldbsize;       /* Replication DB file size. */
    sds replpreamble;       /* Replication DB preamble. */
    long long read_reploff; /* Read replication offset if this is a master. */
    long long reploff;      /* Applied replication offset if this is a master. */
    long long repl_ack_off; /* Replication ack offset, if this is a slave. */
    long long repl_ack_time;/* Replication ack time, if this is a slave. */
    long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
                                       copying this slave output buffer
                                       should use. */
    char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
    int slave_listening_port; /* As configured with: SLAVECONF listening-port */
    char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
    int slave_capa;         /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
    multiState mstate;      /* MULTI/EXEC state */
    int btype;              /* Type of blocking op if CLIENT_BLOCKED. */
    blockingState bpop;     /* blocking state */
    long long woff;         /* Last write global replication offset. */
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
    dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
    list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
    sds peerid;             /* Cached peer ID. */
    listNode *client_list_node; /* list node in client list */

    /* Response buffer */
    int bufpos;
    char buf[PROTO_REPLY_CHUNK_BYTES];
} client;

PS: the fields are already commented, so I won't go through them one by one; I also haven't looked closely at some of them yet.

Creating a client: client *createClient(int fd)

createClient() takes the client socket fd and creates and initializes a client, which records per-connection state such as the list of WATCHed keys.

// networking.c

// Create and initialize a client from the given client socket fd
client *createClient(int fd) {
    // Allocate memory
    client *c = zmalloc(sizeof(client));

    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (fd != -1) {
        // Put the client socket into non-blocking mode (required by epoll and friends)
        anetNonBlock(NULL,fd);
        // Disable Nagle's algorithm so replies reach the client sooner
        anetEnableTcpNoDelay(NULL,fd);
        // Enable TCP keepalive
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        // Register the read-ready event for the client and attach its handler
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    // Initialize the client's fields
    selectDb(c,0);
    uint64_t client_id;
    atomicGetIncr(server.next_client_id,client_id,1);
    c->id = client_id;
    c->fd = fd;
    c->name = NULL;
    c->bufpos = 0;
    c->qb_pos = 0;
    c->querybuf = sdsempty();
    c->pending_querybuf = sdsempty();
    c->querybuf_peak = 0;
    c->reqtype = 0;
    c->argc = 0;
    c->argv = NULL;
    c->cmd = c->lastcmd = NULL;
    c->multibulklen = 0;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->ctime = c->lastinteraction = server.unixtime;
    c->authenticated = 0;
    c->replstate = REPL_STATE_NONE;
    c->repl_put_online_on_ack = 0;
    c->reploff = 0;
    c->read_reploff = 0;
    c->repl_ack_off = 0;
    c->repl_ack_time = 0;
    c->slave_listening_port = 0;
    c->slave_ip[0] = '\0';
    c->slave_capa = SLAVE_CAPA_NONE;
    c->reply = listCreate();
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    listSetFreeMethod(c->reply,freeClientReplyValue);
    listSetDupMethod(c->reply,dupClientReplyValue);
    c->btype = BLOCKED_NONE;
    c->bpop.timeout = 0;
    c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
    c->bpop.target = NULL;
    c->bpop.xread_group = NULL;
    c->bpop.xread_consumer = NULL;
    c->bpop.xread_group_noack = 0;
    c->bpop.numreplicas = 0;
    c->bpop.reploffset = 0;
    c->woff = 0;
    c->watched_keys = listCreate();
    c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
    c->pubsub_patterns = listCreate();
    c->peerid = NULL;
    c->client_list_node = NULL;
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    if (fd != -1) linkClient(c);
    initClientMultiState(c);
    return c;
}

Freeing a client: void freeClient(client *c)

freeClient() releases a client: it disconnects the connection, frees its buffers, and so on.

// networking.c


void freeClient(client *c) {
    listNode *ln;

    /* If a client is protected, yet we need to free it right now, make sure
     * to at least use asynchronous freeing. */
    if (c->flags & CLIENT_PROTECTED) {

        // Add the client to server.clients_to_close;
        // the serverCron time event later calls freeClientsInAsyncFreeQueue() to release the connections in that list
        freeClientAsync(c);
        return;
    }

    /* If it is our master that's beging disconnected we should make sure
     * to cache the state to try a partial resynchronization later.
     *
     * Note that before doing this we make sure that the client is not in
     * some unexpected state, by checking its flags. */
    // Handle disconnection from the master
    if (server.master && c->flags & CLIENT_MASTER) {
        serverLog(LL_WARNING,"Connection with master lost.");
        if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY|
                          CLIENT_CLOSE_ASAP|
                          CLIENT_BLOCKED)))
        {
            replicationCacheMaster(c);
            return;
        }
    }

    // Handle disconnection from a slave
    /* Log link disconnection with slave */
    if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
        serverLog(LL_WARNING,"Connection with replica %s lost.",
            replicationGetSlaveName(c));
    }

    /* Below: free the various buffers and data structures */

    /* Free the query buffer */
    sdsfree(c->querybuf);
    sdsfree(c->pending_querybuf);
    c->querybuf = NULL;

    /* Deallocate structures used to block on blocking ops. */
    if (c->flags & CLIENT_BLOCKED) unblockClient(c);
    dictRelease(c->bpop.keys);

    /* UNWATCH all the keys */
    unwatchAllKeys(c);
    listRelease(c->watched_keys);

    /* Unsubscribe from all the pubsub channels */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeAllPatterns(c,0);
    dictRelease(c->pubsub_channels);
    listRelease(c->pubsub_patterns);

    /* Free data structures. */
    listRelease(c->reply);
    freeClientArgv(c);

    /* Unlink the client: this will close the socket, remove the I/O
     * handlers, and remove references of the client from different
     * places where active clients may be referenced. */
    // This is where the socket is actually closed
    unlinkClient(c);

    /* Master/slave cleanup Case 1:
     * we lost the connection with a slave. */
    if (c->flags & CLIENT_SLAVE) {
        if (c->replstate == SLAVE_STATE_SEND_BULK) {
            if (c->repldbfd != -1) close(c->repldbfd);
            if (c->replpreamble) sdsfree(c->replpreamble);
        }
        list *l = (c->flags & CLIENT_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        serverAssert(ln != NULL);
        listDelNode(l,ln);
        /* We need to remember the time when we started to have zero
         * attached slaves, as after some time we'll free the replication
         * backlog. */
        if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0)
            server.repl_no_slaves_since = server.unixtime;
        refreshGoodSlavesCount();
    }

    /* Master/slave cleanup Case 2:
     * we lost the connection with the master. */
    if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();

    /* If this client was scheduled for async freeing we need to remove it
     * from the queue. */
    if (c->flags & CLIENT_CLOSE_ASAP) {
        ln = listSearchKey(server.clients_to_close,c);
        serverAssert(ln != NULL);
        listDelNode(server.clients_to_close,ln);
    }

    /* Release other dynamically allocated client structure fields,
     * and finally release the client structure itself. */
    if (c->name) decrRefCount(c->name);
    zfree(c->argv);
    freeClientMultiState(c);
    sdsfree(c->peerid);
    zfree(c);
}

Receiving commands from the client

This section looks at the main processing flow once a client is connected.

It starts from readQueryFromClient(), the handler that createClient() registered for the client's read-ready event.

This involves the Redis wire protocol, so it is best read alongside https://redis.io/topics/protocol.

As the link above explains, requests come in two types (examples follow the list):

  • INLINE: a plain, space-separated command sent as a single line
  • MULTIBULK: a request encoded as a RESP array, starting with *
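
As a concrete example (my own illustration, not from the Redis source; the variable names are made up), the command SET mykey myvalue can be sent in either form. Written as C string literals, the raw bytes are:

/* INLINE form: one space-separated line terminated by \r\n */
const char *inline_req = "SET mykey myvalue\r\n";

/* MULTIBULK form: a RESP array of 3 bulk strings; this is the form
 * clients such as redis-cli normally send */
const char *multibulk_req =
    "*3\r\n"             /* array with 3 elements           */
    "$3\r\nSET\r\n"      /* bulk string of 3 bytes: SET     */
    "$5\r\nmykey\r\n"    /* bulk string of 5 bytes: mykey   */
    "$7\r\nmyvalue\r\n"; /* bulk string of 7 bytes: myvalue */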

The overall call flow is roughly:

  • readQueryFromClient(): when the client socket becomes readable, read data from the socket into the client's buffer, then call processInputBufferAndReplicate().
  • processInputBufferAndReplicate(): depending on the client type (a master client vs. any other client), process the received data with processInputBuffer(); both kinds go through processInputBuffer(), but a master client needs extra handling.
  • processInputBuffer(): take the unprocessed data out of the client's buffer, determine the request type (INLINE/MULTIBULK), store the argument count and values into client->argc and client->argv, then call processCommand() (in server.c) to execute.
  • processCommand(): the function that actually validates and executes the command.

PS: processCommand() will get its own post later (mostly out of laziness).

readQueryFromClient()

// Handler invoked when the client socket becomes readable
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    // Default read buffer size
    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    // If this is a multi bulk request and we are in the middle of reading a large bulk argument
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    // Track the recent peak size of the query buffer
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    // Grow the c->querybuf SDS string so it has room for readlen more bytes
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    // Read data from the socket
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        // Append the newly read data (starting at c->querybuf+qblen) to c->pending_querybuf;
        // for a connection to our master, replication processing works on c->pending_querybuf
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }
    // Update the SDS length
    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread;
    // The query buffer exceeded the maximum allowed length
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    /* Time to process the buffer. If the client is a master we need to
     * compute the difference between the applied offset before and after
     * processing the buffer, to understand how much of the replication stream
     * was actually applied to the master state: this quantity, and its
     * corresponding part of the replication stream, will be propagated to
     * the sub-slaves and to the replication backlog. */
    // Process the received data
    processInputBufferAndReplicate(c);
}

As you can see, readQueryFromClient()'s job is quite simple: it mainly reads data from the socket and stores it in client->querybuf.

processInputBufferAndReplicate()

/* This is a wrapper for processInputBuffer that also cares about handling
 * the replication forwarding to the sub-slaves, in case the client 'c'
 * is flagged as master. Usually you want to call this instead of the
 * raw processInputBuffer(). */
// A wrapper around processInputBuffer(); the difference is the handling of connections to our master
void processInputBufferAndReplicate(client *c) {
    if (!(c->flags & CLIENT_MASTER)) {
        // Not a connection to our master
        processInputBuffer(c);
    } else {
        // A connection to our master
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        size_t applied = c->reploff - prev_offset;
        // applied is how many bytes of c->pending_querybuf were consumed; sdsrange() then discards the processed part
        if (applied) {
            // Forward the applied part of the stream to this instance's slaves
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf,applied,-1);
        }
    }
}

processInputBufferAndReplicate() is mainly a wrapper around processInputBuffer(); the logic could just as well live in readQueryFromClient(), but keeping it separate makes things clearer when different handling for other c->flags is added later.

processInputBuffer()

/* This function is called every time, in the client structure 'c', there is
 * more query buffer to process, because we read more data from the socket
 * or because a client was blocked and later reactivated, so there could be
 * pending query buffer, already representing a full command, to process. */
// Consume the query buffer and check that the request format is correct
void processInputBuffer(client *c) {
    server.current_client = c;

    /* Keep processing while there is something in the input buffer */
    // `c->qb_pos` is the cursor into the already-processed part of `c->querybuf`,
    // so once `c->qb_pos` >= `sdslen(c->querybuf)` there is nothing left to process
    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        // If this is not a slave connection and all of the server's clients are currently paused,
        // exit the loop
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        // The client is in the CLIENT_BLOCKED state, stop here
        // TODO: figure out under what circumstances a client ends up in this state
        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED) break;

        /* Don't process input from the master while there is a busy script
         * condition on the slave. We want just to accumulate the replication
         * stream (instead of replying -BUSY like we do with other clients) and
         * later resume the processing. */
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands).
         *
         * The same applies for clients we want to terminate ASAP. */
        // CLIENT_CLOSE_AFTER_REPLY: close the connection once the reply has been written
        // CLIENT_CLOSE_ASAP: this connection must be closed as soon as possible
        // so for a client with either flag set, no further input should be processed
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        /* Determine request type when unknown. */
        // Determine the request type
        if (!c->reqtype) {
            // When a client is first created, `c->reqtype` defaults to 0 (see `createClient()`)
            // The first byte of the data tells us whether the request is MULTIBULK or INLINE
            // See https://redis.io/topics/protocol for the protocol details
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK; 
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        if (c->reqtype == PROTO_REQ_INLINE) {
            // Process an INLINE request buffer:
            // parse the command data and store it into `client->argc` and `client->argv`
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            // Process a MULTIBULK request,
            // similar to processInlineBuffer() but for a different protocol format
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            // Call processCommand() in server.c to execute the command
            if (processCommand(c) == C_OK) {
                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                    /* Update the applied replication offset of our master. */
                    c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
                }

                /* Don't reset the client structure for clients blocked in a
                 * module blocking command, so that the reply callback will
                 * still be able to access the client argv and argc field.
                 * The client will be reset in unblockClientFromModule(). */
                if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
                    resetClient(c);
            }
            /* freeMemoryIfNeeded may flush slave output buffers. This may
             * result into a slave, that may be the active client, to be
             * freed. */
            if (server.current_client == NULL) break;
        }
    }

    /* Trim to pos */
    if (c->qb_pos) {
        // Remove the already-processed data from `c->querybuf`
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }

    server.current_client = NULL;
}

processInputBuffer()'s main job is to put the data arriving on the connection into a usable shape (without inspecting its meaning) so that processCommand() can be invoked, determining the request type (INLINE or MULTIBULK) from the first character along the way.
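
To make the argc/argv step concrete, here is a minimal, self-contained sketch of how a complete MULTIBULK request maps to an argument array. It is my own simplified illustration (the function name parse_multibulk is made up): it assumes the whole request is already in the buffer and well formed, while the real processMultibulkBuffer() works incrementally on c->querybuf and handles partial reads, protocol errors, and very large bulk arguments.

/* Simplified illustration only -- not the real processMultibulkBuffer() */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Parse "*<n>\r\n($<len>\r\n<data>\r\n){n}" into an argv array.
 * Returns the argument count, or -1 if the request is malformed. */
static int parse_multibulk(const char *buf, char ***argvp) {
    char *p;
    if (buf[0] != '*') return -1;
    long argc = strtol(buf + 1, &p, 10);          /* number of arguments */
    if (argc <= 0 || p[0] != '\r' || p[1] != '\n') return -1;
    p += 2;

    char **argv = malloc(sizeof(char *) * argc);
    for (long i = 0; i < argc; i++) {
        if (*p != '$') { free(argv); return -1; }
        long len = strtol(p + 1, &p, 10);         /* bulk length */
        if (len < 0 || p[0] != '\r' || p[1] != '\n') { free(argv); return -1; }
        p += 2;
        argv[i] = malloc(len + 1);                /* copy the bulk payload */
        memcpy(argv[i], p, len);
        argv[i][len] = '\0';
        p += len + 2;                             /* skip payload and its \r\n */
    }
    *argvp = argv;
    return (int)argc;
}

int main(void) {
    const char *req = "*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n";
    char **argv;
    int argc = parse_multibulk(req, &argv);
    for (int i = 0; i < argc; i++) {
        printf("argv[%d] = %s\n", i, argv[i]);    /* SET / mykey / myvalue */
        free(argv[i]);
    }
    free(argv);
    return 0;
}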
