手動搭建I/O網絡通訊框架3:NIO編程模型,升級改造聊天室

第一章:手動搭建I/O網絡通訊框架1:Socket和ServerSocket入門實戰,實現單聊html

第二章:手動搭建I/O網絡通訊框架2:BIO編程模型實現羣聊編程

第四章:手動搭建I/O網絡通訊框架4:AIO編程模型,聊天室終極改造數組

 

  在第二章中用BIO編程模型,簡單的實現了一個聊天室。可是其最大的問題在解釋BIO時就已經說了:ServerSocket接收請求時(accept()方法)、InputStream、OutputStream(輸入輸出流的讀和寫)都是阻塞的。還有一個問題就是線程池,線程多了,服務器性能耗不起。線程少了,在聊天室這種場景下,讓用戶等待鏈接確定不可取。今天要說到的NIO編程模型就很好的解決了這幾個問題。有兩個主要的替換地方:緩存

  1.用Channel代替Stream。2.使用Selector監控多條Channel,起到相似線程池的做用,可是它只需一條線程。服務器

  既然要用NIO編程模型,那就要說說它的三個主要核心:Selector、Channel、Buffer。它們的關係是:一個Selector管理多個Channel,一個Channel能夠往Buffer中寫入和讀取數據。Buffer名叫緩衝區,底層實際上是一個數組,會提供一些方法往數組寫入讀取數據。網絡

  Buffer:框架

不太瞭解Buffer的能夠看看這個:https://blog.csdn.net/czx2018/article/details/89502699異步

  經常使用API:socket

  allocate() - 初始化一塊緩衝區ide

  put() - 向緩衝區寫入數據

  get() - 向緩衝區讀數據

  filp() - 將緩衝區的讀寫模式轉換

  clear() - 這個並非把緩衝區裏的數據清除,而是利用後來寫入的數據來覆蓋原來寫入的數據,以達到相似清除了老的數據的效果

  compact() - 從讀數據切換到寫模式,數據不會被清空,會將全部未讀的數據copy到緩衝區頭部,後續寫數據不會覆蓋,而是在這些數據以後寫數據

  mark() - 對position作出標記,配合reset使用

  reset() - 將position置爲標記值

  簡單地說:Buffer實質上是個數組,有兩個關鍵的指針,一個position表明當前數據寫入到哪了、一個limit表明限制。初始化時設置了數組長度,這limit就是數組的長度。如:設置intBuffer.allocate(10),最大存儲10個int數據,寫入5五個數據後,須要讀取數據了。用filp()轉換讀寫模式後,limit=position,position=0。也就是說從0開始讀,只能讀到第五個。讀完後這個緩衝區就須要clear()了,實際上並無真的去清空數據,而是position和limit兩個指針又回到了初始化的位置,接着又能夠寫入數據了,反正數組下標相同從新寫入數據會覆蓋,就不必真的去清空了。

  Channel:

 

  Channel(通道)主要用於傳輸數據,而後從Buffer中寫入或讀取。它們兩個結合起來雖然和流有些類似,但主要有如下幾點區別:
  1.流是單向的,能夠發現Stream的輸入流和輸出流是獨立的,它們只能輸入或輸出。而通道既能夠讀也能夠寫。
  2.通道自己不能存放數據,只能藉助Buffer。
  3.Channel支持異步。
  Channel有以下三個經常使用的類:FileChannel、SocketChannel、ServerSocketChannel。從名字也能夠看出區別,第一個是對文件數據的讀寫,後面兩個則是針對Socket和ServerSocket,這裏咱們只是用後面兩個。更詳細的用法能夠看:https://www.cnblogs.com/snailclimb/p/9086335.html,下面的代碼中也會用到,會有詳細的註釋。

  Selector

  多個Channel能夠註冊到Selector,就能夠直接經過一個Selector管理多個通道。Channel在不一樣的時間或者不一樣的事件下有不一樣的狀態,Selector會經過輪詢來達到監視的效果,若是查到Channel的狀態正好是咱們註冊時聲明的所要監視的狀態,咱們就能夠查出這些通道,而後作相應的處理。這些狀態以下:
  1.客戶端的SocketChannel和服務器端創建鏈接,SocketChannel狀態就是Connect
  2.服務器端的ServerSocketChannel接收了客戶端的請求,ServerSocketChannel狀態就是Accept
  3.當SocketChannel有數據可讀,那麼它們的狀態就是Read
  4.當咱們須要向Channel中寫數據時,那麼它們的狀態就是Write
  具體的使用見下面代碼註釋或看http://www.javashuo.com/article/p-ommkuisy-eu.html

  NIO編程模型
  NIO編程模型工做流程:
  1.首先會建立一個Selector,用來監視管理各個不一樣的Channel,也就是不一樣的客戶端。至關於取代了原來BIO的線程池,可是它只需一個線程就能夠處理多個Channel,沒有了線程上下文切換帶來的消耗,很好的優化了性能。
  2.建立一個ServerSocketChannel監聽通訊端口,並註冊到Selector,讓Seletor監視這個通道的Accept狀態,也就是接收客戶端請求的狀態。
  3.此時客戶端ClientA請求服務器,那麼Selector就知道了有客戶端請求進來。這時候咱們能夠獲得客戶端的SocketChannel,併爲這個通道註冊Read狀態,也就是Selector會監聽ClientA發來的消息。
  4.一旦接收到ClientA的消息,就會用其餘客戶端的SocketChannel的Write狀態,向它們轉發ClientA的消息。

 

  上代碼以前,仍是先說說各個類的做用:

  相比較BIO的代碼,NIO的代碼還少了一個類,那就是服務器端的工做線程類。沒了線程池,天然也不須要一個單獨的線程去服務客戶端。客戶端仍是須要一個單獨的線程去等待用戶輸入,由於用戶隨時均可能輸入信息,這個無法預見,只能阻塞式的等待。

  ChatServer:服務器端的惟一的類,做用就是經過Selector監聽Read和Accept事件,並針對這些事件的類型,進行不一樣的處理,如鏈接、轉發。

  ChatClient:客戶端,經過Selector監聽Read和Connect事件。Read事件就是獲取服務器轉發的消息而後顯示出來;Connect事件就是和服務器創建鏈接,創建成功後就能夠發送消息。

  UserInputHandler:專門等待用戶輸入的線程,和BIO沒區別。

 

  ChatServer

