架構設計:系統間通訊(3)——IO通訊模型和JAVA實踐 上篇

二、傳統阻塞模式(BIO)

這個小節的介紹,在《架構設計:系統間通訊(1)——概述從「聊天」開始上篇》 這篇文章中已經說明了,這裏只是「接着講」,您能夠理解成「在概述的基礎上繼續深刻寫」。BIO就是:blocking IO。最容易理解、最容易實現的IO工做方式,應用程序向操做系統請求網絡IO操做,這時應用程序會一直等待;另外一方面,操做系統收到請求後,也會等待, 直到網絡上有數據傳到監聽端口;操做系統在收集數據後,會把數據發送給應用程序;最後應用程序受到數據,並解除等待狀態。以下圖所示: java

這裏寫圖片描述

(請您注意,上圖中交互的兩個元素是應用程序和它所使用的操做系統)就TCP協議來講,整個過程實際上分紅三個步驟:三次握手創建鏈接、傳輸數據(包括驗證和重發)、斷開鏈接。固然,斷開鏈接的過程並不在咱們討論的IO的主要過程當中。但是咱們討論IO模型,應該把創建鏈接和傳輸數據的者兩個過程分開討論linux

2-一、JAVA對阻塞模式的支持

JAVA對阻塞模式的支持,就是java.net包中的Socket套接字實現。這裏要說明一下,Socket套接字是TCP/UDP等傳輸層協議的實現。 例如客戶端使用TCP協議鏈接這臺服務器的時候,當TCP三次握手成功後,應用程序就會建立一個socket套接字對象(注意,這是尚未進行數據內容的 傳輸),當這個TCP鏈接出現數據傳輸時,socket套接字就會把數據傳輸的表現告訴程序員(例如read方法接觸阻塞狀態) 程序員

下面這段代碼是java對阻塞模式的支持: apache

package testBSocket; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; public class SocketServer1 { static {
        BasicConfigurator.configure();
    } /** * 日誌 */ private static final Log LOGGER = LogFactory.getLog(SocketServer1.class); public static void main(String[] args) throws Exception{
        ServerSocket serverSocket = new ServerSocket(83); try { while(true) { //這裏JAVA經過JNI請求操做系統,並一直等待操做系統返回結果(或者出錯) Socket socket = serverSocket.accept(); //下面咱們收取信息(這裏仍是阻塞式的,一直等待,直到有數據能夠接受) InputStream in = socket.getInputStream();
                OutputStream out = socket.getOutputStream();
                Integer sourcePort = socket.getPort(); int maxLen = 2048; byte[] contextBytes = new byte[maxLen]; int realLen;
                StringBuffer message = new StringBuffer(); //read的時候,程序也會被阻塞,直到操做系統把網絡傳來的數據準備好。 while((realLen = in.read(contextBytes, 0, maxLen)) != -1) {
                    message.append(new String(contextBytes , 0 , realLen)); /* * 咱們假設讀取到「over」關鍵字, * 表示客戶端的全部信息在通過若干次傳送後,完成 * */ if(message.indexOf("over") != -1) { break;
                    }
                } //下面打印信息 SocketServer1.LOGGER.info("服務器收到來自於端口:" + sourcePort + "的信息:" + message); //下面開始發送信息 out.write("回發響應信息!".getBytes()); //關閉 out.close();
                in.close();
                socket.close();
            }
        } catch(Exception e) {
            SocketServer1.LOGGER.error(e.getMessage(), e);
        } finally { if(serverSocket != null) {
                serverSocket.close();
            }
        }
    }
}

上面的服務器端代碼能夠直接運行。代碼執行到serverSocket.accept()的位置就會等待,這個調用的含義是應用程序向操做系統請求 客戶端鏈接的接收,這是代碼會阻塞,而底層調用的位置在DualStackPlainSocketImpl這個類裏面(注意我使用的測試環境是 windows 8 ,因此是由這個類處理;若是您是在windows 7環境下進行測試,那麼處理類是TwoStacksPlainSocketImpl,這是Windows環境;若是您使用的測試環境是Linux,那麼視 Linux的內核版本而異,具體的處理類又是不同的)。 windows

這裏寫圖片描述

這裏寫圖片描述

2-二、存在的問題

很明顯,咱們在代碼裏面並無設置timeout屬性,因此運行的是「if」這段的代碼,很明顯在調用JNI後,下層也在等待有客戶端鏈接上來。這種調用方式固然有問題: 服務器

  • 同一時間,服務器只能接受來自於客戶端A的請求信息;雖然客戶端A和客戶端B的請求是同時進行的,但客戶端B發送的請求信息只能等到服務器接受完A的請求數據後,才能被接受。 網絡

  • 因爲服務器一次只能處理一個客戶端請求,當處理完成並返回後(或者異常時),才能進行第二次請求的處理。很顯然,這樣的處理方式在高併發的狀況下,是不能採用的。 多線程

  • 實際上以上的問題是能夠經過多線程來解決的,實際上就是當accept接收到一個客戶端的鏈接後,服務器端啓動一個新的線程,來讀寫客戶端的數據,並完成相應的業務處理。可是你沒法影響操做系統底層的「同步IO」機制。 架構

三、非阻塞模式

必定要注意:阻塞/非阻塞的描述是針對應用程序中的線程進行的,對於阻塞方式的一種改進是應用程序將其「一直等待」的狀態主動打開,以下圖所示: 併發

這裏寫圖片描述

這種模式下,應用程序的線程再也不一直等待操做系統的IO狀態,而是在等待一段時間後,就解除阻塞。若是沒有獲得想要的結果,則再次進行相同的操做。這樣的工做方式,暴增了應用程序的線程能夠不會一直阻塞,而是能夠進行一些其餘工做。

3-一、JAVA對非阻塞模式的支持

那麼JAVA中是否支持這種非阻塞IO的工做模式呢?咱們繼續分析DualStackPlainSocketImpl中的accept0實現:

這裏寫圖片描述

那麼timeout是在哪裏設置的呢?在ServerSocket中,調用了DualStackPlainSocketImpl的父類SocketImpl進行timeout的設置:

這裏寫圖片描述

ServerSocket中的setSoTimeout方法也有相應的註釋說明:

Enable/disable SO_TIMEOUT with the specified timeout, in milliseconds. With this option set to a non-zero timeout, a call to accept() for this ServerSocket will block for only this amount of time. If the timeout expires, a java.net.SocketTimeoutException is raised, though the ServerSocket is still valid. The option must be enabled prior to entering the blocking operation to have effect. The timeout must be > 0. A timeout of zero is interpreted as an infinite timeout.

那麼java中對非阻塞IO的支持以下:

package testBSocket; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; public class SocketServer2 { static {
        BasicConfigurator.configure();
    } private static Object xWait = new Object(); /** * 日誌 */ private static final Log LOGGER = LogFactory.getLog(SocketServer2.class); public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = null; try {
            serverSocket = new ServerSocket(83);
            serverSocket.setSoTimeout(100); while(true) {
                Socket socket = null; try {
                    socket = serverSocket.accept();
                } catch(SocketTimeoutException e1) { //=========================================================== // 執行到這裏,說明本次accept沒有接收到任何數據報文 // 主線程在這裏就能夠作一些事情,記爲X //=========================================================== synchronized (SocketServer2.xWait) {
                        SocketServer2.LOGGER.info("此次沒有從底層接收到任務數據報文,等待10毫秒,模擬事件X的處理時間");
                        SocketServer2.xWait.wait(10);
                    } continue;
                }

                InputStream in = socket.getInputStream();
                OutputStream out = socket.getOutputStream();
                Integer sourcePort = socket.getPort(); int maxLen = 2048; byte[] contextBytes = new byte[maxLen]; int realLen;
                StringBuffer message = new StringBuffer(); //下面咱們收取信息(這裏仍是阻塞式的,一直等待,直到有數據能夠接受) while((realLen = in.read(contextBytes, 0, maxLen)) != -1) {
                    message.append(new String(contextBytes , 0 , realLen)); /* * 咱們假設讀取到「over」關鍵字, * 表示客戶端的全部信息在通過若干次傳送後,完成 * */ if(message.indexOf("over") != -1) { break;
                    }
                } //下面打印信息 SocketServer2.LOGGER.info("服務器收到來自於端口:" + sourcePort + "的信息:" + message); //下面開始發送信息 out.write("回發響應信息!".getBytes()); //關閉 out.close();
                in.close();
                socket.close();
            } 
        } catch(Exception e) {
            SocketServer2.LOGGER.error(e.getMessage(), e);
        } finally { if(serverSocket != null) {
                serverSocket.close();
            }
        }
    }
}

執行效果以下:

這裏寫圖片描述

這裏咱們針對了SocketServer增長了阻塞等待時間,實際上只實現了非阻塞IO模型中的第一步:監聽鏈接狀態的非阻塞。經過運行代碼,咱們能夠發現read()方法仍是被阻塞的,說明socket套接字等待數據讀取的過程,仍是阻塞方式。

3-二、繼續改進

那麼,咱們能不能改進read()方式,讓它也變成非阻塞模式呢?固然是能夠的,socket套接字一樣支持等待超時時間設置。代碼以下:

package testBSocket; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; public class SocketServer3 { static {
        BasicConfigurator.configure();
    } private static Object xWait = new Object(); /** * 日誌 */ private static final Log LOGGER = LogFactory.getLog(SocketServer3.class); public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = null; try {
            serverSocket = new ServerSocket(83);
            serverSocket.setSoTimeout(100); while(true) {
                Socket socket = null; try {
                    socket = serverSocket.accept();
                } catch(SocketTimeoutException e1) { //=========================================================== // 執行到這裏,說明本次accept沒有接收到任何TCP鏈接 // 主線程在這裏就能夠作一些事情,記爲X //=========================================================== synchronized (SocketServer3.xWait) {
                        SocketServer3.LOGGER.info("此次沒有從底層接收到任何TCP鏈接,等待10毫秒,模擬事件X的處理時間");
                        SocketServer3.xWait.wait(10);
                    } continue;
                }

                InputStream in = socket.getInputStream();
                OutputStream out = socket.getOutputStream();
                Integer sourcePort = socket.getPort(); int maxLen = 2048; byte[] contextBytes = new byte[maxLen]; int realLen;
                StringBuffer message = new StringBuffer(); //下面咱們收取信息(設置成非阻塞方式,這樣read信息的時候,又能夠作一些其餘事情) socket.setSoTimeout(10);
                BIORead:while(true) { try { while((realLen = in.read(contextBytes, 0, maxLen)) != -1) {
                            message.append(new String(contextBytes , 0 , realLen)); /* * 咱們假設讀取到「over」關鍵字, * 表示客戶端的全部信息在通過若干次傳送後,完成 * */ if(message.indexOf("over") != -1) { break BIORead;
                            }
                        }
                    } catch(SocketTimeoutException e2) { //=========================================================== // 執行到這裏,說明本次read沒有接收到任何數據流 // 主線程在這裏又能夠作一些事情,記爲Y //=========================================================== SocketServer3.LOGGER.info("此次沒有從底層接收到任務數據報文,等待10毫秒,模擬事件Y的處理時間"); continue;
                    }
                } //下面打印信息 SocketServer3.LOGGER.info("服務器收到來自於端口:" + sourcePort + "的信息:" + message); //下面開始發送信息 out.write("回發響應信息!".getBytes()); //關閉 out.close();
                in.close();
                socket.close();
            } 
        } catch(Exception e) {
            SocketServer3.LOGGER.error(e.getMessage(), e);
        } finally { if(serverSocket != null) {
                serverSocket.close();
            }
        }
    }
}

這樣一來,咱們利用JAVA實現了完整的「非阻塞IO」模型:讓TCP鏈接和數據讀取這兩個過程,都變成了「非阻塞」方式了。

然並卵,這種處理方式實際上並無解決accept方法、read方法阻塞的根本問題。根據上文的敘述,accept方法、read方法阻塞的根本 問題是底層接受數據報文時的「同步IO」工做方式。這兩次改進過程,只是解決了IO操做的兩步中的第一步:將程序層面的阻塞方式變成了非阻塞方式。

3-三、利用線程再改進

另外一個方面,因爲應用程序級別,咱們並無使用多線程技術,這就致使了應用程序只能一個socket套接字 一個socket套接字的處理。這個socket套接字沒有處理完,就無法處理下一個socket套接字。針對這個問題咱們仍是能夠進行改進的:讓應用程 序層面上,各個socket套接字的處理不相互影響:

