Redis 持久化之 AOF

除了 RDB 持久化功能以外,Redis 還提供了 AOF(Append Only File)持久化功能。與 RDB 持久化經過保存數據庫中的鍵值對來記錄數據庫狀態不一樣,AOF 持久化是經過保存 Redis 服務器所執行的寫命令來記錄數據庫狀態的。html

簡介

AOF 文件中記錄了 Redis 服務器所執行的寫命令,以此來保存數據庫的狀態。AOF 文件本質上是一個 redo log,經過它能夠恢復數據庫狀態。mysql

隨着執行命令的增多,AOF 文件的大小會不斷增大,這會致使幾個問題,好比,磁盤佔用增長,重啓加載過慢等。所以, Redis 提供了 AOF 重寫機制來控制 AOF 文件大小,下面會細說。linux

AOF 文件中寫入的全部命令以 Redis 的命令請求協議格式去保存,即 RESP 格式。c++

有兩種方式能夠實現 AOF 功能的開關,以下,redis

  • 在 redis 配置文件 redis.conf 中有配置項 appendonly, yes 打開 AOF 功能,no 關閉 AOF 功能。
  • 使用客戶端命令config set appendonly yes/no

server 相關變量

與 AOF 相關的 server 成員變量不少,這裏只選擇幾個進行簡要說明。先看後面的章節,以後再回頭看本章節,也是個不錯的主意。sql

int aof_state;                  /* AOF_(ON|OFF|WAIT_REWRITE) */
int aof_fsync;                  /* Kind of fsync() policy */
char *aof_filename;             /* Name of the AOF file */
int aof_no_fsync_on_rewrite;    /* Don't fsync if a rewrite is in prog. */
int aof_rewrite_perc;           /* Rewrite AOF if % growth is > M and... */
off_t aof_rewrite_min_size;     /* the AOF file is at least N bytes. */
off_t aof_rewrite_base_size;    /* AOF size on latest startup or rewrite. */
off_t aof_current_size;         /* AOF current size. */
int aof_rewrite_scheduled;      /* Rewrite once BGSAVE terminates. */
pid_t aof_child_pid;            /* PID if rewriting process */
list *aof_rewrite_buf_blocks;   /* Hold changes during an AOF rewrite. */
sds aof_buf;                   /* AOF buffer, written before entering the event loop */
int aof_fd;                     /* File descriptor of currently selected AOF file */
int aof_selected_db;            /* Currently selected DB in AOF */
time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */
time_t aof_last_fsync;            /* UNIX time of last fsync() */
time_t aof_rewrite_time_last;     /* Time used by last AOF rewrite run. */
time_t aof_rewrite_time_start;    /* Current AOF rewrite start time. */
int aof_lastbgrewrite_status;     /* C_OK or C_ERR */
unsigned long aof_delayed_fsync;  /* delayed AOF fsync() counter */
int aof_rewrite_incremental_fsync;/* fsync incrementally while rewriting? */
int aof_last_write_status;        /* C_OK or C_ERR */
int aof_last_write_errno;         /* Valid if aof_last_write_status is ERR */
int aof_load_truncated;           /* Don't stop on unexpected AOF EOF. */

aof_fsync

表示 AOF 刷盤策略,後面會細說數據庫

aof_child_pid

因爲 aofrewrite 是個耗時操做,所以會 fork 一個子進程去作這件事, aof_child_pid 就標識了子進程的 pid。數組

aof_buf

該變量保存着全部等待寫入到 AOF 文件的協議文本。緩存

aof_rewrite_buf_blocks

該變量用來保存 aofrewrite 期間,server 處理過的須要寫入 AOF 文件的協議文本。這個變量採用 list 結構,是考慮到分配到一個很是大的空間並不老是可能的,也可能產生大量的複製工做。安全

aof_rewrite_scheduled

可取值有 0 和 1。

