Java I/O模型從BIO到NIO和Reactor模式

Java I/O模型

同步 vs. 異步

同步I/O 每一個請求必須逐個地被處理,一個請求的處理會致使整個流程的暫時等待,這些事件沒法併發地執行。用戶線程發起I/O請求後須要等待或者輪詢內核I/O操做完成後才能繼續執行。java

異步I/O 多個請求能夠併發地執行,一個請求或者任務的執行不會致使整個流程的暫時等待。用戶線程發起I/O請求後仍然繼續執行,當內核I/O操做完成後會通知用戶線程,或者調用用戶線程註冊的回調函數。react

阻塞 vs. 非阻塞

阻塞 某個請求發出後,因爲該請求操做須要的條件不知足,請求操做一直阻塞,不會返回,直到條件知足。緩存

非阻塞 請求發出後,若該請求須要的條件不知足,則當即返回一個標誌信息告知條件不知足,而不會一直等待。通常須要經過循環判斷請求條件是否知足來獲取請求結果。安全

須要注意的是,阻塞並不等價於同步,而非阻塞並不是等價於異步。事實上這兩組概念描述的是I/O模型中的兩個不一樣維度。服務器

同步和異步着重點在於多個任務執行過程當中,後發起的任務是否必須等先發起的任務完成以後再進行。而無論先發起的任務請求是阻塞等待完成,仍是當即返回經過循環等待請求成功。網絡

而阻塞和非阻塞重點在於請求的方法是否當即返回(或者說是否在條件不知足時被阻塞)。多線程

Unix下五種I/O模型

Unix 下共有五種 I/O 模型:併發

  • 阻塞 I/O
  • 非阻塞 I/O
  • I/O 多路複用(select和poll)
  • 信號驅動 I/O(SIGIO)
  • 異步 I/O(Posix.1的aio_系列函數)

阻塞I/O

如上文所述,阻塞I/O下請求沒法當即完成則保持阻塞。阻塞I/O分爲以下兩個階段。dom

  • 階段1:等待數據就緒。網絡 I/O 的狀況就是等待遠端數據陸續抵達;磁盤I/O的狀況就是等待磁盤數據從磁盤上讀取到內核態內存中。
  • 階段2:數據拷貝。出於系統安全,用戶態的程序沒有權限直接讀取內核態內存,所以內核負責把內核態內存中的數據拷貝一份到用戶態內存中。

非阻塞I/O

非阻塞I/O請求包含以下三個階段異步

  • socket設置爲 NONBLOCK(非阻塞)就是告訴內核,當所請求的I/O操做沒法完成時,不要將線程睡眠,而是返回一個錯誤碼(EWOULDBLOCK) ,這樣請求就不會阻塞。
  • I/O操做函數將不斷的測試數據是否已經準備好,若是沒有準備好,繼續測試,直到數據準備好爲止。整個I/O 請求的過程當中,雖然用戶線程每次發起I/O請求後能夠當即返回,可是爲了等到數據,仍須要不斷地輪詢、重複請求,消耗了大量的 CPU 的資源。
  • 數據準備好了,從內核拷貝到用戶空間。

通常不多直接使用這種模型,而是在其餘I/O模型中使用非阻塞I/O 這一特性。這種方式對單個I/O 請求意義不大,但給I/O多路複用提供了條件。

I/O多路複用(異步阻塞 I/O)

I/O多路複用會用到select或者poll函數,這兩個函數也會使線程阻塞,可是和阻塞I/O所不一樣的是,這兩個函數能夠同時阻塞多個I/O操做。並且能夠同時對多個讀操做,多個寫操做的I/O函數進行檢測,直到有數據可讀或可寫時,才真正調用I/O操做函數。

