本文是使用 golang 實現 redis 系列的第四篇文章,將介紹如何使用 golang 實現 Append Only File 持久化及 AOF 文件重寫。html
本文完整源代碼在做者GithubHDT3213/godisgit
AOF 持久化是典型的異步任務,主協程(goroutine) 可使用 channel 將數據發送到異步協程由異步協程執行持久化操做。github
在 DB 中定義相關字段:golang
type DB struct { // 主線程使用此channel將要持久化的命令發送到異步協程 aofChan chan *reply.MultiBulkReply // append file 文件描述符 aofFile *os.File // append file 路徑 aofFilename string // aof 重寫須要的緩衝區,將在AOF重寫一節詳細介紹 aofRewriteChan chan *reply.MultiBulkReply // 在必要的時候使用此字段暫停持久化操做 pausingAof sync.RWMutex }
在進行持久化時須要注意兩個細節:redis
expire a 3600
表示鍵 a 在 11:00 過時,在 10:30 載入AOF文件時執行 expire a 3600
就成了 11:30 過時與原數據不符。咱們在命令處理方法中返回 AOF 須要的額外信息:sql
type extra struct { // 表示該命令是否須要持久化 toPersist bool // 如上文所述 expire 之類的命令不能直接持久化 // 若 specialAof == nil 則將命令原樣持久化,不然持久化 specialAof 中的指令 specialAof []*reply.MultiBulkReply } type CmdFunc func(db *DB, args [][]byte) (redis.Reply, *extra)
以 SET 命令爲例:數據庫
func Set(db *DB, args [][]byte) (redis.Reply, *extra) { //.... var result int switch policy { case upsertPolicy: result = db.Put(key, entity) case insertPolicy: result = db.PutIfAbsent(key, entity) case updatePolicy: result = db.PutIfExists(key, entity) } extra := &extra{toPersist: result > 0} // 若實際寫入了數據則toPresist=true, 若由於XX或NX選項沒有實際寫入數據則toPresist=false if result > 0 { if ttl != unlimitedTTL { // 使用了 EX 或 NX 選項 expireTime := time.Now().Add(time.Duration(ttl) * time.Millisecond) db.Expire(key, expireTime) // 持久化時使用 set key value 和 pexpireat 命令代替 set key value EX ttl 命令 extra.specialAof = []*reply.MultiBulkReply{ reply.MakeMultiBulkReply([][]byte{ []byte("SET"), args[0], args[1], }), makeExpireCmd(key, expireTime), } } else { db.Persist(key) // override ttl } } return &reply.OkReply{}, extra } var pExpireAtCmd = []byte("PEXPIREAT") func makeExpireCmd(key string, expireAt time.Time) *reply.MultiBulkReply { args := make([][]byte, 3) args[0] = pExpireAtCmd args[1] = []byte(key) args[2] = []byte(strconv.FormatInt(expireAt.UnixNano()/1e6, 10)) return reply.MakeMultiBulkReply(args) }
在處理命令的調度方法中將 aof 命令發送到 channel:緩存
func (db *DB) Exec(c redis.Client, args [][]byte) (result redis.Reply) { // .... // normal commands var extra *extra cmdFunc, ok := router[cmd] // 找到命令對應的處理函數 if !ok { return reply.MakeErrReply("ERR unknown command '" + cmd + "'") } // 使用處理函數執行命令 if len(args) > 1 { result, extra = cmdFunc(db, args[1:]) } else { result, extra = cmdFunc(db, [][]byte{}) } // AOF 持久化 if config.Properties.AppendOnly { if extra != nil && extra.toPersist { // 寫入 specialAof if extra.specialAof != nil && len(extra.specialAof) > 0 { for _, r := range extra.specialAof { db.addAof(r) } } else { // 寫入原始命令 r := reply.MakeMultiBulkReply(args) db.addAof(r) } } } return }
在異步協程中寫入命令:安全
func (db *DB) handleAof() { for cmd := range db.aofChan { // 異步協程在持久化以前會嘗試獲取鎖,若其餘協程持有鎖則會暫停持久化操做 // 鎖也保證了每次寫入完整的一條指令不會格式錯誤 db.pausingAof.RLock() if db.aofRewriteChan != nil { db.aofRewriteChan <- cmd } _, err := db.aofFile.Write(cmd.ToBytes()) if err != nil { logger.Warn(err) } db.pausingAof.RUnlock() } }
讀取過程與協議解析器一節基本相同,不在正文中贅述:loadAof。app
若咱們對鍵a賦值100次會在AOF文件中產生100條指令但只有最後一條指令是有效的,爲了減小持久化文件的大小須要進行AOF重寫以刪除無用的指令。
重寫必須在固定不變的數據集上進行,不能直接使用內存中的數據。Redis 重寫的實現方式是進行 fork 並在子進程中遍歷數據庫內的數據從新生成AOF文件。因爲 golang 不支持 fork 操做,咱們只能採用讀取AOF文件生成副本的方式來代替fork。
在進行AOF重寫操做時須要知足兩個要求:
所以咱們設計了一套比較複雜的流程:
在不阻塞在線服務的同時進行其它操做是一項必需的能力,AOF重寫的思路在解決這類問題時具備重要的參考價值。好比Mysql Online DDL: gh-ost採用了相似的策略保證數據一致。
首先準備開始重寫操做:
func (db *DB) startRewrite() (*os.File, error) { // 暫停AOF寫入, 數據會在 db.aofChan 中暫時堆積 db.pausingAof.Lock() defer db.pausingAof.Unlock() // 建立重寫緩衝區 db.aofRewriteChan = make(chan *reply.MultiBulkReply, aofQueueSize) // 建立臨時文件 file, err := ioutil.TempFile("", "aof") if err != nil { logger.Warn("tmp file create failed") return nil, err } return file, nil }
在重寫過程當中,持久化協程進行雙寫:
func (db *DB) handleAof() { for cmd := range db.aofChan { db.pausingAof.RLock() if db.aofRewriteChan != nil { // 數據寫入重寫緩衝區 db.aofRewriteChan <- cmd } _, err := db.aofFile.Write(cmd.ToBytes()) if err != nil { logger.Warn(err) } db.pausingAof.RUnlock() } }
執行重寫:
func (db *DB) aofRewrite() { file, err := db.startRewrite() if err != nil { logger.Warn(err) return } // load aof file tmpDB := &DB{ Data: dict.MakeSimple(), TTLMap: dict.MakeSimple(), Locker: lock.Make(lockerSize), interval: 5 * time.Second, aofFilename: db.aofFilename, } tmpDB.loadAof() // rewrite aof file tmpDB.Data.ForEach(func(key string, raw interface{}) bool { var cmd *reply.MultiBulkReply entity, _ := raw.(*DataEntity) switch val := entity.Data.(type) { case []byte: cmd = persistString(key, val) case *List.LinkedList: cmd = persistList(key, val) case *set.Set: cmd = persistSet(key, val) case dict.Dict: cmd = persistHash(key, val) case *SortedSet.SortedSet: cmd = persistZSet(key, val) } if cmd != nil { _, _ = file.Write(cmd.ToBytes()) } return true }) tmpDB.TTLMap.ForEach(func(key string, raw interface{}) bool { expireTime, _ := raw.(time.Time) cmd := makeExpireCmd(key, expireTime) if cmd != nil { _, _ = file.Write(cmd.ToBytes()) } return true }) db.finishRewrite(file) }
重寫完畢後寫入緩衝區中的數據並替換正式文件:
func (db *DB) finishRewrite(tmpFile *os.File) { // 暫停AOF寫入 db.pausingAof.Lock() defer db.pausingAof.Unlock() // 將重寫緩衝區內的數據寫入臨時文件 // 由於handleAof已被暫停,在遍歷期間aofRewriteChan中不會有新數據 loop: for { select { case cmd := <-db.aofRewriteChan: _, err := tmpFile.Write(cmd.ToBytes()) if err != nil { logger.Warn(err) } default: // 只有 channel 爲空時纔會進入此分支 break loop } } // 釋放重寫緩衝區 close(db.aofRewriteChan) db.aofRewriteChan = nil // 使用臨時文件代替aof文件 _ = db.aofFile.Close() _ = os.Rename(tmpFile.Name(), db.aofFilename) // 從新打開文件描述符以保證正常寫入 aofFile, err := os.OpenFile(db.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) if err != nil { panic(err) } db.aofFile = aofFile }