取 1 時,表示此時有子進程正在作 aofrewrite 操做,本次任務後延,等到 serverCron 執行時,合適的狀況再執行。或者是執行了 config set appendonly yes, 想把 AOF 功能打開,此時執行的 aofrewrite 失敗了,aof_state 仍然處於 AOF_WAIT_REWRITE 狀態,此時 aof_rewrite_scheduled 也會置爲 1,等下次再執行 aofrewrite。

aof_state

表示 AOF 功能如今的狀態,可取值以下,

#define AOF_OFF 0             /* AOF is off */
#define AOF_ON 1              /* AOF is on */
#define AOF_WAIT_REWRITE 2    /* AOF waits rewrite to start appending */

AOF_OFF 表示 AOF 功能處於關閉狀態,開關在上一節已經說過,默認 AOF 功能是關閉的。AOF 功能從 off switch 到 on 後,aof_state 會從 AOF_OFF 變爲 AOF_WAIT_REWRITEstartAppendOnly 函數完成該邏輯。在 aofrewrite 一次以後,該變量纔會從 AOF_WAIT_REWRITE 變爲 AOF_ON

能夠看到從 ON 切換到 OFF 時,要經歷一箇中間狀態 AOF_WAIT_REWRITE,那爲什麼要這麼設計呢?再來分析一下 startAppendOnly 函數的邏輯(代碼去掉了打印日誌的部分)。

server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
serverAssert(server.aof_state == AOF_OFF);
if (server.aof_fd == -1) {
    char *cwdp = getcwd(cwd,MAXPATHLEN);
    return C_ERR;
}
if (server.rdb_child_pid != -1) {
    server.aof_rewrite_scheduled = 1;
} else if (rewriteAppendOnlyFileBackground() == C_ERR) {
    close(server.aof_fd);
    return C_ERR;
}
server.aof_state = AOF_WAIT_REWRITE;

【1】打開 aof 文件,默認名爲 appendonly.aof,沒有的話就新建空文件,失敗則返回。

【2】切換後,須要作一次 aofrewrite,將 server 中現有的數據轉換成協議文本,寫到 AOF 文件。可是,這裏要注意,若是此時有子進程在作 bgrdb,那麼這次 aofrewrite 須要任務延緩,即 aof_rewrite_scheduled 置爲 1。

【3】將 aof_state 置爲 AOF_WAIT_REWRITE 狀態。

而作完第一次 aofrewrite 後,AOF_WAIT_REWRITE 轉換成 AOF_ON,以下,

void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    ...
    if (server.aof_state == AOF_WAIT_REWRITE)
        server.aof_state = AOF_ON;
    ...
}

仔細分析源碼發現,在 AOF 持久化的命令追加階段(後面章節細講),有以下邏輯,

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    ...
    if (server.aof_state == AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
    ...
}

很明顯,剛開啓 AOF 時, aof_stateAOF_WAIT_REWRITE ,處理好的協議文本 buf 沒法寫入 aof_buf 變量 ,但必須寫入 aof_rewrite_buf_blocks 變量(數據在 aofrewrite 的最後階段會被寫進 AOF 文件)。

這裏是否將命令 append 到 aof_state 的判斷相當重要,若是修改條件爲 server.aof_state != AOF_OFF考慮以下狀況

AOF 狀態剛打開,還沒有完成第一次 aofrewrite,也即,一邊 Child 進程數據庫中現有數據還未寫進 AOF 文件,另外一邊 Parent 進程仍然持續處理 client 請求,因而,Parent 進程在指定的數據刷盤策略下,將 aof_buf 刷盤。若是這時宕機了,當 server 重啓後,加載 AOF 文件,在內存中塞入數據,實際上對於用戶來講,這部分數據算是髒數據了,由於 AOF 並無成功打開,未開啓 AOF 狀態時,數據都在內存中,宕機後,數據會所有丟掉。增長這個中間狀態就是爲了應對這種狀況。因此, AOF_WAIT_REWRITE 狀態存在的時間範圍起始於 startAppendOnly ,到完成第一次 aofrewrite 後切成 AOF_ON 。aofrewrite 後再發生宕機,丟失的數據就少多了。

