Contents
1. Preface
2. Example Service
3. Network class diagram
4. Threading model
4.2.2. Worker thread startup
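The motivation for analyzing Thrift's structure is to let the server obtain the client's IP, which requires some understanding of its structure and call flow. Note that this article covers only TNonblockingServer; it does not cover TThreadPoolServer, TThreadedServer, or TSimpleServer.
Thrift does not use a memory pool for network connections, so the simplest and most direct performance optimization is to link against TCMalloc from Google gperftools.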
service EchoService { string hello(1: string greetings); }
class EchoHandler: public EchoServiceIf {
private:
    virtual void hello(std::string& _return, const std::string& greetings);
};
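Thrift's threading model consists of several IO threads, TNonblockingIOThread (which send and receive data on TCP connections), and a main thread (which listens for and accepts TCP connections).
The main thread here is not necessarily the process's main thread: whichever thread calls TServer::run() or TServer::serve() is the main thread in this article's sense. As of the latest Thrift version at the time of writing (0.9.2), either call works, because TServer::run() does nothing except unconditionally call TServer::serve(). The call to TServer::serve() in turn resolves to serve() of TNonblockingServer, the TServer implementation class.
In short, TNonblockingIOThread sends and receives data, while TNonblockingServer accepts connections.
Note that the thread (or process) that calls TServer::run() or TServer::serve() blocks: it enters libevent's event loop, which on Linux calls epoll_wait() in an endless loop.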
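Because the call blocks, a process that has other work to do usually runs it on a dedicated thread. The following is a minimal sketch of that pattern. BlockingServer is a hypothetical stand-in, not a Thrift class: its serve() simply blocks until stop() is called. With a real server, the thread body would call the server's serve() instead.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for a server whose serve() blocks until stop().
// It is NOT a Thrift class; it only mimics the blocking behavior.
class BlockingServer {
public:
    void serve() {
        std::unique_lock<std::mutex> lock(mutex_);
        serving_ = true;
        cond_.notify_all();
        cond_.wait(lock, [this] { return stopped_; });  // the caller blocks here
        serving_ = false;
    }
    void stop() {
        std::lock_guard<std::mutex> lock(mutex_);
        stopped_ = true;
        cond_.notify_all();
    }
    // Lets another thread wait until serve() has actually been entered.
    void wait_until_serving() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return serving_ || stopped_; });
    }
private:
    std::mutex mutex_;
    std::condition_variable cond_;
    bool serving_ = false;
    bool stopped_ = false;
};

// Runs serve() on its own thread so that the calling thread is not blocked.
// Returns true once the server thread has been stopped and joined.
bool run_server_in_background() {
    BlockingServer server;
    std::thread server_thread([&server] { server.serve(); });
    server.wait_until_serving();  // the caller is free to do other work here
    server.stop();                // ask the blocking loop to exit
    server_thread.join();
    return true;
}
```

The thread that calls serve() stays occupied for the lifetime of the server, so shutdown has to be requested from a different thread, as run_server_in_background() does here.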
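Thrift divides threads into two categories:
IO threads listen for and accept connections and receive the data that clients send. Once a complete request has arrived, it is handed to a worker thread as a Task, and the worker thread invokes the handler.
IO threads are specific to TNonblockingServer, which provides setNumIOThreads() to set their number. The first IO thread always occupies the thread that called TServer::serve() or TServer::run().
After an IO thread accepts a connection, a TConnection instance is created (in TNonblockingServer::TConnection::transition()), and the TConnection creates a Task (also in TNonblockingServer::TConnection::transition()); TNonblockingServer then passes the Task to the ThreadManager.
Correction: in the figure above, TNonblockingServer should read TNonblockingIOThread.
Note the following snippet from TNonblockingServer::handleEvent(). getIOThreadNumber() does not return the number of IO threads; it returns the ID of the owning thread within the thread group, so a value of 0 means IO thread number 0: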
void TNonblockingServer::handleEvent(int fd, short which) {
    // ... (excerpt)
    if (clientConnection->getIOThreadNumber() == 0) {
        clientConnection->transition();
    } else {
        clientConnection->notifyIOThread(); // eventually calls transition() as well
    }
}
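To make the role of getIOThreadNumber() concrete, here is a small self-contained model. The names Connection and Acceptor, and the round-robin assignment of connections to IO threads, are assumptions made for illustration and are not Thrift's code. Each accepted connection is tagged with the ID of an IO thread; a connection owned by thread 0 (the accepting thread) is processed directly, and any other connection is handed over by notification.

```cpp
#include <cstddef>
#include <string>

// Illustrative model only; these types are not Thrift classes.
struct Connection {
    std::size_t io_thread_number;  // ID of the owning IO thread, not a count
};

class Acceptor {
public:
    explicit Acceptor(std::size_t num_io_threads)
        : num_io_threads_(num_io_threads == 0 ? 1 : num_io_threads), next_(0) {}

    // Assumption: connections are assigned to IO threads round-robin.
    Connection accept_connection() {
        Connection c = { next_ };
        next_ = (next_ + 1) % num_io_threads_;
        return c;
    }

    // Mirrors the branch in handleEvent(): thread 0 is the accepting thread,
    // so it can run the state transition itself; other threads are notified.
    static std::string dispatch(const Connection& c) {
        return c.io_thread_number == 0 ? "transition" : "notifyIOThread";
    }

private:
    std::size_t num_io_threads_;
    std::size_t next_;
};
```

With two IO threads, the first and third accepted connections in this model belong to thread 0 and take the "transition" branch, while the second takes the "notifyIOThread" branch.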
Worker threads invoke the handler and send the response to the client.
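The handoff from IO threads to worker threads is a producer/consumer queue. The sketch below is a minimal worker pool playing the role that ThreadManager plays in Thrift; WorkerPool and run_tasks are hypothetical names, and the real ThreadManager offers much more (timeouts, limits on pending tasks, and so on).

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Minimal worker pool; illustrative only, not Thrift's ThreadManager.
class WorkerPool {
public:
    explicit WorkerPool(std::size_t num_workers) : stopping_(false) {
        for (std::size_t i = 0; i < num_workers; ++i) {
            workers_.emplace_back([this] { run(); });
        }
    }
    // Drains the queue, then joins all workers.
    ~WorkerPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        cond_.notify_all();
        for (std::thread& t : workers_) t.join();
    }
    // Called by the "IO thread" once a complete request has been read.
    void add(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        cond_.notify_one();
    }
private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cond_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // stopping and nothing left to do
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // the handler runs on the worker thread, outside the lock
        }
    }
    std::mutex mutex_;
    std::condition_variable cond_;
    std::queue<std::function<void()> > tasks_;
    std::vector<std::thread> workers_;
    bool stopping_;
};

// Submits n tasks to a two-worker pool and returns how many of them ran.
int run_tasks(int n) {
    std::atomic<int> done(0);
    {
        WorkerPool pool(2);
        for (int i = 0; i < n; ++i) pool.add([&done] { ++done; });
    }  // the destructor returns only after every queued task has run
    return done.load();
}
```

Running the task outside the lock matters: a slow handler then delays only its own worker, not the IO thread that is trying to enqueue the next request.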
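The preparation work includes:
1) Start listening for connections
2) Start the threads that send and receive data
3) Initialize the runtime environment
Here you can see the first callback into TServerEventHandler:
The connection-accept sequence shows that TServerEventHandler::createContext() is called before the TConnection receives any data. This is one opportunity to obtain the client's IP, but the current implementation does not pass the relevant information to TServerEventHandler::createContext() as a parameter.
During this process, TServerEventHandler::processContext(connectionContext_, getTSocket()) is called back, and the TSocket is passed to it.
Below is a fragment of the code generated by the thrift compiler for the server side: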
// TProtocol is the protocol interface; TBinaryProtocol is a commonly used implementation
void EchoServiceProcessor::process_hello(int32_t seqid, // message sequence number
                                         ::apache::thrift::protocol::TProtocol* iprot, // input
                                         ::apache::thrift::protocol::TProtocol* oprot, // output
                                         void* callContext) {
    // eventHandler_ is a TProcessorEventHandler, an object that receives callbacks
    void* ctx = NULL;
    if (this->eventHandler_.get() != NULL) {
        ctx = this->eventHandler_->getContext("EchoService.hello", callContext);
    }
    ::apache::thrift::TProcessorContextFreer freer(this->eventHandler_.get(), ctx, "EchoService.hello");

    if (this->eventHandler_.get() != NULL) {
        this->eventHandler_->preRead(ctx, "EchoService.hello"); // callback into TProcessorEventHandler
    }

    EchoService_hello_args args; // input arguments
    args.read(iprot); // deserialize the input arguments
    iprot->readMessageEnd();
    uint32_t bytes = iprot->getTransport()->readEnd();

    if (this->eventHandler_.get() != NULL) {
        this->eventHandler_->postRead(ctx, "EchoService.hello", bytes); // callback into TProcessorEventHandler
    }

    // EchoService_hello_result is a class generated by the thrift compiler
    EchoService_hello_result result; // output, i.e. the return value defined in the thrift file
    try {
        iface_->hello(result.success, args.greetings); // this is where the user's own code is called
        result.__isset.success = true;
    } catch (const std::exception& e) {
        if (this->eventHandler_.get() != NULL) {
            this->eventHandler_->handlerError(ctx, "EchoService.hello"); // callback into TProcessorEventHandler
        }

        // The following returns the exception to the client, which should catch it
        ::apache::thrift::TApplicationException x(e.what());
        // writeMessageBegin serializes the message header
        oprot->writeMessageBegin("hello", ::apache::thrift::protocol::T_EXCEPTION, seqid);
        x.write(oprot); // serialize x into oprot
        oprot->writeMessageEnd();
        oprot->getTransport()->writeEnd();
        oprot->getTransport()->flush();
        return;
    }

    if (this->eventHandler_.get() != NULL) {
        this->eventHandler_->preWrite(ctx, "EchoService.hello"); // callback into TProcessorEventHandler
    }

    // The following serializes the output, i.e. the return value
    oprot->writeMessageBegin("hello", ::apache::thrift::protocol::T_REPLY, seqid); // serialize the message header
    result.write(oprot); // serialize result into oprot
    oprot->writeMessageEnd();
    bytes = oprot->getTransport()->writeEnd();
    oprot->getTransport()->flush();

    if (this->eventHandler_.get() != NULL) {
        this->eventHandler_->postWrite(ctx, "EchoService.hello", bytes); // callback into TProcessorEventHandler
    }
}
Below is a fragment of the code generated by the thrift compiler for the client side:
// Synchronous call implementation
// hello is what the client calls directly
void EchoServiceClient::hello(std::string& _return, const std::string& greetings) {
    send_hello(greetings); // serialize the input arguments and send them to the server
    recv_hello(_return);   // receive the server's reply and deserialize it
}

// Issue the call to the server
void EchoServiceClient::send_hello(const std::string& greetings) {
    int32_t cseqid = 0;
    oprot_->writeMessageBegin("hello", ::apache::thrift::protocol::T_CALL, cseqid);

    // EchoService_hello_pargs is also generated by the thrift compiler; all arguments are its data members
    EchoService_hello_pargs args;
    args.greetings = &greetings;
    args.write(oprot_); // serialize

    oprot_->writeMessageEnd();
    oprot_->getTransport()->writeEnd();
    oprot_->getTransport()->flush();
}

// Receive the server's response
void EchoServiceClient::recv_hello(std::string& _return) {
    int32_t rseqid = 0;
    std::string fname; // function name
    ::apache::thrift::protocol::TMessageType mtype;

    iprot_->readMessageBegin(fname, mtype, rseqid);
    if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {
        ::apache::thrift::TApplicationException x;
        x.read(iprot_);
        iprot_->readMessageEnd();
        iprot_->getTransport()->readEnd();
        throw x; // throw the exception
    }
    if (mtype != ::apache::thrift::protocol::T_REPLY) {
        iprot_->skip(::apache::thrift::protocol::T_STRUCT);
        iprot_->readMessageEnd();
        iprot_->getTransport()->readEnd();
    }
    if (fname.compare("hello") != 0) {
        iprot_->skip(::apache::thrift::protocol::T_STRUCT);
        iprot_->readMessageEnd();
        iprot_->getTransport()->readEnd();
    }
    EchoService_hello_presult result;
    result.success = &_return;
    result.read(iprot_); // deserialize
    iprot_->readMessageEnd();
    iprot_->getTransport()->readEnd();

    if (result.__isset.success) {
        // _return pointer has now been filled
        return;
    }
    throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "hello failed: unknown result");
}
The class EchoServiceProcessor, generated by the thrift compiler, implements the dispatchCall() method of the interface apache::thrift::TDispatchProcessor:
bool EchoServiceProcessor::dispatchCall(::apache::thrift::protocol::TProtocol* iprot, // input
                                        ::apache::thrift::protocol::TProtocol* oprot, // output
                                        const std::string& fname, // name of the function being called
                                        int32_t seqid, // sequence number
                                        void* callContext) {
    ProcessMap::iterator pfn;

    // typedef void (EchoServiceProcessor::*ProcessFunction)(int32_t,
    //     ::apache::thrift::protocol::TProtocol*,
    //     ::apache::thrift::protocol::TProtocol*,
    //     void*);
    // typedef std::map<std::string, ProcessFunction> ProcessMap;
    pfn = processMap_.find(fname); // look the function up by name (ProcessMap processMap_;)
    if (pfn == processMap_.end()) { // not found: report an exception to the client
        iprot->skip(::apache::thrift::protocol::T_STRUCT);
        iprot->readMessageEnd();
        iprot->getTransport()->readEnd();
        ::apache::thrift::TApplicationException x(::apache::thrift::TApplicationException::UNKNOWN_METHOD, "Invalid method name: '"+fname+"'");
        oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid);
        x.write(oprot);
        oprot->writeMessageEnd();
        // after serialization the Transport is called, and the Transport calls the network send
        oprot->getTransport()->writeEnd();
        oprot->getTransport()->flush();
        return true;
    }

    // found: invoke it
    (this->*(pfn->second))(seqid, iprot, oprot, callContext);
    return true;
}
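The dispatch technique used above, a std::map from method name to pointer-to-member-function, can be tried in isolation. The sketch below is a simplified illustration with a hypothetical MiniProcessor class; the handler signature is reduced to a single string argument and is not what the thrift compiler generates.

```cpp
#include <map>
#include <string>

// Illustrative processor; not generated by the thrift compiler.
class MiniProcessor {
public:
    MiniProcessor() {
        process_map_["hello"] = &MiniProcessor::process_hello;
        process_map_["bye"] = &MiniProcessor::process_bye;
    }

    // Returns the reply, or an error string when the method name is unknown.
    std::string dispatch(const std::string& fname, const std::string& arg) {
        ProcessMap::iterator it = process_map_.find(fname);
        if (it == process_map_.end()) {
            return "Invalid method name: '" + fname + "'";
        }
        return (this->*(it->second))(arg);  // call through the member pointer
    }

private:
    typedef std::string (MiniProcessor::*ProcessFunction)(const std::string&);
    typedef std::map<std::string, ProcessFunction> ProcessMap;

    std::string process_hello(const std::string& arg) { return "hello, " + arg; }
    std::string process_bye(const std::string& arg) { return "bye, " + arg; }

    ProcessMap process_map_;
};
```

For example, dispatch("hello", "thrift") returns "hello, thrift", and dispatch("nope", "x") returns the "Invalid method name" message, which corresponds to the UNKNOWN_METHOD branch above.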
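TProtocol provides serialization and deserialization; it defines how message packets are encoded and decoded. Its implementations include:
1) TBinaryProtocol: binary encoding
2) TDebugProtocol: human-readable text encoding, for debugging
3) TJSONProtocol: JSON-based encoding
4) TCompactProtocol: compact binary encoding
To add a new data type to thrift, TProtocol has to be modified to add serialization and deserialization for that type.
TTransport sends and receives data. It can be a simple wrapper around a socket, but non-socket transports such as pipes are supported as well. TSocket is the transport used by TServerSocket.
For TNonblockingServer, the default input and output transports are both TMemoryBuffer.
TProtocol itself has no buffer; it only serializes and deserializes. It does depend on TTransport, though, and sends data through it. Taking TBinaryProtocol as an example: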
// Serialize an int16_t value
template <class Transport_>
uint32_t TBinaryProtocolT<Transport_>::writeI16(const int16_t i16) {
    int16_t net = (int16_t)htons(i16);
    this->trans_->write((uint8_t*)&net, 2); // note: this calls into TTransport
    return 2;
}
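To see what ends up on the wire, the sketch below encodes a 16-bit value in network byte order (big-endian) and hands the bytes to a transport stand-in. ByteSink, write_i16 and read_i16 are hypothetical names for illustration; shifts are used instead of htons() so that the example does not depend on the host's byte order or on socket headers.

```cpp
#include <cstddef>
#include <stdint.h>
#include <vector>

// Stand-in for a transport: it just collects the bytes written to it.
struct ByteSink {
    std::vector<uint8_t> bytes;
    void write(const uint8_t* buf, uint32_t len) {
        bytes.insert(bytes.end(), buf, buf + len);
    }
};

// Encodes i16 in network byte order (most significant byte first)
// and writes it to the sink. Returns the number of bytes written.
uint32_t write_i16(ByteSink& sink, int16_t i16) {
    const uint16_t u = static_cast<uint16_t>(i16);
    const uint8_t net[2] = { static_cast<uint8_t>(u >> 8),
                             static_cast<uint8_t>(u & 0xFF) };
    sink.write(net, 2);
    return 2;
}

// Decodes a big-endian 16-bit value starting at the given offset.
// The caller must ensure that offset + 2 <= bytes.size().
int16_t read_i16(const std::vector<uint8_t>& bytes, std::size_t offset) {
    const uint16_t u = static_cast<uint16_t>((bytes[offset] << 8) | bytes[offset + 1]);
    return static_cast<int16_t>(u);
}
```

Writing 0x1234 produces the bytes 0x12 0x34 regardless of the host's endianness, which is what lets clients and servers on different architectures interoperate.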
For comparison, here is an implementation of TTransport::write:
// TSocket is a kind of TTransport
void TSocket::write(const uint8_t* buf, uint32_t len) {
    uint32_t sent = 0;

    // The loop below shows that sending is synchronous
    while (sent < len) {
        uint32_t b = write_partial(buf + sent, len - sent); // this ends up calling the system send()
        if (b == 0) {
            // This should only happen if the timeout set with SO_SNDTIMEO expired.
            // Raise an exception.
            throw TTransportException(TTransportException::TIMED_OUT, "send timeout expired");
        }
        sent += b;
    }
}
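The loop exists because a single send may accept fewer bytes than requested. The sketch below reproduces the same loop against a hypothetical ChunkedSink that accepts at most a fixed number of bytes per call, so the short-write behavior can be observed without a socket. ChunkedSink and write_all are illustrative names, and std::runtime_error stands in for TTransportException.

```cpp
#include <algorithm>
#include <stdexcept>
#include <stdint.h>
#include <vector>

// Hypothetical sink that accepts at most `chunk` bytes per call,
// imitating a send() that returns short counts.
class ChunkedSink {
public:
    explicit ChunkedSink(uint32_t chunk) : chunk_(chunk), calls_(0) {}
    uint32_t write_partial(const uint8_t* buf, uint32_t len) {
        ++calls_;
        const uint32_t n = std::min(len, chunk_);
        data_.insert(data_.end(), buf, buf + n);
        return n;
    }
    const std::vector<uint8_t>& data() const { return data_; }
    int calls() const { return calls_; }
private:
    uint32_t chunk_;
    int calls_;
    std::vector<uint8_t> data_;
};

// Keeps calling write_partial() until every byte has been accepted.
// A zero-byte write is treated as a timeout, as in the code above.
void write_all(ChunkedSink& sink, const uint8_t* buf, uint32_t len) {
    uint32_t sent = 0;
    while (sent < len) {
        const uint32_t b = sink.write_partial(buf + sent, len - sent);
        if (b == 0) {
            throw std::runtime_error("send timeout expired");
        }
        sent += b;
    }
}
```

Sending 5 bytes through a sink that takes 2 bytes at a time needs three calls (2 + 2 + 1), and the caller does not get control back until the last one completes.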
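When a client sends data, a libevent event fires and the Transport is called to receive the data. Once the packet is complete, the Protocol is called to deserialize it, and then the server-side code is invoked.
The first half is done in an IO thread; the second half is done in a worker thread.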
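"Once the packet is complete" implies that the IO thread has to reassemble messages from however many reads the data arrives in. The sketch below shows one way to do that, assuming the framed format in which each message is preceded by a 4-byte big-endian length. FrameAssembler is a hypothetical class for illustration; the state machine in Thrift's TConnection is more involved.

```cpp
#include <cstddef>
#include <stdint.h>
#include <string>
#include <vector>

// Accumulates bytes as they arrive and yields complete frames.
// Assumed frame layout: 4-byte big-endian payload length, then the payload.
class FrameAssembler {
public:
    // Appends newly received bytes and returns every frame completed by them.
    std::vector<std::string> feed(const uint8_t* data, std::size_t len) {
        buffer_.insert(buffer_.end(), data, data + len);
        std::vector<std::string> frames;
        for (;;) {
            if (buffer_.size() < 4) break;  // length prefix still incomplete
            const uint32_t size = (uint32_t(buffer_[0]) << 24) |
                                  (uint32_t(buffer_[1]) << 16) |
                                  (uint32_t(buffer_[2]) << 8) |
                                  uint32_t(buffer_[3]);
            if (buffer_.size() - 4 < size) break;  // payload still incomplete
            frames.push_back(std::string(buffer_.begin() + 4,
                                         buffer_.begin() + 4 + size));
            buffer_.erase(buffer_.begin(), buffer_.begin() + 4 + size);
        }
        return frames;
    }
private:
    std::vector<uint8_t> buffer_;  // bytes received but not yet framed
};
```

Only when feed() returns a frame would the request be wrapped in a task and handed to a worker thread for deserialization; until then the IO thread just keeps buffering.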
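There are three ways to obtain the client's IP:
1) The method described in the blog post http://blog.csdn.net/hbuxiaoshe/article/details/38942869 also works, although it is somewhat awkward;
2) Modify the Thrift implementation: add a parameter to TServerEventHandler::createContext() and pass the TSocket through it, which makes getting the client's IP very easy. The simplest change is: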
class TServerEventHandler {
public:
    virtual void* createContext(boost::shared_ptr<TProtocol> input,
                                boost::shared_ptr<TProtocol> output,
                                TTransport* transport); // for TNonblockingServer this is actually a TSocket
};
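3) Leave the Thrift implementation unmodified.
In the "send/receive data: perform the call" flow, there is a call to TServerEventHandler::processContext(), and it happens to pass the TSocket as its second argument, so it can be used directly.
The difference between TServerEventHandler::createContext() and TServerEventHandler::processContext() is that the former is called only once, when the connection is established, while the latter is called once for every RPC call.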
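The third approach can be sketched as follows. To keep the example self-contained, FakeTransport and FakeSocket are stand-ins for Thrift's TTransport and TSocket, and ClientIpEventHandler stands in for a TServerEventHandler subclass; with real Thrift the event handler would derive from TServerEventHandler and call getPeerAddress() on the TSocket. The sketch also assumes that processContext() runs on the same worker thread that then runs the handler, which is why a thread-local variable is used to carry the address across.

```cpp
#include <string>

// Stand-ins for the Thrift types, reduced to what this sketch needs.
struct FakeTransport {
    virtual ~FakeTransport() {}
};
struct FakeSocket : FakeTransport {
    explicit FakeSocket(const std::string& peer) : peer_(peer) {}
    std::string getPeerAddress() const { return peer_; }
    std::string peer_;
};

// Per-thread slot holding the peer address of the RPC being processed.
// Assumption: processContext() and the handler run on the same thread.
static thread_local std::string g_client_ip;

class ClientIpEventHandler {
public:
    // Called before every RPC; the second argument is the connection's transport.
    void processContext(void* /*serverContext*/, FakeTransport* transport) {
        FakeSocket* socket = dynamic_cast<FakeSocket*>(transport);
        g_client_ip = (socket != nullptr) ? socket->getPeerAddress() : std::string();
    }
};

// What a handler method such as EchoHandler::hello() could then do.
std::string hello(const std::string& greetings) {
    return greetings + " from " + g_client_ip;
}
```

Because processContext() is called for every RPC, the slot is refreshed before each handler invocation, so a worker thread that serves different connections one after another does not see a stale address.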
#ifndef MOOON_NET_THRIFT_HELPER_H
#define MOOON_NET_THRIFT_HELPER_H
#include <mooon/net/config.h>
#include <mooon/sys/log.h>
#include <mooon/utils/scoped_ptr.h>
#include <arpa/inet.h>
#include <boost/scoped_ptr.hpp>
#include <thrift/concurrency/PosixThreadFactory.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TNonblockingServer.h>
#include <thrift/transport/TSocketPool.h>
#include <thrift/transport/TTransportException.h>
NET_NAMESPACE_BEGIN

// Tells whether thrift is not connected, which covers two cases:
// 1. never connected, i.e. the connection has not been opened yet
// 2. the connection was closed by the peer
inline bool thrift_not_connected(
        apache::thrift::transport::TTransportException::TTransportExceptionType type)
{
    return (apache::thrift::transport::TTransportException::NOT_OPEN == type)
        || (apache::thrift::transport::TTransportException::END_OF_FILE == type);
}

inline bool thrift_not_connected(
        apache::thrift::transport::TTransportException& ex)
{
    apache::thrift::transport::TTransportException::TTransportExceptionType type = ex.getType();
    return thrift_not_connected(type);
}
// thrift client helper class
//
// Usage example:
// mooon::net::CThriftClientHelper<ExampleServiceClient> client(rpc_server_ip, rpc_server_port);
// try
// {
//     client.connect();
//     client->foo();
// }
// catch (apache::thrift::transport::TTransportException& transport_ex)
// {
//     MYLOG_ERROR("thrift exception: %s\n", transport_ex.what());
// }
// catch (apache::thrift::TApplicationException& app_ex)
// {
//     MYLOG_ERROR("thrift exception: %s\n", app_ex.what());
// }
// catch (apache::thrift::TException& tx)
// {
//     MYLOG_ERROR("thrift exception: %s\n", tx.what());
// }
// Besides the default TFramedTransport (TBufferTransports.h), Transport can also be:
// TBufferedTransport (TBufferTransports.h)
// THttpTransport
// TZlibTransport
// TFDTransport (TSimpleFileTransport)
//
// Besides the default apache::thrift::protocol::TBinaryProtocol, Protocol can also be:
// TCompactProtocol
// TJSONProtocol
// TDebugProtocol
template <class ThriftClient,
          class Protocol=apache::thrift::protocol::TBinaryProtocol,
          class Transport=apache::thrift::transport::TFramedTransport>
class CThriftClientHelper
{
public:
    // host: IP address of the thrift server
    // port: port number of the thrift server
    // connect_timeout_milliseconds: timeout in milliseconds for connecting to the thrift server
    // receive_timeout_milliseconds: timeout in milliseconds for receiving data from the thrift server
    // send_timeout_milliseconds: timeout in milliseconds for sending data to the thrift server
    CThriftClientHelper(const std::string &host, uint16_t port,
                        int connect_timeout_milliseconds=2000,
                        int receive_timeout_milliseconds=2000,
                        int send_timeout_milliseconds=2000);
    ~CThriftClientHelper();

    // Connect to the thrift server
    //
    // On error, the following thrift exceptions may be thrown:
    // apache::thrift::transport::TTransportException
    // apache::thrift::TApplicationException
    // apache::thrift::TException
    void connect();

    // Disconnect from the thrift server
    //
    // On error, the following thrift exceptions may be thrown:
    // apache::thrift::transport::TTransportException
    // apache::thrift::TApplicationException
    // apache::thrift::TException
    void close();

    ThriftClient* get() { return _client.get(); }
    ThriftClient* get() const { return _client.get(); }
    ThriftClient* operator ->() { return get(); }
    ThriftClient* operator ->() const { return get(); }

    const std::string& get_host() const { return _host; }
    uint16_t get_port() const { return _port; }

private:
    std::string _host;
    uint16_t _port;
    boost::shared_ptr<apache::thrift::transport::TSocketPool> _sock_pool;
    boost::shared_ptr<apache::thrift::transport::TTransport> _socket;
    // Held through the TTransport base so that any Transport template argument fits
    boost::shared_ptr<apache::thrift::transport::TTransport> _transport;
    boost::shared_ptr<apache::thrift::protocol::TProtocol> _protocol;
    boost::shared_ptr<ThriftClient> _client;
};
////////////////////////////////////////////////////////////////////////////////
// thrift server helper class
//
// Usage example:
// mooon::net::CThriftServerHelper<CExampleHandler, ExampleServiceProcessor> _thrift_server;
// try
// {
//     _thrift_server.serve(listen_port);
// }
// catch (apache::thrift::TException& tx)
// {
//     MYLOG_ERROR("thrift exception: %s\n", tx.what());
// }
// Besides the default TBinaryProtocolFactory, ProtocolFactory can also be:
// TCompactProtocolFactory
// TJSONProtocolFactory
// TDebugProtocolFactory
//
// Besides the default TNonblockingServer, Server can also be:
// TSimpleServer
// TThreadedServer
// TThreadPoolServer
template <class ThriftHandler,
          class ServiceProcessor,
          class ProtocolFactory=apache::thrift::protocol::TBinaryProtocolFactory,
          class Server=apache::thrift::server::TNonblockingServer>
class CThriftServerHelper
{
public:
    // Start the rpc service. This call blocks synchronously, so make it the last call.
    // port: listening port of the thrift server
    // num_worker_threads: number of worker threads the thrift server starts
    // num_io_threads: number of IO threads; only effective when Server is TNonblockingServer
    //
    // On error, the following thrift exceptions may be thrown:
    // apache::thrift::transport::TTransportException
    // apache::thrift::TApplicationException
    // apache::thrift::TException
    void serve(uint16_t port, uint8_t num_worker_threads=1, uint8_t num_io_threads=1);
    void serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads=1);
    void stop();

private:
    boost::shared_ptr<ThriftHandler> _handler;
    boost::shared_ptr<apache::thrift::TProcessor> _processor;
    boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> _protocol_factory;
    boost::shared_ptr<apache::thrift::server::ThreadManager> _thread_manager;
    boost::shared_ptr<apache::thrift::concurrency::PosixThreadFactory> _thread_factory;
    boost::shared_ptr<apache::thrift::server::TServer> _server;
};
////////////////////////////////////////////////////////////////////////////////
// Log-writing function called back by thrift; it is registered by set_thrift_log_write_function()
inline void write_log_function(const char* log)
{
    MYLOG_INFO("%s", log);
}

// Write thrift's output to the log file
inline void set_thrift_log_write_function()
{
    // Note: `log` is not declared in this excerpt; it presumably refers to the logger object
    if (log != NULL)
    {
        apache::thrift::GlobalOutput.setOutputFunction(write_log_function);
    }
}
////////////////////////////////////////////////////////////////////////////////
template <class ThriftClient, class Protocol, class Transport>
CThriftClientHelper<ThriftClient, Protocol, Transport>::CThriftClientHelper(
        const std::string &host, uint16_t port,
        int connect_timeout_milliseconds, int receive_timeout_milliseconds, int send_timeout_milliseconds)
    : _host(host)
    , _port(port)
{
    set_thrift_log_write_function();

    _sock_pool.reset(new apache::thrift::transport::TSocketPool());
    _sock_pool->addServer(host, (int)port);
    _sock_pool->setConnTimeout(connect_timeout_milliseconds);
    _sock_pool->setRecvTimeout(receive_timeout_milliseconds);
    _sock_pool->setSendTimeout(send_timeout_milliseconds);

    _socket = _sock_pool;
    // Transport defaults to apache::thrift::transport::TFramedTransport
    _transport.reset(new Transport(_socket));
    // Protocol defaults to apache::thrift::protocol::TBinaryProtocol
    _protocol.reset(new Protocol(_transport));

    _client.reset(new ThriftClient(_protocol));
}

template <class ThriftClient, class Protocol, class Transport>
CThriftClientHelper<ThriftClient, Protocol, Transport>::~CThriftClientHelper()
{
    close();
}

template <class ThriftClient, class Protocol, class Transport>
void CThriftClientHelper<ThriftClient, Protocol, Transport>::connect()
{
    if (!_transport->isOpen())
    {
        _transport->open();
    }
}

template <class ThriftClient, class Protocol, class Transport>
void CThriftClientHelper<ThriftClient, Protocol, Transport>::close()
{
    if (_transport->isOpen())
    {
        _transport->close();
    }
}
////////////////////////////////////////////////////////////////////////////////
template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory, class Server>
void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory, Server>::serve(uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)
{
    serve("0.0.0.0", port, num_worker_threads, num_io_threads);
}

template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory, class Server>
void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory, Server>::serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)
{
    // Note: the ip argument is not used in this excerpt; the server below is constructed from the port only
    set_thrift_log_write_function();

    _handler.reset(new ThriftHandler);
    _processor.reset(new ServiceProcessor(_handler));

    // ProtocolFactory defaults to apache::thrift::protocol::TBinaryProtocolFactory
    _protocol_factory.reset(new ProtocolFactory());
    _thread_manager = apache::thrift::server::ThreadManager::newSimpleThreadManager(num_worker_threads);
    _thread_factory.reset(new apache::thrift::concurrency::PosixThreadFactory());

    _thread_manager->threadFactory(_thread_factory);
    _thread_manager->start();

    // Server defaults to apache::thrift::server::TNonblockingServer
    Server* server = new Server(_processor, _protocol_factory, port, _thread_manager);
    if (sizeof(Server) == sizeof(apache::thrift::server::TNonblockingServer))
        server->setNumIOThreads(num_io_threads);
    _server.reset(server);
    _server->run(); // serve() could be called directly here as well, but run() is recommended
}

template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory, class Server>
void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory, Server>::stop()
{
    _server->stop();
}

NET_NAMESPACE_END
#endif // MOOON_NET_THRIFT_HELPER_H
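By default thrift prints its log to the screen, but it can be made to write to your own log file instead. This is done through the global object apache::thrift::GlobalOutput, which is declared in Thrift.h and defined in Thrift.cpp.
The class TOutput provides the method setOutputFunction() for setting the log output function: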
class TOutput {
public:
    inline void setOutputFunction(void (*function)(const char *));
};
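Calling setOutputFunction() to register a callback is enough to send the log to your own log file; unfortunately, log levels cannot be distinguished automatically. A better approach is to define an abstract interface and let users inject an implementation, like ILogger in mooon:
https://github.com/eyjian/mooon/blob/master/common_library/include/mooon/sys/log.h
For the details, see https://github.com/eyjian/mooon/blob/master/common_library/include/mooon/net/thrift_helper.h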
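The combination of a function-pointer hook and an injected logger interface can be sketched as follows. Everything here is a stand-in made up for illustration: Output mimics only the setOutputFunction() part of TOutput, and this ILogger is a hypothetical one-method interface, not mooon's actual ILogger.

```cpp
#include <string>
#include <vector>

// Hypothetical logger interface that users implement and inject.
class ILogger {
public:
    virtual ~ILogger() {}
    virtual void log_info(const char* message) = 0;
};

// Stand-in for TOutput: the hook is a plain function pointer.
class Output {
public:
    Output() : function_(0) {}
    void setOutputFunction(void (*function)(const char*)) { function_ = function; }
    void operator()(const char* message) const {
        if (function_) function_(message);
    }
private:
    void (*function_)(const char*);
};

// A plain function pointer cannot carry state, so the injected logger
// has to be reached through a global.
static ILogger* g_logger = 0;

static void forward_to_logger(const char* message) {
    if (g_logger) g_logger->log_info(message);
}

void install_logger(Output& output, ILogger* logger) {
    g_logger = logger;
    output.setOutputFunction(forward_to_logger);
}

// Example implementation that keeps the messages in memory.
class MemoryLogger : public ILogger {
public:
    virtual void log_info(const char* message) { lines.push_back(message); }
    std::vector<std::string> lines;
};
```

Since the hook receives only a message string, the forwarding function has to pick one log level for everything, which is the limitation noted above.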
How can Thrift be made to listen only on a specified IP rather than on 0.0.0.0?