C++多進程併發框架

三年來一直從事服務器程序開發,一直都是忙忙碌碌,不久前結束了職業生涯的第一份工做,有了一個禮拜的休息時間,終於能夠寫寫總結了。因而把之前的開源代碼作了整理和優化,這就是FFLIB。雖然這邊總結看起來像日記,有不少廢話,可是此文仍然是有很大針對性的。針對服務器開發中常見的問題,如多線程併發、消息轉發、異步、性能優化、單元測試,提出本身的看法。 html

面對的問題

從事開發工程中,遇到過很多問題,不少時候因爲時間緊迫,沒有使用優雅的方案。在跟業內的一些朋友交流過程當中,我也意識到有些問題是你們都存在的。簡單列舉以下: c++

  • 多線程與併發
  • 異步消息/接口調用
  • 消息的序列化與Reflection
  • 性能優化
  • 單元測試

多線程與併發

如今是多核時代,併發才能實現更高的吞吐量、更快的響應,但也是把雙刃劍。總結以下幾個用法: web

    • 多線程+顯示鎖;接口是被多線程調用的,當被調用時,顯示加鎖,再操做實體數據。悲劇的是,工程師爲了優化會設計多個鎖,以減小鎖的粒度,甚至有些地方使用了原子操做。這些都爲領域邏輯增長了額外的設計負擔。最壞的狀況是會出現死鎖。

  • 多線程+任務隊列;接口被多線程調用,但請求會被暫存到任務隊列,而任務隊列會被單線程不斷執行,典型生產者消費者模式。它的併發在於不一樣的接口可使用不一樣的任務隊列。這也是我最經常使用的併發方式。

這是兩種最多見的多線程併發,它們有個天生的缺陷——Scalability。一個機器的性能老是有瓶頸的。兩個場景的邏輯雖然由多個線程實現了併發,可是運算量十分有多是一臺機器沒法承載的。若是是多進程併發,那麼能夠分佈式把其部署到其餘機器(也可部署在一臺機器)。因此多進程併發比多線程併發更加Scalability。另外採用多進程後,每一個進程單線程設計,這樣的程序更加Simplicity。多進程的其餘優勢如解耦、模塊化、方便調試、方便重用等就不贅言了。 json

異步消息/接口調用

提到分佈式,就要說一下分佈式的通信技術。經常使用的方式以下: 性能優化

  • 類RPC;包括WebService、RPC、ICE等,特色是遠程同步調用。遠程的接口和本地的接口很是類似。可是遊戲服務器程序通常很是在乎延遲和吞吐量,因此這些阻塞線程的同步遠程調用方式並不經常使用。可是咱們必須意識到他的優勢,就是很是利於調用和測試。
  • 全異步消息;當調用遠程接口的時候,異步發送請求消息,接口響應後返回一個結果消息,調用方的回調函數處理結果消息繼續邏輯操做。因此有些邏輯就會被切割成ServiceStart和ServiceCallback兩段。有時異步會講領域邏輯變得支離破碎。另外消息處理函數中通常會寫一坨的switch/case 處理不一樣的消息。最大的問題在於單元測試,這種狀況傳統單元測試根本一籌莫展。

消息的序列化與Reflection

實現消息的序列化和反序列化的方式有不少,常見的有Struct、json、Protobuff等都有很成功的應用。我我的傾向於使用輕量級的二進制序列化,優勢是比較透明和高效,一切在掌握之中。在FFLIB 中實現了bin_encoder_t 和 bin_decoder_t 輕量級的消息序列化,幾十行代碼而已。 服務器

性能優化

已經寫過關於性能方面的總結,參見 網絡

http://www.cnblogs.com/zhiranok/archive/2012/06/06/cpp_perf.html 多線程

有的網友提到profiler、cpuprofiler、callgrind等工具。這些工具我都使用過,說實話,對於我來講,我太認同它有很高的價值。第一他們只能用於開發測試階段,能夠初步獲得一些性能上參考數據。第二它們如何實現跟蹤人們無從得知。運行其會使程序變慢,不能反映真實數據。第三重要的是,開發測試階段性能和上線後的能同樣嗎?Impossible ! 架構

關於性能,原則就是數聽說話,詳見博文,不在贅述。 併發

單元測試

關於單元測試,前邊已經談論了一些。遊戲服務器程序通常都比較龐大,可是難以想象的是,鄙人歷來沒見有項目(c++ 後臺架構的)有完整單元測試的。因爲存在着異步和多線程,傳統的單元測試框架沒法勝任,而開發支持異步的測試框架又是不現實的。咱們必須看到的是,傳統的單元測試框架已經取得了很是大的成功。據我瞭解,使用web 架構的遊戲後臺已經對於單元測試的使用已經很是成熟,取得了極其好的效果。因此個人思路是利用現有的單元測試框架,將異步消息、多線程的架構作出調整。

