Preface
The previous article covered ConnectionHandler#process. Its key step is to obtain an org.apache.coyote.Processor object and call that object's process method, passing along the arguments it received itself, namely the NioSocketWrapper object and the SocketEvent object. In Tomcat, the Processor implementations that handle HTTP requests are Http11Processor and StreamProcessor. Both extend AbstractProcessor, which extends AbstractProcessorLight, which in turn implements Processor directly. StreamProcessor handles HTTP/2; this article uses Http11Processor as the example. The overall processing logic of StreamProcessor and Http11Processor is broadly the same.
1. Http11Processor constructor
    public Http11Processor(AbstractHttp11Protocol<?> protocol, Adapter adapter) {
        super(adapter);
        this.protocol = protocol;
        httpParser = new HttpParser(protocol.getRelaxedPathChars(),
                protocol.getRelaxedQueryChars());
        inputBuffer = new Http11InputBuffer(request, protocol.getMaxHttpHeaderSize(),
                protocol.getRejectIllegalHeaderName(), httpParser);
        request.setInputBuffer(inputBuffer);
        outputBuffer = new Http11OutputBuffer(response, protocol.getMaxHttpHeaderSize());
        response.setOutputBuffer(outputBuffer);
        // Create and add the identity filters.
        inputBuffer.addFilter(new IdentityInputFilter(protocol.getMaxSwallowSize()));
        outputBuffer.addFilter(new IdentityOutputFilter());
        // Create and add the chunked filters.
        inputBuffer.addFilter(new ChunkedInputFilter(protocol.getMaxTrailerSize(),
                protocol.getAllowedTrailerHeadersInternal(), protocol.getMaxExtensionSize(),
                protocol.getMaxSwallowSize()));
        outputBuffer.addFilter(new ChunkedOutputFilter());
        // Create and add the void filters.
        inputBuffer.addFilter(new VoidInputFilter());
        outputBuffer.addFilter(new VoidOutputFilter());
        // Create and add buffered input filter
        inputBuffer.addFilter(new BufferedInputFilter());
        // Create and add the chunked filters.
        //inputBuffer.addFilter(new GzipInputFilter());
        outputBuffer.addFilter(new GzipOutputFilter());
        pluggableFilterIndex = inputBuffer.getFilters().length;
    }
    public AbstractProcessor(Adapter adapter) {
        this(adapter, new Request(), new Response());
    }
    protected AbstractProcessor(Adapter adapter, Request coyoteRequest, Response coyoteResponse) {
        this.adapter = adapter;
        asyncStateMachine = new AsyncStateMachine(this);
        request = coyoteRequest;
        response = coyoteResponse;
        response.setHook(this);
        request.setResponse(response);
        request.setHook(this);
        userDataHelper = new UserDataHelper(getLog());
    }
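The Http11Processor constructor sets up request (org.apache.coyote.Request) and response (org.apache.coyote.Response), both created in the AbstractProcessor constructor it reaches through super(adapter), along with httpParser (HttpParser), inputBuffer (Http11InputBuffer), outputBuffer (Http11OutputBuffer), and a set of InputFilter and OutputFilter instances. All of these are needed to handle the HTTP protocol.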
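The constructor registers the input filters in a fixed order and then stores the resulting count in pluggableFilterIndex. A minimal sketch of why that order matters, assuming a simplified filter library that stores names instead of real filter objects (FilterLibrarySketch and its methods are hypothetical and not part of Tomcat): each built-in filter ends up at a stable index, and anything registered afterwards starts at the recorded count.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a buffer that keeps an ordered library of filters. */
final class FilterLibrarySketch {
    private final List<String> filters = new ArrayList<>();

    /** Registers a filter and returns the index it was stored at. */
    int addFilter(String name) {
        filters.add(name);
        return filters.size() - 1;
    }

    int size() {
        return filters.size();
    }

    String get(int index) {
        return filters.get(index);
    }

    /** Mirrors the input-filter registration order of the constructor listed above. */
    static FilterLibrarySketch inputDefaults() {
        FilterLibrarySketch lib = new FilterLibrarySketch();
        lib.addFilter("identity");
        lib.addFilter("chunked");
        lib.addFilter("void");
        lib.addFilter("buffered");
        return lib;
    }

    public static void main(String[] args) {
        FilterLibrarySketch lib = inputDefaults();
        int pluggableFilterIndex = lib.size(); // first slot free for extra filters
        int customIndex = lib.addFilter("custom");
        System.out.println(pluggableFilterIndex + " " + customIndex);
    }
}
```

Because the order is fixed, later code can pick a built-in filter by position rather than by searching for it.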
2. AbstractProcessorLight#process
Processor#process is implemented in AbstractProcessorLight.
    @Override
    public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
            throws IOException {
        SocketState state = SocketState.CLOSED;
        Iterator<DispatchType> dispatches = null;
        do {
            if (dispatches != null) {
                DispatchType nextDispatch = dispatches.next();
                state = dispatch(nextDispatch.getSocketStatus());
            } else if (status == SocketEvent.DISCONNECT) {
                // Do nothing here, just wait for it to get recycled
            } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
                state = dispatch(status);
                if (state == SocketState.OPEN) {
                    // There may be pipe-lined data to read. If the data isn't
                    // processed now, execution will exit this loop and call
                    // release() which will recycle the processor (and input
                    // buffer) deleting any pipe-lined data. To avoid this,
                    // process it now.
                    state = service(socketWrapper);
                }
            } else if (status == SocketEvent.OPEN_WRITE) {
                // Extra write event likely after async, ignore
                state = SocketState.LONG;
            } else if (status == SocketEvent.OPEN_READ){
                state = service(socketWrapper);
            } else {
                // Default to closing the socket if the SocketEvent passed in
                // is not consistent with the current state of the Processor
                state = SocketState.CLOSED;
            }
            if (getLog().isDebugEnabled()) {
                getLog().debug("Socket: [" + socketWrapper +
                        "], Status in: [" + status +
                        "], State out: [" + state + "]");
            }
            if (state != SocketState.CLOSED && isAsync()) {
                state = asyncPostProcess();
                if (getLog().isDebugEnabled()) {
                    getLog().debug("Socket: [" + socketWrapper +
                            "], State after async post processing: [" + state + "]");
                }
            }
            if (dispatches == null || !dispatches.hasNext()) {
                // Only returns non-null iterator if there are
                // dispatches to process.
                dispatches = getIteratorAndClearDispatches();
            }
        } while (state == SocketState.ASYNC_END ||
                dispatches != null && state != SocketState.CLOSED);
        return state;
    }
The process method runs a do-while loop and handles each case according to the current conditions. The important work is done by calling either dispatch or service.
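The branch selection inside that loop can be sketched as a small pure function. This is a simplification under stated assumptions: it ignores pending dispatches and the ASYNC_END state, and the Event and Action enums and the choose method are hypothetical names introduced only for illustration.

```java
/** Hypothetical, simplified view of which branch one pass of the loop takes. */
final class ProcessBranchSketch {
    enum Event { OPEN_READ, OPEN_WRITE, DISCONNECT, ERROR }

    enum Action { NOTHING, DISPATCH, IGNORE_WRITE, SERVICE, CLOSE }

    static Action choose(Event status, boolean async, boolean upgrade) {
        if (status == Event.DISCONNECT) {
            return Action.NOTHING;       // wait for the processor to be recycled
        }
        if (async || upgrade) {
            return Action.DISPATCH;      // request has left standard HTTP mode
        }
        if (status == Event.OPEN_WRITE) {
            return Action.IGNORE_WRITE;  // stray write event, keep the socket waiting
        }
        if (status == Event.OPEN_READ) {
            return Action.SERVICE;       // standard HTTP request
        }
        return Action.CLOSE;             // event does not match the processor state
    }

    public static void main(String[] args) {
        System.out.println(choose(Event.OPEN_READ, false, false));
        System.out.println(choose(Event.OPEN_READ, true, false));
    }
}
```

Note the order of the checks: an async or upgraded request goes to dispatch even when the event is OPEN_READ, which is why the async test comes before the event tests.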
2. AbstractProcessor#dispatch
    /**
     * Process an in-progress request that is not longer in standard HTTP mode.
     * Uses currently include Servlet 3.0 Async and HTTP upgrade connections.
     * Further uses may be added in the future. These will typically start as
     * HTTP requests.
     * @param status The event to process
     * @return the socket state
     */
    protected abstract SocketState dispatch(SocketEvent status);
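As the comment says, dispatch handles in-progress requests that are no longer in standard HTTP mode; it is used for Servlet 3.0 async processing and HTTP upgrade connections. The implementation is in AbstractProcessor.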
    @Override
    public final SocketState dispatch(SocketEvent status) {
        if (status == SocketEvent.OPEN_WRITE && response.getWriteListener() != null) {
            asyncStateMachine.asyncOperation();
            try {
                if (flushBufferedWrite()) {
                    return SocketState.LONG;
                }
            } catch (IOException ioe) {
                if (getLog().isDebugEnabled()) {
                    getLog().debug("Unable to write async data.", ioe);
                }
                status = SocketEvent.ERROR;
                request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, ioe);
            }
        } else if (status == SocketEvent.OPEN_READ && request.getReadListener() != null) {
            dispatchNonBlockingRead();
        } else if (status == SocketEvent.ERROR) {
            // An I/O error occurred on a non-container thread. This includes:
            // - read/write timeouts fired by the Poller (NIO & APR)
            // - completion handler failures in NIO2
            if (request.getAttribute(RequestDispatcher.ERROR_EXCEPTION) == null) {
                // Because the error did not occur on a container thread the
                // request's error attribute has not been set. If an exception
                // is available from the socketWrapper, use it to set the
                // request's error attribute here so it is visible to the error
                // handling.
                request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, socketWrapper.getError());
            }
            if (request.getReadListener() != null || response.getWriteListener() != null) {
                // The error occurred during non-blocking I/O. Set the correct
                // state else the error handling will trigger an ISE.
                asyncStateMachine.asyncOperation();
            }
        }
        RequestInfo rp = request.getRequestProcessor();
        try {
            rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
            if (!getAdapter().asyncDispatch(request, response, status)) {
                setErrorState(ErrorState.CLOSE_NOW, null);
            }
        } catch (InterruptedIOException e) {
            setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            setErrorState(ErrorState.CLOSE_NOW, t);
            getLog().error(sm.getString("http11processor.request.process"), t);
        }
        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
        if (getErrorState().isError()) {
            request.updateCounters();
            return SocketState.CLOSED;
        } else if (isAsync()) {
            return SocketState.LONG;
        } else {
            request.updateCounters();
            return dispatchEndRequest();
        }
    }
Inside dispatch, the first step is to branch on the value of the SocketEvent argument. The flushBufferedWrite() call in the first branch writes the data held in Http11OutputBuffer back to the client.
    @Override
    protected boolean flushBufferedWrite() throws IOException {
        if (outputBuffer.hasDataToWrite()) {
            if (outputBuffer.flushBuffer(false)) {
                // The buffer wasn't fully flushed so re-register the
                // socket for write. Note this does not go via the
                // Response since the write registration state at
                // that level should remain unchanged. Once the buffer
                // has been emptied then the code below will call
                // Adaptor.asyncDispatch() which will enable the
                // Response to respond to this event.
                outputBuffer.registerWriteInterest();
                return true;
            }
        }
        return false;
    }
Http11OutputBuffer#flushBuffer
    protected boolean flushBuffer(boolean block) throws IOException {
        return socketWrapper.flush(block);
    }
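The contract of flushBufferedWrite, as listed above, is that it returns true while data remains after a non-blocking flush attempt, and in that case re-registers write interest. A minimal sketch of that behaviour, assuming a fake socket that accepts a fixed number of bytes per flush (FlushSketch, its fields, and the byte counts are hypothetical):

```java
/** Hypothetical model of a buffered write that may need several flush attempts. */
final class FlushSketch {
    private int pending;            // bytes still waiting in the buffer
    private final int perFlush;     // bytes the fake socket accepts per attempt
    boolean writeInterestRegistered;

    FlushSketch(int pending, int perFlush) {
        this.pending = pending;
        this.perFlush = perFlush;
    }

    /** Writes what the fake socket accepts; returns true if data remains. */
    private boolean flush() {
        pending -= Math.min(pending, perFlush);
        return pending > 0;
    }

    /** Same shape as flushBufferedWrite above: true means "not done yet". */
    boolean flushBufferedWrite() {
        writeInterestRegistered = false;
        if (pending > 0 && flush()) {
            writeInterestRegistered = true; // ask to be called again when writable
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        FlushSketch sketch = new FlushSketch(10, 4);
        while (sketch.flushBufferedWrite()) {
            System.out.println("buffer not empty, waiting for next write event");
        }
        System.out.println("buffer flushed");
    }
}
```

In dispatch, a true result makes the method return SocketState.LONG right away, so the adapter is only invoked once the buffer has been emptied.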
asyncStateMachine is an object of type AsyncStateMachine.
    synchronized void asyncOperation() {
        if (state==AsyncState.STARTED) {
            state = AsyncState.READ_WRITE_OP;
        } else {
            throw new IllegalStateException(
                    sm.getString("asyncStateMachine.invalidAsyncState",
                            "asyncOperation()", state));
        }
    }
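asyncStateMachine.asyncOperation() changes the state field of AsyncStateMachine from AsyncState.STARTED to AsyncState.READ_WRITE_OP, and throws an IllegalStateException when called in any other state. READ_WRITE_OP signals that a non-blocking read or write is under way on this request.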
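The guarded transition can be sketched in isolation. This is a reduced model with only three states; the start() method is a hypothetical shortcut for reaching STARTED and does not correspond to a specific Tomcat method.

```java
/** Hypothetical three-state reduction of the async state machine. */
final class AsyncStateSketch {
    enum State { DISPATCHED, STARTED, READ_WRITE_OP }

    private State state = State.DISPATCHED;

    /** Hypothetical shortcut that puts the machine into STARTED. */
    synchronized void start() {
        state = State.STARTED;
    }

    /** Only legal from STARTED, as in the asyncOperation() listing above. */
    synchronized void asyncOperation() {
        if (state == State.STARTED) {
            state = State.READ_WRITE_OP;
        } else {
            throw new IllegalStateException("asyncOperation() called in state " + state);
        }
    }

    synchronized State state() {
        return state;
    }

    public static void main(String[] args) {
        AsyncStateSketch machine = new AsyncStateSketch();
        machine.start();
        machine.asyncOperation();
        System.out.println(machine.state());
    }
}
```

Rejecting the call from every other state is what turns a misordered event into an immediate, visible failure instead of a silently corrupted request.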
After the if-else block comes the core of dispatch.
It first calls request.getRequestProcessor() to obtain a RequestInfo object.
    private final RequestInfo reqProcessorMX=new RequestInfo(this);
    public RequestInfo getRequestProcessor() {
        return reqProcessorMX;
    }
This request is the org.apache.coyote.Request object initialized in the AbstractProcessor constructor.
Then, inside the try-catch block, dispatch calls getAdapter().asyncDispatch(request, response, status).
getAdapter() is a method of AbstractProcessor that returns the adapter field.
    protected final Adapter adapter;
    /**
     * Get the associated adapter.
     *
     * @return the associated adapter
     */
    public Adapter getAdapter() {
        return adapter;
    }
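The adapter field of AbstractProcessor is assigned when the Http11Processor object is created; the value passed in is a CoyoteAdapter object,
which is created in Connector#initInternal and assigned to the adapter field of AbstractProtocol.
So getAdapter().asyncDispatch(...) invokes CoyoteAdapter#asyncDispatch.
CoyoteAdapter is an important link in request processing. A later article covers it, so it is skipped here.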
3. Http11Processor#service
    /**
     * Service a 'standard' HTTP request. This method is called for both new
     * requests and for requests that have partially read the HTTP request line
     * or HTTP headers. Once the headers have been fully read this method is not
     * called again until there is a new HTTP request to process. Note that the
     * request type may change during processing which may result in one or more
     * calls to {@link #dispatch(SocketEvent)}. Requests may be pipe-lined.
     *
     * @param socketWrapper The connection to process
     *
     * @return The state the caller should put the socket in when this method
     *         returns
     *
     * @throws IOException If an I/O error occurs during the processing of the
     *         request
     */
    protected abstract SocketState service(SocketWrapperBase<?> socketWrapper) throws IOException;
As the comment shows, service is the counterpart of dispatch: it handles standard HTTP requests. The implementation of service is in Http11Processor.
    @Override
    public SocketState service(SocketWrapperBase<?> socketWrapper)
            throws IOException {
        RequestInfo rp = request.getRequestProcessor();
        rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
        // Setting up the I/O
        setSocketWrapper(socketWrapper);
        inputBuffer.init(socketWrapper);
        outputBuffer.init(socketWrapper);
        // Flags
        keepAlive = true;
        openSocket = false;
        readComplete = true;
        boolean keptAlive = false;
        SendfileState sendfileState = SendfileState.DONE;
        while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
                sendfileState == SendfileState.DONE && !protocol.isPaused()) {
            // Parsing the request header
            try {
                if (!inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(),
                        protocol.getKeepAliveTimeout())) {
                    if (inputBuffer.getParsingRequestLinePhase() == -1) {
                        return SocketState.UPGRADING;
                    } else if (handleIncompleteRequestLineRead()) {
                        break;
                    }
                }
                if (protocol.isPaused()) {
                    // 503 - Service unavailable
                    response.setStatus(503);
                    setErrorState(ErrorState.CLOSE_CLEAN, null);
                } else {
                    keptAlive = true;
                    // Set this every time in case limit has been changed via JMX
                    request.getMimeHeaders().setLimit(protocol.getMaxHeaderCount());
                    if (!inputBuffer.parseHeaders()) {
                        // We've read part of the request, don't recycle it
                        // instead associate it with the socket
                        openSocket = true;
                        readComplete = false;
                        break;
                    }
                    if (!protocol.getDisableUploadTimeout()) {
                        socketWrapper.setReadTimeout(protocol.getConnectionUploadTimeout());
                    }
                }
            } catch (IOException e) {
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("http11processor.header.parse"), e);
                }
                setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
                break;
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                UserDataHelper.Mode logMode = userDataHelper.getNextMode();
                if (logMode != null) {
                    String message = sm.getString("http11processor.header.parse");
                    switch (logMode) {
                        case INFO_THEN_DEBUG:
                            message += sm.getString("http11processor.fallToDebug");
                            //$FALL-THROUGH$
                        case INFO:
                            log.info(message, t);
                            break;
                        case DEBUG:
                            log.debug(message, t);
                    }
                }
                // 400 - Bad Request
                response.setStatus(400);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
            }
            // Has an upgrade been requested?
            Enumeration<String> connectionValues = request.getMimeHeaders().values("Connection");
            boolean foundUpgrade = false;
            while (connectionValues.hasMoreElements() && !foundUpgrade) {
                foundUpgrade = connectionValues.nextElement().toLowerCase(
                        Locale.ENGLISH).contains("upgrade");
            }
            if (foundUpgrade) {
                // Check the protocol
                String requestedProtocol = request.getHeader("Upgrade");
                UpgradeProtocol upgradeProtocol = protocol.getUpgradeProtocol(requestedProtocol);
                if (upgradeProtocol != null) {
                    if (upgradeProtocol.accept(request)) {
                        response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
                        response.setHeader("Connection", "Upgrade");
                        response.setHeader("Upgrade", requestedProtocol);
                        action(ActionCode.CLOSE, null);
                        getAdapter().log(request, response, 0);
                        InternalHttpUpgradeHandler upgradeHandler =
                                upgradeProtocol.getInternalUpgradeHandler(
                                        socketWrapper, getAdapter(), cloneRequest(request));
                        UpgradeToken upgradeToken = new UpgradeToken(upgradeHandler, null, null);
                        action(ActionCode.UPGRADE, upgradeToken);
                        return SocketState.UPGRADING;
                    }
                }
            }
            if (getErrorState().isIoAllowed()) {
                // Setting up filters, and parse some request headers
                rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
                try {
                    prepareRequest();
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("http11processor.request.prepare"), t);
                    }
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, t);
                }
            }
            int maxKeepAliveRequests = protocol.getMaxKeepAliveRequests();
            if (maxKeepAliveRequests == 1) {
                keepAlive = false;
            } else if (maxKeepAliveRequests > 0 &&
                    socketWrapper.decrementKeepAlive() <= 0) {
                keepAlive = false;
            }
            // Process the request in the adapter
            if (getErrorState().isIoAllowed()) {
                try {
                    rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                    getAdapter().service(request, response);
                    // Handle when the response was committed before a serious
                    // error occurred. Throwing a ServletException should both
                    // set the status to 500 and set the errorException.
                    // If we fail here, then the response is likely already
                    // committed, so we can't try and set headers.
                    if(keepAlive && !getErrorState().isError() && !isAsync() &&
                            statusDropsConnection(response.getStatus())) {
                        setErrorState(ErrorState.CLOSE_CLEAN, null);
                    }
                } catch (InterruptedIOException e) {
                    setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
                } catch (HeadersTooLargeException e) {
                    log.error(sm.getString("http11processor.request.process"), e);
                    // The response should not have been committed but check it
                    // anyway to be safe
                    if (response.isCommitted()) {
                        setErrorState(ErrorState.CLOSE_NOW, e);
                    } else {
                        response.reset();
                        response.setStatus(500);
                        setErrorState(ErrorState.CLOSE_CLEAN, e);
                        response.setHeader("Connection", "close"); // TODO: Remove
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("http11processor.request.process"), t);
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, t);
                    getAdapter().log(request, response, 0);
                }
            }
            // Finish the handling of the request
            rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
            if (!isAsync()) {
                // If this is an async request then the request ends when it has
                // been completed. The AsyncContext is responsible for calling
                // endRequest() in that case.
                endRequest();
            }
            rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);
            // If there was an error, make sure the request is counted as
            // and error, and update the statistics counter
            if (getErrorState().isError()) {
                response.setStatus(500);
            }
            if (!isAsync() || getErrorState().isError()) {
                request.updateCounters();
                if (getErrorState().isIoAllowed()) {
                    inputBuffer.nextRequest();
                    outputBuffer.nextRequest();
                }
            }
            if (!protocol.getDisableUploadTimeout()) {
                int connectionTimeout = protocol.getConnectionTimeout();
                if(connectionTimeout > 0) {
                    socketWrapper.setReadTimeout(connectionTimeout);
                } else {
                    socketWrapper.setReadTimeout(0);
                }
            }
            rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
            sendfileState = processSendfile(socketWrapper);
        }
        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
        if (getErrorState().isError() || protocol.isPaused()) {
            return SocketState.CLOSED;
        } else if (isAsync()) {
            return SocketState.LONG;
        } else if (isUpgrade()) {
            return SocketState.UPGRADING;
        } else {
            if (sendfileState == SendfileState.PENDING) {
                return SocketState.SENDFILE;
            } else {
                if (openSocket) {
                    if (readComplete) {
                        return SocketState.OPEN;
                    } else {
                        return SocketState.LONG;
                    }
                } else {
                    return SocketState.CLOSED;
                }
            }
        }
    }
The logic of service is fairly involved. It starts with some setup, such as inputBuffer.init(socketWrapper) and outputBuffer.init(socketWrapper), and then enters a while loop.
Inside the loop, a try-catch block executes
inputBuffer.parseRequestLine(...)
inputBuffer.parseHeaders()
along with some state and property settings.
Http11InputBuffer#parseRequestLine parses the request line, and Http11InputBuffer#parseHeaders parses the request headers.
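To make the two parsing steps concrete, here is a minimal sketch of what a request line and a header line contain. It is not how Http11InputBuffer works internally: the real parser operates on bytes and can resume after a partial read, while this sketch assumes a complete line is already available as a String. RequestLineSketch and its methods are hypothetical.

```java
import java.util.AbstractMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical String-based illustration of request-line and header parsing. */
final class RequestLineSketch {
    final String method;
    final String target;
    final String protocol;

    private RequestLineSketch(String method, String target, String protocol) {
        this.method = method;
        this.target = target;
        this.protocol = protocol;
    }

    /** Splits "METHOD SP request-target SP HTTP-version" into its three parts. */
    static RequestLineSketch parseRequestLine(String line) {
        String[] parts = line.split(" ");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Malformed request line: " + line);
        }
        return new RequestLineSketch(parts[0], parts[1], parts[2]);
    }

    /** Splits "Name: value" at the first colon; header names are case-insensitive. */
    static Map.Entry<String, String> parseHeader(String line) {
        int colon = line.indexOf(':');
        if (colon <= 0) {
            throw new IllegalArgumentException("Malformed header line: " + line);
        }
        String name = line.substring(0, colon).trim().toLowerCase(Locale.ENGLISH);
        String value = line.substring(colon + 1).trim();
        return new AbstractMap.SimpleImmutableEntry<>(name, value);
    }

    public static void main(String[] args) {
        RequestLineSketch requestLine = parseRequestLine("GET /index.html HTTP/1.1");
        Map.Entry<String, String> host = parseHeader("Host: example.com:8080");
        System.out.println(requestLine.method + " " + requestLine.target);
        System.out.println(host.getKey() + " = " + host.getValue());
    }
}
```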
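Next, it checks whether the Connection header of the request contains upgrade. If it does, and the protocol named in the Upgrade header is supported and accepts the request, it enters the HTTP upgrade step.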
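The check on the Connection header is a case-insensitive substring test over every value of that header. A minimal sketch, assuming the header values are already available as a list of strings (UpgradeCheckSketch is a hypothetical name):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/** Hypothetical helper mirroring the "Has an upgrade been requested?" loop. */
final class UpgradeCheckSketch {
    static boolean requestsUpgrade(List<String> connectionValues) {
        for (String value : connectionValues) {
            // Lower-case with a fixed locale so the test does not depend on the default locale.
            if (value.toLowerCase(Locale.ENGLISH).contains("upgrade")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(requestsUpgrade(Arrays.asList("keep-alive, Upgrade")));
        System.out.println(requestsUpgrade(Arrays.asList("keep-alive")));
    }
}
```

A positive result is only the first gate: the listing above still has to find a matching UpgradeProtocol and have it accept the request before the connection is switched.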
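It then calls prepareRequest() for preliminary processing of the request: based on certain request headers, it adds the matching InputFilter instances to Http11InputBuffer. This includes parsing headers such as host, transfer-encoding, and content-length.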
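One way to picture the filter selection in prepareRequest() is as a decision on how the request body is delimited. The sketch below is an assumption-laden simplification, not the actual method: it handles only a single chunked transfer coding, and BodyFilterSketch, its enum, and select are hypothetical names.

```java
/** Hypothetical reduction of body-filter selection to a single decision. */
final class BodyFilterSketch {
    enum Filter { CHUNKED, IDENTITY, VOID }

    /**
     * @param transferEncoding value of the transfer-encoding header, or null if absent
     * @param contentLength    value of the content-length header, or -1 if absent
     */
    static Filter select(String transferEncoding, long contentLength) {
        if (transferEncoding != null && transferEncoding.trim().equalsIgnoreCase("chunked")) {
            return Filter.CHUNKED;   // body is framed chunk by chunk
        }
        if (contentLength >= 0) {
            return Filter.IDENTITY;  // body is exactly contentLength bytes
        }
        return Filter.VOID;          // no delimitation: treat the request as having no body
    }

    public static void main(String[] args) {
        System.out.println(select("chunked", -1));
        System.out.println(select(null, 42));
        System.out.println(select(null, -1));
    }
}
```

Checking the transfer coding before the length reflects the HTTP/1.1 rule that Transfer-Encoding takes precedence over Content-Length when both are present.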
Finally, it calls the Adapter to process the request, that is
getAdapter().service(request, response);
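getAdapter() returns the CoyoteAdapter mentioned above. CoyoteAdapter#service is analyzed separately in a later article, so it is not described further here.
What follows this call is cleanup work. The service method is long and its logic is complex, so this article leaves out many of the less important parts.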
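One piece of that cleanup worth isolating is the final decision about which SocketState to return. The sketch below restates the if-else chain at the end of the service listing as a pure function; the boolean parameters stand in for the processor's fields and method calls, and ServiceOutcomeSketch is a hypothetical name.

```java
/** Hypothetical restatement of the return-state decision at the end of service. */
final class ServiceOutcomeSketch {
    enum SocketState { OPEN, CLOSED, LONG, UPGRADING, SENDFILE }

    static SocketState outcome(boolean error, boolean paused, boolean async, boolean upgrade,
            boolean sendfilePending, boolean openSocket, boolean readComplete) {
        if (error || paused) {
            return SocketState.CLOSED;
        } else if (async) {
            return SocketState.LONG;        // request continues asynchronously
        } else if (upgrade) {
            return SocketState.UPGRADING;
        } else if (sendfilePending) {
            return SocketState.SENDFILE;
        } else if (openSocket) {
            // Keep the connection: OPEN if the request was fully read,
            // LONG if more of the request line or headers must still arrive.
            return readComplete ? SocketState.OPEN : SocketState.LONG;
        } else {
            return SocketState.CLOSED;
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(false, false, false, false, false, true, true));
        System.out.println(outcome(false, false, false, false, false, true, false));
    }
}
```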
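Summary
This article described how the process method of Http11Processor handles a request: it splits the work between dispatch and service to handle different kinds of HTTP requests. In both dispatch and service, the key step is calling the corresponding Adapter method.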