NIO 的基本使用

yls 2020/5/23java

NIO中buffer的使用

/**
 * 測試 nio 中 buffer的使用
 * buffer 內部就是一個數組 : final int[] hb;  、
 *
 * buffer 定義了 position,capacity,limit,mark四個屬性來標記buffer中的數據信息
 * 能夠經過debug的方式跟蹤查看值的變化
 *     private int mark = -1;    標記
 *     private int position = 0;  下一個要被讀或寫的元素的索引,每次讀寫都會改變其值,爲下次讀寫作準備
 *     private int limit;   表示緩衝區的當前終點,不能對超過limit限制的緩衝區別進行讀寫,limit能夠修改
 *     private int capacity;   容量,建立buffer時設置,不能被改變
 *
 * buffer  讀寫切換時須要執行 flip()方法
 *      public final Buffer flip() {
 *         limit = position;
 *         position = 0;
 *         mark = -1;
 *         return this;
 *     }
 * buffer清除數據時調用 clear()方法,只改變標記的指向位置,不真正刪除底層數組的值
 *     public final Buffer clear() {
 *         position = 0;
 *         limit = capacity;
 *         mark = -1;
 *         return this;
 *     }
 */
public class BasicBuffer {
    public static void main(String[] args) {
        final IntBuffer intBuffer = IntBuffer.allocate(5);
      for (int i = 0; i <intBuffer.capacity() ; i++) {
            intBuffer.put(i*2);
        }
        intBuffer.flip();
        for (int i = 0; i < intBuffer.capacity(); i++) {
            System.out.println(intBuffer.get());
        }
    }
}

NIO中Socket通訊實例

1.服務端
/**
 * 同步 非阻塞
 * FileChannel.transferTo實現了零拷貝,效率高,三十在windows中一次只能傳8M,因此大文件須要斷點續傳
 */
public class NioServer {
    public static void main(String[] args) throws IOException {
        //建立一個ServerSocketChannel
        final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //綁定一個端口,在服務端監聽
        serverSocketChannel.socket().bind(new InetSocketAddress(7777));
        //設置爲非阻塞
        serverSocketChannel.configureBlocking(false);
        //獲得一個selector對象
        Selector selector = Selector.open();
        //serverSocketChannel註冊到selector,關心 事件 OP_ACCEPT
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        //循環等待客戶端鏈接
        while (true) {
            //等待一秒,若沒有事件發生,返回
            if (selector.select(1000) == 0) {
                System.out.println("服務器等待了1秒,沒有請求鏈接。。。。");
                continue;
            }
            //若返回的值>0,說明已經獲取到相關的事件,則獲取到相關的selectionKeys集合
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            //使用迭代器遍歷
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                //獲取到相應的key
                SelectionKey key = iterator.next();
                //根據key對應的通道發生的事件作出處理
                if (key.isAcceptable()) {//若是是 isAcceptable,有新的鏈接
                    //這裏的key對應的channel必定是serverSocketChannel
                    //爲該客戶端生成一個socketChannel
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    //將新生成的socketChannel設置爲非阻塞模式,不然會拋出異常
                    socketChannel.configureBlocking(false);
                    //將socketChannel註冊到selector,關心事件爲OP_READ,並關聯一個buffer
                    socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(128));
                }
                if (key.isReadable()) { //發生 isReadable 事件,表示有新數據發送過來
                    //根據key反向獲取到相應的channel
                    SocketChannel channel = (SocketChannel) key.channel();
                    //獲取到channel關聯的buffer
                    ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
                    //先將buffer置於初始狀態
                    byteBuffer.clear();
                    //將channel中的數據讀到buffer中
                    channel.read(byteBuffer);
                    //buffer讀寫切換
                    byteBuffer.flip();
                    //從buffer中讀有效數據到bytes
                    //byteBuffer.array()直接返回buffer底層數組,若是後面發送的數據比以前發送的少,會將以前獲取的值獲取出來
                    byte[] bytes = new byte[byteBuffer.limit()];
                    byteBuffer.get(bytes);
                    System.out.println("from 客戶端: " + new String(bytes));
                }

                //手動從集合中移除當前的key,防止重複操做
                iterator.remove();
            }
        }
    }
}
2.客戶端
public class NioClient {
    public static void main(String[] args) throws IOException {
        //獲取一個 socketChannel
        final SocketChannel socketChannel = SocketChannel.open();
        //設置 socketChannel 非阻塞
        socketChannel.configureBlocking(false);
        //提供服務端的ip和端口,鏈接服務端,不阻塞
        //經過 socketChannel.finishConnect() 判斷是否鏈接成功
        boolean connect = socketChannel.connect(new InetSocketAddress("127.0.0.1", 6999));
        if (!connect) {
            while (!socketChannel.finishConnect()) {
                System.out.println("由於鏈接須要時間,客戶端不會阻塞,能夠作其它事情。。。");
            }
        }
        //鏈接成功後。。。。
        System.out.println("1...");
        String s = "大忽忽";
        // Wraps a byte array into a buffer
        ByteBuffer byteBuffer = ByteBuffer.wrap(s.getBytes());
        //發送數據
        final int write = socketChannel.write(byteBuffer);
        System.out.println("2....");
        System.in.read();
    }
}

NIO中Scattering、Gathering的使用

1.服務端
/**
 * scattering: 分散,把數據寫入buffer時,能夠採用buffer數組,依次寫
 * gathering:  聚合,從buffer中讀取數據時,能夠採用buffer數組,依次讀
 */
