在前兩篇《快速理解Linux網絡I_O》、《java的I_O模型-BIO&NIO&AIO》兩邊中介紹了Linux下的I/O模型和java中的I/O模型,今天咱們介紹Reactor模型,並探究Netty的實現html
在互聯網時代,咱們使用的軟件基本上全是C/S架構,C/S架構的軟件一個明顯的好處就是:只要有網絡,你能夠在任何地方幹同一件事。C/S架構能夠抽象爲以下模型:java
那服務器如何能快速的處理用戶的請求呢?在我看來高性能服務器至少要知足以下幾個需求:react
而知足如上需求的一個基礎就是高性能的IO!編程
什麼是Reactor模式?數組
兩種I/O多路複用模式:Reactor和Proactor,兩個與事件分離器有關的模式是Reactor和Proactor。Reactor模式採用同步IO,而Proactor採用異步IO。瀏覽器
在Reactor中,事件分離器負責等待文件描述符或socket爲讀寫操做準備就緒,而後將就緒事件傳遞給對應的處理器,最後由處理器負責完成實際的讀寫工做。服務器
在Proactor模式中,處理器--或者兼任處理器的事件分離器,只負責發起異步讀寫操做。IO操做自己由操做系統來完成。傳遞給操做系統的參數須要包括用戶定義的數據緩衝區地址和數據大小,操做系統才能從中獲得寫出操做所需數據,或寫入從socket讀到的數據。事件分離器捕獲IO操做完成事件,而後將事件傳遞給對應處理器。網絡
說人話的方式理解:多線程
Doug Lea是這樣類比的架構
單線程版本Java NIO的支持:
Channels:與支持非阻塞讀取的文件,套接字等的鏈接
Buffers:相似於數組的對象,可由Channels直接讀取或寫入
Selectors:通知一組通道中哪個有IO事件
SelectionKeys:維護IO事件狀態和綁定
Reactor 代碼以下
public class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverSocketChannel; public Reactor(int port) throws IOException { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); key.attach(new Acceptor()); } @Override public void run() { while (!Thread.interrupted()) { try { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey selectionKey : selectionKeys) { dispatch(selectionKey); } selectionKeys.clear(); } catch (IOException e) { e.printStackTrace(); } } } private void dispatch(SelectionKey selectionKey) { Runnable run = (Runnable) selectionKey.attachment(); if (run != null) { run.run(); } } class Acceptor implements Runnable { @Override public void run() { try { SocketChannel channel = serverSocketChannel.accept(); if (channel != null) { new Handler(selector, channel); } } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException { new Thread( new Reactor(1234) ).start(); } }
public class Handler implements Runnable{ private final static int DEFAULT_SIZE = 1024; private final SocketChannel socketChannel; private final SelectionKey seletionKey; private static final int READING = 0; private static final int SENDING = 1; private int state = READING; ByteBuffer inputBuffer = ByteBuffer.allocate(DEFAULT_SIZE); ByteBuffer outputBuffer = ByteBuffer.allocate(DEFAULT_SIZE); public Handler(Selector selector, SocketChannel channel) throws IOException { this.socketChannel = channel; socketChannel.configureBlocking(false); this.seletionKey = socketChannel.register(selector, 0); seletionKey.attach(this); seletionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); } @Override public void run() { if (state == READING) { read(); } else if (state == SENDING) { write(); } } private void write() { try { socketChannel.write(outputBuffer); } catch (IOException e) { e.printStackTrace(); } while (outIsComplete()) { seletionKey.cancel(); } } private void read() { try { socketChannel.read(inputBuffer); if (inputIsComplete()) { process(); System.out.println("接收到來自客戶端(" + socketChannel.socket().getInetAddress().getHostAddress() + ")的消息:" + new String(inputBuffer.array())); seletionKey.attach(new Sender()); seletionKey.interestOps(SelectionKey.OP_WRITE); seletionKey.selector().wakeup(); } } catch (IOException e) { e.printStackTrace(); } } public boolean inputIsComplete() { return true; } public boolean outIsComplete() { return true; } public void process() { // do something... } class Sender implements Runnable { @Override public void run() { try { socketChannel.write(outputBuffer); } catch (IOException e) { e.printStackTrace(); } if (outIsComplete()) { seletionKey.cancel(); } } } }
這個模型和上面的NIO流程很相似,只是將消息相關處理獨立到了Handler中去了!雖說到NIO一個線程就能夠支持全部的IO處理。可是瓶頸也是顯而易見的!若是這個客戶端屢次進行請求,若是在Handler中的處理速度較慢,那麼後續的客戶端請求都會被積壓,致使響應變慢!因此引入了Reactor多線程模型!
Reactor多線程模型就是將Handler中的IO操做和非IO操做分開,操做IO的線程稱爲IO線程,非IO操做的線程稱爲工做線程!這樣的話,客戶端的請求會直接被丟到線程池中,客戶端發送請求就不會堵塞!
Reactor保持不變,僅須要改動Handler代碼:
public class Handler implements Runnable{ private final static int DEFAULT_SIZE = 1024; private final SocketChannel socketChannel; private final SelectionKey seletionKey; private static final int READING = 0; private static final int SENDING = 1; private int state = READING; ByteBuffer inputBuffer = ByteBuffer.allocate(DEFAULT_SIZE); ByteBuffer outputBuffer = ByteBuffer.allocate(DEFAULT_SIZE); private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime() .availableProcessors()); private static final int PROCESSING = 3; private Selector selector; public Handler(Selector selector, SocketChannel channel) throws IOException { this.selector = selector; this.socketChannel = channel; socketChannel.configureBlocking(false); this.seletionKey = socketChannel.register(selector, 0); seletionKey.attach(this); seletionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); } @Override public void run() { if (state == READING) { read(); } else if (state == SENDING) { write(); } } private void write() { try { socketChannel.write(outputBuffer); } catch (IOException e) { e.printStackTrace(); } while (outIsComplete()) { seletionKey.cancel(); } } private void read() { try { socketChannel.read(inputBuffer); if (inputIsComplete()) { process(); executorService.execute(new Processer()); } } catch (IOException e) { e.printStackTrace(); } } public boolean inputIsComplete() { return true; } public boolean outIsComplete() { return true; } public void process() { // do something... } class Sender implements Runnable { @Override public void run() { try { socketChannel.write(outputBuffer); } catch (IOException e) { e.printStackTrace(); } if (outIsComplete()) { seletionKey.cancel(); } } } synchronized void processAndHandOff() { process(); // or rebind attachment state = SENDING; seletionKey.interestOps(SelectionKey.OP_WRITE); selector.wakeup(); } class Processer implements Runnable { @Override public void run() { processAndHandOff(); } } }
主從Reactor多線程模型是將Reactor分紅兩部分,mainReactor負責監聽server socket,accept新鏈接,並將創建的socket分派給subReactor。subReactor負責多路分離已鏈接的socket,讀寫網絡數據,對業務處理功能,其扔給worker線程池完成。一般,subReactor個數上可與CPU個數等同:
Handler保持不變,僅須要改動Reactor代碼:
public class Reactor { // also create threads Selector[] selectors; AtomicInteger next = new AtomicInteger(0); final ServerSocketChannel serverSocketChannel; private static ExecutorService sunReactors = Executors.newFixedThreadPool(Runtime.getRuntime() .availableProcessors()); private static final int PROCESSING = 3; public Reactor(int port) throws IOException { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); selectors = new Selector[4]; for (int i = 0; i < selectors.length; i++) { Selector selector = selectors[i]; serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); key.attach(new Acceptor()); new Thread(()->{ while (!Thread.interrupted()) { try { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey selectionKey : selectionKeys) { dispatch(selectionKey); } selectionKeys.clear(); } catch (IOException e) { e.printStackTrace(); } } }).start(); } } private void dispatch(SelectionKey selectionKey) { Runnable run = (Runnable) selectionKey.attachment(); if (run != null) { run.run(); } } class Acceptor implements Runnable { @Override public void run() { try { SocketChannel channel = serverSocketChannel.accept(); if (channel != null) { sunReactors.execute(new Handler(selectors[next.getAndIncrement() % selectors.length], channel)); } } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException { new Reactor(1234); } }
以上是三種不一樣的設計思路,接下來看一下Netty這個一個高性能NIO框架,其是如何實現Reactor模型的!
public final class EchoServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(serverHandler); } }); ChannelFuture f = b.bind(PORT).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
咱們從Netty服務器代碼來看,與Reactor模型進行對應!