做者:周振靖bootstrap
以前的 TiKV 源碼解析系列文章介紹了 TiKV 依賴的周邊庫,從本篇文章開始,咱們將開始介紹 TiKV 自身的代碼。本文重點介紹 TiKV 最外面的一層——Service 層。網絡
TiKV 的 Service 層的代碼位於 src/server
文件夾下,其職責包括提供 RPC 服務、將 store id 解析成地址、TiKV 之間的相互通訊等。這一部分的代碼並非特別複雜。本篇將會簡要地介紹 Service 層的總體結構和組成 Service 層的各個組件。閉包
位於 src/server/server.rs
文件中的 Server
是咱們本次介紹的 Service 層的主體。它封裝了 TiKV 在網絡上提供服務和 Raft group 成員之間相互通訊的邏輯。Server
自己的代碼比較簡短,大部分代碼都被分離到 RaftClient
,Transport
,SnapRunner
和幾個 gRPC service 中。上述組件的層次關係以下圖所示:框架
接下來,咱們將詳細介紹這些組件。異步
在一個集羣中,每一個 TiKV 實例都由一個惟一的 store id 進行標識。Resolver 的功能是將 store id 解析成 TiKV 的地址和端口,用於創建網絡通訊。async
Resolver 是一個很簡單的組件,其接口僅包含一個函數:函數
pub trait StoreAddrResolver: Send + Clone { fn resolve(&self, store_id: u64, cb: Callback) -> Result<()>; }
其中 Callback
用於異步地返回結果。PdStoreAddrResolver
實現了該 trait,它的 resolve
方法的實現則是簡單地將查詢任務經過其 sched
成員發送給 Runner
。而 Runner
則實現了 Runnable<Task>
,其意義是 Runner
能夠在本身的一個線程裏運行,外界將會向 Runner
發送 Task
類型的消息,Runner
將對收到的 Task
進行處理。 這裏使用了由 TiKV 的 util 提供的一個單線程 worker 框架,在 TiKV 的不少處代碼中都有應用。Runner
的 store_addrs
字段是個 cache,它在執行任務時首先嚐試在這個 cache 中找,找不到則向 PD 發送 RPC 請求來進行查詢,並將查詢結果添加進 cache 裏。性能
TiKV 是一個 Multi Raft 的結構,Region 的副本之間,即 Raft group 的成員之間須要相互通訊,RaftClient
的做用即是管理 TiKV 之間的鏈接,並用於向其它 TiKV 節點發送 Raft 消息。RaftClient
能夠和另外一個節點創建多個鏈接,並把不一樣 Region 的請求均攤到這些鏈接上。這部分代碼的主要的複雜性就在於鏈接的創建,也就是 Conn::new
這個函數。創建鏈接的代碼的關鍵部分以下:測試
let client1 = TikvClient::new(channel); let (tx, rx) = batch::unbounded::<RaftMessage>(RAFT_MSG_NOTIFY_SIZE); let rx = batch::BatchReceiver::new(rx, RAFT_MSG_MAX_BATCH_SIZE, Vec::new, |v, e| v.push(e)); let rx1 = Arc::new(Mutex::new(rx)); let (batch_sink, batch_receiver) = client1.batch_raft().unwrap(); let batch_send_or_fallback = batch_sink .send_all(Reusable(rx1).map(move |v| { let mut batch_msgs = BatchRaftMessage::new(); batch_msgs.set_msgs(RepeatedField::from(v)); (batch_msgs, WriteFlags::default().buffer_hint(false)) })).then(/*...*/); client1.spawn(batch_send_or_fallback.map_err(/*...*/));
上述代碼向指定地址調用了 batch_raft
這個 gRPC 接口。batch_raft
和 raft
都是 stream 接口。對 RaftClient
調用 send
方法會將消息發送到對應的 Conn
的 stream
成員,即上述代碼的 tx
中,而在 gRPC 的線程中則會從 rx
中取出這些消息(這些消息被 BatchReceiver
這一層 batch 起來以提高性能),並經過網絡發送出去。優化
若是對方不支持 batch,則會 fallback 到 raft
接口。這種狀況一般僅在從舊版本升級的過程當中發生。
RaftStoreRouter
負責將收到的 Raft 消息轉發給 raftstore 中對應的 Region,而 Transport
負責將 Raft 消息發送到指定的 store。
ServerRaftStoreRouter
是在 TiKV 實際運行時將會使用的 RaftStoreRouter
的實現,它包含一個內層的、由 raftstore 提供的 RaftRouter
對象和一個 LocalReader
對象。收到的請求若是是一個只讀的請求,則會由 LocalReader
處理;其它狀況則是交給內層的 router 來處理。
ServerTransport
則是 TiKV 實際運行時使用的 Transport
的實現(Transport
trait 的定義在 raftstore 中),其內部包含一個 RaftClient
用於進行 RPC 通訊。發送消息時,ServerTransport
經過上面說到的 Resolver 將消息中的 store id 解析爲地址,並將解析的結果存入 raft_client.addrs
中;下次向同一個 store 發送消息時便再也不須要再次解析。接下來,再經過 RaftClient
進行 RPC 請求,將消息發送出去。
Node
能夠認爲是將 raftstore 的複雜的建立、啓動和中止邏輯進行封裝的一層,其內部的 RaftBatchSystem
即是 raftstore 的核心。在啓動過程當中(即 Node
的 start
函數中),若是該節點是一個新建的節點,那麼會進行 bootstrap 的過程,包括分配 store id、分配第一個 Region 等操做。
Node
並無直接包含在 Server
以內,可是 raftstore 的運行須要有用於向其它 TiKV 發送消息的 Transport
,而 Transport
做爲提供網絡通訊功能的一部分,則是包含在 Server
內。因此咱們能夠看到,在 src/binutil/server.rs
文件的 run_raft_server
中(被 tikv-server 的 main
函數調用),啓動過程當中須要先建立 Server
,而後建立並啓動 Node
並把 Server
所建立的 Transport
傳給 Node
,最後再啓動 Node
。
TiKV 包含多個 gRPC service。其中,最重要的一個是 KvService
,位於 src/server/service/kv.rs
文件中。
KvService
定義了 TiKV 的 kv_get
,kv_scan
,kv_prewrite
,kv_commit
等事務操做的 API,用於執行 TiDB 下推下來的複雜查詢和計算的 coprocessor
API,以及 raw_get
,raw_put
等 Raw KV API。batch_commands
接口則是用於將上述的接口 batch 起來,以優化高吞吐量的場景。當咱們要爲 TiKV 添加一個新的 API 時,首先就要在 kvproto 項目中添加相關消息體的定義,並在這裏添加相關代碼。另外,TiKV 的 Raft group 各成員之間通訊用到的 raft
和 batch_raft
接口也是在這裏提供的。
下面以 kv_prewrite
爲例,介紹 TiKV 處理一個請求的流程。首先,不管是直接調用仍是經過 batch_commands
接口調用,都會調用 future_prewrite
函數,並在該函數返回的 future 附加上根據結果發送響應的操做,再將獲得的 future spawn 到 RpcContext
,也就是一個線程池裏。future_prewrite
的邏輯以下:
// 從請求體中取出調用 prewrite 所需的參數 let (cb, f) = paired_future_callback(); let res = storage.async_prewrite(/*其它參數*/, cb); AndThenWith::new(res, f.map_err(Error::from)).map(|v| { let mut resp = PrewriteResponse::new(); if let Some(err) = extract_region_error(&v) { resp.set_region_error(err); } else { resp.set_errors(RepeatedField::from_vec(extract_key_errors(v))); } resp })
這裏的 paired_future_callback
是一個 util 函數,它返回一個閉包 cb
和一個 future f
,當 cb
被調用時 f
就會返回被傳入 cb
的值。上述代碼會馬上返回,但 future 中的邏輯在 async_prewrite
中的異步操做完成以後纔會執行。一旦 prewrite 操做完成,cb
便會被調用,將結果傳給 f
,接下來,咱們寫在 future
中的建立和發送 Response 的邏輯便會繼續執行。
以上就是 TiKV 的 Service 層的代碼解析。你們能夠看到這些代碼大量使用 trait 和泛型,這是爲了方便將其中一些組件替換成另一些實現,方便編寫測試代碼。另外,在 src/server/snap.rs
中,咱們還有一個專門用於處理 Snapshot 的模塊,因爲 Snapshot 消息的特殊性,在其它模塊中也有一些針對 snapshot 的代碼。關於 Snapshot,咱們將在另外一篇文章裏進行詳細講解,敬請期待。
原文閱讀:https://www.pingcap.com/blog-cn/tikv-source-code-reading-9/