Netty中的三種Reactor(反應堆)

目錄:html

Reactor(反應堆)和Proactor(前攝器)java

I/O模型之三:兩種高性能 I/O 設計模式 Reactor 和 Proactorreact

【轉】第8章 前攝器(Proactor):用於爲異步事件多路分離和分派處理器的對象行爲模式數據庫

Java NIO系列教程(八)JDK AIO編程》-- java AIO的proactor模式編程

Java NIO系列教程(七) selector原理 Epoll版的Selector》--java NIO的Reactor模式後端

Netty中的三種Reactor(反應堆)》 設計模式

 

Netty的I/O線程NioEventLoop因爲聚合了多路複用器Selector,能夠同時併發處理成百上千個客戶端SocketChannel。因爲讀寫操做都是非阻塞的,這就能夠充分提高I/O線程的運行效率,避免由頻繁的I/O阻塞致使的線程掛起。另外,因爲Netty採用了異步通訊模式,一個I/O線程能夠併發處理N個客戶端鏈接和讀寫操做,這從根本上解決了傳統同步阻塞I/O一鏈接一線程模型,架構的性能、彈性伸縮能力和可靠性都獲得了極大的提高。安全

高效的Reactor線程模型

常見的Reactor線程模型有三種,分別以下:網絡

  1. Reactor單線程模型;
  2. Reactor多線程模型;
  3. 主從Reactor多線程模型;

Netty是典型的Reactor模型結構,關於Reactor的詳盡闡釋,可參考POSA2,這裏不作概念性的解釋。而應用Java NIO構建Reactor模式,Doug Lea(就是那位讓人無限景仰的大爺)在「Scalable IO in Java」中給了很好的闡述。這裏截取其PPT中經典的圖例說明 Reactor模式的典型實現:多線程

一、Reactor單線程模型

Reactor單線程模型,指的是全部的I/O操做都在同一個NIO線程上面完成,NIO線程的職責以下:

  1. 做爲NIO服務端,接收客戶端的TCP鏈接;
  2. 做爲NIO客戶端,向服務端發起TCP鏈接;
  3. 讀取通訊對端的請求或者應答消息;
  4. 向通訊對端發送消息請求或者應答消息;

Reactor線程是個多面手,負責多路分離套接字,Accept新鏈接,並分派請求處處理器鏈中。該模型 適用於處理器鏈中業務處理組件能快速完成的場景。不過,這種單線程模型不能充分利用多核資源,因此實際使用的很少。

對於一些小容量應用場景,可使用單線程模型,可是對於高負載、大併發的應用卻不合適,主要緣由以下:

  1. 一個NIO線程同時處理成百上千的鏈路,性能上沒法支撐。即使NIO線程的CPU負荷達到100%,也沒法知足海量消息的編碼、解碼、讀取和發送;
  2. 當NIO線程負載太重以後,處理速度將變慢,這會致使大量客戶端鏈接超時,超時以後每每進行重發,這更加劇了NIO線程的負載,最終致使大量消息積壓和處理超時,NIO線程會成爲系統的性能瓶頸;
  3. 可靠性問題。一旦NIO線程意外跑飛,或者進入死循環,會致使整個系統通信模塊不可用,不能接收和處理外部信息,形成節點故障。

爲了解決這些問題,演進出了Reactor多線程模型,下面咱們一塊兒學習下Reactor多線程模型。

二、Reactor多線程模型

Reactor多線程模型與單線程模型最大區別就是有一組NIO線程處理I/O操做,它的特色以下:

  1. 有一個專門的NIO線程--acceptor新城用於監聽服務端,接收客戶端的TCP鏈接請求;
  2. 網絡I/O操做--讀、寫等由一個NIO線程池負責,線程池能夠採用標準的JDK線程池實現,它包含一個任務隊列和N個可用的線程,由這些NIO線程負責消息的讀取、解碼、編碼和發送;
  3. 1個NIO線程能夠同時處理N條鏈路,可是1個鏈路只對應1個NIO線程,防止發生併發操做問題。

在絕大多數場景下,Reactor多線程模型均可以知足性能需求;可是,在極特殊應用場景中,一個NIO線程負責監聽和處理全部的客戶端鏈接可能會存在性能問題。例如百萬客戶端併發鏈接,或者服務端須要對客戶端的握手信息進行安全認證,認證自己很是損耗性能。這類場景下,單獨一個Acceptor線程可能會存在性能不足問題,爲了解決性能問題,產生了第三種Reactor線程模型--主從Reactor多線程模型。

