Redis 持久化之 AOF 重寫

由於 AOF 持久化是經過保存被執行的寫命令來記錄數據庫狀態的,因此隨着服務器運行時間的流逝,AOF 文件中的內容會原來越多,文件的體積也會愈來愈大,若不加以控制,體積過大的 AOF 文件極可能對 Redis 服務器、甚至整個宿主計算機形成影響,而且其體積越大,使用 AOF 文件來進行數據還原所須要的時間就越長。

爲防止 aofrewrite 過程阻塞服務器,Redis 服務器會 fork 一個子進程執行該過程,且任什麼時候刻只能有一個子進程作這件事。mysql

server 相關變量

爲了保證 AOF 的連續性,父進程把 aofrewrite 期間的寫命令緩存起來,等子進程重寫以後再追加到新的 AOF 文件。若是 aofrewrite 期間寫命令寫入量較大的話,子進程結束後,父進程的追加就涉及到大量的寫磁盤操做,形成服務性能降低。redis

Redis 經過在父子進程間創建 pipe,把 aofrewrite 期間的寫命令經過 pipe 同步給子進程,這樣一來,追加寫盤的操做也就轉嫁給了子進程。Redis server 中與之相關的變量主要有如下幾個,主要三個 pipe。sql

int aof_pipe_write_data_to_child;
int aof_pipe_read_data_from_parent;
int aof_pipe_write_ack_to_parent;
int aof_pipe_read_ack_from_child;
int aof_pipe_write_ack_to_child;
int aof_pipe_read_ack_from_parent;
int aof_stop_sending_diff; /*If true stop sending accumulated diffs to child process. */
sds aof_child_diff;        /* AOF diff accumulator child side. */

實現原理

aofrewrite 的入口邏輯在 rewriteAppendOnlyFileBackground 函數。數據庫

int rewriteAppendOnlyFileBackground(void) {
    ...
    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
    ...
}

要確保沒有後臺進程作 aofrewrite 或者 rdb,纔會考慮作本次的 aofrewrite。緩存

pipe 初始化

int rewriteAppendOnlyFileBackground(void) {
   ...
   if (aofCreatePipes() != C_OK) return C_ERR; 
   ...
}
int aofCreatePipes(void) {
    int fds[6] = {-1, -1, -1, -1, -1, -1};
    int j;

    if (pipe(fds) == -1) goto error; /* parent -> children data. */
    if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
    if (pipe(fds+4) == -1) goto error; /* children -> parent ack. */
    /* Parent -> children data is non blocking. */
    if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
    if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;

    /* 註冊讀事件處理函數,負責處理子進程要求中止數據傳輸的消息 */
    if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;

    server.aof_pipe_write_data_to_child = fds[1];
    server.aof_pipe_read_data_from_parent = fds[0];
    server.aof_pipe_write_ack_to_parent = fds[3];
    server.aof_pipe_read_ack_from_child = fds[2];
    server.aof_pipe_write_ack_to_child = fds[5];
    server.aof_pipe_read_ack_from_parent = fds[4];
    server.aof_stop_sending_diff = 0; /* 是否中止管道傳輸標記位 */
    return C_OK;

error:
    serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
        strerror(errno));
    for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
    return C_ERR;
}

aofCreatePipes 函數中,對 pipe 進行初始化,pipe 各變量的用處從名字也能夠看出來,一共有三條 pipe,每條 pipe 一來一回,佔用兩個 fd。服務器

pipe 1 用於父進程向子進程發送緩存的新數據。子進程在 aofrewrite 時,會按期從該管道中讀取數據並緩存起來,並在最後將緩存的數據寫入重寫的新 AOF 文件,這兩個 fd 都設置爲非阻塞式的。app

