廢話不多說,直接上代碼(Java)。
服務端(Server):
package net.nio;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/**
 * Single-threaded, selector-based echo server.
 *
 * <p>Binds to {@link #PORT}, accepts connections in non-blocking mode, prints
 * every chunk of bytes it receives (decoded as UTF-8) and writes the same
 * bytes back to the sender.
 *
 * @author xuyaqi (2018-06-06)
 */
public class Server {
    /** Capacity, in bytes, of the read buffer attached to each accepted connection. */
    private static final int BUF_SIZE = 1024;
    /** TCP port the server listens on. */
    private static final int PORT = 1234;

    private ServerSocketChannel servSocketChannel = null;
    private Selector selector = null;

    public static void main(String[] args) {
        new Server().startServer();
    }

    /**
     * Opens the listening channel and the selector, registers for accept
     * events and then runs the event loop on the calling thread.
     *
     * <p>Set-up failures are printed; the selector and the listening channel
     * are closed whenever this method returns.
     */
    public void startServer() {
        try {
            servSocketChannel = ServerSocketChannel.open();
            // The channel must be non-blocking before it can be registered with a selector.
            servSocketChannel.configureBlocking(false);
            // Bind the listening port.
            servSocketChannel.socket().bind(new InetSocketAddress(PORT));
            selector = Selector.open();
            // Ask to be notified about incoming connections.
            servSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            listen();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeQuietly(selector);
            closeQuietly(servSocketChannel);
        }
    }

    /**
     * Event loop: blocks in {@code select()} until at least one channel is
     * ready, then dispatches every selected key to the matching handler.
     */
    private void listen() {
        while (selector.isOpen()) {
            try {
                // Without this call the selected-key set is never populated.
                selector.select();
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // Remove first so the key is not left in the selected set
                    // if one of the handlers below throws.
                    iter.remove();
                    if (key.isValid() && key.isAcceptable()) {
                        System.out.println("handleAccept");
                        handleAccept(key);
                    }
                    if (key.isValid() && key.isReadable()) {
                        System.out.println("handleRead");
                        handleRead(key);
                    }
                    if (key.isValid() && key.isWritable()) {
                        System.out.println("handleWrite");
                        handleWrite(key);
                    }
                    if (key.isValid() && key.isConnectable()) {
                        System.out.println("isConnectable = true");
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Accepts a pending connection and registers it for read events with a
     * freshly allocated direct buffer of {@code BUF_SIZE} bytes as attachment.
     *
     * @param key the selected key of the listening {@link ServerSocketChannel}
     * @throws IOException if accepting or configuring the new channel fails
     */
    public static void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
        SocketChannel sc = ssChannel.accept();
        if (sc == null) {
            // In non-blocking mode accept() returns null when no connection is pending.
            return;
        }
        sc.configureBlocking(false);
        sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(BUF_SIZE));
    }

    /**
     * Reads whatever is available on the channel, prints it and echoes the
     * same bytes back. The connection is closed on end-of-stream or on an I/O
     * error.
     *
     * @param key the selected key of a client {@link SocketChannel}; its
     *            attachment is the connection's read buffer
     * @throws IOException if closing the channel after a failure fails
     */
    public static void handleRead(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        try {
            ByteBuffer buf = (ByteBuffer) key.attachment();
            int bytesRead = sc.read(buf);
            if (bytesRead > 0) {
                buf.flip();
                byte[] bytes = new byte[buf.remaining()];
                buf.get(bytes);
                buf.clear();
                System.out.println(new String(bytes, StandardCharsets.UTF_8));

                // Echo the payload. The channel is non-blocking, so a single
                // write() may be partial; loop until everything is sent.
                ByteBuffer echo = ByteBuffer.wrap(bytes);
                while (echo.hasRemaining()) {
                    sc.write(echo);
                }
            } else if (bytesRead < 0) {
                // -1 means the peer closed the connection. A return of 0 only
                // means "nothing available right now" and is not a close.
                System.out.println("關閉的鏈接");
                key.cancel();
                sc.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
            key.cancel();
            sc.close();
        }
    }

    /**
     * Sends a fixed reply through the buffer attached to the key.
     *
     * <p>Nothing in this class registers {@code OP_WRITE}, so the event loop
     * only reaches this handler if a caller adds that interest.
     *
     * @param key the selected key of a client {@link SocketChannel}
     * @throws IOException if writing to the channel fails
     */
    public static void handleWrite(SelectionKey key) throws IOException {
        ByteBuffer buf = (ByteBuffer) key.attachment();
        buf.put("客戶端返回".getBytes(StandardCharsets.UTF_8));
        buf.flip();
        SocketChannel sc = (SocketChannel) key.channel();
        while (buf.hasRemaining()) {
            sc.write(buf);
        }
        // Switch the buffer back to fill mode for the next read.
        buf.compact();
    }

    /** Closes the resource if it is non-null, printing (not propagating) any failure. */
    private static void closeQuietly(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
客戶端(Client):
package net.nio;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

/**
 * Non-blocking NIO client that connects to 127.0.0.1:1234, sends the same
 * UTF-8 message every two seconds and prints whatever the server has sent
 * back so far.
 *
 * @author xuyaqi (2018-06-06)
 */
public class Client implements Runnable {
    /** Identifier embedded in the message this client sends; -1 when unset. */
    private int i = -1;

    public Client() {
    }

    public Client(int i) {
        this.i = i;
    }

    public static void main(String[] args) throws UnsupportedEncodingException {
        for (int i = 0; i < 1; i++) {
            new Thread(new Client(i)).start();
        }
    }

    @Override
    public void run() {
        startClient(i);
    }

    /**
     * Connects to the server and runs the send/receive loop until the server
     * closes the connection, an I/O error occurs or the thread is interrupted.
     * The channel and the selector are closed before returning.
     *
     * @param i identifier placed in the outgoing message
     */
    public void startClient(int i) {
        SocketChannel socketChannel = null;
        Selector selector = null;
        ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        try {
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 1234));
            selector = Selector.open();
            socketChannel.register(selector, SelectionKey.OP_CONNECT);

            // A non-blocking connect() usually returns before the connection is
            // established. Wait on the selector until it can be completed;
            // finishConnect() throws IOException if the attempt failed.
            while (!socketChannel.finishConnect()) {
                selector.select();
                selector.selectedKeys().clear();
            }

            String info = "客戶端" + i + ":" + Math.random() * 1000;
            byte[] payload = info.getBytes(StandardCharsets.UTF_8);
            while (true) {
                System.out.println("-------------------------------------------------------------");
                writeBuffer.clear();
                writeBuffer.put(payload);
                writeBuffer.flip();
                // Non-blocking write() may be partial; loop until all is sent.
                while (writeBuffer.hasRemaining()) {
                    socketChannel.write(writeBuffer);
                }

                // Non-blocking read: 0 means no reply has arrived yet, in which
                // case it is picked up on a later iteration.
                int bytesRead = socketChannel.read(readBuffer);
                if (bytesRead > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    System.out.println(new String(bytes, StandardCharsets.UTF_8));
                    readBuffer.clear();
                } else if (bytesRead < 0) {
                    // -1: the server closed the connection; stop sending.
                    break;
                }
                Thread.sleep(2000);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owner of the thread can see it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            try {
                if (selector != null) {
                    selector.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                if (socketChannel != null) {
                    socketChannel.close();
                    System.out.println("關閉socketChannel");
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Registers a JVM shutdown hook whose body is currently empty. */
    public void hook() {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
            }
        }));
    }
}