The IDL file is shown below; for the complete IDL syntax, see the official documentation at http://thrift.apache.org/docs/idl.
Running the code generator against it produces two files: HelloService.java and ResultCommon.java.
namespace java com.mytest.thrift

struct ResultCommon{
    1:i32 resultCode,
    2:string desc
}

service HelloService{
    ResultCommon sayHello(1:string paramJson)
}
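With the Thrift compiler installed, these stubs are typically produced with a command of the form thrift --gen java <idl-file>; the IDL file name is not given in the original text, so any name (for example hello.thrift) works as long as the file contains the definitions above.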
The implementation of the generated HelloService.Iface business interface is shown below.
public class HelloHandler implements HelloService.Iface {
    private Logger logger = LoggerFactory.getLogger(HelloHandler.class);

    @Override
    public ResultCommon sayHello(String paramJson) throws TException {
        logger.info("receive request param : {}", paramJson);
        ResultCommon response = new ResultCommon();
        response.setDesc("Hello World!");
        return response;
    }
}
Thrift RPC server implementation:
public class RpcServer {
    public static void main(String[] args) throws TTransportException {
        // Blocking synchronous I/O model
        TServerSocket tServerSocket = new TServerSocket(8090);
        HelloService.Processor<HelloService.Iface> processor =
                new HelloService.Processor<HelloService.Iface>(new HelloHandler());
        Args args1 = new Args(tServerSocket);
        args1.processor(processor);
        // Use the binary protocol as the message format
        args1.protocolFactory(new TBinaryProtocol.Factory());
        // Maximum and minimum number of worker threads in the pool
        args1.maxWorkerThreads(10);
        args1.minWorkerThreads(1);
        // Start the server
        TThreadPoolServer server = new TThreadPoolServer(args1);
        // serve() blocks here
        server.serve();
    }
}
Thrift RPC client implementation:
public class RpcClient {
    public static void main(String[] args) throws TException {
        TSocket tSocket = new TSocket("127.0.0.1", 8090);
        tSocket.open();
        TProtocol tProtocol = new TBinaryProtocol(tSocket);
        HelloService.Client client = new HelloService.Client(tProtocol);
        String paramJson = "{\"wewe\":\"111\"}";
        ResultCommon resultCommon = client.sayHello(paramJson);
        System.out.println(resultCommon.getDesc());
        tSocket.close();
    }
}
Notes: 1) The Thrift client and server must use the same I/O model; in the example above both sides use blocking synchronous I/O.
2) The Thrift client and server must use the same message format; in the example above both sides use the binary protocol, TBinaryProtocol.
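As a concrete illustration of note 1, a client that talks to one of the non-blocking servers covered later (TNonblockingServer, THsHaServer, TThreadedSelectorServer) has to frame its messages with TFramedTransport. Below is a minimal sketch, assuming such a server is listening on port 8090; the class name and port are illustrative only:

```java
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class FramedRpcClient {
    public static void main(String[] args) throws Exception {
        // TFramedTransport wraps the socket: non-blocking servers require framed messages.
        TTransport transport = new TFramedTransport(new TSocket("127.0.0.1", 8090));
        transport.open();
        // The protocol must also match the server side (binary here).
        TProtocol protocol = new TBinaryProtocol(transport);
        HelloService.Client client = new HelloService.Client(protocol);
        System.out.println(client.sayHello("{\"wewe\":\"111\"}").getDesc());
        transport.close();
    }
}
```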
The Thrift protocol stack is shown in the figure below:
Underlying I/O module: performs the actual data transfer; it can be a socket, a file, a compressed data stream, and so on.
TTransport: defines how messages travel between client and server, sending and receiving them as byte streams. Different TTransport subclasses carry the Thrift byte stream over different I/O modules; for example, TSocket handles socket transport and TFileTransport handles file transport.
TProtocol: defines how messages are serialized, i.e. how structured data (objects, structs, and so on) is converted to and from byte-stream messages. On the client side it assembles structured data into a byte-stream message; on the server side it extracts structured data from the byte-stream message. Different TProtocol subclasses implement different message formats; for example, TBinaryProtocol implements the binary format.
TServer: accepts client requests and forwards them to the Processor. The TServer subclasses differ in how they are implemented, and their performance differs widely.
Processor: handles client requests and returns responses, covering RPC request dispatch, parameter parsing, and invocation of user-defined code. The Processor code is generated by Thrift from the IDL file; the user only has to implement the generated interface with business logic. The Processor is the point where the Thrift framework hands control over to user code.
ServiceClient: issues RPC requests on the client side. Like the Processor, this code is generated by Thrift from the IDL file.
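To make the split between TTransport and TProtocol concrete, the sketch below serializes the generated ResultCommon struct into an in-memory transport and reads it back. TMemoryBuffer stands in for the underlying I/O module, write/read are the methods Thrift generates on every struct, and the 64-byte buffer size is an arbitrary choice:

```java
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TMemoryBuffer;

public class ProtocolStackDemo {
    public static void main(String[] args) throws Exception {
        // TTransport layer: an in-memory byte buffer instead of a socket or file.
        TMemoryBuffer transport = new TMemoryBuffer(64);
        // TProtocol layer: binary encoding on top of that transport.
        TProtocol protocol = new TBinaryProtocol(transport);

        ResultCommon outgoing = new ResultCommon();
        outgoing.setResultCode(0);
        outgoing.setDesc("Hello World!");
        outgoing.write(protocol);   // struct -> byte stream

        ResultCommon incoming = new ResultCommon();
        incoming.read(protocol);    // byte stream -> struct
        System.out.println(incoming.getDesc());
    }
}
```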
TServer is mainly responsible for accepting client requests and forwarding them to the Processor. The TServer class hierarchy is shown in the figure below:
Thrift ships several TServer implementations; they use different threading models and suit different situations.
TSimpleServer: single-threaded server with blocking I/O, mainly used for testing;
TThreadPoolServer: multi-threaded server with blocking I/O; the threads come from a ThreadPoolExecutor from the Java concurrency package.
AbstractNonblockingServer: an abstract class providing the methods and classes shared by the non-blocking I/O servers.
TNonblockingServer: single-threaded server based on multiplexed (non-blocking) I/O; it depends on TFramedTransport;
THsHaServer: half-sync/half-async server that invokes business logic on a thread pool; it likewise depends on TFramedTransport;
TThreadedSelectorServer: half-sync/half-async server; it depends on TFramedTransport.
The implementation of each TServer is analyzed in detail below.
TSimpleServer handles only one connection at a time; it goes back to accept a new connection only after the client has closed the current one. Because it does all of this in a single thread with blocking I/O, it can serve only one client connection, and every other client has to wait until the server accepts it. TSimpleServer is very inefficient and must not be used in production. Its mechanics are analyzed through the serve() source code further below.
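First, for reference, a minimal sketch of standing up the HelloService on a TSimpleServer; the port and class name are illustrative and not taken from the original text:

```java
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TServerSocket;

public class SimpleRpcServer {
    public static void main(String[] args) throws Exception {
        TServerSocket serverTransport = new TServerSocket(9090);
        TServer.Args serverArgs = new TServer.Args(serverTransport);
        serverArgs.processor(new HelloService.Processor<HelloService.Iface>(new HelloHandler()));
        serverArgs.protocolFactory(new TBinaryProtocol.Factory());
        // Single-threaded and blocking: fine for local testing, never for production.
        TServer server = new TSimpleServer(serverArgs);
        server.serve();
    }
}
```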
public void serve() {
    stopped_ = false;
    try {
        // Start listening on the server socket
        serverTransport_.listen();
    } catch (TTransportException ttx) {
        LOGGER.error("Error occurred during listening.", ttx);
        return;
    }
    setServing(true); // mark the server as serving
    // Only one socket connection can be handled at a time
    while (!stopped_) {
        TTransport client = null;
        TProcessor processor = null;
        TTransport inputTransport = null;
        TTransport outputTransport = null;
        TProtocol inputProtocol = null;
        TProtocol outputProtocol = null;
        try {
            client = serverTransport_.accept(); // accept a connection; blocks until one arrives
            if (client != null) {
                processor = processorFactory_.getProcessor(client);
                inputTransport = inputTransportFactory_.getTransport(client);
                outputTransport = outputTransportFactory_.getTransport(client);
                inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
                outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
                // Keep processing requests on this connection until it is closed
                while (processor.process(inputProtocol, outputProtocol)) {}
            }
        } catch (TTransportException ttx) {
            // Client died, just move on
        } catch (TException tx) {
            if (!stopped_) {
                LOGGER.error("Thrift error occurred during processing of message.", tx);
            }
        } catch (Exception x) {
            if (!stopped_) {
                LOGGER.error("Error occurred during processing of message.", x);
            }
        }
        if (inputTransport != null) {
            inputTransport.close();
        }
        if (outputTransport != null) {
            outputTransport.close();
        }
    }
    setServing(false);
}
From the source code, TSimpleServer's processing flow is as follows:
TThreadPoolServer is also based on the blocking I/O model; unlike TSimpleServer, it uses a thread pool to improve throughput.
TThreadPoolServer's constructor is shown below. It uses the JDK's ThreadPoolExecutor with a configurable maximum thread count (Integer.MAX_VALUE by default) and minimum thread count (5 by default). The pool's work queue is a SynchronousQueue, in which every put must wait for a matching take; when that condition is not met, the put or take blocks.
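A small stand-alone sketch (plain JDK code, not part of Thrift) of what this queue choice implies: a ThreadPoolExecutor backed by a SynchronousQueue never queues work, so once maxWorkerThreads threads are busy the next hand-off is rejected:

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueDemo {
    public static void main(String[] args) {
        // Same shape as TThreadPoolServer's pool: min 1 thread, max 2 threads, SynchronousQueue.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        Runnable longTask = () -> {
            try { Thread.sleep(5_000); } catch (InterruptedException ignored) { }
        };
        pool.execute(longTask); // handed to worker thread 1
        pool.execute(longTask); // no idle worker, pool below max -> worker thread 2 is created
        try {
            pool.execute(longTask); // no idle worker and nothing can be queued
        } catch (RejectedExecutionException e) {
            System.out.println("third task rejected: every worker is bound to a task");
        }
        pool.shutdown();
    }
}
```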
// Executor service for handling client connections
private ExecutorService executorService_;

// Maximum time to wait when stopping the server
private final TimeUnit stopTimeoutUnit;
private final long stopTimeoutVal;

public TThreadPoolServer(Args args) {
    super(args);
    // Synchronous hand-off queue: every put must wait for a take; it has no capacity
    // and is typically used to hand a single element between threads
    SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<Runnable>();
    stopTimeoutUnit = args.stopTimeoutUnit;
    stopTimeoutVal = args.stopTimeoutVal;
    // Initialize the thread pool
    executorService_ = new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads,
            60, TimeUnit.SECONDS, executorQueue);
}
Next, look at TThreadPoolServer's serve() method. The main thread does nothing but accept connections; as soon as one is accepted, the client connection is handed to a worker thread inside the ThreadPoolExecutor and the main thread goes back to accepting the next one. Because the pool's work queue is a SynchronousQueue, the maximum number of client connections TThreadPoolServer can sustain equals the number of threads in the pool; in other words, every client connection occupies one thread. Be aware that when the number of concurrent client connections is very large, the number of server threads grows accordingly, which may cause performance problems on the server.
public void serve() { try { //啓動監聽Socket serverTransport_.listen(); } catch (TTransportException ttx) { LOGGER.error("Error occurred during listening.", ttx); return; } stopped_ = false; setServing(true); //若是Server沒有被中止,就一直循環 while (!stopped_) { int failureCount = 0; try { //阻塞方式接收Client鏈接請求,每收到一個Client鏈接請求就新建一個Worker,放入線程池處理該鏈接的業務 TTransport client = serverTransport_.accept(); WorkerProcess wp = new WorkerProcess(client); executorService_.execute(wp); } catch (TTransportException ttx) { if (!stopped_) { ++failureCount; LOGGER.warn("Transport error occurred during acceptance of message.", ttx); } } } //Server中止,關閉線程池 executorService_.shutdown(); // Loop until awaitTermination finally does return without a interrupted // exception. If we don't do this, then we'll shut down prematurely. We want // to let the executorService clear it's task queue, closing client sockets // appropriately. //在timeoutMS時間內,循環直到完成調用awaitTermination方法。防止過早的關閉線程池,關閉遺留的client sockets。 long timeoutMS = stopTimeoutUnit.toMillis(stopTimeoutVal); long now = System.currentTimeMillis(); while (timeoutMS >= 0) { try { //awaitTermination方法調用會被阻塞,直到全部任務執行完畢而且shutdown請求被調用,或者參數中定義的timeout時間到達或者當前線程被中斷 executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); break; } catch (InterruptedException ix) { //若是發生中斷異常,繼續循環 long newnow = System.currentTimeMillis(); timeoutMS -= (newnow - now); now = newnow; } } setServing(false); }
Finally, look at the WorkerProcess class, an inner class of TThreadPoolServer. Each WorkerProcess thread is bound to one particular client connection and handles requests on that connection until it closes; once the connection closes, the worker thread returns to the pool.
private class WorkerProcess implements Runnable { private TTransport client_; private WorkerProcess(TTransport client) { client_ = client; } public void run() { TProcessor processor = null; TTransport inputTransport = null; TTransport outputTransport = null; TProtocol inputProtocol = null; TProtocol outputProtocol = null; try { processor = processorFactory_.getProcessor(client_); inputTransport = inputTransportFactory_.getTransport(client_); outputTransport = outputTransportFactory_.getTransport(client_); inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); // we check stopped_ first to make sure we're not supposed to be shutting // down. this is necessary for graceful shutdown. //循環處理該Client鏈接的請求,除非Server關閉或鏈接異常不然一直循環 while (!stopped_ && processor.process(inputProtocol, outputProtocol)) {} } catch (TTransportException ttx) { // Assume the client died and continue silently } catch (TException tx) { LOGGER.error("Thrift error occurred during processing of message.", tx); } catch (Exception x) { LOGGER.error("Error occurred during processing of message.", x); } //關閉inputTransport和outputTransport if (inputTransport != null) { inputTransport.close(); } if (outputTransport != null) { outputTransport.close(); } } }
TThreadPoolServer's processing flow is shown in the flow chart below:
AbstractNonblockingServer is the parent class of the non-blocking I/O TServers and provides their shared methods and classes. First, look at its mechanics through the source code. The rough startup sequence is startThreads() -> startListening() -> setServing(true) -> waitForShutdown(); the details depend on the concrete AbstractNonblockingServer subclass. The implementation is built on Java NIO (the multiplexed I/O model).
public abstract class AbstractNonblockingServer extends TServer { protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName()); public static abstract class AbstractNonblockingServerArgs<T extends AbstractNonblockingServerArgs<T>> extends AbstractServerArgs<T> { //讀緩衝區的最大字節數 public long maxReadBufferBytes = Long.MAX_VALUE; //設置父類inputTransportFactory_、outputTransportFactory_對象 public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) { super(transport); transportFactory(new TFramedTransport.Factory()); } } private final long MAX_READ_BUFFER_BYTES; //已分配讀緩存字節數 private final AtomicLong readBufferBytesAllocated = new AtomicLong(0); public AbstractNonblockingServer(AbstractNonblockingServerArgs args) { super(args); MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes; } /** * Begin accepting connections and processing invocations. */ public void serve() { // start any IO threads 啓動IO線程 if (!startThreads()) { return; } // start listening, or exit 開啓監聽端口,接收Client請求 if (!startListening()) { return; } setServing(true); //置狀態爲服務中 // this will block while we serve waitForShutdown(); //啓動服務後的阻塞方法,Server中止後會解除阻塞 setServing(false); //置狀態爲服務結束 // do a little cleanup stopListening(); //中止監聽端口 } /** * Starts any threads required for serving. * * @return true if everything went ok, false if threads could not be started. */ protected abstract boolean startThreads();//啓動IO線程,由子類實現 /** * A method that will block until when threads handling the serving have been * shut down. */ protected abstract void waitForShutdown();//啓動服務後的阻塞方法,Server中止後會解除阻塞,由子類實現 //開啓監聽端口 protected boolean startListening() { try { serverTransport_.listen(); return true; } catch (TTransportException ttx) { LOGGER.error("Failed to start listening on server socket!", ttx); return false; } } //中止監聽端口 protected void stopListening() { serverTransport_.close(); } /** * Perform an invocation. This method could behave several different ways - * invoke immediately inline, queue for separate execution, etc. * * @return true if invocation was successfully requested, which is not a * guarantee that invocation has completed. False if the request * failed. */ protected abstract boolean requestInvoke(FrameBuffer frameBuffer);//對frameBuffer執行業務邏輯,由子類實現 }
FrameBuffer, an inner class of AbstractNonblockingServer, is the core class through which the non-blocking TServers read and write data. A FrameBuffer moves through several states and behaves differently in each, so first look at the FrameBufferState enum.
private enum FrameBufferState {
    // in the midst of reading the frame size off the wire
    READING_FRAME_SIZE,
    // reading the actual frame data now, but not all the way done yet
    READING_FRAME,
    // completely read the frame, so an invocation can now happen
    READ_FRAME_COMPLETE,
    // the invocation is done; waiting to get switched to listening for write events
    AWAITING_REGISTER_WRITE,
    // started writing response data, not fully complete yet
    WRITING,
    // another thread wants this framebuffer to go back to reading;
    // once re-registered for reads it becomes READING_FRAME_SIZE again
    AWAITING_REGISTER_READ,
    // we want our transport and selection key invalidated in the selector thread;
    // this state is entered when any of the states above fails, and the selector
    // closes the connection on its next pass
    AWAITING_CLOSE
}
If the client expects a response, the FrameBuffer state sequence is: READING_FRAME_SIZE -> READING_FRAME -> READ_FRAME_COMPLETE -> AWAITING_REGISTER_WRITE -> WRITING -> AWAITING_REGISTER_READ -> READING_FRAME_SIZE;
If the client does not expect a response, the sequence is: READING_FRAME_SIZE -> READING_FRAME -> READ_FRAME_COMPLETE -> AWAITING_REGISTER_READ -> READING_FRAME_SIZE;
If an exception occurs in any of these states, the FrameBuffer transitions to AWAITING_CLOSE.
The FrameBuffer source is analyzed below. A FrameBuffer is bound to a SelectionKey; it reads data from the client, invokes the business logic, writes the response back to the client, and manages the changes to the interest set registered on its SelectionKey.
protected class FrameBuffer { // the actual transport hooked up to the client. private final TNonblockingTransport trans_;//與客戶端創建的鏈接,具體的實現是TNonblockingSocket // the SelectionKey that corresponds to our transport private final SelectionKey selectionKey_;//該FrameBuffer對象關聯的SelectionKey對象 // the SelectThread that owns the registration of our transport private final AbstractSelectThread selectThread_;//該FrameBuffer對象所屬的selectThread_線程 // where in the process of reading/writing are we? private FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE;//該FrameBuffer對象的狀態 // the ByteBuffer we'll be using to write and read, depending on the state private ByteBuffer buffer_;//讀寫數據時使用的buffer,Java NIO private TByteArrayOutputStream response_;//執行完業務邏輯後,保存在本地的結果 public FrameBuffer(final TNonblockingTransport trans, final SelectionKey selectionKey, final AbstractSelectThread selectThread) { trans_ = trans; selectionKey_ = selectionKey; selectThread_ = selectThread; buffer_ = ByteBuffer.allocate(4);//由於TFramedTransport的frameSize爲4-byte,因此分配4字節 } /** * Give this FrameBuffer a chance to read. The selector loop should have * received a read event for this FrameBuffer. * * @return true if the connection should live on, false if it should be * closed */ //讀取一次數據,若是狀態爲READING_FRAME_SIZE,則讀取FrameSize;若是狀態爲READING_FRAME,則讀數據 public boolean read() { if (state_ == FrameBufferState.READING_FRAME_SIZE) { // try to read the frame size completely //從trans_讀取數據到buffer_中,數據大小小於等於Framesize if (!internalRead()) { return false; } // if the frame size has been read completely, then prepare to read the // actual frame. //remaining()返回buffer_剩餘的可用長度,返回0表明buffer_的4字節緩存已經被佔滿,即讀完了FrameSize if (buffer_.remaining() == 0) { // pull out the frame size as an integer. int frameSize = buffer_.getInt(0);//轉化爲Int型frameSize //對frameSize進行校驗 if (frameSize <= 0) { LOGGER.error("Read an invalid frame size of " + frameSize + ". Are you using TFramedTransport on the client side?"); return false; } // if this frame will always be too large for this server, log the // error and close the connection. if (frameSize > MAX_READ_BUFFER_BYTES) { LOGGER.error("Read a frame size of " + frameSize + ", which is bigger than the maximum allowable buffer size for ALL connections."); return false; } // if this frame will push us over the memory limit, then return. // with luck, more memory will free up the next time around. // 超出已分配讀緩存字節數,返回true,等待下次讀取 if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) { return true; } // increment the amount of memory allocated to read buffers已分配讀緩存字節數增長frameSize readBufferBytesAllocated.addAndGet(frameSize); // reallocate the readbuffer as a frame-sized buffer //frameSize經過校驗後,從新爲buffer_分配frameSize大小的緩存空間,讀取真實數據時使用 buffer_ = ByteBuffer.allocate(frameSize); //frameSize經過校驗後,將狀態改成READING_FRAME,接着讀真實數據 state_ = FrameBufferState.READING_FRAME; } else { // this skips the check of READING_FRAME state below, since we can't // possibly go on to that state if there's data left to be read at // this one. //buffer_還有剩餘空間,即尚未讀完FrameSize,返回true,下次繼續讀 return true; } } // it is possible to fall through from the READING_FRAME_SIZE section // to READING_FRAME if there's already some frame data available once // READING_FRAME_SIZE is complete. if (state_ == FrameBufferState.READING_FRAME) { if (!internalRead()) { return false; } // since we're already in the select loop here for sure, we can just // modify our selection key directly. 
//此時的buffer_大小爲frameSize,當==0時,說明數據讀取完成 if (buffer_.remaining() == 0) { // get rid of the read select interests //註銷掉當前FrameBuffer關聯的selectionKey_的read事件 selectionKey_.interestOps(0); //修改狀態爲READ_FRAME_COMPLETE state_ = FrameBufferState.READ_FRAME_COMPLETE; } //數據讀取沒有完成,返回true下次繼續讀取 return true; } // if we fall through to this point, then the state must be invalid. LOGGER.error("Read was called but state is invalid (" + state_ + ")"); return false; } /** * Give this FrameBuffer a chance to write its output to the final client.寫數據 */ public boolean write() { if (state_ == FrameBufferState.WRITING) { try { //將buffer_中的數據寫入客戶端trans_ if (trans_.write(buffer_) < 0) { return false; } } catch (IOException e) { LOGGER.warn("Got an IOException during write!", e); return false; } // we're done writing. now we need to switch back to reading. if (buffer_.remaining() == 0) { prepareRead();//已經write完成,準備切換爲讀模式 } return true; } LOGGER.error("Write was called, but state is invalid (" + state_ + ")"); return false; } /** * Give this FrameBuffer a chance to set its interest to write, once data * has come in. 修改selectionKey_的事件,當狀態爲AWAITING_狀態時調用, */ public void changeSelectInterests() { if (state_ == FrameBufferState.AWAITING_REGISTER_WRITE) { // set the OP_WRITE interest selectionKey_.interestOps(SelectionKey.OP_WRITE); state_ = FrameBufferState.WRITING; } else if (state_ == FrameBufferState.AWAITING_REGISTER_READ) { prepareRead(); } else if (state_ == FrameBufferState.AWAITING_CLOSE) { close(); selectionKey_.cancel(); } else { LOGGER.error("changeSelectInterest was called, but state is invalid (" + state_ + ")"); } } /** * Shut the connection down. 關閉當前FrameBuffer */ public void close() { // if we're being closed due to an error, we might have allocated a // buffer that we need to subtract for our memory accounting. if (state_ == FrameBufferState.READING_FRAME || state_ == FrameBufferState.READ_FRAME_COMPLETE) { readBufferBytesAllocated.addAndGet(-buffer_.array().length); } trans_.close(); } /** * Check if this FrameBuffer has a full frame read. */ public boolean isFrameFullyRead() { return state_ == FrameBufferState.READ_FRAME_COMPLETE; } /** * After the processor has processed the invocation, whatever thread is * managing invocations should call this method on this FrameBuffer so we * know it's time to start trying to write again. Also, if it turns out that * there actually isn't any data in the response buffer, we'll skip trying * to write and instead go back to reading. */ //準備返回結果 public void responseReady() { // the read buffer is definitely no longer in use, so we will decrement // our read buffer count. we do this here as well as in close because // we'd like to free this read memory up as quickly as possible for other // clients. // 此時已完成調用,釋放讀緩存 readBufferBytesAllocated.addAndGet(-buffer_.array().length); if (response_.len() == 0) { // go straight to reading again. this was probably an oneway method // 不須要返回結果,直接將狀態置爲AWAITING_REGISTER_READ,準備進行下次讀取操做 state_ = FrameBufferState.AWAITING_REGISTER_READ; buffer_ = null; } else { //將返回數據寫入buffer_ buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len()); // set state that we're waiting to be switched to write. we do this // asynchronously through requestSelectInterestChange() because there is // a possibility that we're not in the main thread, and thus currently // blocked in select(). (this functionality is in place for the sake of // the HsHa server.) 
//狀態置爲AWAITING_REGISTER_WRITE,準備寫回數據 state_ = FrameBufferState.AWAITING_REGISTER_WRITE; } //請求註冊selector事件變化 requestSelectInterestChange(); } /** * Actually invoke the method signified by this FrameBuffer. * 調用業務邏輯的方法 */ public void invoke() { TTransport inTrans = getInputTransport(); TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans); TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport()); try { //執行業務邏輯 processorFactory_.getProcessor(inTrans).process(inProt, outProt); //準被返回數據 responseReady(); return; } catch (TException te) { LOGGER.warn("Exception while invoking!", te); } catch (Throwable t) { LOGGER.error("Unexpected throwable while invoking!", t); } // This will only be reached when there is a throwable. state_ = FrameBufferState.AWAITING_CLOSE; requestSelectInterestChange(); } /** * Wrap the read buffer in a memory-based transport so a processor can read * the data it needs to handle an invocation. */ private TTransport getInputTransport() { return new TMemoryInputTransport(buffer_.array()); } /** * Get the transport that should be used by the invoker for responding. */ private TTransport getOutputTransport() { response_ = new TByteArrayOutputStream(); return outputTransportFactory_.getTransport(new TIOStreamTransport(response_)); } /** * Perform a read into buffer. * 從trans_讀取數據到buffer_中 * @return true if the read succeeded, false if there was an error or the * connection closed. */ private boolean internalRead() { try { if (trans_.read(buffer_) < 0) { return false; } return true; } catch (IOException e) { LOGGER.warn("Got an IOException in internalRead!", e); return false; } } /** * We're done writing, so reset our interest ops and change state * accordingly. */ private void prepareRead() { // we can set our interest directly without using the queue because // we're in the select thread. 註冊讀事件 selectionKey_.interestOps(SelectionKey.OP_READ); // get ready for another go-around buffer_ = ByteBuffer.allocate(4);//分配4字節緩存 state_ = FrameBufferState.READING_FRAME_SIZE;//狀態置爲READING_FRAME_SIZE } /** * When this FrameBuffer needs to change its select interests and execution * might not be in its select thread, then this method will make sure the * interest change gets done when the select thread wakes back up. When the * current thread is this FrameBuffer's select thread, then it just does the * interest change immediately. */ private void requestSelectInterestChange() { if (Thread.currentThread() == this.selectThread_) { changeSelectInterests(); } else { this.selectThread_.requestSelectInterestChange(this); } } }
AbstractSelectThread is the thread class that performs the Selector-based non-blocking reads and writes; its source is analyzed below:
protected abstract class AbstractSelectThread extends Thread { protected final Selector selector; // List of FrameBuffers that want to change their selection interests. // 當FrameBuffer須要修改已註冊到selector的事件時,要把該FrameBuffer加入這個集合 protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>(); public AbstractSelectThread() throws IOException { this.selector = SelectorProvider.provider().openSelector(); } /** * If the selector is blocked, wake it up. 喚醒selector */ public void wakeupSelector() { selector.wakeup(); } /** * Add FrameBuffer to the list of select interest changes and wake up the * selector if it's blocked. When the select() call exits, it'll give the * FrameBuffer a chance to change its interests. * 將frameBuffer加入selectInterestChanges集合 */ public void requestSelectInterestChange(FrameBuffer frameBuffer) { synchronized (selectInterestChanges) { selectInterestChanges.add(frameBuffer); } // wakeup the selector, if it's currently blocked. selector.wakeup(); } /** * Check to see if there are any FrameBuffers that have switched their * interest type from read to write or vice versa. * 檢查是否有須要改變註冊事件的FrameBuffer */ protected void processInterestChanges() { synchronized (selectInterestChanges) { for (FrameBuffer fb : selectInterestChanges) { fb.changeSelectInterests(); } selectInterestChanges.clear(); } } /** * Do the work required to read from a readable client. If the frame is * fully read, then invoke the method call. * 讀取Client數據,若是已經讀取完成則調用業務邏輯 */ protected void handleRead(SelectionKey key) { FrameBuffer buffer = (FrameBuffer) key.attachment(); if (!buffer.read()) { //讀取失敗則清除鏈接 cleanupSelectionKey(key); return; } // if the buffer's frame read is complete, invoke the method. if (buffer.isFrameFullyRead()) { if (!requestInvoke(buffer)) { //調用失敗則清除鏈接 cleanupSelectionKey(key); } } } /** * Let a writable client get written, if there's data to be written. * 向Client返回數據 */ protected void handleWrite(SelectionKey key) { FrameBuffer buffer = (FrameBuffer) key.attachment(); if (!buffer.write()) { //寫入失敗則清除鏈接 cleanupSelectionKey(key); } } /** * Do connection-close cleanup on a given SelectionKey. * 關閉鏈接 */ protected void cleanupSelectionKey(SelectionKey key) { // remove the records from the two maps FrameBuffer buffer = (FrameBuffer) key.attachment(); if (buffer != null) { // close the buffer buffer.close(); } // cancel the selection key key.cancel(); } }
Summary: AbstractNonblockingServer, FrameBuffer, and AbstractSelectThread are the three classes at the heart of the non-blocking TServers; their relationship is shown in the figure below.
AbstractSelectThread's handleRead(SelectionKey key), processInterestChanges(), and handleWrite(SelectionKey key) are the entry points called by its subclasses. Following a single request through the flow:
1.1 When a subclass calls handleRead(SelectionKey key), read() is called on the FrameBuffer attached to the SelectionKey. A single read() may not consume the whole frame; it can take several handleRead calls before the frame is fully read, at which point the state becomes READ_FRAME_COMPLETE and isFrameFullyRead() finally passes.
1.2 Once the frame has been read, the subclass's requestInvoke(buffer) method is called, which ultimately calls back into FrameBuffer.invoke() to run the business logic.
1.3 After the invocation finishes, the FrameBuffer moves into AWAITING_REGISTER_WRITE or AWAITING_REGISTER_READ and then asks for the Selector's interest set to be changed. requestSelectInterestChange() checks whether the current thread is the owning select thread because the non-blocking servers come in single-threaded and multi-threaded variants; in the multi-threaded case the business logic runs on a thread pool, so the change is queued into the AbstractSelectThread's change set via AbstractSelectThread.requestSelectInterestChange(FrameBuffer frameBuffer).
2. When the subclass later calls processInterestChanges(), the registered events of the FrameBuffers in the change set are switched.
3. When the subclass calls handleWrite(SelectionKey key), write() is called on the FrameBuffer attached to the SelectionKey; as with read(), it may take several calls to finish writing, after which the state returns to READING_FRAME_SIZE.
Note: if a read or write fails inside handleRead or handleWrite, cleanupSelectionKey(SelectionKey key) is called to clean up the key and release the FrameBuffer's resources.
The figures and the explanation above are taken from http://blog.csdn.net/chen7253886/article/details/53024848
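For step 1.2, the task that the half-sync/half-async servers hand to their worker pool is essentially a thin wrapper around FrameBuffer.invoke(). The sketch below only illustrates that idea; the real class is org.apache.thrift.server.Invocation, and FrameBuffer here stands for the server's protected inner class:

```java
// Simplified sketch of the worker-pool task used by THsHaServer / TThreadedSelectorServer.
// Its only job is to move the invoke() call off the select thread.
class InvocationSketch implements Runnable {
    private final FrameBuffer frameBuffer; // the server's inner FrameBuffer type

    InvocationSketch(FrameBuffer frameBuffer) {
        this.frameBuffer = frameBuffer;
    }

    @Override
    public void run() {
        // Runs the Processor and prepares the response; this ends with
        // responseReady() -> requestSelectInterestChange() back on the select thread.
        frameBuffer.invoke();
    }
}
```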
TNonblockingServer is one concrete implementation of the non-blocking AbstractNonblockingServer; it handles all I/O events in a single thread. Every socket is registered with a Selector, and a single thread loops over the Selector, checking for and handling ready events. Like TSimpleServer it is single-threaded, but unlike the blocking TSimpleServer it can have multiple client connections open at the same time. Its source follows a short usage sketch below.
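Before looking at the source, a minimal sketch of starting the HelloService on a TNonblockingServer (port and class name illustrative). Note the non-blocking server socket, and that AbstractNonblockingServerArgs installs TFramedTransport.Factory by default, which is why clients must frame their messages:

```java
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TNonblockingServerSocket;

public class NonblockingRpcServer {
    public static void main(String[] args) throws Exception {
        // Non-blocking server transport, required by every AbstractNonblockingServer subclass.
        TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(8090);
        TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
        serverArgs.processor(new HelloService.Processor<HelloService.Iface>(new HelloHandler()));
        serverArgs.protocolFactory(new TBinaryProtocol.Factory());
        // One thread accepts connections, reads, invokes the Processor, and writes responses.
        TServer server = new TNonblockingServer(serverArgs);
        server.serve();
    }
}
```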
public class TNonblockingServer extends AbstractNonblockingServer { private SelectAcceptThread selectAcceptThread_; //開啓selectAcceptThread_處理Client鏈接和請求 @Override protected boolean startThreads() { try { //單線程SelectAcceptThread處理I/O selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport)serverTransport_); stopped_ = false; selectAcceptThread_.start(); return true; } catch (IOException e) { LOGGER.error("Failed to start selector thread!", e); return false; } } @Override protected void waitForShutdown() { joinSelector(); } //阻塞直到selectAcceptThread_退出 protected void joinSelector() { try { selectAcceptThread_.join(); } catch (InterruptedException e) { // for now, just silently ignore. technically this means we'll have less of // a graceful shutdown as a result. } } //關閉Server @Override public void stop() { stopped_ = true; if (selectAcceptThread_ != null) { selectAcceptThread_.wakeupSelector(); } } /** * Perform an invocation. This method could behave several different ways * - invoke immediately inline, queue for separate execution, etc. * 調用業務邏輯,在handleRead方法中讀取數據完成後會調用該方法 */ @Override protected boolean requestInvoke(FrameBuffer frameBuffer) { frameBuffer.invoke(); return true; } }
The SelectAcceptThread thread class is where the core I/O handling happens; SelectAcceptThread extends the abstract class AbstractSelectThread.
/** * The thread that will be doing all the selecting, managing new connections * and those that still need to be read. * 處理I/O事件的線程,繼承了抽象類AbstractSelectThread */ protected class SelectAcceptThread extends AbstractSelectThread { // The server transport on which new client transports will be accepted private final TNonblockingServerTransport serverTransport; /** * Set up the thread that will handle the non-blocking accepts, reads, and * writes. */ public SelectAcceptThread(final TNonblockingServerTransport serverTransport) throws IOException { this.serverTransport = serverTransport; //註冊serverSocketChannel到selector,SelectionKey.OP_ACCEPT serverTransport.registerSelector(selector); } public boolean isStopped() { return stopped_; } /** * The work loop. Handles both selecting (all IO operations) and managing * the selection preferences of all existing connections. */ public void run() { //循環檢查selector是否有就緒的事件 try { while (!stopped_) { //檢查並處理IO事件 select(); //檢查是否有FrameBuffer須要修改他們的interest processInterestChanges(); } //服務關閉,清除全部的SelectionKey for (SelectionKey selectionKey : selector.keys()) { cleanupSelectionKey(selectionKey); } } catch (Throwable t) { LOGGER.error("run() exiting due to uncaught error", t); } finally { stopped_ = true; } } /** * Select and process IO events appropriately: * If there are connections to be accepted, accept them. * If there are existing connections with data waiting to be read, read it, * buffering until a whole frame has been read. * If there are any pending responses, buffer them until their target client * is available, and then send the data. * 檢查並處理I/O事件 */ private void select() { try { // wait for io events. 檢查是否有就緒的I/O操做,若是沒有則一直阻塞 selector.select(); // process the io events we received Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); while (!stopped_ && selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selectedKeys.remove(); // skip if not valid if (!key.isValid()) { //清除無效的SelectionKey cleanupSelectionKey(key); continue; } // if the key is marked Accept, then it has to be the server // transport. 對不一樣的事件作不一樣的處理 if (key.isAcceptable()) { handleAccept(); } else if (key.isReadable()) { // deal with reads 處理讀數據,調用AbstractSelectThread的handleRead方法。 handleRead(key); } else if (key.isWritable()) { // deal with writes 處理寫數據,調用AbstractSelectThread的handleWrite方法。 handleWrite(key); } else { LOGGER.warn("Unexpected state in select! " + key.interestOps()); } } } catch (IOException e) { LOGGER.warn("Got an IOException while selecting!", e); } } /** * Accept a new connection. Client創建鏈接 */ private void handleAccept() throws IOException { SelectionKey clientKey = null; TNonblockingTransport client = null; try { // accept the connection 創建與客戶端的鏈接,並將該鏈接註冊到selector的OP_READ事件 //在Java NIO中SelectionKey是跟蹤被註冊事件的句柄 client = (TNonblockingTransport)serverTransport.accept(); clientKey = client.registerSelector(selector, SelectionKey.OP_READ); // add this key to the map 每一個與客戶端的鏈接都對應一個FrameBuffer // FrameBuffer frameBuffer = new FrameBuffer(client, clientKey, SelectAcceptThread.this); //將frameBuffer附着到SelectionKey上,這樣就能方便的識別某個給定的通道 clientKey.attach(frameBuffer); } catch (TTransportException tte) { // something went wrong accepting. LOGGER.warn("Exception trying to accept!", tte); tte.printStackTrace(); if (clientKey != null) cleanupSelectionKey(clientKey); if (client != null) client.close(); } } }
From the source, TNonblockingServer's processing flow is as follows:
THsHaServer is a subclass of TNonblockingServer that overrides the requestInvoke() method. Whereas TNonblockingServer uses one thread both for the selector and for invoking the business logic, THsHaServer runs the business-logic invocations asynchronously on a thread pool, which is why it is called a half-sync/half-async server. Its source is accordingly simple.
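Usage first: a construction sketch for THsHaServer. Apart from the worker pool that runs requestInvoke() asynchronously, it is configured just like TNonblockingServer; the thread count is illustrative, and the exact worker-thread setter on Args differs slightly between libthrift versions:

```java
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TNonblockingServerSocket;

public class HsHaRpcServer {
    public static void main(String[] args) throws Exception {
        TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(8090);
        THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
        serverArgs.processor(new HelloService.Processor<HelloService.Iface>(new HelloHandler()));
        serverArgs.protocolFactory(new TBinaryProtocol.Factory());
        // Worker pool used for the asynchronous business-logic invocations.
        serverArgs.workerThreads(10);
        TServer server = new THsHaServer(serverArgs);
        server.serve();
    }
}
```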
public class THsHaServer extends TNonblockingServer { private final ExecutorService invoker;//處理業務邏輯調用的線程池 private final Args args; public THsHaServer(Args args) { super(args); //若是參數中沒有線程池則建立線程池 invoker = args.executorService == null ? createInvokerPool(args) : args.executorService; this.args = args; } @Override protected void waitForShutdown() { joinSelector();//Server關閉前一直阻塞 gracefullyShutdownInvokerPool(); } //建立線程池方法 protected static ExecutorService createInvokerPool(Args options) { int workerThreads = options.workerThreads; int stopTimeoutVal = options.stopTimeoutVal; TimeUnit stopTimeoutUnit = options.stopTimeoutUnit; LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); ExecutorService invoker = new ThreadPoolExecutor(workerThreads, workerThreads, stopTimeoutVal, stopTimeoutUnit, queue); return invoker; } //友好的關閉線程池 protected void gracefullyShutdownInvokerPool() { invoker.shutdown(); // Loop until awaitTermination finally does return without a interrupted // exception. If we don't do this, then we'll shut down prematurely. We want // to let the executorService clear it's task queue, closing client sockets // appropriately. long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal); long now = System.currentTimeMillis(); while (timeoutMS >= 0) { try { invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); break; } catch (InterruptedException ix) { long newnow = System.currentTimeMillis(); timeoutMS -= (newnow - now); now = newnow; } } } //重寫的業務邏輯調用的方法,使用線程池異步完成 @Override protected boolean requestInvoke(FrameBuffer frameBuffer) { try { Runnable invocation = getRunnable(frameBuffer); invoker.execute(invocation); return true; } catch (RejectedExecutionException rx) { LOGGER.warn("ExecutorService rejected execution!", rx); return false; } } protected Runnable getRunnable(FrameBuffer frameBuffer){ return new Invocation(frameBuffer); } }
THsHaServer's processing flow is as follows:
TThreadedSelectorServer is the other implementation of the non-blocking AbstractNonblockingServer and the most advanced form of TServer. Although THsHaServer invokes the business logic on a thread pool, all reading and writing of data still happens in a single thread, so when large amounts of data have to move between client and server, THsHaServer runs into performance problems. TThreadedSelectorServer makes the reading and writing multi-threaded as well. The model diagram explains the design (a configuration sketch follows the list below).
From the diagram above:
1) A single AcceptThread handles new client connections;
2) Multiple SelectorThread threads handle reading and writing data;
3) A single load balancer, SelectorThreadLoadBalancer, decides which SelectorThread each connection created by the AcceptThread is handed to;
4) An ExecutorService thread pool runs the business-logic invocations asynchronously.
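Here is a construction sketch that exercises all four roles above; the thread counts and port are illustrative only, and selectorThreads()/workerThreads() are the fluent setters exposed by TThreadedSelectorServer.Args:

```java
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TNonblockingServerSocket;

public class ThreadedSelectorRpcServer {
    public static void main(String[] args) throws Exception {
        TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(8090);
        TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverTransport);
        serverArgs.processor(new HelloService.Processor<HelloService.Iface>(new HelloHandler()));
        serverArgs.protocolFactory(new TBinaryProtocol.Factory());
        serverArgs.selectorThreads(4);   // SelectorThread threads doing network I/O
        serverArgs.workerThreads(16);    // pool that runs the business-logic invocations
        TServer server = new TThreadedSelectorServer(serverArgs);
        server.serve();
    }
}
```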
Turning to the source, first see which parameters TThreadedSelectorServer's Args class adds.
public static class Args extends AbstractNonblockingServerArgs<Args> {
    public int selectorThreads = 2; // number of SelectorThread threads
    // size of the business-logic thread pool; 0 means the logic runs directly on the SelectorThread
    private int workerThreads = 5;
    private int stopTimeoutVal = 60;
    private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
    private ExecutorService executorService = null; // thread pool for business-logic invocations
    private int acceptQueueSizePerThread = 4; // size of each SelectorThread's accept queue

    // Policy for handling new client connections
    public static enum AcceptPolicy {
        // An accepted connection must be registered via the thread pool; if the pool is full
        // the connection is closed immediately. Scheduling adds a little latency.
        FAIR_ACCEPT,
        // Accept as fast as possible, ignoring the state of the thread pool
        FAST_ACCEPT
    }

    // Fast accept by default
    private AcceptPolicy acceptPolicy = AcceptPolicy.FAST_ACCEPT;
}
Next, look at TThreadedSelectorServer's fields and its concrete implementations of the abstract methods of AbstractNonblockingServer.
public class TThreadedSelectorServer extends AbstractNonblockingServer { private volatile boolean stopped_ = true; private AcceptThread acceptThread; //處理Client新鏈接線程 private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>(); //處理讀寫數據的線程集合 private final ExecutorService invoker; //線程池 private final Args args; //構造函數,初始化Server public TThreadedSelectorServer(Args args) { super(args); args.validate(); invoker = args.executorService == null ? createDefaultExecutor(args) : args.executorService; this.args = args; } //啓動acceptThread和若干個selectorThreads @Override protected boolean startThreads() { try { for (int i = 0; i < args.selectorThreads; ++i) { selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread)); } acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_, createSelectorThreadLoadBalancer(selectorThreads)); stopped_ = false; for (SelectorThread thread : selectorThreads) { thread.start(); } acceptThread.start(); return true; } catch (IOException e) { LOGGER.error("Failed to start threads!", e); return false; } } //等待關閉Server @Override protected void waitForShutdown() { try { joinThreads(); //等待accept and selector threads都中止運行 } catch (InterruptedException e) { LOGGER.error("Interrupted while joining threads!", e); } //關閉回調業務邏輯的線程池 gracefullyShutdownInvokerPool(); } protected void joinThreads() throws InterruptedException { //accept and selector threads都中止運行前一直阻塞 acceptThread.join(); for (SelectorThread thread : selectorThreads) { thread.join(); } } //中止Server @Override public void stop() { stopped_ = true; stopListening(); //中止接收新請求 if (acceptThread != null) { //可能acceptThread處於阻塞中,喚醒acceptThread acceptThread.wakeupSelector(); } if (selectorThreads != null) { //可能selectorThreads處於阻塞中,喚醒selectorThreads for (SelectorThread thread : selectorThreads) { if (thread != null) thread.wakeupSelector(); } } } protected void gracefullyShutdownInvokerPool() { invoker.shutdown(); // Loop until awaitTermination finally does return without a interrupted // exception. If we don't do this, then we'll shut down prematurely. We want // to let the executorService clear it's task queue, closing client sockets // appropriately. long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal); long now = System.currentTimeMillis(); while (timeoutMS >= 0) { try { invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); break; } catch (InterruptedException ix) { long newnow = System.currentTimeMillis(); timeoutMS -= (newnow - now); now = newnow; } } } //業務邏輯調用,在handleRead方法讀取數據完成後調用 @Override protected boolean requestInvoke(FrameBuffer frameBuffer) { Runnable invocation = getRunnable(frameBuffer); if (invoker != null) { //放進線程池執行 try { invoker.execute(invocation); return true; } catch (RejectedExecutionException rx) { LOGGER.warn("ExecutorService rejected execution!", rx); return false; } } else { // 線程池爲null,在調用requestInvoke的線程(I/O線程)中執行 invocation.run(); return true; } } protected Runnable getRunnable(FrameBuffer frameBuffer) { return new Invocation(frameBuffer); } protected static ExecutorService createDefaultExecutor(Args options) { return (options.workerThreads > 0) ? Executors.newFixedThreadPool(options.workerThreads) : null; } private static BlockingQueue<TNonblockingTransport> createDefaultAcceptQueue(int queueSize) { if (queueSize == 0) { return new LinkedBlockingQueue<TNonblockingTransport>();//無界隊列 } return new ArrayBlockingQueue<TNonblockingTransport>(queueSize); } }
Finally, look at the two core classes, AcceptThread and SelectorThread.
AcceptThread accepts new connection requests from clients.
protected class AcceptThread extends Thread { private final TNonblockingServerTransport serverTransport;//監聽端口的ServerSocket private final Selector acceptSelector; private final SelectorThreadLoadBalancer threadChooser;//負責負載均衡 public AcceptThread(TNonblockingServerTransport serverTransport, SelectorThreadLoadBalancer threadChooser) throws IOException { this.serverTransport = serverTransport; this.threadChooser = threadChooser; //acceptSelector是AcceptThread專屬的,專門用於接收新鏈接使用,不要與處理I/O時的selector混淆 this.acceptSelector = SelectorProvider.provider().openSelector(); //將serverTransport註冊到Selector上接收OP_ACCEPT鏈接事件 this.serverTransport.registerSelector(acceptSelector); } public void run() { try { //不斷循環select() while (!stopped_) { select(); } } catch (Throwable t) { LOGGER.error("run() exiting due to uncaught error", t); } finally { TThreadedSelectorServer.this.stop();//調用Stop方法,喚醒SelectorThreads中的線程 } } //喚醒acceptSelector public void wakeupSelector() { acceptSelector.wakeup(); } private void select() { try { // wait for connect events. acceptSelector.select(); // process the io events we received Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator(); while (!stopped_ && selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selectedKeys.remove(); // skip if not valid if (!key.isValid()) { continue; } //處理鏈接請求 if (key.isAcceptable()) { handleAccept(); } else { LOGGER.warn("Unexpected state in select! " + key.interestOps()); } } } catch (IOException e) { LOGGER.warn("Got an IOException while selecting!", e); } } //處理鏈接請求 private void handleAccept() { final TNonblockingTransport client = doAccept();//新建鏈接 if (client != null) { //取出一個selector thread final SelectorThread targetThread = threadChooser.nextThread(); //當接收策略爲FAST_ACCEPT或invoker爲空時,直接將client扔給SelectorThread if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) { doAddAccept(targetThread, client); } else { //當接收策略爲FAIR_ACCEPT時,將doAddAccept任務扔到線程池處理 try { invoker.submit(new Runnable() { public void run() { doAddAccept(targetThread, client); } }); } catch (RejectedExecutionException rx) { LOGGER.warn("ExecutorService rejected accept registration!", rx); // 若是線程池invoker隊列滿,關閉該Client鏈接 client.close(); } } } } //接收新鏈接 private TNonblockingTransport doAccept() { try { return (TNonblockingTransport) serverTransport.accept(); } catch (TTransportException tte) { LOGGER.warn("Exception trying to accept!", tte); return null; } } //將新鏈接添加到SelectorThread的隊列中 private void doAddAccept(SelectorThread thread, TNonblockingTransport client) { if (!thread.addAcceptedConnection(client)) { client.close();//若是添加失敗,關閉client } } }
The SelectorThread threads read and write the data:
protected class SelectorThread extends AbstractSelectThread { private final BlockingQueue<TNonblockingTransport> acceptedQueue;//存放Client鏈接的阻塞隊列 public SelectorThread() throws IOException { this(new LinkedBlockingQueue<TNonblockingTransport>());//默認爲無界隊列 } public SelectorThread(int maxPendingAccepts) throws IOException { this(createDefaultAcceptQueue(maxPendingAccepts));//指定大小有界隊列 } public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue) throws IOException { this.acceptedQueue = acceptedQueue;//指定隊列 } //將鏈接添加到acceptedQueue,若是隊列滿將阻塞 public boolean addAcceptedConnection(TNonblockingTransport accepted) { try { acceptedQueue.put(accepted); } catch (InterruptedException e) { LOGGER.warn("Interrupted while adding accepted connection!", e); return false; } //某個線程調用select()方法後阻塞了,即便沒有通道就緒,wakeup()辦法也能讓其從select()方法返回 //喚醒selector,很重要,由於首次添加accepted時select()方法確定會一直阻塞,只有喚醒後才能執行processAcceptedConnections方法,將新鏈接註冊到註冊到selector,下次調用select()方法時就能夠檢測到該鏈接就緒的事件 selector.wakeup(); return true; } public void run() { try { while (!stopped_) { select();//若是沒有通道就緒,將阻塞 processAcceptedConnections();//處理新鏈接,註冊到selector processInterestChanges();//處理現有鏈接,註冊事件修改請求 } //Server關閉時的清理工做 for (SelectionKey selectionKey : selector.keys()) { cleanupSelectionKey(selectionKey); } } catch (Throwable t) { LOGGER.error("run() exiting due to uncaught error", t); } finally { // This will wake up the accept thread and the other selector threads TThreadedSelectorServer.this.stop(); } } /** * Select and process IO events appropriately: If there are existing * connections with data waiting to be read, read it, buffering until a * whole frame has been read. If there are any pending responses, buffer * them until their target client is available, and then send the data. */ private void select() { try { // wait for io events. selector.select();//每一個SelectorThread線程都有本身的selector // process the io events we received Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); while (!stopped_ && selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selectedKeys.remove(); // skip if not valid if (!key.isValid()) { cleanupSelectionKey(key); continue; } if (key.isReadable()) { // deal with reads handleRead(key); } else if (key.isWritable()) { // deal with writes handleWrite(key); } else { LOGGER.warn("Unexpected state in select! " + key.interestOps()); } } } catch (IOException e) { LOGGER.warn("Got an IOException while selecting!", e); } } private void processAcceptedConnections() { // Register accepted connections while (!stopped_) { TNonblockingTransport accepted = acceptedQueue.poll(); if (accepted == null) { break; } registerAccepted(accepted); } } //將accepted註冊到selector監聽OP_READ事件,並組裝FrameBuffer附着在SelectionKey上 private void registerAccepted(TNonblockingTransport accepted) { SelectionKey clientKey = null; try { clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ); FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey, SelectorThread.this); clientKey.attach(frameBuffer); } catch (IOException e) { LOGGER.warn("Failed to register accepted connection to selector!", e); if (clientKey != null) { cleanupSelectionKey(clientKey); } accepted.close(); } } }
| | Blocking I/O? | Accepting connections | I/O handling | Business-logic invocation | Characteristics | Suitable for |
| --- | --- | --- | --- | --- | --- | --- |
| TSimpleServer | blocking | single thread | single thread | single thread | One thread handles everything, so only one client connection is served at a time; the next connection is accepted only after the current client disconnects. | Testing only; must not be used in production. |
| TThreadPoolServer | blocking | single thread | thread pool | thread pool | A dedicated thread accepts connections; each accepted connection is handed to a worker thread in the ThreadPoolExecutor, and that worker stays bound to the connection until it closes, then returns to the pool. If the number of clients exceeds the maximum pool size, a new request blocks until a worker becomes free. | High performance; suitable when the number of concurrent client connections is not too high. |
| TNonblockingServer | non-blocking | single thread | single thread | single thread | Non-blocking I/O lets a single thread monitor many connections; all work is processed sequentially by the same thread that calls select(). | Simple business logic and relatively few client connections; not suited to high concurrency, since a single thread limits throughput. |
| THsHaServer | non-blocking | single thread | single thread | thread pool | Half-sync/half-async: one thread handles accepting connections and network I/O, and a separate worker pool handles the invocations. As long as a worker is free, a message is processed immediately, so multiple messages are handled in parallel. | Network I/O is not too heavy and fast business-logic invocation matters. |
| TThreadedSelectorServer | non-blocking | single thread | multiple threads | thread pool | Half-sync/half-async: several threads handle network I/O and a thread pool handles the business-logic invocations. When network I/O is the bottleneck, it outperforms THsHaServer. | Heavy network I/O, demanding business-logic invocation, high concurrency. |
In practice, production deployments generally choose between TThreadPoolServer and TThreadedSelectorServer. TThreadPoolServer's strengths are fast processing and short response times; its weakness is that it consumes a lot of system resources under high concurrency. TThreadedSelectorServer's strength is that it supports high concurrency; it does not process individual requests quite as fast as TThreadPoolServer, but in most cases it is still fast enough for the business.
This article covered basic usage of Thrift RPC, an overview of the protocol stack, and the principles and source code of the TServer implementations. The next article will cover the other major Thrift components, such as TProtocol and TTransport.