本身對着源碼敲一遍練習,寫上註釋。發現NIO編程難度好高啊。。雖然很複雜,可是NIO編程的有點仍是不少:java
一、客戶端發起的鏈接操做是異步的,能夠經過在多路複用器註冊OP_CONNECTION等待後續結果,不須要像BIO的客戶端同樣被同步阻塞。編程
二、SocketChannel的讀寫操做都是異步的,若是沒有可讀寫的數據它不會同步等待,直接返回,這樣I/O通訊模型就能夠處理其餘的鏈路,不須要同步等待這個鏈路可用。服務器
三、線程模型的優化:因爲JDK的Selector在Linux等主流操做系統上經過epoll實現,沒有鏈接句柄的限制,那麼Selector線程能夠同時處理成千上萬個客戶端鏈接,並且性能不會隨着客戶端的增長而線性降低。因此它很是適合作高性能、高負載的網絡服務器。網絡
TimeClient:併發
1 package nio; 2 3 public class TimeClient { 4 public static void main(String args[]){ 5 int port = 8080; 6 if(args != null && args.length > 0){ 7 try{ 8 port = Integer.valueOf(args[0]); 9 }catch(NumberFormatException e){ 10 //採用默認值 11 } 12 } 13 new Thread(new TimeClientHandle("120.0.0.1",port),"TimeClient-001").start(); 14 } 15 }
TimeClientHandler:異步
1 package nio; 2 3 import java.io.IOException; 4 import java.net.InetSocketAddress; 5 import java.nio.ByteBuffer; 6 import java.nio.channels.SelectionKey; 7 import java.nio.channels.Selector; 8 import java.nio.channels.SocketChannel; 9 import java.util.Iterator; 10 import java.util.Set; 11 12 public class TimeClientHandle implements Runnable{ 13 private String host; 14 private int port; 15 private Selector selector; 16 private SocketChannel socketChannel; 17 private volatile boolean stop; 18 19 public TimeClientHandle(String host,int port){ 20 this.host = host == null ? "127.0.0.1" : host; 21 this.port = port; 22 try{ 23 selector = Selector.open(); 24 socketChannel = SocketChannel.open(); 25 socketChannel.configureBlocking(false); 26 }catch(IOException e){ 27 e.printStackTrace(); 28 System.exit(1); 29 } 30 } 31 32 33 public void run() { 34 //發送請求鏈接 35 try{ 36 doConnect(); 37 }catch(IOException e){ 38 e.printStackTrace(); 39 System.exit(1); 40 } 41 while(!stop){ 42 try{ 43 selector.select(1000); 44 Set<SelectionKey> selectedKeys = selector.selectedKeys(); 45 Iterator<SelectionKey> it = selectedKeys.iterator(); 46 SelectionKey key = null; 47 //當有就緒的Channel時,執行handleInput(key)方法 48 while(it.hasNext()){ 49 key = it.next(); 50 it.remove(); 51 try{ 52 handleInput(key); 53 }catch(Exception e){ 54 if(key != null){ 55 key.cancel(); 56 if(key.channel() != null){ 57 key.channel().close(); 58 } 59 } 60 } 61 } 62 }catch(Exception e){ 63 e.printStackTrace(); 64 System.exit(1); 65 } 66 } 67 68 //多路複用器關閉後,全部註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,因此不須要重複釋放資源 69 if(selector != null){ 70 try{ 71 selector.close(); 72 }catch(IOException e){ 73 e.printStackTrace(); 74 } 75 } 76 77 } 78 79 80 private void handleInput(SelectionKey key) throws IOException{ 81 if(key.isValid()){ 82 SocketChannel sc = (SocketChannel)key.channel(); 83 //判斷是否鏈接成功 84 if(key.isConnectable()){ 85 if(sc.finishConnect()){ 86 sc.register(selector, SelectionKey.OP_READ); 87 }else{ 88 System.exit(1); 89 } 90 } 91 92 if(key.isReadable()){ 93 ByteBuffer readBuffer = ByteBuffer.allocate(1024); 94 int readBytes = sc.read(readBuffer); 95 if(readBytes > 0){ 96 readBuffer.flip(); 97 byte[] bytes = new byte[readBuffer.remaining()]; 98 readBuffer.get(bytes); 99 String body = new String(bytes,"UTF-8"); 100 System.out.println("Now is :" + body); 101 this.stop = true; 102 }else if(readBytes < 0){ 103 //對端鏈路關閉 104 key.cancel(); 105 sc.close(); 106 }else{ 107 ; //讀到0字節,忽略 108 } 109 } 110 } 111 } 112 113 private void doConnect() throws IOException{ 114 //若是直接鏈接成功,則註冊到多路複用器上,發送請求信息,讀應答 115 if(socketChannel.connect(new InetSocketAddress(host,port))){ 116 socketChannel.register(selector, SelectionKey.OP_READ); 117 doWrite(socketChannel); 118 }else{ 119 //說明服務器沒有返回TCP禍首應答消息,但這並不表明鏈接失敗,當服務器返回TCP syn-ack消息後,Selector就可以輪訓這個SocketChannel處於鏈接就緒狀態 120 socketChannel.register(selector, SelectionKey.OP_CONNECT); 121 } 122 } 123 124 private void doWrite(SocketChannel sc) throws IOException{ 125 byte[] req = "QUERY TIME ORDER".getBytes(); 126 ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); 127 writeBuffer.put(req); 128 writeBuffer.flip(); 129 sc.write(writeBuffer); 130 if(!writeBuffer.hasRemaining()){ 131 System.out.println("Send order 2 server succeed."); 132 } 133 } 134 135 }
TimeServer:socket
1 package nio; 2 3 import java.io.IOException; 4 5 public class TimeServer { 6 7 public static void main(String[] args) throws IOException{ 8 int port = 8080; 9 if(args != null && args.length >0){ 10 try{ 11 port = Integer.valueOf(args[0]); 12 }catch(NumberFormatException e){ 13 //採用默認值 14 } 15 } 16 //多路複用類,是一個獨立的線程,負責輪訓多路複用器Selctor,處理多個客戶端的併發接入。 17 MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); 18 new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start(); 19 } 20 }
MultiplexerTimeServer:性能
1 package nio; 2 3 import java.io.IOException; 4 import java.net.InetSocketAddress; 5 import java.nio.ByteBuffer; 6 import java.nio.channels.SelectionKey; 7 import java.nio.channels.Selector; 8 import java.nio.channels.ServerSocketChannel; 9 import java.nio.channels.SocketChannel; 10 import java.util.Iterator; 11 import java.util.Set; 12 13 public class MultiplexerTimeServer implements Runnable { 14 15 private Selector selector; 16 17 private ServerSocketChannel servChannel; 18 19 private volatile boolean stop; 20 21 public MultiplexerTimeServer(int port){ 22 try{ 23 24 selector = Selector.open(); 25 servChannel.configureBlocking(false); 26 //將ServerSocketChannel 設置爲異步非阻塞,backlog設置爲1024 27 servChannel.socket().bind(new InetSocketAddress(port),1024); 28 //將ServerSocket Channel註冊到Selector,監聽SelectionKey.OP_ACCEPT操做位,若是初始化失敗,則退出 29 servChannel.register(selector,SelectionKey.OP_ACCEPT); 30 System.out.println("The time server is start in port:" + port); 31 }catch(IOException e){ 32 e.printStackTrace(); 33 System.exit(1); 34 } 35 } 36 37 public void stop(){ 38 this.stop = true; 39 } 40 41 public void run() { 42 while(!stop){ 43 try{ 44 //遍歷時間設置1秒,每隔一秒喚醒一次,當有處於就緒狀態的Channel時,selector將返回就緒狀態的Channel的SelectionKey集合 45 selector.select(1000); 46 Set<SelectionKey> selectedKeys = selector.selectedKeys(); 47 Iterator<SelectionKey> it = selectedKeys.iterator(); 48 SelectionKey key = null; 49 //經過對就緒狀態的Channel集合進行迭代,能夠進行網絡的異步讀寫操做 50 while(it.hasNext()){ 51 key = it.next(); 52 it.remove(); 53 try{ 54 handleInput(key); 55 }catch(Exception e){ 56 if(key != null){ 57 key.cancel(); 58 if(key.channel() != null){ 59 key.channel().close(); 60 } 61 } 62 } 63 } 64 }catch(Throwable t){ 65 t.printStackTrace(); 66 } 67 } 68 69 //多路複用器關閉後,全部註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,因此不須要重複釋放資源 70 if(selector != null){ 71 try{ 72 selector.close(); 73 }catch(IOException e){ 74 e.printStackTrace(); 75 } 76 } 77 } 78 79 //處理新接入的請求消息 80 private void handleInput(SelectionKey key) throws IOException{ 81 if(key.isValid()){ 82 83 //根據SelectionKey的操做位進行判斷便可獲知網絡事件的類型,經過accept接收客戶端的鏈接請求並建立SocketChannel實例,完成上述操做至關於 84 //完成了TCP的三次握手,TCP物理鏈路正式創建 85 if(key.isAcceptable()){ 86 ServerSocketChannel ssc = (ServerSocketChannel)key.channel(); 87 SocketChannel sc = ssc.accept(); 88 sc.configureBlocking(false); 89 //Add the new connection tothe selector 90 sc.register(selector, SelectionKey.OP_READ); 91 } 92 93 if(key.isReadable()){ 94 //Read the data 95 96 SocketChannel sc = (SocketChannel)key.channel(); 97 ByteBuffer readBuffer = ByteBuffer.allocate(1024); 98 int readBytes = sc.read(readBuffer); 99 if(readBytes > 0){ 100 //將緩衝區當前的limit設置爲position,position設置爲0,用於後續對緩衝區的讀取操做 101 readBuffer.flip(); 102 byte[] bytes = new byte[readBuffer.remaining()]; 103 readBuffer.get(bytes); 104 String body = new String(bytes,"UTF-8"); 105 System.out.println("The time server receive order: + body"); 106 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER"; 107 doWrite(sc,currentTime); 108 }else if(readBytes < 0){ 109 //對端鏈路關閉 110 key.cancel(); 111 sc.close(); 112 }else{ 113 ; //讀到0字節,忽略 114 } 115 } 116 } 117 } 118 119 private void doWrite(SocketChannel channel,String response) throws IOException{ 120 if(response != null && response.trim().length() >0){ 121 byte[] bytes = response.getBytes(); 122 ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); 123 writeBuffer.put(bytes); 124 writeBuffer.flip(); 125 channel.write(writeBuffer); 126 } 127 } 128 }