grpc使用記錄(三)簡單異步服務實例

grpc使用記錄(三)簡單異步服務實例

編寫異步服務和編寫同步服務的基本流程都差很少,稍有點區別。ios

同步服務你只須要實現相關服務接口的實現便可,不須要管理太多東西。異步服務GRPC運行時會把讀取到的客戶端請求放入CompletionQueue中,須要主動從中取出,而後進行相關的處理,能夠多線程也能夠單線程。c++

一、編寫proto文件,定義服務

這裏和grpc使用記錄(二)簡單同步服務實例中的同樣,這裏就很少說了。服務器

二、編譯proto文件,生成代碼

這裏也是和grpc使用記錄(二)簡單同步服務實例中的同樣的。多線程

三、編寫服務端代碼

這裏能夠複用前面同步服務的代碼,只須要作簡單的修改便可。異步

簡單說一下建立一個GRPC異步服務的要點:async

  • 一、建立服務對象的時候要建立AsyncService,而不是Service
  • 二、至少須要添加一個grpc::ServerCompletionQueue用於異步任務操做。
  • 三、必需要經過AsyncService::RequestXXXX來註冊XXXX接口的處理。
  • 四、一個客戶端請求的處理可簡單的分爲兩個步驟:一、構建返回給客戶端的響應數據;二、發送響應數據給客戶端。
  • 五、完成隊列和註冊請求處理均可以有多個,不必定非得是一個。

async_service.cpp

下面代碼簡單的建立了3個HandlerContext的結構體類型,用於保存三個接口請求處理過程當中的數據,實際的請求處理仍是和以前同步服務的同樣,這裏只是寫成了Test1Test2Test3三個函數的形式。函數

// > g++ -o aservice async_service.cpp  simple.grpc.pb.cc simple.pb.cc -std=c++11 -I. -lgrpc++ -lgrpc -lprotobuf -lgpr -lz -lcares -laddress_sorting -lpthread -Wno-deprecated

#include "simple.grpc.pb.h"
#include <grpcpp/grpcpp.h>

#include <memory>
#include <iostream>
#include <strstream>

struct HandlerContext {
  // 當前處理狀態(處理分爲兩步:1處理請求構建響應數據;2發送響應)
  // 這裏記錄一下完成到哪一步了,以便進行相關操做
  int                 status_; // (1構建響應完成;2發送完成)
  // rpc的上下文,容許經過它進行諸如壓縮、身份驗證,以及把元數據發回客戶端等。
  grpc::ServerContext ctx_;
};

struct HandlerTest1Context:public HandlerContext {
  // 用於接收客戶端發送的請求
  Simple::TestRequest req_;
  // 用於發送響應給客戶端
  Simple::TestNull    rep_;

  // 發送到客戶端的方法對象
  grpc::ServerAsyncResponseWriter<Simple::TestNull> responder_;
  // 構造函數
  HandlerTest1Context()
    :responder_(&ctx_)
  {}
};

struct HandlerTest2Context:public HandlerContext  {
  // 用於接收客戶端發送的請求
  Simple::TestNull req_;
  // 用於發送響應給客戶端
  Simple::TestReply   rep_;

  // 發送到客戶端的方法對象
  grpc::ServerAsyncResponseWriter<Simple::TestReply> responder_;
  // 構造函數
  HandlerTest2Context()
    :responder_(&ctx_)
  {}
};

struct HandlerTest3Context:public HandlerContext {
  // 用於接收客戶端發送的請求
  Simple::TestRequest req_;
  // 用於發送響應給客戶端
  Simple::TestReply   rep_;

  // 發送到客戶端的方法對象
  grpc::ServerAsyncResponseWriter<Simple::TestReply> responder_;
  // 構造函數
  HandlerTest3Context()
    :responder_(&ctx_)
  {}
};


