死磕Tomcat系列(2)——EndPoint源碼解析

死磕Tomcat系列(2)——EndPoint源碼解析

在上一節中咱們描述了Tomcat的總體架構,咱們知道了Tomcat分爲兩個大組件,一個鏈接器和一個容器。而咱們此次要講的EndPoint的組件就是屬於鏈接器裏面的。它是一個通訊的端點,就是負責對外實現TCP/IP協議。EndPoint是個接口,它的具體實現類就是AbstractEndpoint,而AbstractEndpoint具體的實現類就有AprEndpointNio2EndpointNioEndpointjava

  • AprEndpoint:對應的是APR模式,簡單理解就是從操做系統級別解決異步IO的問題,大幅度提升服務器的處理和響應性能。可是啓用這種模式須要安裝一些其餘的依賴庫。
  • Nio2Endpoint:利用代碼來實現異步IO
  • NioEndpoint:利用了JAVA的NIO實現了非阻塞IO,Tomcat默認啓動是以這個來啓動的,而這個也是咱們的講述重點。

NioEndpoint中重要的組件

咱們知道NioEndpoint 的原理仍是對於Linux的多路複用器的使用,而在多路複用器中簡單來講就兩個步驟。編程

  1. 建立一個Selector,在它身上註冊各類Channel,而後調用select方法,等待通道中有感興趣的事件發生。
  2. 若是有感興趣的事情發生了,例如是讀事件,那麼就將信息從通道中讀取出來。

NioEndpoint爲了實現上面這兩步,用了五個組件來。這五個組件是LimitLatch Acceptor Poller SocketProcessor Executor服務器

/**
 * Threads used to accept new connections and pass them to worker threads.
 */
protected List<Acceptor<U>> acceptors;

/**
 * counter for nr of connections handled by an endpoint
 */
private volatile LimitLatch connectionLimitLatch = null;
/**
 * The socket pollers. 
 */
private Poller[] pollers = null;

內部類

SocketProcessor

/**
 * External Executor based thread pool.
 */
private Executor executor = null;

咱們能夠看到在代碼中定義的這五個組件。具體這五個組件是幹嗎的呢?架構

  • LimitLatch:鏈接控制器,負責控制最大的鏈接數
  • Acceptor:負責接收新的鏈接,而後返回一個Channel對象給Poller
  • Poller :能夠將其當作是NIO中Selector,負責監控Channel的狀態
  • SocketProcessor :能夠當作是一個被封裝的任務類
  • Executor :Tomcat本身擴展的線程池,用來執行任務類

用圖簡單表示就是如下的關係app

接下來咱們就來分別的看一下每一個組件裏面關鍵的代碼框架

LimitLatch

咱們上面說了LimitLatch 主要是用來控制Tomcat所能接收的最大數量鏈接,若是超過了此鏈接,那麼Tomcat就會將此鏈接線程阻塞等待,等裏面有其餘鏈接釋放了再消費此鏈接。那麼LimitLatch 是如何作到呢?咱們能夠看LimitLatch 這個類異步

public class LimitLatch {

    private static final Log log = LogFactory.getLog(LimitLatch.class);

    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        public Sync() {
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            long newCount = count.incrementAndGet();
            if (!released && newCount > limit) {
                // Limit exceeded
                count.decrementAndGet();
                return -1;
            } else {
                return 1;
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            count.decrementAndGet();
            return true;
        }
    }

    private final Sync sync;
    //當前鏈接數
    private final AtomicLong count;
    //最大鏈接數
    private volatile long limit;
    private volatile boolean released = false;
}

咱們能夠看到它內部實現了AbstractQueuedSynchronizer,AQS其實就是一個框架,實現它的類能夠自定義控制線程何時掛起何時釋放。limit 參數就是控制的最大鏈接數。咱們能夠看到AbstractEndpoint調用LimitLatch countUpOrAwait方法來判斷是否能獲取鏈接。socket

public void countUpOrAwait() throws InterruptedException {
        if (log.isDebugEnabled()) {
            log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
        }
        sync.acquireSharedInterruptibly(1);
    }

