今天千鋒扣丁學堂Java培訓老師給你們分享一篇關於探索Java開發I/O模型演進的詳細介紹,什麼是同步?什麼是異步?阻塞和非阻塞又有什麼區別?本文先從Unix的I/O模型講起,介紹了5種常見的I/O模型。然後再引出Java的I/O模型的演進過程,並用實例說明如何選擇合適的JavaI/O模型來提升系統的併發量和可用性。java
同步和異步,描述的是用戶線程與內核的交互方式:git
同步是指用戶線程發起I/O請求後須要等待或者輪詢內核I/O操做完成後才能繼續執行;github
異步是指用戶線程發起I/O請求後仍繼續執行,當內核I/O操做完成後會通知用戶線程,或者調用用戶線程註冊的回調函數。設計模式
阻塞和非阻塞,描述的是用戶線程調用內核I/O操做的方式:安全
阻塞是指I/O操做須要完全完成後才返回到用戶空間;服務器
非阻塞是指I/O操做被調用後當即返回給用戶一個狀態值,無需等到I/O操做完全完成。微信
一個I/O操做其實分紅了兩個步驟:發起I/O請求和實際的I/O操做。阻塞I/O和非阻塞I/O的區別在於第一步,發起I/O請求是否會被阻塞,若是阻塞直到完成那麼就是傳統的阻塞I/O,若是不阻塞,那麼就是非阻塞I/O。同步I/O和異步I/O的區別就在於第二個步驟是否阻塞,若是實際的I/O讀寫阻塞請求進程,那麼就是同步I/O。網絡
UnixI/O模型多線程
Unix下共有五種I/O模型:併發
阻塞I/O
非阻塞I/O
I/O複用(select和poll)
信號驅動I/O(SIGIO)
異步I/O(POSIX的aio_系列函數)
阻塞I/O
請求沒法當即完成則保持阻塞。
階段1:等待數據就緒。網絡I/O的狀況就是等待遠端數據陸續抵達;磁盤I/O的狀況就是等待磁盤數據從磁盤上讀取到內核態內存中。
階段2:數據從內核拷貝到進程。出於系統安全,用戶態的程序沒有權限直接讀取內核態內存,所以內核負責把內核態內存中的數據拷貝一份到用戶態內存中。
非阻塞I/O
socket設置爲NONBLOCK(非阻塞)就是告訴內核,當所請求的I/O操做沒法完成時,不要將進程睡眠,而是返回一個錯誤碼(EWOULDBLOCK),這樣請求就不會阻塞
I/O操做函數將不斷的測試數據是否已經準備好,若是沒有準備好,繼續測試,直到數據準備好爲止。整個I/O請求的過程當中,雖然用戶線程每次發起I/O請求後能夠當即返回,可是爲了等到數據,仍須要不斷地輪詢、重複請求,消耗了大量的CPU的資源
數據準備好了,從內核拷貝到用戶空間。
通常不多直接使用這種模型,而是在其餘I/O模型中使用非阻塞I/O這一特性。這種方式對單個I/O請求意義不大,但給I/O多路複用鋪平了道路.
I/O複用(異步阻塞I/O)
I/O多路複用會用到select或者poll函數,這兩個函數也會使進程阻塞,可是和阻塞I/O所不一樣的的,這兩個函數能夠同時阻塞多個I/O操做。並且能夠同時對多個讀操做,多個寫操做的I/O函數進行檢測,直到有數據可讀或可寫時,才真正調用I/O操做函數。
從流程上來看,使用select函數進行I/O請求和同步阻塞模型沒有太大的區別,甚至還多了添加監視socket,以及調用select函數的額外操做,效率更差。可是,使用select之後最大的優點是用戶能夠在一個線程內同時處理多個socket的I/O請求。用戶能夠註冊多個socket,而後不斷地調用select讀取被激活的socket,便可達到在同一個線程內同時處理多個I/O請求的目的。而在同步阻塞模型中,必須經過多線程的方式才能達到這個目的。
I/O多路複用模型使用了Reactor設計模式實現了這一機制。
調用select/poll該方法由一個用戶態線程負責輪詢多個socket,直到某個階段1的數據就緒,再通知實際的用戶線程執行階段2的拷貝。經過一個專職的用戶態線程執行非阻塞I/O輪詢,模擬實現了階段一的異步化
信號驅動I/O(SIGIO)
首先咱們容許socket進行信號驅動I/O,並安裝一個信號處理函數,進程繼續運行並不阻塞。當數據準備好時,進程會收到一個SIGIO信號,能夠在信號處理函數中調用I/O操做函數處理數據。
異步I/O
調用aio_read函數,告訴內核描述字,緩衝區指針,緩衝區大小,文件偏移以及通知的方式,而後當即返回。當內核將數據拷貝到緩衝區後,再通知應用程序。
異步I/O模型使用了Proactor設計模式實現了這一機制。
告知內核,當整個過程(包括階段1和階段2)所有完成時,通知應用程序來讀數據.
幾種I/O模型的比較
前四種模型的區別是階段1不相同,階段2基本相同,都是將數據從內核拷貝到調用者的緩衝區。而異步I/O的兩個階段都不一樣於前四個模型。
同步I/O操做引發請求進程阻塞,直到I/O操做完成。異步I/O操做不引發請求進程阻塞。
常見JavaI/O模型
在瞭解了UNIX的I/O模型以後,其實Java的I/O模型也是相似。
「阻塞I/O」模式
在上一節Socket章節中的EchoServer就是一個簡單的阻塞I/O例子,服務器啓動後,等待客戶端鏈接。在客戶端鏈接服務器後,服務器就阻塞讀寫取數據流。
EchoServer代碼:
public class EchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
try (
ServerSocket serverSocket =
new ServerSocket(port);
Socket clientSocket = serverSocket.accept();
PrintWriter out =
new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(
new InputStreamReader(clientSocket.getInputStream()));
) {
String inputLine;
while ((inputLine = in.readLine()) != null) {
out.println(inputLine);
}
} catch (IOException e) {
System.out.println("Exception caught when trying to listen on port "
System.out.println(e.getMessage());
}
}
}
改進爲「阻塞I/O+多線程」模式
使用多線程來支持多個客戶端來訪問服務器。
主線程MultiThreadEchoServer.java
public class MultiThreadEchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
Socket clientSocket = null;
try (ServerSocket serverSocket = new ServerSocket(port);) {
while (true) {
clientSocket = serverSocket.accept();
// MultiThread
new Thread(new EchoServerHandler(clientSocket)).start();
}
} catch (IOException e) {
System.out.println(
"Exception caught when trying to listen on port " + port + " or listening for a connection");
System.out.println(e.getMessage());
}
}
}
處理器類EchoServerHandler.java
public class EchoServerHandler implements Runnable {
private Socket clientSocket;
public EchoServerHandler(Socket clientSocket) {
this.clientSocket = clientSocket;
}
@Override
public void run() {
try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) {
String inputLine;
while ((inputLine = in.readLine()) != null) {
out.println(inputLine);
}
} catch (IOException e) {
System.out.println(e.getMessage());
}
}
}
存在問題:每次接收到新的鏈接都要新建一個線程,處理完成後銷燬線程,代價大。當有大量地短鏈接出現時,性能比較低。
改進爲「阻塞I/O+線程池」模式
針對上面多線程的模型中,出現的線程重複建立、銷燬帶來的開銷,能夠採用線程池來優化。每次接收到新鏈接後從池中取一個空閒線程進行處理,處理完成後再放回池中,重用線程避免了頻率地建立和銷燬線程帶來的開銷。
主線程ThreadPoolEchoServer.java
public class ThreadPoolEchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
ExecutorService threadPool = Executors.newFixedThreadPool(5);
Socket clientSocket = null;
try (ServerSocket serverSocket = new ServerSocket(port);) {
while (true) {
clientSocket = serverSocket.accept();
// Thread Pool
threadPool.submit(new Thread(new EchoServerHandler(clientSocket)));
}
} catch (IOException e) {
System.out.println(
"Exception caught when trying to listen on port " + port + " or listening for a connection");
System.out.println(e.getMessage());
}
}
}
存在問題:在大量短鏈接的場景中性能會有提高,由於不用每次都建立和銷燬線程,而是重用鏈接池中的線程。但在大量長鏈接的場景中,由於線程被鏈接長期佔用,不須要頻繁地建立和銷燬線程,於是沒有什麼優點。
雖然這種方法能夠適用於小到中度規模的客戶端的併發數,若是鏈接數超過100,000或更多,那麼性能將很不理想。
改進爲「非阻塞I/O」模式
「阻塞I/O+線程池」網絡模型雖然比」阻塞I/O+多線程」網絡模型在性能方面有提高,但這兩種模型都存在一個共同的問題:讀和寫操做都是同步阻塞的,面對大併發(持續大量鏈接同時請求)的場景,須要消耗大量的線程來維持鏈接。CPU在大量的線程之間頻繁切換,性能損耗很大。一旦單機的鏈接超過1萬,甚至達到幾萬的時候,服務器的性能會急劇降低。
而NIO的Selector卻很好地解決了這個問題,用主線程(一個線程或者是CPU個數的線程)保持住全部的鏈接,管理和讀取客戶端鏈接的數據,將讀取的數據交給後面的線程池處理,線程池處理完業務邏輯後,將結果交給主線程發送響應給客戶端,少許的線程就能夠處理大量鏈接的請求。
JavaNIO由如下幾個核心部分組成:
Channel
Buffer
Selector
要使用Selector,得向Selector註冊Channel,而後調用它的select()方法。這個方法會一直阻塞到某個註冊的通道有事件就緒。一旦這個方法返回,線程就能夠處理這些事件,事件的例子有如新鏈接進來,數據接收等。
主線程NonBlokingEchoServer.java
public class NonBlokingEchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
System.out.println("Listening for connections on port " + port);
ServerSocketChannel serverChannel;
Selector selector;
try {
serverChannel = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(port);
serverChannel.bind(address);
serverChannel.configureBlocking(false);
selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException ex) {
ex.printStackTrace();
return;
}
while (true) {
try {
selector.select();
} catch (IOException ex) {
ex.printStackTrace();
break;
}
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
System.out.println("Accepted connection from " + client);
client.configureBlocking(false);
SelectionKey clientKey = client.register(selector,
SelectionKey.OP_WRITE | SelectionKey.OP_READ);
ByteBuffer buffer = ByteBuffer.allocate(100);
clientKey.attach(buffer);
}
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
client.read(output);
}
if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
output.flip();
client.write(output);
output.compact();
}
} catch (IOException ex) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
}
}
}
}
}
}
改進爲「異步I/O」模式
JavaSE7版本以後,引入了異步I/O(NIO.2)的支持,爲構建高性能的網絡應用提供了一個利器。
主線程AsyncEchoServer.java
public class AsyncEchoServer {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}
ExecutorService taskExecutor = Executors.newCachedThreadPool(Executors.defaultThreadFactory());
// create asynchronous server socket channel bound to the default group
try (AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open()) {
if (asynchronousServerSocketChannel.isOpen()) {
// set some options
asynchronousServerSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);
asynchronousServerSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
// bind the server socket channel to local address
asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
// display a waiting message while ... waiting clients
System.out.println("Waiting for connections ...");
while (true) {
Future<AsynchronousSocketChannel> asynchronousSocketChannelFuture = asynchronousServerSocketChannel
.accept();
try {
final AsynchronousSocketChannel asynchronousSocketChannel = asynchronousSocketChannelFuture
.get();
Callable<String> worker = new Callable<String>() {
@Override
public String call() throws Exception {
String host = asynchronousSocketChannel.getRemoteAddress().toString();
System.out.println("Incoming connection from: " + host);
final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
// transmitting data
while (asynchronousSocketChannel.read(buffer).get() != -1) {
buffer.flip();
asynchronousSocketChannel.write(buffer).get();
if (buffer.hasRemaining()) {
buffer.compact();
} else {
buffer.clear();
}
}
asynchronousSocketChannel.close();
System.out.println(host + " was successfully served!");
return host;
}
};
taskExecutor.submit(worker);
} catch (InterruptedException | ExecutionException ex) {
System.err.println(ex);
System.err.println("n Server is shutting down ...");
// this will make the executor accept no new threads
// and finish all existing threads in the queue
taskExecutor.shutdown();
// wait until all threads are finished
while (!taskExecutor.isTerminated()) {
}
break;
}
}
} else {
System.out.println("The asynchronous server-socket channel cannot be opened!");
}
} catch (IOException ex) {
System.err.println(ex);
}
}
}
本章例子的源碼,能夠在https://github.com/waylau/ess...。
以上就是關於千鋒扣丁學堂Java培訓之探索IO模型總結整理的所有內容,但願對你們的學習有所幫助,想要了解更多關於Java開發方面內容的小夥伴,請關注扣丁學堂Java培訓官網、微信等平臺,扣丁學堂IT職業在線學習教育有專業的Java講師爲您指導,此外扣丁學堂老師精心推出的Java視頻教程定能讓你快速掌握Java從入門到精通開發實戰技能。