Netty 源碼分析之 番外篇 Java NIO 的前生今世

簡介

Java NIO 是由 Java 1.4 引進的異步 IO.
Java NIO 由如下幾個核心部分組成:java

  • Channel
  • Buffer
  • Selector

NIO 和 IO 的對比

IO 和 NIO 的區別主要體如今三個方面:數組

  • IO 基於流(Stream oriented), 而 NIO 基於 Buffer (Buffer oriented)
  • IO 操做是阻塞的, 而 NIO 操做是非阻塞的
  • IO 沒有 selector 概念, 而 NIO 有 selector 概念.

基於 Stream 與基於 Buffer

傳統的 IO 是面向字節流或字符流的, 而在 NIO 中, 咱們拋棄了傳統的 IO 流, 而是引入了 ChannelBuffer 的概念. 在 NIO 中, 我只能從 Channel 中讀取數據到 Buffer 中或將數據從 Buffer 中寫入到 Channel.
那麼什麼是 基於流 呢? 在通常的 Java IO 操做中, 咱們以流式的方式順序地從一個 Stream 中讀取一個或多個字節, 所以咱們也就不能隨意改變讀取指針的位置.
基於 Buffer 就顯得有點不一樣了. 咱們首先須要從 Channel 中讀取數據到 Buffer 中, 當 Buffer 中有數據後, 咱們就能夠對這些數據進行操做了. 不像 IO 那樣是順序操做, NIO 中咱們能夠隨意地讀取任意位置的數據.緩存

阻塞和非阻塞

Java 提供的各類 Stream 操做都是阻塞的, 例如咱們調用一個 read 方法讀取一個文件的內容, 那麼調用 read 的線程會被阻塞住, 直到 read 操做完成.
而 NIO 的非阻塞模式容許咱們非阻塞地進行 IO 操做. 例如咱們須要從網絡中讀取數據, 在 NIO 的非阻塞模式中, 當咱們調用 read 方法時, 若是此時有數據, 則 read 讀取並返回; 若是此時沒有數據, 則 read 直接返回, 而不會阻塞當前線程.服務器

selector

selector 是 NIO 中才有的概念, 它是 Java NIO 之因此能夠非阻塞地進行 IO 操做的關鍵.
經過 Selector, 一個線程能夠監聽多個 Channel 的 IO 事件, 當咱們向一個 Selector 中註冊了 Channel 後, Selector 內部的機制就能夠自動地爲咱們不斷地查詢(select) 這些註冊的 Channel 是否有已就緒的 IO 事件(例如可讀, 可寫, 網絡鏈接完成等). 經過這樣的 Selector 機制, 咱們就能夠很簡單地使用一個線程高效地管理多個 Channel 了.網絡

Java NIO Channel

一般來講, 全部的 NIO 的 I/O 操做都是從 Channel 開始的. 一個 channel 相似於一個 stream.
java Stream 和 NIO Channel 對比dom

  • 咱們能夠在同一個 Channel 中執行讀和寫操做, 然而同一個 Stream 僅僅支持讀或寫.
  • Channel 能夠異步地讀寫, 而 Stream 是阻塞的同步讀寫.
  • Channel 老是從 Buffer 中讀取數據, 或將數據寫入到 Buffer 中.

Channel 類型有:異步

  • FileChannel, 文件操做
  • DatagramChannel, UDP 操做
  • SocketChannel, TCP 操做
  • ServerSocketChannel, TCP 操做, 使用在服務器端.

這些通道涵蓋了 UDP 和 TCP網絡 IO以及文件 IO.
基本的 Channel 使用例子:socket

public static void main( String[] args ) throws Exception
{
    RandomAccessFile aFile = new RandomAccessFile("/Users/xiongyongshun/settings.xml", "rw");
    FileChannel inChannel = aFile.getChannel();

    ByteBuffer buf = ByteBuffer.allocate(48);

    int bytesRead = inChannel.read(buf);
    while (bytesRead != -1) {
        buf.flip();

        while(buf.hasRemaining()){
            System.out.print((char) buf.get());
        }

        buf.clear();
        bytesRead = inChannel.read(buf);
    }
    aFile.close();
}

FileChannel

