gRPC-rs:從 C 到 Rust

介紹

上篇文章中,咱們講到 TiKV 爲了支持 [gRPC],咱們造了個輪子 [gRPC-rs],這篇文章簡要地介紹一下這個庫。首先咱們來聊聊什麼是 gRPC。gRPC 是 Google 推出的基於 [HTTP2] 的開源 RPC 框架,但願經過它使得各類微服務之間擁有統一的 RPC 基礎設施。它不只支持常規的平臺如 Linux,Windows,還支持移動設備和 IoT,現有十幾種語言的實現,如今又多了一種語言 Rust。html

gRPC 之因此有如此多的語言支持,是由於它有一個 C 寫的核心庫(gRPC core),所以只要某個語言兼容 C ABI,那麼就能夠經過封裝,寫一個該語言的 gRPC 庫。Rust 對 C 有良好的支持,gRPC-rs 就是對 gRPC core ABI 的 Rust 封裝。git

Core 能異步處理 RPC 請求,在考慮到 Rust 中已有較爲成熟的異步框架 [Futures],咱們決定將 API 設計成 Future 模式。github

gRPC-rs 架構圖編程

咱們將根據架構圖從底向上地講一下,在上一篇文章中已經討論過傳輸層和協議,在這就再也不贅述。後端

gRPC Core

Core 中有幾個比較重要的對象:數組

  • Call 以及 4 種類型 RPC: Call 表明了一次 RPC,能夠派生出四種類型 RPC,安全

    • Unary: 這是最簡單的一種 RPC 模式,即一問一答,客戶端發送一個請求,服務端返回一個回覆,該輪 RPC 結束。服務器

    • Client streaming: 這類的 RPC 會建立一個客戶端到服務端的流,客戶端能夠經過這個流,向服務端發送多個請求,而服務端只會返回一個回覆。架構

    • Server streaming: 與上面的相似,不過它會建立一個服務端到客戶端的流,服務端能夠發送多個回覆,併發

    • Bidirectional streaming: 若是說上面兩類是單工,那麼這類就是雙工了,客戶端和服務端能夠同時向對方發送消息。

    值得一提的是因爲 gRPC 基於 HTTP2,它利用了 HTTP2 多路複用特性,使得一個 TCP 鏈接上能夠同時進行多個 RPC,一次 RPC 即爲 HTTP2 中的一個 Stream。

  • Channel: 它是對底層連接的抽象,具體來講一個 Channel 就是一條連着遠程服務器的 TCP 連接。

  • Server: 顧名思義,它就是 gRPC 服務端封裝,能夠在上面註冊咱們的服務。

  • Completion queue: 它是 gRPC 完成事件隊列,事件能夠是收到新的回覆,能夠是新來的請求。

簡要介紹一下 Core 庫的實現,Core 中有一個 [Combiner] 的概念,Combiner 中一個函數指針或稱組合子(Combinator)隊列。每一個組合子都有特定的功能,經過不一樣的組合能夠實現不一樣的功能。下面的僞碼大概說明了 Combiner 的工做方式。

class combiner {
  mpscq q; // multi-producer single-consumer queue can be made non-blocking
  state s; // is it empty or executing

  run(f) {
    if (q.push(f)) {
      // q.push returns true if it's the first thing
      while (q.pop(&f)) { // modulo some extra work to avoid races
        f();
      }
    }
  }
}

Combiner 裏面有一個 mpsc 的無鎖隊列 q,因爲 q 只能有一個消費者,這就要求在同一時刻只能有一個線程去調用隊列裏面的各個函數。調用的入口是 run() 方法,在 run() 中各個函數會被序列地執行。當取完 q 時,該輪調用結束。假設一次 RPC 由六個函數組成,這樣的設計使這組函數(RPC)能夠在不一樣的線程上運行,這是異步化 RPC 的基礎。

Completion queue(如下簡稱 CQ)就是一個 Combiner,它暴露出了一個 next()藉口,至關於 Combiner 的 run()。因爲接口的簡單,Core 內部不用開啓額外線程,只要經過外部不斷調用 next() 就能驅動整個 Core。

全部的 HTTP2 處理,Client 的 RPC 請求和 Server 的 RPC 鏈接全是經過一個個組合子的不一樣組合而構成的。下面是一次 Unary 的代碼。它由6個組合子組成,這些組合子做爲一個 batch 再加上 Call 用於記錄狀態,二者構成了此次的 RPC。

grpc_call_error grpcwarp_call_start_unary(
    grpc_call *call, grpcsharp_batch_context *tag) {

  grpc_op ops[6];
  ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
  ...
  ops[1].op = GRPC_OP_SEND_MESSAGE;
  ...
  ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
  ...
  ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
  ...
  ops[4].op = GRPC_OP_RECV_MESSAGE;
  ...
  ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT;

  return grpcwrap_call_start_batch(call, ops, tag);
}

用 Rust 封裝 Core

<!-- 適當擴展該節? -->
介紹完 Core,如今說一下如何用 Rust 封裝它。這一層封裝並不會產生額外的開銷,不像有的語言在調用 C 時會有類型的轉換或者 runtime 會有較大開銷,在 Rust 中開銷微乎其微,這得益於 Rust 用 llvm 作編譯器後端,它對 C 有良好的支持,Rust 調用 C ABI 就像調用一個普通的函數,能夠作到 Zero-cost。

同時用 Rust 封裝 C ABI 是一件很簡單的事情,簡單到像黑魔法。好比封裝 CQ next():

C:

grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
                                      gpr_timespec deadline,
                                      void *reserved);

Rust:

extern "C" {
  pub fn grpc_completion_queue_next(cq: *mut GrpcCompletionQueue,
                                    deadline: GprTimespec,
                                    reserved: *mut c_void)
                                    -> GrpcEvent;
}

接着咱們看看如何封裝 C 的類型。繼續以 next() 爲例子:

C:

// CQ 指針
grpc_completion_queue *cq;

// grpc_event 結構體
struct grpc_event {
  grpc_completion_type type;
  int success;
  void *tag;
};

Rust:

pub enum GrpcCompletionQueue {}

#[repr(C)]
pub struct GrpcEvent {
    pub event_type: GrpcCompletionType,
    pub success: c_int,
    pub tag: *mut c_void,
}

CQ 在 Core 的 ABI 中傳遞的形式是指針,Rust Wraper 無須知道 CQ 具體的內部結構。對於這種狀況,Rust 推薦用無成員的枚舉體表示,具體好處有兩個,第一,因爲沒有成員,咱們沒法在 Rust 中構建該枚舉體的實例,第二,Type safe,當傳遞了一個錯誤類型的指針時編譯器會報錯。

#[repr(C)] 也是 Rust 的黑魔法之一。加上了這個標籤的結構體,在內存中的佈局和對齊就和 C 同樣了,這樣的結構體能夠安全地傳遞給 C ABI。

Futures in gRPC-rs

通過上一節的封裝,咱們已經獲得了一個可用可是很是裸的 Rust gRPC 庫了,[grpc-sys]。在實踐中,咱們不推薦直接用 [grpc-sys],直接用它就像在 Rust 中寫 C 同樣,事倍功半,Rust 語言的諸多特性沒法獲得施展,例如泛型,Trait,Ownership 等,也沒法融入 Rust 社區。

上面說過 Core 能異步處理 RPC,那麼如何用 Rust 來作更好的封裝呢? [Futures]!它是一個成熟的異步編程庫,同時有一個活躍的社區。 Futures 很是適用於 RPC 等一些 IO 操做頻繁的場景。Futures 中也有組合子概念,和 Core 中的相似,可是使用上更加方便,也更加好理解。舉一個栗子:

use futures::{future, Future};

fn double(i: i64) -> i64 { i * 2 }

let ans = future::ok(1)
              .map(double)
              .and_then(|i| Ok(40 + i));

println!("{:?}", ans.wait().unwrap());

你以爲輸出的答案是多少呢?沒錯就是 42。在 Core 那節說過不一樣的組合子組織在一塊兒能夠幹不一樣的事,在 Future 中咱們能夠這麼理解,一件事能夠分紅多個步驟,每一個步驟由一個組合子完成。好比上例,map 完成了翻倍的動做,and_then 將輸入加上 40。 如今來看看 gRPC-rs 封裝的 API。

// helloworld.proto
service Greeter {
    // An unary RPC, sends a greeting
    rpc SayHello (HelloRequest) returns (HelloReply) {}
}

impl GreeterClient {
    pub fn say_hello_async(&self, req: HelloRequest) -> ClientUnaryReceiver<HelloReply> {
        self.client.unary_call_async(&METHOD_GREETER_SAY_HELLO, req, CallOption::default())
    }
    ...
}