已經屢次談論單元測試了。其實在開發FFLIB的思路很大程度來源於此,不然可能只是一個c++ 網絡庫而已。我決定嘗試去解決這個問題的時候,把FFLIB 定位於框架。

先來看一段很是簡單的單元測試的代碼 :

Assert(2 == Add(1, 1));

請容許我對這行代碼作些解釋,對Add函數輸入參數,驗證返回值是不是預期的結果。這不就是單元測試的本質嗎?在想一下咱們異步發送消息的過程,若是每一個輸入消息約定一個結果消息包,每次發送請求時都綁定一個回調函數接收和驗證結果消息包。這樣的話就偏偏知足了傳統單元測試的步驟了。最後還需解決一個問題,Assert是不能處理異步的返回值的。幸運的是,future機制能夠化異步爲同步。不瞭解future 模式的能夠參考這裏:

http://blog.chinaunix.net/uid-23093301-id-190969.html

http://msdn.microsoft.com/zh-cn/library/dd764564.aspx#Y300

來看一下在FFLIB框架下遠程調用echo 服務的示例:

struct lambda_t
{
  static void callback(echo_t::out_t& msg_)
  {
    echo_t::in_t in;
    in.value = "XXX_echo_test_XXX";
    singleton_t<msg_bus_t>::instance()
       .get_service_group("echo")
       ->get_service(1)->async_call(in, &lambda_t::callback);
  }
};
echo_t::in_t in;
in.value = "XXX_echo_test_XXX";
singleton_t<msg_bus_t>::instance().get_service_group("echo")->get_service(1)->async_call(in, &lambda_t::callback);

當須要調用遠程接口時,async_call(in, &lambda_t::callback); 異步調用必須綁定一個回調函數,回調函數接收結果消息,能夠觸發後續操做。這樣的話,若是對echo 的遠程接口作單元測試,能夠這樣作:

rpc_future_t< echo_t::out_t> rpc_future;
echo_t::in_t in;
in.value = "XXX_echo_test_XXX";
const echo_t::out_t& out = rpc_future.call(
    singleton_t<msg_bus_t>::instance()
        .get_service_group("echo")->get_service(1), in);
Assert(in.value == out.value);
這樣全部的遠程接口均可以被單元測試覆蓋。

FFLIB 介紹

 FFLIB 結構圖

如圖所示,Client 不會直接和Service 相鏈接,而是經過Broker 中間層完成了消息傳遞。關於Broker 模式能夠參見:http://blog.chinaunix.net/uid-23093301-id-90459.html

進程間通訊採用TPC,而不是多線程使用的共享內存方式。Service 通常是單線程架構的,經過啓動多進程實現相對於多線程的併發。因爲Broker模式天生石分佈式的,因此有很好的Scalability。

消息時序圖

如何註冊服務和接口

來看一下Echo 服務的實現:

 
 
struct echo_service_t
{
public:
    void echo(echo_t::in_t& in_msg_, rpc_callcack_t<echo_t::out_t>& cb_)
    {
        logtrace((FF, "echo_service_t::echo done value<%s>", in_msg_.value.c_str()));
        echo_t::out_t out;
        out.value = in_msg_.value;
        cb_(out);
    }
};

int main(int argc, char* argv[])
{
    int g_index = 1;
    if (argc > 1)
    {
        g_index = atoi(argv[1]);
    }
    char buff[128];
    snprintf(buff, sizeof(buff), "tcp://%s:%s", "127.0.0.1", "10241");

    msg_bus_t msg_bus;
    assert(0 == singleton_t<msg_bus_t>::instance().open("tcp://127.0.0.1:10241") && "can't connnect to broker");

    echo_service_t f;

    singleton_t<msg_bus_t>::instance().create_service_group("echo");
    singleton_t<msg_bus_t>::instance().create_service("echo", g_index)
            .bind_service(&f)
            .reg(&echo_service_t::echo);

    signal_helper_t::wait();

    singleton_t<msg_bus_t>::instance().close();
    //usleep(1000);
    cout <<"\noh end\n";
    return 0;
}
 
    • create_service_group 建立一個服務group,一個服務組可能有多個並行的實例

 

  • create_service 以特定的id 建立一個服務實例

 

 

  • reg 爲該服務註冊接口

 

 

  • 接口的定義規範爲void echo(echo_t::in_t& in_msg_, rpc_callcack_t<echo_t::out_t>& cb_),第一個參數爲輸入的消息struct,第二個參數爲回調函數的模板特例,模板參數爲返回消息的struct 類型。接口無需知道發送消息等細節,只需將結果callback 便可。

 

 

  • 註冊到Broker 後,全部Client均可獲取該服務

 

 

 

