TServer 類用於串聯服務器的軟件棧(傳輸層、協議層、處理層);以下是 TServer 及其內部靜態類 Args 的定義。
public abstract class TServer { protected TProcessorFactory processorFactory_; protected TServerTransport serverTransport_; protected TTransportFactory inputTransportFactory_; protected TTransportFactory outputTransportFactory_; protected TProtocolFactory inputProtocolFactory_; protected TProtocolFactory outputProtocolFactory_; private boolean isServing; protected TServerEventHandler eventHandler_; protected volatile boolean stopped_ = false; protected TServer(TServer.AbstractServerArgs args) { this.processorFactory_ = args.processorFactory; this.serverTransport_ = args.serverTransport; this.inputTransportFactory_ = args.inputTransportFactory; this.outputTransportFactory_ = args.outputTransportFactory; this.inputProtocolFactory_ = args.inputProtocolFactory; this.outputProtocolFactory_ = args.outputProtocolFactory; } // 啓動服務 由具體的子類實現 public abstract void serve(); // 中止服務 public void stop() { } public abstract static class AbstractServerArgs<T extends TServer.AbstractServerArgs<T>> { final TServerTransport serverTransport; TProcessorFactory processorFactory; TTransportFactory inputTransportFactory = new TTransportFactory(); TTransportFactory outputTransportFactory = new TTransportFactory(); TProtocolFactory inputProtocolFactory = new Factory(); TProtocolFactory outputProtocolFactory = new Factory(); public AbstractServerArgs(TServerTransport transport) { this.serverTransport = transport; } } //內部靜態類Args的定義, 用於TServer類用於串聯軟件棧(傳輸層, 協議層, 處理層) public static class Args extends TServer.AbstractServerArgs<TServer.Args> { public Args(TServerTransport transport) { super(transport); } } }
serverTransport_.listen();//啓動監聽 即 new ServerSocket(port)
while (!stopped_) {//stopped 初始位false TTransport client = null; TProcessor processor = null; TTransport inputTransport = null; TTransport outputTransport = null; TProtocol inputProtocol = null; TProtocol outputProtocol = null; ServerContext connectionContext = null; try { client = serverTransport_.accept();//接收客戶端 即 ServerSocket.accept(); if (client != null) { processor = processorFactory_.getProcessor(client); inputTransport = inputTransportFactory_.getTransport(client); outputTransport = outputTransportFactory_.getTransport(client); inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); if (eventHandler_ != null) { connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol); } while (true) { if (eventHandler_ != null) { eventHandler_.processContext(connectionContext, inputTransport, outputTransport); } if(!processor.process(inputProtocol, outputProtocol)) {//處理輸入輸出 break; } } } } catch (TTransportException ttx) { // Client died, just move on } catch (TException tx) { if (!stopped_) { LOGGER.error("Thrift error occurred during processing of message.", tx); } } catch (Exception x) { if (!stopped_) { LOGGER.error("Error occurred during processing of message.", x); } } if (eventHandler_ != null) { eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol); } if (inputTransport != null) { inputTransport.close(); } if (outputTransport != null) { outputTransport.close(); } } setServing(false); }
// 建立線程池執行器 private static ExecutorService createDefaultExecutorService(Args args) { SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<Runnable>(); return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, args.stopTimeoutVal, args.stopTimeoutUnit, executorQueue); }
// Clear the stop flag and start listening before entering the accept loop.
stopped_ = false;
serverTransport_.listen();
while (!stopped_) {
  // NOTE(review): in this excerpt the try below has no catch/finally and the enclosing while
  // loop is never closed; the snippet appears truncated, confirm against the full source.
  try {
    // Accept a client and wrap it in a WorkerProcess task.
    TTransport client = serverTransport_.accept();
    WorkerProcess wp = new WorkerProcess(client);
    // Keep resubmitting the task until execute() returns without throwing.
    while(true) {
      try {
        executorService_.execute(wp);// i.e. ThreadPoolExecutor.execute(wp)
        break;
      } catch(Throwable t) {
        // NOTE(review): every Throwable is swallowed and the submission is retried at once,
        // with no logging or back-off visible here.
      }
    }
  }
executorService_.shutdown();
TNonblockingServer:該模式也是單線程工作,但是採用 NIO 的模式,藉助 Channel/Selector 機制,採用 IO 事件模型來處理。
/**
 * Thread body: until {@code stopped_} is set, alternates between {@code select()} and
 * {@code processInterestChanges()}.
 *
 * <p>Any {@link Throwable} escaping the loop ends it. It is logged rather than dropped, so an
 * unexpectedly dead selector loop leaves a trace.
 */
public void run() {
  try {
    while (!stopped_) {
      select();
      processInterestChanges();
    }
  } catch (Throwable t) {
    // The loop is over at this point; record why instead of failing silently.
    LOGGER.error("run() exiting due to uncaught error", t);
  }
}
private void select() { try { // wait for io events. selector.select(); // process the io events we received Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); while (!stopped_ && selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selectedKeys.remove(); // skip if not valid if (!key.isValid()) { cleanupSelectionKey(key); continue; } // if the key is marked Accept, then it has to be the server // transport. if (key.isAcceptable()) { handleAccept(); } else if (key.isReadable()) { // deal with reads handleRead(key); } else if (key.isWritable()) { // deal with writes handleWrite(key); } else { LOGGER.warn("Unexpected state in select! " + key.interestOps()); } } } catch (IOException e) { LOGGER.warn("Got an IOException while selecting!", e); } }
鑑於 TNonblockingServer 的缺點,THsHaServer 繼承 TNonblockingServer,引入了線程池去處理,其模型把讀寫任務放到線程池去處理。THsHaServer 是 Half-sync/Half-async 的處理模式:Half-async 用在處理 IO 事件上(accept/read/write io),Half-sync 用於 handler 對 rpc 的同步處理上。
/**
 * Helper to create an invoker pool.
 *
 * <p>Builds a {@link ThreadPoolExecutor} whose core and maximum sizes come from
 * {@code options.minWorkerThreads} and {@code options.maxWorkerThreads}, with
 * {@code options.stopTimeoutVal} / {@code options.stopTimeoutUnit} as keep-alive time and unit,
 * fed by a {@link LinkedBlockingQueue} created without a capacity bound.
 *
 * @param options server arguments supplying the pool sizing and timeout values
 * @return the newly created invoker pool
 */
protected static ExecutorService createInvokerPool(Args options) {
  // NOTE(review): per the ThreadPoolExecutor documentation, threads beyond the core size are
  // only added when the queue rejects a task, which a queue without a capacity bound does not.
  return new ThreadPoolExecutor(
      options.minWorkerThreads,
      options.maxWorkerThreads,
      options.stopTimeoutVal,
      options.stopTimeoutUnit,
      new LinkedBlockingQueue<Runnable>());
}
TThreadedSelectorServer 是對以上 TNonblockingServer 的擴充,其分離了 Accept 和 Read/Write 的 Selector 線程,同時引入 Worker 工作線程池。它也是一種 Half-sync/Half-async 的服務模型,主要由以下幾部分組成:
(1) 一個 AcceptThread 線程對象,專門用於處理監聽 socket 上的新連接;
(2) 若干個 SelectorThread 對象,專門用於處理業務 socket 的網絡 I/O 操作,所有網絡數據的讀寫均是由這些線程來完成;
(3) 一個負載均衡器 SelectorThreadLoadBalancer 對象,主要用於 AcceptThread 線程接收到一個新 socket 連接請求時,決定將這個新連接請求分配給哪個 SelectorThread 線程;
(4) 一個 ExecutorService 類型的工作線程池:在 SelectorThread 線程中監聽到業務 socket 中有調用請求過來時,將請求讀取之後,交給 ExecutorService 線程池中的線程完成此次調用的具體執行;主要用於處理每個 rpc 請求的 handler 回調處理(這部分是同步的)。
@Override protected boolean startThreads() { try { for (int i = 0; i < args.selectorThreads; ++i) { selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread)); } acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_, createSelectorThreadLoadBalancer(selectorThreads));//建立負載均衡器 for (SelectorThread thread : selectorThreads) { thread.start(); } acceptThread.start(); return true; } catch (IOException e) { LOGGER.error("Failed to start threads!", e); return false; } }
private void select() { try { // wait for connect events. acceptSelector.select(); // process the io events we received Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator(); while (!stopped_ && selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selectedKeys.remove(); // skip if not valid if (!key.isValid()) { continue; } if (key.isAcceptable()) { handleAccept(); } else { LOGGER.warn("Unexpected state in select! " + key.interestOps()); } } } catch (IOException e) { LOGGER.warn("Got an IOException while selecting!", e); } }
private void handleAccept() { final TNonblockingTransport client = doAccept(); if (client != null) { // Pass this connection to a selector thread final SelectorThread targetThread = threadChooser.nextThread(); if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) { doAddAccept(targetThread, client); } else { // FAIR_ACCEPT try { invoker.submit(new Runnable() { public void run() { doAddAccept(targetThread, client); } }); } catch (RejectedExecutionException rx) { LOGGER.warn("ExecutorService rejected accept registration!", rx); // close immediately client.close(); } } } }