java中的NIO

簡介

從JDK1.4開始,java中提供一個種叫NIO(Non-Blocking IO)的IO處理機制。與以往的標準IO機制(BIO,Blocking IO)不一樣的是,新的機制把重點放在瞭如何縮短抽象與現實之間的距離上面。NIO中提出了一種新的抽象,NIO 彌補了原來的BIO的不足,它在標準 Java 代碼中提供了高速的、面向塊的I/O。java

NIO的包括三個核心概念:緩衝區(Buffer)、通道(Channel)、選擇器(Selector)。思惟導圖以下:api

NIO思惟導圖

BIO與NIO

BIO與NIO之間的共同點是他們都是同步的。而非異步的。數組

  • BIO是阻塞的(當前線程必須等待感興趣的事情發生), NIO是非柱塞的(事件選擇,感興趣的事情發生能夠通知線程,而沒必要一直在哪等待);服務器

  • BIO是面向流式的IO抽象(一次一個字節地處理數據), NIO是面向塊的IO抽象(每個操做都在一步中產生或者消費一個數據塊(Buffer));網絡

  • BIO的服務器實現模式爲一個鏈接一個線程,NIO服務器實現模式爲一個請求一個線程;oracle

前提概念

緩衝區操做:

緩衝區,以及緩衝區如何工做,是全部 I/O 的基礎。所謂「輸入/輸出」講的無非就是把數據移進或移出緩衝區。進程執行 I/O 操做,歸結起來,也就是向操做系統發出請求,讓它要麼把緩衝區裏的數據排幹 (寫),要麼用數據把緩衝區填滿(讀)。大體流程如圖:app

I:O緩衝區操做簡圖

注意圖中用戶空間和內核空間的概念。用戶空間是常規進程所在區域。JVM就是常規進程,駐守於用戶空間。用戶空間是非特權區域(好比,在該區域執行的代碼就不能直接訪問硬件設備)。內核空間是操做系統所在區域。內核代碼有特別的權力。異步

緩衝區操做發散/匯聚,許多操做系統能把組裝/分解過程進行得更加高效。socket

3個緩衝區發散讀取

這樣用戶進程就沒必要屢次執行系統調用(那樣作可能代價不菲),內核也能夠優化數據的處理 過程,由於它已掌握待傳輸數據的所有信息。函數

虛擬內存

全部現代操做系統都使用虛擬內存。虛擬內存意爲程序中使用虛擬地址取代物理(硬件RAM)內存地址。這樣作好處頗多:

  • 一個以上的虛擬地址可指向同一個物理內存地址;

  • 虛擬內存空間可大於實際可用的硬件內存。

內存頁

設備控制器不能經過 DMA 直接存儲到用戶空間,但經過利用上面 到的第一 項,則能夠達到相同效果。把內核空間地址與用戶空間的虛擬地址映射到同一個物理地址,這樣, DMA 硬件(只能訪問物理內存地址)就能夠填充對內核與用戶空間進程同時可見的緩衝區。

內存空間多重映射

文件I/O

文件I/O屬文件系統範疇,文件系統與磁盤迥然不一樣。磁盤把數據存在扇區上,一般一個扇區 512 字節。磁盤屬硬件設備,對何謂文件一無所知,它只是 供了一系列數據存取窗口。文件系統把一連串大小一致的數據塊組織到一塊兒。有些塊存儲元信息,如空閒塊、目錄、索引等的映射,有些包含文件數據。

內存映射文件, 爲了在內核空間 的文件系統頁與用戶空間的內存區之間移動數據,一次以上的拷貝操做幾乎老是免不了的。

用戶內存到文件系統頁映射

文件鎖定機制, 容許一個進程阻止其餘進程存取某文件,或限制其存取方式。一般的用途是控制共享信息的更新方式,或用於事務隔離。文件鎖有建議使用和強制使用之分。建議型文件鎖會向 出請求的進程 供當前鎖定信息,但 操做系統並不要求必定這樣作,而是由相關進程進行協調並關注鎖定信息。

流I/O

並不是全部 I/O 都像前幾節講的是面向塊的,也有流 I/O,其原理模仿了通道。I/O 字節流必須順序存取,常見的例子有 TTY(控制檯)設備、打印機端口和網絡鏈接。

流的傳輸通常(也沒必要然如此)比塊設備慢,常常用於間歇性輸入。

緩衝區

一個Buffer對象是固定數量的數據的容器。其做用是一個存儲器,或者分段運輸區,在 這裏數據可被存儲並在以後用於檢索。緩衝區的工做與通道緊密聯繫。 Buffer的類層次圖:

Buffer類層次圖

緩衝區屬性

  • Capacity: 容量, 緩衝區可以容納的數據元素的最大數量。這一容量在緩衝區建立時被設定,而且永遠不能被改變;

  • Limit: 上界, 緩衝區的第一個不能被讀或寫的元素。或者說,緩衝區中現存元素的計數;

  • Position: 位置, 下一個要被讀或寫的元素的索引。位置會自動由相應的get()和put()函數更新;

  • Mark: 標記, 一個備忘位置。調用mark()來設定mark=postion。調用reset()設定position= mark。標記在設定前是未定義的(undefined)。

這四個屬性之間老是 循如下關係:0 <= mark <= position <= limit <= capacity。

直接緩衝區

操做系統的在內存區域中進行I/O操做。這些內存區域,就操做系統方面而言,是相連的字節序列。因而,毫無疑問,只有字節緩衝區有資格參與I/O操做。也請回想一下操做系統會直接存取進程——在本例中是JVM進程的內存空間,以傳輸數據。這也意味着I/O操做的目標內存區域必須是連續的字節序列。在JVM中,字節數組可能不會在內存中連續存儲,或者無用存儲單元 集可能隨時對其進行移動。在Java中,數組是對象,而數據存儲在對象中的方式在不一樣的JVM實現中都各有不一樣。

直接緩衝區被用於與通道和固有 I/O 例程交 互。它們經過使用固有代碼來告知操做系統直接釋放或填充內存區域,對用於通道直接或原始 存取的內存區域中的字節元素的存儲盡了最大的努力。

通道

通道用於在字節緩衝區和位於通道另外一邊的實體(一般是一個文件或套接字)之間有效地傳輸數據。

通道能夠形象地比喻爲銀行出納窗口使用的動導管。您的薪水支票就是您要傳送的信息,載體(Carrier)就比如一個緩衝區。您先填充緩衝區(將您的薪水支票放到載體上),接着將緩衝「寫」到通道中(將載體進導管中),而後信息負載就被傳遞到通道另外一邊的I/O服務(銀行出納員)。channel類的繼承關係以下:

Channel類層次結構

Scatter/Gather

通道提供了一種被稱爲Scatter/Gather的重要新功能(有時也被稱爲矢量 I/O)。Scatter/Gather是一個簡單卻強大的概念,它是指在多個緩衝區上實現一個簡單的I/O操做。對於一個write操做而言,數據是從幾個緩衝區按順序抽取(稱爲gather)並沿着通道發送的。對於 read 操做而言,從通道讀取的數據會按順序被散佈(稱爲scatter)到多個緩衝區,將每一個緩衝區填滿直至通道中的數據或者緩衝區的最大空間被消耗完。

Scatter的意思是分散,Gather的意思是彙集。咱們注意到在上面的類層次結構圖中,除了ByteChannel外,各Channel類還都實現了兩個接口,分別是:

  • ScatteringByteChannel

  • GatheringByteChannel

public interface ScatteringByteChannel extends ReadableByteChannel
{
   public long read (ByteBuffer [] dsts) throws IOException;
   public long read (ByteBuffer [] dsts, int offset, int length) throws IOException;
}
public interface GatheringByteChannel extends WritableByteChannel
{
   public long write(ByteBuffer[] srcs) throws IOException;
   public long write(ByteBuffer[] srcs, int offset, int length) throws IOException;
}

文件通道

Channel根據IO服務的狀況主要分爲兩大類,按照《Java NIO》的描述,兩類IO分別是:file I/O 和 stream I/O。前者是針對文件讀寫操做的,然後者可能是網絡通訊相關的和Socket相關的。Channel分類也基本如此,和前者對應的FileChannel,以及與後者對應的SocketChannel等類對象。

文件通道老是阻塞式的,所以不能被置於非阻塞模式。

Socket通道

新的socket通道類能夠運行非阻塞模式而且是可選擇的。所有socket通道類包括DatagramChannel、SocketChannel和ServerSocketChannel
如上面的類圖,全部的socket通道都繼承於AbstractSelectableChannel。

請注意DatagramChannel和SocketChannel 實現定義讀和寫功能的接口而ServerSocketChannel不實現。ServerSocketChannel 負責監聽傳入的鏈接和建立新的SocketChannel對象,它自己從不傳輸數據。

ServerSocketChannel

讓咱們從最簡單的ServerSocketChannel來開始對socket通道類的討論。如下是ServerSocketChannel的完整API:

public abstract class ServerSocketChannel extends AbstractSelectableChannel{
    public static ServerSocketChannel open() throws IOException
    public abstract ServerSocket socket();
    public abstract ServerSocket accept() throws IOException;
    public final int validOps()
}

ServerSocketChannel是一個基於通道的socket監聽器。它同咱們所熟悉的java.net.ServerSocket執行相同的基本任務,不過它增長了通道語義,所以可以在非阻塞模式下運行。

SocketChannel

SocketChannel,它是使用最多的socket通道類,接口以下:

public abstract class SocketChannel
   extends AbstractSelectableChannel
   implements ByteChannel, ScatteringByteChannel,GatheringByteChannel{
       
   public static SocketChannel open() throws IOException
   public static SocketChannel open (InetSocketAddress remote) throws IOException
   public abstract Socket socket();
   public abstract boolean connect (SocketAddress remote) throws IOException;
   public abstract boolean isConnectionPending();
   public abstract boolean finishConnect() throws IOException;
   public abstract boolean isConnected();
   public final int validOps()
}

socket 和 SocketChannel 類封裝點對點、有序的網絡鏈接,相似於咱們所熟知並喜好的 TCP/IP 網絡鏈接。SocketChannel 演 戶端發起同一個監聽服務器的鏈接。直到鏈接成功,它才能 到 數據而且只會從鏈接到的地址接 。

DatagramChannel

正如SocketChannel對應Socket, ServerSocketChannel對應ServerSocket,每個DatagramChannel對象也有一個關聯的DatagramSocket對象。不過原命名模式在此並未適用: DatagramSocketChannel顯得有點笨拙,所以採用了簡潔的DatagramChannel名稱。

public abstract class DatagramChannel
    extends AbstractSelectableChannel
    implements ByteChannel, ScatteringByteChannel, GatheringByteChannel{

    public static DatagramChannel open( ) throws IOException
    public abstract DatagramSocket socket( );
    public abstract DatagramChannel connect (SocketAddress remote) throws IOException;
    public abstract boolean isConnected( );
    public abstract DatagramChannel disconnect( ) throws IOException;
    public abstract SocketAddress receive (ByteBuffer dst) throws IOException;
    public abstract int send (ByteBuffer src, SocketAddress target)
    public abstract int read (ByteBuffer dst) throws IOException;
    public abstract long read (ByteBuffer [] dsts) throws IOException;
    public abstract long read (ByteBuffer [] dsts, int offset,int length) throws IOException;
    public abstract int write (ByteBuffer src) throws IOException;
    public abstract long write(ByteBuffer[] srcs) throws IOException;
    public abstract long write(ByteBuffer[] srcs, int offset,int length) throws IOException;
}

選擇器

選擇器提供選擇執行已經就緒的任務的能力,這使得多元I/O成爲可能。選擇器類管理着一個被註冊的通道集合的信息和它們的就緒狀態。通道是和選擇器一塊兒被註冊的,而且使用選擇器來更新通道的就緒狀態。當這麼作的時候,能夠選擇將被激發的線程掛起,直
到有就緒的的通道。

就緒選擇相關類

實例

文件讀取

將文件內容讀取到一個字符串中

public static String readFileToString(String filePath, Charset charset) throws IOException {
        try(FileInputStream in = new FileInputStream(filePath);
            FileChannel channel = in.getChannel()
        ){
            long fileSize = channel.size();
            int bufferSize = 1024;
            if (fileSize < 1024){
                bufferSize = (int)fileSize;
            }
            StringBuilder builder = new StringBuilder((int)(fileSize/2));

            ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize);
            CharBuffer charBuffer = CharBuffer.allocate(bufferSize/2);
            CharsetDecoder decoder = charset.newDecoder();
            while (channel.read(byteBuffer) != -1) {
                byteBuffer.flip();
                CoderResult rel;
                do{
                    rel = decoder.decode(byteBuffer,charBuffer,false);
                    charBuffer.flip();

                    builder.append(charBuffer.array(),0,charBuffer.limit());
                    charBuffer.clear();
                }while (rel.isOverflow());
                byteBuffer.compact();
            }

            byteBuffer.flip();
            decoder.decode(byteBuffer,charBuffer,true);
            charBuffer.flip();
            builder.append(charBuffer.array(),0,charBuffer.limit());
            charBuffer.clear();

            return builder.toString();
        }
    }

文件寫入

將一串字符串寫入文件中

public static long writeStringToFile(String filePath, String content, Charset charset) throws IOException {
        long writeSize = 0;
        try(FileOutputStream out = new FileOutputStream(filePath);
            FileChannel channel = out.getChannel()
        ){
            ByteBuffer buffer = ByteBuffer.wrap(content.getBytes(charset));
            while (buffer.hasRemaining()){
                writeSize += channel.write(buffer);
            }
            channel.force(false);
        }
        return writeSize;
    }

