java藝術開發(3)-NIO

1. 前言

和大多數人同樣,NIO在課本里面看到過,可是基本沒有用過。畢竟應該不多有程序員本身去在java裏面實現NIO,大多都基於Netty框架來實現。我以前開發過一個基於websocket的項目,websocket是基於tomcat實現的,可是根據線上的效果來看,websocket鏈接傳輸的性能並很差,因此準備着手優化。java

目前主流的websocket實現框架有tomcat、netty-socketIO和netty,而且網友一邊倒的傾向於netty相關的實現。鄙視tomcat的理由是: 依賴於容器,性能較差;基於BIO,當併發量高的時候會有資源瓶頸。 可是通過個人實際測試,我當前使用的內嵌tomcat9已經默認實現NIO了。只能說tomcat實現websocket性能較差的緣由,在於它基於容器的websocket實現不夠完善,NIO的實現也不如netty成熟。react

固然,我最終選擇了用netty來重構當前的websocket框架。不過,我對於NIO的實現過程也產生了興趣,連tomcat也在向它傾斜。程序員

2. BIO、NIO、AIO

Java 中的 BIO、NIO和 AIO 理解爲是 Java 語言對操做系統的各類 IO 模型的封裝。程序員在使用這些 API 的時候,不須要關心操做系統層面的知識,也不須要根據不一樣操做系統編寫不一樣的代碼。只須要使用Java的API就能夠了。web

在講 BIO,NIO,AIO 以前先來回顧一下這樣幾個概念:同步與異步,阻塞與非阻塞。spring

2.1. 同步與異步

同步就是發起一個調用後,被調用者未處理完請求以前,調用不返回。異步就是發起一個調用後,馬上獲得被調用者的迴應表示已接收到請求,可是被調用者並無返回結果,此時咱們能夠處理其餘的請求,被調用者一般依靠事件,回調等機制來通知調用者其返回結果。apache

同步和異步的區別最大在於異步的話調用者不須要等待處理結果,被調用者會經過回調等機制來通知調用者其返回結果。咱們能夠用打電話和發短信來很好的比喻同步與異步操做。數組

2.2. 阻塞和非阻塞

阻塞與非阻塞主要是從 CPU 的消耗上來講的,阻塞就是 CPU 停下來等待一個慢的操做完成 CPU 才接着完成其它的事。非阻塞就是在這個慢的操做在執行時 CPU 去幹其它別的事,等這個慢的操做完成時,CPU 再接着完成後續的操做。緩存

雖然表面上看非阻塞的方式能夠明顯的提升 CPU 的利用率,可是也帶了另一種後果就是系統的線程切換增長。增長的 CPU 使用時間能不能補償系統的切換成本須要好好評估。tomcat

那麼同步阻塞同步非阻塞異步非阻塞又表明什麼意思呢?springboot

我在網上看到一個很好的例子:你媽媽讓你燒水,小時候你比較笨啊,在哪裏傻等着水開(同步阻塞)。等你稍微再長大一點,你知道每次燒水的空隙能夠去幹點其餘事,而後只須要時不時來看看水開了沒有(同步非阻塞)。後來,大家家用上了水開了會發出聲音的壺,這樣你就只須要聽到響聲後就知道水開了,在這期間你能夠隨便幹本身的事情,你須要去倒水了(異步非阻塞)。

2.3. BIO、NIO、AIO

  • BIO:就是傳統的 java.io 包,它是基於流模型實現的,交互的方式是同步、阻塞方式,也就是說在讀入輸入流或者輸出流時,在讀寫動做完成以前,線程會一直阻塞在那裏,它們之間的調用時可靠的線性順序。它的有點就是代碼比較簡單、直觀;缺點就是 IO 的效率和擴展性很低,容易成爲應用性能瓶頸。
  • NIO :是 Java 1.4 引入的 java.nio 包,提供了 Channel、Selector、Buffer 等新的抽象,能夠構建多路複用的、同步非阻塞 IO 程序,同時提供了更接近操做系統底層高性能的數據操做方式。
  • AIO:是 Java 1.7 以後引入的包,是 NIO 的升級版本,提供了異步非堵塞的 IO 操做方式,因此人們叫它 AIO(Asynchronous IO),異步 IO 是基於事件和回調機制實現的,也就是應用操做以後會直接返回,不會堵塞在那裏,當後臺處理完成,操做系統會通知相應的線程進行後續的操做。