FileChannel 是操做文件的Channel, 咱們能夠經過 FileChannel 從一個文件中讀取數據, 也能夠將數據寫入到文件中.
注意, FileChannel 不能設置爲非阻塞模式.this

打開 FileChannel

RandomAccessFile aFile     = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel      inChannel = aFile.getChannel();

從 FileChannel 中讀取數據

ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);

寫入數據

String newData = "New String to write to file..." + System.currentTimeMillis();

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());

buf.flip();

while(buf.hasRemaining()) {
    channel.write(buf);
}

關閉

當咱們對 FileChannel 的操做完成後, 必須將其關閉操作系統

channel.close();

設置 position

long pos channel.position();
channel.position(pos +123);

文件大小

咱們能夠經過 channel.size()獲取關聯到這個 Channel 中的文件的大小. 注意, 這裏返回的是文件的大小, 而不是 Channel 中剩餘的元素個數.

截斷文件

channel.truncate(1024);

將文件的大小截斷爲1024字節.

強制寫入

咱們能夠強制將緩存的未寫入的數據寫入到文件中:

channel.force(true);

SocketChannel

SocketChannel 是一個客戶端用來進行 TCP 鏈接的 Channel.
建立一個 SocketChannel 的方法有兩種:

  • 打開一個 SocketChannel, 而後將其鏈接到某個服務器中
  • 當一個 ServerSocketChannel 接受到鏈接請求時, 會返回一個 SocketChannel 對象.

打開 SocketChannel

SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("http://example.com", 80));

關閉

socketChannel.close();

讀取數據

ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = socketChannel.read(buf);

若是 read()返回 -1, 那麼表示鏈接中斷了.

寫入數據

String newData = "New String to write to file..." + System.currentTimeMillis();

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());

buf.flip();

while(buf.hasRemaining()) {
    channel.write(buf);
}

非阻塞模式

咱們能夠設置 SocketChannel 爲異步模式, 這樣咱們的 connect, read, write 都是異步的了.

鏈接
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("http://example.com", 80));

while(! socketChannel.finishConnect() ){
    //wait, or do something else...    
}

在異步模式中, 或許鏈接尚未創建, connect 方法就返回了, 所以咱們須要檢查當前是不是鏈接到了主機, 所以經過一個 while 循環來判斷.

讀寫

在異步模式下, 讀寫的方式是同樣的.
在讀取時, 由於是異步的, 所以咱們必須檢查 read 的返回值, 來判斷當前是否讀取到了數據.

ServerSocketChannel

ServerSocketChannel 顧名思義, 是用在服務器爲端的, 能夠監聽客戶端的 TCP 鏈接, 例如:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
while(true){
    SocketChannel socketChannel =
            serverSocketChannel.accept();

    //do something with socketChannel...
}

打開 關閉

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.close();

監聽鏈接

咱們可使用ServerSocketChannel.accept()方法來監聽客戶端的 TCP 鏈接請求, accept()方法會阻塞, 直到有鏈接到來, 當有鏈接時, 這個方法會返回一個 SocketChannel 對象:

while(true){
    SocketChannel socketChannel =
            serverSocketChannel.accept();

    //do something with socketChannel...
}

非阻塞模式

在非阻塞模式下, accept()是非阻塞的, 所以若是此時沒有鏈接到來, 那麼 accept()方法會返回null:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);

while(true){
    SocketChannel socketChannel =
            serverSocketChannel.accept();

    if(socketChannel != null){
        //do something with socketChannel...
        }
}

DatagramChannel

DatagramChannel 是用來處理 UDP 鏈接的.

打開

DatagramChannel channel = DatagramChannel.open();
channel.socket().bind(new InetSocketAddress(9999));

讀取數據

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();

channel.receive(buf);

發送數據

String newData = "New String to write to file..."
                    + System.currentTimeMillis();
    
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();

int bytesSent = channel.send(buf, new InetSocketAddress("example.com", 80));

鏈接到指定地址

由於 UDP 是非鏈接的, 所以這個的 connect 並非向 TCP 同樣真正意義上的鏈接, 而是它會講 DatagramChannel 鎖住, 所以咱們僅僅能夠從指定的地址中讀取或寫入數據.

channel.connect(new InetSocketAddress("example.com", 80));

Java NIO Buffer

