百度開源高性能RPC框架 sofa-pbrpc

簡介

sofa-pbrpc是基於Google Protocol Buffers 實現的RPC網絡通訊庫,在百度公司各部門獲得普遍使用,天天支撐上億次內部調用。sofa-pbrpc基於百度大搜索高併發高負載的業務場景不斷打磨,成爲一套簡單易用的輕量級高性能RPC框架。2014年sofa-pbrpc正式對外開源受到廣大開發人員的關注,目前sofa-pbrpc已經在各大互聯網公司產品中使用。c++

開源地址:https://github.com/baidu/sofa-pbrpcgit

目標

  • 輕量
  • 穩定
  • 高性能
  • 易用

特性

  • 接口簡單,容易使用
  • 實現高效,性能優異(高吞吐、低延遲、高併發鏈接數)
  • 測試完善,運行穩定
  • 支持同步和異步調用,知足不一樣類型需求
  • 支持多級超時設定,靈活控制請求超時時間
  • 支持精準的網絡流量控制,對應用層透明
  • 支持透明壓縮傳輸,節省帶寬
  • 提供服務和方法級別的服務調用統計信息,方便監控
  • 支持自動創建鏈接和自動重連,用戶無需感知鏈接
  • 遠程地址相同的Client Stub共享一個鏈接通道,節省資源
  • 空閒鏈接自動關閉,及時釋放資源
  • 支持Mock測試
  • 支持多Server負載均衡與容錯
  • 原生支持HTTP協議訪問
  • 提供內建的Web監控頁面
  • 提供Python客戶端庫
  • 支持webservice,用戶快速定義web server處理邏輯
  • 支持profiling,實時查看程序的資源消耗,方便問題追查

輸入圖片說明

接口

主要接口類

主要用戶接口分爲四個接口類和三個option。 輸入圖片說明github

用戶配置

Server端配置:RpcServerOptionsweb

參數名 參數說明
work_thread_num 工做線程數
max_pending_buffer_size pengding buffer 大小 (MB)
max_throughput_in 最大入帶寬限制 (MB/s)
max_throughput_out 最大出帶寬限制 (MB/s)
keep_alive_time 空閒鏈接維持時間 (s)

Client端配置:RpcClientOptionsjson

參數名 參數說明
work_thread_num 工做線程數
callback_thread_num 回調線程數
max_pending_buffer_size pengding buffer 大小 (MB)
max_throughput_in 最大入帶寬限制 (MB/s)
max_throughput_out 最大出帶寬限制 (MB/s)

快速使用

使用sofa-pbrpc只須要三步:網絡

  • 定義通信協議
  • 實現Server
  • 實現Client

樣例代碼參見sample/echo併發

定義通信協議

定義協議只須要編寫一個proto文件便可。 範例:echo_service.proto負載均衡

package sofa.pbrpc.test;
option cc_generic_services = true;
message EchoRequest {
    required string message = 1;
}
message EchoResponse {
    required string message = 1;
}
service EchoServer {
    rpc Echo(EchoRequest) returns(EchoResponse);
}

使用protoc編譯'echo_service.proto',生成接口文件'echo_service.pb.h'和'echo_service.pb.cc'。框架

注意:異步

  • package會被映射到C++中的namespace,爲了不衝突建議使用package;
  • 須要設置「cc_generic_services」,以通知protoc工具生成RPC框架代碼;
  • 這裏EchoRequest和EchoResponse的成員徹底相同,在實際應用中能夠設置不一樣的成員;

實現Server

頭文件

#include <sofa/pbrpc/pbrpc.h>  // sofa-pbrpc頭文件
#include "echo_service.pb.h"   // service接口定義頭文件

實現服務

class EchoServerImpl : public sofa::pbrpc::test::EchoServer
{
public:
    EchoServerImpl() {}
    virtual ~EchoServerImpl() {}

private:
    virtual void Echo(google::protobuf::RpcController* controller,
                      const sofa::pbrpc::test::EchoRequest* request,
                      sofa::pbrpc::test::EchoResponse* response,
                      google::protobuf::Closure* done)
    {
        sofa::pbrpc::RpcController* cntl =
            static_cast<sofa::pbrpc::RpcController*>(controller);
        SLOG(NOTICE, "Echo(): request message from %s: %s",
            cntl->RemoteAddress().c_str(), request->message().c_str());
        response->set_message("echo message: " + request->message());
        done->Run();
    }
};

注意:

  • 服務完成後必須調用done->Run(),通知RPC系統服務完成,觸發發送Response;
  • 在調了done->Run()以後,Echo的全部四個參數都再也不能訪問; done->Run()能夠分派到其餘線程中執行,以實現了真正的異步處理;

