異步IO
異步 I/O 是一種沒有阻塞地讀寫數據的方法。一般,在代碼進行 read() 調用時,代碼會阻塞直至有可供讀取的數據。一樣, write()調用將會阻塞直至數據可以寫入,關於同步的IO請參考另外一篇文章Java IO。java
另外一方面,異步 I/O 調用不但不會阻塞,相反,您能夠註冊對特定 I/O 事件諸如數據可讀、新鏈接到來等等,而在發生這樣感興趣的事件時,系統將會告訴您。編程
異步 I/O 的一個優點在於,它容許您同時根據大量的輸入和輸出執行 I/O。同步程序經常要求助於輪詢,或者建立許許多多的線程以處理大量的鏈接。使用異步 I/O,您能夠監放任何數量的通道上的事件,不用輪詢,也不用額外的線程。安全
Selector
在個人JavaNIO詳解(一)中已經詳細介紹了Java NIO三個核心對象中的Buffer和Channel,如今咱們就重點介紹一下第三個核心對象Selector。Selector是一個對象,它能夠註冊到不少個Channel上,監聽各個Channel上發生的事件,而且可以根據事件狀況決定Channel讀寫。這樣,經過一個線程管理多個Channel,就能夠處理大量網絡鏈接了。服務器
採用Selector模式的的好處
有了Selector,咱們就能夠利用一個線程來處理全部的channels。線程之間的切換對操做系統來講代價是很高的,而且每一個線程也會佔用必定的系統資源。因此,對系統來講使用的線程越少越好。網絡
可是,須要記住,現代的操做系統和CPU在多任務方面表現的愈來愈好,因此多線程的開銷隨着時間的推移,變得愈來愈小了。實際上,若是一個CPU有多個內核,不使用多任務多是在浪費CPU能力。無論怎麼說,關於那種設計的討論應該放在另外一篇不一樣的文章中。在這裏,只要知道使用Selector可以處理多個通道就足夠了。多線程
下面這幅圖展現了一個線程處理3個 Channel的狀況:
異步
異步 I/O 中的核心對象名爲 Selector。Selector 就是您註冊對各類 I/O 事件興趣的地方,並且當那些事件發生時,就是這個對象告訴您所發生的事件。socket
Selector selector = Selector.open();
而後,就須要註冊Channel到Selector了。測試
爲了能讓Channel和Selector配合使用,咱們須要把Channel註冊到Selector上。經過調用 channel.register()
方法來實現註冊:this
channel.configureBlocking(false); SelectionKey key =channel.register(selector,SelectionKey.OP_READ);
注意,註冊的Channel 必須設置成異步模式 才能夠,,不然異步IO就沒法工做,這就意味着咱們不能把一個FileChannel註冊到Selector,由於FileChannel沒有異步模式,可是網絡編程中的SocketChannel是能夠的。
須要注意register()方法的第二個參數,它是一個「interest set」,意思是註冊的Selector對Channel中的哪些時間感興趣,事件類型有四種:
通道觸發了一個事件意思是該事件已經 Ready(就緒)。因此,某個Channel成功鏈接到另外一個服務器稱爲 Connect Ready
。一個ServerSocketChannel準備好接收新鏈接稱爲 Accept Ready
,一個有數據可讀的通道能夠說是 Read Ready
,等待寫數據的通道能夠說是Write Ready
。
上面這四個事件對應到SelectionKey中的四個常量:
1. SelectionKey.OP_CONNECT 2. SelectionKey.OP_ACCEPT 3. SelectionKey.OP_READ 4. SelectionKey.OP_WRITE
若是你對多個事件感興趣,能夠經過or操做符來鏈接這些常量:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
請注意對register()
的調用的返回值是一個SelectionKey。 SelectionKey 表明這個通道在此 Selector 上的這個註冊。當某個 Selector 通知您某個傳入事件時,它是經過提供對應於該事件的 SelectionKey 來進行的。SelectionKey 還能夠用於取消通道的註冊。SelectionKey中包含以下屬性:
就像咱們在前面講到的把Channel註冊到Selector來監聽感興趣的事件,interest set就是你要選擇的感興趣的事件的集合。你能夠經過SelectionKey對象來讀寫interest set:
int interestSet = selectionKey.interestOps(); boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT; boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
經過上面例子能夠看到,咱們能夠經過用AND 和SelectionKey 中的常量作運算,從SelectionKey中找到咱們感興趣的事件。
ready set 是通道已經準備就緒的操做的集合。在一次選Selection以後,你應該會首先訪問這個ready set。Selection將在下一小節進行解釋。能夠這樣訪問ready集合:
int readySet = selectionKey.readyOps(); 12
能夠用像檢測interest集合那樣的方法,來檢測Channel中什麼事件或操做已經就緒。可是,也可使用如下四個方法,它們都會返回一個布爾類型:
selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();
咱們能夠經過SelectionKey得到Selector和註冊的Channel:
Channel channel = selectionKey.channel(); Selector selector = selectionKey.selector();
能夠將一個對象或者更多信息attach 到SelectionKey上,這樣就能方便的識別某個給定的通道。例如,能夠附加 與通道一塊兒使用的Buffer,或是包含彙集數據的某個對象。使用方法以下:
selectionKey.attach(theObject); Object attachedObj = selectionKey.attachment();
還能夠在用register()方法向Selector註冊Channel的時候附加對象。如:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
一旦向Selector註冊了一或多個通道,就能夠調用幾個重載的select()
方法。這些方法返回你所感興趣的事件(如鏈接、接受、讀或寫)已經準備就緒的那些通道。換句話說,若是你對「Read Ready」的通道感興趣,select()方法會返回讀事件已經就緒的那些通道:
select()方法返回的int值表示有多少通道已經就緒。亦即,自上次調用select()方法後有多少通道變成就緒狀態。若是調用select()方法,由於有一個通道變成就緒狀態,返回了1,若再次調用select()方法,若是另外一個通道就緒了,它會再次返回1。若是對第一個就緒的channel沒有作任何操做,如今就有兩個就緒的通道,但在每次select()方法調用之間,只有一個通道處於就緒狀態。
一旦調用了select()
方法,它就會返回一個數值,表示一個或多個通道已經就緒,而後你就能夠經過調用selector.selectedKeys()
方法返回的SelectionKey集合來得到就緒的Channel。請看演示方法:
Set<SelectionKey> selectedKeys = selector.selectedKeys();
當你經過Selector註冊一個Channel時,channel.register()
方法會返回一個SelectionKey對象,這個對象就表明了你註冊的Channel。這些對象能夠經過selectedKeys()
方法得到。你能夠經過迭代這些selected key來得到就緒的Channel,下面是演示代碼:
Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); }
這個循環遍歷selected key的集合中的每一個key,並對每一個key作測試來判斷哪一個Channel已經就緒。
請注意循環中最後的keyIterator.remove()
方法。Selector對象並不會從本身的selected key集合中自動移除SelectionKey實例。咱們須要在處理完一個Channel的時候本身去移除。當下一次Channel就緒的時候,Selector會再次把它添加到selected key集合中。
SelectionKey.channel()
方法返回的Channel須要轉換成你具體要處理的類型,好比是ServerSocketChannel或者SocketChannel等等。
某個線程調用select()方法後阻塞了,即便沒有通道就緒,也有辦法讓其從select()方法返回。只要讓其它線程在第一個線程調用select()方法的那個對象上調用Selector.wakeup()
方法便可。阻塞在select()方法上的線程會立馬返回。
若是有其它線程調用了wakeup()方法,但當前沒有線程阻塞在select()方法上,下個調用select()方法的線程會當即「醒來(wake up)」
當用完Selector後調應道掉用close()
方法,它將關閉Selector而且使註冊到該Selector上的全部SelectionKey實例無效。通道自己並不會關閉。
下面經過一個MultiPortEcho的例子來演示一下上面整個過程。
public class MultiPortEcho { private int ports[]; private ByteBuffer echoBuffer = ByteBuffer.allocate(1024); public MultiPortEcho(int ports[]) throws IOException { this.ports = ports; go(); } private void go() throws IOException { // 1. 建立一個selector,select是NIO中的核心對象 // 它用來監聽各類感興趣的IO事件 Selector selector = Selector.open(); // 爲每一個端口打開一個監聽, 並把這些監聽註冊到selector中 for (int i = 0; i < ports.length; ++i) { //2. 打開一個ServerSocketChannel //其實咱們沒監聽一個端口就須要一個channel ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false);//設置爲非阻塞 ServerSocket ss = ssc.socket(); InetSocketAddress address = new InetSocketAddress(ports[i]); ss.bind(address);//監聽一個端口 //3. 註冊到selector //register的第一個參數永遠都是selector //第二個參數是咱們要監聽的事件 //OP_ACCEPT是新創建鏈接的事件 //也是適用於ServerSocketChannel的惟一事件類型 SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT); System.out.println("Going to listen on " + ports[i]); } //4. 開始循環,咱們已經註冊了一些IO興趣事件 while (true) { //這個方法會阻塞,直到至少有一個已註冊的事件發生。當一個或者更多的事件發生時 // select() 方法將返回所發生的事件的數量。 int num = selector.select(); //返回發生了事件的 SelectionKey 對象的一個 集合 Set selectedKeys = selector.selectedKeys(); //咱們經過迭代 SelectionKeys 並依次處理每一個 SelectionKey 來處理事件 //對於每個 SelectionKey,您必須肯定發生的是什麼 I/O 事件,以及這個事件影響哪些 I/O 對象。 Iterator it = selectedKeys.iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); //5. 監聽新鏈接。程序執行到這裏,咱們僅註冊了 ServerSocketChannel //而且僅註冊它們「接收」事件。爲確認這一點 //咱們對 SelectionKey 調用 readyOps() 方法,並檢查發生了什麼類型的事件 if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) { //6. 接收了一個新鏈接。由於咱們知道這個服務器套接字上有一個傳入鏈接在等待 //因此能夠安全地接受它;也就是說,不用擔憂 accept() 操做會阻塞 ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); // 7. 講新鏈接註冊到selector。將新鏈接的 SocketChannel 配置爲非阻塞的 //並且因爲接受這個鏈接的目的是爲了讀取來自套接字的數據,因此咱們還必須將 SocketChannel 註冊到 Selector上 SelectionKey newKey = sc.register(selector,SelectionKey.OP_READ); it.remove(); System.out.println("Got connection from " + sc); } else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) { // Read the data SocketChannel sc = (SocketChannel) key.channel(); // Echo data int bytesEchoed = 0; while (true) { echoBuffer.clear(); int r = sc.read(echoBuffer); if (r <= 0) { break; } echoBuffer.flip(); sc.write(echoBuffer); bytesEchoed += r; } System.out.println("Echoed " + bytesEchoed + " from " + sc); it.remove(); } } // System.out.println( "going to clear" ); // selectedKeys.clear(); // System.out.println( "cleared" ); } } static public void main(String args2[]) throws Exception { String args[]={"9001","9002","9003"}; if (args.length <= 0) { System.err.println("Usage: java MultiPortEcho port [port port ...]"); System.exit(1); } int ports[] = new int[args.length]; for (int i = 0; i < args.length; ++i) { ports[i] = Integer.parseInt(args[i]); } new MultiPortEcho(ports); } }