thrift是一個跨語言通訊的工具,支持的語言多,並且還提供服務器端的衆多網絡模型,使服務端的開發能夠只專於服務自己的邏輯。java
實際服務器端業務邏輯處理的實現類。git
TTransport主要處理服務器端和客戶端的網絡讀寫。主要的接口github
例如經常使用的客戶端是TSocket就是TTransport的一個實現apache
在服務器端當有請求來的時候,經過TServerTransport能夠建立TTransport對象,而後經過TTransport發送數據給客戶端,主要的接口服務器
例如TNonblockingTransport,TSocket等。ServerTransport能夠理解爲主要實現了accept之後要作的事情,究竟是採用何種方式(同步異步,阻塞非阻塞)把請求交給processor處理。網絡
傳輸協議,thrift客戶端和服務器端遠程調用時候傳輸數據時候採起的數據格式,例如異步
TBinaryProtocol 二進制格式ide
TCompactProtocol 高效和壓縮的二進制格式工具
TDenseProtocoal 與TCompactProtocol相比,meta信息略有不一樣this
TJSONProtocoal JSON
TDebugProtocoal text 格式 方便調試
服務器的做用就是把前面的processor,transport,protocol組合到一塊兒,協同他們的工做,例如:
TSimplerServer 接受一個鏈接,處理鏈接請求,直到客戶端關閉了鏈接,它纔回去接受一個新的鏈接。
TNonblockingServer 使用非阻塞的I/O解決了TSimpleServer一個客戶端阻塞其餘全部客戶端的問題。
THsHaServer(半同步/半異步的server)它使用一個單獨的線程來處理網絡I/O,一個獨立的worker線程池來處理消息。這樣,只要有空閒的worker線程,消息就會被當即處理,所以多條消息能被並行處理。
TThreadPoolServer 有一個專用的線程用來接受鏈接。一旦接受了一個鏈接,它就會被放入ThreadPoolExecutor中的一個worker線程裏處理。
注意:使用何種server和須要使用某種server transport,transport多是有要求的。
例如:TNonblockingServer.Args的構造方法中
public Args(org.apache.thrift.transport.TNonblockingServerTransport transport)
能夠看出使用TNonblockingServer須要使用TNonblockingServerTransport才行。
例如:使用THsHaServer時候server和client都須要設置transport爲TFramedTransport
下面編寫的全局ID生成器是https://github.com/twitter/snowflake的簡單版本,刪減了經過zookeeper保證workerid的不重複。基本原理類似,22位時間戳+10位workerid+12位毫秒級的計數器。
還一些其餘的ID生成方案,例如UUID,經過MYSQL的REPLACE INTO等,可用於分庫之後作主鍵。
idserver.thrift
namespace java demo
service IDService {
i64 getId()
}
thrift -r --gen java idserver.thrift
step 3.編寫業務邏輯實現類
idworker.java
import org.apache.thrift.TException; public class IDWorker implements IDService.Iface { private long lastTimestamp; private long timeabs = 1288834974657L; //12 bits private long sequence = 0l; //10 bits private long workerId; public IDWorker(long workerId) { this.workerId = workerId; } @Override public long getId() throws TException { long timestamp = System.currentTimeMillis(); if (timestamp < this.lastTimestamp) { return -1; } else if (timestamp == this.lastTimestamp) { if (this.sequence < 2048) { this.sequence = ++this.sequence; System.out.println(this.sequence); } else { while (timestamp <= this.lastTimestamp) { timestamp = System.currentTimeMillis(); } } } else { this.sequence = 0; } this.lastTimestamp = timestamp; return ((timestamp-timeabs) << 22) | this.workerId << 12 | this.sequence; } }
import org.apache.thrift.TProcessor; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.THsHaServer; import org.apache.thrift.server.TServer; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TTransportException;
public class IDServer { /** * 半同步/半異步的server。它使用一個單獨的線程來處理網絡I/O,一個獨立的worker線程池來處理消息。 */ public void startTHsHaServer() { try { TProcessor processor = new IDService.Processor<IDService.Iface>(new IDWorker(1l)); TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(8080); THsHaServer.Args args = new THsHaServer.Args(serverTransport); args.processorFactory(new TProcessorFactory(processor)); args.protocolFactory(new TBinaryProtocol.Factory()); args.transportFactory(new TFramedTransport.Factory()); args.workerThreads(2); TServer server = new THsHaServer(args); server.serve(); } catch (TTransportException e) { e.printStackTrace(); } } public static void main(String args[]) { new IDServer().startTHsHaServer(); } }
ClientDemo.java
import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; public class ClientDemo extends Thread { @Override public void run() { super.run(); this.startClient(); } public void startClient() { TTransport transport = null; try { transport = new TFramedTransport(new TSocket("localhost", 8080, 30000)); TProtocol protocol = new TBinaryProtocol(transport); IDService.Client client = new IDService.Client(protocol); transport.open(); System.out.println("service result:"+client.getId()); } catch (Exception e) { e.printStackTrace(); } finally { if (transport != null) { transport.close(); } } } public static void main(String args[]) { for (int i=0; i<1000; i++) { new ClientDemo().start(); } } }
參考資料