Tomcat Source Code Analysis Series (12): NioEndpoint

Preface
The previous article covered ProtocolHandler and its default implementation, Http11NioProtocol. The Http11NioProtocol constructor creates a NioEndpoint object, and the most important step in Http11NioProtocol's init and start methods is calling init and start on that NioEndpoint. NioEndpoint extends AbstractJsseEndpoint, which in turn extends AbstractEndpoint.


1. AbstractEndpoint#init
NioEndpoint's init method is defined in AbstractEndpoint, the parent of its parent class.

private boolean bindOnInit = true;

public final void init() throws Exception {
    if (bindOnInit) {
        bindWithCleanup();
        bindState = BindState.BOUND_ON_INIT;
    }
    if (this.domain != null) {
        // Register endpoint (as ThreadPool - historical name)
        oname = new ObjectName(domain + ":type=ThreadPool,name=\"" + getName() + "\"");
        Registry.getRegistry(null, null).registerComponent(this, oname, null);

        ObjectName socketPropertiesOname = new ObjectName(domain +
                ":type=ThreadPool,name=\"" + getName() + "\",subType=SocketProperties");
        socketProperties.setObjectName(socketPropertiesOname);
        Registry.getRegistry(null, null).registerComponent(socketProperties, socketPropertiesOname, null);

        for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
            registerJmx(sslHostConfig);
        }
    }
}
private void bindWithCleanup() throws Exception {
    try {
        bind();
    } catch (Throwable t) {
        // Ensure open sockets etc. are cleaned up if something goes
        // wrong during bind
        ExceptionUtils.handleThrowable(t);
        unbind();
        throw t;
    }
}

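When bindOnInit is true (the default), init first calls bindWithCleanup(). Then, if a JMX domain is set, it registers the endpoint and its socketProperties as MBeans and calls registerJmx(sslHostConfig) for each SSLHostConfig; registerJmx registers the sslHostConfig with the MBeanServer.
bindWithCleanup() simply calls bind(), and calls unbind() before rethrowing if bind() fails. bind() is an abstract method; its implementation is in NioEndpoint.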
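
The relationship between init, bind and unbind described above is a template method with cleanup on failure. The following is a minimal sketch of that pattern only; SketchEndpoint and FailingEndpoint are hypothetical names used for illustration and are not Tomcat classes.

```java
import java.util.ArrayList;
import java.util.List;

public class BindWithCleanupSketch {

    /** Hypothetical stand-in for AbstractEndpoint; not Tomcat code. */
    abstract static class SketchEndpoint {
        final List<String> log = new ArrayList<>();

        // Subclasses supply the actual bind/unbind logic.
        abstract void bind() throws Exception;
        abstract void unbind() throws Exception;

        // final: subclasses cannot change the init sequence itself.
        public final void init() throws Exception {
            try {
                bind();
                log.add("bound");
            } catch (Exception e) {
                // Release whatever bind() managed to open, then rethrow.
                unbind();
                throw e;
            }
        }
    }

    /** A subclass whose bind() always fails, to exercise the cleanup path. */
    static class FailingEndpoint extends SketchEndpoint {
        @Override
        void bind() throws Exception {
            log.add("bind attempted");
            throw new java.io.IOException("port in use (simulated)");
        }

        @Override
        void unbind() {
            log.add("unbind");
        }
    }

    // Runs init() on the failing endpoint and returns the recorded steps.
    static List<String> runFailingInit() {
        FailingEndpoint ep = new FailingEndpoint();
        try {
            ep.init();
        } catch (Exception expected) {
            ep.log.add("init failed: " + expected.getMessage());
        }
        return ep.log;
    }

    public static void main(String[] args) {
        System.out.println(runFailingInit());
    }
}
```

The point of the sketch is the ordering: unbind runs before the exception reaches the caller, so a failed bind does not leave a half-open socket behind.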


2. NioEndpoint#bind

protected int acceptorThreadCount = 1;

private int pollerThreadCount = Math.min(2,Runtime.getRuntime().availableProcessors());

/**
 * Initialize the endpoint.
 */
@Override
public void bind() throws Exception {
    initServerSocket();

    // Initialize thread count defaults for acceptor, poller
    if (acceptorThreadCount == 0) {
        // FIXME: Doesn't seem to work that well with multiple accept threads
        acceptorThreadCount = 1;
    }
    if (pollerThreadCount <= 0) {
        //minimum one poller thread
        pollerThreadCount = 1;
    }
    setStopLatch(new CountDownLatch(pollerThreadCount));

    // Initialize SSL if needed
    initialiseSsl();

    selectorPool.open();
}

protected void setStopLatch(CountDownLatch stopLatch) {
    this.stopLatch = stopLatch;
}

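bind first calls initServerSocket, then normalizes the acceptorThreadCount and pollerThreadCount fields: an acceptorThreadCount of 0, or a non-positive pollerThreadCount, is replaced with 1. These fields are the number of Acceptor threads and the number of Poller threads, respectively. acceptorThreadCount defaults to 1, and pollerThreadCount defaults to Math.min(2, Runtime.getRuntime().availableProcessors()), as the field declaration above shows.
Acceptor and Poller threads make up Tomcat's threading model.
Next, bind creates a CountDownLatch sized to pollerThreadCount and assigns it to the stopLatch field.
It then calls initialiseSsl() to initialize the SSL implementation.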
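
The excerpt does not show where stopLatch is counted down. A plausible use, sketched below under that assumption, is that each poller thread counts the latch down when it exits so that the stopping thread can wait for all of them. StopLatchSketch, normalizeThreadCount and startAndAwait are hypothetical names, not Tomcat APIs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class StopLatchSketch {

    // Mirrors the "at least one poller" rule from bind().
    static int normalizeThreadCount(int configured) {
        return configured <= 0 ? 1 : configured;
    }

    // Starts the workers and returns true if all of them signalled
    // completion through the latch within the timeout.
    static boolean startAndAwait(int configuredCount) {
        int count = normalizeThreadCount(configuredCount);
        CountDownLatch stopLatch = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            Thread t = new Thread(() -> {
                // A real poller would loop here until told to stop.
                stopLatch.countDown(); // signal "this worker has finished"
            }, "sketch-poller-" + i);
            t.setDaemon(true);
            t.start();
        }
        try {
            // The stopping thread waits until every worker has counted down.
            return stopLatch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(startAndAwait(0));
    }
}
```

Sizing the latch to the number of poller threads is what makes await return only after the last poller has finished.
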
2.1 initialiseSsl

private String sslImplementationName = null;
private SSLImplementation sslImplementation = null;

public String getSslImplementationName() {
    return sslImplementationName;
}

protected void initialiseSsl() throws Exception {
    if (isSSLEnabled()) {
        sslImplementation = SSLImplementation.getInstance(getSslImplementationName());

        for (SSLHostConfig sslHostConfig : sslHostConfigs.values()) {
            sslHostConfig.setConfigType(getSslConfigType());
            createSSLContext(sslHostConfig);
        }

        // Validate default SSLHostConfigName
        if (sslHostConfigs.get(getDefaultSSLHostConfigName()) == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.noSslHostConfig",
                    getDefaultSSLHostConfigName(), getName()));
        }

    }
}

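When SSL is enabled, initialiseSsl() obtains an SSLImplementation instance and assigns it to the sslImplementation field, then creates an SSLContext for each configured SSLHostConfig.
SSLImplementation is an abstract class; its implementations in Tomcat are JSSEImplementation and OpenSSLImplementation, with JSSEImplementation being the default.
As can be seen, the most important step in bind() is the call to initServerSocket().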

2.2 initServerSocket

/**
 * Server socket "pointer".
 */
private volatile ServerSocketChannel serverSock = null;

/**
 * Allows the server developer to specify the acceptCount (backlog) that
 * should be used for server sockets. By default, this value
 * is 100.
 */
private int acceptCount = 100;
public int getAcceptCount() { return acceptCount; }


/**
 * Use System.inheritableChannel to obtain channel from stdin/stdout.
 */
private boolean useInheritedChannel = false;
public boolean getUseInheritedChannel() { return useInheritedChannel; }


// Separated out to make it easier for folks that extend NioEndpoint to
// implement custom [server]sockets
protected void initServerSocket() throws Exception {
    if (!getUseInheritedChannel()) {
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
        serverSock.socket().bind(addr,getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    serverSock.configureBlocking(true); //mimic APR behavior
}

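By default useInheritedChannel is false, so the if branch is taken. It opens the serverSock field, a ServerSocketChannel, applies the configured socket properties to the underlying ServerSocket, and binds it to the configured address and port with acceptCount as the backlog. In both branches the channel is finally put into blocking mode.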
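
The same open, configure, bind and set-blocking sequence can be tried with the plain JDK API. This is a sketch only: it binds to the loopback address on port 0 (any free port) rather than Tomcat's configured address and port, and ServerSocketSketch with its methods is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class ServerSocketSketch {

    // Opens a listening channel in the same order as the if branch:
    // open, set socket options, bind with a backlog, switch to blocking mode.
    static ServerSocketChannel openServerSocket(int port, int backlog) throws IOException {
        ServerSocketChannel serverSock = ServerSocketChannel.open();
        serverSock.socket().setReuseAddress(true);
        InetSocketAddress addr =
                new InetSocketAddress(InetAddress.getLoopbackAddress(), port);
        serverSock.socket().bind(addr, backlog);
        serverSock.configureBlocking(true);
        return serverSock;
    }

    // Opens a channel, reports its state, and closes it again.
    static String describe() {
        try (ServerSocketChannel ch = openServerSocket(0, 100)) {
            return "blocking=" + ch.isBlocking()
                    + " bound=" + ch.socket().isBound()
                    + " portAssigned=" + (ch.socket().getLocalPort() > 0);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Because the channel is blocking, a call to accept() on it waits until a client connects instead of returning null.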

public void setProperties(ServerSocket socket) throws SocketException{
    if (rxBufSize != null)
        socket.setReceiveBufferSize(rxBufSize.intValue());
    if (performanceConnectionTime != null && performanceLatency != null &&
            performanceBandwidth != null)
        socket.setPerformancePreferences(
                performanceConnectionTime.intValue(),
                performanceLatency.intValue(),
                performanceBandwidth.intValue());
    if (soReuseAddress != null)
        socket.setReuseAddress(soReuseAddress.booleanValue());
    if (soTimeout != null && soTimeout.intValue() >= 0)
        socket.setSoTimeout(soTimeout.intValue());
}

setProperties above is a method of the SocketProperties class; the socketProperties field itself is declared in AbstractEndpoint.

2.3 selectorPool.open()
The last thing bind() does is call selectorPool.open().

private NioSelectorPool selectorPool = new NioSelectorPool();

selectorPool is a field of NioEndpoint.

protected NioBlockingSelector blockingSelector;

protected volatile Selector SHARED_SELECTOR;


public void open() throws IOException {
    enabled = true;
    getSharedSelector();
    if (SHARED) {
        blockingSelector = new NioBlockingSelector();
        blockingSelector.open(getSharedSelector());
    }

}

protected Selector getSharedSelector() throws IOException {
    if (SHARED && SHARED_SELECTOR == null) {
        synchronized ( NioSelectorPool.class ) {
            if ( SHARED_SELECTOR == null )  {
                SHARED_SELECTOR = Selector.open();
            }
        }
    }
    return  SHARED_SELECTOR;
}

NioSelectorPool#open first initializes SHARED_SELECTOR, then (when SHARED is true) creates a NioBlockingSelector, assigns it to the blockingSelector field, and calls its open method. NioBlockingSelector is a class defined by Tomcat.
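
getSharedSelector uses double-checked locking to create the selector lazily and only once. The sketch below isolates that idiom with the JDK Selector; SharedSelectorSketch is a hypothetical class, and unlike the Tomcat code it uses a static field and has no SHARED switch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

public class SharedSelectorSketch {

    // volatile is required for double-checked locking to be safe.
    private static volatile Selector sharedSelector;

    static Selector getSharedSelector() throws IOException {
        if (sharedSelector == null) {                  // first check, no lock
            synchronized (SharedSelectorSketch.class) {
                if (sharedSelector == null) {          // second check, under the lock
                    sharedSelector = Selector.open();
                }
            }
        }
        return sharedSelector;
    }

    // Two calls must hand back the very same Selector instance.
    static boolean sameInstanceOnRepeatedCalls() {
        try {
            return getSharedSelector() == getSharedSelector();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sameInstanceOnRepeatedCalls());
    }
}
```

The unlocked first check keeps the common path cheap once the selector exists; the second check prevents two threads that raced past the first one from both opening a selector.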

protected Selector sharedSelector;

protected BlockPoller poller;

public void open(Selector selector) {
    sharedSelector = selector;
    poller = new BlockPoller();
    poller.selector = sharedSelector;
    poller.setDaemon(true);
    poller.setName("NioBlockingSelector.BlockPoller-" + threadCounter.incrementAndGet());
    poller.start();
}

The argument to NioBlockingSelector#open is the SHARED_SELECTOR from NioSelectorPool#open. open assigns it to the sharedSelector field, then creates a BlockPoller and calls its start method. BlockPoller extends Thread, so calling start launches a new thread; BlockPoller overrides Thread's run method.





3. NioEndpoint#start
NioEndpoint's start method is defined in AbstractEndpoint:

public final void start() throws Exception {
    if (bindState == BindState.UNBOUND) {
        bindWithCleanup();
        bindState = BindState.BOUND_ON_START;
    }
    startInternal();
}

AbstractEndpoint#start binds the endpoint if it is still unbound and then calls startInternal(), which NioEndpoint overrides.

/**
 * Cache for SocketProcessor objects
 */
protected SynchronizedStack<SocketProcessorBase<S>> processorCache;

/**
 * Cache for poller events
 */
private SynchronizedStack<PollerEvent> eventCache;

/**
 * Bytebuffer cache, each channel holds a set of buffers (two, except for SSL holds four)
 */
private SynchronizedStack<NioChannel> nioChannels;


/**
 * Start the NIO endpoint, creating acceptor, poller threads.
 */
@Override
public void startInternal() throws Exception {

    if (!running) {
        running = true;
        paused = false;

        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getProcessorCache());
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getEventCache());
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getBufferPool());

        // Create worker collection
        if ( getExecutor() == null ) {
            createExecutor();
        }

        initializeConnectionLatch();

        // Start poller threads
        pollers = new Poller[getPollerThreadCount()];
        for (int i=0; i<pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }

        startAcceptorThreads();
    }
}

startInternal first creates three SynchronizedStack objects and assigns them to processorCache, eventCache, and nioChannels. All three are caches for object reuse, holding SocketProcessorBase, PollerEvent, and NioChannel objects respectively. processorCache is declared in AbstractEndpoint; the other two are declared in NioEndpoint.
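
To make the reuse idea concrete, here is a small sketch of a bounded, synchronized stack used as an object cache. BoundedStack is a hypothetical stand-in written for this example; it is not Tomcat's SynchronizedStack and makes no claim about how that class is implemented.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ObjectCacheSketch {

    /** Hypothetical bounded, synchronized stack used as an object cache. */
    static class BoundedStack<T> {
        private final Deque<T> items = new ArrayDeque<>();
        private final int limit;

        BoundedStack(int limit) {
            this.limit = limit;
        }

        // Returns false when the cache is full; the object is simply not cached.
        synchronized boolean push(T obj) {
            if (items.size() >= limit) {
                return false;
            }
            items.push(obj);
            return true;
        }

        // Returns null when the cache is empty, so the caller allocates instead.
        synchronized T pop() {
            return items.poll();
        }
    }

    // Take a cached object if one exists, otherwise create a new one.
    static StringBuilder acquire(BoundedStack<StringBuilder> cache) {
        StringBuilder sb = cache.pop();
        return sb != null ? sb : new StringBuilder();
    }

    // Reset the object's state and hand it back to the cache.
    static void release(BoundedStack<StringBuilder> cache, StringBuilder sb) {
        sb.setLength(0);
        cache.push(sb);
    }

    // Acquire, release, acquire again: the second acquire returns the same object.
    static boolean reusesReleasedObject() {
        BoundedStack<StringBuilder> cache = new BoundedStack<>(1);
        StringBuilder first = acquire(cache);
        first.append("request data");
        release(cache, first);
        StringBuilder second = acquire(cache);
        return first == second && second.length() == 0;
    }

    public static void main(String[] args) {
        System.out.println(reusesReleasedObject());
    }
}
```

The benefit is fewer allocations on a hot path: objects that are created per connection or per event are recycled instead of being left to the garbage collector.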

Next, if no executor has been configured, it calls createExecutor(), which is declared in AbstractEndpoint.

private Executor executor = null;
public Executor getExecutor() { return executor; }

public void createExecutor() {
    internalExecutor = true;
    TaskQueue taskqueue = new TaskQueue();
    TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
    taskqueue.setParent( (ThreadPoolExecutor) executor);
}

createExecutor creates a thread pool and assigns it to the executor field.
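
For comparison, a pool with the same shape (core size, maximum size, 60 second keep-alive, named daemon threads) can be sketched with java.util.concurrent alone. Note the swap: this uses the JDK ThreadPoolExecutor, a LinkedBlockingQueue and a lambda ThreadFactory in place of Tomcat's own ThreadPoolExecutor, TaskQueue and TaskThreadFactory. ExecutorSketch and its method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorSketch {

    // Builds a pool whose threads are daemon threads named prefix + sequence number.
    static ThreadPoolExecutor createExecutor(String namePrefix, int minSpare, int max) {
        AtomicInteger counter = new AtomicInteger(1);
        ThreadFactory tf = r -> {
            Thread t = new Thread(r, namePrefix + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
        return new ThreadPoolExecutor(minSpare, max, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), tf);
    }

    // Submits one task and returns the name of the worker thread that ran it.
    static String runOneTask() {
        ThreadPoolExecutor executor = createExecutor("sketch-exec-", 2, 10);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return executor.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOneTask());
    }
}
```

One caveat about this stand-in: with an unbounded queue the JDK pool never grows past its core size, because extra threads are only created when the queue rejects a task. The call to taskqueue.setParent in the excerpt suggests that TaskQueue cooperates with the pool, presumably to influence that behavior, but the excerpt does not show how.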

startInternal then calls initializeConnectionLatch:

protected LimitLatch initializeConnectionLatch() {
    if (maxConnections==-1) return null;
    if (connectionLimitLatch==null) {
        connectionLimitLatch = new LimitLatch(getMaxConnections());
    }
    return connectionLimitLatch;
}

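initializeConnectionLatch initializes the connectionLimitLatch field, which limits the maximum number of connections Tomcat will hold open. When maxConnections is -1, no latch is created.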
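
The idea of capping concurrent connections can be illustrated with a counting semaphore. This is a stand-in, not LimitLatch itself: ConnectionLimiter and its methods are hypothetical, and the demo uses a non-blocking tryAcquire so that it can run on a single thread.

```java
import java.util.concurrent.Semaphore;

public class ConnectionLimitSketch {

    /** Hypothetical connection limiter built on a Semaphore. */
    static class ConnectionLimiter {
        private final Semaphore permits;

        ConnectionLimiter(int maxConnections) {
            this.permits = new Semaphore(maxConnections);
        }

        // Non-blocking for the demo; a thread accepting connections
        // would more likely block until a slot becomes free.
        boolean tryAcquire() {
            return permits.tryAcquire();
        }

        // Called when a connection is closed.
        void release() {
            permits.release();
        }
    }

    static String demo() {
        ConnectionLimiter limiter = new ConnectionLimiter(2);
        boolean first = limiter.tryAcquire();
        boolean second = limiter.tryAcquire();
        boolean third = limiter.tryAcquire();   // limit reached
        limiter.release();                      // one connection closes
        boolean fourth = limiter.tryAcquire();  // a slot is free again
        return first + "," + second + "," + third + "," + fourth;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true,true,false,true"
    }
}
```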

After that, startInternal creates pollerThreadCount Poller objects, wraps each in a thread, and starts those threads; these are the Poller threads. The Poller class implements Runnable.

Finally, startInternal calls startAcceptorThreads().

protected void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new ArrayList<>(count);

    for (int i = 0; i < count; i++) {
        Acceptor<U> acceptor = new Acceptor<>(this);
        String threadName = getName() + "-Acceptor-" + i;
        acceptor.setThreadName(threadName);
        acceptors.add(acceptor);
        Thread t = new Thread(acceptor, threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}

startAcceptorThreads creates acceptorThreadCount Acceptor objects and threads and starts them; these are the Acceptor threads. Like Poller, Acceptor implements Runnable.

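Acceptor threads accept client connections, while Poller threads handle read and write events on the channels of those connections. Together they make up Tomcat's threading model and are very important components; later articles cover them separately, so they are not discussed here.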


Summary
This article covered NioEndpoint's startup methods, init and start. init creates a ServerSocketChannel (in NioEndpoint#initServerSocket) and a Selector (in NioSelectorPool#open). start launches the Acceptor and Poller threads.
