The previous article covered how RheaKV is initialized. Since RheaKV is primarily a KV store, and its read and write paths are fairly complex, covering both in one article would make it far too long, so this article focuses on how RheaKV stores data.
Let's start with a client example:
public static void main(final String[] args) throws Exception {
    final Client client = new Client();
    client.init();
    //get(client.getRheaKVStore());
    RheaKVStore rheaKVStore = client.getRheaKVStore();
    final byte[] key = writeUtf8("hello");
    final byte[] value = writeUtf8("world");
    rheaKVStore.bPut(key, value);
    client.shutdown();
}
This main method starts our instance and calls rheaKVStore.bPut(key, value) to write data into RheaKV.
public class Client {

    private final RheaKVStore rheaKVStore = new DefaultRheaKVStore();

    public void init() {
        final List<RegionRouteTableOptions> regionRouteTableOptionsList = MultiRegionRouteTableOptionsConfigured
            .newConfigured() //
            .withInitialServerList(-1L /* default id */, Configs.ALL_NODE_ADDRESSES) //
            .config();
        final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured() //
            .withFake(true) //
            .withRegionRouteTableOptionsList(regionRouteTableOptionsList) //
            .config();
        final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured() //
            .withClusterName(Configs.CLUSTER_NAME) //
            .withPlacementDriverOptions(pdOpts) //
            .config();
        System.out.println(opts);
        rheaKVStore.init(opts);
    }

    public void shutdown() {
        this.rheaKVStore.shutdown();
    }

    public RheaKVStore getRheaKVStore() {
        return rheaKVStore;
    }
}

public class Configs {
    public static String ALL_NODE_ADDRESSES = "127.0.0.1:8181,127.0.0.1:8182,127.0.0.1:8183";
    public static String CLUSTER_NAME       = "rhea_example";
}
The Client's init method initializes rheaKVStore much like the server example from the previous article; the differences are that there is no StoreEngineOptions configuration and that an extra regionRouteTableOptionsList instance is configured.
Writing data calls DefaultRheaKVStore's bPut method: DefaultRheaKVStore#bPut
public Boolean bPut(final byte[] key, final byte[] value) {
    return FutureHelper.get(put(key, value), this.futureTimeoutMillis);
}
The actual write happens in the put method; put returns a CompletableFuture that is handed to FutureHelper's get method, and bPut also supplies a timeout that was initialized in the init method, 5 seconds by default.
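The timeout behavior is just a blocking wait on the future. A minimal sketch of what a helper like FutureHelper.get presumably does (this is an assumed simplification for illustration, not the actual SOFAJRaft source):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public final class FutureHelperSketch {

    // Block until the async put finishes, or fail fast after the timeout.
    public static <T> T get(final CompletableFuture<T> future, final long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (final Exception e) {
            // the real implementation wraps this in its own exception type
            throw new RuntimeException("failed to get result within " + timeoutMillis + " ms", e);
        }
    }
}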
Next we step into the put method: DefaultRheaKVStore#put
public CompletableFuture<Boolean> put(final byte[] key, final byte[] value) {
    Requires.requireNonNull(key, "key");
    Requires.requireNonNull(value, "value");
    // whether to try a batched put
    return put(key, value, new CompletableFuture<>(), true);
}
This calls an overloaded put method; the third parameter passes in a fresh (empty) CompletableFuture as the callback, and the fourth indicates that batched storage should be attempted. DefaultRheaKVStore#put
private CompletableFuture<Boolean> put(final byte[] key, final byte[] value,
                                       final CompletableFuture<Boolean> future, final boolean tryBatching) {
    // verify that init has already been called
    checkState();
    if (tryBatching) {
        // the putBatching instance is initialized in the init method
        final PutBatching putBatching = this.putBatching;
        if (putBatching != null && putBatching.apply(new KVEntry(key, value), future)) {
            // since we passed in a fresh future, just return it here
            return future;
        }
    }
    // write the data directly
    internalPut(key, value, future, this.failoverRetries, null);
    return future;
}
checkState verifies that the started flag has been set; if DefaultRheaKVStore's init method has been called, started will be true. The method then uses the putBatching instance that was initialized in init, so let's look at what putBatching does.
When putBatching is created in init, a PutBatchingHandler is passed in as its handler:
this.putBatching = new PutBatching(KVEvent::new, "put_batching", new PutBatchingHandler("put"));
Here is PutBatching's constructor:
public PutBatching(EventFactory<KVEvent> factory, String name, PutBatchingHandler handler) {
    super(factory, batchingOpts.getBufSize(), name, handler);
}
Since PutBatching extends the abstract Batching class, instantiation simply delegates to the parent constructor:
public Batching(EventFactory<T> factory, int bufSize, String name, EventHandler<T> handler) {
    this.name = name;
    this.disruptor = new Disruptor<>(factory, bufSize, new NamedThreadFactory(name, true));
    this.disruptor.handleEventsWith(handler);
    this.disruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(name));
    this.ringBuffer = this.disruptor.start();
}
The Batching constructor initializes a Disruptor instance and registers the PutBatchingHandler we passed in as the Disruptor's event handler, so every entry submitted to PutBatching flows through PutBatchingHandler.
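For readers unfamiliar with the Disruptor, here is a minimal, self-contained sketch of the same producer/handler pattern; the KvEvent class and its fields are made up for illustration and are not RheaKV classes:

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executors;

public class DisruptorDemo {

    // a mutable event carried through the ring buffer
    static class KvEvent {
        byte[] key;
        byte[] value;
    }

    public static void main(String[] args) {
        // handler: invoked sequentially for each published event
        final EventHandler<KvEvent> handler = (event, sequence, endOfBatch) ->
            System.out.println("got key=" + new String(event.key) + ", endOfBatch=" + endOfBatch);

        // the ring buffer size must be a power of 2
        final Disruptor<KvEvent> disruptor =
            new Disruptor<>(KvEvent::new, 1024, Executors.defaultThreadFactory());
        disruptor.handleEventsWith(handler);
        final RingBuffer<KvEvent> ringBuffer = disruptor.start();

        // producer side: claim a slot, fill the event, publish it
        ringBuffer.publishEvent((event, sequence) -> {
            event.key = "hello".getBytes();
            event.value = "world".getBytes();
        });

        disruptor.shutdown();
    }
}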
Now let's see how PutBatchingHandler processes the data: PutBatchingHandler#onEvent
public void onEvent(final KVEvent event, final long sequence, final boolean endOfBatch) throws Exception {
    // 1. add the incoming event to the collection
    this.events.add(event);
    // add the length of the key and value
    this.cachedBytes += event.kvEntry.length();
    final int size = this.events.size();
    // batchSize is 100 and maxWriteBytes is 32768 bytes
    // 2. if this is not the last event and not enough data has accumulated, don't send yet
    if (!endOfBatch && size < batchingOpts.getBatchSize() && this.cachedBytes < batchingOpts.getMaxWriteBytes()) {
        return;
    }
    // 3. if the size is 1, call put again with tryBatching = false so it takes the direct path
    if (size == 1) {
        // reset events and cachedBytes
        reset();
        final KVEntry kv = event.kvEntry;
        try {
            put(kv.getKey(), kv.getValue(), event.future, false);
        } catch (final Throwable t) {
            exceptionally(t, event.future);
        }
    // 4. if the size is greater than 1, copy the data into a list and process it as a batch
    } else {
        // initialize a list with a capacity of size
        final List<KVEntry> entries = Lists.newArrayListWithCapacity(size);
        final CompletableFuture<Boolean>[] futures = new CompletableFuture[size];
        for (int i = 0; i < size; i++) {
            final KVEvent e = this.events.get(i);
            entries.add(e.kvEntry);
            // CompletableFuture is used to build the async pipeline
            futures[i] = e.future;
        }
        // after copying the events into entries, reset
        reset();
        try {
            // whenComplete runs once the put finishes
            put(entries).whenComplete((result, throwable) -> {
                // if no exception was thrown, notify all futures that the call has completed
                if (throwable == null) {
                    for (int i = 0; i < futures.length; i++) {
                        futures[i].complete(result);
                    }
                    return;
                }
                exceptionally(throwable, futures);
            });
        } catch (final Throwable t) {
            exceptionally(t, futures);
        }
    }
}
下面我來說一下PutBatchingHandler#onEvent中的put(entries)這個方法是怎麼處理批量數據的,這個方法會調用到DefaultRheaKVStore的put方法。
DefaultRheaKVStore#put
public CompletableFuture<Boolean> put(final List<KVEntry> entries) {
    // check the state
    checkState();
    Requires.requireNonNull(entries, "entries");
    Requires.requireTrue(!entries.isEmpty(), "entries empty");
    // store the data
    final FutureGroup<Boolean> futureGroup = internalPut(entries, this.failoverRetries, null);
    // join the returned statuses
    return FutureHelper.joinBooleans(futureGroup);
}
This method calls internalPut to perform the write.
DefaultRheaKVStore#internalPut
private FutureGroup<Boolean> internalPut(final List<KVEntry> entries, final int retriesLeft,
                                         final Throwable lastCause) {
    // build the mapping from Region to its list of KVEntries
    final Map<Region, List<KVEntry>> regionMap = this.pdClient
        .findRegionsByKvEntries(entries, ApiExceptionHelper.isInvalidEpoch(lastCause));
    final List<CompletableFuture<Boolean>> futures = Lists.newArrayListWithCapacity(regionMap.size());
    final Errors lastError = lastCause == null ? null : Errors.forException(lastCause);
    for (final Map.Entry<Region, List<KVEntry>> entry : regionMap.entrySet()) {
        final Region region = entry.getKey();
        final List<KVEntry> subEntries = entry.getValue();
        // set up the retry callback, decrementing the retry count by one
        final RetryCallable<Boolean> retryCallable = retryCause -> internalPut(subEntries, retriesLeft - 1,
            retryCause);
        final BoolFailoverFuture future = new BoolFailoverFuture(retriesLeft, retryCallable);
        // store the data into the region
        internalRegionPut(region, subEntries, future, retriesLeft, lastError);
        futures.add(future);
    }
    return new FutureGroup<>(futures);
}
Because a Store contains many Regions, this method first builds the mapping between Regions and KVEntries to determine which Region each KVEntry belongs to. After setting up the retry callback, it calls internalRegionPut to write the subEntries into their Region.
Let's see how the mapping is built. pdClient is an instance of FakePlacementDriverClient, which extends AbstractPlacementDriverClient, so the call goes to the parent class's findRegionsByKvEntries method. AbstractPlacementDriverClient#findRegionsByKvEntries
public Map<Region, List<KVEntry>> findRegionsByKvEntries(final List<KVEntry> kvEntries, final boolean forceRefresh) {
    if (forceRefresh) {
        refreshRouteTable();
    }
    // regionRouteTable holds the region routing information
    return this.regionRouteTable.findRegionsByKvEntries(kvEntries);
}
Since we are using FakePlacementDriverClient, refreshRouteTable is an empty method, so execution moves on to RegionRouteTable's findRegionsByKvEntries method. RegionRouteTable#findRegionsByKvEntries
public Map<Region, List<KVEntry>> findRegionsByKvEntries(final List<KVEntry> kvEntries) {
    Requires.requireNonNull(kvEntries, "kvEntries");
    // instantiate a map
    final Map<Region, List<KVEntry>> regionMap = Maps.newHashMap();
    final StampedLock stampedLock = this.stampedLock;
    final long stamp = stampedLock.readLock();
    try {
        for (final KVEntry kvEntry : kvEntries) {
            // find the region whose startKey is closest to (at or below) the kvEntry's key
            final Region region = findRegionByKeyWithoutLock(kvEntry.getKey());
            // record the mapping from region to KVEntry
            regionMap.computeIfAbsent(region, k -> Lists.newArrayList()).add(kvEntry);
        }
        return regionMap;
    } finally {
        stampedLock.unlockRead(stamp);
    }
}

private Region findRegionByKeyWithoutLock(final byte[] key) {
    // return the greatest key less than or equal to the given key
    // rangeTable maps a region's startKey to its regionId
    final Map.Entry<byte[], Long> entry = this.rangeTable.floorEntry(key);
    if (entry == null) {
        reportFail(key);
        throw reject(key, "fail to find region by key");
    }
    // regionTable maps regionId to region
    return this.regionTable.get(entry.getValue());
}
findRegionsByKvEntries iterates over all the KVEntries and calls findRegionByKeyWithoutLock to locate the right region in rangeTable. Since rangeTable is a TreeMap, floorEntry returns the region whose startKey is the greatest key less than or equal to the given key. The region is then put into regionMap, with the Region as the key and a list of KVEntries as the value.
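A minimal, self-contained illustration of the floorEntry-based routing idea (using String keys and made-up region ids, not RheaKV's actual byte[] comparator):

import java.util.TreeMap;

public class FloorEntryDemo {
    public static void main(String[] args) {
        // startKey -> regionId; region 1 covers ["", "g"), region 2 covers ["g", "t"), region 3 covers ["t", +inf)
        final TreeMap<String, Long> rangeTable = new TreeMap<>();
        rangeTable.put("", 1L);
        rangeTable.put("g", 2L);
        rangeTable.put("t", 3L);

        // floorEntry returns the entry with the greatest key <= the given key
        System.out.println(rangeTable.floorEntry("hello").getValue()); // 2
        System.out.println(rangeTable.floorEntry("apple").getValue()); // 1
        System.out.println(rangeTable.floorEntry("world").getValue()); // 3
    }
}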
The data in regionRouteTable is populated when DefaultRheaKVStore is initialized; in case you don't remember, here is the call chain that initializes the route table:
DefaultRheaKVStore#init->FakePlacementDriverClient#init-> AbstractPlacementDriverClient#init->AbstractPlacementDriverClient#initRouteTableByRegion->regionRouteTable#addOrUpdateRegion
Continuing from DefaultRheaKVStore's internalPut, we come to internalRegionPut, which is where the data is actually stored:
DefaultRheaKVStore#internalRegionPut
private void internalRegionPut(final Region region, final List<KVEntry> subEntries,
                               final CompletableFuture<Boolean> future, final int retriesLeft,
                               final Errors lastCause) {
    // get the RegionEngine
    final RegionEngine regionEngine = getRegionEngine(region.getId(), true);
    // retry function that calls back into this method
    final RetryRunner retryRunner = retryCause -> internalRegionPut(region, subEntries, future,
        retriesLeft - 1, retryCause);
    final FailoverClosure<Boolean> closure = new FailoverClosureImpl<>(future, false, retriesLeft, retryRunner);
    if (regionEngine != null) {
        if (ensureOnValidEpoch(region, regionEngine, closure)) {
            // get the MetricsRawKVStore
            final RawKVStore rawKVStore = getRawKVStore(regionEngine);
            // kvDispatcher may be null depending on the useParallelKVExecutor option set in init
            if (this.kvDispatcher == null) {
                // call the RocksDB API to insert the data
                rawKVStore.put(subEntries, closure);
            } else {
                // dispatch the put operation to the kvDispatcher for async execution
                this.kvDispatcher.execute(() -> rawKVStore.put(subEntries, closure));
            }
        }
    } else {
        // regionEngine is null when this node is not the leader for the region
        // (here, a client node with no RegionEngine at all),
        // so send an RPC request to the leader node instead
        final BatchPutRequest request = new BatchPutRequest();
        request.setKvEntries(subEntries);
        request.setRegionId(region.getId());
        request.setRegionEpoch(region.getRegionEpoch());
        this.rheaKVRpcService.callAsyncWithRpc(request, closure, lastCause);
    }
}
This method first calls getRegionEngine to obtain the regionEngine. Since we are on a client node, no RegionEngine has been initialized, so regionEngine is null and the request is sent directly via RPC, to be handled by KVCommandProcessor on the server. If the current node were a server and the leader of that RegionEngine, it would get the rawKVStore and call put to write into RocksDB.
Finally, let's see how the RPC request sent by rheaKVRpcService is handled.
The put request is sent to the server by calling DefaultRheaKVRpcService's callAsyncWithRpc method: DefaultRheaKVRpcService#callAsyncWithRpc
public <V> CompletableFuture<V> callAsyncWithRpc(final BaseRequest request, final FailoverClosure<V> closure,
                                                 final Errors lastCause) {
    return callAsyncWithRpc(request, closure, lastCause, true);
}

public <V> CompletableFuture<V> callAsyncWithRpc(final BaseRequest request, final FailoverClosure<V> closure,
                                                 final Errors lastCause, final boolean requireLeader) {
    final boolean forceRefresh = ErrorsHelper.isInvalidPeer(lastCause);
    // get the leader's endpoint
    final Endpoint endpoint = getRpcEndpoint(request.getRegionId(), forceRefresh, this.rpcTimeoutMillis,
        requireLeader);
    // issue the RPC call
    internalCallAsyncWithRpc(endpoint, request, closure);
    return closure.future();
}
This method calls getRpcEndpoint to get the endpoint of the server that owns the region, and then issues an RPC to that node. The RPC itself goes through SOFA's Bolt framework, so below we focus on how the endpoint is obtained.
DefaultRheaKVRpcService#getRpcEndpoint
public Endpoint getRpcEndpoint(final long regionId, final boolean forceRefresh, final long timeoutMillis,
                               final boolean requireLeader) {
    if (requireLeader) {
        // get the leader
        return getLeader(regionId, forceRefresh, timeoutMillis);
    } else {
        // round-robin to a node other than ourselves
        return getLuckyPeer(regionId, forceRefresh, timeoutMillis);
    }
}
There are two branches here: one fetches the leader node, the other picks a node round-robin. Both methods are interesting, so we'll walk through each of them.
Fetching the leader for a regionId goes through getLeader. When DefaultRheaKVStore's init method instantiates DefaultRheaKVRpcService, it overrides getLeader: DefaultRheaKVStore#init
this.rheaKVRpcService = new DefaultRheaKVRpcService(this.pdClient, selfEndpoint) {

    @Override
    public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
        final Endpoint leader = getLeaderByRegionEngine(regionId);
        if (leader != null) {
            return leader;
        }
        return super.getLeader(regionId, forceRefresh, timeoutMillis);
    }
};
The overridden getLeader calls getLeaderByRegionEngine to look up the Endpoint by regionId; if nothing is found, it falls back to the parent class's getLeader method.
DefaultRheaKVStore#getLeaderByRegionEngine
private Endpoint getLeaderByRegionEngine(final long regionId) {
    final RegionEngine regionEngine = getRegionEngine(regionId);
    if (regionEngine != null) {
        final PeerId leader = regionEngine.getLeaderId();
        if (leader != null) {
            final String raftGroupId = JRaftHelper.getJRaftGroupId(this.pdClient.getClusterName(), regionId);
            RouteTable.getInstance().updateLeader(raftGroupId, leader);
            return leader.getEndpoint();
        }
    }
    return null;
}
This method tries to get the RegionEngine, but since we are a client node and no RegionEngine has been initialized, it returns null, so we go back up and call the parent class's getLeader.
DefaultRheaKVRpcService#getLeader
public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
    return this.pdClient.getLeader(regionId, forceRefresh, timeoutMillis);
}
This delegates to pdClient's getLeader. The pdClient we configured is FakePlacementDriverClient, which extends AbstractPlacementDriverClient, so the call lands in the parent class's getLeader method.
AbstractPlacementDriverClient#getLeader
public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
    // concatenate the raftGroupId from the clusterName and regionId
    final String raftGroupId = JRaftHelper.getJRaftGroupId(this.clusterName, regionId);
    // look up the leader of this group in the route table
    PeerId leader = getLeader(raftGroupId, forceRefresh, timeoutMillis);
    if (leader == null && !forceRefresh) {
        // Could not found leader from cache, try again and force refresh cache
        // if the first lookup fails, retry once with a forced refresh
        leader = getLeader(raftGroupId, true, timeoutMillis);
    }
    if (leader == null) {
        throw new RouteTableException("no leader in group: " + raftGroupId);
    }
    return leader.getEndpoint();
}
This method builds the raftGroupId from the clusterName and regionId; if the clusterName is demo and the regionId is 1, the resulting raftGroupId is demo--1. It then calls getLeader to obtain the leader's PeerId. The first call passes forceRefresh as false, meaning no refresh; if that returns null, it retries with a forced refresh.
AbstractPlacementDriverClient#getLeader
protected PeerId getLeader(final String raftGroupId, final boolean forceRefresh, final long timeoutMillis) {
    final RouteTable routeTable = RouteTable.getInstance();
    // whether to force-refresh the route table
    if (forceRefresh) {
        final long deadline = System.currentTimeMillis() + timeoutMillis;
        final StringBuilder error = new StringBuilder();
        // A newly launched raft group may not have been successful in the election,
        // or in the 'leader-transfer' state, it needs to be re-tried
        Throwable lastCause = null;
        for (;;) {
            try {
                // refresh the leader in the route table
                final Status st = routeTable.refreshLeader(this.cliClientService, raftGroupId, 2000);
                if (st.isOk()) {
                    break;
                }
                error.append(st.toString());
            } catch (final InterruptedException e) {
                ThrowUtil.throwException(e);
            } catch (final Throwable t) {
                lastCause = t;
                error.append(t.getMessage());
            }
            // if the deadline has not been reached, sleep 10 ms and refresh again
            if (System.currentTimeMillis() < deadline) {
                LOG.debug("Fail to find leader, retry again, {}.", error);
                error.append(", ");
                try {
                    Thread.sleep(10);
                } catch (final InterruptedException e) {
                    ThrowUtil.throwException(e);
                }
            // the deadline has been reached, so throw an exception
            } else {
                throw lastCause != null ? new RouteTableException(error.toString(), lastCause)
                    : new RouteTableException(error.toString());
            }
        }
    }
    // return the leader recorded in the route table
    return routeTable.selectLeader(raftGroupId);
}
When a forced refresh is requested, the method computes a deadline and enters an infinite loop; inside the loop it refreshes the route table, and if the refresh fails but the deadline has not been reached, it sleeps 10 ms and tries again.
RouteTable#refreshLeader
public Status refreshLeader(final CliClientService cliClientService, final String groupId, final int timeoutMs)
        throws InterruptedException, TimeoutException {
    Requires.requireTrue(!StringUtils.isBlank(groupId), "Blank group id");
    Requires.requireTrue(timeoutMs > 0, "Invalid timeout: " + timeoutMs);
    // look up the group's configuration (its peers' IPs and ports) by group id
    final Configuration conf = getConfiguration(groupId);
    if (conf == null) {
        return new Status(RaftError.ENOENT,
            "Group %s is not registered in RouteTable, forgot to call updateConfiguration?", groupId);
    }
    final Status st = Status.OK();
    final CliRequests.GetLeaderRequest.Builder rb = CliRequests.GetLeaderRequest.newBuilder();
    rb.setGroupId(groupId);
    // build the request that asks for the leader node
    final CliRequests.GetLeaderRequest request = rb.build();
    TimeoutException timeoutException = null;
    for (final PeerId peer : conf) {
        // if the peer cannot be connected, record an error status and continue
        if (!cliClientService.connect(peer.getEndpoint())) {
            if (st.isOk()) {
                st.setError(-1, "Fail to init channel to %s", peer);
            } else {
                final String savedMsg = st.getErrorMsg();
                st.setError(-1, "%s, Fail to init channel to %s", savedMsg, peer);
            }
            continue;
        }
        // send the GetLeaderRequest to this peer
        final Future<Message> result = cliClientService.getLeader(peer.getEndpoint(), request, null);
        try {
            final Message msg = result.get(timeoutMs, TimeUnit.MILLISECONDS);
            // error handling
            if (msg instanceof RpcRequests.ErrorResponse) {
                if (st.isOk()) {
                    st.setError(-1, ((RpcRequests.ErrorResponse) msg).getErrorMsg());
                } else {
                    final String savedMsg = st.getErrorMsg();
                    st.setError(-1, "%s, %s", savedMsg, ((RpcRequests.ErrorResponse) msg).getErrorMsg());
                }
            } else {
                final CliRequests.GetLeaderResponse response = (CliRequests.GetLeaderResponse) msg;
                // update the leader
                updateLeader(groupId, response.getLeaderId());
                return Status.OK();
            }
        } catch (final TimeoutException e) {
            timeoutException = e;
        } catch (final ExecutionException e) {
            if (st.isOk()) {
                st.setError(-1, e.getMessage());
            } else {
                final String savedMsg = st.getErrorMsg();
                st.setError(-1, "%s, %s", savedMsg, e.getMessage());
            }
        }
    }
    if (timeoutException != null) {
        throw timeoutException;
    }
    return st;
}
Don't be put off by the length of this method; it is actually quite simple: it looks up the group's configuration, builds a GetLeaderRequest, then iterates over the peers, connecting to each and asking it who the leader is; as soon as one peer answers, it calls updateLeader with the returned leader id and returns OK.
The updateLeader method is quite simple: it just updates the leader property in the route table. Let's now look at how the server handles the GetLeaderRequest.
GetLeaderRequest is handled by the GetLeaderRequestProcessor. GetLeaderRequestProcessor#processRequest
public Message processRequest(GetLeaderRequest request, RpcRequestClosure done) {
    List<Node> nodes = new ArrayList<>();
    String groupId = getGroupId(request);
    // if the request specifies a PeerId,
    // look up the node corresponding to that peer in the group
    if (request.hasPeerId()) {
        String peerIdStr = getPeerId(request);
        PeerId peer = new PeerId();
        if (peer.parse(peerIdStr)) {
            Status st = new Status();
            nodes.add(getNode(groupId, peer, st));
            if (!st.isOk()) {
                return RpcResponseFactory.newResponse(st);
            }
        } else {
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Fail to parse peer id %", peerIdStr);
        }
    } else {
        // otherwise fetch all nodes of the group
        nodes = NodeManager.getInstance().getNodesByGroupId(groupId);
    }
    if (nodes == null || nodes.isEmpty()) {
        return RpcResponseFactory.newResponse(RaftError.ENOENT, "No nodes in group %s", groupId);
    }
    // iterate over the group's nodes to find the leader id
    for (Node node : nodes) {
        PeerId leader = node.getLeaderId();
        if (leader != null && !leader.isEmpty()) {
            return GetLeaderResponse.newBuilder().setLeaderId(leader.toString()).build();
        }
    }
    return RpcResponseFactory.newResponse(RaftError.EAGAIN, "Unknown leader");
}
Since the request we sent does not carry a PeerId, the processor does not look up a specific peer's node; instead it fetches all nodes for the groupId and iterates over them to find the leader id.
That covers how getLeader is implemented; now let's look at what getLuckyPeer does.
public Endpoint getLuckyPeer(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
    return this.pdClient.getLuckyPeer(regionId, forceRefresh, timeoutMillis, this.selfEndpoint);
}
As with getLeader, this ends up in AbstractPlacementDriverClient's getLuckyPeer method. AbstractPlacementDriverClient#getLuckyPeer
public Endpoint getLuckyPeer(final long regionId, final boolean forceRefresh, final long timeoutMillis,
                             final Endpoint unExpect) {
    final String raftGroupId = JRaftHelper.getJRaftGroupId(this.clusterName, regionId);
    final RouteTable routeTable = RouteTable.getInstance();
    // whether to force-refresh the latest cluster node information
    if (forceRefresh) {
        final long deadline = System.currentTimeMillis() + timeoutMillis;
        final StringBuilder error = new StringBuilder();
        // A newly launched raft group may not have been successful in the election,
        // or in the 'leader-transfer' state, it needs to be re-tried
        for (;;) {
            try {
                final Status st = routeTable.refreshConfiguration(this.cliClientService, raftGroupId, 5000);
                if (st.isOk()) {
                    break;
                }
                error.append(st.toString());
            } catch (final InterruptedException e) {
                ThrowUtil.throwException(e);
            } catch (final TimeoutException e) {
                error.append(e.getMessage());
            }
            if (System.currentTimeMillis() < deadline) {
                LOG.debug("Fail to get peers, retry again, {}.", error);
                error.append(", ");
                try {
                    Thread.sleep(5);
                } catch (final InterruptedException e) {
                    ThrowUtil.throwException(e);
                }
            } else {
                throw new RouteTableException(error.toString());
            }
        }
    }
    final Configuration configs = routeTable.getConfiguration(raftGroupId);
    if (configs == null) {
        throw new RouteTableException("empty configs in group: " + raftGroupId);
    }
    final List<PeerId> peerList = configs.getPeers();
    if (peerList == null || peerList.isEmpty()) {
        throw new RouteTableException("empty peers in group: " + raftGroupId);
    }
    // if the group has only one node, just return it
    final int size = peerList.size();
    if (size == 1) {
        return peerList.get(0).getEndpoint();
    }
    // get the load balancer; a round-robin strategy is used here
    final RoundRobinLoadBalancer balancer = RoundRobinLoadBalancer.getInstance(regionId);
    for (int i = 0; i < size; i++) {
        final PeerId candidate = balancer.select(peerList);
        final Endpoint luckyOne = candidate.getEndpoint();
        if (!luckyOne.equals(unExpect)) {
            return luckyOne;
        }
    }
    throw new RouteTableException("have no choice in group(peers): " + raftGroupId);
}
This method also has a force-refresh branch, same as getLeader, so we won't repeat it. It then checks the group: if there is more than one valid node, a round-robin strategy is used to pick one. The round-robin itself is trivial: a global index is incremented on every call and taken modulo the size of the peerList passed in.
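A minimal sketch of that round-robin selection (an illustrative stand-in, not RoundRobinLoadBalancer's actual code):

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinDemo<T> {

    // global, monotonically increasing index shared across calls
    private final AtomicInteger index = new AtomicInteger(0);

    // pick the next element: increment the index and take it modulo the list size
    public T select(final List<T> elements) {
        if (elements == null || elements.isEmpty()) {
            throw new IllegalArgumentException("empty elements");
        }
        final int i = Math.abs(index.getAndIncrement() % elements.size());
        return elements.get(i);
    }
}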
That mostly wraps up DefaultRheaKVRpcService's callAsyncWithRpc method; the request is then sent to the server, where KVCommandProcessor handles the BatchPutRequest.
The BatchPutRequest is processed in KVCommandProcessor. KVCommandProcessor#handleRequest
public void handleRequest(final BizContext bizCtx, final AsyncContext asyncCtx, final T request) {
    Requires.requireNonNull(request, "request");
    final RequestProcessClosure<BaseRequest, BaseResponse<?>> closure = new RequestProcessClosure<>(request,
        bizCtx, asyncCtx);
    // find the RegionKVService for the given RegionId;
    // each RegionKVService corresponds to one Region and only handles requests within its own Region
    final RegionKVService regionKVService = this.storeEngine.getRegionKVService(request.getRegionId());
    if (regionKVService == null) {
        // if it does not exist, return a NoRegionFound response
        final NoRegionFoundResponse noRegion = new NoRegionFoundResponse();
        noRegion.setRegionId(request.getRegionId());
        noRegion.setError(Errors.NO_REGION_FOUND);
        noRegion.setValue(false);
        closure.sendResponse(noRegion);
        return;
    }
    switch (request.magic()) {
        case BaseRequest.PUT:
            regionKVService.handlePutRequest((PutRequest) request, closure);
            break;
        case BaseRequest.BATCH_PUT:
            regionKVService.handleBatchPutRequest((BatchPutRequest) request, closure);
            break;
        .....
        default:
            throw new RheaRuntimeException("Unsupported request type: " + request.getClass().getName());
    }
}
handleRequest first looks up the RegionKVService by RegionId; RegionKVServices are registered in regionKVServiceTable when each RegionEngine is initialized. It then switches on the request type. We omit the other request types here and only look at BATCH_PUT.
Before diving further into the code, here is a quick pointer to the call flow:
BATCH_PUT is routed to DefaultRegionKVService's handleBatchPutRequest method (the snippet below is actually the single-key handlePutRequest; the batch variant follows the same pattern). DefaultRegionKVService#handleBatchPutRequest
public void handlePutRequest(final PutRequest request,
                             final RequestProcessClosure<BaseRequest, BaseResponse<?>> closure) {
    // set up a response
    final PutResponse response = new PutResponse();
    response.setRegionId(getRegionId());
    response.setRegionEpoch(getRegionEpoch());
    try {
        KVParameterRequires.requireSameEpoch(request, getRegionEpoch());
        final byte[] key = KVParameterRequires.requireNonNull(request.getKey(), "put.key");
        final byte[] value = KVParameterRequires.requireNonNull(request.getValue(), "put.value");
        // this rawKVStore instance is a MetricsRawKVStore
        this.rawKVStore.put(key, value, new BaseKVStoreClosure() {

            // set the callback
            @Override
            public void run(final Status status) {
                if (status.isOk()) {
                    response.setValue((Boolean) getData());
                } else {
                    setFailure(request, response, status, getError());
                }
                closure.sendResponse(response);
            }
        });
    } catch (final Throwable t) {
        LOG.error("Failed to handle: {}, {}.", request, StackTraceUtil.stackTrace(t));
        response.setError(Errors.forException(t));
        closure.sendResponse(response);
    }
}
handlePutRequest is very simple: it extracts the key and value, then calls MetricsRawKVStore's put method with them, attaching a callback.
MetricsRawKVStore#put
public void put(final byte[] key, final byte[] value, final KVStoreClosure closure) {
    final KVStoreClosure c = metricsAdapter(closure, PUT, 1, value.length);
    // rawKVStore is an instance of RaftRawKVStore
    this.rawKVStore.put(key, value, c);
}
The put method then calls RaftRawKVStore's put method. RaftRawKVStore#put
public void put(final byte[] key, final byte[] value, final KVStoreClosure closure) {
    applyOperation(KVOperation.createPut(key, value), closure);
}
put uses KVOperation's static factory method to create a KVOperation of type put, then calls applyOperation.
RaftRawKVStore#applyOperation
private void applyOperation(final KVOperation op, final KVStoreClosure closure) {
    // only the leader node is allowed to apply tasks
    if (!isLeader()) {
        closure.setError(Errors.NOT_LEADER);
        closure.run(new Status(RaftError.EPERM, "Not leader"));
        return;
    }
    final Task task = new Task();
    // wrap the data
    task.setData(ByteBuffer.wrap(Serializers.getDefault().writeObject(op)));
    // wrap the callback
    task.setDone(new KVClosureAdapter(closure, op));
    // call NodeImpl's apply method
    this.node.apply(task);
}
applyOperation first checks whether this node is the leader; if not, the task cannot be applied. It then creates a Task, sets the data and the callback adapter, and calls NodeImpl's apply to submit the task.
NodeImpl#apply
public void apply(final Task task) {
    // check whether the Node has been shut down
    if (this.shutdownLatch != null) {
        Utils.runClosureInThread(task.getDone(), new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
        throw new IllegalStateException("Node is shutting down");
    }
    // must not be null
    Requires.requireNonNull(task, "Null task");

    // copy the task's data into a LogEntry
    final LogEntry entry = new LogEntry();
    entry.setData(task.getData());
    // retry count
    int retryTimes = 0;
    try {
        // build a Disruptor event translator
        final EventTranslator<LogEntryAndClosure> translator = (event, sequence) -> {
            event.reset();
            event.done = task.getDone();
            event.entry = entry;
            event.expectedTerm = task.getExpectedTerm();
        };
        while (true) {
            // publish the event; it will be handled by the LogEntryAndClosureHandler
            if (this.applyQueue.tryPublishEvent(translator)) {
                break;
            } else {
                retryTimes++;
                // retry at most 3 times
                if (retryTimes > MAX_APPLY_RETRY_TIMES) {
                    // if still unsuccessful, run the callback to report the status
                    Utils.runClosureInThread(task.getDone(),
                        new Status(RaftError.EBUSY, "Node is busy, has too many tasks."));
                    LOG.warn("Node {} applyQueue is overload.", getNodeId());
                    this.metrics.recordTimes("apply-task-overload-times", 1);
                    return;
                }
                ThreadHelper.onSpinWait();
            }
        }
    } catch (final Exception e) {
        Utils.runClosureInThread(task.getDone(), new Status(RaftError.EPERM, "Node is down."));
    }
}
apply wraps the data into a LogEntry, packages the LogEntry as a Disruptor event, and publishes it to the applyQueue. The applyQueue is initialized in NodeImpl's init method, with LogEntryAndClosureHandler set as its handler.
LogEntryAndClosureHandler#onEvent
private final List<LogEntryAndClosure> tasks = new ArrayList<>(NodeImpl.this.raftOptions.getApplyBatch());

@Override
public void onEvent(final LogEntryAndClosure event, final long sequence, final boolean endOfBatch)
        throws Exception {
    // if a shutdown request has been received
    if (event.shutdownLatch != null) {
        // and the tasks list is not empty, process the queued tasks first
        if (!this.tasks.isEmpty()) {
            // process the tasks
            executeApplyingTasks(this.tasks);
        }
        final int num = GLOBAL_NUM_NODES.decrementAndGet();
        LOG.info("The number of active nodes decrement to {}.", num);
        event.shutdownLatch.countDown();
        return;
    }
    // add the new event to the tasks list
    this.tasks.add(event);
    // the batch size is 32, so once the tasks list reaches 32 entries,
    // or this is the last event of the batch, execute the accumulated tasks
    if (this.tasks.size() >= NodeImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
        executeApplyingTasks(this.tasks);
        this.tasks.clear();
    }
}
onEvent checks whether the received event is a shutdown request; if so, it finishes executing whatever is left in the tasks list before returning. For a normal event, it checks whether the tasks list has reached 32 entries or whether this is the last event of the batch, and if so calls executeApplyingTasks to process the data in bulk.
NodeImpl#executeApplyingTasks
private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
    this.writeLock.lock();
    try {
        final int size = tasks.size();
        // if the current node is not the leader, do not proceed
        if (this.state != State.STATE_LEADER) {
            final Status st = new Status();
            if (this.state != State.STATE_TRANSFERRING) {
                st.setError(RaftError.EPERM, "Is not leader.");
            } else {
                st.setError(RaftError.EBUSY, "Is transferring leadership.");
            }
            LOG.debug("Node {} can't apply, status={}.", getNodeId(), st);
            // run every LogEntryAndClosure's callback to report the status
            for (int i = 0; i < size; i++) {
                Utils.runClosureInThread(tasks.get(i).done, st);
            }
            return;
        }
        final List<LogEntry> entries = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            final LogEntryAndClosure task = tasks.get(i);
            // if the expected term does not match, invoke the callback with an error
            if (task.expectedTerm != -1 && task.expectedTerm != this.currTerm) {
                LOG.debug("Node {} can't apply task whose expectedTerm={} doesn't match currTerm={}.",
                    getNodeId(), task.expectedTerm, this.currTerm);
                if (task.done != null) {
                    final Status st = new Status(RaftError.EPERM, "expected_term=%d doesn't match current_term=%d",
                        task.expectedTerm, this.currTerm);
                    Utils.runClosureInThread(task.done, st);
                }
                continue;
            }
            // save the application context before log replication
            if (!this.ballotBox.appendPendingTask(this.conf.getConf(),
                this.conf.isStable() ? null : this.conf.getOldConf(), task.done)) {
                Utils.runClosureInThread(task.done, new Status(RaftError.EINTERNAL, "Fail to append task."));
                continue;
            }
            // set task entry info before adding to list.
            task.entry.getId().setTerm(this.currTerm);
            // set the entry type to ENTRY_TYPE_DATA
            task.entry.setType(EnumOutter.EntryType.ENTRY_TYPE_DATA);
            entries.add(task.entry);
        }
        // batch-append the apply-task logs into RocksDB
        this.logManager.appendEntries(entries, new LeaderStableClosure(entries));
        // update conf.first
        this.conf = this.logManager.checkAndSetConfiguration(this.conf);
    } finally {
        this.writeLock.unlock();
    }
}
executeApplyingTasks checks whether the current node is the leader: a Raft replica Node only executes apply tasks when its state is STATE_LEADER, so apply tasks are guaranteed to run on the leader. It then loops over the queued events and checks whether each task's expected term equals the current node's term; logs submitted while the leader has not changed share the same term number. When the term matches, the Raft ballot box BallotBox calls appendPendingTask(conf, oldConf, done) to save the application context before log replication, i.e. it creates a Ballot based on the current and original configurations and adds it to the pendingMetaQueue. Finally, the log manager LogManager calls the underlying log storage LogStorage#appendEntries(entries) to batch-append the apply-task logs into RocksDB.
The tasks submitted via Node#apply(task) are eventually replicated and applied to the state machine on every Raft node. RheaKV's state machine is KVStoreStateMachine, which extends the StateMachineAdapter adapter. The Raft state machine KVStoreStateMachine applies the task list in commit order through its onApply(iterator) method: it iterates over the committed entries, accumulates the key-value states into a list, and then calls RocksRawKVStore's batch(kvStates) method to execute the corresponding key-value operations against RocksDB.
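Since that code is not shown in this article, here is a heavily simplified, hypothetical sketch of what such an onApply loop looks like against jraft's Iterator interface; the KVOperation decoding and batching details are omitted, and applyBatchToStorage is an invented placeholder standing in for RocksRawKVStore#batch:

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.core.StateMachineAdapter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class SimpleKVStateMachine extends StateMachineAdapter {

    @Override
    public void onApply(final Iterator iter) {
        // accumulate committed entries so they can be flushed to storage in one batch
        final List<ByteBuffer> batch = new ArrayList<>();
        final List<Closure> closures = new ArrayList<>();
        while (iter.hasNext()) {
            batch.add(iter.getData());          // the serialized KVOperation for this log entry
            if (iter.done() != null) {          // non-null only on the leader (the original caller's callback)
                closures.add(iter.done());
            }
            iter.next();
        }
        applyBatchToStorage(batch);             // hypothetical helper standing in for RocksRawKVStore#batch
        for (final Closure done : closures) {
            done.run(Status.OK());              // notify the callers once the batch has been applied
        }
    }

    private void applyBatchToStorage(final List<ByteBuffer> batch) {
        // placeholder: a real implementation would deserialize each KVOperation
        // and write the whole batch into RocksDB
    }
}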
This has been a long article and the flow is quite complex, with carefully written code at every step. We mainly covered how the putBatching handler uses the Disruptor to process data in batches and thereby raise overall throughput, how the server-side endpoint is obtained when a request is issued, how the BatchPutRequest is handled by the server, and how the code's combination of batching and a fully asynchronous design delivers a large throughput boost.
Original source: https://www.cnblogs.com/luozhiyun/p/11830635.html