NIO之Reactor模式,Netty序章

Reactor模式

反應堆模式:「反應」器名字中」反應「的由來:數組

  • 「反應」即「倒置」,「控制逆轉」,具體事件處理程序不調用反應器,而向反應器註冊一個事件處理器,表示本身對某些事件感興趣,有時間來了,具體事件處理程序經過事件處理器對某個指定的事件發生作出反應。

單線程Reactor模式流程:

clipboard.png

  • ①服務器端的Reactor是一個線程對象,該線程會啓動事件循環,並使用Selector(選擇器)來實現IO的多路複用。channel註冊一個Acceptor事件處理器到Reactor中,Acceptor事件處理器所關注的事件是ACCEPT事件,這樣Reactor會監聽客戶端向服務器端發起的鏈接請求事件(ACCEPT事件)。
  • ②客戶端向服務器端發起一個鏈接請求,Reactor監聽到了該ACCEPT事件的發生並將該ACCEPT事件派發給相應的Acceptor處理器來進行處理。Acceptor處理器經過accept()方法獲得與這個客戶端對應的鏈接(SocketChannel),而後將該鏈接所關注的READ事件以及對應的READ事件處理器註冊到Reactor中,這樣一來Reactor就會監聽該鏈接的READ事件了。
  • ③當Reactor監聽到有讀或者寫事件發生時,將相關的事件派發給對應的處理器進行處理。好比,讀處理器會經過SocketChannel的read()方法讀取數據,此時read()操做能夠直接讀取到數據,而不會堵塞與等待可讀的數據到來。
  • ④每當處理完全部就緒的感興趣的I/O事件後,Reactor線程會再次執行select()阻塞等待新的事件就緒並將其分派給對應處理器進行處理。

注意,Reactor的單線程模式的單線程主要是針對於I/O操做而言,也就是全部的I/O的accept()、read()、write()以及connect()操做都在一個線程上完成的。服務器

基於單線程反應器模式手寫一個NIO通訊

先簡單介紹NIO中幾個重要對象:
Selector併發

  • Selector的英文含義是「選擇器」,也能夠稱爲爲「輪詢代理器」、「事件訂閱器」、「channel容器管理機」都行。
  • 事件訂閱和Channel管理: 應用程序將向Selector對象註冊須要它關注的Channel,以及具體的某一個Channel會對哪些IO事件感興趣。Selector中也會維護一個「已經註冊的Channel」的容器。

Channels
通道,被創建的一個應用程序和操做系統交互事件、傳遞內容的渠道(注意是鏈接到操做系統)。那麼既然是和操做系統進行內容的傳遞,那麼說明應用程序能夠經過通道讀取數據,也能夠經過通道向操做系統寫數據。socket

  • 全部被Selector(選擇器)註冊的通道,只能是繼承了SelectableChannel類的子類。
  • ServerSocketChannel:應用服務器程序的監聽通道。只有經過這個通道,應用程序才能向操做系統註冊支持「多路複用IO」的端口監聽。同時支持UDP協議和TCP協議。
  • ScoketChannel:TCP Socket套接字的監聽通道,一個Socket套接字對應了一個客戶端IP:端口 到
    服務器IP:端口的通訊鏈接。
  • DatagramChannel:UDP 數據報文的監聽通道。

通道中的數據老是要先讀到一個Buffer,或者老是要從一個Buffer中寫入。ide

服務端處理器:性能

/**
 * 類說明:nio通訊服務端處理器
 */