// Test1 實現都是差不都的,這裏只是爲了測試,就隨便返回點數據了
grpc::Status Test1(grpc::ServerContext*       context,
                   const Simple::TestRequest* request,
                   Simple::TestNull*          response)
{
  printf("%s %d\n",__func__,__LINE__);
  std::ostrstream os;
  os << "Client Name = " << request->name() << '\n';
  os << "Clinet ID   = " << request->id()   << '\n';
  os << "Clinet Value= " << request->value()<< '\n';
  std::string message = os.str();
  // grpc狀態能夠設置message,因此也能夠用來返回一些信息
  return grpc::Status(grpc::StatusCode::OK,message);
}
// Test2
grpc::Status Test2(grpc::ServerContext*       context,
                   const Simple::TestNull*    request,
                   Simple::TestReply*         response)
{
  printf("%s %d\n",__func__,__LINE__);
  response->set_tid(100);
  response->set_svrname("Simple Server");
  response->set_takeuptime(0.01);
  return grpc::Status::OK;
}
// Test3
grpc::Status Test3(grpc::ServerContext*       context,
                   const Simple::TestRequest* request,
                   Simple::TestReply*         response)
{
  printf("%s %d\n",__func__,__LINE__);
  std::ostrstream os;
  os << "Client Name = " << request->name() << '\n';
  os << "Clinet ID   = " << request->id()   << '\n';
  os << "Clinet Value= " << request->value()<< '\n';
  std::string message = os.str();

  response->set_tid(__LINE__);
  response->set_svrname(__FILE__);
  response->set_takeuptime(1.234);
  // grpc狀態能夠設置message
  return grpc::Status(grpc::StatusCode::OK,std::move(message));
}

int main()
{
  // 服務構建器,用於構建同步或者異步服務
  grpc::ServerBuilder builder;
  // 添加監聽的地址和端口,後一個參數用於設置認證方式,這裏選擇不認證
  builder.AddListeningPort("0.0.0.0:33333",grpc::InsecureServerCredentials());
  // 建立一個異步服務對象
  Simple::Server::AsyncService service;
  // 註冊服務
  builder.RegisterService(&service);

  // 添加一個完成隊列,用於與 gRPC 運行時異步通訊
  std::unique_ptr<grpc::ServerCompletionQueue> cq_ptr = builder.AddCompletionQueue();

  // 構建服務器
  std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
  std::cout<<"Server Runing"<<std::endl;
  // 這裏用一個map來記錄一下下面要進行處理的請求
  // 由於這裏也是單線程的,因此不加鎖了
  std::map<HandlerContext*,int> handlerMap; // value用於記錄是Test1仍是二、3
  {
    // 先建立三個類型接口的請求處理上下文對象
    HandlerTest1Context* htc1 = new HandlerTest1Context;
    htc1->status_ = 1; // 設置狀態爲1(由於只須要區分是否已經發送響應完成)
    HandlerTest2Context* htc2 = new HandlerTest2Context;
    htc2->status_ = 1;
    HandlerTest3Context* htc3 = new HandlerTest3Context;
    htc3->status_ = 1;

    // 將三個上下文對象存入map中
    handlerMap[htc1] = 1; // 值用於區分是哪一個類型
    handlerMap[htc2] = 2;
    handlerMap[htc3] = 3;

    // 進入下面死循環前須要先註冊一下請求
    service.RequestTest1(
        &htc1->ctx_         /*服務上下文對象*/,
        &htc1->req_         /*用於接收請求的對象*/,
        &htc1->responder_   /*異步寫響應對象*/,
        cq_ptr.get()        /*新的調用使用的完成隊列*/,
        cq_ptr.get()        /*通知使用的完成隊列*/,
        htc1                /*惟一標識tag*/);
    service.RequestTest2(&htc2->ctx_,&htc2->req_,&htc2->responder_,cq_ptr.get(),cq_ptr.get(),htc2);
    service.RequestTest3(&htc3->ctx_,&htc3->req_,&htc3->responder_,cq_ptr.get(),cq_ptr.get(),htc3);
  }
  // 異步服務這裏不能使用 server.Wait() 來等待處理,由於是異步服務
  // 服務器會把到達的請求放入隊列,須要本身從完成隊列取出請求進行處理
  // 因此這裏須要一個死循環來獲取請求並進行處理
  while(true){
    // 前面已經註冊了請求處理,這裏阻塞從完成隊列中取出一個請求進行處理
    HandlerContext* htc = NULL;
    bool ok = false; 
    GPR_ASSERT(cq_ptr->Next((void**)&htc, &ok));
    GPR_ASSERT(ok);
    // 根據tag判斷是哪個請求
    // 由於前面註冊請求處理的時候使用的就是對象地址
    // 因此這裏直接從map裏面取出來判斷便可
    int type = handlerMap[htc];
    // 判斷狀態,看是否是已經響應發送了
    if(htc->status_ == 2) {
      // 從map中移除
      handlerMap.erase(htc);
      // 由於這裏並非多態類,必須根據類型操做
      switch(type) {
        case 1:
          {
            // 釋放對象(這裏未對這個對象進行復用)
            delete (HandlerTest1Context*)htc;
          }
          break;
        case 2:
          {
            delete (HandlerTest2Context*)htc;
          }
          break;
        case 3:
          {
            delete (HandlerTest3Context*)htc;
          }
          break;
      }
      continue; // 回到從完成隊列獲取下一個
    }

    // 根據type進行相應的處理
    switch(type) {
      case 1: /*Test1的處理*/
        {
          // 從新建立一個請求處理上下文對象(以便不影響下一個請求的處理)
          HandlerTest1Context* htc1 = new HandlerTest1Context;
          htc1->status_ = 1;    // 設置狀態爲1
          handlerMap[htc1] = 1; // 保存到handlerMap中
          service.RequestTest1(&htc1->ctx_,&htc1->req_,&htc1->responder_,
                               cq_ptr.get(),cq_ptr.get(),htc1);
            
          HandlerTest1Context* h = (HandlerTest1Context*)htc;
          grpc::Status status = Test1(&h->ctx_,&h->req_,&h->rep_);
          // 設置狀態爲發送響應
          h->status_ = 2;
          // 調用responder_進行響應發送(異步)
          h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的惟一tag*/);
        }
        break;
      case 2: /*Test2的處理*/
        {
          HandlerTest2Context* htc2 = new HandlerTest2Context;
          htc2->status_ = 1;    // 設置狀態爲1
          handlerMap[htc2] = 2; // 保存到handlerMap中
          service.RequestTest2(&htc2->ctx_,&htc2->req_,&htc2->responder_,
                               cq_ptr.get(),cq_ptr.get(),htc2);
            
          HandlerTest2Context* h = (HandlerTest2Context*)htc;
          grpc::Status status = Test2(&h->ctx_,&h->req_,&h->rep_);
          // 設置狀態爲發送響應
          h->status_ = 2;
          // 調用responder_進行響應發送(異步)
          h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的惟一tag*/);
        }
        break;
      case 3: /*Test3的處理*/
        {
          HandlerTest3Context* htc3 = new HandlerTest3Context;
          htc3->status_ = 1;    // 設置狀態爲1
          handlerMap[htc3] = 3; // 保存到handlerMap中
          service.RequestTest3(&htc3->ctx_,&htc3->req_,&htc3->responder_,
                               cq_ptr.get(),cq_ptr.get(),htc3);
            
          HandlerTest3Context* h = (HandlerTest3Context*)htc;
          grpc::Status status = Test3(&h->ctx_,&h->req_,&h->rep_);
          // 設置狀態爲發送響應
          h->status_ = 2;
          // 調用responder_進行響應發送(異步)
          h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的惟一tag*/);
        }
        break;
    }
  }
  return 0;
}

