tomcat的NIO線程模型源碼分析

#1 tomcat8的併發參數控制html

這種問題其實到官方文檔上查看一番就能夠知道,tomcat很早的版本仍是使用的BIO,以後就支持NIO了,具體版本我也不記得了,有興趣的本身能夠去查下。本篇的tomcat版本是tomcat8.5。能夠到這裏看下tomcat8.5的配置參數apache

咱們先來簡單回顧下目前通常的NIO服務器端的大體實現,借鑑infoq上的一篇文章Netty系列之Netty線程模型中的一張圖緩存

通常NIO線程模型

  • 一個或多個Acceptor線程,每一個線程都有本身的Selector,Acceptor只負責accept新的鏈接,一旦鏈接創建以後就將鏈接註冊到其餘Worker線程中tomcat

  • 多個Worker線程,有時候也叫IO線程,就是專門負責IO讀寫的。一種實現方式就是像Netty同樣,每一個Worker線程都有本身的Selector,能夠負責多個鏈接的IO讀寫事件,每一個鏈接歸屬於某個線程。另外一種方式實現方式就是有專門的線程負責IO事件監聽,這些線程有本身的Selector,一旦監聽到有IO讀寫事件,並非像第一種實現方式那樣(本身去執行IO操做),而是將IO操做封裝成一個Runnable交給Worker線程池來執行,這種狀況每一個鏈接可能會被多個線程同時操做,相比第一種併發性提升了,可是也可能引來多線程問題,在處理上要更加謹慎些。tomcat的NIO模型就是第二種。服務器

因此通常參數就是Acceptor線程個數,Worker線程個數。來具體看下參數多線程

##1.1 acceptCount架構

文檔描述爲:併發

The maximum queue length for incoming connection requests when all possible request processing threads are in use. Any requests received when the queue is full will be refused. The default value is 100.app

這個參數就立馬牽涉出一塊大內容:TCP三次握手的詳細過程,這個以後再詳細探討。這裏能夠簡單理解爲:鏈接在被ServerSocketChannel accept以前就暫存在這個隊列中,acceptCount就是這個隊列的最大長度。ServerSocketChannel accept就是從這個隊列中不斷取出已經創建鏈接的的請求。因此當ServerSocketChannel accept取出不及時就有可能形成該隊列積壓,一旦滿了鏈接就被拒絕了less

##1.2 acceptorThreadCount

文檔以下描述

The number of threads to be used to accept connections. Increase this value on a multi CPU machine, although you would never really need more than 2. Also, with a lot of non keep alive connections, you might want to increase this value as well. Default value is 1.

Acceptor線程只負責從上述隊列中取出已經創建鏈接的請求。在啓動的時候使用一個ServerSocketChannel監聽一個鏈接端口如8080,能夠有多個Acceptor線程併發不斷調用上述ServerSocketChannel的accept方法來獲取新的鏈接。參數acceptorThreadCount其實使用的Acceptor線程的個數。

##1.3 maxConnections

文檔描述以下

The maximum number of connections that the server will accept and process at any given time. When this number has been reached, the server will accept, but not process, one further connection. This additional connection be blocked until the number of connections being processed falls below maxConnections at which point the server will start accepting and processing new connections again. Note that once the limit has been reached, the operating system may still accept connections based on the acceptCount setting. The default value varies by connector type. For NIO and NIO2 the default is 10000. For APR/native, the default is 8192.

Note that for APR/native on Windows, the configured value will be reduced to the highest multiple of 1024 that is less than or equal to maxConnections. This is done for performance reasons. If set to a value of -1, the maxConnections feature is disabled and connections are not counted.

