6. SOFAJRaft源碼分析— 透過RheaKV看線性一致性讀

開篇

其實這篇文章我原本想在講完選舉的時候就開始講線性一致性讀的,可是感受直接講沒頭沒尾的看起來比比較困難,因此就有了RheaKV的系列,這是RheaKV,終於能夠講一下SOFAJRaft的線性一致性讀是怎麼作到了的。所謂線性一致性,一個簡單的例子是在 T1 的時間寫入一個值,那麼在 T1 以後讀必定能讀到這個值,不可能讀到 T1 以前的值。html

其中部份內容參考SOFAJRaft文檔:
SOFAJRaft 線性一致讀實現剖析 | SOFAJRaft 實現原理
SOFAJRaft 實現原理 - SOFAJRaft-RheaKV 是如何使用 Raft 的java

RheaKV讀取數據

RheaKV的讀取數據的入口是DefaultRheaKVStore的bGet。node

DefaultRheaKVStore#bGet安全

public byte[] bGet(final String key) {
    return FutureHelper.get(get(key), this.futureTimeoutMillis);
}

bGet方法中會一直調用到DefaultRheaKVStore的一個get方法中:
DefaultRheaKVStore#getapp

private CompletableFuture<byte[]> get(final byte[] key, final boolean readOnlySafe,
                                      final CompletableFuture<byte[]> future, final boolean tryBatching) {
    //校驗started狀態
    checkState();
    Requires.requireNonNull(key, "key");
    if (tryBatching) {
        final GetBatching getBatching = readOnlySafe ? this.getBatchingOnlySafe : this.getBatching;
        if (getBatching != null && getBatching.apply(key, future)) {
            return future;
        }
    }
    internalGet(key, readOnlySafe, future, this.failoverRetries, null, this.onlyLeaderRead);
    return future;
}

get方法會根據傳入的參數來判斷是否採用批處理的方式來讀取數據,readOnlySafe表示是否開啓線程一致性讀,因爲咱們調用的是get方法,因此readOnlySafe和tryBatching都會返回true。
因此這裏會調用getBatchingOnlySafe的apply方法,將key和future傳入。
getBatchingOnlySafe是在咱們初始化DefaultRheaKVStore的時候初始化的:
DefaultRheaKVStore#init異步

.....
this.getBatchingOnlySafe = new GetBatching(KeyEvent::new, "get_batching_only_safe",
        new GetBatchingHandler("get_only_safe", true));
.....

在初始化getBatchingOnlySafe的時候傳入的處理器是GetBatchingHandler。分佈式

而後咱們回到getBatchingOnlySafe#apply中,看看這個方法作了什麼:ide

public boolean apply(final byte[] message, final CompletableFuture<byte[]> future) {
    //GetBatchingHandler
    return this.ringBuffer.tryPublishEvent((event, sequence) -> {
        event.reset();
        event.key = message;
        event.future = future;
    });
}

apply方法會向Disruptor發送一個事件進行異步處理,並把咱們的key封裝到event的key中。getBatchingOnlySafe的處理器是GetBatchingHandler。函數

批量獲取數據

GetBatchingHandler#onEvent源碼分析

public void onEvent(final KeyEvent event, final long sequence, final boolean endOfBatch) throws Exception {
    this.events.add(event);
    this.cachedBytes += event.key.length;
    final int size = this.events.size();
    //校驗一下數據量,沒有達到MaxReadBytes而且不是最後一個event,那麼直接返回
    if (!endOfBatch && size < batchingOpts.getBatchSize() && this.cachedBytes < batchingOpts.getMaxReadBytes()) {
        return;
    }

    if (size == 1) {
        reset();
        try {
            //若是隻是一個get請求,那麼不須要進行批量處理
            get(event.key, this.readOnlySafe, event.future, false);
        } catch (final Throwable t) {
            exceptionally(t, event.future);
        }
    } else {
        //初始化一個剛恰好大小的集合
        final List<byte[]> keys = Lists.newArrayListWithCapacity(size);
        final CompletableFuture<byte[]>[] futures = new CompletableFuture[size];
        for (int i = 0; i < size; i++) {
            final KeyEvent e = this.events.get(i);
            keys.add(e.key);
            futures[i] = e.future;
        }
        //遍歷完events數據到entries以後,重置
        reset();
        try {
            multiGet(keys, this.readOnlySafe).whenComplete((result, throwable) -> {
                //異步回調處理數據
                if (throwable == null) {
                    for (int i = 0; i < futures.length; i++) {
                        final ByteArray realKey = ByteArray.wrap(keys.get(i));
                        futures[i].complete(result.get(realKey));
                    }
                    return;
                }
                exceptionally(throwable, futures);
            });
        } catch (final Throwable t) {
            exceptionally(t, futures);
        }
    }
}
}

