Redis RDB 持久化詳解

  Redis 是一種內存數據庫,將數據保存在內存中,讀寫效率要比傳統的將數據保存在磁盤上的數據庫要快不少。可是一旦進程退出,Redis 的數據就會丟失。node

  爲了解決這個問題,Redis 提供了 RDB 和 AOF 兩種持久化方案,將內存中的數據保存到磁盤中,避免數據丟失。redis

antirez 在《Redis 持久化解密》一文中說,通常來講有三種常見的策略來進行持久化操做,防止數據損壞:sql

  • 方法1 是數據庫不關心發生故障,在數據文件損壞後經過數據備份或者快照來進行恢復。Redis 的 RDB 持久化就是這種方式。數據庫

  • 方法2 是數據庫使用操做日誌,每次操做時記錄操做行爲,以便在故障後經過日誌恢復到一致性的狀態。由於操做日誌是順序追加的方式寫的,因此不會出現操做日誌也沒法恢復的狀況。相似於 Mysql 的 redo 和 undo 日誌。數組

  • 方法3 是數據庫不進行老數據的修改,只是以追加方式去完成寫操做,這樣數據自己就是一份日誌,這樣就永遠不會出現數據沒法恢復的狀況了。CouchDB就是此作法的優秀範例。緩存

RDB 就是第一種方法,它就是把當前 Redis 進程的數據生成時間點快照( point-in-time snapshot ) 保存到存儲設備的過程。安全

RDB 的使用

RDB 觸發機制分爲使用指令手動觸發和 redis.conf 配置自動觸發。服務器

手動觸發 Redis 進行 RDB 持久化的指令的爲:數據結構

  • save ,該指令會阻塞當前 Redis 服務器,執行 save 指令期間,Redis 不能處理其餘命令,直到 RDB 過程完成爲止。less

  • bgsave,執行該命令時,Redis 會在後臺異步執行快照操做,此時 Redis 仍然能夠相應客戶端請求。具體操做是 Redis 進程執行 fork 操做建立子進程,RDB 持久化過程由子進程負責,完成後自動結束。Redis 只會在 fork 期間發生阻塞,可是通常時間都很短。可是若是 Redis 數據量特別大,fork 時間就會變長,並且佔用內存會加倍,這一點須要特別注意。

自動觸發 RDB 的默認配置以下所示:

1 save 900 1 # 表示900 秒內若是至少有 1 個 key 的值變化,則觸發RDB
2 save 300 10 # 表示300 秒內若是至少有 10 個 key 的值變化,則觸發RDB
3 save 60 10000 # 表示60 秒內若是至少有 10000 個 key 的值變化,則觸發RDB

  若是不須要 Redis 進行持久化,那麼能夠註釋掉全部的 save 行來停用保存功能,也能夠直接一個空字符串來停用持久化:save ""。

  Redis 服務器週期操做函數 serverCron 默認每一個 100 毫秒就會執行一次,該函數用於正在運行的服務器進行維護,它的一項工做就是檢查 save 選項所設置的條件是否有一項被知足,若是知足的話,就執行 bgsave 指令。

RDB 總體流程

  瞭解了 RDB 的基礎使用後,咱們要繼續深刻對 RDB持久化的學習。在此以前,咱們能夠先思考一下如何實現一個持久化機制,畢竟這是不少中間件所需的一個模塊。

  首先,持久化保存的文件內容結構必須是緊湊的,特別對於數據庫來講,須要持久化的數據量十分大,須要保證持久化文件不至於佔用太多存儲。其次,進行持久化時,中間件應該還能夠快速地響應用戶請求,持久化的操做應該儘可能少影響中間件的其餘功能。最後,畢竟持久化會消耗性能,如何在性能和數據安全性之間作出平衡,如何靈活配置觸發持久化操做。

  接下來咱們將帶着這些問題,到源碼中尋求答案。

本文中的源碼來自 Redis 5.0.2,RDB持久化過程的相關源碼都在 rdb.c 文件中。其中大概的流程以下圖所示。

 

  上圖代表了三種觸發 RDB 持久化的手段之間的總體關係。經過 serverCron 自動觸發的 RDB 至關於直接調用了 bgsave 指令的流程進行處理。而 bgsave 的處理流程啓動子進程後,調用了 save 指令的處理流程。

自動觸發 RDB 持久化

  如上圖所示, redisServer 結構體的 save_params指向擁有三個值的數組,該數組的值與 redis.conf 文件中 save 配置項一一對應。分別是 save9001save30010save6010000dirty 記錄着有多少鍵值發生變化, lastsave記錄着上次 RDB 持久化的時間。

  而 serverCron 函數就是遍歷該數組的值,檢查當前 Redis 狀態是否符合觸發 RDB 持久化的條件,好比說距離上次 RDB 持久化過去了 900 秒而且有至少一條數據發生變動。

 1 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
 2     ···
 3     /* Check if a background saving or AOF rewrite in progress terminated. */
 4     /* 判斷後臺是否正在進行 rdb 或者 aof 操做 */
 5     if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
 6         ldbPendingChildren())
 7     {
 8         int statloc;
 9         pid_t pid;
10 
11         if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
12             int exitcode = WEXITSTATUS(statloc);
13             int bysignal = 0;
14 
15             if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
16 
17             if (pid == -1) {
18                 serverLog(LL_WARNING,"wait3() returned an error: %s. "
19                     "rdb_child_pid = %d, aof_child_pid = %d",
20                     strerror(errno),
21                     (int) server.rdb_child_pid,
22                     (int) server.aof_child_pid);
23             } else if (pid == server.rdb_child_pid) {
24                 backgroundSaveDoneHandler(exitcode,bysignal);
25                 if (!bysignal && exitcode == 0) receiveChildInfo();
26             } else if (pid == server.aof_child_pid) {
27                 backgroundRewriteDoneHandler(exitcode,bysignal);
28                 if (!bysignal && exitcode == 0) receiveChildInfo();
29             } else {
30                 if (!ldbRemoveChild(pid)) {
31                     serverLog(LL_WARNING,
32                         "Warning, detected child with unmatched pid: %ld",
33                         (long)pid);
34                 }
35             }
36             updateDictResizePolicy();
37             closeChildInfoPipe();
38         }
39     } else {
40         // 到這兒就能肯定 當前木有進行 rdb 或者 aof 操做
41         // 遍歷每個 rdb 保存條件
42         /* If there is not a background saving/rewrite in progress check if
43          * we have to save/rewrite now. */
44         for (j = 0; j < server.saveparamslen; j++) {
45             struct saveparam *sp = server.saveparams+j;
46 
47             /**
48              * Save if we reached the given amount of changes,
49              * the given amount of seconds, and if the latest bgsave was
50              * successful or if, in case of an error, at least
51              * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed.
52              * 若是數據保存記錄 大於規定的修改次數 且距離 上一次保存的時間大於規定時間或者上次BGSAVE命令執行成功,
53              * 才執行 BGSAVE 操做
54              */
55             if (server.dirty >= sp->changes &&
56                 server.unixtime-server.lastsave > sp->seconds &&
57                 (server.unixtime-server.lastbgsave_try >
58                  CONFIG_BGSAVE_RETRY_DELAY ||
59                  server.lastbgsave_status == C_OK))
60             {
61                 //記錄日誌
62                 serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
63                     sp->changes, (int)sp->seconds);
64                 rdbSaveInfo rsi, *rsiptr;
65                 rsiptr = rdbPopulateSaveInfo(&rsi);
66                 // 異步保存操做
67                 rdbSaveBackground(server.rdb_filename,rsiptr);
68                 break;
69             }
70         }
71     ···
72     server.cronloops++;
73     return 1000/server.hz;
74 }

若是符合觸發 RDB 持久化的條件, serverCron會調用 rdbSaveBackground函數,也就是 bgsave 指令會觸發的函數。

子進程後臺執行 RDB 持久化

執行 bgsave 指令時,Redis 會先觸發 bgsaveCommand 進行當前狀態檢查,而後纔會調用 rdbSaveBackground,其中的邏輯以下圖所示。

 

 

 

