Reactor模式

什麼是Reactor模式

Reactor模式是一種設計模式,它是基於事件驅動的,能夠併發的處理多個服務請求,當請求抵達後,依據多路複用策略,同步的派發這些請求至相關的請求處理程序。vue

Reactor模式角色構成

在早先的論文An Object Behavioral Pattern for
Demultiplexing and Dispatching Handles for Synchronous Events
中Reactor模式主要有五大角色組成,分別以下:java

Handle:操做系統提供的一種資源,用於表示一個個的事件,在網絡編程中能夠是一個鏈接事件,一個讀取事件,一個寫入事件,Handle是事件產生的發源地
Synchronous Event Demultiplexer:本質上是一個系統調用,用於等待事件的發生,調用方在調用它的時候會被阻塞,一直阻塞到同步事件分離器上有事件產生爲止
Initiation Dispatcher:定義了一些用於控制事件的調度方式的規範,提供對事件管理。它自己是整個事件處理器的核心所在,Initiation Dispatcher會經過Synchronous Event Demultiplexer來等待事件的發生。一旦事件發生,Initiation Dispatcher首先會分離出每個事件,而後調用事件處理器,最後調用相關的回調方法來處理這些事件
Event Handler:定義事件處理方法以供InitiationDispatcher回調使用
Concrete Event Handler:是事件處理器的實現。它自己實現了事件處理器所提供的各類回調方法,從而實現了特定於業務的邏輯。它本質上就是咱們所編寫的一個個的處理器實現。react

img

Reactor模式實現流程

  1. 初始化 Initiation Dispatcher,而後將若干個Concrete Event Handler註冊到 Initiation Dispatcher中,應用會標識出該事件處理器但願Initiation Dispatcher在某些事件發生時向其發出通知
  2. Initiation Dispatcher 會要求每一個事件處理器向其傳遞內部的Handle,該Handle向操做系統標識了事件處理器
  3. 當全部的Concrete Event Handler都註冊完畢後,就會啓動 Initiation Dispatcher的事件循環,使用Synchronous Event Demultiplexer同步阻塞的等待事件的發生
  4. 當與某個事件源對應的Handle變爲ready狀態時,Synchronous Event Demultiplexer就會通知 Initiation Dispatcher
  5. Initiation Dispatcher會觸發事件處理器的回調方法響應這個事件

img

Java NIO對Reactor的實現

在Java的NIO中,對Reactor模式有無縫的支持,即便用Selector類封裝了操做系統提供的Synchronous Event Demultiplexer功能。Doug Lea(Java concurrent包的做者)在Scalable IO in Java中對此有很是詳細的描述。概況來講其主要流程以下:git

  1. 服務器端的Reactor線程對象會啓動事件循環,並使用Selector來實現IO的多路複用
  2. 註冊Acceptor事件處理器到Reactor中,Acceptor事件處理器所關注的事件是ACCEPT事件,這樣Reactor會監聽客戶端向服務器端發起的鏈接請求事件
  3. 客戶端向服務器端發起鏈接請求,Reactor監聽到了該ACCEPT事件的發生並將該ACCEPT事件派發給相應的Acceptor處理器來進行處理。Acceptor處理器經過accept()方法獲得與這個客戶端對應的鏈接(SocketChannel),而後將該鏈接所關注的READ/WRITE事件以及對應的READ/WRITE事件處理器註冊到Reactor中,這樣一來Reactor就會監聽該鏈接的READ/WRITE事件了。
  4. 當Reactor監聽到有讀或者寫事件發生時,將相關的事件派發給對應的處理器進行處理
  5. 每當處理完全部就緒的感興趣的I/O事件後,Reactor線程會再次執行select()阻塞等待新的事件就緒並將其分派給對應處理器進行處理

Doug Lea 在Scalable IO in Java中分別描述了單線程的Reactor,多線程模式的Reactor以及多Reactor線程模式。數據庫

