AIO也叫NIO2.0或者NIO.2,隨JDK1.7發佈。NIO的API比較麻煩,易錯,開發效率低。AIO經過回調函數的方式來表示異步通訊,API相對簡單一些。java
AIO在Windows系統中底層使用IOCP這樣系統級的支持,比NIO的性能要好。但Java的服務端程序不多將Windows系統做爲生產服務器。而在Linux系統上(內核2.6以上),AIO的底層使用的依然是epoll技術,與NIO同樣,只不過封裝成異步IO的樣子,簡化了API而已。服務器
接下來經過一個客戶端與服務端通訊的例子,來學習使用AIO。客戶端每隔1秒向服務端發送請求,服務端響應並返回數據。能夠與上一篇的NIO對比學習。異步
服務端:ide
package cn.testAio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

/**
 * Entry point of the AIO (NIO.2) demo server.
 *
 * <p>Usage: {@code java cn.testAio.AioDemoServer [port]}. The port defaults to 8181.
 *
 * <p>Author: houshuiqiang@163.com, 2017-10-03.
 */
public class AioDemoServer {

    /** Port used when none is supplied on the command line. */
    private static final int DEFAULT_PORT = 8181;

    /**
     * Starts the server on its own thread.
     *
     * @param args optional: {@code args[0]} is the port to listen on
     * @throws NumberFormatException if {@code args[0]} is present but not an integer
     */
    public static void main(String[] args) {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : DEFAULT_PORT;
        AioServer aioServer = new AioServer(port);
        new Thread(aioServer, "aio-server-test").start();
    }
}

/**
 * Asynchronous server: accepts connections and hands each one to a
 * {@link ReadsCompletionHandler}. {@link #run()} blocks until accepting fails.
 */
class AioServer implements Runnable {

    private AsynchronousServerSocketChannel assChannel;
    private CountDownLatch cdl;

