Tomcat 的 Connector 有三種運行模式 bio、nio、apr ,先了解一下這三種的區別。java
java.io
包及其子包)。Tomcat 在默認狀況下,就是以 bio 模式運行的。通常而言 bio 模式是三種運行模式中性能最低的一種。java.nio
包及其子包)。Java nio 是一個基於緩衝區並能提供非阻塞 I/O 操做的 Java API ,所以 nio 也被當作是 non-blocking I/O 的縮寫。它擁有比傳統 I/O 操做( bio )更好的併發運行性能。要讓 Tomcat 以 nio 模式來運行只須要在 Tomcat 安裝目錄/conf/server.xml
文件中將 Connector 節點的 protocol 配置成org.apache.coyote.http11.Http11NioProtocol
便可。寫個 BIO 的 Socket 服務器仍是比較容易的,無非是每 accept 一個 socket 以後就扔到一個線程中處理請求生成響應,這種方式能夠改進的點就是增長線程池的支持,本文主要分析一下 Tomcat 中 NIO 處理方式的相關代碼邏輯。算法
關鍵代碼都是在org.apache.tomcat.util.net.NioEndpoint
這個類裏面,它是 Http11NioProtocol 中負責接收處理 socket 的主要組件,別看代碼很長,仔細閱讀會發現有不少共通的地方,如:apache
java.util.concurrent.ThreadPoolExecutor
的擴展,NioChannel 是對 ByteChannel 的擴展,KeyAttachment 則是對 NioChannel 的包裝先看下整個 Connector 組件結構圖: windows
看過以前 Tomcat 啓動文章的應該都知道,Connector 的啓動會調用 Connector 類的 startInternal 方法,裏面調用了 protocolHandler 的 start() ,該方法中將調用抽象的 endpoint 的 start() 方法,這個方法會調用到具體 Endpoint 類的 startInternal() ,因此代碼分析先從 NioEndpoint 類的 startInternal 看起。數組
/**
* Start the NIO endpoint, creating acceptor, poller threads.
*/
@Override
public void startInternal() throws Exception {
if (!running) {
running = true;
paused = false;
// Create worker collection
if ( getExecutor() == null ) {
// 構造線程池,用於後續執行SocketProcessor線程,這就是上圖中的Worker。
createExecutor();
}
initializeConnectionLatch();
// Start poller threads
// 根據處理器數量構造必定數目的輪詢器,即上圖中的Poller
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
// 建立接收者線程,即上圖中的Acceptor
startAcceptorThreads();
}
}
複製代碼
startAcceptorThreads 調用的是父類org.apache.tomcat.util.net.AbstractEndpoint
中的實現:緩存
protected final void startAcceptorThreads() {
int count = getAcceptorThreadCount();
acceptors = new Acceptor[count];
for (int i = 0; i < count; i++) {
// 調用子類的createAcceptor方法,本例中即NioEndpoint類的createAcceptor方法
acceptors[i] = createAcceptor();
String threadName = getName() + "-Acceptor-" + i;
acceptors[i].setThreadName(threadName);
Thread t = new Thread(acceptors[i], threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}
複製代碼
以上就是 Acceptor、Poller、Worker 等核心組件的初始化過程。tomcat
核心組件初始化以後接着就是 Acceptor 線程接收 socket 鏈接,看下 Acceptor 的源碼:bash
// --------------------------------------------------- Acceptor Inner Class
/**
* 後臺線程,用於監聽TCP/IP鏈接以及將它們分發給相應的調度器處理。
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
protected class Acceptor extends AbstractEndpoint.Acceptor {
@Override
public void run() {
int errorDelay = 0;
// 循環遍歷直到接收到關閉命令
// Loop until we receive a shutdown command
while (running) {
// Loop if endpoint is paused
while (paused && running) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
if (!running) {
break;
}
state = AcceptorState.RUNNING;
try {
// 若是已經達到最大鏈接數則讓線程等待
//if we have reached max connections, wait
countUpOrAwaitConnection();
SocketChannel socket = null;
try {
// 接收鏈接,這裏用的阻塞模式。
// Accept the next incoming connection from the server
// socket
socket = serverSock.accept();
} catch (IOException ioe) {
//we didn't get a socket
countDownConnection();
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
}
// Successful accept, reset the error delay
errorDelay = 0;
// 注意這個setSocketOptions方法
// 它將把上面接收到的socket添加到輪詢器Poller中
// setSocketOptions() will add channel to the poller
// if successful
if (running && !paused) {
if (!setSocketOptions(socket)) {
countDownConnection();
closeSocket(socket);
}
} else {
countDownConnection();
closeSocket(socket);
}
} catch (SocketTimeoutException sx) {
// Ignore: Normal condition
} catch (IOException x) {
if (running) {
log.error(sm.getString("endpoint.accept.fail"), x);
}
} catch (OutOfMemoryError oom) {
try {
oomParachuteData = null;
releaseCaches();
log.error("", oom);
}catch ( Throwable oomt ) {
try {
try {
System.err.println(oomParachuteMsg);
oomt.printStackTrace();
}catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.accept.fail"), t);
}
}
state = AcceptorState.ENDED;
}
}
複製代碼
在 Acceptor 裏接收到一個鏈接以後調用 setSocketOptions 方法設置 SocketChannel 的一些參數,而後將 SocketChannel 註冊到 Poller 中。看下 setSocketOptions 的實現:服務器
/**
* Process the specified connection.
*/
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
// 將SocketChannel配置爲非阻塞模式
//disable blocking, APR style, we are gonna be polling it
socket.configureBlocking(false);
Socket sock = socket.socket();
// 設置Socket參數值(從server.xml的Connector節點上獲取參數值)
// 好比Socket發送、接收的緩存大小、心跳檢測等
socketProperties.setProperties(sock);
// 從NioChannel的緩存隊列中取出一個NioChannel
// NioChannel是SocketChannel的一個的包裝類
// 這裏對上層屏蔽SSL和通常TCP鏈接的差別
NioChannel channel = nioChannels.poll();
// 緩存隊列中沒有則新建一個NioChannel
if ( channel == null ) {
// SSL setup
if (sslContext != null) {
SSLEngine engine = createSSLEngine();
int appbufsize = engine.getSession().getApplicationBufferSize();
NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize,socketProperties.getAppReadBufSize()),
Math.max(appbufsize,socketProperties.getAppWriteBufSize()),
socketProperties.getDirectBuffer());
channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool);
} else {
// normal tcp setup
NioBufferHandler bufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
channel = new NioChannel(socket, bufhandler);
}
} else {
// 將SocketChannel關聯到從緩存隊列中獲取的NioChannel上來
channel.setIOChannel(socket);
if ( channel instanceof SecureNioChannel ) {
SSLEngine engine = createSSLEngine();
((SecureNioChannel)channel).reset(engine);
} else {
channel.reset();
}
}
// 將新接收到的SocketChannel註冊到Poller中
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error("",t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(t);
}
// Tell to close the socket
return false;
}
return true;
}
複製代碼
核心調用是最後的getPoller0().register(channel);
它將配置好的 SocketChannel 包裝成一個 PollerEvent ,而後加入到 Poller 的 events 緩存隊列中。網絡
getPoller0 方法將輪詢當前的 Poller 數組,從中取出一個 Poller 返回。( Poller 的初始化參見上述第1步:NioEndpoint 類核心組件的初始化)
/**
* Return an available poller in true round robin fashion
*/
public Poller getPoller0() {
// 最簡單的輪詢調度算法,poller的計數器不斷加1再對poller數組取餘數
int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
return pollers[idx];
}
複製代碼
以後調用 Poller 對象的 register 方法:
public void register(final NioChannel socket) {
// 設置socket的Poller引用,便於後續處理
socket.setPoller(this);
// 從NioEndpoint的keyCache緩存隊列中取出一個KeyAttachment
KeyAttachment key = keyCache.poll();
// KeyAttachment實際是NioChannel的包裝類
final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);
// 重置KeyAttachment對象中Poller、NioChannel等成員變量的引用
ka.reset(this,socket,getSocketProperties().getSoTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
// 從Poller的事件對象緩存中取出一個PollerEvent,並用socket初始化事件對象
PollerEvent r = eventCache.poll();
// 設置讀操做爲感興趣的操做
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
// 加入到Poller對象裏的事件隊列
addEvent(r);
}
複製代碼
看下 Poller 類裏 addEvent 的代碼:
/**
* Only used in this class. Will be made private in Tomcat 8.0.x
* @deprecated
*/
@Deprecated
public void addEvent(Runnable event) {
events.offer(event);
if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
}
複製代碼
就兩行,第一行從 event 對象添加到緩存隊列中,第二行若是當前事件隊列中沒有事件,則喚醒處於阻塞狀態的 selector 。
上面講的是從 Acceptor 中接收到的 Socket 以 PollerEvent 的形式包裝並添加到 Poller 的事件緩存隊列中,接下來看看另一個核心組件 Poller 的處理過程:
/** * Poller class. */ public class Poller implements Runnable { // 這就是NIO中用到的選擇器,能夠看出每個Poller都會關聯一個Selector protected Selector selector; // 待處理的事件隊列 protected ConcurrentLinkedQueue events = new ConcurrentLinkedQueue(); // 喚醒多路複用器的條件閾值 protected AtomicLong wakeupCounter = new AtomicLong(0l); public Poller() throws IOException { // 對Selector的同步訪問,經過調用Selector.open()方法建立一個Selector synchronized (Selector.class) { // Selector.open() isn't thread safe // http://bugs.sun.com/view_bug.do?bug_id=6427854 // Affects 1.6.0_29, fixed in 1.7.0_01 this.selector = Selector.open(); } } // 經過addEvent方法將事件添加到Poller的事件隊列中 /** * Only used in this class. Will be made private in Tomcat 8.0.x * @deprecated */ @Deprecated public void addEvent(Runnable event) { events.offer(event); // 若是隊列中沒有待處理的事件則喚醒處於阻塞狀態的selector if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup(); } // 處理事件隊列中的全部事件,若是事件隊列是空的則返回false /** * Processes events in the event queue of the Poller. * * @returntrue複製代碼
if some events were processed, *false複製代碼
if queue was empty */ public boolean events() { boolean result = false; Runnable r = null; // 將Poller的事件隊列中的事件逐個取出並執行相應的事件線程 while ( (r = events.poll()) != null ) { result = true; try { // 執行事件處理邏輯 // 這裏將事件設計成線程是將具體的事件處理邏輯和事件框架分開 r.run(); if ( r instanceof PollerEvent ) { ((PollerEvent)r).reset(); // 事件處理完以後,將事件對象返回NIOEndpoint的事件對象緩存中 eventCache.offer((PollerEvent)r); } } catch ( Throwable x ) { log.error("",x); } } return result; } // 將socket包裝成統一的事件對象PollerEvent,加入到待處理事件隊列中 public void register(final NioChannel socket) { socket.setPoller(this); KeyAttachment key = keyCache.poll(); final KeyAttachment ka = key!=null?key:new KeyAttachment(socket); ka.reset(this,socket,getSocketProperties().getSoTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); // 從NIOEndpoint的事件對象緩存中取出一個事件對象 PollerEvent r = eventCache.poll(); ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); // 將事件添加打Poller的事件隊列中 addEvent(r); } // Poller是一個線程,該線程同Acceptor同樣會監聽TCP/IP鏈接並將它們交給合適的處理器處理 /** * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ @Override public void run() { // Loop until destroy() is called while (true) { try { // Loop if endpoint is paused while (paused && (!close) ) { try { Thread.sleep(100); } catch (InterruptedException e) { // Ignore } } boolean hasEvents = false; // Time to terminate? if (close) { events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString( "endpoint.nio.selectorCloseFail"), ioe); } break; } else { // 執行事件隊列中的事件線程 hasEvents = events(); } try { if ( !close ) { if (wakeupCounter.getAndSet(-1) > 0) { // 把wakeupCounter設成-1,這是與addEvent裏的代碼呼應,這樣會喚醒selector //if we are here, means we have other stuff to do //do a non blocking select // 以非阻塞方式查看selector是否有事件發生 keyCount = selector.selectNow(); } else { // 查看selector是否有事件發生,超過指定時間則當即返回 keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } if (close) { // 執行事件隊列中的事件線程 events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString( "endpoint.nio.selectorCloseFail"), ioe); } break; } } catch ( NullPointerException x ) { //sun bug 5076772 on windows JDK 1.5 if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x); if ( wakeupCounter == null || selector == null ) throw x; continue; } catch ( CancelledKeyException x ) { //sun bug 5076772 on windows JDK 1.5 if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x); if ( wakeupCounter == null || selector == null ) throw x; continue; } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error("",x); continue; } //either we timed out or we woke up, process events first if ( keyCount == 0 ) hasEvents = (hasEvents | events()); Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // 根據向selector中註冊的key遍歷channel中已經就緒的keys,並處理這些key // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); // 這裏的attachment方法返回的就是在register()方法中註冊的 // 而KeyAttachment對象是對socket的包裝 KeyAttachment attachment = (KeyAttachment)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { // 更新通道最近一次發生事件的時間 // 防止因超時沒有事件發生而被剔除出selector attachment.access(); iterator.remove(); // 具體處理通道的邏輯 processKey(sk, attachment); } }//while //process timeouts // 多路複用器每執行一遍完整的輪詢便查看全部通道是否超時 // 對超時的通道將會被剔除出多路複用器 timeout(keyCount,hasEvents); if ( oomParachute > 0 && oomParachuteData == null ) checkParachute(); } catch (OutOfMemoryError oom) { try { oomParachuteData = null; releaseCaches(); log.error("", oom); }catch ( Throwable oomt ) { try { System.err.println(oomParachuteMsg); oomt.printStackTrace(); }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } } } }//while synchronized (this) { this.notifyAll(); } stopLatch.countDown(); } // 處理selector檢測到的通道事件 protected boolean processKey(SelectionKey sk, KeyAttachment attachment) { boolean result = true; try { if ( close ) { cancelledKey(sk, SocketStatus.STOP, attachment.comet); } else if ( sk.isValid() && attachment != null ) { // 確保通道不會因超時而被剔除 attachment.access();//make sure we don't time out valid sockets sk.attach(attachment);//cant remember why this is here NioChannel channel = attachment.getChannel(); // 處理通道發生的讀寫事件 if (sk.isReadable() || sk.isWritable() ) { if ( attachment.getSendfileData() != null ) { processSendfile(sk,attachment, false); } else { if ( isWorkerAvailable() ) { // 在通道上註銷對已經發生事件的關注 unreg(sk, attachment, sk.readyOps()); boolean closeSocket = false; // Read goes before write if (sk.isReadable()) { // 具體的通道處理邏輯 if (!processSocket(channel, SocketStatus.OPEN_READ, true)) { closeSocket = true; } } if (!closeSocket && sk.isWritable()) { if (!processSocket(channel, SocketStatus.OPEN_WRITE, true)) { closeSocket = true; } } if (closeSocket) { // 解除無效通道 cancelledKey(sk,SocketStatus.DISCONNECT,false); } } else { result = false; } } } } else { //invalid key cancelledKey(sk, SocketStatus.ERROR,false); } } catch ( CancelledKeyException ckx ) { cancelledKey(sk, SocketStatus.ERROR,false); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error("",t); } return result; } // 這個unreg()很巧妙,防止了通道對同一個事件不斷select的問題 protected void unreg(SelectionKey sk, KeyAttachment attachment, int readyOps) { //this is a must, so that we don't have multiple threads messing with the socket reg(sk,attachment,sk.interestOps()& (~readyOps)); } // 向NioChannel註冊感興趣的事件,具體代碼看下面的PollerEvent類的說明 protected void reg(SelectionKey sk, KeyAttachment attachment, int intops) { sk.interestOps(intops); attachment.interestOps(intops); attachment.setCometOps(intops); } } 複製代碼true複製代碼false複製代碼
Poller 處理的核心是啓動執行事件隊列中的 PollerEvent,接着從 selector 中遍歷已經就緒的 key ,一旦發生了感興趣的事件,則交由 processSocket 方法處理。PollerEvent 的做用是向 socket 註冊或更新感興趣的事件:
/**
*
* PollerEvent, cacheable object for poller events to avoid GC
*/
public static class PollerEvent implements Runnable {
// 每一個PollerEvent都會保存NioChannel的引用
protected NioChannel socket;
protected int interestOps;
protected KeyAttachment key;
public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) {
reset(ch, k, intOps);
}
public void reset(NioChannel ch, KeyAttachment k, int intOps) {
socket = ch;
interestOps = intOps;
key = k;
}
public void reset() {
reset(null, null, 0);
}
@Override
public void run() {
//socket第一次註冊到selector中,完成對socket讀事件的註冊
if ( interestOps == OP_REGISTER ) {
try {
socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
} catch (Exception x) {
log.error("", x);
}
} else {
// socket以前已經註冊到了selector中,更新socket所感興趣的事件
final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
boolean cancel = false;
if (key != null) {
final KeyAttachment att = (KeyAttachment) key.attachment();
if ( att!=null ) {
//handle callback flag
if (att.isComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) {
att.setCometNotify(true);
} else {
att.setCometNotify(false);
}
interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag
// 刷新事件的最後訪問時間,防止事件超時
att.access();//to prevent timeout
//we are registering the key to start with, reset the fairness counter.
int ops = key.interestOps() | interestOps;
att.interestOps(ops);
key.interestOps(ops);
} else {
cancel = true;
}
} else {
cancel = true;
}
if ( cancel ) socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
}catch (CancelledKeyException ckx) {
try {
socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT,true);
}catch (Exception ignore) {}
}
}//end if
}//run
@Override
public String toString() {
return super.toString()+"[intOps="+this.interestOps+"]";
}
}
複製代碼
在第5步的 Poller 處理流程的分析中看到它的 run 方法最後會調用 processKey() 處理 selector 檢測到的通道事件,而在這個方法最後會調用 processSocket 來調用具體的通道處理邏輯,看下 processSocket 方法的實現:
public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
try {
KeyAttachment attachment = (KeyAttachment)socket.getAttachment();
if (attachment == null) {
return false;
}
attachment.setCometNotify(false); //will get reset upon next reg
// 從SocketProcessor的緩存隊列中取出一個來處理socket
SocketProcessor sc = processorCache.poll();
if ( sc == null ) sc = new SocketProcessor(socket,status);
else sc.reset(socket,status);
// 將有事件發生的socket交給Worker處理
if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);
else sc.run();
} catch (RejectedExecutionException rx) {
log.warn("Socket processing request was rejected for:"+socket,rx);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
複製代碼
Poller 經過 NioEndpoint 的協調,將發生事件的 socket 交給工做者線程 Worker 來進一步處理。整個事件框架的工做就到此結束,下面就是 Worker 的處理。
在 Tomcat 6 版本的 NIO 處理實現中有一個 Worker 類,在 Tomcat 7 中把它去掉了,但工做者的職責還在,只是交由了上面看到的 SocketProcessor 這個類來擔當,看下這個類的實現代碼:
// ---------------------------------------------- SocketProcessor Inner Class
// 這個類至關於一個工做者,但只會在一個外部線程池中簡單使用。
/**
* This class is the equivalent of the Worker, but will simply use in an
* external Executor thread pool.
*/
protected class SocketProcessor implements Runnable {
// 每一個SocketProcessor保存一個NioChannel的引用
protected NioChannel socket = null;
protected SocketStatus status = null;
public SocketProcessor(NioChannel socket, SocketStatus status) {
reset(socket,status);
}
public void reset(NioChannel socket, SocketStatus status) {
this.socket = socket;
this.status = status;
}
@Override
public void run() {
// 從socket中獲取SelectionKey
SelectionKey key = socket.getIOChannel().keyFor(
socket.getPoller().getSelector());
KeyAttachment ka = null;
if (key != null) {
ka = (KeyAttachment)key.attachment();
}
// Upgraded connections need to allow multiple threads to access the
// connection at the same time to enable blocking IO to be used when
// NIO has been configured
if (ka != null && ka.isUpgraded() &&
SocketStatus.OPEN_WRITE == status) {
synchronized (ka.getWriteThreadLock()) {
doRun(key, ka);
}
} else {
synchronized (socket) {
doRun(key, ka);
}
}
}
private void doRun(SelectionKey key, KeyAttachment ka) {
try {
int handshake = -1;
try {
if (key != null) {
// For STOP there is no point trying to handshake as the
// Poller has been stopped.
if (socket.isHandshakeComplete() ||
status == SocketStatus.STOP) {
handshake = 0;
} else {
handshake = socket.handshake(
key.isReadable(), key.isWritable());
// The handshake process reads/writes from/to the
// socket. status may therefore be OPEN_WRITE once
// the handshake completes. However, the handshake
// happens when the socket is opened so the status
// must always be OPEN_READ after it completes. It
// is OK to always set this as it is only used if
// the handshake completes.
status = SocketStatus.OPEN_READ;
}
}
}catch ( IOException x ) {
handshake = -1;
if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
}catch ( CancelledKeyException ckx ) {
handshake = -1;
}
if ( handshake == 0 ) {
SocketState state = SocketState.OPEN;
// Process the request from this socket
if (status == null) {
// 最關鍵的代碼,這裏將KeyAttachment(實際就是socket)交給Handler處理請求
state = handler.process(ka, SocketStatus.OPEN_READ);
} else {
state = handler.process(ka, status);
}
if (state == SocketState.CLOSED) {
// Close socket and pool
try {
close(ka, socket, key, SocketStatus.ERROR);
} catch ( Exception x ) {
log.error("",x);
}
}
} else if (handshake == -1 ) {
close(ka, socket, key, SocketStatus.DISCONNECT);
} else {
ka.getPoller().add(socket, handshake);
}
} catch (CancelledKeyException cx) {
socket.getPoller().cancelledKey(key, null, false);
} catch (OutOfMemoryError oom) {
try {
oomParachuteData = null;
log.error("", oom);
if (socket != null) {
socket.getPoller().cancelledKey(key,SocketStatus.ERROR, false);
}
releaseCaches();
}catch ( Throwable oomt ) {
try {
System.err.println(oomParachuteMsg);
oomt.printStackTrace();
}catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}
} catch (VirtualMachineError vme) {
ExceptionUtils.handleThrowable(vme);
}catch ( Throwable t ) {
log.error("",t);
if (socket != null) {
socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
}
} finally {
socket = null;
status = null;
//return to cache
if (running && !paused) {
processorCache.offer(this);
}
}
}
private void close(KeyAttachment ka, NioChannel socket, SelectionKey key,
SocketStatus socketStatus) {
...
}
}
複製代碼
能夠看到由 SocketProcessor 尋找合適的 Handler 處理器作最終 socket 轉換處理。
能夠用下面這幅圖總結一下 NioEndpoint 的主要流程:
Acceptor 和 Poller 是線程數組,Worker 是一個線程池( Executor )