做者:姚維html
這篇文章不是講 TiDB Binlog 組件的源碼,而是講 TiDB 在執行 DML/DDL 語句過程當中,如何將 Binlog 數據 發送給 TiDB Binlog 集羣的 Pump 組件。目前 TiDB 在 DML 上的 Binlog 用的相似 Row-based 的格式。具體 Binlog 具體的架構細節能夠參考這篇 文章。mysql
這裏只描述 TiDB 中的代碼實現。git
TiDB 採用 protobuf 來編碼 binlog,具體的格式能夠見 binlog.proto。這裏討論 TiDB 寫 Binlog 的機制,以及 Binlog 對 TiDB 寫入的影響。github
TiDB 會在 DML 語句提交,以及 DDL 語句完成的時候,向 pump 輸出 Binlog。算法
DML 語句包括 Insert/Replace、Update、Delete,這裏挑 Insert 語句來闡述,其餘的語句行爲都相似。首先在 Insert 語句執行完插入(未提交)以前,會把本身新增的數據記錄在 binlog.TableMutation
結構體中。sql
// TableMutation 存儲表中數據的變化 message TableMutation { // 表的 id,惟一標識一個表 optional int64 table_id = 1 [(gogoproto.nullable) = false]; // 保存插入的每行數據 repeated bytes inserted_rows = 2; // 保存修改前和修改後的每行的數據 repeated bytes updated_rows = 3; // 已廢棄 repeated int64 deleted_ids = 4; // 已廢棄 repeated bytes deleted_pks = 5; // 刪除行的數據 repeated bytes deleted_rows = 6; // 記錄數據變動的順序 repeated MutationType sequence = 7; }
這個結構體保存於跟每一個 Session 連接相關的事務上下文結構體中 TxnState.mutations
。 一張表對應一個 TableMutation
對象,TableMutation
裏面保存了這個事務對這張表的全部變動數據。Insert 會把當前語句插入的行,根據 RowID
+ Row-value
的格式編碼以後,追加到 TableMutation.InsertedRows
中:session
func (t *Table) addInsertBinlog(ctx context.Context, h int64, row []types.Datum, colIDs []int64) error { mutation := t.getMutation(ctx) pk, err := codec.EncodeValue(ctx.GetSessionVars().StmtCtx, nil, types.NewIntDatum(h)) if err != nil { return errors.Trace(err) } value, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, row, colIDs, nil, nil) if err != nil { return errors.Trace(err) } bin := append(pk, value...) mutation.InsertedRows = append(mutation.InsertedRows, bin) mutation.Sequence = append(mutation.Sequence, binlog.MutationType_Insert) return nil }
等到全部的語句都執行完以後,在 TxnState.mutations
中就保存了當前事務對全部表的變動數據。架構
對於 DML 而言,TiDB 的事務採用 2-phase-commit 算法,一次事務提交會分爲 Prewrite 階段,以及 Commit 階段。這裏分兩個階段來看看 TiDB 具體的行爲。app
在 session.doCommit
函數中,TiDB 會構造 binlog.PrewriteValue
:異步
message PrewriteValue { optional int64 schema_version = 1 [(gogoproto.nullable) = false]; repeated TableMutation mutations = 2 [(gogoproto.nullable) = false]; }
這個 PrewriteValue
中包含了跟此次變更相關的全部行數據,TiDB 會填充一個類型爲 binlog.BinlogType_Prewrite
的 Binlog:
info := &binloginfo.BinlogInfo{ Data: &binlog.Binlog{ Tp: binlog.BinlogType_Prewrite, PrewriteValue: prewriteData, }, Client: s.sessionVars.BinlogClient.(binlog.PumpClient), }
TiDB 這裏用一個事務的 Option kv.BinlogInfo
來把 BinlogInfo
綁定到當前要提交的 transaction 對象中:
s.txn.SetOption(kv.BinlogInfo, info)
在 twoPhaseCommitter.execute
中,在把數據 prewrite 到 TiKV 的同時,會調用 twoPhaseCommitter.prewriteBinlog
,這裏會把關聯的 binloginfo.BinlogInfo
取出來,把 Binlog 的 binlog.PrewriteValue
輸出到 Pump。
binlogChan := c.prewriteBinlog() err := c.prewriteKeys(NewBackoffer(prewriteMaxBackoff, ctx), c.keys) if binlogChan != nil { binlogErr := <-binlogChan // 等待 write prewrite binlog 完成 if binlogErr != nil { return errors.Trace(binlogErr) } }
這裏值得注意的是,在 prewrite 階段,是須要等待 write prewrite binlog 完成以後,才能繼續作接下去的提交的,這裏是爲了保證 TiDB 成功提交的事務,Pump 至少必定能收到 Prewrite Binlog。
在 twoPhaseCommitter.execute
事務提交結束以後,事務可能提交成功,也可能提交失敗。TiDB 須要把這個狀態告知 Pump:
err = committer.execute(ctx) if err != nil { committer.writeFinishBinlog(binlog.BinlogType_Rollback, 0) return errors.Trace(err) } committer.writeFinishBinlog(binlog.BinlogType_Commit, int64(committer.commitTS))
若是發生了 error,那麼輸出的 Binlog 類型就爲 binlog.BinlogType_Rollback
,若是成功提交,那麼輸出的 Binlog 類型就爲 binlog.BinlogType_Commit
。
func (c *twoPhaseCommitter) writeFinishBinlog(tp binlog.BinlogType, commitTS int64) { if !c.shouldWriteBinlog() { return } binInfo := c.txn.us.GetOption(kv.BinlogInfo).(*binloginfo.BinlogInfo) binInfo.Data.Tp = tp binInfo.Data.CommitTs = commitTS go func() { err := binInfo.WriteBinlog(c.store.clusterID) if err != nil { log.Errorf("failed to write binlog: %v", err) } }() }
值得注意的是,這裏 WriteBinlog 是單獨啓動 goroutine 異步完成的,也就是 Commit 階段,是再也不須要等待寫 binlog 完成的。這裏能夠節省一點 commit 的等待時間,這裏不須要等待是由於 Pump 即便接收不到這個 Commit Binlog,在超過 timeout 時間後,Pump 會自行根據 Prewrite Binlog 到 TiKV 中確認當條事務的提交狀態。
一個 DDL 有以下幾個狀態:
const ( JobStateNone JobState = 0 JobStateRunning JobState = 1 JobStateRollingback JobState = 2 JobStateRollbackDone JobState = 3 JobStateDone JobState = 4 JobStateSynced JobState = 6 JobStateCancelling JobState = 7 )
這些狀態表明瞭一個 DDL 任務所處的狀態:
JobStateNone
,表明 DDL 任務還在處理隊列,TiDB 尚未開始作這個 DDL。
JobStateRunning
,當 DDL Owner 開始處理這個任務的時候,會把狀態設置爲 JobStateRunning
,以後 DDL 會開始變動,TiDB 的 Schema 可能會涉及多個狀態的變動,這中間不會改變 DDL job 的狀態,只會變動 Schema 的狀態。
JobStateDone
, 當 TiDB 完成本身全部的 Schema 狀態變動以後,會把 Job 的狀態改成 Done。
JobStateSynced
,當 TiDB 每作一次 schema 狀態變動,就會須要跟集羣中的其餘 TiDB 作一次同步,可是當 Job 狀態爲 JobStateDone
以後,在 TiDB 等到全部的 TiDB 節點同步以後,會將狀態修改成 JobStateSynced
。
JobStateCancelling
,TiDB 提供語法 ADMIN CANCEL DDL JOBS job_ids
用於取消某個正在執行或者還未執行的 DDL 任務,當成功執行這個命令以後,DDL 任務的狀態會變爲 JobStateCancelling
。
JobStateRollingback
,當 DDL Owner 發現 Job 的狀態變爲 JobStateCancelling
以後,它會將 job 的狀態改變爲 JobStateRollingback
,以示已經開始處理 cancel 請求。
JobStateRollbackDone
,在作 cancel 的過程,也會涉及 Schema 狀態的變動,也須要經歷 Schema 的同步,等到狀態回滾已經作完了,TiDB 會將 Job 的狀態設置爲 JobStateRollbackDone
。
對於 Binlog 而言,DDL 的 Binlog 輸出機制,跟 DML 語句也是相似的,只有開始處理事務提交階段,纔會開始寫 Binlog 出去。那麼對於 DDL 來講,跟 DML 不同,DML 有事務的概念,對於 DDL 來講,SQL 的事務是不影響 DDL 語句的。可是 DDL 裏面,上面提到的 Job 的狀態變動,是做爲一個事務來提交的(保證狀態一致性)。因此在每一個狀態變動,都會有一個事務與之對應,可是上面提到的中間狀態,DDL 並不會往外寫 Binlog,只有 JobStateRollbackDone
以及 JobStateDone
這兩種狀態,TiDB 會認爲 DDL 語句已經完成,會對外發送 Binlog,發送以前,會把 Job 的狀態從 JobStateDone
修改成 JobStateSynced
,此次修改,也涉及一次事務提交。這塊邏輯的代碼以下:
worker.handleDDLJobQueue(): if job.IsDone() || job.IsRollbackDone() { binloginfo.SetDDLBinlog(d.binlogCli, txn, job.ID, job.Query) if !job.IsRollbackDone() { job.State = model.JobStateSynced } err = w.finishDDLJob(t, job) return errors.Trace(err) } type Binlog struct { DdlQuery []byte DdlJobId int64 }
DdlQuery
會設置爲原始的 DDL 語句,DdlJobId
會設置爲 DDL 的任務 ID。
對於最後一次 Job 狀態的提交,會有兩條 Binlog 與之對應,這裏有幾種狀況:
若是事務提交成功,類型分別爲 binlog.BinlogType_Prewrite
和 binlog.BinlogType_Commit
。
若是事務提交失敗,類型分別爲 binlog.BinlogType_Prewrite
和 binlog.BinlogType_Rollback
。
因此,Pumps 收到的 DDL Binlog,若是類型爲 binlog.BinlogType_Rollback
應該只認爲以下狀態是合法的:
JobStateDone
(由於修改成 JobStateSynced
還未成功)
JobStateRollbackDone
若是類型爲 binlog.BinlogType_Commit
,應該只認爲以下狀態是合法的:
JobStateSynced
JobStateRollbackDone
當 TiDB 在提交最後一個 Job 狀態的時候,若是事務提交失敗了,那麼 TiDB Owner 會嘗試繼續修改這個 Job,直到成功。也就是對於同一個 DdlJobId
,後續還可能會有屢次 Binlog,直到出現 binlog.BinlogType_Commit
。