SOFAJRaft 源碼分析四(SPI機制、快照)

1.概述

這篇文章咱們來學習一下JRaft的SPI機制,以及快照的實現。對於SPI,在框架開發,尤爲是模塊設計中是很是必要的,他能夠實現可插拔的業務邏輯,實現解藕。對於快照,其實就是raft對日誌的優化,避免新節點加入後大量的網絡和磁盤IO,而且能夠節省磁盤空間,是很是必要的。接下來咱們慢慢看。java

2.JRaft的SPI實現

主要實現類就是JRaftServiceLoader。
咱們來看看使用方式node

public static final JRaftServiceFactory defaultServiceFactory = JRaftServiceLoader.load(JRaftServiceFactory.class).first();

JRaftServiceLoader#load

public static <S> JRaftServiceLoader<S> load(final Class<S> service) {
    return JRaftServiceLoader.load(service, Thread.currentThread().getContextClassLoader());
}

public static <S> JRaftServiceLoader<S> load(final Class<S> service, final ClassLoader loader) {
    return new JRaftServiceLoader<>(service, loader);
}

經過當前線程上線文類加載器(其實就是AppClassLoader)去進行加載工做。固然這裏直接用JRaftServiceLoader的類加載器也是同樣的效果。緩存

JRaftServiceLoader#first

public S first() {
    final List<S> sortList = sort();
    if (sortList.isEmpty()) {
        throw fail(this.service, "could not find any implementation for class");
    }
    return sortList.get(0);
}

這個sort方法其實就是獲取到一個排序實現類的集合。而後從中獲取第一個(最優先的)。網絡

JRaftServiceLoader#sort

public List<S> sort() {
    final Iterator<S> it = iterator();
    final List<S> sortList = new ArrayList<>();
    while (it.hasNext()) {
        sortList.add(it.next());
    }
    if (sortList.size() <= 1) {
        return sortList;
    }
    sortList.sort((o1, o2) -> {
        final SPI o1_spi = o1.getClass().getAnnotation(SPI.class);
        final SPI o2_spi = o2.getClass().getAnnotation(SPI.class);
        final int o1_priority = o1_spi == null ? 0 : o1_spi.priority();
        final int o2_priority = o2_spi == null ? 0 : o2_spi.priority();
        return -(o1_priority - o2_priority);
    });
    return sortList;
}

這個方法其實實現很簡單,就是經過迭代器,去迭代獲取全部的實現,而後根據實現類的SPI註解中的priority,也就是優先級進行排序,最後返回一個有序的集合。
咱們重點仍是關注這個迭代器的實現。session

JRaftServiceLoader#iterator

final Iterator<Map.Entry<String, S>> knownProviders = JRaftServiceLoader.this.providers.entrySet()
    .iterator();
@Override
public boolean hasNext() {
    return this.knownProviders.hasNext() || JRaftServiceLoader.this.lookupIterator.hasNext();
}
@Override
public S next() {
    if (this.knownProviders.hasNext()) {
        return this.knownProviders.next().getValue();
    }
    final Class<S> cls = JRaftServiceLoader.this.lookupIterator.next();
    try {
        final S provider = JRaftServiceLoader.this.service.cast(cls.newInstance());
        JRaftServiceLoader.this.providers.put(cls.getName(), provider);
        return provider;
    } catch (final Throwable x) {
        throw fail(JRaftServiceLoader.this.service, "provider " + cls.getName()
            + " could not be instantiated", x);
    }
}

knownProviders其實就是一個緩存,加載成功的會放在這個Map裏面。也就是咱們第二次調用iterator方法的時候,就不須要從新執行類加載過程了(也不會從新執行),直接從緩存中獲取。
若是是第一次執行該方法,那麼依賴lookupIterator去執行加載過程。咱們先來看一下lookupIterator(LazyIterator)的實現。框架

LazyIterator#hasNext

