限於篇幅關係,在觀察源碼的時候,只列舉了部分源代碼html
TSimpleServer/TThreadPoolServer是阻塞服務模型
TNonblockingServer/THsHaServer/TThreadedSelectotServer是非阻塞服務模型(NIO)git
內部靜態類Args的定義, 用於TServer類用於串聯軟件棧(傳輸層, 協議層, 處理層)github
public abstract class TServer { public static class Args extends AbstractServerArgs<Args> { public Args(TServerTransport transport) { super(transport); } } public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> { public AbstractServerArgs(TServerTransport transport); public T processorFactory(TProcessorFactory factory); public T processor(TProcessor processor); public T transportFactory(TTransportFactory factory); public T protocolFactory(TProtocolFactory factory); } }
TServer類定義的抽象類數據庫
public abstract class TServer { public abstract void serve(); public void stop(); public boolean isServing(); public void setServerEventHandler(TServerEventHandler eventHandler); }
評註:服務器
抽象函數serve由具體的TServer實例來實現, 而並不是全部的服務都須要優雅的退出, 所以stop沒有被定義爲抽象網絡
TSimpleServer的工做模式採用最簡單的阻塞IO,實現方法簡潔明瞭,便於理解,可是一次只能接收和處理一個socket鏈接,效率比較低,主要用於演示Thrift的工做過程,在實際開發過程當中不多用到它。併發
工做方式如圖:負載均衡
抽象的代碼可簡單描述以下:socket
// *) server socket進行監聽 serverSocket.listen(); while ( isServing() ) { // *) 接受socket連接 client = serverSocket.accept(); // *) 封裝處理器 processor = factory.getProcess(client); while ( true ) { // *) 阻塞處理rpc的輸入/輸出 if ( !processor.process(input, output) ) { break; } } }
TThreadPoolServer模式採用阻塞socket方式工做,,主線程負責阻塞式監聽「監聽socket」中是否有新socket到來,業務處理交由一個線程池來處async
工做模式圖:
ThreadPoolServer解決了TSimple不支持併發和多鏈接的問題, 引入了線程池. 實現的模型是One Thread Per Connection
線程池代碼片斷:
private static ExecutorService createDefaultExecutorService(Args args) { SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<Runnable>(); return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, args.stopTimeoutVal, TimeUnit.SECONDS, executorQueue); }
評註:
採用同步隊列(SynchronousQueue), 線程池採用能線程數可伸縮的模式.
主線程循環簡單描述代碼:
setServing(true); while (!stopped_) { try { TTransport client = serverTransport_.accept(); WorkerProcess wp = new WorkerProcess(client); executorService_.execute(wp); } catch (TTransportException ttx) { } }
TThreadPoolServer模式優勢:
線程池模式中,拆分了監聽線程(accept)和處理客戶端鏈接的工做線程(worker),數據讀取和業務處理都交由線程池完成,主線程只負責監聽新鏈接,所以在併發量較大時新鏈接也可以被及時接受。線程池模式比較適合服務器端能預知最多有多少個客戶端併發的狀況,這時每一個請求都能被業務線程池及時處理,性能也很是高。
TThreadPoolServer模式缺點:
線程池模式的處理能力受限於線程池的工做能力,當併發請求數大於線程池中的線程數時,新請求也只能排隊等待
TNonblockingServer該模式也是單線程工做,可是採用NIO的模式, 藉助Channel/Selector機制, 採用IO事件模型來處理.
全部的socket都被註冊到selector中,在一個線程中經過seletor循環監控全部的socket,每次selector結束時,處理全部的處於就緒狀態的socket,對於有數據到來的socket進行數據讀取操做,對於有數據發送的socket則進行數據發送,對於監聽socket則產生一個新業務socket並將其註冊到selector中。
工做原理圖:
nio部分關鍵代碼以下:
private void select() { try { // wait for io events. selector.select(); // process the io events we received Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); while (!stopped_ && selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selectedKeys.remove(); // skip if not valid if (!key.isValid()) { cleanupSelectionKey(key); continue; } // if the key is marked Accept, then it has to be the server // transport. if (key.isAcceptable()) { handleAccept(); } else if (key.isReadable()) { // deal with reads handleRead(key); } else if (key.isWritable()) { // deal with writes handleWrite(key); } else { LOGGER.warn("Unexpected state in select! " + key.interestOps()); } } } catch (IOException e) { LOGGER.warn("Got an IOException while selecting!", e); } }
TNonblockingServer模式優勢:
相比於TSimpleServer效率提高主要體如今IO多路複用上,TNonblockingServer採用非阻塞IO,對accept/read/write等IO事件進行監控和處理,同時監控多個socket的狀態變化;
TNonblockingServer模式缺點:
TNonblockingServer模式在業務處理上仍是採用單線程順序來完成,在業務處理比較複雜、耗時的時候,例如某些接口函數須要讀取數據庫執行時間較長,會致使整個服務被阻塞住,此時該模式效率也不高,由於多個調用請求任務依然是順序一個接一個執行
鑑於TNonblockingServer的缺點, THsHaServer繼承TNonblockingServer,引入了線程池去處理, 其模型把讀寫任務放到線程池去處理.THsHaServer是Half-sync/Half-async的處理模式, Half-aysnc是在處理IO事件上(accept/read/write io), Half-sync用於handler對rpc的同步處理上.
工做模式圖:
/** * Helper to create an invoker pool */ protected static ExecutorService createInvokerPool(Args options) { int minWorkerThreads = options.minWorkerThreads; int maxWorkerThreads = options.maxWorkerThreads; int stopTimeoutVal = options.stopTimeoutVal; TimeUnit stopTimeoutUnit = options.stopTimeoutUnit; LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); ExecutorService invoker = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue); return invoker; }
THsHaServer的優勢:
與TNonblockingServer模式相比,THsHaServer在完成數據讀取以後,將業務處理過程交由一個線程池來完成,主線程直接返回進行下一次循環操做,效率大大提高;
THsHaServer的缺點:
主線程須要完成對全部socket的監聽以及數據讀寫的工做,當併發請求數較大時,且發送數據量較多時,監聽socket上新鏈接請求不能被及時接受。
TThreadedSelectorServer是對以上NonblockingServer的擴充, 其分離了Accept和Read/Write的Selector線程, 同時引入Worker工做線程池. 它也是種Half-sync/Half-async的服務模型
TThreadedSelectorServer模式是目前Thrift提供的最高級的模式,它內部有若是幾個部分構成:
(1) 一個AcceptThread線程對象,專門用於處理監聽socket上的新鏈接;
(2) 若干個SelectorThread對象專門用於處理業務socket的網絡I/O操做,全部網絡數據的讀寫均是有這些線程來完成;
(3) 一個負載均衡器SelectorThreadLoadBalancer對象,主要用於AcceptThread線程接收到一個新socket鏈接請求時,決定將這個新鏈接請求分配給哪一個SelectorThread線程。
(4) 一個ExecutorService類型的工做線程池,在SelectorThread線程中,監聽到有業務socket中有調用請求過來,則將請求讀取以後,交個ExecutorService線程池中的線程完成這次調用的具體執行;主要用於處理每一個rpc請求的handler回調處理(這部分是同步的).
工做模式圖:
TThreadedSelectorServer模式中有一個專門的線程AcceptThread用於處理新鏈接請求,所以可以及時響應大量併發鏈接請求;另外它將網絡I/O操做分散到多個SelectorThread線程中來完成,所以可以快速對網絡I/O進行讀寫操做,可以很好地應對網絡I/O較多的狀況
從accpect線程到selectorThreads關鍵代碼
protected boolean startThreads() { try { for (int i = 0; i < args.selectorThreads; ++i) { selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));//創建事件選擇線程池 } acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_, createSelectorThreadLoadBalancer(selectorThreads));//創建accept接受請求線程 for (SelectorThread thread : selectorThreads) { thread.start(); } acceptThread.start(); return true; } catch (IOException e) { LOGGER.error("Failed to start threads!", e); return false; } }
負載均衡器SelectorThreadLoadBalancer對象部分關鍵代碼:
protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(Collection<? extends SelectorThread> threads) { return new SelectorThreadLoadBalancer(threads); } /** * A round robin load balancer for choosing selector threads for new * connections. */ protected static class SelectorThreadLoadBalancer { private final Collection<? extends SelectorThread> threads; private Iterator<? extends SelectorThread> nextThreadIterator; public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) { if (threads.isEmpty()) { throw new IllegalArgumentException("At least one selector thread is required"); } this.threads = Collections.unmodifiableList(new ArrayList<T>(threads)); nextThreadIterator = this.threads.iterator(); } //根據循環負載均衡策略獲取一個SelectorThread public SelectorThread nextThread() { // Choose a selector thread (round robin) if (!nextThreadIterator.hasNext()) { nextThreadIterator = threads.iterator(); } return nextThreadIterator.next(); } }
從SelectorThread線程中,監聽到有業務socket中有調用請求,轉到業務工做線程池關鍵代碼
private void handleAccept() { final TNonblockingTransport client = doAccept();//取得客戶端的鏈接 if (client != null) { // Pass this connection to a selector thread final SelectorThread targetThread = threadChooser.nextThread();//獲取目標SelectorThread if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) { doAddAccept(targetThread, client); } else { // FAIR_ACCEPT try { invoker.submit(new Runnable() {// 提交client的業務給到工做線程 public void run() { doAddAccept(targetThread, client); } }); } catch (RejectedExecutionException rx) { LOGGER.warn("ExecutorService rejected accept registration!", rx); // close immediately client.close(); } } } }
demo地址:
碼雲:http://git.oschina.net/shunyang/thrift-all/tree/master/thrift-demo
github:https://github.com/shunyang/thrift-all/tree/master/thrift-demo
本文參考文章: