做者:趙一霖git
在 上篇文章 中,咱們主要介紹了 Pump Server 的上線過程、gRPC API 實現、以及下線過程和相關輔助機制,其中反覆提到了 Pump Storage 這個實體。本文就將介紹 Pump Storage 的實現,其主要代碼在 pump/storage 文件夾中。github
Pump Storage 由 Pump Server 調用,主要負責 binlog 的持久化存儲,同時兼顧排序、配對等功能,下面咱們由 Storage 接口開始瞭解 Pump Storage 的實現。算法
Storage 接口 定義了 Pump Storage 對外暴露的操做,其中比較重要的是 WriteBinlog
、GC
和 PullCommitBinlog
函數,咱們將在下文具體介紹。Storage 的接口定義以下:數據庫
type Storage interface { // WriteBinlog 寫入 binlog 數據到 Storage WriteBinlog(binlog *pb.Binlog) error // GC 清理 tso 小於指定 ts 的 binlog GC(ts int64) // GetGCTS 返回最近一次觸發 GC 指定的 ts GetGCTS() int64 // AllMatched 返回是否全部的 P-binlog 都和 C-binlog 匹配 AllMatched() bool // MaxCommitTS 返回最大的 CommitTS,在這個 TS 以前的數據已經完備,能夠安全的同步給下游 MaxCommitTS() int64 // GetBinlog 指定 ts 返回 binlog GetBinlog(ts int64) (binlog *pb.Binlog, err error) // PullCommitBinlog 按序拉 commitTs > last 的 binlog PullCommitBinlog(ctx context.Context, last int64) <-chan []byte // Close 安全的關閉 Storage Close() error }
Append 是創建在文件系統接口上的持久化的 Storage 接口實現。在這個實現中,binlog 數據被追加寫入 Valuelog,所以咱們將這個實現命名爲 Append。因爲一條 binlog 可能會很大,爲了提升性能,咱們採用 Key 和 Value 分離的設計。使用 goleveldb 存儲 Key(binlog 的 Timestamp),並針對 Pump 的讀寫特色設計了用於存儲 binlog 數據的 Valuelog 組件。安全
Append 的初始化操做是在 NewAppendWithResolver
函數中實現的,首先初始化 Valuelog、goleveldb 等組件,而後啓動處理寫入 binlog、GC、狀態維護等幾個 goroutine。數據結構
WriteBinlog
由 Pump Server 調用,用於寫入 binlog 到本地的持久化存儲中。在 Append 實現的 WirteBinlog
函數中,binlog 在編碼後被傳入到 Append.writeCh
Channel 由專門的 goroutine 處理:架構
toKV := append.writeToValueLog(writeCh) go append.writeToSorter(append.writeToKV(toKV))
一條 binlog 被傳入 Append.writeCh
後將按以下順序流經數個處理流程:app
<center>圖 1 binlog 傳入 Append.writeCh 的處理流程</center>函數
vlog性能
這個過程的主要實如今 writeToValueLog
中:
// valuePointer 定義 type valuePointer struct { // Fid 是 valuelog 文件 Id Fid uint32 // Offset 是 pointer 指向的 valuelog 在文件中的偏移量 Offset int64 }
Append 將從 Append.writeCh
讀出的 binlog,批量寫入到 ValueLog 組件中。咱們能夠將 ValueLog 組件看做一種由 valuePointer
映射到 binlog 的持久化鍵值存儲實現,咱們將在下一篇文章詳細介紹 ValueLog 組件。
這個過程的主要實如今 writeBatchToKV
中,Append 將 binlog 的 tso 做爲 Key, valuePointer
做爲 Value 批量寫入 Metadata 存儲中,在目前的 Pump 實現中,咱們採用 goleveldb 做爲 Metadata 存儲數據庫。因爲 goleveldb 的底層是數據結構是 LSM-Tree,存儲在 Metadata 存儲的 binlog 相關信息已經自然按 tso 排好序了。
sorter
既然 binlog 的元數據在 writeToKV 過程已經排好序了,爲何還須要 writeToSorter
呢?這裏和《TiDB-Binlog 架構演進與實現原理》一文提到的 Binlog 工做原理有關:
TiDB 的事務採用 2pc 算法,一個成功的事務會寫兩條 binlog,包括一條 Prewrite binlog 和一條 Commit binlog;若是事務失敗,會發一條 Rollback binlog。
要完整的還原事務,咱們須要對 Prewrite binlog 和 Commit binlog(下文簡稱 P-binlog 和 C-binlog) 配對,才能知曉某一個事務是否被 Commit 成功了。Sorter 就起這樣的做用,這個過程的主要實如今 sorter.run 中。Sorter 逐條讀出 binlog,對於 P-binlog 則暫時存放在內存中等待配對,對於 C-binlog 則與內存中未配對的 P-binlog 進行匹配。若是某一條 P-binlog 長期沒有 C-binlog 與之牽手,Sorter 將反查 TiKV 問問這條單身狗 P-binlog 的伴侶是否是迷路了。
爲何會有 C-binlog 迷路呢?要解釋這個現象,咱們首先要回顧一下 binlog 的寫入流程:
<center>圖 2 binlog 寫入流程</center>
在 Prepare 階段,TiDB 同時向 TiKV 和 Pump 發起 prewrite 請求,只有 TiKV 和 Pump 所有返回成功了,TiDB 才認爲 Prepare 成功。所以能夠保證只要 Prepare 階段成功,Pump 就必定能收到 P-binlog。這裏能夠這樣作的緣由是,TiKV 和 Pump 的 prewrite 均可以回滾,所以有任一節點 prewrite 失敗後,TiDB 能夠回滾其餘節點,不會影響數據一致性。然而 Commit 階段則否則,Commit 是沒法回滾的操做,所以 TiDB 先 Commit TiKV,成功後再向 Pump 寫入 C-binlog。而 TiKV Commit 後,這個事務就已經提交成功了,若是寫 C-binlog 操做失敗,則會產生事務提交成功但 Pump 未收到 C-binlog 的現象。在生產環境中,C-binlog 寫失敗大可能是因爲重啓 TiDB 致使的,這自己屬於一個可控事件或小几率事件。
PullCommitBinlog 顧名思義,是用於拉 Commit binlog 的接口,其實現主要在 PullCommitBinlog
函數中。這個過程實現上比較簡單,Append 將從客戶端指定的 tso 開始 Scan Metadata,Scan 過程當中只關注 C-binlog,發現 C-binlog 時根據 StartTs 再反查與它牽手的 P-binlog。這樣咱們從這個接口拉到的就都是 Commit 成功的 binlog 了。
GC 是老生常談,必不可少的機制。Pump Storage 數據在本地存儲的體積隨時間而增大,咱們須要某種 GC 機制來釋放存儲資源。對垃圾數據的斷定有兩條規則:1.該條 binlog 已經同步到下游;2.該條 binlog 的 tso 距如今已經超過一段時間(該值即配置項:gc
)。
注:因爲生產環境中發現用戶有時會關閉了 drainer 卻沒有使用 binlogctl 將相應 drainer 節點標記爲 offline,致使 Pump Storage 的數據一直在膨脹,不能 GC。所以在 v3.0.一、v2.1.15 後不管 Binlog 是否已經同步到下游,都會正常進入 GC 流程。
GC 實如今 doGCTS 中,GC 過程分別針對 Metadata 和 Valuelog 兩類存儲。
對於 Metadata,咱們 Scan [0,GcTso]
這個範圍內的 Metadata,每 1024 個 KVS 做爲一批次進行刪除:
for iter.Next() && deleteBatch < 100 { batch.Delete(iter.Key()) deleteNum++ lastKey = iter.Key() if batch.Len() == 1024 { err := a.metadata.Write(batch, nil) if err != nil { log.Error("write batch failed", zap.Error(err)) } deletedKv.Add(float64(batch.Len())) batch.Reset() deleteBatch++ } }
在實際的生產環境中,咱們發現,若是不對 GC 限速,GC 線程將頻繁的觸發底層 goleveldb 的 compaction 操做,嚴重時甚至會引發 WritePaused,影響 Binlog 的正常寫入,這是不能接受的。所以,咱們經過 l0
文件的數量判斷當前底層 goleveldb 的寫入壓力,當 l0
文件數量超過必定閾值,咱們將暫停 GC 過程:
if l0Num >= l0Trigger { log.Info("wait some time to gc cause too many L0 file", zap.Int("files", l0Num)) if iter != nil { iter.Release() iter = nil } time.Sleep(5 * time.Second) continue }
對於 Valuelog,GC 每刪除 100 批 KVS(即 102400 個 KVS)觸發一次 Valuelog 的 GC,Valuelog GC 最終反應到文件系統上刪除文件,所以開銷比較小。
在示例代碼的
doGCTS
函數中存在一個 Bug,你發現了麼?歡迎留言搶答。
本文介紹了 Pump Storage 的初始化過程和主要功能的實現,但願能幫助你們在閱讀代碼的時候梳理重點、理清思路。下一篇文章將會介紹上文說起的 Valuelog 和 SlowChaser 等輔助機制。
原文閱讀:https://pingcap.com/blog-cn/tidb-binlog-source-code-reading-5/