因爲項目組如今用的rpc是基於google protobuf rpc協議實現的,因此花了點時間瞭解下protobuf rpc。rpc對於作分佈式系統的人來講確定不陌生,對於rpc不瞭解的童鞋能夠自行google,這裏只是作個簡單的介紹。rpc的主要功能是讓分佈式系統的實現更爲簡單,爲提供強大的遠程調用而不損失本地調用語義的簡潔性。爲了實現這個目標,rpc框架須要提供一種透明調用機制讓使用者沒必要顯示區分本地調用仍是遠程調用。rpc架構涉及的組件以下:python
客戶方像調用本地方法同樣去調用遠程接口方法,RPC 框架提供接口的代理實現,實際的調用將委託給代理RpcProxy
。代理封裝調用信息並將調用轉交給RpcInvoker
去實際執行。在客戶端的RpcInvoker
經過鏈接器RpcConnector
去維持與服務端的通道RpcChannel
,並使用RpcProtocol
執行協議編碼(encode)並將編碼後的請求消息經過通道發送給服務方。RPC 服務端接收器 RpcAcceptor
接收客戶端的調用請求,一樣使用RpcProtocol
執行協議解碼(decode)。解碼後的調用信息傳遞給RpcProcessor
去控制處理調用過程,最後再委託調用給RpcInvoker
去實際執行並返回調用結果。服務器
protobuf rpc在上面組件中主要扮演RpcProtocol的角色,使得咱們省去了協議的設計,而且protobuf協議在編碼和空間效率都是上很是高效的,這也是不少公司採用protobuf做爲數據序列化和通訊協議的緣由。同時protobuf rpc定義了一個抽象的rpc框架,以下圖所示:網絡
RpcServiceStub和RpcService類是protobuf編譯器根據proto定義生成的類,RpcService定義了服務端暴露給客戶端的函數接口,具體實現須要用戶本身繼承這個類來實現。RpcServiceStub定義了服務端暴露函數的描述,並將客戶端對RpcServiceStub中函數的調用統一轉換到調用RpcChannel中的CallMethod方法,CallMethod經過RpcServiceStub傳過來的函數描述符和函數參數對該次rpc調用進行encode,最終經過RpcConnecor發送給服務方。對方以客戶端相反的過程最終調用RpcSerivice中定義的函數。事實上,protobuf rpc的框架只是RpcChannel中定義了空的CallMethod,因此具體怎樣進行encode和調用RpcConnector都要本身實現。RpcConnector在protobuf中沒有定義,因此這個完成由用戶本身實現,它的做用就是收發rpc消息包。在服務端,RpcChannel經過調用RpcService中的CallMethod來具體調用RpcService中暴露給客戶端的函數。架構
介紹了這麼多,對於怎麼樣用protobuf rpc來實現一個rpc確定仍是一頭霧水吧,下面就用protobuf rpc來實現一個簡單的python版rpc demo吧。框架
下面直接給出demo描述PRC的proto文件,至於proto文件的編寫規則能夠參考protobuf官網。async
common.proto文件:分佈式
1 package game; 2 3 message RequestMessage 4 { 5 required string message = 1; 6 } 7 8 message ResponseMessage 9 { 10 required string message = 1; 11 }
game_service.proto文件:函數
1 package game; 2 3 import "common.proto"; 4 option py_generic_services = true; 5 6 service GameService 7 { 8 rpc connect_server(RequestMessage) returns(RequestMessage); 9 }
common.proto文件描述了RPC中收發的消息;game_service.proto描述了服務器導出的connect_server函數,該函數接受RequestMessage對象做爲參數,並返回RequestMessage對象。在使用PRC協議時,必須加上option py_generic_services = true;可選項,要否則編譯器不會生成包含connect_server函數的GameService描述。ui
1 class MyRpcChannel(service.RpcChannel): 2 def __init__(self, rpc_service, conn): 3 super(MyRpcChannel, self).__init__() 4 self.logger = LogManager.get_logger("MyRpcChannel") 5 6 def CallMethod(self, method_descriptor, rpc_controller, request, response_class, done): 7 """"protol buffer rpc 須要的函數,用來發送rpc調用""" 8 self.logger.info('CallMethod') 9 cmd_index = method_descriptor.index 10 assert(cmd_index < 65535) 11 data = request.SerializeToString() 12 total_len = len(data) + 2 13 self.conn.send_data(''.join([pack('<I', total_len), pack('<H', cmd_index), data])) 14 15 def from_request(self): 16 """"從網絡解析出一個完整的請求以後調的函數""" 17 index_data = self.rpc_request.data[0:2] 18 cmd_index = unpack('<H', index_data)[0] 19 rpc_service = self.rpc_service 20 s_descriptor = rpc_service.GetDescriptor() 21 method = s_descriptor.methods[cmd_index] 22 try: 23 request = rpc_service.GetRequestClass(method)() 24 serialized = self.rpc_request.data[2:] 25 request.ParseFromString(serialized) 26 rpc_service.CallMethod(method, self.controller, request, None) 27 except: 28 self.logger.error("Call rpc method failed!") 29 self.logger.log_last_except() 30 return True
最後就是繼承GameService,並實現connect_server函數了。google
1 class GameService(game_service_pb2.GameService): 2 def __init__(self): 3 self.logger = LogManager.get_logger("GameService") 4 5 def connect_server(self, rpc_controller, request, callback): 6 self.logger.info('%s', request.message)
至於用於網絡收發消息的RpcConnector,能夠使用python的asyncore庫實現,具體實如今這就不討論了。
從上面的實現來看,protobuf rpc的實現主要包括編寫proto文件並編譯生成對應的service_pb2文件,繼承RpcChannel並實現CallMethod和調用Service的CallMethod,繼承Service來實現暴露給客戶端的函數。