Java NIO 詳解(二)

異步IO
異步 I/O 是一種沒有阻塞地讀寫數據的方法。一般,在代碼進行 read() 調用時,代碼會阻塞直至有可供讀取的數據。一樣, write()調用將會阻塞直至數據可以寫入,關於同步的IO請參考另外一篇文章Java IO。java

另外一方面,異步 I/O 調用不但不會阻塞,相反,您能夠註冊對特定 I/O 事件諸如數據可讀、新鏈接到來等等,而在發生這樣感興趣的事件時,系統將會告訴您。編程

異步 I/O 的一個優點在於,它容許您同時根據大量的輸入和輸出執行 I/O。同步程序經常要求助於輪詢,或者建立許許多多的線程以處理大量的鏈接。使用異步 I/O,您能夠監放任何數量的通道上的事件,不用輪詢,也不用額外的線程。安全

Selector
在個人JavaNIO詳解(一)中已經詳細介紹了Java NIO三個核心對象中的Buffer和Channel,如今咱們就重點介紹一下第三個核心對象Selector。Selector是一個對象,它能夠註冊到不少個Channel上,監聽各個Channel上發生的事件,而且可以根據事件狀況決定Channel讀寫。這樣,經過一個線程管理多個Channel,就能夠處理大量網絡鏈接了。服務器

採用Selector模式的的好處
有了Selector,咱們就能夠利用一個線程來處理全部的channels。線程之間的切換對操做系統來講代價是很高的,而且每一個線程也會佔用必定的系統資源。因此,對系統來講使用的線程越少越好。網絡

可是,須要記住,現代的操做系統和CPU在多任務方面表現的愈來愈好,因此多線程的開銷隨着時間的推移,變得愈來愈小了。實際上,若是一個CPU有多個內核,不使用多任務多是在浪費CPU能力。無論怎麼說,關於那種設計的討論應該放在另外一篇不一樣的文章中。在這裏,只要知道使用Selector可以處理多個通道就足夠了。多線程

下面這幅圖展現了一個線程處理3個 Channel的狀況:
異步

如何建立一個Selector

異步 I/O 中的核心對象名爲 Selector。Selector 就是您註冊對各類 I/O 事件興趣的地方,並且當那些事件發生時,就是這個對象告訴您所發生的事件。socket

Selector selector = Selector.open();

而後,就須要註冊Channel到Selector了。測試

如何註冊Channel到Selector

爲了能讓Channel和Selector配合使用,咱們須要把Channel註冊到Selector上。經過調用 channel.register()方法來實現註冊:this

channel.configureBlocking(false);
SelectionKey key =channel.register(selector,SelectionKey.OP_READ);

注意,註冊的Channel 必須設置成異步模式 才能夠,,不然異步IO就沒法工做,這就意味着咱們不能把一個FileChannel註冊到Selector,由於FileChannel沒有異步模式,可是網絡編程中的SocketChannel是能夠的。

須要注意register()方法的第二個參數,它是一個「interest set」,意思是註冊的Selector對Channel中的哪些時間感興趣,事件類型有四種:

  1. Connect
  2. Accept
  3. Read
  4. Write

通道觸發了一個事件意思是該事件已經 Ready(就緒)。因此,某個Channel成功鏈接到另外一個服務器稱爲 Connect Ready。一個ServerSocketChannel準備好接收新鏈接稱爲 Accept Ready,一個有數據可讀的通道能夠說是 Read Ready,等待寫數據的通道能夠說是Write Ready

上面這四個事件對應到SelectionKey中的四個常量:

1. SelectionKey.OP_CONNECT
2. SelectionKey.OP_ACCEPT
3. SelectionKey.OP_READ
4. SelectionKey.OP_WRITE

若是你對多個事件感興趣,能夠經過or操做符來鏈接這些常量:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

關於SelectionKey

請注意對register()的調用的返回值是一個SelectionKey。 SelectionKey 表明這個通道在此 Selector 上的這個註冊。當某個 Selector 通知您某個傳入事件時,它是經過提供對應於該事件的 SelectionKey 來進行的。SelectionKey 還能夠用於取消通道的註冊。SelectionKey中包含以下屬性:

  • The interest set
  • The ready set
  • The Channel
  • The Selector
  • An attached object (optional)

Interest Set