async_service2.cpp

上面雖然是使用到了grpc的異步服務機制,可是隻是爲了描述清楚異步服務的建立過程,是一個單線程的簡陋實現。下面寫一個使用線程池的實現。測試

// > g++ -o aservice2 async_service2.cpp  simple.grpc.pb.cc simple.pb.cc -std=c++11 -I. -lgrpc++ -lgrpc -lprotobuf -lgpr -lz -lcares -laddress_sorting -lpthread -Wno-deprecated

// 線程池的代碼可見 https://www.cnblogs.com/oloroso/p/5881863.html
#include "threadpool.h"
#include "simple.grpc.pb.h"
#include <grpcpp/grpcpp.h>

#include <memory>
#include <iostream>
#include <strstream>
#include <chrono>

struct HandlerContextBase {
  // 當前對象類型,用於肯定是Test1/2/3哪個請求的
  int                 type_;
  // 當前處理狀態(處理分爲兩步:1處理請求構建響應數據;2發送響應)
  // 這裏記錄一下完成到哪一步了,以便進行相關操做
  int                 status_; // (1構建響應完成;2發送完成)
  // rpc的上下文,容許經過它進行諸如壓縮、身份驗證,以及把元數據發回客戶端等。
  grpc::ServerContext ctx_;
};

template<typename RequestType,typename ReplyType>
struct HandlerContext:public HandlerContextBase {
  // 用於接收客戶端發送的請求
  RequestType         req_;
  // 用於發送響應給客戶端
  ReplyType           rep_;
  // 發送到客戶端的方法對象
  grpc::ServerAsyncResponseWriter<ReplyType> responder_;
  //================================================
  // 構造函數
  HandlerContext()
    :responder_(&ctx_)
  {}

};
typedef HandlerContext<Simple::TestRequest,Simple::TestNull>  HandlerTest1Context;
typedef HandlerContext<Simple::TestNull,Simple::TestReply>    HandlerTest2Context;
typedef HandlerContext<Simple::TestRequest,Simple::TestReply> HandlerTest3Context;

