上一篇博客大概總結了tomcat 的組件以及其組織方式,對於tomcat 的啓動過程也進行進行了簡單的總結,下面這篇博客,繼續研究tomcat 處理請求的相關組件,其實就是主要研究Connectors 以及Container 的工做過程,以此加深對tomcat 工做過程的理解,不足之處,請各路大神指正哈!java
下面是這篇博客的主要內容:web
一、Connectors的基本組件以及做用apache
二、Connectors 的工做機制windows
三、Container 的基本組件以及做用tomcat
四、tomcat 的管道以及以及責任鏈模型介紹服務器
五、最後的感悟併發
1、Connectors 的初步認識app
在上一篇博客中,我有說到,Connectors 是負責專門處理外部請求,並請求封裝爲對應的request 對象,而後給Container 容器進行處理的,能夠理解爲它利用java 實現 了web中的http 協議。框架
一、Connects的基本組件異步
下面先粗略介紹下Connectors 下的組件各個組件的做用以及他們之間的關係。首先看下下面這張我的畫的醜圖,這張圖大致說明了Connector 處理請求的過程:
在Connectors 處理請求的過程,其主要利用了一個叫作protocolhandler 的組件,不一樣的鏈接類型有不一樣的protocolHandler (例如,普通socket 就是Http11Protocal,NioSocket 就是Http11NioProtocal),該組件處理過程又主要涉及到如下組件:endpoint 、process以及adaptor 三個組件,他們工做的過程大概以下:當客戶端發起請求的時候,endpoint 經過底層的Socket 機制進行端口監聽,它負責監聽客戶端的請求,處理對應請求的socket 對象,並把Socekt 對象傳給processor 對象;當processor 對象接收到socket 對象的時候,會利用把請求封裝爲request 對象(HttpRequest),當封裝好request 對象以後,processor 吧request 對象傳給一個適配器,該適配器負責鏈接Container 和adaptor 對象,最後,Container 獲得的是一個已經封裝好的request 對象。
總的來講,能夠這樣理解,endpoint 利用socket 處理了tcp 層面的協議,而processor 則在java 層面處理了http 協議,最後,adaptor 將Connectors 和 Container 鏈接起來,實現請求轉發。下面,就針對protocolhandler 的這三個組件,看看,protocolhandler 究竟是如何進行請求處理的。
二、Connectors 中處理tcp 協議的組件--endpoint
其實在endpoint 內部,它又是主要靠一下組件進行請求處理的,具體的話能夠參考下面這張圖:(不過在正式總結以前,讀者注意了,其實若是你純粹的看我這篇博客來了解endpoint 工做過程的話,會感受好頭大的,推薦本身下一份源碼,本身點開源碼看看才能正真理解的,由於不一樣組件之間都相互調用,方法之間調用的關係也是挺複雜的(反正我一開始是頭都大了的),對着源碼才更好理解,廢話很少說,上圖)
首先,咱們看Acceptor,從名字其實咱們能夠看出來,它是一個接受器,就是專門負責接收請求,而後開啓Socket ,其實,Acceptor是 AbstractEndpoint的內部類,具體的實現又是由子類NioEndpoint 實現的,下面看看AbstractEndpoint的源碼和NioEndpoint 源碼,看卡Acceptor是如何實現請求監聽的:
這個是AbstractEndpoint的內部類,這個類其實不難理解:它是一個繼承了Runnable 接口的抽象類,可是並無實現run方法,另外定義了一些比較常規的方法例如獲取運行狀態等,具體不說
public abstract static class Acceptor implements Runnable { public enum AcceptorState { NEW, RUNNING, PAUSED, ENDED } protected volatile AcceptorState state = AcceptorState.NEW; public final AcceptorState getState() { return state; } private String threadName; protected final void setThreadName(final String threadName) { this.threadName = threadName; } protected final String getThreadName() { return threadName; } }
下面,主要看看AbstractEndpoint的子類是怎樣工做的,下面簡單粘上AbstractEndpoint的內部類Acceptor源碼,它繼承了AbstractEndpoint的Acceptor這個內部類:
protected class Acceptor extends AbstractEndpoint.Acceptor { @Override public void run() { int errorDelay = 0; // Loop until we receive a shutdown command while (running) { // Loop if endpoint is paused while (paused && running) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!running) { break; } state = AcceptorState.RUNNING; try { //if we have reached max connections, wait countUpOrAwaitConnection(); SocketChannel socket = null; try { // Accept the next incoming connection from the server // socket socket = serverSock.accept(); } catch (IOException ioe) { //we didn't get a socket countDownConnection(); // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } // Successful accept, reset the error delay errorDelay = 0; // setSocketOptions() will add channel to the poller // if successful if (running && !paused) { if (!setSocketOptions(socket)) { countDownConnection(); closeSocket(socket); } } else { countDownConnection(); closeSocket(socket); } } catch (SocketTimeoutException sx) { // Ignore: Normal condition } catch (IOException x) { if (running) { log.error(sm.getString("endpoint.accept.fail"), x); } } catch (OutOfMemoryError oom) { try { oomParachuteData = null; releaseCaches(); log.error("", oom); }catch ( Throwable oomt ) { try { try { System.err.println(oomParachuteMsg); oomt.printStackTrace(); }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); } } state = AcceptorState.ENDED; } }
其實這個類,主要就是實現了Runnable 的run方法而已,其實它的核心代碼主要就是下面這兩句(其餘例如判斷運行狀態是不是在運行以及關閉Socket 等管理工做等等代碼略過了,這部分就請讀者本身看看源碼啦):
socket = serverSock.accept(); // setSocketOptions() will add channel to the poller setSocketOptions(socket);
上面兩句代碼:首先監聽端口,獲取socket ,而後調用setSocketOptions把socket 對應的channel 傳遞給poller,這樣,就完成了請求的監聽,超簡單有木有。OK,如今請求到了poller了,下面再看看源碼看看poller 又是怎麼工做的;
Poller 也實現了Runnable 接口,可是因爲Poller 的方法太多,下面就粘上主要Poller代碼中run方法相關的代碼就行了,先看看Poller 這個線程類啓動以後幹什麼:
public void run() { // Loop until destroy() is called while (true) { try { // Loop if endpoint is paused while (paused && (!close) ) { try { Thread.sleep(100); } catch (InterruptedException e) { // Ignore } } boolean hasEvents = false; // Time to terminate? if (close) { events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString( "endpoint.nio.selectorCloseFail"), ioe); } break; } else { hasEvents = events(); } try { if ( !close ) { if (wakeupCounter.getAndSet(-1) > 0) { //if we are here, means we have other stuff to do //do a non blocking select keyCount = selector.selectNow(); } else { keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } if (close) { events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString( "endpoint.nio.selectorCloseFail"), ioe); } break; } } catch ( NullPointerException x ) { //sun bug 5076772 on windows JDK 1.5 if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x); if ( wakeupCounter == null || selector == null ) throw x; continue; } catch ( CancelledKeyException x ) { //sun bug 5076772 on windows JDK 1.5 if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x); if ( wakeupCounter == null || selector == null ) throw x; continue; } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error("",x); continue; } //either we timed out or we woke up, process events first if ( keyCount == 0 ) hasEvents = (hasEvents | events()); Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); KeyAttachment attachment = (KeyAttachment)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { attachment.access(); iterator.remove(); processKey(sk, attachment); } }//while //process timeouts timeout(keyCount,hasEvents); if ( oomParachute > 0 && oomParachuteData == null ) checkParachute(); } catch (OutOfMemoryError oom) { try { oomParachuteData = null; releaseCaches(); log.error("", oom); }catch ( Throwable oomt ) { try { System.err.println(oomParachuteMsg); oomt.printStackTrace(); }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } } } }//while stopLatch.countDown(); }
run方法大概就是幹了下面事情:從selector中,選擇一個key,該key表明這一個已經準備好的Channel 的輸入輸出流(據說說叫管道更合適?),這裏的Nio具體工做機制不展開了(汗顏,其實我對這部分也不太瞭解,後面必須研究下),而後,下面就是核心代碼了:
processKey(sk, attachment);
這個方法就是,調用processKey方法把獲取到的請求對應的channel ,傳遞給process 這個方法,而後,查看這個方法的代碼,咱們能夠發現,它會將channel 以及socket 傳遞給下一個對象:SocketProcessor,下面看看這個方法的源碼吧:
protected boolean processKey(SelectionKey sk, KeyAttachment attachment) { boolean result = true; try { if ( close ) { cancelledKey(sk, SocketStatus.STOP, attachment.comet); } else if ( sk.isValid() && attachment != null ) { attachment.access();//make sure we don't time out valid sockets sk.attach(attachment);//cant remember why this is here NioChannel channel = attachment.getChannel(); if (sk.isReadable() || sk.isWritable() ) { if ( attachment.getSendfileData() != null ) { processSendfile(sk,attachment, false); } else { if ( isWorkerAvailable() ) { unreg(sk, attachment, sk.readyOps()); boolean closeSocket = false; // Read goes before write if (sk.isReadable()) { if (!processSocket(channel, SocketStatus.OPEN_READ, true)) { closeSocket = true; } } if (!closeSocket && sk.isWritable()) { if (!processSocket(channel, SocketStatus.OPEN_WRITE, true)) { closeSocket = true; } } if (closeSocket) { cancelledKey(sk,SocketStatus.DISCONNECT,false); } } else { result = false; } } } } else { //invalid key cancelledKey(sk, SocketStatus.ERROR,false); } } catch ( CancelledKeyException ckx ) { cancelledKey(sk, SocketStatus.ERROR,false); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error("",t); } return result; }
好吧,其實這個這麼長的代碼,咱們只要看下面這行代碼就好了(固然,中間不少編碼技巧的確值得咱們仔細琢磨),下面代碼主要是調用一個processSocket的方法:
processSocket(channel, SocketStatus.OPEN_READ, true)
下面咱們看看processSocket又是幹嗎的,這個是方法源碼:
public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) { try { KeyAttachment attachment = (KeyAttachment)socket.getAttachment(); if (attachment == null) { return false; } attachment.setCometNotify(false); //will get reset upon next reg SocketProcessor sc = processorCache.poll(); if ( sc == null ) sc = new SocketProcessor(socket,status); else sc.reset(socket,status); if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc); else sc.run(); } catch (RejectedExecutionException rx) { log.warn("Socket processing request was rejected for:"+socket,rx); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full log.error(sm.getString("endpoint.process.fail"), t); return false; } return true; }
核心代碼看下面:
attachment.setCometNotify(false); //will get reset upon next reg SocketProcessor sc = processorCache.poll(); if ( sc == null ) sc = new SocketProcessor(socket,status); else sc.reset(socket,status); if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc); else sc.run();
這個方法能夠看到,processSocket方法把Socket 相關的類傳遞給了SocketProcessor ,而後利用併發框架Executor進行管理SocketProcessor ,繼續往下處理請求,此時,請求已經到達了SocketProcessor 這個類。
經歷重重困難,請求終於到了SocketProcessor 這個類了,這個類是也實現了Runnable 接口,下面再研究下這個類的源碼,主要就是研究兩個方法,一個是run方法,一個是doRun方法:
public void run() { SelectionKey key = socket.getIOChannel().keyFor( socket.getPoller().getSelector()); KeyAttachment ka = null; if (key != null) { ka = (KeyAttachment)key.attachment(); } // Upgraded connections need to allow multiple threads to access the // connection at the same time to enable blocking IO to be used when // NIO has been configured if (ka != null && ka.isUpgraded() && SocketStatus.OPEN_WRITE == status) { synchronized (ka.getWriteThreadLock()) { doRun(key, ka); } } else { synchronized (socket) { doRun(key, ka); } } } private void doRun(SelectionKey key, KeyAttachment ka) { try { int handshake = -1; try { if (key != null) { // For STOP there is no point trying to handshake as the // Poller has been stopped. if (socket.isHandshakeComplete() || status == SocketStatus.STOP) { handshake = 0; } else { handshake = socket.handshake( key.isReadable(), key.isWritable()); // The handshake process reads/writes from/to the // socket. status may therefore be OPEN_WRITE once // the handshake completes. However, the handshake // happens when the socket is opened so the status // must always be OPEN_READ after it completes. It // is OK to always set this as it is only used if // the handshake completes. status = SocketStatus.OPEN_READ; } } }catch ( IOException x ) { handshake = -1; if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x); }catch ( CancelledKeyException ckx ) { handshake = -1; } if ( handshake == 0 ) { SocketState state = SocketState.OPEN; // Process the request from this socket if (status == null) { state = handler.process(ka, SocketStatus.OPEN_READ); } else { state = handler.process(ka, status); } if (state == SocketState.CLOSED) { // Close socket and pool close(ka, socket, key, SocketStatus.ERROR); } } else if (handshake == -1 ) { close(ka, socket, key, SocketStatus.DISCONNECT); } else { ka.getPoller().add(socket, handshake); } } catch (CancelledKeyException cx) { socket.getPoller().cancelledKey(key, null, false); } catch (OutOfMemoryError oom) { try { oomParachuteData = null; log.error("", oom); if (socket != null) { socket.getPoller().cancelledKey(key,SocketStatus.ERROR, false); } releaseCaches(); }catch ( Throwable oomt ) { try { System.err.println(oomParachuteMsg); oomt.printStackTrace(); }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } } } catch (VirtualMachineError vme) { ExceptionUtils.handleThrowable(vme); }catch ( Throwable t ) { log.error("",t); if (socket != null) { socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false); } } finally { socket = null; status = null; //return to cache if (running && !paused) { processorCache.offer(this); } } }
核心代碼就這句:
state = handler.process(ka, SocketStatus.OPEN_READ);
恩,就是那麼簡單,吧socket 相關的類傳給一個Handler 類進行處理。
總的來講,endpoint 的工做過程分爲這幾個部分:首先是Acceptor進行請求的監聽,當監聽到請求的時候,會將表明該次請求的socket 轉發給poller 進行處理,poller 主要負責Socket中 流(Nio中的通道)的處理,固然,期間還有涉及到不少其餘的額外工做,這裏不詳細展開;最後,SocketProcess會吧poller的處理結果,以隊裏的形式傳遞給Handler(不知道這裏理解是否有誤差,歡迎指正),最終吧處理好的請求發送到Processor進行下一步處理,封裝爲request 對象。
三、Processor 和Adaptor
Processor 對象主要實現了tcp到http層面的數據轉換,它主要是吧socket輸入輸出流封裝爲request、response 對應對象,能夠先看看該接口的源碼:
public interface Processor<S> { Executor getExecutor(); SocketState process(SocketWrapper<S> socketWrapper) throws IOException; SocketState event(SocketStatus status) throws IOException; SocketState asyncDispatch(SocketStatus status); SocketState asyncPostProcess(); /** * @deprecated Will be removed in Tomcat 8.0.x. */ @Deprecated org.apache.coyote.http11.upgrade.UpgradeInbound getUpgradeInbound(); /** * @deprecated Will be removed in Tomcat 8.0.x. */ @Deprecated SocketState upgradeDispatch() throws IOException; HttpUpgradeHandler getHttpUpgradeHandler(); SocketState upgradeDispatch(SocketStatus status) throws IOException; void errorDispatch(); boolean isComet(); boolean isAsync(); boolean isUpgrade(); Request getRequest(); void recycle(boolean socketClosing); void setSslSupport(SSLSupport sslSupport); }
從代碼中咱們能夠知道:這個接口主要定義了一些錯誤處理方法errorDispatch、異步處理的方法等等,固然,其實實現tcp到http 轉換的過程以及封裝request 對象的過程主要是由process這個方法 完成的,不過本人看了下源碼實在太複雜了,在這裏詳細展開的估計要搞很久(囧,原諒我,其實我也看不懂process這個方法內部實現,感受實在有點複雜)
當Processor處理完後,會獲得對應的request 對象,也就是咱們熟悉的HttpRequest對象了,這個時候,request 對象便會由一個適配器傳遞給Container 容器,該容器會進一步處理request 對象(感受到這裏這篇博客寫崩了,原本是想好好看看Processor怎麼處理tcp請求的,無奈看到源碼頭都大了,感受功力不夠就沒敢看了,尷尬)
好吧,下面繼續硬着頭皮,研究下Container 的工做過程。
2、Container 處理請求的過程
一、Container 的組件
Container 是一個接口,其實它的下面有這是個字接口:Engine,Host,Context,Wrapper 。下面網上盜的繼承關係圖:
有點亂是吧,我也以爲。下面簡單解釋下各個接口(類)的意義:Engine,是一個service 下的管理站點的一個引擎,Host 就是主機,是的,我也是才知道,原來一個tomcat 還能夠管理不一樣主機的,固然,這裏的主機是虛擬的,並非一個tomcat 能夠管理不一樣服務器,不一樣host 僅僅表明不一樣站點而已,Engine 和host 的關係大概能夠理解爲:Engine 管理着不一樣的host ,一個service 能夠對應多個host 卻僅僅有一個Engine;而Context 表明的是一個引用程序,狹隘地理解,其實就是一個可運行的web項目,反正一個項目中的web.xml就能夠理解爲對應一個context 對象;最後就是Wrapper了,這個是一個處理Servlet 的類,其實就是至關於在Servlet 包一層東西的類,不一樣Servlet 對應不一樣的Wrapper 類,Wrappper專門用來處理Servlet。Ok,他們之間的關係若是你仍是以爲有點模糊的話,能夠看下圖:
好吧,圖是醜了點,不過估計都能看懂吧。下面詳細說說請求又是怎麼真正從最初的Engine 入口處到最後的Servlet ,這裏須要引出一個概念:tomcat 中的管道。
二、tomcat 中的pipeline - value
相信,用過tomcat 的對過濾器應該不陌生?咱們只要實現Filter 對應的方法,而後再server.xml中配置filter 便可讓filter 起做用,其實這個過程就是利用到了管道。在tomcat 中的管道,利用責任鏈模式進行實現的,具體的過程是這樣的:Engine,host ,context 以及Wrapper 都是一個管道,在每一個管道中,會存放不一樣的Value ,這些Value 表明的是各個類對請求的處理,就像一個車間中的生產線同樣,整條生產線能夠理解爲一個管道,生產線上每一個工人就是一value ,每一個工人加工完產品以後,會把加工以後的產品交給下一個工人,直到全部工人都加完工過產品。不過,在tomcat 的管道中,有一個baseValue ,這個Value 是確定會執行的,並且是在全部value 執行完再執行,這就相似於工廠中質檢的機器,最後必定會檢查產品的質量。value 對應的是某個處理請求的類,而BaseValue在四個組件中對應的分別是下面這幾個類:StandardEngineValue、StandardHostValue、StandardContextValue、StandardWrapperValue,也就是說,這四個類對請求的處理是必定會被管道鎖執行的。若是仍是沒法理解,能夠看下面的示意圖理解管道:
下面咱們就以一個StandardEngin以及它對應的value StandardEngineValue研究下,什麼是value ,管道又是如何進行工做的,首先,咱們看看StandardEngineValue的源碼(主要是看其中的invoke 方法,該方法被調用時,對應的StandardEngine的invoke 方法將會被執行 ):
public final void invoke(Request request, Response response) throws IOException, ServletException { // Select the Host to be used for this Request Host host = request.getHost(); if (host == null) { response.sendError (HttpServletResponse.SC_BAD_REQUEST, sm.getString("standardEngine.noHost", request.getServerName())); return; } if (request.isAsyncSupported()) { request.setAsyncSupported(host.getPipeline().isAsyncSupported()); } // Ask this Host to process this request host.getPipeline().getFirst().invoke(request, response); }
能夠看到,做爲Engine管道中最後執行的Value ,StandardEngineValue的invoke 會獲取Host管道中第一個HostValue,而後調用對應的invoke ,當第一個HostValue的invoke 方法執行完的時候,會繼續獲取Host管道中下一個HostValue,而後一直這個過程,直到全部Value 執行完,到Host的StandardHostValue的時候,StandardHostValue的invoke方法會調用context 通道的第一個Value,進行執行,context通道重複這個過程,直到請求到達Wrapper 通道中時,Wrapper 會調用FilterChain ,咱們的Filter 便會在其中被執行了。總而言之,這個過程,BaseValue的做用即是調用下一個組件的通道,讓整個責任鏈能夠繼續執行下去。
3、感悟
在寫這篇博客的時候,其實感受蠻痛苦的,畢竟不少源碼的確看不懂,不少都只是只能瞭解個大概,好像前面的關於Connectors 的總結,本身不少都是蜻蜓點水地進行大致的梳理,對於組件工做的具體原理仍是不瞭解,並且,其實不少代碼是能夠看懂是什麼意思,可是卻很難懂:爲何編碼的人要這樣作。確實,本身的功力仍是不夠,不管是線程併發、Socket 等基礎,仍是總體的框架思惟層面,都有待提升。開源框架的魅力,其實不只僅是讓你有一個很好的工具能夠用,更重要的是,可讓你知道本身和真正大牛的差距在哪,讓你的思惟方式不斷靠近他們,讓你有很是好的途徑,去學習與模仿,最後轉化爲本身的東西。
廢話很少說,我的感受這篇博客並寫得有點水,歸根到底仍是不少核心的思想沒有正真領悟,不過這篇博客也花了我挺長時間整理的,因此仍是硬着頭皮發出來吧,不足地方歡迎各位大神指正!
秋招乾巴爹!