本文首先介紹了什麼是Apache Thrift,接着介紹了Thrift的安裝部署及如何利用Thrift來實現一個簡單的RPC應用,並簡單的探究了一下Thrift的內部實現原理,最後給出一個基於Thrift的可擴展的分佈式RPC調用框架,在中小型項目中是一個常見的SOA實踐。linux
Apache Thrift是Facebook 開發的遠程服務調用框架,它採用接口描述語言(IDL)定義並建立服務,支持可擴展的跨語言服務開發,所包含的代碼生成引擎能夠在多種語言中,如 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, Smalltalk 等建立高效的、無縫的服務,其傳輸數據採用二進制格式,相對 XML 和 JSON 體積更小,對於高併發、大數據量和多語言的環境更有優點。本文將詳細介紹 Thrift 的使用,並簡要分析Thrift的底層運行原理,最後給出一個基於Thrift的可擴展分佈式RPC框架。c++
yum -y install automake gcc gcc-c++apache
wget http://ftp.gnu.org/gnu/bison/bison-2.5.1.tar.gzbootstrap
tar -zxvf bison-2.5.1.tar.gz 服務器
cd bison-2.5.1多線程
./configure --prefix=/usr/local/併發
make;make install負載均衡
yum -y install libevent2-devel zlib-devel openssl-devel框架
wget http://sourceforge.net/projects/boost/files/boost/1.55.0/boost_1_55_0.tar.gzdom
tar -zxvf boost_1_55_0.tar.gz
cd boost_1_55_0
./bootstrap.sh
./b2 install
Wget http://www.apache.org/dyn/closer.cgi?path=/thrift/0.9.3/thrift-0.9.3.tar.gz
cd thrift-0.9.3
./configure;make;make install
Thrift代碼包(位於thrift-0.9.3/lib/cpp/src)有如下幾個目錄:
concurrency:併發和時鐘管理方面的庫
processor:Processor相關類
protocol:Protocal相關類
transport:transport相關類
server:server相關類
async:異步rpc相關類
這裏介紹一個簡單的 Thrift 實現實例,使讀者可以快速直觀地瞭解什麼是 Thrift 以及如何使用 Thrift 構建服務。
建立一個簡單的服務Log。
struct LogInfo { 1: required string name, 2: optional string content, } service LogSender { void SendLog(1:list<LogInfo> loglist); string GetLog(1:string logname); }
其中定義了服務 Log 的兩個方法,每一個方法包含一個方法名,參數列表和返回類型。每一個參數包括參數序號,參數類型以及參數名。 Thrift 是對 IDL(Interface Definition Language) 描述性語言的一種具體實現。所以,以上的服務描述文件使用 IDL 語法編寫。使用 Thrift 工具編譯 log.thrift,就會生成相應的 LogSender.cpp 文件。該文件包含了在 log.thrift 文件中描述的服務Log的接口定義以及服務調用的底層通訊細節,用於構建客戶端和服務器端的功能。
調用thrift命令生成代碼,命令爲thrift --gen <language> <Thrift filename>
[root@localhost log_thrift]# thrift -gen cpp log.thrift
[root@localhost log_thrift]# tree gen-cpp/
gen-cpp/
├── log_constants.cpp
├── log_constants.h
├── LogSender.cpp
├── LogSender.h
├── LogSender_server.skeleton.cpp
├── log_types.cpp
└── log_types.h
每一個thrift文件會產生四個文件,分別爲:${thrift_name}_constants.h,${thrift_name}_constants.cpp,${thrift_name}_types.h,${thrift_name}_types.cpp,
對於含有service的thrift文件,會額外生成兩個文件,分別爲:${service_name}.h,${service_name}.cpp
對於含有service的thrift文件,會生成一個可用的server樁:${service_name}_server.skeleton.cpp
#include "gen-cpp/LogSender.h" #include <map> #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/server/TSimpleServer.h> #include <thrift/transport/TServerSocket.h> #include <thrift/transport/TBufferTransports.h> using namespace ::apache::thrift; using namespace ::apache::thrift::protocol; using namespace ::apache::thrift::transport; using namespace ::apache::thrift::server; using boost::shared_ptr; std::map<std::string, std::string> logMap; class LogSenderHandler : virtual public LogSenderIf { public: LogSenderHandler() { // Your initialization goes here } void SendLog(const std::vector<LogInfo> & loglist) { // Your implementation goes here sleep(5); time_t now = time(NULL); printf("SendLog, now = %s\n", ctime(&now)); for (size_t i = 0; i < loglist.size(); ++i) { if (logMap.find(loglist[i].name) == logMap.end()) { printf("name=[%s], content=[%s]\n", loglist[i].name.c_str(), loglist[i].content.c_str()); logMap.insert(std::make_pair(loglist[i].name, loglist[i].content)); } } } void GetLog(std::string& _return, const std::string& logname) { // Your implementation goes here std::map<std::string,std::string>::iterator iter = logMap.find(logname); if (iter != logMap.end()) { _return = iter->second; } else { _return = "Not Found!"; } } }; int main(int argc, char **argv) { int port = 9090; shared_ptr<LogSenderHandler> handler(new LogSenderHandler()); shared_ptr<TProcessor> processor(new LogSenderProcessor(handler)); shared_ptr<TServerTransport> serverTransport(new TServerSocket(port)); shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory()); shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory); server.serve(); return 0; }
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <stdint.h> #include <string> #include "gen-cpp/log_constants.h" #include "gen-cpp/log_types.h" #include "gen-cpp/LogSender.h" #include <thrift/transport/TSocket.h> #include <thrift/transport/TBufferTransports.h> #include <thrift/protocol/TBinaryProtocol.h> using namespace std; using namespace apache::thrift; using namespace apache::thrift::protocol; using namespace apache::thrift::transport; void send_log(const std::string& strName, const std::string& strContent) { boost::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", 9090)); boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket)); boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); LogSenderClient client(protocol); try { transport->open(); vector<LogInfo> logInfos; LogInfo logInfo; logInfo.__set_name(strName); logInfo.__set_content(strContent); logInfos.push_back(logInfo); client.SendLog(logInfos); transport->close(); } catch (TException &tx) { printf("ERROR: %s\n", tx.what()); } } void get_log(const std::string& strName) { boost::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", 9090)); boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket)); boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); LogSenderClient client(protocol); try { transport->open(); std::string strResult; client.GetLog(strResult, strName); printf("GetLog: name = %s, log = %s\n", strName.c_str(), strResult.c_str()); transport->close(); } catch (TException &tx) { printf("ERROR: %s\n", tx.what()); } } int main(int argc, char** argv) { send_log("log1", "this is a example1"); get_log("log1"); get_log("log2"); return 0; }
#include "gen-cpp/LogSender.h" #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/server/TNonblockingServer.h> #include <thrift/transport/TServerSocket.h> #include <thrift/transport/TBufferTransports.h> #include <thrift/concurrency/PosixThreadFactory.h> using namespace ::apache::thrift; using namespace ::apache::thrift::protocol; using namespace ::apache::thrift::transport; using namespace ::apache::thrift::server; using namespace ::apache::thrift::concurrency; using boost::shared_ptr; #define THREAD_NUM 5 std::map<std::string, std::string> logMap; class LogSenderHandler : virtual public LogSenderIf { public: LogSenderHandler() { // Your initialization goes here } void SendLog(const std::vector<LogInfo> & loglist) { // Your implementation goes here sleep(5); time_t now = time(NULL); printf("SendLog, now = %s\n", ctime(&now)); for (size_t i = 0; i < loglist.size(); ++i) { if (logMap.find(loglist[i].name) == logMap.end()) { logMap.insert(std::make_pair(loglist[i].name, loglist[i].content)); } } } void GetLog(std::string& _return, const std::string& logname) { // Your implementation goes here std::map<std::string,std::string>::iterator iter = logMap.find(logname); if (iter != logMap.end()) { _return = iter->second; } else { _return = "Not Found!"; } } }; int main(int argc, char **argv) { int port = 9090; shared_ptr<LogSenderHandler> handler(new LogSenderHandler()); shared_ptr<TProcessor> processor(new LogSenderProcessor(handler)); shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(THREAD_NUM); shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory> (new PosixThreadFactory()); threadManager->threadFactory(threadFactory); threadManager->start(); TNonblockingServer server(processor, protocolFactory, port, threadManager); server.serve(); return 0; }
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <stdint.h> #include <string> #include "gen-cpp/log_constants.h" #include "gen-cpp/log_types.h" #include "gen-cpp/LogSender.h" #include <thrift/transport/TSocket.h> #include <thrift/transport/TBufferTransports.h> #include <thrift/protocol/TBinaryProtocol.h> using namespace std; using namespace apache::thrift; using namespace apache::thrift::protocol; using namespace apache::thrift::transport; void send_log(const std::string& strName, const std::string& strContent) { boost::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", 9090)); //對接nonblockingServer時必須的,對普通server端時用boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket)); boost::shared_ptr<TTransport> transport(new TFramedTransport(socket)); boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); LogSenderClient client(protocol); try { transport->open(); vector<LogInfo> logInfos; LogInfo logInfo; logInfo.__set_name(strName); logInfo.__set_content(strContent); logInfos.push_back(logInfo); client.SendLog(logInfos); transport->close(); } catch (TException &tx) { printf("ERROR: %s\n", tx.what()); } } int main(int argc, char** argv) { send_log("log1", "this is a example1"); return 0; }
運行阻塞式服務器,同時啓動10個client,能夠觀察到,因爲阻塞服務器sleep(5)模擬每一個調用,這裏每次調用之間都相差5秒,至關因而串行進行處理的:
運行非阻塞服務器,同時啓動10個client,因爲使用了非阻塞加線程池(這裏線程池大小爲5)模式,一樣是sleep(5)的模擬處理,這裏的處理速度和吞吐量都大大提升。
thrift文件內容可能會隨着時間變化的。若是已經存在的消息類型再也不符合設計要求,好比,新的設計要在message格式中添加一個額外字段,但你仍想使用之前的thrift文件產生的處理代碼。若是想要達到這個目的,須要:
(1)不要修改已存在域的整數編號
(2)新添加的域必須是optional的,以便格式兼容。
好比對於上面例子中的log.thrift
struct LogInfo {
1: required string name,
2: optional string content,
}
content是optional的,須要將它的__isset值設爲true,才能序列化並傳輸,不然會認爲字段不存在,不會被序列化。好比client.cpp中的代碼,若是咱們將content字段__isset設爲false,則server將不會收到content:
logInfo.__isset.content=false;
程序運行完了,咱們來看一下client.GetLog()函數的內部實現(在LogSender.cpp中)
void LogSenderClient::GetLog(std::string& _return, const std::string& logname) { send_GetLog(logname); recv_GetLog(_return); } void LogSenderClient::send_GetLog(const std::string& logname) { int32_t cseqid = 0; oprot_->writeMessageBegin("GetLog", ::apache::thrift::protocol::T_CALL, cseqid); LogSender_GetLog_pargs args; args.logname = &logname; args.write(oprot_); oprot_->writeMessageEnd(); oprot_->getTransport()->writeEnd(); oprot_->getTransport()->flush(); } void LogSenderClient::recv_GetLog(std::string& _return) { int32_t rseqid = 0; std::string fname; ::apache::thrift::protocol::TMessageType mtype; iprot_->readMessageBegin(fname, mtype, rseqid); if (mtype == ::apache::thrift::protocol::T_EXCEPTION) { ::apache::thrift::TApplicationException x; x.read(iprot_); iprot_->readMessageEnd(); iprot_->getTransport()->readEnd(); throw x; } if (mtype != ::apache::thrift::protocol::T_REPLY) { iprot_->skip(::apache::thrift::protocol::T_STRUCT); iprot_->readMessageEnd(); iprot_->getTransport()->readEnd(); } if (fname.compare("GetLog") != 0) { iprot_->skip(::apache::thrift::protocol::T_STRUCT); iprot_->readMessageEnd(); iprot_->getTransport()->readEnd(); } LogSender_GetLog_presult result; result.success = &_return; result.read(iprot_); iprot_->readMessageEnd(); iprot_->getTransport()->readEnd(); if (result.__isset.success) { // _return pointer has now been filled return; } throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "GetLog failed: unknown result"); }
閱讀上面的代碼,能夠看出,RPC函數GetLog()實際上被轉化成了兩個函數:send_GetLog和recv_GetLog,分別用於發送數據和接收結果。數據是以消息的形式表示的,消息頭部是RPC函數名,消息內容是RPC函數的參數。
Thrift其實是實現了C/S模式,經過代碼生成工具將接口定義文件生成服務器端和客戶端代碼(能夠爲不一樣語言),從而實現服務端和客戶端跨語言的支持。用戶在Thirft描述文件中聲明本身的服務,這些服務通過編譯後會生成相應語言的代碼文件,而後用戶實現服務(客戶端調用服務,服務端提供服務)。其中protocol(協議層, 定義數據傳輸格式,能夠爲二進制或者XML等)和transport(傳輸層,定義數據傳輸方式,能夠爲TCP/IP傳輸,內存共享或者文件共享等)被用做運行時庫。
Thrift 腳本可定義的數據類型包括如下幾種類型:
基本類型:
bool:布爾值,true 或 false,對應 Java 的 boolean
byte:8 位有符號整數,對應 Java 的 byte
i16:16 位有符號整數,對應 Java 的 short
i32:32 位有符號整數,對應 Java 的 int
i64:64 位有符號整數,對應 Java 的 long
double:64 位浮點數,對應 Java 的 double
string:未知編碼文本或二進制字符串,對應 Java 的 String
結構體類型:
struct:定義公共的對象,相似於 C 語言中的結構體定義,在 Java 中是一個 JavaBean
容器類型:
list:對應 Java 的 ArrayList
set:對應 Java 的 HashSet
map:對應 Java 的 HashMap
異常類型:
exception:對應 Java 的 Exception
服務類型:
service:對應服務的類
Thrift可讓用戶選擇客戶端與服務端之間傳輸通訊協議的類別,在傳輸協議上整體劃分爲文本 (text) 和二進制 (binary) 傳輸協議,爲節約帶寬,提升傳輸效率,通常狀況下使用二進制類型的傳輸協議爲多數,有時還會使用基於文本類型的協議,這須要根據項目/產品中的實際需求。經常使用協議有如下幾種:
TBinaryProtocol 二進制編碼格式進行數據傳輸
TCompactProtocol 高效率的、密集的二進制編碼格式進行數據傳輸
TJSONProtocol 使用 JSON 的數據編碼協議進行數據傳輸
TSimpleJSONProtocol 只提供 JSON 只寫的協議,適用於經過腳本語言解析
TDebugProtocol – 使用易懂的可讀的文本格式,以便於debug
經常使用的傳輸層有如下幾種:
TServerTransport 使用阻塞式 I/O 進行傳輸,是最多見的模式
TFramedTransport 使用非阻塞方式,按塊的大小進行傳輸
若使用 TFramedTransport 傳輸層,其服務器必須修改成非阻塞的服務類型
TFileTransport – 以文件形式進行傳輸
TNonblockingTransport 使用非阻塞方式,用於構建異步客戶端
TMemoryTransport 將內存用於I/O
TZlibTransport 使用zlib進行壓縮,與其餘傳輸方式聯合使用。
常見的服務端類型有如下幾種:
TSimpleServer 單線程服務器端使用標準的阻塞式 I/O
TThreadPoolServer 多線程服務器端使用標準的阻塞式 I/O
TNonblockingServer 多線程服務器端使用非阻塞式 I/O(需使用TFramedTransport數據傳輸方式)
Client負責作負載均衡和容災,通常狀況下使用random來選擇proxy就能夠了。某個proxy鏈接不上的話,由客戶端自動另外選擇一個。
Proxy部署能夠比較靈活,能夠在某一類service前面單獨部署proxy,也能夠在多個類別的service前面部署proxy,通常根據service被調用的頻率或熱點狀況來調整。
Service的負載均衡能夠由proxy來負責,service定時上報自身負載和運行狀況,proxy根據必定的策略來進行調度;或proxy也能夠採用第三方負載均衡組件來分發對service的調用,好比騰訊的L5等。
與thrift相似的開源RPC框架還有google的protocal buffer,它雖然支持的語言比較少,但效率更高,於是受到愈來愈多的關注。
因爲thrift開源時間很早,經受了時間的驗證,於是許多系統更願意採用thrift,如Hadoop,Cassandra等。
附:thrift與protocal buffer比較
從上面的比較能夠看出,thrift勝在「豐富的特性「上,而protocal buffer勝在「文檔化」很是好上。在具體實現上,它們很是相似,都是使用惟一整數標記字段域,這就使得增長和刪除字段與不會破壞已有的代碼。
它們的最大區別是thrift支持完整的client/server RPC框架,而protocal buffer只會產生接口,具體實現,還須要用戶作大量工做。
另外,從序列化性能上比較,Protocal Buffer要遠遠優於thrift,具體可參考:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/?ca=drs-tp4608