前言
上篇文章介紹了 NioEndpoint,其中講到了在 NioEndpoint#startInternal 方法裏建立並啓動了 Acceptor 和 Poller,線程。本篇文章先看 Acceptor,下篇文章再看 Poller。
1. Acceptor
Acceptor 的構造方法聲明爲:java
private final AbstractEndpoint<?,U> endpoint; public Acceptor(AbstractEndpoint<?,U> endpoint) { this.endpoint = endpoint; }
其中 endpoint 參數是在 NioEndpoint#startAcceptorThreads 方法裏 new Acceptor 時傳入的 NioEndpoint 對象。算法
Acceptor 實現了 Runnable 方法,所以它的 run 方法是 Acceptor 的關鍵。編程
@Override public void run() { int errorDelay = 0; // Loop until we receive a shutdown command while (endpoint.isRunning()) { // Loop if endpoint is paused while (endpoint.isPaused() && endpoint.isRunning()) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!endpoint.isRunning()) { break; } state = AcceptorState.RUNNING; try { //if we have reached max connections, wait endpoint.countUpOrAwaitConnection(); // Endpoint might have been paused while waiting for latch // If that is the case, don't accept new connections if (endpoint.isPaused()) { continue; } U socket = null; try { // Accept the next incoming connection from the server // socket socket = endpoint.serverSocketAccept(); } catch (Exception ioe) { // We didn't get a socket endpoint.countDownConnection(); if (endpoint.isRunning()) { // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } else { break; } } // Successful accept, reset the error delay errorDelay = 0; // Configure the socket if (endpoint.isRunning() && !endpoint.isPaused()) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful if (!endpoint.setSocketOptions(socket)) { endpoint.closeSocket(socket); } } else { endpoint.destroySocket(socket); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); String msg = sm.getString("endpoint.accept.fail"); // APR specific. // Could push this down but not sure it is worth the trouble. if (t instanceof Error) { Error e = (Error) t; if (e.getError() == 233) { // Not an error on HP-UX so log as a warning // so it can be filtered out on that platform // See bug 50273 log.warn(msg, t); } else { log.error(msg, t); } } else { log.error(msg, t); } } } state = AcceptorState.ENDED; }
run 方法的代碼被包裹在一個 while 循環裏,while 循環的判斷條件是 endpoint.isRunning(),也就是 NioEndpoint 的父類 AbstractEndpoint 裏的 running 字段。segmentfault
/** * Running state of the endpoint. */ protected volatile boolean running = false;
這個 running 字段在 NioEndpoint#startInternal 方法裏被置爲 true。在 NioEndpoint#stopInternal 方法裏 running 置爲 false,數組
在最外層的 while 循環裏,就是 run 方法的核心了。緩存
// Loop if endpoint is paused while (endpoint.isPaused() && endpoint.isRunning()) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } }
首先判斷 endpoint.isPaused() 是否是爲 true,若是是就讓線程 sleep 50毫秒,並把 Acceptor 的狀態設置爲 AcceptorState.PAUSED。
這個 endpoint.isPaused() 跟 isRunning 方法相似,也就是判斷
AbstractEndpoint 裏的一個 paused 屬性,起聲明以下app
/** * Will be set to true whenever the endpoint is paused. */ protected volatile boolean paused = false;
這個 pause 的是在 AbstractEndpoint#pause 裏置爲 true 的。socket
而後把 Acceptor 的狀態改成 AcceptorState.RUNNING。ide
接着進入 try 語句塊。首先調用 endpoint.countUpOrAwaitConnection()oop
//if we have reached max connections, wait endpoint.countUpOrAwaitConnection();
protected void countUpOrAwaitConnection() throws InterruptedException { if (maxConnections==-1) return; LimitLatch latch = connectionLimitLatch; if (latch!=null) latch.countUpOrAwait(); }
能夠看出,countUpOrAwaitConnection 這個方法是判斷是否已超過 maxConnections,若是是就調用 latch.countUpOrAwait() 等待。
而後調用 endpoint.serverSocketAccept() 方法,返回一個泛型對象,這個泛型對象的具體類型在 NioEndpoint 對象中就確立了。
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
public abstract class AbstractJsseEndpoint<S,U> extends AbstractEndpoint<S,U>
從 NioEndpoint 和 AbstractJsseEndpoint 的聲明中能夠看出,泛型 U 的具體類型是 SocketChannel。即 java.nio.channels.SocketChannel。
也就是說 endpoint.serverSocketAccept() 獲取的是一個 SocketChannel 對象。
@Override protected SocketChannel serverSocketAccept() throws Exception { return serverSock.accept(); }
serverSocketAccept 就是簡單調用 serverSock.accept() 方法獲取一個 SocketChannel 對象。在 nio 編程裏,能夠認爲一個 SocketChannel 對象表明一個服務端與客戶端的鏈接。
這個 serverSock 就是在 NioEndpoint#initServerSocket() 裏調用 ServerSocketChannel.open() 初始化的。
拿到這個 SocketChannel 對象以後就配置這個對象
// Configure the socket if (endpoint.isRunning() && !endpoint.isPaused()) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful if (!endpoint.setSocketOptions(socket)) { endpoint.closeSocket(socket); } } else { endpoint.destroySocket(socket); }
上面代碼的邏輯很簡單,就是調用 endpoint.setSocketOptions(socket) 方法,若是不成功就調用 endpoint.closeSocket(socket) 方法。destroySocket(socket) 方法內部也是調用 closeSocket 方法。
protected void destroySocket(U socket) { closeSocket(socket); }
@Override protected void closeSocket(SocketChannel socket) { countDownConnection(); try { socket.socket().close(); } catch (IOException ioe) { if (log.isDebugEnabled()) { log.debug(sm.getString("endpoint.err.close"), ioe); } } try { socket.close(); } catch (IOException ioe) { if (log.isDebugEnabled()) { log.debug(sm.getString("endpoint.err.close"), ioe); } } }
closeSocket 在 NioEndpoint 裏,而 destroySocket 在 AbstractEndpoint 裏。closeSocket 方法邏輯很簡單就是調用 SocketChannel.socket().close() 和 SocketChannel.close() 方法。
關鍵地方在於 endpoint.setSocketOptions(socket) 方法。
1.1 NioEndpoint#setSocketOptions
/** * Process the specified connection. * @param socket The socket channel * @return <code>true</code> if the socket was correctly configured * and processing may continue, <code>false</code> if the socket needs to be * close immediately */ @Override protected boolean setSocketOptions(SocketChannel socket) { // Process the connection try { //disable blocking, APR style, we are gonna be polling it socket.configureBlocking(false); Socket sock = socket.socket(); socketProperties.setProperties(sock); NioChannel channel = nioChannels.pop(); if (channel == null) { SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); if (isSSLEnabled()) { channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); } else { channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); channel.reset(); } getPoller0().register(channel); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error(sm.getString("endpoint.socketOptionsError"), t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } // Tell to close the socket return false; } return true; }
setSocketOptions 方法裏,首先用 socketProperties 給這個 SocketChannel 對象的 Socket 設置了一些屬性。
而後,從 nioChannels 這個 SynchronizedStack<NioChannel> 緩存池裏獲取一個 NioChannel 對象,若是獲取不到就建立一個,建立的 NioChannel 對象的時候也建立了一個 SocketBufferHandler 對象。
public SocketBufferHandler(int readBufferSize, int writeBufferSize, boolean direct) { this.direct = direct; if (direct) { readBuffer = ByteBuffer.allocateDirect(readBufferSize); writeBuffer = ByteBuffer.allocateDirect(writeBufferSize); } else { readBuffer = ByteBuffer.allocate(readBufferSize); writeBuffer = ByteBuffer.allocate(writeBufferSize); } }
SocketBufferHandler 對象裏包含了兩個 ByteBuffer 對象,一個讀一個寫。
protected SocketChannel sc = null; protected final SocketBufferHandler bufHandler; public NioChannel(SocketChannel channel, SocketBufferHandler bufHandler) { this.sc = channel; this.bufHandler = bufHandler; }
NioChannel 封裝了對 SocketChannel 對象的讀寫操做。
最後 setSocketOptions 裏調用了 getPoller0().register(channel)。
private Poller[] pollers = null; private AtomicInteger pollerRotater = new AtomicInteger(0); public Poller getPoller0() { int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length; return pollers[idx]; }
getPoller0() 方法就是從 pollers 數組裏選一個 Poller 對象,選取的算法是輪詢選取。
選出 Poller 對象後,調用其 register(channel) 方法。
1.2 NioEndpoint#Poller#register
/** * Registers a newly created socket with the poller. * * @param socket The newly created socket */ public void register(final NioChannel socket) { socket.setPoller(this); NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this); socket.setSocketWrapper(ka); ka.setPoller(this); ka.setReadTimeout(getConnectionTimeout()); ka.setWriteTimeout(getConnectionTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); PollerEvent r = eventCache.pop(); ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); addEvent(r); }
register 方法傳入的參數是 NioChannel 而不是 SocketChannel 了,SocketChannel 已經與 NioChannel 關聯了。
register 第一行就調用 NioChannel#setPoller 方法,把當前 Poller 對象複製給 NioChannel 的屬性,將 NioChannel 對象與 Poller 對象關聯起來。
接着 建立了一個 NioSocketWrapper 對象並設置了相關屬性,其中最重要的是 ka.interestOps(SelectionKey.OP_READ) 這一行設置了 NioSocketWrapper 所感興趣的操做。
而後把 NioChannel 對象與 NioSocketWrapper 對象關聯起來。
public NioSocketWrapper(NioChannel channel, NioEndpoint endpoint) { super(channel, endpoint); pool = endpoint.getSelectorPool(); socketBufferHandler = channel.getBufHandler(); }
NioSocketWrapper 的聲明爲
public static class NioSocketWrapper extends SocketWrapperBase<NioChannel> {
SocketWrapperBase 的構造方法爲
public SocketWrapperBase(E socket, AbstractEndpoint<E,?> endpoint) { this.socket = socket; this.endpoint = endpoint; ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.blockingStatusReadLock = lock.readLock(); this.blockingStatusWriteLock = lock.writeLock(); }
public static class NioSocketWrapper extends SocketWrapperBase<NioChannel>
public abstract class SocketWrapperBase<E>
SocketWrapperBase 聲明裏有一個泛型 E,而 NioSocketWrapper 的聲明裏,泛型 E 的具體類型則是 NioChannel。
register 方法的最後從 eventCache 緩存池裏獲取一個 PollerEvent 對象,若是獲取不到就建立一個 PollerEvent 對象。
private NioChannel socket; private int interestOps; private NioSocketWrapper socketWrapper; public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) { reset(ch, w, intOps); } public void reset(NioChannel ch, NioSocketWrapper w, int intOps) { socket = ch; interestOps = intOps; socketWrapper = w; }
建立 PollerEvent 對象時傳入的參數分別是前面的 NioChannel 、NioSocketWrapper 對象,以及一個 int 類型的常量 OP_REGISTER,值爲 0x100,分別賦值給 PollerEvent 的屬性,另外 PollerEvent 也實現了 Runnable 接口,這幾個屬性在 PollerEvent#run 方法裏都有對應的做用。
拿到 PollerEvent 對象後,調用 addEvent(r) 方法把這個對象加入的隊列中等待後續 Poller 線程的處理。
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>(); private void addEvent(PollerEvent event) { events.offer(event); if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup(); }
events 緩存的 PollerEvent 對象,會在 Poller#run 方法裏被處理。
小結本文分析了 Acceptor 的 run 方法,也就是 Acceptor 線程作的事情。能夠看出 Acceptor 線程在一個循環裏一直接受客戶端鏈接,生成 SocketChannel 對象,並把這個 SocketChannel 對象封裝成 NioChannel 和 NioSocketWrapper 對象,並把這兩個對象放在一個 PollerEvent 對象裏,並把這個 PollerEvent 對象加入的緩存池裏等待 Poller 線程的處理。