Java I/O 模型的演進

原文同步至 http://waylau.com/java-io-model-evolution/ java


什麼是同步?什麼是異步?阻塞和非阻塞又有什麼區別?本文先從 Unix 的 I/O 模型講起,介紹了5種常見的 I/O 模型。然後再引出 Java 的 I/O 模型的演進過程,並用實例說明如何選擇合適的 Java I/O 模型來提升系統的併發量和可用性。 設計模式

因爲,Java 的 I/O 依賴於操做系統的實現,因此先了解 Unix 的 I/O 模型有助於理解 Java 的 I/O。 安全

相關概念

同步和異步

描述的是用戶線程與內核的交互方式: 服務器

  • 同步是指用戶線程發起 I/O 請求後須要等待或者輪詢內核 I/O 操做完成後才能繼續執行;
  • 異步是指用戶線程發起 I/O 請求後仍繼續執行,當內核 I/O 操做完成後會通知用戶線程,或者調用用戶線程註冊的回調函數。

阻塞和非阻塞

描述的是用戶線程調用內核 I/O 操做的方式: 網絡

  • 阻塞是指 I/O 操做須要完全完成後才返回到用戶空間;
  • 非阻塞是指 I/O 操做被調用後當即返回給用戶一個狀態值,無需等到 I/O 操做完全完成。

一個 I/O 操做其實分紅了兩個步驟:發起 I/O 請求和實際的 I/O 操做。 阻塞 I/O 和非阻塞 I/O 的區別在於第一步,發起 I/O 請求是否會被阻塞,若是阻塞直到完成那麼就是傳統的阻塞 I/O ,若是不阻塞,那麼就是非阻塞 I/O 。 同步 I/O 和異步 I/O 的區別就在於第二個步驟是否阻塞,若是實際的 I/O 讀寫阻塞請求進程,那麼就是同步 I/O 。 多線程

Unix I/O 模型

Unix 下共有五種 I/O 模型: 併發

  1. 阻塞 I/O
  2. 非阻塞 I/O
  3. I/O 多路複用(select 和 poll)
  4. 信號驅動 I/O(SIGIO)
  5. 異步 I/O(Posix.1 的 aio_ 系列函數)

阻塞 I/O

請求沒法當即完成則保持阻塞。 異步

  • 階段1:等待數據就緒。網絡 I/O 的狀況就是等待遠端數據陸續抵達;磁盤I/O的狀況就是等待磁盤數據從磁盤上讀取到內核態內存中。
  • 階段2:數據拷貝。出於系統安全,用戶態的程序沒有權限直接讀取內核態內存,所以內核負責把內核態內存中的數據拷貝一份到用戶態內存中。

非阻塞 I/O

  • socket 設置爲 NONBLOCK(非阻塞)就是告訴內核,當所請求的 I/O 操做沒法完成時,不要將進程睡眠,而是返回一個錯誤碼(EWOULDBLOCK) ,這樣請求就不會阻塞
  • I/O 操做函數將不斷的測試數據是否已經準備好,若是沒有準備好,繼續測試,直到數據準備好爲止。整個 I/O 請求的過程當中,雖然用戶線程每次發起 I/O 請求後能夠當即返回,可是爲了等到數據,仍須要不斷地輪詢、重複請求,消耗了大量的 CPU 的資源
  • 數據準備好了,從內核拷貝到用戶空間。

通常不多直接使用這種模型,而是在其餘 I/O 模型中使用非阻塞 I/O 這一特性。這種方式對單個 I/O 請求意義不大,但給 I/O 多路複用鋪平了道路. socket

I/O 多路複用(異步阻塞 I/O)

I/O 多路複用會用到 select 或者 poll 函數,這兩個函數也會使進程阻塞,可是和阻塞 I/O 所不一樣的的,這兩個函數能夠同時阻塞多個 I/O 操做。並且能夠同時對多個讀操做,多個寫操做的 I/O 函數進行檢測,直到有數據可讀或可寫時,才真正調用 I/O 操做函數。 async

從流程上來看,使用 select 函數進行 I/O 請求和同步阻塞模型沒有太大的區別,甚至還多了添加監視 socket,以及調用 select 函數的額外操做,效率更差。可是,使用 select 之後最大的優點是用戶能夠在一個線程內同時處理多個 socket 的 I/O 請求。用戶能夠註冊多個 socket,而後不斷地調用 select 讀取被激活的 socket,便可達到在同一個線程內同時處理多個 I/O 請求的目的。而在同步阻塞模型中,必須經過多線程的方式才能達到這個目的。

I/O 多路複用模型使用了 Reactor 設計模式實現了這一機制。

調用 select / poll 該方法由一個用戶態線程負責輪詢多個 socket,直到某個階段1的數據就緒,再通知實際的用戶線程執行階段2的拷貝。 經過一個專職的用戶態線程執行非阻塞I/O輪詢,模擬實現了階段一的異步化

信號驅動 I/O(SIGIO)

首先咱們容許 socket 進行信號驅動 I/O,並安裝一個信號處理函數,進程繼續運行並不阻塞。當數據準備好時,進程會收到一個 SIGIO 信號,能夠在信號處理函數中調用 I/O 操做函數處理數據。

異步 I/O

調用 aio_read 函數,告訴內核描述字,緩衝區指針,緩衝區大小,文件偏移以及通知的方式,而後當即返回。當內核將數據拷貝到緩衝區後,再通知應用程序。

異步 I/O 模型使用了 Proactor 設計模式實現了這一機制。

告知內核,當整個過程(包括階段1和階段2)所有完成時,通知應用程序來讀數據.

幾種 I/O 模型的比較

前四種模型的區別是階段1不相同,階段2基本相同,都是將數據從內核拷貝到調用者的緩衝區。而異步 I/O 的兩個階段都不一樣於前四個模型。

同步 I/O 操做引發請求進程阻塞,直到 I/O 操做完成。異步 I/O 操做不引發請求進程阻塞。

常見 Java I/O 模型

在瞭解了 UNIX 的 I/O 模型以後,其實 Java 的 I/O 模型也是相似。

「阻塞I/O」模式

在上一節 Socket 章節中的 EchoServer 就是一個簡單的阻塞 I/O 例子,服務器啓動後,等待客戶端鏈接。在客戶端鏈接服務器後,服務器就阻塞讀寫取數據流。

EchoServer 代碼:

public class EchoServer { public static int DEFAULT_PORT = 7; public static void main(String[] args) throws IOException { int port; try {
            port = Integer.parseInt(args[0]);
        } catch (RuntimeException ex) {
            port = DEFAULT_PORT;
        } try (
            ServerSocket serverSocket = new ServerSocket(port);
            Socket clientSocket = serverSocket.accept();     
            PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);                   
            BufferedReader in = new BufferedReader( new InputStreamReader(clientSocket.getInputStream()));
        ) {
            String inputLine; while ((inputLine = in.readLine()) != null) {
                out.println(inputLine);
            }
        } catch (IOException e) {
            System.out.println("Exception caught when trying to listen on port " + port + " or listening for a connection");
            System.out.println(e.getMessage());
        }
    }
}

改進爲「阻塞I/O+多線程」模式

使用多線程來支持多個客戶端來訪問服務器。

主線程 MultiThreadEchoServer.java

public class MultiThreadEchoServer { public static int DEFAULT_PORT = 7; public static void main(String[] args) throws IOException { int port; try {
            port = Integer.parseInt(args[0]);
        } catch (RuntimeException ex) {
            port = DEFAULT_PORT;
        }
        Socket clientSocket = null; try (ServerSocket serverSocket = new ServerSocket(port);) { while (true) {
                clientSocket = serverSocket.accept(); // MultiThread new Thread(new EchoServerHandler(clientSocket)).start();
            }
        } catch (IOException e) {
            System.out.println( "Exception caught when trying to listen on port " + port + " or listening for a connection");
            System.out.println(e.getMessage());
        }
    }
}

處理器類 EchoServerHandler.java

public class EchoServerHandler implements Runnable { private Socket clientSocket; public EchoServerHandler(Socket clientSocket) { this.clientSocket = clientSocket;
    } @Override public void run() { try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
                BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) {

            String inputLine; while ((inputLine = in.readLine()) != null) {
                out.println(inputLine);
            }
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}

存在問題:每次接收到新的鏈接都要新建一個線程,處理完成後銷燬線程,代價大。當有大量地短鏈接出現時,性能比較低。

改進爲「阻塞I/O+線程池」模式

針對上面多線程的模型中,出現的線程重複建立、銷燬帶來的開銷,能夠採用線程池來優化。每次接收到新鏈接後從池中取一個空閒線程進行處理,處理完成後再放回池中,重用線程避免了頻率地建立和銷燬線程帶來的開銷。

主線程 ThreadPoolEchoServer.java

public class ThreadPoolEchoServer { public static int DEFAULT_PORT = 7; public static void main(String[] args) throws IOException { int port; try {
            port = Integer.parseInt(args[0]);
        } catch (RuntimeException ex) {
            port = DEFAULT_PORT;
        }
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        Socket clientSocket = null; try (ServerSocket serverSocket = new ServerSocket(port);) { while (true) {
                clientSocket = serverSocket.accept(); // Thread Pool threadPool.submit(new Thread(new EchoServerHandler(clientSocket)));
            }
        } catch (IOException e) {
            System.out.println( "Exception caught when trying to listen on port " + port + " or listening for a connection");
            System.out.println(e.getMessage());
        }
    }
}

存在問題:在大量短鏈接的場景中性能會有提高,由於不用每次都建立和銷燬線程,而是重用鏈接池中的線程。但在大量長鏈接的場景中,由於線程被鏈接長期佔用,不須要頻繁地建立和銷燬線程,於是沒有什麼優點。

改進爲「非阻塞I/O」模式

"阻塞I/O+線程池"網絡模型雖然比"阻塞I/O+多線程"網絡模型在性能方面有提高,但這兩種模型都存在一個共同的問題:讀和寫操做都是同步阻塞的,面對大併發(持續大量鏈接同時請求)的場景,須要消耗大量的線程來維持鏈接。CPU 在大量的線程之間頻繁切換,性能損耗很大。一旦單機的鏈接超過1萬,甚至達到幾萬的時候,服務器的性能會急劇降低。

而 NIO 的 Selector 卻很好地解決了這個問題,用主線程(一個線程或者是 CPU 個數的線程)保持住全部的鏈接,管理和讀取客戶端鏈接的數據,將讀取的數據交給後面的線程池處理,線程池處理完業務邏輯後,將結果交給主線程發送響應給客戶端,少許的線程就能夠處理大量鏈接的請求。

Java NIO 由如下幾個核心部分組成:

  • Channel
  • Buffer
  • Selector

要使用 Selector,得向 Selector 註冊 Channel,而後調用它的 select()方法。這個方法會一直阻塞到某個註冊的通道有事件就緒。一旦這個方法返回,線程就能夠處理這些事件,事件的例子有如新鏈接進來,數據接收等。

主線程 NonBlokingEchoServer.java

public class NonBlokingEchoServer { public static int DEFAULT_PORT = 7; public static void main(String[] args) throws IOException { int port; try {
            port = Integer.parseInt(args[0]);
        } catch (RuntimeException ex) {
            port = DEFAULT_PORT;
        }
        System.out.println("Listening for connections on port " + port);

        ServerSocketChannel serverChannel;
        Selector selector; try {
            serverChannel = ServerSocketChannel.open();
            InetSocketAddress address = new InetSocketAddress(port);
            serverChannel.bind(address);
            serverChannel.configureBlocking(false);
            selector = Selector.open();
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException ex) {
            ex.printStackTrace(); return;
        } while (true) { try {
                selector.select();
            } catch (IOException ex) {
                ex.printStackTrace(); break;
            }
            Set<SelectionKey> readyKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = readyKeys.iterator(); while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove(); try { if (key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept();
                        System.out.println("Accepted connection from " + client);
                        client.configureBlocking(false);
                        SelectionKey clientKey = client.register(selector,
                                SelectionKey.OP_WRITE | SelectionKey.OP_READ);
                        ByteBuffer buffer = ByteBuffer.allocate(100);
                        clientKey.attach(buffer);
                    } if (key.isReadable()) {
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer output = (ByteBuffer) key.attachment();
                        client.read(output);
                    } if (key.isWritable()) {
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer output = (ByteBuffer) key.attachment();
                        output.flip();
                        client.write(output);

                        output.compact();
                    }
                } catch (IOException ex) {
                    key.cancel(); try {
                        key.channel().close();
                    } catch (IOException cex) {
                    }
                }
            }
        }

    }
}

改進爲「異步I/O」模式

Java SE 7 版本以後,引入了異步 I/O (NIO.2) 的支持,爲構建高性能的網絡應用提供了一個利器。

主線程 AsyncEchoServer.java

public class AsyncEchoServer { public static int DEFAULT_PORT = 7; public static void main(String[] args) throws IOException { int port; try {
            port = Integer.parseInt(args[0]);
        } catch (RuntimeException ex) {
            port = DEFAULT_PORT;
        }

        ExecutorService taskExecutor = Executors.newCachedThreadPool(Executors.defaultThreadFactory()); // create asynchronous server socket channel bound to the default group try (AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open()) { if (asynchronousServerSocketChannel.isOpen()) { // set some options asynchronousServerSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);
                asynchronousServerSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); // bind the server socket channel to local address asynchronousServerSocketChannel.bind(new InetSocketAddress(port)); // display a waiting message while ... waiting clients System.out.println("Waiting for connections ..."); while (true) {
                    Future<AsynchronousSocketChannel> asynchronousSocketChannelFuture = asynchronousServerSocketChannel
                            .accept(); try { final AsynchronousSocketChannel asynchronousSocketChannel = asynchronousSocketChannelFuture
                                .get();
                        Callable<String> worker = new Callable<String>() { @Override public String call() throws Exception {
                                String host = asynchronousSocketChannel.getRemoteAddress().toString();
                                System.out.println("Incoming connection from: " + host); final ByteBuffer buffer = ByteBuffer.allocateDirect(1024); // transmitting data while (asynchronousSocketChannel.read(buffer).get() != -1) {
                                    buffer.flip();
                                    asynchronousSocketChannel.write(buffer).get(); if (buffer.hasRemaining()) {
                                        buffer.compact();
                                    } else {
                                        buffer.clear();
                                    }
                                }
                                asynchronousSocketChannel.close();
                                System.out.println(host + " was successfully served!"); return host;
                            }
                        };
                        taskExecutor.submit(worker);
                    } catch (InterruptedException | ExecutionException ex) {
                        System.err.println(ex);
                        System.err.println("\n Server is shutting down ..."); // this will make the executor accept no new threads // and finish all existing threads in the queue taskExecutor.shutdown(); // wait until all threads are finished while (!taskExecutor.isTerminated()) {
                        } break;
                    }
                }
            } else {
                System.out.println("The asynchronous server-socket channel cannot be opened!");
            }
        } catch (IOException ex) {
            System.err.println(ex);
        }
    }
}
相關文章
相關標籤/搜索