Java IO------------------BIO(同步阻塞)、NIO1.0(多路複用)、NIO2.0(AIO,非阻塞)

1. BIO

JDK5以前, JDK的IO模式只有BIO(同步阻塞)
問題: 由於阻塞的存在, 需對每一個請求開啓一個線程. 過多的線程切換影響操做系統性能
解決: 使用線程池, 處理不過來的放入隊列, 再處理不過來的會觸發其餘機制
問題: 超過線程池數量的請求須要等待java

複製代碼
public class Client {

    final static String ADDRESS = "127.0.0.1";
    final static int PORT = 8765;
    
    public static void main(String[] args) throws IOException {
        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            socket = new Socket(ADDRESS, PORT);
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(), true);  // true自動flush
            //向服務器端發送數據
            out.println("來自客戶端的請求");
            //從服務端接收數據
            String response = in.readLine();  // 阻塞
            System.out.println("Client獲取數據: " + response);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            out.close();
            in.close();
            socket.close();
        }
    }
}
複製代碼

服務端1: 一個請求~一個線程數組

複製代碼
public class Server {
    final static int PROT = 8765;
    public static void main(String[] args) throws IOException {
        ServerSocket server = null;
        try {
            server = new ServerSocket(PROT);
            System.out.println("server start");
            while(true){
                Socket socket = server.accept();  //監聽 阻塞 , socket底層會新建線程處理與客戶端的三次握手
                //創建線程處理獲取的 socket
                new Thread(new ServerHandler(socket)).start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            server.close();
        }
    }
}

class ServerHandler implements Runnable {
    private Socket socket;
    public ServerHandler(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            out = new PrintWriter(this.socket.getOutputStream(), true);
            String body = null;
            while (true) {
                body = in.readLine();  // 阻塞
                if (body == null)
                    break;
                System.out.println("Server獲取的請求: " + body);
                out.println("來自服務器的響應");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                out.close();
                in.close();
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
複製代碼

服務端2: 用線程池處理請求服務器

複製代碼
public class Server {

    final static int PORT = 8765;

    public static void main(String[] args) throws IOException {
        ServerSocket server = null;
        try {
            server = new ServerSocket(PORT);
            System.out.println("server start");
            HandlerExecutorPool executorPool = new HandlerExecutorPool(50, 1000);
            while(true){
                Socket socket = server.accept();
                executorPool.execute(new ServerHandler(socket));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            server.close();
        }
    }
}

class HandlerExecutorPool {
    private ExecutorService executor;
    public HandlerExecutorPool(int maxPoolSize, int queueSize){
        this.executor = new ThreadPoolExecutor( // 帶阻塞隊列的線程池
                Runtime.getRuntime().availableProcessors(),  // 初始線程數
                maxPoolSize,        // 線程數上限   若是要處理請求的Runnable對象裝滿了隊列, 則提升現有線程數
                120L,               // 如在120個時間顆粒內某線程是空閒的, 將被回收
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize)  // 存放處理請求的Runnable對象
        );
    }
    public void execute(Runnable task){
        this.executor.execute(task);
    }
}

class ServerHandler implements Runnable {
    private Socket socket;
    public ServerHandler(Socket socket) {
        this.socket = socket;
    }
    @Override
    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            out = new PrintWriter(this.socket.getOutputStream(), true);
            String body = null;
            while (true) {
                body = in.readLine();
                if (body == null)
                    break;
                System.out.println("Server獲取的請求: " + body);  // 阻塞
                out.println("來自服務器的響應");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                out.close();
                in.close();
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
複製代碼

 

2.NIO1.0

JDK5之後引入了NIO1.0(多路複用機制)異步

伴隨多路複用在程序中引入了以下概念:socket

Channel(通道):TCP鏈接的抽象,一個TCP鏈接對應多個Channel,這樣減小TCP的鏈接次數。
通道與BIO中socket相似
通道與BIO中的流相似, 不過channel是雙向的而流是單向的
channel有多種狀態位, 能被selector識別ide

Buffer(緩衝區):
緩衝區是一塊內存區域(數組), 在NIO中被包裝成Buffer對象. Buffer提供方法用來訪問該內存。
BIO中,數據存儲在流中,而NIO中,數據存儲在緩衝區中。
除了boolean的其餘java七種基本類型都有相應的Buffer類. 最常使用的是ByteBuffer函數

Selector(多路複用器):負責輪詢全部註冊通道,根據通道狀態執行相關操做。狀態包括:Connect,Accept,Read,Write。
在"四種經常使用IO模型"裏提過用select系統調用實現IO多路複用. 除select外Linux還提供了poll/epoll函數, 其中select/poll函數按順序掃描文件句柄是否就緒,支持的文件句柄數有限; 而epoll使用基於事件驅動方式替代順序掃描,性能更高, 對文件句柄數沒有數量限制. JDK的Selector使用了epoll, 只須要一個線程輪詢, 就能夠接入大量的客戶端.性能

複製代碼
public class Client {

    public static void main(String[] args) throws IOException {
        SocketChannel sc = null;
        ByteBuffer writeBuf = ByteBuffer.allocate(1024);
        ByteBuffer readBuf = ByteBuffer.allocate(1024);
        try {
            //建立通道
            sc = SocketChannel.open();
            //進行鏈接
            sc.connect(new InetSocketAddress("127.0.0.1", 8765));
            // 下面步驟能夠用selector輪詢代替
            while(true){
                //定義一個字節數組,而後使用系統錄入功能:
                byte[] bytes1 = new byte[1024];
                System.in.read(bytes1);  //阻塞
                //把數據放到緩衝區中
                writeBuf.put(bytes1);
                //對緩衝區進行復位
                writeBuf.flip();
                //寫出數據
                sc.write(writeBuf);
                //清空緩衝區
                writeBuf.clear();
                
                // 接收服務端響應
                sc.read(readBuf);
                readBuf.flip();
                byte[] bytes2 = new byte[readBuf.remaining()];
                readBuf.get(bytes2);
                readBuf.clear();
                String body = new String(bytes2);
                System.out.println("Client獲取數據: " + body);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            sc.close();
        }
    }
}
複製代碼

經過改變Selector監聽Channel的狀態位, 控制與客戶端讀寫的前後順序this

複製代碼
public class Server implements Runnable{  
    private Selector seletor;
    private ByteBuffer readBuf = ByteBuffer.allocate(1024);
    private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
    
    public Server(int port){
        try {
            //1 建立多路複用器selector
            this.seletor = Selector.open();
            //2 建立ServerSocket通道
            ServerSocketChannel ssc = ServerSocketChannel.open();
            //3 設置通道是否阻塞, 決定了通道了read/write/accept/connect方法是否阻塞
            ssc.configureBlocking(false);
            //4 設置通道地址
            ssc.bind(new InetSocketAddress(port));
            //5 將ServerSocket通道註冊到selector上, 指定監聽其accept事件
            ssc.register(this.seletor, SelectionKey.OP_ACCEPT);
            System.out.println("Server start");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while(true){
            try {
                // select阻塞, 監聽相關事件
                this.seletor.select();
                // 解除阻塞, 返回選擇key, key含有通道, 狀態等信息
                Iterator<SelectionKey> keysIter = this.seletor.selectedKeys().iterator();
                // 進行遍歷
                while(keysIter.hasNext()){
                    SelectionKey key = keysIter.next();
                    keysIter.remove();
                    if (key.isValid()) {
                        // 等待接收鏈接狀態
                        if (key.isAcceptable()) {
                            accept(key);
                        }
                        // 可讀狀態
                        if (key.isReadable()) {
                            read(key);
                        }
                        if (key.isWritable()) {
                            write(key);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    private void write(SelectionKey key) {
        try {
            // 獲取通道
            SocketChannel sc = (SocketChannel) key.channel();
            // 寫回給客戶端數據
            writeBuf.put("來自服務器的響應".getBytes());
            writeBuf.flip();
            sc.write(writeBuf);
            writeBuf.clear();
            // 修改監聽的狀態位, 若是保持OP_WRITE會致使重複寫
            key.interestOps(SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void read(SelectionKey key) {
        try {
            // 獲取通道
            SocketChannel sc = (SocketChannel) key.channel();
            // 讀取數據, 讀到buffer. 按程序運行順序, 這裏sc是否設置爲阻塞效果都同樣
            int count = sc.read(this.readBuf);  // readBuf寫時會改變position的值
            if (count == -1) {
                key.channel().close();
                key.cancel();  //取消該通道在selector的註冊, 以後不會被select輪詢到
                return;
            }
            // 有數據則進行讀取. 讀取前須要將position和limit進行復位
            readBuf.flip();
            // 根據緩衝區的數據長度建立相應大小的byte數組, 接收緩衝區的數據
            byte[] bytes = new byte[this.readBuf.remaining()];
            // 接收緩衝區數據
            readBuf.get(bytes);
            readBuf.clear();
            String body = new String(bytes).trim();
            System.out.println("Server獲取的請求: " + body);
            // 若是保持OP_READ會致使重複讀
            sc.register(this.seletor, SelectionKey.OP_WRITE);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void accept(SelectionKey key) {
        try {
            // 獲取服務通道
            ServerSocketChannel ssc =  (ServerSocketChannel) key.channel();
            // 獲取客戶端通道.
            SocketChannel sc = ssc.accept();
            // 設置非阻塞模式
            sc.configureBlocking(false);
            // 將客戶端通道註冊到多路複用器上,指定監聽事件
            sc.register(this.seletor, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        new Thread(new Server(8765)).start();;
    }
}
複製代碼

BIO客戶端與NIO服務端通訊需注意的:spa

BIO服務端, 一次IO有明確的結束點, 客戶端再次read會返回-1

NIO服務端一次IO結束後, 沒有關閉通道, 它可能把通道從讀狀態轉爲寫狀態. 因而selector不監聽讀了, 客戶端再次read什麼都沒返回, 就會阻塞.

 

3.NIO2.0

JDK7引入了NIO2.0(即AIO)

NIO1.0中, IO過程沒有阻塞, 阻塞被轉移到了Selector輪詢上. Selector管理全部的Channel, 所以能把總阻塞時間縮到最短.

NIO2.0中, 供咱們調用的IO API都是非阻塞的, 背後複雜的實現過程(確定有阻塞)被轉移到了JDK底層和操做系統上. 咱們的程序的IO調用能夠作到當即返回.

一樣有Channel和Buffer, 但沒有Selector

複製代碼
public class Server {
    //線程池
    private ExecutorService executorService;
    //異步通道線程組
    private AsynchronousChannelGroup threadGroup;
    //服務器通道
    public AsynchronousServerSocketChannel assc;
    
    public Server(int port){
        try {
            //建立一個線程池
            executorService = Executors.newCachedThreadPool();
            //使用線程池建立異步通道線程組, 該線程組在底層支持着咱們的異步操做
            threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
            //使用 異步通道線程組 建立服務器通道
            assc = AsynchronousServerSocketChannel.open(threadGroup);
            //給通道綁定端口
            assc.bind(new InetSocketAddress(port));
            System.out.println("server start");
            // 下面的accept不會阻塞 , 一個accept只能接收一個鏈接請求
            // accept第一個參數: 被綁定到IO操做的關聯對象(子類), 第二個參數 CompletionHandler<AsynchronousSocketChannel, 關聯對象(父類)>, 操做成功後執行的回調句柄
            // 若是接受了一個新的鏈接, 其結果AsynchronousSocketChannel會被綁定與assc通道到相同的AsynchronousChannelGroup 
            assc.accept(this, new ServerCompletionHandler());
            // 這裏爲了不程序結束, 異步通道線程組結束就不會執行回調了
            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        new Server(8765);
    }
    
}
複製代碼
複製代碼
//第一個參數: IO操做結果; 第二個參數: 被綁定到IO操做的關聯對象
public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Server> {

    // 如下兩個重載參數與CompletionHander的模板參數一致, 回調時被傳入IO結果和IO操做時設置的關聯對象
    @Override
    public void completed(AsynchronousSocketChannel asc, Server attachment) {
        // 完成當前鏈接時, 首先, 爲下一個客戶端能接入再次調用accept異步方法
        attachment.assc.accept(attachment, this);
        // 其次, 執行下一步的讀操做
        read(asc);
    }
    @Override
    public void failed(Throwable exc, Server attachment) {
        exc.printStackTrace();
    }

    private void read(final AsynchronousSocketChannel asc) {
        //讀取數據
        ByteBuffer buf = ByteBuffer.allocate(1024);
        // 第一個參數: 讀操做的Buffer, 第二個參數: IO關聯對象, 第三個參數:CompletionHandler<Integer, IO管理對象父類>
        asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer resultSize, ByteBuffer attachment) {
                //進行讀取以後,重置標識位
                attachment.flip();
                //得到讀取的字節數
                System.out.println("Server端" + "收到客戶端的數據長度爲:" + resultSize);
                //獲取讀取的數據
                String resultData = new String(attachment.array()).trim();
                System.out.println("Server端" + "收到客戶端的數據信息爲:" + resultData);
                String response = "From服務端To客戶端: 於" + new Date() + "收到了請求數據"+ resultData;
                write(asc, response);
            }
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();
            }
        });
    }
    
    private void write(AsynchronousSocketChannel asc, String response) {
        try {
            ByteBuffer buf = ByteBuffer.allocate(1024);
            buf.put(response.getBytes());
            buf.flip();
            // 寫操做, 異步
            Future<Integer> future = asc.write(buf);
            // 阻塞等待結果
            future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
複製代碼
複製代碼
public class Client {

    private AsynchronousSocketChannel asc ;
    public Client() throws Exception {
        asc = AsynchronousSocketChannel.open();
    }
    
    public void connect() throws InterruptedException, ExecutionException{
        // get()阻塞
        asc.connect(new InetSocketAddress("127.0.0.1", 8765)).get();
    }
    
    public void write(String request){
        try {
            // get()阻塞
            asc.write(ByteBuffer.wrap(request.getBytes())).get();
            read();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void read() throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        try {
            // get()阻塞
            asc.read(buf).get();
            buf.flip();
            byte[] respByte = new byte[buf.remaining()];
            buf.get(respByte);
            System.out.println(new String(respByte,"utf-8").trim());
            // 關閉
            asc.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) throws Exception {
        Client c1 = new Client();
        Client c2 = new Client();
        c1.connect();
        c2.connect();
        
        c1.write("aa");
        c2.write("bbb");
    }
}
複製代碼
相關文章
相關標籤/搜索