消息定義的規範

咱們約定每一個接口(遠程或本地都應知足)都包含一個輸入消息和一個結果消息。來看一下echo 服務的消息定義:

 
 
struct echo_t
{
    struct in_t: public msg_i
    {
        in_t():
            msg_i("echo_t::in_t")
        {}
        virtual string encode()
        {
            return (init_encoder() << value).get_buff();
        }
        virtual void decode(const string& src_buff_)
        {
            init_decoder(src_buff_) >> value;
        }

        string value;
    };
    struct out_t: public msg_i
    {
        out_t():
            msg_i("echo_t::out_t")
        {}
        virtual string encode()
        {
            return (init_encoder() << value).get_buff();
        }
        virtual void decode(const string& src_buff_)
        {
            init_decoder(src_buff_) >> value;
        }

        string value;
    };
};
  •  每一個接口必須包含in_t消息和out_t消息,而且他們定義在接口名(如echo _t)的內部
  • 全部消息都繼承於msg_i, 其封裝了二進制的序列化、反序列化等。構造時賦予類型名做爲消息的名稱。
  • 每一個消息必須實現encode 和 decode 函數

這裏須要指出的是,FFLIB 中不須要爲每一個消息定義對應的CMD。當接口如echo向Broker 註冊時,reg接口經過C++ 模板的類型推斷會自動將該msg name 註冊給Broker, Broker爲每一個msg name 分配惟一的msg_id。Msg_bus 中自動維護了msg_name 和msg_id 的映射。Msg_i 的定義以下:

struct msg_i : public codec_i
{
    msg_i(const char* msg_name_):
        cmd(0),
        uuid(0),
        service_group_id(0),
        service_id(0),
        msg_id(0),
        msg_name(msg_name_)
    {}

    void set(uint16_t group_id, uint16_t id_, uint32_t uuid_, uint16_t msg_id_)
    {
        service_group_id = group_id;
        service_id       = id_;
        uuid             = uuid_;
        msg_id           = msg_id_;
    }

    uint16_t cmd;
    uint16_t get_group_id()   const{ return service_group_id; }
    uint16_t get_service_id() const{ return service_id;       }
    uint32_t get_uuid()       const{ return uuid;             }

    uint16_t get_msg_id()     const{ return msg_id;           }
    const string& get_name()  const
    {
        if (msg_name.empty() == false)
        {
            return msg_name;
        }
        return singleton_t<msg_name_store_t>::instance().id_to_name(this->get_msg_id());
    }

    void     set_uuid(uint32_t id_)   { uuid = id_;  }
    void     set_msg_id(uint16_t id_) { msg_id = id_;}
    void     set_sgid(uint16_t sgid_) { service_group_id = sgid_;}
    void     set_sid(uint16_t sid_)   { service_id = sid_; }
    uint32_t uuid;
    uint16_t service_group_id;
    uint16_t service_id;
    uint16_t msg_id;
    string   msg_name;

    virtual string encode(uint16_t cmd_)
    {
        this->cmd = cmd_;
        return encode();
    }
    virtual string encode() = 0;
    bin_encoder_t& init_encoder()
    {
        return encoder.init(cmd)  << uuid << service_group_id << service_id<< msg_id;
    }
    bin_encoder_t& init_encoder(uint16_t cmd_)
    {
        return encoder.init(cmd_) << uuid << service_group_id << service_id << msg_id;
    }
    bin_decoder_t& init_decoder(const string& buff_)
    {
        return decoder.init(buff_) >> uuid >> service_group_id >> service_id >> msg_id;
    }
    bin_decoder_t decoder;
    bin_encoder_t encoder;
};

關於性能

因爲遠程接口的調用必須經過Broker, Broker會爲每一個接口自動生成性能統計數據,並每10分鐘輸出到perf.txt 文件中。文件格式爲CSV,參見:

http://www.cnblogs.com/zhiranok/archive/2012/06/06/cpp_perf.html

總結

FFLIB框架擁有以下的特色:

  • 使用多進程併發。Broker 把Client 和Service 的位置透明化
  • Service 的接口要註冊到Broker, 全部鏈接Broker的Client 均可以調用(publisher/ subscriber)
  • 遠程調用必須綁定回調函數
  • 利用future 模式實現同步,從而支持單元測試
  • 消息定義規範簡單直接高效
  • 全部service的接口性能監控數據自動生成,免費的午飯
  • Service 單線程話,更simplicity

源代碼:

Svn co http://ffown.googlecode.com/svn/trunk/

運行示例:

  • Cd example/broker && make && ./app_broker -l http://127.0.0.1:10241
  • Cd example/echo_server && make && ./app_echo_server
  • Cd example/echo_client && make && ./app_echo_client
相關文章
相關標籤/搜索