當咱們須要與 NIO Channel 進行交互時, 咱們就須要使用到 NIO Buffer, 即數據從 Buffer讀取到 Channel 中, 而且從 Channel 中寫入到 Buffer 中.
實際上, 一個 Buffer 其實就是一塊內存區域, 咱們能夠在這個內存區域中進行數據的讀寫. NIO Buffer 實際上是這樣的內存塊的一個封裝, 並提供了一些操做方法讓咱們可以方便地進行數據的讀寫.
Buffer 類型有:

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

這些 Buffer 覆蓋了能從 IO 中傳輸的全部的 Java 基本數據類型.

NIO Buffer 的基本使用

使用 NIO Buffer 的步驟以下:

  • 將數據寫入到 Buffer 中.
  • 調用 Buffer.flip()方法, 將 NIO Buffer 轉換爲讀模式.
  • 從 Buffer 中讀取數據
  • 調用 Buffer.clear() 或 Buffer.compact()方法, 將 Buffer 轉換爲寫模式.

當咱們將數據寫入到 Buffer 中時, Buffer 會記錄咱們已經寫了多少的數據, 當咱們須要從 Buffer 中讀取數據時, 必須調用 Buffer.flip()將 Buffer 切換爲讀模式.
一旦讀取了全部的 Buffer 數據, 那麼咱們必須清理 Buffer, 讓其重新可寫, 清理 Buffer 能夠調用 Buffer.clear() 或 Buffer.compact().
例如:

/**
 * @author xiongyongshun
 * @Email yongshun1228@gmail.com
 * @version 1.0
 * @created 16/8/1 13:13
 */
public class Test {
    public static void main(String[] args) {
        IntBuffer intBuffer = IntBuffer.allocate(2);
        intBuffer.put(12345678);
        intBuffer.put(2);
        intBuffer.flip();
        System.err.println(intBuffer.get());
        System.err.println(intBuffer.get());
    }
}

上述中, 咱們分配兩個單位大小的 IntBuffer, 所以它能夠寫入兩個 int 值.
咱們使用 put 方法將 int 值寫入, 而後使用 flip 方法將 buffer 轉換爲讀模式, 而後連續使用 get 方法從 buffer 中獲取這兩個 int 值.
每當調用一次 get 方法讀取數據時, buffer 的讀指針都會向前移動一個單位長度(在這裏是一個 int 長度)

Buffer 屬性

一個 Buffer 有三個屬性:

  • capacity
  • position
  • limit

其中 positionlimit 的含義與 Buffer 處於讀模式或寫模式有關, 而 capacity 的含義與 Buffer 所處的模式無關.

Capacity

一個內存塊會有一個固定的大小, 即容量(capacity), 咱們最多寫入capacity 個單位的數據到 Buffer 中, 例如一個 DoubleBuffer, 其 Capacity 是100, 那麼咱們最多能夠寫入100個 double 數據.

Position

當從一個 Buffer 中寫入數據時, 咱們是從 Buffer 的一個肯定的位置(position)開始寫入的. 在最初的狀態時, position 的值是0. 每當咱們寫入了一個單位的數據後, position 就會遞增一.
當咱們從 Buffer 中讀取數據時, 咱們也是從某個特定的位置開始讀取的. 當咱們調用了 filp()方法將 Buffer 從寫模式轉換到讀模式時, position 的值會自動被設置爲0, 每當咱們讀取一個單位的數據, position 的值遞增1.
position 表示了讀寫操做的位置指針.

limit

limit - position 表示此時還能夠寫入/讀取多少單位的數據.
例如在寫模式, 若是此時 limit 是10, position 是2, 則表示已經寫入了2個單位的數據, 還能夠寫入 10 - 2 = 8 個單位的數據.

例子:

public class Test {
    public static void main(String args[]) {
        IntBuffer intBuffer = IntBuffer.allocate(10);
        intBuffer.put(10);
        intBuffer.put(101);
        System.err.println("Write mode: ");
        System.err.println("\tCapacity: " + intBuffer.capacity());
        System.err.println("\tPosition: " + intBuffer.position());
        System.err.println("\tLimit: " + intBuffer.limit());

        intBuffer.flip();
        System.err.println("Read mode: ");
        System.err.println("\tCapacity: " + intBuffer.capacity());
        System.err.println("\tPosition: " + intBuffer.position());
        System.err.println("\tLimit: " + intBuffer.limit());
    }
}

