這篇文章咱們來學習一下JRaft的SPI機制,以及快照的實現。對於SPI,在框架開發,尤爲是模塊設計中是很是必要的,他能夠實現可插拔的業務邏輯,實現解藕。對於快照,其實就是raft對日誌的優化,避免新節點加入後大量的網絡和磁盤IO,而且能夠節省磁盤空間,是很是必要的。接下來咱們慢慢看。java
主要實現類就是JRaftServiceLoader。
咱們來看看使用方式node
public static final JRaftServiceFactory defaultServiceFactory = JRaftServiceLoader.load(JRaftServiceFactory.class).first();
public static <S> JRaftServiceLoader<S> load(final Class<S> service) { return JRaftServiceLoader.load(service, Thread.currentThread().getContextClassLoader()); } public static <S> JRaftServiceLoader<S> load(final Class<S> service, final ClassLoader loader) { return new JRaftServiceLoader<>(service, loader); }
經過當前線程上線文類加載器(其實就是AppClassLoader)去進行加載工做。固然這裏直接用JRaftServiceLoader的類加載器也是同樣的效果。緩存
public S first() { final List<S> sortList = sort(); if (sortList.isEmpty()) { throw fail(this.service, "could not find any implementation for class"); } return sortList.get(0); }
這個sort方法其實就是獲取到一個排序實現類的集合。而後從中獲取第一個(最優先的)。網絡
public List<S> sort() { final Iterator<S> it = iterator(); final List<S> sortList = new ArrayList<>(); while (it.hasNext()) { sortList.add(it.next()); } if (sortList.size() <= 1) { return sortList; } sortList.sort((o1, o2) -> { final SPI o1_spi = o1.getClass().getAnnotation(SPI.class); final SPI o2_spi = o2.getClass().getAnnotation(SPI.class); final int o1_priority = o1_spi == null ? 0 : o1_spi.priority(); final int o2_priority = o2_spi == null ? 0 : o2_spi.priority(); return -(o1_priority - o2_priority); }); return sortList; }
這個方法其實實現很簡單,就是經過迭代器,去迭代獲取全部的實現,而後根據實現類的SPI註解中的priority,也就是優先級進行排序,最後返回一個有序的集合。
咱們重點仍是關注這個迭代器的實現。session
final Iterator<Map.Entry<String, S>> knownProviders = JRaftServiceLoader.this.providers.entrySet() .iterator(); @Override public boolean hasNext() { return this.knownProviders.hasNext() || JRaftServiceLoader.this.lookupIterator.hasNext(); } @Override public S next() { if (this.knownProviders.hasNext()) { return this.knownProviders.next().getValue(); } final Class<S> cls = JRaftServiceLoader.this.lookupIterator.next(); try { final S provider = JRaftServiceLoader.this.service.cast(cls.newInstance()); JRaftServiceLoader.this.providers.put(cls.getName(), provider); return provider; } catch (final Throwable x) { throw fail(JRaftServiceLoader.this.service, "provider " + cls.getName() + " could not be instantiated", x); } }
knownProviders其實就是一個緩存,加載成功的會放在這個Map裏面。也就是咱們第二次調用iterator方法的時候,就不須要從新執行類加載過程了(也不會從新執行),直接從緩存中獲取。
若是是第一次執行該方法,那麼依賴lookupIterator去執行加載過程。咱們先來看一下lookupIterator(LazyIterator)的實現。框架
@Override public boolean hasNext() { if (this.nextName != null) { return true; } if (this.configs == null) { try { final String fullName = PREFIX + this.service.getName(); if (this.loader == null) { this.configs = ClassLoader.getSystemResources(fullName); } else { this.configs = this.loader.getResources(fullName); } } catch (final IOException x) { throw fail(this.service, "error locating configuration files", x); } } while ((this.pending == null) || !this.pending.hasNext()) { if (!this.configs.hasMoreElements()) { return false; } this.pending = parse(this.service, this.configs.nextElement()); } this.nextName = this.pending.next(); return true; }
這裏有個細節,就是加了一個this.nextName,若是不爲空的話,直接返回true,由於後面在判斷成功後會給this.nextName賦值。若是調用了hasNext方法,卻沒有調用next,那麼下次調用hasNext就不用在走一遍判斷邏輯。ide
其實方法邏輯很簡單,就是獲取對應路徑下的文件,並加載出來。而後逐行解析。無論每行多個實現或者只有一個。都是能夠適配的。到這裏基本上就說完了。學習
實現仍是比較簡單。經過迭代器,懶加載,緩存來實現的。並加了優先級。優化
在節點初始化的時候,會啓動快照的定時器snapshotTimer。
默認時間爲一天執行一次。ui
這個方法流程以下:
final SnapshotWriter writer = this.snapshotStorage.create(); if (writer == null) { Utils.runClosureInThread(done, new Status(RaftError.EIO, "Fail to create writer.")); reportError(RaftError.EIO.getNumber(), "Fail to create snapshot writer."); return; } this.savingSnapshot = true; final SaveSnapshotDone saveSnapshotDone = new SaveSnapshotDone(writer, done, null); if (!this.fsmCaller.onSnapshotSave(saveSnapshotDone)) { Utils.runClosureInThread(done, new Status(RaftError.EHOSTDOWN, "The raft node is down.")); return; }
在fsmCaller中的runApplyTask 方法會處理snapshot save事件,最終調用doSnapshotSave方法。
case SNAPSHOT_SAVE: this.currTask = TaskType.SNAPSHOT_SAVE; if (passByStatus(task.done)) { doSnapshotSave((SaveSnapshotClosure) task.done); } break;
這個方法主要是爲SnapshotWriter構建SnapshotMeta 對象,填充對應元數據,最後會調用onSnapshotSave 方法。
final SnapshotWriter writer = done.start(metaBuilder.build()); if (writer == null) { done.run(new Status(RaftError.EINVAL, "snapshot_storage create SnapshotWriter failed")); return; } this.fsm.onSnapshotSave(writer, done);
this.fsm.onSnapshotSave方法須要業務方去實現,主要就是快照的保存和壓縮。業務方需將快照文件數據加入到SnapshotWriter中。
安裝快照實在Replicator中執行。
fillCommonFields返回false就調用installSnapshot方法。
或者當前要同步的日誌數量爲0(兩種狀況,一種是確實沒有日誌同步,另外一種是日誌被刪除,存儲到快照文件了,若是是被刪除,就須要同步快照),也會調用installSnapshot方法。
if (!fillCommonFields(rb, this.nextIndex - 1, isHeartbeat)) { // id is unlock in installSnapshot installSnapshot(); if (isHeartbeat && heartBeatClosure != null) { Utils.runClosureInThread(heartBeatClosure, new Status(RaftError.EAGAIN, "Fail to send heartbeat to peer %s", this.options.getPeerId())); } return; } if (rb.getEntriesCount() == 0) { if (nextSendingIndex < this.options.getLogManager().getFirstLogIndex()) { installSnapshot(); return false; } // _id is unlock in _wait_more waitMoreEntries(nextSendingIndex); return false; }
installSnapshot方法會構建一個InstallSnapshotRequest發送給follower。InstallSnapshotRequest主要存儲了快照的元數據以及一個uri信息。
該處理器主要用來接受leader發送的InstallSnapshotRequest請求。最後會調用handleInstallSnapshot 方法。
該方法和處理其餘rpc請求的邏輯基本一致。若是當前節點ok,leader合法,會調用snapshotExecutor.installSnapshot方法。
this.snapshotExecutor.installSnapshot(request, InstallSnapshotResponse.newBuilder(), done);
該方法首先會構建一個下載快照的對象。而後調用registerDownloadingSnapshot去進行下載。
下載邏輯其實很簡單:先加載元數據,創建文件和數據映射。而後去逐個去寫文件。
public void installSnapshot(final InstallSnapshotRequest request, final InstallSnapshotResponse.Builder response, final RpcRequestClosure done) { final SnapshotMeta meta = request.getMeta(); final DownloadingSnapshot ds = new DownloadingSnapshot(request, response, done); // DON'T access request, response, and done after this point // as the retry snapshot will replace this one. if (!registerDownloadingSnapshot(ds)) { LOG.warn("Fail to register downloading snapshot."); // This RPC will be responded by the previous session return; } Requires.requireNonNull(this.curCopier, "curCopier"); try { //阻塞 等待快照數據下載 this.curCopier.join(); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); LOG.warn("Install snapshot copy job was canceled."); return; } loadDownloadingSnapshot(ds, meta); }
最後調用loadDownloadingSnapshot去安裝文件數據。
具體邏輯在FSMCallerImpl的doSnapshotLoad 方法中。這個方法最後會調用業務狀態機的onSnapshotLoad 方法。
public boolean onSnapshotLoad(final LoadSnapshotClosure done) { return enqueueTask((task, sequence) -> { task.type = TaskType.SNAPSHOT_LOAD; task.done = done; }); }
參考CounterStateMachine中的實現。首先判斷數據是否有效(是否存在該文件),而後去從文件讀取。最後set到value
public boolean onSnapshotLoad(final SnapshotReader reader) { if (isLeader()) { LOG.warn("Leader is not supposed to load snapshot"); return false; } if (reader.getFileMeta("data") == null) { LOG.error("Fail to find data file in {}", reader.getPath()); return false; } final CounterSnapshotFile snapshot = new CounterSnapshotFile(reader.getPath() + File.separator + "data"); try { this.value.set(snapshot.load()); return true; } catch (final IOException e) { LOG.error("Fail to load snapshot from {}", snapshot.getPath()); return false; } }
存儲快照
經過定時任務。定時執行存儲邏輯。邏輯須要業務方實現。
傳遞快照
由leader決定是否須要傳遞。通常寫入快照的日誌會被刪除。leader在發送日誌時候,發現日誌已經被刪除的話,就須要傳遞快照。
傳遞快照方式
由leader發送一個條RPC,告訴follower須要來下載快照。這條RPC包括快照元數據和uri信息。
follower接收到RPC,會去主動下載leader的快照信息。下載成功後寫入本地文件。
安裝快照
初始化的時候會加載。或者下載完後會去主動加載。加載邏輯很簡單,就是根據下載的快照元數據信息去從磁盤加載數據,而後應用到狀態機。