rdbSaveBackground 函數中最主要的工做就是調用 fork 命令生成子流程,而後在子流程中執行 rdbSave函數,也就是 save 指令最終會觸發的函數。

 1 int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
 2     pid_t childpid;
 3     long long start;
 4 
 5 
 6     // 檢查後臺是否正在執行 aof 或者 rdb 操做
 7     if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
 8 
 9     // 拿出 數據保存記錄,保存爲 上次記錄
10     server.dirty_before_bgsave = server.dirty;
11     // bgsave 時間
12     server.lastbgsave_try = time(NULL);
13     openChildInfoPipe();
14 
15     start = ustime();
16     // fork 子進程
17     if ((childpid = fork()) == 0) {
18         int retval;
19 
20         /* Child */
21         /* 關閉子進程繼承的 socket 監聽 */
22         closeListeningSockets(0);
23         // 子進程 title 修改
24         redisSetProcTitle("redis-rdb-bgsave");
25         // 執行rdb 寫入操做
26         retval = rdbSave(filename,rsi);
27         // 執行完畢之後
28         if (retval == C_OK) {
29             size_t private_dirty = zmalloc_get_private_dirty(-1);
30 
31             if (private_dirty) {
32                 serverLog(LL_NOTICE,
33                     "RDB: %zu MB of memory used by copy-on-write",
34                     private_dirty/(1024*1024));
35             }
36 
37             server.child_info_data.cow_size = private_dirty;
38             sendChildInfo(CHILD_INFO_TYPE_RDB);
39         }
40         // 退出子進程
41         exitFromChild((retval == C_OK) ? 0 : 1);
42     } else {
43         /* Parent */
44         /* 父進程,進行fork時間的統計和信息記錄,好比說rdb_save_time_start、rdb_child_pid、和rdb_child_type */
45         server.stat_fork_time = ustime()-start;
46         server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
47         latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
48         if (childpid == -1) {
49             closeChildInfoPipe();
50             server.lastbgsave_status = C_ERR;
51             serverLog(LL_WARNING,"Can't save in background: fork: %s",
52                 strerror(errno));
53             return C_ERR;
54         }
55         serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
56         // rdb 保存開始時間 bgsave 子進程
57         server.rdb_save_time_start = time(NULL);
58         server.rdb_child_pid = childpid;
59         server.rdb_child_type = RDB_CHILD_TYPE_DISK;
60         updateDictResizePolicy();
61         return C_OK;
62     }
63     return C_OK; /* unreached */
64 }

  爲何 Redis 使用子進程而不是線程來進行後臺 RDB 持久化呢?主要是出於Redis性能的考慮,咱們知道Redis對客戶端響應請求的工做模型是單進程和單線程的,若是在主進程內啓動一個線程,這樣會形成對數據的競爭條件。因此爲了不使用鎖下降性能,Redis選擇啓動新的子進程,獨立擁有一份父進程的內存拷貝,以此爲基礎執行RDB持久化。

  可是須要注意的是,fork 會消耗必定時間,而且父子進程所佔據的內存是相同的,當 Redis 鍵值較大時,fork 的時間會很長,這段時間內 Redis 是沒法響應其餘命令的。除此以外,Redis 佔據的內存空間會翻倍。

生成 RDB 文件,而且持久化到硬盤

Redis 的 rdbSave 函數是真正進行 RDB 持久化的函數,它的大體流程以下:

  • 首先打開一個臨時文件,

  • 調用 rdbSaveRio函數,將當前 Redis 的內存信息寫入到這個臨時文件中,

  • 接着調用 fflush、 fsync 和 fclose 接口將文件寫入磁盤中,

  • 使用 rename 將臨時文件更名爲 正式的 RDB 文件,

  • 最後記錄 dirty 和 lastsave等狀態信息。這些狀態信息在 serverCron時會使用到。

 1 /**
 2  * Save the DB on disk. Return C_ERR on error, C_OK on success.
 3  * 生成 RDB 文件,而且持久化到硬盤
 4  */
 5 int rdbSave(char *filename, rdbSaveInfo *rsi) {
 6     char tmpfile[256];
 7     // 當前工做目錄
 8     char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
 9     FILE *fp;
10     rio rdb;
11     int error = 0;
12 
13     /* 生成tmpfile文件名 temp-[pid].rdb */
14     snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
15     /* 打開文件 */
16     fp = fopen(tmpfile,"w");
17     if (!fp) {
18         char *cwdp = getcwd(cwd,MAXPATHLEN);
19         serverLog(LL_WARNING,
20             "Failed opening the RDB file %s (in server root dir %s) "
21             "for saving: %s",
22             filename,
23             cwdp ? cwdp : "unknown",
24             strerror(errno));
25         return C_ERR;
26     }
27 
28     /* 初始化rio結構 */
29     rioInitWithFile(&rdb,fp);
30 
31     if (server.rdb_save_incremental_fsync)
32         rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
33 
34     if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
35         errno = error;
36         goto werr;
37     }
38 
39     /* Make sure data will not remain on the OS's output buffers */
40     if (fflush(fp) == EOF) goto werr;
41     if (fsync(fileno(fp)) == -1) goto werr;
42     if (fclose(fp) == EOF) goto werr;
43 
44     /* Use RENAME to make sure the DB file is changed atomically only
45      * if the generate DB file is ok. */
46     /* 從新命名 rdb 文件,把以前臨時的名稱修改成正式的 rdb 文件名稱 */
47     if (rename(tmpfile,filename) == -1) {
48         // 異常處理
49         char *cwdp = getcwd(cwd,MAXPATHLEN);
50         serverLog(LL_WARNING,
51             "Error moving temp DB file %s on the final "
52             "destination %s (in server root dir %s): %s",
53             tmpfile,
54             filename,
55             cwdp ? cwdp : "unknown",
56             strerror(errno));
57         unlink(tmpfile);
58         return C_ERR;
59     }
60 
61     // 寫入完成,打印日誌
62     serverLog(LL_NOTICE,"DB saved on disk");
63     // 清理數據保存記錄
64     server.dirty = 0;
65     // 最後一次完成 SAVE 命令的時間
66     server.lastsave = time(NULL);
67     // 最後一次 bgsave 的狀態置位 成功
68     server.lastbgsave_status = C_OK;
69     return C_OK;
70 
71 werr:
72     serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
73     fclose(fp);
74     unlink(tmpfile);
75     return C_ERR;
76 }

  這裏要簡單說一下 fflushfsync的區別。它們倆都是用於刷緩存,可是所屬的層次不一樣。fflush函數用於 FILE* 指針上,將緩存數據從應用層緩存刷新到內核中,而 fsync 函數則更加底層,做用於文件描述符,用於將內核緩存刷新到物理設備上。

內存數據到 RDB 文件

rdbSaveRio 會將 Redis 內存中的數據以相對緊湊的格式寫入到文件中,其文件格式的示意圖以下所示。

rdbSaveRio函數的寫入大體流程以下:

  • 先寫入 REDIS 魔法值,而後是 RDB 文件的版本( rdb_version ),額外輔助信息 ( aux )。輔助信息中包含了 Redis 的版本,內存佔用和複製庫( repl-id )和偏移量( repl-offset )等。

  • 而後 rdbSaveRio 會遍歷當前 Redis 的全部數據庫,將數據庫的信息依次寫入。先寫入 RDB_OPCODE_SELECTDB識別碼和數據庫編號,接着寫入 RDB_OPCODE_RESIZEDB識別碼和數據庫鍵值數量和待失效鍵值數量,最後會遍歷全部的鍵值,依次寫入。

  • 在寫入鍵值時,當該鍵值有失效時間時,會先寫入 RDB_OPCODE_EXPIRETIME_MS識別碼和失效時間,而後寫入鍵值類型的識別碼,最後再寫入鍵和值。

  • 寫完數據庫信息後,還會把 Lua 相關的信息寫入,最後再寫入 RDB_OPCODE_EOF結束符識別碼和校驗值。

  1 /* Produces a dump of the database in RDB format sending it to the specified
  2  * Redis I/O channel. On success C_OK is returned, otherwise C_ERR
  3  * is returned and part of the output, or all the output, can be
  4  * missing because of I/O errors.
  5  *
  6  * When the function returns C_ERR and if 'error' is not NULL, the
  7  * integer pointed by 'error' is set to the value of errno just after the I/O
  8  * error. */
  9 int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
 10     dictIterator *di = NULL;
 11     dictEntry *de;
 12     char magic[10];
 13     int j;
 14     uint64_t cksum;
 15     size_t processed = 0;
 16 
 17     if (server.rdb_checksum)
 18         rdb->update_cksum = rioGenericUpdateChecksum;
 19     snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
 20     /* 1 寫入 magic字符'REDIS' 和 RDB 版本 */
 21     if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
 22     /**
 23      * 2 寫入輔助信息  REDIS版本,服務器操做系統位數,當前時間,
 24      * 複製信息好比repl-stream-db,repl-id和repl-offset等等數據
 25      */
 26     if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
 27 
 28     /* 3 遍歷每個數據庫,逐個數據庫數據保存 */
 29     for (j = 0; j < server.dbnum; j++) {
 30         /* 獲取數據庫指針地址和數據庫字典 */
 31         redisDb *db = server.db+j;
 32         dict *d = db->dict;
 33         if (dictSize(d) == 0) continue;
 34         di = dictGetSafeIterator(d);
 35 
 36         /* Write the SELECT DB opcode */
 37         /* 3.1 寫入數據庫部分的開始標識 */
 38         if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
 39         /* 3.2 寫入當前數據庫號 */
 40         if (rdbSaveLen(rdb,j) == -1) goto werr;
 41 
 42         /* Write the RESIZE DB opcode. We trim the size to UINT32_MAX, which
 43          * is currently the largest type we are able to represent in RDB sizes.
 44          * However this does not limit the actual size of the DB to load since
 45          * these sizes are just hints to resize the hash tables. */
 46         uint64_t db_size, expires_size;
 47         /* 獲取數據庫字典大小和過時鍵字典大小,此處代碼邏輯有簡化 */
 48         db_size = dictSize(db->dict);
 49         expires_size = dictSize(db->expires);
 50         /* 3.3 寫入當前待寫入數據的類型,此處爲 RDB_OPCODE_RESIZEDB,表示數據庫大小 */
 51         if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
 52         /* 3.4 寫入獲取數據庫字典大小和過時鍵字典大小 */
 53         if (rdbSaveLen(rdb,db_size) == -1) goto werr;
 54         if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
 55 
 56         /* Iterate this DB writing every entry */
 57         /* 4 遍歷當前數據庫的鍵值對 */
 58         while((de = dictNext(di)) != NULL) {
 59             sds keystr = dictGetKey(de);
 60             robj key, *o = dictGetVal(de);
 61             long long expire;
 62 
 63             /* 初始化 key,由於操做的是 key 字符串對象,而不是直接操做 鍵的字符串內容 */
 64             initStaticStringObject(key,keystr);
 65             /* 獲取鍵的過時數據 */
 66             expire = getExpire(db,&key);
 67             /* 4.1 保存鍵值對數據 */
 68             if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
 69 
 70             /* When this RDB is produced as part of an AOF rewrite, move
 71              * accumulated diff from parent to child while rewriting in
 72              * order to have a smaller final write. */
 73             if (flags & RDB_SAVE_AOF_PREAMBLE &&
 74                 rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
 75             {
 76                 processed = rdb->processed_bytes;
 77                 aofReadDiffFromParent();
 78             }
 79         }
 80         dictReleaseIterator(di);
 81         di = NULL; /* So that we don't release it again on error. */
 82     }
 83 
 84     /* If we are storing the replication information on disk, persist
 85      * the script cache as well: on successful PSYNC after a restart, we need
 86      * to be able to process any EVALSHA inside the replication backlog the
 87      * master will send us. */
 88     /* 5 保存 Lua 腳本*/
 89     if (rsi && dictSize(server.lua_scripts)) {
 90         di = dictGetIterator(server.lua_scripts);
 91         while((de = dictNext(di)) != NULL) {
 92             robj *body = dictGetVal(de);
 93             if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
 94                 goto werr;
 95         }
 96         dictReleaseIterator(di);
 97         di = NULL; /* So that we don't release it again on error. */
 98     }
 99 
100     /* EOF opcode */
101     /* 6 寫入結束符 */
102     if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
103 
104     /* CRC64 checksum. It will be zero if checksum computation is disabled, the
105      * loading code skips the check in this case. */
106     /* 7 寫入CRC64校驗和 */
107     cksum = rdb->cksum;
108     memrev64ifbe(&cksum);
109     if (rioWrite(rdb,&cksum,8) == 0) goto werr;
110     return C_OK;
111 
112 werr:
113     if (error) *error = errno;
114     if (di) dictReleaseIterator(di);
115     return C_ERR;
116 }

