NIO2(AIO) 在Tomcat的應用

概述

注: NIO2(AIO) 即異步IOjava

NIO2 簡介

AIONIO2實際上是一回事,如同孫行者、者行孫其實都是孫猴子,只是名稱不一樣本質都同樣git

那麼如何理解這個概念呢?舉個例子github

假設一個妹子有許多的舔狗(SpareTire),若是妹子想要完成某件事最簡單、高效的方法是什麼?web

答案是,舔狗那麼多,交給他們去辦就ok了。那麼狗子辦事期間,妹子會一直等待狗子把事情作好嗎?不行,這期間固然能夠繼續將其餘任務派發給其餘狗子。當狗子辦事期間,若是有須要妹子處理的事情,通知處理一下便可。spring

固然狗子通常都是處理一些重活累活,好比數據拷貝、I/O啊,接收新鏈接啥的(太慘了)。妹子則專一於核心業務的處理。apache

在這個例子中,妹子至關於核心業務線程,主要用來處理業務邏輯,而狗子們則是(內核+I/O線程)的抽象。後端

P.S.api

  • 若是你瞭解NIO2,建議你直接閱讀NIO2模型解讀章節,不須要再閱讀NIO2 DEMO章節(時間寶貴)tomcat

  • 你能夠直接越過全部章節去看總結,也能夠簡單閱讀附錄直接上手調試代碼多線程

NIO2 DEMO

NIO2中有個核心點,就是內核負責主要負責通知程序有什麼事件,而鏈接的接收以及數據的拷貝仍是須要程序提供線程來作這些事情,你能夠理解爲妹子(核心業務線程)須要提供舔狗池(線程池)給內核來作這些事情

talk is cheap, show me your hair

若是你想要學習一下NIO2,能夠點擊學習

該源碼的註釋爲GBK編碼,若是你看到註釋爲亂碼,最好將其改成GBK編碼

這是一個Demo,值得注意的是雖然該例子中並無顯式的建立線程池,這是由於若是你在open()服務端的時候,若是沒有顯示指定,系統將會默認分配給ServerSocketChannel一個線程池,用於事件的處理,咱們能夠打開JConsole驗證一下.

channel = AsynchronousServerSocketChannel.open();
複製代碼

JConsole中顯示的線程

如圖所示thread-0到thread-4就是系統默認分配的線程池,用來處理I/O事件。(天賜舔狗)

想象一下,若是咱們在處理I/O事件的時候將全部線程都阻塞住了,那麼整個系統的I/O都將陷入阻塞, 以下圖所示。

在有新的I/O事件到來的時候,內核會選擇一個線程來處理這些I/O事件,若是處理I/O的線程陷入阻塞,那麼來自客戶端的請求將會一直被阻塞住,沒法返回。

所以處理I/O事件的線程最好只處理I/O事件(接收新鏈接、將數據從內核拷貝到線程中)

你能夠理解爲,舔狗最好只作舔狗該作的事情,即重活累活,至於核心業務或者會發生阻塞的狀況的事件最好提交給妹子(業務邏輯處理線程池)來處理。

Tomcat NIO2 模型

關鍵類 org.apache.tomcat.util.net.Nio2Endpoint

既然是講解NIO2的處理模型,那麼咱們有必要了解如下關鍵角色

  • Nio2Acceptor Acceptor並不與特定的線程綁定,而是當由新鏈接到來從線程池中選擇一個線程來執行Acceptor的代碼,這一個過程是由底層幫咱們完成的,Acceptor的主要任務是接收新鏈接,併爲該鏈接註冊讀寫的處理對象

  • LimitLatch 限制鏈接數,在異步I/O狀況限制鏈接數的主要方式就是鎖阻塞用於I/O事件的線程池中的線程

  • I/O處理器 處理I/O的類,與Nio2Acceptor運行在同一個線程池中

ServerSocket的啓動

異步ServerSocket啓動的流程較爲枯燥,若是你不想看代碼,如下爲其啓動的流程

  • 建立線程池,將其包裝爲AsynchronousChannelGroup
  • 打開ServerSocket
  • 綁定端口, 並設置最大鏈接數
@Override
    public void bind() throws Exception {

        // 建立線程池
        if (getExecutor() == null) {
            createExecutor();
        }
        if (getExecutor() instanceof ExecutorService) {
            //建立用於I/O的線程池(須要用AsynchronousChannelGroup包裝,才能提供給AsynchronousServerSocketChanne用)
            threadGroup = AsynchronousChannelGroup.withThreadPool((ExecutorService) getExecutor());
        }
        // AsynchronousChannelGroup needs exclusive access to its executor service
        if (!internalExecutor) {
            log.warn(sm.getString("endpoint.nio2.exclusiveExecutor"));
        }
        //建立ServerSocketChannel
        serverSock = AsynchronousServerSocketChannel.open(threadGroup);
        socketProperties.setProperties(serverSock);
        InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
        //綁定端口並設置backlog的參數
        //backlog能夠理解爲當前最大待執行accept操做的鏈接數
        serverSock.bind(addr, getAcceptCount());

        // Initialize SSL if needed
        initialiseSsl();
    }
