TiKV 源碼解析系列文章(八)grpc-rs 的封裝與實現

做者: 李建俊git

上一篇《gRPC Server 的初始化和啓動流程》爲你們介紹了 gRPC Server 的初始化和啓動流程,本篇將帶你們深刻到 grpc-rs 這個庫裏,查看 RPC 請求是如何被封裝和派發的,以及它是怎麼和 Rust Future 進行結合的。github

gRPC C Core

gRPC 包括了一系列複雜的協議和流控機制,若是要爲每一個語言都實現一遍這些機制和協議,將會是一個很繁重的工做。所以 gRPC 提供了一個統一的庫來提供基本的實現,其餘語言再基於這個實現進行封裝和適配,提供更符合相應語言習慣或生態的接口。這個庫就是 gRPC C Core,grpc-rs 就是基於 gRPC C Core 進行封裝的。promise

要說明 grpc-rs 的實現,須要先介紹 gRPC C Core 的運行方式。gRPC C Core 有三個很關鍵的概念 grpc_channelgrpc_completion_queuegrpc_callgrpc_channel 在 RPC 裏就是底層的鏈接,grpc_completion_queue 就是一個處理完成事件的隊列。grpc_call 表明的是一個 RPC。要進行一次 RPC,首先從 grpc_channel 建立一個 grpc_call,而後再給這個 grpc_call 發送請求,收取響應。而這個過程都是異步,因此須要調用 grpc_completion_queue 的接口去驅動消息處理。整個過程能夠經過如下代碼來解釋(爲了讓代碼更可讀一些,如下代碼和實際可編譯運行的代碼有一些出入)。服務器

grpc_completion_queue* queue = grpc_completion_queue_create_for_next(NULL);
grpc_channel* ch = grpc_insecure_channel_create("example.com", NULL);
grpc_call* call = grpc_channel_create_call(ch, NULL, 0, queue, "say_hello");
grpc_op ops[6];
memset(ops, 0, sizeof(ops));
char* buffer = (char*) malloc(100);
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[1].op = GRPC_OP_SEND_MESSAGE;
ops[1].data.send_message.send_message = "gRPC";
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[4].op = GRPC_OP_RECV_MESSAGE;
ops[4].data.recv_message.recv_message = buffer;
ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
void* tag = malloc(1);
grpc_call_start_batch(call, ops, 6, tag);
grpc_event ev = grpc_completion_queue_next(queue);
ASSERT_EQ(ev.tag, tag);
ASSERT(strcmp(buffer, "Hello gRPC"));

能夠看到,對 grpc_call 的操做是經過一次 grpc_call_start_batch 來指定的。這個 start batch 會將指定的操做放在內存 buffer 當中,而後經過 grpc_completion_queue_next 來實際執行相關操做,如收發消息。這裏須要注意的是 tag 這個變量。當這些操做都完成之後,grpc_completion_queue_next 會返回一個包含 tag 的消息來通知這個操做完成了。因此在代碼的末尾就能夠在先前指定的 buffer 讀出預期的字符串。app

因爲篇幅有限,對於 gRPC C Core 的解析就再也不深刻了,對這部分很感興趣的朋友也能夠在 github.com/grpc/grpc 閱讀相關文檔和源碼。異步

封裝與實現細節

經過上文的分析能夠明顯看到,gRPC C Core 的通知機制其實和 Rust Future 的通知機制很是相似。Rust Future 提供一個 poll 方法來檢驗當前 Future 是否已經 ready。若是還沒有 ready,poll 方法會註冊一個通知鉤子 task。等到 ready 時,task 會被調用,從而觸發對這個 Future 的再次 poll,獲取結果。task 其實和上文中的 tag 正好對應起來了,而在 grpc-rs 中,tag 就是一個儲存了 task 的 enum。oop

pub enum CallTag {
   Batch(BatchPromise),
   Request(RequestCallback),
   UnaryRequest(UnaryRequestCallback),
   Abort(Abort),
   Shutdown(ShutdownPromise),
   Spawn(SpawnNotify),
}

tag 之因此是一個 enum 是由於不一樣的 call 會對應不一樣的行爲,如對於服務器端接受請求的處理和客戶端發起請求的處理就不太同樣。線程

grpc-rs 在初始化時會建立多個線程來不斷調用 grpc_completion_queue_next 來獲取已經完成的 tag,而後根據 tag 的類型,將數據存放在結構體中並通知 task 來獲取。下面是這個流程的代碼。code

// event loop
fn poll_queue(cq: Arc<CompletionQueueHandle>) {
   let id = thread::current().id();
   let cq = CompletionQueue::new(cq, id);
   loop {
       let e = cq.next();
       match e.event_type {
           EventType::QueueShutdown => break,
           // timeout should not happen in theory.
           EventType::QueueTimeout => continue,
           EventType::OpComplete => {}
       }

       let tag: Box<CallTag> = unsafe { Box::from_raw(e.tag as _) };

       tag.resolve(&cq, e.success != 0);
   }
}

能夠看到,tag 會被強轉成爲一個 CallTag,而後調用 resolve 方法來處理結果。不一樣的 enum 類型會有不一樣的 resolve 方式,這裏挑選其中 CallTag::BatchCallTag::Request 來進行解釋,其餘的 CallTag 流程相似。server

BatchPromise 是用來處理上文提到的 grpc_call_start_batch 返回結果的 tagRequestCallback 則用來接受新的 RPC 請求。下面是 BatchPromise 的定義及其 resolve 方法。

/// A promise used to resolve batch jobs.
pub struct BatchPromise {
   ty: BatchType,
   ctx: BatchContext,
   inner: Arc<Inner<Option<MessageReader>>>,
}

