Tomcat Source Code Analysis Series (16): Http11Processor

Preface
The previous article covered the ConnectionHandler#process method. Its key step is to obtain an org.apache.coyote.Processor object and then call that object's process method, passing along the arguments it received itself: a NioSocketWrapper object and a SocketEvent object. In Tomcat, the Processor implementations that handle HTTP requests are Http11Processor and StreamProcessor. Both extend AbstractProcessor, whose parent class is AbstractProcessorLight, and AbstractProcessorLight implements Processor directly. StreamProcessor handles HTTP/2; this article uses Http11Processor as the example. The overall processing logic of StreamProcessor and Http11Processor is broadly the same.

1. Http11Processor constructor

public Http11Processor(AbstractHttp11Protocol<?> protocol, Adapter adapter) {
    super(adapter);
    this.protocol = protocol;

    httpParser = new HttpParser(protocol.getRelaxedPathChars(),
            protocol.getRelaxedQueryChars());

    inputBuffer = new Http11InputBuffer(request, protocol.getMaxHttpHeaderSize(),
            protocol.getRejectIllegalHeaderName(), httpParser);
    request.setInputBuffer(inputBuffer);

    outputBuffer = new Http11OutputBuffer(response, protocol.getMaxHttpHeaderSize());
    response.setOutputBuffer(outputBuffer);

    // Create and add the identity filters.
    inputBuffer.addFilter(new IdentityInputFilter(protocol.getMaxSwallowSize()));
    outputBuffer.addFilter(new IdentityOutputFilter());

    // Create and add the chunked filters.
    inputBuffer.addFilter(new ChunkedInputFilter(protocol.getMaxTrailerSize(),
            protocol.getAllowedTrailerHeadersInternal(), protocol.getMaxExtensionSize(),
            protocol.getMaxSwallowSize()));
    outputBuffer.addFilter(new ChunkedOutputFilter());

    // Create and add the void filters.
    inputBuffer.addFilter(new VoidInputFilter());
    outputBuffer.addFilter(new VoidOutputFilter());

    // Create and add buffered input filter
    inputBuffer.addFilter(new BufferedInputFilter());

    // Create and add the chunked filters.
    //inputBuffer.addFilter(new GzipInputFilter());
    outputBuffer.addFilter(new GzipOutputFilter());

    pluggableFilterIndex = inputBuffer.getFilters().length;
}

AbstractProcessor constructors

public AbstractProcessor(Adapter adapter) {
    this(adapter, new Request(), new Response());
}

protected AbstractProcessor(Adapter adapter, Request coyoteRequest, Response coyoteResponse) {
    this.adapter = adapter;
    asyncStateMachine = new AsyncStateMachine(this);
    request = coyoteRequest;
    response = coyoteResponse;
    response.setHook(this);
    request.setResponse(response);
    request.setHook(this);
    userDataHelper = new UserDataHelper(getLog());
}

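The Http11Processor constructor initializes request (org.apache.coyote.Request) and response (org.apache.coyote.Response), both created by the AbstractProcessor constructor that super(adapter) invokes, as well as httpParser (HttpParser), inputBuffer (Http11InputBuffer), and outputBuffer (Http11OutputBuffer), along with a number of InputFilter and OutputFilter instances. All of these are required for handling the HTTP protocol.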

2. AbstractProcessorLight#process
The Processor#process method is implemented in AbstractProcessorLight.

@Override
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
        throws IOException {

    SocketState state = SocketState.CLOSED;
    Iterator<DispatchType> dispatches = null;
    do {
        if (dispatches != null) {
            DispatchType nextDispatch = dispatches.next();
            state = dispatch(nextDispatch.getSocketStatus());
        } else if (status == SocketEvent.DISCONNECT) {
            // Do nothing here, just wait for it to get recycled
        } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
            state = dispatch(status);
            if (state == SocketState.OPEN) {
                // There may be pipe-lined data to read. If the data isn't
                // processed now, execution will exit this loop and call
                // release() which will recycle the processor (and input
                // buffer) deleting any pipe-lined data. To avoid this,
                // process it now.
                state = service(socketWrapper);
            }
        } else if (status == SocketEvent.OPEN_WRITE) {
            // Extra write event likely after async, ignore
            state = SocketState.LONG;
        } else if (status == SocketEvent.OPEN_READ){
            state = service(socketWrapper);
        } else {
            // Default to closing the socket if the SocketEvent passed in
            // is not consistent with the current state of the Processor
            state = SocketState.CLOSED;
        }

        if (getLog().isDebugEnabled()) {
            getLog().debug("Socket: [" + socketWrapper +
                    "], Status in: [" + status +
                    "], State out: [" + state + "]");
        }

        if (state != SocketState.CLOSED && isAsync()) {
            state = asyncPostProcess();
            if (getLog().isDebugEnabled()) {
                getLog().debug("Socket: [" + socketWrapper +
                        "], State after async post processing: [" + state + "]");
            }
        }

        if (dispatches == null || !dispatches.hasNext()) {
            // Only returns non-null iterator if there are
            // dispatches to process.
            dispatches = getIteratorAndClearDispatches();
        }
    } while (state == SocketState.ASYNC_END ||
            dispatches != null && state != SocketState.CLOSED);

    return state;
}

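The process method runs a do-while loop that handles each case according to the current conditions. The important work in the loop is the call to either the dispatch method or the service method.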
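The branch order inside the loop body can be condensed into a small decision function. This is a simplified model rather than Tomcat code: the names ProcessBranchSketch, Event, and Action are hypothetical, and the sketch covers only the first pass through the loop, where dispatches is still null and state cannot yet be ASYNC_END.

```java
final class ProcessBranchSketch {
    /** Reduced stand-in for SocketEvent: only the values the branches test, plus ERROR. */
    enum Event { OPEN_READ, OPEN_WRITE, DISCONNECT, ERROR }

    /** Hypothetical labels for what each branch of the loop body does. */
    enum Action { WAIT_FOR_RECYCLE, DISPATCH, IGNORE_WRITE, SERVICE, CLOSE }

    /** Mirrors the order of the if / else-if chain in process(). */
    static Action choose(Event status, boolean async, boolean upgrade) {
        if (status == Event.DISCONNECT) {
            return Action.WAIT_FOR_RECYCLE;   // do nothing, wait for recycling
        } else if (async || upgrade) {
            return Action.DISPATCH;           // non-standard mode: dispatch(status)
        } else if (status == Event.OPEN_WRITE) {
            return Action.IGNORE_WRITE;       // stray write event, state becomes LONG
        } else if (status == Event.OPEN_READ) {
            return Action.SERVICE;            // standard request: service(socketWrapper)
        }
        return Action.CLOSE;                  // event inconsistent with processor state
    }

    public static void main(String[] args) {
        System.out.println(choose(Event.OPEN_READ, false, false));
        System.out.println(choose(Event.OPEN_READ, true, false));
        System.out.println(choose(Event.ERROR, false, false));
    }
}
```

Because the async/upgrade check comes before the OPEN_READ check, a read event on an async request goes to dispatch, not to service.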

2. AbstractProcessor#dispatch

/**
 * Process an in-progress request that is not longer in standard HTTP mode.
 * Uses currently include Servlet 3.0 Async and HTTP upgrade connections.
 * Further uses may be added in the future. These will typically start as
 * HTTP requests.
 * @param status The event to process
 * @return the socket state
 */
protected abstract SocketState dispatch(SocketEvent status);

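As the comment shows, the dispatch method processes in-progress requests that are no longer in standard HTTP mode. It is currently used for Servlet 3.0 async requests and HTTP upgrade connections. The implementation in AbstractProcessor is: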

@Override
public final SocketState dispatch(SocketEvent status) {

    if (status == SocketEvent.OPEN_WRITE && response.getWriteListener() != null) {
        asyncStateMachine.asyncOperation();
        try {
            if (flushBufferedWrite()) {
                return SocketState.LONG;
            }
        } catch (IOException ioe) {
            if (getLog().isDebugEnabled()) {
                getLog().debug("Unable to write async data.", ioe);
            }
            status = SocketEvent.ERROR;
            request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, ioe);
        }
    } else if (status == SocketEvent.OPEN_READ && request.getReadListener() != null) {
        dispatchNonBlockingRead();
    } else if (status == SocketEvent.ERROR) {
        // An I/O error occurred on a non-container thread. This includes:
        // - read/write timeouts fired by the Poller (NIO & APR)
        // - completion handler failures in NIO2

        if (request.getAttribute(RequestDispatcher.ERROR_EXCEPTION) == null) {
            // Because the error did not occur on a container thread the
            // request's error attribute has not been set. If an exception
            // is available from the socketWrapper, use it to set the
            // request's error attribute here so it is visible to the error
            // handling.
            request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, socketWrapper.getError());
        }

        if (request.getReadListener() != null || response.getWriteListener() != null) {
            // The error occurred during non-blocking I/O. Set the correct
            // state else the error handling will trigger an ISE.
            asyncStateMachine.asyncOperation();
        }
    }

    RequestInfo rp = request.getRequestProcessor();
    try {
        rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
        if (!getAdapter().asyncDispatch(request, response, status)) {
            setErrorState(ErrorState.CLOSE_NOW, null);
        }
    } catch (InterruptedIOException e) {
        setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        setErrorState(ErrorState.CLOSE_NOW, t);
        getLog().error(sm.getString("http11processor.request.process"), t);
    }

    rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

    if (getErrorState().isError()) {
        request.updateCounters();
        return SocketState.CLOSED;
    } else if (isAsync()) {
        return SocketState.LONG;
    } else {
        request.updateCounters();
        return dispatchEndRequest();
    }
}

The dispatch method first branches on the value of the SocketEvent argument. In the OPEN_WRITE branch, flushBufferedWrite() writes the data held in Http11OutputBuffer back to the client.

@Override
protected boolean flushBufferedWrite() throws IOException {
    if (outputBuffer.hasDataToWrite()) {
        if (outputBuffer.flushBuffer(false)) {
            // The buffer wasn't fully flushed so re-register the
            // socket for write. Note this does not go via the
            // Response since the write registration state at
            // that level should remain unchanged. Once the buffer
            // has been emptied then the code below will call
            // Adaptor.asyncDispatch() which will enable the
            // Response to respond to this event.
            outputBuffer.registerWriteInterest();
            return true;
        }
    }
    return false;
}

Http11OutputBuffer#flushBuffer

protected boolean flushBuffer(boolean block) throws IOException  {
    return socketWrapper.flush(block);
}
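
The return value of flushBufferedWrite() is easy to misread: true means data is still pending, not that the flush succeeded. The sketch below models that contract with a plain ByteBuffer. It is an illustration under assumptions, not Tomcat's implementation: FlushSketch, bytesPerWrite (how much a pretend non-blocking socket accepts per call), and the writeInterestRegistered flag are all hypothetical.

```java
import java.nio.ByteBuffer;

final class FlushSketch {
    private final ByteBuffer pending;        // data waiting to be written
    private final int bytesPerWrite;         // hypothetical per-call socket capacity
    private boolean writeInterestRegistered; // stands in for registerWriteInterest()

    FlushSketch(byte[] data, int bytesPerWrite) {
        this.pending = ByteBuffer.wrap(data);
        this.bytesPerWrite = bytesPerWrite;
    }

    /** Returns true while data remains, false once the buffer is empty. */
    boolean flushBufferedWrite() {
        if (pending.hasRemaining()) {
            // Pretend the socket accepted at most bytesPerWrite bytes.
            int written = Math.min(bytesPerWrite, pending.remaining());
            pending.position(pending.position() + written);
            if (pending.hasRemaining()) {
                // Not fully flushed: ask to be called again on the next write event.
                writeInterestRegistered = true;
                return true;
            }
        }
        return false;
    }

    boolean isWriteInterestRegistered() {
        return writeInterestRegistered;
    }

    public static void main(String[] args) {
        FlushSketch out = new FlushSketch(new byte[10], 4);
        while (out.flushBufferedWrite()) {
            System.out.println("data left, waiting for the next OPEN_WRITE event");
        }
        System.out.println("buffer empty, dispatch can continue");
    }
}
```

In dispatch, a true result makes the method return SocketState.LONG right away, so the adapter is only invoked after the buffered data has been written out.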

asyncStateMachine is an object of type AsyncStateMachine.

synchronized void asyncOperation() {
    if (state==AsyncState.STARTED) {
        state = AsyncState.READ_WRITE_OP;
    } else {
        throw new IllegalStateException(
                sm.getString("asyncStateMachine.invalidAsyncState",
                        "asyncOperation()", state));
    }
}

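asyncStateMachine.asyncOperation() changes the state field of AsyncStateMachine from AsyncState.STARTED to AsyncState.READ_WRITE_OP, and throws an IllegalStateException if the state machine is in any other state. The READ_WRITE_OP state indicates that the request is ready for reading and writing.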
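
The guarded transition can be tried out in isolation with a stripped-down state machine. This is a sketch only: AsyncOperationSketch, its three-value State enum, and the markStarted() shortcut are hypothetical simplifications, and the real AsyncState enum has many more states.

```java
final class AsyncOperationSketch {
    /** Reduced state set; the real AsyncState enum is larger. */
    enum State { DISPATCHED, STARTED, READ_WRITE_OP }

    private State state = State.DISPATCHED;

    /** Hypothetical shortcut that puts the machine into STARTED. */
    synchronized void markStarted() {
        state = State.STARTED;
    }

    /** Same shape as asyncOperation(): only STARTED may move to READ_WRITE_OP. */
    synchronized void asyncOperation() {
        if (state == State.STARTED) {
            state = State.READ_WRITE_OP;
        } else {
            throw new IllegalStateException(
                    "asyncOperation() is not valid in state " + state);
        }
    }

    synchronized State current() {
        return state;
    }

    public static void main(String[] args) {
        AsyncOperationSketch machine = new AsyncOperationSketch();
        machine.markStarted();
        machine.asyncOperation();
        System.out.println(machine.current());
        try {
            machine.asyncOperation(); // second call: no longer in STARTED
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

This is why dispatch calls asyncOperation() before invoking the adapter in the non-blocking I/O branches: as the source comment puts it, setting the correct state first keeps the later error handling from triggering an ISE.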

After the if-else block comes the key part of the dispatch method.
First it calls request.getRequestProcessor() to obtain a RequestInfo object.

private final RequestInfo reqProcessorMX=new RequestInfo(this);

public RequestInfo getRequestProcessor() {
    return reqProcessorMX;
}

This request is the org.apache.coyote.Request object initialized in the AbstractProcessor constructor.
Next, inside the try-catch block, it calls getAdapter().asyncDispatch(request, response, status).
getAdapter() is a method of AbstractProcessor that returns the adapter field.

protected final Adapter adapter;

/**
 * Get the associated adapter.
 *
 * @return the associated adapter
 */
public Adapter getAdapter() {
    return adapter;
}

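The adapter field of AbstractProcessor is assigned when the Http11Processor object is created, and the value passed in is a CoyoteAdapter object.
That object is created in Connector#initInternal and assigned to the adapter field of AbstractProtocol.

So getAdapter().asyncDispatch(...) calls CoyoteAdapter#asyncDispatch.
CoyoteAdapter is an important link in the request-processing chain. A later article covers it, so it is skipped here.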

3. Http11Processor#service

/**
 * Service a 'standard' HTTP request. This method is called for both new
 * requests and for requests that have partially read the HTTP request line
 * or HTTP headers. Once the headers have been fully read this method is not
 * called again until there is a new HTTP request to process. Note that the
 * request type may change during processing which may result in one or more
 * calls to {@link #dispatch(SocketEvent)}. Requests may be pipe-lined.
 *
 * @param socketWrapper The connection to process
 *
 * @return The state the caller should put the socket in when this method
 *         returns
 *
 * @throws IOException If an I/O error occurs during the processing of the
 *         request
 */
protected abstract SocketState service(SocketWrapperBase<?> socketWrapper) throws IOException;

As the comment shows, service is the counterpart of dispatch: it handles standard HTTP requests. The service method is implemented in Http11Processor.

@Override
public SocketState service(SocketWrapperBase<?> socketWrapper)
    throws IOException {
    RequestInfo rp = request.getRequestProcessor();
    rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

    // Setting up the I/O
    setSocketWrapper(socketWrapper);
    inputBuffer.init(socketWrapper);
    outputBuffer.init(socketWrapper);

    // Flags
    keepAlive = true;
    openSocket = false;
    readComplete = true;
    boolean keptAlive = false;
    SendfileState sendfileState = SendfileState.DONE;

    while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
            sendfileState == SendfileState.DONE && !protocol.isPaused()) {

        // Parsing the request header
        try {
            if (!inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(),
                    protocol.getKeepAliveTimeout())) {
                if (inputBuffer.getParsingRequestLinePhase() == -1) {
                    return SocketState.UPGRADING;
                } else if (handleIncompleteRequestLineRead()) {
                    break;
                }
            }

            if (protocol.isPaused()) {
                // 503 - Service unavailable
                response.setStatus(503);
                setErrorState(ErrorState.CLOSE_CLEAN, null);
            } else {
                keptAlive = true;
                // Set this every time in case limit has been changed via JMX
                request.getMimeHeaders().setLimit(protocol.getMaxHeaderCount());
                if (!inputBuffer.parseHeaders()) {
                    // We've read part of the request, don't recycle it
                    // instead associate it with the socket
                    openSocket = true;
                    readComplete = false;
                    break;
                }
                if (!protocol.getDisableUploadTimeout()) {
                    socketWrapper.setReadTimeout(protocol.getConnectionUploadTimeout());
                }
            }
        } catch (IOException e) {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("http11processor.header.parse"), e);
            }
            setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
            break;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            UserDataHelper.Mode logMode = userDataHelper.getNextMode();
            if (logMode != null) {
                String message = sm.getString("http11processor.header.parse");
                switch (logMode) {
                    case INFO_THEN_DEBUG:
                        message += sm.getString("http11processor.fallToDebug");
                        //$FALL-THROUGH$
                    case INFO:
                        log.info(message, t);
                        break;
                    case DEBUG:
                        log.debug(message, t);
                }
            }
            // 400 - Bad Request
            response.setStatus(400);
            setErrorState(ErrorState.CLOSE_CLEAN, t);
        }

        // Has an upgrade been requested?
        Enumeration<String> connectionValues = request.getMimeHeaders().values("Connection");
        boolean foundUpgrade = false;
        while (connectionValues.hasMoreElements() && !foundUpgrade) {
            foundUpgrade = connectionValues.nextElement().toLowerCase(
                    Locale.ENGLISH).contains("upgrade");
        }

        if (foundUpgrade) {
            // Check the protocol
            String requestedProtocol = request.getHeader("Upgrade");

            UpgradeProtocol upgradeProtocol = protocol.getUpgradeProtocol(requestedProtocol);
            if (upgradeProtocol != null) {
                if (upgradeProtocol.accept(request)) {
                    response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
                    response.setHeader("Connection", "Upgrade");
                    response.setHeader("Upgrade", requestedProtocol);
                    action(ActionCode.CLOSE,  null);
                    getAdapter().log(request, response, 0);

                    InternalHttpUpgradeHandler upgradeHandler =
                            upgradeProtocol.getInternalUpgradeHandler(
                                    socketWrapper, getAdapter(), cloneRequest(request));
                    UpgradeToken upgradeToken = new UpgradeToken(upgradeHandler, null, null);
                    action(ActionCode.UPGRADE, upgradeToken);
                    return SocketState.UPGRADING;
                }
            }
        }

        if (getErrorState().isIoAllowed()) {
            // Setting up filters, and parse some request headers
            rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
            try {
                prepareRequest();
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("http11processor.request.prepare"), t);
                }
                // 500 - Internal Server Error
                response.setStatus(500);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
            }
        }

        int maxKeepAliveRequests = protocol.getMaxKeepAliveRequests();
        if (maxKeepAliveRequests == 1) {
            keepAlive = false;
        } else if (maxKeepAliveRequests > 0 &&
                socketWrapper.decrementKeepAlive() <= 0) {
            keepAlive = false;
        }

        // Process the request in the adapter
        if (getErrorState().isIoAllowed()) {
            try {
                rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                getAdapter().service(request, response);
                // Handle when the response was committed before a serious
                // error occurred.  Throwing a ServletException should both
                // set the status to 500 and set the errorException.
                // If we fail here, then the response is likely already
                // committed, so we can't try and set headers.
                if(keepAlive && !getErrorState().isError() && !isAsync() &&
                        statusDropsConnection(response.getStatus())) {
                    setErrorState(ErrorState.CLOSE_CLEAN, null);
                }
            } catch (InterruptedIOException e) {
                setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
            } catch (HeadersTooLargeException e) {
                log.error(sm.getString("http11processor.request.process"), e);
                // The response should not have been committed but check it
                // anyway to be safe
                if (response.isCommitted()) {
                    setErrorState(ErrorState.CLOSE_NOW, e);
                } else {
                    response.reset();
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, e);
                    response.setHeader("Connection", "close"); // TODO: Remove
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("http11processor.request.process"), t);
                // 500 - Internal Server Error
                response.setStatus(500);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
                getAdapter().log(request, response, 0);
            }
        }

        // Finish the handling of the request
        rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
        if (!isAsync()) {
            // If this is an async request then the request ends when it has
            // been completed. The AsyncContext is responsible for calling
            // endRequest() in that case.
            endRequest();
        }
        rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);

        // If there was an error, make sure the request is counted as
        // and error, and update the statistics counter
        if (getErrorState().isError()) {
            response.setStatus(500);
        }

        if (!isAsync() || getErrorState().isError()) {
            request.updateCounters();
            if (getErrorState().isIoAllowed()) {
                inputBuffer.nextRequest();
                outputBuffer.nextRequest();
            }
        }

        if (!protocol.getDisableUploadTimeout()) {
            int connectionTimeout = protocol.getConnectionTimeout();
            if(connectionTimeout > 0) {
                socketWrapper.setReadTimeout(connectionTimeout);
            } else {
                socketWrapper.setReadTimeout(0);
            }
        }

        rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

        sendfileState = processSendfile(socketWrapper);
    }

    rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

    if (getErrorState().isError() || protocol.isPaused()) {
        return SocketState.CLOSED;
    } else if (isAsync()) {
        return SocketState.LONG;
    } else if (isUpgrade()) {
        return SocketState.UPGRADING;
    } else {
        if (sendfileState == SendfileState.PENDING) {
            return SocketState.SENDFILE;
        } else {
            if (openSocket) {
                if (readComplete) {
                    return SocketState.OPEN;
                } else {
                    return SocketState.LONG;
                }
            } else {
                return SocketState.CLOSED;
            }
        }
    }
}

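The logic of the service method is somewhat involved. It first performs some setup, namely inputBuffer.init(socketWrapper), outputBuffer.init(socketWrapper), and so on, and then enters a while loop.
Inside the while loop, a try-catch block executes
inputBuffer.parseRequestLine(...)
inputBuffer.parseHeaders()
and sets various states and properties.
Http11InputBuffer#parseRequestLine parses the request line, and Http11InputBuffer#parseHeaders parses the request headers.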

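Next, it looks at the Connection request header to see whether any of its values contains upgrade. If so, and the protocol named in the Upgrade header is supported and accepts the request, it enters the HTTP upgrade steps.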
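
The header scan can be reproduced outside Tomcat with a plain Enumeration. The loop body below follows the one in service; the class name UpgradeHeaderSketch and the helper requestsUpgrade are hypothetical, and the header values come from a list, not from MimeHeaders.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Locale;

final class UpgradeHeaderSketch {
    /** Returns true if any Connection header value contains "upgrade", ignoring case. */
    static boolean requestsUpgrade(Enumeration<String> connectionValues) {
        boolean foundUpgrade = false;
        while (connectionValues.hasMoreElements() && !foundUpgrade) {
            // Locale.ENGLISH keeps lower-casing independent of the JVM default locale.
            foundUpgrade = connectionValues.nextElement()
                    .toLowerCase(Locale.ENGLISH).contains("upgrade");
        }
        return foundUpgrade;
    }

    public static void main(String[] args) {
        System.out.println(requestsUpgrade(
                Collections.enumeration(Arrays.asList("keep-alive, Upgrade"))));
        System.out.println(requestsUpgrade(
                Collections.enumeration(Arrays.asList("close"))));
    }
}
```

Finding the token only starts the check. In service, the connection switches protocols only if protocol.getUpgradeProtocol(...) returns a handler and that handler's accept(request) returns true.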

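It then calls prepareRequest() to do preliminary processing of the request: based on certain request headers, it adds the matching InputFilter instances to Http11InputBuffer. For example, it parses the host, transfer-encoding, and content-length headers.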
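
As a rough picture of how headers could drive the choice of body filter, the sketch below picks among the three input filter kinds registered in the constructor (identity, chunked, void). It is a heavily simplified model written for this article, not the logic of prepareRequest: BodyFilterSketch, BodyFilter, and choose are hypothetical names, and the real method handles many more cases and validates the headers.

```java
import java.util.Locale;

final class BodyFilterSketch {
    /** Stand-ins for IdentityInputFilter, ChunkedInputFilter and VoidInputFilter. */
    enum BodyFilter { IDENTITY, CHUNKED, VOID }

    /**
     * @param transferEncoding value of the transfer-encoding header, or null if absent
     * @param contentLength    value of the content-length header, or -1 if absent
     */
    static BodyFilter choose(String transferEncoding, long contentLength) {
        if (transferEncoding != null) {
            // Assumption of this sketch: chunked counts only as the final coding.
            String[] codings = transferEncoding.split(",");
            String last = codings[codings.length - 1].trim().toLowerCase(Locale.ENGLISH);
            if (last.equals("chunked")) {
                return BodyFilter.CHUNKED;   // body is delimited by chunk framing
            }
        }
        if (contentLength >= 0) {
            return BodyFilter.IDENTITY;      // body is delimited by its declared length
        }
        return BodyFilter.VOID;              // no delimitation: treat the body as empty
    }

    public static void main(String[] args) {
        System.out.println(choose("chunked", -1));
        System.out.println(choose(null, 42));
        System.out.println(choose(null, -1));
    }
}
```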

Finally, it calls the Adapter to process the request, that is

getAdapter().service(request, response);

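getAdapter() returns the CoyoteAdapter mentioned above. CoyoteAdapter#service is analyzed separately in a later article, so it is not described further here.
After that call returns, what remains is cleanup work. The service method is long and its logic is fairly complex, so this article skips many of the less important details.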
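
One part of the cleanup that is worth spelling out is the nested if-else at the end of service, which maps the processor's flags to the SocketState returned to the caller. The sketch below flattens it into a single function. ServiceResultSketch and its boolean parameters are hypothetical stand-ins for the fields and method calls used in the original code.

```java
final class ServiceResultSketch {
    /** Reduced stand-in for SocketState: only the values service() returns at its end. */
    enum State { OPEN, CLOSED, LONG, UPGRADING, SENDFILE }

    /** Same precedence as the final if-else chain of service(). */
    static State result(boolean error, boolean paused, boolean async, boolean upgrade,
            boolean sendfilePending, boolean openSocket, boolean readComplete) {
        if (error || paused) {
            return State.CLOSED;
        } else if (async) {
            return State.LONG;
        } else if (upgrade) {
            return State.UPGRADING;
        } else if (sendfilePending) {
            return State.SENDFILE;
        } else if (openSocket) {
            // Headers only partly read: keep the processor attached (LONG).
            return readComplete ? State.OPEN : State.LONG;
        }
        return State.CLOSED;
    }

    public static void main(String[] args) {
        // Partially read headers: openSocket = true, readComplete = false.
        System.out.println(result(false, false, false, false, false, true, false));
        // Async request still in progress.
        System.out.println(result(false, false, true, false, false, false, true));
    }
}
```

An error or a paused protocol always wins, so a request that failed while also being async still closes the socket.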


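Summary
This article described how the process method of Http11Processor handles requests. It splits the work between the dispatch and service methods, which handle different kinds of HTTP requests. In both dispatch and service, the key step is calling the corresponding method of the Adapter.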
