TiKV 源碼解析系列——multi-raft 設計與實現

本系列文章主要面向 TiKV 社區開發者,重點介紹 TiKV 的系統架構,源碼結構,流程解析。目的是使得開發者閱讀以後,能對 TiKV 項目有一個初步瞭解,更好的參與進入 TiKV 的開發中。
須要注意,TiKV 使用 Rust 語言編寫,用戶須要對 Rust 語言有一個大概的瞭解。另外,本系列文章並不會涉及到 TiKV 中心控制服務 Placement Driver(PD) 的詳細介紹,可是會說明一些重要流程 TiKV 是如何與 PD 交互的。
TiKV 是一個分佈式的 KV 系統,它採用 Raft 協議保證數據的強一致性,同時使用 MVCC + 2PC 的方式實現了分佈式事務的支持。
本文爲本系列文章第二節。git

Placement Driver

在繼續以前,咱們先簡單介紹一下 Placement Driver(PD)。PD 是 TiKV 的全局中央控制器,存儲整個 TiKV 集羣的元數據信息,負責整個 TiKV 集羣的調度,全局 ID 的生成,以及全局 TSO 授時等。github

PD 是一個很是重要的中心節點,它經過集成 etcd,自動的支持了分佈式擴展以及 failover,解決了單點故障問題。關於 PD 的詳細介紹,後續咱們會新開一篇文章說明。算法

在 TiKV 裏面,跟 PD 的交互是放在源碼的 pd 目錄下,如今跟 PD 的交互都是經過本身定義的 RPC 實現,協議很是簡單,在 pd/mod.rs 裏面咱們直接提供了用於跟 PD 進行交互的 Client trait,以及實現了 RPC Client。bootstrap

PD 的 Client trait 很是簡單,多數都是對集羣元信息的 set/get 操做,須要額外注意的幾個:緩存

bootstrap_cluster:當咱們啓動一個 TiKV 服務的時候,首先須要經過 is_cluster_bootstrapped 來判斷整個 TiKV 集羣是否已經初始化,若是尚未初始化,咱們就會在該 TiKV 服務上面建立第一個 region。網絡

region_heartbeat:按期 Region 向 PD 彙報本身的相關信息,供 PD 作後續的調度。譬如,若是一個 Region 給 PD 上報的 peers 的數量小於預設的副本數,那麼 PD 就會給這個 Region 添加一個新的副本 Peer。多線程

store_heartbeat:按期 store 向 PD 彙報本身的相關信息,供 PD 作後續調度。譬如,Store 會告訴 PD 當前的磁盤大小,以及剩餘空間,若是 PD 發現空間不夠了,就不會考慮將其餘的 Peer 遷移到這個 Store 上面。架構

ask_split/report_split:當 Region 發現本身須要 split 的時候,就 ask_split 告訴 PD,PD 會生成新分裂 Region 的 ID ,當 Region 分裂成功以後,會 report_split 通知 PD。app

注意,後面咱們會讓 PD 支持 gRPC 協議,因此 Client API 到時候可能會有變動。異步

Raftstore

由於 TiKV 目標是支持 100 TB+ 以上的數據,一個 Raft 集羣是鐵定無法支持這麼多數據的,因此咱們須要使用多個 Raft 集羣,也就是 Multi Raft。在 TiKV 裏面,Multi Raft 的實現是在 Raftstore 完成的,代碼在 raftstore/store 目錄。

Region

由於咱們要支持 Multi Raft,因此咱們須要將數據進行分片處理,讓每一個 Raft 單獨負責一部分數據。

一般的數據分片算法就是 Hash 和 Range,TiKV 使用的 Range 來對數據進行數據分片。爲何使用 Range,主要緣由是能更好的將相同前綴的 key 聚合在一塊兒,便於 scan 等操做,這個 Hash 是無法支持的,固然,在 split/merge 上面 Range 也比 Hash 好處理不少,不少時候只會涉及到元信息的修改,都不用大範圍的挪動數據。

