在上一篇文章中咱們介紹了Java基本IO,也就是阻塞式IO(BIO),在JDK1.4版本後推出了新的IO系統(NIO),也能夠理解爲非阻塞IO(Non-Blocking IO)。引用《Java NIO》中的一段話來解釋一下NIO出現的緣由:html
操做系統與 Java 基於流的 I/O模型有些不匹配。操做系統要移動的是大塊數據(緩衝區),這每每是在硬件直接存儲器存取( DMA)的協助下完成的。而 JVM 的 I/O 類喜歡操做小塊數據——單個字節、幾行文本。結果,操做系統送來整緩衝區的數據, java.io 的流數據類再花大量時間把它們拆成小塊,每每拷貝一個小塊就要往返於幾層對象。操做系統喜歡整卡車地運來數據, java.io 類則喜歡一鏟子一鏟子地加工數據。有了 NIO,就能夠輕鬆地把一卡車數據備份到您能直接使用的地方( ByteBuffer 對象)。可是Java裏的RandomAccessFile類是比較接近操做系統的方式。java
能夠看出Java原生的IO模型之因此慢,是由於與操做系統的操做方式不匹配形成的,那麼NIO之因此比BIO快主要就是用到了緩衝區相關的技術,接下來慢慢介紹這些技術點。git
下圖描述了操做系統中數據是如何從外部存儲向運行中的進程內存區域移動的過程:進程使用read()系統調用要求緩衝區被填充滿。內核隨即向磁盤控制器發出指令,要求其從磁盤讀取數據。磁盤控制器經過DMA直接把磁盤上的數據寫入緩衝區,這一步不須要CPU的參與。當緩衝區填滿時,內核將數據從臨時緩衝區拷貝到進程執行read()調用時指定的緩衝區。github
這裏須要主要爲何要執行系統調用這樣一箇中間步驟而不是直接DMA到進程的緩衝區,是由於用戶空間是沒法直接操做硬件的,另外磁盤這種塊存儲設備操做的是固定大小的數據塊,而用戶請求的則是非規則大小的數據,內核空間在這裏的做用就是分解、重組的做用。數組
Java NIO主要依賴的組件有三個:緩衝區Buffer、通道Channel和選擇器Selector。服務器
Buffer家族主要有這麼些個成員,根據類名也大概能猜到它們的用處,用的最多的是ByteBuffer,在下面的例子中也會主要用到它。
app
在這裏就不仔細講Buffer類的API了,由於須要用的時候能夠去查Java Doc,而以幾個經常使用的操做來說述一下怎麼使用Buffer。dom
容量(capacity):緩衝區的最大大小socket
上界(limit):緩衝區當前的大小函數
位置(position):下一個要讀寫的位置,由get()和put()更新
標記(mark):備忘位置,由mark()來指定mark = position,由reset()來指定position=mark
它們之間的大小關係:
0 <= mark <= position <= limit <= capacity
一種最經常使用的方式是:
ByteBuffer buffer = ByteBuffer.allocate(1024);
這種方法是建立一個1024字節大小的緩衝區。也能夠用下面這種方式來包裝本身建立的字節數組。
byte[] bytes = new byte[1024];
ByteBuffer buffer = ByteBuffer.wrap(bytes);
Buffer在填充完畢後須要傳遞到一個通道中,這時若是直接讀取Buffer,實際上是什麼都讀不到的。由於Buffer的設計中是有一個指針概念的,指向當前的位置,當一個Buffer填充完畢時指針是指向末尾的,所以在讀取時應該將指針指向Buffer的頭部,簡單的方法就是使用下面這個方法:
Buffer.flip();
flip的實現以下:
public final Buffer flip() { limit = position; position = 0; mark = -1; return this; }
能夠看出flip實際上是把當前的limit從capacity變成了position,又把position放到了緩衝區的起點,並取消了mark。
Buffer.clear();
clear的實現以下:
public final Buffer clear() { position = 0; limit = capacity; mark = -1; return this; }
clear函數就是將position放到起點,並重置limiti爲capacity,以及取消mark。
Buffer.rewind();
rewind的實現以下:
public final Buffer rewind() { position = 0; mark = -1; return this; }
rewind和flip的區別在於沒有改變limit的值。
Buffer.compact()
開始我不是很理解Channel這個東西爲何要存在,看了書才慢慢明白,緩衝區爲咱們裝載了數據,可是數據的寫入和讀取並不能直接進行read()和write()這樣的系統調用,而是JVM爲咱們提供了一層對系統調用的封裝。而Channel能夠用最小的開銷來訪問操做系統自己的IO服務,這就是爲何要有Channel的緣由。
下面來說講經常使用的幾個Channel類及其經常使用的方法。
I/O從廣義上能夠分爲File I/O和Stream I/O,對應到通道來講就有文件通道和socket通道,具體的說是FileChannle類和SocketChannel、ServerSocketChannel和DatagramChannel類。
它們之間的區別仍是很大的,從繼承關係上來看:
public abstract class FileChannel extends AbstractInterruptibleChannel implements SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel
FileChannel主要是繼承了可中斷接口,而對於socket相關的Channel類都繼承AbstractSelectableChannel,這是選擇器(Selector)相關的通道,在下一節中具體講解。
public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel
FileChannel只能經過工廠方法來實例化,那就是調用RandomAccessFile、FileInputStream和FileOutputStream的getChannel()方法。如:
RandomAccessFile file = new RandomAccessFile("a.txt", "r"); FileChannel fc = file.getChannel();
先看看FileChannel提供的方法句柄:
public abstract int read(ByteBuffer dst) throws IOException;//把通道中數據傳到目的緩衝區中,dst是destination的縮寫 public abstract int write(ByteBuffer src) throws IOException;//把源緩衝區中的內容寫到指定的通道中去
從句柄能夠看出FileChannel是既能夠讀又能夠寫的,是全雙工的。下面這個例子用來展現FileChannel是如何進行讀和寫的。
public class FileChannelTest { public static void readFile(String path) throws IOException { FileChannel fc = new FileInputStream(path).getChannel(); ByteBuffer buffer = ByteBuffer.allocate(128); StringBuilder sb = new StringBuilder(); while ((fc.read(buffer)) >= 0) { //翻轉指針 buffer.flip(); //remaining = limit - position byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String string = new String(bytes, "UTF-8"); sb.append(string); //清空buffer buffer.clear(); } System.out.println(sb.toString()); } public static void writeFile(String path, String string) throws IOException { FileChannel fc = new FileOutputStream(path).getChannel(); ByteBuffer buffer = ByteBuffer.allocate(10); int current = 0; int len = string.getBytes().length; while (current < len) { for (int i=0;i<10;i++) { if (current+i>=len) break; buffer.put(string.getBytes()[current+i]); } current += buffer.position(); buffer.flip(); fc.write(buffer); buffer.clear(); } } public static void main(String[] args) throws IOException { String in = "D:/in.txt"; String out = "D:/out.txt"; readFile(in); writeFile(out, "hello world"); readFile(out); } }
分析一下上面這段代碼,在readFile()函數中,經過FileInputStream.getChannel()獲得FileChannel對象,並建立ByteBuffer對象,接着利用FileChannel的read方法填充buffer,獲得填充完的buffer以後咱們將buffer的當前指針翻轉一下接着利用buffer的get方法把數據放到byte數組中,接着就能夠讀取數據了。
讀取文件的整個過程相比原生的I/O方法仍是略顯麻煩,可是咱們若是把數據當作一堆煤礦,把ByteBuffer當作裝煤的礦車,而FileChannel當作是運煤的礦道,那麼上面的過程就演變成了:先打通一條礦道,而後把煤礦裝在小車裏運出來。形象的記憶更利於理解這個過程。
而writeFile()函數也是相似,爲了更好的理解Buffer的屬性,我特地將buffer的大小設置爲10,爲要寫入的字符串長度爲11個字節。首先仍是經過FileOutputStream.getChannel()方法獲得FileChannel對象,並建立一個10字節大小的緩衝區,接着定義一個整型變量current指向要寫入的字符串的當前下標,每次向buffer中put10個字節,並更新current,經過buffer.position()方法能夠獲得buffer被填充以後指針的位置,也就是緩衝區裏的字節個數,而後翻轉指針,最後經過FileChannel.write(buffer)方法將buffer寫入到文件中。
一樣考慮一下形象化的過程:咱們首先把煤礦裝入小車(buffer.put()),並打開一條通往礦山的礦道(FileOutputStream.getChannel()),接着把煤礦運輸進去(FileChannel.write(buffer))。仍是很容易理解的吧!
在另外一篇博客中介紹了阻塞式TCP的使用,接下來會介紹一下非阻塞式的TCP使用。
Socket通道與文件通道有着不一樣的特徵,最顯著的就是能夠運行非阻塞模式而且是能夠選擇的。在2.2.1節中咱們講到Socket通道都繼承自AbstractSelectableChannel類,而文件通道沒有,而這個類就是Socket通道擁有非阻塞和可選擇特色的關鍵。下面是SelectableChannel的幾個方法句柄:
public abstract boolean isBlocking(); public abstract SelectableChannel configureBlocking(boolean block) throws IOException;
從這兩個方法句柄能夠看到,設置一個socket通道的非阻塞模式只須要:
socketChannel.configureBlocking(false)
便可。而有條件的選擇(readiness selection)是一種能夠用來查詢通道的機制,該查詢能夠判斷通道是否準備好執行一個目標操做,好比read、write或accept。這個特性是在SelectableChannel類和SelectionKey類中進行了定義。
public static final int OP_READ = 1 << 0; public static final int OP_WRITE = 1 << 2; public static final int OP_CONNECT = 1 << 3; public static final int OP_ACCEPT = 1 << 4;
SelectionKey中的四個常量定義了socket通道的四種狀態,而SelectableChannel的register方法正好返回了SelectionKey對象。
public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;
socket通道與文件通道不一樣,並非經過socket.getChannel()來建立對象(儘管socket對象有這個方法),而是經過SocketChannel.open()這樣的靜態工廠方法去建立對象。每個socket通道有與之關聯的一個socket對象,卻並非全部的socket對象都有一個關聯的通道,若是用傳統的方法建立了一個socket對象,則它不會有一個關聯的通道而且getChannel()方法老是返回null。
SocketChannel sc = SocketChannel.open(); sc.configureBlocking(false);
這樣就建立了一個非阻塞的socket通道。
public abstract class ServerSocketChannel extends AbstractSelectableChannel { public static ServerSocketChannel open( ) throws IOException; public abstract ServerSocket socket( ); public abstract ServerSocket accept( ) throws IOException; public final int validOps( ); }
ServerSocketChannel與SocketChannel和DatagramChannel不一樣,它自己是不傳輸數據的,提供的接口很是簡單,若是要進行數據讀寫,須要經過ServerSocketChannel.socket()方法返回一個與之關聯的ServerSocket對象來進行。
ServerSocketChannel ssc = ServerSocketChannel.open(); ServerSocket ss = ssc.socket(); ss.bind(new InetSocketAddress(port));
ServerSocketChannel同ServerSocket同樣也有accept()方法,當調用ServerSocket的accept()函數時只能是阻塞式的,而調用ServerSocketChannel的accept()函數卻能夠是非阻塞式。
下面這個例子展現了ServerSocketChannel的用法:
public class Server { static int port = 20001; public static void main(String[] args) throws IOException, InterruptedException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(port)); ssc.configureBlocking(false); String string = "hello client"; ByteBuffer buffer = ByteBuffer.wrap(string.getBytes()); ByteBuffer in = ByteBuffer.allocate(1024); System.out.println("Server wait for connection..."); while (true) { SocketChannel sc = ssc.accept(); if (sc == null) { TimeUnit.SECONDS.sleep(1); }else { //rewind只是將position調到0,不會改變Limit的值,而flip是將limit調整成position,再把position改爲0 System.out.println("Server get a connection..."); sc.read(in); in.flip(); buffer.rewind(); sc.write(buffer); System.out.println("From client:" + new String(in.array(), "UTF-8")); } } } }
選擇器實際上是一種多路複用機制在Java語言中的應用,在學習Selector以前有必要學習一下I/O多路複用的概念。
在以前的文章中咱們已經看到對於每一個客戶端請求都分配一個線程的設計,或者是利用線程池來處理客戶端請求,可是這樣的設計對於處理客戶端有大量請求的狀況都一籌莫展。緣由在於首先線程很是消耗系統資源,其次阻塞式的設計在某一個請求發送的數據很大時會使其餘請求等待好久。那麼究竟有沒有其餘方法來解決這個問題呢?早在上世紀80年代在Unix系統中就已經提出select模型來解決這個問題,在以後對select進行優化又提出了poll模型和epoll模型(Linux專有)。
select/poll/epoll其實都是一種多路複用模型,什麼是多路複用,開始聽見這個名詞我也是一臉懵逼,以爲好像很高大上很難理解的樣子。後面經過看書和看知乎上的形象化描述,慢慢理解了其實多路複用也沒有想象着那麼難。咱們若是把每一個客戶端請求當作一個電路,以下圖,那麼是否有必要爲每條電路都分配一條專有的線路呢?仍是當電流來了進行開關切換?很明顯,後者只須要一個開關就能夠節省大量的沒必要要開銷。select模型其實就是這樣作的,監控全部的socket請求,當某個socket準備好(read/write/accept)時就進行處。可是如何作到監控全部socket的狀態呢,select作的很簡單,也許你也想到了,就是去輪詢全部socket的狀態,這樣很明顯當socket數量比較大時效率很是低。而且select對於監控的socket數量有限制,默認是1024個。poll模型進行了一些改進,可是並無本質的改變。到了epoll模型,就有了很是大的改觀。假象另外一個場景,若是你是一個監考老師,考試結束時要去收卷子,你按照常理一個一個的收着,一旦有一個學生還沒寫完,因而你就會卡(阻塞)在那,而且整個輪詢一遍下來很是慢。因此你想到了嗎?讓那些已經作完的學生舉手告知你他已經作完了,你再過去收一下卷子便可。這樣很明顯阻塞會大幅度減小。這就是epoll,讓那些已經準備好的socket發出通知,而後來處理它。
若是仍是不理解,能夠看看知乎上的一些回答。
好了,廢話這麼多,已是能夠理解多路複用是什麼了。Java語言直到JDK1.4版本纔有多路複用這個概念,很大緣由也是由於沒人用Java去寫服務器,例如著名的Apache和Nginx都是用C/C++寫的。接下來對NIO中多路複用的實現進行介紹。
NIO處理多路複用請求只須要三個組件:可選擇的通道(SelectableChannels)、選擇器(Selector)和選擇鍵(SelectionKey),他們之間的關係以下圖所示:
可選擇的通道能夠主動註冊到一個選擇器上,並指定對哪些動做是感興趣的。這個註冊行爲會返回一個選擇鍵,選擇鍵封裝了該通道和選擇器之間的註冊關係,包含兩個比特集:指示該註冊關係所關心的通道操做;通道已經準備好的操做。選擇器是核心組件,它管理着註冊在其上的通道集合的信息和它們的就緒狀態。值得注意的是,通道在註冊到一個選擇器以前,必須設置爲非阻塞模式。緣由在這裏。
經過靜態工廠方法建立一個選擇器。
Selector selector = Selector.open();
這是通道擁有的方法,先看看方法句柄:
public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException; public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException { return register(sel, ops, null); }
值得注意的是第二個參數ops,這個參數表示了該通道感興趣的操做,全部的操做包括讀(read)、寫(write)、鏈接(connect)和接受(accept)。並非全部通道都支持這些操做,好比SocketChannel就沒有accept這個操做,由於這是專屬於ServerSocketChannel的操做。能夠經過調用Channel.validOps()來查詢支持的操做。
第三個參數是用來傳遞一個對象的引用,在調用新生成的選擇鍵的attach()方法時會返回該對象的引用。
選擇器的核心功能是選擇過程,選擇器其實是對select()、poll()等本地系統調用的一個封裝。每個選擇器會維護三個鍵集合:已註冊的鍵集合、已選擇的鍵集合和已取消的鍵集合。經過執行Selector.select()、Selector.select(int timeout)或Selector.selectNow(),選擇過程被調用,這時會執行如下步驟:
好不容易纔寫完上面這段,由於我在看原書的時候看了2-3遍纔看懂,過程仍是比較複雜的,我以爲是時候去看看Unix中的select()是怎麼作的,也許這樣更利於理解這個選擇過程。
說了這麼多原理,不知道你暈沒暈,反正我是快暈了。這時候來一段實戰代碼,告訴你瞭解了這麼多,到底該怎麼用!
一般的作法以下:在選擇器上調用一次select操做(這會更新已選擇鍵的集合),而後遍歷selectedKeys返回的鍵的集合。接着鍵將從已選擇的鍵的集合中被移除(經過Iterator.remove()方法),而後檢測下一個鍵。完成後,繼續下一次select操做。
服務端程序演示:
public class SelectorTest { public static void main(String[] args) throws IOException { new SelectorTest().select(); } public void select() throws IOException { //建立選擇器 Selector selector = Selector.open(); //建立serverChannel ServerSocketChannel ssc = ServerSocketChannel.open(); //設置爲非阻塞模式 ssc.configureBlocking(false); //綁定監聽的地址 ssc.socket().bind(new InetSocketAddress(20000), 1024); //將serverChannel註冊到選擇器上,監聽accept事件,返回選擇鍵 ssc.register(selector, SelectionKey.OP_ACCEPT); while (true) { //這次選擇過程準備就緒的通道數量 int num = selector.select(); if (num == 0) { //若沒有準備好的就繼續循環 continue; } //返回已就緒的鍵集合 Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); handle(selector, key); //由於已經處理了該鍵,因此把當前的key從已選擇的集合中去除 it.remove(); } } } public void handle(Selector selector, SelectionKey key) throws IOException { if (key.isValid()) { //當一個ServerChannel爲accept狀態時,註冊這個ServerChannel的SocketChannel爲可讀取狀態 if (key.isAcceptable()) { ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); SocketChannel channel = serverChannel.accept(); //把通道註冊到選擇器以前要設置爲非阻塞,不然會報異常 channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); } //若是channel是可讀取狀態,則讀取其中的數據 if (key.isReadable()) { //只有SocketChannel才能讀寫數據,因此若是是可讀取狀態,只能是SocketChannel SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer in = ByteBuffer.allocate(1024); //將socketChannel中的數據讀入到buffer中,返回當前字節的位置 int readBytes = sc.read(in); if (readBytes > 0) { //把buffer的position指針指向buffer的開頭 in.flip(); byte[] bytes = new byte[in.remaining()]; in.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The server receive : " + body); //把response輸出到socket中 doWrite(sc, "Hello client"); } else if (readBytes < 0) { key.cancel(); sc.close(); } } } } private void doWrite(SocketChannel sc, String response) throws IOException { //把服務器端返回的數據寫到socketChannel中 if (response == null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); sc.write(writeBuffer); } } }
代碼相較於阻塞式TCP服務端程序複雜了太多倍,可是基本思路跟我上面那段話寫的是同樣的,並且基本每一段代碼都寫了註釋,耐心看下去確定看的懂。我就再也不解釋這段代碼啦。
客戶端演示:
public class Client { public static final int PORT = 20000; public static final String HOST = "127.0.0.1"; private volatile boolean stop = false; public static void main(String[] args) throws IOException { new Client().select(); } public void select() throws IOException { // 建立選擇器 Selector selector = Selector.open(); // 建立SocketChannel SocketChannel sc = SocketChannel.open(); // 設置爲非阻塞模式 sc.configureBlocking(false); try { doConnect(selector, sc); } catch (Exception e) { e.printStackTrace(); System.exit(1); } while (!stop) { int num = selector.select(); if (num == 0) { continue; } Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); try { handleKeys(selector, key); } catch (Exception e) { e.printStackTrace(); if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } // 由於已經處理了該鍵,因此把當前的key從已選擇的集合中去除 it.remove(); } } if (selector != null) { selector.close(); } } private void doWrite(SocketChannel sc, String response) throws IOException { if (response != null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); sc.write(writeBuffer); if (!writeBuffer.hasRemaining()) { System.out.println("Send msg successfully"); } } } private void handleKeys(Selector selector, SelectionKey key) throws IOException { if (key.isValid()) { SocketChannel sc = (SocketChannel) key.channel(); // 判斷是否鏈接成功 if (key.isConnectable()) { if (sc.finishConnect()) { sc.register(selector, SelectionKey.OP_READ); doWrite(sc, "Hello Server"); } else { System.exit(1); } } if (key.isReadable()) { ByteBuffer in = ByteBuffer.allocate(1024); // 將socketChannel中的數據讀入到buffer中,返回當前字節的位置 int readBytes = sc.read(in); if (readBytes > 0) { // 把buffer的position指針指向buffer的開頭 in.flip(); byte[] bytes = new byte[in.remaining()]; in.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The Client receive : " + body); this.stop = true; } else if (readBytes < 0) { // 對端鏈路關閉 key.cancel(); sc.close(); } else { // 讀到0字節,忽略 } } } } private void doConnect(Selector selector, SocketChannel sc) throws IOException { if (sc.connect(new InetSocketAddress(HOST, PORT))) { System.out.println("Client connect successfully..."); // 若是直接鏈接成功,則註冊讀操做 sc.register(selector, SelectionKey.OP_READ); doWrite(sc, "Hello server!"); } else { // 若是沒有鏈接成功,則註冊鏈接操做 sc.register(selector, SelectionKey.OP_CONNECT); } } }
客戶端跟服務端很類似,惟一不一樣的是服務端須要監測的socket行爲是OP_ACCEPT和OP_READ,而客戶端須要監控的是OP_CONNECT和OP_READ,其餘的區別不是很大。
依次運行服務器端和客戶端,結果以下:
代碼在個人github repo上也能夠找到。
花了大概三天的時間,把《Java NIO》這本書看了一遍並記錄了下來學習過程,而且結合《Netty權威指南》中的例子去實踐了一下,慢慢感受到NIO的魅力。反思一下學習的比較慢的緣由,應該是對Unix上的I/O模型不熟悉致使的,因此以爲接下來好好學習一下select、poll、epoll,加深對多路複用的理解。
本文中可能存在理解有誤差的地方,也請多多指正。