Apache thrift - 使用,內部實現及構建一個可擴展的RPC框架

本文首先介紹了什麼是Apache Thrift,接着介紹了Thrift的安裝部署及如何利用Thrift來實現一個簡單的RPC應用,並簡單的探究了一下Thrift的內部實現原理,最後給出一個基於Thrift的可擴展的分佈式RPC調用框架,在中小型項目中是一個常見的SOA實踐。linux

Thrift介紹

Apache ThriftFacebook 開發的遠程服務調用框架,它採用接口描述語言(IDL)定義並建立服務,支持可擴展的跨語言服務開發,所包含的代碼生成引擎能夠在多種語言中,如 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, Smalltalk 等建立高效的、無縫的服務,其傳輸數據採用二進制格式,相對 XML JSON 體積更小,對於高併發、大數據量和多語言的環境更有優點。本文將詳細介紹 Thrift 的使用,並簡要分析Thrift的底層運行原理,最後給出一個基於Thrift的可擴展分佈式RPC框架。c++

Thrift安裝部署

首先安裝make toolsbison

yum -y install automake gcc gcc-c++apache

wget http://ftp.gnu.org/gnu/bison/bison-2.5.1.tar.gzbootstrap

tar -zxvf bison-2.5.1.tar.gz 服務器

cd bison-2.5.1多線程

./configure --prefix=/usr/local/併發

make;make install負載均衡

而後安裝thrift底層依賴庫和boost

yum -y install libevent2-devel zlib-devel openssl-devel框架

wget http://sourceforge.net/projects/boost/files/boost/1.55.0/boost_1_55_0.tar.gzdom

tar -zxvf boost_1_55_0.tar.gz

cd boost_1_55_0

./bootstrap.sh

./b2 install

安裝Thrift

Wget http://www.apache.org/dyn/closer.cgi?path=/thrift/0.9.3/thrift-0.9.3.tar.gz

cd thrift-0.9.3

./configure;make;make install

Thrift cpp源碼類介紹

Thrift代碼包(位於thrift-0.9.3/lib/cpp/src)有如下幾個目錄:

concurrency:併發和時鐘管理方面的庫

processorProcessor相關類

protocolProtocal相關類

transporttransport相關類

serverserver相關類

async:異步rpc相關類

Thrift實現實例

這裏介紹一個簡單的 Thrift 實現實例,使讀者可以快速直觀地瞭解什麼是 Thrift 以及如何使用 Thrift 構建服務。

建立一個簡單的服務Log

首先根據 Thrift 的語法規範編寫腳本文件 log.thrift,代碼以下:

struct LogInfo {
1: required string name,
2: optional string content,
}

service LogSender {
void SendLog(1:list<LogInfo> loglist);
string GetLog(1:string logname);
}

其中定義了服務 Log 的兩個方法,每一個方法包含一個方法名,參數列表和返回類型。每一個參數包括參數序號,參數類型以及參數名。 Thrift 是對 IDL(Interface Definition Language) 描述性語言的一種具體實現。所以,以上的服務描述文件使用 IDL 語法編寫。使用 Thrift 工具編譯 log.thrift,就會生成相應的 LogSender.cpp 文件。該文件包含了在 log.thrift 文件中描述的服務Log的接口定義以及服務調用的底層通訊細節,用於構建客戶端和服務器端的功能。

調用thrift命令生成代碼,命令爲thrift --gen <language> <Thrift filename>

[root@localhost log_thrift]# thrift -gen cpp log.thrift

[root@localhost log_thrift]# tree gen-cpp/

gen-cpp/

── log_constants.cpp

── log_constants.h

── LogSender.cpp

── LogSender.h

── LogSender_server.skeleton.cpp

── log_types.cpp

└── log_types.h

Thrift文件與生成的代碼對應關係

每一個thrift文件會產生四個文件,分別爲:${thrift_name}_constants.h${thrift_name}_constants.cpp${thrift_name}_types.h${thrift_name}_types.cpp

對於含有servicethrift文件,會額外生成兩個文件,分別爲:${service_name}.h${service_name}.cpp

對於含有servicethrift文件,會生成一個可用的server樁:${service_name}_server.skeleton.cpp

 

一個阻塞式服務器實現server.cpp

#include "gen-cpp/LogSender.h"
#include <map>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using boost::shared_ptr;
std::map<std::string, std::string> logMap;

class LogSenderHandler : virtual public LogSenderIf {
 public:
  LogSenderHandler() {
    // Your initialization goes here
  }

