NIO(1、概述)
NIO(2、Buffer)
NIO(3、Channel)
NIO(4、Selector)html
上文說了描述了Buffer的實現機制,那麼這個章節就主要描述數據是如何進入緩衝區的,而且又是如何從緩衝區流出的。java
這張圖只是簡單歸納了Channel的類圖,固然,Channel的設計遠比這個更復雜:例如SelectableChannel還有SocketChannel和ServerSocketChannel的實現,NetworkChannel繼承Channel並抽象了更多的方法;例如FileChannel,除了繼承AbstractInterruptibleChannel以外,還實現了GatheringByteChannel和ScatteringByteChannel接口。api
Channel
咱們能夠看到,Channel接口自己定義了 close() 和 isOpen() 方法,在繼承Channel的接口中,又分別抽象了讀通道(ReadableByteChannel)、寫通道(WritableByteChannel)及可中斷的異步通道(InterruptibleChannel)接口。讀寫通道天然沒必要說,下文也會有介紹。數組
InterruptibleChannel
這裏說下InterruptibleChannel,這是一個能夠被中斷的異步通道,繼承了 close() 方法。當一個線程在I/O被阻塞時,另外一個線程執行了close()方法,那麼阻塞的線程會拋出 AsynchronousCloseException異常。當一個線程在I/O被阻塞時,另外一個線程調用阻塞的線程中斷(interrupt())方法,那麼將會拋出ClosedByInterruptException異常。瀏覽器
AbstractInterruptibleChannel
AbstractInterruptibleChannel抽象類,這是全部可中斷通道實現的基類,咱們能夠看到,FileChannel正是直接繼承自它,後文會介紹的SocketChannel和ServerSocketChannel繼承自AbstractSelectableChannel抽象類,而AbstractSelectableChannel又繼承自SelectableChannel抽象類,SelectableChannel上圖就能夠看出,一樣繼承AbstractInterruptibleChannel。咱們不得不想一下,爲何FileChannel直接繼承AbstractInterruptibleChannel抽象類,而另外一個經常使用的好比SocketChannel卻須要繼承自SelectableChannel抽象類?此時咱們須要只瞭解SelectableChannel是什麼問題就迎刃而解了。安全
SelectableChannel
第一個章節在概述中提過一個名詞「多路複用」,當時也做了簡單描述:一個線程經過選擇器處理和管理多個通道,利用一個線程的資源去處理多個鏈接。SelectableChannel正是扮演着其中的一個重要角色。而相似SocketChannel同樣實現NetworkChannel接口的通道,這種網絡數據的傳輸尤爲在高併發的壓力下,讓CPU利用率真正體如今處理數據上,而不是頻繁上下文切換的開銷,使用這種機制能明顯提高處理性能,而FileChannel這種對文件操做是絕對不會使用到這種機制的。另外,SelectableChannel在阻塞模型中,每個I/O操做都會阻塞到它完成爲止,在非阻塞模型中,是不會出現阻塞的狀況,一樣返回的字節數可能少於要求或者乾脆沒有,是否阻塞咱們能經過 isBlocking() 方法查看。
SelectableChannel是經過選擇器(Selector)複用的通道,爲了配合與選擇器一塊兒使用,可以使用 register() 方法,這個方法會返回一個SelectionKey對象,表示已經在選擇器裏註冊了,這個通道會一直保持到註銷爲止,同時也包括已經分配在這個選擇器上的這個通道的資源。通常狀況下,通道本身不能直接在通道上註銷,咱們能夠調用以前註冊時返回的SelectionKey對象的 cancel() 方法,能夠顯式註銷。另外,閱讀代碼咱們發現,幾乎全部的實現方法都有同步控制,因此,在多個併發線程下使用是安全的。服務器
Channel,它既是緩衝區數據的入口,也是其數據的出口。
在上面的類圖咱們能夠看到,Channel分別抽象了ReadableByteChannel和WritableByteChannel接口,兩個接口各自定義了 read() 和 write() 方法,讀取和寫入ByteBuffer對象。並在這個基礎上又進一步定義了ScatteringByteChannel和GatheringByteChannel接口,一樣,這兩個接口一樣存在繼承自ReadableByteChannel和WritableByteChannel接口的read() 和 write() 方法,此外,還分別重載了 read() 和 write() 方法,增長了讀取或寫入ByteBuffer數組。
網絡
ByteBuffer buffer1 = ByteBuffer.allocate(1024); ByteBuffer buffer2 = ByteBuffer.allocate(1024); ByteBuffer[] buffers = {buffer1, buffer2}; channel.read(buffers);
ByteBuffer buffer1 = ByteBuffer.allocate(1024); ByteBuffer buffer2 = ByteBuffer.allocate(1024); ByteBuffer[] buffers = {buffer1, buffer2}; channel.write(buffers);
FileChannel的 transferFrom() 和 transferTo() 兩個方法實現了通道之間的數據傳輸,當有一方是FileChannel時,另外一方實現了 ReadableByteChannel和WritableByteChannel接口的通道就可以與FileChannel傳輸數據。目前爲止,只有文件通道(FileChannel)可以雙向傳輸。併發
FileChannel fromChannel = fromFile.getChannel(); FileChannel toChannel = toFile.getChannel(); // transferFrom toChannel.transferFrom(fromChannel, position, count); // transferTo fromChannel.transferTo(position,count,toChannel);
咱們在講Buffer的時候就已經說了position,意指讀寫位置,count指的是數據大小。oracle
建立SocketChannel僅須要調用它的 open() 方法:
SocketChannel channel = SocketChannel.open();
事實上,可能在實際使用的時候,咱們也會在的句柄對象的處理方法裏,使用SelectionKey的 channel() 方法來獲取,或者ServerSocketChannel對象的 accept() 方法來獲取。
SelectionKey key = ... //接受消息處理 SocketChannel channel = ((ServerSocketChannel) key.channel()).accept(); //或者 SocketChannel channel = (SocketChannel) key.channel();
在咱們手動 open() 打開這個socket以後,其實還未鏈接,這時候咱們對這個通道進行I/O操做會拋出NotYetConnectedException。咱們可使用 connect() 方法來鏈接通道的socket,與此同時,也可使用 isConnected() 判斷是否已鏈接。若是這個Socket在非阻塞模型中,就須要 finishConnect() 來肯定鏈接已完成,也能夠經過 isConnectionPending() 來測定該通道的鏈接是否正在進行中。
// 設置非阻塞 channel.configureBlocking(false); if(channel.isConnected()) { channel.connect(new InetSocketAddress(8080)); while (channel.finishConnect()){ //connected - do something } }
固然,關於channel的讀寫操做,在講Scatter/Gather時已經講過,這裏再也不贅述。
每次使用完成以後,都會調用 close() 方法關閉。須要說明的是,這裏的 close() 方法支持異步關閉,當一個線程執行 close() 方法同時,另外一個線程的在同一個通道上執行讀操做時會被阻塞,而後這個讀取操做不會返回任何數據,只會返回-1標識。固然,另外一個線程若是是寫操做的話也一樣會被阻塞,不一樣的是會拋出 AsynchronousCloseException 異常。咱們在上文中描述 InterruptibleChannel 這個可中斷通道接口的時候也提到這個問題。
channel.close();
建立ServerSocketChannel與SocketChannel相似,不一樣的是ServerSocketChannel是監聽套接字鏈接。因此在建立它的時候,須要將它綁定,若是沒有綁定就執行 accept() 方法,那麼會拋出 NotYetBoundException 異常。
ServerSocketChannel channel = ServerSocketChannel.open(); channel.socket().bind(new InetSocketAddress(8080));
監聽到的鏈接可使用 accept() 方法來返回該鏈接的 SocketChannel。執行 accept() 方法會一直阻塞直到有鏈接到達。
SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
下面是一段簡單的ServerSocketChannel和SocketChannel應用,能夠看到是如何使用這兩個類的。當瀏覽器輸入localhost:8080的時候,會在控制檯打印出這個鏈接的請求信息。
public static void main(String[] args) throws IOException { ServerSocketChannel channel = ServerSocketChannel.open(); channel.socket().bind(new InetSocketAddress(8080)); //設置非阻塞 channel.configureBlocking(false); //註冊 Selector selector = Selector.open(); channel.register(selector, SelectionKey.OP_ACCEPT); //處理器 Handler handler = new Handler(1024); while (true) { //等待請求,每次等待阻塞5s,5s後線程繼續向下執行,若是傳入0或者不傳參數將一直阻塞 if (selector.select(5000) == 0) { continue; } //獲取待處理的SelectionKey Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); //當接受到請求 if (key.isAcceptable()) { handler.handleAccept(key); } try { //讀數據 if (key.isReadable()) { handler.handleRead(key); } } catch (IOException e) { keyIterator.remove(); e.printStackTrace(); } keyIterator.remove(); } } } private static class Handler { private int bufferSize = 1024; private String localCharset = "UTF-8"; public Handler() { } public Handler(int bufferSize) { this.bufferSize = bufferSize; } public Handler(String localCharset) { this.localCharset = localCharset; } public Handler(int bufferSize, String localCharset) { this.bufferSize = bufferSize; this.localCharset = localCharset; } public void handleAccept(SelectionKey key) { try { SocketChannel channel = ((ServerSocketChannel) key.channel()).accept(); channel.configureBlocking(false); channel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize)); } catch (IOException e) { e.printStackTrace(); } } public void handleRead(SelectionKey key) throws IOException { //獲取channel SocketChannel channel = (SocketChannel) key.channel(); //獲取buffer 重置 ByteBuffer buffer = (ByteBuffer) key.attachment(); buffer.clear(); if (channel.read(buffer) == -1) { channel.close(); } else { //轉爲讀狀態 buffer.flip(); String receivedString = Charset.forName(localCharset) .newDecoder().decode(buffer).toString(); System.out.printf("接受到客戶端數據" + receivedString); //返回數據給客戶端 String sendString = "接受數據:" + receivedString; buffer = ByteBuffer.wrap(sendString.getBytes(localCharset)); channel.write(buffer); channel.close(); } } }