反應器設計模式(Reactor pattern),是一種基於事件驅動的設計模式。Reactor框架是ACE各個框架中最基礎的一個框架,其餘框架都或多或少地用到了Reactor框架。
在事件驅動的應用中,將一個或多個客戶的服務請求分離(demultiplex)和調度(dispatch)給應用程序。在事件驅動的應用中,同步地、有序地處理同時接收的多個服務請求。
reactor模式與外觀模式有點像。不過,觀察者模式與單個事件源關聯,而反應器模式則與多個事件源關聯 。當一個主體發生改變時,全部依屬體都獲得通知。java
初始事件分發器(Initialization Dispatcher):用於管理Event Handler,定義註冊、移除EventHandler等。Reactor模式的入口調用Synchronous Event Demultiplexer的select方法以阻塞等待事件返回,當阻塞等待返回時,事件發生時通知Initiation Dispatcher,而後Initiation Dispatcher調用event handler處理事件,即回調EventHandler中的handle_event()方法。react
同步(多路)事件分離器(Synchronous Event Demultiplexer):無限循環等待新事件的到來,一旦發現有新的事件到來,就會通知初始事件分發器去調取特定的事件處理器。編程
系統處理程序(Handles):操做系統中的句柄,是對資源在操做系統層面上的一種抽象,它能夠是打開的文件、一個鏈接(Socket)、Timer等。因爲Reactor模式通常使用在網絡編程中,於是這裏通常指Socket Handle,即一個網絡鏈接(Connection,在Java NIO中的Channel)。這個Channel註冊到Synchronous Event Demultiplexer中,以監聽Handle中發生的事件,對ServerSocketChannnel能夠是CONNECT事件,對SocketChannel能夠是READ、WRITE、CLOSE事件等。設計模式
事件處理器(Event Handler): 定義事件處理方法,以供Initialization Dispatcher回調使用。網絡
Concrete Event Handler :事件處理器的實際實現,並且綁定了一個Handle。由於在實際狀況中,咱們每每不止一種事件處理器,所以這裏將事件處理器接口和實現分開,與C++、Java這些高級語言中的多態相似。多線程
1)咱們註冊Concrete Event Handler到Initiation Dispatcher中。
2)Initiation Dispatcher調用每一個Event Handler的get_handle接口獲取其綁定的Handle。
3)Initiation Dispatcher調用handle_events開始事件處理循環。在這裏,Initiation Dispatcher會將步驟2獲取的全部Handle都收集起來,使用Synchronous Event Demultiplexer來等待這些Handle的事件發生。
4)當某個(或某幾個)Handle的事件發生時,Synchronous Event Demultiplexer通知Initiation Dispatcher。
5)Initiation Dispatcher根據發生事件的Handle找出所對應的Handler。
6)Initiation Dispatcher調用Handler的handle_event方法處理事件。併發
對於客戶端的因此請求,都又一個專門的線程去進行處理,這個線程無線循環去監聽是否又客戶的請求到來,一旦收到客戶端的請求,就將其分發給響應的處理器進行處理。框架
考慮到工做線程的複用,將工做線程設計爲線程池。socket
1)響應快,沒必要爲單個同步時間所阻塞,雖然Reactor自己依然是同步的;
2)編程相對簡單,能夠最大程度的避免複雜的多線程及同步問題,而且避免了多線程/進程的切換開銷;
3)可擴展性,能夠方便的經過增長Reactor實例個數來充分利用CPU資源;
4)可複用性,reactor框架自己與具體事件處理邏輯無關,具備很高的複用性;ide
1)相比傳統的簡單模型,Reactor增長了必定的複雜性,於是有必定的門檻,而且不易於調試。
2)Reactor模式須要底層的Synchronous Event Demultiplexer支持,好比Java中的Selector支持,操做系統的select系統調用支持,若是要本身實現Synchronous Event Demultiplexer可能不會有那麼高效。
3) Reactor模式在IO讀寫數據時仍是在同一個線程中實現的,即便使用多個Reactor機制的狀況下,那些共享一個Reactor的Channel若是出現一個長時間的數據讀寫,會影響這個Reactor中其餘Channel的相應時間,好比在大文件傳輸時,IO操做就會影響其餘Client的相應時間,於是對這種操做,使用傳統的Thread-Per-Connection或許是一個更好的選擇,或則此時使用Proactor模式。
reactor模式是javaNIO非堵塞技術的實現原理,咱們不只要知道其原理流程,還要知道其代碼實現,固然這個reactor模式不只僅在NIO中實現,並且在redies等其餘地方也出現過,說明這個模式仍是比較實用的,尤爲是在多線程高併發的狀況下使用。
import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.util.Iterator; import java.util.Set; /** * 反應器模式 * 用於解決多用戶訪問併發問題 * * 舉個例子:餐廳服務問題 * * 傳統線程池作法:來一個客人(請求)去一個服務員(線程) * 反應器模式作法:當客人點菜的時候,服務員就能夠去招呼其餘客人了,等客人點好了菜,直接招呼一聲「服務員」 * */ public class Reactor implements Runnable{ public final Selector selector; public final ServerSocketChannel serverSocketChannel; public Reactor(int port) throws IOException{ selector=Selector.open(); serverSocketChannel=ServerSocketChannel.open(); InetSocketAddress inetSocketAddress=new InetSocketAddress(InetAddress.getLocalHost(),port); serverSocketChannel.socket().bind(inetSocketAddress); serverSocketChannel.configureBlocking(false); //向selector註冊該channel SelectionKey selectionKey=serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //利用selectionKey的attache功能綁定Acceptor 若是有事情,觸發Acceptor selectionKey.attach(new Acceptor(this)); } @Override public void run() { try { while(!Thread.interrupted()){ selector.select(); Set<SelectionKey> selectionKeys= selector.selectedKeys(); Iterator<SelectionKey> it=selectionKeys.iterator(); //Selector若是發現channel有OP_ACCEPT或READ事件發生,下列遍歷就會進行。 while(it.hasNext()){ //來一個事件 第一次觸發一個accepter線程 //之後觸發SocketReadHandler SelectionKey selectionKey=it.next(); dispatch(selectionKey); selectionKeys.clear(); } } } catch (IOException e) { e.printStackTrace(); } } /** * 運行Acceptor或SocketReadHandler * @param key */ void dispatch(SelectionKey key) { Runnable r = (Runnable)(key.attachment()); if (r != null){ r.run(); } } }
import java.io.IOException; import java.nio.channels.SocketChannel; public class Acceptor implements Runnable{ private Reactor reactor; public Acceptor(Reactor reactor){ this.reactor=reactor; } @Override public void run() { try { SocketChannel socketChannel=reactor.serverSocketChannel.accept(); if(socketChannel!=null)//調用Handler來處理channel new SocketReadHandler(reactor.selector, socketChannel); } catch (IOException e) { e.printStackTrace(); } } }
import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; public class SocketReadHandler implements Runnable{ private SocketChannel socketChannel; public SocketReadHandler(Selector selector,SocketChannel socketChannel) throws IOException{ this.socketChannel=socketChannel; socketChannel.configureBlocking(false); SelectionKey selectionKey=socketChannel.register(selector, 0); //將SelectionKey綁定爲本Handler 下一步有事件觸發時,將調用本類的run方法。 //參看dispatch(SelectionKey key) selectionKey.attach(this); //同時將SelectionKey標記爲可讀,以便讀取。 selectionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); } /** * 處理讀取數據 */ @Override public void run() { ByteBuffer inputBuffer=ByteBuffer.allocate(1024); inputBuffer.clear(); try { socketChannel.read(inputBuffer); //激活線程池 處理這些request //requestHandle(new Request(socket,btt)); } catch (IOException e) { e.printStackTrace(); } } }