3. 關於NIO

3.1. NIO說明

NIO是一種新的IO模型(Recator模型),新主要體如今多路複用,事件驅動上

一、多路複用,一個線程能夠處理多個socket請求,經過多個socket註冊在一個select上面,而後不斷調用select來獲取被激活的socket,即達到在一個線程中,處理多個socket請求目的,而在傳統(同步阻塞)IO模型中,須要經過多線程的方式才能達到此目的,傳統的IO模型因爲使用多線程,就會有線程數量以及線程上下文切換等限制。

二、事件驅動(其實就是觀察者模式),模型圖以下

image.png

image.png

如圖所示,EventHandler爲IO的事件處理器(觀察者),Reactor爲管理EventHandler類,事件的註冊,刪除等(被觀察者),reactor的handle_event函數會不斷循環調用內核的selec()函數(同步事件多路分離器(通常是內核)的多路分離函數),只要某個文件句柄被激活(可讀寫),select()函數就返回,handle_event會調用相關的事件處理函數EventHandler上的handle_event()函數。

時序圖如上圖所示,使用reactor模型以後,用戶線程註冊事件以後,能夠去執行其餘事情(異步),等相關讀寫工做就緒以後,Reactor會通知用戶線程進行讀寫。用戶IO線程輪詢是否讀寫好等工做由Reactor上的handle_events處理,Reactor會調用內核select函數檢查socket的狀態。當socket被激活的時候,通知用戶線程(或調用戶線程的回掉函數)。執行EventHandler的hand_event()函數。因爲select函數是阻塞的,因此多了複用模型被叫作異步阻塞模型,注意,這裏所說的阻塞並非socket上read等操做的阻塞,socket上這些操做時非阻塞的(事件模型)。

NIO有3個實體:Buffer(緩衝區),Channel(通道),Selector(多路複用器)。
image.png

Buffer是客戶端存放服務端信息的一個容器,服務端若是把數據準備好了,就會經過Channel往Buffer裏面傳。Buffer有7個類型:ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer。

image.png

Channel是客戶端與服務端之間的雙工鏈接通道。因此在請求的過程當中,客戶端與服務端中間的Channel就在不停的執行「鏈接、詢問、斷開」的過程。直到數據準備好,再經過Channel傳回來。Channel主要有4個類型:FileChannel(從文件讀取數據)、DatagramChannel(讀寫UDP網絡協議數據)、SocketChannel(讀寫TCP網絡協議數據)、ServerSocketChannel(能夠監聽TCP鏈接)

Selector是服務端選擇Channel的一個複用器。Seletor有兩個核心任務:監控數據是否準備好,應答Channel。具體說來,多個Channel反覆輪詢時,Selector就看該Channel所需的數據是否準備好了;若是準備好了,則將數據經過Channel返回給該客戶端的Buffer,該客戶端再進行後續其餘操做;若是沒準備好,則告訴Channel還須要繼續輪詢;多個Channel反覆詢問Selector,Selector爲這些Channel一一解答。

3.2. NIO和BIO對比

旦有請求到來(無論是幾個同時到仍是隻有一個到),都會調用對應IO處理函數處理,因此:

(1)NIO適合處理鏈接數目特別多,可是鏈接比較短(輕操做)的場景,Jetty,Mina,ZooKeeper等都是基於java nio實現。

(2)BIO方式適用於鏈接數目比較小且固定的場景,這種方式對服務器資源要求比較高,併發侷限於應用中。

4. 零拷貝

4.1. 傳統IO

數據須要從磁盤拷貝到內核空間,再從內核空間拷到用戶空間(JVM)。
程序可能進行數據修改等操做
再將數據拷貝到內核空間,內核空間再拷貝到網卡內存,經過網絡發送出去(或拷貝到磁盤)。
即數據的讀寫(這裏用戶空間發到網絡也算做寫),都至少須要兩次拷貝。

固然磁盤到內核空間屬於DMA拷貝(DMA即直接內存存取,原理是外部設備不經過CPU而直接與系統內存交換數據)。而內核空間到用戶空間則須要CPU的參與進行拷貝,既然須要CPU參與,也就涉及到了內核態和用戶態的相互切換

3.gif

4.2. NIO零拷貝

改進的地方:

  • 已經將上下文切換次數從4次減小到了2次;
  • 將數據拷貝次數從4次減小到了3次(其中只有1次涉及了CPU,另外2次是DMA直接存取)。

但這尚未達到咱們零拷貝的目標。若是底層NIC(網絡接口卡)支持gather操做,咱們能進一步減小內核中的數據拷貝。在Linux 2.4以及更高版本的內核中,socket緩衝區描述符已被修改用來適應這個需求。這種方式不但減小屢次的上下文切換,同時消除了須要CPU參與的重複的數據拷貝。用戶這邊的使用方式不變,而內部已經有了質的改變。

NIO的零拷貝由transferTo()方法實現。transferTo()方法將數據從FileChannel對象傳送到可寫的字節通道(如Socket Channel等)。在內部實現中,由native方法transferTo0()來實現,它依賴底層操做系統的支持。在UNIX和Linux系統中,調用這個方法將會引發sendfile()系統調用。

4.gif

4.3. NIO直接內存

首先,它的做用位置處於傳統IO(BIO)與零拷貝之間,爲什麼這麼說?

傳統IO,能夠把磁盤的文件通過內核空間,讀到JVM空間,而後進行各類操做,最後再寫到磁盤或是發送到網絡,效率較慢但支持數據文件操做。

零拷貝則是直接在內核空間完成文件讀取並轉到磁盤(或發送到網絡)。因爲它沒有讀取文件數據到JVM這一環,所以程序沒法操做該文件數據,儘管效率很高!

而直接內存則介於二者之間,效率通常且可操做文件數據。直接內存(mmap技術)將文件直接映射到內核空間的內存,返回一個操做地址(address),它解決了文件數據須要拷貝到JVM才能進行操做的窘境。而是直接在內核空間直接進行操做,省去了內核空間拷貝到用戶空間這一步操做。

NIO的直接內存是由MappedByteBuffer實現的。核心便是map()方法,該方法把文件映射到內存中,得到內存地址addr,而後經過這個addr構造MappedByteBuffer類,以暴露各類文件操做API。

因爲MappedByteBuffer申請的是堆外內存,所以不受Minor GC控制,只能在發生Full GC時才能被回收。而DirectByteBuffer改善了這一狀況,它是MappedByteBuffer類的子類,同時它實現了DirectBuffer接口,維護一個Cleaner對象來完成內存回收。所以它既能夠經過Full GC來回收內存,也能夠調用clean()方法來進行回收。

另外,直接內存的大小可經過jvm參數來設置:-XX:MaxDirectMemorySize。

NIO的MappedByteBuffer還有一個兄弟叫作HeapByteBuffer。顧名思義,它用來在堆中申請內存,本質是一個數組。因爲它位於堆中,所以可受GC管控,易於回收。

5. tomcat的應用

仍是那個websocket的項目,以前說過,網上不少人都認爲tomcat默認是實現BIO的。但我在運行springboot項目後,無心中看到控制檯的日誌中有個"nio-exec-"前綴的線程。我覺得是由於引入了netty,可是當時那個接口是http的,和netty不要緊,最終查閱資料後瞭解到tomcat不一樣版本也在作改變。

