設計作完了,小吳要開始編碼了。若是按照「手工做坊」的思路,小吳至少須要完成以下幾個方面: (1)「客戶端向服務器端發送數據」的代碼 (2)「客戶端接收服務器端查詢結果」的代碼 (3)「服務器端接收客戶端數據」的代碼 (4)「服務器端向客戶端發送查詢結果」的代碼 (5)若是客戶端會大批量發起查詢,那可能還須要考慮改爲多線程模型或異步模型
第1步: 明確要交互的數據格式(如上例中的UserGradeInfo)和具體的方法(如上例中的Search),定義出thrift接口描述文件(英文叫作Inteface Description File); (即傳輸數據的 數據結構,與獲取待傳輸數據的 業務邏輯) 第2步: 調用thrift工具,依據thrift接口文件,生成RPC代碼; 第3步: 你的服務器端程序引用thrift生成的RPC代碼,並實現其中的Search動做的邏輯,而後啓動監聽,等待客戶端發來請求。 第4步: 客戶端一樣引入並調用RPC代碼來與服務器端通訊;
const map<string,string> MY_MAP = {'hello':'world', 'goodnight':'moon'}
include "other.thrift"
編號的做用?php
include "other.thrift"
,須要使用thrift -r ...yum install -y libevent libevent-devel libtool autoconf automake byacc flex bison pkgconfig crypto-utils
./bootstrap.sh
./configure -h
./configure --prefix=/usr/local/thrift_0_12_0 --with-cpp --with-java --without-csharp --without-go --without-python --without-erlang --without-perl --without-php --without-php_extension --without-ruby --without-haskell --without-lua --without-nodejs --without-nodets --without-dart --without-rs --without-cl --without-haxe --without-dotnetcore --without-d --enable-tests=no
./configure CPPFLAGS='-DNDEBUG'
make -j 20
sudo make install
#include "TimeService.h" #include <thrift/transport/TSocket.h> #include <thrift/transport/TBufferTransports.h> #include <thrift/protocol/TBinaryProtocol.h> #include <iostream> using namespace std; using namespace apache::thrift; using namespace apache::thrift::protocol; using namespace apache::thrift::transport; using namespace roctime; int main(int argc, char *argv[]) { boost::shared_ptr<TSocket> socket(new TSocket("localhost", 9090)); boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket)); boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); time_t mytime = 0; TimeServiceClient client(protocol); transport->open(); mytime = client.TellMeTime(); cout << "Now is " << ctime(&mytime) << endl; transport->close(); return 0; } }
如下內容基於0.12.0版本,代碼地址:https://github.com/apache/thrift/tree/0.12.0html
NonblockingServer中,任務是由worker線程池中的線程來執行的。ThreadManager的功能包含建立、銷燬worker線程以及統計不一樣狀態的worker線程數量等。這個類還涉及到任務的調度,簡單來講主要就是有生產者操做(添加任務),消費者操做(取出並執行任務),保存任務的隊列,以及協調多個線程各類操做之間的同步關係;另外採用面向對象的方式編寫代碼,其中ThreadManager是一個抽象類,ThreadManager::Impl是其實現,而用戶直接建立的是ThreadManager::Impl的子類SimpleThreadManager的實例;對於用戶而言,workerCount_是線程池中有效的worker線程數量,workerMaxCount_則是用戶設置的線程池大小,pendingTaskCountMax_是用戶設置的任務隊列大小(任務隊列保存的是正在排隊的任務)java
NonblockingServer採用的是多個IO線程+worker線程池模型,兩種線程的數量均可以設置;主IO線程負責創建鏈接並建立TConnection對象,而後會把TConnection分配給某IO線程(round robin)監聽,IO線程使用libevent進行socket的異步讀寫(即數據的收發)node
經過setOverloadAction方法設置,這個過載指的是活躍鏈接個數超過了限制(maxConnections_)或者正在處理+等待處理的鏈接個數超過了限制(maxActiveProcessors_),具體用法以及使用場景待補充python
上面提到過,當任務隊列滿時,IO線程會阻塞在向任務隊列添加任務的操做上;thrift代碼中直接將阻塞時間設置爲0,表示一直阻塞下去;但代碼中沒有提供接口設置阻塞時間,這個用法也是等補充。
worker線程在執行完任務後,會調用TNonblockingIOThread::notify通知IO線程寫回數據,而後會阻塞等待管道(socket pair)的可寫事件,若是socket pair緩衝區滿,而且若是IO線程來不及很快的完成這些寫回數據的操做,那麼worker線程會阻塞在其run方法中,沒法從隊列中獲取取新任務執行;而同時又有許多新請求過來,須要IO線程向任務隊列中添加任務,因爲沒有worker線程取任務執行了,隊列保持最大長度,IO線程阻塞在添加任務的操做上,這樣會形成死鎖ios
經過setTaskExpireTime能夠設置一個超時時間(毫秒),從隊列中取出任務時判斷在隊列中停留超過這個時間就會被丟棄而不是被執行,這個時間爲0表示不會過時,但爲負數時,全部任務都會按過時處理git
針對過時任務,在ThreadManager::Worker::run中worker線程是不會釋放threadManager->mutex_的,並且直接調用manager_->expireCallback_,進而又調用TNonblockingIOThread::notify,而後會阻塞等待管道(socket pair)的可寫事件。
若是socket pair緩衝區已滿,IO線程又來不及很快的完成這些寫回數據的操做,而且又有許多新請求過來,須要IO線程向任務隊列中添加任務(須要對threadManager->mutex_)加鎖,則會形成死鎖github
發現任務過時後會調用manager_->expireCallback_,進而調用forceClose,server主動關閉了鏈接;在addTask中獲取鎖或者等待條件變量超時後,拋出異常,以後也會經過transition函數主動關閉鏈接web
實際使用中,須要對隊列設置一個最大長度,而且在添加任務時設置超時時間,發生添加超時後丟棄這個任務(給client返回一個empty frame),而不是直接關閉鏈接。而且對於添加任務超時和任務過時可能引發死鎖的狀況,對thrift-0.12.0版本的代碼作了一些修改:https://github.com/chenguang9239/thrift/tree/0-12-0-modifyapache
伯樂在線
Apache Thrift
Apache Thrift paper
Thrift解讀(四)——TNonblockingServer 做業調度
Thrift 的TNonblockingServer運行原理分析
thrift解讀(三)——TNonblockingServer 內的 IO線程 和 work線程之間數據流處理邏輯
Thrift線程和狀態機分析
Thrift異步IO服務器源碼分析