回顧下Linux下阻塞IO模型:linux
再看看Java的BIO編程模型:編程
/** * 類說明:客戶端 */ public class BioClient { public static void main(String[] args) throws InterruptedException, IOException { //經過構造函數建立Socket,而且鏈接指定地址和端口的服務端 Socket socket = new Socket(DEFAULT_SERVER_IP,DEFAULT_PORT); System.out.println("請輸入請求消息:"); //啓動讀取服務端輸出數據的線程 new ReadMsg(socket).start(); PrintWriter pw = null; //容許客戶端在控制檯輸入數據,而後送往服務器 while(true){ pw = new PrintWriter(socket.getOutputStream()); pw.println(new Scanner(System.in).next()); pw.flush(); } } //讀取服務端輸出數據的線程 private static class ReadMsg extends Thread { Socket socket; public ReadMsg(Socket socket) { this.socket = socket; } @Override public void run() { //負責socket讀寫的輸入流 try (BufferedReader br = new BufferedReader( new InputStreamReader(socket.getInputStream()))){ String line = null; //經過輸入流讀取服務端傳輸的數據 //若是已經讀到輸入流尾部,返回null,退出循環 //若是獲得非空值,就將結果進行業務處理 while((line=br.readLine())!=null){ System.out.printf("%s\n",line); } } catch (SocketException e) { System.out.printf("%s\n", "服務器斷開了你的鏈接"); } catch (Exception e) { e.printStackTrace(); } finally { clear(); } } //必要的資源清理工做 private void clear() { if (socket != null) try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** * 類說明:bio的服務端主程序 */ public class BioServer { //服務器端必須 private static ServerSocket server; //線程池,處理每一個客戶端的請求 private static ExecutorService executorService = Executors.newFixedThreadPool(5); private static void start() throws IOException{ try{ //經過構造函數建立ServerSocket //若是端口合法且空閒,服務端就監聽成功 server = new ServerSocket(DEFAULT_PORT); System.out.println("服務器已啓動,端口號:" + DEFAULT_PORT); while(true){ Socket socket= server.accept(); System.out.println("有新的客戶端鏈接----" ); //當有新的客戶端接入時,打包成一個任務,投入線程池 executorService.execute(new BioServerHandler(socket)); } }finally{ if(server!=null){ server.close(); } } } public static void main(String[] args) throws IOException { start(); } } /** * 類說明: */ public class BioServerHandler implements Runnable{ private Socket socket; public BioServerHandler(Socket socket) { this.socket = socket; } public void run() { try(//負責socket讀寫的輸出、輸入流 BufferedReader in = new BufferedReader( new InputStreamReader(socket.getInputStream())); PrintWriter out = new PrintWriter(socket.getOutputStream(), true)){ String message; String result; //經過輸入流讀取客戶端傳輸的數據 //若是已經讀到輸入流尾部,返回null,退出循環 //若是獲得非空值,就將結果進行業務處理 while((message = in.readLine())!=null){ System.out.println("Server accept message:"+message); result = response(message); //將業務結果經過輸出流返回給客戶端 out.println(result); } }catch(Exception e){ e.printStackTrace(); }finally{ if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } } }
過程:windows
最先的時候服務器端是針對一個鏈接新建一個線程來處理→演變成服務端針對每一個客戶端鏈接把請求丟進線程池來處理任務
缺點:若高併發場景且處理時間稍長則許多請求會阻塞一直等待,嚴重影響性能.數組
先回顧下Linux下AIO模型:服務器
原生JDK網絡編程AIO:網絡
異步IO採用「訂閱-通知」模式:即應用程序向操做系統註冊IO監聽,而後繼續作本身的事情。當操做系統發生IO事件,而且準備好數據後,在主動通知應用程序,觸發相應的函數。
注意:異步IO裏面客戶端和服務端均採用這種「訂閱-通知」模式.併發
AIO編程幾個核心類:
①:AsynchronousServerSocketChannel:相似BIO裏面的ServerSocket異步
②:AsynchronousSocketChannel :相似BIO裏面的socket用來通訊,有三個方法:connect():用於鏈接到指定端口,指定IP地址的服務器,read()、write():完成讀寫
注意點:socket
Channel可看作JDK對IO的抽象,除了網絡通道,還有文件通道FileChannel。ide
③:CompletionHandler:源碼註釋是異步IO操做中用來處理消費的結果,其實也就是結果回調函數,鏈接丶讀寫都是異步操做都須要實現此接口。
而CompletionHandler接口中定義了兩個方法,
先上代碼
客戶端:
/** * 類說明:aio的客戶端主程序 */ public class AioClient { //IO通訊處理器 private static AioClientHandler clientHandle; public static void start(){ if(clientHandle!=null) return; clientHandle = new AioClientHandler(DEFAULT_SERVER_IP,DEFAULT_PORT); //負責網絡通信的線程 new Thread(clientHandle,"Client").start(); } //向服務器發送消息 public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMessag(msg); return true; } public static void main(String[] args) throws Exception{ AioClient.start(); System.out.println("請輸入請求消息:"); Scanner scanner = new Scanner(System.in); while(AioClient.sendMsg(scanner.nextLine())); } } /** * 類說明:IO通訊處理器,負責鏈接服務器,對外暴露對服務端發送數據的API */ public class AioClientHandler implements CompletionHandler<Void,AioClientHandler>,Runnable { private AsynchronousSocketChannel clientChannel; private String host; private int port; private CountDownLatch latch;//防止線程退出 public AioClientHandler(String host, int port) { this.host = host; this.port = port; try { //建立一個實際異步的客戶端通道 clientChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //建立CountDownLatch,由於是異步調用,下面的connect不會阻塞, // 那麼整個run方法會迅速結束,那麼負責網絡通信的線程也會迅速結束 latch = new CountDownLatch(1); //發起異步鏈接操做,回調參數就是這個實例自己, // 若是鏈接成功會回調這個實例的completed方法 clientChannel.connect(new InetSocketAddress(host,port), null,this); try { latch.await(); clientChannel.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } //鏈接成功,這個方法會被系統調用 @Override public void completed(Void result, AioClientHandler attachment) { System.out.println("已經鏈接到服務端。"); } //鏈接失敗,這個方法會被系統調用 @Override public void failed(Throwable exc, AioClientHandler attachment) { System.err.println("鏈接失敗。"); exc.printStackTrace(); latch.countDown(); try { clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } //對外暴露對服務端發送數據的API public void sendMessag(String msg){ /*爲了把msg變成能夠在網絡傳輸的格式*/ byte[] bytes = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); /*進行異步寫,一樣的這個方法會迅速返回, 須要提供一個接口讓系統在一次網絡寫操做完成後通知咱們的應用程序。 因此咱們傳入一個實現了CompletionHandler的AioClientWriteHandler 第1個writeBuffer,表示咱們要發送給服務器的數據; 第2個writeBuffer,考慮到網絡寫有可能沒法一次性將數據寫完,須要進行屢次網絡寫, 因此將writeBuffer做爲附件傳遞給AioClientWriteHandler。 */ clientChannel.write(writeBuffer,writeBuffer, new AioClientWriteHandler(clientChannel,latch)); } } /** * 類說明:網絡寫的處理器,CompletionHandler<Integer, ByteBuffer>中 * Integer:本次網絡寫操做完成實際寫入的字節數, * ByteBuffer:寫操做的附件,存儲了寫操做須要寫入的數據 */ public class AioClientWriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public AioClientWriteHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { //有可能沒法一次性將數據寫完,須要檢查緩衝區中是否還有數據須要繼續進行網絡寫 if(buffer.hasRemaining()){ clientChannel.write(buffer,buffer,this); }else{ //寫操做已經完成,爲讀取服務端傳回的數據創建緩衝區 ByteBuffer readBuffer = ByteBuffer.allocate(1024); /*這個方法會迅速返回,須要提供一個接口讓 系統在讀操做完成後通知咱們的應用程序。*/ clientChannel.read(readBuffer,readBuffer, new AioClientReadHandler(clientChannel,latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("數據發送失敗..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } } /** * 類說明:網絡讀的處理器 * CompletionHandler<Integer, ByteBuffer>中 * Integer:本次網絡讀操做實際讀取的字節數, * ByteBuffer:讀操做的附件,存儲了讀操做讀到的數據 * */ public class AioClientReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public AioClientReadHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String msg; try { msg = new String(bytes,"UTF-8"); System.out.println("accept message:"+msg); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("數據讀取失敗..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }
服務端:
/** * 類說明:服務器主程序 */ public class AioServer { private static AioServerHandler serverHandle; //統計客戶端個數 public volatile static long clientCount = 0; public static void start(){ if(serverHandle!=null) return; serverHandle = new AioServerHandler(DEFAULT_PORT); new Thread(serverHandle,"Server").start(); } public static void main(String[] args){ AioServer.start(); } } /** * 類說明:處理用戶鏈接的處理器 */ public class AioAcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AioServerHandler> { @Override public void completed(AsynchronousSocketChannel channel, AioServerHandler serverHandler) { AioServer.clientCount++; System.out.println("鏈接的客戶端數:" + AioServer.clientCount); //從新註冊監聽,讓別的客戶端也能夠鏈接 serverHandler.channel.accept(serverHandler,this); ByteBuffer readBuffer = ByteBuffer.allocate(1024); //1)ByteBuffer dst:接收緩衝區,用於從異步Channel中讀取數據包; //2) A attachment:異步Channel攜帶的附件,通知回調的時候做爲入參使用; //3) CompletionHandler<Integer,? super A>:系統回調的業務handler,進行讀操做 channel.read(readBuffer,readBuffer, new AioReadHandler(channel)); } @Override public void failed(Throwable exc, AioServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); } } /** * 類說明:讀數據的處理器 */ public class AioReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel channel; public AioReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } //讀取到消息後的處理 @Override public void completed(Integer result, ByteBuffer attachment) { //若是條件成立,說明客戶端主動終止了TCP套接字,這時服務端終止就能夠了 if(result == -1) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } return; } //flip操做 attachment.flip(); byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { System.out.println(result); String msg = new String(message,"UTF-8"); System.out.println("server accept message:"+msg); String responseStr = response(msg); //向客戶端發送消息 doWrite(responseStr); } catch (Exception e) { e.printStackTrace(); } } //發送消息 private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //異步寫數據 channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer attachment) { if(attachment.hasRemaining()){ channel.write(attachment,attachment,this); }else{ //讀取客戶端傳回的數據 ByteBuffer readBuffer = ByteBuffer.allocate(1024); //異步讀數據 channel.read(readBuffer,readBuffer, new AioReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * 類說明:響應網絡操做的處理器 */ public class AioServerHandler implements Runnable { public CountDownLatch latch; /*進行異步通訊的通道*/ public AsynchronousServerSocketChannel channel; public AioServerHandler(int port) { try { //建立服務端通道 channel = AsynchronousServerSocketChannel.open(); //綁定端口 channel.bind(new InetSocketAddress(port)); System.out.println("Server is start,port:"+port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { latch = new CountDownLatch(1); //用於接收客戶端的鏈接,異步操做, // 須要實現了CompletionHandler接口的處理器處理和客戶端的鏈接操做 channel.accept(this,new AioAcceptHandler()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } }
疑難點1:
怎麼理解這裏客戶端寫操做的處理器回調方法?
疑難點2:
Buffer:
查看源碼可看到幾個重要屬性:
capacity:表示分配的內存大小
position:相似指針類的索引,讀取或寫入的位置標識符,下一個可寫入的初始位置/下一個可讀取的初始位置
limit:可讀或可寫的範圍,小於等於capacity,當小於capacity,limit到capaticy的最大容量值的這段空間不予寫入是放一些初始化值的.
ByteBuffer能夠理解爲放在內存中的一個數組。
好比圖中一開始是寫入模式,寫入五個字節,地址爲0-4,position在5,調用flip方法後切換到讀模式,position變爲0即開始序列,limit變爲5,這樣就能夠buffer開頭開始讀取了.
應用場景:
能夠服務端用AIO模型,客戶端使用BIO簡化編程,本文的例子便可調試,啓動AioServer再啓動BioClient,通訊是沒問題的
AIO編程相對複雜,代碼中一些關鍵方法都有註釋,目前Linux下沒有真正意義上的AIO,其實是用了NIO裏面的epoll(true),底層原理仍是用了IO複用(NIO).windows實現了AIO,AIO是將來的方向,需待linux內核支持.