pipe 2 負責子進程向父進程發送結束信號。父進程監聽 fds[2] 讀事件,回調函數爲 aofChildPipeReadable。父進程不斷地接收客戶端命令,可是子進程不可能無休止地等待父進程的數據,所以,子進程在遍歷完數據庫全部數據以後,從 pipe 1 中執行一段時間的讀取操做後,就會向 pipe 2 中發送一個特殊標記 "!",父進程收到子進程的 "!" 後,就會置 server.aof_stop_sending_diff 爲 1,表示再也不向父進程發送緩存數據了。socket

pipe 3 負責父進程向子進程發送應答信號。父進程收到子進程的 "!" 後,會經過該管道也向子進程應答一個 "!",表示已收到了中止信號。ide

詳細過程後面會細說。函數

父進程處理邏輯

rewriteAppendOnlyFileBackground 函數

接着上面的邏輯,server fork 出一個子進程,兩個進程分別作各有不一樣的處理,下面先看父進程的一些主要處理(代碼有刪減)。

int rewriteAppendOnlyFileBackground(void) {
    ...
    if ((childpid = fork()) == 0) {
        ... ...
    } else {
        server.aof_rewrite_scheduled = 0;
        server.aof_child_pid = childpid;
        updateDictResizePolicy();
        server.aof_selected_db = -1;
        replicationScriptCacheFlush();
        return C_OK;
    }
    ...
}

server.aof_rewrite_scheduled 置零,防止在 serverCron 函數中重複觸發 aofrewrite,這時由於 serverCron 中有以下邏輯,

int rewriteAppendOnlyFileBackground(void) {
    ...
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }
    ...
}

這裏,updateDictResizePolicy 函數所作的操做是很重要的,以下,

void updateDictResizePolicy(void) {
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
        dictEnableResize();
    else
        dictDisableResize();
}

也就是說,在後臺有子進程作 aofrewrite 或 rdb 時,就不要作 dict rehash 了。如今大多數操做系統都採用寫時複製(copy-on-write)來優化子進程的使用效率,因此在子進程存在期間,應該避免沒必要要的內存寫入,不然會引發大量的內存 copy,影響性能。COW 的知識能夠參考文檔 《Copy On Write機制瞭解一下》。

另外,server.aof_selected_db 置爲 -1,是爲了在子進程進行數據庫掃描時插入 select 命令,以便選擇正確的數據庫。

aofRewriteBufferAppend 函數

在上一篇博客中說過,在 feedAppendOnlyFile 函數 append 寫命令時,若是當前有子進程在作 aofrewrite 時,須要將寫命令寫到 server.aof_rewrite_buf_blocks 中一份。該變量是一個鏈表,其中每一個節點最大10MB。

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    ...
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
}
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    ... ...
    /* Install a file event to send data to the rewrite child if there is
     * not one already. */
    if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
        aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
            AE_WRITABLE, aofChildWriteDiffData, NULL);
    }
}

server.aof_pipe_write_data_to_child 註冊寫事件,回調函數爲 aofChildWriteDiffData

void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
    listNode *ln;
    aofrwblock *block;
    ssize_t nwritten;
    UNUSED(el);
    UNUSED(fd);
    UNUSED(privdata);
    UNUSED(mask);

    while(1) {
        ln = listFirst(server.aof_rewrite_buf_blocks);
        block = ln ? ln->value : NULL;
        if (server.aof_stop_sending_diff || !block) {
            aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
                              AE_WRITABLE);
            return;
        }
        if (block->used > 0) {
            nwritten = write(server.aof_pipe_write_data_to_child,
                             block->buf,block->used);
            if (nwritten <= 0) return;
            memmove(block->buf,block->buf+nwritten,block->used-nwritten);
            block->used -= nwritten;
        }
        if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
    }
}

當子進程告訴父進程不要發數據(server.aof_stop_sending_diff = 1)或者 server.aof_rewrite_buf_blocks 爲空時,刪除寫事件。

不然,往 pipe1 中寫入數據,而後寫入的數據從 server.aof_rewrite_buf_blocks 刪掉。

