Java NIO:Buffer、Channel 和 Selector

Buffer

一個 Buffer 本質上是內存中的一塊,咱們能夠將數據寫入這塊內存,以後從這塊內存獲取數據。java

java.nio 定義瞭如下幾個 Buffer 的實現,這個圖讀者應該也在很多地方見過了吧。數組

其實核心是最後的 ByteBuffer,前面的一大串類只是包裝了一下它而已,咱們使用最多的一般也是 ByteBuffer。網絡

咱們應該將 Buffer 理解爲一個數組,IntBuffer、CharBuffer、DoubleBuffer 等分別對應 int[]、char[]、double[] 等。app

MappedByteBuffer 用於實現內存映射文件,也不是本文關注的重點。dom

我以爲操做 Buffer 和操做數組、類集差很少,只不過大部分時候咱們都把它放到了 NIO 的場景裏面來使用而已。下面介紹 Buffer 中的幾個重要屬性和幾個重要方法。socket

position、limit、capacity

就像數組有數組容量,每次訪問元素要指定下標,Buffer 中也有幾個重要屬性:position、limit、capacity。this

 

最好理解的固然是 capacity,它表明這個緩衝區的容量,一旦設定就不能夠更改。好比 capacity 爲 1024 的 IntBuffer,表明其一次能夠存放 1024 個 int 類型的值。一旦 Buffer 的容量達到 capacity,須要清空 Buffer,才能從新寫入值。spa

position 和 limit 是變化的,咱們分別看下讀和寫操做下,它們是如何變化的。線程

position 的初始值是 0,每往 Buffer 中寫入一個值,position 就自動加 1,表明下一次的寫入位置。讀操做的時候也是相似的,每讀一個值,position 就自動加 1。翻譯

從寫操做模式到讀操做模式切換的時候(flip),position 都會歸零,這樣就能夠從頭開始讀寫了。

limit:寫操做模式下,limit 表明的是最大能寫入的數據,這個時候 limit 等於 capacity。寫結束後,切換到讀模式,此時的 limit 等於 Buffer 中實際的數據大小,由於 Buffer 不必定被寫滿了。

初始化 Buffer

每一個 Buffer 實現類都提供了一個靜態方法 allocate(int capacity) 幫助咱們快速實例化一個 Buffer。如:

ByteBuffer byteBuf = ByteBuffer.allocate(1024);
IntBuffer intBuf = IntBuffer.allocate(1024);
LongBuffer longBuf = LongBuffer.allocate(1024);

另外,咱們常用 wrap 方法來初始化一個 Buffer。

public static ByteBuffer wrap(byte[] array) {
    ...
}

 

填充 Buffer

各個 Buffer 類都提供了一些 put 方法用於將數據填充到 Buffer 中,如 ByteBuffer 中的幾個 put 方法:

// 填充一個 byte 值
public abstract ByteBuffer put(byte b);
// 在指定位置填充一個 int 值
public abstract ByteBuffer put(int index, byte b);
// 將一個數組中的值填充進去
public final ByteBuffer put(byte[] src) {...}
public ByteBuffer put(byte[] src, int offset, int length) {...}

上述這些方法須要本身控制 Buffer 大小,不能超過 capacity,超過會java.nio.BufferOverflowException 異常。  

對於 Buffer 來講,另外一個常見的操做中就是,咱們要未來自 Channel 的數據填充到 Buffer 中,在系統層面上,這個操做咱們稱爲讀操做,由於數據是從外部(文件或網絡等)讀到內存中。

int num = channel.read(buf);

上述方法會返回從 Channel 中讀入到 Buffer 的數據大小。

提取 Buffer 中的值

前面介紹了寫操做,每寫入一個值,position 的值都須要加 1,因此 position 最後會指向最後一次寫入的位置的後面一個,若是 Buffer 寫滿了,那麼 position 等於 capacity(position 從 0 開始)。

若是要讀 Buffer 中的值,須要切換模式,從寫入模式切換到讀出模式。注意,一般在說 NIO 的讀操做的時候,咱們說的是從 Channel 中讀數據到 Buffer 中,對應的是對 Buffer 的寫入操做,初學者須要理清楚這個。

調用Buffer的flip()方法,能夠從寫模式切換到讀模式,其實就是從新設置了一下position和limit的值。

public final Buffer flip() {
    limit = position; // 將 limit 設置爲實際寫入的數據數量
    position = 0; // 重置 position 爲 0
    mark = -1; // mark 以後再說
    return this;
}

對應寫操做的一系列put方法,讀操做提供了一系列的get()方法:

// 根據 position 來獲取數據
public abstract byte get();
// 獲取指定位置的數據
public abstract byte get(int index);
// 將 Buffer 中的數據寫入到數組中
public ByteBuffer get(byte[] dst)

附一個常用的方法:

new String(buffer.array()).trim();

除了將數據從Buffer讀取出來使用,更常見的操做是將寫入的數據輸出到Channel中,如經過FileChannel將數據寫入到文件中,經過SocketChannel將數據寫入到網絡發送到遠程機器等。對應的,這種操做,咱們稱之爲寫操做。

int num = channel.write(buf);

 

mark()、reset()

除了position、limit、capacity這三個基本屬性外,還有一個經常使用的屬性就是mark。

mark用於臨時保存position的值,每次調用mark()方法都會將mark設置爲當前的position,便於後學須要的時候使用。

public final Buffer mark() {
    mark = position;
    return this;
}

那到底何時用呢?考慮如下場景,咱們在 position 爲 5 的時候,先 mark() 一下,而後繼續往下讀,讀到第 10 的時候,我想從新回到 position 爲 5 的地方從新來一遍,那隻要調一下 reset() 方法,position 就回到 5 了。

public final Buffer reset() {
    int m = mark;
    if (m < 0)
        throw new InvalidMarkException();
    position = m;
    return this;
}

 

rewind()、clear()、compact()

rewind():會重置position爲0,一般用於從頭讀寫Buffer。

public final Buffer rewind() {
    position = 0;
    mark = -1;
    return this;
}

 clear():至關於從新實例化。

一般,咱們會先填充Buffer,而後從Buffer讀取數據,以後再從新往裏填充新的數據,咱們通常在填充以前先調用clear().

public final Buffer clear() {
    position = 0;
    limit = capacity;
    mark = -1;
    return this;
}

compact():和clear()同樣的是都是在準備往Buffer中填充新數據以前調用。

clear()會重置幾個屬性,可是並不會將Buffer中的數據清空,只不事後面寫的時候會覆蓋以前的數據。

而compact()方法調用以後,會先處理尚未讀取的數據,也就是position到limit直接的數據,先將這些數據都移動到左邊,而後在這個基礎之上再開始寫入。此時,limit仍是等於capacity,position指向原來數據的右邊。

 

Channel

全部的 NIO 操做始於通道,通道是數據來源或數據寫入的目的地,主要地,咱們將關心 java.nio 包中實現的如下幾個 Channel:

 

FileChannel:文件通道,用於文件的讀和寫。

DatagramChannel:用於UDP鏈接的接收和發送

SocketChannel:TCP客戶端

ServerSocketChannel:TCP服務端,監聽某個端口進來的請求。

Channel 常常翻譯爲通道,相似 IO 中的流,用於讀取和寫入。它與前面介紹的 Buffer 打交道,讀操做的時候將 Channel 中的數據填充到 Buffer 中,而寫操做時將 Buffer 中的數據寫入到 Channel 中。

 

 

 

FileChannel

 

初始化:

FileInputStream inputStream = new FileInputStream(new File("/data.txt"));
FileChannel fileChannel = inputStream.getChannel();  

固然了,也能夠從RandomAccessFile類中的getChannel來獲得FileChannel。

讀取文件內容:

ByteBuffer buffer = ByteBuffer.allocate(1024);

int num = fileChannel.read(buffer);

寫入文件內容:

ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("隨機寫入一些內容到 Buffer 中".getBytes());
// Buffer 切換爲讀模式
buffer.flip();
while(buffer.hasRemaining()) {
    // 將 Buffer 中的內容寫入文件
    fileChannel.write(buffer);
}

 

SocketChannel

打開一個TCP連接:

SocketChannel socketChannel = SocketChannel
  .open(new InetSocketAddress("127.0.0.1", 80));

固然了,上面的這行代碼等價於下面的兩行:

// 打開一個通道
SocketChannel socketChannel = SocketChannel.open();
// 發起鏈接
socketChannel.connect(new InetSocketAddress("127.0.0.1", 80));

SocketChannel 的讀寫和 FileChannel 沒什麼區別,就是操做緩衝區。

// 讀取數據
socketChannel.read(buffer);

// 寫入數據到網絡鏈接中
while(buffer.hasRemaining()) {
    socketChannel.write(buffer);   
}

ServerSocketChannel

ServerSocketChannel 用於監聽機器端口,管理從這個端口進來的 TCP 鏈接。

// 實例化
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 監聽 8080 端口
serverSocketChannel.socket().bind(new InetSocketAddress(8080));

while (true) {
    // 一旦有一個 TCP 鏈接進來,就對應建立一個 SocketChannel 進行處理
    SocketChannel socketChannel = serverSocketChannel.accept();
}

 