三、主從Reactor多線程模型

特色是:服務端用於接收客戶端鏈接的再也不是1個單獨的NIO線程,而是一個獨立的NIO線程池。Acceptor接收到客戶端TCP鏈接請求處理完成後(可能包含接入認證等),將新建立的SocketChannel註冊到I/O線程池(sub reactor線程池)的某個I/O線程上,由它負責SocketChannel的讀寫和編解碼工做。

Acceptor線程池只用於客戶端的登陸、握手和安全認證,一旦鏈路創建成功,就將鏈路註冊到後端subReactor線程池的I/O線程上,有I/O線程負責後續的I/O操做。

第三種模型比起第二種模型,是將Reactor分紅兩部分,mainReactor負責監聽server socket,accept新鏈接,並將創建的socket分派給subReactor。subReactor負責多路分離已鏈接的socket,讀寫網 絡數據,對業務處理功能,其扔給worker線程池完成。一般,subReactor個數上可與CPU個數等同。

 

NioEventLoopGroup 與 Reactor 線程模型的對應

Netty的線程模型併發固定不變,經過在啓動輔助類中建立不一樣的EventLoopGroup實例並進行適當的參數配置,就能夠支持上述三種Reactor線程模型。

Netty單線程模型服務端代碼示例以下:

/**
     * Netty單線程模型服務端代碼示例
     * @param port
     */
    public void bind(int port) {
        EventLoopGroup reactorGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(reactorGroup, reactorGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast("http-codec", new HttpServerCodec());
                        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
                        ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                        //後面代碼省略
                    }
                });
        
            Channel ch = b.bind(port).sync().channel();
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            reactorGroup.shutdownGracefully();
        }
    }

Netty多線程模型代碼以下:

/**
     * Netty多線程模型代碼
     * @param port
     */
    public void bind2(int port) {
        EventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(acceptorGroup, ioGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast("http-codec", new HttpServerCodec());
                        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
                        ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                        //後面代碼省略
                    }
                });
        
            Channel ch = b.bind(port).sync().channel();
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            acceptorGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }

Netty主從線程模型代碼以下:

/**
     * Netty主從線程模型代碼
     * @param port
     */
    public void bind3(int port) {
        EventLoopGroup acceptorGroup = new NioEventLoopGroup();
        NioEventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(acceptorGroup, ioGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast("http-codec", new HttpServerCodec());
                        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
                        ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                        //後面代碼省略
                    }
                });
        
            Channel ch = b.bind(port).sync().channel();
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            acceptorGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }

 

說完Reacotr模型的三種形式,那麼Netty是哪一種呢?其實,我還有一種Reactor模型的變種沒說,那就是去掉線程池的第三種形式的變種,這也 是Netty NIO的默認模式。在實現上,Netty中的Boss類充當mainReactor,NioWorker類充當subReactor(默認 NioWorker的個數是Runtime.getRuntime().availableProcessors())。在處理新來的請求 時,NioWorker讀完已收到的數據到ChannelBuffer中,以後觸發ChannelPipeline中的ChannelHandler流。

Netty是事件驅動的,能夠經過ChannelHandler鏈來控制執行流向。由於ChannelHandler鏈的執行過程是在 subReactor中同步的,因此若是業務處理handler耗時長,將嚴重影響可支持的併發數。這種模型適合於像Memcache這樣的應用場景,但 對須要操做數據庫或者和其餘模塊阻塞交互的系統就不是很合適。Netty的可擴展性很是好,而像ChannelHandler線程池化的須要,能夠經過在 ChannelPipeline中添加Netty內置的ChannelHandler實現類–ExecutionHandler實現,對使用者來講只是 添加一行代碼而已。對於ExecutionHandler須要的線程池模型,Netty提供了兩種可 選:1) MemoryAwareThreadPoolExecutor 可控制Executor中待處理任務的上限(超過上限時,後續進來的任務將被阻 塞),並可控制單個Channel待處理任務的上限;2) OrderedMemoryAwareThreadPoolExecutor 是  MemoryAwareThreadPoolExecutor 的子類,它還能夠保證同一Channel中處理的事件流的順序性,這主要是控制事件在異步處 理模式下可能出現的錯誤的事件順序,但它並不保證同一Channel中的事件都在一個線程中執行(一般也不必)。通常來 說,OrderedMemoryAwareThreadPoolExecutor 是個很不錯的選擇,固然,若是有須要,也能夠DIY一個。

相關文章
相關標籤/搜索