固然,Range 有一個問題在於頗有可能某一個 Region 會由於頻繁的操做成爲性能熱點,固然也有一些優化的方式,譬如經過 PD 將這些 Region 調度到更好的機器上面,提供 Follower 分擔讀壓力等。

總之,在 TiKV 裏面,咱們使用 Range 來對數據進行切分,將其分紅一個一個的 Raft Group,每個 Raft Group,咱們使用 Region 來表示。

Region 的 protobuf 協議定義以下:

message Region {
    optional uint64 id                  = 1 [(gogoproto.nullable) = false];
    optional bytes  start_key           = 2;
    optional bytes  end_key             = 3;
    optional RegionEpoch region_epoch   = 4;
    repeated Peer   peers               = 5;
}

message RegionEpoch {
    optional uint64 conf_ver    = 1 [(gogoproto.nullable) = false];
    optional uint64 version     = 2 [(gogoproto.nullable) = false];
}

message Peer {      
    optional uint64 id          = 1 [(gogoproto.nullable) = false]; 
    optional uint64 store_id    = 2 [(gogoproto.nullable) = false];
}

id:Region 的惟一表示,經過 PD 全局惟一分配。

start_key, end_key:用來表示這個 Region 的範圍 [start_key, end_key),對於最開始的 region,start 和 end key 都是空,TiKV 內部會特殊處理。

region_epoch:當一個 Region 添加或者刪除 Peer,或者 split 等,咱們就會認爲這個 Region 的 epoch 發生的變化,RegionEpoch 的 conf_ver 會在每次作 ConfChange 的時候遞增,而 version 則是會在每次作 split/merge 的時候遞增。

peers:當前 Region 包含的節點信息。對於一個 Raft Group,咱們一般有三個副本,每一個副本咱們使用 Peer 來表示,Peer 的 id 也是全局由 PD 分配,而 store_id 則代表這個 Peer 在哪個 Store 上面。

RocksDB / Keys Prefix

對於實際數據存儲,不管是 Raft Meta,Log,仍是 State Machine 的 data,咱們都存到一個 RocksDB 實例裏面。關於 RocksDB,能夠詳細參考 facebook/rocksdb

咱們使用不一樣的前綴來對 Raft 以及 State Machine 等數據進行區分,具體能夠參考 raftstore/store/keys.rs,對於 State Machine 實際的 data 數據,咱們統一添加 ‘z’ 前綴。而對於其餘會存在本地的元數據(包括 Raft),咱們統一添加 0x01 前綴。

這裏簡單說明一下一些重要元數據的 Key 格式,咱們忽略最開始的 0x01 前綴。

  • 0x01:用於存放StoreIdent,在初始化這個 Store 的時候,咱們會將 Store 的 Cluster ID,Store ID 等信息存儲到這個 key 裏面。

  • 0x02:用來存儲 Raft 一些信息,0x02 以後會緊跟該 Raft Region 的 ID(8字節大端序 ),而後在緊跟一個 Suffix 來標識不一樣的子類型:

    • 0x01:用於存放 Raft Log,後面緊跟 Log Index(8字節大端序)

    • 0x02:用於存放 RaftLocalState

    • 0x03:用於存放 RaftApplyState

  • 0x03:用來存儲 Region 本地的一些元信息,0x03 以後緊跟 Raft Region ID,隨後在緊跟一個 Suffix 來表示不一樣的子類型:

    • 0x01:用於存放 RegionLocalState

對於上面提到的幾個類型,都在 protobuf 裏面定義:

message RaftLocalState {
    optional eraftpb.HardState hard_state        = 1;
    optional uint64 last_index                  = 2;
}

message RaftApplyState {
    optional uint64 applied_index               = 1;
    optional RaftTruncatedState truncated_state = 2;
}

