Overview
While reading an article on Java NIO I once again ran into the term "asynchronous non-blocking", which I only half understood. I wanted to explain what exactly asynchronous means and what non-blocking means, but felt I could not pin down the point, so I decided to study it carefully.
This article tries to answer the following questions:
How does a web server work, and what is the architectural difference between a BIO connector and an NIO connector?
Where exactly do NIO's advantages lie, and how are they applied in practice?
What do synchronous/asynchronous and blocking/non-blocking really mean? Of the resulting IO models (synchronous blocking, synchronous non-blocking, asynchronous blocking, asynchronous non-blocking), which scenarios does each fit?
BIO and NIO are only wrappers around the kernel: what support does the Linux kernel provide? Further down, are the TCP interface calls and parameter settings any different?
I won't waste space on the basics here; the conclusions in this article are a record of study and thinking. The topic is large, and discussion is welcome.
1. Data IO
1.1 IO models
"Everything is a file": Linux accesses files, character devices, block devices, and sockets through one abstraction, the file.
Linux and Unix both provide five IO models, covered in Reference 1 and Reference 2:
1. Blocking IO
2. Non-blocking IO
3. IO multiplexing
4. Signal-driven IO
5. Asynchronous IO
In practice, however, models 2 and 4 are rarely used: traditional BIO uses model 1, Java NIO uses model 3, and JDK 7 added support for model 5.
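To make model 1 concrete, here is a minimal thread-per-connection echo server, a sketch of my own (the port and class name are illustrative, not from any of the references): both accept() and read() block the thread that calls them.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class BioEchoServer {
    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(8080);
        while (true) {
            Socket socket = server.accept();   // blocks until a connection arrives
            new Thread(() -> {                 // one thread per connection
                try (BufferedReader in = new BufferedReader(
                             new InputStreamReader(socket.getInputStream()));
                     PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
                    String line;
                    while ((line = in.readLine()) != null) { // blocks until data arrives
                        out.println(line);                   // echo the line back
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

With a thousand mostly idle connections this server holds a thousand blocked threads, which is exactly the resource problem the later sections attribute to BIO.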
1.2 Why is Java NIO called non-blocking?
Java NIO uses an event-driven model; on Linux it is backed by epoll. The advantages of epoll:
It avoids blocking large numbers of threads, so thread resources are not wasted and CPU is not burned on thread context switches.
The number of open fds is not restricted (it is still subject to operating-system limits, but no longer restricted at the process level).
IO efficiency does not degrade linearly as the number of fds grows.
It uses mmap to speed up message passing between kernel space and user space.
The final model in Reference 3 is called non-blocking.
But this is where the confusion starts:
The mainReactor and subReactor wait for events all the time. Is that blocking or non-blocking?
The workers in the ThreadPool sit in the wait state and only act when the Reactor dispatches an event to them. Is that blocking or non-blocking?
The answer to both questions appears to be "blocking". So why is this model called non-blocking?
My understanding is that the term refers to the application layer: the application does not itself poll to check whether accept, read, or write has completed; it waits to be scheduled, and that is why the model is called non-blocking. The benefits of avoiding that application-level polling were mentioned above: no threads are wasted and no unnecessary context switches occur.
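To see where the blocking actually lives, here is a minimal sketch of such an event loop in plain Java NIO (the port, buffer size, and class name are my own choices, not from Reference 3): Selector.select() does block, but accept and read on the ready channels are non-blocking calls that return immediately.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NioEchoServer {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(8080));
        server.configureBlocking(false);                    // accept() will no longer block
        server.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            selector.select();                              // this call blocks, waiting for events
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    SocketChannel ch = server.accept();     // returns immediately; never blocks
                    ch.configureBlocking(false);
                    ch.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    SocketChannel ch = (SocketChannel) key.channel();
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    int n = ch.read(buf);                   // returns immediately with whatever is available
                    if (n == -1) { ch.close(); continue; }
                    buf.flip();
                    ch.write(buf);                          // echo back (sketch: ignores partial writes)
                }
            }
        }
    }
}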
1.3 What is asynchronous?
It can be understood as follows: the application calls an IO function and keeps running; once the IO operation completes, the application can obtain the result through a callback.
JDK 7 added asynchronous IO support, as in the example below:
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) throws Exception {
        AsynchronousChannelGroup group =
                AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(10));
        final AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel
                .open(group).bind(new InetSocketAddress("127.0.0.1", 8080));
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel channel, Void attachment) {
                server.accept(null, this); // accept the next connection
                try {
                    String now = new Date().toString();
                    ByteBuffer buffer = ByteBuffer.wrap(now.getBytes());
                    Future<Integer> f = channel.write(buffer);
                    f.get();
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
            }
        });
        System.out.println("end........");
        group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    }
}
Similarly, asynchrony can also be achieved with multiple threads: operations that would block the main thread, such as calls to a database or a downstream service, can be run in a new thread.
JDK support for asynchrony appeared much earlier than that: JDK 5 already added the concurrent package, with its support for multithreaded asynchrony:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Test {
    public static void main(String[] args) throws Exception {
        // submit a Callable to an ExecutorService
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> future = executor.submit(task); // returns immediately
        executor.shutdown();
        System.out.println("task result: " + future.get()); // get() blocks until the task finishes
    }
}

class Task implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        Thread.sleep(3000);
        int sum = 0;
        for (int i = 0; i < 100; i++)
            sum += i;
        return sum;
    }
}
1.4 Synchronous vs. asynchronous
From the discussion above we can conclude which IO models are used in practice:
Synchronous blocking: traditional BIO belongs to this type.
Synchronous non-blocking: IO multiplexing (select, epoll).
Asynchronous non-blocking: AIO; the main program continues after the asynchronous operation completes, driven by a callback or by a blocking wait on the result (sketched below).
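The "blocking wait" flavor is easy to sketch with AsynchronousFileChannel (a minimal example of my own; the file path is a placeholder): the read call returns a Future immediately, and the caller decides when, if ever, to block on it.

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;

public class AioFutureRead {
    public static void main(String[] args) throws Exception {
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(
                Paths.get("/tmp/data.txt"), StandardOpenOption.READ); // placeholder path
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        Future<Integer> result = channel.read(buffer, 0); // returns immediately
        // ... the main program continues with other work here ...
        int bytesRead = result.get();                     // block only when the result is needed
        System.out.println("read " + bytesRead + " bytes");
        channel.close();
    }
}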
2. Java NIO
2.1 Core concepts
Channel
Buffer
Selector
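Selector was illustrated in section 1.2. Below is a minimal sketch of my own showing a Channel and a Buffer cooperating (the file name is a placeholder); the fill/flip/drain cycle is what "buffer-oriented" means in the next subsection.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ChannelBufferDemo {
    public static void main(String[] args) throws IOException {
        try (FileChannel channel = FileChannel.open(
                Paths.get("data.txt"), StandardOpenOption.READ)) { // placeholder file
            ByteBuffer buffer = ByteBuffer.allocate(64);
            while (channel.read(buffer) != -1) {  // fill the buffer from the channel
                buffer.flip();                    // switch the buffer from writing to reading
                while (buffer.hasRemaining()) {
                    System.out.print((char) buffer.get()); // assumes single-byte chars for brevity
                }
                buffer.clear();                   // make the buffer writable again
            }
        }
    }
}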
2.2 Advantages of NIO
Compared with traditional BIO, NIO's advantages come from IO multiplexing:
Buffer-oriented IO
Avoids excessive resource usage: too many waiting threads in BIO cause two problems, occupying too many thread resources and wasting CPU on the context switches those threads force
Under highly concurrent connections, performance does not degrade linearly
epoll can use mmap to speed up message passing between kernel space and user space
File locking
3. NIO in practice
3.1 Tomcat connector
Code version: 6.0.44
Tomcat implements NioEndpoint to provide NIO support.
The inner class Acceptor accepts user requests.
The Poller processes events.
Tomcat's implementation code contains quite a few FIXME comments, as well as special-case exception handling (catch, print the stack trace, and let the program keep running).
Key code:
NioEndpoint.start():

public void start() throws Exception {
    // Initialize socket if not done before
    if (!initialized) {
        init();
    }
    if (!running) {
        running = true;
        paused = false;
        // Create worker collection
        if (getUseExecutor()) {
            if (executor == null) {
                TaskQueue taskqueue = new TaskQueue();
                TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-");
                executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(),
                        60, TimeUnit.SECONDS, taskqueue, tf);
                taskqueue.setParent((ThreadPoolExecutor) executor, this);
            }
        } else if (executor == null) { //avoid two thread pools being created
            workers = new WorkerStack(maxThreads);
        }
        // Start poller threads
        pollers = new Poller[getPollerThreadCount()];
        for (int i = 0; i < pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }
        // Start acceptor threads
        for (int i = 0; i < acceptorThreadCount; i++) {
            Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i);
            acceptorThread.setPriority(threadPriority);
            acceptorThread.setDaemon(daemon);
            acceptorThread.start();
        }
    }
}
Acceptor
protected class Acceptor implements Runnable {
    /**
     * The background thread that listens for incoming TCP/IP connections and
     * hands them off to an appropriate processor.
     */
    public void run() {
        // Loop until we receive a shutdown command
        while (running) {
            // Loop if endpoint is paused
            while (paused) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }
            try {
                // Accept the next incoming connection from the server socket
                SocketChannel socket = serverSock.accept();
                // Hand this socket off to an appropriate processor
                //TODO FIXME - this is currently a blocking call, meaning we will be blocking
                //further accepts until there is a thread available.
                if (running && (!paused) && socket != null) {
                    //processSocket(socket);
                    if (!setSocketOptions(socket)) {
                        try {
                            socket.socket().close();
                            socket.close();
                        } catch (IOException ix) {
                            if (log.isDebugEnabled())
                                log.debug("", ix);
                        }
                    }
                }
            } catch (SocketTimeoutException sx) {
                //normal condition
            } catch (IOException x) {
                if (running)
                    log.error(sm.getString("endpoint.accept.fail"), x);
            } catch (OutOfMemoryError oom) {
                try {
                    oomParachuteData = null;
                    releaseCaches();
                    log.error("", oom);
                } catch (Throwable oomt) {
                    try {
                        try {
                            System.err.println(oomParachuteMsg);
                            oomt.printStackTrace();
                        } catch (Throwable letsHopeWeDontGetHere) {}
                    } catch (Throwable letsHopeWeDontGetHere) {}
                }
            } catch (Throwable t) {
                log.error(sm.getString("endpoint.accept.fail"), t);
            }
        } //while
    } //run
}
Poller: registers and processes events:
public void add(final NioChannel socket, final int interestOps) {
    PollerEvent r = eventCache.poll();
    if (r == null) r = new PollerEvent(socket, null, interestOps);
    else r.reset(socket, null, interestOps);
    addEvent(r);
}

/**
 * Processes events in the event queue of the Poller.
 *
 * @return <code>true</code> if some events were processed,
 *         <code>false</code> if queue was empty
 */
public boolean events() {
    boolean result = false;
    Runnable r = null;
    while ((r = (Runnable) events.poll()) != null) {
        result = true;
        try {
            r.run();
            if (r instanceof PollerEvent) {
                ((PollerEvent) r).reset();
                eventCache.offer((PollerEvent) r);
            }
        } catch (Throwable x) {
            log.error("", x);
        }
    }
    return result;
}

public void register(final NioChannel socket) {
    socket.setPoller(this);
    KeyAttachment key = keyCache.poll();
    final KeyAttachment ka = key != null ? key : new KeyAttachment();
    ka.reset(this, socket, getSocketProperties().getSoTimeout());
    PollerEvent r = eventCache.poll();
    ka.interestOps(SelectionKey.OP_READ); //this is what OP_REGISTER turns into.
    if (r == null) r = new PollerEvent(socket, ka, OP_REGISTER);
    else r.reset(socket, ka, OP_REGISTER);
    addEvent(r);
}

public void cancelledKey(SelectionKey key, SocketStatus status, boolean dispatch) {
    try {
        if (key == null) return; //nothing to do
        KeyAttachment ka = (KeyAttachment) key.attachment();
        if (ka != null && ka.getComet() && status != null) {
            //the comet event takes care of clean up
            //processSocket(ka.getChannel(), status, dispatch);
            ka.setComet(false); //to avoid a loop
            if (status == SocketStatus.TIMEOUT) {
                processSocket(ka.getChannel(), status, true);
                return; // don't close on comet timeout
            } else {
                //don't dispatch if the lines below are cancelling the key
                processSocket(ka.getChannel(), status, false);
            }
        }
        key.attach(null);
        if (ka != null) handler.release(ka.getChannel());
        if (key.isValid()) key.cancel();
        if (key.channel().isOpen())
            try { key.channel().close(); } catch (Exception ignore) {}
        try { if (ka != null) ka.channel.close(true); } catch (Exception ignore) {}
        try {
            if (ka != null && ka.getSendfileData() != null
                    && ka.getSendfileData().fchannel != null
                    && ka.getSendfileData().fchannel.isOpen())
                ka.getSendfileData().fchannel.close();
        } catch (Exception ignore) {}
        if (ka != null) ka.reset();
    } catch (Throwable e) {
        if (log.isDebugEnabled()) log.error("", e);
        // Ignore
    }
}

/**
 * The background thread that listens for incoming TCP/IP connections and
 * hands them off to an appropriate processor.
 */
public void run() {
    // Loop until we receive a shutdown command
    while (running) {
        try {
            // Loop if endpoint is paused
            while (paused && (!close)) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }
            boolean hasEvents = false;
            hasEvents = (hasEvents | events());
            // Time to terminate?
            if (close) {
                timeout(0, false);
                break;
            }
            int keyCount = 0;
            try {
                if (!close) {
                    if (wakeupCounter.getAndSet(-1) > 0) {
                        //if we are here, means we have other stuff to do
                        //do a non blocking select
                        keyCount = selector.selectNow();
                    } else {
                        keyCount = selector.select(selectorTimeout);
                    }
                    wakeupCounter.set(0);
                }
                if (close) {
                    timeout(0, false);
                    selector.close();
                    break;
                }
            } catch (NullPointerException x) {
                //sun bug 5076772 on windows JDK 1.5
                if (log.isDebugEnabled())
                    log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
                if (wakeupCounter == null || selector == null) throw x;
                continue;
            } catch (CancelledKeyException x) {
                //sun bug 5076772 on windows JDK 1.5
                if (log.isDebugEnabled())
                    log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
                if (wakeupCounter == null || selector == null) throw x;
                continue;
            } catch (Throwable x) {
                log.error("", x);
                continue;
            }
            //either we timed out or we woke up, process events first
            if (keyCount == 0) hasEvents = (hasEvents | events());

            Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
            // Walk through the collection of ready keys and dispatch
            // any active event.
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = (SelectionKey) iterator.next();
                KeyAttachment attachment = (KeyAttachment) sk.attachment();
                // Attachment may be null if another thread has called
                // cancelledKey()
                if (attachment == null) {
                    iterator.remove();
                } else {
                    attachment.access();
                    iterator.remove();
                    processKey(sk, attachment);
                }
            } //while

            //process timeouts
            timeout(keyCount, hasEvents);
            if (oomParachute > 0 && oomParachuteData == null) checkParachute();
        } catch (OutOfMemoryError oom) {
            try {
                oomParachuteData = null;
                releaseCaches();
                log.error("", oom);
            } catch (Throwable oomt) {
                try {
                    System.err.println(oomParachuteMsg);
                    oomt.printStackTrace();
                } catch (Throwable letsHopeWeDontGetHere) {}
            }
        }
    } //while

    synchronized (this) {
        this.notifyAll();
    }
    stopLatch.countDown();
}

protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
    boolean result = true;
    try {
        if (close) {
            cancelledKey(sk, SocketStatus.STOP, false);
        } else if (sk.isValid() && attachment != null) {
            attachment.access(); //make sure we don't time out valid sockets
            sk.attach(attachment); //cant remember why this is here
            NioChannel channel = attachment.getChannel();
            if (sk.isReadable() || sk.isWritable()) {
                if (attachment.getSendfileData() != null) {
                    processSendfile(sk, attachment, true, false);
                } else if (attachment.getComet()) {
                    //check if thread is available
                    if (isWorkerAvailable()) {
                        //set interest ops to 0 so we don't get multiple
                        //invokations for both read and write on separate threads
                        reg(sk, attachment, 0);
                        //read goes before write
                        if (sk.isReadable()) {
                            //read notification
                            if (!processSocket(channel, SocketStatus.OPEN))
                                processSocket(channel, SocketStatus.DISCONNECT);
                        } else {
                            //future placement of a WRITE notif
                            if (!processSocket(channel, SocketStatus.OPEN))
                                processSocket(channel, SocketStatus.DISCONNECT);
                        }
                    } else {
                        result = false;
                    }
                } else {
                    //later on, improve latch behavior
                    if (isWorkerAvailable()) {
                        unreg(sk, attachment, sk.readyOps());
                        boolean close = (!processSocket(channel));
                        if (close) {
                            cancelledKey(sk, SocketStatus.DISCONNECT, false);
                        }
                    } else {
                        result = false;
                    }
                }
            }
        } else {
            //invalid key
            cancelledKey(sk, SocketStatus.ERROR, false);
        }
    } catch (CancelledKeyException ckx) {
        cancelledKey(sk, SocketStatus.ERROR, false);
    } catch (Throwable t) {
        log.error("", t);
    }
    return result;
}

PollerEvent:

public class PollerEvent implements Runnable {

    protected NioChannel socket;
    protected int interestOps;
    protected KeyAttachment key;

    public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) {
        reset(ch, k, intOps);
    }

    public void reset(NioChannel ch, KeyAttachment k, int intOps) {
        socket = ch;
        interestOps = intOps;
        key = k;
    }

    public void reset() {
        reset(null, null, 0);
    }

    public void run() {
        if (interestOps == OP_REGISTER) {
            try {
                socket.getIOChannel().register(
                        socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
            } catch (Exception x) {
                log.error("", x);
            }
        } else {
            final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
            try {
                boolean cancel = false;
                if (key != null) {
                    final KeyAttachment att = (KeyAttachment) key.attachment();
                    if (att != null) {
                        //handle callback flag
                        if (att.getComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK) {
                            att.setCometNotify(true);
                        } else {
                            att.setCometNotify(false);
                        }
                        interestOps = (interestOps & (~OP_CALLBACK)); //remove the callback flag
                        att.access(); //to prevent timeout
                        //we are registering the key to start with, reset the fairness counter.
                        int ops = key.interestOps() | interestOps;
                        att.interestOps(ops);
                        key.interestOps(ops);
                        att.setCometOps(ops);
                    } else {
                        cancel = true;
                    }
                } else {
                    cancel = true;
                }
                if (cancel) socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false);
            } catch (CancelledKeyException ckx) {
                try {
                    socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, true);
                } catch (Exception ignore) {}
            }
        } //end if
    } //run

    public String toString() {
        return super.toString() + "[intOps=" + this.interestOps + "]";
    }
}
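Reading the three pieces together: the Acceptor thread blocks in serverSock.accept() and hands each new channel to a Poller via register(), which queues a PollerEvent carrying OP_REGISTER; the Poller thread drains that event queue, registers the channel with its Selector for OP_READ, and on every select() pass dispatches ready keys through processKey()/processSocket() to worker threads. This is the mainReactor/subReactor split from section 1.2 in concrete form.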
3.2 Jetty NIO
3.3 Netty
Codec support: the way data is encoded and decoded can be customized, allowing efficient binary formats.
Multi-protocol development: servers can be built on HTTP or on private protocols.
Features (see the sketch after this list):
Zero-copy
Asynchronous non-blocking IO
Pooled ByteBuf memory
Master/slave (main/sub) Reactor model
Lock-free serialized design
Customizable TCP parameters
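A rough sketch of how these features surface in Netty 4's public API (the port, options, and echo handler are illustrative choices of mine): the boss and worker NioEventLoopGroups form the master and slave Reactors, and the child options carry the customizable TCP parameters.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyEchoServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup(1);   // main Reactor: accepts connections
        EventLoopGroup worker = new NioEventLoopGroup();  // sub Reactors: handle IO events
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss, worker)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 128)       // customizable TCP parameters
             .childOption(ChannelOption.TCP_NODELAY, true)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                         @Override
                         public void channelRead(ChannelHandlerContext ctx, Object msg) {
                             ctx.writeAndFlush(msg);       // echo the (pooled) ByteBuf back
                         }
                     });
                 }
             });
            ChannelFuture f = b.bind(8080).sync();        // bind is asynchronous; sync() waits
            f.channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}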
3.4 Summary
Comparing Tomcat, Jetty, and Netty shows that the Tomcat and Jetty connectors are fairly thin implementations over the NIO interfaces, while Netty offers a programmable API that is more convenient to use, along with richer high-level features.
References:
1. Unix Network Programming, Volume 1
2. Linux Network Programming
3. Scalable IO in Java
4. Netty權威指南 (Netty: The Definitive Guide)
5. Netty系列之Netty高性能之道 (Netty series: Netty's path to high performance)