【J2SE】java NIO 基礎學習

NIO與IO的區別

IO NIO
阻塞式 非阻塞式、選擇器selectors
面向流:單向流動,直接將數據從一方流向另外一方 面向緩存:將數據放到緩存區中進行存取,經通道進行數據的傳輸

緩衝Buffer

根據數據類型的不一樣,提供了對應的類型緩衝區(boolean類型除外),每個Buffer類都是Buffer接口的一個實例。經過Buffer類.allocate()方法獲取緩衝區;對緩衝區的數據進行操做可使用put方法和get方法。java

四個核心屬性程序員

// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
  1. capacity:容量,表示緩衝區中最大存儲容量,一旦聲明不可更改。
  2. limit:界限,表示限制可對緩衝區操做數據的範圍,範圍外的數據不可被操做。
  3. position:位置,表示當前操做的數據位於緩衝區中的位置。
  4. mark:標記,表示記錄當前position的位置。

經常使用方法(以ByteBuffer爲例)數組

public static ByteBuffer allocateDirect(int capacity):分配一個直接緩衝區

public static ByteBuffer allocate(int capacity):分配一個間接緩衝區緩存

當分配一個緩衝區時,capacity=capacity,mark=-1, position=0, limit=capacity,源碼分析以下:服務器

public static ByteBuffer allocate(int capacity) {
    ...
    return new HeapByteBuffer(capacity, capacity);
}

// class HeapByteBuffer extends ByteBuffer
HeapByteBuffer(int cap, int lim) {
    // 調用ByteBuffer的構造函數傳入默認參數:mark=-1, position=0, limit=capacity
    super(-1, 0, lim, cap, new byte[cap], 0);
};

// public abstract class ByteBuffer extends Buffer
ByteBuffer(int mark, int pos, int lim, int cap, byte[] hb, int offset) {
    super(mark, pos, lim, cap);
    this.hb = hb;            // final byte[] hb; 
    this.offset = offset;    // final int offset;
}

Buffer(int mark, int pos, int lim, int cap) { 
    ...
    this.capacity = cap;
    limit(lim);        //    設置limit
    position(pos);    //    設置position
    if (mark >= 0) {
        ...
        this.mark = mark;
    }
}
public final ByteBuffer put(byte[] src):將一個字節數組放入緩衝區。

每當放置一個字節時,position將會+1,保證position的值就是下一個可插入數據的buffer單元位置。源碼分析以下:網絡

public final ByteBuffer put(byte[] src) {
    return put(src, 0, src.length);    
}

// 由allocate方法調用分配緩衝區可知,返回的是Buffer的實現類HeapByteBuffer對象
public ByteBuffer put(byte[] src, int offset, int length) {
    checkBounds(offset, length, src.length);    // 檢查是否下標越界
    if (length > remaining())                    // 檢查是否超出了可操做的數據範圍= limit-position
        throw new BufferOverflowException();
    System.arraycopy(src, offset, hb, ix(position()), length);
    position(position() + length);                // 重設position
    return this;
}
public ByteBuffer get(byte[] dst):從緩衝區中讀取數據到 dst中。應在 flip() 方法後調用。

獲取數據,是在緩衝區字節數組中的position位置處開始,讀取一次完畢後,並會記錄當前讀取的位置,即position,以便於下一次調用get方法繼續讀取。app

public ByteBuffer get(byte[] dst) {
    return get(dst, 0, dst.length);
}

// 調用HeapByteBuffer對象的get方法
public ByteBuffer get(byte[] dst, int offset, int length) {
    ...
    // 從緩衝區的字節數組final byte[] hb中,拷貝從 hb的 offset+position(注:offset=0) 處的長度爲length的數據到 dst中
    System.arraycopy(hb, ix(position()), dst, offset, length);
    position(position() + length);    // 設置position
    return this;
}

經過源碼分析可知,當put操做後,position記錄的是下一個可用的buffer單元,而get會從position位置處開始獲取數據,這顯然是沒法得到的,所以須要從新設置 position, 即 flip()方法。dom

public final Buffer flip() :翻轉緩衝區,在一個通道讀取或PUT操做序列以後,調用此方法以準備一個通道寫入或相對獲取操做的序列

將此通道的緩衝區的界限設置爲當前position,保證了有可操做的數據。異步

public final Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
}
public final Buffer mark():標記當前position

可用於在put操做轉get操做時標記當前的position位置,以便於調用reset方法從該位置繼續操做socket

public final Buffer mark() {
    mark = position;
    return this;
}
public final Buffer reset():回到mark標記的位置
public final Buffer reset() {
    int m = mark;
    if (m < 0)
        throw new InvalidMarkException();
    position = m;
    return this;
}
public final Buffer clear():清除緩衝,重置初始化原始狀態
public final Buffer clear() {
    position = 0;
    limit = capacity;
    mark = -1;
    return this;
}
public final Buffer rewind():倒回,用於從新讀取數據
public final Buffer rewind() {
    position = 0;
    mark = -1;
    return this;
}

直接緩衝區與間接緩衝區

間接緩衝:經過allocate方法分配的緩衝區。當程序發起read請求獲取磁盤文件時,該文件首先被OS讀取到內核地址空間中,並copy一份原始數據傳入JVM用戶地址空間,再傳給應用程序。增長了一個copy操做,致使效率下降。

直接緩衝:經過allocateDirecr方法分配的緩衝區,此緩衝區創建在物理內存中。直接在兩個空間中開闢內存空間,建立映射文件,去除了在內核地址空間和用戶地址空間中的copy操做,使得直接經過物理內存傳輸數據。雖然有效提升了效率,可是分配和銷燬該緩衝區的成本高於間接緩衝,且對於緩衝區中的數據將交付給OS管理,程序員沒法控制。

通道Channel

用於源節點與目標節點之間的鏈接,負責對緩衝區中的數據提供傳輸服務。

經常使用類

​ FileChannel:用於讀取、寫入、映射和操做文件的通道。

​ SocketChannel:經過 TCP 讀寫網絡中的數據。

​ ServerSocketChannerl:經過 UDP 讀寫網絡中的數據通道。

​ DatagramChannel:經過 UDP 讀寫網絡中的數據通道。

​ 本地IO:FileInputStream、FileOutputStream、RandomAccessFile

​ 網絡IO:Socket、ServerSocket、DatagramSocket

獲取Channel方式(以FileChannel爲例)

​ 1. Files.newByteChannel工具類靜態方法

​ 2. getChannel方法:經過對象動態獲取,使用間接緩衝區。

FileInputStream fis = new FileInputStream(ORIGINAL_FILE);
FileOutputStream fos = new FileOutputStream(OUTPUT_FILE);

// 獲取通道
FileChannel inChannel = fis.getChannel();
FileChannel outChannel = fos.getChannel();

// 提供緩衝區(間接緩衝區)
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (inChannel.read(buffer) != -1) {
    buffer.flip();
    outChannel.write(buffer);
    buffer.clear();
}

​ 3. 靜態open方法:使用open獲取到的Channel通道,使用直接緩衝區。

FileChannel inChannel = 
    FileChannel.open(Paths.get(ORIGINAL_FILE), StandardOpenOption.READ);
