SOFAJRaft 源碼分析三(狀態機、線性一致性讀)

1.概述

今天來看一下jraft如何將日誌寫入到狀態機,其實就是業務真正的存儲工做。若是咱們須要使用jraft,咱們對這裏的實現就須要足夠的瞭解。而後還會介紹jraft的讀取邏輯。java

2.思路整理

對於狀態機,咱們關注問題以下:node

  • 什麼時候會將日誌同步到狀態機?
  • 對於節點變化,狀態機會作什麼?
  • 狀態機爲了業務解藕作了怎麼樣封裝?

對於讀取操做:主要就是如何作讀取優化操做?
那咱們帶着這幾個問題一塊兒深刻源碼,尋找答案吧!數據庫

3.狀態機源碼分析

主要就是leader和follower將日誌應用到狀態機的過程。固然leader和follower應用的時機不同,可是過程都是同樣的。網絡

咱們先來看leader。leader再增長日誌的時候,會有一個回調,若是成功會執行這個回調方法(具體時機爲將日誌添加到本地磁盤後,也就是AppendBatcher 的flush方法)。架構

這個回調回執行ballotBox的commitAt方法。併發

@Override
public void run(final Status status) {
    if (status.isOk()) {
        NodeImpl.this.ballotBox.commitAt(this.firstLogIndex, this.firstLogIndex + this.nEntries - 1,
            NodeImpl.this.serverId);
    } else {
        LOG.error("Node {} append [{}, {}] failed, status={}.", getNodeId(), this.firstLogIndex,
            this.firstLogIndex + this.nEntries - 1, status);
    }
}

commitAt方法

這裏說一下,ballotBox主要記錄了日誌提交的狀態。每一個節點提交日誌成功後都會調用這個方法。app

上面只說了leader成功,若是follower提交成功,則會以響應的形式告訴leader。在onAppendEntriesReturned 中也會調用該方法。以下圖。異步


這就很清晰了。其實上篇博客都介紹了這個方法的做用。由於和狀態機實現銜接,因此咱們在來回顧一下這個方法。ide

final long startAt = Math.max(this.pendingIndex, firstLogIndex);
Ballot.PosHint hint = new Ballot.PosHint();
for (long logIndex = startAt; logIndex <= lastLogIndex; logIndex++) {
    final Ballot bl = this.pendingMetaQueue.get((int) (logIndex - this.pendingIndex));
    hint = bl.grant(peer, hint);
    if (bl.isGranted()) {
        lastCommittedIndex = logIndex;
    }
}
if (lastCommittedIndex == 0) {
    return true;
}
this.pendingMetaQueue.removeFromFirst((int) (lastCommittedIndex - this.pendingIndex) + 1);
LOG.debug("Committed log fromIndex={}, toIndex={}.", this.pendingIndex, lastCommittedIndex);
this.pendingIndex = lastCommittedIndex + 1;
this.lastCommittedIndex = lastCommittedIndex;
...
this.waiter.onCommitted(lastCommittedIndex);
  • pendingIndex:當前已經ok的日誌索引+1(何爲ok,就是大多數節點都持久化的)
  • firstLogIndex:本次提交成功日誌的起始值。
  • lastLogIndex:本次提交成功日誌的終止值。
爲何這裏startAt爲max,由於這個有很大的可能pendingIndex比firstLogIndex大,緣由是這個節點響應比較慢。在他響應以前Ballot的isGranted已經返回true了。

這樣的話,咱們能理解這個方法其實就是用來維護this.lastCommittedIndex這個成員變量。最後他會調用this.waiter.onCommitted方法。源碼分析

onCommitted方法

其實這個方法就是commit到狀態機的入口。
固然這個方法也會在兩處被調用。一處是被leader,一處是被follower調用。

@Override
public boolean onCommitted(final long committedIndex) {
    return enqueueTask((task, sequence) -> {
        task.type = TaskType.COMMITTED;
        task.committedIndex = committedIndex;
    });
}

這個方法邏輯比較簡單,就是建立一個commit時間丟到FSMCallerImpl 的隊列。
咱們順藤摸瓜看看follower什麼時候調用這個onCommitted方法。