rdbSaveRio在寫鍵值時,會調用 rdbSaveKeyValuePair 函數。該函數會依次寫入鍵值的過時時間,鍵的類型,鍵和值。

 1 /**
 2  * Save a key-value pair, with expire time, type, key, value.
 3  * On error -1 is returned.
 4  * On success if the key was actually saved 1 is returned, otherwise 0
 5  * is returned (the key was already expired).
 6  * 該函數會依次寫入鍵值的過時時間,鍵的類型,鍵和值。
 7  */
 8 int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
 9     int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
10     int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;
11 
12     /* Save the expire time */
13     /* 若是有過時信息 */
14     if (expiretime != -1) {
15         /* 保存過時信息標識 */
16         if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
17         /* 保存過時具體數據內容 */
18         if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
19     }
20 
21     /* Save the LRU info. */
22     if (savelru) {
23         uint64_t idletime = estimateObjectIdleTime(val);
24         idletime /= 1000; /* Using seconds is enough and requires less space.*/
25         if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
26         if (rdbSaveLen(rdb,idletime) == -1) return -1;
27     }
28 
29     /* Save the LFU info. */
30     if (savelfu) {
31         uint8_t buf[1];
32         buf[0] = LFUDecrAndReturn(val);
33         /* We can encode this in exactly two bytes: the opcode and an 8
34          * bit counter, since the frequency is logarithmic with a 0-255 range.
35          * Note that we do not store the halving time because to reset it
36          * a single time when loading does not affect the frequency much. */
37         if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
38         if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
39     }
40 
41     /* Save type, key, value */
42     /* 保存鍵值對 類型的標識 */
43     if (rdbSaveObjectType(rdb,val) == -1) return -1;
44     /* 保存鍵值對 鍵的內容 */
45     if (rdbSaveStringObject(rdb,key) == -1) return -1;
46     /* 保存鍵值對 值的內容 */
47     if (rdbSaveObject(rdb,val) == -1) return -1;
48     return 1;
49 }

