Tomcat系列(二)- EndPoint源碼解析

在上一節中咱們描述了Tomcat的總體架構,java

咱們知道了Tomcat分爲兩個大組件,一個鏈接器和一個容器。編程

而咱們此次要講的 EndPoint的組件就是屬於鏈接器裏面的。服務器

它是一個通訊的端點,就是負責對外實現TCP/IP協議。架構

EndPoint是個接口,app

它的具體實現類就是 AbstractEndpoint,而 AbstractEndpoint具體的實現類就有 AprEndpointNio2EndpointNioEndpoint框架

  • AprEndpoint:對應的是APR模式,簡單理解就是從操做系統級別解決異步IO的問題,大幅度提升服務器的處理和響應性能。可是啓用這種模式須要安裝一些其餘的依賴庫。異步

  • Nio2Endpoint:利用代碼來實現異步IOsocket

  • NioEndpoint:利用了JAVA的NIO實現了非阻塞IO,Tomcat默認啓動是以這個來啓動的,而這個也是咱們的講述重點。ide

NioEndpoint中重要的組件

  咱們知道 NioEndpoint的原理仍是對於Linux的多路複用器的使用,而在多路複用器中簡單來講就兩個步驟。模塊化

    1. 建立一個Selector,在它身上註冊各類Channel,而後調用select方法,等待通道中有感興趣的事件發生。

    2. 若是有感興趣的事情發生了,例如是讀事件,那麼就將信息從通道中讀取出來。

  而 NioEndpoint爲了實現上面這兩步,用了五個組件來。

  這五個組件是 LimitLatchAcceptorPollerSocketProcessorExecutor

/**
 * Threads used to accept new connections and pass them to worker threads.
 */
protected List<Acceptor<U>> acceptors;

/**
 * counter for nr of connections handled by an endpoint
 */
private volatile LimitLatch connectionLimitLatch = null;

/**
 * The socket pollers.
 */
private Poller[] pollers = null;

// 內部類
SocketProcessor

/**
 * External Executor based thread pool.
 */
private Executor executor = null;
View Code

  咱們能夠看到在代碼中定義的這五個組件。具體這五個組件是幹嗎的呢?

    • LimitLatch:鏈接控制器,負責控制最大的鏈接數

    • Acceptor:負責接收新的鏈接,而後返回一個 Channel對象給 Poller

    • Poller:能夠將其當作是NIO中 Selector,負責監控 Channel的狀態

    • SocketProcessor:能夠當作是一個被封裝的任務類

    • Executor:Tomcat本身擴展的線程池,用來執行任務類

  用圖簡單表示就是如下的關係

  

  接下來咱們就來分別的看一下每一個組件裏面關鍵的代碼

LimitLatch

  咱們上面說了 LimitLatch主要是用來控制Tomcat所能接收的最大數量鏈接,若是超過了此鏈接,那麼Tomcat就會將此鏈接線程阻塞等待,等裏面有其餘鏈接釋放了再消費此鏈接。

  那麼 LimitLatch是如何作到呢?咱們能夠看 LimitLatch這個類

public class LimitLatch {
    private static final Log log = LogFactory.getLog(LimitLatch.class);
    
    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;
        public Sync() {}
        @Override
        protected int tryAcquireShared(int ignored) {
            long newCount = count.incrementAndGet();
            if (!released && newCount > limit) {
                // Limit exceeded
                count.decrementAndGet();
                return -1;
            } else {
                return 1;
            }
        }
        @Override
        protected boolean tryReleaseShared(int arg) {
            count.decrementAndGet();
            return true;
        }
    }

    private final Sync sync;
    private final AtomicLong count;
    private volatile long limit;
    private volatile boolean released = false;
}
View Code

  咱們能夠看到它內部實現了 AbstractQueuedSynchronizer,AQS其實就是一個框架,實現它的類能夠自定義控制線程何時掛起何時釋放。

  limit參數就是控制的最大鏈接數。

  咱們能夠看到 AbstractEndpoint調用 LimitLatchcountUpOrAwait方法來判斷是否能獲取鏈接。

public void countUpOrAwait() throws InterruptedException {
        if (log.isDebugEnabled()) {
            log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
        }
        sync.acquireSharedInterruptibly(1);
    }
View Code

  AQS是如何知道何時阻塞線程呢?即不能獲取鏈接呢?

  這些就靠用戶本身實現 AbstractQueuedSynchronizer本身來定義何時獲取鏈接,何時釋放鏈接了。

  能夠看到Sync類重寫了 tryAcquireSharedtryReleaseShared方法。

  在 tryAcquireShared方法中定義了一旦當前鏈接數大於了設置的最大鏈接數,那麼就會返回 -1表示將此線程放入AQS隊列中等待。

Acceptor

  Acceptor是接收鏈接的,咱們能夠看到 Acceptor實現了 Runnable接口,那麼在哪會新開啓線程來執行 Acceptor的run方法呢?

  在 AbstractEndpointstartAcceptorThreads方法中。

