TiKV 源碼解析系列文章(九)Service 層處理流程解析

做者:周振靖bootstrap

以前的 TiKV 源碼解析系列文章介紹了 TiKV 依賴的周邊庫,從本篇文章開始,咱們將開始介紹 TiKV 自身的代碼。本文重點介紹 TiKV 最外面的一層——Service 層。網絡

TiKV 的 Service 層的代碼位於 src/server 文件夾下,其職責包括提供 RPC 服務、將 store id 解析成地址、TiKV 之間的相互通訊等。這一部分的代碼並非特別複雜。本篇將會簡要地介紹 Service 層的總體結構和組成 Service 層的各個組件。閉包

總體結構

位於 src/server/server.rs 文件中的 Server 是咱們本次介紹的 Service 層的主體。它封裝了 TiKV 在網絡上提供服務和 Raft group 成員之間相互通訊的邏輯。Server 自己的代碼比較簡短,大部分代碼都被分離到 RaftClientTransportSnapRunner 和幾個 gRPC service 中。上述組件的層次關係以下圖所示:框架

接下來,咱們將詳細介紹這些組件。異步

Resolver

在一個集羣中,每一個 TiKV 實例都由一個惟一的 store id 進行標識。Resolver 的功能是將 store id 解析成 TiKV 的地址和端口,用於創建網絡通訊。async

Resolver 是一個很簡單的組件,其接口僅包含一個函數:函數

pub trait StoreAddrResolver: Send + Clone {
   fn resolve(&self, store_id: u64, cb: Callback) -> Result<()>;
}

其中 Callback 用於異步地返回結果。PdStoreAddrResolver 實現了該 trait,它的 resolve 方法的實現則是簡單地將查詢任務經過其 sched 成員發送給 Runner。而 Runner 則實現了 Runnable<Task>,其意義是 Runner 能夠在本身的一個線程裏運行,外界將會向 Runner 發送 Task 類型的消息,Runner 將對收到的 Task 進行處理。 這裏使用了由 TiKV 的 util 提供的一個單線程 worker 框架,在 TiKV 的不少處代碼中都有應用。Runnerstore_addrs 字段是個 cache,它在執行任務時首先嚐試在這個 cache 中找,找不到則向 PD 發送 RPC 請求來進行查詢,並將查詢結果添加進 cache 裏。性能

RaftClient

TiKV 是一個 Multi Raft 的結構,Region 的副本之間,即 Raft group 的成員之間須要相互通訊,RaftClient 的做用即是管理 TiKV 之間的鏈接,並用於向其它 TiKV 節點發送 Raft 消息。RaftClient 能夠和另外一個節點創建多個鏈接,並把不一樣 Region 的請求均攤到這些鏈接上。這部分代碼的主要的複雜性就在於鏈接的創建,也就是 Conn::new 這個函數。創建鏈接的代碼的關鍵部分以下:測試

let client1 = TikvClient::new(channel);

let (tx, rx) = batch::unbounded::<RaftMessage>(RAFT_MSG_NOTIFY_SIZE);
let rx = batch::BatchReceiver::new(rx, RAFT_MSG_MAX_BATCH_SIZE, Vec::new, |v, e| v.push(e));
let rx1 = Arc::new(Mutex::new(rx));

let (batch_sink, batch_receiver) = client1.batch_raft().unwrap();
let batch_send_or_fallback = batch_sink
   .send_all(Reusable(rx1).map(move |v| {
       let mut batch_msgs = BatchRaftMessage::new();
       batch_msgs.set_msgs(RepeatedField::from(v));
       (batch_msgs, WriteFlags::default().buffer_hint(false))
   })).then(/*...*/);

client1.spawn(batch_send_or_fallback.map_err(/*...*/));

上述代碼向指定地址調用了 batch_raft 這個 gRPC 接口。batch_raftraft 都是 stream 接口。對 RaftClient 調用 send 方法會將消息發送到對應的 Connstream 成員,即上述代碼的 tx 中,而在 gRPC 的線程中則會從 rx 中取出這些消息(這些消息被 BatchReceiver 這一層 batch 起來以提高性能),並經過網絡發送出去。優化

若是對方不支持 batch,則會 fallback 到 raft 接口。這種狀況一般僅在從舊版本升級的過程當中發生。

RaftStoreRouter 與 Transport

RaftStoreRouter 負責將收到的 Raft 消息轉發給 raftstore 中對應的 Region,而 Transport 負責將 Raft 消息發送到指定的 store。