這只是我我的的理解,不必定正確,歡迎你們斧正。

另外,若是開啓了 AOF,在 redis 啓動 加載 AOF 文件時,aof_state 也會暫時設置成 AOF_OFF,加載完畢以後設置爲 AOF_ON

aof_pipe_*

爲了提升 aofrewrite 效率,Redis 經過在父子進程間創建管道,把 aofrewrite 期間的寫命令經過管道同步給子進程,追加寫盤的操做也就轉交給了子進程。aof_pipe_* 變量就是這部分會用到的管道。

AOF 持久化

命令追加

AOF 功能開啓後,每次致使數據庫狀態發生變化的命令都會通過函數 feedAppendOnlyFile 累積到 aof_buf 變量中。若是後臺有正在執行的 aofrewrite 任務,還會寫一份數據到 aof_rewrite_buf_blocks 變量中。

feedAppendOnlyFile 函數

在該函數中,首先要將數據庫切換到當前數據庫( aof_selected_db 更新),在 buf 中插入一條 SELECT 命令。

sds buf = sdsempty();
if (dictid != server.aof_selected_db) {
    char seldb[64];
    snprintf(seldb,sizeof(seldb),"%d",dictid);
    buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", (unsigned        long)strlen(seldb),seldb);
    server.aof_selected_db = dictid;
}

而後在對須要加入 buf 的命令進行分類處理。

【1】帶有過時時間的命令,調用函數 catAppendOnlyExpireAtCommand 進行協議文本 buf 組裝。EXPIRE/PEXPIRE/EXPIREAT 這三個命令直接調用該函數,而 SETEX/PSETEX 這兩個命令須要在調用以前加入一個 SET 命令。即,

tmpargv[0] = createStringObject("SET",3);
tmpargv[1] = argv[1];
tmpargv[2] = argv[3];
buf = catAppendOnlyGenericCommand(buf,3,tmpargv);

decrRefCount(tmpargv[0]);
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);

【2】普通命令,直接調用函數 catAppendOnlyGenericCommand 進行協議文本 buf 組裝。

catAppendOnlyExpireAtCommand 函數

該函數其實就是將全部與過時時間相關的命令轉成 PEXPIREAT 命令,細化到毫秒。最後調用普通命令組裝 buf 函數 catAppendOnlyGenericCommand

// 構建 PEXPIREAT 命令
argv[0] = createStringObject("PEXPIREAT",9);
argv[1] = key;
argv[2] = createStringObjectFromLongLong(when);

// 調用 aof 公共函數
buf = catAppendOnlyGenericCommand(buf, 3, argv);

catAppendOnlyGenericCommand 函數

該函數用來把 redis 命令轉換成 RESP 協議文本。

sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
    char buf[32];
    int len, j;
    robj *o;

    // 好比 *3\r\n
    buf[0] = '*';
    len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
    buf[len++] = '\r';
    buf[len++] = '\n';
    dst = sdscatlen(dst,buf,len);

    for (j = 0; j < argc; j++) {
        o = getDecodedObject(argv[j]);
        buf[0] = '$';
        len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
        buf[len++] = '\r';
        buf[len++] = '\n';
        dst = sdscatlen(dst,buf,len);
        dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
        dst = sdscatlen(dst,"\r\n",2);
        decrRefCount(o);
    }
    return dst;
}

能夠看到,定義了一個 buf 數組,反覆使用,經過 len 精確控制 append 到 dst 後的長度。

aofRewriteBufferAppend 函數

aof_rewrite_buf_blocks 變量是一個 list 結構,其中每個元素都是一個大小爲 10M 的 block

