Buffer是一個對象,包含一些要寫入和讀出的數據。java
在NIO中,全部的數據都是用緩衝區處理的,讀取數據時,它是從通道(Channel)直接讀到緩衝區中,在寫入數據時,也是從緩衝區寫入到通道。node
緩衝區實質上是一個數組,一般是一個字節數組(ByteBuffer),也能夠是其它類型的數組,此外緩衝區還提供了對數據的結構化訪問以及維護讀寫位置等信息。數組
Buffer類的繼承關係以下圖所示:服務器
Channel是一個通道,網絡數據經過Channel讀取和寫入。通道和流的不一樣之處在於通道是雙向的(通道能夠用於讀、寫後者兩者同時進行),流只是在一個方向上移動。網絡
Channel大致上能夠分爲兩類:用於網絡讀寫的SelectableChannel(ServerSocketChannel和SocketChannel就是其子類)、用於文件操做的FileChannel。dom
下面的例子給出經過FileChannel來向文件中寫入數據、從文件中讀取數據,將文件數據拷貝到另外一個文件中:socket
public class NioTest { public static void main(String[] args) throws IOException { copyFile(); } //拷貝文件 private static void copyFile() { FileInputStream in=null; FileOutputStream out=null; try { in=new FileInputStream("src/main/java/data/in-data.txt"); out=new FileOutputStream("src/main/java/data/out-data.txt"); FileChannel inChannel=in.getChannel(); FileChannel outChannel=out.getChannel(); ByteBuffer buffer=ByteBuffer.allocate(1024); int bytesRead = inChannel.read(buffer); while (bytesRead!=-1) { buffer.flip(); outChannel.write(buffer); buffer.clear(); bytesRead = inChannel.read(buffer); } } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //寫文件 private static void writeFileNio() { try { RandomAccessFile fout = new RandomAccessFile("src/main/java/data/nio-data.txt", "rw"); FileChannel fc=fout.getChannel(); ByteBuffer buffer=ByteBuffer.allocate(1024); buffer.put("hi123".getBytes()); buffer.flip(); try { fc.write(buffer); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //讀文件 private static void readFileNio() { FileInputStream fileInputStream; try { fileInputStream = new FileInputStream("src/main/java/data/nio-data.txt"); FileChannel fileChannel=fileInputStream.getChannel();//從 FileInputStream 獲取通道 ByteBuffer byteBuffer=ByteBuffer.allocate(1024);//建立緩衝區 int bytesRead=fileChannel.read(byteBuffer);//將數據讀到緩衝區 while(bytesRead!=-1) { /*limit=position * position=0; */ byteBuffer.flip(); //hasRemaining():告知在當前位置和限制之間是否有元素 while (byteBuffer.hasRemaining()) { System.out.print((char) byteBuffer.get()); } /* * 清空緩衝區 * position=0; * limit=capacity; */ byteBuffer.clear(); bytesRead = fileChannel.read(byteBuffer); } } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
多路複用器提供選擇已經就緒的任務的能力。Selector會不斷的輪詢註冊在其上的Channel,若是某個Channel上面發送讀或者寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,而後經過SelectionKey能夠獲取就緒Channel的集合,進行後續的I/O操做。ide
一個多路複用器Selector能夠同時輪詢多個Channel,因爲JDK使用了epoll代替了傳統的select實現,因此它沒有最大鏈接句柄1024/2048的限制,意味着只須要一個線程負責Selector的輪詢,就能夠接入成千上萬的客戶端。其模型以下圖所示:url
用單線程處理一個Selector。要使用Selector,得向Selector註冊Channel,而後調用它的select()方法。這個方法會一直阻塞到某個註冊的通道有事件就緒。一旦這個方法返回,線程就能夠處理這些事件,事件的例子有如新鏈接進來,數據接收等。 spa
注:
一、什麼select模型?
select是事件觸發機制,當等待的事件發生就觸發進行處理,多用於Linux實現的服務器對客戶端的處理。
能夠阻塞地同時探測一組支持非阻塞的IO設備,是否有事件發生(如可讀、可寫,有高優先級錯誤輸出等),直至某一個設備觸發了事件或者超過了指定的等待時間。也就是它們的職責不是作IO,而是幫助調用者尋找當前就緒的設備。
二、什麼是epoll模型?
epoll的設計思路,是把select/poll單個的操做拆分爲1個epoll_create+多個epoll_ctrl+一個wait。此外,內核針對epoll操做添加了一個文件系統」eventpollfs」,每個或者多個要監視的文件描述符都有一個對應的eventpollfs文件系統的inode節點,主要信息保存在eventpoll結構體中。而被監視的文件的重要信息則保存在epitem結構體中。因此他們是一對多的關係。
功能說明:開啓服務器端,對每個接入的客戶端都向其發送hello字符串。
使用NIO進行服務器端開發主要有如下幾個步驟:
一、建立ServerSocketChannel,配置它爲非阻塞模式
serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false);
二、綁定監聽,配置TCP參數,如backlog大小
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
三、建立一個獨立的I/O線程,用於輪詢多路複用器Selector
四、建立Selector,將以前建立的ServerSocketChannel註冊到Selector上,監聽SelectionKey.ACCEPT
selector=Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
五、啓動I/O線程,在循環體內執行Selector.select()方法,輪詢就緒的Channel
while(true) { try { //select()阻塞到至少有一個通道在你註冊的事件上就緒了 //若是沒有準備好的channel,就在這一直阻塞 //select(long timeout)和select()同樣,除了最長會阻塞timeout毫秒(參數)。 selector.select(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); break; }
}
六、當輪詢到了處於就緒狀態的Channel時,需對其進行判斷,若是是OP_ACCEPT狀態,說明是新的客戶端接入,則調用ServerSocketChannel.accept()方法接受新的客戶端
//返回已經就緒的SelectionKey,而後迭代執行 Set<SelectionKey> readKeys=selector.selectedKeys(); for(Iterator<SelectionKey> it=readKeys.iterator();it.hasNext();) { SelectionKey key=it.next(); it.remove(); try { if(key.isAcceptable()) { ServerSocketChannel server=(ServerSocketChannel) key.channel(); SocketChannel client=server.accept(); client.configureBlocking(false); client.register(selector,SelectionKey.OP_WRITE); } else if(key.isWritable()) { SocketChannel client=(SocketChannel) key.channel(); ByteBuffer buffer=ByteBuffer.allocate(20); String str="hello"; buffer=ByteBuffer.wrap(str.getBytes()); client.write(buffer); key.cancel(); } }catch(IOException e) { e.printStackTrace(); key.cancel(); try { key.channel().close(); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } }
七、設置新接入的客戶端鏈路SocketChannel爲非阻塞模式,配置其餘的一些TCP參數
if(key.isAcceptable()) { ServerSocketChannel server=(ServerSocketChannel) key.channel(); SocketChannel client=server.accept(); client.configureBlocking(false); ... }
八、將SocketChannel註冊到Selector,監聽OP_WRITE
client.register(selector,SelectionKey.OP_WRITE);
九、若是輪詢的Channel爲OP_WRITE,則說明要向SockChannel中寫入數據,則構造ByteBuffer對象,寫入數據包
else if(key.isWritable()) { SocketChannel client=(SocketChannel) key.channel(); ByteBuffer buffer=ByteBuffer.allocate(20); String str="hello"; buffer=ByteBuffer.wrap(str.getBytes()); client.write(buffer); key.cancel(); }
完整代碼以下:
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class ServerSocketChannelDemo { public static void main(String[] args) { ServerSocketChannel serverSocketChannel; Selector selector=null; try { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(8080)); selector=Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } while(true) { try { //select()阻塞到至少有一個通道在你註冊的事件上就緒了 //若是沒有準備好的channel,就在這一直阻塞 //select(long timeout)和select()同樣,除了最長會阻塞timeout毫秒(參數)。 selector.select(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); break; } //返回已經就緒的SelectionKey,而後迭代執行 Set<SelectionKey> readKeys=selector.selectedKeys(); for(Iterator<SelectionKey> it=readKeys.iterator();it.hasNext();) { SelectionKey key=it.next(); it.remove(); try { if(key.isAcceptable()) { ServerSocketChannel server=(ServerSocketChannel) key.channel(); SocketChannel client=server.accept(); client.configureBlocking(false); client.register(selector,SelectionKey.OP_WRITE); } else if(key.isWritable()) { SocketChannel client=(SocketChannel) key.channel(); ByteBuffer buffer=ByteBuffer.allocate(20); String str="hello"; buffer=ByteBuffer.wrap(str.getBytes()); client.write(buffer); key.cancel(); } }catch(IOException e) { e.printStackTrace(); key.cancel(); try { key.channel().close(); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } } } } }
咱們用telnet localhost 8080模擬出多個客戶端:
程序運行結果以下:
一、netty權威指南(李林峯)