歡迎關注公衆號: nullobject 。
文章首發在我的博客 https://www.nullobject.cn,公衆號 nullobject同步更新。
這篇文章主要介紹Java AIO網絡編程。
本文所說的AIO特指Java
環境下的AIO。AIO是java
中IO模型的一種,做爲NIO的改進和加強隨JDK1.7版本更新被集成在JDK的nio包中,所以AIO也被稱做是NIO2.0。區別於傳統的BIO(Blocking IO,同步阻塞式模型,JDK1.4以前就存在於JDK中,NIO於JDK1.4版本發佈更新)的阻塞式讀寫,AIO提供了從創建鏈接到讀、寫的全異步操做。AIO可用於異步的文件讀寫和網絡通訊。本文將介紹如何使用AIO實現一個簡單的網絡通訊以及AIO的一些比較關鍵的API。java
首先以Server端爲例,須要建立一個AsynchronousServerSocketChannel
示例並綁定監聽端口,接着開始監聽客戶端鏈接:編程
public class SimpleAIOServer { public static void main(String[] args) { try { final int port = 5555; //首先打開一個ServerSocket通道並獲取AsynchronousServerSocketChannel實例: AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(); //綁定須要監聽的端口到serverSocketChannel: serverSocketChannel.bind(new InetSocketAddress(port)); //實現一個CompletionHandler回調接口handler, //以後須要在handler的實現中處理鏈接請求和監聽下一個鏈接、數據收發,以及通訊異常。 CompletionHandler<AsynchronousSocketChannel, Object> handler = new CompletionHandler<AsynchronousSocketChannel, Object>() { @Override public void completed(final AsynchronousSocketChannel result, final Object attachment) { // 繼續監聽下一個鏈接請求 serverSocketChannel.accept(attachment, this); try { System.out.println("接受了一個鏈接:" + result.getRemoteAddress() .toString()); // 給客戶端發送數據並等待發送完成 result.write(ByteBuffer.wrap("From Server:Hello i am server".getBytes())) .get(); ByteBuffer readBuffer = ByteBuffer.allocate(128); // 阻塞等待客戶端接收數據 result.read(readBuffer) .get(); System.out.println(new String(readBuffer.array())); } catch (IOException | InterruptedException | ExecutionException e) { e.printStackTrace(); } } @Override public void failed(final Throwable exc, final Object attachment) { System.out.println("出錯了:" + exc.getMessage()); } }; serverSocketChannel.accept(null, handler); // 因爲serverSocketChannel.accept(null, handler);是一個異步方法,調用會直接返回, // 爲了讓子線程可以有時間處理監聽客戶端的鏈接會話, // 這裏經過讓主線程休眠一段時間(固然實際開發通常不會這麼作)以確保應用程序不會當即退出。 TimeUnit.MINUTES.sleep(Integer.MAX_VALUE); } catch (IOException | InterruptedException e) { e.printStackTrace(); } } }
其中result
即表示當前接受的客戶端的鏈接會話,與客戶端的通訊都須要經過該鏈接會話進行。安全
Client端:服務器
public class SimpleAIOClient { public static void main(String[] args) { try { // 打開一個SocketChannel通道並獲取AsynchronousSocketChannel實例 AsynchronousSocketChannel client = AsynchronousSocketChannel.open(); // 鏈接到服務器並處理鏈接結果 client.connect(new InetSocketAddress("127.0.0.1", 5555), null, new CompletionHandler<Void, Void>() { @Override public void completed(final Void result, final Void attachment) { System.out.println("成功鏈接到服務器!"); try { // 給服務器發送信息並等待發送完成 client.write(ByteBuffer.wrap("From client:Hello i am client".getBytes())) .get(); ByteBuffer readBuffer = ByteBuffer.allocate(128); // 阻塞等待接收服務端數據 client.read(readBuffer) .get(); System.out.println(new String(readBuffer.array())); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } @Override public void failed(final Throwable exc, final Void attachment) { exc.printStackTrace(); } }); TimeUnit.MINUTES.sleep(Integer.MAX_VALUE); } catch (IOException | InterruptedException e) { e.printStackTrace(); } } }
從第2節例子能夠看到,實現一個最簡單的AIO socket通訊server、client,主要須要這些相關的類和接口:網絡
AsynchronousServerSocketChannel
服務端Socket通道類,負責服務端Socket的建立和監聽;異步
AsynchronousSocketChannel
客戶端Socket通道類,負責客戶端消息讀寫;socket
CompletionHandler<A,V>
消息處理回調接口,是一個負責消費異步IO操做結果的消息處理器;ide
ByteBuffer
負責承載通訊過程當中須要讀、寫的消息。this
此外,還有可選的用於異步通道資源共享的AsynchronousChannelGroup
類,接下來將一一介紹這些類的主要接口及使用。線程
AsynchronousServerSocketChannel是一個 流式監聽套接字的異步通道。
AsynchronousServerSocketChannel
的使用須要通過三個步驟:建立/打開通道、綁定地址和端口和監聽客戶端鏈接請求。
1、建立/打開通道:簡單地,能夠經過調用AsynchronousServerSocketChannel
的靜態方法open()
來建立AsynchronousServerSocketChannel
實例:
try { AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); }
當打開通道失敗時,會拋出一個IOException
異常。AsynchronousServerSocketChannel
提供了設置通道分組(AsynchronousChannelGroup
)的功能,以實現組內通道資源共享。能夠調用open(AsynchronousChannelGroup)
重載方法建立指定分組的通道:
try { ExecutorService pool = Executors.newCachedThreadPool(); AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(pool, 10); AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(group); } catch (IOException e) { e.printStackTrace(); }
AsynchronousChannelGroup
封裝了處理由綁定到組的異步通道所觸發的I/O操做完成所需的機制。每一個AsynchronousChannelGroup
關聯了一個被用於提交處理I/O事件和分發消費在組內通道上執行的異步操做結果的completion-handlers的線程池。除了處理I/O事件,該線程池還有可能處理其餘一些用於支持完成異步I/O操做的任務。從上面例子能夠看到,經過指定AsynchronousChannelGroup
的方式打開AsynchronousServerSocketChannel
,能夠定製server channel執行的線程池。有關AsynchronousChannelGroup
的詳細介紹能夠查看官方文檔註釋。若是不指定AsynchronousChannelGroup
,則AsynchronousServerSocketChannel
會歸類到一個默認的分組中。
2、綁定地址和端口:經過調用AsynchronousServerSocketChannel.bind(SocketAddress)
方法來綁定監聽地址和端口:
// 構建一個InetSocketAddress實例以指定監聽的地址和端口,若是須要指定ip,則調用InetSocketAddress(ip,port)構造方法建立便可 serverSocketChannel.bind(new InetSocketAddress(port));
3、監聽和接收客戶端鏈接請求:
監聽客戶端鏈接請求,主要經過調用AsynchronousServerSocketChannel.accept()
方法完成。accept()
有兩個重載方法:
public abstract <A> void accept(A,CompletionHandler<AsynchronousSocketChannel,? super A>); public abstract Future<AsynchronousSocketChannel> accept();
這兩個重載方法的行爲方式徹底相同,事實上,AIO的不少異步API都封裝了諸如此類的重載方法:提供CompletionHandle
回調參數或者返回一個Future<T>
類型變量。用過Feture
接口的都知道,能夠調用Feture.get()
方法阻塞等待調用結果。以第一個重載方法爲例,當接受一個新的客戶端鏈接,或者accept操做發生異常時,會經過CompletionHandler將結果返回給用戶處理:
serverSocketChannel .accept(serverSocketChannel, new CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel>() { @Override public void completed(final AsynchronousSocketChannel result, final AsynchronousServerSocketChannel attachment) { // 接收到新的客戶端鏈接時回調 // result即和該客戶端的鏈接會話 // 此時能夠經過result與客戶端進行交互 } @Override public void failed(final Throwable exc, final AsynchronousServerSocketChannel attachment) { // accept失敗時回調 } });
須要注意的是,AsynchronousServerSocketChannel
是線程安全的,但在任什麼時候候同一時間內只能容許有一個accept操做。所以,必須得等待前一個accept
操做完成以後才能啓動下一個accept
:
serverSocketChannel .accept(serverSocketChannel, new CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel>() { @Override public void completed(final AsynchronousSocketChannel result, final AsynchronousServerSocketChannel attachment) { // 接收到新的客戶端鏈接,此時本次accept已經完成 // 繼續監聽下一個客戶端鏈接到來 serverSocketChannel.accept(serverSocketChannel,this); // result即和該客戶端的鏈接會話 // 此時能夠經過result與客戶端進行交互 } ... });
此外,還能夠經過如下方法獲取和設置AsynchronousServerSocketChannel
的socket
選項:
// 設置socket選項 serverSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true); // 獲取socket選項設置 boolean keepAlive = serverSocketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
其中StandardSocketOptions
類封裝了經常使用的socket設置選項。
獲取本地地址:
InetSocketAddress address = (InetSocketAddress) serverSocketChannel.getLocalAddress();
AsynchronousSocketChannel是一個 流式鏈接套接字的異步通道。
AsynchronousSocketChannel
表示服務端與客戶端之間的鏈接通道。客戶端能夠經過調用AsynchronousSocketChannel
靜態方法open()
建立,而服務端則經過調用AsynchronousServerSocketChannel.accept()
方法後由AIO內部在合適的時候建立。下面以客戶端實現爲例,介紹AsynchronousSocketChannel
。
1、建立AsynchronousSocketChannel並鏈接到服務端:須要經過open()
建立和打開一個AsynchronousSocketChannel
實例,再調用其connect()
方法鏈接到服務端,接着才能夠與服務端交互:
// 打開一個socket通道 AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open(); // 阻塞等待鏈接成功 socketChannel.connect(new InetSocketAddress(ip,port)).get(); // 鏈接成功,接下來能夠進行read、write操做
同AsynchronousServerSocketChannel
,AsynchronousSocketChannel
也提供了open(AsynchronousChannelGroup)
方法用於指定通道分組和定製線程池。socketChannel.connect()
也提供了CompletionHandler
回調和Future
返回值兩個重載方法,上面例子使用帶Future返回值的重載,並調用get()
方法阻塞等待鏈接創建完成。
2、發送消息:
能夠構建一個ByteBuffer
對象並調用socketChannel.write(ByteBuffer)
方法異步發送消息,並經過CompletionHandler
回調接收處理髮送結果:
ByteBuffer writeBuf = ByteBuffer.wrap("From socketChannel:Hello i am socketChannel".getBytes()); socketChannel.write(writeBuf, null, new CompletionHandler<Integer, Object>() { @Override public void completed(final Integer result, final Object attachment) { // 發送完成,result:總共寫入的字節數 } @Override public void failed(final Throwable exc, final Object attachment) { // 發送失敗 } });
3、讀取消息:
構建一個指定接收長度的ByteBuffer
用於接收數據,調用socketChannel.read()
方法讀取消息並經過CompletionHandler
處理讀取結果:
ByteBuffer readBuffer = ByteBuffer.allocate(128); socketChannel.read(readBuffer, null, new CompletionHandler<Integer, Object>() { @Override public void completed(final Integer result, final Object attachment) { // 讀取完成,result:實際讀取的字節數。若是通道中沒有數據可讀則result=-1。 } @Override public void failed(final Throwable exc, final Object attachment) { // 讀取失敗 } });
此外,AsynchronousSocketChannel
也封裝了設置/獲取socket選項的方法:
// 設置socket選項 socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true); // 獲取socket選項設置 boolean keepAlive = socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
CompletionHandler是一個用於 消費異步I/O操做結果的處理器。
AIO中定義的異步通道容許指定一個CompletionHandler
處理器消費一個異步操做的結果。從上文中也能夠看到,AIO中大部分的異步I/O操做接口都封裝了一個帶CompletionHandler
類型參數的重載方法,使用CompletionHandler
能夠很方便地處理AIO中的異步I/O操做結果。CompletionHandler
是一個具備兩個泛型類型參數的接口,聲明瞭兩個接口方法:
public interface CompletionHandler<V,A> { void completed(V result, A attachment); void failed(Throwable exc, A attachment); }
其中,泛型V表示I/O操做的結果類型,經過該類型參數消費I/O操做的結果;泛型A爲附加到I/O操做中的對象類型,能夠經過該類型參數將須要的變量傳入到CompletionHandler實現中使用。所以,AIO中大部分的異步I/O操做都有一個相似這樣的重載方法:
<V,A> void ioOperate(params,A attachment,CompletionHandler<V,A> handler);
例如,AsynchronousServerSocketChannel.accept()
方法:
public abstract <A> void accept(A attachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler);
AsynchronousSocketChannel.write()
方法等:
public final <A> void write(ByteBuffer src,A attachment,CompletionHandler<Integer,? super A> handler)
當I/O操做成功完成時,會回調到completed
方法,failed
方法則在I/O操做失敗時被回調。須要注意的是:在CompletionHandler
的實現中應立即使處理操做結果,以免一直佔用調用線程而不能分發其餘的CompletionHandler
處理器。