子進程處理邏輯

int rewriteAppendOnlyFileBackground(void) {
    ...
    char tmpfile[256];
    closeListeningSockets(0);               /* child 關閉沒必要要的 socket */
    redisSetProcTitle("redis-aof-rewrite"); /* 修改進程名爲 redis-aof-rewrite */
    snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
    ...
}

首先作一些必要的處理,臨時 AOF 文件名爲 temp-rewriteaof-bg-%d.aof

而後進入正式的處理函數 rewriteAppendOnlyFile,如下貼上主要代碼(有刪減)。

int rewriteAppendOnlyFile(char *filename) {
    ...
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    server.aof_child_diff = sdsempty(); /* 初始化 aof_child_diff */
    ...
}

aof_child_diff 變量中存放在 aofwrite 期間,子進程接收到父進程經過 pipe 傳過來的緩存數據。

而後就是掃描數據庫的操做。

int rewriteAppendOnlyFile(char *filename) {
    ...
    rio aof;
    for (j = 0; j < server.dbnum; j++) {
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue; // skip empty database
        di = dictGetSafeIterator(d);
        while((de = dictNext(di)) != NULL) {
            ... ...
            if (aof.processed_bytes > processed+1024*10) { // 10K
                processed = aof.processed_bytes;
                aofReadDiffFromParent();
            }
        }
        dictReleaseIterator(di);
        di = NULL;
    }
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    ...
}

以上邏輯裏,子進程會挨個 db 掃描每個 key,根據 key 的類型使用不一樣的函數進行數據重寫,帶過時時間的數據,都須要 append 一個 PEXPIREAT 命令。

有一點須要注意,前面說到利用 pipe 優化 aofwrite,能夠看到上面的邏輯,每遍歷一個 db,若是 rio 寫入的數據量超過了 10K,那麼就經過 pipe 從父進程讀一次數據,將數據累加到 server.aof_child_diff

ssize_t aofReadDiffFromParent(void) {
    char buf[65536]; /* Default pipe buffer size on most Linux systems. */
    ssize_t nread, total = 0;

    while ((nread = read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
        server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
        total += nread;
    }
    return total;
}

由於,有客戶端可能不斷有流量打到父進程,子進程不可能一直等父進程,因此要有一個結束的時刻, Redis 中作了以下決定。

int rewriteAppendOnlyFile(char *filename) {
    ...
    int nodata = 0;
    mstime_t start = mstime();
    while(mstime()-start < 1000 && nodata < 20) {
        /* 在1ms以內,查看從父進程讀數據的 fd 是否變成可讀的,若不可讀則aeWait()函數返回0 */
        if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
        {
            nodata++;
            continue;
        }
        // 當管道的讀端可讀時,清零nodata
        nodata = 0;
        aofReadDiffFromParent();
    }
    ...
}

1ms 超時等待父進程從 pipe 傳來數據,若是在 1ms 內有 20 次父進程沒傳來數據,那麼就放棄 ReadDiffFromParent。因爲 server.aof_pipe_read_data_from_parent 在初始化時設置爲非阻塞,所以 aeWait 調用返回很快。

if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;

接着經過 pipe2 告訴父進程(發特殊符號 !)不要再發來緩存數據了。

還記得前面初始化時,父進程一直在監聽 server.aof_pipe_read_ack_from_child 的可讀事件吧?當收到 「!」 後,父進程調用處理函數 aofChildPipeReadable

void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
    char byte;
    if (read(fd,&byte,1) == 1 && byte == '!') {
        serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
        server.aof_stop_sending_diff = 1;
        if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
            serverLog(LL_WARNING,"Can't send ACK to AOF child: %s",
                strerror(errno));
        }
    }
    /* Remove the handler since this can be called only one time during a
     * rewrite. */
    aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
}

