1、概要
什麼是NIO Buffer Channel 網絡編程 AIO 爲什麼需要了解NIO和AIO?
什麼是NIO編程
NIO是New I/O的簡稱,與舊式的基於流的I/O方法相對,從名字看,它表示新的一套Java I/O標準。它是在Java 1.4中被納入到JDK中的,並具有以下特性: – NIO是基於塊(Block)的,它以塊爲基本單位處理數據 – 爲所有的原始類型提供(Buffer)緩存支持 – 增加通道(Channel)對象,作爲新的原始 I/O 抽象 – 支持鎖和內存映射文件的文件訪問接口 – 提供了基於Selector的異步網絡I/O
Buffer && Channel
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;

/**
 * Small demonstrations of the NIO {@code Buffer} and {@code Channel} APIs:
 * a channel-based file copy, the position/limit/capacity bookkeeping of a
 * {@link ByteBuffer}, and reading/modifying a file through a memory mapping.
 *
 * @author dawn.he (QQ: 905845006, dawn.he@cloudwise.com, 905845006@qq.com)
 * @date 2019/10/4 11:14 PM
 */
public class DelayMain {

    /** File that {@link #test3()} and {@link #test4()} map into memory and modify. */
    private static final String MAPPED_FILE =
            "/Users/heliming/IdeaProjects/democloud/jvm/src/main/java/DelayMain.java";

    public static void main(String[] args) throws IOException {
        // Make a copy of the compiled DelayMain class file named DelayMains.class.
        nioCopyFile("/Users/heliming/IdeaProjects/democloud/jvm/target/classes/DelayMain.class",
                "/Users/heliming/IdeaProjects/democloud/jvm/target/classes/DelayMains.class");
        test2();
        // test3() is the byte-by-byte variant of test4(); it is left disabled here.
        test4();
    }

    /**
     * Copies a file using NIO channels and a 1 KiB heap buffer.
     *
     * @param resource    path of the file to read
     * @param destination path of the file to create or overwrite
     * @throws IOException if either file cannot be opened, read or written
     */
    public static void nioCopyFile(String resource, String destination) throws IOException {
        // try-with-resources closes streams and channels even when a read/write fails.
        try (FileInputStream fis = new FileInputStream(resource);
             FileOutputStream fos = new FileOutputStream(destination);
             FileChannel readChannel = fis.getChannel();
             FileChannel writeChannel = fos.getChannel()) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while (readChannel.read(buffer) != -1) {
                buffer.flip();
                // A single write() is allowed to consume only part of the buffer.
                while (buffer.hasRemaining()) {
                    writeChannel.write(buffer);
                }
                buffer.clear();
            }
        }
    }

    /**
     * Maps {@link #MAPPED_FILE} into memory, decodes it with the platform default
     * charset (so multi-byte text such as Chinese prints correctly), prints it,
     * and then overwrites the first byte of the file with {@code 'b'}.
     *
     * @throws IOException if the file cannot be opened or mapped
     */
    public static void test4() throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(MAPPED_FILE, "rw");
             FileChannel fc = raf.getChannel()) {
            // Map the whole file into memory.
            MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, raf.length());
            CharBuffer charBuffer = Charset.defaultCharset().decode(mbb);
            while (charBuffer.hasRemaining()) {
                System.out.print(charBuffer.get());
            }
            overwriteFirstByte(mbb);
        }
    }

    /**
     * Walks a 15-byte buffer through put / flip / get / flip and prints
     * limit, capacity and position after every step.
     *
     * @throws IOException never thrown by the current body; kept for signature compatibility
     */
    public static void test2() throws IOException {
        ByteBuffer b = ByteBuffer.allocate(15); // 15-byte buffer
        printState(b);
        for (int i = 0; i < 10; i++) { // store 10 bytes
            b.put((byte) i);
        }
        printState(b);
        b.flip(); // limit = old position, position = 0
        printState(b);
        for (int i = 0; i < 5; i++) {
            System.out.print(b.get());
        }
        System.out.println();
        printState(b);
        // Second flip on purpose: shows that limit shrinks to the current position (5).
        b.flip();
        printState(b);
    }

    /**
     * Maps {@link #MAPPED_FILE} into memory and prints it one byte at a time,
     * casting each byte to {@code char} (no charset decoding), then overwrites
     * the first byte of the file with {@code 'b'}.
     *
     * @throws IOException if the file cannot be opened or mapped
     */
    public static void test3() throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(MAPPED_FILE, "rw");
             FileChannel fc = raf.getChannel()) {
            // Map the whole file into memory.
            MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, raf.length());
            while (mbb.hasRemaining()) {
                System.out.print((char) mbb.get());
            }
            overwriteFirstByte(mbb);
        }
    }

    /** Prints the three bookkeeping values of the buffer on one line. */
    private static void printState(ByteBuffer b) {
        System.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position());
    }

    /** Modifies the mapped file by writing 'b' (98) at offset 0; a no-op for an empty mapping. */
    private static void overwriteFirstByte(MappedByteBuffer mbb) {
        if (mbb.capacity() > 0) {
            mbb.put(0, (byte) 98);
        }
    }
}
網絡編程
服務端
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Blocking echo server: accepts connections on port 8000 and hands each one
 * to a pooled thread that echoes every received line back to the client.
 *
 * @author dawn.he (QQ: 905845006, dawn.he@cloudwise.com, 905845006@qq.com)
 * @date 2019/10/5 6:12 PM
 */