一、BIO:阻塞式I/O操做即便用的是傳統 I/O操做,Tomcat7如下版本默認狀況下是以BIO模式運行的,因爲每一個請求都要建立一個線程來處理,線程開銷較大,不能處理高併發的場景,在三種模式中性能也最低。啓動tomcat後,日誌中會有 http-bio-端口 的內容。

二、NIO是Java 1.4 及後續版本提供的一種新的I/O操做方式,是一個基於緩衝區、並能提供非阻塞I/O操做的Java API,它擁有比傳統I/O操做(BIO)更好的併發運行性能。tomcat 8版本及以上默認就是在NIO模式下容許。啓動tomcat後,日誌中會有 http-nio-端口 的內容。

三、APR(Apache Portable Runtime/Apache可移植運行時),是Apache HTTP服務器的支持庫。你能夠簡單地理解爲,Tomcat將以JNI的形式調用Apache HTTP服務器的核心動態連接庫來處理文件讀取或網絡傳輸操做,從而大大地提升Tomcat對靜態文件的處理性能。 Tomcat apr也是在Tomcat上運行高併發應用的首選模式。啓動tomcat後,日誌中會有 http-apr-端口 的內容。

固然,tomcat的版本也不必定絕對匹配到,若是你想看你的tomcat是什麼版本的,仍是要看日誌。若是你是啓動springboot運行的,內嵌tomcat的日誌可能不夠完整,能夠經過在配置文件中加上如下的屬性來開啓完整日誌:

logging.level.org.apache.tomcat=debug
logging.level.org.apache.catalina=debug

6. 附錄代碼

附上簡單實現java nio的代碼做爲參考(也是copy來的)。
NioServer.java

/**
 * @Copyright: Shanghai Definesys Company.All rights reserved.
 * @Description:
 * ServerSocketChannel:有效事件爲 OP_ACCEPT。
 * SocketChannel:有效事件爲 OP_CONNECT、OP_READ、OP_WRITE
 * 他們之間是互斥的,若是 OP_READ爲true,其餘的就爲false
 *
 * @author: kerry.wu
 * @since: 2020/4/26 8:52
 * @history: 1.2020/4/26 created by kerry.wu
 */
public class NioServer {
    private int port;
    private Selector selector;
    private ExecutorService executorService= Executors.newFixedThreadPool(5);

    public NioServer(int port){
        this.port=port;
    }