以 [helloworld.proto] 爲例,GreeterClient::say_hello_async() 向遠程 Server 發送一個請求 (HelloRequest),Server 返回給一個結果 (HelloReply)。因爲是異步操做,這個函數會當即返回,返回的 ClientUnaryReceiver 實現了 Future,當它完成時就會獲得 HelloReply。在通常的異步編程中都會有 Callback,用於處理異步的返回值,在這個 RPC 中就是 HelloReply,在 Future 中能夠用組合子來寫,好比 and_then,再舉一個栗子,現有一次完整的 RPC 邏輯,拿到回覆後打印到日誌。下面就是 gRPC-rs 的具體用法。

// 同步
let resp = client.say_hello(req);
println!("{:?}", resp);

// 異步
let f = client.say_hello_async(req)
              .and_then(|resp| {
                  println!("{:?}", resp);
                  Ok(())
              });
executer.spawn(f); // 相似 Combiner,
                   // 用於異步執行 Future,
                   // 經常使用的有 tokio-core。

Unary RPC

gRPC-rs 根據 service 在 proto 文件中的定義生成對應的代碼,包括 RPC 方法的定義(Method)、客戶端和服務端代碼,生成的代碼中會使用 gRPC-rs 的 API。那麼具體是怎麼作的呢?這節仍是以 helloworld.proto 爲例,來說講客戶端 Unary RPC 具體的實現。首先,SayHelloMethod 記錄了 RPC 類型,全稱以及序列化反序列化函數。爲何要序列化反序列化函數呢?由於 Core 自己不涉及消息的序列化,這一部分交由封裝層解決。在生成的客戶端中能夠會調用 gRPC-rs 的 API,根據 Method 的定義發起 RPC。

// 生成的代碼
const METHOD_GREETER_SAY_HELLO: Method<HelloRequest, HelloReply> = Method {
    ty: MethodType::Unary,
    name: "/helloworld.Greeter/SayHello",
    req_mar: Marshaller { ser: pb_ser, de: pb_de },
    resp_mar: Marshaller { ser: pb_ser, de: pb_de },
};

impl GreeterClient {
    // An unary RPC, sends a greeting
    pub fn say_hello_async(&self, req: HelloRequest)
                           -> ClientUnaryReceiver<HelloReply> {
        self.client.unary_call_async(&METHOD_GREETER_SAY_HELLO, req)
    }
    ...
}

// gRPC-rs 的 API。該函數當即返回,不會等待 RPC 完成。省略部分代碼。
pub fn unary_async<P, Q>(channel: &Channel,
                         method: &Method<P, Q>,
                         req: P)
                         -> ClientUnaryReceiver<Q> {
    let mut payload = vec![];
    (method.req_ser())(&req, &mut payload);            // 序列化消息
    let call = channel.create_call(method, &opt);      // 新建 Call
    let cq_f = unsafe {
        grpc_sys::grpcwrap_call_start_unary(call.call, // 發起 RPC
                                            payload,
                                            tag)
    };
    ClientUnaryReceiver::new(call, cq_f, method.resp_de()) // 收到回覆後再反序列化
}

寫在最後

這篇簡單介紹了 gRPC Core 的實現和 gRPC-rs 的封裝,詳細的用法,在這就不作過多介紹了,你們若是感興趣能夠查看 [examples]。 gRPC-rs 深刻使用了 Future,裏面有不少神奇的用法,好比 Futures in gRPC-rs 那節最後的 executer, gRPC-rs 利用 CQ 實現了一個能併發執行 Future 的 executer(相似 furtures-rs 中的 [Executer]),大幅減小 context switch,性能獲得了顯著提高。若是你對 gRPC 和 rust 都很感興趣,歡迎參與開發,目前還有一些工做沒完成,詳情請點擊 https://github.com/pingcap/grpc-rs

參考資料:

gRPC open-source universal RPC framework

The rust language implementation of gRPC

[Hypertext Transfer Protocol Version 2 (HTTP/2)
][HTTP2]

Zero-cost Futures in Rust

深刻了解 gRPC:協議

gRPC, Combiner Explanation

Rust, Representing opaque structs

Rust repr(), alternative representations

gRPC - A solution for RPCs by Google

Tokio, A platform for writing fast networking code with Rust.

做者:沈泰寧

相關文章
相關標籤/搜索