  void SendLog(const std::vector<LogInfo> & loglist) {
    // Your implementation goes here
    sleep(5);
    time_t now = time(NULL);
    printf("SendLog, now = %s\n", ctime(&now));
    for (size_t i = 0; i < loglist.size(); ++i)
    {
      if (logMap.find(loglist[i].name) == logMap.end())
      {
        printf("name=[%s], content=[%s]\n", loglist[i].name.c_str(), loglist[i].content.c_str());
        logMap.insert(std::make_pair(loglist[i].name, loglist[i].content));
      }
    }
  }

  void GetLog(std::string& _return, const std::string& logname) {
    // Your implementation goes here
    std::map<std::string,std::string>::iterator iter = logMap.find(logname);
    if (iter != logMap.end())
    {
      _return = iter->second;
    }
    else
    {
      _return = "Not Found!";
    }
  }

};

int main(int argc, char **argv) 
{
  int port = 9090;
  shared_ptr<LogSenderHandler> handler(new LogSenderHandler());
  shared_ptr<TProcessor> processor(new LogSenderProcessor(handler));
  shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
  shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
  shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

  TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
  server.serve();
  return 0;
}

阻塞式服務器對應客戶端實現client.cpp

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdint.h>
#include <string>

#include "gen-cpp/log_constants.h"
#include "gen-cpp/log_types.h"
#include "gen-cpp/LogSender.h"
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>

using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;

void send_log(const std::string& strName, const std::string& strContent)
{
    boost::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", 9090));
    boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
    LogSenderClient client(protocol);
    try 
    {
        transport->open();
        vector<LogInfo> logInfos;
        LogInfo logInfo;
        logInfo.__set_name(strName);
        logInfo.__set_content(strContent);
        logInfos.push_back(logInfo);
        client.SendLog(logInfos);
        transport->close();
    } catch (TException &tx)
    {
        printf("ERROR: %s\n", tx.what());
    }
}

void get_log(const std::string& strName)
{
    boost::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", 9090));
    boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
    LogSenderClient client(protocol);
    try 
    {
        transport->open();
        std::string strResult;
            client.GetLog(strResult, strName);
        printf("GetLog: name = %s, log = %s\n", strName.c_str(), strResult.c_str());
        transport->close();
    } catch (TException &tx)
    {
        printf("ERROR: %s\n", tx.what());
    }
}

int main(int argc, char** argv)
{
    send_log("log1", "this is a example1");
    get_log("log1");
    get_log("log2");
    return 0;
}

一個非阻塞式服務器實現nonblock_server.cpp,這裏採用非阻塞服務器加線程池模式,可以在必定程度上提升併發

#include "gen-cpp/LogSender.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TNonblockingServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/concurrency/PosixThreadFactory.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::apache::thrift::concurrency;

using boost::shared_ptr;

#define THREAD_NUM 5

std::map<std::string, std::string> logMap;

class LogSenderHandler : virtual public LogSenderIf {
 public:
  LogSenderHandler() {
    // Your initialization goes here
  }

  void SendLog(const std::vector<LogInfo> & loglist) {
    // Your implementation goes here
    sleep(5);
    time_t now = time(NULL);
    printf("SendLog, now = %s\n", ctime(&now));
    for (size_t i = 0; i < loglist.size(); ++i)
    {
      if (logMap.find(loglist[i].name) == logMap.end())
      {
        logMap.insert(std::make_pair(loglist[i].name, loglist[i].content));
      }
    }
  }

  void GetLog(std::string& _return, const std::string& logname) {
    // Your implementation goes here
    std::map<std::string,std::string>::iterator iter = logMap.find(logname);
    if (iter != logMap.end())
    {
      _return = iter->second;
    }
    else
    {
      _return = "Not Found!";
    }
  }

};

int main(int argc, char **argv) 
{
  int port = 9090;
  shared_ptr<LogSenderHandler> handler(new LogSenderHandler());
  shared_ptr<TProcessor> processor(new LogSenderProcessor(handler));
  shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

  shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(THREAD_NUM);
  shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory> (new PosixThreadFactory());
  threadManager->threadFactory(threadFactory);
  threadManager->start();

  TNonblockingServer server(processor, protocolFactory, port, threadManager);

  server.serve();
  return 0;
}

非阻塞式服務器對應客戶端實現nonblock_client.cpp,注意對於非阻塞服務器,客戶端需使用TFramedTransport

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdint.h>
#include <string>