FollowerStableClosure#run

1.在follower增長日誌成功以後,有執行FollowerStableClosure 回調,上篇文章說過,他就是用來響應leader。固然在響應以前回執行node.ballotBox.setLastCommittedIndex方法。其實這個方法最後會調用onCommitted方法。

2.在follower處理心跳或者探針消息的時候。也會調用setLastCommittedIndex方法。ok,到這裏咱們已經瞭解了。follower和leader什麼時候會給FSMCallerImpl 的隊列提交commit事件。接下來咱們只須要關注如何處理事件了。

if (entriesCount == 0) {
    // heartbeat
    final AppendEntriesResponse.Builder respBuilder = AppendEntriesResponse.newBuilder() //
        .setSuccess(true) //
        .setTerm(this.currTerm) //
        .setLastLogIndex(this.logManager.getLastLogIndex());
    doUnlock = false;
    this.writeLock.unlock();
    // see the comments at FollowerStableClosure#run()
    this.ballotBox.setLastCommittedIndex(Math.min(request.getCommittedIndex(), prevLogIndex));
    return respBuilder.build();
}

enqueueTask

在這以前咱們先看一下enqueueTask方法。咱們發現,他有不少事件,不只只有咱們上面看到的commit事件,還會有節點變化,或者快照等一系列事件。咱們後面一塊兒分析。

ApplyTaskHandler

這個handler就是對應事件處理器。具體是runApplyTask方法。

private class ApplyTaskHandler implements EventHandler<ApplyTask> {
    // max committed index in current batch, reset to -1 every batch
    private long maxCommittedIndex = -1;

    @Override
    public void onEvent(final ApplyTask event, final long sequence, final boolean endOfBatch) throws Exception {
        this.maxCommittedIndex = runApplyTask(event, this.maxCommittedIndex, endOfBatch);
    }
}

runApplyTask方法

這個方法邏輯其實很簡單。其實就是根據實現類型執行不一樣的處理操做。

好比下面的代碼,這裏爲了儘可能少貼代碼,只放了三個case。很明顯根據不一樣類型會調用對應的方法。

其實方法的實現也很簡單,就是調用業務方狀態機實現類的對象方法。其實以前咱們也說過,要實現的話,只須要繼承對應的適配器類,實現想要實現的方法便可。

case LEADER_STOP:
    this.currTask = TaskType.LEADER_STOP;
    doLeaderStop(task.status);
    break;
case LEADER_START:
    this.currTask = TaskType.LEADER_START;
    doLeaderStart(task.term);
    break;
case START_FOLLOWING:
    this.currTask = TaskType.START_FOLLOWING;
    doStartFollowing(task.leaderChangeCtx);
    break;

咱們重點關注提交數據的邏輯,這裏更新最大commitIndex,而後調用doCommitted方法。

if (endOfBatch && maxCommittedIndex >= 0) {
    this.currTask = TaskType.COMMITTED;
    doCommitted(maxCommittedIndex);
    maxCommittedIndex = -1L; // reset maxCommittedIndex
}

doCommitted方法

1.獲取上一次提交的Index,若是當前commitIndex小於上一個提交的index,直接return。

final long lastAppliedIndex = this.lastAppliedIndex.get();
// We can tolerate the disorder of committed_index
if (lastAppliedIndex >= committedIndex) {
    return;
}

2.建立迭代器,這裏依賴commitIndex以及LogManager,LogManager主要就是根據偏移獲取日誌。

final long firstClosureIndex = this.closureQueue.popClosureUntil(committedIndex, closures, taskClosures);

// Calls TaskClosure#onCommitted if necessary
onTaskCommitted(taskClosures);

Requires.requireTrue(firstClosureIndex >= 0, "Invalid firstClosureIndex");
final IteratorImpl iterImpl = new IteratorImpl(this.fsm, this.logManager, closures, firstClosureIndex,
    lastAppliedIndex, committedIndex, this.applyingIndex);
