一.傳統的BIO編程java
1.網絡編程的基本模型是C/S模型,即兩個進程間的通訊。express
服務端提供IP和監聽端口,客戶端經過鏈接操做想服務端監聽的地址發起鏈接請求,經過三次握手鍊接,若是鏈接成功創建,雙方就能夠經過套接字進行通訊。編程
傳統的同步阻塞模型開發中,ServerSocket負責綁定IP地址,啓動監聽端口;Socket負責發起鏈接操做。鏈接成功後,雙方經過輸入和輸出流進行同步阻塞式通訊。數組
簡單的描述一下BIO的服務端通訊模型:採用BIO通訊模型的服務端,一般由一個獨立的Acceptor線程負責監聽客戶端的鏈接,它接收到客戶端鏈接請求以後爲每一個客戶端建立一個新的線程進行鏈路處理沒處理完成後,經過輸出流返回應答給客戶端,線程銷燬。即典型的一請求一應答通宵模型。緩存
傳統BIO通訊模型圖:服務器
該模型最大的問題就是缺少彈性伸縮能力,當客戶端併發訪問量增長後,服務端的線程個數和客戶端併發訪問數呈1:1的正比關係,Java中的線程也是比較寶貴的系統資源,線程數量快速膨脹後,系統的性能將急劇降低,隨着訪問量的繼續增大,系統最終就死-掉-了。網絡
同步阻塞式I/O建立的Server源碼:數據結構
package com.anxpp.io.calculator.bio; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; /** * BIO服務端源碼 * [@author](https://my.oschina.net/arthor) yangtao__anxpp.com * [@version](https://my.oschina.net/u/931210) 1.0 */ public final class ServerNormal { //默認的端口號 private static int DEFAULT_PORT = 12345; //單例的ServerSocket private static ServerSocket server; //根據傳入參數設置監聽端口,若是沒有參數調用如下方法並使用默認值 public static void start() throws IOException{ //使用默認值 start(DEFAULT_PORT); } //這個方法不會被大量併發訪問,不太須要考慮效率,直接進行方法同步就好了 public synchronized static void start(int port) throws IOException{ if(server != null) return; try{ //經過構造函數建立ServerSocket //若是端口合法且空閒,服務端就監聽成功 server = new ServerSocket(port); System.out.println("服務器已啓動,端口號:" + port); //經過無線循環監聽客戶端鏈接 //若是沒有客戶端接入,將阻塞在accept操做上。 while(true){ Socket socket = server.accept(); //當有新的客戶端接入時,會執行下面的代碼 //而後建立一個新的線程處理這條Socket鏈路 new Thread(new ServerHandler(socket)).start(); } }finally{ //一些必要的清理工做 if(server != null){ System.out.println("服務器已關閉。"); server.close(); server = null; } } } }
客戶端消息處理線程ServerHandler源碼:併發
package com.anxpp.io.calculator.bio; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import com.anxpp.io.utils.Calculator; /** * 客戶端線程 * [@author](https://my.oschina.net/arthor) yangtao__anxpp.com * 用於處理一個客戶端的Socket鏈路 */ public class ServerHandler implements Runnable{ private Socket socket; public ServerHandler(Socket socket) { this.socket = socket; } [@Override](https://my.oschina.net/u/1162528) public void run() { BufferedReader in = null; PrintWriter out = null; try{ in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true); String expression; String result; while(true){ //經過BufferedReader讀取一行 //若是已經讀到輸入流尾部,返回null,退出循環 //若是獲得非空值,就嘗試計算結果並返回 if((expression = in.readLine())==null) break; System.out.println("服務器收到消息:" + expression); try{ result = Calculator.cal(expression).toString(); }catch(Exception e){ result = "計算錯誤:" + e.getMessage(); } out.println(result); } }catch(Exception e){ e.printStackTrace(); }finally{ //一些必要的清理工做 if(in != null){ try { in.close(); } catch (IOException e) { e.printStackTrace(); } in = null; } if(out != null){ out.close(); out = null; } if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } } }
同步阻塞式I/O建立的Client源碼:dom
package com.anxpp.io.calculator.bio; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; /** * 阻塞式I/O建立的客戶端 * [@author](https://my.oschina.net/arthor) yangtao__anxpp.com * @version 1.0 */ public class Client { //默認的端口號 private static int DEFAULT_SERVER_PORT = 12345; private static String DEFAULT_SERVER_IP = "127.0.0.1"; public static void send(String expression){ send(DEFAULT_SERVER_PORT,expression); } public static void send(int port,String expression){ System.out.println("算術表達式爲:" + expression); Socket socket = null; BufferedReader in = null; PrintWriter out = null; try{ socket = new Socket(DEFAULT_SERVER_IP,port); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true); out.println(expression); System.out.println("___結果爲:" + in.readLine()); }catch(Exception e){ e.printStackTrace(); }finally{ //一下必要的清理工做 if(in != null){ try { in.close(); } catch (IOException e) { e.printStackTrace(); } in = null; } if(out != null){ out.close(); out = null; } if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } } }
測試代碼,爲了方便在控制檯看輸出結果,放到同一個程序(jvm)中運行:
package com.anxpp.io.calculator.bio; import java.io.IOException; import java.util.Random; /** * 測試方法 * @author yangtao__anxpp.com * @version 1.0 */ public class Test { //測試主方法 public static void main(String[] args) throws InterruptedException { //運行服務器 new Thread(new Runnable() { @Override public void run() { try { ServerBetter.start(); } catch (IOException e) { e.printStackTrace(); } } }).start(); //避免客戶端先於服務器啓動前執行代碼 Thread.sleep(100); //運行客戶端 char operators[] = {'+','-','*','/'}; Random random = new Random(System.currentTimeMillis()); new Thread(new Runnable() { @SuppressWarnings("static-access") @Override public void run() { while(true){ //隨機產生算術表達式 String expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1); Client.send(expression); try { Thread.currentThread().sleep(random.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }
從以上代碼,很容易看出,BIO主要的問題在於每當有一個新的客戶端請求接入時,服務端必須建立一個新的線程來處理這條鏈路,在須要知足高性能、高併發的場景是無法應用的(大量建立新的線程會嚴重影響服務器性能,甚至罷工)。
2.僞異步I/O編程
爲了改進這種一鏈接一線程的模型,咱們可使用線程池來管理這些線程(須要瞭解更多請參考前面提供的文章),實現1個或多個線程處理N個客戶端的模型(可是底層仍是使用的同步阻塞I/O),一般被稱爲「僞異步I/O模型「。
僞異步I/O模型圖:
實現很簡單,咱們只須要將新建線程的地方,交給線程池管理便可,只須要改動剛剛的Server代碼便可:
package com.anxpp.io.calculator.bio; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * BIO服務端源碼__僞異步I/O * @author yangtao__anxpp.com * @version 1.0 */ public final class ServerBetter { //默認的端口號 private static int DEFAULT_PORT = 12345; //單例的ServerSocket private static ServerSocket server; //線程池 懶漢式的單例 private static ExecutorService executorService = Executors.newFixedThreadPool(60); //根據傳入參數設置監聽端口,若是沒有參數調用如下方法並使用默認值 public static void start() throws IOException{ //使用默認值 start(DEFAULT_PORT); } //這個方法不會被大量併發訪問,不太須要考慮效率,直接進行方法同步就好了 public synchronized static void start(int port) throws IOException{ if(server != null) return; try{ //經過構造函數建立ServerSocket //若是端口合法且空閒,服務端就監聽成功 server = new ServerSocket(port); System.out.println("服務器已啓動,端口號:" + port); //經過無線循環監聽客戶端鏈接 //若是沒有客戶端接入,將阻塞在accept操做上。 while(true){ Socket socket = server.accept(); //當有新的客戶端接入時,會執行下面的代碼 //而後建立一個新的線程處理這條Socket鏈路 executorService.execute(new ServerHandler(socket)); } }finally{ //一些必要的清理工做 if(server != null){ System.out.println("服務器已關閉。"); server.close(); server = null; } } } }
測試運行結果是同樣的。
咱們知道,若是使用CachedThreadPool線程池(不限制線程數量,若是不清楚請參考文首提供的文章),其實除了能自動幫咱們管理線程(複用),看起來也就像是1:1的客戶端:線程數模型,而使用FixedThreadPool咱們就有效的控制了線程的最大數量,保證了系統有限的資源的控制,實現了N:M的僞異步I/O模型。
可是,正由於限制了線程數量,若是發生大量併發請求,超過最大數量的線程就只能等待,直到線程池中的有空閒的線程能夠被複用。而對Socket的輸入流就行讀取時,會一直阻塞,直到發生:
有數據可讀
可用數據以及讀取完畢
發生空指針或I/O異常
因此在讀取數據較慢時(好比數據量大、網絡傳輸慢等),大量併發的狀況下,其餘接入的消息,只能一直等待,這就是最大的弊端。
然後面即將介紹的NIO,就能解決這個難題。
二.NIO
1.JDK 1.4中的java.nio.*包中引入新的Java I/O庫,其目的是提升速度。實際上,「舊」的I/O包已經使用NIO從新實現過,即便咱們不顯式的使用NIO編程,也能從中受益。速度的提升在文件I/O和網絡I/O中均可能會發生,但本文只討論後者。
2.NIO咱們通常認爲是New I/O(也是官方的叫法),由於它是相對於老的I/O類庫新增的(其實在JDK 1.4中就已經被引入了,但這個名詞還會繼續用好久,即便它們在如今看來已是「舊」的了,因此也提示咱們在命名時,須要好好考慮),作了很大的改變。但民間跟多人稱之爲Non-block I/O,即非阻塞I/O,由於這樣叫,更能體現它的特色。而下文中的NIO,不是指整個新的I/O庫,而是非阻塞I/O。
NIO提供了與傳統BIO模型中的Socket和ServerSocket相對應的SocketChannel和ServerSocketChannel兩種不一樣的套接字通道實現。
新增的着兩種通道都支持阻塞和非阻塞兩種模式。
阻塞模式使用就像傳統中的支持同樣,比較簡單,可是性能和可靠性都很差;非阻塞模式正好與之相反。
對於低負載、低併發的應用程序,可使用同步阻塞I/O來提高開發速率和更好的維護性;對於高負載、高併發的(網絡)應用,應使用NIO的非阻塞模式來開發。
下面會先對基礎知識進行介紹。
3.緩衝區 Buffer
Buffer是一個對象,包含一些要寫入或者讀出的數據。
在NIO庫中,全部數據都是用緩衝區處理的。在讀取數據時,它是直接讀到緩衝區中的;在寫入數據時,也是寫入到緩衝區中。任什麼時候候訪問NIO中的數據,都是經過緩衝區進行操做。
緩衝區其實是一個數組,並提供了對數據結構化訪問以及維護讀寫位置等信息。
具體的緩存區有這些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他們實現了相同的接口:Buffer。
4.通道 Channel
咱們對數據的讀取和寫入要經過Channel,它就像水管同樣,是一個通道。通道不一樣於流的地方就是通道是雙向的,能夠用於讀、寫和同時讀寫操做。
底層的操做系統的通道通常都是全雙工的,因此全雙工的Channel比流能更好的映射底層操做系統的API。
Channel主要分兩大類:
SelectableChannel:用戶網絡讀寫
FileChannel:用於文件操做
後面代碼會涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子類。
5.多路複用器 Selector
Selector是Java NIO 編程的基礎。
Selector提供選擇已經就緒的任務的能力:Selector會不斷輪詢註冊在其上的Channel,若是某個Channel上面發生讀或者寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,而後經過SelectionKey能夠獲取就緒Channel的集合,進行後續的I/O操做。
一個Selector能夠同時輪詢多個Channel,由於JDK使用了epoll()代替傳統的select實現,因此沒有最大鏈接句柄1024/2048的限制。因此,只須要一個線程負責Selector的輪詢,就能夠接入成千上萬的客戶端。
6.NIO服務端
代碼比傳統的Socket編程看起來要複雜很多。
直接貼代碼吧,以註釋的形式給出代碼說明。
NIO建立的Server源碼:
package com.anxpp.io.calculator.nio; public class Server { private static int DEFAULT_PORT = 12345; private static ServerHandle serverHandle; public static void start(){ start(DEFAULT_PORT); } public static synchronized void start(int port){ if(serverHandle!=null) serverHandle.stop(); serverHandle = new ServerHandle(port); new Thread(serverHandle,"Server").start(); } public static void main(String[] args){ start(); } }
ServerHandle:
package com.anxpp.io.calculator.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import com.anxpp.io.utils.Calculator; /** * NIO服務端 * @author yangtao__anxpp.com * @version 1.0 */ public class ServerHandle implements Runnable{ private Selector selector; private ServerSocketChannel serverChannel; private volatile boolean started; /** * 構造方法 * @param port 指定要監聽的端口號 */ public ServerHandle(int port) { try{ //建立選擇器 selector = Selector.open(); //打開監聽通道 serverChannel = ServerSocketChannel.open(); //若是爲 true,則此通道將被置於阻塞模式;若是爲 false,則此通道將被置於非阻塞模式 serverChannel.configureBlocking(false);//開啓非阻塞模式 //綁定端口 backlog設爲1024 serverChannel.socket().bind(new InetSocketAddress(port),1024); //監聽客戶端鏈接請求 serverChannel.register(selector, SelectionKey.OP_ACCEPT); //標記服務器已開啓 started = true; System.out.println("服務器已啓動,端口號:" + port); }catch(IOException e){ e.printStackTrace(); System.exit(1); } } public void stop(){ started = false; } @Override public void run() { //循環遍歷selector while(started){ try{ //不管是否有讀寫事件發生,selector每隔1s被喚醒一次 selector.select(1000); //阻塞,只有當至少一個註冊的事件發生的時候纔會繼續. // selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try{ handleInput(key); }catch(Exception e){ if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } }catch(Throwable t){ t.printStackTrace(); } } //selector關閉後會自動釋放裏面管理的資源 if(selector != null) try{ selector.close(); }catch (Exception e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //處理新接入的請求消息 if(key.isAcceptable()){ ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); //經過ServerSocketChannel的accept建立SocketChannel實例 //完成該操做意味着完成TCP三次握手,TCP物理鏈路正式創建 SocketChannel sc = ssc.accept(); //設置爲非阻塞的 sc.configureBlocking(false); //註冊爲讀 sc.register(selector, SelectionKey.OP_READ); } //讀消息 if(key.isReadable()){ SocketChannel sc = (SocketChannel) key.channel(); //建立ByteBuffer,並開闢一個1M的緩衝區 ByteBuffer buffer = ByteBuffer.allocate(1024); //讀取請求碼流,返回讀取到的字節數 int readBytes = sc.read(buffer); //讀取到字節,對字節進行編解碼 if(readBytes>0){ //將緩衝區當前的limit設置爲position=0,用於後續對緩衝區的讀取操做 buffer.flip(); //根據緩衝區可讀字節數建立字節數組 byte[] bytes = new byte[buffer.remaining()]; //將緩衝區可讀字節數組複製到新建的數組中 buffer.get(bytes); String expression = new String(bytes,"UTF-8"); System.out.println("服務器收到消息:" + expression); //處理數據 String result = null; try{ result = Calculator.cal(expression).toString(); }catch(Exception e){ result = "計算錯誤:" + e.getMessage(); } //發送應答消息 doWrite(sc,result); } //沒有讀取到字節 忽略 // else if(readBytes==0); //鏈路已經關閉,釋放資源 else if(readBytes<0){ key.cancel(); sc.close(); } } } } //異步發送應答消息 private void doWrite(SocketChannel channel,String response) throws IOException{ //將消息編碼爲字節數組 byte[] bytes = response.getBytes(); //根據數組容量建立ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); //將字節數組複製到緩衝區 writeBuffer.put(bytes); //flip操做 writeBuffer.flip(); //發送緩衝區的字節數組 channel.write(writeBuffer); //****此處不含處理「寫半包」的代碼 } }
能夠看到,建立NIO服務端的主要步驟以下:
1.打開ServerSocketChannel,監聽客戶端鏈接 2.綁定監聽端口,設置鏈接爲非阻塞模式 3.建立Reactor線程,建立多路複用器並啓動線程 4.將ServerSocketChannel註冊到Reactor線程中的5.Selector上,監聽ACCEPT事件 6.Selector輪詢準備就緒的key 7.Selector監聽到新的客戶端接入,處理新的接入請求,完成TCP三次握手,簡歷物理鏈路 8.設置客戶端鏈路爲非阻塞模式 9.將新接入的客戶端鏈接註冊到Reactor線程的Selector上,監聽讀操做,讀取客戶端發送的網絡消息 10.異步讀取客戶端消息到緩衝區 11.對Buffer編解碼,處理半包消息,將解碼成功的消息封裝成Task 12.將應答消息編碼爲Buffer,調用SocketChannel的write將消息異步發送給客戶端
由於應答消息的發送,SocketChannel也是異步非阻塞的,因此不能保證一次能吧須要發送的數據發送完,此時就會出現寫半包的問題。咱們須要註冊寫操做,不斷輪詢Selector將沒有發送完的消息發送完畢,而後經過Buffer的hasRemain()方法判斷消息是否發送完成。
7.NIO客戶端
仍是直接上代碼吧,過程也不須要太多解釋了,跟服務端代碼有點相似。
Client:
package com.anxpp.io.calculator.nio; public class Client { private static String DEFAULT_HOST = "127.0.0.1"; private static int DEFAULT_PORT = 12345; private static ClientHandle clientHandle; public static void start(){ start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start(String ip,int port){ if(clientHandle!=null) clientHandle.stop(); clientHandle = new ClientHandle(ip,port); new Thread(clientHandle,"Server").start(); } //向服務器發送消息 public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMsg(msg); return true; } public static void main(String[] args){ start(); } }
ClientHandle:
package com.anxpp.io.calculator.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * NIO客戶端 * @author yangtao__anxpp.com * @version 1.0 */ public class ClientHandle implements Runnable{ private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean started; public ClientHandle(String ip,int port) { this.host = ip; this.port = port; try{ //建立選擇器 selector = Selector.open(); //打開監聽通道 socketChannel = SocketChannel.open(); //若是爲 true,則此通道將被置於阻塞模式;若是爲 false,則此通道將被置於非阻塞模式 socketChannel.configureBlocking(false);//開啓非阻塞模式 started = true; }catch(IOException e){ e.printStackTrace(); System.exit(1); } } public void stop(){ started = false; } @Override public void run() { try{ doConnect(); }catch(IOException e){ e.printStackTrace(); System.exit(1); } //循環遍歷selector while(started){ try{ //不管是否有讀寫事件發生,selector每隔1s被喚醒一次 selector.select(1000); //阻塞,只有當至少一個註冊的事件發生的時候纔會繼續. // selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try{ handleInput(key); }catch(Exception e){ if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } }catch(Exception e){ e.printStackTrace(); System.exit(1); } } //selector關閉後會自動釋放裏面管理的資源 if(selector != null) try{ selector.close(); }catch (Exception e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ SocketChannel sc = (SocketChannel) key.channel(); if(key.isConnectable()){ if(sc.finishConnect()); else System.exit(1); } //讀消息 if(key.isReadable()){ //建立ByteBuffer,並開闢一個1M的緩衝區 ByteBuffer buffer = ByteBuffer.allocate(1024); //讀取請求碼流,返回讀取到的字節數 int readBytes = sc.read(buffer); //讀取到字節,對字節進行編解碼 if(readBytes>0){ //將緩衝區當前的limit設置爲position=0,用於後續對緩衝區的讀取操做 buffer.flip(); //根據緩衝區可讀字節數建立字節數組 byte[] bytes = new byte[buffer.remaining()]; //將緩衝區可讀字節數組複製到新建的數組中 buffer.get(bytes); String result = new String(bytes,"UTF-8"); System.out.println("客戶端收到消息:" + result); } //沒有讀取到字節 忽略 // else if(readBytes==0); //鏈路已經關閉,釋放資源 else if(readBytes<0){ key.cancel(); sc.close(); } } } } //異步發送消息 private void doWrite(SocketChannel channel,String request) throws IOException{ //將消息編碼爲字節數組 byte[] bytes = request.getBytes(); //根據數組容量建立ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); //將字節數組複製到緩衝區 writeBuffer.put(bytes); //flip操做 writeBuffer.flip(); //發送緩衝區的字節數組 channel.write(writeBuffer); //****此處不含處理「寫半包」的代碼 } private void doConnect() throws IOException{ if(socketChannel.connect(new InetSocketAddress(host,port))); else socketChannel.register(selector, SelectionKey.OP_CONNECT); } public void sendMsg(String msg) throws Exception{ socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel, msg); } }
三.AIO編程
NIO 2.0引入了新的異步通道的概念,並提供了異步文件通道和異步套接字通道的實現。
異步的套接字通道時真正的異步非阻塞I/O,對應於UNIX網絡編程中的事件驅動I/O(AIO)。他不須要過多的Selector對註冊的通道進行輪詢便可實現異步讀寫,從而簡化了NIO的編程模型。
直接上代碼吧。
Server端代碼
package com.anxpp.io.calculator.aio.server; /** * AIO服務端 * @author yangtao__anxpp.com * @version 1.0 */ public class Server { private static int DEFAULT_PORT = 12345; private static AsyncServerHandler serverHandle; public volatile static long clientCount = 0; public static void start(){ start(DEFAULT_PORT); } public static synchronized void start(int port){ if(serverHandle!=null) return; serverHandle = new AsyncServerHandler(port); new Thread(serverHandle,"Server").start(); } public static void main(String[] args){ Server.start(); } }
AsyncServerHandler:
package com.anxpp.io.calculator.aio.server; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.AsynchronousServerSocketChannel; import java.util.concurrent.CountDownLatch; public class AsyncServerHandler implements Runnable { public CountDownLatch latch; public AsynchronousServerSocketChannel channel; public AsyncServerHandler(int port) { try { //建立服務端通道 channel = AsynchronousServerSocketChannel.open(); //綁定端口 channel.bind(new InetSocketAddress(port)); System.out.println("服務器已啓動,端口號:" + port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //CountDownLatch初始化 //它的做用:在完成一組正在執行的操做以前,容許當前的現場一直阻塞 //此處,讓現場在此阻塞,防止服務端執行完成後退出 //也可使用while(true)+sleep //生成環境就不須要擔憂這個問題,覺得服務端是不會退出的 latch = new CountDownLatch(1); //用於接收客戶端的鏈接 channel.accept(this,new AcceptHandler()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } }
AcceptHandler:
package com.anxpp.io.calculator.aio.server; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; //做爲handler接收客戶端鏈接 public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> { @Override public void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) { //繼續接受其餘客戶端的請求 Server.clientCount++; System.out.println("鏈接的客戶端數:" + Server.clientCount); serverHandler.channel.accept(serverHandler, this); //建立新的Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //異步讀 第三個參數爲接收消息回調的業務Handler channel.read(buffer, buffer, new ReadHandler(channel)); } @Override public void failed(Throwable exc, AsyncServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); } }
ReadHandler:
package com.anxpp.io.calculator.aio.server; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import com.anxpp.io.utils.Calculator; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { //用於讀取半包消息和發送應答 private AsynchronousSocketChannel channel; public ReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } //讀取到消息後的處理 @Override public void completed(Integer result, ByteBuffer attachment) { //flip操做 attachment.flip(); //根據 byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { String expression = new String(message, "UTF-8"); System.out.println("服務器收到消息: " + expression); String calrResult = null; try{ calrResult = Calculator.cal(expression).toString(); }catch(Exception e){ calrResult = "計算錯誤:" + e.getMessage(); } //向客戶端發送消息 doWrite(calrResult); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } //發送消息 private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //異步寫數據 參數與前面的read同樣 channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { //若是沒有發送完,就繼續發送直到完成 if (buffer.hasRemaining()) channel.write(buffer, buffer, this); else{ //建立新的Buffer ByteBuffer readBuffer = ByteBuffer.allocate(1024); //異步讀 第三個參數爲接收消息回調的業務Handler channel.read(readBuffer, readBuffer, new ReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } }
OK,這樣就已經完成了,其實提及來也簡單,雖然代碼感受不少,可是API比NIO的使用起來真的簡單多了,主要就是監聽、讀、寫等各類CompletionHandler。此處本應有一個WriteHandler的,確實,咱們在ReadHandler中,以一個匿名內部類實現了它。
下面看客戶端代碼。
Client:
package com.anxpp.io.calculator.aio.client; import java.util.Scanner; public class Client { private static String DEFAULT_HOST = "127.0.0.1"; private static int DEFAULT_PORT = 12345; private static AsyncClientHandler clientHandle; public static void start(){ start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start(String ip,int port){ if(clientHandle!=null) return; clientHandle = new AsyncClientHandler(ip,port); new Thread(clientHandle,"Client").start(); } //向服務器發送消息 public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMsg(msg); return true; } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ Client.start(); System.out.println("請輸入請求消息:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); } }
AsyncClientHandler:
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable { private AsynchronousSocketChannel clientChannel; private String host; private int port; private CountDownLatch latch; public AsyncClientHandler(String host, int port) { this.host = host; this.port = port; try { //建立異步的客戶端通道 clientChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //建立CountDownLatch等待 latch = new CountDownLatch(1); //發起異步鏈接操做,回調參數就是這個類自己,若是鏈接成功會回調completed方法 clientChannel.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } //鏈接服務器成功 //意味着TCP三次握手完成 @Override public void completed(Void result, AsyncClientHandler attachment) { System.out.println("客戶端成功鏈接到服務器..."); } //鏈接服務器失敗 @Override public void failed(Throwable exc, AsyncClientHandler attachment) { System.err.println("鏈接服務器失敗..."); exc.printStackTrace(); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { e.printStackTrace(); } } //向服務器發送消息 public void sendMsg(String msg){ byte[] req = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); //異步寫 clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch)); } }
WriteHandler:
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { //完成所有數據的寫入 if (buffer.hasRemaining()) { clientChannel.write(buffer, buffer, this); } else { //讀取數據 ByteBuffer readBuffer = ByteBuffer.allocate(1024); clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("數據發送失敗..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }
ReadHandler:
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes,"UTF-8"); System.out.println("客戶端收到結果:"+ body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("數據讀取失敗..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }
咱們能夠在控制檯輸入咱們須要計算的算數字符串,服務器就會返回結果,固然,咱們也能夠運行大量的客戶端,都是沒有問題的,覺得此處設計爲單例客戶端,因此也就沒有演示大量客戶端併發。
讀者能夠本身修改Client類,而後開闢大量線程,並使用構造方法建立不少的客戶端測試。
四.各類I/O的對比