onEvent方法首先會校驗一下當前的event數量有沒有達到閾值以及當前的event是否是Disruptor中最後一個event;而後會根據不一樣的events集合中的數量來走不一樣的實現,這裏作了一個優化,若是是隻有一條數據那麼不會走批處理;最後將全部的key放入到keys集合中並調用multiGet進行批處理。

multiGet方法會調用internalMultiGet返回一個Future,從而實現異步的返回結果。
DefaultRheaKVStore#internalMultiGet

private FutureGroup<Map<ByteArray, byte[]>> internalMultiGet(final List<byte[]> keys, final boolean readOnlySafe,
                                                             final int retriesLeft, final Throwable lastCause) {
    //由於不一樣的key是存放在不一樣的region中的,因此一個region會對應多個key,封裝到map中
    final Map<Region, List<byte[]>> regionMap = this.pdClient
            .findRegionsByKeys(keys, ApiExceptionHelper.isInvalidEpoch(lastCause));
    //返回值
    final List<CompletableFuture<Map<ByteArray, byte[]>>> futures =
            Lists.newArrayListWithCapacity(regionMap.size());
    //lastCause傳入爲null
    final Errors lastError = lastCause == null ? null : Errors.forException(lastCause);

    for (final Map.Entry<Region, List<byte[]>> entry : regionMap.entrySet()) {
        final Region region = entry.getKey();
        final List<byte[]> subKeys = entry.getValue();
        //重試次數減1,設置一個重試函數
        final RetryCallable<Map<ByteArray, byte[]>> retryCallable = retryCause -> internalMultiGet(subKeys,
                readOnlySafe, retriesLeft - 1, retryCause);
        final MapFailoverFuture<ByteArray, byte[]> future = new MapFailoverFuture<>(retriesLeft, retryCallable);
        //發送MultiGetRequest請求,獲取數據
        internalRegionMultiGet(region, subKeys, readOnlySafe, future, retriesLeft, lastError, this.onlyLeaderRead);
        futures.add(future);
    }
    return new FutureGroup<>(futures);
}

internalMultiGet裏會根據key去組裝region,不一樣的key會對應不一樣的region,數據時存在region中的,因此要從不一樣的region中獲取數據,region和key是一對多的關係因此這裏會封裝成一個map。而後會遍歷regionMap,每一個region所對應的數據做爲一個批次調用到internalRegionMultiGet方法中,根據不一樣的狀況獲取數據。

DefaultRheaKVStore#internalRegionMultiGet

private void internalRegionMultiGet(final Region region, final List<byte[]> subKeys, final boolean readOnlySafe,
                                    final CompletableFuture<Map<ByteArray, byte[]>> future, final int retriesLeft,
                                    final Errors lastCause, final boolean requireLeader) {
    //由於當前的是client,因此這裏會是null
    final RegionEngine regionEngine = getRegionEngine(region.getId(), requireLeader);
    // require leader on retry
    //設置重試函數
    final RetryRunner retryRunner = retryCause -> internalRegionMultiGet(region, subKeys, readOnlySafe, future,
            retriesLeft - 1, retryCause, true);
    final FailoverClosure<Map<ByteArray, byte[]>> closure = new FailoverClosureImpl<>(future,
            false, retriesLeft, retryRunner);
    if (regionEngine != null) {
        if (ensureOnValidEpoch(region, regionEngine, closure)) {
            //若是不是null,那麼會獲取rawKVStore,並從中獲取數據
            final RawKVStore rawKVStore = getRawKVStore(regionEngine);
            if (this.kvDispatcher == null) {
                rawKVStore.multiGet(subKeys, readOnlySafe, closure);
            } else {
                //若是是kvDispatcher不爲空,那麼放入到kvDispatcher中異步執行
                this.kvDispatcher.execute(() -> rawKVStore.multiGet(subKeys, readOnlySafe, closure));
            }
        }
    } else {
        final MultiGetRequest request = new MultiGetRequest();
        request.setKeys(subKeys);
        request.setReadOnlySafe(readOnlySafe);
        request.setRegionId(region.getId());
        request.setRegionEpoch(region.getRegionEpoch());
        //調用rpc請求
        this.rheaKVRpcService.callAsyncWithRpc(request, closure, lastCause, requireLeader);
    }
}

由於咱們這裏是client端調用internalRegionMultiGet方法的,因此是沒有設置regionEngine的,那麼會直接向server的當前region所對應的leader節點發送一個MultiGetRequest請求。

由於上面的這些方法基本上和put是一致的,咱們已經在5. SOFAJRaft源碼分析— RheaKV中如何存放數據?講過了,因此這裏不重複的講了。

server端處理MultiGetRequest請求

MultiGetRequest請求會被KVCommandProcessor所處理,KVCommandProcessor裏會根據請求的magic方法返回值來判斷是用什麼方式來進行處理。咱們這裏會調用到DefaultRegionKVService的handleMultiGetRequest方法中處理請求。

public void handleMultiGetRequest(final MultiGetRequest request,
                                  final RequestProcessClosure<BaseRequest, BaseResponse<?>> closure) {
    final MultiGetResponse response = new MultiGetResponse();
    response.setRegionId(getRegionId());
    response.setRegionEpoch(getRegionEpoch());
    try {
        KVParameterRequires.requireSameEpoch(request, getRegionEpoch());
        final List<byte[]> keys = KVParameterRequires.requireNonEmpty(request.getKeys(), "multiGet.keys");
        //調用MetricsRawKVStore的multiGet方法
        this.rawKVStore.multiGet(keys, request.isReadOnlySafe(), new BaseKVStoreClosure() {

            @SuppressWarnings("unchecked")
            @Override
            public void run(final Status status) {
                if (status.isOk()) {
                    response.setValue((Map<ByteArray, byte[]>) getData());
                } else {
                    setFailure(request, response, status, getError());
                }
                closure.sendResponse(response);
            }
        });
    } catch (final Throwable t) {
        LOG.error("Failed to handle: {}, {}.", request, StackTraceUtil.stackTrace(t));
        response.setError(Errors.forException(t));
        closure.sendResponse(response);
    }
}

handleMultiGetRequest方法會調用MetricsRawKVStore的multiGet方法來批量獲取數據。

MetricsRawKVStore#multiGet

public void multiGet(final List<byte[]> keys, final boolean readOnlySafe, final KVStoreClosure closure) {
    //實例化MetricsKVClosureAdapter對象
    final KVStoreClosure c = metricsAdapter(closure, MULTI_GET, keys.size(), 0);
    //調用RaftRawKVStore的multiGet方法
    this.rawKVStore.multiGet(keys, readOnlySafe, c);
}

multiGet方法會傳入一個MetricsKVClosureAdapter實例,經過這個實例實現異步回調response。而後調用RaftRawKVStore的multiGet方法。

RaftRawKVStore#multiGet

public void multiGet(final List<byte[]> keys, final boolean readOnlySafe, final KVStoreClosure closure) {
    if (!readOnlySafe) {
        this.kvStore.multiGet(keys, false, closure);
        return;
    }
    // KV 存儲實現線性一致讀
    // 調用 readIndex 方法,等待回調執行
    this.node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {

        @Override
        public void run(final Status status, final long index, final byte[] reqCtx) {
            //若是狀態返回成功,
            if (status.isOk()) {
                RaftRawKVStore.this.kvStore.multiGet(keys, true, closure);
                return;
            }
            //readIndex 讀取失敗嘗試應用鍵值讀操做申請任務於 Leader 節點的狀態機 KVStoreStateMachine
            RaftRawKVStore.this.readIndexExecutor.execute(() -> {
                if (isLeader()) {
                    LOG.warn("Fail to [multiGet] with 'ReadIndex': {}, try to applying to the state machine.",
                            status);
                    // If 'read index' read fails, try to applying to the state machine at the leader node
                    applyOperation(KVOperation.createMultiGet(keys), closure);
                } else {
                    LOG.warn("Fail to [multiGet] with 'ReadIndex': {}.", status);
                    // Client will retry to leader node
                    new KVClosureAdapter(closure, null).run(status);
                }
            });
        }
    });
}