複製代碼

以下圖,就是和當前異步ServerSocketChannel綁定的線程池,801表示該鏈接器所監聽的端口(附錄中有開啓NIO2的教程)

nio2的線程池

Nio2Acceptor

Nio2Acceptor主要功能接收新鏈接,並限制最大鏈接數,由於採用的是異步I/O,因此Acceptor並不會於特定的線程綁定,而是當新任務須要執行的時候,從線程池中選一個執行任務。以下圖所示當有客戶端新鏈接到達時,程序會從線程池選擇一個線程來執行Nio2Acceptor的completed方法並傳入客戶端Socket開始執行新鏈接處理的業務邏輯

斷點

AcceptHandler的註冊

在異步I/O中咱們須要向ServerSocketChannel註冊處理Accept事件的處理器以便完成鏈接事件的處理 如如下代碼所示,當tomcat啓動的時候,會開啓一個線程調用Nio2SocketAcceptor的run方法,將Nio2SocketAcceptor註冊爲ServerSocketChannel的accept事件處理器

protected class Nio2Acceptor extends Acceptor<AsynchronousSocketChannel> implements CompletionHandler<AsynchronousSocketChannel, Void> {
    ...
        @Override
        public void run() {
            // The initial accept will be called in a separate utility thread
            if (!isPaused()) {
                // 鏈接數限制,若是達到最大鏈接數,則調用此方法的線程會陷入等待
                try {
                    countUpOrAwaitConnection();
                } catch (InterruptedException e) {
                    // Ignore
                }
                if (!isPaused()) {
                   //將本身註冊爲accept事件的處理器(注意此類實現的接口)
                    serverSock.accept(null, this);
                } else {
                    state = AcceptorState.PAUSED;
                }
            } else {
                state = AcceptorState.PAUSED;
            }
        }
    ...
}
複製代碼

新鏈接的處理

當有新鏈接到達時,底層會從線程池選擇一個線程來執行completed方法並傳入客戶端socket,此時該方法主要的流程以下

  • 檢查是容器否仍在運行若是仍在運行則繼續流程
  • 檢查是否須要限制鏈接數,若是須要限制鏈接數,則從線程池中選擇一個線程來執行Acceptor的run方法(此方法可能會發生阻塞)
  • 以上操做均已完成則調用setSocketOptions方法執行後續I/O事件處理,至此新鏈接的接收完成
@Override
        public void completed(AsynchronousSocketChannel socket, Void attachment) {
            // Successful accept, reset the error delay
            errorDelay = 0;
            // Continue processing the socket on the current thread
            // Configure the socket
            if (isRunning() && !isPaused()) {
                //檢查限制的最大鏈接數,若是沒有設置(即-1)則不進行鏈接數限制
                if (getMaxConnections() == -1) {
                    serverSock.accept(null, this);
                } else {
                   //因爲有新鏈接的到達,所以須要從線程池選一個線程執行增長鏈接數的操做,此操做可能會發生阻塞
                    getExecutor().execute(this);
                }
                //執行後續的I/O事件處理
                if (!setSocketOptions(socket)) {
                    closeSocket(socket);
                }
            } else {
                if (isRunning()) {
                    state = AcceptorState.PAUSED;
                }
                destroySocket(socket);
            }
        }
複製代碼

限制最大鏈接數的實現

因爲Acceptor並不與特定的線程綁定, 所以若是須要限制最大鏈接數,須要使用鎖將空閒的線程阻塞住,這也時爲何須要在accept新鏈接的時候須要向線程池提交增長新鏈接數的任務,以下所示(也就是調用Nio2SocketAcceptor的run方法)

public void completed(AsynchronousSocketChannel socket, Void attachment) {
        ...
        getExecutor().execute(this);
        ...
}
複製代碼

除此以外,還記得在建立ServerSocketChannel的時候咱們設置了backlog參數嗎?

該參數主要用於設置當前ServerSocket所容許的最大未accept的鏈接數,也就是說若是超過了未accept得鏈接數backlog所設置的值,那麼新來的鏈接都將會被丟棄掉。(API文檔)

I/O 事件的處理

既然是異步I/O,那麼必然要在客戶端Socket註冊讀寫的CompletionHandler, 所以setSocketOptions必然會致使這一步驟的發生,那麼此步驟發生在何時呢?

通過Debug跟蹤發現setSocketOptions將會致使Nio2SocketWrapper的建立,而實際I/O流程就發生在新建Nio2SocketWrapper對象時所建立的readCompletionHandler中, 如下是其代碼

ReadCompletionHandler 用於監聽讀事件,在讀取到數據以後會調用processSocket方法開始數據的解析工做