#include "gen-cpp/log_constants.h"
#include "gen-cpp/log_types.h"
#include "gen-cpp/LogSender.h"
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>

using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;

void send_log(const std::string& strName, const std::string& strContent)
{
    boost::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", 9090));
    //對接nonblockingServer時必須的,對普通server端時用boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
    boost::shared_ptr<TTransport> transport(new TFramedTransport(socket)); 
    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
    LogSenderClient client(protocol);
    try 
    {
        transport->open();
        vector<LogInfo> logInfos;
        LogInfo logInfo;
        logInfo.__set_name(strName);
        logInfo.__set_content(strContent);
        logInfos.push_back(logInfo);
        client.SendLog(logInfos);
        transport->close();
    } catch (TException &tx)
    {
        printf("ERROR: %s\n", tx.what());
    }
}

int main(int argc, char** argv)
{
    send_log("log1", "this is a example1");
    return 0;
}

運行及結果

運行阻塞式服務器,同時啓動10client,能夠觀察到,因爲阻塞服務器sleep(5)模擬每一個調用,這裏每次調用之間都相差5秒,至關因而串行進行處理的:

clip_image001

 

運行非阻塞服務器,同時啓動10client,因爲使用了非阻塞加線程池(這裏線程池大小爲5)模式,一樣是sleep(5)的模擬處理,這裏的處理速度和吞吐量都大大提升。

clip_image003

版本兼容

thrift文件內容可能會隨着時間變化的。若是已經存在的消息類型再也不符合設計要求,好比,新的設計要在message格式中添加一個額外字段,但你仍想使用之前的thrift文件產生的處理代碼。若是想要達到這個目的,須要:

1)不要修改已存在域的整數編號

2)新添加的域必須是optional的,以便格式兼容。

好比對於上面例子中的log.thrift

struct LogInfo {

1: required string name,

2: optional string content,

}

 

contentoptional的,須要將它的__isset值設爲true,才能序列化並傳輸,不然會認爲字段不存在,不會被序列化。好比client.cpp中的代碼,若是咱們將content字段__isset設爲false,server將不會收到content

logInfo.__isset.content=false;

clip_image004

內部實現

程序運行完了,咱們來看一下client.GetLog()函數的內部實現(在LogSender.cpp中)

void LogSenderClient::GetLog(std::string& _return, const std::string& logname)
{
    send_GetLog(logname);
    recv_GetLog(_return);
}

void LogSenderClient::send_GetLog(const std::string& logname)
{
    int32_t cseqid = 0;
    oprot_->writeMessageBegin("GetLog", ::apache::thrift::protocol::T_CALL, cseqid);

    LogSender_GetLog_pargs args;
    args.logname = &logname;
    args.write(oprot_);

    oprot_->writeMessageEnd();
    oprot_->getTransport()->writeEnd();
    oprot_->getTransport()->flush();
}

void LogSenderClient::recv_GetLog(std::string& _return)
{
    int32_t rseqid = 0;
    std::string fname;
    ::apache::thrift::protocol::TMessageType mtype;
    iprot_->readMessageBegin(fname, mtype, rseqid);
    if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {
        ::apache::thrift::TApplicationException x;
        x.read(iprot_);
        iprot_->readMessageEnd();
        iprot_->getTransport()->readEnd();
        throw x;
    }

    if (mtype != ::apache::thrift::protocol::T_REPLY) {

        iprot_->skip(::apache::thrift::protocol::T_STRUCT);
        iprot_->readMessageEnd();
        iprot_->getTransport()->readEnd();
    }

    if (fname.compare("GetLog") != 0) {
        iprot_->skip(::apache::thrift::protocol::T_STRUCT);
        iprot_->readMessageEnd();
        iprot_->getTransport()->readEnd();
    }

    LogSender_GetLog_presult result;
    result.success = &_return;
    result.read(iprot_);
    iprot_->readMessageEnd();
    iprot_->getTransport()->readEnd();

    if (result.__isset.success) {
        // _return pointer has now been filled
        return;
    }
    throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "GetLog failed: unknown result");
}

閱讀上面的代碼,能夠看出,RPC函數GetLog()實際上被轉化成了兩個函數:send_GetLogrecv_GetLog,分別用於發送數據和接收結果。數據是以消息的形式表示的,消息頭部是RPC函數名,消息內容是RPC函數的參數。

Thrift內部實現

分層圖

clip_image006