enum PeerState {
    Normal       = 0;
    Applying     = 1;
    Tombstone    = 2;
}

message RegionLocalState {
    optional PeerState state        = 1;
    optional metapb.Region region   = 2;
}

RaftLocalState: 用於存放當前 Raft 的 HardState 以及最後一個 Log index。

RaftApplyState: 用於存放當前 Raft 最後 apply 的 Log index 以及被 truncated 的 Log 信息。

RegionLocalStaste: 用於存放 Region 信息以及在該 Store 上面對應的 Peer 狀態,Normal 代表是一個正常的 Peer,Applying 代表該 Peer 還沒作完 apply snapshot 的操做,而 Tombstone 則代表該 Peer 已經被移除出了 Region,不能在參與到 Raft Group 裏面。

Peer Storage

前面已經知道,咱們經過 RawNode 來使用 Raft。由於一個 Region 對應的一個 Raft Group,Region 裏面的 Peer 就對應的是一個 Raft 副本。因此,咱們在 Peer 裏面封裝了對 RawNode 的操做。

要使用 Raft,咱們須要定義本身的 Storage,這在 raftstore/store/peer_storage.rs 的 PeerStorage 類裏面實現。

當建立 PeerStorage 的時候,首先咱們會從 RocksDB 裏面獲得該 Peer 以前的 RaftLocalState,RaftApplyState,以及 last_term 等,這些會緩存到內存裏面,便於後續的快速度訪問。

PeerStorage 須要注意幾個地方:

首先就是 RAFT_INIT_LOG_TERM 和 RAFT_INIT_LOG_INDEX,它們的值都是 5(只要大於 1 均可以)。在 TiKV 裏面,一個 Peer 的建立有以下幾種方式:

  1. 主動建立,一般對於第一個 Region 的第一個副本 Peer,咱們採用這樣的建立方式,初始化的時候,咱們會將它的 Log Term 和 Index 設置爲 5。

  2. 被動建立,當一個 Region 添加一個副本 Peer 的時候,當這個 ConfChange 命令被 applied 以後, Leader 會給這個新增 Peer 所在的 Store 發送 Message,Store 收到這個 Message 以後,發現並無相應的 Peer 存在,而且肯定這個 Message 是合法的,就會建立一個對應的 Peer,但此時這個 Peer 是一個未初始化的 Peer,不知道所在的 Region 任何的信息,咱們使用 0 來初始化它的 Log Term 和 Index。Leader 就能知道這個 Follower 並無數據(0 到 5 之間存在 Log 缺口),Leader 就會給這個 Follower 直接發送 snapshot。

  3. Split 建立,當一個 Region 分裂成兩個 Region,其中一個 Region 會繼承分裂以前 Region 的元信息,只是會將本身的 Range 範圍修改。而另外一個 Region 相關的元信息,則會新建,新建的這個 Region 對應的 Peer,初始的 Log Term 和 Index 也是 5,由於這時候 Leader 和 Follower 都有最新的數據,不須要 snapshot。(注意:實際 Split 的狀況很是的複雜,有可能也會出現發送 snapshot 的狀況,但這裏不作過多說明)。

而後就是須要注意 snapshot 的處理。不管 generate 仍是 apply snapshot,都是一件比較費時的操做,爲了避免讓 snapshot 的處理卡主整個 Raft 線程,PeerStore 都是會先只同步更新 snapshot 相關的元信息,這樣就不用阻礙後續的 Raft 流程,而後會在另外一個線程異步的進行 snapshot 的操做。PeerStorage 會維護一個 snapshot 的 state,以下:

pub enum SnapState {
    Relax,
    Generating(Receiver<Snapshot>),
    Applying(Arc<AtomicUsize>),
    ApplyAborted,
}

這裏注意 Generating 是一個 channel Receiver,當異步 snapshot 生成好以後,就會給這個 channel 發送消息,這樣下一次 Raft 檢查的時候,就能直接從這個 channel 獲得 snapshot 了。Applying 是一個共享的原子整數,這樣就能多線程去判斷當前 applying 的狀態,包括:

pub const JOB_STATUS_PENDING: usize = 0;
pub const JOB_STATUS_RUNNING: usize = 1;
pub const JOB_STATUS_CANCELLING: usize = 2;
pub const JOB_STATUS_CANCELLED: usize = 3;
pub const JOB_STATUS_FINISHED: usize = 4;
pub const JOB_STATUS_FAILED: usize = 5;

譬如,若是狀態是 JOB_STATUS_RUNNING,那麼代表當前正在進行 applying snapshot 的操做。現階段,咱們是不容許 FAILED 的,也就是若是 apply snapshot 失敗,咱們會 panic。

Peer

Peer 封裝了 Raft RawNode,咱們對 Raft 的 Propose,ready 的處理都是在 Peer 裏面完成的。

首先關注 propose 函數,Peer 的 propose 是外部 Client command 的入口。Peer 會判斷這個 command 的類型:

  • 若是是隻讀操做,而且 Leader 仍然是在 lease 有效期內,Leader 就能直接提供 local read,不須要走 Raft 流程。

  • 若是是 Transfer Leader 操做,Peer 首先會判斷本身仍是不是 Leader,同時判斷須要變成新 Leader 的 Follower 是否是有足夠新的 Log,若是條件都知足,Peer 就會調用 RawNode 的 transfer_leader 命令。

  • 若是是 Change Peer 操做,Peer 就會調用 RawNode propose_conf_change。

  • 剩下的,Peer 會直接調用 RawNode 的 propose。

在 propose 以前,Peer 也會將這個 command 對應的 callback 存到 PendingCmd 裏面,當對應的 log 被 applied 以後,會經過 command 裏面惟一的 uuid 找到對應的 callback 調用,並給 Client 返回相應的結果。

另外一個須要關注的就是 Peer 的 handle_raft_ready 系列函數,在以前 Raft 章節裏面介紹過,當一個 RawNode ready 以後,咱們須要對 ready 裏面的數據作一系列處理,包括將 entries 寫入 Storage,發送 messages,apply committed_entries 以及 advance 等。這些全都在 Peer 的 handle_raft_ready 系列函數裏面完成。

對於 committed_entries 的處理,Peer 會解析實際的 command,調用對應的處理流程,執行對應的函數,譬如 exec_admin_cmd 就執行 ConfChange,Split 等 admin 命令,而 exec_write_cmd 則執行一般的對 State Machine 的數據操做命令。爲了保證數據的一致性,Peer 在 execute 的時候,都只會將修改的數據保存到 RocksDB 的 WriteBatch 裏面,而後在最後原子的寫入到 RocksDB,寫入成功以後,才修改對應的內存元信息。若是寫入失敗,咱們會直接 panic,保證數據的完整性。

在 Peer 處理 ready 的時候,咱們還會傳入一個 Transport 對象,用來讓 Peer 發送 message,Transport 的 trait 定義以下:

pub trait Transport: Send + Clone {
    fn send(&self, msg: RaftMessage) -> Result<()>;
}

它就只有一個函數 send,TiKV 實現的 Transport 會將須要 send 的 message 發到 Server 層,由 Server 層發給其餘的節點。

Multi Raft

Peer 只是單個 Region 的副本,由於 TiKV 是支持 Multi Raft,因此對於一個 Store 來講,咱們須要管理多個 Region 的副本,這些都是在 Store 類裏面統一進行管理的。

Store 會保存全部的 Peers 信息,使用:region_peers: HashMap<u64, Peer>

region_peers 的 key 就是 Region ID,而 Peer 則是該 Region 在該 Store 上面的副本 Peer。

Store 使用 mio 驅動整個流程(後續咱們會使用 tokio-core 來簡化異步邏輯處理)。