根據鍵的不一樣類型寫入不一樣格式,各類鍵值的類型和格式以下所示。

  Redis 有龐大的對象和數據結構體系,它使用六種底層數據結構構建了包含字符串對象、列表對象、哈希對象、集合對象和有序集合對象的對象系統。

  不一樣的數據結構進行 RDB 持久化的格式都不一樣。咱們今天只看一下集合對象是如何持久化的。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk= watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=
  1 /**
  2  * Save a Redis object.
  3  * Returns -1 on error, number of bytes written on success.
  4  * 保存Redis對象
  5  */
  6 ssize_t rdbSaveObject(rio *rdb, robj *o) {
  7     ssize_t n = 0, nwritten = 0;
  8 
  9     if (o->type == OBJ_STRING) {
 10         /* Save a string value */
 11         if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;
 12         nwritten += n;
 13     } else if (o->type == OBJ_LIST) {
 14         /* Save a list value */
 15         if (o->encoding == OBJ_ENCODING_QUICKLIST) {
 16             quicklist *ql = o->ptr;
 17             quicklistNode *node = ql->head;
 18 
 19             if ((n = rdbSaveLen(rdb,ql->len)) == -1) return -1;
 20             nwritten += n;
 21 
 22             while(node) {
 23                 if (quicklistNodeIsCompressed(node)) {
 24                     void *data;
 25                     size_t compress_len = quicklistGetLzf(node, &data);
 26                     if ((n = rdbSaveLzfBlob(rdb,data,compress_len,node->sz)) == -1) return -1;
 27                     nwritten += n;
 28                 } else {
 29                     if ((n = rdbSaveRawString(rdb,node->zl,node->sz)) == -1) return -1;
 30                     nwritten += n;
 31                 }
 32                 node = node->next;
 33             }
 34         } else {
 35             serverPanic("Unknown list encoding");
 36         }
 37     } else if (o->type == OBJ_SET) {
 38         /* Save a set value */
 39         if (o->encoding == OBJ_ENCODING_HT) {
 40             dict *set = o->ptr;
 41             // 集合迭代器
 42             dictIterator *di = dictGetIterator(set);
 43             dictEntry *de;
 44 
 45             // 寫入集合長度
 46             if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) {
 47                 dictReleaseIterator(di);
 48                 return -1;
 49             }
 50             nwritten += n;
 51 
 52             // 遍歷集合元素
 53             while((de = dictNext(di)) != NULL) {
 54                 sds ele = dictGetKey(de);
 55                 // 以字符串的形式寫入,由於是SET 因此只寫入 Key 便可
 56                 if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele)))
 57                     == -1)
 58                 {
 59                     dictReleaseIterator(di);
 60                     return -1;
 61                 }
 62                 nwritten += n;
 63             }
 64             dictReleaseIterator(di);
 65         } else if (o->encoding == OBJ_ENCODING_INTSET) {
 66             size_t l = intsetBlobLen((intset*)o->ptr);
 67 
 68             if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
 69             nwritten += n;
 70         } else {
 71             serverPanic("Unknown set encoding");
 72         }
 73     } else if (o->type == OBJ_ZSET) {
 74         /* Save a sorted set value */
 75         if (o->encoding == OBJ_ENCODING_ZIPLIST) {
 76             size_t l = ziplistBlobLen((unsigned char*)o->ptr);
 77 
 78             if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
 79             nwritten += n;
 80         } else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
 81             zset *zs = o->ptr;
 82             zskiplist *zsl = zs->zsl;
 83 
 84             if ((n = rdbSaveLen(rdb,zsl->length)) == -1) return -1;
 85             nwritten += n;
 86 
 87             /* We save the skiplist elements from the greatest to the smallest
 88              * (that's trivial since the elements are already ordered in the
 89              * skiplist): this improves the load process, since the next loaded
 90              * element will always be the smaller, so adding to the skiplist
 91              * will always immediately stop at the head, making the insertion
 92              * O(1) instead of O(log(N)). */
 93             zskiplistNode *zn = zsl->tail;
 94             while (zn != NULL) {
 95                 if ((n = rdbSaveRawString(rdb,
 96                     (unsigned char*)zn->ele,sdslen(zn->ele))) == -1)
 97                 {
 98                     return -1;
 99                 }
100                 nwritten += n;
101                 if ((n = rdbSaveBinaryDoubleValue(rdb,zn->score)) == -1)
102                     return -1;
103                 nwritten += n;
104                 zn = zn->backward;
105             }
106         } else {
107             serverPanic("Unknown sorted set encoding");
108         }
109     } else if (o->type == OBJ_HASH) {
110         /* Save a hash value */
111         if (o->encoding == OBJ_ENCODING_ZIPLIST) {
112             size_t l = ziplistBlobLen((unsigned char*)o->ptr);
113 
114             if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
115             nwritten += n;
116 
117         } else if (o->encoding == OBJ_ENCODING_HT) {
118             dictIterator *di = dictGetIterator(o->ptr);
119             dictEntry *de;
120 
121             if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) {
122                 dictReleaseIterator(di);
123                 return -1;
124             }
125             nwritten += n;
126 
127             while((de = dictNext(di)) != NULL) {
128                 sds field = dictGetKey(de);
129                 sds value = dictGetVal(de);
130 
131                 if ((n = rdbSaveRawString(rdb,(unsigned char*)field,
132                         sdslen(field))) == -1)
133                 {
134                     dictReleaseIterator(di);
135                     return -1;
136                 }
137                 nwritten += n;
138                 if ((n = rdbSaveRawString(rdb,(unsigned char*)value,
139                         sdslen(value))) == -1)
140                 {
141                     dictReleaseIterator(di);
142                     return -1;
143                 }
144                 nwritten += n;
145             }
146             dictReleaseIterator(di);
147         } else {
148             serverPanic("Unknown hash encoding");
149         }
150     } else if (o->type == OBJ_STREAM) {
151         /* Store how many listpacks we have inside the radix tree. */
152         stream *s = o->ptr;
153         rax *rax = s->rax;
154         if ((n = rdbSaveLen(rdb,raxSize(rax))) == -1) return -1;
155         nwritten += n;
156 
157         /* Serialize all the listpacks inside the radix tree as they are,
158          * when loading back, we'll use the first entry of each listpack
159          * to insert it back into the radix tree. */
160         raxIterator ri;
161         raxStart(&ri,rax);
162         raxSeek(&ri,"^",NULL,0);
163         while (raxNext(&ri)) {
164             unsigned char *lp = ri.data;
165             size_t lp_bytes = lpBytes(lp);
166             if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1;
167             nwritten += n;
168             if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) return -1;
169             nwritten += n;
170         }
171         raxStop(&ri);
172 
173         /* Save the number of elements inside the stream. We cannot obtain
174          * this easily later, since our macro nodes should be checked for
175          * number of items: not a great CPU / space tradeoff. */
176         if ((n = rdbSaveLen(rdb,s->length)) == -1) return -1;
177         nwritten += n;
178         /* Save the last entry ID. */
179         if ((n = rdbSaveLen(rdb,s->last_id.ms)) == -1) return -1;
180         nwritten += n;
181         if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1;
182         nwritten += n;
183 
184         /* The consumer groups and their clients are part of the stream
185          * type, so serialize every consumer group. */
186 
187         /* Save the number of groups. */
188         size_t num_cgroups = s->cgroups ? raxSize(s->cgroups) : 0;
189         if ((n = rdbSaveLen(rdb,num_cgroups)) == -1) return -1;
190         nwritten += n;
191 
192         if (num_cgroups) {
193             /* Serialize each consumer group. */
194             raxStart(&ri,s->cgroups);
195             raxSeek(&ri,"^",NULL,0);
196             while(raxNext(&ri)) {
197                 streamCG *cg = ri.data;
198 
199                 /* Save the group name. */
200                 if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1)
201                     return -1;
202                 nwritten += n;
203 
204                 /* Last ID. */
205                 if ((n = rdbSaveLen(rdb,cg->last_id.ms)) == -1) return -1;
206                 nwritten += n;
207                 if ((n = rdbSaveLen(rdb,cg->last_id.seq)) == -1) return -1;
208                 nwritten += n;
209 
210                 /* Save the global PEL. */
211                 if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) return -1;
212                 nwritten += n;
213 
214                 /* Save the consumers of this group. */
215                 if ((n = rdbSaveStreamConsumers(rdb,cg)) == -1) return -1;
216                 nwritten += n;
217             }
218             raxStop(&ri);
219         }
220     } else if (o->type == OBJ_MODULE) {
221         /* Save a module-specific value. */
222         RedisModuleIO io;
223         moduleValue *mv = o->ptr;
224         moduleType *mt = mv->type;
225         moduleInitIOContext(io,mt,rdb);
226 
227         /* Write the "module" identifier as prefix, so that we'll be able
228          * to call the right module during loading. */
229         int retval = rdbSaveLen(rdb,mt->id);
230         if (retval == -1) return -1;
231         io.bytes += retval;
232 
233         /* Then write the module-specific representation + EOF marker. */
234         mt->rdb_save(&io,mv->value);
235         retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
236         if (retval == -1) return -1;
237         io.bytes += retval;
238 
239         if (io.ctx) {
240             moduleFreeContext(io.ctx);
241             zfree(io.ctx);
242         }
243         return io.error ? -1 : (ssize_t)io.bytes;
244     } else {
245         serverPanic("Unknown object type");
246     }
247     return nwritten;
248 }
View Code
相關文章
相關標籤/搜索