Redis 內存管理與事件處理

1 Redis內存管理

Redis內存管理相關文件爲zmalloc.c/zmalloc.h,其只是對C中內存管理函數作了簡單的封裝,屏蔽了底層平臺的差別,並增長了內存使用狀況統計的功能。
void *zmalloc(size_t size) {
    // 多申請的一部份內存用於存儲當前分配了多少本身的內存
    void *ptr = malloc(size+PREFIX_SIZE);
 
    if (!ptr) zmalloc_oom_handler(size);
#ifdef HAVE_MALLOC_SIZE
    update_zmalloc_stat_alloc(zmalloc_size(ptr));
    return ptr;
#else
    *((size_t*)ptr) = size;
    // 內存分配統計
    update_zmalloc_stat_alloc(size+PREFIX_SIZE);
    return (char*)ptr+PREFIX_SIZE;
#endif
}

 內存佈局圖示:c++

 

2 事件處理

Redis的事件類型分爲時間事件和文件事件,文件事件也就是網絡鏈接事件。時間事件的處理是在epoll_wait返回處理文件事件後處理的,每次epoll_wait的超時時間都是Redis最近的一個定時器時間。
 
Redis在進行事件處理前,首先會進行初始化,初始化的主要邏輯在main/initServer函數中。初始化流程主要作的工做以下:
  • 設置信號回調函數。
  • 建立事件循環機制,即調用epoll_create。
  • 建立服務監聽端口,建立定時事件,並將這些事件添加到事件機制中。
void initServer(void) {
    int j;
 
    // 設置信號對應的處理函數
    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
    setupSignalHandlers();
    ...
 
    createSharedObjects();
    adjustOpenFilesLimit();
    // 建立事件循環機制,及調用epoll_create建立epollfd用於事件監聽
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    server.db = zmalloc(sizeof(redisDb)*server.dbnum);
 
    /* Open the TCP listening socket for the user commands. */
    // 建立監聽服務端口,socket/bind/listen
    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);
    ...
 
    /* Create the Redis databases, and initialize other internal state. */
    for (j = 0; j < server.dbnum; j++) {
        server.db[j].dict = dictCreate(&dbDictType,NULL);
        server.db[j].expires = dictCreate(&keyptrDictType,NULL);
        server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].ready_keys = dictCreate(&setDictType,NULL);
        server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].eviction_pool = evictionPoolAlloc();
        server.db[j].id = j;
        server.db[j].avg_ttl = 0;
    }
    ...
 
    /* Create the serverCron() time event, that's our main way to process
     * background operations. 建立定時事件 */
    if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create the serverCron time event.");
        exit(1);
    }
 
    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
    // 將事件加入到事件機制中,調用鏈爲 aeCreateFileEvent/aeApiAddEvent/epoll_ctl
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
 
    /* Open the AOF file if needed. */
    if (server.aof_state == AOF_ON) {
        server.aof_fd = open(server.aof_filename,
                               O_WRONLY|O_APPEND|O_CREAT,0644);
        if (server.aof_fd == -1) {
            serverLog(LL_WARNING, "Can't open the append-only file: %s",
                strerror(errno));
            exit(1);
        }
    }
 
    ...
}

 

事件處理流程
事件處理函數鏈:aeMain / aeProcessEvents / aeApiPoll / epoll_wait。
 
常見的事件機制處理流程是:調用epoll_wait等待事件來臨,而後遍歷每個epoll_event,提取epoll_event中的events和data域,data域經常使用來存儲fd或者指針,不過通常的作法是提取出events和data.fd,而後根據fd找到對應的回調函數,fd與對應回調函數之間的映射關係能夠存儲在特定的數據結構中,好比數組或者哈希表,而後調用事件回調函數來處理。
 
Redis中用了一個數組來保存fd與回調函數的映射關係,使用數組的優勢就是簡單高效,可是數組通常使用在創建的鏈接不太多狀況,而Redis正好符合這個狀況,通常Redis的文件事件大都是客戶端創建的鏈接,而客戶端的鏈接個數是必定的,該數量經過配置項maxclients來指定。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
 
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;
 
        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;
 
            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}
 
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    numevents = aeApiPoll(eventLoop, tvp);
    for (j = 0; j < numevents; j++) {
        // 從eventLoop->events數組中查找對應的回調函數
        aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
        int mask = eventLoop->fired[j].mask;
        int fd = eventLoop->fired[j].fd;
        int rfired = 0;
 
        /* note the fe->mask & mask & ... code: maybe an already processed
         * event removed an element that fired and we still didn't
         * processed, so we check if the event is still valid. */
        if (fe->mask & mask & AE_READABLE) {
            rfired = 1;
            fe->rfileProc(eventLoop,fd,fe->clientData,mask);
        }
        if (fe->mask & mask & AE_WRITABLE) {
            if (!rfired || fe->wfileProc != fe->rfileProc)
                fe->wfileProc(eventLoop,fd,fe->clientData,mask);
        }
        processed++;
    }
    ...
}

 

