Preface
The previous article covered ProtocolHandler and its default implementation class, Http11NioProtocol. The Http11NioProtocol constructor creates a NioEndpoint object, and the most important step in Http11NioProtocol's init and start methods is calling the init and start methods of that NioEndpoint object. NioEndpoint extends AbstractJsseEndpoint, which in turn extends AbstractEndpoint.
1. AbstractEndpoint#init
NioEndpoint's init method is defined in AbstractEndpoint, the parent of its parent class.
    private boolean bindOnInit = true;

    public final void init() throws Exception {
        if (bindOnInit) {
            bindWithCleanup();
            bindState = BindState.BOUND_ON_INIT;
        }
        if (this.domain != null) {
            // Register endpoint (as ThreadPool - historical name)
            oname = new ObjectName(domain + ":type=ThreadPool,name=\"" + getName() + "\"");
            Registry.getRegistry(null, null).registerComponent(this, oname, null);

            ObjectName socketPropertiesOname = new ObjectName(domain +
                    ":type=ThreadPool,name=\"" + getName() + "\",subType=SocketProperties");
            socketProperties.setObjectName(socketPropertiesOname);
            Registry.getRegistry(null, null).registerComponent(socketProperties, socketPropertiesOname, null);

            for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
                registerJmx(sslHostConfig);
            }
        }
    }

    private void bindWithCleanup() throws Exception {
        try {
            bind();
        } catch (Throwable t) {
            // Ensure open sockets etc. are cleaned up if something goes
            // wrong during bind
            ExceptionUtils.handleThrowable(t);
            unbind();
            throw t;
        }
    }
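init first calls bindWithCleanup() (bindOnInit is true by default). Then, if domain is not null, it registers the endpoint and its socketProperties as JMX components and calls registerJmx(sslHostConfig) for each SSLHostConfig returned by findSslHostConfigs(); registerJmx registers the sslHostConfig with the MBeanServer.
bindWithCleanup() simply calls bind(), and calls unbind() before rethrowing if bind() fails. bind() is an abstract method; its implementation is in the NioEndpoint class.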
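The init, bindWithCleanup, bind relationship described above is a template-method arrangement with cleanup on failure. The following is a minimal sketch of that shape only; SketchEndpoint, SketchNioEndpoint, and the calls list are hypothetical names made up for illustration and are not Tomcat classes.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for AbstractEndpoint: init() drives bind(), and any
// failure in bind() triggers unbind() before the error is rethrown.
abstract class SketchEndpoint {
    final List<String> calls = new ArrayList<>(); // records the call order for the demo
    private boolean bound = false;

    public final void init() throws Exception {
        try {
            bind();
            bound = true;
        } catch (Exception e) {
            unbind(); // release anything bind() managed to open
            throw e;
        }
    }

    boolean isBound() {
        return bound;
    }

    protected abstract void bind() throws Exception;

    protected abstract void unbind() throws Exception;
}

// Hypothetical subclass whose bind() can be made to fail on demand.
class SketchNioEndpoint extends SketchEndpoint {
    private final boolean failOnBind;

    SketchNioEndpoint(boolean failOnBind) {
        this.failOnBind = failOnBind;
    }

    @Override
    protected void bind() throws Exception {
        calls.add("bind");
        if (failOnBind) {
            throw new IOException("simulated bind failure");
        }
    }

    @Override
    protected void unbind() {
        calls.add("unbind");
    }
}

class BindWithCleanupDemo {
    public static void main(String[] args) throws Exception {
        SketchEndpoint ok = new SketchNioEndpoint(false);
        ok.init();
        System.out.println(ok.calls + " bound=" + ok.isBound());

        SketchEndpoint bad = new SketchNioEndpoint(true);
        try {
            bad.init();
        } catch (Exception e) {
            System.out.println(bad.calls + " bound=" + bad.isBound());
        }
    }
}
```

Keeping init final in the base class means subclasses can only customize bind and unbind, so the cleanup step cannot be skipped by accident.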
2. NioEndpoint#bind
    protected int acceptorThreadCount = 1;

    private int pollerThreadCount = Math.min(2,Runtime.getRuntime().availableProcessors());

    /**
     * Initialize the endpoint.
     */
    @Override
    public void bind() throws Exception {
        initServerSocket();

        // Initialize thread count defaults for acceptor, poller
        if (acceptorThreadCount == 0) {
            // FIXME: Doesn't seem to work that well with multiple accept threads
            acceptorThreadCount = 1;
        }
        if (pollerThreadCount <= 0) {
            //minimum one poller thread
            pollerThreadCount = 1;
        }
        setStopLatch(new CountDownLatch(pollerThreadCount));

        // Initialize SSL if needed
        initialiseSsl();

        selectorPool.open();
    }

    protected void setStopLatch(CountDownLatch stopLatch) {
        this.stopLatch = stopLatch;
    }
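bind first calls initServerSocket and then applies defaults to two fields, acceptorThreadCount and pollerThreadCount. The first is the number of Acceptor threads and the second is the number of Poller threads. acceptorThreadCount defaults to 1, and pollerThreadCount defaults to Math.min(2, Runtime.getRuntime().availableProcessors()).
The Acceptor and Poller threads make up Tomcat's threading model.
Next, bind creates a CountDownLatch sized to pollerThreadCount and assigns it to the stopLatch field.
It then calls initialiseSsl() to initialize the SSL implementation class.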
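The article does not show where stopLatch is consumed. A latch sized to the number of poller threads is typically used so that shutdown can wait until every poller has exited, and the sketch below illustrates that idea under that assumption. StopLatchSketch and runAndAwait are hypothetical names, and the worker bodies are empty placeholders.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class StopLatchSketch {
    // Starts pollerCount short-lived workers and reports whether all of them
    // signalled completion within the timeout.
    static boolean runAndAwait(int pollerCount, long timeoutMillis) throws InterruptedException {
        CountDownLatch stopLatch = new CountDownLatch(pollerCount);
        for (int i = 0; i < pollerCount; i++) {
            Thread t = new Thread(() -> {
                try {
                    // a real poller would loop on a Selector here
                } finally {
                    stopLatch.countDown(); // one count per poller thread
                }
            }, "sketch-poller-" + i);
            t.setDaemon(true);
            t.start();
        }
        // The stopping thread waits until every worker has counted down.
        return stopLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("all pollers stopped: " + runAndAwait(2, 1000));
    }
}
```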
2.1 initialiseSsl
    private String sslImplementationName = null;

    private SSLImplementation sslImplementation = null;

    public String getSslImplementationName() {
        return sslImplementationName;
    }

    protected void initialiseSsl() throws Exception {
        if (isSSLEnabled()) {
            sslImplementation = SSLImplementation.getInstance(getSslImplementationName());

            for (SSLHostConfig sslHostConfig : sslHostConfigs.values()) {
                sslHostConfig.setConfigType(getSslConfigType());
                createSSLContext(sslHostConfig);
            }

            // Validate default SSLHostConfigName
            if (sslHostConfigs.get(getDefaultSSLHostConfigName()) == null) {
                throw new IllegalArgumentException(sm.getString("endpoint.noSslHostConfig",
                        getDefaultSSLHostConfigName(), getName()));
            }
        }
    }
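When SSL is enabled, initialiseSsl() obtains an SSLImplementation instance and assigns it to the sslImplementation field, then creates an SSLContext for each configured SSLHostConfig.
SSLImplementation is an abstract class. In Tomcat its implementations are JSSEImplementation and OpenSSLImplementation, and JSSEImplementation is the default.
As the code shows, the most important step in bind() is the call to initServerSocket().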
2.2 initServerSocket
    /**
     * Server socket "pointer".
     */
    private volatile ServerSocketChannel serverSock = null;

    /**
     * Allows the server developer to specify the acceptCount (backlog) that
     * should be used for server sockets. By default, this value
     * is 100.
     */
    private int acceptCount = 100;

    public int getAcceptCount() {
        return acceptCount;
    }

    /**
     * Use System.inheritableChannel to obtain channel from stdin/stdout.
     */
    private boolean useInheritedChannel = false;

    public boolean getUseInheritedChannel() {
        return useInheritedChannel;
    }

    // Separated out to make it easier for folks that extend NioEndpoint to
    // implement custom [server]sockets
    protected void initServerSocket() throws Exception {
        if (!getUseInheritedChannel()) {
            serverSock = ServerSocketChannel.open();
            socketProperties.setProperties(serverSock.socket());
            InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
            serverSock.socket().bind(addr,getAcceptCount());
        } else {
            // Retrieve the channel provided by the OS
            Channel ic = System.inheritedChannel();
            if (ic instanceof ServerSocketChannel) {
                serverSock = (ServerSocketChannel) ic;
            }
            if (serverSock == null) {
                throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
            }
        }
        serverSock.configureBlocking(true); //mimic APR behavior
    }
By default useInheritedChannel is false, so the if branch is taken. That branch first initializes serverSock, a field of type ServerSocketChannel, and then applies a number of settings to the underlying ServerSocket:
    public void setProperties(ServerSocket socket) throws SocketException{
        if (rxBufSize != null)
            socket.setReceiveBufferSize(rxBufSize.intValue());
        if (performanceConnectionTime != null && performanceLatency != null &&
                performanceBandwidth != null)
            socket.setPerformancePreferences(
                    performanceConnectionTime.intValue(),
                    performanceLatency.intValue(),
                    performanceBandwidth.intValue());
        if (soReuseAddress != null)
            socket.setReuseAddress(soReuseAddress.booleanValue());
        if (soTimeout != null && soTimeout.intValue() >= 0)
            socket.setSoTimeout(soTimeout.intValue());
    }
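socketProperties is a field declared in AbstractEndpoint. Once the properties are applied, initServerSocket binds the socket to the configured address and port, using acceptCount (100 by default) as the backlog, and finally puts the channel into blocking mode.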
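The same open, configure, bind, configureBlocking sequence can be tried in isolation with the plain JDK API. This is a minimal sketch rather than Tomcat's code: ServerSocketSketch is a hypothetical class, the loopback address and port 0 (let the OS pick a free port) are choices made for the demo, and only one socket option is set.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

class ServerSocketSketch {
    // Opens a listening channel on the loopback address. Port 0 asks the OS
    // for any free port; the backlog plays the role of acceptCount.
    static ServerSocketChannel open(int port, int backlog) throws IOException {
        ServerSocketChannel serverSock = ServerSocketChannel.open();
        serverSock.socket().setReuseAddress(true); // options are set before bind
        serverSock.socket().bind(
                new InetSocketAddress(InetAddress.getLoopbackAddress(), port), backlog);
        serverSock.configureBlocking(true); // accept() will block until a client connects
        return serverSock;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel ch = open(0, 100)) {
            System.out.println("listening on port " + ch.socket().getLocalPort()
                    + ", blocking=" + ch.isBlocking());
        }
    }
}
```

Blocking mode on the server channel suits a dedicated accepting thread, since that thread has nothing else to do while it waits in accept().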
2.3 selectorPool.open()
The last thing bind() does is call selectorPool.open().
    private NioSelectorPool selectorPool = new NioSelectorPool();
selectorPool is a field of NioEndpoint.
    protected NioBlockingSelector blockingSelector;

    protected volatile Selector SHARED_SELECTOR;

    public void open() throws IOException {
        enabled = true;
        getSharedSelector();
        if (SHARED) {
            blockingSelector = new NioBlockingSelector();
            blockingSelector.open(getSharedSelector());
        }
    }

    protected Selector getSharedSelector() throws IOException {
        if (SHARED && SHARED_SELECTOR == null) {
            synchronized ( NioSelectorPool.class ) {
                if ( SHARED_SELECTOR == null ) {
                    SHARED_SELECTOR = Selector.open();
                }
            }
        }
        return SHARED_SELECTOR;
    }
NioSelectorPool#open first initializes SHARED_SELECTOR. When SHARED is true, it then creates a NioBlockingSelector, assigns it to the blockingSelector field, and calls that object's open method. NioBlockingSelector is a class defined by Tomcat.
    protected Selector sharedSelector;

    protected BlockPoller poller;

    public void open(Selector selector) {
        sharedSelector = selector;
        poller = new BlockPoller();
        poller.selector = sharedSelector;
        poller.setDaemon(true);
        poller.setName("NioBlockingSelector.BlockPoller-" + threadCounter.incrementAndGet());
        poller.start();
    }
NioBlockingSelector#open receives the SHARED_SELECTOR object from NioSelectorPool#open. It assigns that selector to the sharedSelector field, creates a BlockPoller, and calls its start method. BlockPoller extends Thread, so calling start launches a new thread; BlockPoller overrides Thread's run method.
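Two ideas from this section can be sketched together with the plain JDK: lazily opening one shared Selector behind double-checked locking, and handing that selector to a daemon thread that blocks in select(). SharedSelectorSketch and its methods are hypothetical names; the polling loop is a placeholder and does not reproduce what BlockPoller actually does with its keys.

```java
import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.Selector;

class SharedSelectorSketch {
    private static volatile Selector sharedSelector;

    // Double-checked locking: the volatile field plus the second null check
    // inside the lock ensure that only one Selector is ever opened.
    static Selector getSharedSelector() throws IOException {
        if (sharedSelector == null) {
            synchronized (SharedSelectorSketch.class) {
                if (sharedSelector == null) {
                    sharedSelector = Selector.open();
                }
            }
        }
        return sharedSelector;
    }

    // Starts a daemon thread that repeatedly blocks in select() on the given
    // selector and exits once the selector is closed.
    static Thread startPoller(Selector selector) {
        Thread poller = new Thread(() -> {
            try {
                while (selector.isOpen()) {
                    selector.select(1000);
                    selector.selectedKeys().clear(); // a real poller would dispatch keys here
                }
            } catch (IOException | ClosedSelectorException e) {
                // selector closed or failed: let the thread end
            }
        }, "sketch-block-poller");
        poller.setDaemon(true);
        poller.start();
        return poller;
    }

    public static void main(String[] args) throws Exception {
        Selector selector = getSharedSelector();
        Thread poller = startPoller(selector);
        System.out.println("same instance: " + (selector == getSharedSelector()));
        selector.close(); // wakes the poller, which then exits
        poller.join(2000);
        System.out.println("poller alive: " + poller.isAlive());
    }
}
```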
3. NioEndpoint#start
NioEndpoint's start method is defined in its ancestor class AbstractEndpoint:
    public final void start() throws Exception {
        if (bindState == BindState.UNBOUND) {
            bindWithCleanup();
            bindState = BindState.BOUND_ON_START;
        }
        startInternal();
    }
AbstractEndpoint#start binds the endpoint first if it is still unbound, and then simply calls startInternal(), which NioEndpoint overrides.
    /**
     * Cache for SocketProcessor objects
     */
    protected SynchronizedStack<SocketProcessorBase<S>> processorCache;

    /**
     * Cache for poller events
     */
    private SynchronizedStack<PollerEvent> eventCache;

    /**
     * Bytebuffer cache, each channel holds a set of buffers (two, except for SSL holds four)
     */
    private SynchronizedStack<NioChannel> nioChannels;

    /**
     * Start the NIO endpoint, creating acceptor, poller threads.
     */
    @Override
    public void startInternal() throws Exception {
        if (!running) {
            running = true;
            paused = false;

            processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getProcessorCache());
            eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getEventCache());
            nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getBufferPool());

            // Create worker collection
            if ( getExecutor() == null ) {
                createExecutor();
            }

            initializeConnectionLatch();

            // Start poller threads
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }

            startAcceptorThreads();
        }
    }
startInternal first creates three SynchronizedStack objects and assigns them to processorCache, eventCache, and nioChannels. All three exist for object reuse: they cache SocketProcessorBase, PollerEvent, and NioChannel objects respectively. processorCache is declared in AbstractEndpoint; the other two are declared in NioEndpoint.
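The reuse pattern behind these caches is a bounded, thread-safe stack: take an object if one is available, otherwise allocate, and hand it back when done. The sketch below shows that pattern with a hypothetical ReuseStackSketch class built on ArrayDeque; it is not Tomcat's SynchronizedStack, and the limit semantics (-1 for unlimited) are an assumption made for the example.

```java
import java.util.ArrayDeque;

// Hypothetical, simplified stand-in for a bounded, thread-safe reuse stack.
class ReuseStackSketch<T> {
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private final int limit; // assumed convention: -1 means unlimited

    ReuseStackSketch(int limit) {
        this.limit = limit;
    }

    // Returns false when the cache is full, so the caller simply drops the object.
    synchronized boolean push(T obj) {
        if (limit >= 0 && items.size() >= limit) {
            return false;
        }
        items.push(obj);
        return true;
    }

    // Returns null when the cache is empty, so the caller falls back to allocating.
    synchronized T pop() {
        return items.poll();
    }
}

class ReuseStackDemo {
    public static void main(String[] args) {
        ReuseStackSketch<StringBuilder> cache = new ReuseStackSketch<>(1);

        StringBuilder sb = cache.pop();
        if (sb == null) {
            sb = new StringBuilder(); // cache miss: allocate a fresh object
        }
        sb.append("request 1");

        sb.setLength(0); // reset state before returning the object to the cache
        cache.push(sb);

        System.out.println("reused: " + (cache.pop() == sb));
    }
}
```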
Next, if no executor has been set, startInternal calls createExecutor(), which is declared in AbstractEndpoint:
    private Executor executor = null;

    public Executor getExecutor() {
        return executor;
    }

    public void createExecutor() {
        internalExecutor = true;
        TaskQueue taskqueue = new TaskQueue();
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
        taskqueue.setParent( (ThreadPoolExecutor) executor);
    }
createExecutor creates a thread pool and assigns it to the executor field.
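For readers who want to experiment with the constructor arguments, here is a rough equivalent using only java.util.concurrent. It is a sketch under clear substitutions: the TaskQueue and TaskThreadFactory used above are Tomcat classes, and this example swaps in a LinkedBlockingQueue and a hand-written ThreadFactory, so queueing behavior may differ from Tomcat's. ExecutorSketch, the "sketch-exec-" prefix, and the pool sizes 2 and 4 are arbitrary values chosen for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorSketch {
    // Builds a pool with a core size, a maximum size, and a 60 second idle
    // timeout, naming its daemon threads prefix1, prefix2, and so on.
    static ThreadPoolExecutor create(String namePrefix, int minSpare, int maxThreads) {
        AtomicInteger counter = new AtomicInteger(1);
        ThreadFactory tf = r -> {
            Thread t = new Thread(r, namePrefix + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
        return new ThreadPoolExecutor(minSpare, maxThreads, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), tf);
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor executor = create("sketch-exec-", 2, 4);
        Callable<String> task = () -> Thread.currentThread().getName();
        Future<String> name = executor.submit(task);
        System.out.println("task ran on " + name.get());
        executor.shutdown();
    }
}
```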
startInternal then calls initializeConnectionLatch:
    protected LimitLatch initializeConnectionLatch() {
        if (maxConnections==-1) return null;
        if (connectionLimitLatch==null) {
            connectionLimitLatch = new LimitLatch(getMaxConnections());
        }
        return connectionLimitLatch;
    }
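initializeConnectionLatch initializes the connectionLimitLatch field, which caps the maximum number of connections Tomcat will handle. A maxConnections value of -1 disables the limit, in which case no latch is created.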
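The idea of capping concurrent connections can be illustrated with a counting semaphore. This sketch deliberately swaps LimitLatch for java.util.concurrent.Semaphore, and ConnectionLimitSketch with its tryCountUp and countDown methods is a hypothetical class. It uses a non-blocking tryAcquire so the demo cannot hang; how LimitLatch itself behaves at the limit is not shown in this article.

```java
import java.util.concurrent.Semaphore;

// Hypothetical connection counter: at most maxConnections slots may be held
// at any one time.
class ConnectionLimitSketch {
    private final Semaphore permits;

    ConnectionLimitSketch(int maxConnections) {
        this.permits = new Semaphore(maxConnections);
    }

    // Called before accepting a connection; false means the limit is reached.
    boolean tryCountUp() {
        return permits.tryAcquire();
    }

    // Called when a connection closes, freeing one slot.
    void countDown() {
        permits.release();
    }

    public static void main(String[] args) {
        ConnectionLimitSketch limit = new ConnectionLimitSketch(2);
        System.out.println("first:  " + limit.tryCountUp());
        System.out.println("second: " + limit.tryCountUp());
        System.out.println("third:  " + limit.tryCountUp());
        limit.countDown();
        System.out.println("after one close: " + limit.tryCountUp());
    }
}
```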
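After that, startInternal creates pollerThreadCount Poller objects, wraps each one in a thread, and starts those threads, which are known as Poller threads. The Poller class implements the Runnable interface.
Finally, startInternal calls startAcceptorThreads():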
    protected void startAcceptorThreads() {
        int count = getAcceptorThreadCount();
        acceptors = new ArrayList<>(count);

        for (int i = 0; i < count; i++) {
            Acceptor<U> acceptor = new Acceptor<>(this);
            String threadName = getName() + "-Acceptor-" + i;
            acceptor.setThreadName(threadName);
            acceptors.add(acceptor);
            Thread t = new Thread(acceptor, threadName);
            t.setPriority(getAcceptorThreadPriority());
            t.setDaemon(getDaemon());
            t.start();
        }
    }
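startAcceptorThreads creates acceptorThreadCount Acceptor objects and threads and starts them; these are known as Acceptor threads. Like Poller, Acceptor implements the Runnable interface.
Acceptor threads accept client connections, while Poller threads handle read and write events on those connections' channels. Together, Acceptor and Poller form Tomcat's threading model and are very important components. Later articles cover them in detail, so they are not discussed further here.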
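To make the division of labor concrete before those later articles, here is a heavily simplified echo server sketch: one thread blocks in accept() and hands each new connection to a second thread, which registers it with a Selector and handles read events. Everything in it (AcceptorPollerSketch, the pending queue, the echo behavior, the helper echoOnce) is hypothetical and chosen for brevity; it is not how Tomcat's Acceptor and Poller are implemented, and it leaves out worker threads, timeouts, and proper shutdown of open client channels.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class AcceptorPollerSketch implements AutoCloseable {
    private final ServerSocketChannel serverSock;
    private final Selector selector;
    private final Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    AcceptorPollerSketch() throws IOException {
        serverSock = ServerSocketChannel.open();
        serverSock.socket().bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        serverSock.configureBlocking(true);
        selector = Selector.open();
        startDaemon(this::acceptLoop, "sketch-Acceptor");
        startDaemon(this::pollLoop, "sketch-Poller");
    }

    int port() {
        return serverSock.socket().getLocalPort();
    }

    private static void startDaemon(Runnable r, String name) {
        Thread t = new Thread(r, name);
        t.setDaemon(true);
        t.start();
    }

    // Acceptor role: block in accept(), then hand the connection to the poller.
    private void acceptLoop() {
        while (running) {
            try {
                SocketChannel ch = serverSock.accept();
                ch.configureBlocking(false);
                pending.add(ch);
                selector.wakeup(); // let the poller pick up the new channel
            } catch (IOException e) {
                return; // server socket closed (or failed): stop accepting
            }
        }
    }

    // Poller role: register handed-over channels and echo whatever they send.
    private void pollLoop() {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        try {
            while (running) {
                selector.select(500);
                SocketChannel ch;
                while ((ch = pending.poll()) != null) {
                    ch.register(selector, SelectionKey.OP_READ);
                }
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isValid()) {
                        echo((SocketChannel) key.channel(), buf);
                    }
                }
                selector.selectedKeys().clear();
            }
        } catch (IOException | ClosedSelectorException e) {
            // selector closed during shutdown: let the thread end
        }
    }

    private static void echo(SocketChannel client, ByteBuffer buf) {
        try {
            buf.clear();
            if (client.read(buf) < 0) {
                client.close(); // peer closed; closing also cancels the key
                return;
            }
            buf.flip();
            while (buf.hasRemaining()) {
                client.write(buf); // fine for tiny payloads; may spin on large ones
            }
        } catch (IOException e) {
            try { client.close(); } catch (IOException ignored) { }
        }
    }

    // Hypothetical blocking client used to exercise the server once.
    static String echoOnce(int port, String msg) throws IOException {
        InetSocketAddress addr = new InetSocketAddress(InetAddress.getLoopbackAddress(), port);
        try (SocketChannel c = SocketChannel.open(addr)) {
            byte[] out = msg.getBytes(StandardCharsets.UTF_8);
            c.write(ByteBuffer.wrap(out));
            ByteBuffer in = ByteBuffer.allocate(out.length);
            while (in.hasRemaining() && c.read(in) >= 0) { /* keep reading */ }
            return new String(in.array(), 0, in.position(), StandardCharsets.UTF_8);
        }
    }

    @Override
    public void close() throws IOException {
        running = false;
        serverSock.close(); // unblocks accept()
        selector.close();   // unblocks select()
    }
}
```

A call such as `AcceptorPollerSketch.echoOnce(server.port(), "ping")` against a running instance exercises both threads: the acceptor takes the connection, and the poller performs the read and the write. Registering channels from the poller thread itself, rather than from the acceptor, keeps all selector registration on one thread.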
Summary
This article covered NioEndpoint's startup methods, init and start. init creates a ServerSocketChannel (in NioEndpoint#initServerSocket) and a Selector (in NioSelectorPool#open). start launches the Acceptor threads and the Poller threads.