除了 RDB 持久化功能以外,Redis 還提供了 AOF(Append Only File)持久化功能。與 RDB 持久化經過保存數據庫中的鍵值對來記錄數據庫狀態不一樣,AOF 持久化是經過保存 Redis 服務器所執行的寫命令來記錄數據庫狀態的。html
AOF 文件中記錄了 Redis 服務器所執行的寫命令,以此來保存數據庫的狀態。AOF 文件本質上是一個 redo log,經過它能夠恢復數據庫狀態。mysql
隨着執行命令的增多,AOF 文件的大小會不斷增大,這會致使幾個問題,好比,磁盤佔用增長,重啓加載過慢等。所以, Redis 提供了 AOF 重寫機制來控制 AOF 文件大小,下面會細說。linux
AOF 文件中寫入的全部命令以 Redis 的命令請求協議格式去保存,即 RESP 格式。c++
有兩種方式能夠實現 AOF 功能的開關,以下,redis
config set appendonly yes/no
。與 AOF 相關的 server 成員變量不少,這裏只選擇幾個進行簡要說明。先看後面的章節,以後再回頭看本章節,也是個不錯的主意。sql
int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */ int aof_fsync; /* Kind of fsync() policy */ char *aof_filename; /* Name of the AOF file */ int aof_no_fsync_on_rewrite; /* Don't fsync if a rewrite is in prog. */ int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */ off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */ off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */ off_t aof_current_size; /* AOF current size. */ int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */ pid_t aof_child_pid; /* PID if rewriting process */ list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */ sds aof_buf; /* AOF buffer, written before entering the event loop */ int aof_fd; /* File descriptor of currently selected AOF file */ int aof_selected_db; /* Currently selected DB in AOF */ time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */ time_t aof_last_fsync; /* UNIX time of last fsync() */ time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */ time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */ int aof_lastbgrewrite_status; /* C_OK or C_ERR */ unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */ int aof_rewrite_incremental_fsync;/* fsync incrementally while rewriting? */ int aof_last_write_status; /* C_OK or C_ERR */ int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */ int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */
表示 AOF 刷盤策略,後面會細說數據庫
因爲 aofrewrite 是個耗時操做,所以會 fork 一個子進程去作這件事, aof_child_pid 就標識了子進程的 pid。數組
該變量保存着全部等待寫入到 AOF 文件的協議文本。緩存
該變量用來保存 aofrewrite 期間,server 處理過的須要寫入 AOF 文件的協議文本。這個變量採用 list 結構,是考慮到分配到一個很是大的空間並不老是可能的,也可能產生大量的複製工做。安全
可取值有 0 和 1。
取 1 時,表示此時有子進程正在作 aofrewrite 操做,本次任務後延,等到 serverCron
執行時,合適的狀況再執行。或者是執行了 config set appendonly yes
, 想把 AOF 功能打開,此時執行的 aofrewrite 失敗了,aof_state 仍然處於 AOF_WAIT_REWRITE 狀態,此時 aof_rewrite_scheduled 也會置爲 1,等下次再執行 aofrewrite。
表示 AOF 功能如今的狀態,可取值以下,
#define AOF_OFF 0 /* AOF is off */ #define AOF_ON 1 /* AOF is on */ #define AOF_WAIT_REWRITE 2 /* AOF waits rewrite to start appending */
AOF_OFF 表示 AOF 功能處於關閉狀態,開關在上一節已經說過,默認 AOF 功能是關閉的。AOF 功能從 off switch 到 on 後,aof_state 會從 AOF_OFF 變爲 AOF_WAIT_REWRITE,startAppendOnly
函數完成該邏輯。在 aofrewrite 一次以後,該變量纔會從 AOF_WAIT_REWRITE 變爲 AOF_ON。
能夠看到從 ON 切換到 OFF 時,要經歷一箇中間狀態 AOF_WAIT_REWRITE,那爲什麼要這麼設計呢?再來分析一下 startAppendOnly
函數的邏輯(代碼去掉了打印日誌的部分)。
server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644); serverAssert(server.aof_state == AOF_OFF); if (server.aof_fd == -1) { char *cwdp = getcwd(cwd,MAXPATHLEN); return C_ERR; } if (server.rdb_child_pid != -1) { server.aof_rewrite_scheduled = 1; } else if (rewriteAppendOnlyFileBackground() == C_ERR) { close(server.aof_fd); return C_ERR; } server.aof_state = AOF_WAIT_REWRITE;
【1】打開 aof 文件,默認名爲 appendonly.aof,沒有的話就新建空文件,失敗則返回。
【2】切換後,須要作一次 aofrewrite,將 server 中現有的數據轉換成協議文本,寫到 AOF 文件。可是,這裏要注意,若是此時有子進程在作 bgrdb,那麼這次 aofrewrite 須要任務延緩,即 aof_rewrite_scheduled 置爲 1。
【3】將 aof_state 置爲 AOF_WAIT_REWRITE 狀態。
而作完第一次 aofrewrite 後,AOF_WAIT_REWRITE 轉換成 AOF_ON,以下,
void backgroundRewriteDoneHandler(int exitcode, int bysignal) { ... if (server.aof_state == AOF_WAIT_REWRITE) server.aof_state = AOF_ON; ... }
仔細分析源碼發現,在 AOF 持久化的命令追加階段(後面章節細講),有以下邏輯,
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { ... if (server.aof_state == AOF_ON) server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf)); if (server.aof_child_pid != -1) aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf)); ... }
很明顯,剛開啓 AOF 時, aof_state 爲 AOF_WAIT_REWRITE ,處理好的協議文本 buf 沒法寫入 aof_buf 變量 ,但必須寫入 aof_rewrite_buf_blocks 變量(數據在 aofrewrite 的最後階段會被寫進 AOF 文件)。
這裏是否將命令 append 到 aof_state 的判斷相當重要,若是修改條件爲 server.aof_state != AOF_OFF
,考慮以下狀況。
AOF 狀態剛打開,還沒有完成第一次 aofrewrite,也即,一邊 Child 進程數據庫中現有數據還未寫進 AOF 文件,另外一邊 Parent 進程仍然持續處理 client 請求,因而,Parent 進程在指定的數據刷盤策略下,將 aof_buf 刷盤。若是這時宕機了,當 server 重啓後,加載 AOF 文件,在內存中塞入數據,實際上對於用戶來講,這部分數據算是髒數據了,由於 AOF 並無成功打開,未開啓 AOF 狀態時,數據都在內存中,宕機後,數據會所有丟掉。增長這個中間狀態就是爲了應對這種狀況。因此, AOF_WAIT_REWRITE 狀態存在的時間範圍起始於 startAppendOnly
,到完成第一次 aofrewrite 後切成 AOF_ON 。aofrewrite 後再發生宕機,丟失的數據就少多了。
這只是我我的的理解,不必定正確,歡迎你們斧正。
另外,若是開啓了 AOF,在 redis 啓動 加載 AOF 文件時,aof_state 也會暫時設置成 AOF_OFF,加載完畢以後設置爲 AOF_ON。
爲了提升 aofrewrite 效率,Redis 經過在父子進程間創建管道,把 aofrewrite 期間的寫命令經過管道同步給子進程,追加寫盤的操做也就轉交給了子進程。aof_pipe_* 變量就是這部分會用到的管道。
AOF 功能開啓後,每次致使數據庫狀態發生變化的命令都會通過函數 feedAppendOnlyFile
累積到 aof_buf 變量中。若是後臺有正在執行的 aofrewrite 任務,還會寫一份數據到 aof_rewrite_buf_blocks 變量中。
在該函數中,首先要將數據庫切換到當前數據庫( aof_selected_db 更新),在 buf 中插入一條 SELECT 命令。
sds buf = sdsempty(); if (dictid != server.aof_selected_db) { char seldb[64]; snprintf(seldb,sizeof(seldb),"%d",dictid); buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", (unsigned long)strlen(seldb),seldb); server.aof_selected_db = dictid; }
而後在對須要加入 buf 的命令進行分類處理。
【1】帶有過時時間的命令,調用函數 catAppendOnlyExpireAtCommand
進行協議文本 buf 組裝。EXPIRE/PEXPIRE/EXPIREAT 這三個命令直接調用該函數,而 SETEX/PSETEX 這兩個命令須要在調用以前加入一個 SET 命令。即,
tmpargv[0] = createStringObject("SET",3); tmpargv[1] = argv[1]; tmpargv[2] = argv[3]; buf = catAppendOnlyGenericCommand(buf,3,tmpargv); decrRefCount(tmpargv[0]); buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
【2】普通命令,直接調用函數 catAppendOnlyGenericCommand
進行協議文本 buf 組裝。
該函數其實就是將全部與過時時間相關的命令轉成 PEXPIREAT 命令,細化到毫秒。最後調用普通命令組裝 buf 函數 catAppendOnlyGenericCommand
。
// 構建 PEXPIREAT 命令 argv[0] = createStringObject("PEXPIREAT",9); argv[1] = key; argv[2] = createStringObjectFromLongLong(when); // 調用 aof 公共函數 buf = catAppendOnlyGenericCommand(buf, 3, argv);
該函數用來把 redis 命令轉換成 RESP 協議文本。
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) { char buf[32]; int len, j; robj *o; // 好比 *3\r\n buf[0] = '*'; len = 1+ll2string(buf+1,sizeof(buf)-1,argc); buf[len++] = '\r'; buf[len++] = '\n'; dst = sdscatlen(dst,buf,len); for (j = 0; j < argc; j++) { o = getDecodedObject(argv[j]); buf[0] = '$'; len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr)); buf[len++] = '\r'; buf[len++] = '\n'; dst = sdscatlen(dst,buf,len); dst = sdscatlen(dst,o->ptr,sdslen(o->ptr)); dst = sdscatlen(dst,"\r\n",2); decrRefCount(o); } return dst; }
能夠看到,定義了一個 buf 數組,反覆使用,經過 len 精確控制 append 到 dst 後的長度。
aof_rewrite_buf_blocks 變量是一個 list 結構,其中每個元素都是一個大小爲 10M 的 block
#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10) /* 10 MB per block */ typedef struct aofrwblock { unsigned long used, free; char buf[AOF_RW_BUF_BLOCK_SIZE]; } aofrwblock;
這個函數作了兩件事情。
一是,將 catAppendOnlyGenericCommand
得到的協議文本 buf 存到 aof_rewrite_buf_blocks 變量,首先拿出來 list 最後一個 block,若是裝不下,那先把最後一個 block 填滿,剩下的再申請內存。
listNode *ln = listLast(server.aof_rewrite_buf_blocks); // 指向最後一個緩存塊 aofrwblock *block = ln ? ln->value : NULL; while(len) { if (block) { // 若是已經有至少一個緩存塊,那麼嘗試將內容追加到這個緩存塊裏面 unsigned long thislen = (block->free < len) ? block->free : len; if (thislen) { /* The current block is not already full. */ memcpy(block->buf+block->used, s, thislen); block->used += thislen; block->free -= thislen; s += thislen; len -= thislen; } } if (len) { // 最後一個緩存塊沒有放得下本次 data,那再申請一個 block int numblocks; block = zmalloc(sizeof(*block)); block->free = AOF_RW_BUF_BLOCK_SIZE; block->used = 0; listAddNodeTail(server.aof_rewrite_buf_blocks,block); ... ... } }
二是,給 aof_pipe_write_data_to_child 這個 fd 註冊寫事件,回調函數爲 aofChildWriteDiffData
。
/* Install a file event to send data to the rewrite child if there is * not one already. */ if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) { aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); }
這個屬於 aof 重寫的邏輯,後面章節會細說,這裏先留個心。
也就是說,何時會調用feedAppendOnlyFile
呢?有如下兩個時機。
你們都知道,Redis 中命令執行的流程,即 processCommand
-> call
。在 call
函數中會把某些命令寫入 AOF 文件。如何判斷某個命令是否須要寫入 AOF 呢?
在 server 結構體中維持了一個 dirty 計數器,dirty 記錄的是服務器狀態進行了多少次修改,每次作完 save/bgsave 執行完成後,會將 dirty 清 0,而使得服務器狀態修改的命令通常都須要寫入 AOF 文件和主從同步(排除某些特殊狀況)。
dirty = server.dirty; c->cmd->proc(c); dirty = server.dirty-dirty; ... if (propagate_flags != PROPAGATE_NONE) propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
在 propagate
函數中就會調用到 feedAppendOnlyFile
。
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) { if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) feedAppendOnlyFile(cmd,dbid,argv,argc); if (flags & PROPAGATE_REPL) replicationFeedSlaves(server.slaves,dbid,argv,argc); }
當內存中帶有過時時間的 key 過時時,會向 AOF 寫入 del 命令。
void propagateExpire(redisDb *db, robj *key) { ... if (server.aof_state != AOF_OFF) feedAppendOnlyFile(server.delCommand,db->id,argv,2); replicationFeedSlaves(server.slaves,db->id,argv,2); ... }
propagateExpire
函數在一些檢查 key 是否過時時會調用。
上一步中,將須要寫入 AOF 文件的數據先寫到了 aof_buf 變量中,那麼,接下來講一下如何將 aof_buf 的內容寫進 AOF 文件。
爲了提升文件的寫入效率,在現代操做系統中,當用戶調用write
函數試,將一些數據寫入到文件的時候,操做系統一般會將寫入的數據保存在一個內存緩衝區裏,等到緩衝區的空間被填滿,或者超過了指定的時限後,才真正地將緩衝區中的數據寫入磁盤。這種作法雖然提升了效率,但也爲寫入數據帶來了安全問題,由於若是計算機宕機,那麼保存在內存緩衝區裏面的寫入數據將會丟失。
爲此,系統提供了
fsync
和fdatasync
兩個同步函數,它們能夠強制讓操做系統當即將緩存區中的數據寫入到硬盤裏面,從而確保寫入數據的安全性。
要知道,這兩個系統調用函數都是阻塞式的,針對如何協調文件寫入與同步的關係,該版本 Redis 支持 3 種同步策略,可在配置文件中使用 appendfsync 項進行配置,有以下取值,
在 Redis 源碼中, 當程序運行在 Linux 系統上時,執行的是 fdatasync
函數,而在其餘系統上,則會執行 fsync
函數,即,
#ifdef __linux__ #define aof_fsync fdatasync #else #define aof_fsync fsync #endif
注:如下敘述均以 fsync
代稱。
寫入文件的邏輯在 flushAppendOnlyFile
函數中實現。下面分兩部分來看主要代碼。
write
系統調用... // aof 緩存區內沒有數據須要寫入 disk,無需處理 if (sdslen(server.aof_buf) == 0) return; // 若是 sync policy 設置成 everysec, // sync_in_progress 表示是否有 fsync 任務在後臺 if (server.aof_fsync == AOF_FSYNC_EVERYSEC) sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0; // force=0(非強制寫入)時,若是後臺有 fsync 任務,推遲這次寫入,但推遲時間不超過 2s if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) { if (sync_in_progress) { if (server.aof_flush_postponed_start == 0) { // 首次推遲 write,一次推遲 2s server.aof_flush_postponed_start = server.unixtime; return; } else if (server.unixtime - server.aof_flush_postponed_start < 2) { return; } // 不然,經過,繼續寫,由於咱們不能等待超過 2s server.aof_delayed_fsync++; serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis."); } } ... // 將 aof 緩衝區的內容寫到系統緩存區 nwritten = write(server.aof_fd, server.aof_buf, sdslen(server.aof_buf)); ... // 執行了 write 操做,因此要清零延遲 flush 的時間 server.aof_flush_postponed_start = 0;
首先會判斷 aof_buf 是否爲空,若是是,那麼不須要執行下面的邏輯,直接返回。
若是同步策略爲 everysec,那麼須要查看是否有 fsync 任務在後臺,調用 fsync 使用的是 Redis 中 bio ,若是對這個還不瞭解,能夠參考我以前的文章 《 Redis Bio 詳解 》。爲何要作這個判斷呢?
當fsync
和write
同一個 fd 時,write
必然阻塞。 當系統 IO 很是繁忙時,fsync
() 可能會阻塞, 即便系統 IO 不繁忙,fsync
也會由於數據量大而慢。
所以對於 everysec 策略,須要儘可能保證 fsync
和 write
不一樣時操做同一個 fd。no 策略徹底把 fsync
交給了操做系統,操做系統何時 fsync
,無從得知。always 策略則是每次都要主從調用 fsync
,也不必作判斷。所以,這裏的判斷,只針對 everysec 策略有效。
對於 everysec 策略,若是有 fsync
在執行,那麼本次 write
推遲 2 秒鐘,等到下次在進入本函數時,若是推遲時間超過 2 秒,那麼更新 aof_delayed_fsync 值(info 裏能夠查到),打印日誌 」 Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis. 「 ,以後進行 write
系統調用。固然了,系統也提供了 force 選項,去跳過這項是否要推遲 write
的檢查。
write
以後,將 aof_flush_postponed_start 推遲開始計時值清零,迎接下次檢查。
因此說,AOF 執行 everysec 策略時,若是剛好有 fsync
在長時間的執行,Redis 意外關閉會丟失最多兩秒的數據。若是 fsync
運行正常,只有當操做系統 crash 時纔會形成最多 1 秒的數據丟失。
write
結果處理write
調用結果多是正常的,也多是異常的,那麼須要作不一樣的處理。首先主要看異常處理,
if (nwritten != (signed)sdslen(server.aof_buf)) { ... /* Log the AOF write error and record the error code. */ if (nwritten == -1) { ... } else { // 若是僅寫了一部分,發生錯誤 // 將追加的內容截斷,刪除了追加的內容,恢復成原來的文件 if (ftruncate(server.aof_fd, server.aof_current_size) == -1) { ... } else { nwritten = -1; } server.aof_last_write_errno = ENOSPC; } // 若是是寫入的策略爲每次寫入就同步,沒法恢復這種策略的寫,由於咱們已經告知使用者,已經將寫的數據同步到磁盤了,所以直接退出程序 if (server.aof_fsync == AOF_FSYNC_ALWAYS) { ... exit(1); } else { // 設置執行write操做的狀態 server.aof_last_write_status = C_ERR; if (nwritten > 0) { // 只能更新當前的 AOF 文件的大小 server.aof_current_size += nwritten; // 刪除 AOF 緩衝區寫入的字節 sdsrange(server.aof_buf,nwritten,-1); } return; /* We'll try again on the next call... */ } } else { /* Successful write(2). If AOF was in error state, restore the * OK state and log the event. */ if (server.aof_last_write_status == C_ERR) { serverLog(LL_WARNING, "AOF write error looks solved, Redis can write again."); server.aof_last_write_status = C_OK; } }
寫入異常的判斷,nwritten != (signed)sdslen(server.aof_buf)
,write
的數據量與 aof_buf 的大小不一樣。當徹底沒寫入時,打個日誌就算了;當僅寫入了一部分數據時,使用 ftruncate
函數把 AOF 文件的內容恢復成原來的大小,以備下次從新寫入,nwritten 置爲 -1。使用 ftruncate
的緣由是怕操做系統執行了 fsync
,所以須要把 AOF 文件的大小恢復。
若是執行的是 always 同步策略,那麼須要返回會客戶端錯誤。對於其餘策略,更新 aof_last_write_status
,以便知道上一次作 write
的結果,對於未徹底寫入的狀況,若是上面執行的 ftruncate
失敗,此時 nwritten > 0
,須要更新 aof_current_size,從 aof_buf 中減去已經寫入的,防止下次有重複數據寫入,而後返回。
若是寫入成功,那麼視狀況更新 aof_last_write_status
,表示這次 write
成功。
下面主要是正常狀況的處理。
/* nwritten = -1 時走不到這個步驟 */ server.aof_current_size += nwritten; // 正常 write,更新 aof_current_size /* Re-use AOF buffer when it is small enough. The maximum comes from the * arena size of 4k minus some overhead (but is otherwise arbitrary). */ if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) { sdsclear(server.aof_buf); } else { sdsfree(server.aof_buf); server.aof_buf = sdsempty(); } /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are * children doing I/O in the background. */ if (server.aof_no_fsync_on_rewrite && (server.aof_child_pid != -1 || server.rdb_child_pid != -1)) return; /* Perform the fsync if needed. */ if (server.aof_fsync == AOF_FSYNC_ALWAYS) { /* aof_fsync is defined as fdatasync() for Linux in order to avoid * flushing metadata. */ latencyStartMonitor(latency); aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */ latencyEndMonitor(latency); latencyAddSampleIfNeeded("aof-fsync-always",latency); server.aof_last_fsync = server.unixtime; } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && server.unixtime > server.aof_last_fsync)) { if (!sync_in_progress) aof_background_fsync(server.aof_fd); // 若是沒有正在執行同步,那麼建立一個後臺任務 server.aof_last_fsync = server.unixtime; }
aof_buf 清空,而後根據不一樣策略進行同步。always 策略時,主動調用 fsync
; everysec 策略,則建立 fsync bio 任務。
另外,有配置項 no-appendfsync-on-rewrite 去決定,當子進程在作 aofrewrite/bgsave 時是否要進行 fsync
。
也就是,何時會調用 flushAppendOnlyFile
函數,有如下三個時機。
Redis 的服務器進程就是一個事件循環,這個循環中的文件事件負責接收客戶端請求,以及向客戶端發送命令回覆,而時間事件則負責像
serverCron
函數這樣須要定時運行的函數。
對於 Redis 的事件機制能夠參考我以前的文章 《Redis 中的事件》。
由於服務器在處理文件事件時可能會執行寫命令,使得一些內容被追加到 aof_buf 緩衝區裏面,因此在服務器每次結束一個事件循環以前,都會調用 flushAppendOnlyFile
函數,考慮是否須要將 aof_buf 緩衝區中的內容寫入和同步到 AOF 文件裏面。即,
void beforeSleep(struct aeEventLoop *eventLoop) { ... /* Write the AOF buffer on disk */ flushAppendOnlyFile(0); ... }
這裏的調用是非強制寫入(force = 0)。
Redis 中的時間事件,按期執行 serverCron
函數(從 Redis 2.8 開始,用戶能夠經過修改 hz 選項來調整 serverCron
的每秒執行次數),作一些瑣事,好比更新服務器各項統計信息、關閉清理客戶端、作 AOF 和 RDB 等。
/* AOF postponed flush: Try at every cron cycle if the slow fsync completed. */ if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
若是上次 AOF 寫入推遲了,那麼再次嘗試非強制寫入。
run_with_period(1000) { if (server.aof_last_write_status == C_ERR) flushAppendOnlyFile(0); }
每秒鐘檢查,若是上次寫入 AOF 文件失敗了,再次嘗試非強制寫入。由於須要及時去處理 aof_buf
,以及重置 AOF 寫入狀態的變量 aof_last_write_status,每秒作檢查,這個頻率是足夠的。
當 AOF 功能要關閉時,會調用 stopAppendOnly
函數,嘗試一次強制寫入,即盡最大努力去保存最多的數據。
void stopAppendOnly(void) { serverAssert(server.aof_state != AOF_OFF); flushAppendOnlyFile(1); aof_fsync(server.aof_fd); close(server.aof_fd); }
強制寫入,並刷盤。
當 Redis 服務器進程啓動時,須要調用 loadDataFromDisk
函數去加載數據。
void loadDataFromDisk(void) { long long start = ustime(); if (server.aof_state == AOF_ON) { // 開啓了 aof if (loadAppendOnlyFile(server.aof_filename) == C_OK) serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); } else { if (rdbLoad(server.rdb_filename) == C_OK) { serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds", (float)(ustime()-start)/1000000); } else if (errno != ENOENT) { serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); exit(1); } } }
能夠看到,若是開啓了 AOF 功能,就會調用 loadAppendOnlyFile
函數,加載 AOF 文件中的數據到內存中。不然,會去調用 rdbLoad
函數,加載 RDB 文件。加載 AOF 文件的設計頗有意思。
FILE *fp = fopen(filename,"r"); struct redis_stat sb; int old_aof_state = server.aof_state; long loops = 0; off_t valid_up_to = 0; /* Offset of the latest well-formed command loaded. */ // 檢查文件的正確性, 存在,而且不爲空 if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) { server.aof_current_size = 0; fclose(fp); return C_ERR; } if (fp == NULL) { serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno)); exit(1); } // 暫時關掉 AOF, 防止向該 filename 中寫入新的 AOF 數據 server.aof_state = AOF_OFF;
首先,空文件沒有必要再去加載了,提早返回。
而後,暫時關閉 AOF 功能,這是爲了防止在加載 AOF 文件的過程當中,又有新的數據寫進來
fakeClient = createFakeClient(); // 建立一個不帶網絡鏈接的僞客戶端 startLoading(fp); // 標記正在 load db,loading = 1 // 讀 AOF 文件 while(1) { int argc, j; unsigned long len; robj **argv; char buf[128]; sds argsds; struct redisCommand *cmd; ... ... // 如執行命令 SET keytest val,那麼寫入 AOF 文件中的格式爲 // *3\r\n$3\r\nSET\r\n$7\r\nkeytest\r\n$3\r\nval\r\n if (fgets(buf,sizeof(buf),fp) == NULL) { // 按行讀取 AOF 文件,*3 if (feof(fp)) break; else goto readerr; } if (buf[0] != '*') goto fmterr; // 判斷協議是否正確 if (buf[1] == '\0') goto readerr; // 數據完整判斷 argc = atoi(buf+1); if (argc < 1) goto fmterr; argv = zmalloc(sizeof(robj*)*argc); fakeClient->argc = argc; fakeClient->argv = argv; for (j = 0; j < argc; j++) { if (fgets(buf,sizeof(buf),fp) == NULL) { // 依次讀到 $3, $7, $3 fakeClient->argc = j; /* Free up to j-1. */ freeFakeClientArgv(fakeClient); goto readerr; } if (buf[0] != '$') goto fmterr; len = strtol(buf+1,NULL,10); // 參數長度 argsds = sdsnewlen(NULL,len); if (len && fread(argsds,len,1,fp) == 0) { // 依次讀到 SET/ keytest/ val sdsfree(argsds); fakeClient->argc = j; /* Free up to j-1. */ freeFakeClientArgv(fakeClient); goto readerr; } argv[j] = createObject(OBJ_STRING,argsds); if (fread(buf,2,1,fp) == 0) { // 讀到 \r\n fakeClient->argc = j+1; /* Free up to j. */ freeFakeClientArgv(fakeClient); goto readerr; /* discard CRLF */ } } /* Command lookup */ cmd = lookupCommand(argv[0]->ptr); if (!cmd) { serverLog(LL_WARNING,"Unknown command '%s' reading the append only file", (char*)argv[0]->ptr); exit(1); } /* Run the command in the context of a fake client */ cmd->proc(fakeClient); /* The fake client should not have a reply */ serverAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0); /* The fake client should never get blocked */ serverAssert((fakeClient->flags & CLIENT_BLOCKED) == 0); /* Clean up. Command code may have changed argv/argc so we use the * argv/argc of the client instead of the local variables. */ freeFakeClientArgv(fakeClient); if (server.aof_load_truncated) valid_up_to = ftello(fp); }
上面這部分是加載 AOF 文件的關鍵,以 SET keytest val
命令對應的 AOF 文件內容 *3\r\n$3\r\nSET\r\n$7\r\nkeytest\r\n$3\r\nval\r\n
爲例,能夠更好地理解上面的邏輯。因爲 AOF 文件中存儲的數據與客戶端發送的請求格式相同徹底符合 Redis 的通訊協議,所以 Redis Server 建立僞客戶端 fakeClient,將解析後的 AOF 文件數據像客戶端請求同樣調用各類指令,cmd->proc(fakeClient)
,將 AOF 文件中的數據重現到 Redis Server 數據庫中。
完成以上邏輯後,進行一些收尾工做,如改回 AOF 狀態爲 ON,釋放僞客戶端等,並處理一些異常狀況,這裏就不展開細講了。