package testBSocket; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; /** * 經過加入線程的概念,讓socket server可以在應用層面, * 經過非阻塞的方式同時處理多個socket套接字 * @author yinwenjie */ public class SocketServer4 { static {
        BasicConfigurator.configure();
    } private static Object xWait = new Object(); private static final Log LOGGER = LogFactory.getLog(SocketServer4.class); public static void main(String[] args) throws Exception{
        ServerSocket serverSocket = new ServerSocket(83);
        serverSocket.setSoTimeout(100); try { while(true) {
                Socket socket = null; try {
                    socket = serverSocket.accept();
                } catch(SocketTimeoutException e1) { //=========================================================== // 執行到這裏,說明本次accept沒有接收到任何TCP鏈接 // 主線程在這裏就能夠作一些事情,記爲X //=========================================================== synchronized (SocketServer4.xWait) {
                        SocketServer4.LOGGER.info("此次沒有從底層接收到任何TCP鏈接,等待10毫秒,模擬事件X的處理時間");
                        SocketServer4.xWait.wait(10);
                    } continue;
                } //固然業務處理過程能夠交給一個線程(這裏可使用線程池),而且線程的建立是很耗資源的。 //最終改變不了.accept()只能一個一個接受socket鏈接的狀況 SocketServerThread socketServerThread = new SocketServerThread(socket); new Thread(socketServerThread).start();
            }
        } catch(Exception e) {
            SocketServer4.LOGGER.error(e.getMessage(), e);
        } finally { if(serverSocket != null) {
                serverSocket.close();
            }
        }
    }
} /** * 固然,接收到客戶端的socket後,業務的處理過程能夠交給一個線程來作。 * 但仍是改變不了socket被一個一個的作accept()的狀況。 * @author yinwenjie */ class SocketServerThread implements Runnable { /** * 日誌 */ private static final Log LOGGER = LogFactory.getLog(SocketServerThread.class); private Socket socket; public SocketServerThread (Socket socket) { this.socket = socket;
    } @Override public void run() {
        InputStream in = null;
        OutputStream out = null; try {
            in = socket.getInputStream();
            out = socket.getOutputStream();
            Integer sourcePort = socket.getPort(); int maxLen = 2048; byte[] contextBytes = new byte[maxLen]; int realLen;
            StringBuffer message = new StringBuffer(); //下面咱們收取信息(設置成非阻塞方式,這樣read信息的時候,又能夠作一些其餘事情) this.socket.setSoTimeout(10);
            BIORead:while(true) { try { while((realLen = in.read(contextBytes, 0, maxLen)) != -1) {
                        message.append(new String(contextBytes , 0 , realLen)); /* * 咱們假設讀取到「over」關鍵字, * 表示客戶端的全部信息在通過若干次傳送後,完成 * */ if(message.indexOf("over") != -1) { break BIORead;
                        }
                    }
                } catch(SocketTimeoutException e2) { //=========================================================== // 執行到這裏,說明本次read沒有接收到任何數據流 // 主線程在這裏又能夠作一些事情,記爲Y //=========================================================== SocketServerThread.LOGGER.info("此次沒有從底層接收到任務數據報文,等待10毫秒,模擬事件Y的處理時間"); continue;
                }
            } //下面打印信息 Long threadId = Thread.currentThread().getId();
            SocketServerThread.LOGGER.info("服務器(線程:" + threadId + ")收到來自於端口:" + sourcePort + "的信息:" + message); //下面開始發送信息 out.write("回發響應信息!".getBytes()); //關閉 out.close();
            in.close(); this.socket.close();
        } catch(Exception e) {
            SocketServerThread.LOGGER.error(e.getMessage(), e);
        }
    }
}

3-四、依然存在的問題

引入了多線程技術後,IO的處理吞吐量大大提升了,可是這樣作就真的沒有問題了嗎,您要知道操做系統但是有「最大線程」限制的:

  • 雖然在服務器端,請求的處理交給了一個獨立線程進行,可是操做系統通知accept()的方式仍是單個處理的(甚至都不是非阻塞模式)。也 就是說,其實是服務器接收到數據報文後的「業務處理過程」能夠多線程(包括能夠是非阻塞模式),可是數據報文的接受仍是須要一個一個的來。

  • 在linux系統中,能夠建立的線程是有限的。咱們能夠經過cat /proc/sys/kernel/threads-max 命令查看能夠建立的最大線程數。固然這個值是能夠更改的,可是線程越多,CPU切換所需的時間也就越長,用來處理真正業務的需求也就越少。

  • 建立一個線程是有較大的資源消耗的。JVM建立一個線程的時候,即便這個線程不作任何的工做,JVM都會分配一個堆棧空間。這個空間的大小默認爲128K,您能夠經過-Xss參數進行調整。

  • 固然您還可使用ThreadPoolExecutor線程池來緩解線程的建立問題,可是又會形成BlockingQueue積壓任務的持續增長,一樣消耗了大量資源。另外,若是您的應用程序大量使用長鏈接的話,線程是不會關閉的。這樣系統資源的消耗更容易失控。

  • 最後,不管您是使用的多線程、仍是加入了非阻塞模式,這都是在應用程序層面的處理,而底層socketServer所匹配的操做系統的IO模型始終是「同步IO」,最根本的問題並無解決。

  • 那麼,若是你真想單純使用線程來解決問題,那麼您本身均可以計算出來您一個服務器節點能夠一次接受多大的併發了。看來,單純使用線程解決這個問題不是最好的辦法。

四、多路複用IO(IO Multiplex)

我將詳細講解操做系統支持的多路複用IO的工做方式,並介紹JAVA 1.4版本中加入的 JAVA NIO對多路複用IO的實現。(東西太多,咱們放下下篇中)

五、異步IO(真正的NIO)

我將詳細講解操做系統支持的異步IO方式,並介紹JAVA 1.7版本中加入的NIO2.0(AIO)對異步IO的實現。(東西太多,咱們放下下篇中)

相關文章
相關標籤/搜索