NIO究竟是什麼的簡稱?有人喜歡稱之爲New IO,由於它相對於之前的IO是新增的,因此官方稱之爲New IO。可是,因爲以前的IO類庫是阻塞的,New IO就是要讓Java可以支持非阻塞IO,因此,也有人喜歡稱之爲Non-block IO。 java
Buffer 是一個對象, 它包含一些要寫入或者剛讀出的數據。 在 NIO 中加入 Buffer 對象,體現了新庫與原 I/O 的一個重要區別。在面向流的 I/O 中,您將數據直接寫入或者將數據直接讀到 Stream 對象中。
在 NIO 庫中,全部數據都是用緩衝區處理的。在讀取數據時,它是直接讀到緩衝區中的。在寫入數據時,它是寫入到緩衝區中的。任什麼時候候訪問 NIO 中的數據,您都是將它放到緩衝區中。
緩衝區實質上是一個數組。一般它是一個字節數組,可是也可使用其餘種類的數組。可是一個緩衝區不 僅僅 是一個數組。緩衝區提供了對數據的結構化訪問,並且還能夠跟蹤系統的讀/寫進程。
最經常使用的緩衝區類型是 ByteBuffer。一個 ByteBuffer 能夠在其底層字節數組上進行 get/set 操做(即字節的獲取和設置)。react
Channel是一個通道,能夠經過它讀取與寫入數據,它就像自來水管同樣,網絡數據經過Channel讀取和寫入。通道與流的不一樣之處在於通道是雙向的,流只是在一個方向上移動(一個流必須是 InputStream 或者 OutputStream 的子類),並且通道能夠用於讀、寫或者同時用於讀寫。
由於Channel是全雙工的,因此它能夠比流更好的映射底層操做系統的API。特別是在UNIX網絡編程模型中,底層操做系統的通道都是全雙工的,同時支持讀寫操做。編程
多路複用器提供選擇已經就緒的任務的能力。簡單來說,Selector會不斷地輪詢註冊在其上的Channel,若是某個channel上有新的TCP鏈接接入、讀和寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,而後經過SelectionKey能夠獲取就緒Channel的集合,進行後續的I/O操做。
一個多路複用器Selector能夠同時輪詢多個Channel,因爲JDK使用epool()代替傳統的select實現,因此它並無最大連續句柄1024/2048的限制。這也就意味着只須要一個線程負責Selector的輪詢,就能夠接入成千上萬的客戶端。數組
ServerSocketChannel acceptor = ServerSocketChannel.open();
int port = 8080; acceptor.socket().bind(new InetSocketAddress(InetAddress.getByName("IP"),port)); acceptor.configureBlocking(false);
Selector selector = Selector.open(); new Thread(new ReactorTask()).start();
SelectionKey key = acceptor.register(selector,SelectionKey.OP_ACCEPT,ioHandler);
int num = selector.select(); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keys = selectedKeys.iterator(); while(keys.hasNext()){ SelectionKey key = keys.next(); //doWhat }
SocketChannel sc = serverChannel.accept();
sc.configureBlocking(false); sc.socket().setReuseAddress(true);
SelectionKey key = sc.register(selector,SelectionKey.OP_READ);
int number = sc.read(receivedBuffer);
while(buffer.hasRemain){ writeBuffer(); }
/** * used to test nio * Created by spark on 10/14/16. */ public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel serverSocketChannel; private volatile boolean stop; /** * 初始化多路複用器,綁定監聽端口 * @param port */ public MultiplexerTimeServer(int port) { try { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); //設爲異步非阻塞 serverSocketChannel.configureBlocking(false); //backlog設爲1024 serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is start in port:" + port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop() { this.stop = true; } /** * 根據key的操做位獲取網絡事件的類型 TCP三次握手過程 * @param key * @throws IOException */ private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { if (key.isAcceptable()) { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); } if(key.isReadable()){ SocketChannel sc = (SocketChannel) key.channel(); //經過ByteBuffer讀取客戶端的請求信息 開闢1K的緩衝區 ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if(readBytes > 0){ readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes,"UTF-8"); System.out.println("The time server received order : " + body ); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc,currentTime); }else if(readBytes < 0){ key.cancel(); sc.close(); }else{ } } } } /** * 經過ByteBuffer將應答消息異步發送給客戶端 * @param socketChannel * @param response * @throws IOException */ private void doWrite(SocketChannel socketChannel,String response) throws IOException { if(response != null && response.trim().length() > 0){ byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); socketChannel.write(writeBuffer); } } @Override public void run() { //遍歷selector,間隔爲1s while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; //有就緒狀態的Channel時,selector返回就緒狀態的Channel的SelectionKey集合,經過對就緒狀態的Channel集合進行迭代,進行異步讀寫操做 while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (IOException e) { if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); } } } }
public class TimeServer { public static void main(String[] args) { int port = 8080; if(args != null && args.length > 0){ try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { e.printStackTrace(); } } MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start(); } }
SocketChannel clientChannel = SocketChannel.open();
clientChannel.configureBlocking(false); clientChannel.socket().setReuseAddress(true); clientChannel.socket().setReceiveBufferSize(BUFFER_SIZE); clientChannel.socket().setSendBufferSize(BUFFER_SIZE);
boolean connected = clientChannel.connect(new InetSocketAddress("ip",port));
if(connected){ clientChannel.register(selector,SelectionKey.OP_READ,ioHandler); }else{ clientChannel.register(selector,SelectionKey.OP_CONNECT,ioHandler); }
Selector selector = Selector.open(); new Thread(new ReactorTask()).start();
int num = selector.select(); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keys = selectedKeys.iterator(); while(keys.hasNext()){ SelectionKey key = keys.next(); }
if(key.isConnectable()){ //handleConnect }
if(clientChannel.finishConnect()) registerRead();
int number = sc.read(receivedBuffer); while(buffer.hasRemain){ }
public class TimeClientHandler implements Runnable { private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; public TimeClientHandler(int port, String host) { this.port = port; this.host = host == null ? "127.0.0.1" : host; try { selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } private void handleInput(SelectionKey key) throws IOException { if(key.isValid()){ SocketChannel sc = (SocketChannel) key.channel(); if(key.isConnectable()){ if(sc.finishConnect()){ sc.register(selector,SelectionKey.OP_READ); doWrite(sc); }else{ System.exit(1); } } if(key.isReadable()){ ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if(readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("Now is " + body); this.stop = true; }else if(readBytes < 0){ key.cancel(); sc.close(); }else{ } } } } private void doWrite(SocketChannel sc) throws IOException { byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); sc.write(writeBuffer); if(!writeBuffer.hasRemaining()){ System.out.println("Send order 2 server succeed."); } } private void doConnect() throws IOException { if(socketChannel.connect(new InetSocketAddress(host,port))){ socketChannel.register(selector,SelectionKey.OP_READ); doWrite(socketChannel); }else{ socketChannel.register(selector,SelectionKey.OP_CONNECT); } } @Override public void run() { try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (IOException e) { if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); System.exit(1); } } if(selector != null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } }
public class TimeClient { public static void main(String[] args) { int port = 8080; if(args != null && args.length > 0){ try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { e.printStackTrace(); } } new Thread(new TimeClientHandler(port,"127.0.0.1"),"TimeClient-001").start(); } }
NIO2.0引入了新的異步通道概念,並提供了異步文件通道和異步套接字通道的實現。
異步通道提供兩種方式獲取操做結果網絡
- 經過java.util.concurrent.Feature類來表示異步操做的結果;
- 在執行異步操做的時候傳入一個java.nio.channels。
CompletionHandler接口的實現類做爲操做完成的回調。
public class AsyncTimeServerHandler implements Runnable { private int port; CountDownLatch latch; AsynchronousServerSocketChannel asynchronousServerSocketChannel; public AsyncTimeServerHandler(int port) { this.port = port; try { asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(); // 建立一個異步服務端通道。 asynchronousServerSocketChannel.bind(new InetSocketAddress(port));// bind 一個監聽端口 System.out.println("The time server is start in port : " + port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { latch = new CountDownLatch(1); // 在完成一組正在執行的操做以前,容許當前的線程一直阻塞。 doAccept(); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } public void doAccept() { asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());// 處理接受消息的通知。 } }
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncTimeServerHandler> { @Override public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) { attachment.asynchronousServerSocketChannel.accept(attachment, this); ByteBuffer buffer = ByteBuffer.allocate(1024); result.read(buffer, buffer, new ReadCompletionHandler(result)); } @Override public void failed(Throwable exc, AsyncTimeServerHandler attachment) { exc.printStackTrace(); attachment.latch.countDown(); } }
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel channel; public ReadCompletionHandler(AsynchronousSocketChannel channel) { if (this.channel == null) this.channel = channel; } @Override public void completed(Integer result, ByteBuffer attachment) { attachment.flip(); byte[] body = new byte[attachment.remaining()]; attachment.get(body); try { String req = new String(body, "UTF-8"); System.out.println("The time server receive order : " + req); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(currentTime); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } private void doWrite(String currentTime) { if (currentTime != null && currentTime.trim().length() > 0) { byte[] bytes = (currentTime).getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { // 若是沒有發送完成,繼續發送 if (buffer.hasRemaining()) channel.write(buffer, buffer, this); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { // ingnore on close } } }); } } }
public class TimeServer { public static void main(String[] args) throws IOException { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 採用默認值 } } AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port); new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start(); } }
public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable{ private AsynchronousSocketChannel client; private String host; private int port; private CountDownLatch latch; public AsyncTimeClientHandler(String host, int port) { this.host = host; this.port = port; try { client = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void completed(Void result, AsyncTimeClientHandler attachment) { byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); client.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { if (buffer.hasRemaining()) { client.write(buffer, buffer, this); } else { ByteBuffer readBuffer = ByteBuffer.allocate(1024); client.read( readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes, "UTF-8"); System.out.println("Now is : " + body); latch.countDown(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { client.close(); latch.countDown(); } catch (IOException e) { // ingnore on close } } }); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { client.close(); latch.countDown(); } catch (IOException e) { // ingnore on close } } }); } @Override public void failed(Throwable exc, AsyncTimeClientHandler attachment) { exc.printStackTrace(); try { client.close(); latch.countDown(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { latch = new CountDownLatch(1); client.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { client.close(); } catch (IOException e) { e.printStackTrace(); } } }
public class TimeClient { public static void main(String[] args) { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 採用默認值 } } new Thread(new AsyncTimeClientHandler("127.0.0.1", port), "AIO-AsyncTimeClientHandler-001").start(); } }