這裏咱們看到了SocketChannel的第二個實例化方式。

到這裏,咱們應該能理解SocketChannel了,它不只僅是TCP客戶端,它表明的是一個網絡通道,可讀可寫。

ServerSocketChannel不和Buffer打交道了,由於它並不實際處理數據,一旦接到請求,就會實例化一個SocketChannel,以後再這個簡介通道上傳遞的數據它就無論了,它會繼續監聽端口等待下一個鏈接。

DatagramChannel

UDP 和 TCP 不同,DatagramChannel 一個類處理了服務端和客戶端。

UDP 是面向無鏈接的,不須要和對方握手,不須要通知對方,就能夠直接將數據包投出去,至於能不能送達,它是不知道的.

監聽端口:

DatagramChannel channel = DatagramChannel.open();
channel.socket().bind(new InetSocketAddress(9090));

ByteBuffer buf = ByteBuffer.allocate(48);

channel.receive(buf);

發送數據:

String newData = "New String to write to file..."
                    + System.currentTimeMillis();

ByteBuffer buf = ByteBuffer.allocate(48);
buf.put(newData.getBytes());
buf.flip();

int bytesSent = channel.send(buf, new InetSocketAddress("jenkov.com", 80));

 

Selector

Selector創建在非阻塞的基礎之上,你們常常聽到的多路複用在java世界中指的就是它,用於實現一個線程管理多個Channel。

開啓Selector:

Selector selector = Selector.open();

將 Channel 註冊到 Selector 上。前面咱們說了,Selector 創建在非阻塞模式之上,因此註冊到 Selector 的 Channel 必需要支持非阻塞模式,FileChannel 不支持非阻塞,咱們這裏討論最多見的 SocketChannel 和 ServerSocketChannel。

// 將通道設置爲非阻塞模式,由於默認都是阻塞模式的
channel.configureBlocking(false);
// 註冊
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

 

register 方法的第二個 int 型參數(使用二進制的標記位)用於代表須要監聽哪些感興趣的事件,共如下四種事件:

SelectionKey.OP_READ:對應 00000001,通道中有數據能夠進行讀取

SelectionKey.OP_WRITE:對應 00000100,能夠往通道中寫入數據

SelectionKey.OP_CONNECT:對應 00001000,成功創建 TCP 鏈接

SelectionKey.OP_ACCEPT:對應 00010000,接受 TCP 鏈接

咱們能夠同時監聽一個 Channel 中的發生的多個事件,好比咱們要監聽 ACCEPT 和 READ 事件,那麼指定參數爲二進制的 00010001 即十進制數值 17 便可。

註冊方法返回值是 SelectionKey 實例,它包含了 Channel 和 Selector 信息,也包括了一個叫作 Interest Set 的信息,即咱們設置的咱們感興趣的正在監聽的事件集合。

調用 select() 方法獲取通道信息。用於判斷是否有咱們感興趣的事件已經發生了。

示例:

 

Selector selector = Selector.open();

channel.configureBlocking(false);

SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

while(true) {
  // 判斷是否有事件準備好
  int readyChannels = selector.select();
  if(readyChannels == 0) continue;

  // 遍歷
  Set<SelectionKey> selectedKeys = selector.selectedKeys();
  Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
  while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();

    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.

    } else if (key.isConnectable()) {
        // a connection was established with a remote server.

    } else if (key.isReadable()) {
        // a channel is ready for reading

    } else if (key.isWritable()) {
        // a channel is ready for writing
    }

    keyIterator.remove();
  }
}

 

對於Selector,須要熟悉如下幾個方法:

select()

調用此方法,會將上次 select 以後的準備好的 channel 對應的 SelectionKey 複製到 selected set 中。若是沒有任何通道準備好,這個方法會阻塞,直到至少有一個通道準備好。

 

selectNow()

功能和 select 同樣,區別在於若是沒有準備好的通道,那麼此方法會當即返回 0。

select(long timeout)

看了前面兩個,這個應該很好理解了,若是沒有通道準備好,此方法會等待一會

wakeup()

這個方法是用來喚醒等待在 select() 和 select(timeout) 上的線程的。若是 wakeup() 先被調用,此時沒有線程在 select 上阻塞,那麼以後的一個 select() 或 select(timeout) 會當即返回,而不會阻塞,固然,它只會做用一次。

 

 

調

用 Buffer 的 flip() 方法,能夠從寫入模式切換到讀取模式。其實這個方法也就是設置了一下 position 和 limit 值罷了

相關文章
相關標籤/搜索