java傳統bio編程概念: http://www.cnblogs.com/carl10086/p/6034563.html#_label4html
import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; /** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */ public class TimeServer { /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 採用默認值 } } ServerSocket server = null; try { server = new ServerSocket(port); System.out.println("The time server is start in port : " + port); Socket socket = null; while (true) { socket = server.accept(); new Thread(new TimeServerHandler(socket)).start(); } } finally { if (server != null) { System.out.println("The time server close"); server.close(); server = null; } } } }
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; /** * @author Administrator * @version 1.0 * @date 2014年2月14日 */ public class TimeServerHandler implements Runnable { private Socket socket; public TimeServerHandler(Socket socket) { this.socket = socket; } /* * (non-Javadoc) * * @see java.lang.Runnable#run() */ @Override public void run() { BufferedReader in = null; PrintWriter out = null; try { in = new BufferedReader(new InputStreamReader( this.socket.getInputStream())); out = new PrintWriter(this.socket.getOutputStream(), true); String currentTime = null; String body = null; while (true) { body = in.readLine(); if (body == null) break; System.out.println("The time server receive order : " + body); currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; out.println(currentTime); } } catch (Exception e) { if (in != null) { try { in.close(); } catch (IOException e1) { e1.printStackTrace(); } } if (out != null) { out.close(); out = null; } if (this.socket != null) { try { this.socket.close(); } catch (IOException e1) { e1.printStackTrace(); } this.socket = null; } } } }
import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import com.phei.netty.bio.TimeServerHandler; /** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */ public class TimeServer { /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 採用默認值 } } ServerSocket server = null; try { server = new ServerSocket(port); System.out.println("The time server is start in port : " + port); Socket socket = null; TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool( 50, 10000);// 建立IO任務線程池 while (true) { socket = server.accept(); singleExecutor.execute(new TimeServerHandler(socket)); } } finally { if (server != null) { System.out.println("The time server close"); server.close(); server = null; } } } }
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @author Administrator * @version 1.0 * @date 2014年2月15日 */ public class TimeServerHandlerExecutePool { private ExecutorService executor; public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) { executor = new ThreadPoolExecutor(Runtime.getRuntime() .availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS, new ArrayBlockingQueue<java.lang.Runnable>(queueSize)); } public void execute(java.lang.Runnable task) { executor.execute(task); } }
這裏列舉一些僞異步可能阻塞的地方:前端
參考jdk文檔 關於Socket中InputStream的read()操做:java
Reads some number of bytes from the input stream and stores them into the buffer array b. The number of bytes actually read is returned as an integer. This method blocks until input data is available, end of file is detected, or an exception is thrown.linux
這個方法有三種狀況下會阻塞:編程
說明當發生網絡阻塞或者說網絡問題的情形時,線程池中的線程會阻塞住,若是線程池沒有可用線程會一直排隊。線程池和阻塞隊列的內容介紹能夠參考:java Blocking Queue。數組
write也同樣:也會阻塞至字節所有被寫完或者異常。服務器
學習過TCP/IP相關內容,當消息的接收方處理緩慢的時候,將不能及時地從TCP緩衝區讀取數據,這將會致使發送方的TCP window size不斷減小,直到爲0,雙方處於Keep-alive狀態,消息發送方將不能再向TCP緩衝區寫入消息,若是這裏採用的是同步阻塞IO,write操做將會被無限期阻塞,直到window size>0或者發生I/O異常。markdown
所以,這種僞異步在網絡不佳的狀況下可能會出現各類連鎖問題:網絡
(1) 服務器處理緩慢多線程
(2) 假如全部的可用線程都被阻塞,隊列會排隊至內存沒法容納
(3) 前端只有一個Acceptor線程,這個線程不能出問題,不然消息都會被拒絕掉
(4) 容易崩潰
Java NIO 的通道相似流,但又有些不一樣:
- 既能夠從通道中讀取數據,又能夠寫數據到通道。但流的讀寫一般是單向的。
- 通道能夠非阻塞地讀寫。
- 通道中的數據老是要先讀到一個 Buffer,或者老是要從一個 Buffer 中寫入。
正如上面所說,從通道讀取數據到緩衝區,從緩衝區寫入數據到通道。以下圖所示:
Channel 的實現
這些是 Java NIO 中最重要的通道的實現:
- FileChannel
- DatagramChannel
- SocketChannel
- ServerSocketChannel
FileChannel 從文件中讀寫數據。
DatagramChannel 能經過 UDP 讀寫網絡中的數據。
SocketChannel 能經過 TCP 讀寫網絡中的數據。
ServerSocketChannel 能夠監聽新進來的 TCP 鏈接,像 Web 服務器那樣。對每個新進來的鏈接都會建立一個 SocketChannel。
Java NIO 中的 Buffer 用於和 NIO 通道進行交互。如你所知,數據是從通道讀入緩衝區,從緩衝區寫入到通道中的。
緩衝區本質上是一塊能夠寫入數據,而後能夠從中讀取數據的內存。這塊內存被包裝成 NIO Buffer 對象,並提供了一組方法,用來方便的訪問該塊內存。
Buffer 的基本用法
使用 Buffer 讀寫數據通常遵循如下四個步驟:
- 寫入數據到 Buffer
- 調用flip()方法
- 從 Buffer 中讀取數據
- 調用clear()方法或者compact()方法
當向 buffer 寫入數據時,buffer 會記錄下寫了多少數據。一旦要讀取數據,須要經過 flip() 方法將 Buffer 從寫模式切換到讀模式。在讀模式下,能夠讀取以前寫入到 buffer 的全部數據。
一旦讀完了全部的數據,就須要清空緩衝區,讓它能夠再次被寫入。有兩種方式能清空緩衝區:調用 clear() 或 compact() 方法。clear() 方法會清空整個緩衝區。compact() 方法只會清除已經讀過的數據。任何未讀的數據都被移到緩衝區的起始處,新寫入的數據將放到緩衝區未讀數據的後面。
下面是一個使用 Buffer 的例子:
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw"); FileChannel inChannel = aFile.getChannel(); //create buffer with capacity of 48 bytes ByteBuffer buf = ByteBuffer.allocate(48); int bytesRead = inChannel.read(buf); //read into buffer. while (bytesRead != -1) { buf.flip(); //make buffer ready for read while(buf.hasRemaining()){ System.out.print((char) buf.get()); // read 1 byte at a time } buf.clear(); //make buffer ready for writing bytesRead = inChannel.read(buf); } aFile.close();
Buffer 的 capacity,position 和 limit
緩衝區本質上是一塊能夠寫入數據,而後能夠從中讀取數據的內存。這塊內存被包裝成 NIO Buffer 對象,並提供了一組方法,用來方便的訪問該塊內存。
爲了理解 Buffer 的工做原理,須要熟悉它的三個屬性:
- capacity
- position
- limit
position 和 limit 的含義取決於 Buffer 處在讀模式仍是寫模式。無論 Buffer 處在什麼模式,capacity 的含義老是同樣的。
這裏有一個關於 capacity,position 和 limit 在讀寫模式中的說明,詳細的解釋在插圖後面。
capacity
做爲一個內存塊,Buffer 有一個固定的大小值,也叫 「capacity」。 你只能往裏寫 capacity 個 byte、long,char 等類型。一旦 Buffer 滿了,須要將其清空(經過讀數據或者清除數據)才能繼續寫數據往裏寫數據。
position
當你寫數據到 Buffer 中時,position 表示當前的位置。初始的 position 值爲 0. 當一個 byte、long 等數據寫到 Buffer 後, position 會向前移動到下一個可插入數據的 Buffer 單元。position 最大可爲 capacity – 1。
當讀取數據時,也是從某個特定位置讀。當將 Buffer 從寫模式切換到讀模式,position 會被重置爲 0。當從 Buffer 的 position 處讀取數據時,position 向前移動到下一個可讀的位置。
limit
在寫模式下,Buffer 的 limit 表示你最多能往 Buffer 裏寫多少數據。 寫模式下,limit 等於 Buffer 的 capacity。
當切換 Buffer 到讀模式時,limit 表示你最多能讀到多少數據。所以,當切換 Buffer 到讀模式時,limit 會被設置成寫模式下的 position 值。換句話說,你能讀到以前寫入的全部數據(limit 被設置成已寫數據的數量,這個值在寫模式下就是 position)
Buffer 的類型
Java NIO 有如下 Buffer 類型
- ByteBuffer
- MappedByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
如你所見,這些 Buffer 類型表明了不一樣的數據類型。換句話說,就是能夠經過 char,short,int,long,float 或 double 類型來操做緩衝區中的字節。
MappedByteBuffer 有些特別,在涉及它的專門章節中再講。
Buffer 的分配
要想得到一個 Buffer 對象首先要進行分配。 每個 Buffer 類都有一個 allocate 方法。下面是一個分配 48 字節 capacity 的 ByteBuffer 的例子。
ByteBuffer buf = ByteBuffer.allocate(48);
這是分配一個可存儲 1024 個字符的 CharBuffer:
CharBuffer buf = CharBuffer.allocate(1024);
向 Buffer 中寫數據
寫數據到 Buffer 有兩種方式:
- 從 Channel 寫到 Buffer。
- 經過 Buffer 的 put() 方法寫到 Buffer 裏。
從 Channel 寫到 Buffer 的例子
int bytesRead = inChannel.read(buf); //read into buffer.
經過 put 方法寫 Buffer 的例子:
buf.put(127);
put 方法有不少版本,容許你以不一樣的方式把數據寫入到 Buffer 中。例如, 寫到一個指定的位置,或者把一個字節數組寫入到 Buffer。 更多 Buffer 實現的細節參考 JavaDoc。
flip() 方法
flip 方法將 Buffer 從寫模式切換到讀模式。調用 flip() 方法會將 position 設回 0,並將 limit 設置成以前 position 的值。
換句話說,position 如今用於標記讀的位置,limit 表示以前寫進了多少個 byte、char 等 —— 如今能讀取多少個 byte、char 等。
從 Buffer 中讀取數據
從 Buffer 中讀取數據有兩種方式:
- 從 Buffer 讀取數據到 Channel。
- 使用 get() 方法從 Buffer 中讀取數據。
從 Buffer 讀取數據到 Channel 的例子:
//read from buffer into channel. int bytesWritten = inChannel.write(buf);使用 get() 方法從 Buffer 中讀取數據的例子
byte aByte = buf.get();get 方法有不少版本,容許你以不一樣的方式從 Buffer 中讀取數據。例如,從指定 position 讀取,或者從 Buffer 中讀取數據到字節數組。更多 Buffer 實現的細節參考 JavaDoc。
rewind() 方法
Buffer.rewind() 將 position 設回 0,因此你能夠重讀 Buffer 中的全部數據。limit 保持不變,仍然表示能從 Buffer 中讀取多少個元素(byte、char 等)。
clear() 與 compact() 方法
一旦讀完 Buffer 中的數據,須要讓 Buffer 準備好再次被寫入。能夠經過 clear() 或 compact() 方法來完成。
若是調用的是 clear() 方法,position 將被設回 0,limit 被設置成 capacity 的值。換句話說,Buffer 被清空了。Buffer 中的數據並未清除,只是這些標記告訴咱們能夠從哪裏開始往 Buffer 裏寫數據。
若是 Buffer 中有一些未讀的數據,調用 clear() 方法,數據將 「被遺忘」,意味着再也不有任何標記會告訴你哪些數據被讀過,哪些尚未。
若是 Buffer 中仍有未讀的數據,且後續還須要這些數據,可是此時想要先先寫些數據,那麼使用 compact() 方法。
compact() 方法將全部未讀的數據拷貝到 Buffer 起始處。而後將 position 設到最後一個未讀元素正後面。limit 屬性依然像 clear() 方法同樣,設置成 capacity。如今 Buffer 準備好寫數據了,可是不會覆蓋未讀的數據。
mark() 與 reset() 方法
經過調用 Buffer.mark() 方法,能夠標記 Buffer 中的一個特定 position。以後能夠經過調用 Buffer.reset() 方法恢復到這個 position。例如:
buffer.mark(); //call buffer.get() a couple of times, e.g. during parsing. buffer.reset(); //set position back to mark.
equals() 與 compareTo() 方法
可使用 equals() 和 compareTo() 方法兩個 Buffer。
equals()
當知足下列條件時,表示兩個 Buffer 相等:
- 有相同的類型(byte、char、int 等)。
- Buffer 中剩餘的 byte、char 等的個數相等。
- Buffer 中全部剩餘的 byte、char 等都相同。
如你所見,equals 只是比較 Buffer 的一部分,不是每個在它裏面的元素都比較。實際上,它只比較 Buffer 中的剩餘元素。
compareTo() 方法
compareTo() 方法比較兩個 Buffer 的剩餘元素 (byte、char 等), 若是知足下列條件,則認爲一個 Buffer「小於」 另外一個 Buffer:
- 第一個不相等的元素小於另外一個 Buffer 中對應的元素 。
- 全部元素都相等,但第一個 Buffer 比另外一個先耗盡 (第一個 Buffer 的元素個數比另外一個少)。
(譯註:剩餘元素是從 position 到 limit 之間的元素)
Selector(選擇器)是 Java NIO 中可以檢測一到多個 NIO 通道,並可以知曉通道是否爲諸如讀寫事件作好準備的組件。這樣,一個單獨的線程能夠管理多個 channel,從而管理多個網絡鏈接。
爲何使用 Selector?
僅用單個線程來處理多個 Channels 的好處是,只須要更少的線程來處理通道。事實上,能夠只用一個線程處理全部的通道。對於操做系統來講,線程之間上下文切換的開銷很大,並且每一個線程都要佔用系統的一些資源(如內存)。所以,使用的線程越少越好。
可是,須要記住,現代的操做系統和 CPU 在多任務方面表現的愈來愈好,因此多線程的開銷隨着時間的推移,變得愈來愈小了。實際上,若是一個 CPU 有多個內核,不使用多任務多是在浪費 CPU 能力。無論怎麼說,關於那種設計的討論應該放在另外一篇不一樣的文章中。在這裏,只要知道使用 Selector 可以處理多個通道就足夠了。
下面是單線程使用一個 Selector 處理 3 個 channel 的示例圖:
Selector 的建立
經過調用 Selector.open() 方法建立一個 Selector,以下:
Selector selector = Selector.open();向 Selector 註冊通道
爲了將 Channel 和 Selector 配合使用,必須將 channel 註冊到 selector 上。經過 SelectableChannel.register() 方法來實現,以下:
channel.configureBlocking(false); SelectionKey key = channel.register(selector, Selectionkey.OP_READ)與 Selector 一塊兒使用時,Channel 必須處於非阻塞模式下。這意味着不能將 FileChannel 與 Selector 一塊兒使用,由於 FileChannel 不能切換到非阻塞模式。而套接字通道均可以。
注意 register() 方法的第二個參數。這是一個 「interest 集合」,意思是在經過 Selector 監聽 Channel 時對什麼事件感興趣。能夠監聽四種不一樣類型的事件:
- Connect
- Accept
- Read
- Write
通道觸發了一個事件意思是該事件已經就緒。因此,某個 channel 成功鏈接到另外一個服務器稱爲 「鏈接就緒」。一個 server socket channel 準備好接收新進入的鏈接稱爲 「接收就緒」。一個有數據可讀的通道能夠說是 「讀就緒」。等待寫數據的通道能夠說是 「寫就緒」。
這四種事件用 SelectionKey 的四個常量來表示:
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
若是你對不止一種事件感興趣,那麼能夠用 「位或」 操做符將常量鏈接起來,以下:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;在下面還會繼續提到 interest 集合
SelectionKey
在上一小節中,當向 Selector 註冊 Channel 時,register() 方法會返回一個 SelectionKey 對象。這個對象包含了一些你感興趣的屬性:
- interest 集合
- ready 集合
- Channel
- Selector
- 附加的對象(可選)
下面我會描述這些屬性。
interest 集合
就像向 Selector 註冊通道一節中所描述的,interest 集合是你所選擇的感興趣的事件集合。能夠經過 SelectionKey 讀寫 interest 集合,像這樣:
int interestSet = selectionKey.interestOps(); boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT; boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
能夠看到,用 「位與」 操做 interest 集合和給定的 SelectionKey 常量,能夠肯定某個肯定的事件是否在 interest 集合中。
ready 集合
ready 集合是通道已經準備就緒的操做的集合。在一次選擇 (Selection) 以後,你會首先訪問這個 ready set。Selection 將在下一小節進行解釋。能夠這樣訪問 ready 集合:
int readySet = selectionKey.readyOps();
能夠用像檢測 interest 集合那樣的方法,來檢測 channel 中什麼事件或操做已經就緒。可是,也可使用如下四個方法,它們都會返回一個布爾類型:
selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();
Channel + Selector
從 SelectionKey 訪問 Channel 和 Selector 很簡單。以下:
Channel channel = selectionKey.channel(); Selector selector = selectionKey.selector();
還能夠在用 register() 方法向 Selector 註冊 Channel 的時候附加對象。如:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
Selector 選擇通道
一旦向 Selector 註冊了一或多個通道,就能夠調用幾個重載的 select() 方法。這些方法返回你所感興趣的事件(如鏈接、接受、讀或寫)已經準備就緒的那些通道。換句話說,若是你對 「讀就緒」 的通道感興趣,select() 方法會返回讀事件已經就緒的那些通道。
下面是 select() 方法:
- int select()
- int select(long timeout)
- int selectNow()
select()阻塞到至少有一個通道在你註冊的事件上就緒了。
select(long timeout)和 select() 同樣,除了最長會阻塞 timeout 毫秒 (參數)。
selectNow()不會阻塞,無論什麼通道就緒都馬上返回(譯者注:此方法執行非阻塞的選擇操做。若是自從前一次選擇操做後,沒有通道變成可選擇的,則此方法直接返回零。)。
select() 方法返回的 int 值表示有多少通道已經就緒。亦即,自上次調用 select() 方法後有多少通道變成就緒狀態。若是調用 select() 方法,由於有一個通道變成就緒狀態,返回了 1,若再次調用 select() 方法,若是另外一個通道就緒了,它會再次返回 1。若是對第一個就緒的 channel 沒有作任何操做,如今就有兩個就緒的通道,但在每次 select() 方法調用之間,只有一個通道就緒了。
selectedKeys()
一旦調用了 select() 方法,而且返回值代表有一個或更多個通道就緒了,而後能夠經過調用 selector 的 selectedKeys() 方法,訪問 「已選擇鍵集(selected key set)」 中的就緒通道。以下所示:
Set selectedKeys = selector.selectedKeys();
當像 Selector 註冊 Channel 時,Channel.register() 方法會返回一個 SelectionKey 對象。這個對象表明了註冊到該 Selector 的通道。能夠經過 SelectionKey 的 selectedKeySet() 方法訪問這些對象。
能夠遍歷這個已選擇的鍵集合來訪問就緒的通道。以下:
Set selectedKeys = selector.selectedKeys(); Iterator keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); }
這個循環遍歷已選擇鍵集中的每一個鍵,並檢測各個鍵所對應的通道的就緒事件。
注意每次迭代末尾的 keyIterator.remove() 調用。Selector 不會本身從已選擇鍵集中移除 SelectionKey 實例。必須在處理完通道時本身移除。下次該通道變成就緒時,Selector 會再次將其放入已選擇鍵集中。
SelectionKey.channel() 方法返回的通道須要轉型成你要處理的類型,如 ServerSocketChannel 或 SocketChannel 等。
wakeUp()
某個線程調用 select() 方法後阻塞了,即便沒有通道已經就緒,也有辦法讓其從 select() 方法返回。只要讓其它線程在第一個線程調用 select() 方法的那個對象上調用 Selector.wakeup() 方法便可。阻塞在 select() 方法上的線程會立馬返回。
若是有其它線程調用了 wakeup() 方法,但當前沒有線程阻塞在 select() 方法上,下個調用 select() 方法的線程會當即 「醒來(wake up)」。
close()
用完 Selector 後調用其 close() 方法會關閉該 Selector,且使註冊到該 Selector 上的全部 SelectionKey 實例無效。通道自己並不會關閉。
完整的示例
這裏有一個完整的示例,打開一個 Selector,註冊一個通道註冊到這個 Selector 上 (通道的初始化過程略去), 而後持續監控這個 Selector 的四種事件(接受,鏈接,讀,寫)是否就緒。
Selector selector = Selector.open(); channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ); while(true) { int readyChannels = selector.select(); if(readyChannels == 0) continue; Set selectedKeys = selector.selectedKeys(); Iterator keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); }j }
首先看序列圖:
大體步驟以下:
1. 打開ServerSocketChannel,用於監聽,它是全部客戶端鏈接的父管道:
ServerSocketChannel servChannel = ServerSocketChannel.open();
2. 綁定監聽端口,設置爲非阻塞模式,代碼以下:
servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(port), 1024);
3. 建立Reactor線程,建立多路複用器並啓動線程,代碼以下:
Selector selector = Selector.open();
new Thread(new ReactorTask()).start();
4. 將ServerSocketChannel註冊到Selector上,而且監聽ACCEPT事件
servChannel.register(selector, SelectionKey.OP_ACCEPT);
5. 多路複用器在線程run方法的無限循環體內輪詢準備就緒的Key
while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Throwable t) { t.printStackTrace(); } }
6. 假若有新的connection接入,處理新的接入請求,完成TCP三次握手,創建物理鏈路,而且設置爲非阻塞模式,註冊到Selector中監聽讀事件,示例代碼以下:
if (key.isAcceptable()) { // Accept the new connection ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); sc.socket().setReuseAddress(true); // Add the new connection to the selector sc.register(selector, SelectionKey.OP_READ); }
7. 若是發生讀事件,異步讀取客戶端請求消息到緩衝區,示例代碼以下:
int readNumber = channel.read(receivedBuffer);
8. 對ByteBuffer進行解碼,若是有半包消息指針,繼續讀取後續的報文,將解碼成功的消息封裝成Task,投遞到業務線程池中,進行業務邏輯
Object message = null; while( buffer.hasRemain() ){ byteBuffer.mark(); Object message = decode(byteBuffer); if (message == null) { byteBuffer.reset(); break; } if (!byteBuffer.hasRemain()) { byteBuffer.clear(); } else { byteBuffer.compact(); } if (messageList !=null && !messageList.isEmpty() ){ for (Object messageE : messageList ) { handlerTask(messageE); } } }
9. 最後,將消息寫出
socketChannel.write(buffer);
注意: 若是發送區TCP緩衝區滿,會出現寫半包,須要註冊監聽寫操做位,循環寫,直到整包消息寫入TCP緩衝區。
import java.io.IOException; /** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */ public class TimeServer { /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 採用默認值 } } MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start(); } }
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * @author Administrator * @version 1.0 * @date 2014年2月16日 */ public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; /** * 初始化多路複用器、綁定監聽端口 * * @param port */ public MultiplexerTimeServer(int port) { try { selector = Selector.open(); servChannel = ServerSocketChannel.open(); servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(port), 1024); servChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is start in port : " + port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop() { this.stop = true; } /* * (non-Javadoc) * * @see java.lang.Runnable#run() */ @Override public void run() { while (!stop) { try { //設置休眠時間爲1s。不管有讀寫事件發生,selector每隔1s被喚醒一次。 selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Throwable t) { t.printStackTrace(); } } // 多路複用器關閉後,全部註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,因此不須要重複釋放資源 if (selector != null) try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { // 處理新接入的請求消息 if (key.isAcceptable()) { // Accept the new connection ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); //建立tcp 3次握手 虛鏈路... SocketChannel sc = ssc.accept(); //設置爲非阻塞模式 sc.configureBlocking(false); sc.socket().setReuseAddress(true); // Add the new connection to the selector sc.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { // Read the data SocketChannel sc = (SocketChannel) key.channel(); // 不知道客戶端發送的碼流大小,做爲例子,開闢1024的緩衝區。 ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); // 返回值>0,表示有字節,對字節進行解碼 if (readBytes > 0) { readBuffer.flip(); //flip以後才能夠進行讀操做 byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The time server receive order : " + body); //若是請求內容是"Query time order"就回復,不然回覆"BAD ORDER" String currentTime = "QUERY TIME ORDER" .equalsIgnoreCase(body) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc, currentTime); } else if (readBytes < 0) { // 對端鏈路關閉 key.cancel(); sc.close(); } else ; // 讀到0字節,忽略 } } } private void doWrite(SocketChannel channel, String response) throws IOException { if (response != null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); //異步操做,沒法保證發送完,會出現"寫半包"問題,須要註冊寫操做,而後不斷輪詢Selector是否發送完畢,例子中不處理,太麻煩。 ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } } }
public class TimeClient { /** * @param args */ public static void main(String[] args) { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 採用默認值 } } new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001") .start(); } }
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * @author Administrator * @date 2014年2月16日 * @version 1.0 */ public class TimeClientHandle implements Runnable { private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; public TimeClientHandle(String host, int port) { this.host = host == null ? "127.0.0.1" : host; this.port = port; try { selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } /* * (non-Javadoc) * * @see java.lang.Runnable#run() */ @Override public void run() { try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Exception e) { e.printStackTrace(); System.exit(1); } } // 多路複用器關閉後,全部註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,因此不須要重複釋放資源 if (selector != null) try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { // 判斷是否鏈接成功 SocketChannel sc = (SocketChannel) key.channel(); if (key.isConnectable()) { if (sc.finishConnect()) { sc.register(selector, SelectionKey.OP_READ); doWrite(sc); } else System.exit(1);// 鏈接失敗,進程退出 } if (key.isReadable()) { ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("Now is : " + body); this.stop = true; } else if (readBytes < 0) { // 對端鏈路關閉 key.cancel(); sc.close(); } else ; // 讀到0字節,忽略 } } } private void doConnect() throws IOException { // 若是直接鏈接成功,則註冊到多路複用器上,發送請求消息,讀應答 if (socketChannel.connect(new InetSocketAddress(host, port))) { socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); } else socketChannel.register(selector, SelectionKey.OP_CONNECT); } private void doWrite(SocketChannel sc) throws IOException { byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); sc.write(writeBuffer); if (!writeBuffer.hasRemaining()) System.out.println("Send order 2 server succeed."); } }
(1) 建立Selector和SocketChannel對象,能夠設定SocketChannel的TCP參數
(2) 發送鏈接請求,做爲例子,鏈接是成功的,不須要進行重連操做。 doConnect中,若是鏈接成功,註冊READ事件,若是沒有成功,服務器端沒有應答信息,不表示鏈接失敗。須要註冊CONNECT事件,當服務器返回syn-ack消息後,Selector可以輪詢到這個SocketChannel處於鏈接就緒狀態。
(3) 輪詢selector,當有就緒的Channel,則使用handleInput(key)方法
(4) 對SelectorKey進行判斷,若是是鏈接事件,則註冊監聽READ操做,而且寫入請求。
(5) doWrite中直接使用ByteBuffer進行寫請求,一樣是異步,一樣有"半寫包"問題,這裏暫時略過。
(6) 若是是可讀事件,一樣不知道會有多少的響應,所以暫時分配1M,進行異步接收,因爲是異步的,因此一樣須要對結果進行判斷,此處再也不描述,也不解決TCP粘包問題
(7) 最後客戶端主動關閉鏈接...結束
例子中,咱們不少問題都沒有處理,好比"半包讀"和"半包寫"等等,NIO的代碼很是的複雜,若是要真正寫的話,可是NIO依舊被廣發使用...
緣由以下:
(1) 客戶端鏈接操做能夠no-blocking的方式,能夠經過多路複用器註冊OP_CONNECT事件,而後輪詢SelectKey的方式等待結果。
(2) SocketChannel的讀寫都是no-blocking的方式,即非阻塞的方式,沒有就當即返回下一個...
(3) 線程模型的優化,因爲JDK的Selector在Linux等主流操做系統上使用的epoll模型。這意味着一個Selector線程能夠處理成千上萬個客戶端鏈接。Windows系統上的Selector類是sun.nio.ch.WindowsSelectorImpl,而linux上的實現類是sun.nio.ch.EPollSelectorImpl,很明顯是epoll模型。
jdk1.7升級了NIO類庫,Java正式提供了異步文件I/O操做,同時提供了與UNIX網絡編程事件驅動I/O對應的AIO。
NIO 2.0 引入了新的異步通道的概念,而且提供了異步文件通道和異步套接字通道的實現。異步通道有如下2種方式:
CompletionHanler接口的實現類做爲操做完成的回調。
NIO2.0的異步套接字通道是真正的異步非阻塞I/O,對應於UNIX網絡中的事件驅動IO。它不須要經過多路複用器去對註冊的通道進行輪詢實現異步讀寫,從而簡化了NIO的編程模型。
import java.io.IOException; /** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */ public class TimeServer { /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 採用默認值 } } AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port); new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start(); } }
下面是真正的Handler:
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.AsynchronousServerSocketChannel; import java.util.concurrent.CountDownLatch; /** * @author Administrator * @version 1.0 * @date 2014年2月16日 */ public class AsyncTimeServerHandler implements Runnable { private int port; CountDownLatch latch; AsynchronousServerSocketChannel asynchronousServerSocketChannel; public AsyncTimeServerHandler(int port) { this.port = port; try { asynchronousServerSocketChannel = AsynchronousServerSocketChannel .open(); asynchronousServerSocketChannel.bind(new InetSocketAddress(port)); System.out.println("The time server is start in port : " + port); } catch (IOException e) { e.printStackTrace(); } } /* * (non-Javadoc) * * @see java.lang.Runnable#run() */ @Override public void run() { latch = new CountDownLatch(1); doAccept(); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } public void doAccept() { asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler()); } }
(1) 首先建立了一個AsynchronousServerSocketChannel,而後調用它的bind方法綁定監聽端口。
(2) 使用CountDownLatch在啓動完成以後一直阻塞住,實際項目中,不須要啓動獨立的線程,這裏是演示。
(3) doAccept()用於接口客戶端的鏈接,因爲是異步操做,能夠傳遞一個CompletionHandler<AsynchronousSocketChannel, ? super A>的類型來處理。
實際處理類以下:
import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; /** * @author lilinfeng * @version 1.0 * @date 2014年2月16日 */ public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> { @Override public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) { attachment.asynchronousServerSocketChannel.accept(attachment, this); ByteBuffer buffer = ByteBuffer.allocate(1024); result.read(buffer, buffer, new ReadCompletionHandler(result)); } @Override public void failed(Throwable exc, AsyncTimeServerHandler attachment) { exc.printStackTrace(); attachment.latch.countDown(); } }
(1) 這裏又再次調用了
attachment.asynchronousServerSocketChannel.accept(attachment, this);
方法,爲何要再次調用:若是有新的客戶端介入,系統將回調咱們傳入channel,咱們要去接收成千上萬的個請求,每調用一次,到complete方法就表示接收成功了,能夠去接收下一次請求了,當咱們再次調用這個方法,能夠理解爲本次請求已經被成功接收,接收線程能夠接收另外一個請求了。
(2) 成功接收請求後要處理請求消息,調用channel的read方法,咱們看一下該方法的參數
ByteBuffer dst: 接收緩衝區,用於異步Channel中讀取數據包;
Attachment: 異步Channel攜帶的附件,通知回調時做爲參數使用
CompletionHandler<Interger,? super A> : 接收通知回調的業務Handler,在本例中爲ReadCompletionHandler
代碼以下:
import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; /** * @author lilinfeng * @version 1.0 * @date 2014年2月16日 */ public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel channel; public ReadCompletionHandler(AsynchronousSocketChannel channel) { if (this.channel == null) this.channel = channel; } @Override public void completed(Integer result, ByteBuffer attachment) { attachment.flip(); byte[] body = new byte[attachment.remaining()]; attachment.get(body); try { String req = new String(body, "UTF-8"); System.out.println("The time server receive order : " + req); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(currentTime); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } private void doWrite(String currentTime) { if (currentTime != null && currentTime.trim().length() > 0) { byte[] bytes = (currentTime).getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { // 若是沒有發送完成,繼續發送 if (buffer.hasRemaining()) channel.write(buffer, buffer, this); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { // ingnore on close } } }); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } }
(1) 首先繼續根據channel建立ReadCompletionHandler中當作成員變量來使用,主要用於半包消息和發送應答,例子沒有處理半包消息。
(2) attachment是一個ByteBuffer,其中已經有寫入的數據了,所以flip準備讀出
(3) 調用doWrite進行寫數據,和前面的read同樣調用channel.write()方法,一樣也有3個參數以支持異步操做,這裏直接使用匿名內部類,若是沒有發送完成就繼續發送
/** * @author lilinfeng * @date 2014年2月14日 * @version 1.0 */ public class TimeClient { /** * @param args */ public static void main(String[] args) { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 採用默認值 } } new Thread(new AsyncTimeClientHandler("127.0.0.1", port), "AIO-AsyncTimeClientHandler-001").start(); } }
import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; /** * @author Administrator * @version 1.0 * @date 2014年2月16日 */ public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable { private AsynchronousSocketChannel client; private String host; private int port; private CountDownLatch latch; public AsyncTimeClientHandler(String host, int port) { this.host = host; this.port = port; try { client = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { latch = new CountDownLatch(1); client.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { client.close(); } catch (IOException e) { e.printStackTrace(); } } @Override public void completed(Void result, AsyncTimeClientHandler attachment) { byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); client.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { if (buffer.hasRemaining()) { client.write(buffer, buffer, this); } else { ByteBuffer readBuffer = ByteBuffer.allocate(1024); client.read( readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer .remaining()]; buffer.get(bytes); String body; try { body = new String(bytes, "UTF-8"); System.out.println("Now is : " + body); latch.countDown(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { client.close(); latch.countDown(); } catch (IOException e) { // ingnore on close } } }); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { client.close(); latch.countDown(); } catch (IOException e) { // ingnore on close } } }); } @Override public void failed(Throwable exc, AsyncTimeClientHandler attachment) { exc.printStackTrace(); try { client.close(); latch.countDown(); } catch (IOException e) { e.printStackTrace(); } } }
(1) 這裏跟Server很是相似,很少加描述了。
(2) 這裏一樣沒有處理半包問題,所以本例只能做爲功能測試,若是對代碼進行壓力或者性能測試,輸出結果就會存在問題。
(3) 運行代碼,使用jps 查看pid,再使用jstack -l pid 查看堆棧信息,發現JDK底層經過線程池來執行回調通知,異步回調類由sun.nio.ch.AsynchronousChannelGroupImpl實現。所以:異步Socket Channel是一個被動執行對象,這裏不須要像NIO編程那樣建立一個獨立的I/O線程去處理讀寫操做,使用的JDK內部的線程池自動回調驅動讀寫操做,所以,基於新的NIO2.0編程會更加簡單。
不少人喜歡講jdk1.4提供的NIO框架稱爲異步非阻塞I/O,可是它實際上只是非阻塞I/O,早起的JDK1.4和1.5 update10以前,JDK Selector還只能使用select/poll實現IO複用技術,不是異步的,在JDK1.5 UPDATE10和Linux core2.6之上才能使用epoll,而上層API不變,只是底層Selector由epoll實現,仍舊沒有改變IO模型,仍是多路複用模型。
而NIO2.0纔開始提供異步的套接字通道,是真正的異步I/O模型。
可是這裏不對其叫法作糾結,1.4 1.7 都稱爲異步非阻塞IO也無所謂。
這裏對這四種IO進行約定:
1. bio
2. 僞異步: bio+ 線程池,官方沒有這種叫法..
3. NIO: jdk1.4多路複用
4. AIO: jdk1.7 aio 模型