在上一節中咱們描述了Tomcat的總體架構,咱們知道了Tomcat分爲兩個大組件,一個鏈接器和一個容器。而咱們此次要講的EndPoint
的組件就是屬於鏈接器裏面的。它是一個通訊的端點,就是負責對外實現TCP/IP協議。EndPoint
是個接口,它的具體實現類就是AbstractEndpoint
,而AbstractEndpoint
具體的實現類就有AprEndpoint
、Nio2Endpoint
、NioEndpoint
。java
AprEndpoint
:對應的是APR模式,簡單理解就是從操做系統級別解決異步IO的問題,大幅度提升服務器的處理和響應性能。可是啓用這種模式須要安裝一些其餘的依賴庫。Nio2Endpoint
:利用代碼來實現異步IONioEndpoint
:利用了JAVA的NIO實現了非阻塞IO,Tomcat默認啓動是以這個來啓動的,而這個也是咱們的講述重點。咱們知道NioEndpoint
的原理仍是對於Linux的多路複用器的使用,而在多路複用器中簡單來講就兩個步驟。編程
而NioEndpoint
爲了實現上面這兩步,用了五個組件來。這五個組件是LimitLatch
、Acceptor
、Poller
、SocketProcessor
、Executor
服務器
/** * Threads used to accept new connections and pass them to worker threads. */ protected List<Acceptor<U>> acceptors; /** * counter for nr of connections handled by an endpoint */ private volatile LimitLatch connectionLimitLatch = null; /** * The socket pollers. */ private Poller[] pollers = null; 內部類 SocketProcessor /** * External Executor based thread pool. */ private Executor executor = null;
咱們能夠看到在代碼中定義的這五個組件。具體這五個組件是幹嗎的呢?架構
LimitLatch
:鏈接控制器,負責控制最大的鏈接數Acceptor
:負責接收新的鏈接,而後返回一個Channel
對象給Poller
Poller
:能夠將其當作是NIO中Selector
,負責監控Channel
的狀態SocketProcessor
:能夠當作是一個被封裝的任務類Executor
:Tomcat本身擴展的線程池,用來執行任務類用圖簡單表示就是如下的關係app
接下來咱們就來分別的看一下每一個組件裏面關鍵的代碼框架
咱們上面說了LimitLatch
主要是用來控制Tomcat所能接收的最大數量鏈接,若是超過了此鏈接,那麼Tomcat就會將此鏈接線程阻塞等待,等裏面有其餘鏈接釋放了再消費此鏈接。那麼LimitLatch
是如何作到呢?咱們能夠看LimitLatch
這個類異步
public class LimitLatch { private static final Log log = LogFactory.getLog(LimitLatch.class); private class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1L; public Sync() { } @Override protected int tryAcquireShared(int ignored) { long newCount = count.incrementAndGet(); if (!released && newCount > limit) { // Limit exceeded count.decrementAndGet(); return -1; } else { return 1; } } @Override protected boolean tryReleaseShared(int arg) { count.decrementAndGet(); return true; } } private final Sync sync; //當前鏈接數 private final AtomicLong count; //最大鏈接數 private volatile long limit; private volatile boolean released = false; }
咱們能夠看到它內部實現了AbstractQueuedSynchronizer
,AQS其實就是一個框架,實現它的類能夠自定義控制線程何時掛起何時釋放。limit
參數就是控制的最大鏈接數。咱們能夠看到AbstractEndpoint
調用LimitLatch
的countUpOrAwait
方法來判斷是否能獲取鏈接。socket
public void countUpOrAwait() throws InterruptedException { if (log.isDebugEnabled()) { log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount()); } sync.acquireSharedInterruptibly(1); }
AQS是如何知道何時阻塞線程呢?即不能獲取鏈接呢?這些就靠用戶本身實現AbstractQueuedSynchronizer
本身來定義何時獲取鏈接,何時釋放鏈接了。能夠看到Sync類重寫了tryAcquireShared
和tryReleaseShared
方法。在tryAcquireShared
方法中定義了一旦當前鏈接數大於了設置的最大鏈接數,那麼就會返回-1
表示將此線程放入AQS隊列中等待。ide
Acceptor
是接收鏈接的,咱們能夠看到Acceptor
實現了Runnable
接口,那麼在哪會新開啓線程來執行Acceptor
的run方法呢?在AbstractEndpoint
的startAcceptorThreads
方法中。模塊化
protected void startAcceptorThreads() { int count = getAcceptorThreadCount(); acceptors = new ArrayList<>(count); for (int i = 0; i < count; i++) { Acceptor<U> acceptor = new Acceptor<>(this); String threadName = getName() + "-Acceptor-" + i; acceptor.setThreadName(threadName); acceptors.add(acceptor); Thread t = new Thread(acceptor, threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); } }
能夠看到這裏能夠設置開啓幾個Acceptor
,默認是一個。而一個端口只能對應一個ServerSocketChannel
,那麼這個ServerSocketChannel
在哪初始化呢?咱們能夠看到在 Acceptor<U> acceptor = new Acceptor<>(this);
這句話中傳入了this進去,那麼應該是由Endpoint
組件初始化的鏈接。在NioEndpoint
的initServerSocket
方法中初始化了鏈接。
// Separated out to make it easier for folks that extend NioEndpoint to // implement custom [server]sockets protected void initServerSocket() throws Exception { if (!getUseInheritedChannel()) { serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset()); serverSock.socket().bind(addr,getAcceptCount()); } else { // Retrieve the channel provided by the OS Channel ic = System.inheritedChannel(); if (ic instanceof ServerSocketChannel) { serverSock = (ServerSocketChannel) ic; } if (serverSock == null) { throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited")); } } serverSock.configureBlocking(true); //mimic APR behavior }
這裏面咱們可以看到兩點
ServerSocketChannel
被設置成了阻塞的模式,也就是說是以阻塞方式接受鏈接的。或許會有疑問。在平時的NIO編程中Channel不是都要設置成非阻塞模式嗎?這裏解釋一下,若是是設置成非阻塞模式那麼就必須設置一個Selector
不斷的輪詢,可是接受鏈接只須要阻塞一個通道便可。這裏須要注意一點,每一個Acceptor
在生成PollerEvent
對象放入Poller
隊列中時都是隨機取出Poller
對象的,具體代碼能夠看以下,因此Poller
中的Queue
對象設置成了SynchronizedQueue<PollerEvent>
,由於可能有多個Acceptor
同時向此Poller
的隊列中放入PollerEvent
對象。
public Poller getPoller0() { if (pollerThreadCount == 1) { return pollers[0]; } else { int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length; return pollers[idx]; } }
什麼是操做系統級別的鏈接呢?在TCP的三次握手中,系統一般會每個LISTEN狀態的Socket維護兩個隊列,一個是半鏈接隊列(SYN):這些鏈接已經收到客戶端SYN;另外一個是全鏈接隊列(ACCEPT):這些連接已經收到客戶端的ACK,完成了三次握手,等待被應用調用accept方法取走使用。
全部的Acceptor
共用這一個鏈接,在Acceptor
的run
方法中,放一些重要的代碼。
@Override public void run() { // Loop until we receive a shutdown command while (endpoint.isRunning()) { try { //若是到了最大鏈接數,線程等待 endpoint.countUpOrAwaitConnection(); U socket = null; try { //調用accept方法得到一個鏈接 socket = endpoint.serverSocketAccept(); } catch (Exception ioe) { // 出異常之後當前鏈接數減掉1 endpoint.countDownConnection(); } // 配置Socket if (endpoint.isRunning() && !endpoint.isPaused()) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful if (!endpoint.setSocketOptions(socket)) { endpoint.closeSocket(socket); } } else { endpoint.destroySocket(socket); } }
裏面咱們能夠獲得兩點
LimitLatch
組件判斷的。endpoint.setSocketOptions(socket)
這段代碼protected boolean setSocketOptions(SocketChannel socket) { // Process the connection try { // 設置Socket爲非阻塞模式,供Poller調用 socket.configureBlocking(false); Socket sock = socket.socket(); socketProperties.setProperties(sock); NioChannel channel = null; if (nioChannels != null) { channel = nioChannels.pop(); } if (channel == null) { SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); if (isSSLEnabled()) { channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); } else { channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); channel.reset(); } //註冊ChannelEvent,實際上是將ChannelEvent放入到隊列中,而後Poller從隊列中取 getPoller0().register(channel); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error(sm.getString("endpoint.socketOptionsError"), t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } // Tell to close the socket return false; } return true; }
其實裏面重要的就是將Acceptor
與一個Poller
綁定起來,而後兩個組件經過隊列通訊,每一個Poller都維護着一個SynchronizedQueue
隊列,ChannelEvent
放入到隊列中,而後Poller
從隊列中取出事件進行消費。
咱們能夠看到Poller
是NioEndpoint
的內部類,而它也是實現了Runnable
接口,能夠看到在其類中維護了一個Quene和Selector,定義以下。因此本質上Poller
就是Selector
。
private Selector selector; private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();
重點在其run方法中,這裏刪減了一些代碼,只展現重要的。
@Override public void run() { // Loop until destroy() is called while (true) { boolean hasEvents = false; try { if (!close) { //查看是否有鏈接進來,若是有就將Channel註冊進Selector中 hasEvents = events(); } if (close) { events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe); } break; } } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error(sm.getString("endpoint.nio.selectorLoopError"), x); continue; } if (keyCount == 0) { hasEvents = (hasEvents | events()); } Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (socketWrapper == null) { iterator.remove(); } else { iterator.remove(); processKey(sk, socketWrapper); } } // Process timeouts timeout(keyCount,hasEvents); } getStopLatch().countDown(); }
其中主要的就是調用了events()
方法,就是不斷的查看隊列中是否有Pollerevent
事件,若是有的話就將其取出而後把裏面的Channel
取出來註冊到該Selector
中,而後不斷輪詢全部註冊過的Channel
查看是否有事件發生。
咱們知道Poller
在輪詢Channel
有事件發生時,就會調用將此事件封裝起來,而後交給線程池去執行。那麼這個包裝類就是SocketProcessor
。而咱們打開此類,可以看到它也實現了Runnable
接口,用來定義線程池Executor
中線程所執行的任務。那麼這裏是如何將Channel
中的字節流轉換爲Tomcat須要的ServletRequest
對象呢?其實就是調用了Http11Processor
來進行字節流與對象的轉換的。
Executor
實際上是Tomcat定製版的線程池。咱們能夠看它的類的定義,能夠發現它實際上是擴展了Java的線程池。
public interface Executor extends java.util.concurrent.Executor, Lifecycle
在線程池中最重要的兩個參數就是核心線程數和最大線程數,正常的Java線程池的執行流程是這樣的。
可是在Tomcat自定義的線程池中是不同的,經過重寫了execute
方法實現了本身的任務處理邏輯。
差異就在於第四步的差異,原生線程池的處理策略是隻要當前線程數大於最大線程數,那麼就拋異常,而Tomcat的則是若是當前線程數大於最大線程數,就再嘗試一次,若是仍是滿的纔會拋異常。下面是定製化線程池execute
的執行邏輯。
public void execute(Runnable command, long timeout, TimeUnit unit) { submittedCount.incrementAndGet(); try { super.execute(command); } catch (RejectedExecutionException rx) { if (super.getQueue() instanceof TaskQueue) { //得到任務隊列 final TaskQueue queue = (TaskQueue)super.getQueue(); try { if (!queue.force(command, timeout, unit)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull")); } } catch (InterruptedException x) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(x); } } else { submittedCount.decrementAndGet(); throw rx; } } }
在代碼中,咱們能夠看到有這麼一句submittedCount.incrementAndGet();
,爲何會有這句呢?咱們能夠看看這個參數的定義。簡單來講這個參數就是定義了任務已經提交到了線程池中,可是尚未執行的任務個數。
/** * The number of tasks submitted but not yet finished. This includes tasks * in the queue and tasks that have been handed to a worker thread but the * latter did not start executing the task yet. * This number is always greater or equal to {@link #getActiveCount()}. */ private final AtomicInteger submittedCount = new AtomicInteger(0);
爲何會有這麼一個參數呢?咱們知道定製的隊列是繼承了LinkedBlockingQueue
,而LinkedBlockingQueue
隊列默認是沒有邊界的。因而咱們就傳入了一個參數,maxQueueSize
給構造的隊列。可是在Tomcat的任務隊列默認狀況下是無限制的,那麼這樣就會出一個問題,若是當前線程達到了核心線程數,則開始向隊列中添加任務,那麼就會一直是添加成功的。那麼就不會再建立新的線程。那麼在什麼狀況下要新建線程呢?
線程池中建立新線程會有兩個地方,一個是小於核心線程時,來一個任務建立一個線程。另外一個是超過核心線程而且任務隊列已滿,則會建立臨時線程。
那麼如何規定任務隊列是否已滿呢?若是設置了隊列的最大長度固然好了,可是Tomcat默認狀況下是沒有設置,因此默認是無限的。因此Tomcat的TaskQueue
繼承了LinkedBlockingQueue
,重寫了offer
方法,在裏面定義了何時返回false。
@Override public boolean offer(Runnable o) { if (parent==null) return super.offer(o); //若是當前線程數等於最大線程數,此時不能建立新線程,只能添加進任務隊列中 if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); //若是已提交可是未完成的任務數小於等於當前線程數,說明能處理過來,就放入隊列中 if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o); //到這一步說明,已提交可是未完成的任務數大於當前線程數,若是當前線程數小於最大線程數,就返回false新建線程 if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false; return super.offer(o); }
這就是submittedCount
的意義,目的就是爲了在任務隊列長度無限的狀況下,讓線程池有機會建立新的線程。
上面的知識有部分是看着李號雙老師的深刻拆解Tomcat總結的,又結合着源碼深刻了解了一下,當時剛看文章的時候以爲本身都懂了,可是再深刻源碼的時候又會發現本身不懂。因此知識若是隻是看了而不運用,那麼知識永遠都不會是本身的。經過Tomcat鏈接器這一小塊的源碼學習,除了一些經常使用知識的實際運用,例如AQS、鎖的應用、自定義線程池須要考慮的點、NIO的應用等等。還有整體上的設計思惟的學習,模塊化設計,和現在的微服務感受很類似,將一個功能點內部分爲多種模塊,這樣不管是在之後替換或者是升級時都能遊刃有餘。