由於 AOF 持久化是經過保存被執行的寫命令來記錄數據庫狀態的,因此隨着服務器運行時間的流逝,AOF 文件中的內容會原來越多,文件的體積也會愈來愈大,若不加以控制,體積過大的 AOF 文件極可能對 Redis 服務器、甚至整個宿主計算機形成影響,而且其體積越大,使用 AOF 文件來進行數據還原所須要的時間就越長。
爲防止 aofrewrite 過程阻塞服務器,Redis 服務器會 fork
一個子進程執行該過程,且任什麼時候刻只能有一個子進程作這件事。mysql
爲了保證 AOF 的連續性,父進程把 aofrewrite 期間的寫命令緩存起來,等子進程重寫以後再追加到新的 AOF 文件。若是 aofrewrite 期間寫命令寫入量較大的話,子進程結束後,父進程的追加就涉及到大量的寫磁盤操做,形成服務性能降低。redis
Redis 經過在父子進程間創建 pipe,把 aofrewrite 期間的寫命令經過 pipe 同步給子進程,這樣一來,追加寫盤的操做也就轉嫁給了子進程。Redis server 中與之相關的變量主要有如下幾個,主要三個 pipe。sql
int aof_pipe_write_data_to_child; int aof_pipe_read_data_from_parent; int aof_pipe_write_ack_to_parent; int aof_pipe_read_ack_from_child; int aof_pipe_write_ack_to_child; int aof_pipe_read_ack_from_parent; int aof_stop_sending_diff; /*If true stop sending accumulated diffs to child process. */ sds aof_child_diff; /* AOF diff accumulator child side. */
aofrewrite 的入口邏輯在 rewriteAppendOnlyFileBackground
函數。數據庫
int rewriteAppendOnlyFileBackground(void) { ... if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR; ... }
要確保沒有後臺進程作 aofrewrite 或者 rdb,纔會考慮作本次的 aofrewrite。緩存
int rewriteAppendOnlyFileBackground(void) { ... if (aofCreatePipes() != C_OK) return C_ERR; ... }
int aofCreatePipes(void) { int fds[6] = {-1, -1, -1, -1, -1, -1}; int j; if (pipe(fds) == -1) goto error; /* parent -> children data. */ if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */ if (pipe(fds+4) == -1) goto error; /* children -> parent ack. */ /* Parent -> children data is non blocking. */ if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error; if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error; /* 註冊讀事件處理函數,負責處理子進程要求中止數據傳輸的消息 */ if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error; server.aof_pipe_write_data_to_child = fds[1]; server.aof_pipe_read_data_from_parent = fds[0]; server.aof_pipe_write_ack_to_parent = fds[3]; server.aof_pipe_read_ack_from_child = fds[2]; server.aof_pipe_write_ack_to_child = fds[5]; server.aof_pipe_read_ack_from_parent = fds[4]; server.aof_stop_sending_diff = 0; /* 是否中止管道傳輸標記位 */ return C_OK; error: serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s", strerror(errno)); for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]); return C_ERR; }
在 aofCreatePipes
函數中,對 pipe 進行初始化,pipe 各變量的用處從名字也能夠看出來,一共有三條 pipe,每條 pipe 一來一回,佔用兩個 fd。服務器
pipe 1 用於父進程向子進程發送緩存的新數據。子進程在 aofrewrite 時,會按期從該管道中讀取數據並緩存起來,並在最後將緩存的數據寫入重寫的新 AOF 文件,這兩個 fd 都設置爲非阻塞式的。app
pipe 2 負責子進程向父進程發送結束信號。父進程監聽 fds[2] 讀事件,回調函數爲 aofChildPipeReadable。父進程不斷地接收客戶端命令,可是子進程不可能無休止地等待父進程的數據,所以,子進程在遍歷完數據庫全部數據以後,從 pipe 1 中執行一段時間的讀取操做後,就會向 pipe 2 中發送一個特殊標記 "!",父進程收到子進程的 "!" 後,就會置 server.aof_stop_sending_diff 爲 1,表示再也不向父進程發送緩存數據了。socket
pipe 3 負責父進程向子進程發送應答信號。父進程收到子進程的 "!" 後,會經過該管道也向子進程應答一個 "!",表示已收到了中止信號。ide
詳細過程後面會細說。函數
接着上面的邏輯,server fork
出一個子進程,兩個進程分別作各有不一樣的處理,下面先看父進程的一些主要處理(代碼有刪減)。
int rewriteAppendOnlyFileBackground(void) { ... if ((childpid = fork()) == 0) { ... ... } else { server.aof_rewrite_scheduled = 0; server.aof_child_pid = childpid; updateDictResizePolicy(); server.aof_selected_db = -1; replicationScriptCacheFlush(); return C_OK; } ... }
server.aof_rewrite_scheduled 置零,防止在 serverCron
函數中重複觸發 aofrewrite,這時由於 serverCron
中有以下邏輯,
int rewriteAppendOnlyFileBackground(void) { ... if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && server.aof_rewrite_scheduled) { rewriteAppendOnlyFileBackground(); } ... }
這裏,updateDictResizePolicy
函數所作的操做是很重要的,以下,
void updateDictResizePolicy(void) { if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) dictEnableResize(); else dictDisableResize(); }
也就是說,在後臺有子進程作 aofrewrite 或 rdb 時,就不要作 dict rehash 了。如今大多數操做系統都採用寫時複製(copy-on-write)來優化子進程的使用效率,因此在子進程存在期間,應該避免沒必要要的內存寫入,不然會引發大量的內存 copy,影響性能。COW 的知識能夠參考文檔 《Copy On Write機制瞭解一下》。
另外,server.aof_selected_db 置爲 -1,是爲了在子進程進行數據庫掃描時插入 select 命令,以便選擇正確的數據庫。
在上一篇博客中說過,在 feedAppendOnlyFile
函數 append 寫命令時,若是當前有子進程在作 aofrewrite 時,須要將寫命令寫到 server.aof_rewrite_buf_blocks 中一份。該變量是一個鏈表,其中每一個節點最大10MB。
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { ... if (server.aof_child_pid != -1) aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf)); }
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { ... ... /* Install a file event to send data to the rewrite child if there is * not one already. */ if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) { aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); } }
爲 server.aof_pipe_write_data_to_child 註冊寫事件,回調函數爲 aofChildWriteDiffData
。
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) { listNode *ln; aofrwblock *block; ssize_t nwritten; UNUSED(el); UNUSED(fd); UNUSED(privdata); UNUSED(mask); while(1) { ln = listFirst(server.aof_rewrite_buf_blocks); block = ln ? ln->value : NULL; if (server.aof_stop_sending_diff || !block) { aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child, AE_WRITABLE); return; } if (block->used > 0) { nwritten = write(server.aof_pipe_write_data_to_child, block->buf,block->used); if (nwritten <= 0) return; memmove(block->buf,block->buf+nwritten,block->used-nwritten); block->used -= nwritten; } if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln); } }
當子進程告訴父進程不要發數據(server.aof_stop_sending_diff = 1)或者 server.aof_rewrite_buf_blocks 爲空時,刪除寫事件。
不然,往 pipe1 中寫入數據,而後寫入的數據從 server.aof_rewrite_buf_blocks 刪掉。
int rewriteAppendOnlyFileBackground(void) { ... char tmpfile[256]; closeListeningSockets(0); /* child 關閉沒必要要的 socket */ redisSetProcTitle("redis-aof-rewrite"); /* 修改進程名爲 redis-aof-rewrite */ snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); ... }
首先作一些必要的處理,臨時 AOF 文件名爲 temp-rewriteaof-bg-%d.aof。
而後進入正式的處理函數 rewriteAppendOnlyFile
,如下貼上主要代碼(有刪減)。
int rewriteAppendOnlyFile(char *filename) { ... snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid()); fp = fopen(tmpfile,"w"); server.aof_child_diff = sdsempty(); /* 初始化 aof_child_diff */ ... }
aof_child_diff 變量中存放在 aofwrite 期間,子進程接收到父進程經過 pipe 傳過來的緩存數據。
而後就是掃描數據庫的操做。
int rewriteAppendOnlyFile(char *filename) { ... rio aof; for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; dict *d = db->dict; if (dictSize(d) == 0) continue; // skip empty database di = dictGetSafeIterator(d); while((de = dictNext(di)) != NULL) { ... ... if (aof.processed_bytes > processed+1024*10) { // 10K processed = aof.processed_bytes; aofReadDiffFromParent(); } } dictReleaseIterator(di); di = NULL; } if (fflush(fp) == EOF) goto werr; if (fsync(fileno(fp)) == -1) goto werr; ... }
以上邏輯裏,子進程會挨個 db 掃描每個 key,根據 key 的類型使用不一樣的函數進行數據重寫,帶過時時間的數據,都須要 append 一個 PEXPIREAT 命令。
有一點須要注意,前面說到利用 pipe 優化 aofwrite,能夠看到上面的邏輯,每遍歷一個 db,若是 rio 寫入的數據量超過了 10K,那麼就經過 pipe 從父進程讀一次數據,將數據累加到 server.aof_child_diff。
ssize_t aofReadDiffFromParent(void) { char buf[65536]; /* Default pipe buffer size on most Linux systems. */ ssize_t nread, total = 0; while ((nread = read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) { server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread); total += nread; } return total; }
由於,有客戶端可能不斷有流量打到父進程,子進程不可能一直等父進程,因此要有一個結束的時刻, Redis 中作了以下決定。
int rewriteAppendOnlyFile(char *filename) { ... int nodata = 0; mstime_t start = mstime(); while(mstime()-start < 1000 && nodata < 20) { /* 在1ms以內,查看從父進程讀數據的 fd 是否變成可讀的,若不可讀則aeWait()函數返回0 */ if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0) { nodata++; continue; } // 當管道的讀端可讀時,清零nodata nodata = 0; aofReadDiffFromParent(); } ... }
1ms 超時等待父進程從 pipe 傳來數據,若是在 1ms 內有 20 次父進程沒傳來數據,那麼就放棄 ReadDiffFromParent。因爲 server.aof_pipe_read_data_from_parent 在初始化時設置爲非阻塞,所以 aeWait
調用返回很快。
if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
接着經過 pipe2 告訴父進程(發特殊符號 !)不要再發來緩存數據了。
還記得前面初始化時,父進程一直在監聽 server.aof_pipe_read_ack_from_child 的可讀事件吧?當收到 「!」 後,父進程調用處理函數 aofChildPipeReadable
。
void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) { char byte; if (read(fd,&byte,1) == 1 && byte == '!') { serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs."); server.aof_stop_sending_diff = 1; if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) { serverLog(LL_WARNING,"Can't send ACK to AOF child: %s", strerror(errno)); } } /* Remove the handler since this can be called only one time during a * rewrite. */ aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE); }
能夠看到 server.aof_stop_sending_diff
置爲 1,表示再也不給子進程發送緩存數據,接着刪除 server.aof_pipe_read_ack_from_child 上可讀事件,給子進程回覆一個 「!」。
如今回來看子進程的行爲。
int rewriteAppendOnlyFile(char *filename) { ... if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 || byte != '!') goto werr; ... }
子進程阻塞 5s 等待父進程發來確認標記 「!」,以後就開始作本身的收尾工做,以下:
int rewriteAppendOnlyFile(char *filename) { ... aofReadDiffFromParent(); /* 最後一次從父進程累計寫入的緩衝區的差別 */ /* 將子進程aof_child_diff 中保存的差別數據寫到 AOF 文件中 */ if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0) goto werr; /* Make sure data will not remain on the OS's output buffers */ if (fflush(fp) == EOF) goto werr; if (fsync(fileno(fp)) == -1) goto werr; if (fclose(fp) == EOF) goto werr; /* 原子性修改臨時文件的名字爲 temp-rewriteaof-bg-<pid>.aof */ if (rename(tmpfile,filename) == -1) { unlink(tmpfile); return C_ERR; } ... }
最後再讀取一次 pipe 中的數據,將子進程進行 aofrewrite 期間,aof_child_diff 從父進程累積的數據刷盤,最後進行 rename
系統調用。
通過以上的邏輯處理,server 交給子進程的 aofrewrite 工做就完成了,最終獲得一個文件 temp-rewriteaof-bg-<pid>.aof,成功返回 0,不然返回1。
子進程在執行完 aofrewrite 後退出,父進程 wait3
到子進程的退出狀態後,進行 aofrewrite 的收尾工做。在 serverCron
函數裏,有以下邏輯,
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { ... if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { /* wait3 等待全部子進程 */ int exitcode = WEXITSTATUS(statloc); int bysignal = 0; if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc); if (pid == -1) { serverLog(LL_WARNING,"wait3() returned an error: %s. " "rdb_child_pid = %d, aof_child_pid = %d", strerror(errno), (int) server.rdb_child_pid, (int) server.aof_child_pid); } else if (pid == server.rdb_child_pid) { backgroundSaveDoneHandler(exitcode,bysignal); } else if (pid == server.aof_child_pid) { /* aof 子進程結束 */ backgroundRewriteDoneHandler(exitcode,bysignal); } else { if (!ldbRemoveChild(pid)) { serverLog(LL_WARNING, "Warning, detected child with unmatched pid: %ld", (long)pid); } } updateDictResizePolicy(); /* 更新 dict resize 爲可用狀態 */ } ... }
wait3
函數表示父進程等待全部子進程的返回值, WNOHANG 選項表示沒有子進程 exit 時當即返回,man 中對該選項有以下說明, 」WNOHANG return immediately if no child has exited「。
能夠看到若是等到 aofwrite 的子進程 exit,那麼使用 backgroundRewriteDoneHandler
函數進行處理,主要以下(代碼有刪減),
void backgroundRewriteDoneHandler(int exitcode, int bysignal) { ... snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int)server.aof_child_pid); newfd = open(tmpfile,O_WRONLY|O_APPEND); if (aofRewriteBufferWrite(newfd) == -1) { close(newfd); goto cleanup; } ... }
打開子進程生成的臨時文件 temp-rewriteaof-bg-<pid>.aof,調用 aofRewriteBufferWrite
,將服務器緩存的剩下的新數據寫入該臨時文件中,這樣該 AOF 臨時文件就徹底與當前數據庫狀態一致了。
那麼,下面還有兩件事要作,一是將臨時 AOF 文件更名,二是切換 fd。
void backgroundRewriteDoneHandler(int exitcode, int bysignal) { ... if (server.aof_fd == -1) { /* AOF disabled */ oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK); } else { /* AOF enabled */ oldfd = -1; /* We'll set this to the current AOF filedes later. */ } if (rename(tmpfile,server.aof_filename) == -1) { close(newfd); if (oldfd != -1) close(oldfd); goto cleanup; } if (server.aof_fd == -1) { /* AOF disabled, we don't need to set the AOF file descriptor * to this new file, so we can close it. */ close(newfd); } else { /* AOF enabled, replace the old fd with the new one. */ oldfd = server.aof_fd; server.aof_fd = newfd; if (server.aof_fsync == AOF_FSYNC_ALWAYS) aof_fsync(newfd); else if (server.aof_fsync == AOF_FSYNC_EVERYSEC) aof_background_fsync(newfd); server.aof_selected_db = -1; /* Make sure SELECT is re-issued */ aofUpdateCurrentSize(); server.aof_rewrite_base_size = server.aof_current_size; /* Clear regular AOF buffer since its contents was just written to * the new AOF from the background rewrite buffer. */ sdsfree(server.aof_buf); server.aof_buf = sdsempty(); } ... ... /* Asynchronously close the overwritten AOF. */ if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL); ... }
如上,首先將臨時 AOF 文件更名,而後就是 oldfd 和 newfd 的處理了,分兩種狀況:
當 AOF 功能關閉時,打開原來的 AOF 文件,得到 oldfd,這裏並不關心該操做是不是成功的,若是失敗了,那麼 oldfd 值爲 -1,close(newfd)
。
當 AOF 功能開啓時,oldfd 直接置爲 -1,將 aof_fd 切換成 newfd,根據不一樣的數據刷盤策略進行 AOF 刷盤,更新相應的參數。
而後是關閉 oldfd 的邏輯,因爲 oldfd 多是對舊 AOF 文件的最後一個引用,直接 close
可能會阻塞 server,所以建立後臺任務去關閉文件。
最後進行清理工做,以下,
void backgroundRewriteDoneHandler(int exitcode, int bysignal) { ... cleanup: aofClosePipes(); aofRewriteBufferReset(); aofRemoveTempFile(server.aof_child_pid); server.aof_child_pid = -1; server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start; server.aof_rewrite_time_start = -1; /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */ if (server.aof_state == AOF_WAIT_REWRITE) server.aof_rewrite_scheduled = 1; ... }
以上, 父進程就完成了收尾工做,寫命令就 write
到 newfd 了。
能夠將以上父子進程的交互整理出時序圖以下,
上圖參考 Redis · 原理介紹 · 利用管道優化aofrewrite
有兩個時刻能夠觸發 AOF 重寫。
【1】手動執行 BGREWRITEAOF
命令。
【2】自動執行,在 serverCron
函數中根據必定邏輯進行斷定。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { ... /* Trigger an AOF rewrite if needed */ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && server.aof_rewrite_perc && server.aof_current_size > server.aof_rewrite_min_size) /* 默認 64M */ { long long base = server.aof_rewrite_base_size ? server.aof_rewrite_base_size : 1; long long growth = (server.aof_current_size*100/base) - 100; if (growth >= server.aof_rewrite_perc) { rewriteAppendOnlyFileBackground(); } } }
也就是說 AOF 文件大小超過了 server.aof_rewrite_min_size,而且增加率大於 server.aof_rewrite_perc 時就會觸發,增加率計算的基數 server.aof_rewrite_base_size 是上次 aofrewrite 結束後 AOF 文件的大小。
幾個解釋。
阻塞模式下,進程或是線程執行到這些函數時必須等待某個事件的發生,若是事件沒有發生,進程或線程就被阻塞(死等在被阻塞的地方),函數不會當即返回。非阻塞non-block模式下,進程或線程執行此函數時沒必要非要等待事件的發生,一旦執行確定返回,以返回值的不一樣來反映函數的執行狀況,若是事件發生則與阻塞方式相同,若事件沒有發生則返回一個代碼來告知事件未發生,而進程或線程繼續執行,因此非阻塞模式效率較高。