注: NIO2(AIO) 即異步IOjava
AIO
和NIO2
實際上是一回事,如同孫行者、者行孫其實都是孫猴子,只是名稱不一樣本質都同樣git
那麼如何理解這個概念呢?舉個例子github
假設一個妹子有許多的舔狗(SpareTire),若是妹子想要完成某件事最簡單、高效的方法是什麼?web
答案是,舔狗那麼多,交給他們去辦就ok了。那麼狗子辦事期間,妹子會一直等待狗子把事情作好嗎?不行,這期間固然能夠繼續將其餘任務派發給其餘狗子。當狗子辦事期間,若是有須要妹子處理的事情,通知處理一下便可。spring
固然狗子通常都是處理一些重活累活,好比數據拷貝、I/O啊,接收新鏈接啥的(太慘了)。妹子則專一於核心業務的處理。apache
在這個例子中,妹子至關於核心業務線程,主要用來處理業務邏輯,而狗子們則是(內核+I/O線程)的抽象。後端
P.S.api
若是你瞭解NIO2,建議你直接閱讀NIO2模型解讀章節,不須要再閱讀NIO2 DEMO章節(時間寶貴)tomcat
你能夠直接越過全部章節去看總結,也能夠簡單閱讀附錄直接上手調試代碼多線程
NIO2中有個核心點,就是內核負責主要負責通知程序有什麼事件,而鏈接的接收以及數據的拷貝仍是須要程序提供線程來作這些事情,你能夠理解爲妹子(核心業務線程)須要提供舔狗池(線程池)給內核來作這些事情
talk is cheap, show me your hair
若是你想要學習一下NIO2,能夠點擊學習
該源碼的註釋爲GBK編碼,若是你看到註釋爲亂碼,最好將其改成GBK編碼
這是一個Demo,值得注意的是雖然該例子中並無顯式的建立線程池,這是由於若是你在open()服務端的時候,若是沒有顯示指定,系統將會默認分配給ServerSocketChannel一個線程池,用於事件的處理,咱們能夠打開JConsole驗證一下.
channel = AsynchronousServerSocketChannel.open();
複製代碼
如圖所示thread-0到thread-4就是系統默認分配的線程池,用來處理I/O事件。(天賜舔狗)
想象一下,若是咱們在處理I/O事件的時候將全部線程都阻塞住了,那麼整個系統的I/O都將陷入阻塞, 以下圖所示。
在有新的I/O事件到來的時候,內核會選擇一個線程來處理這些I/O事件,若是處理I/O的線程陷入阻塞,那麼來自客戶端的請求將會一直被阻塞住,沒法返回。所以處理I/O事件的線程最好只處理I/O事件(接收新鏈接、將數據從內核拷貝到線程中)
你能夠理解爲,舔狗最好只作舔狗該作的事情,即重活累活,至於核心業務或者會發生阻塞的狀況的事件最好提交給妹子(業務邏輯處理線程池)來處理。
關鍵類
org.apache.tomcat.util.net.Nio2Endpoint
既然是講解NIO2的處理模型,那麼咱們有必要了解如下關鍵角色
Nio2Acceptor Acceptor並不與特定的線程綁定,而是當由新鏈接到來從線程池中選擇一個線程來執行Acceptor的代碼,這一個過程是由底層幫咱們完成的,Acceptor的主要任務是接收新鏈接,併爲該鏈接註冊讀寫的處理對象
LimitLatch 限制鏈接數,在異步I/O狀況限制鏈接數的主要方式就是鎖阻塞用於I/O事件的線程池中的線程
I/O處理器 處理I/O的類,與Nio2Acceptor運行在同一個線程池中
異步ServerSocket啓動的流程較爲枯燥,若是你不想看代碼,如下爲其啓動的流程
AsynchronousChannelGroup
@Override
public void bind() throws Exception {
// 建立線程池
if (getExecutor() == null) {
createExecutor();
}
if (getExecutor() instanceof ExecutorService) {
//建立用於I/O的線程池(須要用AsynchronousChannelGroup包裝,才能提供給AsynchronousServerSocketChanne用)
threadGroup = AsynchronousChannelGroup.withThreadPool((ExecutorService) getExecutor());
}
// AsynchronousChannelGroup needs exclusive access to its executor service
if (!internalExecutor) {
log.warn(sm.getString("endpoint.nio2.exclusiveExecutor"));
}
//建立ServerSocketChannel
serverSock = AsynchronousServerSocketChannel.open(threadGroup);
socketProperties.setProperties(serverSock);
InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
//綁定端口並設置backlog的參數
//backlog能夠理解爲當前最大待執行accept操做的鏈接數
serverSock.bind(addr, getAcceptCount());
// Initialize SSL if needed
initialiseSsl();
}
複製代碼
以下圖,就是和當前異步ServerSocketChannel綁定的線程池,801表示該鏈接器所監聽的端口(附錄中有開啓NIO2的教程)
Nio2Acceptor主要功能接收新鏈接
,並限制最大鏈接數
,由於採用的是異步I/O,因此Acceptor並不會於特定的線程綁定,而是當新任務須要執行的時候,從線程池中選一個執行任務。以下圖所示當有客戶端新鏈接到達時,程序會從線程池選擇一個線程來執行Nio2Acceptor的completed方法並傳入客戶端Socket開始執行新鏈接處理的業務邏輯
在異步I/O中咱們須要向ServerSocketChannel註冊處理Accept事件的處理器以便完成鏈接事件的處理 如如下代碼所示,當tomcat啓動的時候,會開啓一個線程調用
Nio2SocketAcceptor
的run方法,將Nio2SocketAcceptor
註冊爲ServerSocketChannel
的accept事件處理器
protected class Nio2Acceptor extends Acceptor<AsynchronousSocketChannel> implements CompletionHandler<AsynchronousSocketChannel, Void> {
...
@Override
public void run() {
// The initial accept will be called in a separate utility thread
if (!isPaused()) {
// 鏈接數限制,若是達到最大鏈接數,則調用此方法的線程會陷入等待
try {
countUpOrAwaitConnection();
} catch (InterruptedException e) {
// Ignore
}
if (!isPaused()) {
//將本身註冊爲accept事件的處理器(注意此類實現的接口)
serverSock.accept(null, this);
} else {
state = AcceptorState.PAUSED;
}
} else {
state = AcceptorState.PAUSED;
}
}
...
}
複製代碼
當有新鏈接到達時,底層會從線程池選擇一個線程來執行completed方法並傳入客戶端socket,此時該方法主要的流程以下
@Override
public void completed(AsynchronousSocketChannel socket, Void attachment) {
// Successful accept, reset the error delay
errorDelay = 0;
// Continue processing the socket on the current thread
// Configure the socket
if (isRunning() && !isPaused()) {
//檢查限制的最大鏈接數,若是沒有設置(即-1)則不進行鏈接數限制
if (getMaxConnections() == -1) {
serverSock.accept(null, this);
} else {
//因爲有新鏈接的到達,所以須要從線程池選一個線程執行增長鏈接數的操做,此操做可能會發生阻塞
getExecutor().execute(this);
}
//執行後續的I/O事件處理
if (!setSocketOptions(socket)) {
closeSocket(socket);
}
} else {
if (isRunning()) {
state = AcceptorState.PAUSED;
}
destroySocket(socket);
}
}
複製代碼
因爲Acceptor並不與特定的線程綁定, 所以若是須要限制最大鏈接數,須要使用鎖將空閒的線程阻塞住,這也時爲何須要在accept新鏈接的時候須要向線程池提交增長新鏈接數
的任務,以下所示(也就是調用Nio2SocketAcceptor的run方法)
public void completed(AsynchronousSocketChannel socket, Void attachment) {
...
getExecutor().execute(this);
...
}
複製代碼
除此以外,還記得在建立ServerSocketChannel
的時候咱們設置了backlog
參數嗎?
該參數主要用於設置當前ServerSocket所容許的最大未accept的鏈接數,也就是說若是超過了未accept得鏈接數backlog所設置的值,那麼新來的鏈接都將會被丟棄掉。(API文檔)
既然是異步I/O,那麼必然要在客戶端Socket註冊讀寫的CompletionHandler
, 所以setSocketOptions
必然會致使這一步驟的發生,那麼此步驟發生在何時呢?
通過Debug跟蹤發現setSocketOptions
將會致使Nio2SocketWrapper
的建立,而實際I/O流程就發生在新建Nio2SocketWrapper對象時所建立的readCompletionHandler
中, 如下是其代碼
ReadCompletionHandler 用於監聽讀事件,在讀取到數據以後會調用processSocket方法開始數據的解析工做
public Nio2SocketWrapper(Nio2Channel channel, final Nio2Endpoint endpoint) {
super(channel, endpoint);
nioChannels = endpoint.getNioChannels();
socketBufferHandler = channel.getBufHandler();
this.readCompletionHandler = new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer nBytes, ByteBuffer attachment) {
if (log.isDebugEnabled()) {
log.debug("Socket: [" + Nio2SocketWrapper.this + "], Interest: [" + readInterest + "]");
}
readNotify = false;
//加鎖,其餘線程可能會對標誌位進行修改
synchronized (readCompletionHandler) {
//nBytes表示讀取到的字節數,若是小於0
//拋出EOF異常,沒數據讀,那咋辦嗎,只好拋異常了
if (nBytes.intValue() < 0) {
failed(new EOFException(), attachment);
} else {
if (readInterest && !Nio2Endpoint.isInline()) {
readNotify = true;
} else {
// Release here since there will be no
// notify/dispatch to do the release.
readPending.release();
}
readInterest = false;
}
}
if (readNotify) {
//處理讀事件
getEndpoint().processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_READ, false);
}
}
//省略代碼,後面太長了
...
複製代碼
debug驗證一下,以下圖所示,attachment即咱們所讀到的數據
注意 在debug的時候IDEA可能發出切換線程的請求(讀數據和以前的操做是不在一個線程上的,以下所示
Tomcat NIO2 模型
總結以下accept事件和 I/O事件共用一個線程池,不會和特定線程綁定
Acceptor(Nio2Acceptor) 用於接收新鏈接,並註冊I/O事件的處理對象
LimitLatch經過阻塞住線程池中的線程來實現鏈接數限制功能
I/O Handler 即在Nio2SocketWrapper
註冊的讀寫處理器,有I/O事件到達時,程序會選擇一個線程來執行這些處理器的代碼
整體流程以下 新鏈接到達->選擇一個線程執行Nio2Acceptor
的代碼->向線程池中提交增長鏈接數的任務->註冊讀寫處理事件->I/O事件到達,選擇一個線程處理I/O事件
不要使用默認線程池 在異步ServerSocketChannel建立的時候,tomcat會本身建立一個線程池,而不是使用默認提供的線程池,因爲線程池在咱們掌握之中,由此才實現了鏈接數限制的功能
不要阻塞I/O線程 I/O線程就要有I/O線程的亞子,不要在I/O線程執行會發生長時間阻塞的操做
後端程序猿都曉得,SpringBoot中內嵌了tomcat(固然還有jetty,取決於你如何選擇), 所以咱們能夠新建一個SpringBoot應用來專門調試學習Tomcat的源碼。
如下爲tomcat調試的過程
若是你要測試Tomcat的NIO處理方式,在如下類打斷點 (若是你想要了解tomcat中NIO的處理方式,能夠看看個人理解)
package org.apache.tomcat.util.net;
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> {
...
public class Poller implements Runnable {
public void run() {
//此方法的代碼位於692行
}
}
...
}
複製代碼
若是你要測試Tomcat的NIO2的處理方式,則須要如下配置 將如下代碼添加到你的代碼中。(因爲SpringBoot中內嵌的tomcat默認I/O方式爲NIO因此咱們須要經過配置增長NIO2的鏈接器)
import org.apache.catalina.connector.Connector;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConnectorConf {
//注意你的SpringBoot版本,此項目的版本是2.2.0,舊的版本1.5使用不一樣的類進行配置
@Bean
public TomcatServletWebServerFactory servletContainer() {
TomcatServletWebServerFactory tomcatServletWebServerFactory =
new TomcatServletWebServerFactory();
tomcatServletWebServerFactory.addAdditionalTomcatConnectors(getConnector());
return tomcatServletWebServerFactory;
}
private Connector getConnector() {
// 關鍵點哦
Connector connector = new Connector("org.apache.coyote.http11.Http11Nio2Protocol");
//將鏈接器的端口設置爲801,這樣訪問801端口的就是NIO2的模式了
connector.setPort(801);
return connector;
}
}
複製代碼
去org.apache.tomcat.util.net.Nio2Endpoint
打上斷點就完事了
在多線程的狀況下,可能會出現進入不了斷點的狀況,此時只需在斷點上右鍵選擇Thread
便可, 當其餘線程到達斷點時IDEA法發出通知,以下圖所示