public class ChatServer {
    //設置緩衝區的大小,這裏設置爲1024個字節
    private static final int BUFFER = 1024;

    //Channel都要配合緩衝區進行讀寫,因此這裏建立一個讀緩衝區和一個寫緩衝區
    //allocate()靜態方法就是設置緩存區大小的方法
    private ByteBuffer read_buffer = ByteBuffer.allocate(BUFFER);
    private ByteBuffer write_buffer = ByteBuffer.allocate(BUFFER);

    //爲了監聽端口更靈活,再不寫死了,用一個構造函數設置須要監聽的端口號
    private int port;

    public ChatServer(int port) {
        this.port = port;
    }

    private void start() {
        //建立ServerSocketChannel和Selector並打開
        try (ServerSocketChannel server = ServerSocketChannel.open(); Selector selector = Selector.open()) {
            //【重點,實現NIO編程模型的關鍵】configureBlocking設置ServerSocketChannel爲非阻塞式調用,Channel默認的是阻塞的調用方式
            server.configureBlocking(false);
            //綁定監聽端口,這裏不是給ServerSocketChannel綁定,而是給ServerSocket綁定,socket()就是獲取通道原生的ServerSocket或Socket
            server.socket().bind(new InetSocketAddress(port));

            //把server註冊到Selector並監聽Accept事件
            server.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("啓動服務器,監聽端口:" + port);


            while (true) {
                //select()會返回此時觸發了多少個Selector監聽的事件
                if(selector.select()>0) {
                    //獲取這些已經觸發的事件,selectedKeys()返回的是觸發事件的全部信息
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    //循環處理這些事件
                    for (SelectionKey key : selectionKeys) {
                        handles(key, selector);
                    }
                    //處理完後清空selectedKeys,避免重複處理
                    selectionKeys.clear();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //處理事件的方法
    private void handles(SelectionKey key, Selector selector) throws IOException {
        //當觸發了Accept事件,也就是有客戶端請求進來
        if (key.isAcceptable()) {
            //獲取ServerSocketChannel
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            //而後經過accept()方法接收客戶端的請求,這個方法會返回客戶端的SocketChannel,這一步和原生的ServerSocket相似
            SocketChannel client = server.accept();
            client.configureBlocking(false);

            //把客戶端的SocketChannel註冊到Selector,並監聽Read事件
            client.register(selector, SelectionKey.OP_READ);
            System.out.println("客戶端[" + client.socket().getPort() + "]上線啦!");
        }
        //當觸發了Read事件,也就是客戶端發來了消息
        if (key.isReadable()) {
            SocketChannel client = (SocketChannel) key.channel();
            //獲取消息
            String msg = receive(client);
            System.out.println("客戶端[" + client.socket().getPort() + "]:" + msg);
            //把消息轉發給其餘客戶端
            sendMessage(client, msg, selector);
            //判斷用戶是否退出
            if (msg.equals("quit")) {
                //解除該事件的監聽
                key.cancel();
                //更新Selector
                selector.wakeup();
                System.out.println("客戶端[" + client.socket().getPort() + "]下線了!");
            }
        }
    }

    //編碼方式設置爲utf-8,下面字符和字符串互轉時用獲得
    private Charset charset = Charset.forName("UTF-8");

    //接收消息的方法
    private String receive(SocketChannel client) throws IOException {
        //用緩衝區以前先清空一下,避免以前的信息殘留
        read_buffer.clear();
        //把通道里的信息讀取到緩衝區,用while循環一直讀取,直到讀完全部消息。由於沒有明確的相似\n這樣的結尾,因此要一直讀到沒有字節爲止
        while (client.read(read_buffer) > 0) ;
        //把消息讀取到緩衝區後,須要轉換buffer的讀寫狀態,不明白的看看前面的Buffer的講解
        read_buffer.flip();
        return String.valueOf(charset.decode(read_buffer));
    }

    //轉發消息的方法
    private void sendMessage(SocketChannel client, String msg, Selector selector) throws IOException {
        msg = "客戶端[" + client.socket().getPort() + "]:" + msg;
        //獲取全部客戶端,keys()與前面的selectedKeys不一樣,這個是獲取全部已經註冊的信息,而selectedKeys獲取的是觸發了的事件的信息
        for (SelectionKey key : selector.keys()) {
            //排除服務器和本客戶端而且保證key是有效的,isValid()會判斷Selector監聽是否正常、對應的通道是保持鏈接的狀態等
            if (!(key.channel() instanceof ServerSocketChannel) && !client.equals(key.channel()) && key.isValid()) {
                SocketChannel otherClient = (SocketChannel) key.channel();
                write_buffer.clear();
                write_buffer.put(charset.encode(msg));
                write_buffer.flip();
                //把消息寫入到緩衝區後,再把緩衝區的內容寫到客戶端對應的通道中
                while (write_buffer.hasRemaining()) {
                    otherClient.write(write_buffer);
                }
            }
        }
    }

    public static void main(String[] args) {
        new ChatServer(8888).start();
    }
}

  ChatClient

 

public class ChatClient {
    private static final int BUFFER = 1024;
    private ByteBuffer read_buffer = ByteBuffer.allocate(BUFFER);
    private ByteBuffer write_buffer = ByteBuffer.allocate(BUFFER);
    //聲明成全局變量是爲了方便下面一些工具方法的調用,就不用try with resource了
    private SocketChannel client;
    private Selector selector;

    private Charset charset = Charset.forName("UTF-8");

    private void start() {
        try  {
            client=SocketChannel.open();
            selector=Selector.open();
            client.configureBlocking(false);
            //註冊channel,並監聽SocketChannel的Connect事件
            client.register(selector, SelectionKey.OP_CONNECT);
            //請求服務器創建鏈接
            client.connect(new InetSocketAddress("127.0.0.1", 8888));
            //和服務器同樣,不停的獲取觸發事件,並作相應的處理
            while (true) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey key : selectionKeys) {
                    handle(key);
                }
                selectionKeys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }catch (ClosedSelectorException e){
            //當用戶輸入quit時,在send()方法中,selector會被關閉,而在上面的無限while循環中,可能會使用到已經關閉了的selector。
            //因此這裏捕捉一下異常,作正常退出處理就好了。不會對服務器形成影響
        }
    }

    private void handle(SelectionKey key) throws IOException {
        //當觸發connect事件,也就是服務器和客戶端創建鏈接
        if (key.isConnectable()) {
            SocketChannel client = (SocketChannel) key.channel();
            //finishConnect()返回true,說明和服務器已經創建鏈接。若是是false,說明還在鏈接中,還沒徹底鏈接完成
            if(client.finishConnect()){
                //新建一個新線程去等待用戶輸入
                new Thread(new UserInputHandler(this)).start();
            }
            //鏈接創建完成後,註冊read事件,開始監聽服務器轉發的消息
            client.register(selector,SelectionKey.OP_READ);
        }
        //當觸發read事件,也就是獲取到服務器的轉發消息
        if(key.isReadable()){
            SocketChannel client = (SocketChannel) key.channel();
            //獲取消息
            String msg = receive(client);
            System.out.println(msg);
            //判斷用戶是否退出
            if (msg.equals("quit")) {
                //解除該事件的監聽
                key.cancel();
                //更新Selector
                selector.wakeup();
            }
        }
    }
    //獲取消息
    private String receive(SocketChannel client) throws IOException{
        read_buffer.clear();
        while (client.read(read_buffer)>0);
        read_buffer.flip();
        return String.valueOf(charset.decode(read_buffer));
    }

    //發送消息
    public void send(String msg) throws IOException{
        if(!msg.isEmpty()){
            write_buffer.clear();
            write_buffer.put(charset.encode(msg));
            write_buffer.flip();
            while (write_buffer.hasRemaining()){
                client.write(write_buffer);
            }
            if(msg.equals("quit")){
                selector.close();
            }
        }
    }

    public static void main(String[] args) {
        new ChatClient().start();
    }
}

 

  UserInputHandler

 

public class UserInputHandler implements Runnable {
    ChatClient client;
    public UserInputHandler(ChatClient chatClient) {
        this.client=chatClient;
    }
    @Override
    public void run() {
        BufferedReader read=new BufferedReader(
                new InputStreamReader(System.in)
        );
        while (true){
            try {
                String input=read.readLine();
                client.send(input);
                if(input.equals("quit"))
                    break;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

 

 

  測試運行:以前用的是win10的終端運行的,之後直接用IDEA運行,方便些。不過一個類同時運行多個,以實現多個客戶端的場景,須要先作如下設置

 

 

 

  設置完後,就能夠同時運行兩個ChatClient了,上圖中得Unnamed就是第二個ChatClient,選中後點擊右邊運行按鈕就好了。效果以下:

相關文章
相關標籤/搜索