深刻理解Java I/O模型

前言

咱們在開發過程當中常常跟I/O打交道,不少人在學習I/O模型過程和進行I/O編程過程當中,對不少概念可能不明朗,特別是像Java這樣的高級語言,它對底層操做系統的各類I/O模型進行了封裝,使得咱們能夠很輕鬆的進行開發,可是在方便之餘你是否對Java中各類I/O模型,以及它們和操做系統之間的關聯是否有過了解?java

什麼是I/O?

I/O 在計算機中指Input/Output,即輸入輸出。以一次文件讀取爲例,咱們須要將磁盤上的數據讀取到用戶空間,那麼此次數據轉移操做其實就是一次I/O操做,也就是一次文件I/O;咱們天天都瀏覽着各類各樣的網頁,在咱們每請求一個網頁,服務器經過網絡將一個個的分組數據發送給咱們,應用程序從TCP緩衝區將數據複製到用戶空間的過程也是一次I/O,即一次網絡I/O。能夠發現I/O如此重要,它時刻都在。程序員

Liunx 網絡 I/O模型

根據UNIX網絡編程對I/O模型的分類,UNIX提供了5中I/O模型分別以下:編程

阻塞I/O模型

這是最傳統的I/O模型,即在讀寫數據過程當中會阻塞,咱們經過圖能夠看到,在應用進程調用recvfrom,系統調用直到數據從內核從複製到用戶空間,應用進程在這一段時間內一直是被阻塞的。這種模型適合併發量較小的對時延不敏感的系統bash

非阻塞I/O模型

應用進程不停的經過recvfrom調用不停的和內核交互直到數據被被準備好,將他複製到用戶空間中,若是recvfrom調用沒有數據能夠返回時返回一個EWOULDBLOCK錯誤,咱們將這樣的操做稱做輪詢,這麼作每每須要耗費大量的CPU時間服務器

I/O複用模型

在Liunx中爲咱們提供了select/poll,也就是管道,咱們就能夠將調用它們阻塞在這兩個系統調用中的一個上,而不是阻塞在真正的I/O調用上,咱們阻塞select調用當數據返回可讀條件時,經過recvfrom調用將數據複製到應用程序緩衝區。 多路I/O複用本質上並非非阻塞的,對比阻塞I/O模型它並無什麼優點,事實上使用select須要兩個系統而不是當個調用,I/O複用其實稍有劣勢,它只是能處理更多的鏈接(等待多個I/O就緒) markdown

信號驅動式I/O模型

咱們首先開啓套接字的信號驅動I/O功能,經過sigaction系統調用安裝一個信號處理函數,系統調用當即返回,進程繼續工做,當數據包準備好時內核產生一個SIGIO信號通知,咱們經過recvfrom調用讀取數據報。信號驅動式I/O模型的優勢是咱們在數據報到達期間進程不會被阻塞,咱們只要等待信號處理函數的通知便可 網絡

異步I/O模型

告知內核啓動某個操做(包括將數據從內核複製到本身的緩衝區)以後通知咱們。信號驅動模型是內核通知咱們什麼時候啓動一個I/O操做,而異步I/O模型是由內核通知咱們I/O什麼時候完成 併發

同步I/O和異步I/O對比

同步I/O操做:致使請求進程阻塞,直到I/O操做完成異步

異步I/O操做:不致使請求進程阻塞 socket

綜上 阻塞式I/O模型、非阻塞式I/O模型、I/O複用模型和信號驅動模型都是同步I/O模型,他們真正的I/O操做將進程阻塞,只有異步I/O模型是異步I/O操做

Java I/O 模型

Java I/O歷史

在JDK 1.4以前,基於Java的全部Socket通訊都使用了同步阻塞模式(Blocking I/O),這種一請求一應答的通訊模型簡化了上層開發,但性能可靠性存在巨大瓶頸,對高併發和低時延支持很差

在JDK 1.4以後,提供了新的NIO(New I/O)類庫,Java也能夠支持非阻塞I/O了,新增了java.nio包,提供了不少異步I/O開發的API和類庫。

JDK 1.7發佈後,將原來的NIO類庫進行了升級,提供了AIO功能,支持基於文件的異步I/O操做和針對套接字的異步I/O操做等功能

BIO 編程

使用BIO通訊模型的服務端,一般經過一個獨立的Acceptor線程負責監聽客戶端的鏈接,監聽到客戶端鏈接請求後爲每個客戶端建立一個新的線程鏈路進行處理,處理完成經過輸出流回應客戶端,線程消耗,這就是典型一對一答模型,下面咱們經過代碼對BIO模式進行具體分析,咱們實現客戶端發送消息服務端將消息回傳咱們的功能。

