一個基於protocol buffer的RPC實現

Protocol Buffer僅僅是提供了一套序列化和反序列化結構數據的機制,自己不具備RPC功能,可是能夠基於其實現一套RPC框架。html

Services

protocol buffer的Services類型是專門用來給RPC實現定義服務用的。git

定義示例以下:github

service SearchService {
  rpc Search (SearchRequest) returns (SearchResponse);
}

Search是方法名,SearchRequest是參數,SearchResponse是返回類型,SearchRequest、SearchResponse分別都是預先定義的Message類型。這個Service通過編譯後會生成一個SearchService類和其對應的stub實現SearchService_Stub。SearchService_Stub把調用都轉給RpcChannel處理,RpcChannel是一個接口類,RPC系統中通常本身重載RpcChannel,例如你能夠在重載類中把調用請求序列化後經過網絡傳輸到服務端。而後客戶端就能夠像下面的代碼同樣進行RPC調用了:服務器

using google::protobuf;
protobuf::RpcChannel* channel;
protobuf::RpcController* controller;
SearchService* service;
SearchRequest request;
SearchResponse response;
void DoSearch() {
  // You provide classes MyRpcChannel and MyRpcController, which implement
  // the abstract interfaces protobuf::RpcChannel and protobuf::RpcController.
  channel = new MyRpcChannel("somehost.example.com:1234");
  controller = new MyRpcController;
  // The protocol compiler generates the SearchService class based on the
  // definition given above.
  service = new SearchService::Stub(channel);
  // Set up the request.
  request.set_query("protocol buffers");

  // Execute the RPC.

  service->Search(controller, request, response, protobuf::NewCallback(&Done));

}

void Done() {

  delete service;

  delete channel;

  delete controller;

}

服務端這邊要實現Service接口,就是負責具體RPC函數的實現。而且在一網絡接口上監聽請求,處理請求,反序列化收到的網絡數據後轉調到這個函數的實現,以後把返回值序列化發回客戶端做爲調用結果。像下面的代碼:網絡

using google::protobuf;

class ExampleSearchService : public SearchService {

public:

  void Search(protobuf::RpcController* controller,

              const SearchRequest* request,

              SearchResponse* response,

              protobuf::Closure* done) {

    if (request->query() == "google") {

      response->add_result()->set_url("http://www.google.com");

    } else if (request->query() == "protocol buffers") {

      response->add_result()->set_url("http://protobuf.googlecode.com");

    }

    done->Run();

  }

};

int main() {

  // You provide class MyRpcServer.  It does not have to implement any

  // particular interface; this is just an example.

  MyRpcServer server;

  protobuf::Service* service = new ExampleSearchService;

  server.ExportOnPort(1234, service);

  server.Run();

  delete service;

  return 0;

}

一個RPC實現

代碼在這(https://github.com/persistentsnail/easy_pb_rpc框架

協議

package RPC;
option cc_generic_services = true;                                               
message RpcRequestData {                                                         
    required uint32 service_id = 1;    // 對應Service                                           
    required uint32 method_id = 2;      // 對應Service中的函數                                          
    required uint32 call_id = 3;        // 對應本次調用(可能同一函數短期有屢次請求調用)                       
    required bytes content = 4;       // 對應已經序列化了的函數參數Message                                           
}
message RpcResponseData {
    required uint32 call_id = 1;       // 對應請求的call_id                                           

    required bytes content = 2;        // 對應已經序列化了的返回值Message                         
}

RPC請求是RpcRequestData Message,返回是RpcResponseData Message。service_id定義在一個配置文件services.cfg中,一個service_id對應一個服務名字,由服務端客戶端共享,在程序啓動時初始化一個一一映射的map。(這樣的實現不太好,後面會提到)。在網絡上傳遞的數據格式比較簡單:異步

| Length of Encoding Binary Data (unsigned int) | RpcRequestData or RpcResponseData |。ide

客戶端

支持RPC同步異步調用,例如:函數

void Foo(::google ::protobuf:: RpcController* controller ,

const ::FooRequest * request,

                       :: FooResponse* response ,

                       :: google::protobuf ::Closure* done);

以回調參數Closure爲準,若爲NULL則是同步調用,反之異步回調之。內部實現上建立了一個底層工做線程,重載的RpcChannel實現把每次調用結構化一個一個msg放到msg queue中,工做線程從msg queue中取msg處理,具體來講就是把msg序列化經過網絡接口把請求傳出去。邏輯上一個RpcChannel實例表明一個網絡鏈接,因此能夠重複使用一個RpcChannel對象。下面是同步調用一個EchoService的Foo方法的客戶端代碼示例:ui

RpcClient client ;

RpcChannel channel(&client , "127.0.0.1:18669");

EchoService::Stub echo_clt(& channel);

FooRequest request ;

request.set_text ("test1");

request.set_times (1);

FooResponse response ;

RpcController controller ;

echo_clt.Foo (&controller, & request, &response , NULL);

RpcClient管理全部鏈接會話,管理消息隊列,工做線程,只須要一個實例對象,RpcChannel 使用RpcClient完成鏈接和轉調。

服務端

首先註冊服務,就是建立Service的實現類對象,放到容器裏面。而後在一個網絡端口上監聽鏈接,解析網絡數據包,根據不一樣請求在服務容器裏面找合適的service調用相應method。實現的比較簡單,一個單線程服務器,同時只能處理一個請求。一個提供EchoService服務的server代碼看起來是這樣:

EchoServiceImpl *impl = new EchoServiceImpl();

RpcServer rpc_server ;

rpc_server.RegisterService (impl);

rpc_server.Start ();

服務端客戶端網絡數據處理使用的都是libevent。

一些protocol buffer的細節

1. 以前用到了service_id,要在雙端同時維護一份service id和name互相對應的配置文件,不利於部署和更新。protocol buffer能夠經過DescriptorPool自省出本身有哪些服務和方法的,能夠參見http://www.cnblogs.com/Solstice/archive/2011/04/03/2004458.html。因此在定義協議的時候能夠直接用service name而不是id,而那份配置文件天然也不須要。客戶端用服務名字作一個RPC請求,服務端經過名字判斷是否本身存在這個服務。相應的method_id也能夠考慮用method name。可是用id也是有好處,id是數值類型使用的Base 128 Varints變長編碼比字符串表示的name生成的數據包更小,另外數值作的哈希應該比DescriptorPool經過名字查找服務類更快。

 

2.應該充分使用protocol buffer錯誤處理方式,那就是使用RpcController 來作錯誤跟蹤。

 

3.協議字段類型多使用optional,由於required字段是必須有數據的,相反optional卻不必定須要,若是沒有就是一個默認值。optional類型一般用來升級協議,好比一個Message添加了一個新的optinal字段,之前使用老的Message格式的代碼序列出來的Message仍然可以被使用新的Message格式的代碼正確解析,由於optional字段不存在,他會使用默認值;相似的,使用新的Message格式的代碼序列出來的Message也可以被使用老的Message格式的代碼正確解析,由於他會忽略不認識的字段,並且他不丟掉這個字段,也就是這個Message還能被繼續正確的傳輸。

相關文章
相關標籤/搜索