這裏咱們首先寫入兩個 int 值, 此時 capacity = 10, position = 2, limit = 10.
而後咱們調用 flip 轉換爲讀模式, 此時 capacity = 10, position = 0, limit = 2;

分配 Buffer

爲了獲取一個 Buffer 對象, 咱們首先須要分配內存空間. 每一個類型的 Buffer 都有一個 allocate()方法, 咱們能夠經過這個方法分配 Buffer:

ByteBuffer buf = ByteBuffer.allocate(48);

這裏咱們分配了48 * sizeof(Byte)字節的內存空間.

CharBuffer buf = CharBuffer.allocate(1024);

這裏咱們分配了大小爲1024個字符的 Buffer, 即 這個 Buffer 能夠存儲1024 個 Char, 其大小爲 1024 * 2 個字節.

關於 Direct Buffer 和 Non-Direct Buffer 的區別

Direct Buffer:

  • 所分配的內存不在 JVM 堆上, 不受 GC 的管理.(可是 Direct Buffer 的 Java 對象是由 GC 管理的, 所以當發生 GC, 對象被回收時, Direct Buffer 也會被釋放)
  • 由於 Direct Buffer 不在 JVM 堆上分配, 所以 Direct Buffer 對應用程序的內存佔用的影響就不那麼明顯(實際上仍是佔用了這麼多內存, 可是 JVM 很差統計到非 JVM 管理的內存.)
  • 申請和釋放 Direct Buffer 的開銷比較大. 所以正確的使用 Direct Buffer 的方式是在初始化時申請一個 Buffer, 而後不斷複用此 buffer, 在程序結束後才釋放此 buffer.
  • 使用 Direct Buffer 時, 當進行一些底層的系統 IO 操做時, 效率會比較高, 由於此時 JVM 不須要拷貝 buffer 中的內存到中間臨時緩衝區中.

Non-Direct Buffer:

  • 直接在 JVM 堆上進行內存的分配, 本質上是 byte[] 數組的封裝.
  • 由於 Non-Direct Buffer 在 JVM 堆中, 所以當進行操做系統底層 IO 操做中時, 會將此 buffer 的內存複製到中間臨時緩衝區中. 所以 Non-Direct Buffer 的效率就較低.

寫入數據到 Buffer

int bytesRead = inChannel.read(buf); //read into buffer.
buf.put(127);

從 Buffer 中讀取數據

//read from buffer into channel.
int bytesWritten = inChannel.write(buf);
byte aByte = buf.get();

重置 position

Buffer.rewind()方法能夠重置 position 的值爲0, 所以咱們能夠從新讀取/寫入 Buffer 了.
若是是讀模式, 則重置的是讀模式的 position, 若是是寫模式, 則重置的是寫模式的 position.
例如:

/**
 * @author xiongyongshun
 * @Email yongshun1228@gmail.com
 * @version 1.0
 * @created 16/8/1 13:13
 */
public class Test {
    public static void main(String[] args) {
        IntBuffer intBuffer = IntBuffer.allocate(2);
        intBuffer.put(1);
        intBuffer.put(2);
        System.err.println("position: " + intBuffer.position());

        intBuffer.rewind();
        System.err.println("position: " + intBuffer.position());
        intBuffer.put(1);
        intBuffer.put(2);
        System.err.println("position: " + intBuffer.position());

        
        intBuffer.flip();
        System.err.println("position: " + intBuffer.position());
        intBuffer.get();
        intBuffer.get();
        System.err.println("position: " + intBuffer.position());

        intBuffer.rewind();
        System.err.println("position: " + intBuffer.position());
    }
}

rewind() 主要針對於讀模式. 在讀模式時, 讀取到 limit 後, 能夠調用 rewind() 方法, 將讀 position 置爲0.

關於 mark()和 reset()

咱們能夠經過調用 Buffer.mark()將當前的 position 的值保存起來, 隨後能夠經過調用 Buffer.reset()方法將 position 的值回覆回來.
例如:

/**
 * @author xiongyongshun
 * @Email yongshun1228@gmail.com
 * @version 1.0
 * @created 16/8/1 13:13
 */
