TiKV 源碼解析(六)raft-rs 日誌複製過程分析

做者:屈鵬git

《TiKV 源碼解析(二)raft-rs proposal 示例情景分析》 中,咱們主要介紹了 raft-rs 的基本 API 使用,其中,與應用程序進行交互的主要 API 是:github

  1. RawNode::propose 發起一次新的提交,嘗試在 Raft 日誌中追加一個新項;
  2. RawNode::ready_since 從 Raft 節點中獲取最近的更新,包括新近追加的日誌、新近確認的日誌,以及須要給其餘節點發送的消息等;
  3. 在將一個 Ready 中的全部更新處理完畢以後,使用 RawNode::advance 在這個 Raft 節點中將這個 Ready 標記爲完成狀態。

熟悉了以上 3 個 API,用戶就能夠寫出基本的基於 Raft 的分佈式應用的框架了,而 Raft 協議中將寫入同步到多個副本中的任務,則由 raft-rs 庫自己的內部實現來完成,無須應用程序進行額外干預。本文將對數據冗餘複製的過程進行詳細展開,特別是關於 snapshot 及流量控制的機制,幫助讀者更深入地理解 Raft 的原理。網絡

通常 MsgAppend 及 MsgAppendResponse 的處理

在 Raft leader 上,應用程序經過 RawNode::propose 發起的寫入會被處理成一條 MsgPropose 類型的消息,而後調用 Raft::append_entry 和 Raft::bcast_append 將消息中的數據追加到 Raft 日誌中並廣播到其餘副本上。總體流程如僞代碼所示:app

fn Raft::step_leader(&mut self, mut m: Message) -> Result<()> {
    if m.get_msg_type() == MessageType::MsgPropose {
        // Propose with an empty entry list is not allowed.
        assert!(!m.get_entries().is_empty());
        self.append_entry(&mut m.mut_entries());
        self.bcast_append();
    }
}

這段代碼中 append_entry 的參數是一個可變引用,這是由於在 append_entry 函數中會爲每個 Entry 賦予正確的 term 和 index。term 由選舉產生,在一個 Raft 系統中,每選舉出一個新的 Leader,便會產生一個更高的 term。而 index 則是 Entry 在 Raft 日誌中的下標。Entry 須要帶上 term 和 index 的緣由是,在其餘副本上的 Raft 日誌是可能跟 Leader 不一樣的,例如一箇舊 Leader 在相同的位置(即 Raft 日誌中具備相同 index 的地方)廣播了一條過時的 Entry,那麼當其餘副本收到了重疊的、可是具備更高 term 的消息時,即可以用它們替換舊的消息,以便達成與最新的 Leader 一致的狀態。框架

在 Leader 將新的寫入追加到本身的 Raft log 中以後,即可以調用 bcast_append 將它們廣播到其餘副本了。注意這個函數並無任何參數,那麼 Leader 如何知道應該給每個副本從哪個位置開始廣播呢?原來在 Leader 上對每個副本,都關聯維護了一個 Progress,該結構體定義以下:分佈式

pub struct Progress {
    pub matched: u64,
    // 該副本指望接收的下一個 Entry 的 index
    pub next_idx: u64,
    // 未 commit 的消息的滑動窗口
    pub ins: Inflights,
    // ProgressState::Probe:Leader 每一個心跳間隔中最多發送一條 MsgAppend
    // ProgressState::Replicate:Leader 在每一個心跳間隔中能夠發送多個 MsgAppend
    // ProgressState::Snapshot:Leader 沒法再繼續發送 MsgAppend 給這個副本
    pub state: ProgressState,
    // 是否暫停給這個副本發送 MsgAppend 了
    pub paused: bool,
    // 一些其餘字段……
}

如代碼註釋中所說的那樣,Leader 在給副本廣播新的日誌時,會從對應的副本的 next_idx 開始。這就蘊含了兩個問題:ide

  1. 在剛開始啓動的時候,全部副本的 next_idx 應該如何設置?
  2. 在接收並處理完成 Leader 廣播的新寫入後,其餘副本應該如何向 Leader 更新 next_idx

