BIO是基於字節流和字符流進行操做的,而NIO是基於通道Channel和緩衝區Buffer進行操做的,數據從通道讀取到緩衝區中,或者從緩衝區寫入到通道中。java
NIO的類庫位於java.lang.nio下,其中有以下一些基本的概念:編程
通道Channel:通道的做用與BIO中的流相似,主要不一樣的地方在於:緩存
緩衝區Buffer:在BIO中,能夠直接將數據寫入或者直接讀取到流中,也能夠經過裝飾類添加緩衝的功能;而在NIO中,全部的數據都是用緩衝區處理的,任什麼時候候使用NIO讀取或者寫入數據都是經過緩衝區進行的。緩衝區本質上是一塊能夠讀寫數據的內存,這塊內存被包裝成NIO Buffer對象,並提供了一組方法來訪問該塊內存。服務器
分散/彙集scatter/gather:分散和彙集是用來描述從通道中讀取或者寫入通道的操做。分散從通道中讀取是指讀操做時將讀取的數據寫入多個緩衝區中;彙集寫入通道是指寫入操做時將多個緩衝區的數據寫入到同一個通道中。分散/彙集一般用於須要將傳輸數據分開處理的場合,如傳輸一個消息能夠將消息頭和消息體分散到不一樣的buffer中。網絡
選擇器Selector:selector模型是NIO編程的基礎,多路複用器Selector經過不斷地輪詢已經註冊過的通道,檢測出就緒的通道集合,從而能夠實現一個線程管理多個通道,管理多個網絡鏈接。併發
java.nio包中經常使用的Channel實現類有:dom
FileChannel沒法設置爲非阻塞模式,只能運行在阻塞模式下。使用FileChannel的幾個基本步驟包括:打開FileChannel、從FileChannel讀寫數據、關閉FileChannel。異步
private static final int BUF_SIZE = 1024; public static void main(String[] args) { // 打開FileChannel須要經過FileIntputStream或FileOutputStream或RandomAccessFile try (FileOutputStream out = new FileOutputStream(new File("d:\\test.txt")); FileChannel channel = out.getChannel();) { ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); buffer.put("NIO學習:NIO FileChannel Demo".getBytes());// 先往buffer中寫入數據 buffer.flip();// 調轉buffer中讀寫指針position的位置 printBuffer(buffer); //打印buffer中內容 channel.write(buffer); // 將buffer中數據寫入channel } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } private static void printBuffer(ByteBuffer buffer) { try { Charset charset = Charset.forName("UTF-8"); CharsetDecoder decoder = charset.newDecoder(); CharBuffer cBuf = decoder.decode(buffer); buffer.flip(); System.out.println(cBuf.toString()); } catch (CharacterCodingException e) { e.printStackTrace(); } }
private static final int BUF_SIZE = 1024; public static void main(String[] args) { //channel關聯文件——>經過channel讀取數據到buffer try (FileInputStream in = new FileInputStream(new File("d:\\test.txt")); FileChannel channel = in.getChannel();) { ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); channel.read(buffer); buffer.flip(); printBuffer(buffer); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } private static void printBuffer(ByteBuffer buffer) { Charset charset = Charset.forName("UTF-8"); CharsetDecoder decoder = charset.newDecoder(); try { CharBuffer buf = decoder.decode(buffer); buffer.flip(); System.out.println(buf.toString()); } catch (CharacterCodingException e) { e.printStackTrace(); } }
close():使用完FileChannel須要關閉channel,爲簡潔代碼可使用try-with-resources語法。socket
position():position()方法能夠獲取FileChannel當前的位置,而position(long pos)能夠設置FileChannel當前位置,可是若是將位置設置在文件結束符以後,調用position()將返回-1;調用position(pos)寫入數據,則會把文件撐大到當前位置並寫入數據,這樣會致使磁盤上物理文件中寫入的數據間有空隙。性能
long pos = channle.position(); channel.position(100L);
size():channel.size()方法返回的是channel關聯文件的大小。
truncate(long size):truncate()方法用來截取文件,此時文件中指定長度後面的部分將會被刪除,如:
channel.truncate(100);// 將會保留前100個字節
DatagramChannel是用來收發UDP包的通道,由於UDP是無鏈接的網絡協議,因此DatagramChannel收發的是UDP數據包。
public static void main(String[] args) throws InterruptedException { new Thread(() -> { // 打開DatagramChannel,監聽UDP9999端口 try (DatagramChannel channel = DatagramChannel.open()) { channel.socket().bind(new InetSocketAddress(9999)); ByteBuffer buffer = ByteBuffer.allocate(100); // 經過channel的recevice方法接收UDP數據包 channel.receive(buffer); buffer.flip(); printBuffer(buffer); } catch (IOException e) { e.printStackTrace(); } }).start(); Thread.sleep(1000); // 服務端先啓動 new Thread(() -> { try (DatagramChannel channel = DatagramChannel.open();) { ByteBuffer buf = ByteBuffer.allocate(100); buf.clear(); buf.put("DatagramChannel Demo".getBytes()); // 發送數據前注意要把buffer的position置爲0 buf.flip(); // 調用send方法發送到指定IP地址的指定端口 channel.send(buf, new InetSocketAddress("localhost", 9999)); } catch (IOException e) { e.printStackTrace(); } }).start(); }
因爲UDP是無鏈接的,當指定鏈接的IP地址或域名時並不會建立一個真正的鏈接,而是鎖住了DatagramChannel,只能從鎖定的地址收發數據,而且數據傳送沒有可靠性保證。
public static void main(String[] args) throws InterruptedException { Thread server = new Thread(() -> { try (ServerSocketChannel channel = ServerSocketChannel.open()) { channel.socket().bind(new InetSocketAddress(44593)); SocketChannel socket = channel.accept(); ByteBuffer buffer = ByteBuffer.allocate(1024); socket.read(buffer); buffer.flip(); printBuffer(buffer); } catch (IOException e) { e.printStackTrace(); } }); Thread client = new Thread(() -> { try (SocketChannel channel = SocketChannel.open()) { channel.socket().connect(new InetSocketAddress("127.0.0.1", 44593)); ByteBuffer buffer = ByteBuffer.allocate(1034); buffer.put("socket channle demo".getBytes()); buffer.flip(); channel.write(buffer); } catch (IOException e) { e.printStackTrace(); } }); //啓動順序不影響結果 server.start(); client.start(); }
Buffer做爲數據的讀寫緩衝區,具有讀和寫兩種模式。
public abstract class Buffer { // Invariants: mark <= position <= limit <= capacity private int mark = -1; private int position = 0; private int limit; private int capacity; ...... public final Buffer flip() { limit = position; position = 0; mark = -1; return this; } public final Buffer clear() { position = 0; limit = capacity; mark = -1; return this; } ...... }
capacity:buffer的容量,在申請buffer時指定大小,是固定不變的。
limit:buffer可使用的上限——寫模式下表示最多能往buffer中寫入數據的邊界,初始化時limit等於capacity;調用flip()切換爲讀模式後limit會等於當前的position;表示能讀到的數據邊界。當一次讀寫操做完成後,limit的值可能不會等於capacity,存在內存泄露的狀況(這個不知道算不算設計不夠友好),避免這種狀況要在每一次讀寫操做完成後執行clear()方法清空buffer。
position:position能夠當作是一個讀寫指針,指示當前讀或寫的位置,隨put/get方法自動更新,當buffer中的數據準備好了,須要從寫模式切換爲讀模式時,須要調用buffer.flip()方法,能夠看到flip()方法會將當前寫的最後一個位置賦值給limit,而後將position切換爲0,即變成從0位置開始讀,能夠讀到limit位置,反之從讀模式切換爲寫模式也是如此。
mark: mark用來標記某個時刻的一個position,經過調用buffer.mark()方法能夠記錄當前的position,以後能經過buffer.reset()恢復到這個position。mark默認值是-1,而且其值必須小於position,若是調用buffer.position(index)時傳入的index比mark小,則會將mark設置爲-1使暫存的位置失效。
這4個屬性的大小關係是$mark <= position <= limit <= capacity $
public static void main(String[] args) { ByteBuffer buffer = ByteBuffer.allocate(10); buffer.put("abcde".getBytes()); buffer.flip(); while (buffer.hasRemaining()) { System.out.print(buffer.get() + " "); } buffer.flip(); try { buffer.put("abcdef".getBytes()); //沒有clear()將拋出BufferOverflowException } catch (Exception e) { e.printStackTrace(); } }
一個selector能夠註冊多個channel,而且seletor要求channel必須工做在非阻塞模式下,所以FileChannel不能結合selector使用,同時註冊的channel須要調用 channel.configureBlocking(flase);
設置爲非阻塞通道。
channel聲明瞭一個抽象方法用來註冊channel到selector:
public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;
SelectionKey:SelectionKey包含了許多有用的屬性,如interest集合、ready集合、channel對象、selector對象等;經過SelectionKey返回值,能夠進行各類操做。
ops:ops是一個int類型數,其實質表示的是事件類型的集合,即channel註冊selector時告訴selector其對哪些事件感興趣。SelectionKey中只定義了四種事件類型,分別用四個常量表示:
而且channel也不是四種事件都能註冊,不一樣的channel只能註冊validOps()方法中有定義的事件。
public final int validOps() { return (SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT); }
public final int validOps() { return (SelectionKey.OP_READ | SelectionKey.OP_WRITE); }
public final int validOps() { return SelectionKey.OP_ACCEPT; }
註冊了selector的channel便能將本來由本身調用accept的工做交由selector來代替。selector經過select()方法根據channel註冊時所關聯的感興趣的事件返回準備就緒的channel。此時,本來阻塞在channel.accept()上的操做變成了阻塞在selector.select()上。
當select()返回值大於0時,說明有channel準備就緒了,進一步處理能夠按如下步驟進行:
Set keys = selector.selectedKeys();
;selectNow()和select()不一樣之處在於前者不會阻塞當前線程,而是直接返回。
wakeUp()是用來喚醒被select()阻塞的線程的,有的時候select()阻塞的線程,咱們不想其一直被阻塞,而是一段時間內若是沒有通道就緒就繼續執行,那麼這個時候能夠在另一個線程裏調用selector.wakeUp(),可是這裏有個「坑」就是若是當前的selector沒有被阻塞在select上,那麼下一次調用該selector對象的select方法會被當即喚醒。
public class Server { public static void main(String[] args) { try { Selector selector = Selector.open(); ServerSocketChannel channel = ServerSocketChannel.open(); channel.configureBlocking(false); channel.socket().bind(new InetSocketAddress(44593)); channel.register(selector, channel.validOps()); while (true) { while (selector.select() == 0) { continue; } Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); client.register(selector, SelectionKey.OP_READ); } if(key.isReadable()) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(100); client.read(buffer); buffer.flip(); BufferUtil.printBuffer(buffer); client = (SocketChannel) key.channel(); client.register(selector, SelectionKey.OP_READ); } keys.clear(); } } } catch (Exception e) { e.printStackTrace(); } } }
public class Client { public static void main(String[] args) { try (Scanner sc = new Scanner(System.in); Socket socket = new Socket("127.0.0.1", 44593);) { String input = sc.nextLine(); while (input != null && !"".equals(input.trim())) { socket.getOutputStream().write(input.getBytes()); input = sc.nextLine(); } } catch (Exception e) { e.printStackTrace(); } } }