服務端:

int port = 3000;
    try(ServerSocket serverSocket = new ServerSocket(port)) {
        Socket socket = null;
        while (true) {
            //主程序阻塞在accept操做上
            socket = serverSocket.accept();
            new Thread(new BioExampleServerHandle(socket)).start();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
複製代碼
private Socket socket;
    public BioExampleServerHandle(Socket socket) {
        this.socket = socket;
    }
    @Override
    public void run() {
        try(BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            PrintWriter writer = new PrintWriter(socket.getOutputStream(), true)) {
            String message = reader.readLine();
            System.out.println("收到客戶端消息:" + message);
            writer.println("answer: " + message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
複製代碼

客戶端:

String host = "127.0.0.1";
    int port = 3000;
    try(Socket socket = new Socket(host, port);
        BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        PrintWriter writer = new PrintWriter(socket.getOutputStream(), true)) {
        Scanner input = new Scanner(System.in);
        System.out.println("輸入你想說的話:");
        String message = input.nextLine();
        writer.println(message);
        String answer = reader.readLine();
        System.out.println(answer);
    } catch (Exception e) {
        e.printStackTrace();
    }
複製代碼

運行結果以下:

客戶端:

服務端:
經過代碼咱們能夠發現BIO的主要問題在於, 每當一個鏈接接入時咱們都須要new一個線程進行處理,這顯然是不合適的,由於一個線程只能處理一個鏈接, 若是在高併發的狀況下,咱們的程序確定沒法知足性能需求,同時咱們對線程建立也缺少管理。爲了改進這種模型咱們能夠經過消息隊列和線程池技術對他加以優化,咱們稱它爲僞異步I/O,代碼以下:

int port = 3000;
    ThreadPoolExecutor socketPool = null;
    try(ServerSocket serverSocket = new ServerSocket(port)) {
        Socket socket = null;
        int cpuNum = Runtime.getRuntime().availableProcessors();
        socketPool = new ThreadPoolExecutor(cpuNum, cpuNum * 2, 1000,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000));
        while (true) {
            socket = serverSocket.accept();
            socketPool.submit(new BioExampleServerHandle(socket));
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        socketPool.shutdown();
    }
複製代碼

能夠看到每當有新鏈接接入,咱們都將他投遞給線程池進行處理,因爲咱們設置了線程池大小和阻塞隊列大小,所以在併發狀況下都不會致使服務崩潰,可是若是併發數大於阻塞隊列大小,或服務端處理鏈接緩慢時,阻塞隊列沒法繼續處理,會致使客戶端鏈接超時,影響用戶體驗。

NIO 編程

NIO 彌補了同步阻塞I/O的不足,它提供了高速、面向塊的I/O,咱們對一些概念介紹一下:

Buffer: Buffer用於和NIO通道進行交互。數據從通道讀入緩衝區,從緩衝區寫入到通道中,它的主要做用就是和Channel進行交互。

Channel: Channel是一個通道,能夠經過它讀取和寫入數據,通道是雙向的,通道能夠用於讀、寫或者同時讀寫。

Selector: Selector會不斷的輪詢註冊在它上面的Channe,若是Channel上面有新的鏈接讀寫事件的時候就會被輪詢出來,一個Selector能夠註冊多個Channel,只須要一個線程負責Selector輪詢,就能夠支持成千上萬的鏈接,能夠說爲高併發服務器的開發提供了很好的支撐

咱們經過實際代碼演示NIO的使用:

服務端代碼:

int port = 3000;
    ServerSocketChannel socketChannel = null;
    Selector selector = null;
    try {
        selector = Selector.open();
        socketChannel = ServerSocketChannel.open();
        //設置鏈接模式爲非阻塞模式
        socketChannel.configureBlocking(false);
        socketChannel.socket().bind(new InetSocketAddress(port));
        //在selector上註冊通道,監聽鏈接事件
        socketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            //設置selector 每隔一秒掃描全部channel
            selector.select(1000);
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterable = selectionKeys.iterator();
            SelectionKey key = null;
            while (iterable.hasNext()) {
                key = iterable.next();
                //對key進行處理
                try {
                    handlerKey(key, selector);
                } catch (Exception e) {
                    if (null != key) {
                        key.cancel();
                        if (null != key.channel()) {
                            key.channel().close();
                        }
                    }
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            if (null != selector) {
                selector.close();
            }
            if (null != socketChannel) {
                socketChannel.close();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
複製代碼

handlerKey代碼以下:

private void handlerKey(SelectionKey key, Selector selector) throws IOException {
       if (key.isValid()) {
           //判斷是不是鏈接請求,對全部鏈接請求進行處理
           if (key.isAcceptable()) {
               ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
               SocketChannel channel = serverSocketChannel.accept();
               channel.configureBlocking(false);
               //在selector上註冊通道,監聽讀事件
               channel.register(selector, SelectionKey.OP_READ);
           } else if (key.isReadable()) {
               SocketChannel channel = (SocketChannel) key.channel();
               //分配一個1024字節的緩衝區
               ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
               int readBytes = channel.read(byteBuffer);
               if (readBytes > 0) {
                   //從寫模式切換到讀模式
                   byteBuffer.flip();
                   byte[] bytes = new byte[byteBuffer.remaining()];
                   byteBuffer.get(bytes);
                   String message  = new String(bytes, "UTF-8");
                   System.out.println("收到客戶端消息: " + message);
                   //回覆客戶端
                   message = "answer: " + message;
                   byte[] responseByte = message.getBytes();
                   ByteBuffer writeBuffer = ByteBuffer.allocate(responseByte.length);
                   writeBuffer.put(responseByte);
                   writeBuffer.flip();
                   channel.write(writeBuffer);
               }
           }
       }
   }
複製代碼

客戶端代碼:

int port = 3000;
   String host = "127.0.0.1";
   SocketChannel channel = null;
   Selector selector = null;
   try {
       selector = Selector.open();
       channel = SocketChannel.open();
       channel.configureBlocking(false);
       if (channel.connect(new InetSocketAddress(host, port))) {
           channel.register(selector, SelectionKey.OP_READ);
           write(channel);
       } else {
           channel.register(selector, SelectionKey.OP_CONNECT);
       }
       while (true) {
           selector.select(1000);
           Set<SelectionKey> selectionKeys = selector.selectedKeys();
           Iterator<SelectionKey> iterator = selectionKeys.iterator();
           SelectionKey key = null;
           while (iterator.hasNext()) {
               try {
                   key = iterator.next();
                   handle(key, selector);
               } catch (Exception e) {
                   e.printStackTrace();
                   if (null != key.channel()) {
                       key.channel().close();
                   }
                   if (null != key) {
                       key.cancel();
                   }
               }
           }
       }
   } catch (Exception e) {
       e.printStackTrace();
   } finally {
       try {
           if (null != channel) {
               channel.close();
           }
           if (null != selector) {
               selector.close();
           }
       } catch (Exception e) {
           throw new RuntimeException(e);
       }
   }
複製代碼

write 方法:

private void write(SocketChannel channel) throws IOException {
       Scanner in = new Scanner(System.in);
       System.out.println("輸入你想說的話:");
       String message = in.next();
       byte[] bytes = message.getBytes();
       ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
       byteBuffer.put(bytes);
       byteBuffer.flip();
       channel.write(byteBuffer);
   }
複製代碼

handle 方法:

private void handle(SelectionKey key, Selector selector) throws IOException {
       if (key.isValid()) {
           SocketChannel channel = (SocketChannel) key.channel();
           if (key.isConnectable()) {
               if (channel.finishConnect()) {
                   channel.register(selector, SelectionKey.OP_READ);
                   write(channel);
               }
           } else if (key.isReadable()) {
               ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
               int readBytes = channel.read(byteBuffer);
               if (readBytes > 0) {
                   byteBuffer.flip();
                   byte[] bytes = new byte[byteBuffer.remaining()];
                   byteBuffer.get(bytes);
                   String message = new String(bytes, "UTF-8");
                   System.out.println(message);
               } else if (readBytes < 0) {
                   key.cancel();
                   channel.close();
               }
           }
       }
   }
複製代碼

經過代碼咱們發現NIO比BIO複雜太多,這個代碼量也是刷刷的增加啊,雖然複雜可是NIO的優勢也值得咱們去嘗試,比起BIO客戶端鏈接操做是異步的,咱們能夠註冊OP_CONNECT事件等待結果而不用像那樣被同步阻塞Channel的讀寫操做都是異步的,沒有等待數據它不會等待直接返回,比起BIO咱們不須要頻繁的建立線程來處理客戶端鏈接,咱們經過一個Selector處理多個客戶端鏈接,並且性能也能夠獲得保障,適合作高性能服務器開發

AIO 編程

NIO2.0 引入了異步通道的概念,提供了異步文件通道和異步套接字通道的實現,咱們能夠經過Future類來表示異步操做結果,也能夠在執行異步操做的時候傳入一個Channels,實現CompletionHandler接口爲操做回調。示例代碼以下

服務端:

int port = 3000;
    AsynchronousServerSocketChannel socketChannel = null;
    try {
        socketChannel = AsynchronousServerSocketChannel.open();
        socketChannel.bind(new InetSocketAddress(port));
        //接收客戶端鏈接,傳入AcceptCompletionHandler做爲回調來接收鏈接消息
        socketChannel.accept(socketChannel, new AcceptCompletionHandler());
        Thread.currentThread().join();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            if (null != socketChannel) {
                socketChannel.close();
            }
        } catch (Exception e1) {
            throw new RuntimeException(e1);
        }
    }
複製代碼

AcceptCompletionHandler 類:

public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {
    @Override
    public void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel attachment) {
        //繼續接受其餘客戶端的鏈接請求,造成一個循環
        attachment.accept(attachment, this);
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        //調用read操做進行異步讀取操做,傳入ReadCompletionHandler做爲回調
        result.read(byteBuffer, byteBuffer, new ReadCompletionHandler(result));
    }
    @Override
    public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
        //異常失敗處理在這裏
    }
}
複製代碼

ReadCompletionHandler 類

public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
    private AsynchronousSocketChannel channel;
    public ReadCompletionHandler(AsynchronousSocketChannel channel) {
        this.channel = channel;
    }
    @Override
    public void completed(Integer result, ByteBuffer byteBuffer) {
        try {
            byteBuffer.flip();
            byte[] bytes = new byte[byteBuffer.remaining()];
            byteBuffer.get(bytes);
            String message = new String(bytes, "UTF-8");
            System.out.println("收到客戶端消息:: " + message);
            write(message);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        try {
            channel.close();
        } catch (Exception e) {
            throw  new RuntimeException(e);
        }
    }
    private void write(String message) {
        message = "answer: " + message;
        byte[] bytes = message.getBytes();
        ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
        byteBuffer.put(bytes);
        byteBuffer.flip();
        channel.write(byteBuffer, byteBuffer, new WriteCompletionHandler(channel));
    }
}
複製代碼

客戶端:

int port = 3000;
    String host = "127.0.0.1";
    AsynchronousSocketChannel channel = null;
    try {
        channel = AsynchronousSocketChannel.open();
        channel.connect(new InetSocketAddress(host, port), channel, new AioClientHandler());
        Thread.currentThread().join();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            if (null != channel) {
                channel.close();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
複製代碼

AioClientHandler 類(因爲客戶端比較簡單我這裏使用了嵌套類部類):

public class AioClientHandler implements CompletionHandler<Void, AsynchronousSocketChannel> {
    @Override
    public void completed(Void result, AsynchronousSocketChannel channel) {
        Scanner in = new Scanner(System.in);
        System.out.println("輸入你想說的話:");
        String message = in.next();
        byte[] bytes = message.getBytes();
        ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
        byteBuffer.put(bytes);
        byteBuffer.flip();
        channel.write(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer buffer) {
                //判斷是否寫完若是沒有繼續寫
                if (buffer.hasRemaining()) {
                    channel.write(buffer, buffer, this);
                } else {
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    channel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer result, ByteBuffer attachment) {
                            try {
                                attachment.flip();
                                byte[] bytes1 = new byte[attachment.remaining()];
                                attachment.get(bytes1);
                                String message = new String(bytes1, "UTF-8");
                                System.out.println(message);
                                System.exit(1);
                            } catch (UnsupportedEncodingException e) {
                                e.printStackTrace();
                            }
                        }
                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) {
                            try {
                                channel.close();
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        }
                    });
                }
            }
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                try {
                    channel.close();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        });
    }
    @Override
    public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
    }
複製代碼

經過對比代碼咱們發現AIO比BIO簡單,這是由於咱們不須要建立一個獨立的I/O線程來來處理讀寫操做, AsynchronousSocketChannel、AsynchronousServerSocketChannel由JDK底層線程池負責回調驅動讀寫操做。

對比

同步阻塞I/O(BIO) 僞異步I/O 非阻塞I/O(NIO) 異步I/O(AIO)
是否阻塞
是否同步 否(異步)
程序員友好程度 簡單 簡單 很是難 比較難
可靠性 很是差
吞吐量

總結

經過學習Lunix底層I/O模型和JavaI/O模型咱們發現上層只是對底層的抽象和封裝,BIO實際上是對阻塞I/O模型的實現,NIO是對I/O複用模型的實現,AIO是對信號驅動I/O的實現,理解了底層I/O模型,在實際開發中應該能夠很自如。若是你以爲不錯的話就點個贊吧,若是有bug也您請批評指正,您的讚揚和批評是進步路上的好夥伴。

相關文章
相關標籤/搜索