在上一節中咱們描述了Tomcat的總體架構,咱們知道了Tomcat分爲兩個大組件,一個鏈接器和一個容器。而咱們此次要講的EndPoint
的組件就是屬於鏈接器裏面的。它是一個通訊的端點,就是負責對外實現TCP/IP協議。EndPoint
是個接口,它的具體實現類就是AbstractEndpoint
,而AbstractEndpoint
具體的實現類就有AprEndpoint
、Nio2Endpoint
、NioEndpoint
。java
AprEndpoint
:對應的是APR模式,簡單理解就是從操做系統級別解決異步IO的問題,大幅度提升服務器的處理和響應性能。可是啓用這種模式須要安裝一些其餘的依賴庫。Nio2Endpoint
:利用代碼來實現異步IONioEndpoint
:利用了JAVA的NIO實現了非阻塞IO,Tomcat默認啓動是以這個來啓動的,而這個也是咱們的講述重點。咱們知道NioEndpoint
的原理仍是對於Linux的多路複用器的使用,而在多路複用器中簡單來講就兩個步驟。編程
而NioEndpoint
爲了實現上面這兩步,用了五個組件來。這五個組件是LimitLatch
、Acceptor
、Poller
、SocketProcessor
、Executor
bash
/**
* Threads used to accept new connections and pass them to worker threads.
*/
protected List<Acceptor<U>> acceptors;
/**
* counter for nr of connections handled by an endpoint
*/
private volatile LimitLatch connectionLimitLatch = null;
/**
* The socket pollers.
*/
private Poller[] pollers = null;
內部類
SocketProcessor
/**
* External Executor based thread pool.
*/
private Executor executor = null;
複製代碼
咱們能夠看到在代碼中定義的這五個組件。具體這五個組件是幹嗎的呢?服務器
LimitLatch
:鏈接控制器,負責控制最大的鏈接數Acceptor
:負責接收新的鏈接,而後返回一個Channel
對象給Poller
Poller
:能夠將其當作是NIO中Selector
,負責監控Channel
的狀態SocketProcessor
:能夠當作是一個被封裝的任務類Executor
:Tomcat本身擴展的線程池,用來執行任務類用圖簡單表示就是如下的關係架構
接下來咱們就來分別的看一下每一個組件裏面關鍵的代碼app
咱們上面說了LimitLatch
主要是用來控制Tomcat所能接收的最大數量鏈接,若是超過了此鏈接,那麼Tomcat就會將此鏈接線程阻塞等待,等裏面有其餘鏈接釋放了再消費此鏈接。那麼LimitLatch
是如何作到呢?咱們能夠看LimitLatch
這個類框架
public class LimitLatch {
private static final Log log = LogFactory.getLog(LimitLatch.class);
private class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
public Sync() {
}
@Override
protected int tryAcquireShared(int ignored) {
long newCount = count.incrementAndGet();
if (!released && newCount > limit) {
// Limit exceeded
count.decrementAndGet();
return -1;
} else {
return 1;
}
}
@Override
protected boolean tryReleaseShared(int arg) {
count.decrementAndGet();
return true;
}
}
private final Sync sync;
//當前鏈接數
private final AtomicLong count;
//最大鏈接數
private volatile long limit;
private volatile boolean released = false;
}
複製代碼
咱們能夠看到它內部實現了AbstractQueuedSynchronizer
,AQS其實就是一個框架,實現它的類能夠自定義控制線程何時掛起何時釋放。limit
參數就是控制的最大鏈接數。咱們能夠看到AbstractEndpoint
調用LimitLatch
的countUpOrAwait
方法來判斷是否能獲取鏈接。異步
public void countUpOrAwait() throws InterruptedException {
if (log.isDebugEnabled()) {
log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
}
sync.acquireSharedInterruptibly(1);
}
複製代碼
AQS是如何知道何時阻塞線程呢?即不能獲取鏈接呢?這些就靠用戶本身實現AbstractQueuedSynchronizer
本身來定義何時獲取鏈接,何時釋放鏈接了。能夠看到Sync類重寫了tryAcquireShared
和tryReleaseShared
方法。在tryAcquireShared
方法中定義了一旦當前鏈接數大於了設置的最大鏈接數,那麼就會返回-1
表示將此線程放入AQS隊列中等待。socket
Acceptor
是接收鏈接的,咱們能夠看到Acceptor
實現了Runnable
接口,那麼在哪會新開啓線程來執行Acceptor
的run方法呢?在AbstractEndpoint
的startAcceptorThreads
方法中。ide
protected void startAcceptorThreads() {
int count = getAcceptorThreadCount();
acceptors = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
Acceptor<U> acceptor = new Acceptor<>(this);
String threadName = getName() + "-Acceptor-" + i;
acceptor.setThreadName(threadName);
acceptors.add(acceptor);
Thread t = new Thread(acceptor, threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}
複製代碼
能夠看到這裏能夠設置開啓幾個Acceptor
,默認是一個。而一個端口只能對應一個ServerSocketChannel
,那麼這個ServerSocketChannel
在哪初始化呢?咱們能夠看到在Acceptor<U> acceptor = new Acceptor<>(this);
這句話中傳入了this進去,那麼應該是由Endpoint
組件初始化的鏈接。在NioEndpoint
的initServerSocket
方法中初始化了鏈接。
// Separated out to make it easier for folks that extend NioEndpoint to
// implement custom [server]sockets
protected void initServerSocket() throws Exception {
if (!getUseInheritedChannel()) {
serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
serverSock.socket().bind(addr,getAcceptCount());
} else {
// Retrieve the channel provided by the OS
Channel ic = System.inheritedChannel();
if (ic instanceof ServerSocketChannel) {
serverSock = (ServerSocketChannel) ic;
}
if (serverSock == null) {
throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
}
}
serverSock.configureBlocking(true); //mimic APR behavior
}
複製代碼
這裏面咱們可以看到兩點
ServerSocketChannel
被設置成了阻塞的模式,也就是說是以阻塞方式接受鏈接的。或許會有疑問。在平時的NIO編程中Channel不是都要設置成非阻塞模式嗎?這裏解釋一下,若是是設置成非阻塞模式那麼就必須設置一個Selector
不斷的輪詢,可是接受鏈接只須要阻塞一個通道便可。這裏須要注意一點,每一個Acceptor
在生成PollerEvent
對象放入Poller
隊列中時都是隨機取出Poller
對象的,具體代碼能夠看以下,因此Poller
中的Queue
對象設置成了SynchronizedQueue<PollerEvent>
,由於可能有多個Acceptor
同時向此Poller
的隊列中放入PollerEvent
對象。
public Poller getPoller0() {
if (pollerThreadCount == 1) {
return pollers[0];
} else {
int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
return pollers[idx];
}
}
複製代碼
什麼是操做系統級別的鏈接呢?在TCP的三次握手中,系統一般會每個LISTEN狀態的Socket維護兩個隊列,一個是半鏈接隊列(SYN):這些鏈接已經收到客戶端SYN;另外一個是全鏈接隊列(ACCEPT):這些連接已經收到客戶端的ACK,完成了三次握手,等待被應用調用accept方法取走使用。
全部的Acceptor
共用這一個鏈接,在Acceptor
的run
方法中,放一些重要的代碼。
@Override
public void run() {
// Loop until we receive a shutdown command
while (endpoint.isRunning()) {
try {
//若是到了最大鏈接數,線程等待
endpoint.countUpOrAwaitConnection();
U socket = null;
try {
//調用accept方法得到一個鏈接
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
// 出異常之後當前鏈接數減掉1
endpoint.countDownConnection();
}
// 配置Socket
if (endpoint.isRunning() && !endpoint.isPaused()) {
// setSocketOptions() will hand the socket off to
// an appropriate processor if successful
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
}
複製代碼
裏面咱們能夠獲得兩點
LimitLatch
組件判斷的。endpoint.setSocketOptions(socket)
這段代碼protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
// 設置Socket爲非阻塞模式,供Poller調用
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
NioChannel channel = null;
if (nioChannels != null) {
channel = nioChannels.pop();
}
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
channel.reset();
}
//註冊ChannelEvent,實際上是將ChannelEvent放入到隊列中,而後Poller從隊列中取
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error(sm.getString("endpoint.socketOptionsError"), t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}
複製代碼
其實裏面重要的就是將Acceptor
與一個Poller
綁定起來,而後兩個組件經過隊列通訊,每一個Poller都維護着一個SynchronizedQueue
隊列,ChannelEvent
放入到隊列中,而後Poller
從隊列中取出事件進行消費。
咱們能夠看到Poller
是NioEndpoint
的內部類,而它也是實現了Runnable
接口,能夠看到在其類中維護了一個Quene和Selector,定義以下。因此本質上Poller
就是Selector
。
private Selector selector;
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();
複製代碼
重點在其run方法中,這裏刪減了一些代碼,只展現重要的。
@Override
public void run() {
// Loop until destroy() is called
while (true) {
boolean hasEvents = false;
try {
if (!close) {
//查看是否有鏈接進來,若是有就將Channel註冊進Selector中
hasEvents = events();
}
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
continue;
}
if (keyCount == 0) {
hasEvents = (hasEvents | events());
}
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (socketWrapper == null) {
iterator.remove();
} else {
iterator.remove();
processKey(sk, socketWrapper);
}
}
// Process timeouts
timeout(keyCount,hasEvents);
}
getStopLatch().countDown();
}
複製代碼
其中主要的就是調用了events()
方法,就是不斷的查看隊列中是否有Pollerevent
事件,若是有的話就將其取出而後把裏面的Channel
取出來註冊到該Selector
中,而後不斷輪詢全部註冊過的Channel
查看是否有事件發生。
咱們知道Poller
在輪詢Channel
有事件發生時,就會調用將此事件封裝起來,而後交給線程池去執行。那麼這個包裝類就是SocketProcessor
。而咱們打開此類,可以看到它也實現了Runnable
接口,用來定義線程池Executor
中線程所執行的任務。那麼這裏是如何將Channel
中的字節流轉換爲Tomcat須要的ServletRequest
對象呢?其實就是調用了Http11Processor
來進行字節流與對象的轉換的。
Executor
實際上是Tomcat定製版的線程池。咱們能夠看它的類的定義,能夠發現它實際上是擴展了Java的線程池。
public interface Executor extends java.util.concurrent.Executor, Lifecycle
複製代碼
在線程池中最重要的兩個參數就是核心線程數和最大線程數,正常的Java線程池的執行流程是這樣的。
可是在Tomcat自定義的線程池中是不同的,經過重寫了execute
方法實現了本身的任務處理邏輯。
差異就在於第四步的差異,原生線程池的處理策略是隻要當前線程數大於最大線程數,那麼就拋異常,而Tomcat的則是若是當前線程數大於最大線程數,就再嘗試一次,若是仍是滿的纔會拋異常。下面是定製化線程池execute
的執行邏輯。
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
if (super.getQueue() instanceof TaskQueue) {
//得到任務隊列
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
複製代碼
在代碼中,咱們能夠看到有這麼一句submittedCount.incrementAndGet();
,爲何會有這句呢?咱們能夠看看這個參數的定義。簡單來講這個參數就是定義了任務已經提交到了線程池中,可是尚未執行的任務個數。
/**
* The number of tasks submitted but not yet finished. This includes tasks
* in the queue and tasks that have been handed to a worker thread but the
* latter did not start executing the task yet.
* This number is always greater or equal to {@link #getActiveCount()}.
*/
private final AtomicInteger submittedCount = new AtomicInteger(0);
複製代碼
爲何會有這麼一個參數呢?咱們知道定製的隊列是繼承了LinkedBlockingQueue
,而LinkedBlockingQueue
隊列默認是沒有邊界的。因而咱們就傳入了一個參數,maxQueueSize
給構造的隊列。可是在Tomcat的任務隊列默認狀況下是無限制的,那麼這樣就會出一個問題,若是當前線程達到了核心線程數,則開始向隊列中添加任務,那麼就會一直是添加成功的。那麼就不會再建立新的線程。那麼在什麼狀況下要新建線程呢?
線程池中建立新線程會有兩個地方,一個是小於核心線程時,來一個任務建立一個線程。另外一個是超過核心線程而且任務隊列已滿,則會建立臨時線程。
那麼如何規定任務隊列是否已滿呢?若是設置了隊列的最大長度固然好了,可是Tomcat默認狀況下是沒有設置,因此默認是無限的。因此Tomcat的TaskQueue
繼承了LinkedBlockingQueue
,重寫了offer
方法,在裏面定義了何時返回false。
@Override
public boolean offer(Runnable o) {
if (parent==null) return super.offer(o);
//若是當前線程數等於最大線程數,此時不能建立新線程,只能添加進任務隊列中
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//若是已提交可是未完成的任務數小於等於當前線程數,說明能處理過來,就放入隊列中
if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
//到這一步說明,已提交可是未完成的任務數大於當前線程數,若是當前線程數小於最大線程數,就返回false新建線程
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
return super.offer(o);
}
複製代碼
這就是submittedCount
的意義,目的就是爲了在任務隊列長度無限的狀況下,讓線程池有機會建立新的線程。
上面的知識有部分是看着李號雙老師的深刻拆解Tomcat總結的,又結合着源碼深刻了解了一下,當時剛看文章的時候以爲本身都懂了,可是再深刻源碼的時候又會發現本身不懂。因此知識若是隻是看了而不運用,那麼知識永遠都不會是本身的。經過Tomcat鏈接器這一小塊的源碼學習,除了一些經常使用知識的實際運用,例如AQS、鎖的應用、自定義線程池須要考慮的點、NIO的應用等等。還有整體上的設計思惟的學習,模塊化設計,和現在的微服務感受很類似,將一個功能點內部分爲多種模塊,這樣不管是在之後替換或者是升級時都能遊刃有餘。