public class Test {
    public static void main(String[] args) {
        IntBuffer intBuffer = IntBuffer.allocate(2);
        intBuffer.put(1);
        intBuffer.put(2);
        intBuffer.flip();
        System.err.println(intBuffer.get());
        System.err.println("position: " + intBuffer.position());
        intBuffer.mark();
        System.err.println(intBuffer.get());

        System.err.println("position: " + intBuffer.position());
        intBuffer.reset();
        System.err.println("position: " + intBuffer.position());
        System.err.println(intBuffer.get());
    }
}

這裏咱們寫入兩個 int 值, 而後首先讀取了一個值. 此時讀 position 的值爲1.
接着咱們調用 mark() 方法將當前的 position 保存起來(在讀模式, 所以保存的是讀的 position), 而後再次讀取, 此時 position 就是2了.
接着使用 reset() 恢復原來的讀 position, 所以讀 position 就爲1, 能夠再次讀取數據.

flip, rewind 和 clear 的區別

flip

方法源碼:

public final Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
}

Buffer 的讀/寫模式共用一個 position 和 limit 變量.
當從寫模式變爲讀模式時, 原先的 寫 position 就變成了讀模式的 limit.

rewind

方法源碼

public final Buffer rewind() {
    position = 0;
    mark = -1;
    return this;
}

rewind, 即倒帶, 這個方法僅僅是將 position 置爲0.

clear

方法源碼:

public final Buffer clear() {
    position = 0;
    limit = capacity;
    mark = -1;
    return this;
}

根據源碼咱們能夠知道, clear 將 positin 設置爲0, 將 limit 設置爲 capacity.
clear 方法使用場景:

  • 在一個已經寫滿數據的 buffer 中, 調用 clear, 能夠從頭讀取 buffer 的數據.
  • 爲了將一個 buffer 填充滿數據, 能夠調用 clear, 而後一直寫入, 直到達到 limit.
例子:
IntBuffer intBuffer = IntBuffer.allocate(2);
intBuffer.flip();
System.err.println("position: " + intBuffer.position());
System.err.println("limit: " + intBuffer.limit());
System.err.println("capacity: " + intBuffer.capacity());

// 這裏不能讀, 由於 limit == position == 0, 沒有數據.
//System.err.println(intBuffer.get());

intBuffer.clear();
System.err.println("position: " + intBuffer.position());
System.err.println("limit: " + intBuffer.limit());
System.err.println("capacity: " + intBuffer.capacity());

// 這裏能夠讀取數據了, 由於 clear 後, limit == capacity == 2, position == 0,
// 即便咱們沒有寫入任何的數據到 buffer 中.
System.err.println(intBuffer.get()); // 讀取到0
System.err.println(intBuffer.get()); // 讀取到0

Buffer 的比較

咱們能夠經過 equals() 或 compareTo() 方法比較兩個 Buffer, 當且僅當以下條件知足時, 兩個 Buffer 是相等的:

  • 兩個 Buffer 是相同類型的
  • 兩個 Buffer 的剩餘的數據個數是相同的
  • 兩個 Buffer 的剩餘的數據都是相同的.

經過上述條件咱們能夠發現, 比較兩個 Buffer 時, 並非 Buffer 中的每一個元素都進行比較, 而是比較 Buffer 中剩餘的元素.

Selector

Selector 容許一個單一的線程來操做多個 Channel. 若是咱們的應用程序中使用了多個 Channel, 那麼使用 Selector 很方便的實現這樣的目的, 可是由於在一個線程中使用了多個 Channel, 所以也會形成了每一個 Channel 傳輸效率的下降.
使用 Selector 的圖解以下:

爲了使用 Selector, 咱們首先須要將 Channel 註冊到 Selector 中, 隨後調用 Selector 的 select()方法, 這個方法會阻塞, 直到註冊在 Selector 中的 Channel 發送可讀寫事件. 當這個方法返回後, 當前的這個線程就能夠處理 Channel 的事件了.

建立選擇器

經過 Selector.open()方法, 咱們能夠建立一個選擇器:

Selector selector = Selector.open();

將 Channel 註冊到選擇器中

爲了使用選擇器管理 Channel, 咱們須要將 Channel 註冊到選擇器中:

channel.configureBlocking(false);

SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

