thrift做爲脫胎於facebook的rpc框架,各方面都很是優秀。清晰的分層設計,多語言的支持,以及不輸protocolbuffer的效率(compact下優於protocolbuffer),都讓thrift擁有愈來愈多的使用者。apache
做爲一個RPC框架,thrift支持的是open->client--rpc-->server->close的短鏈接模式。在實際應用中,卻常常會有客戶端創建鏈接後,等待服務端數據的長鏈接模式,也能夠稱爲雙向鏈接。一般的方案有三種,可參考http://dongxicheng.org/search-engine/thrift-bidirectional-async-rpc/,文中提到第三種方法會修改源碼,而實際操做過程當中發現這實際上是做者小小的理解錯誤,實現thrift雙向通訊並無這麼複雜,通過一番實驗,發現只須要以下理解和實現便可輕鬆實現一個thrift的雙向鏈接。服務器
搞定以上三步,便可實現一個thrift雙向鏈接,這裏附上實驗代碼,客戶端使用C#(sorry for my pool C#),服務端使用C++框架
thriftasync
service HandshakeService{ oneway void HandShake(); } service CallbackService{ oneway void Push(1: string msg); }
client函數
using System; using System.Collections.Generic; using System.Linq; using System.Text; using Thrift.Collections; using Thrift.Protocol; using Thrift.Server; using Thrift.Transport; using System.Threading; using Thrift; using System.IO; namespace ThriftBidirection { class Program { class CallbackServiceImply : CallbackService.Iface { int msgCount = 0; public void Push(string msg) { Console.WriteLine("receive msg {0}: {1}", msgCount++, msg); } } //服務處理線程 static void ProcessThread(TProtocol protocol) { TProcessor processor = new CallbackService.Processor(new CallbackServiceImply()); while (true) { try { ////////////////////////////////////////////////////////////////////////// ///模仿server行爲,同時重用client端protocol ///至關於同時重用一個鏈接 while (processor.Process(protocol, protocol)) { }; ///connection lost, return return; } catch (IOException) //not fatal error, resume { continue; } catch (TException) //fatal error { return; } } } //服務器狀態監聽線程 static void MonitorThread(TTransport trans, Action<string> callback) { while (true) { try { if (!trans.Peek()) { callback("鏈接中斷"); } Thread.Sleep(3000); } catch (Thrift.TException ex) { callback(ex.Message); return; } } } static void Main(string[] args) { TTransport transport = new TBufferedTransport(new TSocket("localhost", 5555)); TProtocol protocol = new TBinaryProtocol(transport); HandshakeService.Client client = new HandshakeService.Client(protocol); Action<TProtocol> processAction = new Action<TProtocol>(ProcessThread); Action<TTransport, Action<string>> monitorAction = new Action<TTransport, Action<string>>(MonitorThread); transport.Open(); processAction.BeginInvoke(protocol, (result) => { processAction.EndInvoke(result); }, null); monitorAction.BeginInvoke(transport, (msg) => { Console.WriteLine("鏈接中斷: {0}", msg); }, (result) => { }, null); for (int i = 0; i < 100; ++i) { client.HandShake(); Thread.Sleep(10); } Console.Read(); transport.Close(); } } }
serverui
// This autogenerated skeleton file illustrates how to build a server. // You should copy it to another filename to avoid overwriting it. #include "HandshakeService.h" #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/server/TSimpleServer.h> #include <thrift/transport/TServerSocket.h> #include <thrift/transport/TBufferTransports.h> #include <boost/make_shared.hpp> #include <thrift/server/TThreadPoolServer.h> #include <thrift/concurrency/PlatformThreadFactory.h> #include "CallbackService.h" using namespace ::apache::thrift; using namespace ::apache::thrift::protocol; using namespace ::apache::thrift::transport; using namespace ::apache::thrift::server; using namespace apache::thrift::concurrency; using boost::make_shared; using boost::shared_ptr; class HandshakeServiceHandler : virtual public HandshakeServiceIf { public: HandshakeServiceHandler(const boost::shared_ptr<TTransport> &trans) : m_client(make_shared<TBinaryProtocol>(trans)) { boost::once_flag flag = BOOST_ONCE_INIT; m_flag = flag; } virtual ~HandshakeServiceHandler() { m_thread->interrupt(); m_thread->join(); } void CallbackThread() { while(true) { try { m_client.Push("server push msg"); } catch (TException) { return; } boost::this_thread::sleep_for(boost::chrono::milliseconds(20)); } } void HandShake() { // Your implementation goes here printf("HandShake\n"); boost::call_once(boost::bind(&HandshakeServiceHandler::_StartThread, this), m_flag); } void _StartThread() { m_thread.reset(new boost::thread(boost::bind(&HandshakeServiceHandler::CallbackThread, this))); } boost::shared_ptr<TTransport> m_trans; CallbackServiceClient m_client; shared_ptr<boost::thread> m_thread; boost::once_flag m_flag; }; class ProcessorFactoryImply : public TProcessorFactory { virtual boost::shared_ptr<TProcessor> getProcessor( const TConnectionInfo& connInfo) { return make_shared<HandshakeServiceProcessor>(make_shared<HandshakeServiceHandler>(connInfo.transport)); } }; int main(int argc, char **argv) { int port = 5555; shared_ptr<TProcessorFactory> processorFactory(new ProcessorFactoryImply()); shared_ptr<TServerTransport> serverTransport(new TServerSocket(port)); shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory()); shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); shared_ptr<ThreadManager> threadMgr = ThreadManager::newSimpleThreadManager(30); boost::shared_ptr<PlatformThreadFactory> threadFactory = boost::shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()); threadMgr->threadFactory(threadFactory); threadMgr->start(); TThreadPoolServer server(processorFactory,serverTransport, transportFactory, protocolFactory, threadMgr); server.serve(); return 0; }
一個簡單的thrift雙向通訊就實現了。this