第一個問題的答案在 Raft::reset 函數中。這個函數會在 Raft 完成選舉以後選出的 Leader 上調用,會將 Leader 的全部其餘副本的 next_idx 設置爲跟 Leader 相同的值。以後,Leader 就能夠會按照 Raft 論文裏的規定,廣播一條包含了本身的 term 的空 Entry 了。函數

第二個問題的答案在 Raft::handle_append_response 函數中。咱們繼續考察上面的情景,Leader 的其餘副本在收到 Leader 廣播的最新的日誌以後,可能會採起兩種動做:優化

fn Raft::handle_append_entries(&mut self, m: &Message) {
    let mut to_send = Message::new_message_append_response();
    match self.raft_log.maybe_append(...) {
        // 追加日誌成功,將最新的 last index 上報給 Leader
        Some(last_index) => to_send.set_index(last_index),
        // 追加日誌失敗,設置 reject 標誌,並告訴 Leader 本身的 last index
        None => {
            to_send.set_reject(true);
            to_send.set_reject_hint(self.raft_log.last_index());
        }
    }
}
self.send(to_send);

其餘副本調用 maybe_append 失敗的緣由多是比 Leader 的日誌更少,可是 Leader 在剛選舉出來的時候將全部副本的 next_idx 設置爲與本身相同的值了。這個時候這些副本就會在 MsgAppendResponse 中設置拒絕的標誌。在 Leader 接收到這樣的反饋以後,就能夠將對應副本的 next_idx 設置爲正確的值了。這個邏輯在 Raft::handle_append_response 中:spa

fn Raft::handle_append_response(&mut self, m: &Message, …) {
    if m.get_reject() {
        let pr: &mut Progress = self.get_progress(m.get_from());
        // 將副本對應的 `next_idx` 回退到一個合適的值
        pr.maybe_decr_to(m.get_index(), m.get_reject_hint());
    } else {
        // 將副本對應的 `next_idx` 設置爲 `m.get_index() + 1`
        pr.maybe_update(m.get_index());
    }
}

以上僞代碼中咱們省略了一些丟棄亂序消息的代碼,避免過多的細節形成干擾。

pipeline 優化和流量控制機制

上一節咱們重點觀察了 MsgAppend 及 MsgAppendResponse 消息的處理流程,原理是很是簡單、清晰的。然而,這個未經任何優化的實現可以工做的前提是在 Leader 收到某個副本的 MsgAppendResponse 以前,再也不給它發送任何 MsgAppend。因爲等待響應的時間取決於網絡的 TTL,這在實際應用中是很是低效的,所以咱們須要引入 pipeline 優化,以及配套的流量控制機制來避免「優化」帶來的網絡壅塞。

Pipeline 在 Raft::prepare_send_entries 函數中被引入。這個函數在 Raft::send_append 中被調用,內部會直接修改對目標副本的 next_idex 值,這樣,後續的 MsgAppend 即可以在此基礎上繼續發送了。而一旦以前的 MsgAppend 被該目標副本拒絕掉了,也能夠經過上一節中介紹的 maybe_decr_to 機制將 next_idx 重置爲正確的值。咱們來看一下這段代碼:

// 這個函數在 `Raft::prepare_send_entries` 中被調用
fn Progress::update_state(&mut self, last: u64) {
    match self.state {
        ProgressState::Replicate => {
            self.next_idx = last + 1;
            self.ins.add(last);
        },
        ProgressState::Probe => self.pause(),
       _ => unreachable!(),
    }
}

Progress 有 3 種不一樣的狀態,如這個結構體的定義的代碼片斷所示。其中 Probe 狀態和 Snapshot 狀態會在下一節詳細介紹,如今只須要關注 Replicate 狀態。咱們已經知道 Pipeline 機制是由更新 next_idx 的那一行引入的了,那麼下面更新 ins 的一行的做用是什麼呢?

從 Progress 的定義的代碼片斷中咱們知道,ins 字段的類型是 Inflights,能夠想象成一個相似 TCP 的滑動窗口:全部 Leader 發出了,可是還沒有被目標副本響應的消息,都被框在該副本在 Leader 上對應的 Progress 的 ins 中。這樣,因爲滑動窗口的大小是有限的,Raft 系統中任意時刻的消息數量也會是有限的,這就實現了流量控制的機制。更具體地,Leader 在給某一副本發送 MsgAppend 時,會檢查其對應的滑動窗口,這個邏輯在 Raft::send_append 函數中;在收到該副本的 MsgAppendResponse 以後,會適時調用 Inflights 的 free_to 函數,使窗口向前滑動,這個邏輯在 Raft::handle_append_response 中。

