在Java的NIO中,有三個比較重要的概念:Buffer、Channel和Selector。java
結合上一篇文章提到的送花的例子。Buffer對應花,Channel對應A和B與花之間的聯繫,Selector就是不斷進行輪詢的線程。服務器
Channel分爲ServerSocketChannel和SocketChannel,是客戶端與服務端進行通訊的通道。socket
ServerSocketChannel用戶服務器端,職責就是監聽客戶端的鏈接請求。一旦經過容許,就會創建與該客戶端對應的SocketChannel。一個服務端的一個端口只能創建一個ServerSocketChannel用來監聽鏈接。ide
SocketChannel具備惟一性。一個客戶端可能連接多個服務端,那就是多個SocketChannel。服務端與多個客戶端創建的鏈接就有多個SocketChannel。學習
Selector是用來負責阻塞輪詢的線程,能夠經過其靜態方法Seletor.open()建立。服務端建立後經過Channel的register方法註冊到ServerSocketChannel上,等待客戶端鏈接。客戶端一樣建立Seletor後經過Channel的register方法註冊到SocketChannel上。ui
當客戶端的SocketChannel指定服務端的port和ip進行connect請求以後,服務端的Selector就能夠檢測到客戶端的connect請求。而後服務端accept表示繼續監聽下一個請求,同時能夠繼續在與客戶端創建了SocketChannel上監聽讀寫請求。客戶端同理。this
Selector的做用就是監聽SelectionKey.OP_ACCEPT(服務端專屬)、SelectionKey.OP_CONNECT(客戶端專屬)、SelectionKey.OP_READ、SelectionKey.OP_Write四種註冊的請求,一旦有請求被容許,就會調用相關的方法進行處理。.net
Buffer是用於在Channel中傳遞的數據。Buffer裏有4個屬性,來表示數據在Buffer中的存取狀況:線程
這4個屬性的大小關係是:mark<=position<=limit<=capacitycode
接下來經過一個客戶端與服務端通訊的例子,來學習使用NIO。客戶端每隔1秒向服務端發送請求,服務端響應並返回數據。
服務端:
package cn.testNio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * @Description : TODO * @Author : houshuiqiang@163.com, 2017年10月2日 下午2:58:31 * @Modified :houshuiqiang@163.com, 2017年10月2日 */ public class NioDemoServer { public static void main(String[] args) { NioServer nioServer = new NioServer(8181); new Thread(nioServer, "nio-server-test").start(); } } class NioServer implements Runnable { private Selector selector; private ServerSocketChannel serverSocketChannel; private volatile boolean stop; public NioServer(int port){ stop = false; try { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop(){ this.stop = true; } @Override public void run(){ while (!stop){ try { selector.select(); // 阻塞等待 Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); while(iterator.hasNext()){ SelectionKey selectionKey = iterator.next(); iterator.remove(); try{ handleKey(selectionKey); // 可能發生客戶端失聯的錯誤 }catch(IOException e){ e.printStackTrace(); if (selectionKey != null) { // 將發生異常的客戶端關閉,不然會一直被selector輪詢到 selectionKey.cancel(); if (selectionKey.channel() != null) { selectionKey.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); } } if (selector != null) { try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } private void handleKey(SelectionKey selectionKey) throws IOException { if (selectionKey.isValid()) { if (selectionKey.isAcceptable()) { ServerSocketChannel ssc = (ServerSocketChannel)selectionKey.channel(); SocketChannel socketChannel = ssc.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } if (selectionKey.isReadable()) { SocketChannel socketChannel = (SocketChannel)selectionKey.channel(); String body = getBodyFromSocketChannel(socketChannel); if (null == body) { // 斷開鏈路 selectionKey.cancel(); selectionKey.channel().close(); }else if ("".equals(body)) { // 心跳檢測 ,忽略 }else{ String resultBody = handleBody(socketChannel, body); write2Client(socketChannel, resultBody); } } } } private String getBodyFromSocketChannel(SocketChannel socketChannel) throws IOException{ ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int byteBufferSize = socketChannel.read(byteBuffer); if (byteBufferSize == 0) { // 心跳檢測,忽略 return ""; }else if (byteBufferSize > 0) { byteBuffer.flip(); byte[] array = new byte[byteBuffer.remaining()]; byteBuffer.get(array); return new String(array); }else{ return null; } } private String handleBody(SocketChannel socketChannel, String body) { String hostAddress = socketChannel.socket().getInetAddress().getHostAddress(); System.out.println("message from client : " + hostAddress + ", content: " + body); // 模擬請求處理 return "server received message: " + body; // 模擬返回處理結果 } private void write2Client(SocketChannel socketChannel, String resultBody) throws IOException { ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 真實場景每每比1024要大 byteBuffer.put(resultBody.getBytes()); byteBuffer.flip(); socketChannel.register(selector, SelectionKey.OP_READ); socketChannel.write(byteBuffer); } }
客戶端:
package cn.testNio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; /** * @Description : TODO * @Author : houshuiqiang@163.com, 2017年10月2日 下午5:58:49 * @Modified :houshuiqiang@163.com, 2017年10月2日 */ public class NioDemoClient { public static void main(String[] args) throws InterruptedException { NioClient nioClient = new NioClient("192.168.10.47", 8181); new Thread(nioClient, "nio-client-test").start(); for (int i = 0; i < 10; i++) { nioClient.getQueue().offer("time" + i); Thread.sleep(1000); } nioClient.stop(); } } class NioClient implements Runnable { private Selector selector; private SocketChannel socketChannel; private String address; private int port; private volatile boolean stop; private LinkedBlockingQueue<String> queue; public NioClient(String address, int port){ this.address = address; this.port = port; this.stop = false; queue = new LinkedBlockingQueue<String>(); try { selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public BlockingQueue<String> getQueue(){ return queue; } public void stop(){ this.stop = true; } @Override public void run(){ doConnect(); while (!stop) { try { selector.select(); // 阻塞等待 Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); while(iterator.hasNext()){ SelectionKey selectionKey = iterator.next(); iterator.remove(); try{ handleKey(selectionKey); }catch(IOException e){ e.printStackTrace(); if (selectionKey != null) { selectionKey.cancel(); if (selectionKey.channel() != null) { selectionKey.channel().close(); } } }catch(InterruptedException e){ e.printStackTrace(); break; } } } catch (IOException e) { e.printStackTrace(); } } try { socketChannel.close(); // 優雅關閉連接 selector.close(); // 直接selector.close()會關閉全部該seletor上的全部channel,可是服務器會接收到客戶端強制關閉的錯誤信息。 } catch (IOException e) { e.printStackTrace(); } } private void doConnect() { try { socketChannel.register(selector, SelectionKey.OP_CONNECT); socketChannel.connect(new InetSocketAddress(address, port)); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } private void handleKey(SelectionKey selectionKey) throws IOException, InterruptedException { if (selectionKey.isValid()) { SocketChannel socketChannel = (SocketChannel)selectionKey.channel(); if (selectionKey.isConnectable()) { if (socketChannel.finishConnect()) { socketChannel.register(selector, SelectionKey.OP_WRITE); } } if (selectionKey.isReadable()) { String resultBody = getBodyFromSocketChannel(socketChannel); if (null == resultBody) { // 斷開鏈路 selectionKey.cancel(); selectionKey.channel().close(); }else if ("".equals(resultBody)) { // 心跳檢測 ,忽略 }else{ System.out.println("received result : " + resultBody); socketChannel.register(selector, SelectionKey.OP_WRITE); } } if (selectionKey.isWritable()) { sendRequest(socketChannel); } } } private void sendRequest(SocketChannel socketChannel) throws IOException, InterruptedException { String requestBody = queue.poll(100, TimeUnit.MILLISECONDS); if (null != requestBody) { ByteBuffer byteBuffer = ByteBuffer.allocate(1024); byteBuffer.put(requestBody.getBytes()); byteBuffer.flip(); socketChannel.write(byteBuffer); socketChannel.register(selector, SelectionKey.OP_READ); if (! byteBuffer.hasRemaining()) { System.out.println("send request to server : " + requestBody); } }else { socketChannel.register(selector, SelectionKey.OP_WRITE); } } private String getBodyFromSocketChannel(SocketChannel socketChannel) throws IOException { ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int byteBufferSize = socketChannel.read(byteBuffer); if (byteBufferSize == 0) { // 心跳檢測,忽略 return ""; }else if (byteBufferSize > 0) { byteBuffer.flip(); byte[] array = new byte[byteBuffer.remaining()]; byteBuffer.get(array); return new String(array); }else{ return null; } } }