unsigned long get_tid()
{
  std::thread::id tid = std::this_thread::get_id();
  std::ostrstream os;
  os << tid;
  unsigned long tidx = std::stol(os.str());
  return tidx;
}

// Test1 實現都是差不都的,這裏只是爲了測試,就隨便返回點數據了
grpc::Status Test1(grpc::ServerContext*       context,
                   const Simple::TestRequest* request,
                   Simple::TestNull*          response)
{
  printf("%s %d\n",__func__,__LINE__);
  std::ostrstream os;
  os << "Client Name = " << request->name() << '\n';
  os << "Clinet ID   = " << request->id()   << '\n';
  os << "Clinet Value= " << request->value()<< '\n';
  std::string message = os.str();
  // grpc狀態能夠設置message,因此也能夠用來返回一些信息
  return grpc::Status(grpc::StatusCode::OK,message);
}
// Test2
grpc::Status Test2(grpc::ServerContext*       context,
                   const Simple::TestNull*    request,
                   Simple::TestReply*         response)
{
  printf("%s %d\n",__func__,__LINE__);
  response->set_tid(100);
  response->set_svrname("Simple Server");
  response->set_takeuptime(0.01);
  return grpc::Status::OK;
}
// Test3
grpc::Status Test3(grpc::ServerContext*       context,
                   const Simple::TestRequest* request,
                   Simple::TestReply*         response)
{
  printf("%s %d\n",__func__,__LINE__);
  int tid = get_tid();
  std::ostrstream os;
  os << "Client Name = " << request->name() << '\n';
  os << "Clinet ID   = " << request->id()   << '\n';
  os << "Clinet Value= " << request->value()<< '\n';
  os << "Server TID  = " << tid<<'\n';
  std::string message = os.str();
  
  // 休眠0.5秒,以便觀察異步執行的效果
  std::this_thread::sleep_for(std::chrono::milliseconds(500));

  response->set_tid(tid);
  response->set_svrname(__FILE__);
  response->set_takeuptime(1.234);
  // grpc狀態能夠設置message
  return grpc::Status(grpc::StatusCode::OK,std::move(message));
}

