【NIO系列】——之Reactor模型


在開篇以前,咱們對JavaNIO 的使用方式不作過多介紹,這種API的介紹方式網上太多了,不必詳細介紹,咱們假設NIO的使用方式,你可以熟練運用。這是NIO系列第三篇:java

【NIO系列】——之TCP探祕react

【NIO系列】——之IO模型程序員


經過以前的Unix的IO模型介紹,想必也瞭解到了5種IO模型。java的NIO是屬於同步非阻塞IO,關於IO多路複用,java沒有相應的IO模型,但有相應的編程模式,Reactor 就是基於NIO中實現多路複用的一種模式。本文將從如下幾點闡述Reactor模式:編程

  1. reactor 是什麼設計模式

  2. 爲什麼要用,能解決什麼問題安全

  3. 如何用,更好的方式bash

  4. 其餘事件處理模式服務器


1、Reactor 是什麼

關於reactor 是什麼,咱們先從wiki上看下:網絡

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.多線程

從上述文字中咱們能夠看出如下關鍵點 :

  1. 事件驅動(event handling)

  2. 能夠處理一個或多個輸入源(one or more inputs)

  3. 經過Service Handler同步的將輸入事件(Event)採用多路複用分發給相應的Request Handler(多個)處理

自POSA2 中的關於Reactor Pattern 介紹中,咱們瞭解了Reactor 的處理方式:

  1. 同步的等待多個事件源到達(採用select()實現)

  2. 將事件多路分解以及分配相應的事件服務進行處理,這個分派採用server集中處理(dispatch)

  3. 分解的事件以及對應的事件服務應用從分派服務中分離出去(handler)


關於Reactor Pattern 的OMT 類圖設計:


2、爲什麼要用Reactor

常見的網絡服務中,若是每個客戶端都維持一個與登錄服務器的鏈接。那麼服務器將維護多個和客戶端的鏈接以出來和客戶端的contnect 、read、write ,特別是對於長連接的服務,有多少個c端,就須要在s端維護同等的IO鏈接。這對服務器來講是一個很大的開銷。

一、BIO

好比咱們採用BIO的方式來維護和客戶端的鏈接:

// 主線程維護鏈接
  public void run() {
      try {
          while (true) {
              Socket socket = serverSocket.accept();
              //提交線程池處理
              executorService.submit(new Handler(socket));
          }
      } catch (Exception e) {
          e.printStackTrace();
      }
  }
​
  // 處理讀寫服務
  class Handler implements Runnable {
      public void run() {
          try {
              //獲取Socket的輸入流,接收數據
              BufferedReader buf = new BufferedReader(new InputStreamReader(socket.getInputStream()));
              String readData = buf.readLine();
              while (readData != null) {
                  readData = buf.readLine();
                  System.out.println(readData);
              }
          } catch (Exception e) {
              e.printStackTrace();
          }
      }
  }複製代碼


很明顯,爲了不資源耗盡,咱們採用線程池的方式來處理讀寫服務。可是這麼作依然有很明顯的弊端:

  1. 同步阻塞IO,讀寫阻塞,線程等待時間過長

  2. 在制定線程策略的時候,只能根據CPU的數目來限定可用線程資源,不能根據鏈接併發數目來制定,也就是鏈接有限制。不然很難保證對客戶端請求的高效和公平。

  3. 多線程之間的上下文切換,形成線程使用效率並不高,而且不易擴展

  4. 狀態數據以及其餘須要保持一致的數據,須要採用併發同步控制


二、NIO

那麼能夠有其餘方式來更好的處理麼,咱們能夠採用NIO來處理,NIO中支持的基本機制:

  1. 非阻塞的IO讀寫

  2. 基於IO事件進行分發任務,同時支持對多個fd的監聽


咱們看下NIO 中實現相關方式:

public NIOServer(int port) throws Exception {
      selector = Selector.open();
      serverSocket = ServerSocketChannel.open();
      serverSocket.socket().bind(new InetSocketAddress(port));
      serverSocket.configureBlocking(false);
      serverSocket.register(selector, SelectionKey.OP_ACCEPT);
  }
​
  @Override
  public void run() {
      while (!Thread.interrupted()) {
          try {
              //阻塞等待事件
              selector.select();
              // 事件列表
              Set selected = selector.selectedKeys();
              Iterator it = selected.iterator();
              while (it.hasNext()) {
                  it.remove();
                  //分發事件
                  dispatch((SelectionKey) (it.next()));
              }
          } catch (Exception e) {
​
          }
      }
  }