multiGet調用node的readIndex方法進行一致性讀操做,並設置回調,若是返回成功那麼就直接調用RocksRawKVStore讀取數據,若是返回不是成功那麼申請任務於 Leader 節點的狀態機 KVStoreStateMachine。

線性一致性讀readIndex

所謂線性一致讀,一個簡單的例子是在 t1 的時刻咱們寫入了一個值,那麼在 t1 以後,咱們必定能讀到這個值,不可能讀到 t1 以前的舊值(想一想 Java 中的 volatile 關鍵字,即線性一致讀就是在分佈式系統中實現 Java volatile 語義)。簡而言之是須要在分佈式環境中實現 Java volatile 語義效果,即當 Client 向集羣發起寫操做的請求而且得到成功響應以後,該寫操做的結果要對全部後來的讀請求可見。和 volatile 的區別在於 volatile 是實現線程之間的可見,而 SOFAJRaft 須要實現 Server 之間的可見。

SOFAJRaft提供的線性一致讀是基於 Raft 協議的 ReadIndex 實現用 ;Node#readIndex(byte [] requestContext, ReadIndexClosure done) 發起線性一致讀請求,當安全讀取時傳入的 Closure 將被調用,正常狀況從狀態機中讀取數據返回給客戶端。

Node#readIndex

public void readIndex(final byte[] requestContext, final ReadIndexClosure done) {
    if (this.shutdownLatch != null) {
        //異步執行回調
        Utils.runClosureInThread(done, new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
        throw new IllegalStateException("Node is shutting down");
    }
    Requires.requireNonNull(done, "Null closure");
    //EMPTY_BYTES
    this.readOnlyService.addRequest(requestContext, done);
}

readIndex會調用ReadOnlyServiceImpl#addRequest將requestContext和回調方法done傳入,requestContext傳入的是BytesUtil.EMPTY_BYTES
接着往下看

ReadOnlyServiceImpl#addRequest

public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) {
    if (this.shutdownLatch != null) {
        Utils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "Was stopped"));
        throw new IllegalStateException("Service already shutdown.");
    }
    try {
        EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
            event.done = closure;
            //EMPTY_BYTES
            event.requestContext = new Bytes(reqCtx);
            event.startTime = Utils.monotonicMs();
        };
        int retryTimes = 0;
        while (true) {
            //ReadIndexEventHandler
            if (this.readIndexQueue.tryPublishEvent(translator)) {
                break;
            } else {
                retryTimes++;
                if (retryTimes > MAX_ADD_REQUEST_RETRY_TIMES) {
                    Utils.runClosureInThread(closure,
                        new Status(RaftError.EBUSY, "Node is busy, has too many read-only requests."));
                    this.nodeMetrics.recordTimes("read-index-overload-times", 1);
                    LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
                    return;
                }
                ThreadHelper.onSpinWait();
            }
        }
    } catch (final Exception e) {
        Utils.runClosureInThread(closure, new Status(RaftError.EPERM, "Node is down."));
    }
}

addRequest方法裏會將傳入的reqCtx和closure封裝成一個時間,傳入到readIndexQueue隊列中,事件發佈成功後會交由ReadIndexEventHandler處理器處理,發佈失敗會進行重試,最多重試3次。

ReadIndexEventHandler

private class ReadIndexEventHandler implements EventHandler<ReadIndexEvent> {
    // task list for batch
    private final List<ReadIndexEvent> events = new ArrayList<>(
                                                  ReadOnlyServiceImpl.this.raftOptions.getApplyBatch());

    @Override
    public void onEvent(final ReadIndexEvent newEvent, final long sequence, final boolean endOfBatch)
                                                                                                     throws Exception {
        if (newEvent.shutdownLatch != null) {
            executeReadIndexEvents(this.events);
            this.events.clear();
            newEvent.shutdownLatch.countDown();
            return;
        }

        this.events.add(newEvent);
        //批量執行
        if (this.events.size() >= ReadOnlyServiceImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
            executeReadIndexEvents(this.events);
            this.events.clear();
        }
    }
}

