Tomcat Source Code Analysis Series (15): ConnectionHandler

Preface
The previous article analyzed Poller and PollerEvent. Poller#processSocket obtains a SocketProcessorBase to handle the read and write events of a SocketChannel, and SocketProcessor#doRun, in a subclass of SocketProcessorBase, does the actual work by calling getHandler().process(socketWrapper, event). The object returned by getHandler() is a ConnectionHandler.


1. ConnectionHandler#process
ConnectionHandler is a static nested class of AbstractProtocol, declared as follows:

protected static class ConnectionHandler<S> implements AbstractEndpoint.Handler<S>

Its process method is shown below:

@Override
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
    if (getLog().isDebugEnabled()) {
        getLog().debug(sm.getString("abstractConnectionHandler.process",
                wrapper.getSocket(), status));
    }
    if (wrapper == null) {
        // Nothing to do. Socket has been closed.
        return SocketState.CLOSED;
    }

    S socket = wrapper.getSocket();

    Processor processor = connections.get(socket);
    if (getLog().isDebugEnabled()) {
        getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
                processor, socket));
    }

    // Async timeouts are calculated on a dedicated thread and then
    // dispatched. Because of delays in the dispatch process, the
    // timeout may no longer be required. Check here and avoid
    // unnecessary processing.
    if (SocketEvent.TIMEOUT == status && (processor == null ||
            !processor.isAsync() || !processor.checkAsyncTimeoutGeneration())) {
        // This is effectively a NO-OP
        return SocketState.OPEN;
    }

    if (processor != null) {
        // Make sure an async timeout doesn't fire
        getProtocol().removeWaitingProcessor(processor);
    } else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {
        // Nothing to do. Endpoint requested a close and there is no
        // longer a processor associated with this socket.
        return SocketState.CLOSED;
    }

    ContainerThreadMarker.set();

    try {
        if (processor == null) {
            String negotiatedProtocol = wrapper.getNegotiatedProtocol();
            if (negotiatedProtocol != null) {
                UpgradeProtocol upgradeProtocol =
                        getProtocol().getNegotiatedProtocol(negotiatedProtocol);
                if (upgradeProtocol != null) {
                    processor = upgradeProtocol.getProcessor(
                            wrapper, getProtocol().getAdapter());
                } else if (negotiatedProtocol.equals("http/1.1")) {
                    // Explicitly negotiated the default protocol.
                    // Obtain a processor below.
                } else {
                    // TODO:
                    // OpenSSL 1.0.2's ALPN callback doesn't support
                    // failing the handshake with an error if no
                    // protocol can be negotiated. Therefore, we need to
                    // fail the connection here. Once this is fixed,
                    // replace the code below with the commented out
                    // block.
                    if (getLog().isDebugEnabled()) {
                        getLog().debug(sm.getString(
                            "abstractConnectionHandler.negotiatedProcessor.fail",
                            negotiatedProtocol));
                    }
                    return SocketState.CLOSED;
                    /*
                     * To replace the code above once OpenSSL 1.1.0 is
                     * used.
                    // Failed to create processor. This is a bug.
                    throw new IllegalStateException(sm.getString(
                            "abstractConnectionHandler.negotiatedProcessor.fail",
                            negotiatedProtocol));
                    */
                }
            }
        }
        if (processor == null) {
            processor = recycledProcessors.pop();
            if (getLog().isDebugEnabled()) {
                getLog().debug(sm.getString("abstractConnectionHandler.processorPop",
                        processor));
            }
        }
        if (processor == null) {
            processor = getProtocol().createProcessor();
            register(processor);
        }

        processor.setSslSupport(
                wrapper.getSslSupport(getProtocol().getClientCertProvider()));

        // Associate the processor with the connection
        connections.put(socket, processor);

        SocketState state = SocketState.CLOSED;
        do {
            state = processor.process(wrapper, status);

            if (state == SocketState.UPGRADING) {
                // Get the HTTP upgrade handler
                UpgradeToken upgradeToken = processor.getUpgradeToken();
                // Retrieve leftover input
                ByteBuffer leftOverInput = processor.getLeftoverInput();
                if (upgradeToken == null) {
                    // Assume direct HTTP/2 connection
                    UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");
                    if (upgradeProtocol != null) {
                        processor = upgradeProtocol.getProcessor(
                                wrapper, getProtocol().getAdapter());
                        wrapper.unRead(leftOverInput);
                        // Associate with the processor with the connection
                        connections.put(socket, processor);
                    } else {
                        if (getLog().isDebugEnabled()) {
                            getLog().debug(sm.getString(
                                "abstractConnectionHandler.negotiatedProcessor.fail",
                                "h2c"));
                        }
                        return SocketState.CLOSED;
                    }
                } else {
                    HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
                    // Release the Http11 processor to be re-used
                    release(processor);
                    // Create the upgrade processor
                    processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);
                    if (getLog().isDebugEnabled()) {
                        getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",
                                processor, wrapper));
                    }
                    wrapper.unRead(leftOverInput);
                    // Mark the connection as upgraded
                    wrapper.setUpgraded(true);
                    // Associate with the processor with the connection
                    connections.put(socket, processor);
                    // Initialise the upgrade handler (which may trigger
                    // some IO using the new protocol which is why the lines
                    // above are necessary)
                    // This cast should be safe. If it fails the error
                    // handling for the surrounding try/catch will deal with
                    // it.
                    if (upgradeToken.getInstanceManager() == null) {
                        httpUpgradeHandler.init((WebConnection) processor);
                    } else {
                        ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
                        try {
                            httpUpgradeHandler.init((WebConnection) processor);
                        } finally {
                            upgradeToken.getContextBind().unbind(false, oldCL);
                        }
                    }
                }
            }
        } while ( state == SocketState.UPGRADING);

        if (state == SocketState.LONG) {
            // In the middle of processing a request/response. Keep the
            // socket associated with the processor. Exact requirements
            // depend on type of long poll
            longPoll(wrapper, processor);
            if (processor.isAsync()) {
                getProtocol().addWaitingProcessor(processor);
            }
        } else if (state == SocketState.OPEN) {
            // In keep-alive but between requests. OK to recycle
            // processor. Continue to poll for the next request.
            connections.remove(socket);
            release(processor);
            wrapper.registerReadInterest();
        } else if (state == SocketState.SENDFILE) {
            // Sendfile in progress. If it fails, the socket will be
            // closed. If it works, the socket either be added to the
            // poller (or equivalent) to await more data or processed
            // if there are any pipe-lined requests remaining.
        } else if (state == SocketState.UPGRADED) {
            // Don't add sockets back to the poller if this was a
            // non-blocking write otherwise the poller may trigger
            // multiple read events which may lead to thread starvation
            // in the connector. The write() method will add this socket
            // to the poller if necessary.
            if (status != SocketEvent.OPEN_WRITE) {
                longPoll(wrapper, processor);
            }
        } else if (state == SocketState.SUSPENDED) {
            // Don't add sockets back to the poller.
            // The resumeProcessing() method will add this socket
            // to the poller.
        } else {
            // Connection closed. OK to recycle the processor. Upgrade
            // processors are not recycled.
            connections.remove(socket);
            if (processor.isUpgrade()) {
                UpgradeToken upgradeToken = processor.getUpgradeToken();
                HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
                InstanceManager instanceManager = upgradeToken.getInstanceManager();
                if (instanceManager == null) {
                    httpUpgradeHandler.destroy();
                } else {
                    ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
                    try {
                        httpUpgradeHandler.destroy();
                    } finally {
                        try {
                            instanceManager.destroyInstance(httpUpgradeHandler);
                        } catch (Throwable e) {
                            ExceptionUtils.handleThrowable(e);
                            getLog().error(sm.getString("abstractConnectionHandler.error"), e);
                        }
                        upgradeToken.getContextBind().unbind(false, oldCL);
                    }
                }
            } else {
                release(processor);
            }
        }
        return state;
    } catch(java.net.SocketException e) {
        // SocketExceptions are normal
        getLog().debug(sm.getString(
                "abstractConnectionHandler.socketexception.debug"), e);
    } catch (java.io.IOException e) {
        // IOExceptions are normal
        getLog().debug(sm.getString(
                "abstractConnectionHandler.ioexception.debug"), e);
    } catch (ProtocolException e) {
        // Protocol exceptions normally mean the client sent invalid or
        // incomplete data.
        getLog().debug(sm.getString(
                "abstractConnectionHandler.protocolexception.debug"), e);
    }
    // Future developers: if you discover any other
    // rare-but-nonfatal exceptions, catch them here, and log as
    // above.
    catch (Throwable e) {
        ExceptionUtils.handleThrowable(e);
        // any other exception or error is odd. Here we log it
        // with "ERROR" level, so it will show up even on
        // less-than-verbose logs.
        getLog().error(sm.getString("abstractConnectionHandler.error"), e);
    } finally {
        ContainerThreadMarker.clear();
    }

    // Make sure socket/processor is removed from the list of current
    // connections
    connections.remove(socket);
    release(processor);
    return SocketState.CLOSED;
}

ConnectionHandler#process is fairly long, so let's go through it piece by piece.

private final Map<S,Processor> connections = new ConcurrentHashMap<>();

S socket = wrapper.getSocket();
Processor processor = connections.get(socket);

First, a Processor is looked up in the connections map, keyed by the socket. If the Processor is not null, the next call is

// Make sure an async timeout doesn't fire
getProtocol().removeWaitingProcessor(processor);

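When the socket has no Processor associated with it yet, for example on the first event of a new connection, connections.get(socket) returns null.
getProtocol() returns the ProtocolHandler that was passed in when the ConnectionHandler was constructed, which here is the Http11NioProtocol object.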

public void removeWaitingProcessor(Processor processor) {
    waitingProcessors.remove(processor);
}

removeWaitingProcessor is defined in AbstractProtocol, an ancestor class of Http11NioProtocol. It simply removes the given processor from waitingProcessors.
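
The lookup and the removal from the waiting collection can be sketched in isolation. This is a minimal illustration, not Tomcat's code: SketchProcessor, ConnectionLookupSketch and the use of a String as the socket key are hypothetical stand-ins, and the waiting collection is assumed to be a concurrent set.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Tomcat's Processor; only object identity matters here.
final class SketchProcessor {
    final String name;
    SketchProcessor(String name) { this.name = name; }
}

final class ConnectionLookupSketch {
    // Plays the role of ConnectionHandler.connections: socket -> processor.
    private final Map<String, SketchProcessor> connections = new ConcurrentHashMap<>();
    // Plays the role of waitingProcessors (assumed here to be a concurrent set).
    private final Set<SketchProcessor> waitingProcessors = ConcurrentHashMap.newKeySet();

    void associate(String socket, SketchProcessor p) { connections.put(socket, p); }
    void addWaiting(SketchProcessor p) { waitingProcessors.add(p); }
    boolean isWaiting(SketchProcessor p) { return waitingProcessors.contains(p); }

    // Returns the processor bound to the socket, or null if none is bound yet.
    // A processor that is found is removed from the waiting set so that an
    // async timeout cannot fire while it is being used.
    SketchProcessor lookup(String socket) {
        SketchProcessor p = connections.get(socket);
        if (p != null) {
            waitingProcessors.remove(p);
        }
        return p;
    }
}
```

A socket that was never associated yields null from lookup, which is the case the three null checks described next have to deal with.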

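Next come the three if (processor == null) checks inside the try block. Together they ensure that processor is non-null by the time it is used.
The first if (processor == null) covers connections for which wrapper.getNegotiatedProtocol() returns a non-null protocol name, that is, a protocol negotiated during the TLS handshake (the code comments mention ALPN). The matching UpgradeProtocol is looked up, and if one is found, it supplies the Processor. If the negotiated protocol is http/1.1, nothing happens here and the processor is obtained by the checks that follow; for any other protocol without a matching UpgradeProtocol, the method returns SocketState.CLOSED. The TLS details are not covered here.
The second if (processor == null) takes a Processor from the recycledProcessors pool. recycledProcessors is a field of ConnectionHandler, declared as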

private final RecycledProcessors recycledProcessors = new RecycledProcessors(this);
protected static class RecycledProcessors extends SynchronizedStack<Processor>

The third if (processor == null) creates a new Processor. getProtocol() returns the Http11NioProtocol object, and createProcessor is implemented in AbstractHttp11Protocol, the parent of Http11NioProtocol's parent class.

@Override
protected Processor createProcessor() {
    Http11Processor processor = new Http11Processor(this, adapter);
    return processor;
}

createProcessor() simply creates an Http11Processor. The this argument is the Http11NioProtocol object, and adapter is the CoyoteAdapter. The adapter field is assigned in Connector#initInternal, which creates the CoyoteAdapter and then calls protocolHandler.setAdapter(adapter).

Http11Processor is a key link in request processing. It will be covered in a later article, so it is not discussed further here.
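
The order of the three null checks (use a negotiated processor if there is one, otherwise pop from the pool, otherwise create) can be sketched as follows. This is a simplified illustration under stated assumptions: PooledProcessor and ProcessorPoolSketch are hypothetical names, a synchronized ArrayDeque stands in for SynchronizedStack, and the negotiated processor is passed in directly instead of being resolved from an UpgradeProtocol.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a Processor instance.
final class PooledProcessor {
    final int id;
    PooledProcessor(int id) { this.id = id; }
}

final class ProcessorPoolSketch {
    // Stand-in for recycledProcessors; guarded by this object's monitor.
    private final Deque<PooledProcessor> recycled = new ArrayDeque<>();
    private int created = 0;

    // Returns null when the pool is empty, which is what the second
    // null check in ConnectionHandler#process relies on.
    synchronized PooledProcessor pop() {
        return recycled.pollFirst();
    }

    // Puts a processor back so that a later acquire can reuse it.
    synchronized void release(PooledProcessor p) {
        recycled.addFirst(p);
    }

    // negotiated may be null, meaning no protocol-specific processor exists.
    synchronized PooledProcessor acquire(PooledProcessor negotiated) {
        PooledProcessor p = negotiated;          // first check
        if (p == null) {
            p = pop();                           // second check: reuse
        }
        if (p == null) {
            p = new PooledProcessor(++created);  // third check: create
        }
        return p;
    }

    synchronized int createdCount() {
        return created;
    }
}
```

Reusing released instances keeps the number of created processors bounded by the peak number of concurrent connections rather than by the number of requests.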

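Once a Processor has been obtained (an Http11Processor in the plain HTTP/1.1 case), its sslSupport property is set, the object is stored in connections, and it is then used to process the event.
That processing happens in the do-while loop.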

SocketState state = SocketState.CLOSED;
do {
    state = processor.process(wrapper, status);
    ……
} while ( state == SocketState.UPGRADING);
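
The shape of this loop, stripped of the Tomcat specifics, can be sketched as below. State, Step and the upgradeFactory parameter are hypothetical simplifications: the real loop also handles leftover input, the h2c case and the HttpUpgradeHandler initialisation.

```java
import java.util.function.Supplier;

final class UpgradeLoopSketch {
    // Trimmed-down stand-in for SocketState.
    enum State { OPEN, CLOSED, UPGRADING, UPGRADED }

    // Stand-in for Processor#process.
    interface Step { State process(); }

    // Runs the current processor. While it reports UPGRADING, the processor
    // is swapped for the upgrade processor and the loop runs once more.
    static State run(Step initial, Supplier<Step> upgradeFactory) {
        Step current = initial;
        State state;
        do {
            state = current.process();
            if (state == State.UPGRADING) {
                current = upgradeFactory.get();
            }
        } while (state == State.UPGRADING);
        return state;
    }

    public static void main(String[] args) {
        Step http = () -> State.UPGRADING;   // asks for an upgrade
        Step upgraded = () -> State.UPGRADED; // processor for the new protocol
        System.out.println(run(http, () -> upgraded));
    }
}
```

The loop exits as soon as a processor returns anything other than UPGRADING, so a request that never asks for an upgrade passes through it exactly once.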

Processor#process is called first, with the same arguments that ConnectionHandler#process received, and it returns a SocketState value.

/**
 * Different types of socket states to react upon.
 */
public enum SocketState {
    // TODO Add a new state to the AsyncStateMachine and remove
    //      ASYNC_END (if possible)
    OPEN, CLOSED, LONG, ASYNC_END, SENDFILE, UPGRADING, UPGRADED, SUSPENDED
}

SocketState is an enum nested in Handler<S>, and Handler is a nested interface of AbstractEndpoint.
The returned SocketState is then handled case by case.
First, if (state == SocketState.LONG), longPoll(wrapper, processor) is called.

protected void longPoll(SocketWrapperBase<?> socket, Processor processor) {
    if (!processor.isAsync()) {
        // This is currently only used with HTTP
        // Either:
        //  - this is an upgraded connection
        //  - the request line/headers have not been completely
        //    read
        socket.registerReadInterest();
    }
}

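For a processor that is not async, longPoll just calls socket.registerReadInterest(); for an async processor it does nothing. registerReadInterest() was covered in the previous article, so it is not repeated here.
If processor.isAsync() is true, getProtocol().addWaitingProcessor(processor) then adds the processor to the waitingProcessors collection mentioned above.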

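if (state == SocketState.OPEN) first removes the <NioChannel, Processor> pair from connections and calls release(processor), which either frees the processor's resources or returns the Processor to RecycledProcessors.
It then calls wrapper.registerReadInterest(), the same call made inside the if statement in longPoll.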

The SocketState.SENDFILE and SocketState.SUSPENDED branches are alike: neither does anything in this method.

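The if (state == SocketState.UPGRADED) branch checks whether the status argument passed to ConnectionHandler#process is SocketEvent.OPEN_WRITE. If it is not, longPoll is called; for OPEN_WRITE the socket is not added back to the poller here.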

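The final else branch handles the remaining states, chiefly SocketState.CLOSED, and performs cleanup: the socket is removed from connections, and the processor is either released or, if it is an upgrade processor, its HttpUpgradeHandler is destroyed, since upgrade processors are not recycled.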

Finally, state is returned.
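
The branches described above can be condensed into one dispatch table. The sketch below only records which actions each state triggers, as strings; StateDispatchSketch, its trimmed Event enum and the action names are hypothetical labels for the calls in the listing, and the upgrade-handler teardown in the final branch is reduced to a single "release" entry.

```java
import java.util.ArrayList;
import java.util.List;

final class StateDispatchSketch {
    enum State { OPEN, CLOSED, LONG, ASYNC_END, SENDFILE, UPGRADING, UPGRADED, SUSPENDED }
    // Trimmed-down stand-in for SocketEvent; only two values are needed here.
    enum Event { OPEN_READ, OPEN_WRITE }

    // Lists the follow-up actions for a state, in the order they would run.
    static List<String> actions(State state, Event event, boolean async) {
        List<String> out = new ArrayList<>();
        switch (state) {
            case LONG:
                if (!async) out.add("registerReadInterest"); // inside longPoll
                if (async) out.add("addWaitingProcessor");
                break;
            case OPEN:
                out.add("connections.remove");
                out.add("release");
                out.add("registerReadInterest");
                break;
            case SENDFILE:
            case SUSPENDED:
                break; // nothing to do in this method
            case UPGRADED:
                // longPoll is skipped for write events.
                if (event != Event.OPEN_WRITE && !async) out.add("registerReadInterest");
                break;
            default: // CLOSED and any remaining state
                out.add("connections.remove");
                out.add("release"); // simplified: upgrade processors are destroyed instead
        }
        return out;
    }
}
```

For example, actions(State.UPGRADED, Event.OPEN_WRITE, false) yields an empty list, which reflects that a non-blocking write on an upgraded connection does not re-register the socket for reads.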


Summary
This article walked through ConnectionHandler#process. Its main job is to find a Processor to handle the read or write event.
