選擇器 Selector 是 I/O 多路複用的核心組件,它能夠監控實現了 SelectableChannel 的通道的就緒狀況。有了多路複用(multiplexing) I/O 模型,使得單線程的 Java 程序在極端狀況下可以處理數萬個鏈接,極大提升了程序的併發數。css
I/O 多路複用模型是操做系統提供給應用程序的一種進行 I/O 操做的模型。應用程序經過 select/poll 系統調用監控多個 I/O 設備,一旦某個或者多個 I/O 設備的處於就緒狀態(例如:可讀)則返回,應用程序隨後可對就緒的設備進行操做。html
java
大體流程以下:git
1)應用程序向內核發起 select 系統調用,該調用會阻塞應用程序。github
2)內核等待數據到達。數據可能由 DMA 複製到內核緩衝區,也有多是 CPU 進行復制。服務器
3)數據準備完畢,select 調用返回。select 返回的是一個集和,可能有多個鏈接都已經就緒。多線程
4)應用程序發起 read 系統調用。併發
5)操做系統將數據有內核緩衝區複製到用戶緩衝區。app
6)read 調用返回。異步
I/O 多路複用模型本質上是一種阻塞 I/O,進行讀操做的 read 系統調用是阻塞的,select 的時候也是阻塞的。不過 I/O 多路複用模型的優點在於阻塞時能夠等待多路 I/O 就緒,而後一併處理。與多線程處理多路 I/O 相比,它是單線程的,沒有線程切換的開銷,單位時間內可以處理多的鏈接數。
在 Java 中,通道 Channel 能夠表示 I/O 鏈接,而選擇器能夠監控某些 I/O 事件就緒的通道,選擇通道中就緒的 I/O 事件。這裏的通道必須是實現了 SelectableChannel 接口的通道,例如:SocketChannel, DatagramChannel 等;而文件通道 FileChannel 沒有實現該接口,因此不支持選擇器。
選擇器 Selector 監控通道時監控的是通道中的事件,選擇鍵 SelectionKey 就表明着 I/O 事件。程序經過調用 Selector.select() 方法來選中選擇器所監控的通道中的就緒的 I/O 事件的集合,而後遍歷集合,對事件做出相應的處理。
選擇鍵 SelectionKey 能夠表示 4 種事件,這 4 種事件使用 int 類型的常量來表示。
1)SelectionKey.OP_ACCEPT 表示 accept 事件就緒。例如:對於 ServerSocketChannel 來講,該事件就緒表示能夠調用 accept() 方法來得到與客戶端鏈接的通道 SocketChannel。
2)SelectionKey.OP_CONNECT 表示客戶端與服務端鏈接成功。
3)SelectionKey.OP_READ 表示通道中已經有了可讀數據,能夠調用 read() 方法從通道中讀取數據。
4)SelectionKey.OP_WRITE 表示寫事件就緒,能夠調用 write() 方法往通道中寫入數據。
不一樣的通道所可以支持的 I/O 事件不一樣,例如:ServerSocketChannel 只支持 accept 事件,而 DatagramChannel 只支持 read 和 write 事件。要查看通道所支持的事件,能夠查看通道的 javadoc 文檔,或者調用通道的 validOps() 方法來進行判斷。例如:channel.validOps() & SelectionKey.OP_READ > 0 表示 channel 支持讀事件。
與通道和緩衝區的獲取相似,選擇器的獲取也是經過靜態工廠方法 open() 來獲得的。
Selector selector = Selector.open(); // 獲取一個選擇器實例
可以被選擇器監控的通道必須實現了 SelectableChannel 接口,而且須要將通道配置成非阻塞模式,不然後續的註冊步驟會拋出 IllegalBlockingModeException。
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090)); // 打開 SocketChannel 並鏈接到本機 9090 端口 socketChannel.configureBlocking(false); // 配置通道爲非阻塞模式
通道在被指定的選擇器監控以前,應該先告訴選擇器,而且告知監控的事件,即:將通道註冊到選擇器。
通道的註冊經過 SelectableChannel.register(Selector selector, int ops) 來完成,ops 表示關注的事件,若是須要關注該通道的多個 I/O 事件,能夠傳入這些事件類型或運算以後的結果。這些事件必須是通道所支持的,不然拋出 IllegalArgumentException。
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); // 將套接字經過到註冊到選擇器,關注 read 和 write 事件
經過調用選擇器的 Selector.select() 方法能夠獲取就緒事件,該方法會將就緒事件放到一個 SelectionKey 集合中,而後返回就緒的事件的個數。這個方法映射多路複用 I/O 模型中的 select 系統調用,它是一個阻塞方法。正常狀況下,直到至少有一個就緒事件,或者其它線程調用了當前 Selector 對象的 wakeup() 方法,或者當前線程被中斷時返回。
while (selector.select() > 0){ // 輪詢,且返回時有就緒事件 Set<SelectionKey> keys = selector.selectedKeys(); // 獲取就緒事件集合 ....... }
有 3 種方式能夠 select 就緒事件:
1)select() 阻塞方法,有一個就緒事件,或者其它線程調用了 wakeup() 或者當前線程被中斷時返回。
2)select(long timeout) 阻塞方法,有一個就緒事件,或者其它線程調用了 wakeup(),或者當前線程被中斷,或者阻塞時長達到了 timeout 時返回。不拋出超時異常。
3)selectNode() 不阻塞,若是無就緒事件,則返回 0;若是有就緒事件,則將就緒事件放到一個集合,返回就緒事件的數量。
每次能夠 select 出一批就緒的事件,因此須要對這些事件進行迭代。從一個 SelectionKey 對象能夠獲得:1)就緒事件的對應的通道;2)就緒的事件。經過這些信息,就能夠很方便地進行 I/O 操做。
for(SelectionKey key : keys){ if(key.isWritable()){ // 可寫事件 if("Bye".equals( (line = scanner.nextLine()) )){ socketChannel.shutdownOutput(); socketChannel.close(); break; } buf.put(line.getBytes()); buf.flip(); socketChannel.write(buf); buf.compact(); } } keys.clear(); // 清除選擇鍵(事件)集,避免下次循環的時候重複處理。
須要注意的是,處理完 I/O 事件以後,須要清除選擇鍵集和,避免下一輪循環的時候對同一事件重複處理。
下面給出一個完整的實例,實例中包含 TCP 客戶端 TcpClient, UDP 客戶端 UdpClient 和服務端 EchoServer。服務端 EchoServer 能夠同時處理 UDP 請求和 TCP 請求,用戶能夠在客戶端控制檯輸入內容,按回車發送給服務端,服務端打印客戶端發送過來的內容。完整代碼:https://github.com/Robothy/java-experiments/tree/main/nio/Selector
public class EchoServer { public static void main(String[] args) throws IOException { Selector selector = Selector.open(); // 獲取選擇器 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打開服務器通道 serverSocketChannel.configureBlocking(false); // 服務器通道配置爲非阻塞模式 serverSocketChannel.bind(new InetSocketAddress(9090)); // 綁定 TCP 端口 9090 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 將服務器通道註冊到選擇器 selector 中,註冊事件爲 ACCEPT DatagramChannel datagramChannel = DatagramChannel.open(); // 打開套接字通道 datagramChannel.configureBlocking(false); // 配置通道爲非阻塞模式 datagramChannel.bind(new InetSocketAddress(9090)); // 綁定 UDP 端口 9090 datagramChannel.register(selector, SelectionKey.OP_READ); // 將通道註冊到選擇器 selector 中,註冊事件爲讀取數據 ByteBuffer buf = ByteBuffer.allocate(1024); // 分配一個 1024 字節的堆字節緩衝區 while (selector.select() > 0){ // 輪詢已經就緒的註冊的通道的 I/O 事件 Set<SelectionKey> keys = selector.selectedKeys(); // 獲取就緒的 I/O 事件,即選擇器鍵集合 for (SelectionKey key : keys){ // 遍歷選擇鍵,處理就緒事件 if(key.isAcceptable()){ // 選擇鍵的事件的是 I/O 鏈接事件 SocketChannel socketChannel = serverSocketChannel.accept(); // 執行 I/O 操做,獲取套接字鏈接通道 socketChannel.configureBlocking(false); // 配置爲套接字通道爲非阻塞模式 socketChannel.register(selector, SelectionKey.OP_READ); // 將套接字經過到註冊到選擇器,關注 READ 事件 }else if(key.isReadable()){ // 選擇鍵的事件是 READ StringBuilder sb = new StringBuilder(); if(key.channel() instanceof DatagramChannel){ // 選擇的通道爲數據報通道,客戶端是經過 UDP 鏈接過來的 sb.append("UDP Client: "); datagramChannel.receive(buf); // 最多讀取 1024 字節,數據報多出的部分自動丟棄 buf.flip(); while(buf.position() < buf.limit()) { sb.append((char)buf.get()); } buf.clear(); }else{ // 選擇的通道爲套接字通道,客戶端時經過 TCP 鏈接過來的 sb.append("TCP Client: "); ReadableByteChannel channel = (ReadableByteChannel) key.channel(); // 獲取通道 int size; while ( (size = channel.read(buf))>0){ buf.flip(); while (buf.position() < buf.limit()) { sb.append((char)buf.get()); } buf.clear(); } if (size == -1) { sb.append("Exit"); channel.close(); } } System.out.println(sb); } } keys.clear(); // 將選擇鍵清空,防止下次循環時被重複處理 } } }
public class TcpClient { public static void main(String[] args) throws IOException { Selector selector = Selector.open(); SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090)); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_WRITE); Scanner scanner = new Scanner(System.in); String line; ByteBuffer buf = ByteBuffer.allocate(1024); while (selector.select() > 0){ Set<SelectionKey> keys = selector.selectedKeys(); for(SelectionKey key : keys){ if(key.isWritable()){ if("Bye".equals( (line = scanner.nextLine()) )){ socketChannel.shutdownOutput(); socketChannel.close(); break; } buf.put(line.getBytes()); buf.flip(); socketChannel.write(buf); buf.compact(); } } keys.clear(); if(!socketChannel.isOpen()) break; } } }
public class UdpClient { public static void main(String[] args) throws IOException { Selector selector = Selector.open(); // 獲取選擇器 DatagramChannel datagramChannel = DatagramChannel.open(); // 打開一個數據報通道 datagramChannel.configureBlocking(false); // 配置通道爲非阻塞模式 datagramChannel.register(selector, SelectionKey.OP_WRITE); // 將通道的寫事件註冊到選擇器 ByteBuffer buff = ByteBuffer.allocate(1024); // 分配字節緩衝區 Scanner scanner = new Scanner(System.in); // 建立掃描器,掃描控制檯輸入流 InetSocketAddress server = new InetSocketAddress("localhost", 9090); while (selector.select() > 0){ // 有就緒事件 Set<SelectionKey> keys = selector.selectedKeys(); // 獲取選擇鍵,即就緒的事件 for(SelectionKey key : keys){ // 遍歷選擇鍵 if(key.isWritable()){ // 若是當前選擇鍵是讀就緒 String line; if("Bye".equals( line = scanner.nextLine() )) { // 從控制檯獲取 1 行輸入,並檢查輸入的是否是 Bye System.exit(0); // 正常退出 } buff.put(line.getBytes()); // 放入緩衝區 buff.flip(); // 將緩衝區置爲讀狀態 datagramChannel.send(buff, server); // 往 I/O 寫數據 buff.compact(); // 壓縮緩衝區,保留沒發送完的數據 } } keys.clear(); } } }
Selector 做爲多路複用 I/O 模型的核心組件,可以同時監控多路 I/O 通道。選擇器在 select 就緒事件地時候會阻塞,在處理 I/O 事件的時候也會阻塞,它的優點在於在阻塞的時候能夠等待多路 I/O 就緒,是一種異步阻塞 I/O 模型。與多線程處理多路 I/O 相比,多路複用模型只須要單個線程便可處理萬級鏈接,沒有線程切換的開銷。