ReadIndexEventHandler是ReadOnlyServiceImpl裏面的內部類,裏面有一個全局的events集合用來作事件的批處理,若是當前的event已經達到了32個或是整個Disruptor隊列裏最後一個那麼會調用ReadOnlyServiceImpl的executeReadIndexEvents方法進行事件的批處理。

ReadOnlyServiceImpl#executeReadIndexEvents

private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
    if (events.isEmpty()) {
        return;
    }
    //初始化ReadIndexRequest
    final ReadIndexRequest.Builder rb = ReadIndexRequest.newBuilder() //
        .setGroupId(this.node.getGroupId()) //
        .setServerId(this.node.getServerId().toString());

    final List<ReadIndexState> states = new ArrayList<>(events.size());

    for (final ReadIndexEvent event : events) {
        rb.addEntries(ZeroByteStringHelper.wrap(event.requestContext.get()));
        states.add(new ReadIndexState(event.requestContext, event.done, event.startTime));
    }
    final ReadIndexRequest request = rb.build();

    this.node.handleReadIndexRequest(request, new ReadIndexResponseClosure(states, request));
}

executeReadIndexEvents封裝好ReadIndexRequest請求和將ReadIndexState集合封裝到ReadIndexResponseClosure中,爲後續的操做作裝備

NodeImpl#handleReadIndexRequest

public void handleReadIndexRequest(final ReadIndexRequest request, final RpcResponseClosure<ReadIndexResponse> done) {
    final long startMs = Utils.monotonicMs();
    this.readLock.lock();
    try {
        switch (this.state) {
            case STATE_LEADER:
                readLeader(request, ReadIndexResponse.newBuilder(), done);
                break;
            case STATE_FOLLOWER:
                readFollower(request, done);
                break;
            case STATE_TRANSFERRING:
                done.run(new Status(RaftError.EBUSY, "Is transferring leadership."));
                break;
            default:
                done.run(new Status(RaftError.EPERM, "Invalid state for readIndex: %s.", this.state));
                break;
        }
    } finally {
        this.readLock.unlock();
        this.metrics.recordLatency("handle-read-index", Utils.monotonicMs() - startMs);
        this.metrics.recordSize("handle-read-index-entries", request.getEntriesCount());
    }
}

由於線性一致讀在任何集羣內的節點發起,並不須要強制要求放到 Leader 節點上,容許在 Follower 節點執行,所以大大下降 Leader 的讀取壓力。
當在Follower節點執行一致性讀的時候實際上Follower 節點調用 RpcService#readIndex(leaderId.getEndpoint(), newRequest, -1, closure) 方法向 Leader 發送 ReadIndex 請求,交由Leader節點實現一致性讀。因此我這裏主要介紹Leader的一致性讀。

繼續往下走調用NodeImpl的readLeader方法
NodeImpl#readLeader

