除了RDB持久化以外,Redis還提供了AOF(Append Only File)持久化功能。與RDB持久化經過保存數據庫中鍵值對來保存數據庫的狀態不一樣,AOF持久化是經過保存Redis服務器所執行的寫命令來記錄數據庫的狀態。被寫入AOF文件的全部命令都是以Redis的命令請求協議格式保存的,該格式是一種純本文的格式,因此能夠經過直接打開AOF文件,觀察裏面的類容。html
AOF持久化功能的實現能夠分爲:命令追加(append),文件寫入(write),文件同步(sync)三個步驟。redis
AOF持久化須要將全部寫命令記錄在文件中來保存服務器狀態,而文件寫入操做效率比較低,若是每執行一條寫命令都要寫一次AOF文件無疑是低效的。爲了提升效率,Redis提供了一箇中間層 – AOF緩衝區,也就是說當Redis執行一條寫命令後,先將該命令追加到AOF緩衝區中,在之後的某個時刻再將AOF緩衝區中的內容同步到文件中。當AOF持久化功能處於打開狀態時,服務器在執行完一個寫命令以後,會以協議格式將被執行的寫命令追加到服務器狀態的aof_buf緩衝區的末尾:數據庫
struct redisServer { ... sds aof_buf;/* AOF buffer, written before entering the event loop */ }
服務器執行完寫命令,調用propagate進行命令追加。緩存
//進行命令追加 void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) { if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) feedAppendOnlyFile(cmd,dbid,argv,argc); }
將命令追加到緩衝區中的操做由feedAppendOnlyFile
函數實現,若是後臺正在執行AOF文件後臺重寫操做,該命令還會被追加到AOF重寫緩存中。安全
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { sds buf = sdsempty(); robj *tmpargv[3]; /* The DB this command was targeting is not the same as the last command * we appended. To issue a SELECT command is needed. */ // 若是當前命令涉及的數據庫與server.aof_selected_db指明的數據庫不一致,須要加入SELECT命令顯式設置 if (dictid != server.aof_selected_db) { char seldb[64]; snprintf(seldb,sizeof(seldb),"%d",dictid); buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", (unsigned long)strlen(seldb),seldb); server.aof_selected_db = dictid; } // 處理EXPIRE, SETEX, EXPIREAT命令 if (cmd->proc == expireCommand || cmd->proc == pexpireCommand || cmd->proc == expireatCommand) { /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */ // 將EXPIRE/PEXPIRE/EXPIREAT命令都轉換爲PEXPIREAT命令 buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]); } // 處理SETEX、PSETEX命令 else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) { /* Translate SETEX/PSETEX to SET and PEXPIREAT */ // 將SETEX/PSETEX命令轉換爲SET命令和PEXPIREAT命令 tmpargv[0] = createStringObject("SET",3); tmpargv[1] = argv[1]; tmpargv[2] = argv[3]; buf = catAppendOnlyGenericCommand(buf,3,tmpargv); decrRefCount(tmpargv[0]); buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]); } // 其它命令使用catAppendOnlyGenericCommand()函數處理 else { /* All the other commands don't need translation or need the * same translation already operated in the command vector * for the replication itself. */ // 全部其它命令並不須要轉換操做或者已經完成轉換,採用此函數將將寫命令轉化爲命令協議格式的字符串 buf = catAppendOnlyGenericCommand(buf,argc,argv); } /* Append to the AOF buffer. This will be flushed on disk just before * of re-entering the event loop, so before the client will get a * positive reply about the operation performed. */ if (server.aof_state == REDIS_AOF_ON) server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf)); /* If a background append only file rewriting is in progress we want to * accumulate the differences between the child DB and the current one * in a buffer, so that when the child process will do its work we * can append the differences to the new append only file. */ // 若是後臺正在執行AOF文件重寫操做(即BGREWRITEAOF命令),爲了記錄當前正在重寫的AOF文件和當前數據庫的 // 差別信息,咱們還須要將重構後的命令追加到AOF重寫緩存中。 if (server.aof_child_pid != -1) aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf)); sdsfree(buf); }
aof.c文件中的catAppendOnlyGenericCommand
函數提供了根據傳入命令和該命令的參數將其構形成知足AOF文件格式的字符串的功能。服務器
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) { char buf[32]; int len, j; robj *o; // 構建格式爲「*<count>\r\n"格式的字符串,<count>爲命令參數個數 buf[0] = '*'; len = 1+ll2string(buf+1,sizeof(buf)-1,argc); buf[len++] = '\r'; buf[len++] = '\n'; dst = sdscatlen(dst,buf,len); // 重建命令,每一個item的格式爲「$<len>\r\n<content>\r\n」,其中<len>指明<content>的字符長度,<content>爲參數內容 for (j = 0; j < argc; j++) { o = getDecodedObject(argv[j]); buf[0] = '$'; len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr)); buf[len++] = '\r'; buf[len++] = '\n'; dst = sdscatlen(dst,buf,len); dst = sdscatlen(dst,o->ptr,sdslen(o->ptr)); dst = sdscatlen(dst,"\r\n",2); decrRefCount(o); } // 返回重建後的命令內容 return dst; }
在上面的介紹中,咱們調用feedAppendOnlyFile
函數只是把命令追加到了AOF緩衝區server.aof_buf
中,並無寫入到磁盤文件中。app
在現代操做系統中,當用戶將數據寫入一個文件中時,爲了提升效率,操做系統會先利用一個緩衝區來存放寫入的內容,直到這個緩衝區滿了或者超過指定的時間後才真正將緩衝區中的內容寫入到磁盤文件中。爲了強制讓操做系統將緩衝區中的數據寫入磁盤,通常能夠經過fsync()函數來強制寫入到磁盤中。而fsync()函數的調用頻率就是咱們這一小節要介紹的「同步策略」。函數
在處理文件事件時(寫命令),命令被追加到aof_buf中;而後在處理時間事件時,serverCron函數會調用flushAppendOnlyFile函數進行文件的寫入和同步。Redis能夠經過配置redis.conf文件中的flush選項來指定AOF同步策略,主要支持如下三種同步策略:oop
AOF_FSYNC_NO源碼分析
在該模式下,Redis服務器在每一個事件循環都將AOF緩衝區server.aof_buf
中的數據寫入AOF文件中,但不執行同步fsync方法,由操做系統決定什麼時候同步。該模式速度最快(無需執行同步操做)但也最不安全(若是機器崩潰將丟失上次同步後的全部數據)。
AOF_FSYNC_ALWAYS
在該模式下,Redis服務器在每一個事件循環都將AOF緩衝區server.aof_buf
中的數據寫入AOF文件中,且執行一次AOF文件同步操做。該模式速度最慢(每一個事件循環都要執行同步操做)但也最安全(若是機器崩潰只丟失當前事件循環中處理的新數據)。
AOF_FSYNC_EVERYSEC
在該模式下,Redis服務器在每一個事件循環都將AOF緩衝區server.aof_buf
中的數據寫入AOF文件中,且每秒執行一次AOF文件同步操做。該模式效率和安全性(若是機器崩潰只丟失前一秒處理的新數據)比較適中,是Redis的默認同步策略。
void flushAppendOnlyFile(int force) { if (sdslen(server.aof_buf) == 0) return; if (server.aof_fsync == AOF_FSYNC_EVERYSEC) sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0; if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) { if (sync_in_progress) { if (server.aof_flush_postponed_start == 0) { server.aof_flush_postponed_start = server.unixtime; return; } else if (server.unixtime - server.aof_flush_postponed_start < 2) { return; } server.aof_delayed_fsync++; } } //將aof_buf中的內容寫入到aof文件 nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf)); server.aof_flush_postponed_start = 0; …… server.aof_current_size += nwritten; if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) { sdsclear(server.aof_buf); } else { sdsfree(server.aof_buf); server.aof_buf = sdsempty(); } /*Don't fsync if no-appendfsync-on-rewrite is set to yes and there are children doing I/O in the background. */
if (server.aof_no_fsync_on_rewrite && (server.aof_child_pid != -1 || server.rdb_child_pid != -1)) return; /* appendfsync爲always */ if (server.aof_fsync == AOF_FSYNC_ALWAYS) {/ aof_fsync(server.aof_fd); //同步aof文件 server.aof_last_fsync = server.unixtime;//記錄同步時間 } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && server.unixtime > server.aof_last_fsync)) { /* appendfsync爲EVERYSEC*/ if (!sync_in_progress) aof_background_fsync(server.aof_fd); server.aof_last_fsync = server.unixtime; } }
// 在另外一個線程中,對給定的描述符 fd (指向 AOF 文件)執行一個後臺 fsync() 操做。 void aof_background_fsync(int fd) { bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL); }
數據還原就是將AOF文件中保存的命令解析並執行,這樣就能夠將數據庫還原爲原來的狀態。由於在Redis中,命令必須由redisClient實例來執行,因此爲了加載AOF文件須要建立一個僞Redis客戶端。建立了僞Redis客戶端後,執行數據還原的過程就是從AOF文件中讀取命令並交給僞Redis客戶端執行的過程。數據還原的功能由aof.c文件中的loadAppendOnlyFile
函數完成。
因爲aof是經過不斷追加寫命令來記錄數據庫狀態,因此服務器執行比較久以後,aof文件中的內容會愈來愈多,磁盤佔有量愈來愈大,同時也是使經過過aof文件還原數據庫的須要的時間也變得好久。因此就須要經過讀取服務器當前的數據庫狀態來重寫新的aof文件。新的AOF文件不會包含任何浪費空間的冗餘命令,因此會比舊的AOF文件小不少。
因爲AOF重寫是會進行大量寫入操做,勢必爲長時間阻塞主進程,所以redis把重寫程序放到子進程執行。
這樣作有兩點好處:
1)子進程重寫期間,主進程能夠繼續處理命令。
2)子進程帶有主進程的數據副本,這樣就能夠避免與主進程競爭db->dict,這是線程實現不了的。
重寫期間,主進程繼續處理命令,對數據庫狀態進行修改,這樣使得當前的數據庫狀態與重寫的AOF文件所保存的數據庫狀態不一致。所以,redis設置了AOF重寫緩衝區,在建立子進程後,主進程每執行一個寫命令都會寫到重寫緩衝區。在子進程完成重寫後,主進程會將AOF重寫緩衝區的數據寫入到重寫的AOF文件,保證數據狀態的一致。
在子進程執行AOF重寫期間,服務器進程須要執行如下三個操做:
(1)執行客戶端發送過來的命令;
(2)將執行的命令追加到AOF緩衝區;
(3)將執行的命令追加到AOF重寫緩衝期;
這樣就能夠保證:
當子進程執行完畢後,會向父進程發送一個信號。父進程收到信號後,將執行如下工做:
struct redisServer{ // AOF 狀態(開啓/關閉/可寫) int aof_state; /* REDIS_AOF_(ON|OFF|WAIT_REWRITE) */ // 所使用的 fsync 策略(每一個寫入/每秒/從不) int aof_fsync; /* Kind of fsync() policy */ char *aof_filename; /* Name of the AOF file */ int aof_no_fsync_on_rewrite; /* Don't fsync if a rewrite is in prog. */ int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */ off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */ // 最後一次執行 BGREWRITEAOF 時, AOF 文件的大小 off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */ // AOF 文件的當前字節大小 off_t aof_current_size; /* AOF current size. */ int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */ // 負責進行 AOF 重寫的子進程 ID pid_t aof_child_pid; /* PID if rewriting process */ // AOF 重寫緩存鏈表,連接着多個緩存塊 list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */ // AOF 緩衝區 sds aof_buf; /* AOF buffer, written before entering the event loop */ // AOF 文件的描述符 int aof_fd; /* File descriptor of currently selected AOF file */ // AOF 的當前目標數據庫 int aof_selected_db; /* Currently selected DB in AOF */ // 推遲 write 操做的時間 time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */ // 最後一直執行 fsync 的時間 time_t aof_last_fsync; /* UNIX time of last fsync() */ time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */ // AOF 重寫的開始時間 time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */ // 最後一次執行 BGREWRITEAOF 的結果 int aof_lastbgrewrite_status; /* REDIS_OK or REDIS_ERR */ // 記錄 AOF 的 write 操做被推遲了多少次 unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */ // 指示是否須要每寫入必定量的數據,就主動執行一次 fsync() int aof_rewrite_incremental_fsync;/* fsync incrementally while rewriting? */ int aof_last_write_status; /* REDIS_OK or REDIS_ERR */ int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */ /* RDB persistence */ }
void bgrewriteaofCommand(client *c) { if (server.aof_child_pid != -1) { } else if (server.rdb_child_pid != -1) { server.aof_rewrite_scheduled = 1; } else if (rewriteAppendOnlyFileBackground() == C_OK) { } else { }
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 || ldbPendingChildren()) { …… } else { ……//檢查是否觸發AOF重寫 if (server.rdb_child_pid == -1 &&server.aof_child_pid == -1 && server.aof_rewrite_perc &&server.aof_current_size > server.aof_rewrite_min_size) { long long base = server.aof_rewrite_base_size ?server.aof_rewrite_base_size : 1; long long growth = (server.aof_current_size*100/base) - 100; if (growth >= server.aof_rewrite_perc) { rewriteAppendOnlyFileBackground(); } } } }
後臺重寫的實現:
//後臺重寫AOF文件 int rewriteAppendOnlyFileBackground(void) { if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR; if (aofCreatePipes() != C_OK) return C_ERR;//建立父進程與子進程的管道 openChildInfoPipe(); start = ustime(); if ((childpid = fork()) == 0) { char tmpfile[256]; snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); //在子進程中執行AOF重寫 if (rewriteAppendOnlyFile(tmpfile) == C_OK) { …… } } else { /* Parent */ …… } return C_OK; /* unreached */ }
//重寫AOF文件的程序 int rewriteAppendOnlyFile(char *filename) { snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid()); server.aof_child_diff = sdsempty(); rioInitWithFile(&aof,fp); if (server.aof_rewrite_incremental_fsync) rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES); //遍歷數據庫,進行重寫操做 …… //寫入、沖洗並同步到AOF文件 if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr; if (fflush(fp) == EOF) goto werr; if (fsync(fileno(fp)) == -1) goto werr; ... return C_OK; }//重寫操做
子進程重寫完成後,父進程進行處理
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 || ldbPendingChildren()) { if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { if(pid == server.aof_child_pid) { //子進程完成重寫,父進程進行重寫AOF文件的處理 backgroundRewriteDoneHandler(exitcode,bysignal); } } } }
void backgroundRewriteDoneHandler(int exitcode, int bysignal) { if (!bysignal && exitcode == 0) { snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int)server.aof_child_pid); newfd = open(tmpfile,O_WRONLY|O_APPEND); if (aofRewriteBufferWrite(newfd) == -1) { ……//將重寫緩衝區的數據寫入到重寫AOF文件 } if (rename(tmpfile,server.aof_filename) == -1) { ……//覆蓋舊的AOF文件 } …… } }
參考: