IO、NIO實現簡單聊天室,附帶問題解析

  本篇文章主要使用IO和NIO的形式來實現一個簡單的聊天室,而且說明IO方法存在的問題,而NIO又是如何解決的。java

  大概的框架爲,先提供思路和大概框架圖——代碼——問題及解決方式,這樣會容易看一點。服務器

1. IO寫法

1.1 思路框架

  下面編寫一個簡單的聊天室,大概須要的功能就是服務端維護一個聊天室,裏邊的客戶端發送消息以後服務將其消息轉發給其餘客戶端,達到一個聊天室的效果。多線程

  大體的思路:服務端區分職責,分紅兩部分,主線程負責接收鏈接並把鏈接放入到線程池中處理,維護一個線程池,全部對於socket的處理都交給線程池中的線程來處理。以下圖。架構

socket架構圖

  下面貼上demo代碼(代碼中有幾處爲了方便並無採用最規範的定義方式,如線程池的建立和Map初始化的時候未設置初始容量等)併發

  代碼分五個類,服務端(ChatServer,監聽做用,爲服務端主線程)、客戶端(ChatClient)、服務端處理器(ServerHandler,能夠理解爲線程池中要執行的事情)、客戶端處理器(ClientHandler,客戶端讀寫服務器消息的處理),工具類(SocketUtils,只有一個發送消息方法)。框架

1.2 demo代碼

服務端:socket

/**
 * 服務端啓動類
 * 主要負責監聽客戶端鏈接
 */
public class ChatServer {

    public static void main(String[] args) {
        ServerSocket serverSocket = null;
        /*----------爲了方便使用Executors建立線程-------------*/
        ExecutorService handlerThreadPool = Executors.newFixedThreadPool(100);
        try {
            serverSocket = new ServerSocket(8888);
            while (true) {
                System.out.println("-----------阻塞等待鏈接------------");
                Socket socket = serverSocket.accept();
                String key = socket.getInetAddress().getHostAddress() + ":" + socket.getPort();
                System.err.println(key + "已鏈接");
                // 主線程只接收,處理直接交給處理線程池
                handlerThreadPool.execute(new ServerHandler(socket));
            }
        } catch (IOException e) {
            e.printStackTrace();
            if (Objects.nonNull(serverSocket)) {
                try {
                    serverSocket.close();
                } catch (IOException ioException) {
                    ioException.printStackTrace();
                }
            }
        }
    }

}

服務端處理類:ide

/**
 * 服務端socket事件處理類
 * 負責處理對應socket中的讀寫操做
 */
public class ServerHandler implements Runnable {

    /**
     * 鏈接到服務端的全部鏈接 socket的地址端口->socket
     */
    private static final Map<String, Socket> socketMap = new ConcurrentHashMap<>();

    /**
     * 維護名稱和地址的map
     */
    private static final Map<String, String> nameMap = new ConcurrentHashMap<>();

    private Socket socket;

    /**
     * 每一個socket的標識,使用地址+端口構成
     */
    private String key;

    public ServerHandler() {
    }

    public ServerHandler(Socket socket) {
        this.socket = socket;
        this.key = socket.getInetAddress().getHostAddress() + ":" + socket.getPort();
    }

