The previous article analyzed Poller and PollerEvent. Poller#processSocket obtains a SocketProcessorBase to handle the read/write events of a SocketChannel, and the doRun method of its subclass SocketProcessor handles them by calling getHandler().process(socketWrapper, event). The object returned by getHandler() is a ConnectionHandler.
1. ConnectionHandler#process
ConnectionHandler is a static nested class of AbstractProtocol, declared as:

    protected static class ConnectionHandler<S> implements AbstractEndpoint.Handler<S>

Its process method is shown below.
    @Override
    public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
        if (getLog().isDebugEnabled()) {
            getLog().debug(sm.getString("abstractConnectionHandler.process",
                    wrapper.getSocket(), status));
        }
        if (wrapper == null) {
            // Nothing to do. Socket has been closed.
            return SocketState.CLOSED;
        }

        S socket = wrapper.getSocket();

        Processor processor = connections.get(socket);
        if (getLog().isDebugEnabled()) {
            getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
                    processor, socket));
        }

        // Async timeouts are calculated on a dedicated thread and then
        // dispatched. Because of delays in the dispatch process, the
        // timeout may no longer be required. Check here and avoid
        // unnecessary processing.
        if (SocketEvent.TIMEOUT == status &&
                (processor == null ||
                !processor.isAsync() ||
                !processor.checkAsyncTimeoutGeneration())) {
            // This is effectively a NO-OP
            return SocketState.OPEN;
        }

        if (processor != null) {
            // Make sure an async timeout doesn't fire
            getProtocol().removeWaitingProcessor(processor);
        } else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {
            // Nothing to do. Endpoint requested a close and there is no
            // longer a processor associated with this socket.
            return SocketState.CLOSED;
        }

        ContainerThreadMarker.set();

        try {
            if (processor == null) {
                String negotiatedProtocol = wrapper.getNegotiatedProtocol();
                if (negotiatedProtocol != null) {
                    UpgradeProtocol upgradeProtocol =
                            getProtocol().getNegotiatedProtocol(negotiatedProtocol);
                    if (upgradeProtocol != null) {
                        processor = upgradeProtocol.getProcessor(
                                wrapper, getProtocol().getAdapter());
                    } else if (negotiatedProtocol.equals("http/1.1")) {
                        // Explicitly negotiated the default protocol.
                        // Obtain a processor below.
                    } else {
                        // TODO:
                        // OpenSSL 1.0.2's ALPN callback doesn't support
                        // failing the handshake with an error if no
                        // protocol can be negotiated. Therefore, we need to
                        // fail the connection here. Once this is fixed,
                        // replace the code below with the commented out
                        // block.
                        if (getLog().isDebugEnabled()) {
                            getLog().debug(sm.getString(
                                "abstractConnectionHandler.negotiatedProcessor.fail",
                                negotiatedProtocol));
                        }
                        return SocketState.CLOSED;
                        /*
                         * To replace the code above once OpenSSL 1.1.0 is
                         * used.
                        // Failed to create processor. This is a bug.
                        throw new IllegalStateException(sm.getString(
                            "abstractConnectionHandler.negotiatedProcessor.fail",
                            negotiatedProtocol));
                        */
                    }
                }
            }
            if (processor == null) {
                processor = recycledProcessors.pop();
                if (getLog().isDebugEnabled()) {
                    getLog().debug(sm.getString("abstractConnectionHandler.processorPop",
                            processor));
                }
            }
            if (processor == null) {
                processor = getProtocol().createProcessor();
                register(processor);
            }

            processor.setSslSupport(
                    wrapper.getSslSupport(getProtocol().getClientCertProvider()));

            // Associate the processor with the connection
            connections.put(socket, processor);

            SocketState state = SocketState.CLOSED;
            do {
                state = processor.process(wrapper, status);

                if (state == SocketState.UPGRADING) {
                    // Get the HTTP upgrade handler
                    UpgradeToken upgradeToken = processor.getUpgradeToken();
                    // Retrieve leftover input
                    ByteBuffer leftOverInput = processor.getLeftoverInput();
                    if (upgradeToken == null) {
                        // Assume direct HTTP/2 connection
                        UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");
                        if (upgradeProtocol != null) {
                            processor = upgradeProtocol.getProcessor(
                                    wrapper, getProtocol().getAdapter());
                            wrapper.unRead(leftOverInput);
                            // Associate with the processor with the connection
                            connections.put(socket, processor);
                        } else {
                            if (getLog().isDebugEnabled()) {
                                getLog().debug(sm.getString(
                                    "abstractConnectionHandler.negotiatedProcessor.fail",
                                    "h2c"));
                            }
                            return SocketState.CLOSED;
                        }
                    } else {
                        HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
                        // Release the Http11 processor to be re-used
                        release(processor);
                        // Create the upgrade processor
                        processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);
                        if (getLog().isDebugEnabled()) {
                            getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",
                                    processor, wrapper));
                        }
                        wrapper.unRead(leftOverInput);
                        // Mark the connection as upgraded
                        wrapper.setUpgraded(true);
                        // Associate with the processor with the connection
                        connections.put(socket, processor);
                        // Initialise the upgrade handler (which may trigger
                        // some IO using the new protocol which is why the lines
                        // above are necessary)
                        // This cast should be safe. If it fails the error
                        // handling for the surrounding try/catch will deal with
                        // it.
                        if (upgradeToken.getInstanceManager() == null) {
                            httpUpgradeHandler.init((WebConnection) processor);
                        } else {
                            ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
                            try {
                                httpUpgradeHandler.init((WebConnection) processor);
                            } finally {
                                upgradeToken.getContextBind().unbind(false, oldCL);
                            }
                        }
                    }
                }
            } while ( state == SocketState.UPGRADING);

            if (state == SocketState.LONG) {
                // In the middle of processing a request/response. Keep the
                // socket associated with the processor. Exact requirements
                // depend on type of long poll
                longPoll(wrapper, processor);
                if (processor.isAsync()) {
                    getProtocol().addWaitingProcessor(processor);
                }
            } else if (state == SocketState.OPEN) {
                // In keep-alive but between requests. OK to recycle
                // processor. Continue to poll for the next request.
                connections.remove(socket);
                release(processor);
                wrapper.registerReadInterest();
            } else if (state == SocketState.SENDFILE) {
                // Sendfile in progress. If it fails, the socket will be
                // closed. If it works, the socket either be added to the
                // poller (or equivalent) to await more data or processed
                // if there are any pipe-lined requests remaining.
            } else if (state == SocketState.UPGRADED) {
                // Don't add sockets back to the poller if this was a
                // non-blocking write otherwise the poller may trigger
                // multiple read events which may lead to thread starvation
                // in the connector. The write() method will add this socket
                // to the poller if necessary.
                if (status != SocketEvent.OPEN_WRITE) {
                    longPoll(wrapper, processor);
                }
            } else if (state == SocketState.SUSPENDED) {
                // Don't add sockets back to the poller.
                // The resumeProcessing() method will add this socket
                // to the poller.
            } else {
                // Connection closed. OK to recycle the processor. Upgrade
                // processors are not recycled.
                connections.remove(socket);
                if (processor.isUpgrade()) {
                    UpgradeToken upgradeToken = processor.getUpgradeToken();
                    HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
                    InstanceManager instanceManager = upgradeToken.getInstanceManager();
                    if (instanceManager == null) {
                        httpUpgradeHandler.destroy();
                    } else {
                        ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
                        try {
                            httpUpgradeHandler.destroy();
                        } finally {
                            try {
                                instanceManager.destroyInstance(httpUpgradeHandler);
                            } catch (Throwable e) {
                                ExceptionUtils.handleThrowable(e);
                                getLog().error(sm.getString("abstractConnectionHandler.error"), e);
                            }
                            upgradeToken.getContextBind().unbind(false, oldCL);
                        }
                    }
                } else {
                    release(processor);
                }
            }
            return state;
        } catch(java.net.SocketException e) {
            // SocketExceptions are normal
            getLog().debug(sm.getString(
                    "abstractConnectionHandler.socketexception.debug"), e);
        } catch (java.io.IOException e) {
            // IOExceptions are normal
            getLog().debug(sm.getString(
                    "abstractConnectionHandler.ioexception.debug"), e);
        } catch (ProtocolException e) {
            // Protocol exceptions normally mean the client sent invalid or
            // incomplete data.
            getLog().debug(sm.getString(
                    "abstractConnectionHandler.protocolexception.debug"), e);
        }
        // Future developers: if you discover any other
        // rare-but-nonfatal exceptions, catch them here, and log as
        // above.
        catch (Throwable e) {
            ExceptionUtils.handleThrowable(e);
            // any other exception or error is odd. Here we log it
            // with "ERROR" level, so it will show up even on
            // less-than-verbose logs.
            getLog().error(sm.getString("abstractConnectionHandler.error"), e);
        } finally {
            ContainerThreadMarker.clear();
        }

        // Make sure socket/processor is removed from the list of current
        // connections
        connections.remove(socket);
        release(processor);
        return SocketState.CLOSED;
    }
ConnectionHandler#process is fairly long, so let's go through it piece by piece.

    private final Map<S,Processor> connections = new ConcurrentHashMap<>();

    S socket = wrapper.getSocket();
    Processor processor = connections.get(socket);

It first looks up a Processor in the connections map. If the Processor is not null, it then calls:
    // Make sure an async timeout doesn't fire
    getProtocol().removeWaitingProcessor(processor);

On the first event of a new connection, connections.get(socket) returns null.
getProtocol() returns the ProtocolHandler that was passed in when the ConnectionHandler was constructed, i.e. the Http11NioProtocol object.

    public void removeWaitingProcessor(Processor processor) {
        waitingProcessors.remove(processor);
    }

removeWaitingProcessor is defined in AbstractProtocol, an ancestor class of Http11NioProtocol. It simply removes the given processor from waitingProcessors.
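The lookup-then-unpark step described above can be sketched in isolation. This is a minimal illustration, not Tomcat code: LookupSketch, DemoProcessor and the use of String as the socket key are hypothetical stand-ins, and only the shape of the logic (a concurrent map lookup followed by removal from a waiting set) is taken from the method.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for illustration; these are not Tomcat classes.
class LookupSketch {
    static final class DemoProcessor {}

    // socket -> processor currently associated with it
    final Map<String, DemoProcessor> connections = new ConcurrentHashMap<>();
    // processors parked while an async timeout may still fire
    final Set<DemoProcessor> waitingProcessors = ConcurrentHashMap.newKeySet();

    // Returns the processor bound to the socket, or null on first contact.
    DemoProcessor lookup(String socket) {
        DemoProcessor processor = connections.get(socket);
        if (processor != null) {
            // Mirrors removeWaitingProcessor: the processor is about to be used
            waitingProcessors.remove(processor);
        }
        return processor;
    }

    public static void main(String[] args) {
        LookupSketch handler = new LookupSketch();
        System.out.println(handler.lookup("socket-1") == null);

        DemoProcessor parked = new DemoProcessor();
        handler.connections.put("socket-1", parked);
        handler.waitingProcessors.add(parked);
        System.out.println(handler.lookup("socket-1") == parked);
        System.out.println(handler.waitingProcessors.isEmpty());
    }
}
```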
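Next come the three if (processor == null) checks inside the try block. All three exist to end up with a non-null processor.
The first if (processor == null) covers the case where a protocol was negotiated for the connection, i.e. wrapper.getNegotiatedProtocol() returns non-null. As the comments in the code indicate, this negotiation happens through ALPN, so it applies to HTTPS connections. The code looks up the matching UpgradeProtocol and, if one is found, uses it to obtain a Processor. If "http/1.1" was negotiated, nothing is done here and the processor is obtained by the following steps; for any other protocol the method returns SocketState.CLOSED. HTTPS is not discussed in detail here.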
The second if (processor == null) takes a Processor from the recycledProcessors cache pool. recycledProcessors is a field of ConnectionHandler, declared as:

    private final RecycledProcessors recycledProcessors = new RecycledProcessors(this);

    protected static class RecycledProcessors extends SynchronizedStack<Processor>

The third if (processor == null) creates a new Processor. getProtocol() returns the Http11NioProtocol object, and createProcessor is implemented in AbstractHttp11Protocol, the parent of Http11NioProtocol's parent class.
    @Override
    protected Processor createProcessor() {
        Http11Processor processor = new Http11Processor(this, adapter);
        return processor;
    }

createProcessor() simply creates an Http11Processor. The this argument is the Http11NioProtocol object and adapter is the CoyoteAdapter object. The adapter field is assigned in Connector's initInternal method, which creates the CoyoteAdapter and then calls protocolHandler.setAdapter(adapter).
Http11Processor is an important link in request processing. It will be covered later, so it is not discussed further here.
After obtaining the Http11Processor, the code sets its sslSupport property, stores it in connections, and then uses it to handle the event.
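The fallback order described above (existing association, then the recycled pool, then a fresh instance) can be sketched as follows. This is a simplified illustration under stated assumptions: AcquireSketch and DemoProcessor are hypothetical, the negotiated-protocol branch is left out, and a synchronized ArrayDeque stands in for Tomcat's SynchronizedStack.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; these are not Tomcat classes.
class AcquireSketch {
    static final class DemoProcessor {
        final int id;
        DemoProcessor(int id) { this.id = id; }
    }

    private final Map<String, DemoProcessor> connections = new HashMap<>();
    private final Deque<DemoProcessor> recycled = new ArrayDeque<>();
    private int created = 0;

    // Try the existing association, then the pool, then create a new one.
    synchronized DemoProcessor acquire(String socket) {
        DemoProcessor processor = connections.get(socket);
        if (processor == null) {
            processor = recycled.pollFirst(); // null when the pool is empty
        }
        if (processor == null) {
            processor = new DemoProcessor(++created);
        }
        connections.put(socket, processor); // associate with the connection
        return processor;
    }

    // Break the association and return the processor to the pool.
    synchronized void release(String socket) {
        DemoProcessor processor = connections.remove(socket);
        if (processor != null) {
            recycled.push(processor);
        }
    }

    synchronized int createdCount() { return created; }

    public static void main(String[] args) {
        AcquireSketch handler = new AcquireSketch();
        DemoProcessor first = handler.acquire("socket-1");
        handler.release("socket-1");
        DemoProcessor second = handler.acquire("socket-2");
        System.out.println(first == second);        // the pooled instance is reused
        System.out.println(handler.createdCount()); // only one instance was created
    }
}
```

Pooling keeps the number of processor objects close to the number of concurrently active connections instead of growing with the number of requests.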
That happens in the do-while loop.

    SocketState state = SocketState.CLOSED;
    do {
        state = processor.process(wrapper, status);
        ……
    } while ( state == SocketState.UPGRADING);

Processor#process is called first, with the same arguments that were passed to ConnectionHandler#process, and it returns a SocketState value.
    /**
     * Different types of socket states to react upon.
     */
    public enum SocketState {
        // TODO Add a new state to the AsyncStateMachine and remove
        //      ASYNC_END (if possible)
        OPEN, CLOSED, LONG, ASYNC_END, SENDFILE, UPGRADING, UPGRADED, SUSPENDED
    }

SocketState is an enum nested in Handler<S>, and Handler is an interface nested in AbstractEndpoint.
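The reason the do-while loop repeats on UPGRADING is that the processor is swapped inside the loop body and the new processor must then get its turn. A minimal sketch of that control flow, assuming a hypothetical DemoProcessor interface and a reduced State enum (the real loop also restores leftover input and initialises the upgrade handler):

```java
// Hypothetical stand-ins for illustration; these are not Tomcat classes.
class UpgradeLoopSketch {
    enum State { OPEN, CLOSED, UPGRADING, UPGRADED }

    interface DemoProcessor {
        State process();
    }

    // Runs the initial processor; if it asks for an upgrade, switches to the
    // upgraded processor and runs the loop body again.
    static State run(DemoProcessor initial, DemoProcessor upgraded) {
        DemoProcessor processor = initial;
        State state = State.CLOSED;
        do {
            state = processor.process();
            if (state == State.UPGRADING) {
                processor = upgraded; // swap, then loop once more
            }
        } while (state == State.UPGRADING);
        return state;
    }

    public static void main(String[] args) {
        System.out.println(run(() -> State.UPGRADING, () -> State.UPGRADED));
        System.out.println(run(() -> State.OPEN, () -> State.CLOSED));
    }
}
```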
Then the returned SocketState is handled case by case.
First, if (state == SocketState.LONG) calls longPoll(wrapper, processor).

    protected void longPoll(SocketWrapperBase<?> socket, Processor processor) {
        if (!processor.isAsync()) {
            // This is currently only used with HTTP
            // Either:
            //  - this is an upgraded connection
            //  - the request line/headers have not been completely
            //    read
            socket.registerReadInterest();
        }
    }

longPoll only calls socket.registerReadInterest(), and only when the processor is not async. registerReadInterest() was covered in the previous article, so it is not repeated here.
If the processor is async, getProtocol().addWaitingProcessor(processor) adds it to the waitingProcessors mentioned above.
if (state == SocketState.OPEN) first removes the <NioChannel, Processor> pair from connections and calls release(processor), which frees its resources or returns the Processor to RecycledProcessors.
It then calls wrapper.registerReadInterest(), the same call that longPoll makes inside its if statement.
if (state == SocketState.SENDFILE) and if (state == SocketState.SUSPENDED) are alike: neither does anything.
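if (state == SocketState.UPGRADED) checks whether the event passed to ConnectionHandler#process is SocketEvent.OPEN_WRITE. longPoll is called only if it is not OPEN_WRITE; as the code comment explains, after a non-blocking write the socket is not added back to the poller here, because the write() method will do that if necessary.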
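The final else branch handles the remaining states, essentially state == SocketState.CLOSED, and performs cleanup: the entry is removed from connections, an upgrade processor has its HttpUpgradeHandler destroyed, and any other processor is released.
Finally, state is returned.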
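The branches above can be condensed into a small decision table. The sketch below is only a summary aid: DispatchSketch, its reduced Event enum and the returned description strings are hypothetical, while the branch conditions follow the if/else chain in the method.

```java
// Hypothetical stand-ins for illustration; these are not Tomcat classes.
class DispatchSketch {
    enum State { OPEN, CLOSED, LONG, ASYNC_END, SENDFILE, UPGRADING, UPGRADED, SUSPENDED }
    enum Event { OPEN_READ, OPEN_WRITE }

    // Describes what the handler does for a given final state.
    static String actionFor(State state, Event status, boolean async) {
        switch (state) {
            case LONG:
                // longPoll registers read interest only for non-async processors
                return async ? "keep processor, add to waitingProcessors"
                             : "keep processor, register read interest";
            case OPEN:
                return "release processor, register read interest";
            case SENDFILE:
            case SUSPENDED:
                return "nothing";
            case UPGRADED:
                // after a non-blocking write, write() re-registers if needed
                return status != Event.OPEN_WRITE ? "longPoll" : "nothing";
            default:
                return "remove from connections, destroy or release processor";
        }
    }

    public static void main(String[] args) {
        for (State state : State.values()) {
            System.out.println(state + " -> " + actionFor(state, Event.OPEN_READ, false));
        }
    }
}
```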
Summary
This article walked through the ConnectionHandler#process method. Its main job is to find a Processor object to handle the read/write event.