Selector 容許一個單一的線程來操做多個 Channel. 若是咱們的應用程序中使用了多個 Channel, 那麼使用 Selector 很方便的實現這樣的目的, 可是由於在一個線程中使用了多個 Channel, 所以也會形成了每一個 Channel 傳輸效率的下降.
使用 Selector 的圖解以下:segmentfault
爲了使用 Selector, 咱們首先須要將 Channel 註冊到 Selector 中, 隨後調用 Selector 的 select()方法, 這個方法會阻塞, 直到註冊在 Selector 中的 Channel 發送可讀寫事件. 當這個方法返回後, 當前的這個線程就能夠處理 Channel 的事件了.服務器
經過 Selector.open()方法, 咱們能夠建立一個選擇器:socket
Selector selector = Selector.open();
爲了使用選擇器管理 Channel, 咱們須要將 Channel 註冊到選擇器中:spa
channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
注意
, 若是一個 Channel 要註冊到 Selector 中, 那麼這個 Channel 必須是非阻塞的, 即channel.configureBlocking(false);
由於 Channel 必需要是非阻塞的, 所以 FileChannel 是不可以使用選擇器的, 由於 FileChannel 都是阻塞的.線程
注意到, 在使用 Channel.register()方法時, 第二個參數指定了咱們對 Channel 的什麼類型的事件感興趣, 這些事件有:rest
Connect, 即鏈接事件(TCP 鏈接), 對應於SelectionKey.OP_CONNECTcode
Accept, 即確認事件, 對應於SelectionKey.OP_ACCEPTserver
Read, 即讀事件, 對應於SelectionKey.OP_READ, 表示 buffer 可讀.對象
Write, 即寫事件, 對應於SelectionKey.OP_WRITE, 表示 buffer 可寫.blog
一個 Channel發出一個事件也能夠稱爲 對於某個事件, Channel 準備好了. 所以一個 Channel 成功鏈接到了另外一個服務器也能夠被稱爲 connect ready.
咱們可使用或運算|來組合多個事件, 例如:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
注意, 一個 Channel 僅僅能夠被註冊到一個 Selector 一次, 若是將 Channel 註冊到 Selector 屢次, 那麼其實就是至關於更新 SelectionKey 的 interest set
. 例如:
channel.register(selector, SelectionKey.OP_READ); channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
上面的 channel 註冊到同一個 Selector 兩次了, 那麼第二次的註冊其實就是至關於更新這個 Channel 的 interest set 爲 SelectionKey.OP_READ | SelectionKey.OP_WRITE.
如上所示, 當咱們使用 register 註冊一個 Channel 時, 會返回一個 SelectionKey 對象, 這個對象包含了以下內容:
interest set, 即咱們感興趣的事件集, 即在調用 register 註冊 channel 時所設置的 interest set.
ready set
channel
selector
attached object, 可選的附加對象
咱們能夠經過以下方式獲取 interest set:
int interestSet = selectionKey.interestOps(); boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT; boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
表明了 Channel 所準備好了的操做.
咱們能夠像判斷 interest set 同樣操做 Ready set, 可是咱們還可使用以下方法進行判斷:
int readySet = selectionKey.readyOps(); selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();
咱們能夠經過 SelectionKey 獲取相對應的 Channel 和 Selector:
Channel channel = selectionKey.channel(); Selector selector = selectionKey.selector();
咱們能夠在selectionKey中附加一個對象:
selectionKey.attach(theObject); Object attachedObj = selectionKey.attachment();
或者在註冊時直接附加:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
咱們能夠經過 Selector.select()方法獲取對某件事件準備好了的 Channel, 即若是咱們在註冊 Channel 時, 對其的可寫事件感興趣, 那麼當 select()返回時, 咱們就能夠獲取 Channel 了.
注意
, select()方法返回的值表示有多少個 Channel 可操做.
若是 select()方法返回值表示有多個 Channel 準備好了, 那麼咱們能夠經過 Selected key set 訪問這個 Channel:
Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); }
注意, 在每次迭代時, 咱們都調用 "keyIterator.remove()" 將這個 key 從迭代器中刪除, 由於 select() 方法僅僅是簡單地將就緒的 IO 操做放到 selectedKeys 集合中, 所以若是咱們從 selectedKeys 獲取到一個 key, 可是沒有將它刪除, 那麼下一次 select 時, 這個 key 所對應的 IO 事件還在 selectedKeys 中.
例如此時咱們收到 OP_ACCEPT 通知, 而後咱們進行相關處理, 可是並無將這個 Key 從 SelectedKeys 中刪除, 那麼下一次 select() 返回時 咱們還能夠在 SelectedKeys 中獲取到 OP_ACCEPT 的 key.注意, 咱們能夠動態更改 SekectedKeys 中的 key 的 interest set.
例如在 OP_ACCEPT 中, 咱們能夠將 interest set 更新爲 OP_READ, 這樣 Selector 就會將這個 Channel 的 讀 IO 就緒事件包含進來了.
經過 Selector.open() 打開一個 Selector.
將 Channel 註冊到 Selector 中, 並設置須要監聽的事件(interest set)
不斷重複:
調用 select() 方法
調用 selector.selectedKeys() 獲取 selected keys
迭代每一個 selected key:
*從 selected key 中獲取 對應的 Channel 和附加信息(若是有的話)
*判斷是哪些 IO 事件已經就緒了, 而後處理它們. 若是是 OP_ACCEPT 事件, 則調用 "SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()" 獲取 SocketChannel, 並將它設置爲 非阻塞的, 而後將這個 Channel 註冊到 Selector 中.
*根據須要更改 selected key 的監聽事件.
*將已經處理過的 key 從 selected keys 集合中刪除.
當調用了 Selector.close()方法時, 咱們實際上是關閉了 Selector 自己而且將全部的 SelectionKey 失效, 可是並不會關閉 Channel.
public class NioEchoServer { private static final int BUF_SIZE = 256; private static final int TIMEOUT = 3000; public static void main(String args[]) throws Exception { // 打開服務端 Socket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打開 Selector Selector selector = Selector.open(); // 服務端 Socket 監聽8080端口, 並配置爲非阻塞模式 serverSocketChannel.socket().bind(new InetSocketAddress(8080)); serverSocketChannel.configureBlocking(false); // 將 channel 註冊到 selector 中. // 一般咱們都是先註冊一個 OP_ACCEPT 事件, 而後在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ // 註冊到 Selector 中. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { // 經過調用 select 方法, 阻塞地等待 channel I/O 可操做 if (selector.select(TIMEOUT) == 0) { System.out.print("."); continue; } // 獲取 I/O 操做就緒的 SelectionKey, 經過 SelectionKey 能夠知道哪些 Channel 的哪類 I/O 操做已經就緒. Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); // 當獲取一個 SelectionKey 後, 就要將它刪除, 表示咱們已經對這個 IO 事件進行了處理. keyIterator.remove(); if (key.isAcceptable()) { // 當 OP_ACCEPT 事件到來時, 咱們就有從 ServerSocketChannel 中獲取一個 SocketChannel, // 表明客戶端的鏈接 // 注意, 在 OP_ACCEPT 事件中, 從 key.channel() 返回的 Channel 是 ServerSocketChannel. // 而在 OP_WRITE 和 OP_READ 中, 從 key.channel() 返回的是 SocketChannel. SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept(); clientChannel.configureBlocking(false); //在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ 註冊到 Selector 中. // 注意, 這裏咱們若是沒有設置 OP_READ 的話, 即 interest set 仍然是 OP_CONNECT 的話, 那麼 select 方法會一直直接返回. clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE)); } if (key.isReadable()) { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clientChannel.read(buf); if (bytesRead == -1) { clientChannel.close(); } else if (bytesRead > 0) { key.interestOps(OP_READ | SelectionKey.OP_WRITE); System.out.println("Get data length: " + bytesRead); } } if (key.isValid() && key.isWritable()) { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clientChannel = (SocketChannel) key.channel(); clientChannel.write(buf); if (!buf.hasRemaining()) { key.interestOps(OP_READ); } buf.compact(); } } } } }
本文由 yongshun 發表於我的博客, 採用署名-非商業性使用-相同方式共享 3.0 中國大陸許可協議.
非商業轉載請註明做者及出處. 商業轉載請聯繫做者本人
Email: yongshun1228@gmail .com本文標題爲: Java NIO 的前生今世 之四 NIO Selector 詳解本文連接爲: segmentfault.com/a/1190000006824196