Reactor模式

Reactor模式是什麼?

    反應器設計模式(Reactor pattern),是一種基於事件驅動的設計模式。Reactor框架是ACE各個框架中最基礎的一個框架,其餘框架都或多或少地用到了Reactor框架。 
      在事件驅動的應用中,將一個或多個客戶的服務請求分離(demultiplex)和調度(dispatch)給應用程序。在事件驅動的應用中,同步地、有序地處理同時接收的多個服務請求。 
      reactor模式與外觀模式有點像。不過,觀察者模式與單個事件源關聯,而反應器模式則與多個事件源關聯 。當一個主體發生改變時,全部依屬體都獲得通知。java

    初始事件分發器(Initialization Dispatcher):用於管理Event Handler,定義註冊、移除EventHandler等。Reactor模式的入口調用Synchronous Event Demultiplexer的select方法以阻塞等待事件返回,當阻塞等待返回時,事件發生時通知Initiation Dispatcher,而後Initiation Dispatcher調用event handler處理事件,即回調EventHandler中的handle_event()方法。react

    同步(多路)事件分離器(Synchronous Event Demultiplexer):無限循環等待新事件的到來,一旦發現有新的事件到來,就會通知初始事件分發器去調取特定的事件處理器。編程

    系統處理程序(Handles):操做系統中的句柄,是對資源在操做系統層面上的一種抽象,它能夠是打開的文件、一個鏈接(Socket)、Timer等。因爲Reactor模式通常使用在網絡編程中,於是這裏通常指Socket Handle,即一個網絡鏈接(Connection,在Java NIO中的Channel)。這個Channel註冊到Synchronous Event Demultiplexer中,以監聽Handle中發生的事件,對ServerSocketChannnel能夠是CONNECT事件,對SocketChannel能夠是READ、WRITE、CLOSE事件等。設計模式

    事件處理器(Event Handler): 定義事件處理方法,以供Initialization Dispatcher回調使用。網絡

    Concrete Event Handler :事件處理器的實際實現,並且綁定了一個Handle。由於在實際狀況中,咱們每每不止一種事件處理器,所以這裏將事件處理器接口和實現分開,與C++、Java這些高級語言中的多態相似。多線程

模塊交互

    1)咱們註冊Concrete Event Handler到Initiation Dispatcher中。 
    2)Initiation Dispatcher調用每一個Event Handler的get_handle接口獲取其綁定的Handle。 
    3)Initiation Dispatcher調用handle_events開始事件處理循環。在這裏,Initiation Dispatcher會將步驟2獲取的全部Handle都收集起來,使用Synchronous Event Demultiplexer來等待這些Handle的事件發生。 
   4)當某個(或某幾個)Handle的事件發生時,Synchronous Event Demultiplexer通知Initiation Dispatcher。 
    5)Initiation Dispatcher根據發生事件的Handle找出所對應的Handler。 
    6)Initiation Dispatcher調用Handler的handle_event方法處理事件。併發

單線程版的Reactor模式

    對於客戶端的因此請求,都又一個專門的線程去進行處理,這個線程無線循環去監聽是否又客戶的請求到來,一旦收到客戶端的請求,就將其分發給響應的處理器進行處理。框架

工做線程使用線程池實現

考慮到工做線程的複用,將工做線程設計爲線程池。socket

Reactor優勢

1)響應快,沒必要爲單個同步時間所阻塞,雖然Reactor自己依然是同步的; 
2)編程相對簡單,能夠最大程度的避免複雜的多線程及同步問題,而且避免了多線程/進程的切換開銷; 
3)可擴展性,能夠方便的經過增長Reactor實例個數來充分利用CPU資源; 
4)可複用性,reactor框架自己與具體事件處理邏輯無關,具備很高的複用性;ide


Reactor缺點

1)相比傳統的簡單模型,Reactor增長了必定的複雜性,於是有必定的門檻,而且不易於調試。 
2)Reactor模式須要底層的Synchronous Event Demultiplexer支持,好比Java中的Selector支持,操做系統的select系統調用支持,若是要本身實現Synchronous Event Demultiplexer可能不會有那麼高效。 
3) Reactor模式在IO讀寫數據時仍是在同一個線程中實現的,即便使用多個Reactor機制的狀況下,那些共享一個Reactor的Channel若是出現一個長時間的數據讀寫,會影響這個Reactor中其餘Channel的相應時間,好比在大文件傳輸時,IO操做就會影響其餘Client的相應時間,於是對這種操做,使用傳統的Thread-Per-Connection或許是一個更好的選擇,或則此時使用Proactor模式。

總結

    reactor模式是javaNIO非堵塞技術的實現原理,咱們不只要知道其原理流程,還要知道其代碼實現,固然這個reactor模式不只僅在NIO中實現,並且在redies等其餘地方也出現過,說明這個模式仍是比較實用的,尤爲是在多線程高併發的狀況下使用。

簡單實現源碼

import java.io.IOException;  
import java.net.InetAddress;  
import java.net.InetSocketAddress;  
import java.nio.channels.SelectionKey;  
import java.nio.channels.Selector;  
import java.nio.channels.ServerSocketChannel;  
import java.util.Iterator;  
import java.util.Set;  

/** 
 * 反應器模式 
 * 用於解決多用戶訪問併發問題 
 *  
 * 舉個例子:餐廳服務問題 
 *  
 * 傳統線程池作法:來一個客人(請求)去一個服務員(線程) 
 * 反應器模式作法:當客人點菜的時候,服務員就能夠去招呼其餘客人了,等客人點好了菜,直接招呼一聲「服務員」 
 *  
 */  
public class Reactor implements Runnable{  
    public final Selector selector;  
    public final ServerSocketChannel serverSocketChannel;  

    public Reactor(int port) throws IOException{  
        selector=Selector.open();  
        serverSocketChannel=ServerSocketChannel.open();  
        InetSocketAddress inetSocketAddress=new InetSocketAddress(InetAddress.getLocalHost(),port);  
        serverSocketChannel.socket().bind(inetSocketAddress);  
        serverSocketChannel.configureBlocking(false);  

        //向selector註冊該channel    
        SelectionKey selectionKey=serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);  

        //利用selectionKey的attache功能綁定Acceptor 若是有事情,觸發Acceptor   
        selectionKey.attach(new Acceptor(this));  
    }  

    @Override  
    public void run() {  
        try {  
            while(!Thread.interrupted()){  
                selector.select();  
                Set<SelectionKey> selectionKeys= selector.selectedKeys();  
                Iterator<SelectionKey> it=selectionKeys.iterator();  
                //Selector若是發現channel有OP_ACCEPT或READ事件發生,下列遍歷就會進行。  
                while(it.hasNext()){  
                    //來一個事件 第一次觸發一個accepter線程    
                    //之後觸發SocketReadHandler  
                    SelectionKey selectionKey=it.next();  
                    dispatch(selectionKey);  
                    selectionKeys.clear();  
                }  
            }  
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
    }  

    /** 
     * 運行Acceptor或SocketReadHandler 
     * @param key 
     */  
    void dispatch(SelectionKey key) {  
        Runnable r = (Runnable)(key.attachment());    
        if (r != null){    
            r.run();  
        }    
    }    

}
import java.io.IOException;  
import java.nio.channels.SocketChannel;  

public class Acceptor implements Runnable{  
    private Reactor reactor;  
    public Acceptor(Reactor reactor){  
        this.reactor=reactor;  
    }  
    @Override  
    public void run() {  
        try {  
            SocketChannel socketChannel=reactor.serverSocketChannel.accept();  
            if(socketChannel!=null)//調用Handler來處理channel  
                new SocketReadHandler(reactor.selector, socketChannel);  
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
    }  
}
import java.io.IOException;  
import java.nio.ByteBuffer;  
import java.nio.channels.SelectionKey;  
import java.nio.channels.Selector;  
import java.nio.channels.SocketChannel;  

public class SocketReadHandler implements Runnable{  
    private SocketChannel socketChannel;  
    public SocketReadHandler(Selector selector,SocketChannel socketChannel) throws IOException{  
        this.socketChannel=socketChannel;  
        socketChannel.configureBlocking(false);  

        SelectionKey selectionKey=socketChannel.register(selector, 0);  

        //將SelectionKey綁定爲本Handler 下一步有事件觸發時,將調用本類的run方法。    
        //參看dispatch(SelectionKey key)    
        selectionKey.attach(this);  

        //同時將SelectionKey標記爲可讀,以便讀取。    
        selectionKey.interestOps(SelectionKey.OP_READ);    
        selector.wakeup();  
    }  

    /** 
     * 處理讀取數據 
     */  
    @Override  
    public void run() {  
        ByteBuffer inputBuffer=ByteBuffer.allocate(1024);  
        inputBuffer.clear();  
        try {  
            socketChannel.read(inputBuffer);  
            //激活線程池 處理這些request  
            //requestHandle(new Request(socket,btt));   
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
    }  
}
相關文章
相關標籤/搜索