轉載請註明出處:http://www.cnblogs.com/Joanna-Yan/p/7794151.html html
前面講到:Java IO編程全解(四)——NIO編程java
NIO2.0引入了新的異步通道的概念,並提供了異步文件通道和異步套接字通道的實現。異步通道提供兩種方式獲取操做結果。編程
CompletionHandler接口的實現類做爲操做完成的回調。服務器
NIO2.0的異步套接字通道是真正的異步非阻塞I/O,它對UNIX網絡編程中的事件驅動I/O(AIO),它不須要經過多路複用器(Selector)對註冊的通道進行輪詢操做便可實現異步讀寫,從而簡化了NIO的編程模型。微信
下面經過代碼來熟悉NIO2.0 AIO的相關類庫,仍舊以時間服務器爲例程進行講解。網絡
package joanna.yan.aio; public class TimeServer { public static void main(String[] args) { int port=9090; if(args!=null&&args.length>0){ try { port=Integer.valueOf(args[0]); } catch (Exception e) { // 採用默認值 } } AsyncTimeServerHandler timeServer=new AsyncTimeServerHandler(port); new Thread(timeServer,"AIO-AsyncTimeServerHandler-001").start(); } }
package joanna.yan.aio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.AsynchronousServerSocketChannel; import java.util.concurrent.CountDownLatch; public class AsyncTimeServerHandler implements Runnable{ private int port; CountDownLatch latch; AsynchronousServerSocketChannel asynchronousServerSocketChannel; public AsyncTimeServerHandler(int port){ this.port=port; try { //建立一個異步的服務端通道AsynchronousServerSocketChannel asynchronousServerSocketChannel=AsynchronousServerSocketChannel.open(); asynchronousServerSocketChannel.bind(new InetSocketAddress(port)); System.out.println("The time server is start in port: "+port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { /* *初始化CountDownLatch對象。 *它的做用是,在完成一組正在執行的操做以前,容許當前的線程一直阻塞。 *在本例中,咱們讓線程在此阻塞,防止服務器執行完成退出。 *在實際項目應用中,不須要啓動獨立的線程來處理AsynchronousServerSocketChannel,這裏僅僅是個demo演示。 */ latch=new CountDownLatch(1); doAccept(); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 接收客戶端的鏈接。 * 因爲這裏是異步操做。咱們能夠傳遞一個CompletionHandler<AsynchronousSocketChannel,? super A>類型 * 的handler實例接收accept操做成功的通知消息 */ public void doAccept() { asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler()); } }
package joanna.yan.aio; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; /** * 接收accept操做成功的通知消息 * @author Administrator * */ public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncTimeServerHandler>{ @Override public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) { /* * 疑惑:既然已經接收客戶端成功了,爲何還要再次調用accept方法呢? * 緣由:當咱們調用AsynchronousServerSocketChannel的accept方法後,若是有新的客戶端鏈接接入, * 系統將回調咱們傳入的CompletionHandler實例的completed方法,表示新的客戶端已經接入成功, * 由於一個AsynchronousServerSocketChannel能夠接收成千上萬個客戶端,因此咱們須要繼續調用它的accep方法, * 接收其餘的客戶端鏈接,最終造成一個循環。 * 每當接收一個客戶讀鏈接成功以後,再異步接收新的客戶端鏈接。 */ attachment.asynchronousServerSocketChannel.accept(attachment, this); ByteBuffer buffer=ByteBuffer.allocate(1024); /* * 參數一:接收緩衝區,用於從異步Channel中讀取數據包; * 參數二:異步Channel攜帶的附件,通知回調的時候做爲入參使用; * 參數三:接收通知回調的業務handler */ result.read(buffer, buffer, new ReadCompletionHandler(result)); } @Override public void failed(Throwable exc, AsyncTimeServerHandler attachment) { exc.printStackTrace(); attachment.latch.countDown(); } }
package joanna.yan.aio; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.Date; /** * 主要用於讀取半包消息和發送應答。 * 本例不對半包讀寫進行具體說明,在後面的Netty半包處理中會介紹。 * @author Administrator * */ public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer>{ private AsynchronousSocketChannel channel; public ReadCompletionHandler(AsynchronousSocketChannel channel){ if(this.channel==null){ this.channel=channel; } } @Override public void completed(Integer result, ByteBuffer attachment) { //爲後續從緩衝區讀取數據作準備 attachment.flip(); byte[] body=new byte[attachment.remaining()]; attachment.get(body); try { String req=new String(body, "UTF-8"); System.out.println("The time server receive order : "+req); String currentTime="QUERY TIME ORDER".equalsIgnoreCase(req) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(currentTime); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } /** * 發生異常的時候調用。 * 對異常Throwable進行判斷,若是是I/O異常,就關閉鏈路,釋放資源; * 若是是其它異常,按照業務本身的邏輯進行處理。 * 本例做爲簡單demo,沒有對異常進行分類判斷,只要發生了讀寫異常,就關閉鏈路,釋放資源。 */ @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } private void doWrite(String currentTime) { if(currentTime!=null&¤tTime.trim().length()>0){ byte[] bytes=currentTime.getBytes(); ByteBuffer writeBuffer=ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { //若是沒有發送完成,繼續發送 if(buffer.hasRemaining()){ channel.write(buffer, buffer, this); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { //ingnore on close } } }); } } }
package joanna.yan.aio; public class TimeClient { public static void main(String[] args) { int port=9090; if(args!=null&&args.length>0){ try { port=Integer.valueOf(args[0]); } catch (Exception e) { // 採用默認值 } } /* * 經過一個獨立的I/O線程建立異步時間服務器客戶端handler。 * 在實際項目中,咱們不須要獨立的線程建立異步鏈接對象,由於底層都是經過JDK的系統回調實現的。 */ new Thread(new AsyncTimeClientHandler("127.0.0.1", port),"AIO-AsyncTimeClientHandle-001").start(); /* * 須要指出的是,正如以前的NIO例程,咱們並無完整的處理網絡的半包讀寫,在對例程進行功能測試的是尚未問題, * 可是,若是對代碼稍加改造,進行壓力或者性能測試,就會發現輸出結果存在問題。 * 這裏只集中將NIO的入門知識,後面會詳細講到半包讀寫 */ } }
package joanna.yan.aio; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>,Runnable{ private AsynchronousSocketChannel client; private String host; private int port; private CountDownLatch latch; public AsyncTimeClientHandler(String host,int port){ this.host=host; this.port=port; try { client=AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void completed(Void result, AsyncTimeClientHandler attachment) { byte[] req="QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer=ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); client.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {//用於寫操做完成後的回調 @Override public void completed(Integer result, ByteBuffer buffer) { if(buffer.hasRemaining()){ client.write(buffer, buffer, this); }else{ ByteBuffer readBuffer=ByteBuffer.allocate(1024); client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>(){//當讀取完成被JDK回調時,構造應答消息。 @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); byte[] bytes=new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body=new String(bytes, "UTF-8"); System.out.println("Now is : "+body); latch.countDown(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { client.close(); //讓AsyncTimeClientHandler線程執行完畢,客戶端退出執行 latch.countDown(); } catch (IOException e) { //ingnore on close } } }); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { client.close(); latch.countDown(); } catch (IOException e) { //ingnore on close } } }); } @Override public void failed(Throwable exc, AsyncTimeClientHandler attachment) { exc.printStackTrace(); try { client.close(); latch.countDown(); } catch (IOException e) { //ingnore on close } } @Override public void run() { //建立CountDownLatch進行等待,防止異步操做沒有執行完成線程就退出。 latch=new CountDownLatch(1); /* * 參數二:AsynchronousSocketChannel的附件,用於回調通知時做爲入參被傳遞,調用者能夠自定義 * 參數三:異步參數回調通知接口,由調用者實現。 */ client.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } try { client.close(); } catch (IOException e) { e.printStackTrace(); } } }
異步SocketChannel是被動執行對象,咱們不須要像NIO編程那樣建立一個獨立I/O線程來處理讀寫操做。對於AsynchronousServerSocketChannel和 AsynchronousSocketChannel,它們都由JDK底層的線程池負責回調並驅動讀寫操做。正由於如此,基於NIO2.0新的異步非阻塞Channel進行編程比NIO編程更爲簡單。異步
後面,咱們將對前面講到的4種I/O進行概念澄清和比較,讓你們從總體上掌握這些I/O模型的差別。以便在將來的工做中可以根據產品的實際狀況選擇合適的I/O模型。async
Java IO編程全解(六)——4種I/O的對比與選型ide
若是此文對您有幫助,微信打賞我一下吧~源碼分析