    @Override
    public void run() {
        Socket s = socket;
        // 根據消息執行不一樣操做
        InputStream inputStream;
        // debug查看數據用
        // Map<String, Socket> tmpMap = socketMap;
        try {
            inputStream = s.getInputStream();
            Scanner scanner = new Scanner(inputStream);
            while (true) {
                String line = scanner.nextLine();
                if (line.startsWith("register")) {
                    // 登記
                    String[] split = line.split(":");
                    String name = split[1];
                    String msg;
                    // 校驗是否存在
                    if (socketMap.containsKey(key)) {
                        msg = "請勿重複登記";
                        sendMsg(s, msg);
                        return;
                    }

                    if (nameMap.containsValue(name)) {
                        msg = "名稱已被登記,請換一個名稱";
                        sendMsg(s, msg);
                        return;
                    }

                    // 通知本身已鏈接
                    sendMsg(s, "已鏈接到服務器");

                    msg = name + "進入聊天室";
                    // 將消息轉發給其餘客戶端
                    sendMsgToClients(msg);

                    // 放入socket池
                    socketMap.put(key, s);
                    nameMap.put(key, name);
                    System.err.println(name + "已登記");
                } else if (line.trim().equalsIgnoreCase("end")) {
                    if (notPassRegisterValidate()) {
                        continue;
                    }

                    // 斷開鏈接
                    socketMap.remove(key);
                    String name = nameMap.get(key);
                    String msg = name + "離開聊天室";
                    System.err.println(msg);
                    // 將消息轉發給其餘客戶端
                    sendMsgToClients(msg);

                    msg = "已斷開鏈接";
                    // 發送給對應的鏈接斷開信息
                    sendMsg(s, msg);
                    inputStream.close();
                    break;
                } else {
                    if (notPassRegisterValidate()) {
                        continue;
                    }

                    // 正常通訊
                    String name = nameMap.get(key);
                    String msg = name + ":" + line;
                    // 將消息轉發給其餘客戶端
                    sendMsgToClients(msg);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 是否已登陸校驗
     *
     * @return 是否已登陸
     */
    private boolean notPassRegisterValidate() {
        boolean hasRegister = nameMap.containsKey(key);
        if (hasRegister) {
            return false;
        }

        String msg = "您還未登陸,請先登陸";
        sendMsg(socket, msg);
        return true;
    }

    /**
     * 往鏈接發送消息
     *
     * @param socket 客戶端鏈接
     * @param msg    消息
     */
    private void sendMsg(Socket socket, String msg) {
        SocketUtils.sendMsg(socket, msg);
        if (socket.isClosed()) {
            socketMap.remove(key);
        }
    }

    /**
     * 發送給其餘客戶端信息
     *
     * @param msg 信息
     */
    private void sendMsgToClients(String msg) {
        for (Map.Entry<String, Socket> entry : socketMap.entrySet()) {
            if (this.key.equals(entry.getKey())) {
                continue;
            }

            sendMsg(entry.getValue(), msg);
        }
    }

}

工具類(一個發送消息的方法):函數

public class SocketUtils {

    private SocketUtils() {
    }

    public static void sendMsg(Socket socket, String msg) {
        Socket s = socket;
        OutputStream outputStream = null;
        msg += "\r\n";
        try {
            outputStream = s.getOutputStream();
            outputStream.write(msg.getBytes(StandardCharsets.UTF_8));
            outputStream.flush();
        } catch (IOException e) {
            System.err.println("發送消息失敗, 鏈接已斷開");
            try {
                if (Objects.nonNull(outputStream)) {
                    outputStream.close();
                }
                socket.close();
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }

        }
    }

}

客戶端:高併發

/**
 * 客戶端讀和寫各自使用一個線程
 */
public class ChatClient {

    public static void main(String[] args) {
        Socket socket;
        ExecutorService clientHandlerPool = Executors.newFixedThreadPool(2);
        try {
            socket = new Socket("localhost", 8888);

            // 寫線程
            clientHandlerPool.execute(new ClientHandler(socket, 1));
            // 讀線程
            clientHandlerPool.execute(new ClientHandler(socket, 0));

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

客戶端處理器:

/**
 * 客戶端處理器
 * 根據type來區分是作讀工做仍是寫工做
 */
public class ClientHandler implements Runnable {

    private Socket socket;

    /**
     * 處理類型,0-讀、1-寫
     */
    private int type;

    public ClientHandler() {
        throw new IllegalArgumentException("不能使用沒有參數的構造函數");
    }

    public ClientHandler(Socket socket, int type) {
        this.socket = socket;
        this.type = type;
    }

    @Override
    public void run() {
        if (type == 1) {
            // 進行寫操做
            doWriteJob();
            return;
        }

        // 默認讀操做
        doReadJob();
    }

    /**
     * 讀操做
     */
    private void doReadJob() {
        Socket s = socket;
        InputStream inputStream;
        try {
            inputStream = s.getInputStream();
            Scanner scanner = new Scanner(inputStream);
            while (true) {
                String line = scanner.nextLine();
                if (null != line && !"".equals(line)) {
                    System.err.println(line);
                }
                // 若是已退出了,那麼關閉鏈接
                if ("已斷開鏈接".equals(line)) {
                    socket.close();
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
            try {
                socket.close();
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        }
    }

    /**
     * 寫線程
     */
    private void doWriteJob() {
        Socket s = socket;
        try {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String output = scanner.nextLine();
                if (Objects.nonNull(output) && !"".equals(output)) {
                    SocketUtils.sendMsg(s, output);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println("錯誤發生了:" + e.getMessage());
        }
    }
}

結果:

IO結果圖

思考:當前這樣實現有什麼瓶頸,可能會出現什麼問題?

存在問題:

  1. 服務端使用accept阻塞接收線程,鏈接一個一個處理,在高併發下處理性能緩慢
  2. 沒有鏈接的時候線程一直處於阻塞狀態形成資源的浪費(若是使用多線程接收處理併發,那麼沒鏈接的時候形成多個線程的資源浪費)。

2. 使用NIO實現聊天室

2.1 總體思路

  那咱們來看下NIO是怎麼解決上方的問題的,首先上這個demo總體的架構圖。

NIO架構圖

  大概的邏輯爲

  1. 服務端將ServerSocketChannel註冊到Selector中,客戶端鏈接進來的時候事件觸發,將客戶端的鏈接註冊到selector中。
  2. 主線程負責selector的輪詢工做,發現有事件能夠處理就將其交給線程池
  3. 客戶端同理分紅兩個部分,寫操做和讀操做,每一個操做由一個線程單獨完成;可是若是讀操做處理使用while循環不斷輪詢等待接收的話,CPU會飆升,因此須要客戶端新建一個selector來解決這個問題,注意這個selector跟服務端不是同一個,沒有啥關係。

  代碼分類大體跟IO寫法同樣,分紅服務端、服務端處理器、客戶端、客戶端處理器,下面爲demo。

2.2 代碼

服務端:

public class ChatServer {

    private Selector selector;

    private ServerSocketChannel serverSocketChannel;

    private static final ExecutorService handlerPool = Executors.newFixedThreadPool(100);

    public ChatServer() throws IOException {
        this.selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(9999));
        // 將服務端的socket註冊到selector中,接收客戶端,並將其註冊到selector中,其自己也是selector中的一個I/O事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.err.println("聊天室服務端初始化結束");
    }

    /**
     * 啓動方法
     * 1.監聽,拿到以後進行處理
     */
    public void start() throws IOException {
        int count;
        while (true) {
            // 可能出現select方法沒阻塞,空輪詢致使死循環的狀況
            count = selector.select();

            if (count > 0) {
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    // 交給線程池處理
                    handlerPool.execute(new ServerHandler(key, selector));
                    // 處理完成後移除
                    iterator.remove();
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new ChatServer().start();
    }
}

服務端處理器:

public class ServerHandler implements Runnable {

    private SelectionKey key;

    private Selector selector;

    public ServerHandler() {

    }

    /**
     * 原本能夠經過key拿到selector,這裏爲了圖方便就這樣寫了
     */
    public ServerHandler(SelectionKey key, Selector selector) {
        this.key = key;
        this.selector = selector;
    }

    @Override
    public void run() {
        try {
            if (key.isAcceptable()) {
                // 說明是服務端的事件,注意這裏強轉換爲的是ServerSocketChannel
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                // 接收鏈接
                SocketChannel socket = channel.accept();
                if (Objects.isNull(socket)) {
                    return;
                }

                socket.configureBlocking(false);
                // 接收客戶端的socket而且將其註冊到服務端這邊的selector中,注意客戶端在此時跟服務端selector產生關聯
                socket.register(selector, SelectionKey.OP_READ);
                System.err.println("服務端已接收鏈接");
            } else if (key.isReadable()) {
                // 客戶端發送信息過來了
                doReadJob();
            }
        } catch (IOException e) {
            e.printStackTrace();
            // 錯誤處理
        }
    }

    /**
     * 讀取操做
     */
    private void doReadJob() throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int readCount = socketChannel.read(buffer);
        if (readCount > 0) {
            String msg = new String(buffer.array(), StandardCharsets.UTF_8);
            System.err.println(socketChannel.getRemoteAddress().toString() + "的信息爲:" + msg);

            // 轉發給其餘客戶端
            sendMsgToOtherClients(msg);
        }
    }

    /**
     * 轉發消息給其餘客戶端
     *
     * @param msg 消息
     */
    private void sendMsgToOtherClients(String msg) throws IOException {

        SocketChannel self = (SocketChannel) key.channel();

        Set<SelectionKey> keys = selector.keys();
        Iterator<SelectionKey> iterator = keys.iterator();
        while (iterator.hasNext()) {
            SelectionKey selectionKey = iterator.next();
            SelectableChannel channel = selectionKey.channel();
            // 若是是自己或者不是socketChannel類型則跳過
            if (self.equals(channel) || channel instanceof ServerSocketChannel) {
                continue;
            }

            SocketChannel socketChannel = (SocketChannel) channel;
            ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
            socketChannel.write(byteBuffer);
        }
    }
}

客戶端:

public class ChatClient {

    private Selector selector;

    private SocketChannel socketChannel;

    private static ExecutorService dealPool = Executors.newFixedThreadPool(2);

    public ChatClient() throws IOException {

        /*
         * 說明一下:
         * 客戶端這邊的selector跟剛纔在服務端定義的selector是不一樣的兩個selector
         * 客戶端這邊不須要selector也能實現功能,可是讀取的時候必須不斷的循環,會致使CPU飆升,
         * 因此使用selector是爲了解決這個問題的,別跟服務端的selector搞混就好
         */
        selector = Selector.open();
        socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9999));
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
    }


    public void start() throws IOException, InterruptedException {
        // 鏈接
//        socketChannel.connect(new InetSocketAddress("localhost", 9999));
        while (!socketChannel.finishConnect()) {
            System.err.println("正在鏈接...");
            TimeUnit.MILLISECONDS.sleep(200);
        }

        System.err.println("鏈接成功");

        // 使用兩個線程來分別處理讀取和寫操做
        // 寫數據
        dealPool.execute(new ClientHandler(selector, socketChannel, 1));

        // 讀取數據
        dealPool.execute(new ClientHandler(selector, socketChannel, 0));
    }


    public static void main(String[] args) throws IOException, InterruptedException {
        new ChatClient().start();
    }
}

客戶端處理器:

public class ClientHandler implements Runnable {

    private Selector selector;

    private SocketChannel socketChannel;

    /**
     * 0-讀,1-寫
     */
    private int type;

    public ClientHandler() {
    }

    public ClientHandler(Selector selector, SocketChannel socketChannel, int type) {
        // selector是爲了解決讀時候CPU飆升的問題,具體見客戶端的啓動類代碼註釋
        this.selector = selector;
        this.socketChannel = socketChannel;
        this.type = type;
    }

    @Override
    public void run() {
        try {
            if (type == 0) {
                doClientReadJob();
                return;
            }

            doClientWriteJob();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 寫操做
     */
    private void doClientWriteJob() throws IOException {
        SocketChannel sc = socketChannel;
        Scanner scanner = new Scanner(System.in);
        while (true) {
            if (scanner.hasNextLine()) {
                String line = scanner.nextLine();
                if (null != line && !"".equals(line)) {
                    ByteBuffer buffer = ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8));
                    sc.write(buffer);
                }
            }
        }
    }

    /**
     * 讀操做
     */
    private void doClientReadJob() throws IOException {
        SocketChannel sc = socketChannel;
        ByteBuffer buf = ByteBuffer.allocate(1024);
        while (true) {
            int select = selector.select();
            if (select > 0) {
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    // 這是必須的,否則下方的remove會出錯
                    SelectionKey next = iterator.next();
                    // 這裏由於只有自己這個客戶端註冊到客戶端的selector中,因此有事件必定是它的,也就不用從key拿了,直接操做就行
                    buf.clear();
                    int read = sc.read(buf);
                    if (read > 0) {
                        String msg = new String(buf.array(), StandardCharsets.UTF_8);
                        System.err.println(msg);
                    }
                    // 事件處理完以後要移除這個key,不然的話selector.select()方法不會再讀到這個key,即使有新的時間到這個channel來
                    iterator.remove();
                }
            }
        }
    }

}

  結果圖:

NIO結果圖

在編寫的過程當中發現瞭如下兩點:

  1. select方法以後若是存在key,而且接下來的操做未對這個selectionKeyremove操做,那麼下次的select不會再將其選入,即使有事件發生,也就是說,select方法不會選擇以前已經選過的key。
  2. selector.select()方法中偶爾會出現不阻塞的狀況。這就是NIO中的空輪詢bug,也就是說,沒有鏈接又不阻塞的話,while(true) ... 的寫法就是一個死循環,會致使CPU飆升。

  第二點問題在NIO框架(如netty)中都採用了比較好的解決方法,能夠去查下如何解決的。接下來看下NIO的寫法是否解決了IO寫法中存在的問題:

  1. 服務端使用accept阻塞接收線程,鏈接一個一個處理,在高併發下處理性能緩慢。

    答:上述寫法中仍是使用一個ServerSocketChannel來接收客戶端,沒有解決這個問題;可是能夠經過使用線程池的方式來解決。也就是說將服務端的事件分紅兩個部分,第一個部分爲接收客戶端,使用一個線程池來維護;第二個部分爲客戶端的事件處理操做,也維護一個線程池來執行這些事件。

      這樣性能上去了,因爲selector的存在也不會出現資源浪費的事情netty就是這麼作的哦。

  2. 沒有鏈接的時候線程一直處於阻塞狀態形成資源的浪費(若是使用多線程接收處理併發,那麼沒鏈接的時候形成多個線程的資源浪費)。

    答:解決。NIO寫法主要有selector不斷輪詢,不會出現沒鏈接不做爲的狀況,並且多個鏈接的話也沒有問題(參考1的回答)。

3. 小結

  兩種寫法都有Reactor模式的影子,可是IO寫法有明顯的缺點就是若是沒有鏈接會形成資源浪費的問題(採用多個接收鏈接的話更甚),而NIO中selector輪詢機制就很好的解決了無鏈接時無做爲的狀況,而且在性能方面能夠經過職責分類和線程池來獲得改善,因此,NIO,永遠滴神。

須要壓力,須要努力。

相關文章
相關標籤/搜索