TiKV 源碼解析系列文章(十)Snapshot 的發送和接收

做者:黃夢龍git

背景知識

TiKV 使用 Raft 算法來提供高可用且具備強一致性的存儲服務。在 Raft 中,Snapshot 指的是整個 State Machine 數據的一份快照,大致上有如下這幾種狀況須要用到 Snapshot:github

  1. 正常狀況下 leader 與 follower/learner 之間是經過 append log 的方式進行同步的,出於空間和效率的考慮,leader 會按期清理過老的 log。假如 follower/learner 出現宕機或者網絡隔離,恢復之後可能所缺的 log 已經在 leader 節點被清理掉了,此時只能經過 Snapshot 的方式進行同步。
  2. Raft 加入新的節點的,因爲新節點沒同步過任何日誌,只能經過接收 Snapshot 的方式來同步。實際上這也能夠認爲是 1 的一種特殊情形。
  3. 出於備份/恢復等需求,應用層須要 dump 一份 State Machine 的完整數據。

TiKV 涉及到的是 1 和 2 這兩種狀況。在咱們的實現中,Snapshot 老是由 Region leader 所在的 TiKV 生成,經過網絡發送給 Region follower/learner 所在的 TiKV。算法

理論上講,咱們徹底能夠把 Snapshot 看成普通的 RaftMessage 來發送,但這樣作實踐上會產生一些問題,主要是由於 Snapshot 消息的尺寸遠大於其餘 RaftMessage服務器

  1. Snapshot 消息須要花費更長的時間來發送,若是共用網絡鏈接容易致使網絡擁塞,進而引發其餘 Region 出現 Raft 選舉超時等問題。
  2. 構建待發送 Snapshot 消息須要消耗更多的內存。
  3. 過大的消息可能致使超出 gRPC 的 Message Size 限制等問題。

基於上面的緣由,TiKV 對 Snapshot 的發送和接收進行了特殊處理,爲每一個 Snapshot 建立單獨的網絡鏈接,並將 Snapshot 拆分紅 1M 大小的多個 Chunk 進行傳輸。網絡

源碼解讀

下面咱們分別從 RPC 協議、發送 Snapshot、收取 Snapshot 三個方面來解讀相關源代碼。本文的全部內容都基於 v3.0.0-rc.2 版本。app

Snapshot RPC call 的定義

與普通的 raft message 相似,Snapshot 消息也是使用 gRPC 遠程調用的方式來傳輸的。在 pingcap/kvproto 項目中能夠找到相關 RPC Call 的定義,具體在 tikvpb.protoraft_serverpb.proto 文件中。異步

rpc Snapshot(stream raft_serverpb.SnapshotChunk) returns (raft_serverpb.Done) {}
...
message SnapshotChunk {
  RaftMessage message = 1;
  bytes data = 2;
}

message Done {}

能夠看出,Snapshot 被定義成 client streaming 調用,即對於每一個 Call,客戶端依次向服務器發送多個相同類型的請求,服務器接收並處理完全部請求後,向客戶端返回處理結果。具體在這裏,每一個請求的類型是 SnapshotChunk,其中包含了 Snapshot 對應的 RaftMessage,或者攜帶一段 Snapshot 數據;回覆消息是一個簡單的空消息 Done,由於咱們在這裏實際不須要返回任何信息給客戶端,只須要關閉對應的 stream。函數

Snapshot 的發送流程

Snapshot 的發送過程的處理比較簡單粗暴,直接在將要發送 RaftMessage 的地方截獲 Snapshot 類型的消息,轉而經過特殊的方式進行發送。相關代碼能夠在 server/transport.rs 中找到:學習

fn write_data(&self, store_id: u64, addr: &str, msg: RaftMessage) {
  if msg.get_message().has_snapshot() {
      return self.send_snapshot_sock(addr, msg);
  }
  if let Err(e) = self.raft_client.wl().send(store_id, addr, msg) {
      error!("send raft msg err"; "err" => ?e);
  }
}

fn send_snapshot_sock(&self, addr: &str, msg: RaftMessage) {
  ...
  if let Err(e) = self.snap_scheduler.schedule(SnapTask::Send {
      addr: addr.to_owned(),
      msg,
      cb,
  }) {
      ...
  }
}

從代碼中能夠看出,這裏簡單地把對應的 RaftMessage 包裝成一個 SnapTask::Send 任務,並將其交給獨立的 snap-worker 去處理。值得注意的是,這裏的 RaftMessage 只包含 Snapshot 的元信息,而不包括真正的快照數據。TiKV 中有一個單獨的模塊叫作 SnapManager ,用來專門處理數據快照的生成與轉存,稍後咱們將會看到從 SnapManager 模塊讀取 Snapshot 數據塊並進行發送的相關代碼。ui

咱們不妨順藤摸瓜來看看 snap-worker 是如何處理這個任務的,相關代碼在 server/snap.rs,精簡掉非核心邏輯後的代碼引用以下:

fn run(&mut self, task: Task) {
  match task {
      Task::Recv { stream, sink } => {
           ...
           let f = recv_snap(stream, sink, ...).then(move |result| {
               ...
           });
           self.pool.spawn(f).forget();
      }
      Task::Send { addr, msg, cb } => {
          ...
          let f = future::result(send_snap(..., &addr, msg))
              .flatten()
              .then(move |res| {
                  ...
              });
          self.pool.spawn(f).forget();
      }
  }
}

snap-worker 使用了 future 來完成收發 Snapshot 任務:經過調用 send_snap()recv_snap() 生成一個 future 對象,並將其交給 FuturePool 異步執行。

如今咱們暫且只關注 send_snap()實現

fn send_snap(
  ...
  addr: &str,
  msg: RaftMessage,
) -> Result<impl Future<Item = SendStat, Error = Error>> {
  ...
  let key = {
      let snap = msg.get_message().get_snapshot();
      SnapKey::from_snap(snap)?
  };
  ...
  let s = box_try!(mgr.get_snapshot_for_sending(&key));
  if !s.exists() {
      return Err(box_err!("missing snap file: {:?}", s.path()));
  }
  let total_size = s.total_size()?;
  let chunks = {
      let mut first_chunk = SnapshotChunk::new();
      first_chunk.set_message(msg);

      SnapChunk {
          first: Some(first_chunk),
          snap: s,
          remain_bytes: total_size as usize,
      }
  };

  let cb = ChannelBuilder::new(env);
  let channel = security_mgr.connect(cb, addr);
  let client = TikvClient::new(channel);
  let (sink, receiver) = client.snapshot()?;

  let send = chunks.forward(sink).map_err(Error::from);
  let send = send
      .and_then(|(s, _)| receiver.map_err(Error::from).map(|_| s))
      .then(move |result| {
          ...
      });
  Ok(send)
}

這一段流程仍是比較清晰的:先是用 Snapshot 元信息從 SnapManager 取到待發送的快照數據,而後將 RaftMessageSnap 一塊兒封裝進 SnapChunk 結構,最後建立全新的 gRPC 鏈接及一個 Snapshot stream 並將 SnapChunk 寫入。這裏引入 SnapChunk 是爲了不將整塊 Snapshot 快照一次性加載進內存,它 impl 了 futures::Stream 這個 trait 來達成按需加載流式發送的效果。若是感興趣能夠參考它的 具體實現,本文就暫不展開了。

Snapshot 的收取流程

最後咱們來簡單看一下 Snapshot 的收取流程,其實也就是 gRPC Call 的 server 端對應的處理,整個流程的入口咱們能夠在 server/service/kv.rs 中找到:

fn snapshot(
  &mut self,
  ctx: RpcContext<'_>,
  stream: RequestStream<SnapshotChunk>,
  sink: ClientStreamingSink<Done>,
) {
  let task = SnapTask::Recv { stream, sink };
  if let Err(e) = self.snap_scheduler.schedule(task) {
      ...
  }
}

與發送過程相似,也是直接構建 SnapTask::Recv 任務並轉發給 snap-worker 了,這裏會調用上面出現過的 recv_snap() 函數,具體實現 以下:

fn recv_snap<R: RaftStoreRouter + 'static>(
  stream: RequestStream<SnapshotChunk>,
  sink: ClientStreamingSink<Done>,
  ...
) -> impl Future<Item = (), Error = Error> {
  ...
  let f = stream.into_future().map_err(|(e, _)| e).and_then(
      move |(head, chunks)| -> Box<dyn Future<Item = (), Error = Error> + Send> {
          let context = match RecvSnapContext::new(head, &snap_mgr) {
              Ok(context) => context,
              Err(e) => return Box::new(future::err(e)),
          };

          ...
          let recv_chunks = chunks.fold(context, |mut context, mut chunk| -> Result<_> {
              let data = chunk.take_data();
              ...
              if let Err(e) = context.file.as_mut().unwrap().write_all(&data) {
                  ...
              }
              Ok(context)
          });

          Box::new(
              recv_chunks
                  .and_then(move |context| context.finish(raft_router))
                  .then(move |r| {
                      snap_mgr.deregister(&context_key, &SnapEntry::Receiving);
                      r
                  }),
          )
      },
  );
  f.then(move |res| match res {
      ...
  })
  .map_err(Error::from)
}

值得留意的是 stream 中的第一個消息(其中包含有 RaftMessage)被用來建立 RecvSnapContext 對象,其後的每一個 chunk 收取後都依次寫入文件,最後調用 context.finish() 把以前保存的 RaftMessage 發送給 raftstore 完成整個接收過程。

總結

以上就是 TiKV 發送和接收 Snapshot 相關的代碼解析了。這是 TiKV 代碼庫中較小的一個模塊,它很好地解決了因爲 Snapshot 消息特殊性所帶來的一系列問題,充分應用了 grpc-rs 組件及 futures/FuturePool 模型,你們能夠結合本系列文章的 第七篇第八篇 進一步拓展學習。

原文閱讀https://www.pingcap.com/blog-cn/tikv-source-code-reading-10/

相關文章
相關標籤/搜索