做者:王相node
TiDB-Binlog 組件用於收集 TiDB 的 binlog,並提供實時備份和同步功能。該組件在功能上相似於 MySQL 的主從複製,MySQL 的主從複製依賴於記錄的 binlog 文件,TiDB-Binlog 組件也是如此,主要的不一樣點是 TiDB 是分佈式的,所以須要收集各個 TiDB 實例產生的 binlog,並按照事務提交的時間排序後才能同步到下游。若是你須要部署 TiDB 集羣的從庫,或者想訂閱 TiDB 數據的變動輸出到其餘的系統中,TiDB-Binlog 則是必不可少的工具。git
TiDB-Binlog 這個組件已經發布了 2 年多時間,經歷過幾回架構演進,去年十月到如今大規模使用的是 Kafka 版本,架構圖以下:github
Kafka 版本的 TiDB-Binlog 主要包括兩個組件:算法
Pump:一個守護進程,在每一個 TiDB 主機的後臺運行。其主要功能是實時記錄 TiDB 產生的 binlog 並順序寫入 Kafka 中。數據庫
Drainer: 從 Kafka 中收集 binlog,並按照 TiDB 中事務的提交順序轉化爲指定數據庫兼容的 SQL 語句或者指定格式的數據,最後同步到目的數據庫或者寫到順序文件。json
這個架構的工做原理爲:緩存
TiDB 須要與 Pump 綁定,即 TiDB 實例只能將它生成的 binlog 發送到一個指定的 Pump 中;bash
Pump 將 binlog 先寫到本地文件,再異步地寫入到 Kafka;數據結構
Drainer 從 Kafka 中讀出 binlog,對 binlog 進行排序,對 binlog 解析後生成 SQL 或指定格式的數據再同步到下游。架構
根據用戶的反饋,以及咱們本身作的一些測試,發現該版本主要存在一些問題。
首先,TiDB 的負載可能不均衡,部分 TiDB 業務較多,產生的 binlog 也比較多,對應的 Pump 的負載高,致使數據同步延遲高。
其次,依賴 Kafka 集羣,增長了運維成本;並且 TiDB 產生的單條 binlog 的大小可達 2G(例如批量刪除數據、批量寫入數據),須要配置 Kafka 的消息大小相關設置,而 Kafka 並不太適合單條數據較大的場景。
最後,Drainer 須要讀取 Kafka 中的 binlog、對 binlog 進行排序、解析 binlog,同步數據到下游等工做,能夠看出 Drainer 的工做較多,並且 Drainer 是一個單點,因此每每同步數據的瓶頸都在 Drainer。
以上這些問題咱們很難在已有的框架下進行優化,所以咱們對 TiDB-Binlog 進行了重構,最新版本的 TiDB-Binlog 的整體架構以下圖所示:
新版本 TiDB-Binlog 再也不使用 Kafka 存儲 binlog,仍然保留了 Pump 和 Drainer 兩個組件,可是對功能進行了調整:
Pump 用於實時記錄 TiDB 產生的 binlog,並將 binlog 按照事務的提交時間進行排序,再提供給 Drainer 進行消費。
Drainer 從各個 Pump 中收集 binlog 進行歸併,再將 binlog 轉化成 SQL 或者指定格式的數據,最終同步到下游。
該版本的主要優勢爲:
多個 Pump 造成一個集羣,能夠水平擴容,各個 Pump 能夠均勻地承擔業務的壓力。
TiDB 經過內置的 Pump Client 將 binlog 分發到各個 Pump,即便有部分 Pump 出現故障也不影響 TiDB 的業務。
Pump 內部實現了簡單的 kv 來存儲 binlog,方便對 binlog 數據的管理。
原來 Drainer 的 binlog 排序邏輯移到了 Pump 來作,而 Pump 是可擴展的,這樣就能提升總體的同步性能。
Drainer 再也不須要像原來同樣讀取一批 binlog 到內存裏進行堆排序,只須要依次讀取各個 Pump 的 binlog 進行歸併排序,這樣能夠大大節省內存的使用,同時也更容易作內存控制。
因爲該版本最大的特色是多個 Pump 組成了一個集羣(cluster),所以該版本命名爲 cluster 版本。下面咱們以最新的 cluster 版本的架構來介紹 TiDB-Binlog 的實現原理。
首先咱們先介紹一下 TiDB 中的 binlog,TiDB 的事務採用 2pc 算法,一個成功的事務會寫兩條 binlog,包括一條 Prewrite binlog 和 一條 Commit binlog;若是事務失敗,會發一條 Rollback binlog。
binlog 的結構定義爲:
// Binlog 記錄事務中全部的變動,能夠用 Binlog 構建 SQL
message Binlog {
// Binlog 的類型,包括 Prewrite、Commit、Rollback 等
optional BinlogType tp = 1 [(gogoproto.nullable) = false];
// Prewrite, Commit 和 Rollback 類型的 binlog 的 start_ts,記錄事務開始的 ts
optional int64 start_ts = 2 [(gogoproto.nullable) = false];
// commit_ts 記錄事務結束的 ts,只記錄在 commit 類型的 binlog 中
optional int64 commit_ts = 3 [(gogoproto.nullable) = false];
// prewrite key 只記錄在 Prewrite 類型的 binlog 中,
// 是一個事務的主鍵,用於查詢該事務是否提交
optional bytes prewrite_key = 4;
// prewrite_value 記錄在 Prewrite 類型的 binlog 中,用於記錄每一行數據的改變
optional bytes prewrite_value = 5;
// ddl_query 記錄 ddl 語句
optional bytes ddl_query = 6;
// ddl_job_id 記錄 ddl 的 job id
optional int64 ddl_job_id = 7 [(gogoproto.nullable) = false];
}
複製代碼
binlog 及相關的數據結構定義見: binlog.proto
其中 start_ts
爲事務開始時的 ts,commit_ts
爲事務提交的 ts。ts 是由物理時間和邏輯時間轉化而成的,在 TiDB 中是惟一的,由 PD 來統一提供。在開始一個事務時,TiDB 會請求 PD,獲取一個 ts 做爲事務的 start_ts
,在事務提交時則再次請求 PD 獲取一個 ts 做爲 commit_ts
。 咱們在 Pump 和 Drainer 中就是根據 binlog 的 commit_ts
來對 binlog 進行排序的。
TiDB 的 binlog 記錄爲 row 模式,即保存每一行數據的改變。數據的變化記錄在 prewrite_value
字段中,該字段的數據主要由序列化後的 TableMutation 結構的數據組成。TableMutation 的結構以下所示:
// TableMutation 存儲表中數據的變化
message TableMutation {
// 表的 id,惟一標識一個表
optional int64 table_id = 1 [(gogoproto.nullable) = false];
// 保存插入的每行數據
repeated bytes inserted_rows = 2;
// 保存修改前和修改後的每行的數據
repeated bytes updated_rows = 3;
// 已廢棄
repeated int64 deleted_ids = 4;
// 已廢棄
repeated bytes deleted_pks = 5;
// 刪除行的數據
repeated bytes deleted_rows = 6;
// 記錄數據變動的順序
repeated MutationType sequence = 7;
}
複製代碼
下面以一個例子來講明 binlog 中是怎麼存儲數據的變化的。
例如 table 的結構爲:
create table test
(id
int, name
varchar(24), primary key id
)
按照順序執行以下 SQL:
begin;
insert into test(id, name) values(1, "a");
insert into test(id, name) values(2, "b");
update test set name = "c" where id = 1;
update test set name = "d" where id = 2;
delete from test where id = 2;
insert into test(id, name) values(2, "c");
commit;
複製代碼
則生成的 TableMutation 的數據以下所示:
inserted_rows:
1, "a"
2, "b"
2, "c"
updated_rows:
1, "a", 1, "c"
2, "b", 2, "d"
deleted_rows:
2, "d"
sequence:
Insert, Insert, Update, Update, DeleteRow, Insert
複製代碼
能夠從例子中看出,sequence 中保存的數據變動類型的順序爲執行 SQL 的順序,具體變動的數據內容則保存到了相應的變量中。
Drainer 在把 binlog 數據同步到下游前,就須要把上面的這些數據還原成 SQL,再同步到下游。
另外須要說明的是,TiDB 在寫 binlog 時,會同時向 TiKV 發起寫數據請求和向 Pump 發送 Prewrite binlog,若是 TiKV 和 Pump 其中一個請求失敗,則該事務失敗。當 Prewrite 成功後,TiDB 向 TiKV 發起 Commit 消息,並異步地向 Pump 發送一條 Commit binlog。因爲 TiDB 是同時向 TiKV 和 Pump 發送請求的,因此只要保證 Pump 處理 Prewrite binlog 請求的時間小於等於 TiKV 執行 Prewrite 的時間,開啓 binlog 就不會對事務的延遲形成影響。
從上面的介紹中咱們知道由多個 Pump 組成一個集羣,共同承擔寫 binlog 的請求,那麼就須要保證 TiDB 可以將寫 binlog 的請求儘量均勻地分發到各個 Pump,而且須要識別不可用的 Pump,及時獲取到新加入集羣中 Pump 信息。這部分的工做是在 Pump Client 中實現的。
Pump Client 以包的形式集成在 TiDB 中,代碼連接:pump_client。
Pump Client 維護 Pump 集羣的信息,Pump 的信息主要來自於 PD 中保存的 Pump 的狀態信息,狀態信息的定義以下(代碼連接:Status):
type Status struct {
// Pump/Drainer 實例的惟一標識
NodeID string `json:"nodeId"`
// Pump/Drainer 的服務地址
Addr string `json:"host"`
// Pump/Drainer 的狀態,值能夠爲 online、pausing、paused、closing、offline
State string `json:"state"`
// Pump/Drainer 是否 alive(目前沒有使用該字段)
IsAlive bool `json:"isAlive"`
// Pump的分數,該分數是由節點的負載、磁盤使用率、存儲的數據量大小等因素計算得來的,
// 這樣 Pump Client 能夠根據分數來選取合適的 Pump 發送 binlog(待實現)
Score int64 `json:"score"`
// Pump 的標籤,能夠經過 label 對 TiDB 和 Pump 進行分組,
// TiDB 只能將 binlog 發送到相同 label 的 Pump(待實現)
Label *Label `json:"label"`
// Pump: 保存的 binlog 的最大的 commit_ts
// Drainer:已消費的 binlog 的最大的 commit_ts
MaxCommitTS int64 `json:"maxCommitTS"`
// 該狀態信息的更新時間對應的 ts.
UpdateTS int64 `json:"updateTS"`
}
複製代碼
Pump Client 根據 Pump 上報到 PD 的信息以及寫 binlog 請求的實際狀況將 Pump 劃分爲可用 Pump 與不可用 Pump 兩個部分。
劃分的方法包括:
初始化時從 PD 中獲取全部 Pump 的信息,將狀態爲 online 的 Pump 加入到可用 Pump 列表中,其餘 Pump 加入到非可用列表中。
Pump 每隔固定的時間會發送心跳到 PD,並更新本身的狀態。Pump Client 監控 PD 中 Pump 上傳的狀態信息,及時更新內存中維護的 Pump 信息,若是狀態由非 online 轉換爲 online 則將該 Pump 加入到可用 Pump 列表;反之加入到非可用列表中。
在寫 binlog 到 Pump 時,若是該 Pump 在重試屢次後仍然寫 binlog 失敗,則把該 Pump 加入到非可用 Pump 列表中。
定時發送探活請求(數據爲空的 binlog 寫請求)到非可用 Pump 列表中的狀態爲 online 的 Pump,若是返回成功,則把該 Pump 從新加入到可用 Pump 列表中。
經過上面的這些措施,Pump Client 就能夠及時地更新所維護的 Pump 集羣信息,保證將 binlog 發送到可用的 Pump 中。
另一個問題是,怎麼保證 Pump Client 能夠將 binlog 寫請求均勻地分發到各個 Pump?咱們目前提供了幾種路由策略:
range: 按照順序依次選取 Pump 發送 binlog,即第一次選取第一個 Pump,第二次選取第二個 Pump...
hash:對 binlog 的 start_ts
進行 hash,而後選取 hash 值對應的 Pump。
score:根據 Pump 上報的分數按照加權平均算法選取 Pump 發送 binlog(待實現)。
須要注意的地方是,以上的策略只是針對 Prewrite binlog,對於 Commit binlog,Pump Client 會將它發送到對應的 Prewrite binlog 所選擇的 Pump,這樣作是由於在 Pump 中須要將包含 Prewrite binlog 和 Commit binlog 的完整 binlog(即執行成功的事務的 binlog)提供給 Drainer,將 Commit binlog 發送到其餘 Pump 沒有意義。
Pump Client 向 Pump 提交寫 binlog 的請求接口爲 pump.proto 中的 WriteBinlog,使用 grpc 發送 binlog 請求。
Pump 主要用來承擔 binlog 的寫請求,維護 binlog 數據,並將有序的 binlog 提供給 Drainer。咱們將 Pump 抽象成了一個簡單的 kv 數據庫,key 爲 binlog 的 start _ts
(Priwrite binlog) 或者 commit_ts
(Commit binlog),value 爲 binlog 的元數據,binlog 的數據則存在數據文件中。Drainer 像查數據庫同樣的來獲取所須要的 binlog。
Pump 內置了 leveldb 用於存儲 binlog 的元信息。在 Pump 收到 binlog 的寫請求時,會首先將 binlog 數據以 append 的形式寫到文件中,而後將 binlog 的 ts、類型、數據長度、所保存的文件以及在文件中的位置信息保存在 leveldb 中,若是爲 Prewrite binlog,則以 start_ts
做爲 key;若是是 Commit binlog,則以 commit_ts
做爲 key。
當 Drainer 向 Pump 請求獲取指定 ts 以後的 binlog 時,Pump 則查詢 leveldb 中大於該 ts 的 binlog 的元數據,若是當前數據爲 Prewrite binlog,則必須找到對應的 Commit binlog;若是爲 Commit binlog 則繼續向前推動。這裏有個問題,在 binlog 一節中提到,若是 TiKV 成功寫入了數據,而且 Pump 成功接收到了 Prewrite binlog,則該事務就提交成功了,那麼若是在 TiDB 發送 Commit binlog 到 Pump 前發生了一些異常(例如 TiDB 異常退出,或者強制終止了 TiDB 進程),致使 Pump 沒有接收到 Commit binlog,那麼 Pump 中就會一直找不到某些 Prewrite binlog 對應的 Commit binlog。這裏咱們在 Pump 中作了處理,若是某個 Prewrite binlog 超過了十分鐘都沒有找到對應的 Commit binlog,則經過 binlog 數據中的 prewrite_key
去查詢 TiKV 該事務是否提交,若是已經提交成功,則 TiKV 會返回該事務的 commit_ts
;不然 Pump 就丟棄該條 Prewrite binlog。
binlog 元數據中提供了數據存儲的文件和位置,能夠經過這些信息讀取 binlog 文件的指定位置獲取到數據。由於 binlog 數據基本上是按順序寫入到文件中的,所以咱們只須要順序地讀 binlog 文件便可,這樣就保證了不會由於頻繁地讀取文件而影響 Pump 的性能。最終,Pump 以 commit_ts
爲排序標準將 binlog 數據傳輸給 Drainer。Drainer 向 Pump 請求 binlog 數據的接口爲 pump.proto 中的 PullBinlogs,以 grpc streaming 的形式傳輸 binlog 數據。
值得一提的是,Pump 中有一個 fake binlog 機制。Pump 會定時(默認三秒)向本地存儲中寫入一條數據爲空的 binlog,在生成該 binlog 前,會向 PD 中獲取一個 ts,做爲該 binlog 的 start_ts
與 commit_ts
,這種 binlog 咱們叫做 fake binlog。這樣作的緣由在 Drainer 中介紹。
Drainer 從各個 Pump 中獲取 binlog,歸併後按照順序解析 binlog、生成 SQL 或者指定格式的數據,而後再同步到下游。
既然要從各個 Pump 獲取數據,Drainer 就須要維護 Pump 集羣的信息,及時獲取到新增長的 Pump,並識別出不可用的 Pump,這部分功能與 Pump Client 相似,Drainer 也是經過 PD 中存儲的 Pump 的狀態信息來維護 Pump 信息。另外須要注意的是,若是新增長了一個 Pump,必須讓該 Pump 通知 Drainer 本身上線了,這麼作是爲了保證不會丟數據。例如:
集羣中已經存在 Pump1 和 Pump2,Drainer 讀取 Pump1 和 Pump2 的數據並進行歸併:
Pump1 存儲的 binlog 爲{ 1,3,5,7,9 },Pump2 存儲的 binlog 爲{2,4,6,10}。Drainer 從兩個 Pump 獲取 binlog,假設當前已經讀取到了{1,2,3,4,5,6,7}這些 binlog,已處理的 binlog 的位置爲 7。此時 Pump3 加入集羣,從 Pump3 上報本身的上線信息到 PD,到 Drainer 從 PD 中獲取到 Pump3 信息須要必定的時間,若是 Pump3 沒有通知 Drainer 就直接提供寫 binlog 服務,寫入了 binlog{8,12},Drainer 在此期間繼續讀取 Pump1 和 Pump2 的 binlog,假設讀取到了 9,以後才識別到了 Pump3 並將 Pump3 加入到歸併排序中,此時 Pump3 的 binlog 8 就丟失了。爲了不這種狀況,須要讓 Pump3 通知 Drainer 本身已經上線,Drainer 收到通知後將 Pump3 加入到歸併排序,並返回成功給 Pump3,而後 Pump3 才能提供寫 binlog 的服務。
Drainer 經過如上所示的方式對 binlog 進行歸併排序,並推動同步的位置。那麼可能會存在這種狀況:某個 Pump 因爲一些特殊的緣由一直沒有收到 binlog 數據,那麼 Drainer 中的歸併排序就沒法繼續下去,正如咱們用兩條腿走路,其中一隻腿不動就不能繼續前進。咱們使用 Pump 一節中提到的 fake binlog 的機制來避免這種問題,Pump 每隔指定的時間就生成一條 fake binlog,即便某些 Pump 一直沒有數據寫入,也能夠保證歸併排序正常向前推動。
Drainer 將全部 Pump 的數據按照 commit_ts
進行歸併排序後,將 binlog 數據傳遞給 Drainer 中的數據解析及同步模塊。經過上面的 binlog 格式的介紹,咱們能夠看出 binlog 文件中並無存儲表結構的信息,所以須要在 Drainer 中維護全部庫和表的結構信息。在啓動 Drainer 時,Drainer 會請求 TiKV,獲取到全部歷史的 DDL job 的信息,對這些 DDL job 進行過濾,使用 Drainer 啓動時指定的 initial-commit-ts(或者 checkpoint 中保存的 commit_ts
)以前的 DDL 在內存中構建庫和表結構信息。這樣 Drainer 就有了一份 ts 對應時間點的庫和表的快照,在讀取到 DDL 類型的 binlog 時,則更新庫和表的信息;讀取到 DML 類型的 binlog 時,則根據庫和表的信息來生成 SQL。
在生成 SQL 以後,就能夠同步到下游了。爲了提升 Drainer 同步的速度,Drainer 中使用多個協程來執行 SQL。在生成 SQL 時,咱們會使用主鍵/惟一鍵的值做爲該條 SQL 的 key,經過對 key 進行 hash 來將 SQL 發送到對應的協程中。當每一個協程收集到了足夠多的 SQL,或者超過了必定的時間,則將這一批的 SQL 在一個事務中提交到下游。
可是有些 SQL 是相關的,若是被分到了不一樣的協程,那 SQL 的執行順序就不能獲得保證,形成數據的不一致。例如:
SQL1: delete from test.test where id = 1;
SQL2: replace into test.test (id, name ) values(1, "a");
複製代碼
按照順序執行後表中存在 id = 1 該行數據,若是這兩條 SQL 分別分配到了協程 1 和協程 2 中,而且協程 2 先執行了 SQL,則表中再也不存在 id = 1 的數據。爲了不這種狀況的發生,Drainer 中加入了衝突檢測的機制,若是檢測出來兩條 SQL 存在衝突(修改了同一行數據),則暫時不將後面的 SQL 發送到協程,而是生成一個 Flush 類型的 job 發送到全部的協程, 每一個協程在遇到 Flush job 時就會立刻執行所緩存的 SQL。接着纔會把該條有衝突的 SQL 發送到對應的協程中。下面給出一個例子說明一下衝突檢測的機制:
有如下這些 SQL,其中 id 爲表的主鍵:
SQL1: update itest set id = 4, name = "c", age = 15 where id = 3; key: 3, 4
SQL2: update itest set id = 5, name = "b", age = 14 where id = 2; key:5, 2
SQL3:delete from itest where id = 3; key: 3
複製代碼
首先將 SQL1 發送到指定的協程,這時全部的 keys 爲[3,4];
SQL2 的 key[5,2]與 keys 中的[3,4]都沒有衝突,將 SQL2 發送到指定的協程,這時 keys 爲[3,4,5,2];
SQL3 的 key[3]與 keys 中的[3]存在衝突,發送 Flush job 到全部協程,SQL1 和 SQL2 被執行,清空 keys;
將 SQL3 發送到指定的協程,同時更新 keys 爲[3]。
Drainer 經過以上這些機制來高效地同步數據,而且保證數據的一致。