單線程的Reactor,主要依賴Java NIO中的Channel,Buffer,Selector,SelectionKey。在單線程Reactor模式中,不只I/O操做在該Reactor線程上,連非I/O的業務操做也在該線程上進行處理了,這可能會大大延遲I/O請求的響應npm

img

在多線程Reactor中添加了一個工做線程池,將非I/O操做從Reactor線程中移出轉交給工做者線程池來執行。這樣可以提升Reactor線程的I/O響應,不至於由於一些耗時的業務邏輯而延遲對後面I/O請求的處理,可是全部的I/O操做依舊由一個Reactor來完成,包括I/O的accept()、read()、write()以及connect()操做編程

img

多Reactor線程模式將「接受客戶端的鏈接請求」和「與該客戶端的通訊」分在了兩個Reactor線程來完成。mainReactor完成接收客戶端鏈接請求的操做,它不負責與客戶端的通訊,而是將創建好的鏈接轉交給subReactor線程來完成與客戶端的通訊,這樣一來就不會由於read()數據量太大而致使後面的客戶端鏈接請求得不到即時處理的狀況。而且多Reactor線程模式在海量的客戶端併發請求的狀況下,還能夠經過實現subReactor線程池來將海量的鏈接分發給多個subReactor線程,在多核的操做系統中這能大大提高應用的負載和吞吐量設計模式

img

代碼示例:服務器

// NIO selector 多路複用reactor線程模型
public class NIOReactor {

  // 處理業務操做的線程池
  private static ExecutorService workPool = Executors.newCachedThreadPool();

  // 封裝了selector.select()等事件輪詢的代碼
  abstract class ReactorThread extends Thread {

    Selector selector;
    LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();

    volatile boolean running = false;

    private ReactorThread() throws IOException {
      selector = Selector.open();
    }

    // Selector監聽到有事件後,調用這個方法
    public abstract void handler(SelectableChannel channel) throws Exception;

    @Override
    public void run() {
      // 輪詢Selector事件
      while (running) {
        try {
          // 執行隊列中的任務
          Runnable task;
          while ((task = taskQueue.poll()) != null) {
            task.run();
          }
          selector.select(1000);
          // 獲取查詢結果
          Set<SelectionKey> selectionKeys = selector.selectedKeys();
          // 遍歷查詢結果
          Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
          while (keyIterator.hasNext()) {
            // 被封裝的查詢結果
            SelectionKey selectionKey = keyIterator.next();
            keyIterator.remove();
            int readyOps = selectionKey.readyOps();
            // 關注 Read 和 Accept兩個事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0
                || readyOps == 0) {
              try {
                SelectableChannel channel = (SelectableChannel) selectionKey.attachment();
                channel.configureBlocking(false);
                handler(channel);
                // 若是關閉了,就取消這個KEY的訂閱
                if (!channel.isOpen()) {
                  selectionKey.cancel();
                }

              } catch (Exception e) {
                // 若是有異常,就取消這個KEY的訂閱
                selectionKey.cancel();
                e.printStackTrace();
              }
            }
          }

        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }

    private SelectionKey register(SelectableChannel channel) throws Exception {
      // 爲何register要以任務提交的形式,讓reactor線程去處理?
      // 由於線程在執行channel註冊到selector的過程當中,會和調用selector.select()方法的線程爭用同一把鎖
      // 而select()方法實在eventLoop中經過while循環調用的,爭搶的可能性很高,
      // 爲了讓register能更快的執行,就放到同一個線程來處理
      FutureTask<SelectionKey> futureTask =
          new FutureTask<>(() -> channel.register(selector, 0, channel));
      taskQueue.add(futureTask);
      return futureTask.get();
    }

    private void doStart() {
      if (!running) {
        running = true;
        start();
      }
    }
  }

  private ServerSocketChannel serverSocketChannel;

