做者:屈鵬git
在 《TiKV 源碼解析(二)raft-rs proposal 示例情景分析》 中,咱們主要介紹了 raft-rs 的基本 API 使用,其中,與應用程序進行交互的主要 API 是:github
RawNode::propose 發起一次新的提交,嘗試在 Raft 日誌中追加一個新項;網絡
RawNode::ready_since 從 Raft 節點中獲取最近的更新,包括新近追加的日誌、新近確認的日誌,以及須要給其餘節點發送的消息等;app
在將一個 Ready 中的全部更新處理完畢以後,使用 RawNode::advance 在這個 Raft 節點中將這個 Ready 標記爲完成狀態。框架
熟悉了以上 3 個 API,用戶就能夠寫出基本的基於 Raft 的分佈式應用的框架了,而 Raft 協議中將寫入同步到多個副本中的任務,則由 raft-rs 庫自己的內部實現來完成,無須應用程序進行額外干預。本文將對數據冗餘複製的過程進行詳細展開,特別是關於 snapshot 及流量控制的機制,幫助讀者更深入地理解 Raft 的原理。分佈式
在 Raft leader 上,應用程序經過 RawNode::propose 發起的寫入會被處理成一條 MsgPropose 類型的消息,而後調用 Raft::append_entry 和 Raft::bcast_append 將消息中的數據追加到 Raft 日誌中並廣播到其餘副本上。總體流程如僞代碼所示:ide
fn Raft::step_leader(&mut self, mut m: Message) -> Result<()> { if m.get_msg_type() == MessageType::MsgPropose { // Propose with an empty entry list is not allowed. assert!(!m.get_entries().is_empty()); self.append_entry(&mut m.mut_entries()); self.bcast_append(); } }
這段代碼中 append_entry
的參數是一個可變引用,這是由於在 append_entry
函數中會爲每個 Entry 賦予正確的 term 和 index。term 由選舉產生,在一個 Raft 系統中,每選舉出一個新的 Leader,便會產生一個更高的 term。而 index 則是 Entry 在 Raft 日誌中的下標。Entry 須要帶上 term 和 index 的緣由是,在其餘副本上的 Raft 日誌是可能跟 Leader 不一樣的,例如一箇舊 Leader 在相同的位置(即 Raft 日誌中具備相同 index 的地方)廣播了一條過時的 Entry,那麼當其餘副本收到了重疊的、可是具備更高 term 的消息時,即可以用它們替換舊的消息,以便達成與最新的 Leader 一致的狀態。函數
在 Leader 將新的寫入追加到本身的 Raft log 中以後,即可以調用 bcast_append
將它們廣播到其餘副本了。注意這個函數並無任何參數,那麼 Leader 如何知道應該給每個副本從哪個位置開始廣播呢?原來在 Leader 上對每個副本,都關聯維護了一個 Progress,該結構體定義以下:優化
pub struct Progress { pub matched: u64, // 該副本指望接收的下一個 Entry 的 index pub next_idx: u64, // 未 commit 的消息的滑動窗口 pub ins: Inflights, // ProgressState::Probe:Leader 每一個心跳間隔中最多發送一條 MsgAppend // ProgressState::Replicate:Leader 在每一個心跳間隔中能夠發送多個 MsgAppend // ProgressState::Snapshot:Leader 沒法再繼續發送 MsgAppend 給這個副本 pub state: ProgressState, // 是否暫停給這個副本發送 MsgAppend 了 pub paused: bool, // 一些其餘字段…… }
如代碼註釋中所說的那樣,Leader 在給副本廣播新的日誌時,會從對應的副本的 next_idx
開始。這就蘊含了兩個問題:rest
next_idx
應該如何設置?next_idx
?第一個問題的答案在 Raft::reset
函數中。這個函數會在 Raft 完成選舉以後選出的 Leader 上調用,會將 Leader 的全部其餘副本的 next_idx
設置爲跟 Leader 相同的值。以後,Leader 就能夠會按照 Raft 論文裏的規定,廣播一條包含了本身的 term 的空 Entry 了。
第二個問題的答案在 Raft::handle_append_response
函數中。咱們繼續考察上面的情景,Leader 的其餘副本在收到 Leader 廣播的最新的日誌以後,可能會採起兩種動做:
fn Raft::handle_append_entries(&mut self, m: &Message) { let mut to_send = Message::new_message_append_response(); match self.raft_log.maybe_append(...) { // 追加日誌成功,將最新的 last index 上報給 Leader Some(last_index) => to_send.set_index(last_index), // 追加日誌失敗,設置 reject 標誌,並告訴 Leader 本身的 last index None => { to_send.set_reject(true); to_send.set_reject_hint(self.raft_log.last_index()); } } } self.send(to_send);
其餘副本調用 maybe_append
失敗的緣由多是比 Leader 的日誌更少,可是 Leader 在剛選舉出來的時候將全部副本的 next_idx
設置爲與本身相同的值了。這個時候這些副本就會在 MsgAppendResponse 中設置拒絕的標誌。在 Leader 接收到這樣的反饋以後,就能夠將對應副本的 next_idx
設置爲正確的值了。這個邏輯在 Raft::handle_append_response
中:
fn Raft::handle_append_response(&mut self, m: &Message, …) { if m.get_reject() { let pr: &mut Progress = self.get_progress(m.get_from()); // 將副本對應的 `next_idx` 回退到一個合適的值 pr.maybe_decr_to(m.get_index(), m.get_reject_hint()); } else { // 將副本對應的 `next_idx` 設置爲 `m.get_index() + 1` pr.maybe_update(m.get_index()); } }
以上僞代碼中咱們省略了一些丟棄亂序消息的代碼,避免過多的細節形成干擾。
上一節咱們重點觀察了 MsgAppend 及 MsgAppendResponse 消息的處理流程,原理是很是簡單、清晰的。然而,這個未經任何優化的實現可以工做的前提是在 Leader 收到某個副本的 MsgAppendResponse 以前,再也不給它發送任何 MsgAppend。因爲等待響應的時間取決於網絡的 TTL,這在實際應用中是很是低效的,所以咱們須要引入 pipeline 優化,以及配套的流量控制機制來避免「優化」帶來的網絡壅塞。
Pipeline 在 Raft::prepare_send_entries
函數中被引入。這個函數在 Raft::send_append
中被調用,內部會直接修改對目標副本的 next_idex
值,這樣,後續的 MsgAppend 即可以在此基礎上繼續發送了。而一旦以前的 MsgAppend 被該目標副本拒絕掉了,也能夠經過上一節中介紹的 maybe_decr_to
機制將 next_idx
重置爲正確的值。咱們來看一下這段代碼:
// 這個函數在 `Raft::prepare_send_entries` 中被調用 fn Progress::update_state(&mut self, last: u64) { match self.state { ProgressState::Replicate => { self.next_idx = last + 1; self.ins.add(last); }, ProgressState::Probe => self.pause(), _ => unreachable!(), } }
Progress 有 3 種不一樣的狀態,如這個結構體的定義的代碼片斷所示。其中 Probe 狀態和 Snapshot 狀態會在下一節詳細介紹,如今只須要關注 Replicate 狀態。咱們已經知道 Pipeline 機制是由更新 next_idx
的那一行引入的了,那麼下面更新 ins
的一行的做用是什麼呢?
從 Progress 的定義的代碼片斷中咱們知道,ins
字段的類型是 Inflights,能夠想象成一個相似 TCP 的滑動窗口:全部 Leader 發出了,可是還沒有被目標副本響應的消息,都被框在該副本在 Leader 上對應的 Progress 的 ins
中。這樣,因爲滑動窗口的大小是有限的,Raft 系統中任意時刻的消息數量也會是有限的,這就實現了流量控制的機制。更具體地,Leader 在給某一副本發送 MsgAppend 時,會檢查其對應的滑動窗口,這個邏輯在 Raft::send_append
函數中;在收到該副本的 MsgAppendResponse 以後,會適時調用 Inflights 的 free_to
函數,使窗口向前滑動,這個邏輯在 Raft::handle_append_response
中。
咱們已經在 Progress 結構體的定義以及上面一些代碼片斷中見過了 ProgressState 這個枚舉類型。在 3 種可能的狀態中,Replicate 狀態是最容易理解的,Leader 能夠給對應的副本發送多個 MsgAppend 消息(不超過滑動窗口的限制),並適時地將窗口向前滑動。然而,咱們注意到,在 Leader 剛選舉出來時,Leader 上面的全部其餘副本的狀態卻被設置成了 Probe。這是爲何呢?
從 Progress 結構體的字段註釋中,咱們知道當某個副本處於 Probe 狀態時,Leader 只能給它發送 1 條 MsgAppend 消息。這是由於,在這個狀態下的 Progress 的 next_idx
是 Leader 猜出來的,而不是由這個副本明確的上報信息推算出來的。它有很大的機率是錯誤的,亦即 Leader 極可能會回退到某個地方從新發送;甚至有可能這個副本是不活躍的,那麼 Leader 發送的整個滑動窗口的消息均可能浪費掉。所以,咱們引入 Probe 狀態,當 Leader 給處於這一狀態的副本發送了 MsgAppend 時,這個 Progress 會被暫停掉(源碼片斷見上一節),這樣在下一次嘗試給這個副本發送 MsgAppend 時,會在 Raft::send_append
中跳過。而當 Leader 收到了這個副本上報的正確的 last index 以後,Leader 便知道下一次應該從什麼位置給這個副本發送日誌了,這一過程在 Progress::maybe_update
函數中:
fn Progress::maybe_update(&mut self, n: u64) { if self.matched < n { self.matched = n; self.resume(); // 取消暫停的狀態 } if self.next_idx < n + 1 { self.next = n + 1; } }
ProgressState::Snapshot 狀態與 Progress 中的 pause 標誌十分類似,一個副本對應的 Progress 一旦處於這個狀態,Leader 便不會再給這個副本發送任何 MsgAppend 了。可是仍有細微的差異:事實上在 Leader 收到 MsgHeartbeatResponse 時,也會調用 Progress::resume
來將取消對該副本的暫停,然而對於 ProgressState::Snapshot 狀態的 Progress 則沒有這個邏輯。這個狀態會在 Leader 成功發送完成 Snapshot,或者收到了對應的副本的最新的 MsgAppendResponse 以後被改變,詳細的邏輯請參考源代碼,這裏就不做贅述了。
咱們把篇幅留給在 Follower 上收到 Snapshot 以後的處理邏輯,主要是 Raft::restore_raft
和 RaftLog::restore
兩個函數。前者中主要包含了對 Progress 的處理,由於 Snapshot 包含了 Leader 上最新的信息,而 Leader 上的 Configuration 是可能跟 Follower 不一樣的。後者的主要邏輯僞代碼以下所示:
fn RaftLog::restore(&mut self, snapshot: Snapshot) { self.committed = snapshot.get_metadata().get_index(); self.unstable.restore(snapshot); }
能夠看到,內部僅更新了 committed,並無更新 applied。這是由於 raft-rs 僅關心 Raft 日誌的部分,至於如何把日誌中的內容更新到真正的狀態機中,是應用程序的任務。應用程序須要從上一篇文章中介紹的 Ready 接口中把 Snapshot 拿到,而後自行將其應用到狀態機中,最後再經過 RawNode::advance
接口將 applied 更新到正確的值。
Raft 日誌複製及相關的流量控制、Snapshot 流程就介紹到這裏,代碼倉庫仍然在 https://github.com/pingcap/raft-rs,source-code 分支。下一期 raft-rs 源碼解析咱們會繼續爲你們帶來 configuration change 相關的內容,敬請期待!