注意, 若是一個 Channel 要註冊到 Selector 中, 那麼這個 Channel 必須是非阻塞的, 即channel.configureBlocking(false);
由於 Channel 必需要是非阻塞的, 所以 FileChannel 是不可以使用選擇器的, 由於 FileChannel 都是阻塞的.

注意到, 在使用 Channel.register()方法時, 第二個參數指定了咱們對 Channel 的什麼類型的事件感興趣, 這些事件有:

  • Connect, 即鏈接事件(TCP 鏈接), 對應於SelectionKey.OP_CONNECT
  • Accept, 即確認事件, 對應於SelectionKey.OP_ACCEPT
  • Read, 即讀事件, 對應於SelectionKey.OP_READ, 表示 buffer 可讀.
  • Write, 即寫事件, 對應於SelectionKey.OP_WRITE, 表示 buffer 可寫.

一個 Channel發出一個事件也能夠稱爲** 對於某個事件, Channel 準備好了. 所以一個 Channel 成功鏈接到了另外一個服務器也能夠被稱爲 connect ready.
咱們可使用或運算
|**來組合多個事件, 例如:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

注意, 一個 Channel 僅僅能夠被註冊到一個 Selector 一次, 若是將 Channel 註冊到 Selector 屢次, 那麼其實就是至關於更新 SelectionKey 的 interest set. 例如:

channel.register(selector, SelectionKey.OP_READ);
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);

上面的 channel 註冊到同一個 Selector 兩次了, 那麼第二次的註冊其實就是至關於更新這個 Channel 的 interest set 爲 SelectionKey.OP_READ | SelectionKey.OP_WRITE.

關於 SelectionKey

如上所示, 當咱們使用 register 註冊一個 Channel 時, 會返回一個 SelectionKey 對象, 這個對象包含了以下內容:

  • interest set, 即咱們感興趣的事件集, 即在調用 register 註冊 channel 時所設置的 interest set.
  • ready set
  • channel
  • selector
  • attached object, 可選的附加對象

interest set

咱們能夠經過以下方式獲取 interest set:

int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

ready set

表明了 Channel 所準備好了的操做.
咱們能夠像判斷 interest set 同樣操做 Ready set, 可是咱們還可使用以下方法進行判斷:

int readySet = selectionKey.readyOps();

selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

Channel 和 Selector

咱們能夠經過 SelectionKey 獲取相對應的 Channel 和 Selector:

Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector();

Attaching Object

咱們能夠在selectionKey中附加一個對象:

selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();

或者在註冊時直接附加:

SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

經過 Selector 選擇 Channel

咱們能夠經過 Selector.select()方法獲取對某件事件準備好了的 Channel, 即若是咱們在註冊 Channel 時, 對其的可寫事件感興趣, 那麼當 select()返回時, 咱們就能夠獲取 Channel 了.

注意, select()方法返回的值表示有多少個 Channel 可操做.

獲取可操做的 Channel

若是 select()方法返回值表示有多個 Channel 準備好了, 那麼咱們能夠經過 Selected key set 訪問這個 Channel:

Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {
    
    SelectionKey key = keyIterator.next();

    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.

    } else if (key.isConnectable()) {
        // a connection was established with a remote server.

    } else if (key.isReadable()) {
        // a channel is ready for reading

    } else if (key.isWritable()) {
        // a channel is ready for writing
    }

    keyIterator.remove();
}

注意, 在每次迭代時, 咱們都調用 "keyIterator.remove()" 將這個 key 從迭代器中刪除, 由於 select() 方法僅僅是簡單地將就緒的 IO 操做放到 selectedKeys 集合中, 所以若是咱們從 selectedKeys 獲取到一個 key, 可是沒有將它刪除, 那麼下一次 select 時, 這個 key 所對應的 IO 事件還在 selectedKeys 中.

例如此時咱們收到 OP_ACCEPT 通知, 而後咱們進行相關處理, 可是並無將這個 Key 從 SelectedKeys 中刪除, 那麼下一次 select() 返回時 咱們還能夠在 SelectedKeys 中獲取到 OP_ACCEPT 的 key.

注意, 咱們能夠動態更改 SekectedKeys 中的 key 的 interest set.