#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10)    /* 10 MB per block */
typedef struct aofrwblock {
    unsigned long used, free;
    char buf[AOF_RW_BUF_BLOCK_SIZE];
} aofrwblock;

這個函數作了兩件事情。

一是,將 catAppendOnlyGenericCommand 得到的協議文本 buf 存到 aof_rewrite_buf_blocks 變量,首先拿出來 list 最後一個 block,若是裝不下,那先把最後一個 block 填滿,剩下的再申請內存。

listNode *ln = listLast(server.aof_rewrite_buf_blocks); // 指向最後一個緩存塊
aofrwblock *block = ln ? ln->value : NULL;
while(len) {
    if (block) { // 若是已經有至少一個緩存塊,那麼嘗試將內容追加到這個緩存塊裏面
        unsigned long thislen = (block->free < len) ? block->free : len;
        if (thislen) {  /* The current block is not already full. */
            memcpy(block->buf+block->used, s, thislen);
            block->used += thislen;
            block->free -= thislen;
            s += thislen;
            len -= thislen;
        }
    }
    if (len) {  // 最後一個緩存塊沒有放得下本次 data,那再申請一個 block
        int numblocks;
        block = zmalloc(sizeof(*block));
        block->free = AOF_RW_BUF_BLOCK_SIZE;
        block->used = 0;
        listAddNodeTail(server.aof_rewrite_buf_blocks,block);
        ... ...
    }
}

二是,給 aof_pipe_write_data_to_child 這個 fd 註冊寫事件,回調函數爲 aofChildWriteDiffData

/* Install a file event to send data to the rewrite child if there is
     * not one already. */
if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
    aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
                      AE_WRITABLE, aofChildWriteDiffData, NULL);
}

這個屬於 aof 重寫的邏輯,後面章節會細說,這裏先留個心。

什麼時候進行命令追加

也就是說,何時會調用feedAppendOnlyFile 呢?有如下兩個時機。

propagate 函數

你們都知道,Redis 中命令執行的流程,即 processCommand -> call 。在 call 函數中會把某些命令寫入 AOF 文件。如何判斷某個命令是否須要寫入 AOF 呢?

在 server 結構體中維持了一個 dirty 計數器,dirty 記錄的是服務器狀態進行了多少次修改,每次作完 save/bgsave 執行完成後,會將 dirty 清 0,而使得服務器狀態修改的命令通常都須要寫入 AOF 文件和主從同步(排除某些特殊狀況)。

dirty = server.dirty;
c->cmd->proc(c);
dirty = server.dirty-dirty;
...
if (propagate_flags != PROPAGATE_NONE)
    propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);

propagate 函數中就會調用到 feedAppendOnlyFile

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
propagateExpire 函數

當內存中帶有過時時間的 key 過時時,會向 AOF 寫入 del 命令。

void propagateExpire(redisDb *db, robj *key) {
    ...
    if (server.aof_state != AOF_OFF)
        feedAppendOnlyFile(server.delCommand,db->id,argv,2);
    replicationFeedSlaves(server.slaves,db->id,argv,2);
    ...
}

propagateExpire 函數在一些檢查 key 是否過時時會調用。

文件的寫入與同步

上一步中,將須要寫入 AOF 文件的數據先寫到了 aof_buf 變量中,那麼,接下來講一下如何將 aof_buf 的內容寫進 AOF 文件。

同步策略

爲了提升文件的寫入效率,在現代操做系統中,當用戶調用 write 函數試,將一些數據寫入到文件的時候,操做系統一般會將寫入的數據保存在一個內存緩衝區裏,等到緩衝區的空間被填滿,或者超過了指定的時限後,才真正地將緩衝區中的數據寫入磁盤。

這種作法雖然提升了效率,但也爲寫入數據帶來了安全問題,由於若是計算機宕機,那麼保存在內存緩衝區裏面的寫入數據將會丟失。

爲此,系統提供了 fsyncfdatasync 兩個同步函數,它們能夠強制讓操做系統當即將緩存區中的數據寫入到硬盤裏面,從而確保寫入數據的安全性。