從流程上來看,使用select函數進行I/O請求和同步阻塞模型沒有太大的區別,甚至還多了添加監視Channel,以及調用select函數的額外操做,增長了額外工做。可是,使用 select之後最大的優點是用戶能夠在一個線程內同時處理多個Channel的I/O請求。用戶能夠註冊多個Channel,而後不斷地調用select讀取被激活的Channel,便可達到在同一個線程內同時處理多個I/O請求的目的。而在同步阻塞模型中,必須經過多線程的方式才能達到這個目的。

調用select/poll該方法由一個用戶態線程負責輪詢多個Channel,直到某個階段1的數據就緒,再通知實際的用戶線程執行階段2的拷貝。 經過一個專職的用戶態線程執行非阻塞I/O輪詢,模擬實現了階段一的異步化。

信號驅動I/O(SIGIO)

首先咱們容許socket進行信號驅動I/O,並安裝一個信號處理函數,線程繼續運行並不阻塞。當數據準備好時,線程會收到一個SIGIO 信號,能夠在信號處理函數中調用I/O操做函數處理數據。

異步I/O

調用aio_read 函數,告訴內核描述字,緩衝區指針,緩衝區大小,文件偏移以及通知的方式,而後當即返回。當內核將數據拷貝到緩衝區後,再通知應用程序。因此異步I/O模式下,階段1和階段2所有由內核完成,完成不須要用戶線程的參與。

幾種I/O模型對比

除異步I/O外,其它四種模型的階段2基本相同,都是從內核態拷貝數據到用戶態。區別在於階段1不一樣。前四種都屬於同步I/O。

Java中四種I/O模型

上一章所述Unix中的五種I/O模型,除信號驅動I/O外,Java對其它四種I/O模型都有所支持。其中Java最先提供的blocking I/O便是阻塞I/O,而NIO便是非阻塞I/O,同時經過NIO實現的Reactor模式便是I/O複用模型的實現,經過AIO實現的Proactor模式便是異步I/O模型的實現。

從IO到NIO

面向流 vs. 面向緩衝

Java IO是面向流的,每次從流(InputStream/OutputStream)中讀一個或多個字節,直到讀取完全部字節,它們沒有被緩存在任何地方。另外,它不能先後移動流中的數據,如需先後移動處理,須要先將其緩存至一個緩衝區。

Java NIO面向緩衝,數據會被讀取到一個緩衝區,須要時能夠在緩衝區中先後移動處理,這增長了處理過程的靈活性。但與此同時在處理緩衝區前須要檢查該緩衝區中是否包含有所須要處理的數據,並須要確保更多數據讀入緩衝區時,不會覆蓋緩衝區內還沒有處理的數據。

阻塞 vs. 非阻塞

Java IO的各類流是阻塞的。當某個線程調用read()或write()方法時,該線程被阻塞,直到有數據被讀取到或者數據徹底寫入。阻塞期間該線程沒法處理任何其它事情。

Java NIO爲非阻塞模式。讀寫請求並不會阻塞當前線程,在數據可讀/寫前當前線程能夠繼續作其它事情,因此一個單獨的線程能夠管理多個輸入和輸出通道。

選擇器(Selector)

Java NIO的選擇器容許一個單獨的線程同時監視多個通道,能夠註冊多個通道到同一個選擇器上,而後使用一個單獨的線程來「選擇」已經就緒的通道。這種「選擇」機制爲一個單獨線程管理多個通道提供了可能。

零拷貝

Java NIO中提供的FileChannel擁有transferTo和transferFrom兩個方法,可直接把FileChannel中的數據拷貝到另一個Channel,或者直接把另一個Channel中的數據拷貝到FileChannel。該接口常被用於高效的網絡/文件者數據傳輸和大文件拷貝。在操做系統支持的狀況下,經過該方法傳輸數據並不須要將源數據從內核態拷貝到用戶態,再從用戶態拷貝到目標通道的內核態,同時也避免了兩次用戶態和內核態間的上下文切換,也即便用了「零拷貝」,因此其性能通常高於Java IO中提供的方法。

使用FileChannel的零拷貝將本地文件內容傳輸到網絡的示例代碼以下所示。

public class NIOClient {

  public static void main(String[] args) throws IOException, InterruptedException {
    SocketChannel socketChannel = SocketChannel.open();
    InetSocketAddress address = new InetSocketAddress(1234);
    socketChannel.connect(address);

    RandomAccessFile file = new RandomAccessFile(
        NIOClient.class.getClassLoader().getResource("test.txt").getFile(), "rw");
    FileChannel channel = file.getChannel();
    channel.transferTo(0, channel.size(), socketChannel);
    channel.close();
    file.close();
    socketChannel.close();
  }
}

阻塞I/O下的服務器實現

單線程逐個處理全部請求

使用阻塞I/O的服務器,通常使用循環,逐個接受鏈接請求並讀取數據,而後處理下一個請求。

public class IOServer {

  private static final Logger LOGGER = LoggerFactory.getLogger(IOServer.class);

  public static void main(String[] args) {
    ServerSocket serverSocket = null;
    try {
      serverSocket = new ServerSocket();
      serverSocket.bind(new InetSocketAddress(2345));
    } catch (IOException ex) {
      LOGGER.error("Listen failed", ex);
      return;
    }
    try{
      while(true) {
        Socket socket = serverSocket.accept();
        InputStream inputstream = socket.getInputStream();
        LOGGER.info("Received message {}", IOUtils.toString(new InputStreamReader(inputstream)));
      }
    } catch(IOException ex) {
      try {
        serverSocket.close();
      } catch (IOException e) {
      }
      LOGGER.error("Read message failed", ex);
    }
  }
}

爲每一個請求建立一個線程

上例使用單線程逐個處理全部請求,同一時間只能處理一個請求,等待I/O的過程浪費大量CPU資源,同時沒法充分使用多CPU的優點。下面是使用多線程對阻塞I/O模型的改進。一個鏈接創建成功後,建立一個單獨的線程處理其I/O操做。
阻塞I/O 多線程

public class IOServerMultiThread {

  private static final Logger LOGGER = LoggerFactory.getLogger(IOServerMultiThread.class);

  public static void main(String[] args) {
    ServerSocket serverSocket = null;
    try {
      serverSocket = new ServerSocket();
      serverSocket.bind(new InetSocketAddress(2345));
    } catch (IOException ex) {
      LOGGER.error("Listen failed", ex);
      return;
    }
    try{
      while(true) {
        Socket socket = serverSocket.accept();
        new Thread( () -> {
          try{
            InputStream inputstream = socket.getInputStream();
            LOGGER.info("Received message {}", IOUtils.toString(new InputStreamReader(inputstream)));
          } catch (IOException ex) {
            LOGGER.error("Read message failed", ex);
          }
        }).start();
      }
    } catch(IOException ex) {
      try {
        serverSocket.close();
      } catch (IOException e) {
      }
      LOGGER.error("Accept connection failed", ex);
    }
  }
}

使用線程池處理請求

爲了防止鏈接請求過多,致使服務器建立的線程數過多,形成過多線程上下文切換的開銷。能夠經過線程池來限制建立的線程數,以下所示。

public class IOServerThreadPool {

  private static final Logger LOGGER = LoggerFactory.getLogger(IOServerThreadPool.class);

  public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    ServerSocket serverSocket = null;
    try {
      serverSocket = new ServerSocket();
      serverSocket.bind(new InetSocketAddress(2345));
    } catch (IOException ex) {
      LOGGER.error("Listen failed", ex);
      return;
    }
    try{
      while(true) {
        Socket socket = serverSocket.accept();
        executorService.submit(() -> {
          try{
            InputStream inputstream = socket.getInputStream();
            LOGGER.info("Received message {}", IOUtils.toString(new InputStreamReader(inputstream)));
          } catch (IOException ex) {
            LOGGER.error("Read message failed", ex);
          }
        });
      }
    } catch(IOException ex) {
      try {
        serverSocket.close();
      } catch (IOException e) {
      }
      LOGGER.error("Accept connection failed", ex);
    }
  }
}