文件事件的監聽
Redis監聽端口的事件回調函數鏈是:acceptTcpHandler / acceptCommonHandler / createClient / aeCreateFileEvent / aeApiAddEvent / epoll_ctl。 
在Reids監聽事件處理流程中,會將客戶端的鏈接fd添加到事件機制中,並設置其回調函數爲readQueryFromClient,該函數負責處理客戶端的命令請求。
 
命令處理流程
命令處理流程鏈是:readQueryFromClient / processInputBuffer / processCommand / call / 對應命令的回調函數(c->cmd->proc),好比get key命令的處理回調函數爲getCommand。getCommand的執行流程是先到client對應的數據庫字典中根據key來查找數據,而後根據響應消息格式將查詢結果填充到響應消息中。
 

3 如何添加自定義命令

如何在Redis中添加自定的命令呢?其中只須要改動如下幾個地方就好了,好比自定義命令random xxx,而後返回redis: xxx,由於hello xxx和get key相似,因此就依葫蘆畫瓢。random命令用來返回一個小於xxx的隨機值。
 
首先在redisCommandTable數組中添加自定義的命令,redisCommandTable數組定義在server.c中。而後在getCommand定義處後面添加randomCommand的定義,getCommand定義在t_string.c中。最後在server.h中添加helloCommand的聲明。整個修改patch文件以下,代碼基於redis-2.8.9版本。
From 5304020683078273c1bc6cc9666dab95efa18607 Mon Sep 17 00:00:00 2001
From: luoxn28 <luoxn28@163.com>
Date: Fri, 30 Jun 2017 04:43:47 -0700
Subject: [PATCH] add own command: random num
 
---
 src/server.c   |  3 ++-
 src/server.h   |  1 +
 src/t_string.c | 44 ++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 47 insertions(+), 1 deletion(-)
 
diff --git a/src/server.c b/src/server.c
index 609f396..e040104 100644
--- a/src/server.c
+++ b/src/server.c
@@ -296,7 +296,8 @@ struct redisCommand redisCommandTable[] = {
     {"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0},
     {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
     {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
-    {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}
+    {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0},
+    {"random",randomCommand,2,"rF",0,NULL,1,1,1,0,0}
 };
  
 struct evictionPoolEntry *evictionPoolAlloc(void);
diff --git a/src/server.h b/src/server.h
index 3fa7c3a..427ac92 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1485,6 +1485,7 @@ void setnxCommand(client *c);
 void setexCommand(client *c);
 void psetexCommand(client *c);
 void getCommand(client *c);
+void randomCommand(client *c);
 void delCommand(client *c);
 void existsCommand(client *c);
 void setbitCommand(client *c);
diff --git a/src/t_string.c b/src/t_string.c
index 8c737c4..df4022d 100644
--- a/src/t_string.c
+++ b/src/t_string.c
@@ -173,6 +173,50 @@ void getCommand(client *c) {
     getGenericCommand(c);
 }
  
+static bool checkRandomNum(char *num)
+{
+    char *c = num;
+
+    while (*c != '\0') {
+        if (!(('0' <= *c) && (*c <= '9'))) {
+            return false;
+        }
+        c++;
+    }
+
+    return true;
+}
+
+/**
+ * command: random n
+ * return a random num < n, if n <= 0, return 0
+ * @author: luoxiangnan
+ */
+void randomCommand(client *c)
+{
+    char buff[64] = {0};
+    int num = 0;
+    robj *o = NULL;
+
+    if (!checkRandomNum(c->argv[1]->ptr)) {
+        o = createObject(OBJ_STRING, sdsnewlen("sorry, it's not a num :(",
+                           strlen("sorry, it's not a num :(")));
+        addReplyBulk(c, o);
+        return;
+    }
+
+    sscanf(c->argv[1]->ptr, "%d", &num);
+    if (num > 0) {
+        num = random() % num;
+    } else {
+        num = 0;
+    }
+
+    sprintf(buff, "%s %d", "redis: ", num);
+    o = createObject(OBJ_STRING, sdsnewlen(buff, strlen(buff)));
+    addReplyBulk(c, o);
+}
+
 void getsetCommand(client *c) {
     if (getGenericCommand(c) == C_ERR) return;
     c->argv[2] = tryObjectEncoding(c->argv[2]);
-- 
1.8.3.1
結果以下所示:
相關文章
相關標籤/搜索