ServerRaftStoreRouter 是在 TiKV 實際運行時將會使用的 RaftStoreRouter 的實現,它包含一個內層的、由 raftstore 提供的 RaftRouter 對象和一個 LocalReader 對象。收到的請求若是是一個只讀的請求,則會由 LocalReader 處理;其它狀況則是交給內層的 router 來處理。

ServerTransport 則是 TiKV 實際運行時使用的 Transport 的實現(Transport trait 的定義在 raftstore 中),其內部包含一個 RaftClient 用於進行 RPC 通訊。發送消息時,ServerTransport 經過上面說到的 Resolver 將消息中的 store id 解析爲地址,並將解析的結果存入 raft_client.addrs 中;下次向同一個 store 發送消息時便再也不須要再次解析。接下來,再經過 RaftClient 進行 RPC 請求,將消息發送出去。

Node

Node 能夠認爲是將 raftstore 的複雜的建立、啓動和中止邏輯進行封裝的一層,其內部的 RaftBatchSystem 即是 raftstore 的核心。在啓動過程當中(即 Nodestart 函數中),若是該節點是一個新建的節點,那麼會進行 bootstrap 的過程,包括分配 store id、分配第一個 Region 等操做。

Node 並無直接包含在 Server 以內,可是 raftstore 的運行須要有用於向其它 TiKV 發送消息的 Transport,而 Transport 做爲提供網絡通訊功能的一部分,則是包含在 Server 內。因此咱們能夠看到,在 src/binutil/server.rs 文件的 run_raft_server 中(被 tikv-server 的 main 函數調用),啓動過程當中須要先建立 Server,而後建立並啓動 Node 並把 Server 所建立的 Transport 傳給 Node,最後再啓動 Node

Service

TiKV 包含多個 gRPC service。其中,最重要的一個是 KvService,位於 src/server/service/kv.rs 文件中。

KvService 定義了 TiKV 的 kv_getkv_scankv_prewritekv_commit 等事務操做的 API,用於執行 TiDB 下推下來的複雜查詢和計算的 coprocessor API,以及 raw_getraw_put 等 Raw KV API。batch_commands 接口則是用於將上述的接口 batch 起來,以優化高吞吐量的場景。當咱們要爲 TiKV 添加一個新的 API 時,首先就要在 kvproto 項目中添加相關消息體的定義,並在這裏添加相關代碼。另外,TiKV 的 Raft group 各成員之間通訊用到的 raftbatch_raft 接口也是在這裏提供的。

下面以 kv_prewrite 爲例,介紹 TiKV 處理一個請求的流程。首先,不管是直接調用仍是經過 batch_commands 接口調用,都會調用 future_prewrite 函數,並在該函數返回的 future 附加上根據結果發送響應的操做,再將獲得的 future spawn 到 RpcContext,也就是一個線程池裏。future_prewrite 的邏輯以下:

// 從請求體中取出調用 prewrite 所需的參數

let (cb, f) = paired_future_callback();
let res = storage.async_prewrite(/*其它參數*/, cb);

AndThenWith::new(res, f.map_err(Error::from)).map(|v| {
   let mut resp = PrewriteResponse::new();
   if let Some(err) = extract_region_error(&v) {
       resp.set_region_error(err);
   } else {
       resp.set_errors(RepeatedField::from_vec(extract_key_errors(v)));
   }
   resp
})

這裏的 paired_future_callback 是一個 util 函數,它返回一個閉包 cb 和一個 future f,當 cb 被調用時 f 就會返回被傳入 cb 的值。上述代碼會馬上返回,但 future 中的邏輯在 async_prewrite 中的異步操做完成以後纔會執行。一旦 prewrite 操做完成,cb 便會被調用,將結果傳給 f,接下來,咱們寫在 future 中的建立和發送 Response 的邏輯便會繼續執行。

總結

以上就是 TiKV 的 Service 層的代碼解析。你們能夠看到這些代碼大量使用 trait 和泛型,這是爲了方便將其中一些組件替換成另一些實現,方便編寫測試代碼。另外,在 src/server/snap.rs 中,咱們還有一個專門用於處理 Snapshot 的模塊,因爲 Snapshot 消息的特殊性,在其它模塊中也有一些針對 snapshot 的代碼。關於 Snapshot,咱們將在另外一篇文章裏進行詳細講解,敬請期待。

原文閱讀https://www.pingcap.com/blog-cn/tikv-source-code-reading-9/

相關文章
相關標籤/搜索