目錄:html
Reactor(反應堆)和Proactor(前攝器)java
《I/O模型之三:兩種高性能 I/O 設計模式 Reactor 和 Proactor》react
《【轉】第8章 前攝器(Proactor):用於爲異步事件多路分離和分派處理器的對象行爲模式》數據庫
《Java NIO系列教程(八)JDK AIO編程》-- java AIO的proactor模式編程
《Java NIO系列教程(七) selector原理 Epoll版的Selector》--java NIO的Reactor模式後端
《Netty中的三種Reactor(反應堆)》 設計模式
Netty的I/O線程NioEventLoop因爲聚合了多路複用器Selector,能夠同時併發處理成百上千個客戶端SocketChannel。因爲讀寫操做都是非阻塞的,這就能夠充分提高I/O線程的運行效率,避免由頻繁的I/O阻塞致使的線程掛起。另外,因爲Netty採用了異步通訊模式,一個I/O線程能夠併發處理N個客戶端鏈接和讀寫操做,這從根本上解決了傳統同步阻塞I/O一鏈接一線程模型,架構的性能、彈性伸縮能力和可靠性都獲得了極大的提高。安全
常見的Reactor線程模型有三種,分別以下:網絡
Netty是典型的Reactor模型結構,關於Reactor的詳盡闡釋,可參考POSA2,這裏不作概念性的解釋。而應用Java NIO構建Reactor模式,Doug Lea(就是那位讓人無限景仰的大爺)在「Scalable IO in Java」中給了很好的闡述。這裏截取其PPT中經典的圖例說明 Reactor模式的典型實現:多線程
一、Reactor單線程模型
Reactor單線程模型,指的是全部的I/O操做都在同一個NIO線程上面完成,NIO線程的職責以下:
Reactor線程是個多面手,負責多路分離套接字,Accept新鏈接,並分派請求處處理器鏈中。該模型 適用於處理器鏈中業務處理組件能快速完成的場景。不過,這種單線程模型不能充分利用多核資源,因此實際使用的很少。
對於一些小容量應用場景,可使用單線程模型,可是對於高負載、大併發的應用卻不合適,主要緣由以下:
爲了解決這些問題,演進出了Reactor多線程模型,下面咱們一塊兒學習下Reactor多線程模型。
二、Reactor多線程模型
Reactor多線程模型與單線程模型最大區別就是有一組NIO線程處理I/O操做,它的特色以下:
在絕大多數場景下,Reactor多線程模型均可以知足性能需求;可是,在極特殊應用場景中,一個NIO線程負責監聽和處理全部的客戶端鏈接可能會存在性能問題。例如百萬客戶端併發鏈接,或者服務端須要對客戶端的握手信息進行安全認證,認證自己很是損耗性能。這類場景下,單獨一個Acceptor線程可能會存在性能不足問題,爲了解決性能問題,產生了第三種Reactor線程模型--主從Reactor多線程模型。
三、主從Reactor多線程模型
特色是:服務端用於接收客戶端鏈接的再也不是1個單獨的NIO線程,而是一個獨立的NIO線程池。Acceptor接收到客戶端TCP鏈接請求處理完成後(可能包含接入認證等),將新建立的SocketChannel註冊到I/O線程池(sub reactor線程池)的某個I/O線程上,由它負責SocketChannel的讀寫和編解碼工做。
Acceptor線程池只用於客戶端的登陸、握手和安全認證,一旦鏈路創建成功,就將鏈路註冊到後端subReactor線程池的I/O線程上,有I/O線程負責後續的I/O操做。
第三種模型比起第二種模型,是將Reactor分紅兩部分,mainReactor負責監聽server socket,accept新鏈接,並將創建的socket分派給subReactor。subReactor負責多路分離已鏈接的socket,讀寫網 絡數據,對業務處理功能,其扔給worker線程池完成。一般,subReactor個數上可與CPU個數等同。
Netty的線程模型併發固定不變,經過在啓動輔助類中建立不一樣的EventLoopGroup實例並進行適當的參數配置,就能夠支持上述三種Reactor線程模型。
Netty單線程模型服務端代碼示例以下:
/** * Netty單線程模型服務端代碼示例 * @param port */ public void bind(int port) { EventLoopGroup reactorGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(reactorGroup, reactorGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("http-codec", new HttpServerCodec()); ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); //後面代碼省略 } }); Channel ch = b.bind(port).sync().channel(); ch.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { reactorGroup.shutdownGracefully(); } }
Netty多線程模型代碼以下:
/** * Netty多線程模型代碼 * @param port */ public void bind2(int port) { EventLoopGroup acceptorGroup = new NioEventLoopGroup(1); NioEventLoopGroup ioGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(acceptorGroup, ioGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("http-codec", new HttpServerCodec()); ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); //後面代碼省略 } }); Channel ch = b.bind(port).sync().channel(); ch.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { acceptorGroup.shutdownGracefully(); ioGroup.shutdownGracefully(); } }
Netty主從線程模型代碼以下:
/** * Netty主從線程模型代碼 * @param port */ public void bind3(int port) { EventLoopGroup acceptorGroup = new NioEventLoopGroup(); NioEventLoopGroup ioGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(acceptorGroup, ioGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("http-codec", new HttpServerCodec()); ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); //後面代碼省略 } }); Channel ch = b.bind(port).sync().channel(); ch.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { acceptorGroup.shutdownGracefully(); ioGroup.shutdownGracefully(); } }
說完Reacotr模型的三種形式,那麼Netty是哪一種呢?其實,我還有一種Reactor模型的變種沒說,那就是去掉線程池的第三種形式的變種,這也 是Netty NIO的默認模式。在實現上,Netty中的Boss類充當mainReactor,NioWorker類充當subReactor(默認 NioWorker的個數是Runtime.getRuntime().availableProcessors())。在處理新來的請求 時,NioWorker讀完已收到的數據到ChannelBuffer中,以後觸發ChannelPipeline中的ChannelHandler流。
Netty是事件驅動的,能夠經過ChannelHandler鏈來控制執行流向。由於ChannelHandler鏈的執行過程是在 subReactor中同步的,因此若是業務處理handler耗時長,將嚴重影響可支持的併發數。這種模型適合於像Memcache這樣的應用場景,但 對須要操做數據庫或者和其餘模塊阻塞交互的系統就不是很合適。Netty的可擴展性很是好,而像ChannelHandler線程池化的須要,能夠經過在 ChannelPipeline中添加Netty內置的ChannelHandler實現類–ExecutionHandler實現,對使用者來講只是 添加一行代碼而已。對於ExecutionHandler須要的線程池模型,Netty提供了兩種可 選:1) MemoryAwareThreadPoolExecutor 可控制Executor中待處理任務的上限(超過上限時,後續進來的任務將被阻 塞),並可控制單個Channel待處理任務的上限;2) OrderedMemoryAwareThreadPoolExecutor 是 MemoryAwareThreadPoolExecutor 的子類,它還能夠保證同一Channel中處理的事件流的順序性,這主要是控制事件在異步處 理模式下可能出現的錯誤的事件順序,但它並不保證同一Channel中的事件都在一個線程中執行(一般也不必)。通常來 說,OrderedMemoryAwareThreadPoolExecutor 是個很不錯的選擇,固然,若是有須要,也能夠DIY一個。