Reactor模式是一種設計模式,它是基於事件驅動的,能夠併發的處理多個服務請求,當請求抵達後,依據多路複用策略,同步的派發這些請求至相關的請求處理程序。vue
在早先的論文An Object Behavioral Pattern for
Demultiplexing and Dispatching Handles for Synchronous Events中Reactor模式主要有五大角色組成,分別以下:java
Handle:操做系統提供的一種資源,用於表示一個個的事件,在網絡編程中能夠是一個鏈接事件,一個讀取事件,一個寫入事件,Handle是事件產生的發源地
Synchronous Event Demultiplexer:本質上是一個系統調用,用於等待事件的發生,調用方在調用它的時候會被阻塞,一直阻塞到同步事件分離器上有事件產生爲止
Initiation Dispatcher:定義了一些用於控制事件的調度方式的規範,提供對事件管理。它自己是整個事件處理器的核心所在,Initiation Dispatcher會經過Synchronous Event Demultiplexer來等待事件的發生。一旦事件發生,Initiation Dispatcher首先會分離出每個事件,而後調用事件處理器,最後調用相關的回調方法來處理這些事件
Event Handler:定義事件處理方法以供InitiationDispatcher回調使用
Concrete Event Handler:是事件處理器的實現。它自己實現了事件處理器所提供的各類回調方法,從而實現了特定於業務的邏輯。它本質上就是咱們所編寫的一個個的處理器實現。react
在Java的NIO中,對Reactor模式有無縫的支持,即便用Selector類封裝了操做系統提供的Synchronous Event Demultiplexer功能。Doug Lea(Java concurrent包的做者)在Scalable IO in Java中對此有很是詳細的描述。概況來講其主要流程以下:git
Doug Lea 在Scalable IO in Java中分別描述了單線程的Reactor,多線程模式的Reactor以及多Reactor線程模式。數據庫
單線程的Reactor,主要依賴Java NIO中的Channel,Buffer,Selector,SelectionKey。在單線程Reactor模式中,不只I/O操做在該Reactor線程上,連非I/O的業務操做也在該線程上進行處理了,這可能會大大延遲I/O請求的響應npm
在多線程Reactor中添加了一個工做線程池,將非I/O操做從Reactor線程中移出轉交給工做者線程池來執行。這樣可以提升Reactor線程的I/O響應,不至於由於一些耗時的業務邏輯而延遲對後面I/O請求的處理,可是全部的I/O操做依舊由一個Reactor來完成,包括I/O的accept()、read()、write()以及connect()操做編程
多Reactor線程模式將「接受客戶端的鏈接請求」和「與該客戶端的通訊」分在了兩個Reactor線程來完成。mainReactor完成接收客戶端鏈接請求的操做,它不負責與客戶端的通訊,而是將創建好的鏈接轉交給subReactor線程來完成與客戶端的通訊,這樣一來就不會由於read()數據量太大而致使後面的客戶端鏈接請求得不到即時處理的狀況。而且多Reactor線程模式在海量的客戶端併發請求的狀況下,還能夠經過實現subReactor線程池來將海量的鏈接分發給多個subReactor線程,在多核的操做系統中這能大大提高應用的負載和吞吐量設計模式
代碼示例:服務器
// NIO selector 多路複用reactor線程模型 public class NIOReactor { // 處理業務操做的線程池 private static ExecutorService workPool = Executors.newCachedThreadPool(); // 封裝了selector.select()等事件輪詢的代碼 abstract class ReactorThread extends Thread { Selector selector; LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(); volatile boolean running = false; private ReactorThread() throws IOException { selector = Selector.open(); } // Selector監聽到有事件後,調用這個方法 public abstract void handler(SelectableChannel channel) throws Exception; @Override public void run() { // 輪詢Selector事件 while (running) { try { // 執行隊列中的任務 Runnable task; while ((task = taskQueue.poll()) != null) { task.run(); } selector.select(1000); // 獲取查詢結果 Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 遍歷查詢結果 Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); while (keyIterator.hasNext()) { // 被封裝的查詢結果 SelectionKey selectionKey = keyIterator.next(); keyIterator.remove(); int readyOps = selectionKey.readyOps(); // 關注 Read 和 Accept兩個事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { try { SelectableChannel channel = (SelectableChannel) selectionKey.attachment(); channel.configureBlocking(false); handler(channel); // 若是關閉了,就取消這個KEY的訂閱 if (!channel.isOpen()) { selectionKey.cancel(); } } catch (Exception e) { // 若是有異常,就取消這個KEY的訂閱 selectionKey.cancel(); e.printStackTrace(); } } } } catch (Exception e) { e.printStackTrace(); } } } private SelectionKey register(SelectableChannel channel) throws Exception { // 爲何register要以任務提交的形式,讓reactor線程去處理? // 由於線程在執行channel註冊到selector的過程當中,會和調用selector.select()方法的線程爭用同一把鎖 // 而select()方法實在eventLoop中經過while循環調用的,爭搶的可能性很高, // 爲了讓register能更快的執行,就放到同一個線程來處理 FutureTask<SelectionKey> futureTask = new FutureTask<>(() -> channel.register(selector, 0, channel)); taskQueue.add(futureTask); return futureTask.get(); } private void doStart() { if (!running) { running = true; start(); } } } private ServerSocketChannel serverSocketChannel; // 一、建立多個線程 - accept處理reactor線程 (accept線程) private ReactorThread[] mainReactorThreads = new ReactorThread[1]; // 二、建立多個線程 - io處理reactor線程 (I/O線程) private ReactorThread[] subReactorThreads = new ReactorThread[8]; // 初始化線程組 private void newGroup() throws IOException { // 建立mainReactor線程, 只負責處理serverSocketChannel for (int i = 0; i < mainReactorThreads.length; i++) { mainReactorThreads[i] = new ReactorThread() { AtomicInteger incr = new AtomicInteger(0); @Override public void handler(SelectableChannel channel) throws Exception { // 只作請求分發,不作具體的數據讀取 ServerSocketChannel ch = (ServerSocketChannel) channel; SocketChannel socketChannel = ch.accept(); socketChannel.configureBlocking(false); // 收到鏈接創建的通知以後,分發給I/O線程繼續去讀取數據 int index = incr.getAndIncrement() % subReactorThreads.length; ReactorThread workEventLoop = subReactorThreads[index]; workEventLoop.doStart(); SelectionKey selectionKey = workEventLoop.register(socketChannel); selectionKey.interestOps(SelectionKey.OP_READ); System.out.println( Thread.currentThread().getName() + "收到新鏈接 : " + socketChannel.getRemoteAddress()); } }; } // 建立IO線程,負責處理客戶端鏈接之後socketChannel的IO讀寫 for (int i = 0; i < subReactorThreads.length; i++) { subReactorThreads[i] = new ReactorThread() { @Override public void handler(SelectableChannel channel) throws Exception { // work線程只負責處理IO處理,不處理accept事件 SocketChannel ch = (SocketChannel) channel; ByteBuffer requestBuffer = ByteBuffer.allocate(1024); while (ch.isOpen() && ch.read(requestBuffer) != -1) { // 長鏈接狀況下,須要手動判斷數據有沒有讀取結束 (此處作一個簡單的判斷: 超過0字節就認爲請求結束了) if (requestBuffer.position() > 0) break; } if (requestBuffer.position() == 0) return; // 若是沒數據了, 則不繼續後面的處理 requestBuffer.flip(); byte[] content = new byte[requestBuffer.limit()]; requestBuffer.get(content); System.out.println(new String(content)); System.out.println( Thread.currentThread().getName() + "收到數據,來自:" + ch.getRemoteAddress()); // TODO 業務操做 數據庫、接口... workPool.submit(() -> {}); // 響應結果 200 String response = "HTTP/1.1 200 OK\r\n" + "Content-Length: 11\r\n\r\n" + "Hello World"; ByteBuffer buffer = ByteBuffer.wrap(response.getBytes()); while (buffer.hasRemaining()) { ch.write(buffer); } } }; } } // 始化channel,而且綁定一個eventLoop線程 private void initAndRegister() throws Exception { // 一、 建立ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); // 二、 將serverSocketChannel註冊到selector int index = new Random().nextInt(mainReactorThreads.length); mainReactorThreads[index].doStart(); SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel); selectionKey.interestOps(SelectionKey.OP_ACCEPT); } // 綁定端口 private void bind() throws IOException { // 一、 正式綁定端口,對外服務 serverSocketChannel.bind(new InetSocketAddress(8080)); System.out.println("啓動完成,端口8080"); } public static void main(String[] args) throws Exception { NIOReactor nioReactor = new NIOReactor(); // 一、 建立main和sub兩組線程 nioReactor.newGroup(); // 二、 建立serverSocketChannel,註冊到mainReactor線程上的selector上 nioReactor.initAndRegister(); // 三、 爲serverSocketChannel綁定端口 nioReactor.bind(); } }