註冊和啓動服務

int main()
{
    SOFA_PBRPC_SET_LOG_LEVEL(NOTICE);

    sofa::pbrpc::RpcServerOptions options;
    options.work_thread_num = 8;
    sofa::pbrpc::RpcServer rpc_server(options);

    if (!rpc_server.Start("0.0.0.0:12321")) {
        SLOG(ERROR, "start server failed");
        return EXIT_FAILURE;
    }

    sofa::pbrpc::test::EchoServer* echo_service = new EchoServerImpl();
    if (!rpc_server.RegisterService(echo_service)) {
        SLOG(ERROR, "register service failed");
        return EXIT_FAILURE;
    }
    rpc_server.Run();
    rpc_server.Stop();

    return EXIT_SUCCESS;
}

實現Client

Client支持同步和異步兩種調用方式:

  • 同步調用時,調用線程會被阻塞,直到收到回覆或者超時;
  • 異步調用時,調用線程不會被阻塞,收到回覆或者超時會調用用戶提供的回調函數;

頭文件

#include <sofa/pbrpc/pbrpc.h>  // sofa-pbrpc頭文件
#include "echo_service.pb.h"   // service接口定義頭文件

同步調用

int main()
{
    SOFA_PBRPC_SET_LOG_LEVEL(NOTICE);
    sofa::pbrpc::RpcClientOptions client_options;
    client_options.work_thread_num = 8;
    sofa::pbrpc::RpcClient rpc_client(client_options);
    sofa::pbrpc::RpcChannel rpc_channel(&rpc_client, "127.0.0.1:12321");
 
    sofa::pbrpc::test::EchoServer_Stub stub(&rpc_channel);
 
    sofa::pbrpc::test::EchoRequest request;
    request.set_message("Hello world!");
    sofa::pbrpc::test::EchoResponse response;
    sofa::pbrpc::RpcController controller;
    controller.SetTimeout(3000);
    stub.Echo(&controller, &request, &response, NULL);
    if (controller.Failed()) {
        SLOG(ERROR, "request failed: %s", controller.ErrorText().c_str());
    }

    return EXIT_SUCCESS;
}

異步調用

void EchoCallback(sofa::pbrpc::RpcController* cntl,
        sofa::pbrpc::test::EchoRequest* request,
        sofa::pbrpc::test::EchoResponse* response,
        bool* callbacked)
{
    SLOG(NOTICE, "RemoteAddress=%s", cntl->RemoteAddress().c_str());
    SLOG(NOTICE, "IsRequestSent=%s", cntl->IsRequestSent() ? "true" : "false");
    if (cntl->IsRequestSent())
    {
        SLOG(NOTICE, "LocalAddress=%s", cntl->LocalAddress().c_str());
        SLOG(NOTICE, "SentBytes=%ld", cntl->SentBytes());
    }
    if (cntl->Failed()) {
        SLOG(ERROR, "request failed: %s", cntl->ErrorText().c_str());
    }
    else {
        SLOG(NOTICE, "request succeed: %s", response->message().c_str());
    }
    delete cntl;
    delete request;
    delete response;

    *callbacked = true;
}

int main()
{
    SOFA_PBRPC_SET_LOG_LEVEL(NOTICE);

    sofa::pbrpc::RpcClientOptions client_options;
    sofa::pbrpc::RpcClient rpc_client(client_options);

    sofa::pbrpc::RpcChannel rpc_channel(&rpc_client, "127.0.0.1:12321");
    sofa::pbrpc::test::EchoServer_Stub stub(&rpc_channel);
    sofa::pbrpc::test::EchoRequest* request = new sofa::pbrpc::test::EchoRequest();
    request->set_message("Hello from qinzuoyan01");
    sofa::pbrpc::test::EchoResponse* response = new sofa::pbrpc::test::EchoResponse();
    sofa::pbrpc::RpcController* cntl = new sofa::pbrpc::RpcController();
    cntl->SetTimeout(3000);
    bool callbacked = false;
    google::protobuf::Closure* done = sofa::pbrpc::NewClosure(
            &EchoCallback, cntl, request, response, &callbacked);

    stub.Echo(cntl, request, response, done);
    while (!callbacked) {
        usleep(100000);
    }

    return EXIT_SUCCESS;
}

注意:

  • 異步調用傳入的controller、request、response參數,在回調函數執行以前需一直保持有效;
  • 回調函數的執行會分配到專門的回調線程中運行,能夠經過設置RpcClientOptions的callback_thread_num來配置回調線程數;

工具

