NIO 之阻塞IO和非阻塞IO(轉載)

阻塞模式 IO

咱們已經介紹過使用 Java NIO 包組成一個簡單的客戶端-服務端網絡通信所須要的 ServerSocketChannel、SocketChannel 和 Buffer,咱們這裏整合一下它們,給出一個完整的可運行的例子:java

public class Server { public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 監聽 8080 端口進來的 TCP 連接 serverSocketChannel.socket().bind(new InetSocketAddress(8080)); while (true) { // 這裏會阻塞,直到有一個請求的鏈接進來 SocketChannel socketChannel = serverSocketChannel.accept(); // 開啓一個新的線程來處理這個請求,而後在 while 循環中繼續監聽 8080 端口 SocketHandler handler = new SocketHandler(socketChannel); new Thread(handler).start(); } } } 

這裏看一下新的線程須要作什麼,SocketHandler:編程

public class SocketHandler implements Runnable { private SocketChannel socketChannel; public SocketHandler(SocketChannel socketChannel) { this.socketChannel = socketChannel; } @Override public void run() { ByteBuffer buffer = ByteBuffer.allocate(1024); try { // 將請求數據讀入 Buffer 中 int num; while ((num = socketChannel.read(buffer)) > 0) { // 讀取 Buffer 內容以前先 flip 一下 buffer.flip(); // 提取 Buffer 中的數據 byte[] bytes = new byte[num]; buffer.get(bytes); String re = new String(bytes, "UTF-8"); System.out.println("收到請求:" + re); // 迴應客戶端 ByteBuffer writeBuffer = ByteBuffer.wrap(("我已經收到你的請求,你的請求內容是:" + re).getBytes()); socketChannel.write(writeBuffer); buffer.clear(); } } catch (IOException e) { IOUtils.closeQuietly(socketChannel); } } } 

最後,貼一下客戶端 SocketChannel 的使用,客戶端比較簡單:windows

public class SocketChannelTest { public static void main(String[] args) throws IOException { SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress("localhost", 8080)); // 發送請求 ByteBuffer buffer = ByteBuffer.wrap("1234567890".getBytes()); socketChannel.write(buffer); // 讀取響應 ByteBuffer readBuffer = ByteBuffer.allocate(1024); int num; if ((num = socketChannel.read(readBuffer)) > 0) { readBuffer.flip(); byte[] re = new byte[num]; readBuffer.get(re); String result = new String(re, "UTF-8"); System.out.println("返回值: " + result); } } } 

上面介紹的阻塞模式的代碼應該很好理解:來一個新的鏈接,咱們就新開一個線程來處理這個鏈接,以後的操做所有由那個線程來完成。緩存

那麼,這個模式下的性能瓶頸在哪裏呢?服務器