就像咱們在前面講到的把Channel註冊到Selector來監聽感興趣的事件,interest set就是你要選擇的感興趣的事件的集合。你能夠經過SelectionKey對象來讀寫interest set:

int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

經過上面例子能夠看到,咱們能夠經過用AND 和SelectionKey 中的常量作運算,從SelectionKey中找到咱們感興趣的事件。

Ready Set

ready set 是通道已經準備就緒的操做的集合。在一次選Selection以後,你應該會首先訪問這個ready set。Selection將在下一小節進行解釋。能夠這樣訪問ready集合:

int readySet = selectionKey.readyOps();
12

能夠用像檢測interest集合那樣的方法,來檢測Channel中什麼事件或操做已經就緒。可是,也可使用如下四個方法,它們都會返回一個布爾類型:

selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

Channel 和Selector

咱們能夠經過SelectionKey得到Selector和註冊的Channel:

Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector();

Attach 一個對象

能夠將一個對象或者更多信息attach 到SelectionKey上,這樣就能方便的識別某個給定的通道。例如,能夠附加 與通道一塊兒使用的Buffer,或是包含彙集數據的某個對象。使用方法以下:

selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();

還能夠在用register()方法向Selector註冊Channel的時候附加對象。如:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

經過Selector選擇通道

一旦向Selector註冊了一或多個通道,就能夠調用幾個重載的select()方法。這些方法返回你所感興趣的事件(如鏈接、接受、讀或寫)已經準備就緒的那些通道。換句話說,若是你對「Read Ready」的通道感興趣,select()方法會返回讀事件已經就緒的那些通道:

  • int select(): 阻塞到至少有一個通道在你註冊的事件上就緒
  • int select(long timeout):select()同樣,除了最長會阻塞timeout毫秒(參數)
  • int selectNow(): 不會阻塞,無論什麼通道就緒都馬上返回,此方法執行非阻塞的選擇操做。若是自從前一次選擇操做後,沒有通道變成可選擇的,則此方法直接返回零。

select()方法返回的int值表示有多少通道已經就緒。亦即,自上次調用select()方法後有多少通道變成就緒狀態。若是調用select()方法,由於有一個通道變成就緒狀態,返回了1,若再次調用select()方法,若是另外一個通道就緒了,它會再次返回1。若是對第一個就緒的channel沒有作任何操做,如今就有兩個就緒的通道,但在每次select()方法調用之間,只有一個通道處於就緒狀態。

selectedKeys()

一旦調用了select()方法,它就會返回一個數值,表示一個或多個通道已經就緒,而後你就能夠經過調用selector.selectedKeys()方法返回的SelectionKey集合來得到就緒的Channel。請看演示方法:

Set<SelectionKey> selectedKeys = selector.selectedKeys();

當你經過Selector註冊一個Channel時,channel.register()方法會返回一個SelectionKey對象,這個對象就表明了你註冊的Channel。這些對象能夠經過selectedKeys()方法得到。你能夠經過迭代這些selected key來得到就緒的Channel,下面是演示代碼:

Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) { 
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
    // a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
    // a connection was established with a remote server.
} else if (key.isReadable()) {
    // a channel is ready for reading
} else if (key.isWritable()) {
    // a channel is ready for writing
}
keyIterator.remove();
}

這個循環遍歷selected key的集合中的每一個key,並對每一個key作測試來判斷哪一個Channel已經就緒。

請注意循環中最後的keyIterator.remove()方法。Selector對象並不會從本身的selected key集合中自動移除SelectionKey實例。咱們須要在處理完一個Channel的時候本身去移除。當下一次Channel就緒的時候,Selector會再次把它添加到selected key集合中。

SelectionKey.channel()方法返回的Channel須要轉換成你具體要處理的類型,好比是ServerSocketChannel或者SocketChannel等等。

WakeUp()和Close()

某個線程調用select()方法後阻塞了,即便沒有通道就緒,也有辦法讓其從select()方法返回。只要讓其它線程在第一個線程調用select()方法的那個對象上調用Selector.wakeup()方法便可。阻塞在select()方法上的線程會立馬返回。

若是有其它線程調用了wakeup()方法,但當前沒有線程阻塞在select()方法上,下個調用select()方法的線程會當即「醒來(wake up)」

當用完Selector後調應道掉用close()方法,它將關閉Selector而且使註冊到該Selector上的全部SelectionKey實例無效。通道自己並不會關閉。

一個完整的例子

下面經過一個MultiPortEcho的例子來演示一下上面整個過程。

public class MultiPortEcho {
 private int ports[];
 private ByteBuffer echoBuffer = ByteBuffer.allocate(1024);
 public MultiPortEcho(int ports[]) throws IOException {
      this.ports = ports;
      go();
 }
 private void go() throws IOException {
      // 1. 建立一個selector,select是NIO中的核心對象
      // 它用來監聽各類感興趣的IO事件
      Selector selector = Selector.open();
      // 爲每一個端口打開一個監聽, 並把這些監聽註冊到selector中
      for (int i = 0; i < ports.length; ++i) {
           //2. 打開一個ServerSocketChannel
           //其實咱們沒監聽一個端口就須要一個channel
           ServerSocketChannel ssc = ServerSocketChannel.open();
           ssc.configureBlocking(false);//設置爲非阻塞
           ServerSocket ss = ssc.socket();
           InetSocketAddress address = new InetSocketAddress(ports[i]);
           ss.bind(address);//監聽一個端口
           //3. 註冊到selector
           //register的第一個參數永遠都是selector
           //第二個參數是咱們要監聽的事件
           //OP_ACCEPT是新創建鏈接的事件
           //也是適用於ServerSocketChannel的惟一事件類型
           SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT);
           System.out.println("Going to listen on " + ports[i]);
      }
      //4. 開始循環,咱們已經註冊了一些IO興趣事件
      while (true) {
           //這個方法會阻塞,直到至少有一個已註冊的事件發生。當一個或者更多的事件發生時
           // select() 方法將返回所發生的事件的數量。
           int num = selector.select();
           //返回發生了事件的 SelectionKey 對象的一個 集合
           Set selectedKeys = selector.selectedKeys();
           //咱們經過迭代 SelectionKeys 並依次處理每一個 SelectionKey 來處理事件
           //對於每個 SelectionKey,您必須肯定發生的是什麼 I/O 事件,以及這個事件影響哪些 I/O 對象。
           Iterator it = selectedKeys.iterator();
           while (it.hasNext()) {
                SelectionKey key = (SelectionKey) it.next();
                //5. 監聽新鏈接。程序執行到這裏,咱們僅註冊了 ServerSocketChannel
                //而且僅註冊它們「接收」事件。爲確認這一點
                //咱們對 SelectionKey 調用 readyOps() 方法,並檢查發生了什麼類型的事件
                if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
                     //6. 接收了一個新鏈接。由於咱們知道這個服務器套接字上有一個傳入鏈接在等待
                     //因此能夠安全地接受它;也就是說,不用擔憂 accept() 操做會阻塞
                     ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                     SocketChannel sc = ssc.accept();
                     sc.configureBlocking(false);
                     // 7. 講新鏈接註冊到selector。將新鏈接的 SocketChannel 配置爲非阻塞的
                     //並且因爲接受這個鏈接的目的是爲了讀取來自套接字的數據,因此咱們還必須將 SocketChannel 註冊到 Selector上
                     SelectionKey newKey = sc.register(selector,SelectionKey.OP_READ);
                     it.remove();
                     System.out.println("Got connection from " + sc);
                } else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
                     // Read the data
                     SocketChannel sc = (SocketChannel) key.channel();
                     // Echo data
                     int bytesEchoed = 0;
                     while (true) {
                          echoBuffer.clear();
                          int r = sc.read(echoBuffer);
                          if (r <= 0) {
                               break;
                          }
                          echoBuffer.flip();
                          sc.write(echoBuffer);
                          bytesEchoed += r;
                     }
                     System.out.println("Echoed " + bytesEchoed + " from " + sc);
                     it.remove();
                }
           }
           // System.out.println( "going to clear" );
           // selectedKeys.clear();
           // System.out.println( "cleared" );
      }
 }
 static public void main(String args2[]) throws Exception {
      String args[]={"9001","9002","9003"};
      if (args.length <= 0) {
           System.err.println("Usage: java MultiPortEcho port [port port ...]");
           System.exit(1);
      }
      int ports[] = new int[args.length];
      for (int i = 0; i < args.length; ++i) {
           ports[i] = Integer.parseInt(args[i]);
      }
      new MultiPortEcho(ports);
 }
 }
相關文章
相關標籤/搜索