做者:楊非html
本文爲 DM 源碼閱讀系列文章的第四篇,上篇文章 介紹了數據同步處理單元實現的功能,數據同步流程的運行邏輯以及數據同步處理單元的 interface 設計。本篇文章在此基礎上展開,詳細介紹 dump 和 load 兩個數據同步處理單元的設計實現,重點關注數據同步處理單元 interface 的實現,數據導入併發模型的設計,以及導入任務在暫停或出現異常後如何恢復。mysql
dump 處理單元的代碼位於 github.com/pingcap/dm/mydumper 包內,做用是從上游 MySQL 將表結構和數據導出到邏輯 SQL 文件,因爲該處理單元老是運行在任務的第一個階段(full 模式和 all 模式),該處理單元每次運行不依賴於其餘處理單元的處理結果。另外一方面,若是在 dump 運行過程當中被強制終止(例如在 dmctl 中執行 pause-task 或者 stop-task),也不會記錄已經 dump 數據的 checkpoint 等信息。不記錄 checkpoint 是由於每次運行 mydumper 從上游導出數據,上游的數據均可能發生變動,爲了能獲得一致的數據和 metadata 信息,每次恢復任務或從新運行任務時該處理單元會 清理舊的數據目錄,從新開始一次完整的數據 dump。git
導出表結構和數據的邏輯並非在 DM 內部直接實現,而是 經過 os/exec
包調用外部 mydumper 二進制文件 來完成。在 mydumper 內部,咱們須要關注如下幾個問題:github
mydumper 的一次完整的運行流程從主線程開始,主線程按照如下步驟執行:sql
no-locks
選項進行一系列的備份安全策略,包括 long query guard
和 lock all tables or FLUSH TABLES WITH READ LOCK
。START TRANSACTION WITH CONSISTENT SNAPSHOT
。less locking
處理線程的初始化。trx-consistency-only
選項,執行 UNLOCK TABLES /* trx-only */
釋放以前獲取的表鎖。注意,若是開啓該選項,是沒法保證非 InnoDB 表導出數據的一致性。更多關於一致性讀的細節能夠參考 MySQL 官方文檔 Consistent Nonlocking Reads 部分。no-locks
和 trx-consistency-only
選項,執行 UNLOCK TABLES / FTWRL / 釋放鎖。less-locking
,等待全部 less locking
子線程退出。工做線程的併發控制包括了兩個層面,一層是在不一樣表級別的併發,另外一層是同一張表級別的併發。mydumper 的主線程會將一次同步任務拆分爲多個同步子任務,並將每一個子任務分發給同一個異步隊列 conf.queue_less_locking/conf.queue
,工做子線程從隊列中獲取任務並執行。具體的子任務劃分包括如下策略:數據庫
開啓 less-locking
選項的非 InnoDB 表的處理。c#
non_innodb_table
分爲 num_threads
組,分組方式是遍歷這些表,依此將遍歷到的表加入到當前數據量最小的分組,儘可能保證每一個分組內的數據量相近。rows-per-file
選項,會對每張表進行 chunks
估算,對於每一張表,若是估算結果包含多個 chunks,會將子任務進一步按照 chunks
進行拆分,分發 chunks
數量個子任務,若是沒有 chunks
劃分,分發爲一個獨立的子任務。注意,在該模式下,子任務會 發送到 queue_less_locking
,並在編號爲 num_threads
~ 2 * num_threads
的子線程中處理任務。安全
less_locking_threads
任務執行完成以後,主線程就會 UNLOCK TABLES / FTWRL / 釋放鎖,這樣有助於減小鎖持有的時間。主線程根據 conf.unlock_tables
來判斷非 InnoDB 表是否所有導出,普通工做線程 或者 queue_less_locking 工做線程每次處理完一個非 InnoDB 表任務都會根據 non_innodb_table_counter
和 non_innodb_done
兩個變量判斷是否還有沒有導出結束的非 InnoDB 表,若是都已經導出結束,就會向異步隊列 conf.unlock_tables
中發送一條數據,表示能夠解鎖全局鎖。less_locking_threads
處理非 InnoDB 表任務時,會先 加表鎖,導出數據,最後 解鎖表鎖。未開啓 less-locking
選項的非 InnoDB 表的處理。併發
InnoDB 表的處理。less
less-locking
選項的非 InnoDB 表的處理相同,一樣是 按照表分發子任務,若是有 chunks
子任務會進一步細分。從上述的併發模型能夠看出 mydumper 首先按照表進行同步任務拆分,對於同一張表,若是配置 rows-per-file
參數,會根據該參數和錶行數將表劃分爲合適的 chunks
數,這便是同一張表內部的併發。具體表行數的估算和 chunks
劃分的實現見 get_chunks_for_table
函數。
須要注意目前 DM 在任務配置中指定的庫表黑白名單功能只應用於 load 和 binlog replication 處理單元。若是在 dump 處理單元內使用庫表黑白名單功能,須要在同步任務配置文件的 dump 處理單元配置提供 extra-args 參數,並指定 mydumper 相關參數,包括 --database, --tables-list 和 --regex。mydumper 使用 regex 過濾庫表的實現參考 check_regex
函數。
load 處理單元的代碼位於 github.com/pingcap/dm/loader 包內,該處理單元在 dump 處理單元運行結束後運行,讀取 dump 處理單元導出的 SQL 文件解析並在下游數據庫執行邏輯 SQL。咱們重點分析 Init
和 Process
兩個 interface 的實現。
該階段進行一些初始化和清理操做,並不會開始同步任務,若是在該階段運行中出現錯誤,會經過 rollback 機制 清理資源,不須要調用 Close 函數。該階段包含的初始化操做包括如下幾點:
checkpoint
,checkpoint
用於記錄全量數據的導入進度和 load 處理單元暫停或異常終止後,恢復或從新開始任務時能夠從斷點處繼續導入數據。應用任務配置的數據同步規則,包括如下規則:
該階段的工做流程也很直觀,經過 一個收發數據類型爲 *pb.ProcessError
的 channel
接收運行過程當中出現的錯誤,出錯後經過 context 的 CancelFunc
強制結束處理單元運行。在覈心的 數據導入函數 中,工做模型與 mydumper 相似,即在 主線程中分發任務,有多個工做線程執行具體的數據導入任務。具體的工做細節以下:
<db-name>-schema-create.sql
和建表語句文件 <db-name>.<table-name>-schema-create.sql
,並在下游執行 SQL 建立相對應的庫和表。主線程讀取 checkpoint
信息,結合數據文件信息建立 fileJob 隨機分發任務給一個工做子線程,fileJob 任務的結構以下所示 :
type fileJob struct { schema string table string dataFile string offset int64 // 表示讀取文件的起始 offset,若是沒有 checkpoint 斷點信息該值爲 0 info *tableInfo // 保存原庫表,目標庫表,列名,insert 語句 column 名字列表等信息 }
在每一個工做線程內部,有一個循環不斷從本身 fileJobQueue
獲取任務,每次獲取任務後會對文件進行解析,並將解析後的結果分批次打包爲 SQL 語句分發給線程內部的另一個工做協程,該工做協程負責處理 SQL 語句的執行。工做流程的僞代碼以下所示,完整的代碼參考 func (w *Worker) run()
:
// worker 工做線程內分發給內部工做協程的任務結構 type dataJob struct { sql string // insert 語句, insert into <table> values (x, y, z), (x2, y2, z2), … (xn, yn, zn); schema string // 目標數據庫 file string // SQL 文件名 offset int64 // 本次導入數據在 SQL 文件的偏移量 lastOffset int64 // 上一次已導入數據對應 SQL 文件偏移量 } // SQL 語句執行協程 doJob := func() { for { select { case <-ctx.Done(): return case job := <-jobQueue: sqls := []string{ fmt.Sprintf("USE `%s`;", job.schema), // 指定插入數據的 schema job.sql, checkpoint.GenSQL(job.file, job.offset), // 更新 checkpoint 的 SQL 語句 } executeSQLInOneTransaction(sqls) // 在一個事務中執行上述 3 條 SQL 語句 } } } // worker 主線程 for { select { case <-ctx.Done(): return case job := <-fileJobQueue: go doJob() readDataFileAndDispatchSQLJobs(ctx, dir, job.dataFile, job.offset, job.info) } }
dispatchSQL
函數負責在工做線程內部讀取 SQL 文件和重寫 SQL,該函數會在運行初始階段 建立所操做表的 checkpoint
信息,須要注意在任務中斷恢復以後,若是這個文件的導入尚未完成,checkpoint.Init
仍然會執行,可是此次運行不會更新該文件的 checkpoint
信息。列值轉換和庫表路由也是在這個階段內完成。
offset
信息生成一條更新 checkpoint 的語句,加入到打包的 SQL 語句中,具體執行時這些語句會 在一個事務中提交,這樣就保證了斷點信息的準確性,若是導入過程暫停或中斷,恢復任務後從斷點從新同步能夠保證數據一致。本篇詳細介紹 dump 和 load 兩個數據同步處理單元的設計實現,對核心 interface 實現、數據導入併發模型、數據導入暫停或中斷的恢復進行了分析。接下來的文章會繼續介紹 binlog replication
,relay log
兩個數據同步處理單元的實現。