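Starting from startup.sh, follow the call into catalina.sh, which leads to the startup class: org.apache.catalina.startup.Bootstrap
1. Bootstrap's main method instantiates Bootstrap itself and then initializes a set of class loaders: commonLoader, serverLoader, and sharedLoader. For commonLoader, conf/catalina.properties configures which JARs it reads: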
common.loader=${catalina.base}/lib,${catalina.base}/lib/*.jar,${catalina.home}/lib,${catalina.home}/lib/*.jar
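The other two class loaders have no directories configured (server.loader and shared.loader are empty in the stock catalina.properties), so they have nothing of their own to load.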
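To make the common.loader value above more concrete, here is a minimal sketch of how such a value could be expanded into individual repository entries. The class and method names (LoaderPathSketch, substitute, repositories) are hypothetical and are not Tomcat's own code; the sketch only assumes that `${name}` tokens are replaced from a property map and that entries are separated by commas.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Tomcat: expands a loader property value.
class LoaderPathSketch {

    /** Replaces each ${name} token with its value from props; unknown names are left untouched. */
    static String substitute(String value, Map<String, String> props) {
        StringBuilder out = new StringBuilder();
        int pos = 0;
        while (true) {
            int start = value.indexOf("${", pos);
            int end = start < 0 ? -1 : value.indexOf('}', start);
            if (start < 0 || end < 0) {
                out.append(value.substring(pos));
                return out.toString();
            }
            String name = value.substring(start + 2, end);
            String replacement = props.get(name);
            out.append(value, pos, start);
            out.append(replacement != null ? replacement : value.substring(start, end + 1));
            pos = end + 1;
        }
    }

    /** Splits a comma-separated loader value into individual repository entries. */
    static List<String> repositories(String loaderValue, Map<String, String> props) {
        List<String> result = new ArrayList<>();
        if (loaderValue == null || loaderValue.trim().isEmpty()) {
            return result; // nothing configured, so nothing to load
        }
        for (String entry : substitute(loaderValue, props).split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of("catalina.base", "/opt/tomcat");
        System.out.println(repositories("${catalina.base}/lib,${catalina.base}/lib/*.jar", props));
        System.out.println(repositories("", props));
    }
}
```

An empty value yields an empty list, which mirrors the situation of the two loaders that have no directories configured.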
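2. Bootstrap's start method is called. By then the org.apache.catalina.startup.Catalina object has been initialized and sharedLoader has been set as its class loader member (its parent class loader).
3. Catalina's start method is called; this is where things get a little more complex.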
    public void start() {

        if (getServer() == null) {
            load();
        }

        if (getServer() == null) {
            log.fatal("Cannot start server. Server instance is not configured.");
            return;
        }

        long t1 = System.nanoTime();

        // Start the new server
        try {
            getServer().start();
        } catch (LifecycleException e) {
            log.fatal(sm.getString("catalina.serverStartFail"), e);
            try {
                getServer().destroy();
            } catch (LifecycleException e1) {
                log.debug("destroy() failed for failed Server ", e1);
            }
            return;
        }

        long t2 = System.nanoTime();
        if(log.isInfoEnabled()) {
            log.info("Server startup in " + ((t2 - t1) / 1000000) + " ms");
        }

        // Register shutdown hook
        if (useShutdownHook) {
            if (shutdownHook == null) {
                shutdownHook = new CatalinaShutdownHook();
            }
            Runtime.getRuntime().addShutdownHook(shutdownHook);

            // If JULI is being used, disable JULI's shutdown hook since
            // shutdown hooks run in parallel and log messages may be lost
            // if JULI's hook completes before the CatalinaShutdownHook()
            LogManager logManager = LogManager.getLogManager();
            if (logManager instanceof ClassLoaderLogManager) {
                ((ClassLoaderLogManager) logManager).setUseShutdownHook(
                        false);
            }
        }

        if (await) {
            await();
            stop();
        }
    }
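Inside load(), the Digester tool instantiates objects while it reads server.xml. It creates the org.apache.catalina.core.StandardServer object, the org.apache.catalina.deploy.NamingResources object, and so on. Digester can instantiate either a configured default class or the class named in an XML element's className attribute, and assign the result to the corresponding member of the Catalina object (hierarchically too, for example instantiating the Service member inside the Server object). The Server object, for instance, defaults to org.apache.catalina.core.StandardServer. The Service member inside the Server object is then instantiated as org.apache.catalina.core.StandardService. There are also custom instantiation rules: ConnectorCreateRule, for example, passes the Connector element's protocol attribute directly to the constructor org.apache.catalina.connector.Connector#Connector(java.lang.String) to instantiate org.apache.catalina.connector.Connector.
In short, Digester provides four capabilities: 1. read an XML element and instantiate the configured default class; 2. read an XML element's className attribute and instantiate that class; 3. set member variables hierarchically; 4. apply custom instantiation rules.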
    // ConnectorCreateRule.java
    // ConnectorCreateRule passes the Connector element's protocol attribute directly to
    // org.apache.catalina.connector.Connector#Connector(java.lang.String)
    // in order to instantiate org.apache.catalina.connector.Connector.
    @Override
    public void begin(String namespace, String name, Attributes attributes)
            throws Exception {
        Service svc = (Service)digester.peek();
        Executor ex = null;
        if ( attributes.getValue("executor")!=null ) {
            ex = svc.getExecutor(attributes.getValue("executor"));
        }
        Connector con = new Connector(attributes.getValue("protocol"));
        if ( ex != null )  _setExecutor(con,ex);
        digester.push(con);
    }
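The first three Digester capabilities listed above (default class, className override, object stack for nesting) can be illustrated with a small sketch. This is not Digester's API: ObjectCreateSketch, begin, and end are hypothetical names, and the JDK collection classes merely stand in for Server and Service implementations.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Map;

// Hypothetical miniature of a rule-driven object stack; not Digester itself.
class ObjectCreateSketch {

    /** Objects under construction, with the innermost element on top. */
    final Deque<Object> stack = new ArrayDeque<>();

    /** Instantiates attrs["className"] when present, otherwise the default class, and pushes it. */
    Object begin(String defaultClassName, Map<String, String> attrs) {
        String className = attrs.getOrDefault("className", defaultClassName);
        try {
            Object created = Class.forName(className).getDeclaredConstructor().newInstance();
            stack.push(created);
            return created;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + className, e);
        }
    }

    /** Pops the finished object so that its parent is on top of the stack again. */
    Object end() {
        return stack.pop();
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        ObjectCreateSketch digester = new ObjectCreateSketch();
        // The outer element uses the default class.
        digester.begin("java.util.ArrayList", Map.of());
        // The inner element overrides the default through className.
        digester.begin("java.util.ArrayList", Map.of("className", "java.util.LinkedList"));
        Object child = digester.end();
        // Hierarchical wiring: attach the child to whatever is now on top.
        ((List<Object>) digester.stack.peek()).add(child);
        System.out.println(digester.stack.peek().getClass() + " contains " + child.getClass());
    }
}
```

A custom rule such as ConnectorCreateRule follows the same peek/push pattern but chooses the constructor arguments itself instead of relying on a no-argument constructor.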
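org.apache.catalina.core.StandardThreadExecutor is also instantiated (when server.xml declares an Executor element); the Connector later uses this Executor to run SocketProcessor tasks.
Step 3 instantiates many objects. To keep the main path easy to follow, the others are ignored for now.
4. After all of these objects are instantiated, the Server object's init method is called. Because the Server implementation extends org.apache.catalina.util.LifecycleBase, this is really LifecycleBase's init method, which in turn calls the subclass's initInternal.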
    // org.apache.catalina.core.StandardServer#initInternal
    @Override
    protected void initInternal() throws LifecycleException {
        // ... some code omitted here for now
        // Initialize our defined Services
        for (int i = 0; i < services.length; i++) {
            services[i].init();
        }
    }
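Since init() and initInternal() keep reappearing in the following steps, a small sketch of the template method pattern may help. The classes below are hypothetical simplifications, not Tomcat's LifecycleBase: in particular, the boolean guard is an assumption standing in for the real state handling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification of the init()/initInternal() template method.
class LifecycleSketch {

    /** Records the order in which components were initialized. */
    static final List<String> LOG = new ArrayList<>();

    /** The base class owns init(); subclasses only fill in initInternal(). */
    abstract static class Base {
        private boolean initialized;

        final void init() {
            if (initialized) {
                return; // simplified guard, assumed for this sketch
            }
            initialized = true;
            initInternal();
        }

        abstract void initInternal();
    }

    static class Service extends Base {
        final String name;

        Service(String name) {
            this.name = name;
        }

        @Override
        void initInternal() {
            LOG.add("service:" + name);
        }
    }

    static class Server extends Base {
        final List<Service> services = new ArrayList<>();

        @Override
        void initInternal() {
            LOG.add("server");
            for (Service s : services) {
                s.init(); // goes through Base.init() again for each child
            }
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        server.services.add(new Service("Catalina"));
        server.init();
        System.out.println(LOG);
    }
}
```

Every component is entered through the same final init() method, which is why the call stack shows the base class between each pair of components.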
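5. The Server object's init in turn calls the init method of each Service (which means server.xml may contain more than one Service element). Awkwardly, Service's init is again LifecycleBase's init method. This part is easy to get confused by, because Server and Service have similar names and both go through the LifecycleBase init template method.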
    // org.apache.catalina.core.StandardService#initInternal
    @Override
    protected void initInternal() throws LifecycleException {
        // ... some code omitted here for now
        // Initialize our defined Connectors
        synchronized (connectorsLock) {
            for (Connector connector : connectors) {
                try {
                    connector.init();
                } catch (Exception e) {
                    String message = sm.getString(
                            "standardService.connector.initFailed", connector);
                    log.error(message, e);

                    if (Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE"))
                        throw new LifecycleException(message);
                }
            }
        }
    }
    // org.apache.catalina.connector.Connector#initInternal
    @Override
    protected void initInternal() throws LifecycleException {

        super.initInternal();

        // Initialize adapter
        adapter = new CoyoteAdapter(this);
        protocolHandler.setAdapter(adapter);

        // Make sure parseBodyMethodsSet has a default
        if (null == parseBodyMethodsSet) {
            setParseBodyMethods(getParseBodyMethods());
        }

        if (protocolHandler.isAprRequired() &&
                !AprLifecycleListener.isAprAvailable()) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerNoApr",
                            getProtocolHandlerClassName()));
        }

        try {
            protocolHandler.init();
        } catch (Exception e) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
        }

        // Initialize mapper listener
        mapperListener.init();
    }
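6. Connector's initInternal calls protocolHandler's init method. Where does protocolHandler come from? As mentioned in step 3, the custom instantiation rule ConnectorCreateRule passes the Connector element's protocol attribute to the Connector constructor: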
    public Connector(String protocol) {
        setProtocol(protocol);
        // Instantiate protocol handler
        try {
            Class<?> clazz = Class.forName(protocolHandlerClassName);
            this.protocolHandler = (ProtocolHandler) clazz.getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            log.error(sm.getString(
                    "coyoteConnector.protocolHandlerInstantiationFailed"), e);
        }

        // Default for Connector depends on this (deprecated) system property
        if (Boolean.parseBoolean(System.getProperty("org.apache.tomcat.util.buf.UDecoder.ALLOW_ENCODED_SLASH", "false"))) {
            encodedSolidusHandling = EncodedSolidusHandling.DECODE;
        }
    }
7. protocolHandler's init method is implemented in org.apache.coyote.AbstractProtocol#init
    @Override
    public void init() throws Exception {
        // ... some code omitted here for now
        String endpointName = getName();
        endpoint.setName(endpointName.substring(1, endpointName.length()-1));

        try {
            endpoint.init();
        } catch (Exception ex) {
            getLog().error(sm.getString("abstractProtocolHandler.initError",
                    getName()), ex);
            throw ex;
        }
    }
From here on we follow org.apache.coyote.http11.Http11NioProtocol as the protocol:
    public Http11NioProtocol() {
        endpoint=new NioEndpoint();
        cHandler = new Http11ConnectionHandler(this);
        ((NioEndpoint) endpoint).setHandler(cHandler);
        setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
        setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
        setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
    }
8. protocolHandler's init method calls NioEndpoint's init method. NioEndpoint inherits the init template method from org.apache.tomcat.util.net.AbstractEndpoint:
    public final void init() throws Exception {
        testServerCipherSuitesOrderSupport();
        if (bindOnInit) {
            bind();
            bindState = BindState.BOUND_ON_INIT;
        }
    }
9. The bind method is left to subclasses. Let's look at the implementation in org.apache.tomcat.util.net.NioEndpoint#bind
@Override public void bind() throws Exception { serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort())); serverSock.socket().bind(addr,getBacklog()); serverSock.configureBlocking(true); //mimic APR behavior if (getSocketProperties().getSoTimeout() >= 0) { serverSock.socket().setSoTimeout(getSocketProperties().getSoTimeout()); } // Initialize thread count defaults for acceptor, poller if (acceptorThreadCount == 0) { // FIXME: Doesn't seem to work that well with multiple accept threads acceptorThreadCount = 1; } if (pollerThreadCount <= 0) { //minimum one poller thread pollerThreadCount = 1; } stopLatch = new CountDownLatch(pollerThreadCount); // Initialize SSL if needed if (isSSLEnabled()) { SSLUtil sslUtil = handler.getSslImplementation().getSSLUtil(this); sslContext = sslUtil.createSSLContext(); sslContext.init(wrap(sslUtil.getKeyManagers()), sslUtil.getTrustManagers(), null); SSLSessionContext sessionContext = sslContext.getServerSessionContext(); if (sessionContext != null) { sslUtil.configureSessionContext(sessionContext); } // Determine which cipher suites and protocols to enable enabledCiphers = sslUtil.getEnableableCiphers(sslContext); enabledProtocols = sslUtil.getEnableableProtocols(sslContext); } if (oomParachute>0) reclaimParachute(true); selectorPool.open(); }
Here a ServerSocketChannel is instantiated and set to blocking mode, but it is never registered with a Selector. org.apache.tomcat.util.net.NioSelectorPool#open is called to instantiate a Selector, yet nothing is registered on it at this point.
    protected Selector getSharedSelector() throws IOException {
        if (SHARED && SHARED_SELECTOR == null) {
            synchronized ( NioSelectorPool.class ) {
                if ( SHARED_SELECTOR == null )  {
                    synchronized (Selector.class) {
                        // Selector.open() isn't thread safe
                        // http://bugs.sun.com/view_bug.do?bug_id=6427854
                        // Affects 1.6.0_29, fixed in 1.7.0_01
                        SHARED_SELECTOR = Selector.open();
                    }
                    log.info("Using a shared selector for servlet write/read");
                }
            }
        }
        return SHARED_SELECTOR;
    }

    public void open() throws IOException {
        enabled = true;
        getSharedSelector();
        if (SHARED) {
            blockingSelector = new NioBlockingSelector();
            blockingSelector.open(getSharedSelector());
        }
    }
The Selector is then assigned to a member variable of NioBlockingSelector.
    public void open(Selector selector) {
        sharedSelector = selector;
        poller = new BlockPoller();
        poller.selector = sharedSelector;
        poller.setDaemon(true);
        poller.setName("NioBlockingSelector.BlockPoller-"+(threadCounter.getAndIncrement()));
        poller.start();
    }
BlockPoller is a thread object:
    protected static class BlockPoller extends Thread {
        protected volatile boolean run = true;
        protected Selector selector = null;
        protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();
    }
It maintains a queue, events, that holds Runnable tasks.
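NioEndpoint --> NioSelectorPool --> NioBlockingSelector --> BlockPoller
At this point the main path of the initialization part of Catalina's start method is complete.
The next step is to execute getServer().start().
1. Server's start method begins another round of the org.apache.catalina.util.LifecycleBase#start template method.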
    @Override
    protected void startInternal() throws LifecycleException {

        fireLifecycleEvent(CONFIGURE_START_EVENT, null);
        setState(LifecycleState.STARTING);

        globalNamingResources.start();

        // Start our defined Services
        synchronized (servicesLock) {
            for (int i = 0; i < services.length; i++) {
                services[i].start();
            }
        }
    }
2. Service's start method (again the org.apache.catalina.util.LifecycleBase#start template method)
    @Override
    protected void startInternal() throws LifecycleException {
        // ... some code omitted here for now
        // Start our defined Connectors second
        synchronized (connectorsLock) {
            for (Connector connector: connectors) {
                try {
                    // If it has already failed, don't try and start it
                    if (connector.getState() != LifecycleState.FAILED) {
                        connector.start();
                    }
                } catch (Exception e) {
                    log.error(sm.getString(
                            "standardService.connector.startFailed",
                            connector), e);
                }
            }
        }
    }
3. Connector's start method is called (again the org.apache.catalina.util.LifecycleBase#start template method)
    @Override
    protected void startInternal() throws LifecycleException {

        // Validate settings before starting
        if (getPort() < 0) {
            throw new LifecycleException(sm.getString(
                    "coyoteConnector.invalidPort", Integer.valueOf(getPort())));
        }

        setState(LifecycleState.STARTING);

        try {
            protocolHandler.start();
        } catch (Exception e) {
            String errPrefix = "";
            if(this.service != null) {
                errPrefix += "service.getName(): \"" + this.service.getName() + "\"; ";
            }

            throw new LifecycleException
                (errPrefix + " " + sm.getString
                 ("coyoteConnector.protocolHandlerStartFailed"), e);
        }

        mapperListener.start();
    }
4. protocolHandler.start(): this time the call does not go through the org.apache.catalina.util.LifecycleBase#start template method but through org.apache.coyote.AbstractProtocol#start.
    @Override
    public void start() throws Exception {
        if (getLog().isInfoEnabled())
            getLog().info(sm.getString("abstractProtocolHandler.start",
                    getName()));
        try {
            endpoint.start();
        } catch (Exception ex) {
            getLog().error(sm.getString("abstractProtocolHandler.startError",
                    getName()), ex);
            throw ex;
        }
    }
5. endpoint.start() calls org.apache.tomcat.util.net.AbstractEndpoint#start
    public final void start() throws Exception {
        if (bindState == BindState.UNBOUND) {
            bind();
            bindState = BindState.BOUND_ON_START;
        }
        startInternal();
    }
bind was already called during the init phase, so it is not called again here; execution continues with startInternal.
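The interplay between init(), start(), and the bind state can be reduced to a few lines. The sketch below is a hypothetical stand-in (BindStateSketch is not a Tomcat class); it only counts bind() calls to show that the socket is bound exactly once, whichever phase does it.

```java
// Hypothetical stand-in for the bind-once logic in init() and start().
class BindStateSketch {

    enum BindState { UNBOUND, BOUND_ON_INIT, BOUND_ON_START }

    BindState bindState = BindState.UNBOUND;
    final boolean bindOnInit;
    int bindCalls = 0;

    BindStateSketch(boolean bindOnInit) {
        this.bindOnInit = bindOnInit;
    }

    /** Binds during init only when configured to do so. */
    void init() {
        if (bindOnInit) {
            bind();
            bindState = BindState.BOUND_ON_INIT;
        }
    }

    /** Binds during start only if init did not already bind. */
    void start() {
        if (bindState == BindState.UNBOUND) {
            bind();
            bindState = BindState.BOUND_ON_START;
        }
    }

    /** Stand-in for opening and binding the server socket. */
    void bind() {
        bindCalls++;
    }

    public static void main(String[] args) {
        BindStateSketch early = new BindStateSketch(true);
        early.init();
        early.start();
        System.out.println(early.bindState + " after " + early.bindCalls + " bind call(s)");

        BindStateSketch late = new BindStateSketch(false);
        late.init();
        late.start();
        System.out.println(late.bindState + " after " + late.bindCalls + " bind call(s)");
    }
}
```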
6. startInternal is left to subclasses; here it is org.apache.tomcat.util.net.NioEndpoint#startInternal
    @Override
    public void startInternal() throws Exception {

        if (!running) {
            running = true;
            paused = false;

            // Create worker collection
            if ( getExecutor() == null ) {
                createExecutor();
            }

            initializeConnectionLatch();

            // Start poller threads
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }

            startAcceptorThreads();
        }
    }
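This creates org.apache.tomcat.util.net.NioEndpoint.Acceptor, which loops accepting connections. The countUpOrAwaitConnection method decides whether to keep taking connections from the OS accept queue; the limit it enforces is configured by maxConnections. With BIO, maxConnections equals maxThreads, so once every worker thread is busy the acceptor waits for one to become free before it accepts another connection. With NIO, maxConnections defaults to 10000, so the acceptor does not wait for busy worker threads; it keeps taking connections from the OS queue and places them in the events queue.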
@Override public void run() { int errorDelay = 0; // Loop until we receive a shutdown command while (running) { // Loop if endpoint is paused while (paused && running) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!running) { break; } state = AcceptorState.RUNNING; try { //if we have reached max connections, wait countUpOrAwaitConnection(); SocketChannel socket = null; try { // Accept the next incoming connection from the server // socket socket = serverSock.accept(); } catch (IOException ioe) { //we didn't get a socket countDownConnection(); // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } // Successful accept, reset the error delay errorDelay = 0; // setSocketOptions() will add channel to the poller // if successful if (running && !paused) { if (!setSocketOptions(socket)) { countDownConnection(); closeSocket(socket); } } else { countDownConnection(); closeSocket(socket); } } catch (SocketTimeoutException sx) { // Ignore: Normal condition } catch (IOException x) { if (running) { log.error(sm.getString("endpoint.accept.fail"), x); } } catch (OutOfMemoryError oom) { try { oomParachuteData = null; releaseCaches(); log.error("", oom); }catch ( Throwable oomt ) { try { try { System.err.println(oomParachuteMsg); oomt.printStackTrace(); }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); } } state = AcceptorState.ENDED; }
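The countUpOrAwaitConnection/countDownConnection pair in the loop above behaves like a counting gate. As a rough illustration only, the sketch below uses java.util.concurrent.Semaphore as a stand-in for Tomcat's own latch; ConnectionLimitSketch and its method names are hypothetical, and tryCountUp exists purely so the example can run without blocking.

```java
import java.util.concurrent.Semaphore;

// Hypothetical gate built on Semaphore; Tomcat uses its own latch class instead.
class ConnectionLimitSketch {

    private final Semaphore permits;

    ConnectionLimitSketch(int maxConnections) {
        this.permits = new Semaphore(maxConnections);
    }

    /** Blocks the accepting thread while the connection limit is reached. */
    void countUpOrAwait() throws InterruptedException {
        permits.acquire();
    }

    /** Non-blocking variant, added here only for demonstration. */
    boolean tryCountUp() {
        return permits.tryAcquire();
    }

    /** Called when a connection is closed or could not be accepted. */
    void countDown() {
        permits.release();
    }

    public static void main(String[] args) {
        ConnectionLimitSketch limit = new ConnectionLimitSketch(2);
        System.out.println(limit.tryCountUp()); // first connection fits
        System.out.println(limit.tryCountUp()); // second connection fits
        System.out.println(limit.tryCountUp()); // limit reached
        limit.countDown();                      // one connection closes
        System.out.println(limit.tryCountUp()); // room for one more
    }
}
```

With a small limit the acceptor stalls as soon as the limit is hit, while a large limit lets it keep draining the OS queue, which is the BIO versus NIO difference described above.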
Each accepted connection, that is, each socket, is passed to org.apache.tomcat.util.net.NioEndpoint#setSocketOptions for handling.
protected boolean setSocketOptions(SocketChannel socket) { // Process the connection try { //disable blocking, APR style, we are gonna be polling it socket.configureBlocking(false); Socket sock = socket.socket(); socketProperties.setProperties(sock); NioChannel channel = nioChannels.poll(); if ( channel == null ) { // SSL setup if (sslContext != null) { SSLEngine engine = createSSLEngine(); int appbufsize = engine.getSession().getApplicationBufferSize(); NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize,socketProperties.getAppReadBufSize()), Math.max(appbufsize,socketProperties.getAppWriteBufSize()), socketProperties.getDirectBuffer()); channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool); } else { // normal tcp setup NioBufferHandler bufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); if ( channel instanceof SecureNioChannel ) { SSLEngine engine = createSSLEngine(); ((SecureNioChannel)channel).reset(engine); } else { channel.reset(); } } getPoller0().register(channel); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error("",t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } // Tell to close the socket return false; } return true; }
Note that the socket is set to non-blocking here, then wrapped in a NioChannel and registered with a Poller. The Poller's register method:
    public void register(final NioChannel socket) {
        socket.setPoller(this);
        KeyAttachment key = keyCache.poll();
        final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);
        ka.reset(this,socket,getSocketProperties().getSoTimeout());
        ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
        ka.setSecure(isSSLEnabled());
        PollerEvent r = eventCache.poll();
        ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
        if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
        else r.reset(socket,ka,OP_REGISTER);
        addEvent(r);
    }
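This creates an event that listens for READ and registers it on the Poller's Selector. As you may have noticed, the actual registration happens in PollerEvent. The event type looks like OP_REGISTER, which NIO does not define; PollerEvent in fact turns OP_REGISTER into SelectionKey.OP_READ.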
*/ public static class PollerEvent implements Runnable { protected NioChannel socket; protected int interestOps; protected KeyAttachment key; public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) { reset(ch, k, intOps); } public void reset(NioChannel ch, KeyAttachment k, int intOps) { socket = ch; interestOps = intOps; key = k; } public void reset() { reset(null, null, 0); } @Override public void run() { if ( interestOps == OP_REGISTER ) { try { socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key); } catch (Exception x) { log.error("", x); } } else { final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); try { if (key == null) { // The key was cancelled (e.g. due to socket closure) // and removed from the selector while it was being // processed. Count down the connections at this point // since it won't have been counted down when the socket // closed. socket.getPoller().getEndpoint().countDownConnection(); } else { final KeyAttachment att = (KeyAttachment) key.attachment(); if ( att!=null ) { //handle callback flag if (att.isComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) { att.setCometNotify(true); } else { att.setCometNotify(false); } interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag att.access();//to prevent timeout //we are registering the key to start with, reset the fairness counter. int ops = key.interestOps() | interestOps; att.interestOps(ops); key.interestOps(ops); } else { socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false); } } } catch (CancelledKeyException ckx) { try { socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, true); } catch (Exception ignore) {} } }//end if }//run @Override public String toString() { return super.toString()+"[intOps="+this.interestOps+"]"; } }
Because socket.setPoller(this) records the poller, the socket can be registered on that poller's selector.
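The pattern of queueing a registration event and performing the actual register call later on the polling thread can be tried out in isolation. The sketch below is a hypothetical reduction (DeferredRegisterSketch is not a Tomcat class) and uses a java.nio.channels.Pipe instead of a socket so that it needs no network access.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical reduction of "queue an event, register on the poller thread".
class DeferredRegisterSketch {

    final Selector selector;
    final ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<>();

    DeferredRegisterSketch(Selector selector) {
        this.selector = selector;
    }

    /** Called by the accepting thread: only queues the work and wakes the selector. */
    void register(SelectableChannel channel, Object attachment) {
        events.offer(() -> {
            try {
                channel.configureBlocking(false);
                channel.register(selector, SelectionKey.OP_READ, attachment);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        selector.wakeup();
    }

    /** Called by the polling thread before each select(): runs every queued event. */
    boolean events() {
        boolean ran = false;
        Runnable r;
        while ((r = events.poll()) != null) {
            r.run();
            ran = true;
        }
        return ran;
    }

    /** Returns the interest ops after the queued registration ran, or -1 if it ran too early. */
    static int demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                DeferredRegisterSketch poller = new DeferredRegisterSketch(selector);
                poller.register(pipe.source(), "attachment");
                if (pipe.source().keyFor(selector) != null) {
                    return -1; // would mean the channel was registered before events() ran
                }
                poller.events();
                return pipe.source().keyFor(selector).interestOps();
            } finally {
                pipe.source().close();
                pipe.sink().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo() == SelectionKey.OP_READ);
    }
}
```

Keeping every selector mutation on the polling thread means the accepting thread never contends with a select() call that is in progress; it only appends to a concurrent queue.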
Poller runs as a thread. org.apache.tomcat.util.net.NioEndpoint#startInternal above instantiates several of them; these are the threads that show up at runtime with names such as http-nio-8080-ClientPoller-1
/** * Poller class. */ public class Poller implements Runnable { protected Selector selector; protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>(); protected volatile boolean close = false; protected long nextExpiration = 0;//optimize expiration handling protected AtomicLong wakeupCounter = new AtomicLong(0l); protected volatile int keyCount = 0; public Poller() throws IOException { synchronized (Selector.class) { // Selector.open() isn't thread safe // http://bugs.sun.com/view_bug.do?bug_id=6427854 // Affects 1.6.0_29, fixed in 1.7.0_01 this.selector = Selector.open(); } } ... public boolean events() { boolean result = false; Runnable r = null; for (int i = 0, size = events.size(); i < size && (r = events.poll()) != null; i++ ) { result = true; try { r.run(); if ( r instanceof PollerEvent ) { ((PollerEvent)r).reset(); eventCache.offer((PollerEvent)r); } } catch ( Throwable x ) { log.error("",x); } } return result; } public void register(final NioChannel socket) { socket.setPoller(this); KeyAttachment key = keyCache.poll(); final KeyAttachment ka = key!=null?key:new KeyAttachment(socket); ka.reset(this,socket,getSocketProperties().getSoTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); PollerEvent r = eventCache.poll(); ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. 
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); addEvent(r); } public KeyAttachment cancelledKey(SelectionKey key, SocketStatus status, boolean dispatch) { KeyAttachment ka = null; try { if ( key == null ) return null;//nothing to do ka = (KeyAttachment) key.attachment(); if (ka != null && ka.isComet() && status != null) { //the comet event takes care of clean up //processSocket(ka.getChannel(), status, dispatch); ka.setComet(false);//to avoid a loop if (status == SocketStatus.TIMEOUT ) { if (processSocket(ka.getChannel(), status, true)) { return null; // don't close on comet timeout } } else { // Don't dispatch if the lines below are cancelling the key processSocket(ka.getChannel(), status, false); } } ka = (KeyAttachment) key.attach(null); if (ka!=null) handler.release(ka); else handler.release((SocketChannel)key.channel()); if (key.isValid()) key.cancel(); // If it is available, close the NioChannel first which should // in turn close the underlying SocketChannel. The NioChannel // needs to be closed first, if available, to ensure that TLS // connections are shut down cleanly. if (ka != null) { try { ka.getSocket().close(true); } catch (Exception e){ if (log.isDebugEnabled()) { log.debug(sm.getString( "endpoint.debug.socketCloseFail"), e); } } } // The SocketChannel is also available via the SelectionKey. If // it hasn't been closed in the block above, close it now. 
if (key.channel().isOpen()) { try { key.channel().close(); } catch (Exception e) { if (log.isDebugEnabled()) { log.debug(sm.getString( "endpoint.debug.channelCloseFail"), e); } } } try { if (ka != null && ka.getSendfileData() != null && ka.getSendfileData().fchannel != null && ka.getSendfileData().fchannel.isOpen()) { ka.getSendfileData().fchannel.close(); } } catch (Exception ignore) { } if (ka!=null) { ka.reset(); countDownConnection(); } } catch (Throwable e) { ExceptionUtils.handleThrowable(e); if (log.isDebugEnabled()) log.error("",e); } return ka; } /** * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ @Override public void run() { // Loop until destroy() is called while (true) { try { // Loop if endpoint is paused while (paused && (!close) ) { try { Thread.sleep(100); } catch (InterruptedException e) { // Ignore } } boolean hasEvents = false; // Time to terminate? if (close) { events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString( "endpoint.nio.selectorCloseFail"), ioe); } break; } else { hasEvents = events(); } try { if ( !close ) { if (wakeupCounter.getAndSet(-1) > 0) { //if we are here, means we have other stuff to do //do a non blocking select keyCount = selector.selectNow(); } else { keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } if (close) { events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString( "endpoint.nio.selectorCloseFail"), ioe); } break; } } catch ( NullPointerException x ) { //sun bug 5076772 on windows JDK 1.5 if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x); if ( wakeupCounter == null || selector == null ) throw x; continue; } catch ( CancelledKeyException x ) { //sun bug 5076772 on windows JDK 1.5 if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x); if ( 
wakeupCounter == null || selector == null ) throw x; continue; } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error("",x); continue; } //either we timed out or we woke up, process events first if ( keyCount == 0 ) hasEvents = (hasEvents | events()); Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); KeyAttachment attachment = (KeyAttachment)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { attachment.access(); iterator.remove(); processKey(sk, attachment); } }//while //process timeouts timeout(keyCount,hasEvents); if ( oomParachute > 0 && oomParachuteData == null ) checkParachute(); } catch (OutOfMemoryError oom) { try { oomParachuteData = null; releaseCaches(); log.error("", oom); }catch ( Throwable oomt ) { try { System.err.println(oomParachuteMsg); oomt.printStackTrace(); }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } } } }//while stopLatch.countDown(); } }
The Poller's run method waits for ready SelectionKeys and hands each one to org.apache.tomcat.util.net.NioEndpoint.Poller#processKey
protected boolean processKey(SelectionKey sk, KeyAttachment attachment) { boolean result = true; try { if ( close ) { cancelledKey(sk, SocketStatus.STOP, attachment.comet); } else if ( sk.isValid() && attachment != null ) { attachment.access();//make sure we don't time out valid sockets sk.attach(attachment);//cant remember why this is here NioChannel channel = attachment.getChannel(); if (sk.isReadable() || sk.isWritable() ) { if ( attachment.getSendfileData() != null ) { processSendfile(sk,attachment, false); } else { if ( isWorkerAvailable() ) { unreg(sk, attachment, sk.readyOps()); boolean closeSocket = false; // Read goes before write if (sk.isReadable()) { if (!processSocket(channel, SocketStatus.OPEN_READ, true)) { closeSocket = true; } } if (!closeSocket && sk.isWritable()) { if (!processSocket(channel, SocketStatus.OPEN_WRITE, true)) { closeSocket = true; } } if (closeSocket) { cancelledKey(sk,SocketStatus.DISCONNECT,false); } } else { result = false; } } } } else { //invalid key cancelledKey(sk, SocketStatus.ERROR,false); } } catch ( CancelledKeyException ckx ) { cancelledKey(sk, SocketStatus.ERROR,false); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error("",t); } return result; }
There is a key point here: processSendfile implements zero copy. For background see https://www.ibm.com/developer...
    if ( attachment.getSendfileData() != null ) {
        processSendfile(sk,attachment, false);
    }

    public SendfileState processSendfile(SelectionKey sk, KeyAttachment attachment, boolean calledByProcessor) {
        long written = sd.fchannel.transferTo(sd.pos,sd.length,wc);
        ...
    }
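The transferTo call above is the standard java.nio.channels.FileChannel API. As a self-contained illustration, the sketch below copies a temporary file through transferTo; TransferToSketch, sendFile, and demo are hypothetical names, and the target here is another file channel rather than a socket, so it shows the call shape rather than an actual zero-copy network send.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical illustration of FileChannel.transferTo; not Tomcat's sendfile code.
class TransferToSketch {

    /** Sends the whole file to target; loops because transferTo may move fewer bytes than requested. */
    static long sendFile(Path file, WritableByteChannel target) throws IOException {
        try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
            long pos = 0;
            long size = in.size();
            while (pos < size) {
                long written = in.transferTo(pos, size - pos, target);
                if (written <= 0) {
                    break; // a non-blocking target that is full would need a later retry
                }
                pos += written;
            }
            return pos;
        }
    }

    /** Round-trips a small temporary file and returns what arrived at the destination. */
    static String demo() {
        try {
            Path src = Files.createTempFile("sendfile", ".txt");
            Path dst = Files.createTempFile("sendfile", ".out");
            try {
                Files.write(src, "hello sendfile".getBytes(StandardCharsets.UTF_8));
                try (FileChannel out = FileChannel.open(dst, StandardOpenOption.WRITE)) {
                    sendFile(src, out);
                }
                return new String(Files.readAllBytes(dst), StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(src);
                Files.deleteIfExists(dst);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

When the target is a socket channel, the JDK can delegate to an operating system facility that avoids copying the data through user space, which is where the zero-copy benefit comes from.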
If the socket is not handling a file transfer, it takes the org.apache.tomcat.util.net.NioEndpoint#processSocket path instead.
    public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
        try {
            KeyAttachment attachment = (KeyAttachment)socket.getAttachment();
            if (attachment == null) {
                return false;
            }
            attachment.setCometNotify(false); //will get reset upon next reg
            SocketProcessor sc = processorCache.poll();
            if ( sc == null ) sc = new SocketProcessor(socket,status);
            else sc.reset(socket,status);
            if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);
            else sc.run();
        } catch (RejectedExecutionException rx) {
            log.warn("Socket processing request was rejected for:"+socket,rx);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            log.error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }
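This either instantiates a SocketProcessor or reuses a cached SocketProcessor instance. SocketProcessor is a Runnable, so it can be handed to a thread pool, namely the org.apache.catalina.core.StandardThreadExecutor that was set up during the init phase at the very top.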
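The poll-or-create reuse of processor objects can be sketched independently of Tomcat. In the hypothetical example below, ProcessorCacheSketch.Task plays the role of SocketProcessor, a plain string stands in for the socket, and the task returns itself to the cache when it finishes; that last detail is an assumption of the sketch about where the recycling happens.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a reusable task cache feeding a thread pool.
class ProcessorCacheSketch {

    final ConcurrentLinkedQueue<Task> cache = new ConcurrentLinkedQueue<>();
    final ConcurrentLinkedQueue<String> handled = new ConcurrentLinkedQueue<>();
    final AtomicInteger created = new AtomicInteger();

    /** Plays the role of SocketProcessor; a String stands in for the socket. */
    final class Task implements Runnable {
        private String socket;

        Task(String socket) {
            created.incrementAndGet();
            reset(socket);
        }

        void reset(String socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            handled.add(socket);
            socket = null;
            cache.offer(this); // hand the instance back for reuse
        }
    }

    /** Reuses a cached task when one is available, otherwise creates a new one. */
    Future<?> process(String socket, ExecutorService executor) {
        Task task = cache.poll();
        if (task == null) {
            task = new Task(socket);
        } else {
            task.reset(socket);
        }
        return executor.submit(task);
    }

    /** Processes two connections one after another and reports how many tasks were created. */
    static int demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            ProcessorCacheSketch sketch = new ProcessorCacheSketch();
            sketch.process("conn-1", pool).get();
            sketch.process("conn-2", pool).get();
            return sketch.created.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("tasks created: " + demo());
    }
}
```

Because the second connection arrives after the first task has finished, the same Task object serves both, which keeps allocation low under steady load.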
To recap: Connector's start launches the Acceptor thread, the one commonly seen as http-nio-8080-Acceptor-0. There is usually only one, as a comment in the code also notes:
    // Initialize thread count default for acceptor
    if (acceptorThreadCount == 0) {
        // FIXME: Doesn't seem to work that well with multiple accept threads
        acceptorThreadCount = 1;
    }
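Multiple acceptor threads do not appear to bring any benefit. The acceptor listens for connections; when one arrives it wraps it in a NioChannel and adds an event to the events queue that a Poller watches. There are several Pollers, each with its own events queue, so which Poller does the acceptor register the NioChannel with? It uses this round-robin algorithm: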
    /**
     * Return an available poller in true round robin fashion
     */
    public Poller getPoller0() {
        int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
        return pollers[idx];
    }
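The index arithmetic in getPoller0 is easy to experiment with on its own. The sketch below (RoundRobinSketch is a hypothetical class) reproduces the Math.abs(...) % length expression and also shows Math.floorMod as one possible alternative. The overflow case it highlights is a general property of Math.abs on int values and needs billions of accepted connections to occur.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of round-robin index selection over a fixed number of pollers.
class RoundRobinSketch {

    final AtomicInteger rotater;
    final int size;

    RoundRobinSketch(int size, int start) {
        this.size = size;
        this.rotater = new AtomicInteger(start);
    }

    /** Same arithmetic as the snippet above. */
    int nextAbs() {
        return Math.abs(rotater.incrementAndGet()) % size;
    }

    /** Alternative that stays within [0, size) even after the counter overflows. */
    int nextFloorMod() {
        return Math.floorMod(rotater.incrementAndGet(), size);
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch(3, 0);
        for (int i = 0; i < 6; i++) {
            System.out.print(rr.nextAbs() + " ");
        }
        System.out.println();

        // Math.abs(Integer.MIN_VALUE) is still negative, so the remainder can be negative too.
        System.out.println(new RoundRobinSketch(3, Integer.MAX_VALUE).nextAbs());
        System.out.println(new RoundRobinSketch(3, Integer.MAX_VALUE).nextFloorMod());
    }
}
```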
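Once registered with a Poller, the socket is effectively bound to that Poller's Selector, listening for READ events.
From then on the Poller can watch the SelectionKey and process it.
When the Poller obtains a SelectionKey, it creates a SocketProcessor and hands it to the StandardThreadExecutor thread pool for execution. The pool's properties, such as maxThreads (the maximum number of threads), are set in Catalina by the Digester rule org.apache.catalina.startup.SetAllPropertiesRule.
At this point Catalina's start phase is more or less finished. Strictly speaking, start is done once the acceptor and the pollers have been created.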