Tomcat Source Code Walkthrough: Initialization and Startup Phases

Starting from startup.sh, we follow it to catalina.sh and from there find the startup class: org.apache.catalina.startup.Bootstrap

Initialization Phase

1. Bootstrap's main method instantiates Bootstrap itself and then initializes a set of class loaders: commonLoader, serverLoader and sharedLoader. Which jars commonLoader reads is configured in conf/catalina.properties:

common.loader=${catalina.base}/lib,${catalina.base}/lib/*.jar,${catalina.home}/lib,${catalina.home}/lib/*.jar

The other two class loaders have no directories configured for them, so by default there is nothing for them to load.
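The wiring of the three loaders is done in Bootstrap#initClassLoaders. Roughly, paraphrased from the Tomcat 7-era source with details trimmed (the loader the article calls serverLoader is the catalinaLoader field in the source), it looks like this; note that createClassLoader simply returns the parent when the corresponding *.loader property is empty, which is why serverLoader and sharedLoader end up being commonLoader by default:

// Bootstrap#initClassLoaders, paraphrased
private void initClassLoaders() {
    try {
        commonLoader = createClassLoader("common", null);
        if (commonLoader == null) {
            // no config entry present, fall back to this class's own loader
            commonLoader = this.getClass().getClassLoader();
        }
        // both use commonLoader as parent; with empty server.loader/shared.loader
        // properties, createClassLoader just returns that parent
        catalinaLoader = createClassLoader("server", commonLoader);
        sharedLoader = createClassLoader("shared", commonLoader);
    } catch (Throwable t) {
        log.error("Class loader creation threw exception", t);
        System.exit(1);
    }
}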

2. Bootstrap's start method is then called; an org.apache.catalina.startup.Catalina object is created and its class loader member is set to sharedLoader.
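The Catalina object itself is created reflectively in Bootstrap#init (invoked from main before start). Paraphrased, with error handling omitted:

// Bootstrap#init, paraphrased
Thread.currentThread().setContextClassLoader(catalinaLoader);

Class<?> startupClass = catalinaLoader.loadClass("org.apache.catalina.startup.Catalina");
Object startupInstance = startupClass.newInstance();

// hand sharedLoader to Catalina as its "parent" class loader
Method method = startupClass.getMethod("setParentClassLoader", ClassLoader.class);
method.invoke(startupInstance, sharedLoader);

catalinaDaemon = startupInstance;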
3. Catalina's start method is called; this is where things get a bit more involved.

public void start() {

        if (getServer() == null) {
            load();
        }

        if (getServer() == null) {
            log.fatal("Cannot start server. Server instance is not configured.");
            return;
        }

        long t1 = System.nanoTime();

        // Start the new server
        try {
            getServer().start();
        } catch (LifecycleException e) {
            log.fatal(sm.getString("catalina.serverStartFail"), e);
            try {
                getServer().destroy();
            } catch (LifecycleException e1) {
                log.debug("destroy() failed for failed Server ", e1);
            }
            return;
        }

        long t2 = System.nanoTime();
        if(log.isInfoEnabled()) {
            log.info("Server startup in " + ((t2 - t1) / 1000000) + " ms");
        }

        // Register shutdown hook
        if (useShutdownHook) {
            if (shutdownHook == null) {
                shutdownHook = new CatalinaShutdownHook();
            }
            Runtime.getRuntime().addShutdownHook(shutdownHook);

            // If JULI is being used, disable JULI's shutdown hook since
            // shutdown hooks run in parallel and log messages may be lost
            // if JULI's hook completes before the CatalinaShutdownHook()
            LogManager logManager = LogManager.getLogManager();
            if (logManager instanceof ClassLoaderLogManager) {
                ((ClassLoaderLogManager) logManager).setUseShutdownHook(
                        false);
            }
        }

        if (await) {
            await();
            stop();
        }
    }

The Digester tool is used to instantiate objects while server.xml is being read. It creates the org.apache.catalina.core.StandardServer object, the org.apache.catalina.deploy.NamingResources object, and so on. Digester can instantiate either a configured default class or the class named in a tag's className attribute, and assign the result to a member of the Catalina object; it can also do this hierarchically, for example instantiating the Service member inside the Server object. For the Server tag the default is org.apache.catalina.core.StandardServer, and the Service member inside it is then instantiated as org.apache.catalina.core.StandardService. There are also custom instantiation rules: ConnectorCreateRule, for example, takes the protocol attribute of the Connector tag and passes it to the org.apache.catalina.connector.Connector#Connector(java.lang.String) constructor to build the org.apache.catalina.connector.Connector.

Digester provides four capabilities: 1. read an XML tag and instantiate a default configured class; 2. instantiate the class named in the tag's className attribute; 3. set member variables hierarchically; 4. apply custom instantiation rules.
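As a rough sketch of how those four capabilities show up in Catalina#createStartDigester (abridged, not the complete rule set):

Digester digester = new Digester();

// 1. default class for a tag; 2. the third argument lets a className attribute override it
digester.addObjectCreate("Server", "org.apache.catalina.core.StandardServer", "className");
digester.addSetProperties("Server");   // copy the tag's attributes onto the created bean
digester.addSetNext("Server", "setServer", "org.apache.catalina.Server");

// 3. nested patterns: the Service created here is passed to the enclosing Server via setService
digester.addObjectCreate("Server/Service", "org.apache.catalina.core.StandardService", "className");
digester.addSetProperties("Server/Service");
digester.addSetNext("Server/Service", "setService", "org.apache.catalina.Service");

// 4. custom rule: ConnectorCreateRule builds the Connector from its protocol attribute
digester.addRule("Server/Service/Connector", new ConnectorCreateRule());
digester.addSetNext("Server/Service/Connector", "addConnector", "org.apache.catalina.connector.Connector");

The ConnectorCreateRule used by the last two rules is shown below.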
//ConnectorCreateRule.java
//ConnectorCreateRule takes the protocol attribute of the Connector tag and passes it to the Connector(String protocol) constructor to instantiate org.apache.catalina.connector.Connector:
 @Override
    public void begin(String namespace, String name, Attributes attributes)
            throws Exception {
        Service svc = (Service)digester.peek();
        Executor ex = null;
        if ( attributes.getValue("executor")!=null ) {
            ex = svc.getExecutor(attributes.getValue("executor"));
        }
        Connector con = new Connector(attributes.getValue("protocol"));
        if ( ex != null )  _setExecutor(con,ex);

        digester.push(con);
    }

It also instantiates org.apache.catalina.core.StandardThreadExecutor; this Executor is later used by the Connector to run SocketProcessor tasks.
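For reference, when the StandardThreadExecutor starts (later, during the startup phase), it wraps a Tomcat ThreadPoolExecutor around a TaskQueue; roughly, paraphrased from the Tomcat 7-era source with a few settings omitted:

// StandardThreadExecutor#startInternal, paraphrased
@Override
protected void startInternal() throws LifecycleException {
    taskqueue = new TaskQueue(maxQueueSize);
    TaskThreadFactory tf = new TaskThreadFactory(namePrefix, daemon, getThreadPriority());
    // org.apache.tomcat.util.threads.ThreadPoolExecutor, not the JDK class
    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(),
            maxIdleTime, TimeUnit.MILLISECONDS, taskqueue, tf);
    taskqueue.setParent(executor);
    setState(LifecycleState.STARTING);
}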
Step 3 instantiates many objects; to keep the main flow in view, the rest are skipped for now.

4. After all of these objects have been instantiated, the Server object's init method is called. Since StandardServer extends org.apache.catalina.util.LifecycleBase, this is really LifecycleBase's init method.
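That init is a template method: LifecycleBase manages the lifecycle state transitions and delegates the component-specific work to initInternal(), which each component overrides. A simplified sketch (state checks and event firing trimmed):

// LifecycleBase#init, simplified
public final synchronized void init() throws LifecycleException {
    if (!state.equals(LifecycleState.NEW)) {
        invalidTransition(Lifecycle.BEFORE_INIT_EVENT);
    }
    setStateInternal(LifecycleState.INITIALIZING, null, false);
    try {
        initInternal();          // e.g. StandardServer.initInternal() below
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        setStateInternal(LifecycleState.FAILED, null, false);
        throw new LifecycleException(sm.getString("lifecycleBase.initFail", toString()), t);
    }
    setStateInternal(LifecycleState.INITIALIZED, null, false);
}

StandardServer's own initInternal then initializes the defined Services: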

@Override
    protected void initInternal() throws LifecycleException { 
        // ... some code omitted here
        // Initialize our defined Services
        for (int i = 0; i < services.length; i++) {
            services[i].init();
        }
    }

5. The Server's init in turn calls init on each Service (meaning server.xml may contain multiple Service elements). Slightly awkwardly, Service's init also goes through LifecycleBase's init; this part can be confusing because Server and Service have very similar names and both route through the same LifecycleBase init template method.

@Override
    protected void initInternal() throws LifecycleException { 
        // ... some code omitted here
        // Initialize our defined Connectors
        synchronized (connectorsLock) {
            for (Connector connector : connectors) {
                try {
                    connector.init();
                } catch (Exception e) {
                    String message = sm.getString(
                            "standardService.connector.initFailed", connector);
                    log.error(message, e);

                    if (Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE"))
                        throw new LifecycleException(message);
                }
            }
        }
    }
6. The Service object calls each Connector's init method, again through the LifecycleBase init template method:
@Override
    protected void initInternal() throws LifecycleException {

        super.initInternal();

        // Initialize adapter
        adapter = new CoyoteAdapter(this);
        protocolHandler.setAdapter(adapter);

        // Make sure parseBodyMethodsSet has a default
        if (null == parseBodyMethodsSet) {
            setParseBodyMethods(getParseBodyMethods());
        }

        if (protocolHandler.isAprRequired() &&
                !AprLifecycleListener.isAprAvailable()) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerNoApr",
                            getProtocolHandlerClassName()));
        }

        try {
            protocolHandler.init();
        } catch (Exception e) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
        }

        // Initialize mapper listener
        mapperListener.init();
    }

This calls protocolHandler's init method. Where does protocolHandler come from? As mentioned in step 3, the custom rule ConnectorCreateRule passes the protocol attribute of the Connector tag to the Connector constructor:

public Connector(String protocol) {
        setProtocol(protocol);
        // Instantiate protocol handler
        try {
            Class<?> clazz = Class.forName(protocolHandlerClassName);
            this.protocolHandler = (ProtocolHandler) clazz.getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            log.error(sm.getString(
                    "coyoteConnector.protocolHandlerInstantiationFailed"), e);
        }

        // Default for Connector depends on this (deprecated) system property
        if (Boolean.parseBoolean(System.getProperty("org.apache.tomcat.util.buf.UDecoder.ALLOW_ENCODED_SLASH", "false"))) {
            encodedSolidusHandling = EncodedSolidusHandling.DECODE;
        }
    }
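The setProtocol call in the constructor above maps the protocol attribute to a protocol handler class name. Paraphrased from the Tomcat 7-era source (the APR branch is omitted):

// Connector#setProtocol, paraphrased (non-APR branch)
public void setProtocol(String protocol) {
    if ("HTTP/1.1".equals(protocol)) {
        setProtocolHandlerClassName("org.apache.coyote.http11.Http11Protocol");
    } else if ("AJP/1.3".equals(protocol)) {
        setProtocolHandlerClassName("org.apache.coyote.ajp.AjpProtocol");
    } else if (protocol != null) {
        // e.g. protocol="org.apache.coyote.http11.Http11NioProtocol" in server.xml
        setProtocolHandlerClassName(protocol);
    }
}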

7. The protocolHandler's init method is org.apache.coyote.AbstractProtocol#init:

@Override
    public void init() throws Exception {
        // ... some code omitted here
        String endpointName = getName();
        endpoint.setName(endpointName.substring(1, endpointName.length()-1));

        try {
            endpoint.init();
        } catch (Exception ex) {
            getLog().error(sm.getString("abstractProtocolHandler.initError",
                    getName()), ex);
            throw ex;
        }
    }

Here we continue with org.apache.coyote.http11.Http11NioProtocol as the Protocol implementation:

public Http11NioProtocol() {
        endpoint=new NioEndpoint();
        cHandler = new Http11ConnectionHandler(this);
        ((NioEndpoint) endpoint).setHandler(cHandler);
        setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
        setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
        setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
    }

8. The protocolHandler's init calls NioEndpoint's init. NioEndpoint inherits the init template method from org.apache.tomcat.util.net.AbstractEndpoint:

public final void init() throws Exception {
        testServerCipherSuitesOrderSupport();
        if (bindOnInit) {
            bind();
            bindState = BindState.BOUND_ON_INIT;
        }
    }

9. bind is left to the subclass; let's look at the org.apache.tomcat.util.net.NioEndpoint#bind implementation:

@Override
    public void bind() throws Exception {

        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
        serverSock.socket().bind(addr,getBacklog());
        serverSock.configureBlocking(true); //mimic APR behavior
        if (getSocketProperties().getSoTimeout() >= 0) {
            serverSock.socket().setSoTimeout(getSocketProperties().getSoTimeout());
        }

        // Initialize thread count defaults for acceptor, poller
        if (acceptorThreadCount == 0) {
            // FIXME: Doesn't seem to work that well with multiple accept threads
            acceptorThreadCount = 1;
        }
        if (pollerThreadCount <= 0) {
            //minimum one poller thread
            pollerThreadCount = 1;
        }
        stopLatch = new CountDownLatch(pollerThreadCount);

        // Initialize SSL if needed
        if (isSSLEnabled()) {
            SSLUtil sslUtil = handler.getSslImplementation().getSSLUtil(this);

            sslContext = sslUtil.createSSLContext();
            sslContext.init(wrap(sslUtil.getKeyManagers()),
                    sslUtil.getTrustManagers(), null);

            SSLSessionContext sessionContext =
                sslContext.getServerSessionContext();
            if (sessionContext != null) {
                sslUtil.configureSessionContext(sessionContext);
            }
            // Determine which cipher suites and protocols to enable
            enabledCiphers = sslUtil.getEnableableCiphers(sslContext);
            enabledProtocols = sslUtil.getEnableableProtocols(sslContext);
        }

        if (oomParachute>0) reclaimParachute(true);
        selectorPool.open();
    }

Here a ServerSocketChannel is created and set to blocking mode, but it is not registered with any Selector. org.apache.tomcat.util.net.NioSelectorPool#open is then called to create a Selector, though again nothing is registered on it yet:

protected Selector getSharedSelector() throws IOException {
        if (SHARED && SHARED_SELECTOR == null) {
            synchronized ( NioSelectorPool.class ) {
                if ( SHARED_SELECTOR == null )  {
                    synchronized (Selector.class) {
                        // Selector.open() isn't thread safe
                        // http://bugs.sun.com/view_bug.do?bug_id=6427854
                        // Affects 1.6.0_29, fixed in 1.7.0_01
                        SHARED_SELECTOR = Selector.open();
                    }
                    log.info("Using a shared selector for servlet write/read");
                }
            }
        }
        return  SHARED_SELECTOR;
    }
  public void open() throws IOException {
        enabled = true;
        getSharedSelector();
        if (SHARED) {
            blockingSelector = new NioBlockingSelector();
            blockingSelector.open(getSharedSelector());
        }

    }

The Selector is then assigned to a member variable of NioBlockingSelector:

public void open(Selector selector) {
        sharedSelector = selector;
        poller = new BlockPoller();
        poller.selector = sharedSelector;
        poller.setDaemon(true);
        poller.setName("NioBlockingSelector.BlockPoller-"+(threadCounter.getAndIncrement()));
        poller.start();
    }

BlockPoller is a thread:

protected static class BlockPoller extends Thread {
       protected volatile boolean run = true;
        protected Selector selector = null;
        protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();
 }

It maintains an events queue: threads doing blocking reads and writes through the shared selector enqueue their registration and interest-op tasks here, and the BlockPoller thread drains the queue on each pass. The ownership chain is:

NioEndpoint --> NioSelectorPool --> NioBlockingSelector --> BlockPoller
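To make the hand-off pattern concrete, here is a purely illustrative sketch (not Tomcat code, the class name is made up): producers enqueue a Runnable and wake the selector, and the poller thread drains the queue before each select. The main Poller we meet in the startup phase uses the same shape.

import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.ConcurrentLinkedQueue;

class QueueDrivenPoller extends Thread {
    private final Selector selector;
    private final ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    QueueDrivenPoller() throws IOException {
        this.selector = Selector.open();
    }

    // called from other threads, e.g. to register a channel or change interest ops
    void addEvent(Runnable event) {
        events.add(event);
        selector.wakeup();              // break out of a blocking select()
    }

    @Override
    public void run() {
        while (running) {
            Runnable r;
            while ((r = events.poll()) != null) {
                r.run();                // perform registrations on the selector thread
            }
            try {
                selector.select(1000);  // then wait for ready keys (key handling omitted)
            } catch (IOException e) {
                break;
            }
        }
    }
}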

At this point the main initialization work inside Catalina's start method is complete.

Next, getServer().start() is executed.

Startup Phase

1. The Server's start method kicks off another round of the org.apache.catalina.util.LifecycleBase#start template method:

@Override
    protected void startInternal() throws LifecycleException {

        fireLifecycleEvent(CONFIGURE_START_EVENT, null);
        setState(LifecycleState.STARTING);

        globalNamingResources.start();

        // Start our defined Services
        synchronized (servicesLock) {
            for (int i = 0; i < services.length; i++) {
                services[i].start();
            }
        }
    }

2. The Service's start method (again via the org.apache.catalina.util.LifecycleBase#start template method):

@Override
    protected void startInternal() throws LifecycleException {

        // ... some code omitted here
        // Start our defined Connectors second
        synchronized (connectorsLock) {
            for (Connector connector: connectors) {
                try {
                    // If it has already failed, don't try and start it
                    if (connector.getState() != LifecycleState.FAILED) {
                        connector.start();
                    }
                } catch (Exception e) {
                    log.error(sm.getString(
                            "standardService.connector.startFailed",
                            connector), e);
                }
            }
        }
    }

3. The Connector's start method is called (again via the org.apache.catalina.util.LifecycleBase#start template method):

@Override
    protected void startInternal() throws LifecycleException {

        // Validate settings before starting
        if (getPort() < 0) {
            throw new LifecycleException(sm.getString(
                    "coyoteConnector.invalidPort", Integer.valueOf(getPort())));
        }

        setState(LifecycleState.STARTING);

        try {
            protocolHandler.start();
        } catch (Exception e) {
            String errPrefix = "";
            if(this.service != null) {
                errPrefix += "service.getName(): \"" + this.service.getName() + "\"; ";
            }

            throw new LifecycleException
            (errPrefix + " " + sm.getString
                    ("coyoteConnector.protocolHandlerStartFailed"), e);
        }

        mapperListener.start();
    }

4. protocolHandler.start(): this one is not the org.apache.catalina.util.LifecycleBase#start template method but org.apache.coyote.AbstractProtocol#start:

@Override
    public void start() throws Exception {
        if (getLog().isInfoEnabled())
            getLog().info(sm.getString("abstractProtocolHandler.start",
                    getName()));
        try {
            endpoint.start();
        } catch (Exception ex) {
            getLog().error(sm.getString("abstractProtocolHandler.startError",
                    getName()), ex);
            throw ex;
        }
    }

5. endpoint.start() calls org.apache.tomcat.util.net.AbstractEndpoint#start:

public final void start() throws Exception {
        if (bindState == BindState.UNBOUND) {
            bind();
            bindState = BindState.BOUND_ON_START;
        }
        startInternal();
    }

bind was already called during the init phase, so it is skipped here and execution continues into startInternal.
6. startInternal is left to the subclass; here that is org.apache.tomcat.util.net.NioEndpoint#startInternal:

@Override
    public void startInternal() throws Exception {

        if (!running) {
            running = true;
            paused = false;

            // Create worker collection
            if ( getExecutor() == null ) {
                createExecutor();
            }

            initializeConnectionLatch();

            // Start poller threads
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }

            startAcceptorThreads();
        }
    }

建立org.apache.tomcat.util.net.NioEndpoint.Acceptor來循環接收鏈接。countUpOrAwaitConnection的方法就是用於判斷是否繼續從OS隊列中獲取鏈接。裏面的數量就是由maxConnections配置的,若是在BIO狀況下,maxConnections=maxThreads,因此等價於當工做線程都處於繁忙時,則acceptor會等待工做線程空閒纔會去獲取來鏈接。而在NIO狀況下,maxConnections默認等於100000,則不會等工做線程繁忙,而是繼續從OS隊列中獲取鏈接,放在events隊列中.

@Override
        public void run() {

            int errorDelay = 0;

            // Loop until we receive a shutdown command
            while (running) {

                // Loop if endpoint is paused
                while (paused && running) {
                    state = AcceptorState.PAUSED;
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                if (!running) {
                    break;
                }
                state = AcceptorState.RUNNING;

                try {
                    //if we have reached max connections, wait
                    countUpOrAwaitConnection();

                    SocketChannel socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        // socket
                        socket = serverSock.accept();
                    } catch (IOException ioe) {
                        //we didn't get a socket
                        countDownConnection();
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // setSocketOptions() will add channel to the poller
                    // if successful
                    if (running && !paused) {
                        if (!setSocketOptions(socket)) {
                            countDownConnection();
                            closeSocket(socket);
                        }
                    } else {
                        countDownConnection();
                        closeSocket(socket);
                    }
                } catch (SocketTimeoutException sx) {
                    // Ignore: Normal condition
                } catch (IOException x) {
                    if (running) {
                        log.error(sm.getString("endpoint.accept.fail"), x);
                    }
                } catch (OutOfMemoryError oom) {
                    try {
                        oomParachuteData = null;
                        releaseCaches();
                        log.error("", oom);
                    }catch ( Throwable oomt ) {
                        try {
                            try {
                                System.err.println(oomParachuteMsg);
                                oomt.printStackTrace();
                            }catch (Throwable letsHopeWeDontGetHere){
                                ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                            }
                        }catch (Throwable letsHopeWeDontGetHere){
                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                        }
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }
            state = AcceptorState.ENDED;
        }

Each accepted connection, i.e. each socket, is handed to org.apache.tomcat.util.net.NioEndpoint#setSocketOptions for processing:

protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            //disable blocking, APR style, we are gonna be polling it
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);

            NioChannel channel = nioChannels.poll();
            if ( channel == null ) {
                // SSL setup
                if (sslContext != null) {
                    SSLEngine engine = createSSLEngine();
                    int appbufsize = engine.getSession().getApplicationBufferSize();
                    NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize,socketProperties.getAppReadBufSize()),
                                                                       Math.max(appbufsize,socketProperties.getAppWriteBufSize()),
                                                                       socketProperties.getDirectBuffer());
                    channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool);
                } else {
                    // normal tcp setup
                    NioBufferHandler bufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(),
                                                                       socketProperties.getAppWriteBufSize(),
                                                                       socketProperties.getDirectBuffer());

                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                channel.setIOChannel(socket);
                if ( channel instanceof SecureNioChannel ) {
                    SSLEngine engine = createSSLEngine();
                    ((SecureNioChannel)channel).reset(engine);
                } else {
                    channel.reset();
                }
            }
            getPoller0().register(channel);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error("",t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            // Tell to close the socket
            return false;
        }
        return true;
    }

Note that the socket is switched to non-blocking mode here, wrapped in a NioChannel, and then registered with a Poller. The Poller's register method:

public void register(final NioChannel socket) {
            socket.setPoller(this);
            KeyAttachment key = keyCache.poll();
            final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);
            ka.reset(this,socket,getSocketProperties().getSoTimeout());
            ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            ka.setSecure(isSSLEnabled());
            PollerEvent r = eventCache.poll();
            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
            else r.reset(socket,ka,OP_REGISTER);
            addEvent(r);
        }

This creates an event interested in READ and registers it on the Poller's Selector. As you can see, the real work is done inside PollerEvent: although it looks like an OP_REGISTER operation, which plain NIO does not define, PollerEvent translates OP_REGISTER into SelectionKey.OP_READ.

    public static class PollerEvent implements Runnable {

        protected NioChannel socket;
        protected int interestOps;
        protected KeyAttachment key;
        public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) {
            reset(ch, k, intOps);
        }

        public void reset(NioChannel ch, KeyAttachment k, int intOps) {
            socket = ch;
            interestOps = intOps;
            key = k;
        }

        public void reset() {
            reset(null, null, 0);
        }

        @Override
        public void run() {
            if ( interestOps == OP_REGISTER ) {
                try {
                    socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
                } catch (Exception x) {
                    log.error("", x);
                }
            } else {
                final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
                try {
                    if (key == null) {
                        // The key was cancelled (e.g. due to socket closure)
                        // and removed from the selector while it was being
                        // processed. Count down the connections at this point
                        // since it won't have been counted down when the socket
                        // closed.
                        socket.getPoller().getEndpoint().countDownConnection();
                    } else {
                        final KeyAttachment att = (KeyAttachment) key.attachment();
                        if ( att!=null ) {
                            //handle callback flag
                            if (att.isComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) {
                                att.setCometNotify(true);
                            } else {
                                att.setCometNotify(false);
                            }
                            interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag
                            att.access();//to prevent timeout
                            //we are registering the key to start with, reset the fairness counter.
                            int ops = key.interestOps() | interestOps;
                            att.interestOps(ops);
                            key.interestOps(ops);
                        } else {
                            socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false);
                        }
                    }
                } catch (CancelledKeyException ckx) {
                    try {
                        socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, true);
                    } catch (Exception ignore) {}
                }
            }//end if
        }//run

        @Override
        public String toString() {
            return super.toString()+"[intOps="+this.interestOps+"]";
        }
    }

Because socket.setPoller(this) is called, the socket can later be registered with that Poller's Selector.
The Poller is a Runnable run on its own thread; several were created in org.apache.tomcat.util.net.NioEndpoint#startInternal above, which is where runtime thread names like http-nio-8080-ClientPoller-1 come from.
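The number of pollers comes from pollerThreadCount; if I remember the Tomcat 7-era NioEndpoint correctly, the default is capped at two:

// NioEndpoint default, from memory - treat as approximate
private int pollerThreadCount = Math.min(2, Runtime.getRuntime().availableProcessors());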

/**
     * Poller class.
     */
    public class Poller implements Runnable {

        protected Selector selector;
        protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();

        protected volatile boolean close = false;
        protected long nextExpiration = 0;//optimize expiration handling

        protected AtomicLong wakeupCounter = new AtomicLong(0l);

        protected volatile int keyCount = 0;

        public Poller() throws IOException {
            synchronized (Selector.class) {
                // Selector.open() isn't thread safe
                // http://bugs.sun.com/view_bug.do?bug_id=6427854
                // Affects 1.6.0_29, fixed in 1.7.0_01
                this.selector = Selector.open();
            }
        }
        ...
        public boolean events() {
            boolean result = false;

            Runnable r = null;
            for (int i = 0, size = events.size(); i < size && (r = events.poll()) != null; i++ ) {
                result = true;
                try {
                    r.run();
                    if ( r instanceof PollerEvent ) {
                        ((PollerEvent)r).reset();
                        eventCache.offer((PollerEvent)r);
                    }
                } catch ( Throwable x ) {
                    log.error("",x);
                }
            }

            return result;
        }

        public void register(final NioChannel socket) {
            socket.setPoller(this);
            KeyAttachment key = keyCache.poll();
            final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);
            ka.reset(this,socket,getSocketProperties().getSoTimeout());
            ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            ka.setSecure(isSSLEnabled());
            PollerEvent r = eventCache.poll();
            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
            else r.reset(socket,ka,OP_REGISTER);
            addEvent(r);
        }

        public KeyAttachment cancelledKey(SelectionKey key, SocketStatus status, boolean dispatch) {
            KeyAttachment ka = null;
            try {
                if ( key == null ) return null;//nothing to do
                ka = (KeyAttachment) key.attachment();
                if (ka != null && ka.isComet() && status != null) {
                    //the comet event takes care of clean up
                    //processSocket(ka.getChannel(), status, dispatch);
                    ka.setComet(false);//to avoid a loop
                    if (status == SocketStatus.TIMEOUT ) {
                        if (processSocket(ka.getChannel(), status, true)) {
                            return null; // don't close on comet timeout
                        }
                    } else {
                        // Don't dispatch if the lines below are cancelling the key
                        processSocket(ka.getChannel(), status, false);
                    }
                }
                ka = (KeyAttachment) key.attach(null);
                if (ka!=null) handler.release(ka);
                else handler.release((SocketChannel)key.channel());
                if (key.isValid()) key.cancel();
                // If it is available, close the NioChannel first which should
                // in turn close the underlying SocketChannel. The NioChannel
                // needs to be closed first, if available, to ensure that TLS
                // connections are shut down cleanly.
                if (ka != null) {
                    try {
                        ka.getSocket().close(true);
                    } catch (Exception e){
                        if (log.isDebugEnabled()) {
                            log.debug(sm.getString(
                                    "endpoint.debug.socketCloseFail"), e);
                        }
                    }
                }
                // The SocketChannel is also available via the SelectionKey. If
                // it hasn't been closed in the block above, close it now.
                if (key.channel().isOpen()) {
                    try {
                        key.channel().close();
                    } catch (Exception e) {
                        if (log.isDebugEnabled()) {
                            log.debug(sm.getString(
                                    "endpoint.debug.channelCloseFail"), e);
                        }
                    }
                }
                try {
                    if (ka != null && ka.getSendfileData() != null
                            && ka.getSendfileData().fchannel != null
                            && ka.getSendfileData().fchannel.isOpen()) {
                        ka.getSendfileData().fchannel.close();
                    }
                } catch (Exception ignore) {
                }
                if (ka!=null) {
                    ka.reset();
                    countDownConnection();
                }
            } catch (Throwable e) {
                ExceptionUtils.handleThrowable(e);
                if (log.isDebugEnabled()) log.error("",e);
            }
            return ka;
        }
        /**
         * The background thread that listens for incoming TCP/IP connections and
         * hands them off to an appropriate processor.
         */
        @Override
        public void run() {
            // Loop until destroy() is called
            while (true) {
                try {
                    // Loop if endpoint is paused
                    while (paused && (!close) ) {
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            // Ignore
                        }
                    }

                    boolean hasEvents = false;

                    // Time to terminate?
                    if (close) {
                        events();
                        timeout(0, false);
                        try {
                            selector.close();
                        } catch (IOException ioe) {
                            log.error(sm.getString(
                                    "endpoint.nio.selectorCloseFail"), ioe);
                        }
                        break;
                    } else {
                        hasEvents = events();
                    }
                    try {
                        if ( !close ) {
                            if (wakeupCounter.getAndSet(-1) > 0) {
                                //if we are here, means we have other stuff to do
                                //do a non blocking select
                                keyCount = selector.selectNow();
                            } else {
                                keyCount = selector.select(selectorTimeout);
                            }
                            wakeupCounter.set(0);
                        }
                        if (close) {
                            events();
                            timeout(0, false);
                            try {
                                selector.close();
                            } catch (IOException ioe) {
                                log.error(sm.getString(
                                        "endpoint.nio.selectorCloseFail"), ioe);
                            }
                            break;
                        }
                    } catch ( NullPointerException x ) {
                        //sun bug 5076772 on windows JDK 1.5
                        if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
                        if ( wakeupCounter == null || selector == null ) throw x;
                        continue;
                    } catch ( CancelledKeyException x ) {
                        //sun bug 5076772 on windows JDK 1.5
                        if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
                        if ( wakeupCounter == null || selector == null ) throw x;
                        continue;
                    } catch (Throwable x) {
                        ExceptionUtils.handleThrowable(x);
                        log.error("",x);
                        continue;
                    }
                    //either we timed out or we woke up, process events first
                    if ( keyCount == 0 ) hasEvents = (hasEvents | events());

                    Iterator<SelectionKey> iterator =
                        keyCount > 0 ? selector.selectedKeys().iterator() : null;
                    // Walk through the collection of ready keys and dispatch
                    // any active event.
                    while (iterator != null && iterator.hasNext()) {
                        SelectionKey sk = iterator.next();
                        KeyAttachment attachment = (KeyAttachment)sk.attachment();
                        // Attachment may be null if another thread has called
                        // cancelledKey()
                        if (attachment == null) {
                            iterator.remove();
                        } else {
                            attachment.access();
                            iterator.remove();
                            processKey(sk, attachment);
                        }
                    }//while

                    //process timeouts
                    timeout(keyCount,hasEvents);
                    if ( oomParachute > 0 && oomParachuteData == null ) checkParachute();
                } catch (OutOfMemoryError oom) {
                    try {
                        oomParachuteData = null;
                        releaseCaches();
                        log.error("", oom);
                    }catch ( Throwable oomt ) {
                        try {
                            System.err.println(oomParachuteMsg);
                            oomt.printStackTrace();
                        }catch (Throwable letsHopeWeDontGetHere){
                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                        }
                    }
                }
            }//while

            stopLatch.countDown();
        }
    }

The Poller's run method selects ready SelectionKeys and hands each one to org.apache.tomcat.util.net.NioEndpoint.Poller#processKey:

protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
            boolean result = true;
            try {
                if ( close ) {
                    cancelledKey(sk, SocketStatus.STOP, attachment.comet);
                } else if ( sk.isValid() && attachment != null ) {
                    attachment.access();//make sure we don't time out valid sockets
                    sk.attach(attachment);//cant remember why this is here
                    NioChannel channel = attachment.getChannel();
                    if (sk.isReadable() || sk.isWritable() ) {
                        if ( attachment.getSendfileData() != null ) {
                            processSendfile(sk,attachment, false);
                        } else {
                            if ( isWorkerAvailable() ) {
                                unreg(sk, attachment, sk.readyOps());
                                boolean closeSocket = false;
                                // Read goes before write
                                if (sk.isReadable()) {
                                    if (!processSocket(channel, SocketStatus.OPEN_READ, true)) {
                                        closeSocket = true;
                                    }
                                }
                                if (!closeSocket && sk.isWritable()) {
                                    if (!processSocket(channel, SocketStatus.OPEN_WRITE, true)) {
                                        closeSocket = true;
                                    }
                                }
                                if (closeSocket) {
                                    cancelledKey(sk,SocketStatus.DISCONNECT,false);
                                }
                            } else {
                                result = false;
                            }
                        }
                    }
                } else {
                    //invalid key
                    cancelledKey(sk, SocketStatus.ERROR,false);
                }
            } catch ( CancelledKeyException ckx ) {
                cancelledKey(sk, SocketStatus.ERROR,false);
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error("",t);
            }
            return result;
        }

There is a key point here: processSendfile implements zero-copy file transfer; see https://www.ibm.com/developer...

if (attachment.getSendfileData() != null) {
    processSendfile(sk, attachment, false);
}

public SendfileState processSendfile(SelectionKey sk, KeyAttachment attachment,
        boolean calledByProcessor) {
    long written = sd.fchannel.transferTo(sd.pos, sd.length, wc);
    ...
}
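For comparison, a minimal standalone example of the same FileChannel.transferTo zero-copy pattern (illustrative only; the file path and method are made up):

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class SendfileDemo {
    // Copies a file to a socket without pulling the bytes into user-space buffers.
    static void sendFile(SocketChannel socket) throws IOException {
        try (FileChannel file = FileChannel.open(Paths.get("static/index.html"),
                StandardOpenOption.READ)) {
            long pos = 0;
            long remaining = file.size();
            while (remaining > 0) {
                long written = file.transferTo(pos, remaining, socket); // zero-copy transfer
                pos += written;
                remaining -= written;
            }
        }
    }
}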

A socket that is not serving a sendfile response instead goes down the org.apache.tomcat.util.net.NioEndpoint#processSocket path:

public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
        try {
            KeyAttachment attachment = (KeyAttachment)socket.getAttachment();
            if (attachment == null) {
                return false;
            }
            attachment.setCometNotify(false); //will get reset upon next reg
            SocketProcessor sc = processorCache.poll();
            if ( sc == null ) sc = new SocketProcessor(socket,status);
            else sc.reset(socket,status);
            if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);
            else sc.run();
        } catch (RejectedExecutionException rx) {
            log.warn("Socket processing request was rejected for:"+socket,rx);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            log.error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

This either takes a SocketProcessor from the cache or instantiates a new one. SocketProcessor is a Runnable, so it can be handed to a thread pool for execution, namely the org.apache.catalina.core.StandardThreadExecutor set up during the init phase above.
To recap: the Connector's start launches the Acceptor thread, the one you usually see named http-nio-8080-Acceptor-0. There is normally only one; the comment in the code says as much:

// Initialize thread count default for acceptor
        if (acceptorThreadCount == 0) {
            // FIXME: Doesn't seem to work that well with multiple accept threads
            acceptorThreadCount = 1;
        }

Multiple accept threads have not been shown to help. The acceptor listens for connections; when one arrives it wraps it in a NioChannel and adds an event to the events queue of a Poller. There are several Pollers, each with its own events queue, so which Poller does the acceptor register the NioChannel with? It uses this round-robin algorithm:

/**
     * Return an available poller in true round robin fashion
     */
    public Poller getPoller0() {
        int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
        return pollers[idx];
    }

After registration with a Poller, the socket is effectively bound to that Poller's Selector, listening for READ events.
From then on the Poller can pick up SelectionKeys and process them.

When a Poller picks up a SelectionKey it builds a SocketProcessor and hands it to the StandardThreadExecutor thread pool. The pool's properties, such as maxThreads (the maximum number of threads), are set in Catalina through the Digester rule org.apache.catalina.startup.SetAllPropertiesRule.

With that, Catalina's start phase is essentially over; in fact, once the acceptor and the pollers are up and running, start is done.