這裏就是tomcat對於鏈接數的一個控制,即最大鏈接數限制。一旦發現當前鏈接數已經超過了必定的數量(NIO默認是10000),上述的Acceptor線程就被阻塞了,即再也不執行ServerSocketChannel的accept方法從隊列中獲取已經創建的鏈接。可是它並不阻止新的鏈接的創建,新的鏈接的創建過程不是Acceptor控制的,Acceptor僅僅是從隊列中獲取新創建的鏈接。因此當鏈接數已經超過maxConnections後,仍然是能夠創建新的鏈接的,存放在上述acceptCount大小的隊列中,這個隊列裏面的鏈接沒有被Acceptor獲取,就處於鏈接創建了可是不被處理的狀態。當鏈接數低於maxConnections以後,Acceptor線程就再也不阻塞,繼續調用ServerSocketChannel的accept方法從acceptCount大小的隊列中繼續獲取新的鏈接,以後就開始處理這些新的鏈接的IO事件了

##1.4 maxThreads

文檔描述以下

The maximum number of request processing threads to be created by this Connector, which therefore determines the maximum number of simultaneous requests that can be handled. If not specified, this attribute is set to 200. If an executor is associated with this connector, this attribute is ignored as the connector will execute tasks using the executor rather than an internal thread pool.

這個簡單理解就算是上述worker的線程數,下面會詳細的說明。他們專門用於處理IO事件,默認是200。

#2 tomcat的NioEndpoint

上面參數僅僅是簡單瞭解了下參數配置,下面咱們就來詳細研究下tomcat的NIO服務器具體狀況,這就要詳細瞭解下tomcat的NioEndpoint實現了

先來借鑑看下tomcat高併發場景下的BUG排查中的一張圖

輸入圖片說明

這張圖勾畫出了NioEndpoint的大體執行流程圖,worker線程並無體現出來,它是做爲一個線程池不斷的執行IO讀寫事件即SocketProcessor(一個Runnable),即這裏的Poller僅僅監聽Socket的IO事件,而後封裝成一個個的SocketProcessor交給worker線程池來處理。下面咱們來詳細的介紹下NioEndpoint中的Acceptor、Poller、SocketProcessor

##2.1 Acceptor

###2.1.1 初始化過程

獲取指定的Acceptor數量的線程

protected final void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new Acceptor[count];

    for (int i = 0; i < count; i++) {
        acceptors[i] = createAcceptor();
        String threadName = getName() + "-Acceptor-" + i;
        acceptors[i].setThreadName(threadName);
        Thread t = new Thread(acceptors[i], threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}

###2.1.2 Acceptor的run方法

protected class Acceptor extends AbstractEndpoint.Acceptor {

    @Override
    public void run() {

        int errorDelay = 0;

        // Loop until we receive a shutdown command
        while (running) {

            // Loop if endpoint is paused
            while (paused && running) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }

            if (!running) {
                break;
            }
            state = AcceptorState.RUNNING;

            try {
                //if we have reached max connections, wait
                countUpOrAwaitConnection();

                SocketChannel socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket
                    socket = serverSock.accept();
                } catch (IOException ioe) {
                    //we didn't get a socket
                    countDownConnection();
                    // Introduce delay if necessary
                    errorDelay = handleExceptionWithDelay(errorDelay);
                    // re-throw
                    throw ioe;
                }
                // Successful accept, reset the error delay
                errorDelay = 0;

                // setSocketOptions() will add channel to the poller
                // if successful
                if (running && !paused) {
                    if (!setSocketOptions(socket)) {
                        countDownConnection();
                        closeSocket(socket);
                    }
                } else {
                    countDownConnection();
                    closeSocket(socket);
                }
            } catch (SocketTimeoutException sx) {
                // Ignore: Normal condition
            } catch (IOException x) {
                if (running) {
                    log.error(sm.getString("endpoint.accept.fail"), x);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.accept.fail"), t);
            }
        }
        state = AcceptorState.ENDED;
    }
}

能夠看到就是一個while循環,循環裏面不斷的accept新的鏈接。

###2.1.3 countUpOrAwaitConnection

先來看下在accept新的鏈接以前,首選進行鏈接數的自增,即countUpOrAwaitConnection

protected void countUpOrAwaitConnection() throws InterruptedException {
    if (maxConnections==-1) return;
    LimitLatch latch = connectionLimitLatch;
    if (latch!=null) latch.countUpOrAwait();
}

當咱們設置maxConnections=-1的時候就表示不用限制最大鏈接數。默認是限制10000,若是不限制則一旦出現大的衝擊,則tomcat頗有可能直接掛掉,致使服務中止。

