SOFAJRaft-RheaKV 是如何使用 Raft 的 | SOFAJRaft 實現原理

SOFAStacknode

Scalable Open Financial Architecture Stackgit

是螞蟻金服自主研發的金融級分佈式架構,包含了構建金融級雲原生架構所需的各個組件,是在金融場景裏錘鍊出來的最佳實踐。github

SOFAJRaft-2.png

本文爲《剖析 | SOFAJRaft 實現原理》第二篇,本篇做者米麒麟,來自陸金所。《剖析 | SOFAJRaft 實現原理》系列由 SOFA 團隊和源碼愛好者們出品,項目代號:<SOFA:JRaftLab/>,目前領取已經完成,感謝你們的參與。算法

SOFAJRaft 是一個基於 Raft 一致性算法的生產級高性能 Java 實現,支持 MULTI-RAFT-GROUP,適用於高負載低延遲的場景。數據庫

SOFAJRaft :github.com/sofastack/s…緩存

前言

SOFAJRaft-RheaKV 是基於 SOFAJRaft 和 RocksDB 實現的嵌入式、分佈式、高可用、強一致的 KV 存儲類庫,SOFAJRaft 是基於 Raft 一致性算法的生產級高性能 Java 實現,支持 Multi-Raft-Group。SOFAJRaft-RheaKV 集羣主要包括三個核心組件:PD,Store 和 Region。本文將圍繞 SOFAJRaft-RheaKV 架構設計,存儲概覽,核心模塊,使用場景以及基於 Raft 實現等方面剖析 SOFAJRaft-RheaKV 基於 SOFAJRaft 實現原理,闡述如何使用 Raft 協議支持 KV 存儲類庫功能特性:安全

  • SOFAJRaft-RheaKV 基礎架構如何設計?核心組件負責哪些功能?模塊內部處理流程是怎樣?
  • 基於 SOFAJRaft 如何使用 Raft 實現 SOFAJRaft-RheaKV 強一致性和自驅動等特性?

image.png

SOFAJRaft-RheaKV 概覽

SOFAJRaft-RheaKV 是一個輕量級的分佈式的嵌入式的 KV 存儲 Library, RheaKV 包含在 SOFAJRaft 項目裏,是 SOFAJRaft 的子模塊。SOFAJRaft-RheaKV 定位是嵌入式 jar 包方式嵌入到應用中,涵蓋如下功能特性:bash

  • 強一致性,基於 Multi-Raft 分佈式一致性協議保證數據可靠性和一致性;
  • 自驅動,自診斷,自優化,自決策,自恢復;
  • 可監控基於節點自動上報到 PD 的元信息和狀態信息;
  • 基本 API get/put/delete 和跨分區 scan/batch put,distributed lock 等。

架構設計

SOFAJRaft-RheaKV 存儲類庫主要包括 PD,Store 和 Region 三個核心組件,支持輕量級的狀態/元信息存儲以及集羣同步,分佈式鎖服務使用場景:數據結構

  • PD 是全局的中心總控節點,負責整個集羣的調度管理,維護 RegionRouteTable 路由表。一個 PDServer 管理多個集羣,集羣之間基於 clusterId 隔離;PD Server 須要單獨部署,不少場景其實並不須要自管理,RheaKV 也支持不啓用 PD,不須要自管理的集羣可不啓用 PD,設置 PlacementDriverOptions 的 fake選項爲 true 便可。
  • Store 是集羣中的一個物理存儲節點,一個 Store 包含一個或多個 Region。
  • Region 是最小的 KV 數據單元,可理解爲一個數據分區或者分片,每一個 Region 都有一個左閉右開的區間 [startKey, endKey),可以根據請求流量/負載/數據量大小等指標自動分裂以及自動副本搬遷。Region 有多個副本 Replication 構建 Raft Groups 存儲在不一樣的 Store 節點,經過 Raft 協議日誌複製功能數據同步到同 Group 的所有節點。

image.png

存儲設計

SOFAJRaft-RheaKV 存儲層爲可插拔設計,實現 RawKVStore 存儲接口,目前 StoreEngine 存儲引擎支持 MemoryDB 和 RocksDB 兩種實現:架構

  • MemoryRawKVStore:MemoryDB 基於 ConcurrentSkipListMap 實現,有更好的性能,可是單機存儲容量受內存限制;
  • RocksRawKVStore:RocksDB 在存儲容量上只受磁盤限制,適合更大數據量的場景。

SOFAJRaft-RheaKV 存儲引擎基於 MemoryDB 和 RocksDB 實現 KV 存儲入口:

com.alipay.sofa.jraft.rhea.storage.RawKVStore
com.alipay.sofa.jraft.rhea.storage.MemoryRawKVStore
com.alipay.sofa.jraft.rhea.storage.RocksRawKVStore複製代碼

SOFAJRaft-RheaKV 數據強一致性依靠 SOFAJRaft 同步數據到其餘副本 Replication, 每一個數據變動都會落地爲一條 Raft 日誌, 經過 Raft 協議日誌複製功能將數據安全可靠地同步到同 Raft Group 的所有節點裏。

核心設計

SOFAJRaft-RheaKV 核心模塊包括 KV 模塊[RheaKVStore 基於 RegionRouteTable 路由表使用 RaftRawKVStore 存儲 KeyValue],PD 模塊[PlacementDriverServer 基於 StoreHeartbeat/RegionHeartbeat 心跳平衡節點分區 Leader 以及分裂]。

KV 模塊內部處理

image.png

  • RheaKVStore:最上層 User API,默認實現爲 DefaultRheaKVStore, RheaKVStore 爲純異步實現,因此一般阻塞調用致使的客戶端出現瓶頸,理論上不會在 RheaKV 上遭遇,DefaultRheaKVStore 實現包括請求路由、Request 分裂、Response 聚合以及失敗重試等功能。
  • PlacementDriverClient:非必須,做爲與 PlacementDriver Server 集羣溝通的客戶端,經過它獲取集羣完整信息包括但不只限於"請求路由表",對於無 PD 場景, RheaKV 提供 Fake PD Client。
  • RegionRouteTable:分片邏輯基於 RegionRouteTable 路由表結構,最適合的數據結構即是跳錶或者二叉樹(最接近匹配項查詢)。做爲本地路由表緩存組件,RegionRouteTable 根據 KV 請求的具體失敗緣由來決策是否從 PD Server 集羣刷新數據,而且提供對單個 Key、多個 Key 列表以及 Key Range 進行計算返回對應的分區 ID。選擇 Region 的 StartKey 做爲 RegionRouteTable 的 Key ,主要取決於 Region Split 的方式,父 Region 分裂成兩個子 Region 致使其中一個子 Region 的 StartKey 爲 SplitKey。
  • LoadBalancer:在提供 Follower 線性一致讀的配置下有效,目前僅支持 RR 策略。
  • RheaKVRpcService:針對 KV 存儲服務的 RPC Client 客戶端封裝,實現 Failover 邏輯。
  • RegionKVService:KV Server 服務端的請求處理服務,一個 StoreEngine 中包含不少 RegionKVService, 每一個 RegionKVService 對應一個 Region,只處理自己 Region 範疇內的請求。
  • MetricsRawKVStore:攔截請求作指標度量。
  • RaftRawKVStore:RheaKV 的 Raft 入口,從這裏開始 Raft 流程。
  • KVStoreStateMachine:實現 Raft 狀態機。
  • RocksRawKVStore:原始的 RocksDB API 封裝, 目前 RheaKV 也支持可插拔的 MemoryDB 存儲實現。

PD 模塊內部處理

image.png

PD 模塊主要參考 TIKV 的設計理念,目前只實現自動平衡全部節點的分區 Leader 以及自動分裂。

  • PlacementDriverClient -> MetadataClient:MetadataClient 負責從 PD 獲取集羣元信息以及註冊元信息。
  • StoreEngine -> HeartbeatSender:
    • HeartbeatSender 負責發送當前存儲節點的心跳,心跳中包含一些狀態信息,心跳一共分爲兩類:StoreHeartbeat 和 RegionHeartbeat;
    • PD 不斷接受 RheaKV 集羣這兩類心跳消息,PD 在對 Region Leader 的心跳回復裏面包含具體調度指令,再以這些信息做爲決策依據。除此以外,PD 還應該能夠經過管理接口接收額外的運維指令,用來人爲執行更準確的決策。
    • 兩類心跳包含的狀態信息詳細內容以下:
      • StoreHeartbeat 包括存儲節點 Store 容量,Region 數量,Snapshot 數量以及寫入/讀取數據量等 StoreStats 統計明細;
      • RegionHeartbeat 包括 Region 的 Leader 位置,掉線 Peer 列表,暫時不 Work 的 Follower 以及寫入/讀取數據量/Key 的個數等 RegionStats 統計明細。
  • Pipeline:是針對心跳上報 Stats 的計算以及存儲處理流水線,處理單元 Handler 可插拔很是方便擴展。
  • MetadataStore:負責集羣元信息存儲以及查詢,存儲方面基於內嵌的 RheaKV。

