目錄html
編寫異步服務和編寫同步服務的基本流程都差很少,稍有點區別。ios
同步服務你只須要實現相關服務接口的實現便可,不須要管理太多東西。異步服務GRPC運行時會把讀取到的客戶端請求放入CompletionQueue中,須要主動從中取出,而後進行相關的處理,能夠多線程也能夠單線程。c++
這裏和grpc使用記錄(二)簡單同步服務實例中的同樣,這裏就很少說了。服務器
這裏也是和grpc使用記錄(二)簡單同步服務實例中的同樣的。多線程
這裏能夠複用前面同步服務的代碼,只須要作簡單的修改便可。異步
簡單說一下建立一個GRPC異步服務的要點:async
AsyncService
,而不是Service
。grpc::ServerCompletionQueue
用於異步任務操做。AsyncService::RequestXXXX
來註冊XXXX
接口的處理。下面代碼簡單的建立了3個HandlerContext
的結構體類型,用於保存三個接口請求處理過程當中的數據,實際的請求處理仍是和以前同步服務的同樣,這裏只是寫成了Test1
、Test2
、Test3
三個函數的形式。函數
// > g++ -o aservice async_service.cpp simple.grpc.pb.cc simple.pb.cc -std=c++11 -I. -lgrpc++ -lgrpc -lprotobuf -lgpr -lz -lcares -laddress_sorting -lpthread -Wno-deprecated #include "simple.grpc.pb.h" #include <grpcpp/grpcpp.h> #include <memory> #include <iostream> #include <strstream> struct HandlerContext { // 當前處理狀態(處理分爲兩步:1處理請求構建響應數據;2發送響應) // 這裏記錄一下完成到哪一步了,以便進行相關操做 int status_; // (1構建響應完成;2發送完成) // rpc的上下文,容許經過它進行諸如壓縮、身份驗證,以及把元數據發回客戶端等。 grpc::ServerContext ctx_; }; struct HandlerTest1Context:public HandlerContext { // 用於接收客戶端發送的請求 Simple::TestRequest req_; // 用於發送響應給客戶端 Simple::TestNull rep_; // 發送到客戶端的方法對象 grpc::ServerAsyncResponseWriter<Simple::TestNull> responder_; // 構造函數 HandlerTest1Context() :responder_(&ctx_) {} }; struct HandlerTest2Context:public HandlerContext { // 用於接收客戶端發送的請求 Simple::TestNull req_; // 用於發送響應給客戶端 Simple::TestReply rep_; // 發送到客戶端的方法對象 grpc::ServerAsyncResponseWriter<Simple::TestReply> responder_; // 構造函數 HandlerTest2Context() :responder_(&ctx_) {} }; struct HandlerTest3Context:public HandlerContext { // 用於接收客戶端發送的請求 Simple::TestRequest req_; // 用於發送響應給客戶端 Simple::TestReply rep_; // 發送到客戶端的方法對象 grpc::ServerAsyncResponseWriter<Simple::TestReply> responder_; // 構造函數 HandlerTest3Context() :responder_(&ctx_) {} }; // Test1 實現都是差不都的,這裏只是爲了測試,就隨便返回點數據了 grpc::Status Test1(grpc::ServerContext* context, const Simple::TestRequest* request, Simple::TestNull* response) { printf("%s %d\n",__func__,__LINE__); std::ostrstream os; os << "Client Name = " << request->name() << '\n'; os << "Clinet ID = " << request->id() << '\n'; os << "Clinet Value= " << request->value()<< '\n'; std::string message = os.str(); // grpc狀態能夠設置message,因此也能夠用來返回一些信息 return grpc::Status(grpc::StatusCode::OK,message); } // Test2 grpc::Status Test2(grpc::ServerContext* context, const Simple::TestNull* request, Simple::TestReply* response) { printf("%s %d\n",__func__,__LINE__); response->set_tid(100); response->set_svrname("Simple Server"); response->set_takeuptime(0.01); return grpc::Status::OK; } // Test3 grpc::Status Test3(grpc::ServerContext* context, const Simple::TestRequest* request, Simple::TestReply* response) { printf("%s %d\n",__func__,__LINE__); std::ostrstream os; os << "Client Name = " << request->name() << '\n'; os << "Clinet ID = " << request->id() << '\n'; os << "Clinet Value= " << request->value()<< '\n'; std::string message = os.str(); response->set_tid(__LINE__); response->set_svrname(__FILE__); response->set_takeuptime(1.234); // grpc狀態能夠設置message return grpc::Status(grpc::StatusCode::OK,std::move(message)); } int main() { // 服務構建器,用於構建同步或者異步服務 grpc::ServerBuilder builder; // 添加監聽的地址和端口,後一個參數用於設置認證方式,這裏選擇不認證 builder.AddListeningPort("0.0.0.0:33333",grpc::InsecureServerCredentials()); // 建立一個異步服務對象 Simple::Server::AsyncService service; // 註冊服務 builder.RegisterService(&service); // 添加一個完成隊列,用於與 gRPC 運行時異步通訊 std::unique_ptr<grpc::ServerCompletionQueue> cq_ptr = builder.AddCompletionQueue(); // 構建服務器 std::unique_ptr<grpc::Server> server(builder.BuildAndStart()); std::cout<<"Server Runing"<<std::endl; // 這裏用一個map來記錄一下下面要進行處理的請求 // 由於這裏也是單線程的,因此不加鎖了 std::map<HandlerContext*,int> handlerMap; // value用於記錄是Test1仍是二、3 { // 先建立三個類型接口的請求處理上下文對象 HandlerTest1Context* htc1 = new HandlerTest1Context; htc1->status_ = 1; // 設置狀態爲1(由於只須要區分是否已經發送響應完成) HandlerTest2Context* htc2 = new HandlerTest2Context; htc2->status_ = 1; HandlerTest3Context* htc3 = new HandlerTest3Context; htc3->status_ = 1; // 將三個上下文對象存入map中 handlerMap[htc1] = 1; // 值用於區分是哪一個類型 handlerMap[htc2] = 2; handlerMap[htc3] = 3; // 進入下面死循環前須要先註冊一下請求 service.RequestTest1( &htc1->ctx_ /*服務上下文對象*/, &htc1->req_ /*用於接收請求的對象*/, &htc1->responder_ /*異步寫響應對象*/, cq_ptr.get() /*新的調用使用的完成隊列*/, cq_ptr.get() /*通知使用的完成隊列*/, htc1 /*惟一標識tag*/); service.RequestTest2(&htc2->ctx_,&htc2->req_,&htc2->responder_,cq_ptr.get(),cq_ptr.get(),htc2); service.RequestTest3(&htc3->ctx_,&htc3->req_,&htc3->responder_,cq_ptr.get(),cq_ptr.get(),htc3); } // 異步服務這裏不能使用 server.Wait() 來等待處理,由於是異步服務 // 服務器會把到達的請求放入隊列,須要本身從完成隊列取出請求進行處理 // 因此這裏須要一個死循環來獲取請求並進行處理 while(true){ // 前面已經註冊了請求處理,這裏阻塞從完成隊列中取出一個請求進行處理 HandlerContext* htc = NULL; bool ok = false; GPR_ASSERT(cq_ptr->Next((void**)&htc, &ok)); GPR_ASSERT(ok); // 根據tag判斷是哪個請求 // 由於前面註冊請求處理的時候使用的就是對象地址 // 因此這裏直接從map裏面取出來判斷便可 int type = handlerMap[htc]; // 判斷狀態,看是否是已經響應發送了 if(htc->status_ == 2) { // 從map中移除 handlerMap.erase(htc); // 由於這裏並非多態類,必須根據類型操做 switch(type) { case 1: { // 釋放對象(這裏未對這個對象進行復用) delete (HandlerTest1Context*)htc; } break; case 2: { delete (HandlerTest2Context*)htc; } break; case 3: { delete (HandlerTest3Context*)htc; } break; } continue; // 回到從完成隊列獲取下一個 } // 根據type進行相應的處理 switch(type) { case 1: /*Test1的處理*/ { // 從新建立一個請求處理上下文對象(以便不影響下一個請求的處理) HandlerTest1Context* htc1 = new HandlerTest1Context; htc1->status_ = 1; // 設置狀態爲1 handlerMap[htc1] = 1; // 保存到handlerMap中 service.RequestTest1(&htc1->ctx_,&htc1->req_,&htc1->responder_, cq_ptr.get(),cq_ptr.get(),htc1); HandlerTest1Context* h = (HandlerTest1Context*)htc; grpc::Status status = Test1(&h->ctx_,&h->req_,&h->rep_); // 設置狀態爲發送響應 h->status_ = 2; // 調用responder_進行響應發送(異步) h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的惟一tag*/); } break; case 2: /*Test2的處理*/ { HandlerTest2Context* htc2 = new HandlerTest2Context; htc2->status_ = 1; // 設置狀態爲1 handlerMap[htc2] = 2; // 保存到handlerMap中 service.RequestTest2(&htc2->ctx_,&htc2->req_,&htc2->responder_, cq_ptr.get(),cq_ptr.get(),htc2); HandlerTest2Context* h = (HandlerTest2Context*)htc; grpc::Status status = Test2(&h->ctx_,&h->req_,&h->rep_); // 設置狀態爲發送響應 h->status_ = 2; // 調用responder_進行響應發送(異步) h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的惟一tag*/); } break; case 3: /*Test3的處理*/ { HandlerTest3Context* htc3 = new HandlerTest3Context; htc3->status_ = 1; // 設置狀態爲1 handlerMap[htc3] = 3; // 保存到handlerMap中 service.RequestTest3(&htc3->ctx_,&htc3->req_,&htc3->responder_, cq_ptr.get(),cq_ptr.get(),htc3); HandlerTest3Context* h = (HandlerTest3Context*)htc; grpc::Status status = Test3(&h->ctx_,&h->req_,&h->rep_); // 設置狀態爲發送響應 h->status_ = 2; // 調用responder_進行響應發送(異步) h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的惟一tag*/); } break; } } return 0; }
上面雖然是使用到了grpc的異步服務機制,可是隻是爲了描述清楚異步服務的建立過程,是一個單線程的簡陋實現。下面寫一個使用線程池的實現。測試
// > g++ -o aservice2 async_service2.cpp simple.grpc.pb.cc simple.pb.cc -std=c++11 -I. -lgrpc++ -lgrpc -lprotobuf -lgpr -lz -lcares -laddress_sorting -lpthread -Wno-deprecated // 線程池的代碼可見 https://www.cnblogs.com/oloroso/p/5881863.html #include "threadpool.h" #include "simple.grpc.pb.h" #include <grpcpp/grpcpp.h> #include <memory> #include <iostream> #include <strstream> #include <chrono> struct HandlerContextBase { // 當前對象類型,用於肯定是Test1/2/3哪個請求的 int type_; // 當前處理狀態(處理分爲兩步:1處理請求構建響應數據;2發送響應) // 這裏記錄一下完成到哪一步了,以便進行相關操做 int status_; // (1構建響應完成;2發送完成) // rpc的上下文,容許經過它進行諸如壓縮、身份驗證,以及把元數據發回客戶端等。 grpc::ServerContext ctx_; }; template<typename RequestType,typename ReplyType> struct HandlerContext:public HandlerContextBase { // 用於接收客戶端發送的請求 RequestType req_; // 用於發送響應給客戶端 ReplyType rep_; // 發送到客戶端的方法對象 grpc::ServerAsyncResponseWriter<ReplyType> responder_; //================================================ // 構造函數 HandlerContext() :responder_(&ctx_) {} }; typedef HandlerContext<Simple::TestRequest,Simple::TestNull> HandlerTest1Context; typedef HandlerContext<Simple::TestNull,Simple::TestReply> HandlerTest2Context; typedef HandlerContext<Simple::TestRequest,Simple::TestReply> HandlerTest3Context; unsigned long get_tid() { std::thread::id tid = std::this_thread::get_id(); std::ostrstream os; os << tid; unsigned long tidx = std::stol(os.str()); return tidx; } // Test1 實現都是差不都的,這裏只是爲了測試,就隨便返回點數據了 grpc::Status Test1(grpc::ServerContext* context, const Simple::TestRequest* request, Simple::TestNull* response) { printf("%s %d\n",__func__,__LINE__); std::ostrstream os; os << "Client Name = " << request->name() << '\n'; os << "Clinet ID = " << request->id() << '\n'; os << "Clinet Value= " << request->value()<< '\n'; std::string message = os.str(); // grpc狀態能夠設置message,因此也能夠用來返回一些信息 return grpc::Status(grpc::StatusCode::OK,message); } // Test2 grpc::Status Test2(grpc::ServerContext* context, const Simple::TestNull* request, Simple::TestReply* response) { printf("%s %d\n",__func__,__LINE__); response->set_tid(100); response->set_svrname("Simple Server"); response->set_takeuptime(0.01); return grpc::Status::OK; } // Test3 grpc::Status Test3(grpc::ServerContext* context, const Simple::TestRequest* request, Simple::TestReply* response) { printf("%s %d\n",__func__,__LINE__); int tid = get_tid(); std::ostrstream os; os << "Client Name = " << request->name() << '\n'; os << "Clinet ID = " << request->id() << '\n'; os << "Clinet Value= " << request->value()<< '\n'; os << "Server TID = " << tid<<'\n'; std::string message = os.str(); // 休眠0.5秒,以便觀察異步執行的效果 std::this_thread::sleep_for(std::chrono::milliseconds(500)); response->set_tid(tid); response->set_svrname(__FILE__); response->set_takeuptime(1.234); // grpc狀態能夠設置message return grpc::Status(grpc::StatusCode::OK,std::move(message)); } int main() { // 服務構建器,用於構建同步或者異步服務 grpc::ServerBuilder builder; // 添加監聽的地址和端口,後一個參數用於設置認證方式,這裏選擇不認證 builder.AddListeningPort("0.0.0.0:33333",grpc::InsecureServerCredentials()); // 建立一個異步服務對象 Simple::Server::AsyncService service; // 註冊服務 builder.RegisterService(&service); // 添加一個完成隊列,用於與 gRPC 運行時異步通訊 std::unique_ptr<grpc::ServerCompletionQueue> cq_ptr = builder.AddCompletionQueue(); // 構建服務器 std::unique_ptr<grpc::Server> server(builder.BuildAndStart()); std::cout<<"Server Runing"<<std::endl; // 下面能夠有幾個工做線程就先註冊幾個,也能夠僅註冊一個(至少一個) /*for(int i=0;i<4;++i)*/ { // 先建立三個類型接口的請求處理上下文對象 HandlerTest1Context* htc1 = new HandlerTest1Context; htc1->status_ = 1; // 設置狀態爲1(由於只須要區分是否已經發送響應完成) htc1->type_ = 1; // 設置類型爲1 HandlerTest2Context* htc2 = new HandlerTest2Context; htc2->status_ = 1; htc2->type_ = 2; HandlerTest3Context* htc3 = new HandlerTest3Context; htc3->status_ = 1; htc3->type_ = 3; // 進入下面死循環前須要先註冊一下請求 service.RequestTest1( &htc1->ctx_ /*服務上下文對象*/, &htc1->req_ /*用於接收請求的對象*/, &htc1->responder_ /*異步寫響應對象*/, cq_ptr.get() /*新的調用使用的完成隊列*/, cq_ptr.get() /*通知使用的完成隊列*/, htc1 /*惟一標識tag*/); service.RequestTest2(&htc2->ctx_,&htc2->req_,&htc2->responder_,cq_ptr.get(),cq_ptr.get(),htc2); service.RequestTest3(&htc3->ctx_,&htc3->req_,&htc3->responder_,cq_ptr.get(),cq_ptr.get(),htc3); } // 建立線程池,使用4個工做線程,用於構建請求的響應 ThreadPool pool(4); // 異步服務這裏不能使用 server->Wait() 來等待處理,由於是異步服務 // 服務器會把到達的請求放入隊列,須要本身從完成隊列取出請求進行處理 // 因此這裏須要一個死循環來獲取請求並進行處理 while(true){ // 前面已經註冊了請求處理,這裏阻塞從完成隊列中取出一個請求進行處理 HandlerContextBase* htc = NULL; bool ok = false; GPR_ASSERT(cq_ptr->Next((void**)&htc, &ok)); GPR_ASSERT(ok); // 根據tag判斷是哪個請求 // 由於前面註冊請求處理的時候使用的就是對象地址 // 因此這裏直接從map裏面取出來判斷便可 int type = htc->type_; // 判斷狀態,看是否是已經響應發送了 if(htc->status_ == 2) { // 由於這裏並非多態類,必須根據類型操做 switch(type) { case 1: { // 釋放對象(這裏未對這個對象進行復用) delete (HandlerTest1Context*)htc; } break; case 2: { delete (HandlerTest2Context*)htc; } break; case 3: { delete (HandlerTest3Context*)htc; } break; } continue; // 回到從完成隊列獲取下一個 } // 從新建立一個請求處理上下文對象(以便可以接受下一個請求進行處理) switch(type) { case 1: { HandlerTest1Context* htc1 = new HandlerTest1Context; htc1->status_ = 1; // 設置狀態爲1 htc1->type_ = 1; // 設置類型爲1 service.RequestTest1(&htc1->ctx_,&htc1->req_,&htc1->responder_, cq_ptr.get(),cq_ptr.get(),htc1); } break; case 2: { HandlerTest2Context* htc2 = new HandlerTest2Context; htc2->status_ = 1; // 設置狀態爲1 htc2->type_ = 1; // 設置類型爲2 service.RequestTest2(&htc2->ctx_,&htc2->req_,&htc2->responder_, cq_ptr.get(),cq_ptr.get(),htc2); } break; case 3: { HandlerTest3Context* htc3 = new HandlerTest3Context; htc3->status_ = 1; // 設置狀態爲1 htc3->type_ = 3; // 設置類型爲3 service.RequestTest3(&htc3->ctx_,&htc3->req_,&htc3->responder_, cq_ptr.get(),cq_ptr.get(),htc3); } break; } pool.enqueue([type,htc](){ // 根據type進行相應的處理 switch(type) { case 1: /*Test1的處理*/ { HandlerTest1Context* h = (HandlerTest1Context*)htc; grpc::Status status = Test1(&h->ctx_,&h->req_,&h->rep_); // 設置狀態爲發送響應 h->status_ = 2; // 調用responder_進行響應發送(異步) h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的惟一tag*/); } break; case 2: /*Test2的處理*/ { HandlerTest2Context* h = (HandlerTest2Context*)htc; grpc::Status status = Test2(&h->ctx_,&h->req_,&h->rep_); // 設置狀態爲發送響應 h->status_ = 2; // 調用responder_進行響應發送(異步) h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的惟一tag*/); } break; case 3: /*Test3的處理*/ { HandlerTest3Context* h = (HandlerTest3Context*)htc; grpc::Status status = Test3(&h->ctx_,&h->req_,&h->rep_); // 設置狀態爲發送響應 h->status_ = 2; // 調用responder_進行響應發送(異步) h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的惟一tag*/); } break; } }); } return 0; }