    /**
     * 服務器端註冊 OP_ACCEPT
     */
    public void init(){
        ServerSocketChannel serverSocketChannel=null;
        try {
            serverSocketChannel=ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(port));
            selector=Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("NIO Server open ...");
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public void accept(SelectionKey selectionKey){
        try {
            ServerSocketChannel serverSocketChannel=(ServerSocketChannel) selectionKey.channel();
            SocketChannel socketChannel=serverSocketChannel.accept();
            socketChannel.configureBlocking(false);
            socketChannel.register(selector,SelectionKey.OP_READ);
            System.out.println("accept a client:"+socketChannel.socket().getInetAddress().getHostName());
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public void start(){
        //註冊 OP_ACCEPT
        this.init();
        while (true){
            try {
                Thread.sleep(3000);
                int events=selector.select();
                if(events>0){
                    java.util.Iterator<SelectionKey> selectionKeyIterator=selector.selectedKeys().iterator();
                    while (selectionKeyIterator.hasNext()){
                        SelectionKey selectionKey=selectionKeyIterator.next();
                        selectionKeyIterator.remove();
                        //SelectionKey當前事件是 OP_ACCEPT 時,註冊 SocketChannel 的 OP_READ
                        if(selectionKey.isAcceptable()){
                            accept(selectionKey);
                        }else {
                            //SelectionKey當前事件是 OP_READ 時,分配線程接受並處理消息
                            executorService.submit(new NioServerHandler(selectionKey));
                        }
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new NioServer(8081).start();
    }
}

NioServerHandler.java

public class NioServerHandler implements Runnable {
    private SelectionKey selectionKey;

    public NioServerHandler(SelectionKey selectionKey){
        this.selectionKey=selectionKey;
    }

    /**
     * ByteBuffer:
     * 1.allocate(int capacity):從堆空間中分配一個容量大小爲capacity的byte數組做爲緩衝區的byte數據存儲器
     * 2.allocateDirect(int capacity):不使用JVM堆棧而是經過操做系統來建立內存塊用做緩衝區,它與當前操做系統可以更好的耦合,所以能進一步提升I/O操做速度。可是分配直接緩衝區的系統開銷很大,所以只有在緩衝區較大並長期存在,或者須要常常重用時,才使用這種緩衝區
     * 3.wrap(byte[] array):這個緩衝區的數據會存放在byte數組中,bytes數組或buff緩衝區任何一方中數據的改動都會影響另外一方。其實ByteBuffer底層原本就有一個bytes數組負責來保存buffer緩衝區中的數據,經過allocate方法系統會幫你構造一個byte數組
     * 4.flip:緩存字節數組的指針設置爲數組的開始序列即數組下標0。這樣就能夠從buffer開頭,對該buffer進行遍歷(讀取)了。
     */
    @Override
    public void run(){
        try {
            if(selectionKey.isReadable()){
                SocketChannel socketChannel=(SocketChannel) selectionKey.channel();
                ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
                socketChannel.read(byteBuffer);
                byteBuffer.flip();
                System.out.println("ACCEPT:"+selectionKey.isAcceptable());
                System.out.println("READ:"+selectionKey.isReadable());
                System.out.println("CONNECT:"+selectionKey.isConnectable());
                System.out.println("收到客戶端"+socketChannel.socket().getInetAddress().getHostName()+"的數據:"+new String(byteBuffer.array()));

                socketChannel.write(ByteBuffer.wrap(("回覆客戶端"+socketChannel.socket().getInetAddress().getHostName()).getBytes()) );
                selectionKey.cancel();

            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

NioClient.java

public class NioClient {
    private static final String host = "127.0.0.1";
    private static final int port = 8081;
    private Selector selector;

    public static void main(String[] args){
        for (int i=0;i<3;i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    NioClient client = new NioClient();
                    client.connect(host, port);
                    client.listen();
                }
            }).start();
        }
    }

    /**
     * 建立鏈接,註冊 OP_CONNECT
     * @param host
     * @param port
     */
    public void connect(String host, int port) {
        try {
            SocketChannel sc = SocketChannel.open();
            sc.configureBlocking(false);
            this.selector = Selector.open();
            //註冊 OP_CONNECT
            sc.register(selector, SelectionKey.OP_CONNECT);
            sc.connect(new InetSocketAddress(host, port));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void listen() {
        while (true) {
            try {
                int events = selector.select();
                if (events > 0) {
                    Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
                    while (selectionKeys.hasNext()) {
                        SelectionKey selectionKey = selectionKeys.next();
                        selectionKeys.remove();
                        //當 OP_CONNECT 可鏈接事件,註冊 OP_READ
                        if (selectionKey.isConnectable()) {
                            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                            if (socketChannel.isConnectionPending()) {
                                socketChannel.finishConnect();
                            }
                            socketChannel.configureBlocking(false);
                            socketChannel.register(selector, SelectionKey.OP_READ);
                            socketChannel.write(ByteBuffer.wrap(("Hello this is " + Thread.currentThread().getName()).getBytes()));
                        }
                        //當 OP_READ 可讀事件,
                        else if (selectionKey.isReadable()) {
                            SocketChannel sc = (SocketChannel) selectionKey.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            //從 SocketChannel 中讀取數據
                            sc.read(buffer);
                            buffer.flip();
                            Thread.sleep(3000);
                            System.out.println("收到服務端的數據:"+new String(buffer.array()));
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
相關文章
相關標籤/搜索