SOFAJRaft-RheaKV 剖析

RheaKV 是基於 SOFAJRaft 實現的嵌入式、分佈式、高可用、強一致的 KV 存儲類庫,TiKV 是一個分佈式的 KV 系統,採用 Raft 協議保證數據的強一致性,同時使用 MVCC + 2PC 方式實現分佈式事務的支持,二者如何基於 Raft協議實現 KV 存儲?

RheaKV 基於 JRaft 實現

RaftRawKVStore 是 RheaKV 基於 Raft 複製狀態機 KVStoreStateMachine 的 RawKVStore 接口 KV 存儲實現,調用 applyOperation(kvOperation,kvStoreClosure) 方法根據讀寫請求申請指定 KVOperation 操做,申請鍵值操做處理邏輯:

  1. 檢查當前節點的狀態是否爲 STATE_LEADER,若是當前節點不是 Leader 直接失敗通知 Done Closure,通知失敗(NOT_LEADER)後客戶端刷新 Leader 地址而且重試。Raft 分組 Leader 節點調用 Node#apply(task) 提交申請基於鍵值操做的任務到相應 Raft Group,向 Raft Group 組成的複製狀態機集羣提交新任務應用到業務狀態機,Raft Log 造成 Majority 後 StateMachine#onApply(iterator) 接口應用到狀態機的時候會被獲取調用。Node 節點構建申請任務日誌封裝成事件發佈回調,發佈節點服務事件到隊列 applyQueue,依靠 Disruptor 的 MPSC 模型批量消費,對總體吞吐性能有着極大的提高。日誌服務事件處理器以單線程 Batch 攢批的消費方式批量運行鍵值存儲申請任務;
  2. Raft 副本節點 Node 執行申請任務檢查當前狀態是否爲 STATE_LEADER,必須保證 Leader 節點操做申請任務。循環遍歷節點服務事件判斷任務的預估任期是否等於當前節點任期,Leader 沒有發生變動的階段內提交的日誌擁有相同的 Term 編號,節點 Node 任期知足預期則 Raft 協議投票箱 BallotBox 調用 appendPendingTask(conf, oldConf, done) 日誌複製以前保存應用上下文,即基於當前節點配置以及原始配置建立選票 Ballot 添加到選票雙向隊列 pendingMetaQueue;
  3. 日誌管理器 LogManager 調用底層日誌存儲 LogStorage#appendEntries(entries) 批量提交申請任務日誌寫入 RocksDB,用於 Leader 向 Follower 複製日誌包括心跳存活檢查等。日誌管理器發佈 Leader 穩定狀態回調 LeaderStableClosure 事件到隊列 diskQueue 即 Disruptor 的 Ring Buffer,穩定狀態回調事件處理器經過MPSC Queue 模型攢批消費觸發提交節點選票;
  4. 投票箱 BallotBox 調用 commitAt(firstLogIndex, lastLogIndex, peerId) 方法提交當前 PeerId 節點選票到 Raft Group,更新日誌索引在[first_log_index, last_log_index]範疇。經過 Node#apply(task) 提交的申請任務最終將會複製應用到全部 Raft 節點上的狀態機,RheaKV 狀態機經過繼承 StateMachineAdapter 狀態機適配器的 KVStoreStateMachine 表示;
  5. Raft 狀態機 KVStoreStateMachine 調用 onApply(iterator) 方法按照提交順序應用任務列表到狀態機。當 onApply(iterator) 方法返回時認爲此批申請任務都已經成功應用到狀態機上,假如沒有徹底應用(好比錯誤、異常)將被當作 Critical 級別錯誤報告給狀態機的 onError(raftException) 方法,錯誤類型爲 ERROR_TYPE_STATE_MACHINE。Critical 錯誤致使終止狀態機,爲何這裏須要終止狀態機,非業務邏輯異常的話(好比磁盤滿了等 IO 異常),表明可能某個節點成功應用到狀態機,可是當前節點卻應用狀態機失敗,是否是表明出現不一致的錯誤? 解決辦法只能終止狀態機,須要手工介入重啓,重啓後依靠 Snapshot + Raft log 恢復狀態機保證狀態機數據的正確性。提交的任務在 SOFAJRaft 內部用來累積批量提交,應用到狀態機的是 Task迭代器,經過 com.alipay.sofa.jraft.Iterator 接口表示;
  6. KVStoreStateMachine 狀態機迭代狀態輸出列表積攢鍵值狀態列表批量申請 RocksRawKVStore 調用 batch***(kvStates) 方法運行相應鍵值操做存儲到 RocksDB,爲啥 Batch 批量存儲呢? 刷盤經常使用伎倆,攢批刷盤優於屢次刷盤。經過 RecycleUtil 回收器工具回收狀態輸出列表,其中KVStateOutputList 是 Pooled ArrayList 實現,RecycleUtil 用於釋放列表對象池化複用避免每次建立 List。