protected final void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new Acceptor[count];

    for (int i = 0; i < count; i++) {
        acceptors[i] = createAcceptor();
        String threadName = getName() + "-Acceptor-" + i;
        acceptors[i].setThreadName(threadName);
        Thread t = new Thread(acceptors[i], threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}
View Code

  能夠看到這裏能夠設置開啓幾個 Acceptor,默認是一個。

  而一個端口只能對應一個 ServerSocketChannel,那麼這個 ServerSocketChannel在哪初始化呢?咱們能夠看到在 Acceptor<U>acceptor=newAcceptor<>(this);

  這句話中傳入了this進去,那麼應該是由 Endpoint組件初始化的鏈接。

  在 NioEndpointinitServerSocket方法中初始化了鏈接。

  

  這裏面咱們可以看到兩點

    1. 在bind方法中的第二個參數表示操做系統的等待隊列長度,即Tomcat再也不接受鏈接時(達到了設置的最大鏈接數),可是在操做系統層面仍是可以接受鏈接的,此時就將此鏈接信息放入等待隊列,那麼這個隊列的大小就是此參數設置

    2. ServerSocketChannel被設置成了阻塞的模式,也就是說是以阻塞方式接受鏈接的。

    或許會有疑問。在平時的NIO編程中Channel不是都要設置成非阻塞模式嗎?

    這裏解釋一下,若是是設置成非阻塞模式那麼就必須設置一個 Selector不斷的輪詢,可是接受鏈接只須要阻塞一個通道便可。

  

  這裏須要注意一點,每一個 Acceptor在生成 PollerEvent對象放入 Poller隊列中時都是隨機取出 Poller對象的,

  因此 Poller中的 Queue對象設置成了 SynchronizedQueue<PollerEvent>,由於可能有多個 Acceptor同時向此 Poller的隊列中放入 PollerEvent對象。

  具體代碼能夠看以下,

public Poller getPoller0() {
    int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
    return pollers[idx];
}
View Code

什麼是操做系統級別的鏈接呢?

在TCP的三次握手中,系統一般會每個LISTEN狀態的Socket維護兩個隊列,一個是半鏈接隊列(SYN):

這些鏈接已經收到客戶端SYN;另外一個是全鏈接隊列(ACCEPT):

這些連接已經收到客戶端的ACK,完成了三次握手,等待被應用調用accept方法取走使用。

  全部的 Acceptor共用這一個鏈接,在 Acceptorrun方法中,放一些重要的代碼。

public void run(){
    // Loop until we receive a shutdown command
    while(endpoint.isRunning()){
        try{
            //若是到了最大鏈接數,線程等待
            endpoint.countUpOrAwaitConnection();
            U socket = null;             
            try{
                //調用accept方法得到一個鏈接
                socket = endpoint.serverSocketAccept();
            }catch(Exception ioe){                        
                // 出異常之後當前鏈接數減掉1
                endpoint.countDownConnection();                   
            }
            // 配置Socket
            if(endpoint.isRunning() && !endpoint.isPaused()){
                // setSocketOptions() will hand the socket off to                   
                // an appropriate processor if successful                      
                if(!endpoint.setSocketOptions(socket)){
                    endpoint.closeSocket(socket)        
                }
            } else {
                endpoint.destroySocket(socket);                    
            }        
        }
    }
}    
View Code

  裏面咱們能夠獲得兩點

    1. 運行時會先判斷是否到達了最大鏈接數,若是到達了那麼就阻塞線程等待,裏面調用的就是 LimitLatch組件判斷的。

    2. 最重要的就是配置socket這一步了,是 endpoint.setSocketOptions(socket)這段代碼

  

  其實裏面重要的就是將 Acceptor與一個 Poller綁定起來,而後兩個組件經過隊列通訊,每一個Poller都維護着一個 SynchronizedQueue隊列, ChannelEvent放入到隊列中,而後 Poller從隊列中取出事件進行消費。

Poller

  咱們能夠看到 PollerNioEndpoint的內部類,而它也是實現了 Runnable接口,能夠看到在其類中維護了一個Quene和Selector,定義以下。

  因此本質上 Poller就是 Selector

private Selector selector;
private final SynchronizedQueue<PollerEvent> events =new SynchronizedQueue<>();
View Code

  重點在其run方法中,這裏刪減了一些代碼,只展現重要的。

  

  其中主要的就是調用了 events()方法,就是不斷的查看隊列中是否有 Pollerevent事件,若是有的話就將其取出而後把裏面的 Channel取出來註冊到該 Selector中,而後不斷輪詢全部註冊過的 Channel查看是否有事件發生。

SocketProcessor

  咱們知道 Poller在輪詢 Channel有事件發生時,就會調用將此事件封裝起來,而後交給線程池去執行。

  那麼這個包裝類就是 SocketProcessor

  而咱們打開此類,可以看到它也實現了 Runnable接口,用來定義線程池 Executor中線程所執行的任務。

  那麼這裏是如何將 Channel中的字節流轉換爲Tomcat須要的 ServletRequest對象呢?其實就是調用了 Http11Processor來進行字節流與對象的轉換的。

Executor

  Executor實際上是Tomcat定製版的線程池。咱們能夠看它的類的定義,能夠發現它實際上是擴展了Java的線程池。

public interface Executor extends java.util.concurrent.Executor, Lifecycle
View Code

  在線程池中最重要的兩個參數就是核心線程數和最大線程數,正常的Java線程池的執行流程是這樣的。

    1. 若是當前線程小於核心線程數,那麼來一個任務就建立一個線程。

    2. 若是當前線程大於核心線程數,那麼就再來任務就將任務放入到任務隊列中。全部線程搶任務。

    3. 若是隊列滿了,那麼就開始建立臨時線程。

    4. 若是總線程數到了最大的線程數而且隊列也滿了,那麼就拋出異常。

  可是在Tomcat自定義的線程池中是不同的,經過重寫了 execute方法實現了本身的任務處理邏輯。

    1. 若是當前線程小於核心線程數,那麼來一個任務就建立一個線程。

    2. 若是當前線程大於核心線程數,那麼就再來任務就將任務放入到任務隊列中。全部線程搶任務。

    3. 若是隊列滿了,那麼就開始建立臨時線程。

    4. 若是總線程數到了最大的線程數,再次得到任務隊列,再嘗試一次將任務加入隊列中。

    5. 若是此時仍是滿的,就拋異常。

  差異就在於第四步的差異,原生線程池的處理策略是隻要當前線程數大於最大線程數,那麼就拋異常,而Tomcat的則是若是當前線程數大於最大線程數,就再嘗試一次,若是仍是滿的纔會拋異常。

  下面是定製化線程池 execute的執行邏輯。

public void execute(Runnable command, long timeout,TimeUnit unit){
    submittedCount.incrementAndGet();  
    try{ 
        super.execute(command);
    }catch(RejectedExecutionException rx){            
        if(super.getQueue() instanceof TaskQueue){

            //得到任務隊列             
            final TaskQueue queue = (TaskQueue)super.getQueue();
            try{            
                if(!queue.force(command,timeout,unit)){
                    submittedCount.decrementAndGet();                
                    throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));                   
                }         
            }catch(InterruptedException x){
                submittedCount.decrementAndGet();
                throw new RejectedExecutionException(x);            
            }    
        }else{
            submittedCount.decrementAndGet();
            throw rx;        
        }   
    }
}
View Code

  在代碼中,咱們能夠看到有這麼一句 submittedCount.incrementAndGet();

  爲何會有這句呢?咱們能夠看看這個參數的定義。

  簡單來講這個參數就是定義了任務已經提交到了線程池中,可是尚未執行的任務個數。