要知道,這兩個系統調用函數都是阻塞式的,針對如何協調文件寫入與同步的關係,該版本 Redis 支持 3 種同步策略,可在配置文件中使用 appendfsync 項進行配置,有以下取值,

  • always。每次有新命令追加到 AOF文件 時就執行一次同步,,安全性最高,可是性能影響最大。
  • everysec。每秒執行一次同步。宕機只會丟失一秒鐘的命令。這算是一個折中方案。
  • no。將數據同步操做徹底交由操做系統處理,性能最好,可是數據可靠性最差。宕機將丟失同步 AOF 文件後的全部寫命令。

在 Redis 源碼中, 當程序運行在 Linux 系統上時,執行的是 fdatasync 函數,而在其餘系統上,則會執行 fsync 函數,即,

#ifdef __linux__
#define aof_fsync fdatasync
#else
#define aof_fsync fsync
#endif

:如下敘述均以 fsync 代稱。

如何寫入文件

寫入文件的邏輯在 flushAppendOnlyFile 函數中實現。下面分兩部分來看主要代碼。

文件寫入write 系統調用
...

// aof 緩存區內沒有數據須要寫入 disk,無需處理
if (sdslen(server.aof_buf) == 0) return;

// 若是 sync policy 設置成 everysec,
// sync_in_progress 表示是否有 fsync 任務在後臺
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
    sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;

// force=0(非強制寫入)時,若是後臺有 fsync 任務,推遲這次寫入,但推遲時間不超過 2s
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
    if (sync_in_progress) {
        if (server.aof_flush_postponed_start == 0) { // 首次推遲 write,一次推遲 2s
            server.aof_flush_postponed_start = server.unixtime;
            return;
        } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
            return;
        }

        // 不然,經過,繼續寫,由於咱們不能等待超過 2s
        server.aof_delayed_fsync++;
        serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
    }
}
...
// 將 aof 緩衝區的內容寫到系統緩存區
nwritten = write(server.aof_fd, server.aof_buf, sdslen(server.aof_buf)); 
...
// 執行了 write 操做,因此要清零延遲 flush 的時間
server.aof_flush_postponed_start = 0;

首先會判斷 aof_buf 是否爲空,若是是,那麼不須要執行下面的邏輯,直接返回。

若是同步策略爲 everysec,那麼須要查看是否有 fsync 任務在後臺,調用 fsync 使用的是 Redis 中 bio ,若是對這個還不瞭解,能夠參考我以前的文章 《 Redis Bio 詳解 》。爲何要作這個判斷呢?

fsyncwrite 同一個 fd 時, write 必然阻塞。 當系統 IO 很是繁忙時, fsync() 可能會阻塞, 即便系統 IO 不繁忙, fsync 也會由於數據量大而慢。

所以對於 everysec 策略,須要儘可能保證 fsyncwrite 不一樣時操做同一個 fd。no 策略徹底把 fsync 交給了操做系統,操做系統何時 fsync ,無從得知。always 策略則是每次都要主從調用 fsync,也不必作判斷。所以,這裏的判斷,只針對 everysec 策略有效。

對於 everysec 策略,若是有 fsync 在執行,那麼本次 write 推遲 2 秒鐘,等到下次在進入本函數時,若是推遲時間超過 2 秒,那麼更新 aof_delayed_fsync 值(info 裏能夠查到),打印日誌 」 Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis. 「 ,以後進行 write 系統調用。固然了,系統也提供了 force 選項,去跳過這項是否要推遲 write 的檢查。

write 以後,將 aof_flush_postponed_start 推遲開始計時值清零,迎接下次檢查。

因此說,AOF 執行 everysec 策略時,若是剛好有 fsync 在長時間的執行,Redis 意外關閉會丟失最多兩秒的數據。若是 fsync 運行正常,只有當操做系統 crash 時纔會形成最多 1 秒的數據丟失。

