假設讀者對thrift有必定了解。ios
客戶端有時須要非阻塞的去發送請求,給定服務端一個請求,要求其返回一個計算結果。可是客戶端不想等待服務端處理完,而是想發送完這個指令後本身去作其餘事情,當結果返回時自動的去處理。apache
好比舉個形象點的例子:飯店的Boss讓小弟A把本週店裏的欠條收集起來放到本身桌子上,而後又告訴本身的小祕書坐在本身辦公室等着小弟A把欠條拿過來,而後統計一下一共有多少,而後Boss本身出去半點事兒。數組
Boss至關於client,小弟A至關於server,而小祕書至關於client端的回調函數(callback)。怎麼講呢?Boss不想等待小弟處理完,由於他老人家公務繁忙,還要去幹別的呢。因而他把接下來處理欠條的任務託管給了小祕書,因而本身一我的出去了。服務器
OK,那麼咱們基本瞭解了整個工做流程,來看看實現的方法。thrift去實現client異步+回調的方法關鍵點在於:thrift生成的client中有個send_XXX()和recv_XXX()方法。send_XXX()至關於告知server去處理東西,能夠當即返回;而調用recv_XXX就是個阻塞的方法了,直到server返回結果。因此,咱們能夠在主線程調用完send_XXX()以後,而後另開一個線程去調用send_XXX(),該線程在等到server回覆後自動調用callback方法,對結果進行一些處理(固然callback在修改client狀態時須要進行同步操做)。這樣的模式下,咱們能夠作不少事情,好比分佈式環境下的觀察者模式。固然了須要注意的一點就是,各個線程接受到結果的順序跟請求順序不必定同樣,由於server處理不通請求時間不通或者網絡環境的影響均可能致使這種情形。因此若是你對接受這些結果時不是冪等操做時須要注意一下。網絡
thrift腳本:異步
//只有一個方法,client發送一個消息,server換回一個消息 service TestServ{ string ping(1: string message), }
server端採用TNBlockingServer實現socket
1 #include "TestServ.h" 2 3 #include <iostream> 4 5 #include <thrift/protocol/TBinaryProtocol.h> 6 #include <thrift/server/TNonblockingServer.h> 7 #include <thrift/transport/TServerSocket.h> 8 #include <thrift/transport/TBufferTransports.h> 9 #include <thrift/concurrency/PosixThreadFactory.h> 10 11 using namespace std; 12 13 using namespace ::apache::thrift; 14 using namespace ::apache::thrift::protocol; 15 using namespace ::apache::thrift::transport; 16 using namespace ::apache::thrift::server; 17 using namespace ::apache::thrift::concurrency; 18 19 using boost::shared_ptr; 20 21 class TestServHandler : virtual public TestServIf { 22 public: 23 TestServHandler() { 24 // Your initialization goes here 25 } 26 27 void ping(std::string& _return, const std::string& message) { 28 _return = "hello, i am server! "; 29 sleep(3);// do something time-consuming/ 這裏咱們在server端加一些耗時的操做 30 cout<<"Request from client: "<<message<<endl; 31 } 32 33 }; 34 35 int main(int argc, char **argv) { 36 int port = 9090; 37 38 shared_ptr<TestServHandler> handler(new TestServHandler()); 39 shared_ptr<TProcessor> processor(new TestServProcessor(handler)); 40 shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); 41 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(15); 42 shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory > (new PosixThreadFactory()); 43 threadManager->threadFactory(threadFactory); 44 threadManager->start(); 45 TNonblockingServer server(processor, protocolFactory, port, threadManager); 46 server.serve(); 47 return 0; 48 }
client端實現:分佈式
1 #include "TestServ.h" 2 3 #include <iostream> 4 #include <thrift/protocol/TBinaryProtocol.h> 5 #include <thrift/transport/TSocket.h> 6 #include <thrift/transport/TBufferTransports.h> 7 8 #include "test_constants.h" 9 10 using namespace std; 11 using namespace ::apache::thrift; 12 using namespace ::apache::thrift::protocol; 13 using namespace ::apache::thrift::transport; 14 using boost::shared_ptr; 15 16 class AsynTestClient; 17 void * wait_recv(void * parg ); 18 struct PARG { 19 AsynTestClient * pthis; 20 string message; 21 }; 22 23 class AsynTestClient { 24 private: 25 unsigned int d_cnt_recv;//< 客戶端接受到server響應次數的計數器. 26 27 pthread_rwlock_t m_cnt_recv;//< 計數器的讀寫鎖. 28 vector<pthread_t> m_ids; 29 30 public: 31 TestServClient * d_client; 32 void call_back(string & _return){ 33 //輸出服務器返回信息並把返回計數加1 34 cout<<"server msg: "<<_return<<endl; 35 pthread_rwlock_wrlock( &m_cnt_recv ); 36 d_cnt_recv ++; 37 pthread_rwlock_unlock( &m_cnt_recv ); 38 } 39 explicit AsynTestClient(boost::shared_ptr<TProtocol> & protocol){ 40 pthread_rwlock_init( &m_cnt_recv, NULL ); 41 d_cnt_recv = 0; 42 d_client = new TestServClient( protocol ); 43 } 44 45 ~AsynTestClient(){ 46 delete d_client; 47 pthread_rwlock_destroy( &m_cnt_recv ); 48 } 49 50 void asyn_ping( const string & message) { 51 //發送請求 52 d_client->send_ping(message); 53 //初始化每一個等待回調線程的參數 54 PARG * parg = new PARG; 55 parg->pthis = this; 56 parg->message = message; 57 //把新生成的線程id放入全局數組維護 58 pthread_t m_id; 59 m_ids.push_back(m_id); 60 //啓動線程,今後只要接受到服務器的返回結果就調用回調函數。 61 if( 0 != pthread_create( &m_id, NULL, wait_recv, reinterpret_cast< void * > (parg) ) ) { 62 return; 63 } 64 } 65 }; 66 int main(int argc, char **argv) { 67 68 boost::shared_ptr<TSocket> socket(new TSocket("localhost", 9090)); 69 boost::shared_ptr<TTransport> transport(new TFramedTransport(socket)); 70 boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); 71 72 //TestServClient client(protocol); 73 74 transport->open(); 75 AsynTestClient client(protocol); 76 string message = "hello, i am client! "; 77 client.asyn_ping(message); 78 79 while(true){ 80 sleep(1);//這裏至關於client去作別的事情了 81 } 82 83 transport->close(); 84 return 0; 85 } 86 void * wait_recv(void * parg ) { 87 PARG * t_parg = reinterpret_cast< PARG * >(parg);//強制轉化線程參數 88 string _return; 89 t_parg->pthis->d_client->recv_ping(_return); 90 t_parg->pthis->call_back(_return); 91 }
其實你們能夠注意到,我並無使用asyn_ping(const string & message, void(*)call_back(void));這種方式去定義它,這是由於asyn_ping自己能夠獲取callback函數的指針。回調的本質是任務的託管、時間的複用,也就是說等待結果返回後自動去調用一段代碼而已,因此本質上上面就是回調機制。若是你想使用傳函數指針的方式,也能夠實現出來。函數
注意:編譯時須要-L$(LIB_DIR) -lthrift -lthriftnb -levent。this