使用NIO編程的優勢:java
1.客戶端發起的鏈接操做是異步的,能夠經過在多路複用器註冊OP_CONNECT等待後續結果,不須要像以前的客戶端那樣被同步阻塞.編程
2.SocketChannel的讀寫操做都是異步的,若是沒有可讀寫的數據它不會同步等待,直接返回,這樣I/O通訊線程就能夠處理其餘的鏈路,不須要同步等待這個鏈路可用。服務器
3.線程模型的優化:因爲JDK的Selector在Linux等主流操做系統上經過epoll實現,它沒有鏈接句柄數的限制(只受限於操做系統的最大句柄數或者單個進程的句柄限制),這意味着一個Selector線程能夠同時處理成千上萬個客戶端鏈接,並且性能不會隨着客戶端的增長而線性降低。所以,它很是適合作高性能、高負載的網絡服務器。網絡
Server:異步
package netty.chapter2; public class TimeServer { public static void main(String[] args) { int port = 8090; MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start(); } }
package netty.chapter2; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.Iterator; import java.util.Set; public class MultiplexerTimeServer implements Runnable{ private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; /** * @Description 初始化多路複用器,綁定監聽端口 */ public MultiplexerTimeServer(int port) { try { //1.打開ServerSocketChannel,用於監聽客戶端的鏈接,它是全部客戶端鏈接的父管道. servChannel = ServerSocketChannel.open(); //2.綁定監聽端口,設置鏈接爲非阻塞模式 servChannel.socket().bind(new InetSocketAddress(port), 1024); servChannel.configureBlocking(false); //3.建立多路複用器 selector = Selector.open(); //4.將ServerSocketChannel註冊到多路複用器Selector上,監聽ACCEPT事件 servChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is started in port:"+port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop(){ this.stop = true; } public void run() { while(!stop){ try { //5.輪詢準備就緒的Key selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if(key != null){ key.cancel(); if(key.channel() != null) key.channel().close(); } } } } catch (Exception e) { e.printStackTrace(); } } } private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //處理新接入的請求消息 if(key.isAcceptable()){ //6.處理新的接入請求 ServerSocketChannel ssc = (ServerSocketChannel)key.channel(); SocketChannel sc = ssc.accept(); //7.設置客戶端鏈路爲非阻塞模式 sc.configureBlocking(false); //8.將新接入的客戶端鏈接註冊到多路複用器上,監聽讀操做,讀取客戶端發送的網絡消息 sc.register(selector, SelectionKey.OP_READ); } if(key.isReadable()){ SocketChannel sc = (SocketChannel)key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); //9.異步讀取客戶端請求消息到緩衝區 int readBytes = sc.read(readBuffer); if(readBytes >0){ readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes,"UTF-8"); System.out.println("The time server receive order:"+body); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc, currentTime); }else if(readBytes <0){ key.cancel(); sc.close(); }else{ System.out.println("0 size bytes"); } } } } private void doWrite(SocketChannel channel,String response) throws IOException{ //10.將消息異步發送給客戶端 if(response !=null && response.trim().length()>0){ byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } } }
Client:socket
package netty.chapter2; public class TimeClient { public static void main(String[] args) { int port = 8090; new Thread(new TimeClinetHandle("127.0.0.1", port), "TimeClient-001").start(); } }
package netty.chapter2; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class TimeClinetHandle implements Runnable { private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; public TimeClinetHandle(String host, int port) { this.host = host == null ? "127.0.0.1" : host; this.port = port; try { //1.打開SocketChannel socketChannel = SocketChannel.open(); //2.設置SocketChannel爲非阻塞模式 socketChannel.configureBlocking(false); //4.建立多路複用器 selector = Selector.open(); } catch (Exception e) { e.printStackTrace(); System.exit(1); } } public void run() { try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } while (!stop) { try { //5.輪詢準備就緒的key selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); SelectionKey key = null; while (iterator.hasNext()) { key = iterator.next(); iterator.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Exception e) { e.printStackTrace(); System.exit(1); } } //多路複用器關閉後,全部註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,因此不須要重複釋放資源 if (selector != null) { try { selector.close(); } catch (Exception e) { e.printStackTrace(); } } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { SocketChannel sc = (SocketChannel) key.channel(); //6.接收connect事件進行處理 if (key.isConnectable()) { //7.鏈接成功,註冊讀事件到多路複用器 if (sc.finishConnect()) { sc.register(selector, SelectionKey.OP_READ); doWrite(sc); } else { System.exit(1); } } //8.異步讀服務器返回消息到緩衝區 if (key.isReadable()) { ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("Now is:" + body); this.stop = true; } else if (readBytes < 0) { key.cancel(); sc.close(); } else { System.out.println("0 size bytes."); } } } } private void doConnect() throws IOException { //3.異步鏈接服務器 //成功,註冊 if (socketChannel.connect(new InetSocketAddress(host, port))) { socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); } else {//註冊,監聽服務器的ack socketChannel.register(selector, SelectionKey.OP_CONNECT); } } private void doWrite(SocketChannel sc) throws IOException { //9.調用SocketChannel的異步write接口,將消息異步發送給服務器 byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); sc.write(writeBuffer); if (!writeBuffer.hasRemaining()) { System.out.println("Send order 2 server succeed."); } } }