第一章:手動搭建I/O網絡通訊框架1:Socket和ServerSocket入門實戰,實現單聊html
第二章:手動搭建I/O網絡通訊框架2:BIO編程模型實現羣聊編程
第三章:手動搭建I/O網絡通訊框架3:NIO編程模型,升級改造聊天室安全
上一章講到的NIO編程模型比較主流,很是著名的Netty就是基於NIO編程模型的。這一章說的是AIO編程模型,是異步非阻塞的。雖然一樣實現的是聊天室功能,可是實現邏輯上稍微要比NIO和BIO複雜一點。不過理好總體脈絡,會好理解一些。首先仍是講講概念:服務器
BIO和NIO的區別是阻塞和非阻塞,而AIO表明的是異步IO。在此以前只提到了阻塞和非阻塞,沒有提到異步仍是同步。能夠用我在知乎上看到的一句話表示:【在處理 IO 的時候,阻塞和非阻塞都是同步 IO,只有使用了特殊的 API 纔是異步 IO】。這些「特殊的API」下面會講到。在說AIO以前,先總結一下阻塞非阻塞、異步同步的概念。網絡
阻塞和非阻塞,描述的是結果的請求。阻塞:在獲得結果以前就一直呆在那,啥也不幹,此時線程掛起,就如其名,線程被阻塞了。非阻塞:若是沒獲得結果就返回,等一會再去請求,直到獲得結果爲止。異步和同步,描述的是結果的發出,當調用方的請求進來。同步:在沒獲取到結果前就不返回給調用方,若是調用方是阻塞的,那麼調用方就會一直等着。若是調用方是非阻塞的,調用方就會先回去,等一會再來問問獲得結果沒。異步:調用方一來,若是是非阻塞的叫它先回去,待會有結果了再告訴你。若是是阻塞的,雖然異步會通知他,但他仍是要等着,實屬鐵憨憨。併發
CompletionHandler框架
在AIO編程模型中,經常使用的API,如connect、accept、read、write都是支持異步操做的。當調用這些方法時,能夠攜帶一個CompletionHandler參數,它會提供一些回調函數。這些回調函數包括:1.當這些操做成功時你須要怎麼作;2.若是這些操做失敗了你要這麼作。關於這個CompletionHandler參數,你只須要寫一個類實現CompletionHandler口,並實現裏面兩個方法就好了。異步
那如何在調用connect、accept、read、write這四個方法時,傳入CompletionHandler參數從而實現異步呢?下面分別舉例這四個方法的使用。socket
先說說Socket和ServerSocket,在NIO中,它們變成了通道,配合緩衝區,從而實現了非阻塞。而在AIO中它們變成了異步通道。也就是AsynchronousServerSocketChannel和AsynchronousSocketChannel,下面例子中對象名分別是serverSocket和socket.ide
accept:serverSocket.accept(attachment,handler)。handler就是實現了CompletionHandler接口並實現兩個回調函數的類,它具體怎麼寫能夠看下面的實戰代碼。attachment爲handler裏面可能須要用到的輔助數據,若是沒有就填null。
read:socket.read(buffer,attachment,handler)。buffer是緩衝區,用以存放讀取到的信息。後面兩個參數和accept同樣。
write:socket.write(buffer,attachment,handler)。和read參數同樣。
connect:socket.connect(address,attachment,handler)。address爲服務器的IP和端口,後面兩個參數與前幾個同樣。
Future
既然說到了異步操做,除了使用實現CompletionHandler接口的方式,不得不想到Future。客戶端邏輯較爲簡單,若是使用CompletionHandler的話代碼反而更復雜,因此下面的實戰客戶端代碼就會使用Future的方式。簡單來講,Future表示的是異步操做將來的結果,怎麼理解將來。好比,客戶端調用read方法獲取服務器發來得消息:
Future<Integer> readResult=clientChannel.read(buffer)
Integer是read()的返回類型,此時變量readResult實際上並不必定有數據,而是表示read()方法將來的結果,這時候readResult有兩個方法,isDone():返回boolean,查看程序是否完成處理,若是返回true,有結果了,這時候能夠經過get()獲取結果。若是你不事先判斷isDone()直接調用get()也行,只不過它是阻塞的。若是你不想阻塞,想在這期間作點什麼,就用isDone()。
還有一個問題:這些handler的方法是在哪一個線程執行的?serverSocket.accept這個方法確定是在主線程裏面調用的,而傳入的這些回調方法實際上是在其餘線程執行的。在AIO中,會有一個AsynchronousChannelGroup,它和AsynchronousServerSocketChannel是綁定在一塊兒的,它會爲這些異步通道提供系統資源,線程就算其中一種系統資源,因此爲了方便理解,咱們暫時能夠把他看做一個線程池,它會爲這些handler分配線程,而不是在主線程中去執行。
上面只說了些零碎的概念,爲了更好的理解,下面講一講大概的工做流程(主要針對服務器,客戶端邏輯較爲簡單,代碼註釋也比較少,能夠看前面幾章):
1.首先作準備工做。跟NIO同樣,先要建立好通道,只不過AIO是異步通道。而後建立好AsyncChannelGroup,能夠選擇自定義線程池。最後把AsyncServerSocket和AsyncChannelGroup綁定在一塊兒,這樣處於同一個AsyncChannelGroup裏的通道就能夠共享系統資源。
2.最後一步準備工做,建立好handler類,並實現接口和裏面兩個回調方法。(如圖:客戶端1對應的handler,裏面的回調方法會實現讀取消息和轉發消息的功能;serverSocket的handler裏的回調方法會實現accept功能。)
3.準備工做完成,當客戶端1鏈接請求進來,客戶端會立刻回去,ServerSocket的異步方法會在鏈接成功後把客戶端的SocketChannel存進在線用戶列表,並利用客戶端1的handler開始異步監聽客戶端1發送的消息。
4.當客戶端1發送消息時,若是上一步中的handler成功監聽到,就會回調成功後的回調方法,這個方法裏會把這個消息轉發給其餘客戶端。轉發完成後,接着利用handler監聽客戶端1發送的消息。
代碼一共有三個類:
ChatServer:功能基本上和上面講的工做流程差很少,還會有一些工具方法,都比較簡單,就很少說了,如:轉發消息,客戶端下線後從在線列表移除客戶端等。
ChatClient:基本和前兩章的BIO、NIO沒什麼區別,一個線程監聽用戶輸入信息併發送,主線程異步的讀取服務器信息。
UserInputHandler:監聽用戶輸入信息的線程。
ChatServer
public class ChatServer { //設置緩衝區字節大小 private static final int BUFFER = 1024; //聲明AsynchronousServerSocketChannel和AsynchronousChannelGroup private AsynchronousServerSocketChannel serverSocketChannel; private AsynchronousChannelGroup channelGroup; //在線用戶列表。爲了併發下的線程安全,因此使用CopyOnWriteArrayList //CopyOnWriteArrayList在寫時加鎖,讀時不加鎖,而本項目正好在轉發消息時須要頻繁讀取. //ClientHandler包含每一個客戶端的通道,類型選擇爲ClientHandler是爲了在write的時候調用每一個客戶端的handler private CopyOnWriteArrayList<ClientHandler> clientHandlerList; //字符和字符串互轉須要用到,規定編碼方式,避免中文亂碼 private Charset charset = Charset.forName("UTF-8"); //經過構造函數設置監聽端口 private int port; public ChatServer(int port) { this.port = port; clientHandlerList=new CopyOnWriteArrayList<>(); } public void start() { try { /** *建立一個線程池並把線程池和AsynchronousChannelGroup綁定,前面提到了AsynchronousChannelGroup包括一些系統資源,而線程就是其中一種。 *爲了方便理解咱們就暫且把它看成線程池,實際上並不止包含線程池。若是你須要本身選定線程池類型和數量,就能夠以下操做 *若是不須要自定義線程池類型和數量,能夠不用寫下面兩行代碼。 * */ ExecutorService executorService = Executors.newFixedThreadPool(10); channelGroup = AsynchronousChannelGroup.withThreadPool(executorService); serverSocketChannel=AsynchronousServerSocketChannel.open(channelGroup); serverSocketChannel.bind(new InetSocketAddress("127.0.0.1",port)); System.out.println("服務器啓動:端口【"+port+"】"); /** * AIO中accept能夠異步調用,就用上面說到的CompletionHandler方式 * 第一個參數是輔助參數,回調函數中可能會用上的,若是沒有就填null;第二個參數爲CompletionHandler接口的實現 * 這裏使用while和System.in.read()的緣由: * while是爲了讓服務器保持運行狀態,前面的NIO,BIO都有用到while無限循環來保持服務器運行,可是它們用的地方可能更好理解 * System.in.read()是阻塞式的調用,只是單純的避免無限循環而讓accept頻繁被調用,無實際業務功能。 */ while (true) { serverSocketChannel.accept(null, new AcceptHandler()); System.in.read(); } } catch (IOException e) { e.printStackTrace(); }finally { if(serverSocketChannel!=null){ try { serverSocketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } //AsynchronousSocketChannel爲accept返回的類型,Object爲輔助參數類型,沒有就填Object private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,Object>{ //若是成功,執行的回調方法 @Override public void completed(AsynchronousSocketChannel clientChannel, Object attachment) { //若是服務器沒關閉,在接收完當前客戶端的請求後,再次調用,以接着接收其餘客戶端的請求 if(serverSocketChannel.isOpen()){ serverSocketChannel.accept(null,this); } //若是客戶端的channel沒有關閉 if(clientChannel!=null&&clientChannel.isOpen()){ //這個就是異步read和write要用到的handler,並傳入當前客戶端的channel ClientHandler handler=new ClientHandler(clientChannel); //把新用戶添加到在線用戶列表裏 clientHandlerList.add(handler); System.out.println(getPort(clientChannel)+"上線啦!"); ByteBuffer buffer=ByteBuffer.allocate(BUFFER); //異步調用read,第一個buffer是存放讀到數據的容器,第二個是輔助參數。 //由於真正的處理是在handler裏的回調函數進行的,輔助參數會直接傳進回調函數,因此爲了方便使用,buffer就看成輔助參數 clientChannel.read(buffer,buffer,handler); } } //若是失敗,執行的回調方法 @Override public void failed(Throwable exc, Object attachment) { System.out.println("鏈接失敗"+exc); } } private class ClientHandler implements CompletionHandler<Integer, ByteBuffer>{ private AsynchronousSocketChannel clientChannel; public ClientHandler(AsynchronousSocketChannel clientChannel) { this.clientChannel = clientChannel; } @Override public void completed(Integer result, ByteBuffer buffer) { if(buffer!=null){ //若是read返回的結果小於等於0,而buffer不爲空,說明客戶端通道出現異常,作下線操做 if(result<=0){ removeClient(this); }else { //轉換buffer讀寫模式並獲取消息 buffer.flip(); String msg=String.valueOf(charset.decode(buffer)); //在服務器上打印客戶端發來的消息 System.out.println(getPort(clientChannel)+msg); //把消息轉發給其餘客戶端 sendMessage(clientChannel,getPort(clientChannel)+msg); buffer=ByteBuffer.allocate(BUFFER); //若是用戶輸入的是退出,就從在線列表裏移除。不然接着監聽這個用戶發送消息 if(msg.equals("quit")) removeClient(this); else clientChannel.read(buffer, buffer, this); } } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.out.println("客戶端讀寫異常:"+exc); } } //轉發消息的方法 private void sendMessage(AsynchronousSocketChannel clientChannel,String msg){ for(ClientHandler handler:clientHandlerList){ if(!handler.clientChannel.equals(clientChannel)){ ByteBuffer buffer=charset.encode(msg); //write不須要buffer當輔助參數,由於寫到客戶端的通道就完事了,而讀還須要回調函數轉發給其餘客戶端。 handler.clientChannel.write(buffer,null,handler); } } } //根據客戶端channel獲取對應端口號的方法 private String getPort(AsynchronousSocketChannel clientChannel){ try { InetSocketAddress address=(InetSocketAddress)clientChannel.getRemoteAddress(); return "客戶端["+address.getPort()+"]:"; } catch (IOException e) { e.printStackTrace(); return "客戶端[Undefined]:"; } } //移除客戶端 private void removeClient(ClientHandler handler){ clientHandlerList.remove(handler); System.out.println(getPort(handler.clientChannel)+"斷開鏈接..."); if(handler.clientChannel!=null){ try { handler.clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) { new ChatServer(8888).start(); } }
ChatClient
public class ChatClient { private static final int BUFFER = 1024; private AsynchronousSocketChannel clientChannel; private Charset charset = Charset.forName("UTF-8"); private String host; private int port; //設置服務器IP和端口 public ChatClient(String host, int port) { this.host = host; this.port = port; } public void start() { try { clientChannel = AsynchronousSocketChannel.open(); //鏈接服務器 Future<Void> future = clientChannel.connect(new InetSocketAddress(host, port)); future.get(); //新建一個線程去等待用戶輸入 new Thread(new UserInputHandler(this)).start(); ByteBuffer buffer=ByteBuffer.allocate(BUFFER); //無限循環讓客戶端保持運行狀態 while (true){ //獲取服務器發來的消息並存入到buffer Future<Integer> read=clientChannel.read(buffer); if(read.get()>0){ buffer.flip(); String msg=String.valueOf(charset.decode(buffer)); System.out.println(msg); buffer.clear(); }else { //若是read的結果小於等於0說明和服務器鏈接出現異常 System.out.println("服務器斷開鏈接"); if(clientChannel!=null){ clientChannel.close(); } System.exit(-1); } } } catch (IOException | InterruptedException | ExecutionException e) { e.printStackTrace(); } } public void send(String msg) { if (msg.isEmpty()) return; ByteBuffer buffer = charset.encode(msg); Future<Integer> write=clientChannel.write(buffer); try { //獲取發送結果,若是get方法發生異常說明發送失敗 write.get(); } catch (ExecutionException|InterruptedException e) { System.out.println("消息發送失敗"); e.printStackTrace(); } } public static void main(String[] args) { new ChatClient("127.0.0.1",8888).start(); } }
UserInputHandler
public class UserInputHandler implements Runnable { ChatClient client; public UserInputHandler(ChatClient chatClient) { this.client=chatClient; } @Override public void run() { BufferedReader read=new BufferedReader( new InputStreamReader(System.in) ); while (true){ try { String input=read.readLine(); client.send(input); if(input.equals("quit")) break; } catch (IOException e) { e.printStackTrace(); } } } }
運行測試: