gRPC官方文檔(異步基礎: C++)

文章來自gRPC 官方文檔中文版html

異步基礎: C++

本教程介紹如何使用 C++ 的 gRPC 異步/非阻塞 API 去實現簡單的服務器和客戶端。假設你已經熟悉實現同步 gRPC 代碼,如gRPC 基礎: C++所描述的。本教程中的例子基原本自咱們在overview中使用的Greeter 例子。你能夠在 grpc/examples/cpp/helloworld找到安裝指南。git

概覽

gRPC 的異步操做使用CompletionQueue。 基本工做流以下:github

  • 在 RPC 調用上綁定一個 CompletionQueue
  • 作一些事情如讀取或者寫入,以惟一的 voide* 標籤展現
  • 調用 CompletionQueue::Next 去等待操做結束。若是標籤出現,表示對應的操做已經完成。

異步客戶端

要使用一個異步的客戶端調用遠程方法,你首先得建立一個頻道和存根,如你在同步客戶端中所做的那樣。一旦有了存根,你就能夠經過下面的方式來作異步調用:服務器

  • 初始化 RPC 併爲之建立句柄。將 RPC 綁定到一個 CompletionQueue併發

    CompletionQueue cq;
      std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc(
          stub_->AsyncSayHello(&context, request, &cq));
  • 用一個惟一的標籤,尋求回答和最終的狀態異步

    Status status;
      rpc->Finish(&reply, &status, (void*)1);
  • 等待完成隊列返回下一個標籤。當標籤被傳入對應的 Finish() 調用時,回答和狀態就能夠被返回了。async

    void* got_tag;
      bool ok = false;
      cq.Next(&got_tag, &ok);
      if (ok && got_tag == (void*)1) {
        // check reply and status
      }

你能夠在這裏greeter_async_client.cc看到完整的客戶端例子。ide

異步服務器

服務器實現請求一個帶有標籤的 RPC 調用,而後等待完成隊列返回標籤。異步處理 RPC 的基本工做流以下:ui

  • 構建一個服務器導出異步服務this

    helloworld::Greeter::AsyncService service;
      ServerBuilder builder;
      builder.AddListeningPort("0.0.0.0:50051", InsecureServerCredentials());
      builder.RegisterAsyncService(&service);
      auto cq = builder.AddCompletionQueue();
      auto server = builder.BuildAndStart();
  • 請求一個 RPC 提供惟一的標籤

    ServerContext context;
      HelloRequest request;
      ServerAsyncResponseWriter<HelloReply> responder;
      service.RequestSayHello(&context, &request, &responder, &cq, &cq, (void*)1);
  • 等待完成隊列返回標籤。當取到標籤時,上下文,請求和應答器都已經準備就緒。

    HelloReply reply;
      Status status;
      void* got_tag;
      bool ok = false;
      cq.Next(&got_tag, &ok);
      if (ok && got_tag == (void*)1) {
        // set reply and status
        responder.Finish(reply, status, (void*)2);
      }
  • 等待完成隊列返回標籤。標籤返回時 RPC 結束。

    void* got_tag;
      bool ok = false;
      cq.Next(&got_tag, &ok);
      if (ok && got_tag == (void*)2) {
        // clean up
      }

然而,這個基本的工做流沒有考慮服務器併發處理多個請求。要解決這個問題,咱們的完成異步服務器例子使用了 CallData 對象去維護每一個 RPC 的狀態,而且使用這個對象的地址做爲調用的惟一標籤。

class CallData {
   public:
    // Take in the "service" instance (in this case representing an asynchronous
    // server) and the completion queue "cq" used for asynchronous communication
    // with the gRPC runtime.
    CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
        : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
      // Invoke the serving logic right away.
      Proceed();
    }

    void Proceed() {
      if (status_ == CREATE) {
        // As part of the initial CREATE state, we *request* that the system
        // start processing SayHello requests. In this request, "this" acts are
        // the tag uniquely identifying the request (so that different CallData
        // instances can serve different requests concurrently), in this case
        // the memory address of this CallData instance.
        service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
                                  this);
        // Make this instance progress to the PROCESS state.
        status_ = PROCESS;
      } else if (status_ == PROCESS) {
        // Spawn a new CallData instance to serve new clients while we process
        // the one for this CallData. The instance will deallocate itself as
        // part of its FINISH state.
        new CallData(service_, cq_);

        // The actual processing.
        std::string prefix("Hello ");
        reply_.set_message(prefix + request_.name());

        // And we are done! Let the gRPC runtime know we've finished, using the
        // memory address of this instance as the uniquely identifying tag for
        // the event.
        responder_.Finish(reply_, Status::OK, this);
        status_ = FINISH;
      } else {
        GPR_ASSERT(status_ == FINISH);
        // Once in the FINISH state, deallocate ourselves (CallData).
        delete this;
      }
    }

簡單起見,服務器對於全部的事件只使用了一個完成隊列,而且在 HandleRpcs 中運行了一個主循環去查詢隊列:

void HandleRpcs() {
    // Spawn a new CallData instance to serve new clients.
    new CallData(&service_, cq_.get());
    void* tag;  // uniquely identifies a request.
    bool ok;
    while (true) {
      // Block waiting to read the next event from the completion queue. The
      // event is uniquely identified by its tag, which in this case is the
      // memory address of a CallData instance.
      cq_->Next(&tag, &ok);
      GPR_ASSERT(ok);
      static_cast<CallData*>(tag)->Proceed();
    }
  }

你能夠在greeter_async_server.cc看到完整的服務器例子。

相關文章
相關標籤/搜索