private void readLeader(final ReadIndexRequest request, final ReadIndexResponse.Builder respBuilder,
                        final RpcResponseClosure<ReadIndexResponse> closure) {
    //1. 獲取集羣節點中多數選票數是多少
    final int quorum = getQuorum();
    if (quorum <= 1) {
        // Only one peer, fast path.
        //若是集羣中只有一個節點,那麼直接調用回調函數,返回成功
        respBuilder.setSuccess(true) //
                .setIndex(this.ballotBox.getLastCommittedIndex());
        closure.setResponse(respBuilder.build());
        closure.run(Status.OK());
        return;
    }

    final long lastCommittedIndex = this.ballotBox.getLastCommittedIndex();
    //2. 任期必須相等
    //日誌管理器 LogManager 基於投票箱 BallotBox 的 lastCommittedIndex 獲取任期檢查是否等於當前任期
    // 若是不等於當前任期表示此 Leader 節點未在其任期內提交任何日誌,須要拒絕只讀請求;
    if (this.logManager.getTerm(lastCommittedIndex) != this.currTerm) {
        // Reject read only request when this leader has not committed any log entry at its term
        closure
                .run(new Status(
                        RaftError.EAGAIN,
                        "ReadIndex request rejected because leader has not committed any log entry at its term, " +
                         "logIndex=%d, currTerm=%d.",
                        lastCommittedIndex, this.currTerm));
        return;
    }
    respBuilder.setIndex(lastCommittedIndex);

    if (request.getPeerId() != null) {
        // request from follower, check if the follower is in current conf.
        final PeerId peer = new PeerId();
        peer.parse(request.getServerId());
        //3. 來自 Follower 的請求須要檢查 Follower 是否在當前配置
        if (!this.conf.contains(peer)) {
            closure
                    .run(new Status(RaftError.EPERM, "Peer %s is not in current configuration: {}.", peer,
                     this.conf));
            return;
        }
    }

    ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions();
    //4. 若是使用的是ReadOnlyLeaseBased,確認leader是不是在在租約有效時間內
    if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased && !isLeaderLeaseValid()) {
        // If leader lease timeout, we must change option to ReadOnlySafe
        readOnlyOpt = ReadOnlyOption.ReadOnlySafe;
    }

    switch (readOnlyOpt) {
        //5
        case ReadOnlySafe:
            final List<PeerId> peers = this.conf.getConf().getPeers();
            Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty peers");
            //設置心跳的響應回調函數
            final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure(closure,
                    respBuilder, quorum, peers.size());
            // Send heartbeat requests to followers
            //向 Followers 節點發起一輪 Heartbeat,若是半數以上節點返回對應的
            // Heartbeat Response,那麼 Leader就可以肯定如今本身仍然是 Leader
            for (final PeerId peer : peers) {
                if (peer.equals(this.serverId)) {
                    continue;
                }
                this.replicatorGroup.sendHeartbeat(peer, heartbeatDone);
            }
            break;
        //6. 由於在租約期內不會發生選舉,確保 Leader 不會變化
        //因此直接返回回調結果
        case ReadOnlyLeaseBased:
            // Responses to followers and local node.
            respBuilder.setSuccess(true);
            closure.setResponse(respBuilder.build());
            closure.run(Status.OK());
            break;
    }
}
  1. 獲取集羣節點中多數選票數是多少,即集羣節點的1/2+1,若是當前的集羣裏只有一個節點,那麼直接返回成功,並調用回調方法
  2. 校驗 Raft 集羣節點數量以及 lastCommittedIndex 所屬任期符合預期,那麼響應構造器設置其索引爲投票箱 BallotBox 的 lastCommittedIndex
  3. 來自 Follower 的請求須要檢查 Follower 是否在當前配置,若是不在當前配置中直接調用回調方法設置異常
  4. 獲取 ReadIndex 請求級別 ReadOnlyOption 配置,ReadOnlyOption 參數默認值爲 ReadOnlySafe。若是設置的是ReadOnlyLeaseBased,那麼會調用isLeaderLeaseValid檢查leader是不是在在租約有效時間內
  5. 配置爲ReadOnlySafe 調用 Replicator#sendHeartbeat(rid, closure) 方法向 Followers 節點發送 Heartbeat 心跳請求,發送心跳成功執行 ReadIndexHeartbeatResponseClosure 心跳響應回調;ReadIndex 心跳響應回調檢查是否超過半數節點包括 Leader 節點自身投票同意,半數以上節點返回客戶端Heartbeat 請求成功響應,即 applyIndex 超過 ReadIndex 說明已經同步到 ReadIndex 對應的 Log 可以提供 Linearizable Read
  6. 配置爲ReadOnlyLeaseBased,由於Leader 租約有效期間認爲當前 Leader 是 Raft Group 內的惟一有效 Leader,因此忽略 ReadIndex 發送 Heartbeat 確認身份步驟,直接返回 Follower 節點和本地節點 Read 請求成功響應。Leader 節點繼續等待狀態機執行,直到 applyIndex 超過 ReadIndex 安全提供 Linearizable Read

不管是ReadOnlySafe仍是ReadOnlyLeaseBased,最後發送成功響應都會調用ReadIndexResponseClosure的run方法。

ReadIndexResponseClosure#run