​
  private void dispatch(SelectionKey key) throws Exception {
      if (key.isAcceptable()) {
          register(key);//新連接創建,註冊
      } else if (key.isReadable()) {
          read(key);//讀事件處理
      } else if (key.isWritable()) {
          wirete(key);//寫事件處理
      }
  }
​
  private void register(SelectionKey key) throws Exception {
      ServerSocketChannel server = (ServerSocketChannel) key
              .channel();
      // 得到和客戶端鏈接的通道
      SocketChannel channel = server.accept();
      channel.configureBlocking(false);
      //客戶端通道註冊到selector 上
      channel.register(this.selector, SelectionKey.OP_READ);
  }複製代碼


咱們能夠看到上述的NIO例子已經差很少擁有reactor的影子了

  1. 基於事件驅動-> selector(支持對多個socketChannel的監聽)

  2. 統一的事件分派中心-> dispatch

  3. 事件處理服務-> read & write


事實上NIO已經解決了上述BIO暴露的1&2問題了,服務器的併發客戶端有了量的提高,再也不受限於一個客戶端一個線程來處理,而是一個線程能夠維護多個客戶端(selector 支持對多個socketChannel 監聽)。

但這依然不是一個完善的Reactor Pattern ,首先Reactor 是一種設計模式,好的模式應該是支持更好的擴展性,顯然以上的並不支持,另外好的Reactor Pattern 必須有如下特色:

  1. 更少的資源利用,一般不須要一個客戶端一個線程

  2. 更少的開銷,更少的上下文切換以及locking

  3. 可以跟蹤服務器狀態

  4. 可以管理handler 對event的綁定

那麼好的Reactor Pattern應該是怎樣的?

3、Reactor

在應用Java NIO構建Reactor Pattern中,大神 Doug Lea(讓人無限景仰的java 大神)在「Scalable IO in Java」中給了很好的闡述。咱們採用大神介紹的3種Reactor 來分別介紹。

首先咱們基於Reactor Pattern 處理模式中,定義如下三種角色:

  • Reactor將I/O事件分派給對應的Handler

  • Acceptor處理客戶端新鏈接,並分派請求處處理器鏈中

  • Handlers執行非阻塞讀/寫 任務


一、單Reactor單線程模型

咱們看代碼的實現方式:

/**
    * 等待事件到來,分發事件處理
    */
  class Reactor implements Runnable {
​
      private Reactor() throws Exception {
​
          SelectionKey sk =
                  serverSocket.register(selector,
                          SelectionKey.OP_ACCEPT);
          // attach Acceptor 處理新鏈接
          sk.attach(new Acceptor());
      }
​
      public void run() {
          try {
              while (!Thread.interrupted()) {
                  selector.select();
                  Set selected = selector.selectedKeys();
                  Iterator it = selected.iterator();
                  while (it.hasNext()) {
                      it.remove();
                      //分發事件處理
                      dispatch((SelectionKey) (it.next()));
                  }
              }
          } catch (IOException ex) {
              //do something
          }
      }
​
      void dispatch(SelectionKey k) {
          // 如果鏈接事件獲取是acceptor
          // 如果IO讀寫事件獲取是handler
          Runnable runnable = (Runnable) (k.attachment());
          if (runnable != null) {
              runnable.run();
          }
      }
​
  }
  /**
    * 鏈接事件就緒,處理鏈接事件
    */
  class Acceptor implements Runnable {
      @Override
      public void run() {
          try {
              SocketChannel c = serverSocket.accept();
              if (c != null) {// 註冊讀寫
                  new Handler(c, selector);
              }
          } catch (Exception e) {
​
          }
      }
  }
  /**
    * 處理讀寫業務邏輯
    */
  class Handler implements Runnable {
      public static final int READING = 0, WRITING = 1;
      int state;
      final SocketChannel socket;
      final SelectionKey sk;
​
      public Handler(SocketChannel socket, Selector sl) throws Exception {
          this.state = READING;
          this.socket = socket;
          sk = socket.register(selector, SelectionKey.OP_READ);
          sk.attach(this);
          socket.configureBlocking(false);
      }
​
      @Override
      public void run() {
          if (state == READING) {
              read();
          } else if (state == WRITING) {
              write();
          }
      }
​
      private void read() {
          process();
          //下一步處理寫事件
          sk.interestOps(SelectionKey.OP_WRITE);
          this.state = WRITING;
      }
​
      private void write() {
          process();
          //下一步處理讀事件
          sk.interestOps(SelectionKey.OP_READ);
          this.state = READING;
      }
​
      /**
        * task 業務處理
        */
      public void process() {
          //do something
      }
  }複製代碼


這是最基本的單Reactor單線程模型。其中Reactor線程,負責多路分離套接字,有新鏈接到來觸發connect 事件以後,交由Acceptor進行處理,有IO讀寫事件以後交給hanlder 處理。

