yls 2020/5/23java
/** * 測試 nio 中 buffer的使用 * buffer 內部就是一個數組 : final int[] hb; 、 * * buffer 定義了 position,capacity,limit,mark四個屬性來標記buffer中的數據信息 * 能夠經過debug的方式跟蹤查看值的變化 * private int mark = -1; 標記 * private int position = 0; 下一個要被讀或寫的元素的索引,每次讀寫都會改變其值,爲下次讀寫作準備 * private int limit; 表示緩衝區的當前終點,不能對超過limit限制的緩衝區別進行讀寫,limit能夠修改 * private int capacity; 容量,建立buffer時設置,不能被改變 * * buffer 讀寫切換時須要執行 flip()方法 * public final Buffer flip() { * limit = position; * position = 0; * mark = -1; * return this; * } * buffer清除數據時調用 clear()方法,只改變標記的指向位置,不真正刪除底層數組的值 * public final Buffer clear() { * position = 0; * limit = capacity; * mark = -1; * return this; * } */ public class BasicBuffer { public static void main(String[] args) { final IntBuffer intBuffer = IntBuffer.allocate(5); for (int i = 0; i <intBuffer.capacity() ; i++) { intBuffer.put(i*2); } intBuffer.flip(); for (int i = 0; i < intBuffer.capacity(); i++) { System.out.println(intBuffer.get()); } } }
/** * 同步 非阻塞 * FileChannel.transferTo實現了零拷貝,效率高,三十在windows中一次只能傳8M,因此大文件須要斷點續傳 */ public class NioServer { public static void main(String[] args) throws IOException { //建立一個ServerSocketChannel final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //綁定一個端口,在服務端監聽 serverSocketChannel.socket().bind(new InetSocketAddress(7777)); //設置爲非阻塞 serverSocketChannel.configureBlocking(false); //獲得一個selector對象 Selector selector = Selector.open(); //serverSocketChannel註冊到selector,關心 事件 OP_ACCEPT serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //循環等待客戶端鏈接 while (true) { //等待一秒,若沒有事件發生,返回 if (selector.select(1000) == 0) { System.out.println("服務器等待了1秒,沒有請求鏈接。。。。"); continue; } //若返回的值>0,說明已經獲取到相關的事件,則獲取到相關的selectionKeys集合 Set<SelectionKey> selectionKeys = selector.selectedKeys(); //使用迭代器遍歷 Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { //獲取到相應的key SelectionKey key = iterator.next(); //根據key對應的通道發生的事件作出處理 if (key.isAcceptable()) {//若是是 isAcceptable,有新的鏈接 //這裏的key對應的channel必定是serverSocketChannel //爲該客戶端生成一個socketChannel SocketChannel socketChannel = serverSocketChannel.accept(); //將新生成的socketChannel設置爲非阻塞模式,不然會拋出異常 socketChannel.configureBlocking(false); //將socketChannel註冊到selector,關心事件爲OP_READ,並關聯一個buffer socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(128)); } if (key.isReadable()) { //發生 isReadable 事件,表示有新數據發送過來 //根據key反向獲取到相應的channel SocketChannel channel = (SocketChannel) key.channel(); //獲取到channel關聯的buffer ByteBuffer byteBuffer = (ByteBuffer) key.attachment(); //先將buffer置於初始狀態 byteBuffer.clear(); //將channel中的數據讀到buffer中 channel.read(byteBuffer); //buffer讀寫切換 byteBuffer.flip(); //從buffer中讀有效數據到bytes //byteBuffer.array()直接返回buffer底層數組,若是後面發送的數據比以前發送的少,會將以前獲取的值獲取出來 byte[] bytes = new byte[byteBuffer.limit()]; byteBuffer.get(bytes); System.out.println("from 客戶端: " + new String(bytes)); } //手動從集合中移除當前的key,防止重複操做 iterator.remove(); } } } }
public class NioClient { public static void main(String[] args) throws IOException { //獲取一個 socketChannel final SocketChannel socketChannel = SocketChannel.open(); //設置 socketChannel 非阻塞 socketChannel.configureBlocking(false); //提供服務端的ip和端口,鏈接服務端,不阻塞 //經過 socketChannel.finishConnect() 判斷是否鏈接成功 boolean connect = socketChannel.connect(new InetSocketAddress("127.0.0.1", 6999)); if (!connect) { while (!socketChannel.finishConnect()) { System.out.println("由於鏈接須要時間,客戶端不會阻塞,能夠作其它事情。。。"); } } //鏈接成功後。。。。 System.out.println("1..."); String s = "大忽忽"; // Wraps a byte array into a buffer ByteBuffer byteBuffer = ByteBuffer.wrap(s.getBytes()); //發送數據 final int write = socketChannel.write(byteBuffer); System.out.println("2...."); System.in.read(); } }
/** * scattering: 分散,把數據寫入buffer時,能夠採用buffer數組,依次寫 * gathering: 聚合,從buffer中讀取數據時,能夠採用buffer數組,依次讀 */ public class ScatteringAndGathering { public static void main(String[] args) throws IOException { //綁定端口到socket,並啓動 final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(7000)); //建立buffer數組 final ByteBuffer[] byteBuffers = new ByteBuffer[2]; byteBuffers[0] = ByteBuffer.allocate(4); byteBuffers[1] = ByteBuffer.allocate(4); //等待客戶端鏈接,鏈接成功後生成SocketChannel final SocketChannel socketChannel = serverSocketChannel.accept(); //循環讀取 while (true) { //從channel讀取數據到buffer數組 long read = socketChannel.read(byteBuffers); System.out.println("read=========" + read); if (read == 0 || read == -1) { break; } Arrays.asList(byteBuffers).forEach(buffer -> { System.out.println("position=" + buffer.position() + ", limit=" + buffer.limit()); }); //讀寫切換 Arrays.asList(byteBuffers).forEach(buffer -> { buffer.flip(); }); //將buffer數組中的數據寫入channel,顯示到客戶端 final long write = socketChannel.write(byteBuffers); //將每一個buffer置於初始狀態 Arrays.asList(byteBuffers).forEach(buffer -> { buffer.clear(); }); } } }
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Set; /** * 簡單的羣聊系統(NIO 實現) * 服務端:能夠監聽客戶端的上線和離線,能夠接收客戶端發送的數據並轉發到其它客戶端 * 客戶端:能夠不阻塞的發送數據和接收其它客戶端發送的數據 */ public class GroupChatServer { private ServerSocketChannel serverSocketChannel; private Selector selector; private static final int port = 7999; public GroupChatServer() throws IOException { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port)); selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } //監聽 public void listen() { try { while (true) { final int select = selector.select(2000); if (select > 0) {//有事件發生 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { //獲取key SelectionKey key = iterator.next(); //根據key對應的通道發生的事件作出處理 if (key.isAcceptable()) {//若是是 isAcceptable,有新的鏈接 //這裏的key對應的channel必定是serverSocketChannel //爲該客戶端生成一個socketChannel // SocketChannel socketChannel = serverSocketChannel.accept(); SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept(); //將新生成的socketChannel設置爲非阻塞模式,不然會拋出異常 socketChannel.configureBlocking(false); //將socketChannel註冊到selector,關心事件爲OP_READ,並關聯一個buffer socketChannel.register(selector, SelectionKey.OP_READ); //提示 System.out.println(socketChannel.getRemoteAddress() + ",上線了"); } if (key.isReadable()) { //發生 isReadable 事件,表示有新數據發送過來 //專門寫方法,處理讀操做 readData(key); } //手動從集合中移除當前的key,防止重複操做 iterator.remove(); } } else { System.out.println("等待。。。"); } } } catch (Exception e) { e.printStackTrace(); } } private void readData(SelectionKey key) { SocketChannel channel = null; try { //根據key反向獲取到相應的channel channel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(128); //將channel中的數據讀到buffer中 final int read = channel.read(byteBuffer); String msg = new String(byteBuffer.array(), 0, read); System.out.println("from 客戶端: " + msg); //轉發消息到其它客戶端(除了本身),專門寫一個方法 sendMsgToOthers(msg, channel); } catch (Exception e) { e.printStackTrace(); try { System.out.println(channel.getRemoteAddress() + "離線了。。"); //關閉通道 channel.close(); } catch (IOException ex) { ex.printStackTrace(); } } } private void sendMsgToOthers(String msg, SocketChannel self) { final Set<SelectionKey> keys = selector.keys(); keys.forEach((key) -> { SelectableChannel channel = key.channel(); if (channel instanceof SocketChannel && channel != self) { SocketChannel socketChannel=(SocketChannel)channel; final ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); try { socketChannel.write(buffer); } catch (IOException e) { e.printStackTrace(); } } }); } public static void main(String[] args) throws IOException { final GroupChatServer groupChatServer = new GroupChatServer(); groupChatServer.listen(); } }
public class GroupChatClient { private String ip = "127.0.0.1"; private int port = 7999; private Selector selector; private SocketChannel socketChannel; private String name; public GroupChatClient() throws IOException { socketChannel = SocketChannel.open(new InetSocketAddress(ip, port)); socketChannel.configureBlocking(false); selector = Selector.open(); socketChannel.register(selector, SelectionKey.OP_READ); name = socketChannel.getLocalAddress().toString().substring(1); System.out.println(name + " is start .."); } //發送消息 public void sendData(String msg) throws IOException { msg = name + " 說:" + msg; socketChannel.write(ByteBuffer.wrap(msg.getBytes())); } //接收消息 public void rec() { try { while (true) { final int select = selector.select(); if (select > 0) { final Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { final SelectionKey next = iterator.next(); if (next.isReadable()) { SocketChannel channel = (SocketChannel) next.channel(); final ByteBuffer allocate = ByteBuffer.allocate(128); final int read = channel.read(allocate); System.out.println(new String(allocate.array(), 0, read)); } iterator.remove(); } } } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException { final GroupChatClient groupChatClient = new GroupChatClient(); //一個線程專門接收數據 new Thread(() -> { groupChatClient.rec(); }, "接收數據線程").start(); //主線程用來發送數據 final Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { final String s = scanner.nextLine(); groupChatClient.sendData(s); } } }