while (iterImpl.isGood()) {
    final LogEntry logEntry = iterImpl.entry();
    if (logEntry.getType() != EnumOutter.EntryType.ENTRY_TYPE_DATA) {
        if (logEntry.getType() == EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
            if (logEntry.getOldPeers() != null && !logEntry.getOldPeers().isEmpty()) {
                // Joint stage is not supposed to be noticeable by end users.
                this.fsm.onConfigurationCommitted(new Configuration(iterImpl.entry().getPeers()));
            }
        }
        if (iterImpl.done() != null) {
            iterImpl.done().run(Status.OK());
        }
        iterImpl.next();
        continue;
    }
    // Apply data task to user state machine
    doApplyTasks(iterImpl);
}

最終會調用doApplyTasks,其實就是調用了fsm的apply方法。

若是是leader,還會有個closureQueue,這個隊列存儲的是業務方執行apply請求的回調方法。通常就是成功應用狀態機後響應給調用方。下面代碼就是Counter例子的回調

public void handleRequest(final RpcContext rpcCtx, final IncrementAndGetRequest request) {
    final CounterClosure closure = new CounterClosure() {
        @Override
        public void run(Status status) {
            rpcCtx.sendResponse(getValueResponse());
        }
    };
    this.counterService.incrementAndGet(request.getDelta(), closure);
}

3.後續的狀態更新

final long lastIndex = iterImpl.getIndex() - 1;
final long lastTerm = this.logManager.getTerm(lastIndex);
final LogId lastAppliedId = new LogId(lastIndex, lastTerm);
this.lastAppliedIndex.set(lastIndex);
this.lastAppliedTerm = lastTerm;
this.logManager.setAppliedId(lastAppliedId);
notifyLastAppliedIndexUpdated(lastIndex);

其實上面的邏輯很簡單,就是根據commitIndex而後取迭代操做,最後調用apply方法。咱們有必要關注這個apply方法如何實現。其實根據不一樣的業務,有不一樣的實現,好比若是是數據庫,那麼直接經過數據庫引擎執行對應語句。若是隻是簡單的counter,那麼很是容易。

下面是counter的例子實現:

這裏其實有個優化,若是iter.done不爲空,說明當前爲leader,咱們就不須要從日誌序列化數據,直接從done返回。

CounterClosure closure = null;
if (iter.done() != null) {
    // This task is applied by this node, get value from closure to avoid additional parsing.
    closure = (CounterClosure) iter.done();
    counterOperation = closure.getCounterOperation();
} else {
    // Have to parse FetchAddRequest from this user log.
    final ByteBuffer data = iter.getData();
    try {
        counterOperation = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
            data.array(), CounterOperation.class.getName());
    } catch (final CodecException e) {
        LOG.error("Fail to decode IncrementAndGetRequest", e);
    }
}

最後根據執行執行對應的操做,若是隻是get,直接返回結果。
若是是increment,那麼調用原子自增。最後執行回調。

if (counterOperation != null) {
    switch (counterOperation.getOp()) {
        case GET:
            current = this.value.get();
            LOG.info("Get value={} at logIndex={}", current, iter.getIndex());
            break;
        case INCREMENT:
            final long delta = counterOperation.getDelta();
            final long prev = this.value.get();
            current = this.value.addAndGet(delta);
            LOG.info("Added value={} by delta={} at logIndex={}", prev, delta, iter.getIndex());
            break;
    }

    if (closure != null) {
        closure.success(current);
        closure.run(Status.OK());
    }
}

其實到這裏,狀態機的實現咱們已經足夠了解了。

jraft經過適配器模式。留了一個適配器的類StateMachineAdapter 。
業務方只須要繼承該類便可。而後在初始化server的時候。將咱們的實現類設置到配置中便可。
this.fsm = new CounterStateMachine();
// 設置狀態機到啓動參數
nodeOptions.setFsm(this.fsm);

這裏jraft就能夠在對應操做的時候執行咱們實現的方法,實現業務解藕。

4.線性一致性讀

首先咱們應該理解線性一致性讀的概念:在T1時刻寫入的值,在T1時刻以後讀確定能夠讀到。也即讀的數據必須是讀開始以後的某個值,不能是讀開始以前的某個值。不要求返回最新的值,返回時間大於讀開始的值就能夠。

