除了RDB持久化以外,Redis還提供了AOF(Append Only File)持久化功能。與RDB持久化通過保存數據庫中的鍵值對來記錄數據庫狀態不同,AOF持久化是通過保存Redis服務器所執行的寫命令來記錄數據庫的狀態。被寫入AOF文件的所有命令都是以Redis的命令請求協議格式保存的,該格式是一種純文本格式,因此可以直接打開AOF文件,觀察裏面的內容。
AOF持久化需要將所有寫命令記錄在文件中來保存服務器狀態,而文件寫入操作效率比較低,如果每執行一條寫命令都要寫一次AOF文件,無疑是低效的。爲了提高效率,Redis提供了一個中間層——AOF緩衝區,也就是說當Redis執行一條寫命令後,先將該命令追加到AOF緩衝區中,在之後的某個時刻再將AOF緩衝區中的內容同步到文件中。當AOF持久化功能處於打開狀態時,服務器在執行完一個寫命令之後,會以協議格式將被執行的寫命令追加到服務器狀態的aof_buf緩衝區的末尾:
struct redisServer { ... sds aof_buf;/* AOF buffer, written before entering the event loop */ }
//進行命令追加 void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) { if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) feedAppendOnlyFile(cmd,dbid,argv,argc); }
/* Turn one executed write command into protocol text and append it to the
 * AOF buffer; while a rewrite child exists, also feed the rewrite buffer.
 *
 * cmd    - command table entry of the executed command
 * dictid - id of the database the command targeted
 * argv   - argument vector of the command
 * argc   - number of entries in argv
 */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds proto = sdsempty();
    int is_expire_family;
    int is_setex_family;

    /* The command targets a different DB than the last one appended:
     * emit an explicit SELECT first and remember the new DB. */
    if (dictid != server.aof_selected_db) {
        char dbid_str[64];

        snprintf(dbid_str,sizeof(dbid_str),"%d",dictid);
        proto = sdscatprintf(proto,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(dbid_str),dbid_str);
        server.aof_selected_db = dictid;
    }

    is_expire_family = (cmd->proc == expireCommand ||
                        cmd->proc == pexpireCommand ||
                        cmd->proc == expireatCommand);
    is_setex_family = (cmd->proc == setexCommand ||
                       cmd->proc == psetexCommand);

    if (is_expire_family) {
        /* EXPIRE / PEXPIRE / EXPIREAT are all translated into PEXPIREAT. */
        proto = catAppendOnlyExpireAtCommand(proto,cmd,argv[1],argv[2]);
    } else if (is_setex_family) {
        /* SETEX / PSETEX become a plain SET followed by a PEXPIREAT. */
        robj *set_argv[3];

        set_argv[0] = createStringObject("SET",3);
        set_argv[1] = argv[1];
        set_argv[2] = argv[3];
        proto = catAppendOnlyGenericCommand(proto,3,set_argv);
        decrRefCount(set_argv[0]);
        proto = catAppendOnlyExpireAtCommand(proto,cmd,argv[1],argv[2]);
    } else {
        /* Every other command needs no translation (or was already
         * translated in the command vector for replication), so it is
         * serialized as-is. */
        proto = catAppendOnlyGenericCommand(proto,argc,argv);
    }

    /* Append to the AOF buffer. It is flushed to disk just before
     * re-entering the event loop, i.e. before the client gets a positive
     * reply about the operation performed. */
    if (server.aof_state == REDIS_AOF_ON) {
        server.aof_buf = sdscatlen(server.aof_buf,proto,sdslen(proto));
    }

    /* While a background AOF rewrite (BGREWRITEAOF) is in progress, also
     * accumulate the command in the rewrite buffer so the differences
     * between the child's DB snapshot and the current DB can be appended
     * to the new AOF once the child finishes. */
    if (server.aof_child_pid != -1) {
        aofRewriteBufferAppend((unsigned char*)proto,sdslen(proto));
    }

    sdsfree(proto);
}
/* Append "<prefix><value>\r\n" to 'dst' and return the resulting string. */
static sds catAofLengthHeader(sds dst, char prefix, long long value) {
    char hdr[32];
    int n;

    hdr[0] = prefix;
    n = 1 + ll2string(hdr+1,sizeof(hdr)-1,value);
    hdr[n++] = '\r';
    hdr[n++] = '\n';
    return sdscatlen(dst,hdr,n);
}

