完結篇 | TiDB Binlog 源碼閱讀系列文章 (九)同步數據到下游

image

上篇文章介紹了用於將 binlog 同步到 MySQL / TiDB 的 Loader package,本文往回退一步,介紹 Drainer 同步到不一樣下游的機制。

TiDB Binlog(github.com/pingcap/tidb-binlog)用於收集 TiDB 的 binlog,並準實時同步給下游。 同步數據這一步重要操做由 Drainer 模塊支持,它能夠將 binlog 同步到 TiDB / MySQL / Kafka / File (增量備份)等下游組件。mysql

  • 對於 TiDB 和 MySQL 兩種類型的下游組件,Drainer 會從 binlog 中還原出對應的 SQL 操做在下游直接執行;
  • 對於 Kafka 和 File(增量備份)兩種類型的下游組件,輸出約定編碼格式的 binlog。用戶能夠定製後續各類處理流程,如更新搜索引擎索引、清除緩存、增量備份等。TiDB Binlog 自帶工具 Reparo 實現了將增量備份數據(下游類型爲 File(增量備份))同步到 TiDB / MySQL 的功能。

本文將按如下幾個小節介紹 Drainer 如何將收到的 binlog 同步到下游:git

  1. Drainer Sync 模塊:Drainer 經過 Sync 模塊調度整個同步過程,全部的下游相關的同步邏輯統一封裝成了 Syncer 接口。
  2. 恢復工具 Reparo (讀音:reh-PAH-roh):從下游保存的 File(增量備份)中讀取 binlog 同步到 TiDB / MySQL。

Drainer Sync 模塊

Syncer

同步機制的核心是 Syncer 接口,定義以下:github

// Syncer sync binlog item to downstream
type Syncer interface {
  // Sync the binlog item to downstream
  Sync(item *Item) error
  // will be close if Close normally or meet error, call Error() to check it
  Successes() <-chan *Item
  // Return not nil if fail to sync data to downstream or nil if closed normally
  Error() <-chan error
  // Close the Syncer, no more item can be added by `Sync`
  Close() error
}

其中 Sync 方法表示異步地向下遊同步一個 binlog,對應的參數類型是 *Item,這是一個封裝了 binlog 的結構體;Successes 方法返回一個 channel,從中能夠讀取已經成功同步到下游的 Item;Error 方法返回一個 channel,當 Syncer 同步過程出錯中斷時,會往這個 channel 發送遇到的錯誤;Close 用於關掉 Syncer,釋放資源。golang

支持的每一個下游類型在 drainer/sync 目錄下都有一個對應的 Syncer 實現,例如 MySQL 對應的是 mysql.go 裏的 MySQLSyncer,Kafka 對應的是 kafka.go 裏的 KafkaSyncer。Drainer 啓動時,會根據配置文件中指定的下游,找到對應的 Syncer 實現,而後就能夠用統一的接口管理整個同步過程了。sql

Checkpoint

同步進程可能由於各類緣由退出,重啓後要恢復同步就須要知道上次同步的進度。在 Drainer 裏記錄同步進度的功能抽象成 Checkpoint 接口,其定義以下:數據庫

type CheckPoint interface {
  // Load loads checkpoint information.
  Load() error

  // Save saves checkpoint information.
  Save(int64) error

  // Pos gets position information.
  TS() int64

  // Close closes the CheckPoint and release resources, after closed other methods should not be called again.
  Close() error
}

從以上定義中能夠看到,Save 的參數和 TS 的返回結果都是 int64 類型,由於同步的進度是以 TiDB 中單調遞增的 commit timestamp 來記錄的,它的類型就是 int64。緩存

Drainer 支持不一樣類型的 Checkpoint 實現,例如  mysql.go 裏的 MySQLCheckpoint,默認將 commit timestamp 寫到 tidb_binlog 庫下的 checkpoint 表。Drainer 會根據下游類型自動選擇不一樣的 Checkpoint 實現,例如 TiDB / MySQL 的下游就會使用 MySQLCheckPoint,File(增量備份) 則使用 PbCheckpoint異步

在 Syncer 小節,咱們看到 Syncer 的 Successes 方法提供了一個 channel 用來接收已經處理完畢的 binlog,收到 binlog 後,咱們用 Checkpoint 的 Save 方法保存 binlog 的 commit timestamp 就能夠記下同步進度,細節可查看源碼中的 handleSuccess 方法。函數

Translator

Syncer 在收到 binlog 後須要將裏面記錄的變動轉換成適合下游 Syncer 類型的格式,這部分實如今 drainer/translator 包。工具

如下游是 MySQL / TiDB 的狀況爲例。MySQLSyncer.Sync 會先調用 TiBinlogToTxn

將 binlog 轉換成 loader.Txn 以便接入下層的 loader 模塊 (loader 接收一個個 loader.Txn 結構並還原成對應的 SQL 批量寫入 MySQL / TiDB)。

loader.Txn 定義以下:

// Txn holds transaction info, an DDL or DML sequences
type Txn struct {
  DMLs []*DML
  DDL  *DDL

  // This field is used to hold arbitrary data you wish to include so it
  // will be available when receiving on the Successes channel
  Metadata interface{}
}

Txn 主要有兩類:DDL 和 DML。Metadata 目前放的就是傳給 Sync 的 *Item 對象。DDL 的狀況比較簡單,由於 binlog 中已經直接包含了咱們要用到的 DDL Query。DML 則須要遍歷 binlog 中的一個個行變動,根據它的類型 insert / update / delete 還原成相應的 loader.DML

Schema

上個小節中,咱們提到了對行變動數據的解析,在 binlog 中編碼的行變動是沒有列信息的,咱們須要查到對應版本的列信息才能還原出 SQL 語義。Schema 就是解決這個問題的模塊。

在 Drainer 啓動時,會調用 loadHistoryDDLJobs 從 TiKV 處查詢截至當前時間全部已完成的 DDL Job 記錄,按 SchemaVersion 升序排序(能夠粗略認爲這是一個單調遞增地賦給每一個 DDL 任務的版本號)。這些記錄在 Syncer 中會用於建立一個 Schema 對象。在運行過程當中,Drainer 每遇到一條 DDL 也會添加到 Schema 中

binlog 中帶有一個 SchemaVersion 信息,記錄這條 binlog 生成的時刻 Schema 版本。在同步 Binlog 前,咱們會先用這個 SchemaVersion 信息調用 Schema 的一個方法 handlePreviousDDLJobIfNeed。上一段中咱們看到 Schema 從何處收集到有序的 DDL Job 記錄,這個方法則是按順序應用 SchemaVersion 小於等於指定版本的 DDL Job,在 Schema 中維護每一個表對應版本的最新結構信息,去掉一些錯誤代碼後實現大體以下:

func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error {
  var i int
  for i = 0; i < len(s.jobs); i++ {
     if s.jobs[i].BinlogInfo.SchemaVersion <= version {
        _, _, _, err := s.handleDDL(s.jobs[i])
        if err != nil {
           return errors.Annotatef(err, "handle ddl job %v failed, the schema info: %s", s.jobs[i], s)
        }
     } else {
        break
     }
  }

  s.jobs = s.jobs[i:]

  return nil
}

對於每一個符合條件的 Job,由 handleDDL 方法將其表結構 TableInfo 等信息更新到 Schema 中,其餘模塊就能夠查詢到表格當前最新的信息。

恢復工具

咱們知道 Drainer 除了能夠將 binlog 直接還原到下游數據庫之外,還支持同步到其餘外部存儲系統塊,因此咱們也提供了相應的工具來處理存儲下來的文件,Reparo 是其中之一,用於讀取存儲在文件系統中的 binlog 文件,寫入 TiDB 中。本節簡單介紹下 Reparo 的用途與實現,讀者能夠做爲示例瞭解如何處理同步到文件系統的 binlog 增量備份。