例如在 OP_ACCEPT 中, 咱們能夠將 interest set 更新爲 OP_READ, 這樣 Selector 就會將這個 Channel 的 讀 IO 就緒事件包含進來了.

Selector 的基本使用流程

咱們再來回顧一下 Java NIO 中的 Selector 的使用流程:

  1. 經過 Selector.open() 打開一個 Selector.
  2. 將 Channel 註冊到 Selector 中, 並設置須要監聽的事件(interest set)
  3. 不斷重複:
  • 調用 select() 方法
  • 調用 selector.selectedKeys() 獲取 selected keys
  • 迭代每一個 selected key:
  • 1) 從 selected key 中獲取 對應的 Channel 和附加信息(若是有的話)
  • 2) 判斷是哪些 IO 事件已經就緒了, 而後處理它們. 若是是 OP_ACCEPT 事件, 則調用 "SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()" 獲取 SocketChannel, 並將它設置爲 非阻塞的, 而後將這個 Channel 註冊到 Selector 中.
  • 3) 根據須要更改 selected key 的監聽事件.
  • 4) 將已經處理過的 key 從 selected keys 集合中刪除.

關閉 Selector

當調用了 Selector.close()方法時, 咱們實際上是關閉了 Selector 自己而且將全部的 SelectionKey 失效, 可是並不會關閉 Channel.

完整的 Selector 例子

/**
 * @author xiongyongshun
 * @Email yongshun1228@gmail.com
 * @version 1.0
 * @created 16/8/1 13:13
 */
public class NioEchoServer {
    private static final int BUF_SIZE = 256;
    private static final int TIMEOUT = 3000;

    public static void main(String args[]) throws Exception {
        // 打開服務端 Socket
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        // 打開 Selector
        Selector selector = Selector.open();

        // 服務端 Socket 監聽8080端口, 並配置爲非阻塞模式
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        serverSocketChannel.configureBlocking(false);

        // 將 channel 註冊到 selector 中.
        // 一般咱們都是先註冊一個 OP_ACCEPT 事件, 而後在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ
        // 註冊到 Selector 中.
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // 經過調用 select 方法, 阻塞地等待 channel I/O 可操做
            if (selector.select(TIMEOUT) == 0) {
                System.out.print(".");
                continue;
            }

            // 獲取 I/O 操做就緒的 SelectionKey, 經過 SelectionKey 能夠知道哪些 Channel 的哪類 I/O 操做已經就緒.
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();

            while (keyIterator.hasNext()) {

                // 當獲取一個 SelectionKey 後, 就要將它刪除, 表示咱們已經對這個 IO 事件進行了處理.
                keyIterator.remove();

                SelectionKey key = keyIterator.next();

                if (key.isAcceptable()) {
                    // 當 OP_ACCEPT 事件到來時, 咱們就有從 ServerSocketChannel 中獲取一個 SocketChannel,
                    // 表明客戶端的鏈接
                    // 注意, 在 OP_ACCEPT 事件中, 從 key.channel() 返回的 Channel 是 ServerSocketChannel.
                    // 而在 OP_WRITE 和 OP_READ 中, 從 key.channel() 返回的是 SocketChannel.
                    SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                    clientChannel.configureBlocking(false);
                    //在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ 註冊到 Selector 中.
                    // 注意, 這裏咱們若是沒有設置 OP_READ 的話, 即 interest set 仍然是 OP_CONNECT 的話, 那麼 select 方法會一直直接返回.
                    clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
                }

                if (key.isReadable()) {
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    ByteBuffer buf = (ByteBuffer) key.attachment();
                    long bytesRead = clientChannel.read(buf);
                    if (bytesRead == -1) {
                        clientChannel.close();
                    } else if (bytesRead > 0) {
                        key.interestOps(OP_READ | SelectionKey.OP_WRITE);
                        System.out.println("Get data length: " + bytesRead);
                    }
                }

                if (key.isValid() && key.isWritable()) {
                    ByteBuffer buf = (ByteBuffer) key.attachment();
                    buf.flip();
                    SocketChannel clientChannel = (SocketChannel) key.channel();

                    clientChannel.write(buf);

                    if (!buf.hasRemaining()) {
                        key.interestOps(OP_READ);
                    }
                    buf.compact();
                }
            }
        }
    }
}
相關文章
相關標籤/搜索