上篇文章介紹了用於將 binlog 同步到 MySQL / TiDB 的 Loader package,本文往回退一步,介紹 Drainer 同步到不一樣下游的機制。
TiDB Binlog(github.com/pingcap/tidb-binlog)用於收集 TiDB 的 binlog,並準實時同步給下游。 同步數據這一步重要操做由 Drainer 模塊支持,它能夠將 binlog 同步到 TiDB / MySQL / Kafka / File (增量備份)等下游組件。mysql
本文將按如下幾個小節介紹 Drainer 如何將收到的 binlog 同步到下游:git
Sync
模塊調度整個同步過程,全部的下游相關的同步邏輯統一封裝成了 Syncer
接口。同步機制的核心是 Syncer
接口,定義以下:github
// Syncer sync binlog item to downstream type Syncer interface { // Sync the binlog item to downstream Sync(item *Item) error // will be close if Close normally or meet error, call Error() to check it Successes() <-chan *Item // Return not nil if fail to sync data to downstream or nil if closed normally Error() <-chan error // Close the Syncer, no more item can be added by `Sync` Close() error }
其中 Sync
方法表示異步地向下遊同步一個 binlog,對應的參數類型是 *Item,這是一個封裝了 binlog 的結構體;Successes
方法返回一個 channel,從中能夠讀取已經成功同步到下游的 Item;Error
方法返回一個 channel,當 Syncer
同步過程出錯中斷時,會往這個 channel 發送遇到的錯誤;Close
用於關掉 Syncer
,釋放資源。golang
支持的每一個下游類型在 drainer/sync 目錄下都有一個對應的 Syncer 實現,例如 MySQL 對應的是 mysql.go
裏的 MySQLSyncer,Kafka 對應的是 kafka.go
裏的 KafkaSyncer。Drainer 啓動時,會根據配置文件中指定的下游,找到對應的 Syncer 實現,而後就能夠用統一的接口管理整個同步過程了。sql
同步進程可能由於各類緣由退出,重啓後要恢復同步就須要知道上次同步的進度。在 Drainer 裏記錄同步進度的功能抽象成 Checkpoint
接口,其定義以下:數據庫
type CheckPoint interface { // Load loads checkpoint information. Load() error // Save saves checkpoint information. Save(int64) error // Pos gets position information. TS() int64 // Close closes the CheckPoint and release resources, after closed other methods should not be called again. Close() error }
從以上定義中能夠看到,Save
的參數和 TS 的返回結果都是 int64 類型,由於同步的進度是以 TiDB 中單調遞增的 commit timestamp 來記錄的,它的類型就是 int64。緩存
Drainer 支持不一樣類型的 Checkpoint 實現,例如 mysql.go
裏的 MySQLCheckpoint
,默認將 commit timestamp 寫到 tidb_binlog 庫下的 checkpoint 表。Drainer 會根據下游類型自動選擇不一樣的 Checkpoint 實現,例如 TiDB / MySQL 的下游就會使用 MySQLCheckPoint,File(增量備份) 則使用 PbCheckpoint。異步
在 Syncer 小節,咱們看到 Syncer 的 Successes
方法提供了一個 channel 用來接收已經處理完畢的 binlog,收到 binlog 後,咱們用 Checkpoint 的 Save
方法保存 binlog 的 commit timestamp 就能夠記下同步進度,細節可查看源碼中的 handleSuccess 方法。函數
Syncer 在收到 binlog 後須要將裏面記錄的變動轉換成適合下游 Syncer 類型的格式,這部分實如今 drainer/translator 包。工具
如下游是 MySQL / TiDB 的狀況爲例。MySQLSyncer.Sync
會先調用 TiBinlogToTxn
將 binlog 轉換成 loader.Txn 以便接入下層的 loader
模塊 (loader 接收一個個 loader.Txn 結構並還原成對應的 SQL 批量寫入 MySQL / TiDB)。
loader.Txn
定義以下:
// Txn holds transaction info, an DDL or DML sequences type Txn struct { DMLs []*DML DDL *DDL // This field is used to hold arbitrary data you wish to include so it // will be available when receiving on the Successes channel Metadata interface{} }
Txn 主要有兩類:DDL 和 DML。Metadata
目前放的就是傳給 Sync
的 *Item 對象。DDL 的狀況比較簡單,由於 binlog 中已經直接包含了咱們要用到的 DDL Query。DML 則須要遍歷 binlog 中的一個個行變動,根據它的類型 insert / update / delete 還原成相應的 loader.DML
。
上個小節中,咱們提到了對行變動數據的解析,在 binlog 中編碼的行變動是沒有列信息的,咱們須要查到對應版本的列信息才能還原出 SQL 語義。Schema 就是解決這個問題的模塊。
在 Drainer 啓動時,會調用 loadHistoryDDLJobs 從 TiKV 處查詢截至當前時間全部已完成的 DDL Job 記錄,按 SchemaVersion
升序排序(能夠粗略認爲這是一個單調遞增地賦給每一個 DDL 任務的版本號)。這些記錄在 Syncer 中會用於建立一個 Schema 對象。在運行過程當中,Drainer 每遇到一條 DDL 也會添加到 Schema 中。
binlog 中帶有一個 SchemaVersion
信息,記錄這條 binlog 生成的時刻 Schema 版本。在同步 Binlog 前,咱們會先用這個 SchemaVersion
信息調用 Schema 的一個方法 handlePreviousDDLJobIfNeed。上一段中咱們看到 Schema 從何處收集到有序的 DDL Job 記錄,這個方法則是按順序應用 SchemaVersion
小於等於指定版本的 DDL Job,在 Schema 中維護每一個表對應版本的最新結構信息,去掉一些錯誤代碼後實現大體以下:
func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error { var i int for i = 0; i < len(s.jobs); i++ { if s.jobs[i].BinlogInfo.SchemaVersion <= version { _, _, _, err := s.handleDDL(s.jobs[i]) if err != nil { return errors.Annotatef(err, "handle ddl job %v failed, the schema info: %s", s.jobs[i], s) } } else { break } } s.jobs = s.jobs[i:] return nil }
對於每一個符合條件的 Job,由 handleDDL
方法將其表結構 TableInfo 等信息更新到 Schema
中,其餘模塊就能夠查詢到表格當前最新的信息。
咱們知道 Drainer 除了能夠將 binlog 直接還原到下游數據庫之外,還支持同步到其餘外部存儲系統塊,因此咱們也提供了相應的工具來處理存儲下來的文件,Reparo
是其中之一,用於讀取存儲在文件系統中的 binlog 文件,寫入 TiDB 中。本節簡單介紹下 Reparo 的用途與實現,讀者能夠做爲示例瞭解如何處理同步到文件系統的 binlog 增量備份。
Reparo 能夠讀取同步到文件系統上的 binlog 增量備份並同步到 TiDB。
當下遊設置成 File(增量備份) 時,Drainer 會將 Protobuf 編碼的 binlog 保存到指定目錄,每寫滿 512 MB 新建一個文件。每一個文件有個編號,從 0 開始依次類推。文件名格式定義以下:
// BinlogName creates a binlog file name. The file name format is like binlog-0000000000000001-20181010101010 func BinlogName(index uint64) string { currentTime := time.Now() return binlogNameWithDateTime(index, currentTime) } // binlogNameWithDateTime creates a binlog file name. func binlogNameWithDateTime(index uint64, datetime time.Time) string { return fmt.Sprintf("binlog-%016d-%s", index, datetime.Format(datetimeFormat)) }
文件的前綴都是 「binlog-」,後面跟一個 16 位右對齊的編號和一個時間戳。將目錄裏的文件按字母順序排序就能夠獲得按編號排序的 binlog 文件名。從指定目錄獲取文件列表的實現以下:
// ReadDir reads and returns all file and dir names from directory func ReadDir(dirpath string) ([]string, error) { dir, err := os.Open(dirpath) if err != nil { return nil, errors.Trace(err) } defer dir.Close() names, err := dir.Readdirnames(-1) if err != nil { return nil, errors.Annotatef(err, "dir %s", dirpath) } sort.Strings(names) return names, nil }
這個函數簡單地獲取目錄裏所有文件名,排序後返回。在上層還作了一些過濾來去掉臨時文件等。獲得文件列表後,Reparo
會用標準庫的 bufio.NewReader 逐個打開文件,而後用 Decode
函數讀出其中的一條條 binlog:
func Decode(r io.Reader) (*pb.Binlog, int64, error) { payload, length, err := binlogfile.Decode(r) if err != nil { return nil, 0, errors.Trace(err) } binlog := &pb.Binlog{} err = binlog.Unmarshal(payload) if err != nil { return nil, 0, errors.Trace(err) } return binlog, length, nil }
這裏先調用了 binlogfile.Decode
從文件中解析出對應 Protobuf 編碼的一段二進制數據而後解碼出 binlog。
獲得 binlog 後就能夠準備寫入 TiDB。Reparo 這部分實現像一個簡化版的 Drainer 的 Sync
模塊,一樣有一個 Syncer 接口以及幾個具體實現(除了 mysqlSyncer
還有用於調試的 printSyncer
和 memSyncer
),因此就再也不介紹。值得一提的是,這裏也跟前面不少 MySQL / TiDB 同步相關的模塊同樣使用了 loader 模塊。
本文介紹了 Drainer 是如何實現數據同步的以及 Reparo 如何從文件系統中恢復增量備份數據到 MySQL / TiDB。在 Drainer 中,Syncer 封裝了同步到各個下游模塊的具體細節,Checkpoint 記錄同步進度,Translator 從 binlog 中還原出具體的變動,Schema 在內存中維護每一個表對應的表結構定義。
TiDB Binlog 源碼閱讀系列在此就所有完結了,相信你們經過本系列文章更全面地理解了 TiDB Binlog 的原理和實現細節。咱們將繼續打磨優化,歡迎你們給咱們反饋使用過程當中遇到的問題或建議;若是社區小夥伴們想參與 TiDB Binlog 的設計、開發和測試,也歡迎與咱們聯繫 info@pingcap.com,或者在 Repo 中提 issue 討論。