02.第二階段、實戰Java高併發程序設計模式-7.nio、aio

1、概要

 什麼是NIO 
 Buffer
 Channel 
 網絡編程 
 AIO
 爲什麼需要瞭解NIO和AIO?
  1. 什麼是NIO編程

     NIO是New I/O的簡稱,與舊式的基於流的I/O方法相對,從名字看,它表示新的一套Java I/O標準。它是在Java 1.4中被納入到JDK中的,並具有如下特性:
     – NIO是基於塊(Block)的,它以塊爲基本單位處理數據
     – 爲所有的原始類型提供(Buffer)緩存支持
     – 增加通道(Channel)對象,作爲新的原始 I/O 抽象
     – 支持鎖和內存映射文件的文件訪問接口
     – 提供了基於Selector的異步網絡I/O
  2. Buffer && Channel

    import java.io.FileInputStream;
    	import java.io.FileOutputStream;
    	import java.io.IOException;
    	import java.io.RandomAccessFile;
    	import java.nio.ByteBuffer;
    	import java.nio.CharBuffer;
    	import java.nio.MappedByteBuffer;
    	import java.nio.channels.FileChannel;
    	import java.nio.charset.Charset;
    
    	/**
    	 * description:
    	 *
    	 * @author: dawn.he QQ:       905845006
    	 * @email: dawn.he@cloudwise.com
    	 * @email: 905845006@qq.com
    	 * @date: 2019/10/4    11:14 PM
    	 */
    	public class DelayMain {
    		/** Size in bytes of the intermediate buffer used by {@link #nioCopyFile}. */
    		private static final int COPY_BUFFER_SIZE = 1024;
    
    		/** Source file that {@link #test3()} and {@link #test4()} map into memory and patch. */
    		private static final String MAPPED_FILE = "/Users/heliming/IdeaProjects/democloud/jvm/src/main/java/DelayMain.java";
    
    		/**
    		 * Runs the demos in order: a channel-based file copy, the ByteBuffer state
    		 * walkthrough, and the memory-mapped file dump.
    		 *
    		 * @param args unused
    		 * @throws IOException if any of the file operations fails
    		 */
    		public static void main(String[] args) throws IOException {
    			// Copy the compiled DelayMain.class to DelayMains.class.
    			nioCopyFile("/Users/heliming/IdeaProjects/democloud/jvm/target/classes/DelayMain.class","/Users/heliming/IdeaProjects/democloud/jvm/target/classes/DelayMains.class");
    			test2();
    			// test3() is the byte-by-byte variant of test4(); it is left disabled here.
    			test4();
    		}
    
    		/**
    		 * Copies a file through NIO channels using a fixed-size heap buffer.
    		 *
    		 * @param resource    path of the file to read
    		 * @param destination path of the file to create or overwrite
    		 * @throws IOException if either file cannot be opened, read or written
    		 */
    		public static void nioCopyFile(String resource, String destination) throws IOException {
    			// try-with-resources closes channels and streams even when a read or write fails.
    			try (FileInputStream fis = new FileInputStream(resource);
    					FileOutputStream fos = new FileOutputStream(destination);
    					FileChannel readChannel = fis.getChannel();
    					FileChannel writeChannel = fos.getChannel()) {
    				ByteBuffer buffer = ByteBuffer.allocate(COPY_BUFFER_SIZE);
    				// read() returns -1 once the end of the source file is reached.
    				while (readChannel.read(buffer) != -1) {
    					// Switch the buffer from filling to draining.
    					buffer.flip();
    					// A single write() is not guaranteed to consume the whole buffer.
    					while (buffer.hasRemaining()) {
    						writeChannel.write(buffer);
    					}
    					buffer.clear();
    				}
    			}
    		}
    
    		/**
    		 * Maps the source file into memory, decodes it with the platform default
    		 * charset (so multi-byte text prints correctly), prints it, and then
    		 * overwrites the first byte of the file with 98 ('b').
    		 *
    		 * @throws IOException if the file cannot be opened or mapped
    		 */
    		public static void test4() throws IOException {
    			try (RandomAccessFile raf = new RandomAccessFile(MAPPED_FILE, "rw");
    					FileChannel fc = raf.getChannel()) {
    				// Map the whole file into memory.
    				MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, raf.length());
    				Charset charset = Charset.defaultCharset();
    				CharBuffer charBuffer = charset.decode(mbb);
    				while (charBuffer.hasRemaining()) {
    					System.out.print(charBuffer.get());
    				}
    				// Writing through the mapping modifies the file; skip it for an empty file,
    				// where index 0 does not exist.
    				if (mbb.capacity() > 0) {
    					mbb.put(0, (byte) 98);
    				}
    			}
    		}
    
    		/**
    		 * Walks a 15-byte buffer through put / flip / get / flip and prints
    		 * limit, capacity and position after every step.
    		 *
    		 * @throws IOException never thrown by the current body; kept for interface compatibility
    		 */
    		public static void test2() throws IOException {
    			ByteBuffer b = ByteBuffer.allocate(15); // 15-byte buffer
    			printState(b);
    			for (int i = 0; i < 10; i++) { // store 10 bytes
    				b.put((byte) i);
    			}
    			printState(b);
    			b.flip(); // limit = old position (10), position = 0
    			printState(b);
    			for (int i = 0; i < 5; i++) {
    				System.out.print(b.get());
    			}
    			System.out.println();
    			printState(b);
    			b.flip(); // flipping again shrinks the limit to the 5 bytes just read
    			printState(b);
    
    		}
    
    		/** Prints the limit, capacity and position of the given buffer on one line. */
    		private static void printState(ByteBuffer b) {
    			System.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position());
    		}
    
    		/**
    		 * Maps the source file into memory and prints it one byte at a time, casting
    		 * each byte to a char (multi-byte characters are therefore not decoded), then
    		 * overwrites the first byte of the file with 98 ('b').
    		 *
    		 * @throws IOException if the file cannot be opened or mapped
    		 */
    		public static void test3() throws IOException {
    			try (RandomAccessFile raf = new RandomAccessFile(MAPPED_FILE, "rw");
    					FileChannel fc = raf.getChannel()) {
    				// Map the whole file into memory.
    				MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, raf.length());
    				while (mbb.hasRemaining()) {
    					System.out.print((char) mbb.get());
    				}
    				// Writing through the mapping modifies the file; skip it for an empty file.
    				if (mbb.capacity() > 0) {
    					mbb.put(0, (byte) 98);
    				}
    			}
    		}
    
    	}

  3. 網絡編程

    服務端

    import java.io.BufferedReader;
    	import java.io.IOException;
    	import java.io.InputStreamReader;
    	import java.io.PrintWriter;
    	import java.net.ServerSocket;
    	import java.net.Socket;
    	import java.util.concurrent.ExecutorService;
    	import java.util.concurrent.Executors;
    
    	/**
    	 * description: 服務端
    	 *
    	 * @author: dawn.he QQ:       905845006
    	 * @email: dawn.he@cloudwise.com
    	 * @email: 905845006@qq.com
    	 * @date: 2019/10/5    6:12 PM
    	 */
    	public class EchoServer {
    		/** Pool that runs one {@link HandleMsg} task per accepted connection. */
    		private static final ExecutorService tp = Executors.newCachedThreadPool();
    
    		/**
    		 * Binds port 8000 and hands every accepted connection to the thread pool.
    		 * Returns only if the server socket cannot be opened or closed.
    		 *
    		 * @param args unused
    		 */
    		public static void main(String args[]) {
    			try (ServerSocket echoServer = new ServerSocket(8000)) {
    				while (true) {
    					try {
    						Socket clientSocket = echoServer.accept();
    						System.out.println(clientSocket.getRemoteSocketAddress() + " connect!");
    						tp.execute(new HandleMsg(clientSocket));
    					} catch (IOException e) {
    						System.out.println(e);
    					}
    				}
    			} catch (IOException e) {
    				// Binding failed (for example the port is in use): report and stop
    				// instead of calling accept() on a null server socket.
    				System.out.println(e);
    			}
    		}
    
    		/** Echoes every line received on one client socket back to that client. */
    		static class HandleMsg implements Runnable {
    
    			private final Socket clientSocket;
    
    			/**
    			 * @param socket the accepted client connection; closed when {@link #run()} finishes
    			 */
    			public HandleMsg(Socket socket) {
    				this.clientSocket = socket;
    			}
    
    			/**
    			 * Reads lines until the client closes its side, echoing each one, then
    			 * prints the elapsed time for the whole conversation.
    			 */
    			@Override
    			public void run() {
    				// try-with-resources closes reader, writer and socket in every case,
    				// including when obtaining one of the streams fails.
    				try (Socket socket = clientSocket;
    						BufferedReader is = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    						PrintWriter os = new PrintWriter(socket.getOutputStream(), true)) {
    					long begin = System.currentTimeMillis();
    					String inputLine;
    					// Read what the client sent, line by line, and send it straight back.
    					while ((inputLine = is.readLine()) != null) {
    						os.println(inputLine);
    					}
    					long end = System.currentTimeMillis();
    					System.out.println("spend:" + (end - begin) + "ms");
    				} catch (IOException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    
    	}

    客戶端

    import java.io.BufferedReader;
    	import java.io.IOException;
    	import java.io.InputStreamReader;
    	import java.io.PrintWriter;
    	import java.net.InetSocketAddress;
    	import java.net.Socket;
    
    	/**
    	 * description: 客戶端
    	 *
    	 * @author: dawn.he QQ:       905845006
    	 * @email: dawn.he@cloudwise.com
    	 * @email: 905845006@qq.com
    	 * @date: 2019/10/5    9:21 PM
    	 */
    	public class EchoServerclient {
    
    		/**
    		 * Connects to the echo server on localhost:8000, sends one line and prints
    		 * the line the server sends back.
    		 *
    		 * @param args unused
    		 * @throws IOException declared for interface compatibility; I/O failures are
    		 *                     reported on standard error rather than propagated
    		 */
    		public static void main(String[] args) throws IOException {
    			// Closing the socket also closes its input and output streams, so the
    			// socket is the only resource that needs explicit management.
    			try (Socket client = new Socket()) {
    				client.connect(new InetSocketAddress("localhost", 8000));
    				PrintWriter writer = new PrintWriter(client.getOutputStream(), true);
    				writer.println("Hello!");
    				writer.flush();
    				BufferedReader reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
    				System.out.println("from server: " + reader.readLine());
    			} catch (IOException e) {
    				// Report the failure instead of silently swallowing it.
    				System.err.println("echo request failed: " + e);
    			}
    		}
    
    	}

    網絡編程-模擬低效的客戶端

    修改客戶端代碼

    import java.io.BufferedReader;
    	import java.io.IOException;
    	import java.io.InputStreamReader;
    	import java.io.PrintWriter;
    	import java.net.InetSocketAddress;
    	import java.net.Socket;
    	import java.util.concurrent.ExecutorService;
    	import java.util.concurrent.Executors;
    	import java.util.concurrent.locks.LockSupport;
    
    	/**
    	 * description: 客戶端
    	 *
    	 * @author: dawn.he QQ:       905845006
    	 * @email: dawn.he@cloudwise.com
    	 * @email: 905845006@qq.com
    	 * @date: 2019/10/5    9:21 PM
    	 */
    	public class EchoServerclient {
    		/** Pause after each character, in nanoseconds (one second). */
    		private static final long SLEEP_TIME_NANOS = 1000L * 1000 * 1000;
    		/** Number of slow clients started by {@link #main}. */
    		private static final int CLIENT_COUNT = 10;
    		private static final ExecutorService tp = Executors.newCachedThreadPool();
    
    		/**
    		 * Starts ten slow clients concurrently and lets the pool wind down once
    		 * they have all finished.
    		 *
    		 * @param args unused
    		 */
    		public static void main(String[] args) {
    			for (int i = 0; i < CLIENT_COUNT; i++) {
    				tp.execute(new EchoClient());
    			}
    			tp.shutdown();
    		}
    
    
    		/**
    		 * A deliberately slow client: it pauses one second after each character of
    		 * "Hello!" before terminating the line and reading the echoed reply.
    		 */
    		public static class EchoClient implements Runnable {
    			Socket client = null;
    			PrintWriter writer = null;
    			BufferedReader reader = null;
    
    			@Override
    			public void run() {
    				try {
    					client = new Socket();
    					client.connect(new InetSocketAddress("localhost", 8000));
    					writer = new PrintWriter(client.getOutputStream(), true);
    					// NOTE(review): PrintWriter auto-flush applies to println/printf/format only,
    					// so these print() calls are buffered until the println() below.
    					for (char c : "Hello!".toCharArray()) {
    						writer.print(c);
    						LockSupport.parkNanos(SLEEP_TIME_NANOS);
    					}
    					writer.println();
    					writer.flush();
    					reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
    					System.out.println("from server: " + reader.readLine());
    				} catch (IOException e) {
    					// Report the failure instead of silently swallowing it.
    					System.err.println("echo request failed: " + e);
    				} finally {
    					closeResources();
    				}
    			}
    
    			/**
    			 * Closes reader, writer and socket. Each one may still be null when the
    			 * connection attempt failed, and a failure to close one must not prevent
    			 * closing the others.
    			 */
    			private void closeResources() {
    				if (reader != null) {
    					try {
    						reader.close();
    					} catch (IOException e) {
    						e.printStackTrace();
    					}
    				}
    				if (writer != null) {
    					writer.close();
    				}
    				if (client != null) {
    					try {
    						client.close();
    					} catch (IOException e) {
    						e.printStackTrace();
    					}
    				}
    			}
    		}
    	}

    服務器輸出如下:

    spend:6038ms
     spend:6038ms
     spend:6040ms
     spend:6041ms
     spend:6042ms
     spend:6043ms
     spend:6043ms
     spend:6043ms
     spend:6045ms
     spend:6046ms
  4. 網絡編程-NIO

    服務端代碼替換爲:

    /**
    	 * description:
    	 *
    	 * @author: dawn.he QQ:       905845006
    	 * @email: dawn.he@cloudwise.com
    	 * @email: 905845006@qq.com
    	 * @date: 2019/10/5    9:15 PM
    	 */
    
    	import java.io.IOException;
    	import java.net.InetAddress;
    	import java.net.InetSocketAddress;
    	import java.net.ServerSocket;
    	import java.net.Socket;
    	import java.nio.ByteBuffer;
    	import java.nio.channels.SelectionKey;
    	import java.nio.channels.Selector;
    	import java.nio.channels.ServerSocketChannel;
    	import java.nio.channels.SocketChannel;
    	import java.util.HashMap;
    	import java.util.Iterator;
    	import java.util.LinkedList;
    	import java.util.Map;
    	import java.util.Set;
    	import java.util.concurrent.ExecutorService;
    	import java.util.concurrent.Executors;
    
    	public class NIOEchoServer
    	{
    		/** Pool that runs the {@link HandleMsg} tasks off the selector thread. */
    		private static ExecutorService tp = Executors.newCachedThreadPool();
    
    		/**
    		 * Time (epoch millis) at which the first readable event was seen for each
    		 * client socket. Read and written only from the selector loop and from
    		 * {@link #disconnect}, which the selector loop calls.
    		 */
    		public static Map<Socket, Long> geym_time_stat = new HashMap<Socket, Long>();
    
    		private static ServerSocketChannel ssc = null;
    
    		private static Selector selector = null;
    
    		private static final int PORT = 8000;
    
    		/**
    		 * Opens a non-blocking server channel on {@link #PORT} and runs the selector
    		 * loop forever, dispatching accept, read and write events.
    		 *
    		 * @throws IOException if the channel or selector cannot be opened or bound,
    		 *                     or if {@code select()} fails
    		 */
    		public static void startServer() throws IOException
    		{
    			ssc = ServerSocketChannel.open();
    			selector = Selector.open();
    			ssc.configureBlocking(false);
    
    			final ServerSocket serverSocket = ssc.socket();
    			serverSocket.bind(new InetSocketAddress(PORT));
    
    			ssc.register(selector, SelectionKey.OP_ACCEPT);
    			while (true)
    			{
    				int n = selector.select();
    				if (n == 0)
    					continue;
    
    				final Iterator<SelectionKey> it = selector.selectedKeys().iterator();
    				while (it.hasNext())
    				{
    					final SelectionKey key = it.next();
    					// Selected keys are not removed automatically; remove each one as it is handled.
    					it.remove();
    					if (!key.isValid()) {
    						// The channel was closed after selection; the ready-op tests would throw.
    						continue;
    					}
    					if (key.isAcceptable()) {
    						doAccept(key);
    					} else if (key.isReadable()) {
    						final Socket socket = ((SocketChannel) key.channel()).socket();
    						if (!geym_time_stat.containsKey(socket)) {
    							geym_time_stat.put(socket, System.currentTimeMillis());
    						}
    						doRead(key);
    					} else if (key.isWritable()) {
    						final Socket socket = ((SocketChannel) key.channel()).socket();
    						doWrite(key);
    						long e = System.currentTimeMillis();
    						// The entry is absent when a write event fires again for the same
    						// request or the client was disconnected; unboxing null would throw.
    						Long b = geym_time_stat.remove(socket);
    						if (b != null) {
    							System.out.println("spend:" + (e - b) + "ms");
    						}
    					}
    				}
    			}
    		}
    
    		/**
    		 * Writes the oldest pending buffer (the tail of the client's queue) to the
    		 * channel and stops watching for writability once the queue is empty.
    		 */
    		private static void doWrite(SelectionKey key) {
    			SocketChannel channel = (SocketChannel) key.channel();
    			EchoClient echoClient = (EchoClient) key.attachment();
    			LinkedList<ByteBuffer> outq = echoClient.getOutputQueue();
    			// Worker threads add to this list concurrently; LinkedList is not thread-safe,
    			// so both sides lock on the list itself.
    			synchronized (outq) {
    				if (!outq.isEmpty()) {
    					ByteBuffer bb = outq.getLast();
    					try {
    						channel.write(bb);
    						if (!bb.hasRemaining()) {
    							outq.removeLast();
    						}
    					} catch (IOException e) {
    						disconnect(key);
    						// The key is cancelled now; touching its interest set would throw.
    						return;
    					}
    				}
    				if (outq.isEmpty()) {
    					key.interestOps(SelectionKey.OP_READ);
    				}
    			}
    		}
    
    		/**
    		 * Reads up to 8192 bytes from the channel and hands them to a worker task;
    		 * disconnects the client on end-of-stream or on a read error.
    		 */
    		private static void doRead(SelectionKey key) {
    
    			SocketChannel channel = (SocketChannel) key.channel();
    			ByteBuffer bb = ByteBuffer.allocate(8192);
    			int len;
    			try {
    				len = channel.read(bb);
    			} catch (IOException e) {
    				disconnect(key);
    				return;
    			}
    			if (len < 0) {
    				// End of stream: the client closed the connection.
    				disconnect(key);
    				return;
    			}
    			if (len == 0) {
    				// Nothing was read; there is nothing to echo.
    				return;
    			}
    			bb.flip();
    			tp.execute(new NIOEchoServer.HandleMsg(key, bb));
    		}
    
    		/** Closes the key's channel and drops any timing entry recorded for it. */
    		private static void disconnect(SelectionKey sk) {
    			if (sk.channel() instanceof SocketChannel) {
    				geym_time_stat.remove(((SocketChannel) sk.channel()).socket());
    			}
    			try {
    				sk.channel().close();
    			} catch (IOException e) {
    				e.printStackTrace();
    			}
    		}
    
    		/** Worker task that queues one received buffer for writing back to its client. */
    		static class HandleMsg implements Runnable {
    			SelectionKey sk;
    			ByteBuffer bb;
    
    			/**
    			 * @param sk key of the client the data was read from
    			 * @param bb the data read, already flipped for reading
    			 */
    			public HandleMsg(SelectionKey sk, ByteBuffer bb) {
    				super();
    				this.sk = sk;
    				this.bb = bb;
    			}
    
    			@Override
    			public void run() {
    				EchoClient echoClient = (EchoClient) sk.attachment();
    				LinkedList<ByteBuffer> outq = echoClient.getOutputQueue();
    				// Same lock as doWrite(): the enqueue and the interest-set change must
    				// not interleave with the selector thread draining the queue.
    				synchronized (outq) {
    					echoClient.enqueue(bb);
    					try {
    						sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    					} catch (java.nio.channels.CancelledKeyException e) {
    						// The client was disconnected in the meantime; nothing left to send to.
    						return;
    					}
    				}
    				// Force the selector to return immediately so it sees the new interest set.
    				selector.wakeup();
    			}
    
    		}
    
    		/**
    		 * Accepts a pending connection, switches it to non-blocking mode and
    		 * registers it for read events with a fresh {@link EchoClient} attached.
    		 */
    		private static void doAccept(SelectionKey key) {
    			SocketChannel clientChannel= null;
    			ServerSocketChannel server = (ServerSocketChannel) key.channel();
    			try {
    				clientChannel = server.accept();
    				if (clientChannel == null) {
    					// A non-blocking accept() returns null when no connection is pending.
    					return;
    				}
    				clientChannel.configureBlocking(false);
    				clientChannel.register(selector, SelectionKey.OP_READ, new EchoClient());
    
    				InetAddress clientAddress = clientChannel.socket().getInetAddress();
    				System.out.println("Acception connection from "+ clientAddress.getHostAddress()+" ! ");
    
    			} catch (IOException e) {
    				System.out.println( " Failed to accept new client.");
    				e.printStackTrace();
    				// Do not leak a channel that was accepted but could not be registered.
    				if (clientChannel != null) {
    					try {
    						clientChannel.close();
    					} catch (IOException closeFailure) {
    						closeFailure.printStackTrace();
    					}
    				}
    			}
    		}
    
    		/**
    		 * Starts the server.
    		 *
    		 * @param args unused
    		 * @throws IOException if the server cannot be started
    		 */
    		public static void main(String[] args) throws IOException
    		{
    			NIOEchoServer.startServer();
    		}
    	}
    	class EchoClient
    	{
    		/** Buffers waiting to be written: newest at the head, oldest at the tail. */
    		private LinkedList<ByteBuffer> pending = new LinkedList<ByteBuffer>();
    
    		/** Creates a client state holder with an empty output queue. */
    		public EchoClient() {
    		}
    
    		/**
    		 * Returns the live output queue (not a copy).
    		 *
    		 * @return the list holding the queued buffers
    		 */
    		public LinkedList<ByteBuffer> getOutputQueue(){
    			return this.pending;
    		}
    
    		/**
    		 * Inserts a buffer at the head of the output queue, so a consumer that
    		 * takes from the tail sees buffers in arrival order.
    		 *
    		 * @param bb the buffer to queue
    		 */
    		public void enqueue(ByteBuffer bb){
    			this.pending.push(bb);
    		}
    
    	}

    服務器輸出如下:

    spend:7ms
     spend:2ms
     spend:2ms
     spend:4ms
     spend:1ms
     spend:1ms
     spend:1ms
     spend:0ms
     spend:0ms
     spend:0ms
  5. 網絡編程 AIO

     讀完了再通知我
      不會加快IO,只是在讀完後進行通知 
      使用回調函數,進行業務處理

    服務器

    /**
    	 * description:
    	 *
    	 * @author: dawn.he QQ:       905845006
    	 * @email: dawn.he@cloudwise.com
    	 * @email: 905845006@qq.com
    	 * @date: 2019/10/5    11:43 PM
    	 */
    
    	import java.net.InetSocketAddress;
    	import java.nio.ByteBuffer;
    	import java.nio.channels.AsynchronousServerSocketChannel;
    	import java.nio.channels.AsynchronousSocketChannel;
    	import java.util.concurrent.Future;
    
    	public class Server {
    		/**
    		 * Starts the server; any failure that escapes the constructor is printed.
    		 *
    		 * @param args unused
    		 */
    		public static void main(String[] args) {
    			try {
    				new Server();
    			} catch (Exception e) {
    				e.printStackTrace();
    			}
    		}
    
    		/**
    		 * Binds localhost:8000 and serves clients one at a time forever: each
    		 * accepted connection is sent a fixed greeting and then closed. The
    		 * constructor returns only by throwing.
    		 *
    		 * @throws Exception if the server channel cannot be opened or bound, or if
    		 *                   accepting or closing a connection fails
    		 */
    		public Server() throws Exception {
    			// try-with-resources releases the listening channel when the loop is left by an exception.
    			try (AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open()) {
    				InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 8000);
    				serverSocketChannel.bind(inetSocketAddress);
    
    				while (true) {
    					// accept() does not block; it returns a Future immediately.
    					Future<AsynchronousSocketChannel> accept = serverSocketChannel.accept();
    
    					System.out.println("=================");
    					System.out.println("服務器等待鏈接...");
    					// get() blocks until a client has connected.
    					try (AsynchronousSocketChannel socketChannel = accept.get()) {
    						System.out.println("服務器接受鏈接");
    						System.out.println("服務器與" + socketChannel.getRemoteAddress() + "創建鏈接");
    
    						ByteBuffer buffer = ByteBuffer.wrap(
    								"zhangphil".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    						try {
    							// One write may send only part of the buffer; get() waits for
    							// completion without polling and surfaces any write failure.
    							while (buffer.hasRemaining()) {
    								socketChannel.write(buffer).get();
    							}
    						} catch (java.util.concurrent.ExecutionException e) {
    							// A failed write affects this client only; keep serving others.
    							e.printStackTrace();
    						}
    
    						System.out.println("服務器發送數據完畢.");
    					}
    				}
    			}
    		}
    	}

    客戶端

    /**
    	 * description:
    	 *
    	 * @author: dawn.he QQ:       905845006
    	 * @email: dawn.he@cloudwise.com
    	 * @email: 905845006@qq.com
    	 * @date: 2019/10/4    8:07 PM
    	 */
    	import java.net.InetSocketAddress;
    	import java.nio.ByteBuffer;
    	import java.nio.channels.AsynchronousSocketChannel;
    	import java.util.concurrent.Future;
    
    	public class Client {
    		/**
    		 * Connects to localhost:8000 over an asynchronous socket channel, performs
    		 * one read and prints whatever the server sent.
    		 *
    		 * @param args unused
    		 */
    		public static void main(String[] args) {
    			// try-with-resources closes the channel on success and on failure.
    			try (AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open()) {
    				InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 8000);
    				// get() waits for the connection without polling and reports a failed connect.
    				socketChannel.connect(inetSocketAddress).get();
    
    				System.out.println("創建鏈接" + socketChannel.getRemoteAddress());
    
    				ByteBuffer buffer = ByteBuffer.allocate(1024);
    				int count = socketChannel.read(buffer).get();
    
    				// read() yields -1 when the server closed without sending anything;
    				// a negative length must not reach the String constructor.
    				String received = count > 0
    						? new String(buffer.array(), 0, count, java.nio.charset.StandardCharsets.UTF_8)
    						: "";
    				System.out.println("接收服務器數據:" + received);
    			} catch (Exception e) {
    				e.printStackTrace();
    			}
    		}
    	}
    Future接口
     public V get(long timeout, TimeUnit unit)調用
    
     private int awaitDone(boolean timed, long nanos)調用
    
     LockSupport類的park方法和unpark方法調用
    
      UNSAFE類的park方法和unpark方法調用
    
     native方法阻塞當前線程。
    
     阻塞park方法,解除阻塞unpark方法
    
     Unsafe(提供CAS操作)
     LockSupport(提供park/unpark操作)
     最終都會回到操作系統的cas操作
相關文章
相關標籤/搜索