這裏的需求就是當前鏈接數一旦超過最大鏈接數maxConnections,就直接阻塞了,一旦當前鏈接數小於最大鏈接數maxConnections,就再也不阻塞,咱們來看下這個功能的具體實現latch.countUpOrAwait()

具體看這個需求無非就是一個共享鎖,來看具體實現:

LimitLatch實現

目前實現裏算是使用了2個鎖,LimitLatch自己的AQS實現再加上AtomicLong的AQS實現。也能夠不使用AtomicLong來實現。

共享鎖的tryAcquireShared實現中,若是不依託AtomicLong,則須要進行for循環加CAS的自增,自增以後沒有超過limit這裏即maxConnections,則直接返回1表示獲取到了共享鎖,若是一旦超過limit則首先進行for循環加CAS的自減,而後返回-1表示獲取鎖失敗,便進入加入同步隊列進入阻塞狀態。

共享鎖的tryReleaseShared實現中,該方法可能會被併發執行,因此釋放共享鎖的時候也是須要for循環加CAS的自減

上述的for循環加CAS的自增、for循環加CAS的自減的實現所有被替換成了AtomicLong的incrementAndGet和decrementAndGet而已。

上文咱們關注的latch.countUpOrAwait()方法其實就是在獲取一個共享鎖,以下:

/**
 * Acquires a shared latch if one is available or waits for one if no shared
 * latch is current available.
 * [@throws](http://my.oschina.net/throws) InterruptedException If the current thread is interrupted
 */
public void countUpOrAwait() throws InterruptedException {
    if (log.isDebugEnabled()) {
        log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
    }
    sync.acquireSharedInterruptibly(1);
}

###2.1.4 鏈接的處理

從上面能夠看到在真正獲取一個鏈接以前,首先是把鏈接計數先自增了。一旦TCP三次握手成功鏈接創建,就能從ServerSocketChannel的accept方法中獲取到新的鏈接了。一旦獲取鏈接或者處理過程發生異常則須要將當前鏈接數自減的,不然會形成鏈接數虛高,即當前鏈接數並無那麼多,可是當前鏈接數卻很大,一旦超過最大鏈接數,就致使其餘請求所有阻塞,沒有辦法被ServerSocketChannel的accept處理。該bug在Tomcat7.0.26版本中出現了,詳細見這裏的一篇文章Tomcat7.0.26的鏈接數控制bug的問題排查

而後咱們來看下,一個SocketChannel鏈接被accept獲取以後如何來處理的呢?

protected boolean setSocketOptions(SocketChannel socket) {
    // Process the connection
    try {
        //disable blocking, APR style, we are gonna be polling it
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);

        NioChannel channel = nioChannels.pop();
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
        getPoller0().register(channel);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error("",t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(t);
        }
        // Tell to close the socket
        return false;
    }
    return true;
}

處理過程以下:

  • 設置非阻塞,以及其餘的一些參數如SoTimeout、ReceiveBufferSize、SendBufferSize

  • 而後將SocketChannel封裝成一個NioChannel,封裝過程使用了緩存,即避免了重複建立NioChannel對象,直接利用原有的NioChannel,並將NioChannel中的數據所有清空。也正是這個緩存也形成了一次bug,詳見斷網故障時Mtop觸發tomcat高併發場景下的BUG排查和修復(已被apache採納)

  • 選擇一個Poller進行註冊

下面就來詳細介紹下Poller

##2.2 Poller

###2.2.1 初始化過程

// Start poller threads
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
    pollers[i] = new Poller();
    Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
    pollerThread.setPriority(threadPriority);
    pollerThread.setDaemon(true);
    pollerThread.start();
}

前面沒有說到Poller的數量控制,來看下

/**
 * Poller thread count.
 */
private int pollerThreadCount = Math.min(2,Runtime.getRuntime().availableProcessors());
public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; }
public int getPollerThreadCount() { return pollerThreadCount; }

若是不設置的話最大就是2

###2.2.2 Poller註冊SocketChannel

來詳細看下getPoller0().register(channel):