RheaKV 基於狀態機 KVStoreStateMachine 的 RaftRawKVStore 存儲 Raft 實現入口:

com.alipay.sofa.jraft.rhea.storage.RaftRawKVStore複製代碼

RheaKV 運行在每一個 Raft 節點上面的狀態機 KVStoreStateMachine 實現入口:

com.alipay.sofa.jraft.rhea.storage.KVStoreStateMachine 複製代碼

RheaKV 是一個要保證線性一致性的分佈式 KV 存儲引擎,所謂線性一致性,一個簡單的例子是在 T1 的時間寫入一個值,那麼在 T1 以後讀必定能讀到這個值,不可能讀到 T1 以前的值。由於 Raft 協議是爲了實現分佈式環境下面線性一致性的算法,因此經過 Raft 很是方便的實現線性 Read,即將任何的讀請求走一次 Raft Log,等 Log 日誌提交以後在 apply 的時候從狀態機裏面讀取值,必定可以保證此讀取到的值是知足線性要求的。由於每次 Read 都須要走 Raft 流程,因此性能是很是的低效的,SOFAJRaft 實現 Raft 論文提到 ReadIndex 和 Lease Read 優化,提供基於 Raft 協議的 ReadIndex 算法的更高效率的線性一致讀實現,ReadIndex 省去磁盤的開銷,結合 SOFAJRaft 的 Batch + Pipeline Ack + 全異步機制大幅度提高吞吐。RaftRawKVStore 接收 get/multiGet/scan/getSequence 讀請求都使用 Node#readIndex(requestContext, readIndexClosure) 發起一次線性一致讀請求,當可以安全讀取的時候傳入的 ReadIndexClosure 將被調用,正常狀況從狀態機中讀取數據返回給客戶端,readIndex 讀取失敗嘗試應用鍵值讀操做申請任務於 Leader 節點的狀態機 KVStoreStateMachine,SOFAJRaft 保證讀取的線性一致性。線性一致讀在任何集羣內的節點發起,並不須要強制要求放到 Leader 節點上面,將請求散列到集羣內的全部節點上,下降 Leader 節點的讀取壓力。RaftRawKVStore 的 get 讀操做發起一次線性一致讀請求的調用:

// KV 存儲實現線性一致讀
public void get(final byte[] key, final boolean readOnlySafe, final KVStoreClosure closure) {
        if (!readOnlySafe) {
            this.kvStore.get(key, false, closure);
            return;
        }
        // 調用 readIndex 方法,等待回調執行
        this.node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {

            @Override
            public void run(final Status status, final long index, final byte[] reqCtx) {
                if (status.isOk()) {
                    // ReadIndexClosure 回調成功,從 RawKVStore 調用 get 方法讀取最新數據返回
                    RaftRawKVStore.this.kvStore.get(key, true, closure);
                    return;
                }
                // 特殊狀況譬如發生選舉讀請求失敗,嘗試申請 Leader 節點的狀態機
                RaftRawKVStore.this.readIndexExecutor.execute(() -> {
                    if (isLeader()) {
                        LOG.warn("Fail to [get] with 'ReadIndex': {}, try to applying to the state machine.", status);
                        // If 'read index' read fails, try to applying to the state machine at the leader node
                        applyOperation(KVOperation.createGet(key), closure);
                    } else {
                        LOG.warn("Fail to [get] with 'ReadIndex': {}.", status);
                        // Client will retry to leader node
                        new KVClosureAdapter(closure, null).run(status);
                    }
                });
            }
        });
}複製代碼

TiKV 基於 Raft 實現