public Nio2SocketWrapper(Nio2Channel channel, final Nio2Endpoint endpoint) {
            super(channel, endpoint);
            nioChannels = endpoint.getNioChannels();
            socketBufferHandler = channel.getBufHandler();

            this.readCompletionHandler = new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer nBytes, ByteBuffer attachment) {
                    if (log.isDebugEnabled()) {
                        log.debug("Socket: [" + Nio2SocketWrapper.this + "], Interest: [" + readInterest + "]");
                    }
                    readNotify = false;
                    //加鎖,其餘線程可能會對標誌位進行修改
                    synchronized (readCompletionHandler) {
                        //nBytes表示讀取到的字節數,若是小於0
                        //拋出EOF異常,沒數據讀,那咋辦嗎,只好拋異常了
                        if (nBytes.intValue() < 0) {
                            failed(new EOFException(), attachment);
                        } else {
                            if (readInterest && !Nio2Endpoint.isInline()) {
                                readNotify = true;
                            } else {
                                // Release here since there will be no
                                // notify/dispatch to do the release.
                                readPending.release();
                            }
                            readInterest = false;
                        }
                    }
                    if (readNotify) {
                        //處理讀事件
                        getEndpoint().processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_READ, false);
                    }
                }
                //省略代碼,後面太長了
                ...
複製代碼

debug驗證一下,以下圖所示,attachment即咱們所讀到的數據

斷點

注意 在debug的時候IDEA可能發出切換線程的請求(讀數據和以前的操做是不在一個線程上的,以下所示

switch thread

總結

Tomcat NIO2 模型

模型
總結以下

  • accept事件和 I/O事件共用一個線程池,不會和特定線程綁定

  • Acceptor(Nio2Acceptor) 用於接收新鏈接,並註冊I/O事件的處理對象

  • LimitLatch經過阻塞住線程池中的線程來實現鏈接數限制功能

  • I/O Handler 即在Nio2SocketWrapper註冊的讀寫處理器,有I/O事件到達時,程序會選擇一個線程來執行這些處理器的代碼

  • 整體流程以下 新鏈接到達->選擇一個線程執行Nio2Acceptor的代碼->向線程池中提交增長鏈接數的任務->註冊讀寫處理事件->I/O事件到達,選擇一個線程處理I/O事件

思想遷移

不要使用默認線程池 在異步ServerSocketChannel建立的時候,tomcat會本身建立一個線程池,而不是使用默認提供的線程池,因爲線程池在咱們掌握之中,由此才實現了鏈接數限制的功能

不要阻塞I/O線程 I/O線程就要有I/O線程的亞子,不要在I/O線程執行會發生長時間阻塞的操做

附錄 如何調試tomcat

後端程序猿都曉得,SpringBoot中內嵌了tomcat(固然還有jetty,取決於你如何選擇), 所以咱們能夠新建一個SpringBoot應用來專門調試學習Tomcat的源碼。

如下爲tomcat調試的過程

  • 第一步, 打開IDEA
  • 第二步, 新建SpringBoot工程
  • 第三步, 在項目側邊欄Ctrl+F 查找Tomcat的jar包

紅框標註的爲tomcat的核心包

  • 第四步, /mute all,帶上耳機,打上斷點

若是你要測試Tomcat的NIO處理方式,在如下類打斷點 (若是你想要了解tomcat中NIO的處理方式,能夠看看個人理解)

package org.apache.tomcat.util.net;
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> {
...
public class Poller implements Runnable {
    public void run() {
        //此方法的代碼位於692行
    }
}
...
}
複製代碼

若是你要測試Tomcat的NIO2的處理方式,則須要如下配置 將如下代碼添加到你的代碼中。(因爲SpringBoot中內嵌的tomcat默認I/O方式爲NIO因此咱們須要經過配置增長NIO2的鏈接器)

import org.apache.catalina.connector.Connector;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConnectorConf {
    //注意你的SpringBoot版本,此項目的版本是2.2.0,舊的版本1.5使用不一樣的類進行配置
    @Bean
    public TomcatServletWebServerFactory servletContainer() {
        TomcatServletWebServerFactory tomcatServletWebServerFactory =
                new TomcatServletWebServerFactory();
        tomcatServletWebServerFactory.addAdditionalTomcatConnectors(getConnector());
        return tomcatServletWebServerFactory;
    }

    private Connector getConnector() {
       // 關鍵點哦
        Connector connector = new Connector("org.apache.coyote.http11.Http11Nio2Protocol");
        //將鏈接器的端口設置爲801,這樣訪問801端口的就是NIO2的模式了
        connector.setPort(801);
        return connector;
    }
}
複製代碼

org.apache.tomcat.util.net.Nio2Endpoint打上斷點就完事了

如何調試多線程

在多線程的狀況下,可能會出現進入不了斷點的狀況,此時只需在斷點上右鍵選擇Thread便可, 當其餘線程到達斷點時IDEA法發出通知,以下圖所示

相關文章
相關標籤/搜索