@Override
public boolean hasNext() {
    if (this.nextName != null) {
        return true;
    }
    if (this.configs == null) {
        try {
            final String fullName = PREFIX + this.service.getName();
            if (this.loader == null) {
                this.configs = ClassLoader.getSystemResources(fullName);
            } else {
                this.configs = this.loader.getResources(fullName);
            }
        } catch (final IOException x) {
            throw fail(this.service, "error locating configuration files", x);
        }
    }
    while ((this.pending == null) || !this.pending.hasNext()) {
        if (!this.configs.hasMoreElements()) {
            return false;
        }
        this.pending = parse(this.service, this.configs.nextElement());
    }
    this.nextName = this.pending.next();
    return true;
}

這裏有個細節,就是加了一個this.nextName,若是不爲空的話,直接返回true,由於後面在判斷成功後會給this.nextName賦值。若是調用了hasNext方法,卻沒有調用next,那麼下次調用hasNext就不用在走一遍判斷邏輯。ide

其實方法邏輯很簡單,就是獲取對應路徑下的文件,並加載出來。而後逐行解析。無論每行多個實現或者只有一個。都是能夠適配的。到這裏基本上就說完了。學習

實現仍是比較簡單。經過迭代器,懶加載,緩存來實現的。並加了優先級。優化

2.JRaft的快照

快照初始化

在節點初始化的時候,會啓動快照的定時器snapshotTimer。
默認時間爲一天執行一次。ui

SnapshotExecutorImpl#doSnapshot

這個方法流程以下

  1. 判斷節點狀態,若是stop、savingSnapshot或downloadingSnapshot的時候,直接返回
  2. 若是沒有新的數據則返回。
  3. 建立SnapshotWriter
  4. 封裝成SaveSnapshotDone丟給fsmCaller
final SnapshotWriter writer = this.snapshotStorage.create();
if (writer == null) {
    Utils.runClosureInThread(done, new Status(RaftError.EIO, "Fail to create writer."));
    reportError(RaftError.EIO.getNumber(), "Fail to create snapshot writer.");
    return;
}
this.savingSnapshot = true;
final SaveSnapshotDone saveSnapshotDone = new SaveSnapshotDone(writer, done, null);
if (!this.fsmCaller.onSnapshotSave(saveSnapshotDone)) {
    Utils.runClosureInThread(done, new Status(RaftError.EHOSTDOWN, "The raft node is down."));
    return;
}

在fsmCaller中的runApplyTask 方法會處理snapshot save事件,最終調用doSnapshotSave方法。

case SNAPSHOT_SAVE:
    this.currTask = TaskType.SNAPSHOT_SAVE;
    if (passByStatus(task.done)) {
        doSnapshotSave((SaveSnapshotClosure) task.done);
    }
    break;

FSMCallerImpl#doSnapshotSave

這個方法主要是爲SnapshotWriter構建SnapshotMeta 對象,填充對應元數據,最後會調用onSnapshotSave 方法。

final SnapshotWriter writer = done.start(metaBuilder.build());
if (writer == null) {
    done.run(new Status(RaftError.EINVAL, "snapshot_storage create SnapshotWriter failed"));
    return;
}
this.fsm.onSnapshotSave(writer, done);

this.fsm.onSnapshotSave方法須要業務方去實現,主要就是快照的保存和壓縮。業務方需將快照文件數據加入到SnapshotWriter中。

安裝快照

安裝快照實在Replicator中執行。

fillCommonFields返回false就調用installSnapshot方法。

或者當前要同步的日誌數量爲0(兩種狀況,一種是確實沒有日誌同步,另外一種是日誌被刪除,存儲到快照文件了,若是是被刪除,就須要同步快照),也會調用installSnapshot方法。

if (!fillCommonFields(rb, this.nextIndex - 1, isHeartbeat)) {
    // id is unlock in installSnapshot
    installSnapshot();
    if (isHeartbeat && heartBeatClosure != null) {
        Utils.runClosureInThread(heartBeatClosure, new Status(RaftError.EAGAIN,
            "Fail to send heartbeat to peer %s", this.options.getPeerId()));
    }
    return;
}