LogRead

這是一種很簡單而且容易理解的解決方案,也就是說對於讀操做也要寫入Log,由於每一個Log都是有其順序的,若是按照順序去執行,必然會保證線性一致性。
可是這個缺點明顯,由於對於讀操做還要記錄Log,這就會致使不必的磁盤IO。因此有了一些優化的實現。

ReadIndex

他和LogRead區別就是,他不須要記錄Log。每次讀請求到達後,會將當前commitIndex記錄爲ReadIndex。而後判斷當前節點是否爲leader,若是是,等待狀態機至少應用到ReadIndex,而後執行讀請求,返回給客戶端。
固然這種優化仍是要肯定本身是否爲leader,須要走一次RPC請求。

LeaseRead

其實這種就是儘可能避免了Rpc。由於raft選舉有個election timeout的閾值,因此
LeaseRead取了一個比election timeout小的租期,可是其正確性和時間掛鉤。因此時間飄走嚴重,就會出現不一致現象。

源碼跟蹤

ReadOnlyServiceImpl#addRequest方法

while (true) {
    if (this.readIndexQueue.tryPublishEvent(translator)) {
        break;
    } else {
        retryTimes++;
        if (retryTimes > MAX_ADD_REQUEST_RETRY_TIMES) {
            Utils.runClosureInThread(closure,
                new Status(RaftError.EBUSY, "Node is busy, has too many read-only requests."));
            this.nodeMetrics.recordTimes("read-index-overload-times", 1);
            LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
            return;
        }
        ThreadHelper.onSpinWait();
    }
}

在讀請求到達後,都會走這個方法,主要就是構建請求Event,丟進隊列。
按照老的套路,咱們此時須要看一下處理隊列的Handler如何實現。

其實這個handler主要是調用executeReadIndexEvents方法,這個方法會構建一個ReadIndexRequest 請求,而後調用handleReadIndexRequest 方法。

handleReadIndexRequest方法

public void handleReadIndexRequest(final ReadIndexRequest request, final RpcResponseClosure<ReadIndexResponse> done) {
    final long startMs = Utils.monotonicMs();
    this.readLock.lock();
    try {
        switch (this.state) {
            case STATE_LEADER:
                readLeader(request, ReadIndexResponse.newBuilder(), done);
                break;
            case STATE_FOLLOWER:
                readFollower(request, done);
                break;
            case STATE_TRANSFERRING:
                done.run(new Status(RaftError.EBUSY, "Is transferring leadership."));
                break;
            default:
                done.run(new Status(RaftError.EPERM, "Invalid state for readIndex: %s.", this.state));
                break;
        }
    } finally {
        this.readLock.unlock();
        this.metrics.recordLatency("handle-read-index", Utils.monotonicMs() - startMs);
        this.metrics.recordSize("handle-read-index-entries", request.getEntriesCount());
    }
}

這個方法會先加讀鎖,而後會根據當前節點狀態執行對應的操做。

readLeader方法

1.若是當前leader在任期期間沒有提交過日誌則直接失敗

if (this.logManager.getTerm(lastCommittedIndex) != this.currTerm) {
    // Reject read only request when this leader has not committed any log entry at its term
    closure
        .run(new Status(
            RaftError.EAGAIN,
            "ReadIndex request rejected because leader has not committed any log entry at its term, logIndex=%d, currTerm=%d.",
            lastCommittedIndex, this.currTerm));
    return;
}

2.根據配置判斷採用什麼方式進行讀取。

ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions();
if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased && !isLeaderLeaseValid()) {
    // If leader lease timeout, we must change option to ReadOnlySafe
    readOnlyOpt = ReadOnlyOption.ReadOnlySafe;
}