public class ScatteringAndGathering {
    public static void main(String[] args) throws IOException {

        //綁定端口到socket,並啓動
        final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(7000));
        //建立buffer數組
        final ByteBuffer[] byteBuffers = new ByteBuffer[2];
        byteBuffers[0] = ByteBuffer.allocate(4);
        byteBuffers[1] = ByteBuffer.allocate(4);
        //等待客戶端鏈接,鏈接成功後生成SocketChannel
        final SocketChannel socketChannel = serverSocketChannel.accept();
        //循環讀取
        while (true) {
            //從channel讀取數據到buffer數組
            long read = socketChannel.read(byteBuffers);

            System.out.println("read=========" + read);
            if (read == 0 || read == -1) {
                break;
            }
            Arrays.asList(byteBuffers).forEach(buffer -> {
                System.out.println("position=" + buffer.position() + ", limit=" + buffer.limit());
            });

            //讀寫切換
            Arrays.asList(byteBuffers).forEach(buffer -> {
                buffer.flip();
            });

            //將buffer數組中的數據寫入channel,顯示到客戶端
            final long write = socketChannel.write(byteBuffers);

            //將每一個buffer置於初始狀態
            Arrays.asList(byteBuffers).forEach(buffer -> {
                buffer.clear();
            });
        }


    }
}
2.客戶端使用上邊 Socket通訊實例 中的就能夠(改一下端口號)

NIO 使用 Socket通訊實現一個羣發系統

1.服務端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

/**
 * 簡單的羣聊系統(NIO 實現)
 * 服務端:能夠監聽客戶端的上線和離線,能夠接收客戶端發送的數據並轉發到其它客戶端
 * 客戶端:能夠不阻塞的發送數據和接收其它客戶端發送的數據
 */
public class GroupChatServer {
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    private static final int port = 7999;

    public GroupChatServer() throws IOException {
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    //監聽
    public void listen() {
        try {
            while (true) {
                final int select = selector.select(2000);
                if (select > 0) {//有事件發生
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        //獲取key
                        SelectionKey key = iterator.next();
                        //根據key對應的通道發生的事件作出處理
                        if (key.isAcceptable()) {//若是是 isAcceptable,有新的鏈接
                            //這裏的key對應的channel必定是serverSocketChannel
                            //爲該客戶端生成一個socketChannel
//                            SocketChannel socketChannel = serverSocketChannel.accept();
                            SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
                            //將新生成的socketChannel設置爲非阻塞模式,不然會拋出異常
                            socketChannel.configureBlocking(false);
                            //將socketChannel註冊到selector,關心事件爲OP_READ,並關聯一個buffer
                            socketChannel.register(selector, SelectionKey.OP_READ);
                            //提示
                            System.out.println(socketChannel.getRemoteAddress() + ",上線了");
                        }
                        if (key.isReadable()) { //發生 isReadable 事件,表示有新數據發送過來
                            //專門寫方法,處理讀操做
                            readData(key);
                        }

                        //手動從集合中移除當前的key,防止重複操做
                        iterator.remove();
                    }
                } else {
                    System.out.println("等待。。。");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void readData(SelectionKey key) {

        SocketChannel channel = null;
        try {
            //根據key反向獲取到相應的channel
            channel = (SocketChannel) key.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(128);
            //將channel中的數據讀到buffer中
            final int read = channel.read(byteBuffer);
            String msg = new String(byteBuffer.array(), 0, read);
            System.out.println("from 客戶端: " + msg);
            //轉發消息到其它客戶端(除了本身),專門寫一個方法
            sendMsgToOthers(msg, channel);

        } catch (Exception e) {
            e.printStackTrace();
            try {
                System.out.println(channel.getRemoteAddress() + "離線了。。");
                //關閉通道
                channel.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }

        }
    }

    private void sendMsgToOthers(String msg, SocketChannel self) {

        final Set<SelectionKey> keys = selector.keys();
        keys.forEach((key) -> {
             SelectableChannel channel = key.channel();
            if (channel instanceof SocketChannel && channel != self) {
                SocketChannel socketChannel=(SocketChannel)channel;
                final ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                try {
                    socketChannel.write(buffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public static void main(String[] args) throws IOException {
        final GroupChatServer groupChatServer = new GroupChatServer();
        groupChatServer.listen();
    }
}
2.客戶端
public class GroupChatClient {
    private String ip = "127.0.0.1";
    private int port = 7999;
    private Selector selector;
    private SocketChannel socketChannel;
    private String name;

    public GroupChatClient() throws IOException {
        socketChannel = SocketChannel.open(new InetSocketAddress(ip, port));
        socketChannel.configureBlocking(false);
        selector = Selector.open();
        socketChannel.register(selector, SelectionKey.OP_READ);
        name = socketChannel.getLocalAddress().toString().substring(1);
        System.out.println(name + " is start ..");
    }

    //發送消息
    public void sendData(String msg) throws IOException {
        msg = name + " 說:" + msg;
        socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
    }

    //接收消息
    public void rec() {
        try {
            while (true) {
                final int select = selector.select();
                if (select > 0) {
                    final Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        final SelectionKey next = iterator.next();
                        if (next.isReadable()) {
                            SocketChannel channel = (SocketChannel) next.channel();
                            final ByteBuffer allocate = ByteBuffer.allocate(128);
                            final int read = channel.read(allocate);
                            System.out.println(new String(allocate.array(), 0, read));
                        }
                        iterator.remove();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        final GroupChatClient groupChatClient = new GroupChatClient();
        //一個線程專門接收數據
        new Thread(() -> {
            groupChatClient.rec();
        }, "接收數據線程").start();
        //主線程用來發送數據
        final Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            final String s = scanner.nextLine();
            groupChatClient.sendData(s);
        }
    }
}
相關文章
相關標籤/搜索