Acceptor主要任務就是構建handler ,在獲取到和client相關的SocketChannel以後 ,綁定到相應的hanlder上,對應的SocketChannel有讀寫事件以後,基於racotor 分發,hanlder就能夠處理了(全部的IO事件都綁定到selector上,有Reactor分發)。

該模型 適用於處理器鏈中業務處理組件能快速完成的場景。不過,這種單線程模型不能充分利用多核資源,因此實際使用的很少。


二、單Reactor多線程模型

相對於第一種單線程的模式來講,在處理業務邏輯,也就是獲取到IO的讀寫事件以後,交由線程池來處理,這樣能夠減少主reactor的性能開銷,從而更專一的作事件分發工做了,從而提高整個應用的吞吐。

咱們看下實現方式:

複製代碼
/**
    * 多線程處理讀寫業務邏輯
    */
  class MultiThreadHandler implements Runnable {
      public static final int READING = 0, WRITING = 1;
      int state;
      final SocketChannel socket;
      final SelectionKey sk;
​
      //多線程處理業務邏輯
      ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
​
​
      public MultiThreadHandler(SocketChannel socket, Selector sl) throws Exception {
          this.state = READING;
          this.socket = socket;
          sk = socket.register(selector, SelectionKey.OP_READ);
          sk.attach(this);
          socket.configureBlocking(false);
      }
​
      @Override
      public void run() {
          if (state == READING) {
              read();
          } else if (state == WRITING) {
              write();
          }
      }
​
      private void read() {
          //任務異步處理
          executorService.submit(() -> process());
​
          //下一步處理寫事件
          sk.interestOps(SelectionKey.OP_WRITE);
          this.state = WRITING;
      }
​
      private void write() {
          //任務異步處理
          executorService.submit(() -> process());
​
          //下一步處理讀事件
          sk.interestOps(SelectionKey.OP_READ);
          this.state = READING;
      }
​
      /**
        * task 業務處理
        */
      public void process() {
          //do IO ,task,queue something
      }
  }複製代碼


三、多Reactor多線程模型


第三種模型比起第二種模型,是將Reactor分紅兩部分,

  1. mainReactor負責監聽server socket,用來處理新鏈接的創建,將創建的socketChannel指定註冊給subReactor。

  2. subReactor維護本身的selector, 基於mainReactor 註冊的socketChannel多路分離IO讀寫事件,讀寫網 絡數據,對業務處理的功能,另其扔給worker線程池來完成。


咱們看下實現方式:

/**
    * 多work 鏈接事件Acceptor,處理鏈接事件
    */
  class MultiWorkThreadAcceptor implements Runnable {
​
      // cpu線程數相同多work線程
      int workCount =Runtime.getRuntime().availableProcessors();
      SubReactor[] workThreadHandlers = new SubReactor[workCount];
      volatile int nextHandler = 0;
​
      public MultiWorkThreadAcceptor() {
          this.init();
      }
​
      public void init() {
          nextHandler = 0;
          for (int i = 0; i < workThreadHandlers.length; i++) {
              try {
                  workThreadHandlers[i] = new SubReactor();
              } catch (Exception e) {
              }
​
          }
      }
​
      @Override
      public void run() {
          try {
              SocketChannel c = serverSocket.accept();
              if (c != null) {// 註冊讀寫
                  synchronized (c) {
                      // 順序獲取SubReactor,而後註冊channel 
                      SubReactor work = workThreadHandlers[nextHandler];
                      work.registerChannel(c);
                      nextHandler++;
                      if (nextHandler >= workThreadHandlers.length) {
                          nextHandler = 0;
                      }
                  }
              }
          } catch (Exception e) {
          }
      }
  }
  /**
    * 多work線程處理讀寫業務邏輯
    */
  class SubReactor implements Runnable {
      final Selector mySelector;
​
      //多線程處理業務邏輯
      int workCount =Runtime.getRuntime().availableProcessors();
      ExecutorService executorService = Executors.newFixedThreadPool(workCount);
​
​
      public SubReactor() throws Exception {
          // 每一個SubReactor 一個selector 
          this.mySelector = SelectorProvider.provider().openSelector();
      }
​
      /**
        * 註冊chanel
        *
        * @param sc
        * @throws Exception
        */
      public void registerChannel(SocketChannel sc) throws Exception {
          sc.register(mySelector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
      }
​
      @Override
      public void run() {
          while (true) {
              try {
              //每一個SubReactor 本身作事件分派處理讀寫事件
                  selector.select();
                  Set<SelectionKey> keys = selector.selectedKeys();
                  Iterator<SelectionKey> iterator = keys.iterator();
                  while (iterator.hasNext()) {
                      SelectionKey key = iterator.next();
                      iterator.remove();
                      if (key.isReadable()) {
                          read();
                      } else if (key.isWritable()) {
                          write();
                      }
                  }
​
              } catch (Exception e) {
​
              }
          }
      }
​
      private void read() {
          //任務異步處理
          executorService.submit(() -> process());
      }
​
      private void write() {
          //任務異步處理
          executorService.submit(() -> process());
      }
​
      /**
        * task 業務處理
        */
      public void process() {
          //do IO ,task,queue something
      }
  }
​複製代碼


第三種模型中,咱們能夠看到,mainReactor 主要是用來處理網絡IO 鏈接創建操做,一般一個線程就能夠處理,而subReactor主要作和創建起來的socket作數據交互和事件業務處理操做,它的個數上通常是和CPU個數等同,每一個subReactor一個縣城來處理。

此種模型中,每一個模塊的工做更加專注,耦合度更低,性能和穩定性也大量的提高,支持的可併發客戶端數量可達到上百萬級別。

關於此種模型的應用,目前有不少優秀的礦建已經在應用了,好比mina 和netty 等。上述中去掉線程池的第三種形式的變種,也 是Netty NIO的默認模式。下一節咱們將着重講解netty的架構模式。


4、事件處理模式

Douglas Schmidt的大做《POSA2》中有關於事件處理模式的介紹,其中有四種事件處理模式:

  1. Reactor  

  2. Proactor  

  3. Asynchronous Completion Token  

  4. Acceptor-Connector  

1.Proactor

本文介紹的Reactor就是其中一種,而Proactor的總體結構和reacotor的處理方式大同小異,不一樣的是Proactor採用的是異步非阻塞IO的方式實現,對數據的讀寫由異步處理,無需用戶線程來處理,服務程序更專一於業務事件的處理,而非IO阻塞。

2.Asynchronous Completion Token

簡單來講,ACT就是應對應用程序異步調用服務操做,並處理相應的服務完成事件。從token這個字面意思,咱們大概就能瞭解到,它是一種狀態的保持和傳遞。

好比,一般應用程序會有調用第三方服務的需求,通常是業務線程請求都到,須要第三方資源的時候,去同步的發起第三方請求,而爲了提高應用性能,須要異步的方式發起請求,但異步請求的話,等數據到達以後,此時的我方應用程序的語境以及上下文信息已經發生了變化,你沒辦法去處理。

ACT解決的就是這個問題,採用了一個token的方式記錄異步發送前的信息,發送給接受方,接受方回覆的時候再帶上這個token,此時就能恢復業務的調用場景。


上圖中咱們能夠看到在client processing 這個階段,客戶端是能夠繼續處理其餘業務邏輯的,不是阻塞狀態。service 返回期間會帶上token信息。  


3.Acceptor-Connector

Acceptor-Connector是於Reactor的結合,也能夠當作是一種變種,它看起來很像上面介紹的Reactor第三種實現方式,但又有本質的不一樣。

Acceptor-Connector模式是將網絡中對等服務的鏈接和初始化分開處理,使系統中的鏈接創建及服務一旦服務初始化後就分開解除耦合。鏈接器

主動地
創建到遠地接受器組件的鏈接,並初始化服務處理器來處理在鏈接上交換的數據。一樣地,接受器
被動地
等待來自遠地鏈接器的鏈接請求,在這樣的請求到達時創建鏈接,並初始化服務處理器來處理在鏈接上交換的數據。隨後已初始化的服務處理器執行應用特有的處理,並經過鏈接器和接受器組件創建的鏈接來進行通訊。

這麼處理的好處是:

  1. 通常而言,用於鏈接創建和服務初始化的策略變更的頻度要遠小於應用服務實現和通訊協議。

  2. 容易增長新類型的服務、新的服務實現和新的通訊協議,而又不影響現有的鏈接創建和服務初始化軟件。好比採用IPX/SPX通訊協議或者TCP協議。

  3. 鏈接角色和通訊角色的去耦合,鏈接角色只管發起鏈接 vs. 接受鏈接。通訊角色只管數據交互。

  4. 將程序員與低級網絡編程API(像socket或TLI)類型安全性的缺少屏蔽開來。業務開發關係底層通訊


引用:

www.kuqin.com/ace-2002-12…

www.dre.vanderbilt.edu/%7Eschmidt/…

gee.cs.oswego.edu/dl/cpjslide…


更多架構知識,歡迎關注個人公衆號,大碼候(cool_wier)

相關文章
相關標籤/搜索