switch (readOnlyOpt) {
    case ReadOnlySafe:
        final List<PeerId> peers = this.conf.getConf().getPeers();
        Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty peers");
        final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure(closure,
            respBuilder, quorum, peers.size());
        // Send heartbeat requests to followers
        for (final PeerId peer : peers) {
            if (peer.equals(this.serverId)) {
                continue;
            }
            this.replicatorGroup.sendHeartbeat(peer, heartbeatDone);
        }
        break;
    case ReadOnlyLeaseBased:
        // Responses to followers and local node.
        respBuilder.setSuccess(true);
        closure.setResponse(respBuilder.build());
        closure.run(Status.OK());
        break;
}
  • 若是是ReadOnlySafe,這裏採用readIndex的方式,會向每一個節點發送心跳,避免leader飄走問題。
  • 若是是ReadOnlyLeaseBased,由於其需求保證當前節點爲leader,因此直接返回便可。

其實這個心跳方法咱們以前文章有說過。就再也不贅述。
咱們重點看一下心跳以後的回調

public synchronized void run(final Status status) {
    if (this.isDone) {
        return;
    }
    if (status.isOk() && getResponse().getSuccess()) {
        this.ackSuccess++;
    } else {
        this.ackFailures++;
    }
    // Include leader self vote yes.
    if (this.ackSuccess + 1 >= this.quorum) {
        this.respBuilder.setSuccess(true);
        this.closure.setResponse(this.respBuilder.build());
        this.closure.run(Status.OK());
        this.isDone = true;
    } else if (this.ackFailures >= this.failPeersThreshold) {
        this.respBuilder.setSuccess(false);
        this.closure.setResponse(this.respBuilder.build());
        this.closure.run(Status.OK());
        this.isDone = true;
    }
}

這裏並無採用boltCtx去存儲心跳的結果了。而是經過synchronized進行控制。畢竟這個回調對象都是同一個,只要經過鎖進行併發控制便可。若是多半節節點承認該節點爲leader,則返回成功。不然失敗。對去leader的readIndex實現其實就這麼簡單。

上面不管是哪一種方式,成功後都會執行回調。
這個主要在ReadIndexResponseClosure 的run方法實現。

ReadIndexResponseClosure#run

// Success
final ReadIndexStatus readIndexStatus = new ReadIndexStatus(this.states, this.request,
    readIndexResponse.getIndex());
for (final ReadIndexState state : this.states) {
    // Records current commit log index.
    state.setIndex(readIndexResponse.getIndex());
}

boolean doUnlock = true;
ReadOnlyServiceImpl.this.lock.lock();
try {
    if (readIndexStatus.isApplied(ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex())) {
        // Already applied, notify readIndex request.
        ReadOnlyServiceImpl.this.lock.unlock();
        doUnlock = false;
        notifySuccess(readIndexStatus);
    } else {
        // Not applied, add it to pending-notify cache.
        ReadOnlyServiceImpl.this.pendingNotifyStatus
            .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) //
            .add(readIndexStatus);
   }
  }

這個方法邏輯其實很簡單了。首先就會構建一個ReadIndexStatus(jraft作讀取邏輯時候採用批量讀,減小網絡io)。

每一個讀取請求都是一個ReadIndexStatu,裏面會有成功回調。jraft只須要再可讀的時候執行該回調(在notifySuccess中執行)。固然這個回調是由業務方實現的。

若是不符合讀取條件,也就是apply小於readIndex,那麼會將其加入到pendingNotifyStatus。在apply變化的時候會經過監聽通知pendingNotifyStatus,去再次判斷。還有一個定時任務也在保證。

主要能夠參考ReadOnlyServiceImpl 的onApplied 方法。

具體到這裏咱們就徹底明白了jraft如何實現讀取的。該篇文章也就講這麼多。

5.總結

本文主要分析了狀態機的實現,詳細講解了什麼時候會同步日誌到狀態機,而且業務方如何實現。jraft經過適配器模式,讓咱們能夠更加方便的去實現業務邏輯。設計架構其實很簡單,也很容易理解。就是咱們將實現傳入配置。jraft會在對應的時候執行對應的邏輯。

還有就是jraft的線性一致性讀,其實原理上面也說清楚了,固然jraft做了足夠的優化,經過異步回調,而且採用batch讀的方式,減小必要的網絡IO。是值得咱們去學習的!

文章參考
https://pingcap.com/blog-cn/l...

相關文章
相關標籤/搜索