SOFAStack Scalable Open Financial Architecture Stack 是螞蟻金服自主研發的金融級分佈式架構,包含了構建金融級雲原生架構所需的各個組件,是在金融場景裏錘鍊出來的最佳實踐。java
SOFAJRaft 是一個基於 Raft 一致性算法的生產級高性能 Java 實現,支持 MULTI-RAFT-GROUP,適用於高負載低延遲的場景。git
本文爲《剖析 | SOFAJRaft 實現原理》第五篇,本篇做者袖釦,來自螞蟻金服。算法
《剖析 | SOFAJRaft 實現原理》系列由 SOFA 團隊和源碼愛好者們出品,項目代號:<SOFA:JRaftLab/>
,文章尾部有參與方式,歡迎一樣對源碼熱情的你加入。數據庫
SOFAJRaft :https://gitee.com/sofastack/sofa-jraft網絡
RheaKV 是首個以 JRaft 爲基礎實現的一個原生支持分佈式的嵌入式鍵值(key、value)數據庫,如今本文將從 RheaKV 是如何利用 MULTI-RAFT-GROUP 的方式實現 RheaKV 的高性能及容量的可擴展性的,從而進行全面的源碼、實例剖析。架構
經過對 Raft 協議的描述咱們知道:用戶在對一組 Raft 系統進行更新操做時必須先通過 Leader,再由 Leader 同步給大多數 Follower。而在實際運用中,一組 Raft 的 Leader 每每存在單點的流量瓶頸,流量高便沒法承載,同時每一個節點都是全量數據,因此會受到節點的存儲限制而致使容量瓶頸,沒法擴展。併發
MULTI-RAFT-GROUP 正是經過把整個數據從橫向作切分,分爲多個 Region 來解決磁盤瓶頸,而後每一個 Region 都對應有獨立的 Leader 和一個或多個 Follower 的 Raft 組進行橫向擴展,此時系統便有多個寫入的節點,從而分擔寫入壓力,圖以下:運維
此時磁盤及 I/O 瓶頸解決了,那多個 Raft Group 是如何協做的呢,咱們接着往下看。異步
RheaKV 主要由 3 個角色組成:PlacementDriver(如下成爲 PD) 、Store、Region。因爲 RheaKV 支持多組 Raft,因此比單組場景多出一個 PD 角色,用來調度以及收集每一個 Store 及 Region 的基礎信息。分佈式
PD 負責整個集羣的管理調度、Region ID 生成等。此組件非必須的,若是不使用 PD,設置 PlacementDriverOptions 的 fake 屬性爲 true 便可。PD 通常經過 Region 的心跳返回信息進行對 Region 調度,Region 處理完後,PD 則會在下一個心跳返回中收到 Region 的變動信息來更新路由及狀態表。
一般一個 Node 負責一個 Store,Store 能夠被看做是 Region 的容器,裏面存儲着多個分片數據。Store 會向 PD 主動上報 StoreHeartbeatRequest 心跳,心跳交由 PD 的 handleStoreHeartbeat 處理,裏面包含該 Store 的基本信息,好比,包含多少 Region,有哪些 Region 的 Leader 在該 Store 等。
Region 是數據存儲、搬遷的最小單元,對應的是 Store 裏某個實際的數據區間。每一個 Region 會有多個副本,每一個副本存儲在不一樣的 Store,一塊兒組成一個Raft Group。Region 中的 Leader 會向 PD 主動上報 RegionHeartbeatRequest 心跳,交由 PD 的 handleRegionHeartbeat 處理,而 PD 是經過 Region 的 Epoch 感知 Region 是否有變化。
Muti-Raft-Group 的多 Region 是經過 RegionRouteTable 路由表組件進行管理的,可經過 addOrUpdateRegion、removeRegion 進行添加、更新、移除 Region,也包括 Region 的拆分。目前暫時還未實現 Region 的聚合,後面會考慮實現。
「讓每組 Raft 負責一部分數據。」
數據分區或者分片算法一般就是 Range 和 Hash,RheaKV 是經過 Range 進行數據分片的,分紅一個個 Raft Group,也稱爲 Region。這裏爲什麼要設計成 Range 呢?緣由是 Range 切分是按照對 Key 進行字節排序後再作每段每段切分,像相似 scan 等操做對相近 key 的查詢會盡量集中在某個 Region,這個是 Hash 沒法支持的,就算遇到單個 Region 的拆分也會更好處理一些,只用修改部分元數據,不會涉及到大範圍的數據挪動。
固然 Range 也會有一個問題那就是,可能會存在某個 Region 被頻繁操做成爲熱點 Region。不過也有一些優化方案,好比 PD 調度熱點 Region 到更空閒的機器上,或者提供 Follower 分擔讀的壓力等。
Region 和 RegionEpoch 結構以下:
class Region { long id; // region id // Region key range [startKey, endKey) byte[] startKey; // inclusive byte[] endKey; // exclusive RegionEpoch regionEpoch; // region term List<Peer> peers; // all peers in the region } class RegionEpoch { // Conf change version, auto increment when add or remove peer long confVer; // Region version, auto increment when split or merge long version; } class Peer { long id; long storeId; Endpoint endpoint; }
Region.id:爲 Region 的惟一標識,經過 PD 全局惟一分配。
Region.startKey、Region.endKey:這個表示的是 Region 的 key 的區間範圍 [startKey, endKey),特別值得注意的是針對最開始 Region 的 startKey,和最後 Region 的 endKey 都爲空。
Region.regionEpoch:當 Region 添加和刪除 Peer,或者 split 等,此時 regionEpoch 就會發生變化,其中 confVer 會在配置修改後遞增,version 則是每次有 split 、merge(還未實現)等操做時遞增。
Region.peers:peers 則指的是當前 Region 所包含的節點信息,Peer.id 也是由 PD 全局分配的,Peer.storeId 表明的是 Peer 當前所處的 Store。
因爲數據被拆分到不一樣 Region 上,因此在進行多 key 的讀、寫、更新操做時須要操做多個 Region,這時操做前咱們須要獲得具體的 Region,而後再單獨對不一樣 Region 進行操做。咱們以在多 Region上 scan 操做爲例, 目標是返回某個 key 區間的全部數據:
例如:com.alipay.sofa.jraft.rhea.client.DefaultRheaKVStore#scan(byte[], byte[], boolean, boolean)
咱們很容易看到,在調用 scan 首先讓 PD Client 經過 RegionRouteTable.findRegionsByKeyRange 檢索 startKey、endKey 所覆蓋的 Region,最後返回的可能爲多個 Region,具體 Region 覆蓋檢索方法以下:
檢索相關變量定義以下:
咱們能夠看到整個 RheaKV 的 range 路由表是經過 TreeMap 的進行存儲的,正呼應咱們前面講過全部的 key 是經過對應字節進行排序存儲。對應的 Value 爲該 Region 的 RegionId,隨後咱們經過 Region 路由 regionTable 查出便可。
如今咱們獲得 scan 覆蓋到的全部 Region:List<Region>
在循環查詢中咱們看到有一個「retryCause -> {}」的 Lambda 表達式很容易看出這裏是加持異常重試處理,後面咱們會講到,接下來會經過 internalRegionScan 查詢每一個 Region 的結果。具體源碼以下:
這裏也一樣有一個重試處理,能夠看到代碼中根據當前是否爲 Region 節點來決定是本機查詢仍是經過RPC進行查詢,若是是本機則調用 rawKVStore.scan() 進行本地直接查詢,反之經過 rheaKVRpcService 進行 RPC 遠程節點查詢。最後每一個 Region 查詢都返回爲一個 future,經過 FutureHelper.joinList 工具類 CompletableFuture.allOf 異步併發返回結果 List<KVEntry>
。
例如:com.alipay.sofa.jraft.rhea.client.DefaultRheaKVStore#put(java.lang.String, byte[])
咱們能夠發現 put 基礎方法是支持 batch 的,便可成批提交。如未使用 batch 即直接提交,具體邏輯以下:
經過 pdClinet 查詢對應存儲的 Region,而且經過 regionId 拿到 RegionEngine,再經過對應存儲引擎 KVStore 進行 put,整個過程一樣支持重試機制。咱們再回過去看看 batch 的實現,很容易發現利用到了 Disruptor 的 RingBuffer 環形緩衝區,無鎖隊列爲性能提供了保障,代碼現場以下:
前面咱們有講過,PD 會在 Region 的 heartBeat 裏面對 Region 進行調度,當某個 Region 裏的 keys 數量超過預設閥值,咱們便可對該 Region 進行拆分,Store 的狀態機 KVStoreStateMachine 即收到拆分消息進行拆分處理。具體拆分源碼以下:
KVStoreStateMachine.doSplit 源碼以下:
StoreEngine.doSplit 源碼以下:
咱們能夠輕易的看到從原始 parentRegion 切分紅 region 和 pRegion,並重設了 startKey、endKey 和版本號,並添加到 RegionEngineTable 註冊到 RegionKVService,同時調用 pdClient.getRegionRouteTable().splitRegion() 方法進行更新存儲在 PD 的 Region 路由表。
既然數據過多須要進行拆分,那 Region 進行合併那就確定是 2 個或者多個連續的 Region 數據量明顯小於絕大多數 Region 容量則咱們能夠對其進行合併。這一塊後面會考慮實現。
經過上面咱們知道,一個 Store 即爲一個節點,裏面包含着一個或者多個 RegionEngine,一個 StoreEngine 一般經過 PlacementDriverClient 對 PD 進行調用,同時擁有 StoreEngineOptions 配置項,裏面配置着存儲引擎和節點相關配置。
在這個過程當中裏面的 StoreEngine 會記錄着 regionKVServiceTable、regionEngineTable,它們分別掌握着具體每一個不一樣的 Region 存儲的操做功能,對應的 key 即爲 RegionId。
每一個在 Store 裏的 Region 副本中,RegionEngine 則是一個執行單元。它裏面記錄着關聯着的 StoreEngine 信息以及對應的 Region 信息。因爲它也是一個選舉節點,因此也包含着對應狀態機 KVStoreStateMachine,以及對應的 RaftGroupService,並啓動裏面的 RpcServer 進行選舉同步。
這個裏面有個transferLeadershipTo方法,這個可被調用用於平衡當前節點分區的Leader,避免壓力重疊。
DefaultRegionKVService 是 RegionKVService 的默認實現類,主要處理對 Region 的具體操做。
須要特別講到的是,在具體的 RheaKV 操做時,FailoverClosure 擔任着比較重要的角色,也給整個系統增長了必定的容錯性。假如在一次 scan 操做中,若是跨 Store 須要多節點 scan 數據的時候,任何網絡抖動都會形成數據不完整或者失敗狀況,因此容許必定次數的重試有利於提升系統的可用性,可是重試次數不宜太高,若是出現網絡堵塞,屢次 timeout 級別失敗會給系統帶來額外的壓力。這裏只須要在 DefaultRheaKVStore 中,進行配置 failoverRetries 設置次數便可。
PlacementDriverClient 接口主要由 AbstractPlacementDriverClient 實現,而後 FakePlacementDriverClient、RemotePlacementDriverClient 爲主要功能。FakePlacementDriverClient 是當系統不須要 PD 的時候進行 PD 對象的模擬,這裏主要講到 RemotePlacementDriverClient。
因爲不少傳統存儲中間件並不原生支持分佈式,因此一直少有體感,Raft 協議是一套比較比較好理解的共識協議,SOFAJRaft 通俗易懂是一個很是好的代碼和工程範例,同時 RheaKV 也是一套很是輕量化支持多存儲結構可分片的嵌入式數據庫。寫一篇代碼分析文章也是一個學習和進步的過程,由此咱們也能夠窺探到了一些數據庫的基礎實現,祝願社區能在 SOFAJRaft / RheaKV 基礎上構建更加靈活和自治理的系統和應用。