public abstract static class Acceptor implements Runnable { public enum AcceptorState { NEW, RUNNING, PAUSED, ENDED } protected volatile AcceptorState state = AcceptorState.NEW; public final AcceptorState getState() { return state; } private String threadName; protected final void setThreadName(final String threadName) { this.threadName = threadName; } protected final String getThreadName() { return threadName; } }
能夠看出在這個靜態內部類中並無實現run()方法,其實現交給子類來實現。在Tomcat中實際定義來一個 Acceptor數組來表示一組接受TCP鏈接的線程。咱們在簡單看一下其啓動這個接受線程的代碼實現。socket
protected final void startAcceptorThreads() { int count = getAcceptorThreadCount(); acceptors = new Acceptor[count]; for (int i = 0; i < count; i++) { acceptors[i] = createAcceptor(); String threadName = getName() + "-Acceptor-" + i; acceptors[i].setThreadName(threadName); Thread t = new Thread(acceptors[i], threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); } }
protected class Acceptor extends AbstractEndpoint.Acceptor { @Override public void run() { int errorDelay = 0; // Loop until we receive a shutdown command while (running) { // Loop if endpoint is paused while (paused && running) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!running) { break; } state = AcceptorState.RUNNING; try { //if we have reached max connections, wait countUpOrAwaitConnection(); Socket socket = null; try { // Accept the next incoming connection from the server // socket socket = serverSocketFactory.acceptSocket(serverSocket); } catch (IOException ioe) { countDownConnection(); // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } // Successful accept, reset the error delay errorDelay = 0; // Configure the socket if (running && !paused && setSocketOptions(socket)) { // Hand this socket off to an appropriate processor if (!processSocket(socket)) { countDownConnection(); // Close socket right away closeSocket(socket); } } else { countDownConnection(); // Close socket right away closeSocket(socket); } } catch (IOException x) { if (running) { log.error(sm.getString(""), x); } } catch (NullPointerException npe) { if (running) { log.error(sm.getString(""), npe); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString(""), t); } } state = AcceptorState.ENDED; } }
其核心在接收到TCP鏈接以後,即在接收到Socket,會調用processSocket(Socket socket);這個方法。咱們再來關注一下這個方法。
protected boolean processSocket(Socket socket) { // Process the request from this socket try { SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket); wrapper.setKeepAliveLeft(getMaxKeepAliveRequests()); wrapper.setSecure(isSSLEnabled()); // During shutdown, executor may be null - avoid NPE if (!running) { return false; } getExecutor().execute(new SocketProcessor(wrapper)); } catch (RejectedExecutionException x) { log.warn("Socket processing request was rejected for:"+socket,x); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full log.error(sm.getString(""), t); return false; } return true; }
其核心代碼在於 getExecutor().execute(new SocketProcessor(wrapper));getExecutor()會返回Executor對象(在AbstractEndPoint中createExecutor()創建了線程池),由線程池中的線程來處理該Socket。咱們再來看一下SocketProccessor這個在JIOEndPoint中的內部類,這個類(注意此時已經在工做線程之中)中核心代碼
if ((state != SocketState.CLOSED)) { if (status == null) { state = handler.process(socket, SocketStatus.OPEN_READ); } else { state = handler.process(socket,status); } }