AQS是如何知道何時阻塞線程呢?即不能獲取鏈接呢?這些就靠用戶本身實現AbstractQueuedSynchronizer 本身來定義何時獲取鏈接,何時釋放鏈接了。能夠看到Sync類重寫了tryAcquireShared tryReleaseShared 方法。在tryAcquireShared 方法中定義了一旦當前鏈接數大於了設置的最大鏈接數,那麼就會返回-1表示將此線程放入AQS隊列中等待。ide

Acceptor

Acceptor 是接收鏈接的,咱們能夠看到Acceptor實現了Runnable接口,那麼在哪會新開啓線程來執行Acceptor的run方法呢?在AbstractEndpointstartAcceptorThreads方法中。模塊化

protected void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new ArrayList<>(count);

    for (int i = 0; i < count; i++) {
        Acceptor<U> acceptor = new Acceptor<>(this);
        String threadName = getName() + "-Acceptor-" + i;
        acceptor.setThreadName(threadName);
        acceptors.add(acceptor);
        Thread t = new Thread(acceptor, threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}

能夠看到這裏能夠設置開啓幾個Acceptor,默認是一個。而一個端口只能對應一個ServerSocketChannel,那麼這個ServerSocketChannel 在哪初始化呢?咱們能夠看到在 Acceptor<U> acceptor = new Acceptor<>(this);這句話中傳入了this進去,那麼應該是由Endpoint組件初始化的鏈接。在NioEndpointinitServerSocket 方法中初始化了鏈接。

// Separated out to make it easier for folks that extend NioEndpoint to
// implement custom [server]sockets
protected void initServerSocket() throws Exception {
    if (!getUseInheritedChannel()) {
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
        serverSock.socket().bind(addr,getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    serverSock.configureBlocking(true); //mimic APR behavior
}

這裏面咱們可以看到兩點

  1. 在bind方法中的第二個參數表示操做系統的等待隊列長度,即Tomcat再也不接受鏈接時(達到了設置的最大鏈接數),可是在操做系統層面仍是可以接受鏈接的,此時就將此鏈接信息放入等待隊列,那麼這個隊列的大小就是此參數設置的。
  2. ServerSocketChannel被設置成了阻塞的模式,也就是說是以阻塞方式接受鏈接的。或許會有疑問。在平時的NIO編程中Channel不是都要設置成非阻塞模式嗎?這裏解釋一下,若是是設置成非阻塞模式那麼就必須設置一個Selector不斷的輪詢,可是接受鏈接只須要阻塞一個通道便可。

這裏須要注意一點,每一個Acceptor在生成PollerEvent對象放入Poller隊列中時都是隨機取出Poller對象的,具體代碼能夠看以下,因此Poller中的Queue對象設置成了SynchronizedQueue<PollerEvent>,由於可能有多個Acceptor 同時向此Poller的隊列中放入PollerEvent對象。

public Poller getPoller0() {
    if (pollerThreadCount == 1) {
        return pollers[0];
    } else {
        int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
        return pollers[idx];
    }
}

什麼是操做系統級別的鏈接呢?在TCP的三次握手中,系統一般會每個LISTEN狀態的Socket維護兩個隊列,一個是半鏈接隊列(SYN):這些鏈接已經收到客戶端SYN;另外一個是全鏈接隊列(ACCEPT):這些連接已經收到客戶端的ACK,完成了三次握手,等待被應用調用accept方法取走使用。

全部的Acceptor共用這一個鏈接,在Acceptorrun方法中,放一些重要的代碼。

@Override
    public void run() {
        // Loop until we receive a shutdown command
        while (endpoint.isRunning()) {
            try {
                //若是到了最大鏈接數,線程等待
                endpoint.countUpOrAwaitConnection();
                U socket = null;
                try {
                    //調用accept方法得到一個鏈接
                    socket = endpoint.serverSocketAccept();
                } catch (Exception ioe) {
                    // 出異常之後當前鏈接數減掉1
                    endpoint.countDownConnection();
                }
                // 配置Socket
                if (endpoint.isRunning() && !endpoint.isPaused()) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    if (!endpoint.setSocketOptions(socket)) {
                        endpoint.closeSocket(socket);
                    }
                } else {
                    endpoint.destroySocket(socket);
                }
    }

裏面咱們能夠獲得兩點

  1. 運行時會先判斷是否到達了最大鏈接數,若是到達了那麼就阻塞線程等待,裏面調用的就是LimitLatch 組件判斷的。
  2. 最重要的就是配置socket這一步了,是endpoint.setSocketOptions(socket)這段代碼
protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            // 設置Socket爲非阻塞模式,供Poller調用
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);

            NioChannel channel = null;
            if (nioChannels != null) {
                channel = nioChannels.pop();
            }
            if (channel == null) {
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                } else {
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                channel.setIOChannel(socket);
                channel.reset();
            }
            //註冊ChannelEvent,實際上是將ChannelEvent放入到隊列中,而後Poller從隊列中取
            getPoller0().register(channel);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error(sm.getString("endpoint.socketOptionsError"), t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            // Tell to close the socket
            return false;
        }
        return true;
    }

其實裏面重要的就是將Acceptor與一個Poller綁定起來,而後兩個組件經過隊列通訊,每一個Poller都維護着一個SynchronizedQueue隊列,ChannelEvent放入到隊列中,而後Poller從隊列中取出事件進行消費。

Poller

咱們能夠看到PollerNioEndpoint的內部類,而它也是實現了Runnable接口,能夠看到在其類中維護了一個Quene和Selector,定義以下。因此本質上Poller就是Selector

private Selector selector;
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();

重點在其run方法中,這裏刪減了一些代碼,只展現重要的。

@Override
        public void run() {
            // Loop until destroy() is called
            while (true) {
                boolean hasEvents = false;
                try {
                    if (!close) {
                        //查看是否有鏈接進來,若是有就將Channel註冊進Selector中
                        hasEvents = events();
                    }
                    if (close) {
                        events();
                        timeout(0, false);
                        try {
                            selector.close();
                        } catch (IOException ioe) {
                            log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                        }
                        break;
                    }
                } catch (Throwable x) {
                    ExceptionUtils.handleThrowable(x);
                    log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
                    continue;
                }
                if (keyCount == 0) {
                    hasEvents = (hasEvents | events());
                }
                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // Walk through the collection of ready keys and dispatch
                // any active event.
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
                    // Attachment may be null if another thread has called
                    // cancelledKey()
                    if (socketWrapper == null) {
                        iterator.remove();
                    } else {
                        iterator.remove();
                        processKey(sk, socketWrapper);
                    }
                }

                // Process timeouts
                timeout(keyCount,hasEvents);
            }

            getStopLatch().countDown();
        }

其中主要的就是調用了events()方法,就是不斷的查看隊列中是否有Pollerevent事件,若是有的話就將其取出而後把裏面的Channel取出來註冊到該Selector中,而後不斷輪詢全部註冊過的Channel查看是否有事件發生。

SocketProcessor

咱們知道Poller在輪詢Channel有事件發生時,就會調用將此事件封裝起來,而後交給線程池去執行。那麼這個包裝類就是SocketProcessor。而咱們打開此類,可以看到它也實現了Runnable接口,用來定義線程池Executor 中線程所執行的任務。那麼這裏是如何將Channel中的字節流轉換爲Tomcat須要的ServletRequest對象呢?其實就是調用了Http11Processor 來進行字節流與對象的轉換的。

Executor

Executor 實際上是Tomcat定製版的線程池。咱們能夠看它的類的定義,能夠發現它實際上是擴展了Java的線程池。

public interface Executor extends java.util.concurrent.Executor, Lifecycle

在線程池中最重要的兩個參數就是核心線程數和最大線程數,正常的Java線程池的執行流程是這樣的。

  1. 若是當前線程小於核心線程數,那麼來一個任務就建立一個線程。
  2. 若是當前線程大於核心線程數,那麼就再來任務就將任務放入到任務隊列中。全部線程搶任務。
  3. 若是隊列滿了,那麼就開始建立臨時線程。
  4. 若是總線程數到了最大的線程數而且隊列也滿了,那麼就拋出異常。

可是在Tomcat自定義的線程池中是不同的,經過重寫了execute方法實現了本身的任務處理邏輯。

  1. 若是當前線程小於核心線程數,那麼來一個任務就建立一個線程。
  2. 若是當前線程大於核心線程數,那麼就再來任務就將任務放入到任務隊列中。全部線程搶任務。
  3. 若是隊列滿了,那麼就開始建立臨時線程。
  4. 若是總線程數到了最大的線程數,再次得到任務隊列,再嘗試一次將任務加入隊列中。
  5. 若是此時仍是滿的,就拋異常。

差異就在於第四步的差異,原生線程池的處理策略是隻要當前線程數大於最大線程數,那麼就拋異常,而Tomcat的則是若是當前線程數大於最大線程數,就再嘗試一次,若是仍是滿的纔會拋異常。下面是定製化線程池execute的執行邏輯。

public void execute(Runnable command, long timeout, TimeUnit unit) {
    submittedCount.incrementAndGet();
    try {
        super.execute(command);
    } catch (RejectedExecutionException rx) {
        if (super.getQueue() instanceof TaskQueue) {
            //得到任務隊列
            final TaskQueue queue = (TaskQueue)super.getQueue();
            try {
                if (!queue.force(command, timeout, unit)) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
                }
            } catch (InterruptedException x) {
                submittedCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } else {
            submittedCount.decrementAndGet();
            throw rx;
        }

    }
}

在代碼中,咱們能夠看到有這麼一句submittedCount.incrementAndGet();,爲何會有這句呢?咱們能夠看看這個參數的定義。簡單來講這個參數就是定義了任務已經提交到了線程池中,可是尚未執行的任務個數。

/**
 * The number of tasks submitted but not yet finished. This includes tasks
 * in the queue and tasks that have been handed to a worker thread but the
 * latter did not start executing the task yet.
 * This number is always greater or equal to {@link #getActiveCount()}.
 */
private final AtomicInteger submittedCount = new AtomicInteger(0);

爲何會有這麼一個參數呢?咱們知道定製的隊列是繼承了LinkedBlockingQueue,而LinkedBlockingQueue隊列默認是沒有邊界的。因而咱們就傳入了一個參數,maxQueueSize給構造的隊列。可是在Tomcat的任務隊列默認狀況下是無限制的,那麼這樣就會出一個問題,若是當前線程達到了核心線程數,則開始向隊列中添加任務,那麼就會一直是添加成功的。那麼就不會再建立新的線程。那麼在什麼狀況下要新建線程呢?

線程池中建立新線程會有兩個地方,一個是小於核心線程時,來一個任務建立一個線程。另外一個是超過核心線程而且任務隊列已滿,則會建立臨時線程。

那麼如何規定任務隊列是否已滿呢?若是設置了隊列的最大長度固然好了,可是Tomcat默認狀況下是沒有設置,因此默認是無限的。因此Tomcat的TaskQueue繼承了LinkedBlockingQueue,重寫了offer方法,在裏面定義了何時返回false。

@Override
public boolean offer(Runnable o) {
    if (parent==null) return super.offer(o);
    //若是當前線程數等於最大線程數,此時不能建立新線程,只能添加進任務隊列中
    if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
    //若是已提交可是未完成的任務數小於等於當前線程數,說明能處理過來,就放入隊列中
    if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
    //到這一步說明,已提交可是未完成的任務數大於當前線程數,若是當前線程數小於最大線程數,就返回false新建線程
    if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
    return super.offer(o);
}

這就是submittedCount的意義,目的就是爲了在任務隊列長度無限的狀況下,讓線程池有機會建立新的線程。

總結

上面的知識有部分是看着李號雙老師的深刻拆解Tomcat總結的,又結合着源碼深刻了解了一下,當時剛看文章的時候以爲本身都懂了,可是再深刻源碼的時候又會發現本身不懂。因此知識若是隻是看了而不運用,那麼知識永遠都不會是本身的。經過Tomcat鏈接器這一小塊的源碼學習,除了一些經常使用知識的實際運用,例如AQS、鎖的應用、自定義線程池須要考慮的點、NIO的應用等等。還有整體上的設計思惟的學習,模塊化設計,和現在的微服務感受很類似,將一個功能點內部分爲多種模塊,這樣不管是在之後替換或者是升級時都能遊刃有餘。

往期文章

如何斷點調試Tomcat源碼

死磕Tomcat系列(1)——總體架構

一次奇怪的StackOverflowError問題查找之旅

徒手擼一個簡單的RPC框架

徒手擼一個簡單的RPC框架(2)——項目改造

徒手擼一個簡單的IOC

相關文章
相關標籤/搜索