DM 源碼閱讀系列文章(四)dump/load 全量同步的實現

做者:楊非html

本文爲 DM 源碼閱讀系列文章的第四篇,上篇文章 介紹了數據同步處理單元實現的功能,數據同步流程的運行邏輯以及數據同步處理單元的 interface 設計。本篇文章在此基礎上展開,詳細介紹 dump 和 load 兩個數據同步處理單元的設計實現,重點關注數據同步處理單元 interface 的實現,數據導入併發模型的設計,以及導入任務在暫停或出現異常後如何恢復。mysql

dump 處理單元

dump 處理單元的代碼位於 github.com/pingcap/dm/mydumper 包內,做用是從上游 MySQL 將表結構和數據導出到邏輯 SQL 文件,因爲該處理單元老是運行在任務的第一個階段(full 模式和 all 模式),該處理單元每次運行不依賴於其餘處理單元的處理結果。另外一方面,若是在 dump 運行過程當中被強制終止(例如在 dmctl 中執行 pause-task 或者 stop-task),也不會記錄已經 dump 數據的 checkpoint 等信息。不記錄 checkpoint 是由於每次運行 mydumper 從上游導出數據,上游的數據均可能發生變動,爲了能獲得一致的數據和 metadata 信息,每次恢復任務或從新運行任務時該處理單元會 清理舊的數據目錄,從新開始一次完整的數據 dump。git

導出表結構和數據的邏輯並非在 DM 內部直接實現,而是 經過 os/exec 包調用外部 mydumper 二進制文件 來完成。在 mydumper 內部,咱們須要關注如下幾個問題:github

  • 數據導出時的併發模型是如何實現的。
  • no-locks, lock-all-tables, less-locking 等參數有怎樣的功能。
  • 庫表黑白名單的實現方式。

mydumper 的實現細節

mydumper 的一次完整的運行流程從主線程開始,主線程按照如下步驟執行:sql

  1. 解析參數。
  2. 建立到數據庫的鏈接
  3. 會根據 no-locks 選項進行一系列的備份安全策略,包括 long query guardlock all tables or FLUSH TABLES WITH READ LOCK
  4. START TRANSACTION WITH CONSISTENT SNAPSHOT
  5. 記錄 binlog 位點信息
  6. less locking 處理線程的初始化
  7. 普通導出線程初始化
  8. 若是配置了 trx-consistency-only 選項,執行 UNLOCK TABLES /* trx-only */ 釋放以前獲取的表鎖。注意,若是開啓該選項,是沒法保證非 InnoDB 表導出數據的一致性。更多關於一致性讀的細節能夠參考 MySQL 官方文檔 Consistent Nonlocking Reads 部分
  9. 根據配置規則(包括 --database, --tables-list 和 --regex 配置)讀取須要導出的 schema 和表信息,並在這個過程當中有區分的記錄 innodb_tables 和 non_innodb_table
  10. 爲工做子線程建立任務,並將任務 push 到相關的工做隊列
  11. 若是沒有配置 no-lockstrx-consistency-only 選項,執行 UNLOCK TABLES / FTWRL / 釋放鎖
  12. 若是開啓 less-locking,等待全部 less locking 子線程退出
  13. 等待全部工做子線程退出

工做線程的併發控制包括了兩個層面,一層是在不一樣表級別的併發,另外一層是同一張表級別的併發。mydumper 的主線程會將一次同步任務拆分爲多個同步子任務,並將每一個子任務分發給同一個異步隊列 conf.queue_less_locking/conf.queue,工做子線程從隊列中獲取任務並執行。具體的子任務劃分包括如下策略:數據庫

從上述的併發模型能夠看出 mydumper 首先按照表進行同步任務拆分,對於同一張表,若是配置 rows-per-file 參數,會根據該參數和錶行數將表劃分爲合適的 chunks 數,這便是同一張表內部的併發。具體表行數的估算和 chunks 劃分的實現見 get_chunks_for_table 函數。

須要注意目前 DM 在任務配置中指定的庫表黑白名單功能只應用於 load 和 binlog replication 處理單元。若是在 dump 處理單元內使用庫表黑白名單功能,須要在同步任務配置文件的 dump 處理單元配置提供 extra-args 參數,並指定 mydumper 相關參數,包括 --database, --tables-list 和 --regex。mydumper 使用 regex 過濾庫表的實現參考 check_regex 函數。

load 處理單元

load 處理單元的代碼位於 github.com/pingcap/dm/loader 包內,該處理單元在 dump 處理單元運行結束後運行,讀取 dump 處理單元導出的 SQL 文件解析並在下游數據庫執行邏輯 SQL。咱們重點分析 InitProcess 兩個 interface 的實現。