ProgressState 相關優化

咱們已經在 Progress 結構體的定義以及上面一些代碼片斷中見過了 ProgressState 這個枚舉類型。在 3 種可能的狀態中,Replicate 狀態是最容易理解的,Leader 能夠給對應的副本發送多個 MsgAppend 消息(不超過滑動窗口的限制),並適時地將窗口向前滑動。然而,咱們注意到,在 Leader 剛選舉出來時,Leader 上面的全部其餘副本的狀態卻被設置成了 Probe。這是爲何呢?

從 Progress 結構體的字段註釋中,咱們知道當某個副本處於 Probe 狀態時,Leader 只能給它發送 1 條 MsgAppend 消息。這是由於,在這個狀態下的 Progress 的 next_idx 是 Leader 猜出來的,而不是由這個副本明確的上報信息推算出來的。它有很大的機率是錯誤的,亦即 Leader 極可能會回退到某個地方從新發送;甚至有可能這個副本是不活躍的,那麼 Leader 發送的整個滑動窗口的消息均可能浪費掉。所以,咱們引入 Probe 狀態,當 Leader 給處於這一狀態的副本發送了 MsgAppend 時,這個 Progress 會被暫停掉(源碼片斷見上一節),這樣在下一次嘗試給這個副本發送 MsgAppend 時,會在 Raft::send_append 中跳過。而當 Leader 收到了這個副本上報的正確的 last index 以後,Leader 便知道下一次應該從什麼位置給這個副本發送日誌了,這一過程在 Progress::maybe_update 函數中:

fn Progress::maybe_update(&mut self, n: u64) {
    if self.matched < n {
        self.matched = n;
        self.resume(); // 取消暫停的狀態
    }
    if self.next_idx < n + 1 {
        self.next = n + 1;
    }
}

ProgressState::Snapshot 狀態與 Progress 中的 pause 標誌十分類似,一個副本對應的 Progress 一旦處於這個狀態,Leader 便不會再給這個副本發送任何 MsgAppend 了。可是仍有細微的差異:事實上在 Leader 收到 MsgHeartbeatResponse 時,也會調用 Progress::resume 來將取消對該副本的暫停,然而對於 ProgressState::Snapshot 狀態的 Progress 則沒有這個邏輯。這個狀態會在 Leader 成功發送完成 Snapshot,或者收到了對應的副本的最新的 MsgAppendResponse 以後被改變,詳細的邏輯請參考源代碼,這裏就不做贅述了。

咱們把篇幅留給在 Follower 上收到 Snapshot 以後的處理邏輯,主要是 Raft::restore_raftRaftLog::restore 兩個函數。前者中主要包含了對 Progress 的處理,由於 Snapshot 包含了 Leader 上最新的信息,而 Leader 上的 Configuration 是可能跟 Follower 不一樣的。後者的主要邏輯僞代碼以下所示:

fn RaftLog::restore(&mut self, snapshot: Snapshot) {
    self.committed = snapshot.get_metadata().get_index();
    self.unstable.restore(snapshot);
}

能夠看到,內部僅更新了 committed,並無更新 applied。這是由於 raft-rs 僅關心 Raft 日誌的部分,至於如何把日誌中的內容更新到真正的狀態機中,是應用程序的任務。應用程序須要從上一篇文章中介紹的 Ready 接口中把 Snapshot 拿到,而後自行將其應用到狀態機中,最後再經過 RawNode::advance 接口將 applied 更新到正確的值。

總結

Raft 日誌複製及相關的流量控制、Snapshot 流程就介紹到這裏,代碼倉庫仍然在 https://github.com/pingcap/raft-rs,source-code 分支。下一期 raft-rs 源碼解析咱們會繼續爲你們帶來 configuration change 相關的內容,敬請期待!

相關文章
相關標籤/搜索