從sofarpc看rpc實現框架

1、基於C++的rpc開發框架

因爲java、go之類的rpc框架比較多,而騰訊的phxrpc框架感受又過於繁瑣,並不利於理解RPC的本質。簡單看了下這個百度的這個RPC框架,以爲比較簡單直接,文檔清晰明瞭,依賴的內容少,能夠結合這個能夠工程上在用的項目看下基於protobuf的rpc實現原理。
接下來的例子一樣是使用項目自帶的demo來講明:
tsecer@protobuf: cat echo_service.proto
package sofa.pbrpc.test;
option cc_generic_services = true;java

// 定義請求消息
message EchoRequest {
required string message = 1;
}git

// 定義迴應消息
message EchoResponse {
required string message = 1;
}github

// 定義RPC服務,可包含多個方法(這裏只列出一個)
service EchoServer {
rpc Echo(EchoRequest) returns(EchoResponse);
}
tsecer@protobuf:服務器

 

2、請求的表示問題

RPC調用一般是儘可能讓client以函數調用的形式使用,典型的函數調用包括 返回值和參數列表,參數列表的數量及類型都是任意的。這些信息如何在運行時動態表示、動態打包以後發送給server?由於函數調用的時候一般是源碼的形式,而如何把源代碼調用轉換爲協議則是一個關鍵的問題。例如
int rpccall(int x, int y)
int rpccall(char *)
這種函數調用,這些源碼級別看到的內容,在運行時已經不復存在,那麼如何在運行時把這個函數調用打包呢?對於這個問題,從protobuf的RPC定義來看,全部的RCP接口很少很多隻接受兩個消息類型。下面是文件描述文檔中對於service這種類型消息的定義protobuf-master\src\google\protobuf\descriptor.proto:
// Describes a method of a service.
message MethodDescriptorProto {
optional string name = 1;app

// Input and output type names. These are resolved in the same way as
// FieldDescriptorProto.type_name, but must refer to a message type.
optional string input_type = 2;
optional string output_type = 3;框架

optional MethodOptions options = 4;less

// Identifies if client streams multiple client messages
optional bool client_streaming = 5 [default=false];
// Identifies if server streams multiple server messages
optional bool server_streaming = 6 [default=false];
}
從這個定義來看,input和output都只能是一種類型,因此不存在函數參數個數不肯定致使的運行時函數類型問題。異步

3、server如何知道是哪一個rpcsocket

一、service的處理