  // 一、建立多個線程 - accept處理reactor線程 (accept線程)
  private ReactorThread[] mainReactorThreads = new ReactorThread[1];

  // 二、建立多個線程 - io處理reactor線程  (I/O線程)
  private ReactorThread[] subReactorThreads = new ReactorThread[8];

  // 初始化線程組
  private void newGroup() throws IOException {
    // 建立mainReactor線程, 只負責處理serverSocketChannel
    for (int i = 0; i < mainReactorThreads.length; i++) {
      mainReactorThreads[i] =
          new ReactorThread() {
            AtomicInteger incr = new AtomicInteger(0);

            @Override
            public void handler(SelectableChannel channel) throws Exception {
              // 只作請求分發,不作具體的數據讀取
              ServerSocketChannel ch = (ServerSocketChannel) channel;
              SocketChannel socketChannel = ch.accept();
              socketChannel.configureBlocking(false);
              // 收到鏈接創建的通知以後,分發給I/O線程繼續去讀取數據
              int index = incr.getAndIncrement() % subReactorThreads.length;
              ReactorThread workEventLoop = subReactorThreads[index];
              workEventLoop.doStart();
              SelectionKey selectionKey = workEventLoop.register(socketChannel);
              selectionKey.interestOps(SelectionKey.OP_READ);
              System.out.println(
                  Thread.currentThread().getName() + "收到新鏈接 : " + socketChannel.getRemoteAddress());
            }
          };
    }

    // 建立IO線程,負責處理客戶端鏈接之後socketChannel的IO讀寫
    for (int i = 0; i < subReactorThreads.length; i++) {
      subReactorThreads[i] =
          new ReactorThread() {

            @Override
            public void handler(SelectableChannel channel) throws Exception {
              // work線程只負責處理IO處理,不處理accept事件
              SocketChannel ch = (SocketChannel) channel;
              ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
              while (ch.isOpen() && ch.read(requestBuffer) != -1) {
                // 長鏈接狀況下,須要手動判斷數據有沒有讀取結束 (此處作一個簡單的判斷: 超過0字節就認爲請求結束了)
                if (requestBuffer.position() > 0) break;
              }
              if (requestBuffer.position() == 0) return; // 若是沒數據了, 則不繼續後面的處理
              requestBuffer.flip();
              byte[] content = new byte[requestBuffer.limit()];
              requestBuffer.get(content);
              System.out.println(new String(content));
              System.out.println(
                  Thread.currentThread().getName() + "收到數據,來自:" + ch.getRemoteAddress());

              // TODO 業務操做 數據庫、接口...
              workPool.submit(() -> {});

              // 響應結果 200
              String response =
                  "HTTP/1.1 200 OK\r\n" + "Content-Length: 11\r\n\r\n" + "Hello World";
              ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
              while (buffer.hasRemaining()) {
                ch.write(buffer);
              }
            }
          };
    }
  }

  // 始化channel,而且綁定一個eventLoop線程
  private void initAndRegister() throws Exception {
    // 一、 建立ServerSocketChannel
    serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    // 二、 將serverSocketChannel註冊到selector
    int index = new Random().nextInt(mainReactorThreads.length);
    mainReactorThreads[index].doStart();
    SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);
    selectionKey.interestOps(SelectionKey.OP_ACCEPT);
  }

  // 綁定端口
  private void bind() throws IOException {
    //  一、 正式綁定端口,對外服務
    serverSocketChannel.bind(new InetSocketAddress(8080));
    System.out.println("啓動完成,端口8080");
  }

  public static void main(String[] args) throws Exception {
    NIOReactor nioReactor = new NIOReactor();
    // 一、 建立main和sub兩組線程
    nioReactor.newGroup();
    // 二、 建立serverSocketChannel,註冊到mainReactor線程上的selector上
    nioReactor.initAndRegister();
    // 三、 爲serverSocketChannel綁定端口
    nioReactor.bind();
  }
}

相關文章
相關標籤/搜索