sofa-pbrpc-client

  • 查詢server的健康情況(health)、配置參數(option)、負載狀況(status)
  • 查詢server對外提供的服務列表(list)
  • 獲取服務相關的protobuf類型描述信息(desc)
  • 使用text格式的請求數據,向server指定Method發送rpc請求調用(call)
  • 獲取服務的統計信息,包括處理請求數、平均處理時間、最大處理時間等(stat)

實現

系統結構

輸入圖片說明

  • RpcClientStream/RpcServerStream:表明client和server之間的鏈接,用於client和server的網絡通訊。
  • ThreadGroup:client和server內部線程池,用於io操做和執行回調。
  • TimeoutManager:採用訂閱者模型,對rpc請求進行超時管理。
  • RpcListenser:接受來自client的鏈接請求,建立與client之間的鏈接。
  • ServicePool:server端服務管理與路由。

設計原理

網絡模型

網絡協議棧

在sofa-pbrpc中網絡數據自上而下流劃分爲RpcClientStream/RpcServerStream、RpcMessageStream、RpcByteStream三層。字節流層主要負責網絡通訊相關的操做,操做對象爲序列化後的二機制字節流;消息流層處理的對象是由header、meta和data組裝的消息,負責消息級別的控制與統計;協議層負責異步發送請求和接收響應數據。採用這樣協議棧方式的層次劃分更加有利於數據協議的擴展。 輸入圖片說明

RPC 協議

一條rpc消息由RpcMessageHeader、RpcMeta和Data組成。 輸入圖片說明

struct RpcMessageHeader {
    union {
        char    magic_str[4];
        uint32  magic_str_value;
    };                    // 4 bytes
    int32   meta_size;    // 4 bytes
    int64   data_size;    // 8 bytes
    int64   message_size; // 8 bytes: message_size = meta_size + data_size, for check

    RpcMessageHeader()
        : magic_str_value(SOFA_RPC_MAGIC)
        , meta_size(0), data_size(0), message_size(0) {}

    bool CheckMagicString() const
    {
        return magic_str_value == SOFA_RPC_MAGIC;
    }
};
message RpcMeta {
  enum Type {
    REQUEST = 0;
    RESPONSE = 1;
  };
  required Type type = 1;
  required uint64 sequence_id = 2;
  optional string method = 100;
  optional int64 server_timeout = 101;
  optional bool failed = 200;
  optional int32 error_code = 201;
  optional string reason = 202;
  optional CompressType compress_type = 300;
  optional CompressType expected_response_compress_type = 301;
}

一次RPC調用通過如下流程:

  1. Stub調用RPC函數發起RPC請求。
  2. RpcChannel調用CallMethod執行RPC調用。
  3. RpcClient選取RpcClientStream異步發送請求,並添加至超時隊列。
  4. server端RpcListener接收到client的鏈接,建立對應RpcServerStream。
  5. RpcServerStream接收數據,根據meta信息在ServicePool中選取對應Service.Method執行。
  6. server經過RpcServerStream發送執行結果,回覆過程與請求過程相似。 輸入圖片說明

線程模型

asio異步模型,底層使用epoll。 輸入圖片說明 輸入圖片說明

緩衝區管理

sofa-pbrpc將內存劃分爲固定大小的buffer做爲緩衝區,對buffer採用引用計數進行管理,減小沒必要要的內存拷貝。

輸入圖片說明 輸入圖片說明

透明壓縮

採用裝飾者模式的透明壓縮,易於擴展。 輸入圖片說明

超時管理

使用lock+swap操做縮小臨界區。 輸入圖片說明

流量控制

按時間片分配流量配額,保證流控精準高效。 輸入圖片說明

輸入圖片說明

技術特色

支持HTTP協議

除了使用原生client訪問server外,sofa-pbrpc也支持使用http協議訪問server上的服務。同時,用戶能夠經過使用server端的WebService工具類,快速實現server的對於http請求的處理邏輯。

支持json格式數據傳輸

sofa-pbrpc支持用戶使用http客戶端向server發送json格式的數據請求,並返回json格式的響應。

提供豐富的工具類

sofa-pbrpc提供經常使用工具類給開發者,包括:

類別 頭文件 說明
智能指針 sofa/pbrpc/smart_ptr/smart_ptr.hpp 包括scoped_ptr,shared_ptr,weak_ptr等
原子操做 sofa/pbrpc/atomic.h 支持fetch,inc,dec,cas等
鎖操做 sofa/pbrpc/locks.h 提供了互斥鎖,自旋鎖,讀寫鎖的封裝
定時管理 sofa/pbrpc/timeout_manager.h 高效的提供了定時器功能

支持團隊

百度網頁搜索部開源團隊 opensearch@baidu.com

相關文章
相關標籤/搜索