Redis 是一種內存數據庫,將數據保存在內存中,讀寫效率要比傳統的將數據保存在磁盤上的數據庫要快不少。可是一旦進程退出,Redis 的數據就會丟失。node
爲了解決這個問題,Redis 提供了 RDB 和 AOF 兩種持久化方案,將內存中的數據保存到磁盤中,避免數據丟失。redis
antirez 在《Redis 持久化解密》一文中說,通常來講有三種常見的策略來進行持久化操做,防止數據損壞:sql
方法1 是數據庫不關心發生故障,在數據文件損壞後經過數據備份或者快照來進行恢復。Redis 的 RDB 持久化就是這種方式。數據庫
方法2 是數據庫使用操做日誌,每次操做時記錄操做行爲,以便在故障後經過日誌恢復到一致性的狀態。由於操做日誌是順序追加的方式寫的,因此不會出現操做日誌也沒法恢復的狀況。相似於 Mysql 的 redo 和 undo 日誌。數組
方法3 是數據庫不進行老數據的修改,只是以追加方式去完成寫操做,這樣數據自己就是一份日誌,這樣就永遠不會出現數據沒法恢復的狀況了。CouchDB就是此作法的優秀範例。緩存
RDB 就是第一種方法,它就是把當前 Redis 進程的數據生成時間點快照( point-in-time snapshot ) 保存到存儲設備的過程。安全
RDB 的使用RDB 觸發機制分爲使用指令手動觸發和 redis.conf 配置自動觸發。服務器
手動觸發 Redis 進行 RDB 持久化的指令的爲:數據結構
save ,該指令會阻塞當前 Redis 服務器,執行 save 指令期間,Redis 不能處理其餘命令,直到 RDB 過程完成爲止。less
bgsave,執行該命令時,Redis 會在後臺異步執行快照操做,此時 Redis 仍然能夠相應客戶端請求。具體操做是 Redis 進程執行 fork
操做建立子進程,RDB 持久化過程由子進程負責,完成後自動結束。Redis 只會在 fork
期間發生阻塞,可是通常時間都很短。可是若是 Redis 數據量特別大,fork
時間就會變長,並且佔用內存會加倍,這一點須要特別注意。
自動觸發 RDB 的默認配置以下所示:
1 save 900 1 # 表示900 秒內若是至少有 1 個 key 的值變化,則觸發RDB 2 save 300 10 # 表示300 秒內若是至少有 10 個 key 的值變化,則觸發RDB 3 save 60 10000 # 表示60 秒內若是至少有 10000 個 key 的值變化,則觸發RDB
若是不須要 Redis 進行持久化,那麼能夠註釋掉全部的 save 行來停用保存功能,也能夠直接一個空字符串來停用持久化:save ""。
Redis 服務器週期操做函數 serverCron
默認每一個 100 毫秒就會執行一次,該函數用於正在運行的服務器進行維護,它的一項工做就是檢查 save 選項所設置的條件是否有一項被知足,若是知足的話,就執行 bgsave 指令。
瞭解了 RDB 的基礎使用後,咱們要繼續深刻對 RDB持久化的學習。在此以前,咱們能夠先思考一下如何實現一個持久化機制,畢竟這是不少中間件所需的一個模塊。
首先,持久化保存的文件內容結構必須是緊湊的,特別對於數據庫來講,須要持久化的數據量十分大,須要保證持久化文件不至於佔用太多存儲。其次,進行持久化時,中間件應該還能夠快速地響應用戶請求,持久化的操做應該儘可能少影響中間件的其餘功能。最後,畢竟持久化會消耗性能,如何在性能和數據安全性之間作出平衡,如何靈活配置觸發持久化操做。
接下來咱們將帶着這些問題,到源碼中尋求答案。
本文中的源碼來自 Redis 5.0.2,RDB持久化過程的相關源碼都在 rdb.c 文件中。其中大概的流程以下圖所示。
上圖代表了三種觸發 RDB 持久化的手段之間的總體關係。經過 serverCron
自動觸發的 RDB 至關於直接調用了 bgsave 指令的流程進行處理。而 bgsave 的處理流程啓動子進程後,調用了 save 指令的處理流程。
如上圖所示, redisServer
結構體的 save_params
指向擁有三個值的數組,該數組的值與 redis.conf 文件中 save 配置項一一對應。分別是 save9001
、 save30010
和 save6010000
。dirty
記錄着有多少鍵值發生變化, lastsave
記錄着上次 RDB 持久化的時間。
而 serverCron
函數就是遍歷該數組的值,檢查當前 Redis 狀態是否符合觸發 RDB 持久化的條件,好比說距離上次 RDB 持久化過去了 900 秒而且有至少一條數據發生變動。
1 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { 2 ··· 3 /* Check if a background saving or AOF rewrite in progress terminated. */ 4 /* 判斷後臺是否正在進行 rdb 或者 aof 操做 */ 5 if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 || 6 ldbPendingChildren()) 7 { 8 int statloc; 9 pid_t pid; 10 11 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { 12 int exitcode = WEXITSTATUS(statloc); 13 int bysignal = 0; 14 15 if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc); 16 17 if (pid == -1) { 18 serverLog(LL_WARNING,"wait3() returned an error: %s. " 19 "rdb_child_pid = %d, aof_child_pid = %d", 20 strerror(errno), 21 (int) server.rdb_child_pid, 22 (int) server.aof_child_pid); 23 } else if (pid == server.rdb_child_pid) { 24 backgroundSaveDoneHandler(exitcode,bysignal); 25 if (!bysignal && exitcode == 0) receiveChildInfo(); 26 } else if (pid == server.aof_child_pid) { 27 backgroundRewriteDoneHandler(exitcode,bysignal); 28 if (!bysignal && exitcode == 0) receiveChildInfo(); 29 } else { 30 if (!ldbRemoveChild(pid)) { 31 serverLog(LL_WARNING, 32 "Warning, detected child with unmatched pid: %ld", 33 (long)pid); 34 } 35 } 36 updateDictResizePolicy(); 37 closeChildInfoPipe(); 38 } 39 } else { 40 // 到這兒就能肯定 當前木有進行 rdb 或者 aof 操做 41 // 遍歷每個 rdb 保存條件 42 /* If there is not a background saving/rewrite in progress check if 43 * we have to save/rewrite now. */ 44 for (j = 0; j < server.saveparamslen; j++) { 45 struct saveparam *sp = server.saveparams+j; 46 47 /** 48 * Save if we reached the given amount of changes, 49 * the given amount of seconds, and if the latest bgsave was 50 * successful or if, in case of an error, at least 51 * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. 52 * 若是數據保存記錄 大於規定的修改次數 且距離 上一次保存的時間大於規定時間或者上次BGSAVE命令執行成功, 53 * 才執行 BGSAVE 操做 54 */ 55 if (server.dirty >= sp->changes && 56 server.unixtime-server.lastsave > sp->seconds && 57 (server.unixtime-server.lastbgsave_try > 58 CONFIG_BGSAVE_RETRY_DELAY || 59 server.lastbgsave_status == C_OK)) 60 { 61 //記錄日誌 62 serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...", 63 sp->changes, (int)sp->seconds); 64 rdbSaveInfo rsi, *rsiptr; 65 rsiptr = rdbPopulateSaveInfo(&rsi); 66 // 異步保存操做 67 rdbSaveBackground(server.rdb_filename,rsiptr); 68 break; 69 } 70 } 71 ··· 72 server.cronloops++; 73 return 1000/server.hz; 74 }
若是符合觸發 RDB 持久化的條件, serverCron
會調用 rdbSaveBackground
函數,也就是 bgsave 指令會觸發的函數。
執行 bgsave 指令時,Redis 會先觸發 bgsaveCommand
進行當前狀態檢查,而後纔會調用 rdbSaveBackground
,其中的邏輯以下圖所示。
rdbSaveBackground
函數中最主要的工做就是調用 fork
命令生成子流程,而後在子流程中執行 rdbSave
函數,也就是 save 指令最終會觸發的函數。
1 int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { 2 pid_t childpid; 3 long long start; 4 5 6 // 檢查後臺是否正在執行 aof 或者 rdb 操做 7 if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR; 8 9 // 拿出 數據保存記錄,保存爲 上次記錄 10 server.dirty_before_bgsave = server.dirty; 11 // bgsave 時間 12 server.lastbgsave_try = time(NULL); 13 openChildInfoPipe(); 14 15 start = ustime(); 16 // fork 子進程 17 if ((childpid = fork()) == 0) { 18 int retval; 19 20 /* Child */ 21 /* 關閉子進程繼承的 socket 監聽 */ 22 closeListeningSockets(0); 23 // 子進程 title 修改 24 redisSetProcTitle("redis-rdb-bgsave"); 25 // 執行rdb 寫入操做 26 retval = rdbSave(filename,rsi); 27 // 執行完畢之後 28 if (retval == C_OK) { 29 size_t private_dirty = zmalloc_get_private_dirty(-1); 30 31 if (private_dirty) { 32 serverLog(LL_NOTICE, 33 "RDB: %zu MB of memory used by copy-on-write", 34 private_dirty/(1024*1024)); 35 } 36 37 server.child_info_data.cow_size = private_dirty; 38 sendChildInfo(CHILD_INFO_TYPE_RDB); 39 } 40 // 退出子進程 41 exitFromChild((retval == C_OK) ? 0 : 1); 42 } else { 43 /* Parent */ 44 /* 父進程,進行fork時間的統計和信息記錄,好比說rdb_save_time_start、rdb_child_pid、和rdb_child_type */ 45 server.stat_fork_time = ustime()-start; 46 server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */ 47 latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000); 48 if (childpid == -1) { 49 closeChildInfoPipe(); 50 server.lastbgsave_status = C_ERR; 51 serverLog(LL_WARNING,"Can't save in background: fork: %s", 52 strerror(errno)); 53 return C_ERR; 54 } 55 serverLog(LL_NOTICE,"Background saving started by pid %d",childpid); 56 // rdb 保存開始時間 bgsave 子進程 57 server.rdb_save_time_start = time(NULL); 58 server.rdb_child_pid = childpid; 59 server.rdb_child_type = RDB_CHILD_TYPE_DISK; 60 updateDictResizePolicy(); 61 return C_OK; 62 } 63 return C_OK; /* unreached */ 64 }
爲何 Redis 使用子進程而不是線程來進行後臺 RDB 持久化呢?主要是出於Redis性能的考慮,咱們知道Redis對客戶端響應請求的工做模型是單進程和單線程的,若是在主進程內啓動一個線程,這樣會形成對數據的競爭條件。因此爲了不使用鎖下降性能,Redis選擇啓動新的子進程,獨立擁有一份父進程的內存拷貝,以此爲基礎執行RDB持久化。
可是須要注意的是,fork 會消耗必定時間,而且父子進程所佔據的內存是相同的,當 Redis 鍵值較大時,fork 的時間會很長,這段時間內 Redis 是沒法響應其餘命令的。除此以外,Redis 佔據的內存空間會翻倍。
生成 RDB 文件,而且持久化到硬盤Redis 的 rdbSave
函數是真正進行 RDB 持久化的函數,它的大體流程以下:
首先打開一個臨時文件,
調用 rdbSaveRio
函數,將當前 Redis 的內存信息寫入到這個臨時文件中,
接着調用 fflush
、 fsync
和 fclose
接口將文件寫入磁盤中,
使用 rename
將臨時文件更名爲 正式的 RDB 文件,
最後記錄 dirty
和 lastsave
等狀態信息。這些狀態信息在 serverCron
時會使用到。
1 /** 2 * Save the DB on disk. Return C_ERR on error, C_OK on success. 3 * 生成 RDB 文件,而且持久化到硬盤 4 */ 5 int rdbSave(char *filename, rdbSaveInfo *rsi) { 6 char tmpfile[256]; 7 // 當前工做目錄 8 char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */ 9 FILE *fp; 10 rio rdb; 11 int error = 0; 12 13 /* 生成tmpfile文件名 temp-[pid].rdb */ 14 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); 15 /* 打開文件 */ 16 fp = fopen(tmpfile,"w"); 17 if (!fp) { 18 char *cwdp = getcwd(cwd,MAXPATHLEN); 19 serverLog(LL_WARNING, 20 "Failed opening the RDB file %s (in server root dir %s) " 21 "for saving: %s", 22 filename, 23 cwdp ? cwdp : "unknown", 24 strerror(errno)); 25 return C_ERR; 26 } 27 28 /* 初始化rio結構 */ 29 rioInitWithFile(&rdb,fp); 30 31 if (server.rdb_save_incremental_fsync) 32 rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES); 33 34 if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) { 35 errno = error; 36 goto werr; 37 } 38 39 /* Make sure data will not remain on the OS's output buffers */ 40 if (fflush(fp) == EOF) goto werr; 41 if (fsync(fileno(fp)) == -1) goto werr; 42 if (fclose(fp) == EOF) goto werr; 43 44 /* Use RENAME to make sure the DB file is changed atomically only 45 * if the generate DB file is ok. */ 46 /* 從新命名 rdb 文件,把以前臨時的名稱修改成正式的 rdb 文件名稱 */ 47 if (rename(tmpfile,filename) == -1) { 48 // 異常處理 49 char *cwdp = getcwd(cwd,MAXPATHLEN); 50 serverLog(LL_WARNING, 51 "Error moving temp DB file %s on the final " 52 "destination %s (in server root dir %s): %s", 53 tmpfile, 54 filename, 55 cwdp ? cwdp : "unknown", 56 strerror(errno)); 57 unlink(tmpfile); 58 return C_ERR; 59 } 60 61 // 寫入完成,打印日誌 62 serverLog(LL_NOTICE,"DB saved on disk"); 63 // 清理數據保存記錄 64 server.dirty = 0; 65 // 最後一次完成 SAVE 命令的時間 66 server.lastsave = time(NULL); 67 // 最後一次 bgsave 的狀態置位 成功 68 server.lastbgsave_status = C_OK; 69 return C_OK; 70 71 werr: 72 serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno)); 73 fclose(fp); 74 unlink(tmpfile); 75 return C_ERR; 76 }
這裏要簡單說一下 fflush
和 fsync
的區別。它們倆都是用於刷緩存,可是所屬的層次不一樣。fflush
函數用於 FILE*
指針上,將緩存數據從應用層緩存刷新到內核中,而 fsync
函數則更加底層,做用於文件描述符,用於將內核緩存刷新到物理設備上。
rdbSaveRio
會將 Redis 內存中的數據以相對緊湊的格式寫入到文件中,其文件格式的示意圖以下所示。
rdbSaveRio
函數的寫入大體流程以下:
先寫入 REDIS 魔法值,而後是 RDB 文件的版本( rdb_version ),額外輔助信息 ( aux )。輔助信息中包含了 Redis 的版本,內存佔用和複製庫( repl-id )和偏移量( repl-offset )等。
而後 rdbSaveRio
會遍歷當前 Redis 的全部數據庫,將數據庫的信息依次寫入。先寫入 RDB_OPCODE_SELECTDB
識別碼和數據庫編號,接着寫入 RDB_OPCODE_RESIZEDB
識別碼和數據庫鍵值數量和待失效鍵值數量,最後會遍歷全部的鍵值,依次寫入。
在寫入鍵值時,當該鍵值有失效時間時,會先寫入 RDB_OPCODE_EXPIRETIME_MS
識別碼和失效時間,而後寫入鍵值類型的識別碼,最後再寫入鍵和值。
寫完數據庫信息後,還會把 Lua 相關的信息寫入,最後再寫入 RDB_OPCODE_EOF
結束符識別碼和校驗值。
1 /* Produces a dump of the database in RDB format sending it to the specified 2 * Redis I/O channel. On success C_OK is returned, otherwise C_ERR 3 * is returned and part of the output, or all the output, can be 4 * missing because of I/O errors. 5 * 6 * When the function returns C_ERR and if 'error' is not NULL, the 7 * integer pointed by 'error' is set to the value of errno just after the I/O 8 * error. */ 9 int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { 10 dictIterator *di = NULL; 11 dictEntry *de; 12 char magic[10]; 13 int j; 14 uint64_t cksum; 15 size_t processed = 0; 16 17 if (server.rdb_checksum) 18 rdb->update_cksum = rioGenericUpdateChecksum; 19 snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); 20 /* 1 寫入 magic字符'REDIS' 和 RDB 版本 */ 21 if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; 22 /** 23 * 2 寫入輔助信息 REDIS版本,服務器操做系統位數,當前時間, 24 * 複製信息好比repl-stream-db,repl-id和repl-offset等等數據 25 */ 26 if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr; 27 28 /* 3 遍歷每個數據庫,逐個數據庫數據保存 */ 29 for (j = 0; j < server.dbnum; j++) { 30 /* 獲取數據庫指針地址和數據庫字典 */ 31 redisDb *db = server.db+j; 32 dict *d = db->dict; 33 if (dictSize(d) == 0) continue; 34 di = dictGetSafeIterator(d); 35 36 /* Write the SELECT DB opcode */ 37 /* 3.1 寫入數據庫部分的開始標識 */ 38 if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr; 39 /* 3.2 寫入當前數據庫號 */ 40 if (rdbSaveLen(rdb,j) == -1) goto werr; 41 42 /* Write the RESIZE DB opcode. We trim the size to UINT32_MAX, which 43 * is currently the largest type we are able to represent in RDB sizes. 44 * However this does not limit the actual size of the DB to load since 45 * these sizes are just hints to resize the hash tables. */ 46 uint64_t db_size, expires_size; 47 /* 獲取數據庫字典大小和過時鍵字典大小,此處代碼邏輯有簡化 */ 48 db_size = dictSize(db->dict); 49 expires_size = dictSize(db->expires); 50 /* 3.3 寫入當前待寫入數據的類型,此處爲 RDB_OPCODE_RESIZEDB,表示數據庫大小 */ 51 if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr; 52 /* 3.4 寫入獲取數據庫字典大小和過時鍵字典大小 */ 53 if (rdbSaveLen(rdb,db_size) == -1) goto werr; 54 if (rdbSaveLen(rdb,expires_size) == -1) goto werr; 55 56 /* Iterate this DB writing every entry */ 57 /* 4 遍歷當前數據庫的鍵值對 */ 58 while((de = dictNext(di)) != NULL) { 59 sds keystr = dictGetKey(de); 60 robj key, *o = dictGetVal(de); 61 long long expire; 62 63 /* 初始化 key,由於操做的是 key 字符串對象,而不是直接操做 鍵的字符串內容 */ 64 initStaticStringObject(key,keystr); 65 /* 獲取鍵的過時數據 */ 66 expire = getExpire(db,&key); 67 /* 4.1 保存鍵值對數據 */ 68 if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr; 69 70 /* When this RDB is produced as part of an AOF rewrite, move 71 * accumulated diff from parent to child while rewriting in 72 * order to have a smaller final write. */ 73 if (flags & RDB_SAVE_AOF_PREAMBLE && 74 rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) 75 { 76 processed = rdb->processed_bytes; 77 aofReadDiffFromParent(); 78 } 79 } 80 dictReleaseIterator(di); 81 di = NULL; /* So that we don't release it again on error. */ 82 } 83 84 /* If we are storing the replication information on disk, persist 85 * the script cache as well: on successful PSYNC after a restart, we need 86 * to be able to process any EVALSHA inside the replication backlog the 87 * master will send us. */ 88 /* 5 保存 Lua 腳本*/ 89 if (rsi && dictSize(server.lua_scripts)) { 90 di = dictGetIterator(server.lua_scripts); 91 while((de = dictNext(di)) != NULL) { 92 robj *body = dictGetVal(de); 93 if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1) 94 goto werr; 95 } 96 dictReleaseIterator(di); 97 di = NULL; /* So that we don't release it again on error. */ 98 } 99 100 /* EOF opcode */ 101 /* 6 寫入結束符 */ 102 if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr; 103 104 /* CRC64 checksum. It will be zero if checksum computation is disabled, the 105 * loading code skips the check in this case. */ 106 /* 7 寫入CRC64校驗和 */ 107 cksum = rdb->cksum; 108 memrev64ifbe(&cksum); 109 if (rioWrite(rdb,&cksum,8) == 0) goto werr; 110 return C_OK; 111 112 werr: 113 if (error) *error = errno; 114 if (di) dictReleaseIterator(di); 115 return C_ERR; 116 }
rdbSaveRio
在寫鍵值時,會調用 rdbSaveKeyValuePair
函數。該函數會依次寫入鍵值的過時時間,鍵的類型,鍵和值。
1 /** 2 * Save a key-value pair, with expire time, type, key, value. 3 * On error -1 is returned. 4 * On success if the key was actually saved 1 is returned, otherwise 0 5 * is returned (the key was already expired). 6 * 該函數會依次寫入鍵值的過時時間,鍵的類型,鍵和值。 7 */ 8 int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { 9 int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU; 10 int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU; 11 12 /* Save the expire time */ 13 /* 若是有過時信息 */ 14 if (expiretime != -1) { 15 /* 保存過時信息標識 */ 16 if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1; 17 /* 保存過時具體數據內容 */ 18 if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1; 19 } 20 21 /* Save the LRU info. */ 22 if (savelru) { 23 uint64_t idletime = estimateObjectIdleTime(val); 24 idletime /= 1000; /* Using seconds is enough and requires less space.*/ 25 if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1; 26 if (rdbSaveLen(rdb,idletime) == -1) return -1; 27 } 28 29 /* Save the LFU info. */ 30 if (savelfu) { 31 uint8_t buf[1]; 32 buf[0] = LFUDecrAndReturn(val); 33 /* We can encode this in exactly two bytes: the opcode and an 8 34 * bit counter, since the frequency is logarithmic with a 0-255 range. 35 * Note that we do not store the halving time because to reset it 36 * a single time when loading does not affect the frequency much. */ 37 if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1; 38 if (rdbWriteRaw(rdb,buf,1) == -1) return -1; 39 } 40 41 /* Save type, key, value */ 42 /* 保存鍵值對 類型的標識 */ 43 if (rdbSaveObjectType(rdb,val) == -1) return -1; 44 /* 保存鍵值對 鍵的內容 */ 45 if (rdbSaveStringObject(rdb,key) == -1) return -1; 46 /* 保存鍵值對 值的內容 */ 47 if (rdbSaveObject(rdb,val) == -1) return -1; 48 return 1; 49 }
根據鍵的不一樣類型寫入不一樣格式,各類鍵值的類型和格式以下所示。
Redis 有龐大的對象和數據結構體系,它使用六種底層數據結構構建了包含字符串對象、列表對象、哈希對象、集合對象和有序集合對象的對象系統。
不一樣的數據結構進行 RDB 持久化的格式都不一樣。咱們今天只看一下集合對象是如何持久化的。
1 /** 2 * Save a Redis object. 3 * Returns -1 on error, number of bytes written on success. 4 * 保存Redis對象 5 */ 6 ssize_t rdbSaveObject(rio *rdb, robj *o) { 7 ssize_t n = 0, nwritten = 0; 8 9 if (o->type == OBJ_STRING) { 10 /* Save a string value */ 11 if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1; 12 nwritten += n; 13 } else if (o->type == OBJ_LIST) { 14 /* Save a list value */ 15 if (o->encoding == OBJ_ENCODING_QUICKLIST) { 16 quicklist *ql = o->ptr; 17 quicklistNode *node = ql->head; 18 19 if ((n = rdbSaveLen(rdb,ql->len)) == -1) return -1; 20 nwritten += n; 21 22 while(node) { 23 if (quicklistNodeIsCompressed(node)) { 24 void *data; 25 size_t compress_len = quicklistGetLzf(node, &data); 26 if ((n = rdbSaveLzfBlob(rdb,data,compress_len,node->sz)) == -1) return -1; 27 nwritten += n; 28 } else { 29 if ((n = rdbSaveRawString(rdb,node->zl,node->sz)) == -1) return -1; 30 nwritten += n; 31 } 32 node = node->next; 33 } 34 } else { 35 serverPanic("Unknown list encoding"); 36 } 37 } else if (o->type == OBJ_SET) { 38 /* Save a set value */ 39 if (o->encoding == OBJ_ENCODING_HT) { 40 dict *set = o->ptr; 41 // 集合迭代器 42 dictIterator *di = dictGetIterator(set); 43 dictEntry *de; 44 45 // 寫入集合長度 46 if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) { 47 dictReleaseIterator(di); 48 return -1; 49 } 50 nwritten += n; 51 52 // 遍歷集合元素 53 while((de = dictNext(di)) != NULL) { 54 sds ele = dictGetKey(de); 55 // 以字符串的形式寫入,由於是SET 因此只寫入 Key 便可 56 if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele))) 57 == -1) 58 { 59 dictReleaseIterator(di); 60 return -1; 61 } 62 nwritten += n; 63 } 64 dictReleaseIterator(di); 65 } else if (o->encoding == OBJ_ENCODING_INTSET) { 66 size_t l = intsetBlobLen((intset*)o->ptr); 67 68 if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1; 69 nwritten += n; 70 } else { 71 serverPanic("Unknown set encoding"); 72 } 73 } else if (o->type == OBJ_ZSET) { 74 /* Save a sorted set value */ 75 if (o->encoding == OBJ_ENCODING_ZIPLIST) { 76 size_t l = ziplistBlobLen((unsigned char*)o->ptr); 77 78 if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1; 79 nwritten += n; 80 } else if (o->encoding == OBJ_ENCODING_SKIPLIST) { 81 zset *zs = o->ptr; 82 zskiplist *zsl = zs->zsl; 83 84 if ((n = rdbSaveLen(rdb,zsl->length)) == -1) return -1; 85 nwritten += n; 86 87 /* We save the skiplist elements from the greatest to the smallest 88 * (that's trivial since the elements are already ordered in the 89 * skiplist): this improves the load process, since the next loaded 90 * element will always be the smaller, so adding to the skiplist 91 * will always immediately stop at the head, making the insertion 92 * O(1) instead of O(log(N)). */ 93 zskiplistNode *zn = zsl->tail; 94 while (zn != NULL) { 95 if ((n = rdbSaveRawString(rdb, 96 (unsigned char*)zn->ele,sdslen(zn->ele))) == -1) 97 { 98 return -1; 99 } 100 nwritten += n; 101 if ((n = rdbSaveBinaryDoubleValue(rdb,zn->score)) == -1) 102 return -1; 103 nwritten += n; 104 zn = zn->backward; 105 } 106 } else { 107 serverPanic("Unknown sorted set encoding"); 108 } 109 } else if (o->type == OBJ_HASH) { 110 /* Save a hash value */ 111 if (o->encoding == OBJ_ENCODING_ZIPLIST) { 112 size_t l = ziplistBlobLen((unsigned char*)o->ptr); 113 114 if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1; 115 nwritten += n; 116 117 } else if (o->encoding == OBJ_ENCODING_HT) { 118 dictIterator *di = dictGetIterator(o->ptr); 119 dictEntry *de; 120 121 if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) { 122 dictReleaseIterator(di); 123 return -1; 124 } 125 nwritten += n; 126 127 while((de = dictNext(di)) != NULL) { 128 sds field = dictGetKey(de); 129 sds value = dictGetVal(de); 130 131 if ((n = rdbSaveRawString(rdb,(unsigned char*)field, 132 sdslen(field))) == -1) 133 { 134 dictReleaseIterator(di); 135 return -1; 136 } 137 nwritten += n; 138 if ((n = rdbSaveRawString(rdb,(unsigned char*)value, 139 sdslen(value))) == -1) 140 { 141 dictReleaseIterator(di); 142 return -1; 143 } 144 nwritten += n; 145 } 146 dictReleaseIterator(di); 147 } else { 148 serverPanic("Unknown hash encoding"); 149 } 150 } else if (o->type == OBJ_STREAM) { 151 /* Store how many listpacks we have inside the radix tree. */ 152 stream *s = o->ptr; 153 rax *rax = s->rax; 154 if ((n = rdbSaveLen(rdb,raxSize(rax))) == -1) return -1; 155 nwritten += n; 156 157 /* Serialize all the listpacks inside the radix tree as they are, 158 * when loading back, we'll use the first entry of each listpack 159 * to insert it back into the radix tree. */ 160 raxIterator ri; 161 raxStart(&ri,rax); 162 raxSeek(&ri,"^",NULL,0); 163 while (raxNext(&ri)) { 164 unsigned char *lp = ri.data; 165 size_t lp_bytes = lpBytes(lp); 166 if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1; 167 nwritten += n; 168 if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) return -1; 169 nwritten += n; 170 } 171 raxStop(&ri); 172 173 /* Save the number of elements inside the stream. We cannot obtain 174 * this easily later, since our macro nodes should be checked for 175 * number of items: not a great CPU / space tradeoff. */ 176 if ((n = rdbSaveLen(rdb,s->length)) == -1) return -1; 177 nwritten += n; 178 /* Save the last entry ID. */ 179 if ((n = rdbSaveLen(rdb,s->last_id.ms)) == -1) return -1; 180 nwritten += n; 181 if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1; 182 nwritten += n; 183 184 /* The consumer groups and their clients are part of the stream 185 * type, so serialize every consumer group. */ 186 187 /* Save the number of groups. */ 188 size_t num_cgroups = s->cgroups ? raxSize(s->cgroups) : 0; 189 if ((n = rdbSaveLen(rdb,num_cgroups)) == -1) return -1; 190 nwritten += n; 191 192 if (num_cgroups) { 193 /* Serialize each consumer group. */ 194 raxStart(&ri,s->cgroups); 195 raxSeek(&ri,"^",NULL,0); 196 while(raxNext(&ri)) { 197 streamCG *cg = ri.data; 198 199 /* Save the group name. */ 200 if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) 201 return -1; 202 nwritten += n; 203 204 /* Last ID. */ 205 if ((n = rdbSaveLen(rdb,cg->last_id.ms)) == -1) return -1; 206 nwritten += n; 207 if ((n = rdbSaveLen(rdb,cg->last_id.seq)) == -1) return -1; 208 nwritten += n; 209 210 /* Save the global PEL. */ 211 if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) return -1; 212 nwritten += n; 213 214 /* Save the consumers of this group. */ 215 if ((n = rdbSaveStreamConsumers(rdb,cg)) == -1) return -1; 216 nwritten += n; 217 } 218 raxStop(&ri); 219 } 220 } else if (o->type == OBJ_MODULE) { 221 /* Save a module-specific value. */ 222 RedisModuleIO io; 223 moduleValue *mv = o->ptr; 224 moduleType *mt = mv->type; 225 moduleInitIOContext(io,mt,rdb); 226 227 /* Write the "module" identifier as prefix, so that we'll be able 228 * to call the right module during loading. */ 229 int retval = rdbSaveLen(rdb,mt->id); 230 if (retval == -1) return -1; 231 io.bytes += retval; 232 233 /* Then write the module-specific representation + EOF marker. */ 234 mt->rdb_save(&io,mv->value); 235 retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF); 236 if (retval == -1) return -1; 237 io.bytes += retval; 238 239 if (io.ctx) { 240 moduleFreeContext(io.ctx); 241 zfree(io.ctx); 242 } 243 return io.error ? -1 : (ssize_t)io.bytes; 244 } else { 245 serverPanic("Unknown object type"); 246 } 247 return nwritten; 248 }