  1. 首先,每次來一個鏈接都開一個新的線程這確定是不合適的。當活躍鏈接數在幾十幾百的時候固然是能夠這樣作的,但若是活躍鏈接數是幾萬幾十萬的時候,這麼多線程明顯就不行了。每一個線程都須要一部份內存,內存會被迅速消耗,同時,線程切換的開銷很是大。
  2. 其次,阻塞操做在這裏也是一個問題。首先,accept() 是一個阻塞操做,當 accept() 返回的時候,表明有一個鏈接可使用了,咱們這裏是立刻就新建線程來處理這個 SocketChannel 了,可是,可是這裏不表明對方就將數據傳輸過來了。因此,SocketChannel#read 方法將阻塞,等待數據,明顯這個等待是不值得的。同理,write 方法也須要等待通道可寫才能執行寫入操做,這邊的阻塞等待也是不值得的。

非阻塞 IO

說完了阻塞模式的使用及其缺點之後,咱們這裏就能夠介紹非阻塞 IO 了。網絡

非阻塞 IO 的核心在於使用一個 Selector 來管理多個通道,能夠是 SocketChannel,也能夠是 ServerSocketChannel,將各個通道註冊到 Selector 上,指定監聽的事件。多線程

以後能夠只用一個線程來輪詢這個 Selector,看看上面是否有通道是準備好的,當通道準備好可讀或可寫,而後纔去開始真正的讀寫,這樣速度就很快了。咱們就徹底沒有必要給每一個通道都起一個線程。併發

NIO 中 Selector 是對底層操做系統實現的一個抽象,管理通道狀態其實都是底層系統實現的,這裏簡單介紹下在不一樣系統下的實現。異步

select:上世紀 80 年代就實現了,它支持註冊 FD_SETSIZE(1024) 個 socket,在那個年代確定是夠用的,不過如今嘛,確定是不行了。socket

poll:1997 年,出現了 poll 做爲 select 的替代者,最大的區別就是,poll 再也不限制 socket 數量。

select 和 poll 都有一個共同的問題,那就是它們都只會告訴你有幾個通道準備好了,可是不會告訴你具體是哪幾個通道。因此,一旦知道有通道準備好之後,本身仍是須要進行一次掃描,顯然這個不太好,通道少的時候還行,一旦通道的數量是幾十萬個以上的時候,掃描一次的時間都很可觀了,時間複雜度 O(n)。因此,後來才催生了如下實現。

epoll:2002 年隨 Linux 內核 2.5.44 發佈,epoll 能直接返回具體的準備好的通道,時間複雜度 O(1)。

除了 Linux 中的 epoll,2000 年 FreeBSD 出現了 Kqueue,還有就是,Solaris 中有 /dev/poll。

前面說了那麼多實現,可是沒有出現 Windows,Windows 平臺的非阻塞 IO 使用 select,咱們也沒必要以爲 Windows 很落後,在 Windows 中 IOCP 提供的異步 IO 是比較強大的。

咱們回到 Selector,畢竟 JVM 就是這麼一個屏蔽底層實現的平臺,咱們面向 Selector 編程就能夠了。

以前在介紹 Selector 的時候已經瞭解過了它的基本用法,這邊來一個可運行的實例代碼,你們不妨看看:

public class SelectorServer { public static void main(String[] args) throws IOException { Selector selector = Selector.open(); ServerSocketChannel server = ServerSocketChannel.open(); server.socket().bind(new InetSocketAddress(8080)); // 將其註冊到 Selector 中,監聽 OP_ACCEPT 事件 server.configureBlocking(false); server.register(selector, SelectionKey.OP_ACCEPT); while (true) { int readyChannels = selector.select(); if (readyChannels == 0) { continue; } Set<SelectionKey> readyKeys = selector.selectedKeys(); // 遍歷 Iterator<SelectionKey> iterator = readyKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { // 有已經接受的新的到服務端的鏈接 SocketChannel socketChannel = server.accept(); // 有新的鏈接並不表明這個通道就有數據, // 這裏將這個新的 SocketChannel 註冊到 Selector,監聽 OP_READ 事件,等待數據 socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { // 有數據可讀 // 上面一個 if 分支中註冊了監聽 OP_READ 事件的 SocketChannel SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int num = socketChannel.read(readBuffer); if (num > 0) { // 處理進來的數據... System.out.println("收到數據:" + new String(readBuffer.array()).trim()); ByteBuffer buffer = ByteBuffer.wrap("返回給客戶端的數據...".getBytes()); socketChannel.write(buffer); } else if (num == -1) { // -1 表明鏈接已經關閉 socketChannel.close(); } } } } } } 

至於客戶端,你們能夠繼續使用上一節介紹阻塞模式時的客戶端進行測試。

NIO.2 異步 IO

More New IO,或稱 NIO.2,隨 JDK 1.7 發佈,包括了引入異步 IO 接口和 Paths 等文件訪問接口。

異步這個詞,我想對於絕大多數開發者來講都很熟悉,不少場景下咱們都會使用異步。

一般,咱們會有一個線程池用於執行異步任務,提交任務的線程將任務提交到線程池就能夠立馬返回,沒必要等到任務真正完成。若是想要知道任務的執行結果,一般是經過傳遞一個回調函數的方式,任務結束後去調用這個函數。

一樣的原理,Java 中的異步 IO 也是同樣的,都是由一個線程池來負責執行任務,而後使用回調或本身去查詢結果。

大部分開發者都知道爲何要這麼設計了,這裏再囉嗦一下。異步 IO 主要是爲了控制線程數量,減小過多的線程帶來的內存消耗和 CPU 在線程調度上的開銷。

在 Unix/Linux 等系統中,JDK 使用了併發包中的線程池來管理任務,具體能夠查看 AsynchronousChannelGroup 的源碼。

在 Windows 操做系統中,提供了一個叫作 I/O Completion Ports 的方案,一般簡稱爲 IOCP,操做系統負責管理線程池,其性能很是優異,因此在 Windows 中 JDK 直接採用了 IOCP 的支持,使用系統支持,把更多的操做信息暴露給操做系統,也使得操做系統可以對咱們的 IO 進行必定程度的優化。

在 Linux 中其實也是有異步 IO 系統實現的,可是限制比較多,性能也通常,因此 JDK 採用了自建線程池的方式。

本文仍是以實用爲主,想要了解更多信息請自行查找其餘資料,下面對 Java 異步 IO 進行實踐性的介紹。

總共有三個類須要咱們關注,分別是 AsynchronousSocketChannel,AsynchronousServerSocketChannel 和 AsynchronousFileChannel,只不過是在以前介紹的 FileChannel、SocketChannel 和 ServerSocketChannel 的類名上加了個前綴 Asynchronous。

Java 異步 IO 提供了兩種使用方式,分別是返回 Future 實例和使用回調函數。

一、返回 Future 實例

返回 java.util.concurrent.Future 實例的方式咱們應該很熟悉,JDK 線程池就是這麼使用的。Future 接口的幾個方法語義在這裏也是通用的,這裏先作簡單介紹。