能夠看到 server.aof_stop_sending_diff 置爲 1,表示再也不給子進程發送緩存數據,接着刪除 server.aof_pipe_read_ack_from_child 上可讀事件,給子進程回覆一個 「!」。

如今回來看子進程的行爲。

int rewriteAppendOnlyFile(char *filename) {
    ...
    if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 || byte != '!') 
        goto werr;
    ...
}

子進程阻塞 5s 等待父進程發來確認標記 「!」,以後就開始作本身的收尾工做,以下:

int rewriteAppendOnlyFile(char *filename) {
    ...
    aofReadDiffFromParent(); /* 最後一次從父進程累計寫入的緩衝區的差別 */

    /* 將子進程aof_child_diff 中保存的差別數據寫到 AOF 文件中 */
    if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
        goto werr;

    /* Make sure data will not remain on the OS's output buffers */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    /* 原子性修改臨時文件的名字爲 temp-rewriteaof-bg-<pid>.aof */
    if (rename(tmpfile,filename) == -1) {
        unlink(tmpfile);
        return C_ERR;
    }
    ...
}

最後再讀取一次 pipe 中的數據,將子進程進行 aofrewrite 期間,aof_child_diff 從父進程累積的數據刷盤,最後進行 rename 系統調用。

通過以上的邏輯處理,server 交給子進程的 aofrewrite 工做就完成了,最終獲得一個文件 temp-rewriteaof-bg-<pid>.aof,成功返回 0,不然返回1。

父進程的收尾工做

子進程在執行完 aofrewrite 後退出,父進程 wait3 到子進程的退出狀態後,進行 aofrewrite 的收尾工做。在 serverCron 函數裏,有以下邏輯,

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
    if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { /* wait3 等待全部子進程 */
        int exitcode = WEXITSTATUS(statloc);
        int bysignal = 0;

        if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

        if (pid == -1) {
            serverLog(LL_WARNING,"wait3() returned an error: %s. "
                      "rdb_child_pid = %d, aof_child_pid = %d",
                      strerror(errno),
                      (int) server.rdb_child_pid,
                      (int) server.aof_child_pid);
        } else if (pid == server.rdb_child_pid) {
            backgroundSaveDoneHandler(exitcode,bysignal);
        } else if (pid == server.aof_child_pid) { /* aof 子進程結束 */
            backgroundRewriteDoneHandler(exitcode,bysignal);
        } else {
            if (!ldbRemoveChild(pid)) {
                serverLog(LL_WARNING,
                          "Warning, detected child with unmatched pid: %ld",
                          (long)pid);
            }
        }
        updateDictResizePolicy(); /* 更新 dict resize 爲可用狀態 */
    }
    ...
}

wait3 函數表示父進程等待全部子進程的返回值, WNOHANG 選項表示沒有子進程 exit 時當即返回,man 中對該選項有以下說明, 」WNOHANG return immediately if no child has exited「。

能夠看到若是等到 aofwrite 的子進程 exit,那麼使用 backgroundRewriteDoneHandler 函數進行處理,主要以下(代碼有刪減),

void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    ...
    snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int)server.aof_child_pid);
    newfd = open(tmpfile,O_WRONLY|O_APPEND);
    if (aofRewriteBufferWrite(newfd) == -1) {
        close(newfd);
        goto cleanup;
    }
    ...
}

打開子進程生成的臨時文件 temp-rewriteaof-bg-<pid>.aof,調用 aofRewriteBufferWrite,將服務器緩存的剩下的新數據寫入該臨時文件中,這樣該 AOF 臨時文件就徹底與當前數據庫狀態一致了。

那麼,下面還有兩件事要作,一是將臨時 AOF 文件更名,二是切換 fd。

void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    ...
    if (server.aof_fd == -1) {
        /* AOF disabled */
        oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);
    } else {
        /* AOF enabled */
        oldfd = -1; /* We'll set this to the current AOF filedes later. */
    }
    if (rename(tmpfile,server.aof_filename) == -1) {
        close(newfd);
        if (oldfd != -1) close(oldfd);
        goto cleanup;
    }

    if (server.aof_fd == -1) {
        /* AOF disabled, we don't need to set the AOF file descriptor
         * to this new file, so we can close it. */
        close(newfd);
    } else {
        /* AOF enabled, replace the old fd with the new one. */
        oldfd = server.aof_fd;
        server.aof_fd = newfd;
        if (server.aof_fsync == AOF_FSYNC_ALWAYS)
            aof_fsync(newfd);
        else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
            aof_background_fsync(newfd);
        server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
        aofUpdateCurrentSize();
        server.aof_rewrite_base_size = server.aof_current_size;

        /* Clear regular AOF buffer since its contents was just written to
         * the new AOF from the background rewrite buffer. */
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }
    ...  ...
    /* Asynchronously close the overwritten AOF. */
    if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
    ...
}

如上,首先將臨時 AOF 文件更名,而後就是 oldfd 和 newfd 的處理了,分兩種狀況

當 AOF 功能關閉時,打開原來的 AOF 文件,得到 oldfd,這裏並不關心該操做是不是成功的,若是失敗了,那麼 oldfd 值爲 -1,close(newfd)

當 AOF 功能開啓時,oldfd 直接置爲 -1,將 aof_fd 切換成 newfd,根據不一樣的數據刷盤策略進行 AOF 刷盤,更新相應的參數。

而後是關閉 oldfd 的邏輯,因爲 oldfd 多是對舊 AOF 文件的最後一個引用,直接 close 可能會阻塞 server,所以建立後臺任務去關閉文件。

最後進行清理工做,以下,

void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    ...
    cleanup:
        aofClosePipes();
        aofRewriteBufferReset();
        aofRemoveTempFile(server.aof_child_pid);
        server.aof_child_pid = -1;
        server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
        server.aof_rewrite_time_start = -1;
        /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
        if (server.aof_state == AOF_WAIT_REWRITE)
            server.aof_rewrite_scheduled = 1;
    ...
}

以上, 父進程就完成了收尾工做,寫命令就 write 到 newfd 了。

時序圖

能夠將以上父子進程的交互整理出時序圖以下,

上圖參考 Redis · 原理介紹 · 利用管道優化aofrewrite

什麼時候重寫

有兩個時刻能夠觸發 AOF 重寫。

【1】手動執行 BGREWRITEAOF 命令。

【2】自動執行,在 serverCron 函數中根據必定邏輯進行斷定。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
          /* Trigger an AOF rewrite if needed */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_perc &&
        server.aof_current_size > server.aof_rewrite_min_size) /* 默認 64M */
    {
        long long base = server.aof_rewrite_base_size ? server.aof_rewrite_base_size : 1;
        long long growth = (server.aof_current_size*100/base) - 100;
        if (growth >= server.aof_rewrite_perc) {
            rewriteAppendOnlyFileBackground();
        }
     }
}

也就是說 AOF 文件大小超過了 server.aof_rewrite_min_size,而且增加率大於 server.aof_rewrite_perc 時就會觸發,增加率計算的基數 server.aof_rewrite_base_size 是上次 aofrewrite 結束後 AOF 文件的大小。

附錄

幾個解釋。

阻塞模式下,進程或是線程執行到這些函數時必須等待某個事件的發生,若是事件沒有發生,進程或線程就被阻塞(死等在被阻塞的地方),函數不會當即返回。

非阻塞non-block模式下,進程或線程執行此函數時沒必要非要等待事件的發生,一旦執行確定返回,以返回值的不一樣來反映函數的執行狀況,若是事件發生則與阻塞方式相同,若事件沒有發生則返回一個代碼來告知事件未發生,而進程或線程繼續執行,因此非阻塞模式效率較高。

相關文章
相關標籤/搜索