sofa-pbrpc是基於Google Protocol Buffers 實現的RPC網絡通訊庫,在百度公司各部門獲得普遍使用,天天支撐上億次內部調用。sofa-pbrpc基於百度大搜索高併發高負載的業務場景不斷打磨,成爲一套簡單易用的輕量級高性能RPC框架。2014年sofa-pbrpc正式對外開源受到廣大開發人員的關注,目前sofa-pbrpc已經在各大互聯網公司產品中使用。c++
開源地址:https://github.com/baidu/sofa-pbrpcgit
主要用戶接口分爲四個接口類和三個option。 github
Server端配置:RpcServerOptionsweb
參數名 | 參數說明 |
---|---|
work_thread_num | 工做線程數 |
max_pending_buffer_size | pengding buffer 大小 (MB) |
max_throughput_in | 最大入帶寬限制 (MB/s) |
max_throughput_out | 最大出帶寬限制 (MB/s) |
keep_alive_time | 空閒鏈接維持時間 (s) |
Client端配置:RpcClientOptionsjson
參數名 | 參數說明 |
---|---|
work_thread_num | 工做線程數 |
callback_thread_num | 回調線程數 |
max_pending_buffer_size | pengding buffer 大小 (MB) |
max_throughput_in | 最大入帶寬限制 (MB/s) |
max_throughput_out | 最大出帶寬限制 (MB/s) |
使用sofa-pbrpc只須要三步:網絡
樣例代碼參見sample/echo。併發
定義協議只須要編寫一個proto文件便可。 範例:echo_service.proto負載均衡
package sofa.pbrpc.test; option cc_generic_services = true; message EchoRequest { required string message = 1; } message EchoResponse { required string message = 1; } service EchoServer { rpc Echo(EchoRequest) returns(EchoResponse); }
使用protoc編譯'echo_service.proto',生成接口文件'echo_service.pb.h'和'echo_service.pb.cc'。框架
注意:異步
#include <sofa/pbrpc/pbrpc.h> // sofa-pbrpc頭文件 #include "echo_service.pb.h" // service接口定義頭文件
class EchoServerImpl : public sofa::pbrpc::test::EchoServer { public: EchoServerImpl() {} virtual ~EchoServerImpl() {} private: virtual void Echo(google::protobuf::RpcController* controller, const sofa::pbrpc::test::EchoRequest* request, sofa::pbrpc::test::EchoResponse* response, google::protobuf::Closure* done) { sofa::pbrpc::RpcController* cntl = static_cast<sofa::pbrpc::RpcController*>(controller); SLOG(NOTICE, "Echo(): request message from %s: %s", cntl->RemoteAddress().c_str(), request->message().c_str()); response->set_message("echo message: " + request->message()); done->Run(); } };
注意:
int main() { SOFA_PBRPC_SET_LOG_LEVEL(NOTICE); sofa::pbrpc::RpcServerOptions options; options.work_thread_num = 8; sofa::pbrpc::RpcServer rpc_server(options); if (!rpc_server.Start("0.0.0.0:12321")) { SLOG(ERROR, "start server failed"); return EXIT_FAILURE; } sofa::pbrpc::test::EchoServer* echo_service = new EchoServerImpl(); if (!rpc_server.RegisterService(echo_service)) { SLOG(ERROR, "register service failed"); return EXIT_FAILURE; } rpc_server.Run(); rpc_server.Stop(); return EXIT_SUCCESS; }
Client支持同步和異步兩種調用方式:
#include <sofa/pbrpc/pbrpc.h> // sofa-pbrpc頭文件 #include "echo_service.pb.h" // service接口定義頭文件
int main() { SOFA_PBRPC_SET_LOG_LEVEL(NOTICE); sofa::pbrpc::RpcClientOptions client_options; client_options.work_thread_num = 8; sofa::pbrpc::RpcClient rpc_client(client_options); sofa::pbrpc::RpcChannel rpc_channel(&rpc_client, "127.0.0.1:12321"); sofa::pbrpc::test::EchoServer_Stub stub(&rpc_channel); sofa::pbrpc::test::EchoRequest request; request.set_message("Hello world!"); sofa::pbrpc::test::EchoResponse response; sofa::pbrpc::RpcController controller; controller.SetTimeout(3000); stub.Echo(&controller, &request, &response, NULL); if (controller.Failed()) { SLOG(ERROR, "request failed: %s", controller.ErrorText().c_str()); } return EXIT_SUCCESS; }
void EchoCallback(sofa::pbrpc::RpcController* cntl, sofa::pbrpc::test::EchoRequest* request, sofa::pbrpc::test::EchoResponse* response, bool* callbacked) { SLOG(NOTICE, "RemoteAddress=%s", cntl->RemoteAddress().c_str()); SLOG(NOTICE, "IsRequestSent=%s", cntl->IsRequestSent() ? "true" : "false"); if (cntl->IsRequestSent()) { SLOG(NOTICE, "LocalAddress=%s", cntl->LocalAddress().c_str()); SLOG(NOTICE, "SentBytes=%ld", cntl->SentBytes()); } if (cntl->Failed()) { SLOG(ERROR, "request failed: %s", cntl->ErrorText().c_str()); } else { SLOG(NOTICE, "request succeed: %s", response->message().c_str()); } delete cntl; delete request; delete response; *callbacked = true; } int main() { SOFA_PBRPC_SET_LOG_LEVEL(NOTICE); sofa::pbrpc::RpcClientOptions client_options; sofa::pbrpc::RpcClient rpc_client(client_options); sofa::pbrpc::RpcChannel rpc_channel(&rpc_client, "127.0.0.1:12321"); sofa::pbrpc::test::EchoServer_Stub stub(&rpc_channel); sofa::pbrpc::test::EchoRequest* request = new sofa::pbrpc::test::EchoRequest(); request->set_message("Hello from qinzuoyan01"); sofa::pbrpc::test::EchoResponse* response = new sofa::pbrpc::test::EchoResponse(); sofa::pbrpc::RpcController* cntl = new sofa::pbrpc::RpcController(); cntl->SetTimeout(3000); bool callbacked = false; google::protobuf::Closure* done = sofa::pbrpc::NewClosure( &EchoCallback, cntl, request, response, &callbacked); stub.Echo(cntl, request, response, done); while (!callbacked) { usleep(100000); } return EXIT_SUCCESS; }
注意:
在sofa-pbrpc中網絡數據自上而下流劃分爲RpcClientStream/RpcServerStream、RpcMessageStream、RpcByteStream三層。字節流層主要負責網絡通訊相關的操做,操做對象爲序列化後的二機制字節流;消息流層處理的對象是由header、meta和data組裝的消息,負責消息級別的控制與統計;協議層負責異步發送請求和接收響應數據。採用這樣協議棧方式的層次劃分更加有利於數據協議的擴展。
一條rpc消息由RpcMessageHeader、RpcMeta和Data組成。
struct RpcMessageHeader { union { char magic_str[4]; uint32 magic_str_value; }; // 4 bytes int32 meta_size; // 4 bytes int64 data_size; // 8 bytes int64 message_size; // 8 bytes: message_size = meta_size + data_size, for check RpcMessageHeader() : magic_str_value(SOFA_RPC_MAGIC) , meta_size(0), data_size(0), message_size(0) {} bool CheckMagicString() const { return magic_str_value == SOFA_RPC_MAGIC; } };
message RpcMeta { enum Type { REQUEST = 0; RESPONSE = 1; }; required Type type = 1; required uint64 sequence_id = 2; optional string method = 100; optional int64 server_timeout = 101; optional bool failed = 200; optional int32 error_code = 201; optional string reason = 202; optional CompressType compress_type = 300; optional CompressType expected_response_compress_type = 301; }
一次RPC調用通過如下流程:
asio異步模型,底層使用epoll。
sofa-pbrpc將內存劃分爲固定大小的buffer做爲緩衝區,對buffer採用引用計數進行管理,減小沒必要要的內存拷貝。
採用裝飾者模式的透明壓縮,易於擴展。
使用lock+swap操做縮小臨界區。
按時間片分配流量配額,保證流控精準高效。
除了使用原生client訪問server外,sofa-pbrpc也支持使用http協議訪問server上的服務。同時,用戶能夠經過使用server端的WebService工具類,快速實現server的對於http請求的處理邏輯。
sofa-pbrpc支持用戶使用http客戶端向server發送json格式的數據請求,並返回json格式的響應。
sofa-pbrpc提供經常使用工具類給開發者,包括:
類別 | 頭文件 | 說明 |
---|---|---|
智能指針 | sofa/pbrpc/smart_ptr/smart_ptr.hpp | 包括scoped_ptr,shared_ptr,weak_ptr等 |
原子操做 | sofa/pbrpc/atomic.h | 支持fetch,inc,dec,cas等 |
鎖操做 | sofa/pbrpc/locks.h | 提供了互斥鎖,自旋鎖,讀寫鎖的封裝 |
定時管理 | sofa/pbrpc/timeout_manager.h | 高效的提供了定時器功能 |
百度網頁搜索部開源團隊 opensearch@baidu.com