收尾工做, write 結果處理

write 調用結果多是正常的,也多是異常的,那麼須要作不一樣的處理。首先主要看異常處理,

if (nwritten != (signed)sdslen(server.aof_buf)) {
    ...
    /* Log the AOF write error and record the error code. */
    if (nwritten == -1) {
        ...
    } else { // 若是僅寫了一部分,發生錯誤
    // 將追加的內容截斷,刪除了追加的內容,恢復成原來的文件
        if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
            ...
        } else {
            nwritten = -1;
        }
        server.aof_last_write_errno = ENOSPC;
    }

    // 若是是寫入的策略爲每次寫入就同步,沒法恢復這種策略的寫,由於咱們已經告知使用者,已經將寫的數據同步到磁盤了,所以直接退出程序
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        ...
        exit(1);
    } else {
        // 設置執行write操做的狀態
        server.aof_last_write_status = C_ERR;
        if (nwritten > 0) {
            // 只能更新當前的 AOF 文件的大小
            server.aof_current_size += nwritten;
            // 刪除 AOF 緩衝區寫入的字節
            sdsrange(server.aof_buf,nwritten,-1);
        }
        return; /* We'll try again on the next call... */
    }
} else {
    /* Successful write(2). If AOF was in error state, restore the
     * OK state and log the event. 
     */
    if (server.aof_last_write_status == C_ERR) {
        serverLog(LL_WARNING, "AOF write error looks solved, Redis can write again.");
        server.aof_last_write_status = C_OK;
    }
}

寫入異常的判斷,nwritten != (signed)sdslen(server.aof_buf)write 的數據量與 aof_buf 的大小不一樣。當徹底沒寫入時,打個日誌就算了;當僅寫入了一部分數據時,使用 ftruncate 函數把 AOF 文件的內容恢復成原來的大小,以備下次從新寫入,nwritten 置爲 -1。使用 ftruncate 的緣由是怕操做系統執行了 fsync,所以須要把 AOF 文件的大小恢復。

若是執行的是 always 同步策略,那麼須要返回會客戶端錯誤。對於其餘策略,更新 aof_last_write_status ,以便知道上一次作 write 的結果,對於未徹底寫入的狀況,若是上面執行的 ftruncate 失敗,此時 nwritten > 0,須要更新 aof_current_size,從 aof_buf 中減去已經寫入的,防止下次有重複數據寫入,而後返回。

若是寫入成功,那麼視狀況更新 aof_last_write_status,表示這次 write 成功。

下面主要是正常狀況的處理。

/* nwritten = -1 時走不到這個步驟 */ 
server.aof_current_size += nwritten; // 正常 write,更新 aof_current_size

/* Re-use AOF buffer when it is small enough. The maximum comes from the
 * arena size of 4k minus some overhead (but is otherwise arbitrary).
 */
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
    sdsclear(server.aof_buf);
} else {
    sdsfree(server.aof_buf);
    server.aof_buf = sdsempty();
}

/* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
 * children doing I/O in the background. */
if (server.aof_no_fsync_on_rewrite && (server.aof_child_pid != -1 || server.rdb_child_pid != -1)) return;