/* Serialize a command in protocol format and append it to 'dst'.
 *
 * The output is "*<argc>\r\n" followed, for each argument, by
 * "$<len>\r\n<content>\r\n", where <len> is the length of <content>.
 *
 * dst  - string the serialized command is appended to
 * argc - number of arguments
 * argv - argument objects; each one is decoded before being written
 *
 * Returns the (possibly reallocated) destination string.
 */
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
    int idx;

    /* Multi-bulk header carrying the argument count. */
    dst = catAofLengthHeader(dst,'*',argc);

    for (idx = 0; idx < argc; idx++) {
        robj *arg = getDecodedObject(argv[idx]);

        /* Bulk header with the argument length, then payload and CRLF. */
        dst = catAofLengthHeader(dst,'$',sdslen(arg->ptr));
        dst = sdscatlen(dst,arg->ptr,sdslen(arg->ptr));
        dst = sdscatlen(dst,"\r\n",2);
        decrRefCount(arg);
    }

    return dst;
}
/* Write the contents of server.aof_buf to the AOF file and fsync it
 * according to the configured policy.
 *
 * force - when non-zero, the write is performed even if the "everysec"
 *         policy would otherwise postpone it because a background fsync
 *         is still pending.
 *
 * This is an abridged excerpt: part of the function after the write() call
 * is omitted, as marked below.
 */
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;

    /* Nothing buffered, nothing to do. */
    if (sdslen(server.aof_buf) == 0) return;

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* First postponement: remember when it started and
                 * return without writing. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* Already postponed, but for less than two seconds:
                 * keep waiting for the pending fsync. */
                return;
            }
            /* Postponed long enough: count the delayed fsync and go on
             * with the write anyway. */
            server.aof_delayed_fsync++;
        }
    }

    /* Write the contents of aof_buf to the AOF file. */
    nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    server.aof_flush_postponed_start = 0;

    /* …… (code omitted in this excerpt)
     * NOTE(review): nothing visible here checks nwritten for -1 or a short
     * write; presumably the omitted code does — confirm against upstream. */

    server.aof_current_size += nwritten;

    /* Reuse the buffer when its total allocation (used + available) is
     * below 4000 bytes; otherwise release it and start from an empty one. */
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
        return;

    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* appendfsync always: sync the AOF file right here and record
         * the time of the sync. */
        aof_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        /* appendfsync everysec: only when server.unixtime has advanced
         * past the last fsync time, and only queue a background fsync if
         * none is already pending. */
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }
}
// 在另外一個線程中,對給定的描述符 fd (指向 AOF 文件)執行一個後臺 fsync() 操做。 void aof_background_fsync(int fd) { bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL); }
struct redisServer{ // AOF 狀態(開啓/關閉/可寫) int aof_state; /* REDIS_AOF_(ON|OFF|WAIT_REWRITE) */ // 所使用的 fsync 策略(每一個寫入/每秒/從不) int aof_fsync; /* Kind of fsync() policy */ char *aof_filename; /* Name of the AOF file */ int aof_no_fsync_on_rewrite; /* Don't fsync if a rewrite is in prog. */ int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */ off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */ // 最後一次執行 BGREWRITEAOF 時, AOF 文件的大小 off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */ // AOF 文件的當前字節大小 off_t aof_current_size; /* AOF current size. */ int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */ // 負責進行 AOF 重寫的子進程 ID pid_t aof_child_pid; /* PID if rewriting process */ // AOF 重寫緩存鏈表,連接着多個緩存塊 list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */ // AOF 緩衝區 sds aof_buf; /* AOF buffer, written before entering the event loop */ // AOF 文件的描述符 int aof_fd; /* File descriptor of currently selected AOF file */ // AOF 的當前目標數據庫 int aof_selected_db; /* Currently selected DB in AOF */ // 推遲 write 操做的時間 time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */ // 最後一直執行 fsync 的時間 time_t aof_last_fsync; /* UNIX time of last fsync() */ time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */ // AOF 重寫的開始時間 time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */ // 最後一次執行 BGREWRITEAOF 的結果 int aof_lastbgrewrite_status; /* REDIS_OK or REDIS_ERR */ // 記錄 AOF 的 write 操做被推遲了多少次 unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */ // 指示是否須要每寫入必定量的數據,就主動執行一次 fsync() int aof_rewrite_incremental_fsync;/* fsync incrementally while rewriting? */ int aof_last_write_status; /* REDIS_OK or REDIS_ERR */ int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */ /* RDB persistence */ }
/* BGREWRITEAOF command handler (abridged excerpt).
 *
 * Decides, from the state of the child processes, whether a background
 * AOF rewrite is rejected, scheduled for later, or started now.
 *
 * NOTE(review): most branch bodies are empty in this excerpt; presumably
 * the replies sent to client 'c' were stripped — confirm against upstream.
 */
void bgrewriteaofCommand(client *c) {
    if (server.aof_child_pid != -1) {
        /* An AOF rewrite child already exists: nothing is started. */
    } else if (server.rdb_child_pid != -1) {
        /* An RDB child is running: only flag the rewrite as scheduled. */
        server.aof_rewrite_scheduled = 1;
    } else if (rewriteAppendOnlyFileBackground() == C_OK) {
        /* The background rewrite was started successfully. */
    } else {
        /* rewriteAppendOnlyFileBackground() did not return C_OK. */
    }
}
/* Excerpt of serverCron(): the part that decides whether to trigger an
 * automatic AOF rewrite. Code marked with …… is omitted in this excerpt. */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 || ldbPendingChildren()) {
        ……
    } else {
        ……
        /* Check whether an AOF rewrite should be triggered: no RDB or AOF
         * child exists, a rewrite percentage is configured, and the
         * current AOF size exceeds the configured minimum size. */
        if (server.rdb_child_pid == -1 &&server.aof_child_pid == -1 && server.aof_rewrite_perc &&server.aof_current_size > server.aof_rewrite_min_size) {
            /* Fall back to 1 when the base size is 0 so the division
             * below is well defined. */
            long long base = server.aof_rewrite_base_size ?server.aof_rewrite_base_size : 1;
            /* Growth of the current size over the base size, in percent. */
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                rewriteAppendOnlyFileBackground();
            }
        }
    }
}
//後臺重寫AOF文件 int rewriteAppendOnlyFileBackground(void) { if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR; if (aofCreatePipes() != C_OK) return C_ERR;//建立父進程與子進程的管道 openChildInfoPipe(); start = ustime(); if ((childpid = fork()) == 0) { char tmpfile[256]; snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); //在子進程中執行AOF重寫 if (rewriteAppendOnlyFile(tmpfile) == C_OK) { …… } } else { /* Parent */ …… } return C_OK; /* unreached */ }
//重寫AOF文件的程序 int rewriteAppendOnlyFile(char *filename) { snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid()); server.aof_child_diff = sdsempty(); rioInitWithFile(&aof,fp); if (server.aof_rewrite_incremental_fsync) rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES); //遍歷數據庫,進行重寫操做 …… //寫入、沖洗並同步到AOF文件 if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr; if (fflush(fp) == EOF) goto werr; if (fsync(fileno(fp)) == -1) goto werr; ... return C_OK; }//重寫操做
/* Excerpt of serverCron(): the part that detects a terminated child.
 * 'pid', 'statloc', 'exitcode' and 'bysignal' are set up in code omitted
 * from this excerpt. */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 || ldbPendingChildren()) {
        /* Non-blocking wait (WNOHANG): a non-zero result is handled below. */
        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            if(pid == server.aof_child_pid) {
                /* The rewrite child has finished: let the parent process
                 * the rewritten AOF file. */
                backgroundRewriteDoneHandler(exitcode,bysignal);
            }
        }
    }
}
/* Called in the parent when the AOF rewrite child has terminated
 * (abridged excerpt).
 *
 * exitcode - exit code of the child
 * bysignal - non-zero when the child was terminated by a signal
 *
 * Code marked with …… is omitted; 'tmpfile' and 'newfd' are declared in the
 * omitted part. */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    /* Only the successful case is shown: no signal and exit code 0. */
    if (!bysignal && exitcode == 0) {
        /* Name of the temporary file produced by the child, built from
         * the child's pid. */
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int)server.aof_child_pid);
        newfd = open(tmpfile,O_WRONLY|O_APPEND);
        /* Write the data in the rewrite buffer to the rewritten AOF file;
         * the branch below is the failure path. */
        if (aofRewriteBufferWrite(newfd) == -1) {
            ……
        }
        /* Rename the temporary file over the old AOF file; the branch
         * below is the failure path. */
        if (rename(tmpfile,server.aof_filename) == -1) {
            ……
        }
        ……
    }
}