public Poller getPoller0() {
    int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
    return pollers[idx];
}

就是輪訓一個Poller來進行SocketChannel的註冊

/**
 * Registers a newly created socket with the poller.
 *
 * [@param](http://my.oschina.net/u/2303379) socket    The newly created socket
 */
public void register(final NioChannel socket) {
    socket.setPoller(this);
    NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
    socket.setSocketWrapper(ka);
    ka.setPoller(this);
    ka.setReadTimeout(getSocketProperties().getSoTimeout());
    ka.setWriteTimeout(getSocketProperties().getSoTimeout());
    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    ka.setSecure(isSSLEnabled());
    ka.setReadTimeout(getSoTimeout());
    ka.setWriteTimeout(getSoTimeout());
    PollerEvent r = eventCache.pop();
    ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
    if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
    else r.reset(socket,ka,OP_REGISTER);
    addEvent(r);
}

private void addEvent(PollerEvent event) {
    events.offer(event);
    if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
}

private final SynchronizedQueue<PollerEvent> events =
            new SynchronizedQueue<>();

這裏又是進行一些參數包裝,將socket和Poller的關係綁定,再次從緩存中取出或者從新構建一個PollerEvent,而後將該event放到Poller的事件隊列中等待被異步處理

###2.2.3 Poller的run方法

在Poller的run方法中不斷處理上述事件隊列中的事件,直接執行PollerEvent的run方法,將SocketChannel註冊到本身的Selector上。

public boolean events() {
    boolean result = false;

    PollerEvent pe = null;
    while ( (pe = events.poll()) != null ) {
        result = true;
        try {
            pe.run();
            pe.reset();
            if (running && !paused) {
                eventCache.push(pe);
            }
        } catch ( Throwable x ) {
            log.error("",x);
        }
    }

    return result;
}

並將Selector監聽到的IO讀寫事件封裝成SocketProcessor,交給線程池執行

SocketProcessor sc = processorCache.pop();
if ( sc == null ) sc = new SocketProcessor(attachment, status);
else sc.reset(attachment, status);
Executor executor = getExecutor();
if (dispatch && executor != null) {
    executor.execute(sc);
} else {
    sc.run();
}

咱們來看看這個線程池的初始化:

public void createExecutor() {
    internalExecutor = true;
    TaskQueue taskqueue = new TaskQueue();
    TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
    taskqueue.setParent( (ThreadPoolExecutor) executor);
}

就是建立了一個ThreadPoolExecutor,那咱們就重點關注下核心線程數、最大線程數、任務隊列等信息

private int minSpareThreads = 10;
public int getMinSpareThreads() {
    return Math.min(minSpareThreads,getMaxThreads());
}

核心線程數最大是10個,再來看下最大線程數

private int maxThreads = 200;

默認就是上面的配置參數maxThreads爲200。還有就是TaskQueue,這裏的TaskQueue是LinkedBlockingQueue<Runnable>的子類,最大容量就是Integer.MAX_VALUE,根據以前ThreadPoolExecutor的源碼分析,核心線程數滿了以後,會先將任務放到隊列中,隊列滿了纔會建立出新的非核心線程,若是隊列是一個大容量的話,也就是不會到建立新的非核心線程那一步了。

可是這裏的TaskQueue修改了底層offer的實現

public boolean offer(Runnable o) {
  //we can't do any checks
    if (parent==null) return super.offer(o);
    //we are maxed out on threads, simply queue the object
    if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
    //we have idle threads, just add it to the queue
    if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
    //if we have less threads than maximum force creation of a new thread
    if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
    //if we reached here, we need to add it to the queue
    return super.offer(o);
}

這裏當線程數小於最大線程數的時候就直接返回false即入隊列失敗,則迫使ThreadPoolExecutor建立出新的非核心線程。

TaskQueue這一塊沒太看懂它的意圖是什麼,有待繼續研究。

#3 結束語

本篇文章描述了tomcat8.5中的NIO線程模型,以及其中涉及到的相關參數的設置。下一篇簡單整理下tomcat的總體架構圖

相關文章
相關標籤/搜索