  • future.isDone();

    判斷操做是否已經完成,包括了正常完成、異常拋出、取消

  • future.cancel(true);

    取消操做,方式是中斷。參數 true 說的是,即便這個任務正在執行,也會進行中斷。

  • future.isCancelled();

    是否被取消,只有在任務正常結束以前被取消,這個方法纔會返回 true

  • future.get();

    這是咱們的老朋友,獲取執行結果,阻塞。

  • future.get(10, TimeUnit.SECONDS);

    若是上面的 get() 方法的阻塞你不滿意,那就設置個超時時間。

二、提供 CompletionHandler 回調函數

java.nio.channels.CompletionHandler 接口定義:

public interface CompletionHandler<V,A> { void completed(V result, A attachment); void failed(Throwable exc, A attachment); } 

注意,參數上有個 attachment,雖然不經常使用,咱們能夠在各個支持的方法中傳遞這個參數值

AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open().bind(null); // accept 方法的第一個參數能夠傳遞 attachment listener.accept(attachment, new CompletionHandler<AsynchronousSocketChannel, Object>() { public void completed( AsynchronousSocketChannel client, Object attachment) { // } public void failed(Throwable exc, Object attachment) { // } }); 

AsynchronousFileChannel

網上關於 Non-Blocking IO 的介紹文章不少,可是 Asynchronous IO 的文章相對就少得多了,因此我這邊會多介紹一些相關內容。

首先,咱們就來關注異步的文件 IO,前面咱們說了,文件 IO 在全部的操做系統中都不支持非阻塞模式,可是咱們能夠對文件 IO 採用異步的方式來提升性能。

下面,我會介紹 AsynchronousFileChannel 裏面的一些重要的接口,都很簡單,讀者要是以爲無趣,直接滑到下一個標題就能夠了。

實例化:

AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("/Users/hongjie/test.txt")); 

一旦實例化完成,咱們就能夠着手準備將數據讀入到 Buffer 中:

ByteBuffer buffer = ByteBuffer.allocate(1024); Future<Integer> result = channel.read(buffer, 0); 

異步文件通道的讀操做和寫操做都須要提供一個文件的開始位置,文件開始位置爲 0

除了使用返回 Future 實例的方式,也能夠採用回調函數進行操做,接口以下:

public abstract <A> void read(ByteBuffer dst, long position, A attachment, CompletionHandler<Integer,? super A> handler); 

順便也貼一下寫操做的兩個版本的接口:

public abstract Future<Integer> write(ByteBuffer src, long position); public abstract <A> void write(ByteBuffer src, long position, A attachment, CompletionHandler<Integer,? super A> handler); 

咱們能夠看到,AIO 的讀寫主要也仍是與 Buffer 打交道,這個與 NIO 是一脈相承的。

另外,還提供了用於將內存中的數據刷入到磁盤的方法:

public abstract void force(boolean metaData) throws IOException; 

由於咱們對文件的寫操做,操做系統並不會直接針對文件操做,系統會緩存,而後週期性地刷入到磁盤。若是但願將數據及時寫入到磁盤中,以避免斷電引起部分數據丟失,能夠調用此方法。參數若是設置爲 true,意味着同時也將文件屬性信息更新到磁盤。

還有,還提供了對文件的鎖定功能,咱們能夠鎖定文件的部分數據,這樣能夠進行排他性的操做。

public abstract Future<FileLock> lock(long position, long size, boolean shared); 

position 是要鎖定內容的開始位置,size 指示了要鎖定的區域大小,shared 指示須要的是共享鎖仍是排他鎖

固然,也可使用回調函數的版本:

public abstract <A> void lock(long position, long size, boolean shared, A attachment, CompletionHandler<FileLock,? super A> handler); 

文件鎖定功能上還提供了 tryLock 方法,此方法會快速返回結果:

public abstract FileLock tryLock(long position, long size, boolean shared) throws IOException; 

這個方法很簡單,就是嘗試去獲取鎖,若是該區域已被其餘線程或其餘應用鎖住,那麼馬上返回 null,不然返回 FileLock 對象。

AsynchronousFileChannel 操做大致上也就以上介紹的這些接口,仍是比較簡單的,這裏就少一些廢話早點結束好了。

AsynchronousServerSocketChannel

這個類對應的是非阻塞 IO 的 ServerSocketChannel,你們能夠類比下使用方式。

咱們就廢話少說,用代碼說事吧:

package com.javadoop.aio; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; public class Server { public static void main(String[] args) throws IOException { // 實例化,並監聽端口 AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8080)); // 本身定義一個 Attachment 類,用於傳遞一些信息 Attachment att = new Attachment(); att.setServer(server); server.accept(att, new CompletionHandler<AsynchronousSocketChannel, Attachment>() { @Override public void completed(AsynchronousSocketChannel client, Attachment att) { try { SocketAddress clientAddr = client.getRemoteAddress(); System.out.println("收到新的鏈接:" + clientAddr); // 收到新的鏈接後,server 應該從新調用 accept 方法等待新的鏈接進來 att.getServer().accept(att, this); Attachment newAtt = new Attachment(); newAtt.setServer(server); newAtt.setClient(client); newAtt.setReadMode(true); newAtt.setBuffer(ByteBuffer.allocate(2048)); // 這裏也能夠繼續使用匿名實現類,不過代碼很差看,因此這裏專門定義一個類 client.read(newAtt.getBuffer(), newAtt, new ChannelHandler()); } catch (IOException ex) { ex.printStackTrace(); } } @Override public void failed(Throwable t, Attachment att) { System.out.println("accept failed"); } }); // 爲了防止 main 線程退出 try { Thread.currentThread().join(); } catch (InterruptedException e) { } } } 

看一下 ChannelHandler 類:

package com.javadoop.aio; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; import java.nio.charset.Charset; public class ChannelHandler implements CompletionHandler<Integer, Attachment> { @Override public void completed(Integer result, Attachment att) { if (att.isReadMode()) { // 讀取來自客戶端的數據 ByteBuffer buffer = att.getBuffer(); buffer.flip(); byte bytes[] = new byte[buffer.limit()]; buffer.get(bytes); String msg = new String(buffer.array()).toString().trim(); System.out.println("收到來自客戶端的數據: " + msg); // 響應客戶端請求,返回數據 buffer.clear(); buffer.put("Response from server!".getBytes(Charset.forName("UTF-8"))); att.setReadMode(false); buffer.flip(); // 寫數據到客戶端也是異步 att.getClient().write(buffer, att, this); } else { // 到這裏,說明往客戶端寫數據也結束了,有如下兩種選擇: // 1. 繼續等待客戶端發送新的數據過來 // att.setReadMode(true); // att.getBuffer().clear(); // att.getClient().read(att.getBuffer(), att, this); // 2. 既然服務端已經返回數據給客戶端,斷開此次的鏈接 try { att.getClient().close(); } catch (IOException e) { } } } @Override public void failed(Throwable t, Attachment att) { System.out.println("鏈接斷開"); } } 

順便再貼一下自定義的 Attachment 類:

public class Attachment { private AsynchronousServerSocketChannel server; private AsynchronousSocketChannel client; private boolean isReadMode; private ByteBuffer buffer; // getter & setter } 

這樣,一個簡單的服務端就寫好了,接下來能夠接收客戶端請求了。上面咱們用的都是回調函數的方式,讀者要是感興趣,能夠試試寫個使用 Future 的。

AsynchronousSocketChannel

其實,說完上面的 AsynchronousServerSocketChannel,基本上讀者也就知道怎麼使用 AsynchronousSocketChannel 了,和非阻塞 IO 基本相似。

這邊作個簡單演示,這樣讀者就能夠配合以前介紹的 Server 進行測試使用了。

package com.javadoop.aio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.charset.Charset; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class Client { public static void main(String[] args) throws Exception { AsynchronousSocketChannel client = AsynchronousSocketChannel.open(); // 來個 Future 形式的 Future<?> future = client.connect(new InetSocketAddress(8080)); // 阻塞一下,等待鏈接成功 future.get(); Attachment att = new Attachment(); att.setClient(client); att.setReadMode(false); att.setBuffer(ByteBuffer.allocate(2048)); byte[] data = "I am obot!".getBytes(); att.getBuffer().put(data); att.getBuffer().flip(); // 異步發送數據到服務端 client.write(att.getBuffer(), att, new ClientChannelHandler()); // 這裏休息一下再退出,給出足夠的時間處理數據 Thread.sleep(2000); } } 

往裏面看下 ClientChannelHandler 類:

package com.javadoop.aio; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; import java.nio.charset.Charset; public class ClientChannelHandler implements CompletionHandler<Integer, Attachment> { @Override public void completed(Integer result, Attachment att) { ByteBuffer buffer = att.getBuffer(); if (att.isReadMode()) { // 讀取來自服務端的數據 buffer.flip(); byte[] bytes = new byte[buffer.limit()]; buffer.get(bytes); String msg = new String(bytes, Charset.forName("UTF-8")); System.out.println("收到來自服務端的響應數據: " + msg); // 接下來,有如下兩種選擇: // 1. 向服務端發送新的數據 // att.setReadMode(false); // buffer.clear(); // String newMsg = "new message from client"; // byte[] data = newMsg.getBytes(Charset.forName("UTF-8")); // buffer.put(data); // buffer.flip(); // att.getClient().write(buffer, att, this); // 2. 關閉鏈接 try { att.getClient().close(); } catch (IOException e) { } } else { // 寫操做完成後,會進到這裏 att.setReadMode(true); buffer.clear(); att.getClient().read(buffer, att, this); } } @Override public void failed(Throwable t, Attachment att) { System.out.println("服務器無響應"); } } 

以上代碼都是能夠運行調試的,若是讀者碰到問題,請在評論區留言。

Asynchronous Channel Groups

爲了知識的完整性,有必要對 group 進行介紹,其實也就是介紹 AsynchronousChannelGroup 這個類。以前咱們說過,異步 IO 必定存在一個線程池,這個線程池負責接收任務、處理 IO 事件、回調等。這個線程池就在 group 內部,group 一旦關閉,那麼相應的線程池就會關閉。

AsynchronousServerSocketChannels 和 AsynchronousSocketChannels 是屬於 group 的,當咱們調用 AsynchronousServerSocketChannel 或 AsynchronousSocketChannel 的 open() 方法的時候,相應的 channel 就屬於默認的 group,這個 group 由 JVM 自動構造並管理。

若是咱們想要配置這個默認的 group,能夠在 JVM 啓動參數中指定如下系統變量:

  • java.nio.channels.DefaultThreadPool.threadFactory

    此係統變量用於設置 ThreadFactory,它應該是 java.util.concurrent.ThreadFactory 實現類的全限定類名。一旦咱們指定了這個 ThreadFactory 之後,group 中的線程就會使用該類產生。

  • java.nio.channels.DefaultThreadPool.initialSize

    此係統變量也很好理解,用於設置線程池的初始大小。

可能你會想要使用本身定義的 group,這樣能夠對其中的線程進行更多的控制,使用如下幾個方法便可:

  • AsynchronousChannelGroup.withCachedThreadPool(ExecutorService executor, int initialSize)
  • AsynchronousChannelGroup.withFixedThreadPool(int nThreads, ThreadFactory threadFactory)
  • AsynchronousChannelGroup.withThreadPool(ExecutorService executor)

熟悉線程池的讀者對這些方法應該很好理解,它們都是 AsynchronousChannelGroup 中的靜態方法。

至於 group 的使用就很簡單了,代碼一看就懂:

AsynchronousChannelGroup group = AsynchronousChannelGroup
        .withFixedThreadPool(10, Executors.defaultThreadFactory()); AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group); AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group); 

AsynchronousFileChannels 不屬於 group。可是它們也是關聯到一個線程池的,若是不指定,會使用系統默認的線程池,若是想要使用指定的線程池,能夠在實例化的時候使用如下方法:

public static AsynchronousFileChannel open(Path file, Set<? extends OpenOption> options, ExecutorService executor, FileAttribute<?>... attrs) { ... } 

到這裏,異步 IO 就算介紹完成了。

小結

我想,本文應該是說清楚了非阻塞 IO 和異步 IO 了,對於異步 IO,因爲網上的資料比較少,因此難免篇幅多了些。

咱們也要知道,看懂了這些,確實能夠學到一些東西,多瞭解一些知識,可是咱們仍是不多在工做中將這些知識變成工程代碼。通常而言,咱們須要在網絡應用中使用 NIO 或 AIO 來提高性能,可是,在工程上,毫不是瞭解了一些概念,知道了一些接口就能夠的,須要處理的細節還很是多。

這也是爲何 Netty/Mina 如此盛行的緣由,由於它們幫助封裝好了不少細節,提供給咱們用戶友好的接口,後面有時間我也會對 Netty 進行介紹。

(全文完)

 

原文:https://javadoop.com/post/nio-and-aio

相關文章
相關標籤/搜索