Init 實現細節

該階段進行一些初始化和清理操做,並不會開始同步任務,若是在該階段運行中出現錯誤,會經過 rollback 機制 清理資源,不須要調用 Close 函數。該階段包含的初始化操做包括如下幾點:

Process 實現細節

該階段的工做流程也很直觀,經過 一個收發數據類型爲 *pb.ProcessErrorchannel 接收運行過程當中出現的錯誤,出錯後經過 context 的 CancelFunc 強制結束處理單元運行。在覈心的 數據導入函數 中,工做模型與 mydumper 相似,即在 主線程中分發任務有多個工做線程執行具體的數據導入任務。具體的工做細節以下:

  • 主線程會按照庫,表的順序讀取建立庫語句文件 <db-name>-schema-create.sql 和建表語句文件 <db-name>.<table-name>-schema-create.sql,並在下游執行 SQL 建立相對應的庫和表。
  • 主線程讀取 checkpoint 信息,結合數據文件信息建立 fileJob 隨機分發任務給一個工做子線程,fileJob 任務的結構以下所示 :

    type fileJob struct {
       schema    string
       table     string
       dataFile  string
       offset    int64 // 表示讀取文件的起始 offset,若是沒有 checkpoint 斷點信息該值爲 0
       info      *tableInfo // 保存原庫表,目標庫表,列名,insert 語句 column 名字列表等信息
    }
  • 在每一個工做線程內部,有一個循環不斷從本身 fileJobQueue 獲取任務,每次獲取任務後會對文件進行解析,並將解析後的結果分批次打包爲 SQL 語句分發給線程內部的另一個工做協程,該工做協程負責處理 SQL 語句的執行。工做流程的僞代碼以下所示,完整的代碼參考 func (w *Worker) run()

    // worker 工做線程內分發給內部工做協程的任務結構
    type dataJob struct {
       sql         string // insert 語句, insert into <table> values (x, y, z), (x2, y2, z2), … (xn, yn, zn);
       schema      string // 目標數據庫
       file        string // SQL 文件名
       offset      int64 // 本次導入數據在 SQL 文件的偏移量
       lastOffset  int64 // 上一次已導入數據對應 SQL 文件偏移量
    }
    
    // SQL 語句執行協程
    doJob := func() {
       for {
           select {
           case <-ctx.Done():
               return
           case job := <-jobQueue:
               sqls := []string{
                   fmt.Sprintf("USE `%s`;", job.schema), // 指定插入數據的 schema
                   job.sql,
                   checkpoint.GenSQL(job.file, job.offset), // 更新 checkpoint 的 SQL 語句
               }
               executeSQLInOneTransaction(sqls) // 在一個事務中執行上述 3 條 SQL 語句
           }
       }
    }
    ​
    // worker 主線程
    for {
       select {
       case <-ctx.Done():
           return
       case job := <-fileJobQueue:
           go doJob()
           readDataFileAndDispatchSQLJobs(ctx, dir, job.dataFile, job.offset, job.info)
       }
    }
  • dispatchSQL 函數負責在工做線程內部讀取 SQL 文件和重寫 SQL,該函數會在運行初始階段 建立所操做表的 checkpoint 信息,須要注意在任務中斷恢復以後,若是這個文件的導入尚未完成,checkpoint.Init 仍然會執行,可是此次運行不會更新該文件的 checkpoint 信息列值轉換和庫表路由也是在這個階段內完成

    • 列值轉換:須要對輸入 SQL 進行解析拆分爲每個 field,對須要轉換的 field 進行轉換操做,而後從新拼接起 SQL 語句。詳細重寫流程見 reassemble 函數。
    • 庫表路由:這種場景下只須要 替換源表到目標表 便可。
  • 在工做線程執行一個批次的 SQL 語句以前,會首先根據文件 offset 信息生成一條更新 checkpoint 的語句,加入到打包的 SQL 語句中,具體執行時這些語句會 在一個事務中提交,這樣就保證了斷點信息的準確性,若是導入過程暫停或中斷,恢復任務後從斷點從新同步能夠保證數據一致。

小結

本篇詳細介紹 dump 和 load 兩個數據同步處理單元的設計實現,對核心 interface 實現、數據導入併發模型、數據導入暫停或中斷的恢復進行了分析。接下來的文章會繼續介紹 binlog replicationrelay log 兩個數據同步處理單元的實現。

相關文章
相關標籤/搜索