上一篇咱們詳細分析了源碼分析 RocketMQ DLedger 多副本之 Leader 選主,本文將詳細分析日誌複製的實現。java
根據 raft 協議可知,當整個集羣完成 Leader 選主後,集羣中的主節點就能夠接受客戶端的請求,而集羣中的從節點只負責從主節點同步數據,而不會處理讀寫請求,與M-S結構的讀寫分離有着巨大的區別。數組
有了前篇文章的基礎,本文將直接從 Leader 處理客戶端請求入口開始,其入口爲:DLedgerServer 的 handleAppend 方法開始講起。服務器
在正式分析 RocketMQ DLedger 多副本複製以前,咱們首先來了解客戶端發送日誌的請求協議字段,其類圖以下所示: 數據結構
咱們先一一介紹各個字段的含義:日誌的請求處理處理入口爲 DLedgerServer 的 handleAppend 方法。app
DLedgerServer#handleAppend函數
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
reConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
複製代碼
Step1:首先驗證請求的合理性:源碼分析
DLedgerServer#handleAppendpost
long currTerm = memberState.currTerm();
if (dLedgerEntryPusher.isPendingFull(currTerm)) { // @1
AppendEntryResponse appendEntryResponse = new AppendEntryResponse();
appendEntryResponse.setGroup(memberState.getGroup());
appendEntryResponse.setCode(DLedgerResponseCode.LEADER_PENDING_FULL.getCode());
appendEntryResponse.setTerm(currTerm);
appendEntryResponse.setLeaderId(memberState.getSelfId());
return AppendFuture.newCompletedFuture(-1, appendEntryResponse);
} else { // @2
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(request.getBody());
DLedgerEntry resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);
return dLedgerEntryPusher.waitAck(resEntry);
}
複製代碼
Step2:若是預處理隊列已經滿了,則拒絕客戶端請求,返回 LEADER_PENDING_FULL 錯誤碼;若是未滿,將請求封裝成 DledgerEntry,則調用 dLedgerStore 方法追加日誌,而且經過使用 dLedgerEntryPusher 的 waitAck 方法同步等待副本節點的複製響應,並最終將結果返回給調用方法。學習
接下來就按照上述三個要點進行展開:this
DLedgerEntryPusher#isPendingFull
public boolean isPendingFull(long currTerm) {
checkTermForPendingMap(currTerm, "isPendingFull"); // @1
return pendingAppendResponsesByTerm.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum(); // @2
}
複製代碼
主要分兩個步驟: 代碼@1:檢查當前投票輪次是否在 PendingMap 中,若是不在,則初始化,其結構爲:Map< Long/* 投票輪次*/, ConcurrentMap<Long, TimeoutFuture< AppendEntryResponse>>>。
代碼@2:檢測當前等待從節點返回結果的個數是否超過其最大請求數量,可經過maxPendingRequests Num 配置,該值默認爲:10000。
上述邏輯比較簡單,但疑問隨着而來,ConcurrentMap<Long, TimeoutFuture< AppendEntryResponse>> 中的數據是從何而來的呢?咱們不妨接着往下看。
Leader 節點的數據存儲主要由 DLedgerStore 的 appendAsLeader 方法實現。DLedger 分別實現了基於內存、基於文件的存儲實現,本文重點關注基於文件的存儲實現,其實現類爲:DLedgerMmapFileStore。
下面重點來分析一下數據存儲流程,其入口爲DLedgerMmapFileStore 的 appendAsLeader 方法。
DLedgerMmapFileStore#appendAsLeader
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
PreConditions.check(!isDiskFull, DLedgerResponseCode.DISK_FULL);
複製代碼
Step1:首先判斷是否能夠追加數據,其判斷依據主要是以下兩點:
ByteBuffer dataBuffer = localEntryBuffer.get();
ByteBuffer indexBuffer = localIndexBuffer.get();
複製代碼
Step2:從本地線程變量獲取一個數據與索引 buffer。其中用於存儲數據的 ByteBuffer,其容量固定爲 4M ,索引的 ByteBuffer 爲兩個索引條目的長度,固定爲64個字節。
DLedgerEntryCoder.encode(entry, dataBuffer);
public static void encode(DLedgerEntry entry, ByteBuffer byteBuffer) {
byteBuffer.clear();
int size = entry.computSizeInBytes();
//always put magic on the first position
byteBuffer.putInt(entry.getMagic());
byteBuffer.putInt(size);
byteBuffer.putLong(entry.getIndex());
byteBuffer.putLong(entry.getTerm());
byteBuffer.putLong(entry.getPos());
byteBuffer.putInt(entry.getChannel());
byteBuffer.putInt(entry.getChainCrc());
byteBuffer.putInt(entry.getBodyCrc());
byteBuffer.putInt(entry.getBody().length);
byteBuffer.put(entry.getBody());
byteBuffer.flip();
}
複製代碼
Step3:將 DLedgerEntry,即將數據寫入到 ByteBuffer中,從這裏看出,每一次寫入會調用 ByteBuffer 的 clear 方法,將數據清空,從這裏能夠看出,每一次數據追加,只能存儲4M的數據。
DLedgerMmapFileStore#appendAsLeader
synchronized (memberState) {
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER, null);
// ... 省略代碼
}
複製代碼
Step4:鎖定狀態機,並再一次檢測節點的狀態是不是 Leader 節點。
DLedgerMmapFileStore#appendAsLeader
long nextIndex = ledgerEndIndex + 1;
entry.setIndex(nextIndex);
entry.setTerm(memberState.currTerm());
entry.setMagic(CURRENT_MAGIC);
DLedgerEntryCoder.setIndexTerm(dataBuffer, nextIndex, memberState.currTerm(), CURRENT_MAGIC);
複製代碼
Step5:爲當前日誌條目設置序號,即 entryIndex 與 entryTerm (投票輪次)。並將魔數、entryIndex、entryTerm 等寫入到 bytebuffer 中。
DLedgerMmapFileStore#appendAsLeader
long prePos = dataFileList.preAppend(dataBuffer.remaining());
entry.setPos(prePos);
PreConditions.check(prePos != -1, DLedgerResponseCode.DISK_ERROR, null);
DLedgerEntryCoder.setPos(dataBuffer, prePos);
複製代碼
Step6:計算新的消息的起始偏移量,關於 dataFileList 的 preAppend 後續詳細介紹其實現,而後將該偏移量寫入日誌的 bytebuffer 中。
DLedgerMmapFileStore#appendAsLeader
for (AppendHook writeHook : appendHooks) {
writeHook.doHook(entry, dataBuffer.slice(), DLedgerEntry.BODY_OFFSET);
}
複製代碼
Step7:執行鉤子函數。
DLedgerMmapFileStore#appendAsLeader
long dataPos = dataFileList.append(dataBuffer.array(), 0, dataBuffer.remaining());
PreConditions.check(dataPos != -1, DLedgerResponseCode.DISK_ERROR, null);
PreConditions.check(dataPos == prePos, DLedgerResponseCode.DISK_ERROR, null);
複製代碼
Step8:將數據追加到 pagecache 中。該方法稍後詳細介紹。
DLedgerMmapFileStore#appendAsLeader
DLedgerEntryCoder.encodeIndex(dataPos, entrySize, CURRENT_MAGIC, nextIndex, memberState.currTerm(), indexBuffer);
long indexPos = indexFileList.append(indexBuffer.array(), 0, indexBuffer.remaining(), false);
PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);
複製代碼
Step9:構建條目索引並將索引數據追加到 pagecache。
DLedgerMmapFileStore#appendAsLeader
ledgerEndIndex++;
ledgerEndTerm = memberState.currTerm();
if (ledgerBeginIndex == -1) {
ledgerBeginIndex = ledgerEndIndex;
}
updateLedgerEndIndexAndTerm();
複製代碼
Step10:ledgerEndeIndex 加一(下一個條目)的序號。並設置 leader 節點的狀態機的 ledgerEndIndex 與 ledgerEndTerm。
Leader 節點數據追加就介紹到這裏,稍後會重點介紹與存儲相關方法的實現細節。
其實現入口爲 dLedgerEntryPusher 的 waitAck 方法。
DLedgerEntryPusher#waitAck
public CompletableFuture<AppendEntryResponse> waitAck(DLedgerEntry entry) {
updatePeerWaterMark(entry.getTerm(), memberState.getSelfId(), entry.getIndex()); // @1
if (memberState.getPeerMap().size() == 1) { // @2
AppendEntryResponse response = new AppendEntryResponse();
response.setGroup(memberState.getGroup());
response.setLeaderId(memberState.getSelfId());
response.setIndex(entry.getIndex());
response.setTerm(entry.getTerm());
response.setPos(entry.getPos());
return AppendFuture.newCompletedFuture(entry.getPos(), response);
} else {
checkTermForPendingMap(entry.getTerm(), "waitAck");
AppendFuture<AppendEntryResponse> future = new AppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs()); // @3
future.setPos(entry.getPos());
CompletableFuture<AppendEntryResponse> old = pendingAppendResponsesByTerm.get(entry.getTerm()).put(entry.getIndex(), future); // @4
if (old != null) {
logger.warn("[MONITOR] get old wait at index={}", entry.getIndex());
}
wakeUpDispatchers(); // @5
return future;
}
}
複製代碼
代碼@1:更新當前節點的 push 水位線。 代碼@2:若是集羣的節點個數爲1,無需轉發,直接返回成功結果。 代碼@3:構建 append 響應 Future 並設置超時時間,默認值爲:2500 ms,能夠經過 maxWaitAckTimeMs 配置改變其默認值。 代碼@4:將構建的 Future 放入等待結果集合中。 代碼@5:喚醒 Entry 轉發線程,即將主節點中的數據 push 到各個從節點。
接下來分別對上述幾個關鍵點進行解讀。
DLedgerEntryPusher#updatePeerWaterMark
private void updatePeerWaterMark(long term, String peerId, long index) { // 代碼@1
synchronized (peerWaterMarksByTerm) {
checkTermForWaterMark(term, "updatePeerWaterMark"); // 代碼@2
if (peerWaterMarksByTerm.get(term).get(peerId) < index) { // 代碼@3
peerWaterMarksByTerm.get(term).put(peerId, index);
}
}
}
複製代碼
代碼@1:先來簡單介紹該方法的兩個參數:
代碼@2:初始化 peerWaterMarksByTerm 數據結構,其結果爲 < Long /** term */, Map< String /** peerId */, Long /** entry index*/>。
代碼@3:若是 peerWaterMarksByTerm 存儲的 index 小於當前數據的 index,則更新。
DLedgerEntryPusher#updatePeerWaterMark
public void wakeUpDispatchers() {
for (EntryDispatcher dispatcher : dispatcherMap.values()) {
dispatcher.wakeup();
}
}
複製代碼
該方法主要就是遍歷轉發器並喚醒。本方法的核心關鍵就是 EntryDispatcher,在詳細介紹它以前咱們先來看一下該集合的初始化。
DLedgerEntryPusher 構造方法
for (String peer : memberState.getPeerMap().keySet()) {
if (!peer.equals(memberState.getSelfId())) {
dispatcherMap.put(peer, new EntryDispatcher(peer, logger));
}
}
複製代碼
原來在構建 DLedgerEntryPusher 時會爲每個從節點建立一個 EntryDispatcher 對象。
顯然,日誌的複製由 DLedgerEntryPusher 來實現。因爲篇幅的緣由,該部份內容將在下篇文章中繼續。
上面在講解 Leader 追加日誌時並無詳細分析存儲相關的實現,爲了知識體系的完備,接下來咱們來分析一下其核心實現。
本節主要對 MmapFileList 的 preAppend 與 append 方法進行詳細講解。
存儲部分的設計請查閱筆者的博客:源碼分析 RocketMQ DLedger 多副本存儲實現,MmapFileList 對標 RocketMQ 的MappedFileQueue。
該方法最終會調用兩個參數的 preAppend方法,故咱們直接來看兩個參數的 preAppend 方法。
MmapFileList#preAppend
public long preAppend(int len, boolean useBlank) { // @1
MmapFile mappedFile = getLastMappedFile(); // @2 start
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = getLastMappedFile(0);
}
if (null == mappedFile) {
logger.error("Create mapped file for {}", storePath);
return -1;
} // @2 end
int blank = useBlank ? MIN_BLANK_LEN : 0;
if (len + blank > mappedFile.getFileSize() - mappedFile.getWrotePosition()) { // @3
if (blank < MIN_BLANK_LEN) {
logger.error("Blank {} should ge {}", blank, MIN_BLANK_LEN);
return -1;
} else {
ByteBuffer byteBuffer = ByteBuffer.allocate(mappedFile.getFileSize() - mappedFile.getWrotePosition()); // @4
byteBuffer.putInt(BLANK_MAGIC_CODE); // @5
byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition()); // @6
if (mappedFile.appendMessage(byteBuffer.array())) { // @7
//need to set the wrote position
mappedFile.setWrotePosition(mappedFile.getFileSize());
} else {
logger.error("Append blank error for {}", storePath);
return -1;
}
mappedFile = getLastMappedFile(0);
if (null == mappedFile) {
logger.error("Create mapped file for {}", storePath);
return -1;
}
}
}
return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();// @8
}
複製代碼
代碼@1:首先介紹其參數的含義:
代碼@2:獲取最後一個文件,即獲取當前正在寫的文件。
代碼@3:若是須要申請的資源超過了當前文件可寫字節時,須要處理的邏輯。代碼@4-@7都是其處理邏輯。
代碼@4:申請一個當前文件剩餘字節的大小的bytebuffer。
代碼@5:先寫入魔數。
代碼@6:寫入字節長度,等於當前文件剩餘的總大小。
代碼@7:寫入空字節,代碼@4-@7的用意就是寫一條空Entry,填入魔數與 size,方便解析。
代碼@8:若是當前文件足以容納待寫入的日誌,則直接返回其物理偏移量。
通過上述代碼解讀,咱們很容易得出該方法的做用,就是返回待寫入日誌的起始物理偏移量。
最終會調用4個參數的 append 方法,其代碼以下: MmapFileList#append
public long append(byte[] data, int pos, int len, boolean useBlank) { // @1
if (preAppend(len, useBlank) == -1) {
return -1;
}
MmapFile mappedFile = getLastMappedFile(); // @2
long currPosition = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(); // @3
if (!mappedFile.appendMessage(data, pos, len)) { // @4
logger.error("Append error for {}", storePath);
return -1;
}
return currPosition;
}
複製代碼
代碼@1:首先介紹一下各個參數:
代碼@2:獲取最後一個文件,即當前可寫的文件。
代碼@3:獲取當前寫入指針。
代碼@4:追加消息。
最後咱們再來看一下 appendMessage,具體的消息追加實現邏輯。
DefaultMmapFile#appendMessage
public boolean appendMessage(final byte[] data, final int offset, final int length) {
int currentPos = this.wrotePosition.get();
if ((currentPos + length) <= this.fileSize) {
ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); // @1
byteBuffer.position(currentPos);
byteBuffer.put(data, offset, length);
this.wrotePosition.addAndGet(length);
return true;
}
return false;
}
複製代碼
該方法我主要是想突出一下寫入的方式是 mappedByteBuffer,是經過 FileChannel 的 map 方法建立,即咱們常說的 PageCache,即消息追加首先是寫入到 pageCache 中。
本文詳細介紹了 Leader 節點處理客戶端消息追加請求的前面兩個步驟,即 判斷 Push 隊列是否已滿 與 Leader 節點存儲消息。考慮到篇幅的問題,各個節點的數據同步將在下一篇文章中詳細介紹。
在進入下一篇的文章學習以前,咱們不妨思考一下以下問題:
親愛的讀者朋友們,都讀到這裏了,麻煩幫忙個點個贊,謝謝。
做者簡介:《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,維護公衆號:中間件興趣圈,可掃描以下二維碼與做者進行互動。