簡單的ServerSocketChannel使用

只是一個簡單的ServerSocketChannel

public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.socket().bind(new InetSocketAddress(8888));
//        ssc.configureBlocking(false);
        String hello_string = "hello rudy!";
        ByteBuffer buffer = ByteBuffer.wrap(hello_string.getBytes());
        while (true){
//            System.out.println("wait for connections");
            SocketChannel clientSocket = ssc.accept();
            if (null == clientSocket){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else{
                System.out.println(String.format("incomimg connection from: %s",clientSocket.getRemoteAddress()));
                buffer.rewind();
                clientSocket.write(buffer);
                clientSocket.close();
            }
        }
    }

簡單的SocketChannel使用

public static void main(String[] args) throws IOException {
        SocketChannel channel = SocketChannel.open();
        channel.connect(new InetSocketAddress("localhost",8888));
        ByteBuffer buffer = ByteBuffer.allocate(100);
        CharBuffer charBuffer = CharBuffer.allocate(100);
        CharsetDecoder decoder = Charset.defaultCharset().newDecoder();
        channel.read(buffer);
        buffer.flip();
        decoder.decode(buffer,charBuffer,false);
        charBuffer.flip();
        while (charBuffer.hasRemaining()){
            System.out.println(charBuffer.get());
        }
        channel.close();
    }

Selector使用,I/O多路複用

較爲綜合的例子

public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel channel = ServerSocketChannel.open();
        channel.bind(new InetSocketAddress("localhost",8888));
        channel.configureBlocking(false);

        SelectionKey selectionKey = channel.register(selector,SelectionKey.OP_ACCEPT);

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        CharBuffer charBuffer = CharBuffer.allocate(1024);
        CharsetDecoder decoder = Charset.defaultCharset().newDecoder();

        while (true){
            int readyNum = selector.select();
            if (readyNum <= 0){
                continue;
            }
            Set<SelectionKey> readyKey = selector.selectedKeys();
            for (SelectionKey tempKey : readyKey){
                if (tempKey.isAcceptable()){
                    ServerSocketChannel tempChannel = (ServerSocketChannel) tempKey.channel();
                    SocketChannel clientChannel = tempChannel.accept();
                    if (null != clientChannel){
                        System.out.println("one connection:" + clientChannel.getRemoteAddress());
                        clientChannel.configureBlocking(false);
                        clientChannel.register(selector,SelectionKey.OP_READ);
                    }
                }

                if(tempKey.isReadable()){
                    SocketChannel tempChannel = (SocketChannel) tempKey.channel();
                    tempChannel.read(buffer);
                    buffer.flip();
                    decoder.decode(buffer,charBuffer,false);
                    charBuffer.flip();
                    String getData = new String(charBuffer.array(),0,charBuffer.limit());
                    System.out.println(tempChannel.getRemoteAddress() + ":" + getData);
                    buffer.clear();
                    charBuffer.clear();
                    tempChannel.write(ByteBuffer.allocate(0));
                    if (getData.equalsIgnoreCase("exit")){
                        tempChannel.close();
                    }
                }

                if (tempKey.isWritable()){
                    SocketChannel tempChannel = (SocketChannel) tempKey.channel();
//                    System.out.println(tempChannel.getRemoteAddress() + ": read");
                }
                readyKey.remove(tempKey);
            }
        }
    }

UDP服務端

public static void main(String[] args) throws IOException {
        DatagramChannel channel = DatagramChannel.open();
        channel.bind(new InetSocketAddress("localhost",8888));
        ByteBuffer buffer = ByteBuffer.allocate(100);
        CharBuffer charBuffer = CharBuffer.allocate(100);
        CharsetDecoder decoder = Charset.defaultCharset().newDecoder();
        while (true){
            buffer.clear();
            charBuffer.clear();
            SocketAddress remoteAddress = channel.receive(buffer);
            buffer.flip();
            decoder.decode(buffer,charBuffer,false);
            charBuffer.flip();
            System.out.println( remoteAddress +":" + new String(charBuffer.array(),0, charBuffer.limit()));
        }

    }

UDP客戶端

public static void main(String[] args) throws IOException {
        DatagramChannel channel = DatagramChannel.open();
        String sendData = "哈哈哈 hello rudy!";
        ByteBuffer buffer = ByteBuffer.wrap(sendData.getBytes());
        channel.send(buffer, new  InetSocketAddress("localhost",8888));
        System.out.println("send end!");
    }

他山之石

相關文章
相關標籤/搜索