JDK5以前, JDK的IO模式只有BIO(同步阻塞)
問題: 由於阻塞的存在, 需對每一個請求開啓一個線程. 過多的線程切換影響操做系統性能
解決: 使用線程池, 處理不過來的放入隊列, 再處理不過來的會觸發其餘機制
問題: 超過線程池數量的請求須要等待java
public class Client { final static String ADDRESS = "127.0.0.1"; final static int PORT = 8765; public static void main(String[] args) throws IOException { Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { socket = new Socket(ADDRESS, PORT); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(), true); // true自動flush //向服務器端發送數據 out.println("來自客戶端的請求"); //從服務端接收數據 String response = in.readLine(); // 阻塞 System.out.println("Client獲取數據: " + response); } catch (Exception e) { e.printStackTrace(); } finally { out.close(); in.close(); socket.close(); } } }
服務端1: 一個請求~一個線程數組
public class Server { final static int PROT = 8765; public static void main(String[] args) throws IOException { ServerSocket server = null; try { server = new ServerSocket(PROT); System.out.println("server start"); while(true){ Socket socket = server.accept(); //監聽 阻塞 , socket底層會新建線程處理與客戶端的三次握手 //創建線程處理獲取的 socket new Thread(new ServerHandler(socket)).start(); } } catch (Exception e) { e.printStackTrace(); } finally { server.close(); } } } class ServerHandler implements Runnable { private Socket socket; public ServerHandler(Socket socket) { this.socket = socket; } @Override public void run() { BufferedReader in = null; PrintWriter out = null; try { in = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); out = new PrintWriter(this.socket.getOutputStream(), true); String body = null; while (true) { body = in.readLine(); // 阻塞 if (body == null) break; System.out.println("Server獲取的請求: " + body); out.println("來自服務器的響應"); } } catch (Exception e) { e.printStackTrace(); } finally { try { out.close(); in.close(); socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
服務端2: 用線程池處理請求服務器
public class Server { final static int PORT = 8765; public static void main(String[] args) throws IOException { ServerSocket server = null; try { server = new ServerSocket(PORT); System.out.println("server start"); HandlerExecutorPool executorPool = new HandlerExecutorPool(50, 1000); while(true){ Socket socket = server.accept(); executorPool.execute(new ServerHandler(socket)); } } catch (Exception e) { e.printStackTrace(); } finally { server.close(); } } } class HandlerExecutorPool { private ExecutorService executor; public HandlerExecutorPool(int maxPoolSize, int queueSize){ this.executor = new ThreadPoolExecutor( // 帶阻塞隊列的線程池 Runtime.getRuntime().availableProcessors(), // 初始線程數 maxPoolSize, // 線程數上限 若是要處理請求的Runnable對象裝滿了隊列, 則提升現有線程數 120L, // 如在120個時間顆粒內某線程是空閒的, 將被回收 TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize) // 存放處理請求的Runnable對象 ); } public void execute(Runnable task){ this.executor.execute(task); } } class ServerHandler implements Runnable { private Socket socket; public ServerHandler(Socket socket) { this.socket = socket; } @Override public void run() { BufferedReader in = null; PrintWriter out = null; try { in = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); out = new PrintWriter(this.socket.getOutputStream(), true); String body = null; while (true) { body = in.readLine(); if (body == null) break; System.out.println("Server獲取的請求: " + body); // 阻塞 out.println("來自服務器的響應"); } } catch (Exception e) { e.printStackTrace(); } finally { try { out.close(); in.close(); socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
JDK5之後引入了NIO1.0(多路複用機制)異步
伴隨多路複用在程序中引入了以下概念:socket
Channel(通道):TCP鏈接的抽象,一個TCP鏈接對應多個Channel,這樣減小TCP的鏈接次數。
通道與BIO中socket相似
通道與BIO中的流相似, 不過channel是雙向的而流是單向的
channel有多種狀態位, 能被selector識別ide
Buffer(緩衝區):
緩衝區是一塊內存區域(數組), 在NIO中被包裝成Buffer對象. Buffer提供方法用來訪問該內存。
BIO中,數據存儲在流中,而NIO中,數據存儲在緩衝區中。
除了boolean的其餘java七種基本類型都有相應的Buffer類. 最常使用的是ByteBuffer函數
Selector(多路複用器):負責輪詢全部註冊通道,根據通道狀態執行相關操做。狀態包括:Connect,Accept,Read,Write。
在"四種經常使用IO模型"裏提過用select系統調用實現IO多路複用. 除select外Linux還提供了poll/epoll函數, 其中select/poll函數按順序掃描文件句柄是否就緒,支持的文件句柄數有限; 而epoll使用基於事件驅動方式替代順序掃描,性能更高, 對文件句柄數沒有數量限制. JDK的Selector使用了epoll, 只須要一個線程輪詢, 就能夠接入大量的客戶端.性能
public class Client { public static void main(String[] args) throws IOException { SocketChannel sc = null; ByteBuffer writeBuf = ByteBuffer.allocate(1024); ByteBuffer readBuf = ByteBuffer.allocate(1024); try { //建立通道 sc = SocketChannel.open(); //進行鏈接 sc.connect(new InetSocketAddress("127.0.0.1", 8765)); // 下面步驟能夠用selector輪詢代替 while(true){ //定義一個字節數組,而後使用系統錄入功能: byte[] bytes1 = new byte[1024]; System.in.read(bytes1); //阻塞 //把數據放到緩衝區中 writeBuf.put(bytes1); //對緩衝區進行復位 writeBuf.flip(); //寫出數據 sc.write(writeBuf); //清空緩衝區 writeBuf.clear(); // 接收服務端響應 sc.read(readBuf); readBuf.flip(); byte[] bytes2 = new byte[readBuf.remaining()]; readBuf.get(bytes2); readBuf.clear(); String body = new String(bytes2); System.out.println("Client獲取數據: " + body); } } catch (IOException e) { e.printStackTrace(); } finally { sc.close(); } } }
經過改變Selector監聽Channel的狀態位, 控制與客戶端讀寫的前後順序this
public class Server implements Runnable{ private Selector seletor; private ByteBuffer readBuf = ByteBuffer.allocate(1024); private ByteBuffer writeBuf = ByteBuffer.allocate(1024); public Server(int port){ try { //1 建立多路複用器selector this.seletor = Selector.open(); //2 建立ServerSocket通道 ServerSocketChannel ssc = ServerSocketChannel.open(); //3 設置通道是否阻塞, 決定了通道了read/write/accept/connect方法是否阻塞 ssc.configureBlocking(false); //4 設置通道地址 ssc.bind(new InetSocketAddress(port)); //5 將ServerSocket通道註冊到selector上, 指定監聽其accept事件 ssc.register(this.seletor, SelectionKey.OP_ACCEPT); System.out.println("Server start"); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { while(true){ try { // select阻塞, 監聽相關事件 this.seletor.select(); // 解除阻塞, 返回選擇key, key含有通道, 狀態等信息 Iterator<SelectionKey> keysIter = this.seletor.selectedKeys().iterator(); // 進行遍歷 while(keysIter.hasNext()){ SelectionKey key = keysIter.next(); keysIter.remove(); if (key.isValid()) { // 等待接收鏈接狀態 if (key.isAcceptable()) { accept(key); } // 可讀狀態 if (key.isReadable()) { read(key); } if (key.isWritable()) { write(key); } } } } catch (IOException e) { e.printStackTrace(); } } } private void write(SelectionKey key) { try { // 獲取通道 SocketChannel sc = (SocketChannel) key.channel(); // 寫回給客戶端數據 writeBuf.put("來自服務器的響應".getBytes()); writeBuf.flip(); sc.write(writeBuf); writeBuf.clear(); // 修改監聽的狀態位, 若是保持OP_WRITE會致使重複寫 key.interestOps(SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); } } private void read(SelectionKey key) { try { // 獲取通道 SocketChannel sc = (SocketChannel) key.channel(); // 讀取數據, 讀到buffer. 按程序運行順序, 這裏sc是否設置爲阻塞效果都同樣 int count = sc.read(this.readBuf); // readBuf寫時會改變position的值 if (count == -1) { key.channel().close(); key.cancel(); //取消該通道在selector的註冊, 以後不會被select輪詢到 return; } // 有數據則進行讀取. 讀取前須要將position和limit進行復位 readBuf.flip(); // 根據緩衝區的數據長度建立相應大小的byte數組, 接收緩衝區的數據 byte[] bytes = new byte[this.readBuf.remaining()]; // 接收緩衝區數據 readBuf.get(bytes); readBuf.clear(); String body = new String(bytes).trim(); System.out.println("Server獲取的請求: " + body); // 若是保持OP_READ會致使重複讀 sc.register(this.seletor, SelectionKey.OP_WRITE); } catch (IOException e) { e.printStackTrace(); } } private void accept(SelectionKey key) { try { // 獲取服務通道 ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); // 獲取客戶端通道. SocketChannel sc = ssc.accept(); // 設置非阻塞模式 sc.configureBlocking(false); // 將客戶端通道註冊到多路複用器上,指定監聽事件 sc.register(this.seletor, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { new Thread(new Server(8765)).start();; } }
BIO客戶端與NIO服務端通訊需注意的:spa
BIO服務端, 一次IO有明確的結束點, 客戶端再次read會返回-1
NIO服務端一次IO結束後, 沒有關閉通道, 它可能把通道從讀狀態轉爲寫狀態. 因而selector不監聽讀了, 客戶端再次read什麼都沒返回, 就會阻塞.
JDK7引入了NIO2.0(即AIO)
NIO1.0中, IO過程沒有阻塞, 阻塞被轉移到了Selector輪詢上. Selector管理全部的Channel, 所以能把總阻塞時間縮到最短.
NIO2.0中, 供咱們調用的IO API都是非阻塞的, 背後複雜的實現過程(確定有阻塞)被轉移到了JDK底層和操做系統上. 咱們的程序的IO調用能夠作到當即返回.
一樣有Channel和Buffer, 但沒有Selector
public class Server { //線程池 private ExecutorService executorService; //異步通道線程組 private AsynchronousChannelGroup threadGroup; //服務器通道 public AsynchronousServerSocketChannel assc; public Server(int port){ try { //建立一個線程池 executorService = Executors.newCachedThreadPool(); //使用線程池建立異步通道線程組, 該線程組在底層支持着咱們的異步操做 threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1); //使用 異步通道線程組 建立服務器通道 assc = AsynchronousServerSocketChannel.open(threadGroup); //給通道綁定端口 assc.bind(new InetSocketAddress(port)); System.out.println("server start"); // 下面的accept不會阻塞 , 一個accept只能接收一個鏈接請求 // accept第一個參數: 被綁定到IO操做的關聯對象(子類), 第二個參數 CompletionHandler<AsynchronousSocketChannel, 關聯對象(父類)>, 操做成功後執行的回調句柄 // 若是接受了一個新的鏈接, 其結果AsynchronousSocketChannel會被綁定與assc通道到相同的AsynchronousChannelGroup assc.accept(this, new ServerCompletionHandler()); // 這裏爲了不程序結束, 異步通道線程組結束就不會執行回調了 Thread.sleep(Integer.MAX_VALUE); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { new Server(8765); } }
//第一個參數: IO操做結果; 第二個參數: 被綁定到IO操做的關聯對象 public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Server> { // 如下兩個重載參數與CompletionHander的模板參數一致, 回調時被傳入IO結果和IO操做時設置的關聯對象 @Override public void completed(AsynchronousSocketChannel asc, Server attachment) { // 完成當前鏈接時, 首先, 爲下一個客戶端能接入再次調用accept異步方法 attachment.assc.accept(attachment, this); // 其次, 執行下一步的讀操做 read(asc); } @Override public void failed(Throwable exc, Server attachment) { exc.printStackTrace(); } private void read(final AsynchronousSocketChannel asc) { //讀取數據 ByteBuffer buf = ByteBuffer.allocate(1024); // 第一個參數: 讀操做的Buffer, 第二個參數: IO關聯對象, 第三個參數:CompletionHandler<Integer, IO管理對象父類> asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer resultSize, ByteBuffer attachment) { //進行讀取以後,重置標識位 attachment.flip(); //得到讀取的字節數 System.out.println("Server端" + "收到客戶端的數據長度爲:" + resultSize); //獲取讀取的數據 String resultData = new String(attachment.array()).trim(); System.out.println("Server端" + "收到客戶端的數據信息爲:" + resultData); String response = "From服務端To客戶端: 於" + new Date() + "收到了請求數據"+ resultData; write(asc, response); } @Override public void failed(Throwable exc, ByteBuffer attachment) { exc.printStackTrace(); } }); } private void write(AsynchronousSocketChannel asc, String response) { try { ByteBuffer buf = ByteBuffer.allocate(1024); buf.put(response.getBytes()); buf.flip(); // 寫操做, 異步 Future<Integer> future = asc.write(buf); // 阻塞等待結果 future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
public class Client { private AsynchronousSocketChannel asc ; public Client() throws Exception { asc = AsynchronousSocketChannel.open(); } public void connect() throws InterruptedException, ExecutionException{ // get()阻塞 asc.connect(new InetSocketAddress("127.0.0.1", 8765)).get(); } public void write(String request){ try { // get()阻塞 asc.write(ByteBuffer.wrap(request.getBytes())).get(); read(); } catch (Exception e) { e.printStackTrace(); } } private void read() throws IOException { ByteBuffer buf = ByteBuffer.allocate(1024); try { // get()阻塞 asc.read(buf).get(); buf.flip(); byte[] respByte = new byte[buf.remaining()]; buf.get(respByte); System.out.println(new String(respByte,"utf-8").trim()); // 關閉 asc.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { Client c1 = new Client(); Client c2 = new Client(); c1.connect(); c2.connect(); c1.write("aa"); c2.write("bbb"); } }