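As we know, NIO is synchronous and non-blocking. Its server model is one thread per request: every client connection is registered with a multiplexer (selector), and a thread is started to handle a connection only when the multiplexer's polling finds an I/O request pending on it.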
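AIO, by contrast, is asynchronous and non-blocking, and it provides asynchronous file channel and asynchronous socket channel implementations. The result of an operation can be obtained in two main ways: through a java.util.concurrent.Future, or through a CompletionHandler callback passed in when the operation is started.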
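The two styles can be compared side by side with a minimal sketch. It uses an asynchronous file channel rather than a socket so that it runs without a server; the class name AioResultStyles, the method names, and the 64-byte buffer size are assumptions made for illustration only.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;

public class AioResultStyles {

    // Style 1: the read returns a Future; get() blocks until the read has finished.
    static String readWithFuture(Path file) throws Exception {
        try (AsynchronousFileChannel ch =
                 AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buf = ByteBuffer.allocate(64);
            Future<Integer> pending = ch.read(buf, 0);
            pending.get();
            buf.flip(); // switch the buffer from writing to reading
            return StandardCharsets.UTF_8.decode(buf).toString();
        }
    }

    // Style 2: the read takes a CompletionHandler that is called back when it finishes.
    static String readWithHandler(Path file) throws Exception {
        CountDownLatch done = new CountDownLatch(1);
        StringBuilder out = new StringBuilder();
        try (AsynchronousFileChannel ch =
                 AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buf = ByteBuffer.allocate(64);
            ch.read(buf, 0, buf, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer bytesRead, ByteBuffer attachment) {
                    attachment.flip();
                    out.append(StandardCharsets.UTF_8.decode(attachment));
                    done.countDown();
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    exc.printStackTrace();
                    done.countDown();
                }
            });
            done.await(); // keep the channel open until the callback has run
        }
        return out.toString();
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("aio-demo", ".txt");
        Files.write(file, "QUERY TIME ORDER".getBytes(StandardCharsets.UTF_8));
        System.out.println(readWithFuture(file));
        System.out.println(readWithHandler(file));
        Files.delete(file);
    }
}
```

The time server and client in this post use the second style throughout.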
public class TimeServer {
    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // Not a number: keep the default port.
            }
        }
        AsyncTimeServerHandler timeServerHandler = new AsyncTimeServerHandler(port);
        new Thread(timeServerHandler, "AIO-AsyncTimeServerHandler-001").start();
    }
}
First we create the asynchronous time server handler, then start a thread that runs it.
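import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;

public class AsyncTimeServerHandler implements Runnable {
    private int port;
    CountDownLatch countDownLatch;
    AsynchronousServerSocketChannel channel;

    public AsyncTimeServerHandler(int port) {
        this.port = port;
        try {
            // Open the asynchronous server channel and bind it to the listening port.
            channel = AsynchronousServerSocketChannel.open();
            channel.bind(new InetSocketAddress(port));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void run() {
        countDownLatch = new CountDownLatch(1);
        doAccept();
        try {
            // Block this thread until a handler counts the latch down.
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void doAccept() {
        // Asynchronous accept: returns immediately, the handler is notified later.
        channel.accept(this, new AcceptCompletionHandler());
    }
}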
In the constructor we create an asynchronous server channel (AsynchronousServerSocketChannel) and call bind to bind it to the listening port.
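In the run method we initialize a CountDownLatch so that the thread stays blocked until the operations in flight have completed. Because accept is asynchronous and returns immediately, the thread would otherwise leave run right after issuing it.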
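The role of the latch can be shown in isolation with a small sketch. The class name LatchParkingDemo, the helper method, and the 5-second timeout are hypothetical; a second thread stands in for the completion callback that would eventually release the waiting thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchParkingDemo {

    // Returns true if the waiting thread was released by countDown() before the timeout.
    static boolean awaitReleased(long delayMillis) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        // Stand-in for an asynchronous callback that fires some time later.
        Thread releaser = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            latch.countDown();
        });
        releaser.start();

        // The caller parks here, just as run() parks on countDownLatch.await().
        return latch.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("released: " + awaitReleased(100));
    }
}
```

In the server above, the latch is only counted down in the failed callback of the accept handler, so the server thread normally stays parked for the life of the process.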
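In the doAccept method we accept client connections. We pass in a CompletionHandler instance that receives the notification when the accept operation succeeds. Its code is as follows: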
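import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AcceptCompletionHandler
        implements CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> {

    @Override
    public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {
        // Re-issue accept so the next client can connect.
        attachment.channel.accept(attachment, this);
        // Start an asynchronous read on the newly accepted connection.
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        result.read(buffer, buffer, new ReadCompltetionHandler(result));
    }

    @Override
    public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
        exc.printStackTrace();
        // Release the server thread blocked in run().
        attachment.countDownLatch.countDown();
    }
}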
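In the completed method we call accept again so that further clients can connect. A single AsynchronousServerSocketChannel can serve thousands of clients, but each accept call delivers only one connection, so it has to be issued again after every success.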
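The re-arming pattern can be tried on its own with a sketch like the one below. It is not the time server: the class name ReAcceptDemo, the loopback address with an ephemeral port, and the 5-second timeout are all assumptions made so the example is self-contained.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ReAcceptDemo {

    // Connects the given number of clients and returns how many the server accepted.
    static int acceptClients(int clients) throws Exception {
        AtomicInteger accepted = new AtomicInteger();
        CountDownLatch all = new CountDownLatch(clients);
        InetAddress loopback = InetAddress.getLoopbackAddress();

        try (AsynchronousServerSocketChannel server =
                 AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(loopback, 0))) {

            server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
                @Override
                public void completed(AsynchronousSocketChannel ch, Void attachment) {
                    server.accept(null, this); // re-arm: one accept call yields one connection
                    accepted.incrementAndGet();
                    try {
                        ch.close();
                    } catch (IOException ignored) {
                        // Nothing useful to do for a demo connection.
                    }
                    all.countDown();
                }

                @Override
                public void failed(Throwable exc, Void attachment) {
                    // Reached when the server channel is closed with an accept pending.
                }
            });

            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            for (int i = 0; i < clients; i++) {
                try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
                    client.connect(new InetSocketAddress(loopback, port)).get();
                }
            }
            all.await(5, TimeUnit.SECONDS);
        }
        return accepted.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("accepted: " + acceptClients(3));
    }
}
```

Removing the server.accept(null, this) line inside completed leaves the server able to take only the first client, which is the situation the re-issued accept in AcceptCompletionHandler avoids.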
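Once the connection is established, the server can receive the client's request. It issues an asynchronous read through the read method, passing in a handler that receives the completion callback and runs the business logic. Its code is as follows: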
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Date;

public class ReadCompltetionHandler implements CompletionHandler<Integer, ByteBuffer> {
    private AsynchronousSocketChannel channel;

    public ReadCompltetionHandler(AsynchronousSocketChannel channel) {
        // Keep the accepted channel so the response can be written back on it.
        if (this.channel == null)
            this.channel = channel;
    }

    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        // Switch the buffer to read mode and copy out the request bytes.
        attachment.flip();
        byte[] body = new byte[attachment.remaining()];
        attachment.get(body);
        try {
            String req = new String(body, "UTF-8");
            String curentTime = "QUERY TIME ORDER".equalsIgnoreCase(req)
                    ? new Date(System.currentTimeMillis()).toString()
                    : "BAD ORDER";
            doWrite(curentTime);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    private void doWrite(String currentTime) {
        // Plain null/empty check, so no external StringUtil helper is needed.
        if (currentTime != null && !currentTime.isEmpty()) {
            byte[] bytes = currentTime.getBytes();
            ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
            buffer.put(bytes);
            buffer.flip();
            channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    // A write may be partial: keep writing until the buffer is drained.
                    if (attachment.hasRemaining()) {
                        channel.write(attachment, attachment, this);
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        try {
            this.channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The processing logic itself is the same as in the NIO TimeServer, so it is not analyzed in detail here.
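For readers who have not seen the NIO version, the buffer handling and the request check in the read callback can be isolated as a small sketch. The class name TimeOrderCodec and its two helper methods are hypothetical; the logic mirrors the completed method of ReadCompltetionHandler.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Date;

public class TimeOrderCodec {

    // Turns a buffer that has just been filled by a read into a String.
    static String decode(ByteBuffer filled) {
        filled.flip(); // limit = bytes received, position = 0
        byte[] body = new byte[filled.remaining()];
        filled.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    // Maps a request to the response the time server would send.
    static String respond(String request) {
        return "QUERY TIME ORDER".equalsIgnoreCase(request)
                ? new Date(System.currentTimeMillis()).toString()
                : "BAD ORDER";
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put("QUERY TIME ORDER".getBytes(StandardCharsets.UTF_8));
        System.out.println(respond(decode(buffer)));
    }
}
```

Skipping the flip call would make remaining() report the unused capacity of the buffer instead of the number of bytes received, so the decoded text would not match the request.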
public class TimeClient {
    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // Not a number: keep the default port.
            }
        }
        AsyncTimeClientHandler timeClientHandler = new AsyncTimeClientHandler("127.0.0.1", port);
        new Thread(timeClientHandler, "AIO-AsyncTimeClientHandler-001").start();
    }
}
Here we create the asynchronous time server client handler and run it on a separate I/O thread. Its code is as follows:
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

public class AsyncTimeClientHandler
        implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {
    private AsynchronousSocketChannel client;
    private String host;
    private int port;
    private CountDownLatch countDownLatch;

    public AsyncTimeClientHandler(String host, int port) {
        this.host = host;
        this.port = port;
        try {
            client = AsynchronousSocketChannel.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Called when the connect operation succeeds: send the request.
    @Override
    public void completed(Void result, AsyncTimeClientHandler attachment) {
        byte[] req = "QUERY TIME ORDER".getBytes();
        ByteBuffer buffer = ByteBuffer.allocate(req.length);
        buffer.put(req);
        buffer.flip();
        client.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (attachment.hasRemaining()) {
                    // Partial write: send the rest of the request.
                    client.write(attachment, attachment, this);
                } else {
                    // Request fully sent: read the response asynchronously.
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer result, ByteBuffer attachment) {
                            // Switch to read mode before copying out the received bytes.
                            attachment.flip();
                            byte[] bytes = new byte[attachment.remaining()];
                            attachment.get(bytes);
                            try {
                                String body = new String(bytes, "UTF-8");
                                System.out.print(body);
                            } catch (UnsupportedEncodingException e) {
                                e.printStackTrace();
                            }
                            // Response handled: let run() continue and close the channel.
                            countDownLatch.countDown();
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) {
                            try {
                                client.close();
                                countDownLatch.countDown();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                try {
                    client.close();
                    countDownLatch.countDown();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    // Called when the connect operation fails.
    @Override
    public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
        exc.printStackTrace();
        try {
            client.close();
            countDownLatch.countDown();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        countDownLatch = new CountDownLatch(1);
        // Asynchronous connect; this object is both the attachment and the handler.
        client.connect(new InetSocketAddress(host, port), this, this);
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}