當請求到達服務器以後,如何知道這個請求對應那個函數呢?從百度的rpc文檔看,這個地方的實現是將該rpc的完整名稱打包在協議中一塊兒發送給server(https://github.com/baidu/sofa-pbrpc/wiki/RPC%E5%8D%8F%E8%AE%AE#rpcmeta)。
屬性名 字節數 是否可選 意義
type enum 否 代表該消息是Request仍是Response。
sequence_id uint64 否 消息的序號,用於匹配一次Rpc調用的Request和Response消息。
method string 是 僅用於Request消息,記錄請求方法的全名,譬如「test.HelloService.GreetMethod」。ide

這是最爲簡單直觀的實現方式,由於每一個service和rpc的名字在protobuf生成的協議中都是能夠直接得到。server側只須要經過將rpc的字符串名稱和對應的實現類綁定起來便可,這個功能在sofa中經過sofa-pbrpc-master\src\sofa\pbrpc\rpc_server.cc中的
bool RpcServer::RegisterService(google::protobuf::Service* service, bool take_ownership)
{
return _impl->RegisterService(service, take_ownership);
}
註冊,註冊以後以該service的名字做爲字符串放入一個_service_map中
sofa-pbrpc-master\src\sofa\pbrpc\service_pool.h文件中的
class ServicePool
{
public:
……
typedef std::map<std::string, ServiceBoard*> ServiceMap;
ServiceMap _service_map;
……
}; // class ServicePool

二、rpc(method)的查找

因爲協議中包含了service和rpc的名字,在找到service以後,就能夠經過service的ServiceDescriptor的FindMethodByName找到rpc對應的method編號,進而經過Method函數找到rpc函數指針。
sofa-pbrpc-master\src\sofa\pbrpc\rpc_request.cc
MethodBoard* RpcRequest::FindMethodBoard(
const ServicePoolPtr& service_pool,
const std::string& service_name,
const std::string& method_name)
{
ServiceBoard* service_board = service_pool->FindService(service_name);
if (service_board == NULL)
{
return NULL;
}
const google::protobuf::MethodDescriptor* method_desc =
service_board->Descriptor()->FindMethodByName(method_name);
if (method_desc == NULL)
{
return NULL;
}
return service_board->Method(method_desc->index());
}

4、協議的打解包

sofa-pbrpc-master\src\sofa\pbrpc\rpc_client_impl.cc
void RpcClientImpl::CallMethod(const google::protobuf::Message* request,
google::protobuf::Message* response,
const RpcControllerImplPtr& cntl)
{
……meta結構的初始化
// prepare request buffer
RpcMeta meta;
meta.set_type(RpcMeta::REQUEST);
meta.set_sequence_id(cntl->SequenceId());
meta.set_method(cntl->MethodId());
int64 timeout = cntl->Timeout();
if (timeout > 0)
{
meta.set_server_timeout(timeout);
}
meta.set_compress_type(cntl->RequestCompressType());
meta.set_expected_response_compress_type(cntl->ResponseCompressType());
……Request結構的初始化
header.meta_size = static_cast<int>(write_buffer.ByteCount() - header_pos - header_size);
bool serialize_request_return = false;
if (meta.compress_type() == CompressTypeNone)
{
serialize_request_return = request->SerializeToZeroCopyStream(&write_buffer);
}
else
{
::sofa::pbrpc::scoped_ptr<AbstractCompressedOutputStream> os(
get_compressed_output_stream(&write_buffer, meta.compress_type()));
serialize_request_return = request->SerializeToZeroCopyStream(os.get());
os->Flush();
}
……

5、channel的做用

當前系統中有兩種實現,一個是SimpleRpcChannelImpl,一個是DynamicRpcChannelImpl。這個channel主要是負責選擇和服務器的主機間通信,也便是負責把消息從client發送到server。

6、controller的做用

RpcControllerImpl對傳輸進行控制或者統計,例如超時時間的記錄、sequence-id的維護、使用什麼樣的傳輸協議等。

7、protobuf對於rpc有哪些內置的類型要求

從protobuf生成的代碼來看:_stub類型的接口是要求有一個額外的RpcChannel類型的channel_對象,這個參數也是_stub構造函數須要的參數類型,這個stub就是調用channel的方法來調用,從而給channel一個將消息發送給服務器的機會。爲了支持異步,函數中還有一個Closure參數

void EchoServer_Stub::Echo(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::sofa::pbrpc::test::EchoRequest* request,
::sofa::pbrpc::test::EchoResponse* response,
::google::protobuf::Closure* done) {
channel_->CallMethod(descriptor()->method(0),
controller, request, response, done);
而RpcController是兩個接口中都必須的參數,因此業務框架是須要實現這個方法的。
protobuf中頭文件對於該類型的定義說明
protobuf-master\src\google\protobuf\service.h

該類的主要目的是爲了提供一個操做設置相關的方法。
// An RpcController mediates a single method call. The primary purpose of
// the controller is to provide a way to manipulate settings specific to the
// RPC implementation and to find out about RPC-level errors.
//
// The methods provided by the RpcController interface are intended to be a
// "least common denominator" set of features which we expect all
// implementations to support. Specific implementations may provide more
// advanced features (e.g. deadline propagation).
class PROTOBUF_EXPORT RpcController {
public:
inline RpcController() {}
virtual ~RpcController();

// Client-side methods ---------------------------------------------
// These calls may be made from the client side only. Their results
// are undefined on the server side (may crash).

// Resets the RpcController to its initial state so that it may be reused in
// a new call. Must not be called while an RPC is in progress.
virtual void Reset() = 0;

// After a call has finished, returns true if the call failed. The possible
// reasons for failure depend on the RPC implementation. Failed() must not
// be called before a call has finished. If Failed() returns true, the
// contents of the response message are undefined.
virtual bool Failed() const = 0;

// If Failed() is true, returns a human-readable description of the error.
virtual std::string ErrorText() const = 0;

// Advises the RPC system that the caller desires that the RPC call be
// canceled. The RPC system may cancel it immediately, may wait awhile and
// then cancel it, or may not even cancel the call at all. If the call is
// canceled, the "done" callback will still be called and the RpcController
// will indicate that the call failed at that time.
virtual void StartCancel() = 0;

// Server-side methods ---------------------------------------------
// These calls may be made from the server side only. Their results
// are undefined on the client side (may crash).

// Causes Failed() to return true on the client side. "reason" will be
// incorporated into the message returned by ErrorText(). If you find
// you need to return machine-readable information about failures, you
// should incorporate it into your response protocol buffer and should
// NOT call SetFailed().
virtual void SetFailed(const std::string& reason) = 0;

// If true, indicates that the client canceled the RPC, so the server may
// as well give up on replying to it. The server should still call the
// final "done" callback.
virtual bool IsCanceled() const = 0;

// Asks that the given callback be called when the RPC is canceled. The
// callback will always be called exactly once. If the RPC completes without
// being canceled, the callback will be called after completion. If the RPC
// has already been canceled when NotifyOnCancel() is called, the callback
// will be called immediately.
//
// NotifyOnCancel() must be called no more than once per request.
virtual void NotifyOnCancel(Closure* callback) = 0;

private:
GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(RpcController);
};

RpcChannel表示到達服務的通許鏈路。
// Abstract interface for an RPC channel. An RpcChannel represents a
// communication line to a Service which can be used to call that Service's
// methods. The Service may be running on another machine. Normally, you
// should not call an RpcChannel directly, but instead construct a stub Service
// wrapping it. Example:
// RpcChannel* channel = new MyRpcChannel("remotehost.example.com:1234");
// MyService* service = new MyService::Stub(channel);
// service->MyMethod(request, &response, callback);
class PROTOBUF_EXPORT RpcChannel {
public:
inline RpcChannel() {}
virtual ~RpcChannel();

// Call the given method of the remote service. The signature of this
// procedure looks the same as Service::CallMethod(), but the requirements
// are less strict in one important way: the request and response objects
// need not be of any specific class as long as their descriptors are
// method->input_type() and method->output_type().
virtual void CallMethod(const MethodDescriptor* method,
RpcController* controller,
const Message* request,
Message* response,
Closure* done) = 0;

private:
GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(RpcChannel);
};

protobuf-master\src\google\protobuf\stubs\callback.h

// Abstract interface for a callback. When calling an RPC, you must provide
// a Closure to call when the procedure completes. See the Service interface
// in service.h.
//
// To automatically construct a Closure which calls a particular function or
// method with a particular set of parameters, use the NewCallback() function.
// Example:
// void FooDone(const FooResponse* response) {
// ...
// }
//
// void CallFoo() {
// ...
// // When done, call FooDone() and pass it a pointer to the response.
// Closure* callback = NewCallback(&FooDone, response);
// // Make the call.
// service->Foo(controller, request, response, callback);
// }
//
// Example that calls a method:
// class Handler {
// public:
// ...
//
// void FooDone(const FooResponse* response) {
// ...
// }
//
// void CallFoo() {
// ...
// // When done, call FooDone() and pass it a pointer to the response.
// Closure* callback = NewCallback(this, &Handler::FooDone, response);
// // Make the call.
// service->Foo(controller, request, response, callback);
// }
// };
//
// Currently NewCallback() supports binding zero, one, or two arguments.
//
// Callbacks created with NewCallback() automatically delete themselves when
// executed. They should be used when a callback is to be called exactly
// once (usually the case with RPC callbacks). If a callback may be called
// a different number of times (including zero), create it with
// NewPermanentCallback() instead. You are then responsible for deleting the
// callback (using the "delete" keyword as normal).
//
// Note that NewCallback() is a bit touchy regarding argument types. Generally,
// the values you provide for the parameter bindings must exactly match the
// types accepted by the callback function. For example:
// void Foo(string s);
// NewCallback(&Foo, "foo"); // WON'T WORK: const char* != string
// NewCallback(&Foo, string("foo")); // WORKS
// Also note that the arguments cannot be references:
// void Foo(const string& s);
// string my_str;
// NewCallback(&Foo, my_str); // WON'T WORK: Can't use referecnes.
// However, correctly-typed pointers will work just fine.
class PROTOBUF_EXPORT Closure {
public:
Closure() {}
virtual ~Closure();

virtual void Run() = 0;

private:
GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(Closure);
};

8、請求的異步

從項目自帶的例子中沒有找到服務器異步的代碼。可是從實現上,若是一個server內部須要異步處理一個請求的話,應該把CallMethod傳進來的Closure* done保存起來,等本身異步完成以後調用這個對象的Run接口。

9、回包的路由

因爲server是經過accept接收socket,因此回包的時候經過socket直接返回就行了。

相關文章
相關標籤/搜索