public class EchoServer {
    private static final int PORT = 8000;

    private static final ExecutorService tp = Executors.newCachedThreadPool();

    public static void main(String[] args) {
        // Binding inside try-with-resources means a bind failure ends the program
        // with a message instead of entering the accept loop with a null socket.
        try (ServerSocket echoServer = new ServerSocket(PORT)) {
            while (true) {
                try {
                    Socket clientSocket = echoServer.accept();
                    System.out.println(clientSocket.getRemoteSocketAddress() + " connect!");
                    tp.execute(new HandleMsg(clientSocket));
                } catch (IOException e) {
                    // A single failed accept must not stop the server.
                    System.out.println(e);
                }
            }
        } catch (IOException e) {
            System.out.println(e);
        }
    }

    /** Echoes every line read from one client socket and reports how long the session took. */
    static class HandleMsg implements Runnable {
        private final Socket clientSocket;

        public HandleMsg(Socket socket) {
            this.clientSocket = socket;
        }

        @Override
        public void run() {
            // Resources are closed in reverse order (writer, reader, socket) even if
            // one of the streams could not be created.
            try (Socket socket = clientSocket;
                 BufferedReader is = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                 PrintWriter os = new PrintWriter(socket.getOutputStream(), true)) {
                long b = System.currentTimeMillis();
                // Read the data sent by the client line by line and send it straight back.
                String inputLine;
                while ((inputLine = is.readLine()) != null) {
                    os.println(inputLine);
                }
                long e = System.currentTimeMillis();
                System.out.println("spend:" + (e - b) + "ms");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
客戶端
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * Echo client: connects to localhost:8000, sends one line and prints the reply.
 *
 * @author dawn.he (QQ: 905845006, dawn.he@cloudwise.com, 905845006@qq.com)
 * @date 2019/10/5 9:21 PM
 */
public class EchoServerclient {

    /**
     * Sends {@code "Hello!"} to the echo server and prints the line it returns.
     *
     * @param args unused
     * @throws IOException if the connection cannot be established or the exchange fails
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources replaces the finally block, which dereferenced
        // writer/reader even when the connection attempt had failed.
        try (Socket client = new Socket()) {
            client.connect(new InetSocketAddress("localhost", 8000));
            try (PrintWriter writer = new PrintWriter(client.getOutputStream(), true);
                 BufferedReader reader = new BufferedReader(new InputStreamReader(client.getInputStream()))) {
                // println on an auto-flushing writer sends the line immediately.
                writer.println("Hello!");
                System.out.println("from server: " + reader.readLine());
            }
        }
    }
}
網絡編程-模擬低效的客戶端
修改客戶端代碼
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.LockSupport;

/**
 * Deliberately slow echo client: starts ten concurrent clients, each of which
 * takes about six seconds to hand one line to the server.
 *
 * @author dawn.he (QQ: 905845006, dawn.he@cloudwise.com, 905845006@qq.com)
 * @date 2019/10/5 9:21 PM
 */
public class EchoServerclient {
    /** Pause after each character: one second, in nanoseconds. */
    private static final long SLEEP_TIME_NANOS = 1000L * 1000 * 1000;

    private static final int CLIENT_COUNT = 10;

    private static final ExecutorService tp = Executors.newCachedThreadPool();

    public static void main(String[] args) {
        for (int i = 0; i < CLIENT_COUNT; i++) {
            tp.execute(new EchoClient());
        }
        // Already-submitted clients still run to completion after shutdown().
        tp.shutdown();
    }

    /** One slow client session against localhost:8000. */
    public static class EchoClient implements Runnable {

        @Override
        public void run() {
            // try-with-resources replaces the finally block, which dereferenced
            // writer/reader even when the connection attempt had failed.
            try (Socket client = new Socket()) {
                client.connect(new InetSocketAddress("localhost", 8000));
                try (PrintWriter writer = new PrintWriter(client.getOutputStream(), true);
                     BufferedReader reader = new BufferedReader(new InputStreamReader(client.getInputStream()))) {
                    // print() does not trigger auto-flush, so the characters pile up in the
                    // writer and the complete line only leaves on the println() below.
                    for (char c : "Hello!".toCharArray()) {
                        writer.print(c);
                        LockSupport.parkNanos(SLEEP_TIME_NANOS);
                    }
                    writer.println();
                    writer.flush();
                    System.out.println("from server: " + reader.readLine());
                }
            } catch (IOException e) {
                // Report the failure instead of silently dropping it.
                e.printStackTrace();
            }
        }
    }
}
服務器輸出如下:
spend:6038ms spend:6038ms spend:6040ms spend:6041ms spend:6042ms spend:6043ms spend:6043ms spend:6043ms spend:6045ms spend:6046ms
網絡編程-NIO
服務端代碼替換爲:
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Selector-based echo server on port 8000.
 *
 * <p>A single thread runs the select loop and performs all channel reads and
 * writes. Each chunk that was read is handed to a pooled task, which queues it
 * on the connection's {@link EchoClient} and registers write interest so the
 * select loop sends it back.
 *
 * @author dawn.he (QQ: 905845006, dawn.he@cloudwise.com, 905845006@qq.com)
 * @date 2019/10/5 9:15 PM
 */
public class NIOEchoServer {
    private static ExecutorService tp = Executors.newCachedThreadPool();

    /**
     * Time of the first pending read per client socket, used to print the
     * "spend" figure. Within this class it is only touched from the select loop.
     */
    public static Map<Socket, Long> geym_time_stat = new HashMap<Socket, Long>();

    private static ServerSocketChannel ssc = null;
    private static Selector selector = null;
    private static final int PORT = 8000;

    /**
     * Binds the server channel and runs the select loop; does not return normally.
     *
     * @throws IOException if the channel or selector cannot be opened or bound,
     *                     or if a select operation fails
     */
    public static void startServer() throws IOException {
        ssc = ServerSocketChannel.open();
        selector = Selector.open();
        ssc.configureBlocking(false);

        final ServerSocket serverSocket = ssc.socket();
        serverSocket.bind(new InetSocketAddress(PORT));
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            if (selector.select() == 0) {
                continue;
            }
            final Set<SelectionKey> readyKeys = selector.selectedKeys();
            final Iterator<SelectionKey> it = readyKeys.iterator();
            while (it.hasNext()) {
                final SelectionKey key = it.next();
                // Selected keys must be removed by hand or they are reported again.
                it.remove();

                // The readiness tests throw CancelledKeyException on a cancelled key.
                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    doAccept(key);
                } else if (key.isReadable()) {
                    Socket socket = ((SocketChannel) key.channel()).socket();
                    if (!geym_time_stat.containsKey(socket)) {
                        geym_time_stat.put(socket, System.currentTimeMillis());
                    }
                    doRead(key);
                } else if (key.isWritable()) {
                    doWrite(key);
                    long e = System.currentTimeMillis();
                    // Several writable events can follow a single read; only the first
                    // one finds a start time, so the lookup must tolerate null.
                    Long b = geym_time_stat.remove(((SocketChannel) key.channel()).socket());
                    if (b != null) {
                        System.out.println("spend:" + (e - b) + "ms");
                    }
                }
            }
        }
    }

    /**
     * Writes the oldest queued buffer of the connection and drops write interest
     * once the queue is empty. Disconnects the client if the write fails.
     */
    private static void doWrite(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        EchoClient echoClient = (EchoClient) key.attachment();
        LinkedList<ByteBuffer> outq = echoClient.getOutputQueue();

        // The queue is filled by pool threads (HandleMsg), so queue access and the
        // matching interest-set change are done under the queue's monitor.
        synchronized (outq) {
            try {
                if (!outq.isEmpty()) {
                    // enqueue() adds at the head, so the tail is the oldest buffer.
                    ByteBuffer bb = outq.getLast();
                    channel.write(bb);
                    if (!bb.hasRemaining()) {
                        outq.removeLast();
                    }
                }
                if (outq.isEmpty()) {
                    key.interestOps(SelectionKey.OP_READ);
                }
            } catch (IOException | CancelledKeyException e) {
                disconnect(key);
            }
        }
    }

    /**
     * Reads up to 8 KiB from the connection and submits it for echoing.
     * Disconnects the client on end-of-stream or on a read error.
     */
    private static void doRead(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer bb = ByteBuffer.allocate(8192);
        try {
            if (channel.read(bb) < 0) {
                disconnect(key);
                return;
            }
        } catch (Exception e) {
            disconnect(key);
            return;
        }
        bb.flip();
        tp.execute(new NIOEchoServer.HandleMsg(key, bb));
    }

    /** Closes the key's channel and forgets any timing entry recorded for it. */
    private static void disconnect(SelectionKey sk) {
        if (sk.channel() instanceof SocketChannel) {
            geym_time_stat.remove(((SocketChannel) sk.channel()).socket());
        }
        try {
            sk.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Pool task that queues one received buffer for writing back to its client. */
    static class HandleMsg implements Runnable {
        SelectionKey sk;
        ByteBuffer bb;

        public HandleMsg(SelectionKey sk, ByteBuffer bb) {
            super();
            this.sk = sk;
            this.bb = bb;
        }

        @Override
        public void run() {
            EchoClient echoClient = (EchoClient) sk.attachment();
            LinkedList<ByteBuffer> outq = echoClient.getOutputQueue();
            // Same monitor as doWrite: the buffer and the write interest become
            // visible together, so doWrite cannot clear OP_WRITE in between.
            synchronized (outq) {
                echoClient.enqueue(bb);
                try {
                    sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                } catch (CancelledKeyException e) {
                    // The client went away while this task was queued; nothing to send.
                    return;
                }
            }
            // Force the selector to return immediately so the new interest set is used.
            selector.wakeup();
        }
    }

    /** Accepts one pending connection and registers it for reading. */
    private static void doAccept(SelectionKey key) {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = null;
        try {
            clientChannel = server.accept();
            if (clientChannel == null) {
                // Non-blocking accept returns null when no connection is pending.
                return;
            }
            clientChannel.configureBlocking(false);
            clientChannel.register(selector, SelectionKey.OP_READ, new EchoClient());

            InetAddress clientAddress = clientChannel.socket().getInetAddress();
            System.out.println("Acception connection from " + clientAddress.getHostAddress() + " ! ");
        } catch (IOException e) {
            System.out.println(" Failed to accept new client.");
            e.printStackTrace();
            if (clientChannel != null) {
                try {
                    clientChannel.close();
                } catch (IOException ignored) {
                    // Already reporting the original failure; nothing more to do.
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        NIOEchoServer.startServer();
    }
}

/**
 * Per-connection state: the buffers waiting to be written back to the client.
 * New buffers go in at the head and are consumed from the tail (FIFO).
 */
class EchoClient {
    private LinkedList<ByteBuffer> outq;

    public EchoClient() {
        this.outq = new LinkedList<ByteBuffer>();
    }

    /** Returns the live queue; callers that share it across threads lock on it. */
    public LinkedList<ByteBuffer> getOutputQueue() {
        return outq;
    }

    /** Adds a buffer at the head of the queue. */
    public void enqueue(ByteBuffer bb) {
        synchronized (outq) {
            outq.addFirst(bb);
        }
    }
}
服務器輸出如下:
spend:7ms spend:2ms spend:2ms spend:4ms spend:1ms spend:1ms spend:1ms spend:0ms spend:0ms spend:0ms
網絡編程 AIO
讀完了再通知我 不會加快IO,只是在讀完後進行通知 使用回調函數,進行業務處理
服務器
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.Future;

/**
 * AIO demo server: accepts connections on localhost:8000 one at a time and
 * sends the bytes of {@code "zhangphil"} to each client before closing it.
 *
 * @author dawn.he (QQ: 905845006, dawn.he@cloudwise.com, 905845006@qq.com)
 * @date 2019/10/5 11:43 PM
 */
public class Server {
    public static void main(String[] args) {
        try {
            // The constructor runs the accept loop and only returns by throwing.
            new Server();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Opens the server channel and serves clients until an error occurs.
     *
     * @throws Exception if binding, accepting or writing fails, or the thread is interrupted
     */
    public Server() throws Exception {
        try (AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open()) {
            serverSocketChannel.bind(new InetSocketAddress("localhost", 8000));

            while (true) {
                // accept() does not block; it returns a Future immediately.
                Future<AsynchronousSocketChannel> accept = serverSocketChannel.accept();
                System.out.println("=================");
                System.out.println("服務器等待鏈接...");

                // get() blocks until a client has connected.
                try (AsynchronousSocketChannel socketChannel = accept.get()) {
                    System.out.println("服務器接受鏈接");
                    System.out.println("服務器與" + socketChannel.getRemoteAddress() + "創建鏈接");

                    ByteBuffer buffer = ByteBuffer.wrap("zhangphil".getBytes());
                    // Wait on the Future instead of polling isDone(), and repeat
                    // because one write may transfer only part of the buffer.
                    while (buffer.hasRemaining()) {
                        socketChannel.write(buffer).get();
                    }
                    System.out.println("服務器發送數據完畢.");
                }
            }
        }
    }
}
客戶端
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.Future;

/**
 * AIO demo client: connects to localhost:8000, performs one read and prints
 * what the server sent.
 *
 * @author dawn.he (QQ: 905845006, dawn.he@cloudwise.com, 905845006@qq.com)
 * @date 2019/10/4 8:07 PM
 */
public class Client {
    public static void main(String[] args) {
        // try-with-resources closes the channel, which the polling version never did.
        try (AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open()) {
            Future<Void> connect = socketChannel.connect(new InetSocketAddress("localhost", 8000));
            // get() waits for completion and, unlike polling isDone(), rethrows a
            // connection failure instead of letting the read run on an unconnected channel.
            connect.get();
            System.out.println("創建鏈接" + socketChannel.getRemoteAddress());

            ByteBuffer buffer = ByteBuffer.allocate(1024);
            Future<Integer> read = socketChannel.read(buffer);
            // The read result is -1 when the server closed without sending anything;
            // clamp it so the String constructor is not given a negative length.
            int received = Math.max(read.get(), 0);
            System.out.println("接收服務器數據:" + new String(buffer.array(), 0, received));
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Future接口 public V get(long timeout, TimeUnit unit)調用 private int awaitDone(boolean timed, long nanos)調用 LockSupport類的park方法和unpark方法調用 UNSAFE類的park方法和unpark方法調用 native方法阻塞當前線程。 阻塞park方法,解除阻塞unpark方法 Unsafe(提供CAS操作) LockSupport(提供park/unpark操作) 最終都會回到操作系統的cas操作