public class NioServerHandle implements Runnable{
    private Selector selector;
    private ServerSocketChannel serverChannel;
    private volatile boolean started;
    /**
     * 構造方法
     * @param port 指定要監聽的端口號
     */
    public NioServerHandle(int port) {

        try {
            selector = Selector.open();
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            serverChannel.socket().bind(new InetSocketAddress(port));
            serverChannel.register(selector,SelectionKey.OP_ACCEPT);
            started = true;
            System.out.println("服務器已啓動,端口號:"+port);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    public void stop(){
        started = false;
    }
    @Override
    public void run() {
        //循環遍歷selector
        while(started){
            try{
                //阻塞,只有當至少一個註冊的事件發生的時候纔會繼續.
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> it = keys.iterator();
                SelectionKey key = null;
                while(it.hasNext()){
                    key = it.next();
                    it.remove();
                    try{
                        handleInput(key);
                    }catch(Exception e){
                        if(key != null){
                            key.cancel();
                            if(key.channel() != null){
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch(Throwable t){
                t.printStackTrace();
            }
        }
        //selector關閉後會自動釋放裏面管理的資源
        if(selector != null)
            try{
                selector.close();
            }catch (Exception e) {
                e.printStackTrace();
            }
    }
    private void handleInput(SelectionKey key) throws IOException{
        if(key.isValid()){
            //處理新接入的請求消息
            if(key.isAcceptable()){
                //得到關心當前事件的channel
                ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
                //經過ServerSocketChannel的accept建立SocketChannel實例
                //完成該操做意味着完成TCP三次握手,TCP物理鏈路正式創建
                SocketChannel sc = ssc.accept();
                System.out.println("======socket channel 創建鏈接" );
                //設置爲非阻塞的
                sc.configureBlocking(false);
                //鏈接已經完成了,能夠開始關心讀事件了
                sc.register(selector,SelectionKey.OP_READ);
            }
            //讀消息
            if(key.isReadable()){
                System.out.println("======socket channel 數據準備完成," +
                        "能夠去讀==讀取=======");
                SocketChannel sc = (SocketChannel) key.channel();
                //建立ByteBuffer,並開闢一個1M的緩衝區
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //讀取請求碼流,返回讀取到的字節數
                int readBytes = sc.read(buffer);
                //讀取到字節,對字節進行編解碼
                if(readBytes>0){
                    //將緩衝區當前的limit設置爲position=0,
                    // 用於後續對緩衝區的讀取操做
                    buffer.flip();
                    //根據緩衝區可讀字節數建立字節數組
                    byte[] bytes = new byte[buffer.remaining()];
                    //將緩衝區可讀字節數組複製到新建的數組中
                    buffer.get(bytes);
                    String message = new String(bytes,"UTF-8");
                    System.out.println("服務器收到消息:" + message);
                    //處理數據
                    String result = response(message) ;
                    //發送應答消息
                    doWrite(sc,result);
                }
                //鏈路已經關閉,釋放資源
                else if(readBytes<0){
                    key.cancel();
                    sc.close();
                }
            }

        }
    }
    //發送應答消息
    private void doWrite(SocketChannel channel,String response)
            throws IOException {
        //將消息編碼爲字節數組
        byte[] bytes = response.getBytes();
        //根據數組容量建立ByteBuffer
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        //將字節數組複製到緩衝區
        writeBuffer.put(bytes);
        //flip操做
        writeBuffer.flip();
        //發送緩衝區的字節數組
        channel.write(writeBuffer);
    }
}
public class NioServer {

    private static NioServerHandle nioServerHandle;

    public static void start(){
        if(nioServerHandle !=null)
            nioServerHandle.stop();
        nioServerHandle = new NioServerHandle(DEFAULT_PORT);
        new Thread(nioServerHandle,"Server").start();
    }
    public static void main(String[] args){
        start();
    }

}

客戶端處理器大數據

public class NioClientHandle implements Runnable{
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;

    private volatile boolean started;

    public NioClientHandle(String ip, int port) {
        this.host = ip;
        this.port = port;
        try {
            //建立選擇器
            selector = Selector.open();
            //打開通道
            socketChannel = SocketChannel.open();
            //若是爲 true,則此通道將被置於阻塞模式;
            // 若是爲 false,則此通道將被置於非阻塞模式
            socketChannel.configureBlocking(false);
            started = true;
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    public void stop(){
        started = false;
    }
    @Override
    public void run() {
        try {
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        //循環遍歷selector
        while(started){
            try {
                //阻塞,只有當至少一個註冊的事件發生的時候纔會繼續
                selector.select();
                //獲取當前有哪些事件可使用
                Set<SelectionKey> keys = selector.selectedKeys();
                //轉換爲迭代器
                Iterator<SelectionKey> it = keys.iterator();
                SelectionKey key = null;
                while(it.hasNext()){
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (IOException e) {
                        e.printStackTrace();
                        if(key!=null){
                            key.cancel();
                            if(key.channel()!=null){
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        //selector關閉後會自動釋放裏面管理的資源
        if(selector!=null){
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    //具體的事件處理方法
    private void handleInput(SelectionKey key) throws IOException{
        if(key.isValid()){
            //得到關心當前事件的channel
            SocketChannel sc = (SocketChannel)key.channel();
            if(key.isConnectable()){//鏈接事件
                if(sc.finishConnect()){}
                else{System.exit(1);}
            }
            //有數據可讀事件
            if(key.isReadable()){
                //建立ByteBuffer,並開闢一個1M的緩衝區
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //讀取請求碼流,返回讀取到的字節數
                int readBytes = sc.read(buffer);
                //讀取到字節,對字節進行編解碼
                if(readBytes>0){
                    //將緩衝區當前的limit設置爲position,position=0,
                    // 用於後續對緩衝區的讀取操做
                    buffer.flip();
                    //根據緩衝區可讀字節數建立字節數組
                    byte[] bytes = new byte[buffer.remaining()];
                    //將緩衝區可讀字節數組複製到新建的數組中
                    buffer.get(bytes);
                    String result = new String(bytes,"UTF-8");
                    System.out.println("accept message:"+result);
                }else if(readBytes<0){
                    key.cancel();
                    sc.close();
                }
            }
        }
    }

    //發送消息
    private void doWrite(SocketChannel channel,String request)
            throws IOException {
        //將消息編碼爲字節數組
        byte[] bytes = request.getBytes();
        //根據數組容量建立ByteBuffer
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        //將字節數組複製到緩衝區
        writeBuffer.put(bytes);
        //flip操做
        writeBuffer.flip();
        //發送緩衝區的字節數組
        channel.write(writeBuffer);
    }

    private void doConnect() throws IOException {
        /*若是此通道處於非阻塞模式,
        則調用此方法將啓動非阻塞鏈接操做。
        若是當即創建鏈接,就像本地鏈接可能發生的那樣,則此方法返回true。
        不然,此方法返回false,
        稍後必須經過調用finishConnect方法完成鏈接操做。*/
        if(socketChannel.connect(new InetSocketAddress(host,port))){}
        else{
            //鏈接還未完成,因此註冊鏈接就緒事件,向selector表示關注這個事件
            socketChannel.register(selector,SelectionKey.OP_CONNECT);
        }
    }

    //寫數據對外暴露的API
    public void sendMsg(String msg) throws Exception{
        socketChannel.register(selector,SelectionKey.OP_READ);
        doWrite(socketChannel,msg);
    }
}
public class NioClient {

    private static NioClientHandle nioClientHandle;

    public static void start(){
        if(nioClientHandle !=null)
            nioClientHandle.stop();
        nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,DEFAULT_PORT);
        new Thread(nioClientHandle,"Client").start();
    }
    //向服務器發送消息
    public static boolean sendMsg(String msg) throws Exception{
        nioClientHandle.sendMsg(msg);
        return true;
    }
    public static void main(String[] args) throws Exception {
        start();
        System.out.println("請輸入請求信息:");
        Scanner scanner = new Scanner(System.in);
        while(NioClient.sendMsg(scanner.next()));

    }

}

服務端過程:this

  1. 啓動服務端,完成一些初始化工做,ServerSocketChannel綁定端口而且註冊接受鏈接事件.
  2. 循環裏selector.select()阻塞,只有當至少一個註冊的事件發生的時候纔會繼續,循環裏面處理髮生的註冊事件
  3. 註冊事件發生時交給處理器,若爲接受鏈接則accept取出socketChannel並完成鏈接,而後就是關注read讀取事件即註冊,有數據讀取了則處理器讀取請求數據並返回.

客戶端過程:編碼

  1. 啓動客戶端,完成一些初始化工做.
  2. 根據服務端ip及端口發起鏈接.
  3. 往服務端發送數據,並註冊read讀取事件
  4. 循環裏selector.select()阻塞,只有當至少一個註冊的事件發生的時候纔會繼續,循環裏面處理髮生的註冊事件.
  5. 註冊事件發生時交給處理器,若爲鏈接事件而且鏈接成功則跳過即不予處理等待讀取事件發送.

初始化工做如打開selector,channel,設置通道模式是否阻塞.spa


單線程Reactor,工做者線程池

但在單線程Reactor模式中,不只I/O操做在該Reactor線程上,連非I/O的業務操做也在該線程上進行處理了,這可能會大大延遲I/O請求的響應。因此咱們應該將非I/O的業務邏輯操做從Reactor線程上卸載,以此來加速Reactor線程對I/O請求的響應.

clipboard.png

添加了一個工做者線程池,並將非I/O操做從Reactor線程中移出轉交給工做者線程池來執行。這樣可以提升Reactor線程的I/O響應,不至於由於一些耗時的業務邏輯而延遲對後面I/O請求的處理。


改進的版本中,因此的I/O操做依舊由一個Reactor來完成,包括I/O的accept()、read()、write()以及connect()操做。
對於一些小容量應用場景,可使用單線程模型。可是對於高負載、大併發或大數據量的應用場景卻不合適,主要緣由以下:

  • ① 一個NIO線程同時處理成百上千的鏈路,性能上沒法支撐,即使NIO線程的CPU負荷達到100%,也沒法知足海量消息的讀取和發送;
  • ②當NIO線程負載太重以後,處理速度將變慢,這會致使大量客戶端鏈接超時,超時以後每每會進行重發,這更加劇了NIO線程的負載,最終會致使大量消息積壓和處理超時,成爲系統的性能瓶頸;

多Reactor線程模式

clipboard.png

Reactor線程池中的每一Reactor線程都會有本身的Selector、線程和分發的事件循環邏輯。
mainReactor能夠只有一個,但subReactor通常會有多個。mainReactor線程主要負責接收客戶端的鏈接請求,而後將接收到的SocketChannel傳遞給subReactor,由subReactor來完成和客戶端的通訊。


流程:

  • ①註冊一個Acceptor事件處理器到mainReactor中,Acceptor事件處理器所關注的事件是ACCEPT事件,這樣mainReactor會監聽客戶端向服務器端發起的鏈接請求事件(ACCEPT事件)。啓動mainReactor的事件循環。
  • ②客戶端向服務器端發起一個鏈接請求,mainReactor監聽到了該ACCEPT事件並將該ACCEPT事件派發給Acceptor處理器來進行處理。Acceptor處理器經過accept()方法獲得與這個客戶端對應的鏈接(SocketChannel),而後將這個SocketChannel傳遞給subReactor線程池。
  • ③subReactor線程池分配一個subReactor線程給這個SocketChannel,即,將SocketChannel關注的READ事件以及對應的READ事件處理器註冊到subReactor線程中。固然你也註冊WRITE事件以及WRITE事件處理器到subReactor線程中以完成I/O寫操做。Reactor線程池中的每一Reactor線程都會有本身的Selector、線程和分發的循環邏輯。
  • ④當有I/O事件就緒時,相關的subReactor就將事件派發給響應的處理器處理。注意,這裏subReactor線程只負責完成I/O的read()操做,在讀取到數據後將業務邏輯的處理放入到線程池中完成,若完成業務邏輯後須要返回數據給客戶端,則相關的I/O的write操做仍是會被提交回subReactor線程來完成。

注意,因此的I/O操做(包括,I/O的accept()、read()、write()以及connect()操做)依舊仍是在Reactor線程(mainReactor線程 或 subReactor線程)中完成的。Thread Pool(線程池)僅用來處理非I/O操做的邏輯。
多Reactor線程模式將「接受客戶端的鏈接請求」和「與該客戶端的通訊」分在了兩個Reactor線程來完成。mainReactor完成接收客戶端鏈接請求的操做,它不負責與客戶端的通訊,而是將創建好的鏈接轉交給subReactor線程來完成與客戶端的通訊,這樣一來就不會由於read()數據量太大而致使後面的客戶端鏈接請求得不到即時處理的狀況。而且多Reactor線程模式在海量的客戶端併發請求的狀況下,還能夠經過實現subReactor線程池來將海量的鏈接分發給多個subReactor線程,在多核的操做系統中這能大大提高應用的負載和吞吐量。

Netty服務端使用了「多Reactor線程模式」

相關文章
相關標籤/搜索