Doug Lea是這樣類比的架構
單線程版本Java NIO的支持:
Reactor 代碼以下
public class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverSocketChannel; public Reactor(int port) throws IOException { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); key.attach(new Acceptor()); } @Override public void run() { while (!Thread.interrupted()) { try { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey selectionKey : selectionKeys) { dispatch(selectionKey); } selectionKeys.clear(); } catch (IOException e) { e.printStackTrace(); } } } private void dispatch(SelectionKey selectionKey) { Runnable run = (Runnable) selectionKey.attachment(); if (run != null) { run.run(); } } class Acceptor implements Runnable { @Override public void run() { try { SocketChannel channel = serverSocketChannel.accept(); if (channel != null) { new Handler(selector, channel); } } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException { new Thread( new Reactor(1234) ).start(); } }
public class Handler implements Runnable{ private final static int DEFAULT_SIZE = 1024; private final SocketChannel socketChannel; private final SelectionKey seletionKey; private static final int READING = 0; private static final int SENDING = 1; private int state = READING; ByteBuffer inputBuffer = ByteBuffer.allocate(DEFAULT_SIZE); ByteBuffer outputBuffer = ByteBuffer.allocate(DEFAULT_SIZE); public Handler(Selector selector, SocketChannel channel) throws IOException { this.socketChannel = channel; socketChannel.configureBlocking(false); this.seletionKey = socketChannel.register(selector, 0); seletionKey.attach(this); seletionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); } @Override public void run() { if (state == READING) { read(); } else if (state == SENDING) { write(); } } private void write() { try { socketChannel.write(outputBuffer); } catch (IOException e) { e.printStackTrace(); } while (outIsComplete()) { seletionKey.cancel(); } } private void read() { try { socketChannel.read(inputBuffer); if (inputIsComplete()) { process(); System.out.println("接收到來自客戶端(" + socketChannel.socket().getInetAddress().getHostAddress() + ")的消息:" + new String(inputBuffer.array())); seletionKey.attach(new Sender()); seletionKey.interestOps(SelectionKey.OP_WRITE); seletionKey.selector().wakeup(); } } catch (IOException e) { e.printStackTrace(); } } public boolean inputIsComplete() { return true; } public boolean outIsComplete() { return true; } public void process() { // do something... } class Sender implements Runnable { @Override public void run() { try { socketChannel.write(outputBuffer); } catch (IOException e) { e.printStackTrace(); } if (outIsComplete()) { seletionKey.cancel(); } } } }
public class Handler implements Runnable{ private final static int DEFAULT_SIZE = 1024; private final SocketChannel socketChannel; private final SelectionKey seletionKey; private static final int READING = 0; private static final int SENDING = 1; private int state = READING; ByteBuffer inputBuffer = ByteBuffer.allocate(DEFAULT_SIZE); ByteBuffer outputBuffer = ByteBuffer.allocate(DEFAULT_SIZE); private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime() .availableProcessors()); private static final int PROCESSING = 3; private Selector selector; public Handler(Selector selector, SocketChannel channel) throws IOException { this.selector = selector; this.socketChannel = channel; socketChannel.configureBlocking(false); this.seletionKey = socketChannel.register(selector, 0); seletionKey.attach(this); seletionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); } @Override public void run() { if (state == READING) { read(); } else if (state == SENDING) { write(); } } private void write() { try { socketChannel.write(outputBuffer); } catch (IOException e) { e.printStackTrace(); } while (outIsComplete()) { seletionKey.cancel(); } } private void read() { try { socketChannel.read(inputBuffer); if (inputIsComplete()) { process(); executorService.execute(new Processer()); } } catch (IOException e) { e.printStackTrace(); } } public boolean inputIsComplete() { return true; } public boolean outIsComplete() { return true; } public void process() { // do something... } class Sender implements Runnable { @Override public void run() { try { socketChannel.write(outputBuffer); } catch (IOException e) { e.printStackTrace(); } if (outIsComplete()) { seletionKey.cancel(); } } } synchronized void processAndHandOff() { process(); // or rebind attachment state = SENDING; seletionKey.interestOps(SelectionKey.OP_WRITE); selector.wakeup(); } class Processer implements Runnable { @Override public void run() { processAndHandOff(); } } }
主從Reactor多線程模型是將Reactor分紅兩部分,mainReactor負責監聽server socket,accept新鏈接,並將創建的socket分派給subReactor。subReactor負責多路分離已鏈接的socket,讀寫網絡數據,對業務處理功能,其扔給worker線程池完成。一般,subReactor個數上可與CPU個數等同:
public class Reactor { // also create threads Selector[] selectors; AtomicInteger next = new AtomicInteger(0); final ServerSocketChannel serverSocketChannel; private static ExecutorService sunReactors = Executors.newFixedThreadPool(Runtime.getRuntime() .availableProcessors()); private static final int PROCESSING = 3; public Reactor(int port) throws IOException { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); selectors = new Selector[4]; for (int i = 0; i < selectors.length; i++) { Selector selector = selectors[i]; serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); key.attach(new Acceptor()); new Thread(()->{ while (!Thread.interrupted()) { try { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey selectionKey : selectionKeys) { dispatch(selectionKey); } selectionKeys.clear(); } catch (IOException e) { e.printStackTrace(); } } }).start(); } } private void dispatch(SelectionKey selectionKey) { Runnable run = (Runnable) selectionKey.attachment(); if (run != null) { run.run(); } } class Acceptor implements Runnable { @Override public void run() { try { SocketChannel channel = serverSocketChannel.accept(); if (channel != null) { sunReactors.execute(new Handler(selectors[next.getAndIncrement() % selectors.length], channel)); } } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException { new Reactor(1234); } }
public final class EchoServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(serverHandler); } }); ChannelFuture f = b.bind(PORT).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }