我的單方面認爲,NIO與BIO的最大區別在於主動和被動,使用BIO的方式須要等待被調用方返回數據,很明顯此時調用者是被動的。java
舉個例子apache
阻塞IO
假設你是一個膽小又害羞的男孩子,你約了隔壁測試的妹子,但你並不敢主動約會,因此你把本身的手機號碼給她,並暗示她想要約會的時候打電話給你。很明顯此時你陷入了被動,約不約會的結果須要妹子主動告知你,若是她忘了,那麼你要陷入長時間的等待中以及無盡的猜想和自我懷疑中(太慘了)。[若是你是一個膽小害羞又好色的男孩子,那就慘了]緩存
非阻塞IO 咱們知道,渣男一般有不少的備胎,我管這個叫作備胎池(SpareTirePool), 那麼當他想要約會的時候,只要羣發問妹子要不要約會,若是要約會的話就和妹子約會,約會結束以後,處理其餘約會事件,若是沒有繼續下一次詢問。在這個例子中約會能夠視爲IO事件,問妹子的過程能夠視爲備胎池的輪詢。tomcat
若是你要學習NIO,能夠學習網絡
既然是網絡通訊的I/O那必然有如下兩個步驟併發
關鍵代碼在 package org.apache.tomcat.util.net.NioEndpoint 中app
P.S. 文章太長,若是不想看能夠直接閱讀結論socket
在最開始看代碼,是震驚的,真的,若是你看Reactor模型的話ide
如下bind方法代碼是啓動ServerSocket的流程,主要流程以下高併發
@Override public void bind() throws Exception { if (!getUseInheritedChannel()) { serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort())); serverSock.socket().bind(addr,getAcceptCount()); } else { // Retrieve the channel provided by the OS Channel ic = System.inheritedChannel(); if (ic instanceof ServerSocketChannel) { serverSock = (ServerSocketChannel) ic; } if (serverSock == null) { throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited")); } } // 以阻塞的方式來接收鏈接!! serverSock.configureBlocking(true); //mimic APR behavior // 設置Acceptor和Poller的數量 if (acceptorThreadCount == 0) { // FIXME: Doesn't seem to work that well with multiple accept threads // 顧名思義,Acceptor是用來處理新鏈接的 acceptorThreadCount = 1; } if (pollerThreadCount <= 0) { // Poller 用來處理I/O事件 pollerThreadCount = 1; } setStopLatch(new CountDownLatch(pollerThreadCount)); // Initialize SSL if needed initialiseSsl(); // 今後處能夠看出tomcat池化了selector selectorPool.open(); }
先說結論,Tomcat NIO模型中有如下關鍵角色
Acceptor的主要工做就是不斷接收來自客戶端的鏈接,在簡單處理以後將該鏈接交給Poller處理
接收來自客戶端鏈接, 若是你不想看代碼,如下是其主要流程
@Override public void run() { int errorDelay = 0; // running的檢測貫穿了Accpetor的處理流程,在每次關鍵操做的時候都會執行檢測 while (running) { // 若是進入暫停狀態則每隔一段時間檢測一下 while (paused && running) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } // 再次檢測 if (!running) { break; } state = AcceptorState.RUNNING; try { //檢查是否達到最大鏈接數若是是則陷入等待,若是不是則增長當前鏈接數 countUpOrAwaitConnection(); SocketChannel socket = null; try { //接收新鏈接 socket = serverSock.accept(); } catch (IOException ioe) { // 發生異常,則減小鏈接數 countDownConnection(); if (running) { handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } else { break; } } // Successful accept, reset the error delay errorDelay = 0; // Configure the socket if (running && !paused) { //setSocketOptions會致使將該鏈接交給Poller處理 if (!setSocketOptions(socket)) { closeSocket(socket); } } else { closeSocket(socket); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); } } state = AcceptorState.ENDED; }
再來看看setSocketOptions作了什麼,不想看代碼的話,總結以下
protected boolean setSocketOptions(SocketChannel socket) { // Process the connection try { //設置爲非阻塞模式,以便經過selector進行查詢 socket.configureBlocking(false); Socket sock = socket.socket(); socketProperties.setProperties(sock); //從對象池中獲取一個NioChannel,tomcat會複用一切能夠複用的對象以減小建立新對象所帶來的消耗 NioChannel channel = nioChannels.pop(); if (channel == null) { // 沒有獲取到,那就新建一個唄 SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); // SSL這一塊還沒研究 if (isSSLEnabled()) { channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); } else { channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); //從新設置SocketBufferHandler,將其設置爲可寫和可讀 channel.reset(); } //從Poller池中獲取一個Poller(按照次序獲取,能夠理解爲一個圓環),並將Channel註冊到上面 getPoller0().register(channel); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error("",t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } // Tell to close the socket return false; } return true; }
從鏈接註冊到Poller提及
具體說明見代碼
關鍵點:對一個數A取餘會將餘數的結果限制在A的範圍內
/** * Return an available poller in true round robin fashion. * 很明顯,取餘的方式揭示了獲取Poller的方法。你能夠理解爲 * Poller會組成一個圓環,這樣咱們就能夠經過不斷遞增獲取 * 下一個Poller,可是數據會溢出因此咱們要取絕對值 * @return The next poller in sequence */ public Poller getPoller0() { int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length; return pollers[idx]; }
該方法會對新的建的鏈接進行封裝,並以PollerEvent的形式註冊到相應的Poller中
須要注意的是,真正的註冊讀事件並非在此方法註冊的(當前方法調用者爲Acceptor線程),而是在Poller線程中註冊讀事件的
/** * Registers a newly created socket with the poller. * 將新建的socket註冊到Poller上 * @param socket The newly created socket */ public void register(final NioChannel socket) { //如下代碼爲設置各類參數,能夠從方法名進行推測,再也不進行敘述 socket.setPoller(this); NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this); socket.setSocketWrapper(ka); ka.setPoller(this); ka.setReadTimeout(getSocketProperties().getSoTimeout()); ka.setWriteTimeout(getSocketProperties().getSoTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); ka.setReadTimeout(getConnectionTimeout()); ka.setWriteTimeout(getConnectionTimeout()); //從緩存中獲取一個PollerEvent PollerEvent r = eventCache.pop(); // 註冊讀事件 ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. // 若是沒有從緩存中獲取,那麼就新建一個 if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); addEvent(r); }
Poller 處理I/O事件的的代碼較長,並且細節也較多,總結其主要做用以下
@Override public void run() { // Loop until destroy() is called // 一直循環直到destroy方法被調用 while (true) { boolean hasEvents = false; try { if (!close) { // events 方法會處理Acceptor註冊到Poller中的PollerEvent // 主要是註冊讀事件 hasEvents = events(); if (wakeupCounter.getAndSet(-1) > 0) { //if we are here, means we have other stuff to do //do a non blocking select keyCount = selector.selectNow(); } else { keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } // 檢測到關閉,則處理剩餘的事件並關閉selector if (close) { // 處理Acceptors註冊到Poller中的PollerEvent events(); //selector time out 或者poller被關閉就會調用timeout方法 timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe); } break; } } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error("",x); continue; } //either we timed out or we woke up, process events first if ( keyCount == 0 ) hasEvents = (hasEvents | events()); // 執行 select 操做,查詢I/O事件 Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { iterator.remove(); // 處理檢測到的I/O事件 processKey(sk, attachment); } }//while //timeout 會檢查是否關閉,若是已經關閉而且有事件未處理會調用cancelledKey方法 //cancelledKey:該方法主要是對和該鏈接相關的資源執行關閉操做 timeout(keyCount,hasEvents); }//while getStopLatch().countDown(); }
processKey主要工做以下
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) { try { if ( close ) { // 若是Poller關閉則關閉和釋放和此鏈接相關的資源 cancelledKey(sk); } else if ( sk.isValid() && attachment != null ) { if (sk.isReadable() || sk.isWritable() ) { if ( attachment.getSendfileData() != null ) { processSendfile(sk,attachment, false); } else { // 取消註冊事件 // sk.interestOps()& (~readyOps) unreg(sk, attachment, sk.readyOps()); boolean closeSocket = false; // Read goes before write 先讀後寫 if (sk.isReadable()) { // 關鍵代碼,調用processSocket方法處理讀事件 if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) { closeSocket = true; } } if (!closeSocket && sk.isWritable()) { if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) { closeSocket = true; } } if (closeSocket) { cancelledKey(sk); } } } } else { //invalid key cancelledKey(sk); } } catch ( CancelledKeyException ckx ) { cancelledKey(sk); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error("",t); } }
processSocket定義在org.apache.tomcat.util.net.AbstractEndPoint中, 也就是意味着不管你採用的是BIO仍是NIO或者NIO2最終讀寫數據都是調用此方法
從代碼中能夠看出,依然是對象池,依然是再次封裝(套娃),並將其提交到線程池中執行,接下來的內容就再也不本次討論範圍內呢。
public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) { try { if (socketWrapper == null) { return false; } SocketProcessorBase<S> sc = processorCache.pop(); if (sc == null) { sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); } Executor executor = getExecutor(); if (dispatch && executor != null) { executor.execute(sc); } else { sc.run(); } } catch (RejectedExecutionException ree) { getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full getLog().error(sm.getString("endpoint.process.fail"), t); return false; } return true; }
手抖了,線不怎麼♂
LimitLatch 爲全部的Acceptor共用,用來限制當前的最大鏈接數
Acceptor 以阻塞的形式來接收新鏈接,並將其封裝成PollerEvent對象提交到Poller中
Poller 接收來自Acceptor的PollerEvent並註冊讀事件,以及輪詢和其綁定的客戶端Socket有無讀事件,若是有則執行進一步操做,將其提交到其餘地方執行處理(解析Http協議)
學習源碼就是爲了學習其設計思想. -- 沃茲及.碩德
對象池化 池化對象、池化鏈接能夠大大下降新建對象以及GC所帶來的消耗,當須要使用從池中取出來從新設置相關值便可
環形隊列 雖然這玩意不新鮮,但配合上原子類,就能夠在高併發的狀況,高效的獲取隊列中的下一個元素(環形隊列中索引溢出的處理在以前我是沒有考慮到的)
阻塞獲取連接,非阻塞處理IO事件 與Reactor模型造成強烈的對比,學習NIO的時候思惟被限制住了,認爲非阻塞的獲取鏈接會得到更高的性能,但如今狀況不必定了(還沒測試,哪位老哥試了告訴我一下)
關鍵操做時,對標誌位進行檢測 若是你要經過一個標誌變量來控制你的線程,且線程循環一次須要相對較長的時間(你代碼太長,操做太多)那麼最好在執行關鍵操做以前對你的標誌變量進行檢查,來決定是否要改變線程的行爲(康康poller和Acceptor的代碼)
初次學習Tomcat的代碼,有理解錯誤的地方還請大佬指出
轉載自 https://juejin.im/post/5daea81b518825630e5d1aa9 做者: 柯三