/* Perform the fsync if needed. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
    /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
    latencyStartMonitor(latency);
    aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("aof-fsync-always",latency);
    server.aof_last_fsync = server.unixtime;
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && 
            server.unixtime > server.aof_last_fsync)) {
    if (!sync_in_progress) aof_background_fsync(server.aof_fd); // 若是沒有正在執行同步,那麼建立一個後臺任務
    server.aof_last_fsync = server.unixtime;
}

aof_buf 清空,而後根據不一樣策略進行同步。always 策略時,主動調用 fsync; everysec 策略,則建立 fsync bio 任務。

另外,有配置項 no-appendfsync-on-rewrite 去決定,當子進程在作 aofrewrite/bgsave 時是否要進行 fsync

什麼時候進行文件寫入

也就是,何時會調用 flushAppendOnlyFile 函數,有如下三個時機。

beforeSleep 函數
Redis 的服務器進程就是一個事件循環,這個循環中的文件事件負責接收客戶端請求,以及向客戶端發送命令回覆,而時間事件則負責像 serverCron 函數這樣須要定時運行的函數。

對於 Redis 的事件機制能夠參考我以前的文章 《Redis 中的事件》。

由於服務器在處理文件事件時可能會執行寫命令,使得一些內容被追加到 aof_buf 緩衝區裏面,因此在服務器每次結束一個事件循環以前,都會調用 flushAppendOnlyFile 函數,考慮是否須要將 aof_buf 緩衝區中的內容寫入和同步到 AOF 文件裏面。即,

void beforeSleep(struct aeEventLoop *eventLoop) {
    ...
    /* Write the AOF buffer on disk */
    flushAppendOnlyFile(0);
    ...
}

這裏的調用是非強制寫入(force = 0)。

serverCron 函數

Redis 中的時間事件,按期執行 serverCron 函數(從 Redis 2.8 開始,用戶能夠經過修改 hz 選項來調整 serverCron的每秒執行次數),作一些瑣事,好比更新服務器各項統計信息、關閉清理客戶端、作 AOF 和 RDB 等。

/* AOF postponed flush: Try at every cron cycle if the slow fsync completed. */
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);

若是上次 AOF 寫入推遲了,那麼再次嘗試非強制寫入。

run_with_period(1000) {
    if (server.aof_last_write_status == C_ERR)
        flushAppendOnlyFile(0);
}

每秒鐘檢查,若是上次寫入 AOF 文件失敗了,再次嘗試非強制寫入。由於須要及時去處理 aof_buf,以及重置 AOF 寫入狀態的變量 aof_last_write_status,每秒作檢查,這個頻率是足夠的。

stopAppendOnly 函數

當 AOF 功能要關閉時,會調用 stopAppendOnly 函數,嘗試一次強制寫入,即盡最大努力去保存最多的數據。

void stopAppendOnly(void) {
    serverAssert(server.aof_state != AOF_OFF);
    flushAppendOnlyFile(1);
    aof_fsync(server.aof_fd);
    close(server.aof_fd);
}

強制寫入,並刷盤。

AOF 文件載入

當 Redis 服務器進程啓動時,須要調用 loadDataFromDisk 函數去加載數據。

void loadDataFromDisk(void) {
    long long start = ustime();
    if (server.aof_state == AOF_ON) { // 開啓了 aof
        if (loadAppendOnlyFile(server.aof_filename) == C_OK)
            serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
    } else {
        if (rdbLoad(server.rdb_filename) == C_OK) {
            serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
                (float)(ustime()-start)/1000000);
        } else if (errno != ENOENT) {
            serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
            exit(1);
        }
    }
}

能夠看到,若是開啓了 AOF 功能,就會調用 loadAppendOnlyFile 函數,加載 AOF 文件中的數據到內存中。不然,會去調用 rdbLoad 函數,加載 RDB 文件。加載 AOF 文件的設計頗有意思。

FILE *fp = fopen(filename,"r");
struct redis_stat sb;
int old_aof_state = server.aof_state;
long loops = 0;
off_t valid_up_to = 0; /* Offset of the latest well-formed command loaded. */

// 檢查文件的正確性, 存在,而且不爲空
if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
    server.aof_current_size = 0;
    fclose(fp);
    return C_ERR;
}
if (fp == NULL) {
    serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
    exit(1);
}
// 暫時關掉 AOF, 防止向該 filename 中寫入新的 AOF 數據
server.aof_state = AOF_OFF;

首先,空文件沒有必要再去加載了,提早返回。

而後,暫時關閉 AOF 功能,這是爲了防止在加載 AOF 文件的過程當中,又有新的數據寫進來

fakeClient = createFakeClient(); // 建立一個不帶網絡鏈接的僞客戶端
startLoading(fp);                // 標記正在 load db,loading = 1

// 讀 AOF 文件
while(1) {
    int argc, j;
    unsigned long len;
    robj **argv;
    char buf[128];
    sds argsds;
    struct redisCommand *cmd;
    ... ...
        // 如執行命令 SET keytest val,那麼寫入 AOF 文件中的格式爲
        // *3\r\n$3\r\nSET\r\n$7\r\nkeytest\r\n$3\r\nval\r\n
        if (fgets(buf,sizeof(buf),fp) == NULL) { // 按行讀取 AOF 文件,*3
            if (feof(fp))
                break;
            else
                goto readerr;
        }

    if (buf[0] != '*') goto fmterr; // 判斷協議是否正確
    if (buf[1] == '\0') goto readerr; // 數據完整判斷
    argc = atoi(buf+1);
    if (argc < 1) goto fmterr;

    argv = zmalloc(sizeof(robj*)*argc);
    fakeClient->argc = argc;
    fakeClient->argv = argv;

    for (j = 0; j < argc; j++) {
        if (fgets(buf,sizeof(buf),fp) == NULL) { // 依次讀到 $3, $7, $3
            fakeClient->argc = j; /* Free up to j-1. */
            freeFakeClientArgv(fakeClient);
            goto readerr;
        }
        if (buf[0] != '$') goto fmterr;
        len = strtol(buf+1,NULL,10); // 參數長度
        argsds = sdsnewlen(NULL,len);
        if (len && fread(argsds,len,1,fp) == 0) { // 依次讀到 SET/ keytest/ val
            sdsfree(argsds);
            fakeClient->argc = j; /* Free up to j-1. */
            freeFakeClientArgv(fakeClient);
            goto readerr;
        }
        argv[j] = createObject(OBJ_STRING,argsds);
        if (fread(buf,2,1,fp) == 0) { // 讀到 \r\n
            fakeClient->argc = j+1; /* Free up to j. */
            freeFakeClientArgv(fakeClient);
            goto readerr; /* discard CRLF */
        }
    }

    /* Command lookup */
    cmd = lookupCommand(argv[0]->ptr);
    if (!cmd) {
        serverLog(LL_WARNING,"Unknown command '%s' reading the append only file", (char*)argv[0]->ptr);
        exit(1);
    }

    /* Run the command in the context of a fake client */
    cmd->proc(fakeClient);

    /* The fake client should not have a reply */
    serverAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
    /* The fake client should never get blocked */
    serverAssert((fakeClient->flags & CLIENT_BLOCKED) == 0);

    /* Clean up. Command code may have changed argv/argc so we use the
         * argv/argc of the client instead of the local variables. */
    freeFakeClientArgv(fakeClient);
    if (server.aof_load_truncated) valid_up_to = ftello(fp);
}

上面這部分是加載 AOF 文件的關鍵,以 SET keytest val 命令對應的 AOF 文件內容 *3\r\n$3\r\nSET\r\n$7\r\nkeytest\r\n$3\r\nval\r\n 爲例,能夠更好地理解上面的邏輯。因爲 AOF 文件中存儲的數據與客戶端發送的請求格式相同徹底符合 Redis 的通訊協議,所以 Redis Server 建立僞客戶端 fakeClient,將解析後的 AOF 文件數據像客戶端請求同樣調用各類指令,cmd->proc(fakeClient),將 AOF 文件中的數據重現到 Redis Server 數據庫中。

完成以上邏輯後,進行一些收尾工做,如改回 AOF 狀態爲 ON,釋放僞客戶端等,並處理一些異常狀況,這裏就不展開細講了。

參考

  1. Copy On Write 機制瞭解一下
  2. Redis · 原理介紹 · 利用管道優化aofrewrite
相關文章
相關標籤/搜索