private final AtomicInteger submittedCount = new AtomicInteger(0);
View Code

  爲何會有這麼一個參數呢?

  咱們知道定製的隊列是繼承了 LinkedBlockingQueue,而 LinkedBlockingQueue隊列默認是沒有邊界的。

  因而咱們就傳入了一個參數, maxQueueSize給構造的隊列。

  可是在Tomcat的任務隊列默認狀況下是無限制的,那麼這樣就會出一個問題,若是當前線程達到了核心線程數,則開始向隊列中添加任務,那麼就會一直是添加成功的。

  那麼就不會再建立新的線程。那麼在什麼狀況下要新建線程呢?

線程池中建立新線程會有兩個地方,一個是小於核心線程時,來一個任務建立一個線程。另外一個是超過核心線程而且任務隊列已滿,則會建立臨時線程。

  那麼如何規定任務隊列是否已滿呢?若是設置了隊列的最大長度固然好了,可是Tomcat默認狀況下是沒有設置,因此默認是無限的。因此Tomcat的 TaskQueue繼承了 LinkedBlockingQueue,重寫了 offer方法,在裏面定義了何時返回false。

  

  這就是 submittedCount的意義,目的就是爲了在任務隊列長度無限的狀況下,讓線程池有機會建立新的線程。

總結

  上面的知識有部分是看着李號雙老師的深刻拆解Tomcat總結的,又結合着源碼深刻了解了一下,當時剛看文章的時候以爲本身都懂了,可是再深刻源碼的時候又會發現本身不懂。

  因此知識若是隻是看了而不運用,那麼知識永遠都不會是本身的。

  經過Tomcat鏈接器這一小塊的源碼學習,除了一些經常使用知識的實際運用,例如AQS、鎖的應用、自定義線程池須要考慮的點、NIO的應用等等。

  還有整體上的設計思惟的學習,模塊化設計,和現在的微服務感受很類似,將一個功能點內部分爲多種模塊,這樣不管是在之後替換或者是升級時都能遊刃有餘。

相關文章
相關標籤/搜索