Reparo

Reparo 能夠讀取同步到文件系統上的 binlog 增量備份並同步到 TiDB。

讀取 binlog

當下遊設置成 File(增量備份) 時,Drainer 會將 Protobuf 編碼的 binlog 保存到指定目錄,每寫滿 512 MB 新建一個文件。每一個文件有個編號,從 0 開始依次類推。文件名格式定義以下:

// BinlogName creates a binlog file name. The file name format is like binlog-0000000000000001-20181010101010
func BinlogName(index uint64) string {
  currentTime := time.Now()
  return binlogNameWithDateTime(index, currentTime)
}

// binlogNameWithDateTime creates a binlog file name.
func binlogNameWithDateTime(index uint64, datetime time.Time) string {
  return fmt.Sprintf("binlog-%016d-%s", index, datetime.Format(datetimeFormat))
}

文件的前綴都是 「binlog-」,後面跟一個 16 位右對齊的編號和一個時間戳。將目錄裏的文件按字母順序排序就能夠獲得按編號排序的 binlog 文件名。從指定目錄獲取文件列表的實現以下:

// ReadDir reads and returns all file and dir names from directory
func ReadDir(dirpath string) ([]string, error) {
  dir, err := os.Open(dirpath)
  if err != nil {
     return nil, errors.Trace(err)
  }
  defer dir.Close()

  names, err := dir.Readdirnames(-1)
  if err != nil {
     return nil, errors.Annotatef(err, "dir %s", dirpath)
  }

  sort.Strings(names)

  return names, nil
}

這個函數簡單地獲取目錄裏所有文件名,排序後返回。在上層還作了一些過濾來去掉臨時文件等。獲得文件列表後,Reparo 會用標準庫的 bufio.NewReader 逐個打開文件,而後用 Decode 函數讀出其中的一條條 binlog:

func Decode(r io.Reader) (*pb.Binlog, int64, error) {
  payload, length, err := binlogfile.Decode(r)
  if err != nil {
     return nil, 0, errors.Trace(err)
  }

  binlog := &pb.Binlog{}
  err = binlog.Unmarshal(payload)
  if err != nil {
     return nil, 0, errors.Trace(err)
  }
  return binlog, length, nil
}

這裏先調用了 binlogfile.Decode 從文件中解析出對應 Protobuf 編碼的一段二進制數據而後解碼出 binlog。

寫入 TiDB

獲得 binlog 後就能夠準備寫入 TiDB。Reparo 這部分實現像一個簡化版的 Drainer 的 Sync 模塊,一樣有一個 Syncer 接口以及幾個具體實現(除了 mysqlSyncer 還有用於調試的 printSyncermemSyncer),因此就再也不介紹。值得一提的是,這裏也跟前面不少 MySQL / TiDB 同步相關的模塊同樣使用了 loader 模塊。

小結

本文介紹了 Drainer 是如何實現數據同步的以及 Reparo 如何從文件系統中恢復增量備份數據到 MySQL / TiDB。在 Drainer 中,Syncer 封裝了同步到各個下游模塊的具體細節,Checkpoint 記錄同步進度,Translator 從 binlog 中還原出具體的變動,Schema 在內存中維護每一個表對應的表結構定義。

TiDB Binlog 源碼閱讀系列在此就所有完結了,相信你們經過本系列文章更全面地理解了 TiDB Binlog 的原理和實現細節。咱們將繼續打磨優化,歡迎你們給咱們反饋使用過程當中遇到的問題或建議;若是社區小夥伴們想參與 TiDB Binlog 的設計、開發和測試,也歡迎與咱們聯繫 info@pingcap.com,或者在 Repo 中提 issue 討論。
image

相關文章
相關標籤/搜索