public void run(final Status status) {
    //fail
    //傳入的狀態不是ok,響應失敗
    if (!status.isOk()) {
        notifyFail(status);
        return;
    }
    final ReadIndexResponse readIndexResponse = getResponse();
    //Fail
    //response沒有響應成功,響應失敗
    if (!readIndexResponse.getSuccess()) {
        notifyFail(new Status(-1, "Fail to run ReadIndex task, maybe the leader stepped down."));
        return;
    }
    // Success
    //一致性讀成功
    final ReadIndexStatus readIndexStatus = new ReadIndexStatus(this.states, this.request,
        readIndexResponse.getIndex());
    for (final ReadIndexState state : this.states) {
        // Records current commit log index.
        //設置當前提交的index
        state.setIndex(readIndexResponse.getIndex());
    }

    boolean doUnlock = true;
    ReadOnlyServiceImpl.this.lock.lock();
    try {
        //校驗applyIndex 是否超過 ReadIndex
        if (readIndexStatus.isApplied(ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex())) {
            // Already applied, notify readIndex request.
            ReadOnlyServiceImpl.this.lock.unlock();
            doUnlock = false;
            //已經同步到 ReadIndex 對應的 Log 可以提供 Linearizable Read
            notifySuccess(readIndexStatus);
        } else {
            // Not applied, add it to pending-notify cache.
            ReadOnlyServiceImpl.this.pendingNotifyStatus
                .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) //
                .add(readIndexStatus);
        }
    } finally {
        if (doUnlock) {
            ReadOnlyServiceImpl.this.lock.unlock();
        }
    }
}

Run方法首先會校驗一下是否須要響應失敗,若是響應成功,那麼會將全部封裝的ReadIndexState更新一下index,而後校驗一下applyIndex 是否超過 ReadIndex,超過了ReadIndex表明全部已經複製到多數派上的 Log(可視爲寫操做)被視爲安全的 Log,該 Log 所體現的數據就能對客戶端 Client 可見。

ReadOnlyServiceImpl#notifySuccess

private void notifySuccess(final ReadIndexStatus status) {
    final long nowMs = Utils.monotonicMs();
    final List<ReadIndexState> states = status.getStates();
    final int taskCount = states.size();
    for (int i = 0; i < taskCount; i++) {
        final ReadIndexState task = states.get(i);
        final ReadIndexClosure done = task.getDone(); // stack copy
        if (done != null) {
            this.nodeMetrics.recordLatency("read-index", nowMs - task.getStartTimeMs());
            done.setResult(task.getIndex(), task.getRequestContext().get());
            done.run(Status.OK());
        }
    }
}

若是是響應成功,那麼會調用notifySuccess方法,會將status裏封裝的ReadIndexState集合遍歷一遍,調用當中的run方法。

這個run方法會調用到咱們在multiGet中設置的run方法中
RaftRawKVStore#multiGet

public void multiGet(final List<byte[]> keys, final boolean readOnlySafe, final KVStoreClosure closure) {
    if (!readOnlySafe) {
        this.kvStore.multiGet(keys, false, closure);
        return;
    }
    // KV 存儲實現線性一致讀
    // 調用 readIndex 方法,等待回調執行
    this.node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {

        @Override
        public void run(final Status status, final long index, final byte[] reqCtx) {
            //若是狀態返回成功,
            if (status.isOk()) {
                RaftRawKVStore.this.kvStore.multiGet(keys, true, closure);
                return;
            }
            //readIndex 讀取失敗嘗試應用鍵值讀操做申請任務於 Leader 節點的狀態機 KVStoreStateMachine
            RaftRawKVStore.this.readIndexExecutor.execute(() -> {
                if (isLeader()) {
                    LOG.warn("Fail to [multiGet] with 'ReadIndex': {}, try to applying to the state machine.",
                            status);
                    // If 'read index' read fails, try to applying to the state machine at the leader node
                    applyOperation(KVOperation.createMultiGet(keys), closure);
                } else {
                    LOG.warn("Fail to [multiGet] with 'ReadIndex': {}.", status);
                    // Client will retry to leader node
                    new KVClosureAdapter(closure, null).run(status);
                }
            });
        }
    });

這個run方法會調用RaftRawKVStore的multiGet從RocksDB中直接獲取數據。

總結

咱們這篇文章從RheaKVStore的客戶端get方法一直講到,RheaKVStore服務端使用JRaft實現線性一致性讀,並講解了線性一致性讀是怎麼實現的,經過這個例子你們應該對線性一致性讀有了一個相對不錯的理解了。

相關文章
相關標籤/搜索