int main()
{
  // 服務構建器,用於構建同步或者異步服務
  grpc::ServerBuilder builder;
  // 添加監聽的地址和端口,後一個參數用於設置認證方式,這裏選擇不認證
  builder.AddListeningPort("0.0.0.0:33333",grpc::InsecureServerCredentials());
  // 建立一個異步服務對象
  Simple::Server::AsyncService service;
  // 註冊服務
  builder.RegisterService(&service);

  // 添加一個完成隊列,用於與 gRPC 運行時異步通訊
  std::unique_ptr<grpc::ServerCompletionQueue> cq_ptr = builder.AddCompletionQueue();

  // 構建服務器
  std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
  std::cout<<"Server Runing"<<std::endl;
  // 下面能夠有幾個工做線程就先註冊幾個,也能夠僅註冊一個(至少一個)
  /*for(int i=0;i<4;++i)*/ {
    // 先建立三個類型接口的請求處理上下文對象
    HandlerTest1Context* htc1 = new HandlerTest1Context;
    htc1->status_ = 1; // 設置狀態爲1(由於只須要區分是否已經發送響應完成)
    htc1->type_   = 1; // 設置類型爲1
    HandlerTest2Context* htc2 = new HandlerTest2Context;
    htc2->status_ = 1;
    htc2->type_   = 2;
    HandlerTest3Context* htc3 = new HandlerTest3Context;
    htc3->status_ = 1;
    htc3->type_   = 3;

    // 進入下面死循環前須要先註冊一下請求
    service.RequestTest1(
        &htc1->ctx_         /*服務上下文對象*/,
        &htc1->req_         /*用於接收請求的對象*/,
        &htc1->responder_   /*異步寫響應對象*/,
        cq_ptr.get()        /*新的調用使用的完成隊列*/,
        cq_ptr.get()        /*通知使用的完成隊列*/,
        htc1                /*惟一標識tag*/);
    service.RequestTest2(&htc2->ctx_,&htc2->req_,&htc2->responder_,cq_ptr.get(),cq_ptr.get(),htc2);
    service.RequestTest3(&htc3->ctx_,&htc3->req_,&htc3->responder_,cq_ptr.get(),cq_ptr.get(),htc3);
  }

  // 建立線程池,使用4個工做線程,用於構建請求的響應
  ThreadPool pool(4);

  // 異步服務這裏不能使用 server->Wait() 來等待處理,由於是異步服務
  // 服務器會把到達的請求放入隊列,須要本身從完成隊列取出請求進行處理
  // 因此這裏須要一個死循環來獲取請求並進行處理
  while(true){
    // 前面已經註冊了請求處理,這裏阻塞從完成隊列中取出一個請求進行處理
    HandlerContextBase* htc = NULL;
    bool ok = false; 
    GPR_ASSERT(cq_ptr->Next((void**)&htc, &ok));
    GPR_ASSERT(ok);
    // 根據tag判斷是哪個請求
    // 由於前面註冊請求處理的時候使用的就是對象地址
    // 因此這裏直接從map裏面取出來判斷便可
    int type = htc->type_;
    // 判斷狀態,看是否是已經響應發送了
    if(htc->status_ == 2) {
      // 由於這裏並非多態類,必須根據類型操做
      switch(type) {
        case 1:
          {
            // 釋放對象(這裏未對這個對象進行復用)
            delete (HandlerTest1Context*)htc;
          }
          break;
        case 2:
          {
            delete (HandlerTest2Context*)htc;
          }
          break;
        case 3:
          {
            delete (HandlerTest3Context*)htc;
          }
          break;
      }
      continue; // 回到從完成隊列獲取下一個
    }
    
    // 從新建立一個請求處理上下文對象(以便可以接受下一個請求進行處理)
    switch(type) {
      case 1:
        {
          HandlerTest1Context* htc1 = new HandlerTest1Context;
          htc1->status_ = 1;    // 設置狀態爲1
          htc1->type_   = 1;    // 設置類型爲1
          service.RequestTest1(&htc1->ctx_,&htc1->req_,&htc1->responder_,
                               cq_ptr.get(),cq_ptr.get(),htc1);
        }
        break;
      case 2:
        {
          HandlerTest2Context* htc2 = new HandlerTest2Context;
          htc2->status_ = 1;    // 設置狀態爲1
          htc2->type_   = 1;    // 設置類型爲2
          service.RequestTest2(&htc2->ctx_,&htc2->req_,&htc2->responder_,
                               cq_ptr.get(),cq_ptr.get(),htc2);
        }
        break;
      case 3:
        {
          HandlerTest3Context* htc3 = new HandlerTest3Context;
          htc3->status_ = 1;    // 設置狀態爲1
          htc3->type_   = 3;    // 設置類型爲3
          service.RequestTest3(&htc3->ctx_,&htc3->req_,&htc3->responder_,
                               cq_ptr.get(),cq_ptr.get(),htc3);
        }
        break;
    }

    pool.enqueue([type,htc](){
    // 根據type進行相應的處理
    switch(type) {
      case 1: /*Test1的處理*/
        {
          HandlerTest1Context* h = (HandlerTest1Context*)htc;
          grpc::Status status = Test1(&h->ctx_,&h->req_,&h->rep_);
          // 設置狀態爲發送響應
          h->status_ = 2;
          // 調用responder_進行響應發送(異步)
          h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的惟一tag*/);
        }
        break;
      case 2: /*Test2的處理*/
        {
          HandlerTest2Context* h = (HandlerTest2Context*)htc;
          grpc::Status status = Test2(&h->ctx_,&h->req_,&h->rep_);
          // 設置狀態爲發送響應
          h->status_ = 2;
          // 調用responder_進行響應發送(異步)
          h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的惟一tag*/);
        }
        break;
      case 3: /*Test3的處理*/
        {
          HandlerTest3Context* h = (HandlerTest3Context*)htc;
          grpc::Status status = Test3(&h->ctx_,&h->req_,&h->rep_);
          // 設置狀態爲發送響應
          h->status_ = 2;
          // 調用responder_進行響應發送(異步)
          h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的惟一tag*/);
        }
        break;
    }
  });
  }
  return 0;
}
相關文章
相關標籤/搜索