Thrift
提供的網絡服務模型:單線程、多線程、事件驅動,從另外一個角度劃分爲:阻塞服務模型、非阻塞服務模型。java
阻塞服務模型:TSimpleServer
、TThreadPoolServer
。算法
非阻塞服務模型:TNonblockingServer
、THsHaServer
和TThreadedSelectorServer
。數據庫
TServer
類的層次關係:apache
TServer
定義了靜態內部類Args
,Args
繼承自抽象類AbstractServerArgs
。AbstractServerArgs
採用了建造者模式,向TServer
提供各類工廠:編程
工廠屬性 | 工廠類型 | 做用 |
---|---|---|
ProcessorFactory | TProcessorFactory | 處理層工廠類,用於具體的TProcessor對象的建立 |
InputTransportFactory | TTransportFactory | 傳輸層輸入工廠類,用於具體的TTransport對象的建立 |
OutputTransportFactory | TTransportFactory | 傳輸層輸出工廠類,用於具體的TTransport對象的建立 |
InputProtocolFactory | TProtocolFactory | 協議層輸入工廠類,用於具體的TProtocol對象的建立 |
OutputProtocolFactory | TProtocolFactory | 協議層輸出工廠類,用於具體的TProtocol對象的建立 |
下面是TServer
的部分核心代碼:後端
public abstract class TServer {
public static class Args extends org.apache.thrift.server.TServer.AbstractServerArgs<org.apache.thrift.server.TServer.Args> {
public Args(TServerTransport transport) {
super(transport);
}
}
public static abstract class AbstractServerArgs<T extends org.apache.thrift.server.TServer.AbstractServerArgs<T>> {
final TServerTransport serverTransport;
TProcessorFactory processorFactory;
TTransportFactory inputTransportFactory = new TTransportFactory();
TTransportFactory outputTransportFactory = new TTransportFactory();
TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();
public AbstractServerArgs(TServerTransport transport) {
serverTransport = transport;
}
}
protected TProcessorFactory processorFactory_;
protected TServerTransport serverTransport_;
protected TTransportFactory inputTransportFactory_;
protected TTransportFactory outputTransportFactory_;
protected TProtocolFactory inputProtocolFactory_;
protected TProtocolFactory outputProtocolFactory_;
private boolean isServing;
protected TServer(org.apache.thrift.server.TServer.AbstractServerArgs args) {
processorFactory_ = args.processorFactory;
serverTransport_ = args.serverTransport;
inputTransportFactory_ = args.inputTransportFactory;
outputTransportFactory_ = args.outputTransportFactory;
inputProtocolFactory_ = args.inputProtocolFactory;
outputProtocolFactory_ = args.outputProtocolFactory;
}
public abstract void serve();
public void stop() {}
public boolean isServing() {
return isServing;
}
protected void setServing(boolean serving) {
isServing = serving;
}
}
複製代碼
TServer
的三個方法:serve()
、stop()
和isServing()
。serve()
用於啓動服務,stop()
用於關閉服務,isServing()
用於檢測服務的起停狀態。緩存
TServer
的不一樣實現類的啓動方式不同,所以serve()
定義爲抽象方法。不是全部的服務都須要優雅的退出, 所以stop()
方法沒有被定義爲抽象。服務器
TSimpleServer
的工做模式採用最簡單的阻塞IO,實現方法簡潔明瞭,便於理解,可是一次只能接收和處理一個socket
鏈接,效率比較低。它主要用於演示Thrift
的工做過程,在實際開發過程當中不多用到它。網絡
服務端:多線程
ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
TServerSocket serverTransport = new TServerSocket(serverSocket);
HelloWorldService.Processor processor =
new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
TSimpleServer.Args tArgs = new TSimpleServer.Args(serverTransport);
tArgs.processor(processor);
tArgs.protocolFactory(protocolFactory);
// 簡單的單線程服務模型 通常用於測試
TServer tServer = new TSimpleServer(tArgs);
System.out.println("Running Simple Server");
tServer.serve();
複製代碼
客戶端:
TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("Leo");
System.out.println("Result =: " + result);
transport.close();
複製代碼
查看上述流程的源代碼,即TSimpleServer.java
中的serve()
方法以下:
serve()
方法的操做:
TServerSocket
的listen()
方法啓動鏈接監聽。TTransport
對象。TServerEventHandler
對象處理具體的業務請求。TThreadPoolServer
模式採用阻塞socket
方式工做,主線程負責阻塞式監聽是否有新socket
到來,具體的業務處理交由一個線程池來處理。
服務端:
ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
TServerSocket serverTransport = new TServerSocket(serverSocket);
HelloWorldService.Processor<HelloWorldService.Iface> processor =
new HelloWorldService.Processor<>(new HelloWorldServiceImpl());
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport);
ttpsArgs.processor(processor);
ttpsArgs.protocolFactory(protocolFactory);
// 線程池服務模型 使用標準的阻塞式IO 預先建立一組線程處理請求
TServer ttpsServer = new TThreadPoolServer(ttpsArgs);
System.out.println("Running ThreadPool Server");
ttpsServer.serve();
複製代碼
客戶端:
TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("ThreadPoolClient");
System.out.println("Result =: " + result);
transport.close();
複製代碼
ThreadPoolServer
解決了TSimpleServer
不支持併發和多鏈接的問題,引入了線程池。實現的模型是One Thread Per Connection
。查看上述流程的源代碼,先查看線程池的代碼片斷:
TThreadPoolServer.java
中的serve()
方法以下:
serve()
方法的操做:
TServerSocket
的listen()
方法啓動鏈接監聽。WorkerProcess
對象(WorkerProcess
實現了Runnabel
接口),並提交到線程池。WorkerProcess
的run()
方法負責業務處理,爲客戶端建立了處理器對象、輸入傳輸通道對象、輸出傳輸通道對象、輸入協議對象和輸出協議對象。TServerEventHandler
對象處理具體的業務請求。WorkerProcess
的run()
方法:
拆分了監聽線程(Accept Thread
)和處理客戶端鏈接的工做線程(Worker Thread
),數據讀取和業務處理都交給線程池處理。所以在併發量較大時新鏈接也可以被及時接受。
線程池模式比較適合服務器端能預知最多有多少個客戶端併發的狀況,這時每一個請求都能被業務線程池及時處理,性能也很是高。
線程池模式的處理能力受限於線程池的工做能力,當併發請求數大於線程池中的線程數時,新請求也只能排隊等待。
TNonblockingServer
模式也是單線程工做,可是採用NIO
的模式,藉助Channel/Selector
機制, 採用IO
事件模型來處理。
全部的socket
都被註冊到selector
中,在一個線程中經過seletor
循環監控全部的socket
。
每次selector
循環結束時,處理全部的處於就緒狀態的socket
,對於有數據到來的socket
進行數據讀取操做,對於有數據發送的socket則進行數據發送操做,對於監聽socket
則產生一個新業務socket
並將其註冊到selector
上。
注意:TNonblockingServer要求底層的傳輸通道必須使用TFramedTransport。
服務端:
TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);
tnbArgs.processor(tprocessor);
tnbArgs.transportFactory(new TFramedTransport.Factory());
tnbArgs.protocolFactory(new TCompactProtocol.Factory());
// 使用非阻塞式IO服務端和客戶端須要指定TFramedTransport數據傳輸的方式
TServer server = new TNonblockingServer(tnbArgs);
System.out.println("Running Non-blocking Server");
server.serve();
複製代碼
客戶端:
TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
// 協議要和服務端一致
TProtocol protocol = new TCompactProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("NonBlockingClient");
System.out.println("Result =: " + result);
transport.close();
複製代碼
TNonblockingServer
繼承於AbstractNonblockingServer
,這裏咱們更關心基於NIO
的selector
部分的關鍵代碼。
相比於TSimpleServer
效率提高主要體如今IO
多路複用上,TNonblockingServer
採用非阻塞IO
,對accept/read/write
等IO
事件進行監控和處理,同時監控多個socket
的狀態變化。
TNonblockingServer
模式在業務處理上仍是採用單線程順序來完成。在業務處理比較複雜、耗時的時候,例如某些接口函數須要讀取數據庫執行時間較長,會致使整個服務被阻塞住,此時該模式效率也不高,由於多個調用請求任務依然是順序一個接一個執行。
鑑於TNonblockingServer
的缺點,THsHaServer
繼承於TNonblockingServer
,引入了線程池提升了任務處理的併發能力。THsHaServer
是半同步半異步(Half-Sync/Half-Async
)的處理模式,Half-Aysnc
用於IO
事件處理(Accept/Read/Write
),Half-Sync
用於業務handler
對rpc
的同步處理上。
注意:THsHaServer和TNonblockingServer同樣,要求底層的傳輸通道必須使用TFramedTransport。
服務端:
TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
// 半同步半異步
THsHaServer.Args thhsArgs = new THsHaServer.Args(tnbSocketTransport);
thhsArgs.processor(tprocessor);
thhsArgs.transportFactory(new TFramedTransport.Factory());
thhsArgs.protocolFactory(new TBinaryProtocol.Factory());
TServer server = new THsHaServer(thhsArgs);
System.out.println("Running HsHa Server");
server.serve();
複製代碼
客戶端:
TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
// 協議要和服務端一致
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("HsHaClient");
System.out.println("Result =: " + result);
transport.close();
複製代碼
THsHaServer
繼承於TNonblockingServer
,新增了線程池併發處理工做任務的功能,查看線程池的相關代碼:
任務線程池的建立過程:
下文的TThreadedSelectorServer囊括了THsHaServer的大部分特性,源碼分析可參考TThreadedSelectorServer。
THsHaServer
與TNonblockingServer
模式相比,THsHaServer
在完成數據讀取以後,將業務處理過程交由一個線程池來完成,主線程直接返回進行下一次循環操做,效率大大提高。
主線程仍然須要完成全部socket
的監聽接收、數據讀取和數據寫入操做。當併發請求數較大時,且發送數據量較多時,監聽socket
上新鏈接請求不能被及時接受。
TThreadedSelectorServer
是對THsHaServer
的一種擴充,它將selector
中的讀寫IO
事件(read/write
)從主線程中分離出來。同時引入worker
工做線程池,它也是種Half-Sync/Half-Async
的服務模型。
TThreadedSelectorServer
模式是目前Thrift
提供的最高級的線程服務模型,它內部有若是幾個部分構成:
AcceptThread
線程對象,專門用於處理監聽socket
上的新鏈接。SelectorThread
對象專門用於處理業務socket
的網絡I/O
讀寫操做,全部網絡數據的讀寫均是有這些線程來完成。SelectorThreadLoadBalancer
對象,主要用於AcceptThread
線程接收到一個新socket
鏈接請求時,決定將這個新鏈接請求分配給哪一個SelectorThread
線程。ExecutorService
類型的工做線程池,在SelectorThread
線程中,監聽到有業務socket
中有調用請求過來,則將請求數據讀取以後,交給ExecutorService
線程池中的線程完成這次調用的具體執行。主要用於處理每一個rpc
請求的handler
回調處理(這部分是同步的)。服務端:
TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TProcessor processor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
// 多線程半同步半異步
TThreadedSelectorServer.Args ttssArgs = new TThreadedSelectorServer.Args(serverSocket);
ttssArgs.processor(processor);
ttssArgs.protocolFactory(new TBinaryProtocol.Factory());
// 使用非阻塞式IO時 服務端和客戶端都須要指定數據傳輸方式爲TFramedTransport
ttssArgs.transportFactory(new TFramedTransport.Factory());
// 多線程半同步半異步的服務模型
TThreadedSelectorServer server = new TThreadedSelectorServer(ttssArgs);
System.out.println("Running ThreadedSelector Server");
server.serve();
複製代碼
客戶端:
for (int i = 0; i < 10; i++) {
new Thread("Thread " + i) {
@Override
public void run() {
// 設置傳輸通道 對於非阻塞服務 須要使用TFramedTransport(用於將數據分塊發送)
for (int j = 0; j < 10; j++) {
TTransport transport = null;
try {
transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("ThreadedSelector Client");
System.out.println("Result =: " + result);
transport.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 關閉傳輸通道
transport.close();
}
}
}
}.start();
}
複製代碼
以上工做流程的三個組件AcceptThread
、SelectorThread
和ExecutorService
在源碼中的定義以下:
TThreadedSelectorServer
模式中有一個專門的線程AcceptThread
用於處理新鏈接請求,所以可以及時響應大量併發鏈接請求;另外它將網絡I/O操做分散到多個SelectorThread
線程中來完成,所以可以快速對網絡I/O
進行讀寫操做,可以很好地應對網絡I/O
較多的狀況。
TThreadedSelectorServer
默認參數定義以下:
建立、初始化並啓動AcceptThread
和SelectorThreads
,同時啓動selector
線程的負載均衡器(selectorThreads
)。
AcceptThread
繼承於Thread
,能夠看出包含三個重要的屬性:非阻塞式傳輸通道(TNonblockingServerTransport
)、NIO
選擇器(acceptSelector
)和選擇器線程負載均衡器(threadChooser
)。
查看AcceptThread
的run()
方法,能夠看出accept
線程一旦啓動,就會不停地調用select()
方法:
查看select()
方法,acceptSelector
選擇器等待IO
事件的到來,拿到SelectionKey
即檢查是否是accept
事件。若是是,經過handleAccept()
方法接收一個新來的鏈接;不然,若是是IO
讀寫事件,AcceptThread
不做任何處理,交由SelectorThread
完成。
在handleAccept()
方法中,先經過doAccept()
去拿鏈接通道,而後Selector
線程負載均衡器選擇一個Selector
線程,完成接下來的IO
讀寫事件。
接下來繼續查看doAddAccept()
方法的實現,毫無懸念,它進一步調用了SelectorThread
的addAcceptedConnection()
方法,把非阻塞傳輸通道對象傳遞給選擇器線程作進一步的IO
讀寫操做。
SelectorThreadLoadBalancer
如何建立?
SelectorThreadLoadBalancer
是一個基於輪詢算法的Selector
線程選擇器,經過線程迭代器爲新進來的鏈接順序分配SelectorThread
。
SelectorThread
和AcceptThread
同樣,是TThreadedSelectorServer
的一個成員內部類,每一個SelectorThread
線程對象內部都有一個阻塞式的隊列,用於存放該線程被接收的鏈接通道。
阻塞隊列的大小可由構造函數指定:
上面看到,在AcceptThread
的doAddAccept()
方法中調用了SelectorThread
的addAcceptedConnection()
方法。
這個方法作了兩件事:
SelectorThread
線程接收的鏈接通道放入阻塞隊列中。wakeup()
方法喚醒SelectorThread
中的NIO
選擇器selector
。既然SelectorThread
也是繼承於Thread
,查看其run()
方法的實現:
SelectorThread
方法的select()
監聽IO
事件,僅僅用於處理數據讀取和數據寫入。若是鏈接有數據可讀,讀取並以frame
的方式緩存;若是須要向鏈接中寫入數據,緩存併發送客戶端的數據。且在數據讀寫處理完成後,須要向NIO
的selector
清空和註銷自身的SelectionKey
。
rpc
調用過程也就結束了,handleWrite()
方法以下:Thrift
會利用已讀數據執行目標方法,handleRead()
方法以下:handleRead
方法在執行read()
方法,將數據讀取完成後,會調用requestInvoke()
方法調用目標方法完成具體業務處理。requestInvoke()
方法將請求數據封裝爲一個Runnable
對象,提交到工做任務線程池(ExecutorService
)進行處理。
select()
方法完成後,線程繼續運行processAcceptedConnections()
方法處理下一個鏈接的IO
事件。
這裏比較核心的幾個操做:
SelectorThread
的阻塞隊列acceptedQueue
中獲取一個鏈接的傳輸通道。若是獲取成功,調用registerAccepted()
方法;不然,進入下一次循環。registerAccepted()
方法將傳輸通道底層的鏈接註冊到NIO
的選擇器selector
上面,獲取到一個SelectionKey
。FrameBuffer
對象,並綁定到獲取的SelectionKey
上面,用於數據傳輸時的中間讀寫緩存。本文對Thrift
的各類線程服務模型進行了介紹,包括2種阻塞式服務模型:TSimpleServer
、TThreadPoolServer
,3種非阻塞式服務模型:TNonblockingServer
、THsHaServer
和TThreadedSelectorServer
。對各類服務模型的具體用法、工做流程、原理和源碼實現進行了必定程度的分析。
鑑於篇幅較長,請各位看官請慢慢批閱!
歡迎關注技術公衆號: 零壹技術棧
本賬號將持續分享後端技術乾貨,包括虛擬機基礎,多線程編程,高性能框架,異步、緩存和消息中間件,分佈式和微服務,架構學習和進階等學習資料和文章。