FileChannel outChannel = 
    FileChannel.open(Paths.get(OUTPUT_FILE), StandardOpenOption.READ,
    StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    
// 使用物理內存 內存映射文件 
MappedByteBuffer inBuffer = 
    inChannel.map(MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outBuffer = 
    outChannel.map(MapMode.READ_WRITE, 0, inChannel.size());
    
byte[] dst = new byte[inBuffer.limit()];
inBuffer.get(dst);
outBuffer.put(dst);
// 使用DMA 直接存儲器存儲
inChannel.transferTo(0, inChannel.size(), outChannel);
outChannel.transferFrom(inChannel, 0, inChannel.size());
public static FileChannel open(Path path, OpenOption... options):從path路徑中以某種方式獲取文件的Channel
StandardOpenOption 描述
CREATE 建立一個新的文件,若是存在,則覆蓋。
CREATE_NEW 建立一個新的文件,若是該文件已經存在則失敗。
DELETE_ON_CLOSE 關閉時刪除。
DSYNC 要求將文件內容的每次更新都與底層存儲設備同步寫入。
READ 讀方式
SPARSE 稀疏文件
SYNC 要求將文件內容或元數據的每次更新都同步寫入底層存儲設備。
TRUNCATE_EXISTING 若是文件已經存在,而且打開 wirte訪問,則其長度將截斷爲0。
WRITE 寫方式
APPEND 若是文件以wirte訪問打開,則字節將被寫入文件的末尾而不是開頭。
public abstract MappedByteBuffer map(MapMode mode, long position, long size):將通道的文件區域映射到內從中。當操做較大的文件時,將數據映射到物理內存中才是值得的,由於映射到內存是須要開銷的。
FileChannel.MapMode 描述
PRIVATE 專用映射模式(寫入時拷貝)
READ_ONLY 只讀模式
READ_WRIT 讀寫模式
public abstract long transferFrom(ReadableByteChannel src, long position, long count):從給定的可讀取通道src,傳輸到本通道中。直接使用直接存儲器(DMA)對數據進行存儲。

public abstract long transferTo(long position, long count, WritableByteChannel target):將本通道的文件傳輸到可寫入的target通道中。

分散(Scatter)與彙集(Gather)

​ 分散讀取:將通道中的數據分散到多個緩衝區中。 public final long read(ByteBuffer[] dsts)

​ 彙集寫入:將多個緩衝區中的數據彙集到一個Channel通道中。public final long write(ByteBuffer[] srcs)

字符集(Charset)

public final ByteBuffer encode(CharBuffer cb):編碼

public final CharBuffer decode(ByteBuffer bb):解碼

網絡通訊的阻塞與非阻塞

阻塞是相對網絡傳輸而言的。傳統的IO流都是阻塞的,在網絡通訊中,因爲 IO 阻塞,須要爲每個客戶端建立一個獨立的線程來進行數據傳輸,性能大大下降;而NIO是非阻塞的,當存在空閒線程時,能夠轉去操做其餘通道,所以沒必要非要建立一個獨立的線程來服務每個客戶端請求。

選擇器(Selector)

SelectableChannle對象的多路複用器,可同時對多個SelectableChannle對象的 IO 狀態監聽,每當建立一個Channel時,就向Selector進行註冊,交由Selector進行管理,只有Channel準備就緒時,Selector可會將任務分配給一個或多個線程去執行。Selector能夠同時管理多個Channel,是非阻塞 IO 的核心。

NIO 阻塞式

服務器Server不斷監聽客戶端Client的請求,當創建了一個Channel時,服務器進行read操做,接收客戶端發送的數據,只有當客戶端斷開鏈接close,或者執行shutdownOutput操做時,服務器才知曉沒有數據了,不然會一直進行read操做;當客戶端在read操做獲取服務器的反饋時,若服務器沒有關閉鏈接或者shutdownInput時也會一直阻塞。示例代碼以下:

static final String ORIGINAL_FILE = "F:/1.png";
 static final String OUTPUT_FILE = "F:/2.jpg";
public void server() throws Exception {
    // 打開TCP通道,綁定端口監聽
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    serverChannel.bind(new InetSocketAddress(9988));    
    
    ByteBuffer buf = ByteBuffer.allocate(1024);
    // 獲取鏈接
    SocketChannel accept = null;
    while ((accept= serverChannel.accept()) != null) {
        FileChannel fileChannel = FileChannel.open(
            Paths.get(OUTPUT_FILE), StandardOpenOption.CREATE,
            StandardOpenOption.WRITE);
        
        // 讀取客戶端的請求數據
        while (accept.read(buf) != -1) {
            buf.flip();
            fileChannel.write(buf);
            buf.clear();
        }
        
        // 發送執行結果
        buf.put("成功接收".getBytes());
        buf.flip();
        accept.write(buf);
        buf.clear();
        
        fileChannel.close();
        // 關閉鏈接,不然客戶端會一直等待讀取致使阻塞,可以使用shutdownInput,但任務已結束,該close
        accept.close();
    }
    serverChannel.close();
}
public void client() throws Exception {
    // 打開一個socket通道
    SocketChannel clientChannel = SocketChannel.open(
        new InetSocketAddress("127.0.0.1", 9988));
    // 建立緩衝區和文件傳輸通道
    FileChannel fileChannel = FileChannel.open(Paths.get(ORIGINAL_FILE),
            StandardOpenOption.READ);
    ByteBuffer buf = ByteBuffer.allocate(1024);
    
    while ( fileChannel.read(buf) != -1) {
        buf.flip();
        clientChannel.write(buf);
        buf.clear();
    }
    
    // 關閉輸出(不關閉通道),告知服務器已經發送完畢,去掉下面一行代碼服務區將一直讀取致使阻塞
    clientChannel.shutdownOutput();
    
    int len = 0;
    while ((len = clientChannel.read(buf)) != -1) {
        buf.flip();
        System.out.println(new String(buf.array(), 0, len));
        buf.clear();
    }
    
    fileChannel.close();
    clientChannel.close();
}

NIO 非阻塞式

經過在通道Channel中調用configureBlocking將blocking設置爲false,讓Channel能夠進行異步 I/O 操做。

public void client() throws Exception {
    // 打開一個socket通道
    SocketChannel clientChannel = SocketChannel.open(
        new InetSocketAddress("127.0.0.1", 9988));
    ByteBuffer buf = ByteBuffer.allocate(1024);

    // 告知服務器,已經發送完畢
    //    clientChannel.shutdownOutput();
    //    設置非阻塞
    clientChannel.configureBlocking(Boolean.FALSE);

    buf.put("哈哈".getBytes());
    buf.flip();
    clientChannel.write(buf);

    clientChannel.close();
}
public void server() throws Exception {
    // 打開TCP通道,綁定端口監聽
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(Boolean.FALSE);
    serverChannel.bind(new InetSocketAddress(9988));

    //    建立一個Selector用於管理Channel
    Selector selector = Selector.open();
    //    將服務器的Channel註冊到selector中,並添加 OP_ACCEPT 事件,讓selector監聽通道的請求
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);

    // 一直判斷是否有已經準備就緒的Channel
    while (selector.select() > 0) {
          // 存在一個已經準備就緒的Channel,獲取SelectionKey集合中獲取觸發該事件的全部key
        Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
        while (keys.hasNext()) {
            SelectionKey sk = keys.next();
            SocketChannel accept = null;
            ByteBuffer buffer = null;
            // 針對不一樣的狀態進行操做
            if (sk.isAcceptable()) {
                // 可被鏈接,設置非阻塞並註冊到selector中
                accept = serverChannel.accept();
                accept.configureBlocking(Boolean.FALSE);
                accept.register(selector, SelectionKey.OP_READ);
            } else if (sk.isReadable()) {
                // 可讀,獲取該選擇器上的 Channel進行讀操做
                accept = (SocketChannel) sk.channel();
                buffer = ByteBuffer.allocate(1024);
                int len = 0;
                while ((len = accept.read(buffer)) != -1) {
                    buffer.flip();
                    System.out.println(new String(buffer.array(), 0, len));
                    buffer.clear();
                }
            }
        }
        // 移除本次操做的SelectionKey
        keys.remove();
    }
    serverChannel.close();
}

方法使用說明

  • ServerSocketChannel對象只能註冊accept 事件。
  • 設置configureBlocking爲false,才能使套接字通道中進行異步 I/O 操做。
  • 調用selectedKeys方法,返回發生了SelectionKey對象的集合。
  • 調用remove方法,用於從SelectionKey集合中移除已經被處理的key,若不處理,那麼它將繼續以當前的激活事件狀態繼續存在。

Pipe管道

Channel都是雙向通道傳輸,而Pipe就是爲了實現單向管道傳送的通道對,有一個source通道(Pipe.SourceChannel)和一個sink通道(Pipe.SinkChannel)。sink用於寫數據,source用於讀數據。直接使用Pipe.open()獲取Pipe對象,操做和FileChannel同樣。

相關文章
相關標籤/搜索