Java NIO - Channel與Selector

聲明:java

本文由Yasin Shaw原創,首發於我的網站yasinshaw.com和公衆號"xy的技術圈"。服務器

若是須要轉載請聯繫我(微信:yasinshaw)並在文章開頭顯著的地方註明出處微信

關注公衆號便可獲取學習資源或加入技術交流羣網絡

Java NIO有三個核心的組件:Buffer、Channel和Selector。socket

在上一篇文章中,咱們已經介紹了Buffer,這篇文章主要介紹剩下兩個組件:Channel和Selector。學習

Selector與Channel的關係

Channel

Channel翻譯過來是「通道」的意思,全部的Java NIO都要通過Channel。一個Channel對象其實就對應了一個IO鏈接。Java NIO中主要有如下Channel實現:網站

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

分別用於處理文件IO、UDP、TCP客戶端、TCP服務端。spa

這裏以ServerSocketChannel和SocketChannel爲例,介紹一些經常使用的方法。線程

// server:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 8080));
SocketChannel socketChannel = serverSocketChannel.accept();
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
int readBytes = socketChannel.read(buffer);
if (readBytes > 0) {
    buffer.flip();
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    String body = new String(bytes, StandardCharsets.UTF_8);
    System.out.println("server 收到:" + body);
}
複製代碼

對於服務端來講,先用open方法建立一個對象,而後使用bind方法綁定端口。翻譯

綁定之後,使用accept方法等待新的鏈接進來,這個方法是阻塞的。一旦有了新的鏈接,纔會解除阻塞。再次調用能夠阻塞等待下一個鏈接。

與Buffer配合,使用read方法能夠把數據從Channel讀到Buffer裏面,而後作後續處理。

// Client:
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
buffer.put("hi, 這是client".getBytes(StandardCharsets.UTF_8));
buffer.flip();
socketChannel.write(buffer);
複製代碼

對於客戶端來講,有一些微小的區別。客戶端不須要bind監聽端口,而是直接connect去嘗試鏈接服務端。

一樣與Buffer配合,Channel使用write方法能夠把數據從Buffer寫到Channel裏,而後後續就能夠作網絡傳輸了。

Selector

Selector翻譯過來叫作「選擇器」,Selector容許一個線程處理多個Channel。Selector的應用場景是:若是你的應用打開了多個鏈接(Channel),但每一個鏈接的流量都很低。好比:聊天服務器或者HTTP服務器。

使用Selector很簡單。使用open方法建立一個Selector對象,而後把Channel註冊到Selector上。

// 建立一個Selector
Selector selector = Selector.open();

// 把一個Channel註冊到Selector
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
複製代碼

須要注意的是,必定要使用configureBlocking(false)把Channel設置成非阻塞模式,不然會拋出IllegalBlockingModeException異常。

Channel的register有兩個重載方法:

SelectionKey register(Selector sel, int ops) {
    return register(sel, ops, null);
}
SelectionKey register(Selector sel, int ops, Object att);
複製代碼

對於ops參數,即selector要關心這個Channel的事件類型,在SelectionKey類裏面有這樣幾個常量:

  • OP_READ 能夠從Channel讀數據
  • OP_WRITE 能夠寫數據到Channel
  • OP_CONNECT 鏈接上了服務器
  • OP_ACCEPT 有新的鏈接進來了

若是你對不止一種事件感興趣,使用或運算符便可,以下:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
複製代碼

須要注意的是,FileChannel只有阻塞模式,不支持非阻塞模式,因此它是沒有register方法的!

第三個參數attattachment的縮寫,表明能夠傳一個「附件」進去。在返回的SelectionKey對象裏面,能夠獲取如下對象:

  • channel():獲取Channel
  • selector():獲取Selector
  • attachment():獲取附件
  • attach(obj):更新附件

除此以外,還有一些判斷當前狀態的方法:

  • isReadable()
  • isWritable()
  • isConnectable()
  • isAcceptable()

通常來講,咱們不多直接使用單個的SelectionKey,而是從Selector裏面輪詢全部的SelectionKey,好比:

輪詢

while (selector.select() > 0) {
     Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
     while(keyIterator.hasNext()) {
         SelectionKey key = keyIterator.next();
         if (key.isReadable()) {
             SocketChannel socketChannel = (SocketChannel) key.channel();
             // read
         } else if(key.isAcceptable()) {
             // accept
         }
         // 其它條件
         keyIterator.remove();
     }
 }
複製代碼

Selector能夠返回兩種SelectionKey集合:

  • keys():已註冊的鍵的集合
  • selectedKeys():已選擇的鍵的集合

並非全部註冊過的鍵都仍然有效,有些可能已經被cancel()方法被調用過的鍵。因此通常來講,咱們輪詢selectedKeys()方法。

完整的示例代碼

如下是一個完整的Server-Client Demo:

Server:

public class Server {
    public static void main(String[] args) {
        try (
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                Selector selector = Selector.open();
        ) {
            serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 8080));
            serverSocketChannel.configureBlocking(false);
            System.out.println("server 啓動...");
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (selector.select() > 0) {
                Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
                while(keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    if (key.isReadable()) {
                        ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        int readBytes = socketChannel.read(buffer);
                        if (readBytes > 0) {
                            buffer.flip();
                            byte[] bytes = new byte[buffer.remaining()];
                            buffer.get(bytes);
                            String body = new String(bytes, StandardCharsets.UTF_8);
                            System.out.println("server 收到:" + body);
                        }
                    } else if(key.isAcceptable()) {
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    }
                    keyIterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
複製代碼

Client:

public class Client {
    public static void main(String[] args) {
        try (
                SocketChannel socketChannel = SocketChannel.open();
        ) {
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
            System.out.println("client 啓動...");

            ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
            buffer.put("hi, 這是client".getBytes(StandardCharsets.UTF_8));
            buffer.flip();
            socketChannel.write(buffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
複製代碼

更多文章,歡迎關注公衆號:xy的技術圈

相關文章
相關標籤/搜索