一:socket又稱「套接字」,至關於「排插」,一一對應,一個客戶端對應一個服務端,原始的socket通訊爲,每當一個客戶端socket新接入,服務端serverSocket就得就得新建一個線程。缺點:客戶端一多就容易搞崩服務端。(爲傳統的BIO編程)java
例子1:編程
服務端代碼:數組
package bhz.bio; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; // 服務端 public class Server { final static int PROT = 8765; public static void main(String[] args) { ServerSocket server = null; try { server = new ServerSocket(PROT); System.out.println(" server start .. "); //進行阻塞 Socket socket = server.accept(); //新建一個線程執行客戶端的任務:每來一個Socket就新建一個線程,一多就容易將服務器撐爆。 new Thread(new ServerHandler(socket)).start(); } catch (Exception e) { e.printStackTrace(); } finally { if(server != null){ try { server.close(); } catch (IOException e) { e.printStackTrace(); } } server = null; } } }
服務端接收請求後進行run()響應緩存
package bhz.bio; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; // 服務端接收請求響應後的具體操做 public class ServerHandler implements Runnable{ private Socket socket ; public ServerHandler(Socket socket){ this.socket = socket; } @Override public void run() { BufferedReader in = null; PrintWriter out = null; try { // 接收客戶端數據 in = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); // 向客戶端響應數據 out = new PrintWriter(this.socket.getOutputStream(), true); String body = null; while(true){ // 讀取客戶端數據 body = in.readLine(); if(body == null) break; System.out.println("Server :" + body); out.println("服務器端回送響的應數據."); } } catch (Exception e) { e.printStackTrace(); } finally { if(in != null){ try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if(out != null){ try { out.close(); } catch (Exception e) { e.printStackTrace(); } } if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } socket = null; } } }
客戶端請求:服務器
package bhz.bio; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; public class Client { final static String ADDRESS = "127.0.0.1"; final static int PORT = 8765; public static void main(String[] args) { Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { // 建立Socket鏈接 socket = new Socket(ADDRESS, PORT); // 接收服務端請求數據 in = new BufferedReader(new InputStreamReader(socket.getInputStream())); // 向服務端發送響應數據 out = new PrintWriter(socket.getOutputStream(), true); //向服務器端發送數據 out.println("接收到客戶端的請求數據..."); out.println("接收到客戶端的請求數據1111..."); // 讀取服務端響應的數據 String response = in.readLine(); System.out.println("Client: " + response); } catch (Exception e) { e.printStackTrace(); } finally { if(in != null){ try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if(out != null){ try { out.close(); } catch (Exception e) { e.printStackTrace(); } } if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } socket = null; } } }
二:爲擺脫服務端每接收一次客戶端請求就得建立一個線程的問題,能夠自定義線程池,限制線程、隊列的大小。(爲傳統的BIO編程)異步
自定義線程池:socket
package bhz.bio2; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; // 服務端自定的線程池 public class HandlerExecutorPool { private ExecutorService executor; // 構造方法,定義參數值 public HandlerExecutorPool(int maxPoolSize, int queueSize){ this.executor = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize)); } public void execute(Runnable task){ this.executor.execute(task); } }
服務端使用線程池:ide
package bhz.bio2; import java.io.BufferedReader; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; public class Server { final static int PORT = 8765; public static void main(String[] args) { ServerSocket server = null; BufferedReader in = null; PrintWriter out = null; try { server = new ServerSocket(PORT); System.out.println("server start"); Socket socket = null; // 自定義一個線程池 HandlerExecutorPool executorPool = new HandlerExecutorPool(50, 1000); while(true){ // 阻塞,接收客戶端請求 socket = server.accept(); // 將客戶端請求放在線程池中,進行服務端響應 executorPool.execute(new ServerHandler(socket)); } } catch (Exception e) { e.printStackTrace(); } finally { if(in != null){ try { in.close(); } catch (Exception e1) { e1.printStackTrace(); } } if(out != null){ try { out.close(); } catch (Exception e2) { e2.printStackTrace(); } } if(server != null){ try { server.close(); } catch (Exception e3) { e3.printStackTrace(); } } server = null; } } }
服務端執行請求響應操做:this
package bhz.bio2; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; public class ServerHandler implements Runnable { private Socket socket; public ServerHandler (Socket socket){ this.socket = socket; } // 執行線程操做 @Override public void run() { BufferedReader in = null; PrintWriter out = null; try { // 接收客戶端請求數據 in = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); // 發送服務端響應數據 out = new PrintWriter(this.socket.getOutputStream(), true); String body = null; while(true){ body = in.readLine(); if(body == null) break; System.out.println("Server:" + body); out.println("Server response"); } } catch (Exception e) { e.printStackTrace(); } finally { if(in != null){ try { in.close(); } catch (Exception e1) { e1.printStackTrace(); } } if(out != null){ try { out.close(); } catch (Exception e2) { e2.printStackTrace(); } } if(socket != null){ try { socket.close(); } catch (Exception e3) { e3.printStackTrace(); } } socket = null; } } }
客戶端請求:spa
package bhz.bio2; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.net.UnknownHostException; public class Client { final static String ADDRESS = "127.0.0.1"; final static int PORT =8765; public static void main(String[] args) { Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { // 建立Socket鏈接 socket = new Socket(ADDRESS, PORT); // 接收服務端返回數據 in = new BufferedReader(new InputStreamReader(socket.getInputStream())); // 發送客戶端響應數據 out = new PrintWriter(socket.getOutputStream(), true); out.println("Client request"); String response = in.readLine(); System.out.println("Client:" + response); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { if(in != null){ try { in.close(); } catch (Exception e1) { e1.printStackTrace(); } } if(out != null){ try { out.close(); } catch (Exception e2) { e2.printStackTrace(); } } if(socket != null){ try { socket.close(); } catch (Exception e3) { e3.printStackTrace(); } } socket = null; } } }
三:NIO的Socket通訊:同步非阻塞
1.NIO三個重要概念:Buffer(緩衝區)、Channel(通道)、選擇器(Selector)。
Buffer(緩衝區):用於通訊傳輸時的緩衝
Channel(通道):客戶端和服務端都有一個通道,分別爲:socketChannel和ServerSocketChannel,這兩個類型管道都得註冊到選擇器(Seletoer)上,由選擇器進行統一的通訊管理。
選擇器(Selector):對客戶端和服務端的通訊管道進行統一管理
2.Buffer的基本概念操做:
package bhz.nio.test; import java.nio.IntBuffer; public class TestBuffer { public static void main(String[] args) { // 1 基本操做 //建立指定長度的緩衝區 IntBuffer buf = IntBuffer.allocate(10); buf.put(13);// position位置:0 - > 1 緩衝區加入數據 buf.put(21);// position位置:1 - > 2 buf.put(35);// position位置:2 - > 3 //把位置復位爲0,也就是position位置:3 - > 0 buf.flip(); System.out.println("使用flip復位:" + buf); System.out.println("容量爲: " + buf.capacity()); //容量一旦初始化後不容許改變(warp方法包裹數組除外) System.out.println("限制爲: " + buf.limit()); //因爲只裝載了三個元素,因此可讀取或者操做的元素爲3 則limit=3 System.out.println("獲取下標爲1的元素:" + buf.get(1)); System.out.println("get(index)方法,position位置不改變:" + buf); buf.put(1, 4); System.out.println("put(index, change)方法,position位置不變:" + buf);; for (int i = 0; i < buf.limit(); i++) { //調用get方法會使其緩衝區位置(position)向後遞增一位 System.out.print(buf.get() + "\t"); } System.out.println("buf對象遍歷以後爲: " + buf); // 2 wrap方法使用 /** // wrap方法會包裹一個數組: 通常這種用法不會先初始化緩存對象的長度,由於沒有意義,最後還會被wrap所包裹的數組覆蓋掉。 // 而且wrap方法修改緩衝區對象的時候,數組自己也會跟着發生變化。 int[] arr = new int[]{1,2,5}; IntBuffer buf1 = IntBuffer.wrap(arr); System.out.println(buf1); IntBuffer buf2 = IntBuffer.wrap(arr, 0 , 2); //這樣使用表示容量爲數組arr的長度,可是可操做的元素只有實際進入緩存區的元素長度 System.out.println(buf2); */ // 3 其餘方法 /** IntBuffer buf1 = IntBuffer.allocate(10); int[] arr = new int[]{1,2,5}; buf1.put(arr); System.out.println(buf1); //一種複製方法 IntBuffer buf3 = buf1.duplicate(); System.out.println(buf3); //設置buf1的位置屬性 //buf1.position(0); buf1.flip(); System.out.println(buf1); System.out.println("可讀數據爲:" + buf1.remaining()); int[] arr2 = new int[buf1.remaining()]; //將緩衝區數據放入arr2數組中去 buf1.get(arr2); for(int i : arr2){ System.out.print(Integer.toString(i) + ","); } */ } }
2.使用管道Channel和選擇器Seletor的服務端(接收客戶端請求):
package bhz.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; public class Server implements Runnable{ //1 多路複用器(管理全部的通道),channel會註冊到這個seletor對象上 private Selector seletor; //2 創建緩衝區 private ByteBuffer readBuf = ByteBuffer.allocate(1024); //3 private ByteBuffer writeBuf = ByteBuffer.allocate(1024); // 構造方法,初始化參數 public Server(int port){ try { //1 打開路複用器 this.seletor = Selector.open(); //2 打開服務器通道 ServerSocketChannel ssc = ServerSocketChannel.open(); //3 設置服務器通道爲非阻塞模式,爲true的話和傳統的阻塞socket沒區別 ssc.configureBlocking(false); //4 綁定地址 ssc.bind(new InetSocketAddress(port)); //5 把服務器通道註冊到多路複用器上,而且監聽阻塞事件 ssc.register(this.seletor, SelectionKey.OP_ACCEPT); System.out.println("Server start, port :" + port); } catch (IOException e) { e.printStackTrace(); } } // 線程開啓後執行 @Override public void run() { while(true){ try { //1 必需要讓多路複用器開始監聽 this.seletor.select(); //2 返回多路複用器已經選擇的結果集,客戶端chanal和服務端的serverChanal都會註冊到seletor上(註冊的值爲Key),此處拿到的keys包含了客戶端的和服務端的key Iterator<SelectionKey> keys = this.seletor.selectedKeys().iterator(); //3 對結果集進行遍歷 while(keys.hasNext()){ //4 獲取一個選擇的元素 SelectionKey key = keys.next(); //5 直接從容器中移除就能夠了 keys.remove(); //6 若是是有效的 if(key.isValid()){ //7 若是爲阻塞狀態,說明key爲服務端的serverSocketChanal。此處NIO爲接收客戶端請求,同步不阻塞 if(key.isAcceptable()){ this.accept(key); } //8 若是爲可讀狀態 if(key.isReadable()){ // 讀取每個客戶端的SocketChannle發送過來的數據 this.read(key); } //9 寫數據 if(key.isWritable()){ //this.write(key); //ssc } } } } catch (IOException e) { e.printStackTrace(); } } } private void write(SelectionKey key){ //ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); //ssc.register(this.seletor, SelectionKey.OP_WRITE); } // 接收讀取客戶端數據 private void read(SelectionKey key) { try { //1 清空緩衝區舊的數據 this.readBuf.clear(); //2 獲取以前註冊的socket通道對象 SocketChannel sc = (SocketChannel) key.channel(); //3 讀取數據 int count = sc.read(this.readBuf); //4 若是沒有數據 if(count == -1){ key.channel().close(); key.cancel(); return; } //5 有數據則進行讀取 讀取以前須要進行復位方法(把position 和limit進行復位) this.readBuf.flip(); //6 根據緩衝區的數據長度建立相應大小的byte數組,接收緩衝區的數據 byte[] bytes = new byte[this.readBuf.remaining()]; //7 接收緩衝區數據 this.readBuf.get(bytes); //8 打印客戶端傳送過來的結果數據 String body = new String(bytes).trim(); System.out.println("Server : " + body); // 9..能夠寫回給客戶端數據 } catch (IOException e) { e.printStackTrace(); } } // 接收客戶端請求 private void accept(SelectionKey key) { try { //1 獲取服務通道 ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); //2 執行阻塞方法,拿到客戶端管道,客戶端管道也要註冊到selector選擇器上 SocketChannel sc = ssc.accept(); //3 設置阻塞模式:非阻塞 sc.configureBlocking(false); //4 註冊到多路複用器上,並設置讀取標識 sc.register(this.seletor, SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { // 開啓服務,執行線程run()方法 new Thread(new Server(8765)).start();; } }
3.客戶端發送請求:
package bhz.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; public class Client { //須要一個Selector public static void main(String[] args) { //建立鏈接的地址 InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8765); //聲明鏈接通道 SocketChannel sc = null; //創建緩衝區 ByteBuffer buf = ByteBuffer.allocate(1024); try { //打開通道 sc = SocketChannel.open(); //進行鏈接 sc.connect(address); // 鏈接成功 while(true){ //定義一個字節數組,而後使用系統錄入功能: byte[] bytes = new byte[1024]; System.in.read(bytes); //把數據放到緩衝區中 buf.put(bytes); //對緩衝區進行復位 buf.flip(); //寫出數據 sc.write(buf); //清空緩衝區數據 buf.clear(); } } catch (IOException e) { e.printStackTrace(); } finally { if(sc != null){ try { sc.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
四:AIO的Socket通訊:AIO是在NIO的基礎上,新增了異步通道,在管道前面使用了Asynchronous的,服務端和客戶端管道類分別爲:AsynchronousServerSocketChannel和AsynchronousSocketChannel。
AIO通訊的服務端爲:先建立線程池,線程組使用線程池並負責客戶端管道的接入,服務端開啓線程組。
1.服務端ServerCompletionHandler類的建立(服務端接收請求後的具體執行內容):
package bhz.aio; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutionException; public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Server> { // 鏈接成功的操做方法,每個客戶端鏈接進來都進行此操做 @Override public void completed(AsynchronousSocketChannel asc, Server attachment) { //當有下一個客戶端接入的時候 直接調用Server的accept方法,這樣反覆執行下去,保證多個客戶端均可以阻塞 attachment.assc.accept(attachment, this); read(asc); } // 讀取客戶端管道數據 private void read(final AsynchronousSocketChannel asc) { //讀取數據 ByteBuffer buf = ByteBuffer.allocate(1024); // 異步,不堵塞,執行不成功也直接往下走 asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer resultSize, ByteBuffer attachment) { //進行讀取以後,重置標識位 attachment.flip(); //得到讀取的字節數 System.out.println("Server -> " + "收到客戶端的數據長度爲:" + resultSize); //獲取讀取的數據 String resultData = new String(attachment.array()).trim(); System.out.println("Server -> " + "收到客戶端的數據信息爲:" + resultData); // 發送服務端響應數據 String response = "服務器響應, 收到了客戶端發來的數據: " + resultData; write(asc, response); } @Override public void failed(Throwable exc, ByteBuffer attachment) { exc.printStackTrace(); } }); } // 服務端寫響應數據 private void write(AsynchronousSocketChannel asc, String response) { try { // 設置緩衝區大小 ByteBuffer buf = ByteBuffer.allocate(1024); // 向客戶端發送響應數據 buf.put(response.getBytes()); buf.flip(); asc.write(buf).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } // 鏈接失敗的操做方法 @Override public void failed(Throwable exc, Server attachment) { exc.printStackTrace(); } }
2.服務端:
package bhz.aio; import java.net.InetSocketAddress; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Server { //線程池 private ExecutorService executorService; //線程組 private AsynchronousChannelGroup threadGroup; //服務器通道 public AsynchronousServerSocketChannel assc; public Server(int port){ try { //建立一個緩存池 executorService = Executors.newCachedThreadPool(); //建立線程組:異步,負責接收client的接入 threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1); //建立服務器通道 assc = AsynchronousServerSocketChannel.open(threadGroup); //進行綁定接收端口 assc.bind(new InetSocketAddress(port)); System.out.println("server start , port : " + port); //進行阻塞:一直等客戶端的請求,this指當前對象。new ServerCompletionHandler()爲進行通訊的具體操做內容 assc.accept(this, new ServerCompletionHandler()); //一直阻塞 不讓服務器中止 Thread.sleep(Integer.MAX_VALUE); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { Server server = new Server(8765); } }
3.客戶端:
package bhz.aio; import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.util.concurrent.ExecutionException; public class Client implements Runnable{ private AsynchronousSocketChannel asc ; public Client() throws Exception { asc = AsynchronousSocketChannel.open(); } // 鏈接服務端地址 public void connect(){ asc.connect(new InetSocketAddress("127.0.0.1", 8765)); } // 向服務端寫數據 public void write(String request){ try { asc.write(ByteBuffer.wrap(request.getBytes())).get(); read(); } catch (Exception e) { e.printStackTrace(); } } //接收服務端數據 private void read() { ByteBuffer buf = ByteBuffer.allocate(1024); try { asc.read(buf).get(); buf.flip(); byte[] respByte = new byte[buf.remaining()]; buf.get(respByte); System.out.println(new String(respByte,"utf-8").trim()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void run() { while(true){ } } public static void main(String[] args) throws Exception { Client c1 = new Client(); c1.connect(); Client c2 = new Client(); c2.connect(); Client c3 = new Client(); c3.connect(); new Thread(c1, "c1").start(); new Thread(c2, "c2").start(); new Thread(c3, "c3").start(); Thread.sleep(1000); c1.write("c1 aaa"); c2.write("c2 bbbb"); c3.write("c3 ccccc"); } }
客戶端發送請求,接收服務端的數據:
服務器響應, 收到了客戶端發來的數據: c1 aaa
服務器響應, 收到了客戶端發來的數據: c2 bbbb
服務器響應, 收到了客戶端發來的數據: c3 ccccc
五:思考:BIO和NIO,NIO和AIO的區別?(這三者如今開發都不會用了)