本系列文章主要面向 TiKV 社區開發者,重點介紹 TiKV 的系統架構,源碼結構,流程解析。目的是使得開發者閱讀以後,能對 TiKV 項目有一個初步瞭解,更好的參與進入 TiKV 的開發中。git
TiKV 是一個分佈式的 KV 系統,它採用 Raft 協議保證數據的強一致性,同時使用 MVCC + 2PC 的方式實現了分佈式事務的支持。github
本文爲本系列文章第三節。緩存
Placement Driver (後續以 PD 簡稱) 是 TiDB 裏面全局中心總控節點,它負責整個集羣的調度,負責全局 ID 的生成,以及全局時間戳 TSO 的生成等。PD 還保存着整個集羣 TiKV 的元信息,負責給 client 提供路由功能。安全
做爲中心總控節點,PD 經過集成 etcd ,自動的支持 auto failover,無需擔憂單點故障問題。同時,PD 也經過 etcd 的 raft,保證了數據的強一致性,不用擔憂數據丟失的問題。架構
在架構上面,PD 全部的數據都是經過 TiKV 主動上報獲知的。同時,PD 對整個 TiKV 集羣的調度等操做,也只會在 TiKV 發送 heartbeat 命令的結果裏面返回相關的命令,讓 TiKV 自行去處理,而不是主動去給 TiKV 發命令。這樣設計上面就很是簡單,咱們徹底能夠認爲 PD 是一個無狀態的服務(固然,PD 仍然會將一些信息持久化到 etcd),全部的操做都是被動觸發,即便 PD 掛掉,新選出的 PD leader 也能馬上對外服務,無需考慮任何以前的中間狀態。app
PD 集成了 etcd,因此一般,咱們須要啓動至少三個副本,才能保證數據的安全。現階段 PD 有集羣啓動方式,initial-cluster
的靜態方式以及 join
的動態方式。less
在繼續以前,咱們須要瞭解下 etcd 的端口,在 etcd 裏面,默認要監聽 2379 和 2380 兩個端口。2379 主要是 etcd 用來處理外部請求用的,而 2380 則是 etcd peer 之間相互通訊用的。分佈式
假設如今咱們有三個 pd,分別爲 pd1,pd2,pd3,分別在 host1,host2,host3 上面。ide
對於靜態初始化,咱們直接在三個 PD 啓動的時候,給 initial-cluster
設置 pd1=http://host1:2380,pd2=http://host2:2380,pd3=http://host3:2380
。函數
對於動態初始化,咱們先啓動 pd1,而後啓動 pd2,加入到 pd1 的集羣裏面,join
設置爲 http://host1:2379
。而後啓動 pd3,加入到 pd1,pd2 造成的集羣裏面, join
設置爲 http://host1:2379
。
能夠看到,靜態初始化和動態初始化徹底走的是兩個端口,並且這兩個是互斥的,也就是咱們只能使用一種方式來初始化集羣。etcd 自己只支持 initial-cluster
的方式,但爲了方便,PD 同時也提供了 join
的方式。
join
主要是用了 etcd 自身提供的 member 相關 API,包括 add member,list member 等,因此咱們使用 2379 端口,由於須要將命令發到 etcd 去執行。而 initial-cluster
則是 etcd 自身的初始化方式,因此使用的 2380 端口。
相比於 initial-cluster
,join
須要考慮很是多的 case(在 server/join.go
prepareJoinCluster
函數裏面有詳細的解釋),但 join
的使用很是天然,後續咱們會考慮去掉 initial-cluster
的初始化方案。
當 PD 啓動以後,咱們就須要選出一個 leader 對外提供服務。雖然 etcd 自身也有 raft leader,但咱們仍是以爲使用本身的 leader,也就是 PD 的 leader 跟 etcd 本身的 leader 是不同的。
當 PD 啓動以後,Leader 的選舉以下:
檢查當前集羣是否是有 leader,若是有 leader,就 watch 這個 leader,只要發現 leader 掉了,就從新開始 1。
若是沒有 leader,開始 campaign,建立一個 Lessor,而且經過 etcd 的事務機制寫入相關信息,以下:
// Create a lessor. ctx, cancel := context.WithTimeout(s.client.Ctx(), requestTimeout) leaseResp, err := lessor.Grant(ctx, s.cfg.LeaderLease) cancel() // The leader key must not exist, so the CreateRevision is 0. resp, err := s.txn(). If(clientv3.Compare(clientv3.CreateRevision(leaderKey), "=", 0)). Then(clientv3.OpPut(leaderKey, s.leaderValue, clientv3.WithLease(clientv3.LeaseID(leaseResp.ID)))). Commit()
若是 leader key 的 CreateRevision 爲 0,代表其餘 PD 尚未寫入,那麼我就能夠將我本身的 leader 相關信息寫入,同時會帶上一個 Lease。若是事務執行失敗,代表其餘的 PD 已經成爲了 leader,那麼就從新回到 1。
成爲 leader 以後,咱們對按期進行保活處理:
// Make the leader keepalived. ch, err := lessor.KeepAlive(s.client.Ctx(), clientv3.LeaseID(leaseResp.ID)) if err != nil { return errors.Trace(err) }
當 PD 崩潰,原先寫入的 leader key 會由於 lease 到期而自動刪除,這樣其餘的 PD 就能 watch 到,從新開始選舉。
初始化 raft cluster,主要是從 etcd 裏面從新載入集羣的元信息。拿到最新的 TSO 信息:
// Try to create raft cluster. err = s.createRaftCluster() if err != nil { return errors.Trace(err) } log.Debug("sync timestamp for tso") if err = s.syncTimestamp(); err != nil { return errors.Trace(err) }
全部作完以後,開始按期更新 TSO,監聽 lessor 是否過時,以及外面是否主動退出:
for { select { case _, ok := <-ch: if !ok { log.Info("keep alive channel is closed") return nil } case <-tsTicker.C: if err = s.updateTimestamp(); err != nil { return errors.Trace(err) } case <-s.client.Ctx().Done(): return errors.New("server closed") } }
前面咱們說到了 TSO,TSO 是一個全局的時間戳,它是 TiDB 實現分佈式事務的基石。因此對於 PD 來講,咱們首先要保證它能快速大量的爲事務分配 TSO,同時也須要保證分配的 TSO 必定是單調遞增的,不可能出現回退的狀況。
TSO 是一個 int64 的整形,它由 physical time + logical time 兩個部分組成。Physical time 是當前 unix time 的毫秒時間,而 logical time 則是一個最大 1 << 18
的計數器。也就是說 1ms,PD 最多能夠分配 262144 個 TSO,這個能知足絕大多數狀況了。
對於 TSO 的保存於分配,PD 會作以下處理:
當 PD 成爲 leader 以後,會從 etcd 上面獲取上一次保存的時間,若是發現本地的時間比這個大,則會繼續等待直到當前的時間大於這個值:
last, err := s.loadTimestamp() if err != nil { return errors.Trace(err) } var now time.Time for { now = time.Now() if wait := last.Sub(now) + updateTimestampGuard; wait > 0 { log.Warnf("wait %v to guarantee valid generated timestamp", wait) time.Sleep(wait) continue } break }
當 PD 能分配 TSO 以後,首先會向 etcd 申請一個最大的時間,譬如,假設當前時間是 t1,每次最多能申請 3s 的時間窗口,PD 會向 etcd 保存 t1 + 3s 的時間值,而後 PD 就能在內存裏面直接使用這一段時間窗口.噹噹前的時間 t2 大於 t1 + 3s 以後,PD 就會在向 etcd 繼續更新爲 t2 + 3s:
if now.Sub(s.lastSavedTime) >= 0 { last := s.lastSavedTime save := now.Add(s.cfg.TsoSaveInterval.Duration) if err := s.saveTimestamp(save); err != nil { return errors.Trace(err) } }
這麼處理的好處在於,即便 PD 當掉,新啓動的 PD 也會從上一次保存的最大的時間以後開始分配 TSO,也就是 1 處理的狀況。
由於 PD 在內存裏面保存了一個可分配的時間窗口,因此外面請求 TSO 的時候,PD 能直接在內存裏面計算 TSO 並返回。
resp := pdpb.Timestamp{} for i := 0; i < maxRetryCount; i++ { current, ok := s.ts.Load().(*atomicObject) if !ok { log.Errorf("we haven't synced timestamp ok, wait and retry, retry count %d", i) time.Sleep(200 * time.Millisecond) continue } resp.Physical = current.physical.UnixNano() / int64(time.Millisecond) resp.Logical = atomic.AddInt64(¤t.logical, int64(count)) if resp.Logical >= maxLogical { log.Errorf("logical part outside of max logical interval %v, please check ntp time, retry count %d", resp, i) time.Sleep(updateTimestampStep) continue } return resp, nil }
由於是在內存裏面計算的,因此性能很高,咱們本身內部測試每秒能分配百萬級別的 TSO。
若是 client 每次事務都向 PD 來請求一次 TSO,每次 RPC 的開銷也是很是大的,因此 client 會批量的向 PD 獲取 TSO。client 會首先收集一批事務的 TSO 請求,譬如 n 個,而後直接向 PD 發送命令,參數就是 n,PD 收到命令以後,會生成 n 個 TSO 返回給客戶端。
在最開始咱們說過,PD 全部關於集羣的數據都是由 TiKV 主動心跳上報的,PD 對 TiKV 的調度也是在心跳的時候完成的。一般 PD 會處理兩種心跳,一個是 TiKV 自身 store 的心跳,而另外一個則是 store 裏面 region 的 leader peer 上報的心跳。
對於 store 的心跳,PD 在 handleStoreHeartbeat
函數裏面處理,主要就是將心跳裏面當前的 store 的一些狀態緩存到 cache 裏面。store 的狀態包括該 store 有多少個 region,有多少個 region 的 leader peer 在該 store 上面等,這些信息都會用於後續的調度。
對於 region 的心跳,PD 在 handleRegionHeartbeat
裏面處理。這裏須要注意,只有 leader peer 纔會去上報所屬 region 的信息,follower peer 是不會上報的。收到 region 的心跳以後,首先 PD 也會將其放入 cache 裏面,若是 PD 發現 region 的 epoch 有變化,就會將這個 region 的信息也保存到 etcd 裏面。而後,PD 會對這個 region 進行具體的調度,譬如發現 peer 數目不夠,添加新的 peer,或者有一個 peer 已經壞了,刪除這個 peer 等,詳細的調度實現,咱們會在後續討論。
這裏再說一下 region 的 epoch,在 region 的 epoch 裏面,有 conf_ver
和 version
,分別表示這個 region 不一樣的版本狀態。若是一個 region 發生了 membership changes,也就是新增或者刪除了 peer,conf_ver
會加 1,若是 region 發生了 split
或者 merge
,則 version
加 1。
不管是 PD 仍是在 TiKV,咱們都是經過 epoch 來判斷 region 是否發生了變化,從而拒絕掉一些危險的操做。譬如 region 已經發生了分裂,version
變成了 2,那麼若是這時候有一個寫請求帶上的 version
是 1, 咱們就會認爲這個請求是 stale,會直接拒絕掉。由於 version
變化代表 region 的範圍已經發生了變化,頗有可能這個 stale 的請求須要操做的 key 是在以前的 region range 裏面而沒在新的 range 裏面。
前面咱們說了,PD 會在 region 的 heartbeat 裏面對 region 進行調度,而後直接在 heartbeat 的返回值裏面帶上相關的調度信息,讓 TiKV 本身去處理,TiKV 處理完成以後,經過下一個 heartbeat 從新上報,PD 就能知道是否調度成功了。
對於 membership changes,比較容易,由於咱們有最大副本數的配置,假設三個,那麼當 region 的心跳上來,發現只有兩個 peer,那麼就 add peer,若是有四個 peer,就 remove peer。而對於 region 的 split / merge,則狀況稍微要複雜一點,但也比較簡單。注意,現階段,咱們只支持 split,merge 處於開發階段,沒對外發布,因此這裏僅僅以 split 舉例:
在 TiKV 裏面,leader peer 會按期檢查 region 所佔用的空間是否超過某一個閥值,假設咱們設置 region 的 size 爲 64MB,若是一個 region 超過了 96MB, 就須要分裂。
Leader peer 會首先向 PD 發送一個請求分裂的命令,PD 在 handleAskSplit
裏面處理,由於咱們是一個 region 分裂成兩個,對於這兩個新分裂的 region,一個會繼承以前 region 的全部的元信息,而另外一個相關的信息,譬如 region ID,新的 peer ID,則須要 PD 生成,並將其返回給 leader。
Leader peer 寫入一個 split raft log,在 apply 的時候執行,這樣 region 就分裂成了兩個。
分裂成功以後,TiKV 告訴 PD,PD 就在 handleReportSplit
裏面處理,更新 cache 相關的信息,並持久化到 etcd。
由於 PD 保存了全部 TiKV 的集羣信息,天然對 client 提供了路由的功能。假設 client 要對 key
寫入一個值。
client 先從 PD 獲取 key
屬於哪個 region,PD 將這個 region 相關的元信息返回。
client 本身 cache,這樣就不須要每次都從 PD 獲取。而後直接給 region 的 leader peer 發送命令。
有可能 region 的 leader 已經漂移到其餘 peer,TiKV 會返回 NotLeader
錯誤,並帶上新的 leader 的地址,client 在 cache 裏面更新,並從新向新的 leader 發送請求。
也有可能 region 的 version 已經變化,譬如 split 了,這時候,key
可能已經落入了新的 region 上面,client 會收到 StaleCommand
的錯誤,因而從新從 PD 獲取,進入狀態 1。
PD 做爲 TiDB 集羣的中心調度模塊,在設計上面,咱們儘可能保證無狀態,方便擴展。本篇文章主要介紹了 PD 是如何跟 TiKV,TiDB 協做交互的。後面,咱們會詳細地介紹核心調度功能,也就是 PD 是如何控制整個集羣的。