    /**
     * Opens the listening channel and binds it to {@code port}.
     * Terminates the JVM with exit status 1 if the channel cannot be opened or bound.
     *
     * @param port TCP port to listen on
     */
    public AioServer(int port) {
        try {
            assChannel = AsynchronousServerSocketChannel.open();
            assChannel.bind(new InetSocketAddress(port));
            cdl = new CountDownLatch(1);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Registers the accept callback, then parks this thread on the latch so the
     * server stays alive. The latch is released when an accept fails; the
     * listening channel is closed before returning.
     */
    @Override
    public void run() {
        assChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, AioServer>() {
            @Override
            public void completed(AsynchronousSocketChannel result, AioServer attachment) {
                // Re-arm the accept first so the next client is not kept waiting.
                assChannel.accept(attachment, this);
                // A request larger than the buffer arrives over several read callbacks.
                ByteBuffer buffer = ByteBuffer.allocate(ReadsCompletionHandler.READ_BUFFER_SIZE);
                result.read(buffer, buffer, new ReadsCompletionHandler(result));
            }

            @Override
            public void failed(Throwable exc, AioServer attachment) {
                exc.printStackTrace();
                attachment.cdl.countDown(); // lets run() return
            }
        });

        try {
            cdl.await(); // block this thread so the server keeps running
        } catch (InterruptedException e) {
            // Restore the flag so whoever owns this thread can see the interruption.
            Thread.currentThread().interrupt();
        } finally {
            try {
                assChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * Read callback for one client connection: decodes the bytes that were read,
 * builds a reply, writes it back and then waits for the next request.
 *
 * <p>There is no message framing: whatever a single read delivers is treated as
 * one request, so a multi-byte UTF-8 character split across two reads would be
 * decoded incorrectly.
 */
class ReadsCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {

    /** Size in bytes (1 KiB) of the buffer used for each read. */
    static final int READ_BUFFER_SIZE = 1024;

    private final AsynchronousSocketChannel channel;

    /**
     * @param channel the client connection this handler serves
     */
    public ReadsCompletionHandler(AsynchronousSocketChannel channel) {
        this.channel = channel;
    }

    /**
     * Handles a completed read.
     *
     * @param result number of bytes read, or -1 if the client closed the connection
     * @param attachment the buffer the bytes were read into
     */
    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        if (result < 0) {
            // End of stream: the client is gone, so there is nothing to answer.
            closeChannel(channel);
            return;
        }
        try {
            String body = getBody(attachment);              // request content
            String resultBody = handlerBody(channel, body); // simulated processing
            write2Client(channel, resultBody);              // send the reply
        } catch (IOException e) {
            e.printStackTrace();
            // No read is pending any more, so close instead of leaking the connection.
            closeChannel(channel);
        }
    }

    /** Closes the connection when a read fails. */
    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        closeChannel(this.channel);
    }

    /** Flips the buffer and decodes everything read so far as UTF-8. */
    private String getBody(ByteBuffer byteBuffer) {
        byteBuffer.flip();
        byte[] body = new byte[byteBuffer.remaining()];
        byteBuffer.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    /**
     * Simulates request processing: logs the request and builds the reply text.
     *
     * @throws IOException if the remote address cannot be obtained
     */
    private String handlerBody(AsynchronousSocketChannel channel, String body) throws IOException {
        String remoteAddress = channel.getRemoteAddress().toString();
        System.out.println("message from client : " + remoteAddress + ", content: " + body);
        return "server received message: " + body;
    }

    /**
     * Writes {@code resultBody} to the client as UTF-8, re-issuing the write until
     * the buffer is drained, then registers the next read.
     */
    private void write2Client(final AsynchronousSocketChannel channel, String resultBody) {
        ByteBuffer byteBuffer = ByteBuffer.wrap(resultBody.getBytes(StandardCharsets.UTF_8));
        channel.write(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (attachment.hasRemaining()) {
                    // Partial write: send the rest with the same handler.
                    channel.write(attachment, attachment, this);
                } else {
                    // Reply fully sent: wait for the client's next request.
                    ByteBuffer buffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
                    channel.read(buffer, buffer, new ReadsCompletionHandler(channel));
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                closeChannel(channel);
            }
        });
    }

    /** Closes {@code channel}, reporting (not propagating) any failure to close. */
    private static void closeChannel(AsynchronousSocketChannel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
客戶端:函數
package cn.testAio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Entry point of the AIO (NIO.2) demo client: queues one request per second,
 * five in total, then closes the connection.
 *
 * <p>Usage: {@code java cn.testAio.AioDemoClient [host [port]]}. Defaults are
 * 192.168.10.47 and 8181.
 *
 * <p>Author: houshuiqiang@163.com, 2017-10-03.
 */
public class AioDemoClient {

    /** Server host used when none is supplied on the command line. */
    private static final String DEFAULT_HOST = "192.168.10.47";

    /** Server port used when none is supplied on the command line. */
    private static final int DEFAULT_PORT = 8181;

    /**
     * @param args optional: {@code args[0]} is the server host, {@code args[1]} the server port
     * @throws InterruptedException if interrupted while pausing between requests
     * @throws NumberFormatException if {@code args[1]} is present but not an integer
     */
    public static void main(String[] args) throws InterruptedException {
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

        AioClient aioClient = new AioClient(host, port);
        Thread aioClientThread = new Thread(aioClient, "aio-client-test");
        aioClientThread.start();

        for (int i = 0; i < 5; i++) {
            aioClient.getQueue().offer("time" + i);
            Thread.sleep(1000);
        }
        aioClient.stop();
    }
}

/**
 * Asynchronous client that alternates between sending one queued request and
 * reading one reply.
 *
 * <p>Note: {@link #sendRequest()} waits on the queue from inside a completion
 * callback, so one callback thread stays parked while the queue is empty,
 * including after {@link #stop()} has closed the channel.
 */
class AioClient implements Runnable {

    /** Size in bytes of the buffer used to read each reply. */
    private static final int READ_BUFFER_SIZE = 1024;

    private final int port;
    private final String address;
    private final BlockingQueue<String> queue;
    private AsynchronousSocketChannel channel;

    /**
     * Opens (but does not yet connect) the channel. Terminates the JVM with exit
     * status 1 if the channel cannot be opened.
     *
     * @param address server host name or IP address
     * @param port server TCP port
     */
    public AioClient(String address, int port) {
        this.address = address;
        this.port = port;
        this.queue = new LinkedBlockingQueue<>();
        try {
            channel = AsynchronousSocketChannel.open();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /** Closes the channel if it is still open. */
    public void stop() {
        if (channel.isOpen()) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Starts the asynchronous connect; the first request is sent once it completes. */
    @Override
    public void run() {
        channel.connect(new InetSocketAddress(address, port), this, new CompletionHandler<Void, AioClient>() {
            @Override
            public void completed(Void result, AioClient attachment) {
                try {
                    sendRequest();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }

            @Override
            public void failed(Throwable exc, AioClient attachment) {
                exc.printStackTrace();
                stop(); // the connection is unusable; release the socket
            }
        });
    }

    /**
     * @return the queue of request bodies; each element offered is sent as one request
     */
    public BlockingQueue<String> getQueue() {
        return queue;
    }

    /**
     * Takes the next request from the queue (blocking until one is available),
     * writes it to the server as UTF-8 and then waits for the reply.
     *
     * @throws InterruptedException if interrupted while waiting on the queue
     */
    private void sendRequest() throws InterruptedException {
        String requestBody = queue.take();
        ByteBuffer byteBuffer = ByteBuffer.wrap(requestBody.getBytes(StandardCharsets.UTF_8));
        channel.write(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (attachment.hasRemaining()) {
                    // Partial write: send the rest with the same handler.
                    channel.write(attachment, attachment, this);
                } else {
                    // Request fully sent: wait for the server's reply.
                    readResult();
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                stop();
            }
        });
    }

    /**
     * Reads one reply, prints it and then sends the next request. If the server
     * has closed the connection, the channel is closed and the cycle ends.
     */
    private void readResult() {
        ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
        channel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (result < 0) {
                    // End of stream: the server is gone, so stop the send/receive cycle.
                    stop();
                    return;
                }
                attachment.flip();
                byte[] bytes = new byte[attachment.remaining()];
                attachment.get(bytes);
                String body = new String(bytes, StandardCharsets.UTF_8);
                System.out.println("The msg from Server is :" + body);
                try {
                    sendRequest(); // reply consumed; send the next request
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                stop();
            }
        });
    }
}