Reactor模式

精典Reactor模式

精典的Reactor模式示意圖以下所示。
精典Reactor

在Reactor模式中,包含以下角色

  • Reactor 將I/O事件發派給對應的Handler
  • Acceptor 處理客戶端鏈接請求
  • Handlers 執行非阻塞讀/寫

最簡單的Reactor模式實現代碼以下所示。

public class NIOServer {

  private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);

  public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.bind(new InetSocketAddress(1234));
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    while (selector.select() > 0) {
      Set<SelectionKey> keys = selector.selectedKeys();
      Iterator<SelectionKey> iterator = keys.iterator();
      while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        if (key.isAcceptable()) {
          ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
          SocketChannel socketChannel = acceptServerSocketChannel.accept();
          socketChannel.configureBlocking(false);
          LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
          socketChannel.register(selector, SelectionKey.OP_READ);
        } else if (key.isReadable()) {
          SocketChannel socketChannel = (SocketChannel) key.channel();
          ByteBuffer buffer = ByteBuffer.allocate(1024);
          int count = socketChannel.read(buffer);
          if (count <= 0) {
            socketChannel.close();
            key.cancel();
            LOGGER.info("Received invalide data, close the connection");
            continue;
          }
          LOGGER.info("Received message {}", new String(buffer.array()));
        }
        keys.remove(key);
      }
    }
  }
}

爲了方便閱讀,上示代碼將Reactor模式中的全部角色放在了一個類中。

從上示代碼中能夠看到,多個Channel能夠註冊到同一個Selector對象上,實現了一個線程同時監控多個請求狀態(Channel)。同時註冊時須要指定它所關注的事件,例如上示代碼中socketServerChannel對象只註冊了OP_ACCEPT事件,而socketChannel對象只註冊了OP_READ事件。

selector.select()是阻塞的,當有至少一個通道可用時該方法返回可用通道個數。同時該方法只捕獲Channel註冊時指定的所關注的事件。

多工做線程Reactor模式

經典Reactor模式中,儘管一個線程可同時監控多個請求(Channel),可是全部讀/寫請求以及對新鏈接請求的處理都在同一個線程中處理,無想充分利用多CPU的優點,同時讀/寫操做也會阻塞對新鏈接請求的處理。所以能夠引入多線程,並行處理多個讀/寫操做,以下圖所示。
多線程Reactor

多線程Reactor模式示例代碼以下所示。

public class NIOServer {

  private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);

  public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.bind(new InetSocketAddress(1234));
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    while (true) {
      if(selector.selectNow() < 0) {
        continue;
      }
      Set<SelectionKey> keys = selector.selectedKeys();
      Iterator<SelectionKey> iterator = keys.iterator();
      while(iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        if (key.isAcceptable()) {
          ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
          SocketChannel socketChannel = acceptServerSocketChannel.accept();
          socketChannel.configureBlocking(false);
          LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
          SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
          readKey.attach(new Processor());
        } else if (key.isReadable()) {
          Processor processor = (Processor) key.attachment();
          processor.process(key);
        }
      }
    }
  }
}

從上示代碼中能夠看到,註冊完SocketChannel的OP_READ事件後,能夠對相應的SelectionKey attach一個對象(本例中attach了一個Processor對象,該對象處理讀請求),而且在獲取到可讀事件後,能夠取出該對象。

注:attach對象及取出該對象是NIO提供的一種操做,但該操做並不是Reactor模式的必要操做,本文使用它,只是爲了方便演示NIO的接口。

具體的讀請求處理在以下所示的Processor類中。該類中設置了一個靜態的線程池處理全部請求。而process方法並不直接處理I/O請求,而是把該I/O操做提交給上述線程池去處理,這樣就充分利用了多線程的優點,同時將對新鏈接的處理和讀/寫操做的處理放在了不一樣的線程中,讀/寫操做再也不阻塞對新鏈接請求的處理。

public class Processor {
  private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class);
  private static final ExecutorService service = Executors.newFixedThreadPool(16);

  public void process(SelectionKey selectionKey) {
    service.submit(() -> {
      ByteBuffer buffer = ByteBuffer.allocate(1024);
      SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
      int count = socketChannel.read(buffer);
      if (count < 0) {
        socketChannel.close();
        selectionKey.cancel();
        LOGGER.info("{}\t Read ended", socketChannel);
        return null;
      } else if(count == 0) {
        return null;
      }
      LOGGER.info("{}\t Read message {}", socketChannel, new String(buffer.array()));
      return null;
    });
  }
}

多Reactor

Netty中使用的Reactor模式,引入了多Reactor,也即一個主Reactor負責監控全部的鏈接請求,多個子Reactor負責監控並處理讀/寫請求,減輕了主Reactor的壓力,下降了主Reactor壓力太大而形成的延遲。
而且每一個子Reactor分別屬於一個獨立的線程,每一個成功鏈接後的Channel的全部操做由同一個線程處理。這樣保證了同一請求的全部狀態和上下文在同一個線程中,避免了沒必要要的上下文切換,同時也方便了監控請求響應狀態。

多Reactor模式示意圖以下所示。
多Reactor

多Reactor示例代碼以下所示。

public class NIOServer {

  private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);

  public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.bind(new InetSocketAddress(1234));
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    int coreNum = Runtime.getRuntime().availableProcessors();
    Processor[] processors = new Processor[coreNum];
    for (int i = 0; i < processors.length; i++) {
      processors[i] = new Processor();
    }

    int index = 0;
    while (selector.select() > 0) {
      Set<SelectionKey> keys = selector.selectedKeys();
      for (SelectionKey key : keys) {
        keys.remove(key);
        if (key.isAcceptable()) {
          ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
          SocketChannel socketChannel = acceptServerSocketChannel.accept();
          socketChannel.configureBlocking(false);
          LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
          Processor processor = processors[(int) ((index++) / coreNum)];
          processor.addChannel(socketChannel);
        }
      }
    }
  }
}

如上代碼所示,本文設置的子Reactor個數是當前機器可用核數的兩倍(與Netty默認的子Reactor個數一致)。對於每一個成功鏈接的SocketChannel,經過round robin的方式交給不一樣的子Reactor。

子Reactor對SocketChannel的處理以下所示。

public class Processor {
  private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class);
  private static final ExecutorService service =
      Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());

  private Selector selector;

  public Processor() throws IOException {
    this.selector = SelectorProvider.provider().openSelector();
    start();
  }

  public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
    socketChannel.register(this.selector, SelectionKey.OP_READ);
  }

  public void start() {
    service.submit(() -> {
      while (true) {
        if (selector.selectNow() <= 0) {
          continue;
        }
        Set<SelectionKey> keys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = keys.iterator();
        while (iterator.hasNext()) {
          SelectionKey key = iterator.next();
          iterator.remove();
          if (key.isReadable()) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            SocketChannel socketChannel = (SocketChannel) key.channel();
            int count = socketChannel.read(buffer);
            if (count < 0) {
              socketChannel.close();
              key.cancel();
              LOGGER.info("{}\t Read ended", socketChannel);
              continue;
            } else if (count == 0) {
              LOGGER.info("{}\t Message size is 0", socketChannel);
              continue;
            } else {
              LOGGER.info("{}\t Read message {}", socketChannel, new String(buffer.array()));
            }
          }
        }
      }
    });
  }
}

在Processor中,一樣建立了一個靜態的線程池,且線程池的大小爲機器核數的兩倍。每一個Processor實例均包含一個Selector實例。同時每次獲取Processor實例時均提交一個任務到該線程池,而且該任務正常狀況下一直循環處理,不會中止。而提交給該Processor的SocketChannel經過在其Selector註冊事件,加入到相應的任務中。由此實現了每一個子Reactor包含一個Selector對象,並由一個獨立的線程處理。

相關文章
相關標籤/搜索