【NIO】NIO版本鴻儒聊天室

# 需求

  • 基於NIO實現
  • 支持同時多個客戶端接入
  • 支持客戶端發送文本消息到服務器
  • 支持客戶端自定義羣聊名稱
  • 接收到客戶端發送的消息以後,服務器須要將消息轉發給目前在線的全部其餘客戶端
  • 支持客戶端退出羣聊
  • 服務端中止服務後,客戶端自動斷開鏈接

# 技術介紹

  • Non-blockingI/O 編程模型
  • Channel 通道
    • ServerSocketChannel 服務端通道
    • SocketChannel 客戶端通道
  • ByteBuffer NIO中使用的讀寫緩衝區
  • Selector 多路複用器
    • channel註冊在多路複用器上,並監聽相應的事件
  • 多線程
  • 線程池

# 代碼

舒適提示:注意看代碼註釋喲~ 跟上節奏,很簡單😼java

  • 服務器

/**
 * 基於NIO實現的聊天室服務端
 *
 * @author futao
 * @date 2020/7/8
 */

@Slf4j
public class NioChatServer {

    /**
     * 用於處理通道上的事件的線程池(可選的)
     */

    private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(10);

    /**
     * 啓動聊天室
     */

    public void start() {
        try {
            //服務端Socket通道
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            //將通道設置成非阻塞
            serverSocketChannel.configureBlocking(false);
            //綁定主機與監聽端口
            serverSocketChannel.bind(new InetSocketAddress("localhost", Constants.SERVER_PORT));

            //多路複用器
            Selector selector = Selector.open();

            //將服務端通道註冊到多路複用器上,並設置監聽事件接入事件
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            log.debug("{} 基於NIO的聊天室在[{}]端口啓動成功 {}", StringUtils.repeat("="30), Constants.SERVER_PORT, StringUtils.repeat("="30));

            while (true) {
                // 觸發了事件的通道數量,該方法會阻塞
                int eventCountTriggered = selector.select();
                if (eventCountTriggered <= 0) {
                    continue;
                }
                // 獲取到全部觸發的事件
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                // 遍歷事件進行處理
                for (SelectionKey selectionKey : selectionKeys) {
                    // 處理事件
                    selectionKeyHandler(selectionKey, selector);
                }
                // 清除事件記錄
                selectionKeys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 事件處理器
     *
     * @param selectionKey 觸發的事件信息
     * @param selector     多路複用器
     */

    private void selectionKeyHandler(SelectionKey selectionKey, Selector selector) {
        if (selectionKey.isAcceptable()) {
            //若是觸發的是SocketChannel接入事件
            try {
                // ServerSocketChannel上觸發的客戶端SocketChannel接入
                SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
                log.debug("客戶端[{}]成功接入聊天服務器", socketChannel.socket().getPort());
                // 將客戶端SocketChannel通道設置成非阻塞
                socketChannel.configureBlocking(false);
                // 將客戶端通道註冊到多路複用器,並監聽這個通道上發生的可讀事件
                socketChannel.register(selector, SelectionKey.OP_READ);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else if (selectionKey.isReadable()) {
            // 觸發的是可讀事件
            // 獲取到可讀事件的客戶端通道
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            //建立緩衝區
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
            try {
                // 讀取通道上的數據寫入緩衝區(返回0或者-1說明讀到了末尾)
                while (socketChannel.read(byteBuffer) > 0) {
                }
                //切換爲讀模式
                byteBuffer.flip();
                // 接收到的消息
                String message = String.valueOf(Constants.CHARSET.decode(byteBuffer));
                log.info("接收到來自客戶端[{}]的數據:[{}]", socketChannel.socket().getPort(), message);
                // 是否退出
                quit(message, selector, selectionKey);
                // 消息轉發
                forwardMessage(message, selector, selectionKey);
                // 清除緩衝區的數據
                byteBuffer.clear();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 客戶端退出
     *
     * @param message      消息
     * @param selector     多路複用器
     * @param selectionKey 觸發的selectionKey
     */

    public void quit(String message, Selector selector, SelectionKey selectionKey) {
        if (StringUtils.isBlank(message) || Constants.KEY_WORD_QUIT.equals(message)) {
            int port = ((SocketChannel) selectionKey.channel()).socket().getPort();
            // 客戶端下線
            selectionKey.cancel();
            log.debug("客戶端[{}]下線", port);
            // 由於發生了監聽事件和channel的變動,因此須要通知selector從新整理selector所監聽的事件
            selector.wakeup();
        }
    }

    /**
     * 轉發消息
     *
     * @param message         須要轉發的消息
     * @param selector        多路複用器
     * @param curSelectionKey 當前觸發的selectionKey
     */

    public void forwardMessage(String message, Selector selector, SelectionKey curSelectionKey) {
        // 建立緩衝區
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
        // 數據寫入緩衝區
        byteBuffer.put(message.getBytes(Constants.CHARSET));

        // 切換爲讀模式
        byteBuffer.flip();
        // 在首尾進行標記,由於須要給每一個客戶端發送一樣的數據,須要重複讀取
        byteBuffer.mark();
        // 當前註冊在多路複用器上的SelectionKey集合
        Set<SelectionKey> keys = selector.keys();
        for (SelectionKey key : keys) {
            // 消息不能轉發給本身 and 只轉發給客戶端SocketChannel
            if (curSelectionKey.equals(key) || !(key.channel() instanceof SocketChannel)) {
                continue;
            }
            // 客戶端SocketChannel
            SocketChannel socketChannel = (SocketChannel) key.channel();
            // 若是緩衝區中還有數據就一直寫
            while (byteBuffer.hasRemaining()) {
                try {
                    // 數據寫入通道
                    socketChannel.write(byteBuffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            // 重置到上次mark的地方,即首位
            byteBuffer.reset();
        }
        // 清除緩衝區的數據
        byteBuffer.clear();
    }


    public static void main(String[] args) {
        new NioChatServer().start();
    }
}
  • 客戶端

/**
 * 基於NIO實現的羣聊客戶端
 *
 * @author futao
 * @date 2020/7/8
 */

@Getter
@Setter
@Slf4j
public class NioChatClient {

    /**
     * 用於處理用戶輸入數據的單個線程線程池,使用線程池是爲了便於關閉
     */

    private static final ExecutorService USER_INPUT_HANDLER = Executors.newSingleThreadExecutor();

    /**
     * 用戶名
     */

    private String userName;

    /**
     * 啓動客戶端
     */

    public void start() {
        try {
            // 建立客戶端通道
            SocketChannel socketChannel = SocketChannel.open();
            // 將通道設置爲非阻塞
            socketChannel.configureBlocking(false);

            // 建立多路複用器
            Selector selector = Selector.open();

            // 將客戶端通道註冊到多路複用器,並監聽可讀事件
            socketChannel.register(selector, SelectionKey.OP_CONNECT);

            // 嘗試鏈接到聊天服務器
            socketChannel.connect(new InetSocketAddress("localhost", Constants.SERVER_PORT));

            while (true) {
                // 阻塞等待通道上的事件觸發。返回觸發的通道的數量
                int eventCountTriggered = selector.select();
                if (eventCountTriggered <= 0) {
                    continue;
                }
                // 獲取到全部觸發的事件
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                // 遍歷事件進行處理
                for (SelectionKey selectionKey : selectionKeys) {
                    // 處理事件
                    selectionKeyHandler(selectionKey, selector);
                }
                // 清除事件記錄
                selectionKeys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClosedSelectorException e) {
            log.debug("成功退出聊天室...");
        }
    }

    /**
     * 處理器
     *
     * @param selectionKey 觸發的selectionKey
     * @param selector     多路複用器
     */

    private void selectionKeyHandler(SelectionKey selectionKey, Selector selector) {

        if (selectionKey.isConnectable()) {
            // 觸發的是成功接入服務器的事件
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            try {
                // 判斷此通道上的鏈接操做是否正在進行中
                if (socketChannel.isConnectionPending()) {
                    // 完成鏈接套接字通道的過程
                    socketChannel.finishConnect();
                    log.debug("成功接入聊天服務器");

                    // 將通道設置成非阻塞
                    socketChannel.configureBlocking(false);
                    // 將通道註冊到多路複用器,並監聽可讀事件
                    socketChannel.register(selector, SelectionKey.OP_READ);

                    // 建立緩衝區,用於處理將用戶輸入的數據寫入通道
                    ByteBuffer byteBuffer = ByteBuffer.allocate(4 * 1024);
                    // 在新線程中處理用戶輸入
                    USER_INPUT_HANDLER.execute(() -> {
                        while (!Thread.currentThread().isInterrupted()) {
                            //先清空緩衝區中的數據
                            byteBuffer.clear();
                            // 獲取用戶輸入的文本
                            String message = new Scanner(System.in).nextLine();
                            // 將數據寫入緩衝區
                            byteBuffer.put(String.format("【%s】: %s", userName, message).getBytes(Constants.CHARSET));
                            // 將緩衝區設置爲讀模式
                            byteBuffer.flip();
                            try {
                                // 當緩衝區中還有數據
                                while (byteBuffer.hasRemaining()) {
                                    // 將數據寫入通道
                                    socketChannel.write(byteBuffer);
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                            }

                            // 判斷是否退出羣聊
                            if (quit(message, selector, selectionKey)) {
                                // 跳出循環,結束線程
                                break;
                            }
                        }
                        try {
                            // 關閉多路複用器
                            selector.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                        // 關閉線程池
                        USER_INPUT_HANDLER.shutdown();
                    });
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else if (selectionKey.isReadable()) {
            // 觸發的是可讀事件
            // 獲取到可讀事件的通道
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            //建立緩衝區
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
            try {
                // 將通道上的數據寫入緩衝區(返回0或者-1說明讀到了末尾)
                while (socketChannel.read(byteBuffer) > 0) {
                }
                // 切換成讀模式
                byteBuffer.flip();
                String message = String.valueOf(Constants.CHARSET.decode(byteBuffer));
                byteBuffer.clear();
                log.info("接收到數據:[{}]", message);
                if (StringUtils.isBlank(message)) {
                    log.debug("服務器拉胯,下車...");
                    selector.close();
                    USER_INPUT_HANDLER.shutdownNow();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 退出羣聊
     *
     * @param message      消息
     * @param selector     多路複用器
     * @param selectionKey 觸發的selectionKey
     * @return 是否退出
     */

    public boolean quit(String message, Selector selector, SelectionKey selectionKey) {
        if (Constants.KEY_WORD_QUIT.equals(message)) {
            selectionKey.cancel();
            selector.wakeup();
            return true;
        }
        return false;
    }


    public static void main(String[] args) {
        NioChatClient nioChatClient = new NioChatClient();
        nioChatClient.setUserName("小9");
        nioChatClient.start();
    }
}

# 測試

  • 接入
image.png
  • 客戶端發送消息
image.png
  • 消息轉發

  • 客戶端下線

  • 服務器宕機
image.png

# 源代碼

* https://github.com/FutaoSmile/learn-IO/tree/master/practice/src/main/java/com/futao/practice/chatroom/nio

# 系列文章

【BIO】基於BIO實現簡單動態HTTP服務器git


【BIO】經過指定消息大小實現的多人聊天室-終極版本github


BIO在聊天室項目中的演化web


歡迎在評論區留下你看文章時的思考,及時說出,有助於加深記憶和理解,還能和像你同樣也喜歡這個話題的讀者相遇~編程

本文分享自微信公衆號 - 喜歡天文(AllUnderControl)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。服務器

相關文章
相關標籤/搜索