impl BatchPromise {
   fn handle_unary_response(&mut self) {
       let task = {
           let mut guard = self.inner.lock();
           let status = self.ctx.rpc_status();
           if status.status == RpcStatusCode::Ok {
               guard.set_result(Ok(self.ctx.recv_message()))
           } else {
               guard.set_result(Err(Error::RpcFailure(status)))
           }
       };
       task.map(|t| t.notify());
   }

   pub fn resolve(mut self, success: bool) {
       match self.ty {
           BatchType::CheckRead => {
               assert!(success);
               self.handle_unary_response();
           }
           BatchType::Finish => {
               self.finish_response(success);
           }
           BatchType::Read => {
               self.read_one_msg(success);
           }
       }
   }
}

上面代碼中的 ctx 是用來儲存響應的字段,包括響應頭、數據之類的。當 next 返回時,gRPC C Core 會將對應內容填充到這個結構體裏。inner 儲存的是 task 和收到的消息。當 resolve 被調用時,先判斷這個 tag 要執行的是什麼任務。BatchType::CheckRead 表示是一問一答式的讀取任務,Batch::Finish 表示的是沒有返回數據的任務,BatchType::Read 表示的是流式響應裏讀取單個消息的任務。拿 CheckRead 舉例,它會將拉取到的數據存放在 inner 裏,並通知 task。而 task 對應的 Future 再被 poll 時就能夠拿到對應的數據了。這個 Future 的定義以下:

/// A future object for task that is scheduled to `CompletionQueue`.
pub struct CqFuture<T> {
    inner: Arc<Inner<T>>,
}

impl<T> Future for CqFuture<T> {
    type Item = T;
    type Error = Error;

    fn poll(&mut self) -> Poll<T, Error> {
        let mut guard = self.inner.lock();
        if guard.stale {
            panic!("Resolved future is not supposed to be polled again.");
        }

        if let Some(res) = guard.result.take() {
            guard.stale = true;
            return Ok(Async::Ready(res?));
        }

        // So the task has not been finished yet, add notification hook.
        if guard.task.is_none() || !guard.task.as_ref().unwrap().will_notify_current() {
            guard.task = Some(task::current());
        }

        Ok(Async::NotReady)
    }
}

Inner 是一個 SpinLock。若是在 poll 時還沒拿到結果時,會將 task 存放在鎖裏,在有結果的時候,存放結果並經過 task 通知再次 poll。若是有結果則直接返回結果。

下面是 RequestCallback 的定義和 resolve 方法。

pub struct RequestCallback {
   ctx: RequestContext,
}

impl RequestCallback {
   pub fn resolve(mut self, cq: &CompletionQueue, success: bool) {
       let mut rc = self.ctx.take_request_call_context().unwrap();
       if !success {
           server::request_call(rc, cq);
           return;
       }

       match self.ctx.handle_stream_req(cq, &mut rc) {
           Ok(_) => server::request_call(rc, cq),
           Err(ctx) => ctx.handle_unary_req(rc, cq),
       }
   }
}

上面代碼中的 ctx 是用來儲存請求的字段,主要包括請求頭。和 BatchPromise 相似,ctx 的內容也是在調用 next 方法時被填充。在 resolve 時,若是失敗,則再次調用 request_call 來接受下一個 RPC,不然會調用對應的 RPC 方法。

handle_stream_req 的定義以下:

pub fn handle_stream_req(
   self,
   cq: &CompletionQueue,
   rc: &mut RequestCallContext,
) -> result::Result<(), Self> {
   let handler = unsafe { rc.get_handler(self.method()) };
   match handler {
       Some(handler) => match handler.method_type() {
           MethodType::Unary | MethodType::ServerStreaming => Err(self),
           _ => {
               execute(self, cq, None, handler);
               Ok(())
           }
       },
       None => {
           execute_unimplemented(self, cq.clone());
           Ok(())
       }
   }
}

從上面能夠看到,整個過程先經過 get_handler,根據 RPC 想要執行的方法名字拿到方法並調用,若是方法不存在,則向客戶端報錯。能夠看到這裏對於 UnaryServerStreaming 返回了錯誤。這是由於這兩種請求都是客戶端只發一次請求,因此返回錯誤讓 resolve 繼續拉取消息體而後再執行對應的方法。

爲何 get_handler 能夠知道調用的是什麼方法呢?這是由於 gRPC 編譯器在生成代碼裏對這些方法進行了映射,具體的細節在生成的 create_xxx_service 裏,本文就再也不展開了。

小結

最後簡要總結一下 grpc-rs 的封裝和實現過程。當 grpc-rs 初始化時,會建立數個線程輪詢消息隊列(grpc_completion_queue)並 resolve。當 server 被建立時,RPC 會被註冊起來,server 啓動時,grpc-rs 會建立數個 RequestCall 來接受請求。當有 RPC 請求發到服務器端時,CallTag::Request 就會被返回並 resolve,並在 resolve 中調用對應的 RPC 方法。而 client 在調用 RPC 時,其實都是建立了一個 Call,併產生相應的 BatchPromise 來異步通知 RPC 方法是否已經完成。

還有不少 grpc-rs 的源碼在咱們的文章中暫未涉及,其中還有很多有趣的技巧,好比,如何減小喚醒線程的次數而減小切換、如何無鎖地註冊調用各個 service 鉤子等。歡迎有好奇心的小夥伴自行閱讀源碼,也歡迎你們提 issue 或 PR 一塊兒來完善這個項目。

原文閱讀https://www.pingcap.com/blog-cn/tikv-source-code-reading-8/

相關文章
相關標籤/搜索