咱們在 mio 裏面註冊一個 base Raft Tick,每隔 100ms,調用一次,Store 會遍歷全部的 Peer,一次調用對應的 RawNode tick 函數,驅動 Raft。

Store 經過 mio 的 notify 機制,接受外面 Client 的請求處理,以及其餘 Store 發過來的 Raft message。 譬如收到 Msg::RaftCmd 消息以後,Store 就會調用 propose_raft_command 來處理,而收到 Msg::RaftMessage 消息以後,Store 就會調用 on_raft_message 來處理。

在每次 EventLoop 循環的最後,也就是 mio 的 tick 回調裏面,Store 會進行 on_raft_ready 的處理:

  1. Store 會遍歷全部的 ready Peers,調用 handle_raft_ready_append,咱們會使用一個 WriteBatch 來處理全部的 ready append 數據,同時保存相關的結果。

  2. 若是 WriteBatch 成功,會依次調用 post_raft_ready_append,這裏主要用來處理Follower 的消息發送(Leader 的消息已經在 handle_raft_ready_append 裏面完成)。

  3. 而後,Store 會依次調用 handle_raft_ready_apply,apply 相關 committed entries,而後調用 on_ready_result 處理最後的結果。

Server

Server 層就是 TiKV 的網絡層,現階段,TiKV 使用 mio 來實現整個網絡的處理,而網絡協議則是使用自定義的,以下:

message = header + body 
header:  | 0xdaf4(2 bytes magic value) | 0x01(version 2 bytes) | msg_len(4 bytes) | msg_id(8 bytes) |

任何一個 message,咱們都使用 header + body 的方式,body 就是實際的 message 數據,使用 protobuf 編碼,而 header,首先就是兩個字節的 magic value,0xdaf4,而後就是版本號,再就是 message 的整個長度,以及 message 的惟一 ID。

對於 mio,在 Linux 下面就是封裝的 epoll,因此熟悉 epoll 的用戶應該能很是方便的使用 mio 進行網絡開發,簡單流程以下:

  • bind 一個端口,生成一個 TcpListener 對象,而且 register 到 mio。

  • 處理 TcpListener on_readable 的回調,調用 accept 函數獲得生成的 socket TcpStream,register 到 mio,咱們後續就用這個 TcpStream 跟客戶端進行交互。

  • TcpStream 處理 on_readable 或者 on_writable 的回調。

同時,Server 經過 mio 的 notify 來接受外面發過來的消息,譬如 TiKV 實現的 Transport,就是 Peer 在調用 send 的時候,將這個 message 直接經過 channel 發給 Server,而後在 notify 裏面處理,找到對應的 Store connection,再發送給遠端的 Store 的。

對於 snapshot 的發送,Server 會單獨新開一個鏈接,直接使用一個線程同步發送,這樣代碼邏輯就會簡單不少,不須要處理過多的異步 IO 邏輯。而對於接收端來講,在收到一個 message 的時候,會首先看這個 message 的類型,若是發現是 snapshot 的,則會進入接受 snapshot 的流程,會將收到的數據直接發給 snapshot 相關的線程,寫到對應的 snapshot 文件裏面。若是是其餘的 message,也會直接 dispatch 到對應的處理邏輯處理,能夠參考 Server 的 on_conn_msg 函數。

由於 Server 就是對網絡 IO 的處理,邏輯比較簡單,這裏就不過多說明,可是,鑑於現階段 TiKV 使用的是自定義的網絡協議,並不利於跟外部其餘客戶端的對接,而且也沒有 pipeline,stream 等優秀特性的 支持,因此後續咱們會換成 gRPC。

總結

這裏,咱們解釋了 TiKV 核心的 Raft 庫,Multi Raft。在後續的章節,咱們會介紹 Transaction,Coprocessor 以及 PD 是如何對整個集羣進行變動的。(第二部分完結)

相關文章
相關標籤/搜索