在上一節中咱們描述了Tomcat的總體架構,java
咱們知道了Tomcat分爲兩個大組件,一個鏈接器和一個容器。編程
而咱們此次要講的 EndPoint
的組件就是屬於鏈接器裏面的。服務器
它是一個通訊的端點,就是負責對外實現TCP/IP協議。架構
EndPoint
是個接口,app
它的具體實現類就是 AbstractEndpoint
,而 AbstractEndpoint
具體的實現類就有 AprEndpoint
、 Nio2Endpoint
、 NioEndpoint
。框架
AprEndpoint
:對應的是APR模式,簡單理解就是從操做系統級別解決異步IO的問題,大幅度提升服務器的處理和響應性能。可是啓用這種模式須要安裝一些其餘的依賴庫。異步
Nio2Endpoint
:利用代碼來實現異步IOsocket
NioEndpoint
:利用了JAVA的NIO實現了非阻塞IO,Tomcat默認啓動是以這個來啓動的,而這個也是咱們的講述重點。ide
咱們知道 NioEndpoint
的原理仍是對於Linux的多路複用器的使用,而在多路複用器中簡單來講就兩個步驟。模塊化
1. 建立一個Selector,在它身上註冊各類Channel,而後調用select方法,等待通道中有感興趣的事件發生。
2. 若是有感興趣的事情發生了,例如是讀事件,那麼就將信息從通道中讀取出來。
而 NioEndpoint
爲了實現上面這兩步,用了五個組件來。
這五個組件是 LimitLatch
、 Acceptor
、 Poller
、 SocketProcessor
、 Executor
/** * Threads used to accept new connections and pass them to worker threads. */ protected List<Acceptor<U>> acceptors; /** * counter for nr of connections handled by an endpoint */ private volatile LimitLatch connectionLimitLatch = null; /** * The socket pollers. */ private Poller[] pollers = null; // 內部類 SocketProcessor /** * External Executor based thread pool. */ private Executor executor = null;
咱們能夠看到在代碼中定義的這五個組件。具體這五個組件是幹嗎的呢?
LimitLatch
:鏈接控制器,負責控制最大的鏈接數
Acceptor
:負責接收新的鏈接,而後返回一個 Channel
對象給 Poller
Poller
:能夠將其當作是NIO中 Selector
,負責監控 Channel
的狀態
SocketProcessor
:能夠當作是一個被封裝的任務類
Executor
:Tomcat本身擴展的線程池,用來執行任務類
用圖簡單表示就是如下的關係
接下來咱們就來分別的看一下每一個組件裏面關鍵的代碼
咱們上面說了 LimitLatch
主要是用來控制Tomcat所能接收的最大數量鏈接,若是超過了此鏈接,那麼Tomcat就會將此鏈接線程阻塞等待,等裏面有其餘鏈接釋放了再消費此鏈接。
那麼 LimitLatch
是如何作到呢?咱們能夠看 LimitLatch
這個類
public class LimitLatch { private static final Log log = LogFactory.getLog(LimitLatch.class); private class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1L; public Sync() {} @Override protected int tryAcquireShared(int ignored) { long newCount = count.incrementAndGet(); if (!released && newCount > limit) { // Limit exceeded count.decrementAndGet(); return -1; } else { return 1; } } @Override protected boolean tryReleaseShared(int arg) { count.decrementAndGet(); return true; } } private final Sync sync; private final AtomicLong count; private volatile long limit; private volatile boolean released = false; }
咱們能夠看到它內部實現了 AbstractQueuedSynchronizer
,AQS其實就是一個框架,實現它的類能夠自定義控制線程何時掛起何時釋放。
limit
參數就是控制的最大鏈接數。
咱們能夠看到 AbstractEndpoint
調用 LimitLatch
的 countUpOrAwait
方法來判斷是否能獲取鏈接。
public void countUpOrAwait() throws InterruptedException { if (log.isDebugEnabled()) { log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount()); } sync.acquireSharedInterruptibly(1); }
AQS是如何知道何時阻塞線程呢?即不能獲取鏈接呢?
這些就靠用戶本身實現 AbstractQueuedSynchronizer
本身來定義何時獲取鏈接,何時釋放鏈接了。
能夠看到Sync類重寫了 tryAcquireShared
和 tryReleaseShared
方法。
在 tryAcquireShared
方法中定義了一旦當前鏈接數大於了設置的最大鏈接數,那麼就會返回 -1
表示將此線程放入AQS隊列中等待。
Acceptor
是接收鏈接的,咱們能夠看到 Acceptor
實現了 Runnable
接口,那麼在哪會新開啓線程來執行 Acceptor
的run方法呢?
在 AbstractEndpoint
的 startAcceptorThreads
方法中。
protected final void startAcceptorThreads() { int count = getAcceptorThreadCount(); acceptors = new Acceptor[count]; for (int i = 0; i < count; i++) { acceptors[i] = createAcceptor(); String threadName = getName() + "-Acceptor-" + i; acceptors[i].setThreadName(threadName); Thread t = new Thread(acceptors[i], threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); } }
能夠看到這裏能夠設置開啓幾個 Acceptor
,默認是一個。
而一個端口只能對應一個 ServerSocketChannel
,那麼這個 ServerSocketChannel
在哪初始化呢?咱們能夠看到在 Acceptor<U>acceptor=newAcceptor<>(this);
這句話中傳入了this進去,那麼應該是由 Endpoint
組件初始化的鏈接。
在 NioEndpoint
的 initServerSocket
方法中初始化了鏈接。
這裏面咱們可以看到兩點
1. 在bind方法中的第二個參數表示操做系統的等待隊列長度,即Tomcat再也不接受鏈接時(達到了設置的最大鏈接數),可是在操做系統層面仍是可以接受鏈接的,此時就將此鏈接信息放入等待隊列,那麼這個隊列的大小就是此參數設置
2. ServerSocketChannel
被設置成了阻塞的模式,也就是說是以阻塞方式接受鏈接的。
或許會有疑問。在平時的NIO編程中Channel不是都要設置成非阻塞模式嗎?
這裏解釋一下,若是是設置成非阻塞模式那麼就必須設置一個 Selector
不斷的輪詢,可是接受鏈接只須要阻塞一個通道便可。
這裏須要注意一點,每一個 Acceptor
在生成 PollerEvent
對象放入 Poller
隊列中時都是隨機取出 Poller
對象的,
因此 Poller
中的 Queue
對象設置成了 SynchronizedQueue<PollerEvent>
,由於可能有多個 Acceptor
同時向此 Poller
的隊列中放入 PollerEvent
對象。
具體代碼能夠看以下,
public Poller getPoller0() { int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length; return pollers[idx]; }
什麼是操做系統級別的鏈接呢?
在TCP的三次握手中,系統一般會每個LISTEN狀態的Socket維護兩個隊列,一個是半鏈接隊列(SYN):
這些鏈接已經收到客戶端SYN;另外一個是全鏈接隊列(ACCEPT):
這些連接已經收到客戶端的ACK,完成了三次握手,等待被應用調用accept方法取走使用。
全部的 Acceptor
共用這一個鏈接,在 Acceptor
的 run
方法中,放一些重要的代碼。
public void run(){ // Loop until we receive a shutdown command while(endpoint.isRunning()){ try{ //若是到了最大鏈接數,線程等待 endpoint.countUpOrAwaitConnection(); U socket = null; try{ //調用accept方法得到一個鏈接 socket = endpoint.serverSocketAccept(); }catch(Exception ioe){ // 出異常之後當前鏈接數減掉1 endpoint.countDownConnection(); } // 配置Socket if(endpoint.isRunning() && !endpoint.isPaused()){ // setSocketOptions() will hand the socket off to // an appropriate processor if successful if(!endpoint.setSocketOptions(socket)){ endpoint.closeSocket(socket) } } else { endpoint.destroySocket(socket); } } } }
裏面咱們能夠獲得兩點
1. 運行時會先判斷是否到達了最大鏈接數,若是到達了那麼就阻塞線程等待,裏面調用的就是 LimitLatch
組件判斷的。
2. 最重要的就是配置socket這一步了,是 endpoint.setSocketOptions(socket)
這段代碼
其實裏面重要的就是將 Acceptor
與一個 Poller
綁定起來,而後兩個組件經過隊列通訊,每一個Poller都維護着一個 SynchronizedQueue
隊列, ChannelEvent
放入到隊列中,而後 Poller
從隊列中取出事件進行消費。
咱們能夠看到 Poller
是 NioEndpoint
的內部類,而它也是實現了 Runnable
接口,能夠看到在其類中維護了一個Quene和Selector,定義以下。
因此本質上 Poller
就是 Selector
。
private Selector selector; private final SynchronizedQueue<PollerEvent> events =new SynchronizedQueue<>();
重點在其run方法中,這裏刪減了一些代碼,只展現重要的。
其中主要的就是調用了 events()
方法,就是不斷的查看隊列中是否有 Pollerevent
事件,若是有的話就將其取出而後把裏面的 Channel
取出來註冊到該 Selector
中,而後不斷輪詢全部註冊過的 Channel
查看是否有事件發生。
咱們知道 Poller
在輪詢 Channel
有事件發生時,就會調用將此事件封裝起來,而後交給線程池去執行。
那麼這個包裝類就是 SocketProcessor
。
而咱們打開此類,可以看到它也實現了 Runnable
接口,用來定義線程池 Executor
中線程所執行的任務。
那麼這裏是如何將 Channel
中的字節流轉換爲Tomcat須要的 ServletRequest
對象呢?其實就是調用了 Http11Processor
來進行字節流與對象的轉換的。
Executor
實際上是Tomcat定製版的線程池。咱們能夠看它的類的定義,能夠發現它實際上是擴展了Java的線程池。
public interface Executor extends java.util.concurrent.Executor, Lifecycle
在線程池中最重要的兩個參數就是核心線程數和最大線程數,正常的Java線程池的執行流程是這樣的。
1. 若是當前線程小於核心線程數,那麼來一個任務就建立一個線程。
2. 若是當前線程大於核心線程數,那麼就再來任務就將任務放入到任務隊列中。全部線程搶任務。
3. 若是隊列滿了,那麼就開始建立臨時線程。
4. 若是總線程數到了最大的線程數而且隊列也滿了,那麼就拋出異常。
可是在Tomcat自定義的線程池中是不同的,經過重寫了 execute
方法實現了本身的任務處理邏輯。
1. 若是當前線程小於核心線程數,那麼來一個任務就建立一個線程。
2. 若是當前線程大於核心線程數,那麼就再來任務就將任務放入到任務隊列中。全部線程搶任務。
3. 若是隊列滿了,那麼就開始建立臨時線程。
4. 若是總線程數到了最大的線程數,再次得到任務隊列,再嘗試一次將任務加入隊列中。
5. 若是此時仍是滿的,就拋異常。
差異就在於第四步的差異,原生線程池的處理策略是隻要當前線程數大於最大線程數,那麼就拋異常,而Tomcat的則是若是當前線程數大於最大線程數,就再嘗試一次,若是仍是滿的纔會拋異常。
下面是定製化線程池 execute
的執行邏輯。
public void execute(Runnable command, long timeout,TimeUnit unit){ submittedCount.incrementAndGet(); try{ super.execute(command); }catch(RejectedExecutionException rx){ if(super.getQueue() instanceof TaskQueue){ //得到任務隊列 final TaskQueue queue = (TaskQueue)super.getQueue(); try{ if(!queue.force(command,timeout,unit)){ submittedCount.decrementAndGet(); throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull")); } }catch(InterruptedException x){ submittedCount.decrementAndGet(); throw new RejectedExecutionException(x); } }else{ submittedCount.decrementAndGet(); throw rx; } } }
在代碼中,咱們能夠看到有這麼一句 submittedCount.incrementAndGet();
爲何會有這句呢?咱們能夠看看這個參數的定義。
簡單來講這個參數就是定義了任務已經提交到了線程池中,可是尚未執行的任務個數。
private final AtomicInteger submittedCount = new AtomicInteger(0);
爲何會有這麼一個參數呢?
咱們知道定製的隊列是繼承了 LinkedBlockingQueue
,而 LinkedBlockingQueue
隊列默認是沒有邊界的。
因而咱們就傳入了一個參數, maxQueueSize
給構造的隊列。
可是在Tomcat的任務隊列默認狀況下是無限制的,那麼這樣就會出一個問題,若是當前線程達到了核心線程數,則開始向隊列中添加任務,那麼就會一直是添加成功的。
那麼就不會再建立新的線程。那麼在什麼狀況下要新建線程呢?
線程池中建立新線程會有兩個地方,一個是小於核心線程時,來一個任務建立一個線程。另外一個是超過核心線程而且任務隊列已滿,則會建立臨時線程。
那麼如何規定任務隊列是否已滿呢?若是設置了隊列的最大長度固然好了,可是Tomcat默認狀況下是沒有設置,因此默認是無限的。因此Tomcat的 TaskQueue
繼承了 LinkedBlockingQueue
,重寫了 offer
方法,在裏面定義了何時返回false。
這就是 submittedCount
的意義,目的就是爲了在任務隊列長度無限的狀況下,讓線程池有機會建立新的線程。
上面的知識有部分是看着李號雙老師的深刻拆解Tomcat總結的,又結合着源碼深刻了解了一下,當時剛看文章的時候以爲本身都懂了,可是再深刻源碼的時候又會發現本身不懂。
因此知識若是隻是看了而不運用,那麼知識永遠都不會是本身的。
經過Tomcat鏈接器這一小塊的源碼學習,除了一些經常使用知識的實際運用,例如AQS、鎖的應用、自定義線程池須要考慮的點、NIO的應用等等。
還有整體上的設計思惟的學習,模塊化設計,和現在的微服務感受很類似,將一個功能點內部分爲多種模塊,這樣不管是在之後替換或者是升級時都能遊刃有餘。