Thrift其實是實現了C/S模式,經過代碼生成工具將接口定義文件生成服務器端和客戶端代碼(能夠爲不一樣語言),從而實現服務端和客戶端跨語言的支持。用戶在Thirft描述文件中聲明本身的服務,這些服務通過編譯後會生成相應語言的代碼文件,而後用戶實現服務(客戶端調用服務,服務端提供服務)。其中protocol(協議層, 定義數據傳輸格式,能夠爲二進制或者XML等)和transport(傳輸層,定義數據傳輸方式,能夠爲TCP/IP傳輸,內存共享或者文件共享等)被用做運行時庫。

數據類型

Thrift 腳本可定義的數據類型包括如下幾種類型:

基本類型:

bool:布爾值,true false,對應 Java boolean

byte8 位有符號整數,對應 Java byte

i1616 位有符號整數,對應 Java short

i3232 位有符號整數,對應 Java int

i6464 位有符號整數,對應 Java long

double64 位浮點數,對應 Java double

string:未知編碼文本或二進制字符串,對應 Java String

結構體類型:

struct:定義公共的對象,相似於 C 語言中的結構體定義,在 Java 中是一個 JavaBean

容器類型:

list:對應 Java ArrayList

set:對應 Java HashSet

map:對應 Java HashMap

異常類型:

exception:對應 Java Exception

服務類型:

service:對應服務的類

協議

Thrift可讓用戶選擇客戶端與服務端之間傳輸通訊協議的類別,在傳輸協議上整體劃分爲文本 (text) 和二進制 (binary) 傳輸協議,爲節約帶寬,提升傳輸效率,通常狀況下使用二進制類型的傳輸協議爲多數,有時還會使用基於文本類型的協議,這須要根據項目/產品中的實際需求。經常使用協議有如下幾種:

TBinaryProtocol  二進制編碼格式進行數據傳輸

TCompactProtocol  高效率的、密集的二進制編碼格式進行數據傳輸

TJSONProtocol  使用 JSON 的數據編碼協議進行數據傳輸

TSimpleJSONProtocol  只提供 JSON 只寫的協議,適用於經過腳本語言解析

TDebugProtocol – 使用易懂的可讀的文本格式,以便於debug

傳輸層

經常使用的傳輸層有如下幾種:

TServerTransport  使用阻塞式 I/O 進行傳輸,是最多見的模式

TFramedTransport  使用非阻塞方式,按塊的大小進行傳輸

若使用 TFramedTransport 傳輸層,其服務器必須修改成非阻塞的服務類型

TFileTransport – 以文件形式進行傳輸

TNonblockingTransport  使用非阻塞方式,用於構建異步客戶端

TMemoryTransport  將內存用於I/O

TZlibTransport  使用zlib進行壓縮,與其餘傳輸方式聯合使用。

服務端類型

常見的服務端類型有如下幾種:

TSimpleServer  單線程服務器端使用標準的阻塞式 I/O

TThreadPoolServer  多線程服務器端使用標準的阻塞式 I/O

TNonblockingServer  多線程服務器端使用非阻塞式 I/O(需使用TFramedTransport數據傳輸方式)

一個可擴展的分佈式rpc調用框架

Client負責作負載均衡和容災,通常狀況下使用random來選擇proxy就能夠了。某個proxy鏈接不上的話,由客戶端自動另外選擇一個。

Proxy部署能夠比較靈活,能夠在某一類service前面單獨部署proxy,也能夠在多個類別的service前面部署proxy,通常根據service被調用的頻率或熱點狀況來調整。

Service的負載均衡能夠由proxy來負責,service定時上報自身負載和運行狀況,proxy根據必定的策略來進行調度;或proxy也能夠採用第三方負載均衡組件來分發對service的調用,好比騰訊的L5等。

clip_image008

總結

thrift相似的開源RPC框架還有googleprotocal buffer,它雖然支持的語言比較少,但效率更高,於是受到愈來愈多的關注。

因爲thrift開源時間很早,經受了時間的驗證,於是許多系統更願意採用thrift,如HadoopCassandra等。

附:thriftprotocal buffer比較

 

clip_image009

從上面的比較能夠看出,thrift勝在豐富的特性上,而protocal buffer勝在文檔化很是好上。在具體實現上,它們很是相似,都是使用惟一整數標記字段域,這就使得增長和刪除字段與不會破壞已有的代碼。

它們的最大區別是thrift支持完整的client/server RPC框架,而protocal buffer只會產生接口,具體實現,還須要用戶作大量工做。

另外,從序列化性能上比較,Protocal Buffer要遠遠優於thrift,具體可參考:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/?ca=drs-tp4608

相關文章
相關標籤/搜索