TiDB 是 PingCAP 公司設計的開源分佈式 HTAP (Hybrid Transactional and Analytical Processing) 數據庫,TiDB 集羣主要包括三個核心組件:TiDB Server,PD Server 和 TiKV Server。TiKV Server 負責存儲數據,從外部看 TiKV 是一個分佈式的提供事務的 Key-Value 存儲引擎。存儲數據的基本單位是 Region,每一個 Region 負責存儲一個 Key Range(從 StartKey 到 EndKey 的左閉右開區間)的數據,每一個 TiKV 節點負責多個 Region。TiKV 使用 Raft 協議作複製,保持數據的一致性和容災。副本以 Region 爲單位進行管理,不一樣節點上的多個 Region 構成一個 Raft Group,互爲副本。數據在多個 TiKV 之間的負載均衡由 PD 調度,這裏也是以 Region 爲單位進行調度。TiKV 利用 Raft 來作數據複製,每一個數據變動都會落地爲一條 Raft 日誌,經過 Raft 的日誌複製功能,將數據安全可靠地同步到 Group 的多數節點。TiKV 總體架構包括 Placement Driver,Node,Store 以及 Region 組件:

  • Placement Driver : Placement Driver (PD) 負責整個集羣的管理調度。
  • Node : Node 能夠認爲是一個實際的物理機器,每一個 Node 負責一個或者多個 Store。
  • Store : Store 使用 RocksDB 進行實際的數據存儲,一般一個 Store 對應一塊硬盤。
  • Region : Region 是數據移動的最小單元,對應的是 Store 裏面一塊實際的數據區間。每一個 Region 有多個副本(Replica),每一個副本位於不一樣的 Store ,而這些副本組成了一個 Raft group。

image.png

TiKV 使用 Raft 一致性算法來保證數據的安全,默認提供的是三個副本支持,這三個副本造成了一個 Raft Group。當 Client 須要寫入 TiKV 數據的時候,Client 將操做發送給 Raft Leader,在 TiKV 裏面稱作 Propose,Leader 將操做編碼成一個 Entry,寫入到本身的 Raft Log 裏面,稱作 Append。Leader 也會經過 Raft 算法將 Entry 複製到其餘的 Follower 上面,叫作 Replicate。Follower 收到這個 Entry 以後也會一樣進行 Append 操做,順帶告訴 Leader Append 成功。當 Leader 發現此 Entry 已經被大多數節點 Append,認爲此 Entry 已是 Committed 的,而後將 Entry 裏面的操做解碼出來,執行而且應用到狀態機裏面,叫作 Apply。TiKV 提供 Lease Read,對於 Read 請求直接發給 Leader,若是 Leader 肯定自身的 Lease 沒有過時,那麼直接提供 Read 服務不用執行一次 Raft 流程。若是 Leader 發現 Lease 已通過期,就會強制執行一次 Raft 流程進行續租而後再提供 Read 服務。TiKV 是以 Region 爲單位作數據的複製,也就是一個 Region 的數據保存多個副本,將每個副本叫作一個 Replica。Replica 之間是經過 Raft 來保持數據的一致,一個 Region 的多個 Replica 保存在不一樣的節點上構成一個 Raft Group,其中一個 Replica 做爲此 Group 的 Leader,其餘的 Replica 做爲 Follower。全部的讀和寫都是經過 Leader 進行,再由 Leader 複製給 Follower。

總結

本文圍繞 SOFAJRaft-RheaKV 架構存儲,模塊流程以及基於 Raft 實現細節方面闡述 SOFAJRaft-RheaKV 基本原理,剖析 SOFAJRaft-RheaKV 如何使用 JRaft 一致性協議日誌複製功能保證數據的安全和容災,參考 TiKV 基於 Raft 算法實現了分佈式環境數據的強一致性。

系列閱讀

歡迎參加 SOFAMeetup#2 上海站

image.png

SOFA Meetup #2 上海站《使用 SOFAStack 快速構建微服務》期待你的參與❤~

5 月 26 日,本週日,SOFAStack 開源核心成員集體出動。本期咱們將側重於各個落地的實際場景進行架構解析。

分佈式事務 Seata 詳解、與 Spring Cloud 生態的融合案例、使用 SOFAStack 快速構建微服務 Demo 實操、更有最新開源的《讓 AI 像 SQL 同樣簡單 — SQLFlow Demo 》首秀,週日不見不散~

戳連接便可報名:tech.antfin.com/community/a…

公衆號:金融級分佈式架構(Antfin_SOFA)

相關文章
相關標籤/搜索