文章來自gRPC 官方文檔中文版html
本教程介紹如何使用 C++ 的 gRPC 異步/非阻塞 API 去實現簡單的服務器和客戶端。假設你已經熟悉實現同步 gRPC 代碼,如gRPC 基礎: C++所描述的。本教程中的例子基原本自咱們在overview中使用的Greeter 例子。你能夠在 grpc/examples/cpp/helloworld找到安裝指南。git
gRPC 的異步操做使用CompletionQueue
。 基本工做流以下:github
CompletionQueue
voide*
標籤展現CompletionQueue::Next
去等待操做結束。若是標籤出現,表示對應的操做已經完成。要使用一個異步的客戶端調用遠程方法,你首先得建立一個頻道和存根,如你在同步客戶端中所做的那樣。一旦有了存根,你就能夠經過下面的方式來作異步調用:服務器
初始化 RPC 併爲之建立句柄。將 RPC 綁定到一個 CompletionQueue
。併發
CompletionQueue cq; std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc( stub_->AsyncSayHello(&context, request, &cq));
用一個惟一的標籤,尋求回答和最終的狀態異步
Status status; rpc->Finish(&reply, &status, (void*)1);
等待完成隊列返回下一個標籤。當標籤被傳入對應的 Finish()
調用時,回答和狀態就能夠被返回了。async
void* got_tag; bool ok = false; cq.Next(&got_tag, &ok); if (ok && got_tag == (void*)1) { // check reply and status }
你能夠在這裏greeter_async_client.cc看到完整的客戶端例子。ide
服務器實現請求一個帶有標籤的 RPC 調用,而後等待完成隊列返回標籤。異步處理 RPC 的基本工做流以下:ui
構建一個服務器導出異步服務this
helloworld::Greeter::AsyncService service; ServerBuilder builder; builder.AddListeningPort("0.0.0.0:50051", InsecureServerCredentials()); builder.RegisterAsyncService(&service); auto cq = builder.AddCompletionQueue(); auto server = builder.BuildAndStart();
請求一個 RPC 提供惟一的標籤
ServerContext context; HelloRequest request; ServerAsyncResponseWriter<HelloReply> responder; service.RequestSayHello(&context, &request, &responder, &cq, &cq, (void*)1);
等待完成隊列返回標籤。當取到標籤時,上下文,請求和應答器都已經準備就緒。
HelloReply reply; Status status; void* got_tag; bool ok = false; cq.Next(&got_tag, &ok); if (ok && got_tag == (void*)1) { // set reply and status responder.Finish(reply, status, (void*)2); }
等待完成隊列返回標籤。標籤返回時 RPC 結束。
void* got_tag; bool ok = false; cq.Next(&got_tag, &ok); if (ok && got_tag == (void*)2) { // clean up }
然而,這個基本的工做流沒有考慮服務器併發處理多個請求。要解決這個問題,咱們的完成異步服務器例子使用了 CallData
對象去維護每一個 RPC 的狀態,而且使用這個對象的地址做爲調用的惟一標籤。
class CallData { public: // Take in the "service" instance (in this case representing an asynchronous // server) and the completion queue "cq" used for asynchronous communication // with the gRPC runtime. CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq) : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) { // Invoke the serving logic right away. Proceed(); } void Proceed() { if (status_ == CREATE) { // As part of the initial CREATE state, we *request* that the system // start processing SayHello requests. In this request, "this" acts are // the tag uniquely identifying the request (so that different CallData // instances can serve different requests concurrently), in this case // the memory address of this CallData instance. service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, this); // Make this instance progress to the PROCESS state. status_ = PROCESS; } else if (status_ == PROCESS) { // Spawn a new CallData instance to serve new clients while we process // the one for this CallData. The instance will deallocate itself as // part of its FINISH state. new CallData(service_, cq_); // The actual processing. std::string prefix("Hello "); reply_.set_message(prefix + request_.name()); // And we are done! Let the gRPC runtime know we've finished, using the // memory address of this instance as the uniquely identifying tag for // the event. responder_.Finish(reply_, Status::OK, this); status_ = FINISH; } else { GPR_ASSERT(status_ == FINISH); // Once in the FINISH state, deallocate ourselves (CallData). delete this; } }
簡單起見,服務器對於全部的事件只使用了一個完成隊列,而且在 HandleRpcs
中運行了一個主循環去查詢隊列:
void HandleRpcs() { // Spawn a new CallData instance to serve new clients. new CallData(&service_, cq_.get()); void* tag; // uniquely identifies a request. bool ok; while (true) { // Block waiting to read the next event from the completion queue. The // event is uniquely identified by its tag, which in this case is the // memory address of a CallData instance. cq_->Next(&tag, &ok); GPR_ASSERT(ok); static_cast<CallData*>(tag)->Proceed(); } }
你能夠在greeter_async_server.cc看到完整的服務器例子。