if (rb.getEntriesCount() == 0) {
    if (nextSendingIndex < this.options.getLogManager().getFirstLogIndex()) {
        installSnapshot();
        return false;
    }
    // _id is unlock in _wait_more
    waitMoreEntries(nextSendingIndex);
    return false;
}

installSnapshot方法會構建一個InstallSnapshotRequest發送給follower。InstallSnapshotRequest主要存儲了快照的元數據以及一個uri信息。

InstallSnapshotRequestProcessor

該處理器主要用來接受leader發送的InstallSnapshotRequest請求。最後會調用handleInstallSnapshot 方法。

該方法和處理其餘rpc請求的邏輯基本一致。若是當前節點ok,leader合法,會調用snapshotExecutor.installSnapshot方法。

this.snapshotExecutor.installSnapshot(request, InstallSnapshotResponse.newBuilder(), done);

SnapshotExecutorImpl#installSnapshot

該方法首先會構建一個下載快照的對象。而後調用registerDownloadingSnapshot去進行下載。

下載邏輯其實很簡單:先加載元數據,創建文件和數據映射。而後去逐個去寫文件。

public void installSnapshot(final InstallSnapshotRequest request, final InstallSnapshotResponse.Builder response,
                            final RpcRequestClosure done) {
    final SnapshotMeta meta = request.getMeta();
    final DownloadingSnapshot ds = new DownloadingSnapshot(request, response, done);
    // DON'T access request, response, and done after this point
    // as the retry snapshot will replace this one.
    if (!registerDownloadingSnapshot(ds)) {
        LOG.warn("Fail to register downloading snapshot.");
        // This RPC will be responded by the previous session
        return;
    }
    Requires.requireNonNull(this.curCopier, "curCopier");
    try {
    //阻塞 等待快照數據下載
        this.curCopier.join();
    } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
        LOG.warn("Install snapshot copy job was canceled.");
        return;
    }

    loadDownloadingSnapshot(ds, meta);
}

最後調用loadDownloadingSnapshot去安裝文件數據。

具體邏輯在FSMCallerImpl的doSnapshotLoad 方法中。這個方法最後會調用業務狀態機的onSnapshotLoad 方法。

public boolean onSnapshotLoad(final LoadSnapshotClosure done) {
    return enqueueTask((task, sequence) -> {
        task.type = TaskType.SNAPSHOT_LOAD;
        task.done = done;
    });
}

參考CounterStateMachine中的實現。首先判斷數據是否有效(是否存在該文件),而後去從文件讀取。最後set到value

public boolean onSnapshotLoad(final SnapshotReader reader) {
    if (isLeader()) {
        LOG.warn("Leader is not supposed to load snapshot");
        return false;
    }
    if (reader.getFileMeta("data") == null) {
        LOG.error("Fail to find data file in {}", reader.getPath());
        return false;
    }
    final CounterSnapshotFile snapshot = new CounterSnapshotFile(reader.getPath() + File.separator + "data");
    try {
        this.value.set(snapshot.load());
        return true;
    } catch (final IOException e) {
        LOG.error("Fail to load snapshot from {}", snapshot.getPath());
        return false;
    }
}

jraft的快照實現總結

存儲快照

經過定時任務。定時執行存儲邏輯。邏輯須要業務方實現。

傳遞快照

由leader決定是否須要傳遞。通常寫入快照的日誌會被刪除。leader在發送日誌時候,發現日誌已經被刪除的話,就須要傳遞快照。

傳遞快照方式

由leader發送一個條RPC,告訴follower須要來下載快照。這條RPC包括快照元數據和uri信息。
follower接收到RPC,會去主動下載leader的快照信息。下載成功後寫入本地文件。

安裝快照

初始化的時候會加載。或者下載完後會去主動加載。加載邏輯很簡單,就是根據下載的快照元數據信息去從磁盤加載數據,而後應用到狀態機。

相關文章
相關標籤/搜索