架構設計:系統間通訊(1)--概述從「聊天」開始上篇

從這篇博文開始,咱們將進入一個新文章系列。這個文章系列專門整理總結了目前系統間通訊的主要原理、手段和實現。咱們將講解典型的信息格式、講解傳統的RMI(Java Remote Method Invocation)調用並延伸出來重點講解RPC(Remote Procedure Call)調用和使用案例;最後咱們還會講到SOA(Service-Oriented Architecture)架構的實現,包括ESB(Enterprise service bus)實現和服務註冊/治理的實現,一樣包括原理、實現和案例。java

系統間通訊是架構師須要掌握的又一個關鍵技術領域若是說理解和掌握負載均衡層技術須要您有必定的linux系統知識和操做系統知識的話,那麼理解和掌握系統間通訊技術,須要您必定的編程經驗(最好是JAVA編程經驗,由於咱們會主要以JAVA技術做爲實例演示)。python

一、一個場景linux

首先咱們來看一個顯示場景:在現實生活中有兩個技術人員A和B,在進行一問一答形式的交流。以下圖所示:c++

 

咱們來看這幅圖的中的幾個要點:程序員

  • 他們倆都使用中文進行交流。若是他們一個使用的是南斯拉夫語,另外一人使用的是索馬里語,而且相互都不能理解對方的語系,很顯然A所表達的內容B是沒法理解的。
  • 他們的聲音是在空氣中進行傳播的。空氣除了支撐他們的呼吸外,還支撐了他們聲音的傳播。若是沒有空氣他們是沒法知道對方用中文說了什麼。
  • 他們的交流方式是協調一致的,即A問完一個問題後,等待B進行回答。收到B的回答後,A才能問下一個問題。
  • 因爲都是人類,因此他們處理信息的方式也是同樣的:用嘴說話,用耳朵聽話,用大腦處理造成結果。

目前這個交流場景下,只有A和B兩我的。可是隨時有可能增長N我的進來。第N我的可能不是採用中文進行交流。apache

二、信息格式編程

很明顯經過中文的交談,兩我的相互明白了對方的意圖。爲了保證信息傳遞的高效性,咱們必定會將信息作成某種參與者都理解的格式。例如:中文有其特定的語法結構,例如主謂賓,定狀補。設計模式

在計算機領域爲了保證信息可以被處理,信息也會被作成特定的格式,並且要確保目標可以明白這種格式。經常使用的信息格式包括:服務器

XML:微信

  可擴展標記語言,這個語言由W3C(萬維網聯盟)進行發佈和維護。XML語言應用普遍,擴展之豐富。適合作網絡通訊的信息描述格式(通常是「應用層」協議了)。例如Google定義的XMPP通訊協議就是使用XML進行描述的;不過XML的更普遍使用場景是對系統環境進行描述(由於它會形成較多的沒必要要的內容傳輸),例如服務器的配置描述,Spring的配置描述、Maven倉庫描述等等。

JSON:

JSON(JavaScript Object Notation)是一種輕量級的數據交換格式。它和XML的設計思路是一致的:和語言無關(流行的語言都支持JSON格式描述:Go、Python、C、C++、C#、JAVA、Erlang、JavaScript等等);可是和XML不一樣,JSON的設計目標就是爲了進行通訊。要描述一樣的數據,JSON格式的容量會更小。

protocol buffer(PB):

protocolbuffer(如下簡稱PB)是Google的一種數據交換的格式,它獨立於語言,獨立於平臺。Google提供了三種語言的實現:java、c++和python,每一種實現都包含了相應語言的編譯器及庫文件。

TLV(三元組編碼):

T(標記/類型域)L(長度/大小域)V(值/內容域),一般這種信息格式用於金融、軍事領域。它經過字節的位運算來進行信息的序列化/反序列化(聽說微信的信息格式也採用的是TLV,但實際狀況我不清楚):

 

這裏有一篇介紹TLV的文章:《通訊協議之序列化TLV》,TLV格式所攜帶的內容是最有效的,它就連JSON中用於分割層次的「{}」符號都沒有。

自定義的格式

固然,若是您的兩個內部系統已經約定好了一種信息格式,您固然可使用本身定製的格式進行描述。您可使用C++描述一個結構體,而後序列化/反序列它,或者使用一個純文本,以「|」號分割這些字符串,而後序列化/反序列它。

在這個系列的博文中,咱們不會把信息格式做爲一個重點,可是會花一些篇幅去比較各類信息格式在網絡上傳輸的速度、性能,併爲你們介紹幾種典型的信息格式選型場景。

三、網絡協議

如文中第一張圖描述的場景,有一個咱們看不到可是卻很重要的元素:空氣。聲音在空氣中完成傳播,真空沒法傳播聲音。一樣信息是在網絡中完成傳播的,沒有網絡就無法傳播信息。網絡協議就是計算機領域的「空氣」,下圖中咱們OSI(Open System Interconnection)模型做爲參考:

 

物理層:物理層就是咱們的網絡設備層,例如咱們的網卡、交換機等設備,在他們之間咱們通常傳遞的是電信號或者光信號。

數據鏈路層:數據鏈路又分爲物理鏈路和邏輯鏈路。物理鏈路負責組合一組電信號,稱之爲「幀」;邏輯鏈路層經過一些規則和協議保證幀傳輸的正確性,而且可使用來自於多個源/目標的幀在同一個物理鏈路上進行傳輸,實現「鏈路複用」。

網絡層:網絡層使用最普遍的協議就是IP協議(又分爲IPV4協議和IPV6協議),IPX協議。這些協議解決的是源和目標的定位問題,以及從源如何到達目標的問題。

傳輸層:TCP、UDP是傳輸層最常使用的協議,傳輸層的最重要工做就是攜帶內容信息了,而且經過他們的協議規範提供某種通訊機制。舉例來講,TCP協議中的通訊機制是:首先進行三次通訊握手,而後再進行正式數據的傳送,而且經過校驗機制保證每一個數據報文的正確性,若是數據報文錯誤了,則從新發送。

應用層:HTTP協議、FTP協議,TELNET協議這些都是應用層協議。應用層協議是最靈活的協議,甚至能夠由程序員自行定義應用層協議。下圖咱們表示了HTTP協議的工做方式:

 

 在這個系列的博文中,咱們不會把網絡協議協議做爲一個重點。這是由於網絡協議的知識是一個相對獨立的知識領域,十幾篇文章都不必定講的清楚。若是您對網絡協議有興趣,這裏推薦兩本書:《TCP/IP詳解.卷1-協議》和《TCP/IP詳解.卷2-實現》。

四、通訊方式/框架

在文章最前面咱們看到其中一我的規定了一種溝通方式:「你必須把我說的話聽完,而後給我反饋後,我纔會問第二個問題」。這種溝通方式雖然溝通效率不高,可是頗有效:一個問題一個問題的處理。

可是若是參與溝通的人處理信息的能力比較強,那麼他們還能夠採用另外一種溝通方式:我給我提的問題編了一個號,在問完第X個問題後,我不會等待你返回,就會問第X+1個問題,一樣你在聽完我第X個問題後,一邊處理個人問題,一邊聽我第X+1個問題。

實際上以上兩種現實中的溝通方式,在計算機領域是能夠找到對應的通訊方式的,這就是咱們這個系列的博文會着重講的BIO(Blocking IO)(阻塞模式)通訊和NIO(Non-Blocking IO)(非阻塞模式)。

4-一、BIO通訊方式

之前大多數網絡通訊方式都是阻塞模式的,即:

  • 客戶端向服務器發出請求後,客戶端會一直等待(不會再作其餘事情),直到服務器返回結果或者網絡出現問題。
  • 服務端一樣的,當在處理某個客戶端A發來的請求時,另外一個客戶端B發來的請求會等待,直到服務器端的這個處理線程完成上一個處理。

以下圖所示:

 傳統的BIO通訊方式存在幾個問題:

  • 同一時間,服務器只能接受來自於客戶端A的請求信息;雖然客戶端A和客戶端B的請求是同時進行的,但客戶端B發送的請求信息只能等到服務器接受完A的請求數據後,才能被接受。
  • 因爲服務器一次只能處理一個客戶端請求,當處理完成並返回後(或者異常)時,才能進行第二次請求的處理。很顯然,這樣的處理方式在高併發的狀況下,是不能採用的。

上面說的狀況是服務器只有一個線程的狀況,那麼讀者會直接提出咱們可使用多線程技術來解決這個問題:

  • 當服務器收到客戶端X的請求後,(讀取到全部請求數據後)將這個請求送入一個獨立線程進行處理,而後主線程繼續接受客戶端Y的請求。
  • 客戶端一側,也可使用一個子線程和服務端進行通訊。這樣客戶端主線程的其餘工做就不受影響了,當服務器端有響應信息的時候再由這個子線程經過監聽模式/觀察模式(等其餘設計模式)通知主線程。

以下圖所示:

 

可是使用線程來解決這個問題其實是有侷限性的:

  • 雖然在服務器端,請求的處理交給了一個獨立線程進行,可是操做系統通知accept()的方式仍是單個的。也就是,其實是服務器接收到數據報文後「業務處理過程」能夠多線程,可是數據報文的接受仍是須要一個一個的來(下文的示例代碼和debug過程咱們能夠明確看到這一點)
  • 在linux系統中,能夠建立的線程是有限的。咱們能夠經過cat/proc/sys/kernel/threads-max命令查看能夠建立的最大線程數。固然這個值是能夠更改的可是線程越多,CPU切換所需時間也就越長,用來處理真正業務的需求也就越少。
  • 建立一個線程是有較大的資源消耗的。JVM建立一個線程的時候,即便這個線程不作任何的工做,JVM都會分配一個堆棧空間。這個空間的大小默認爲128K,您能夠經過-Xss參數進行調整。
  • 固然您還可使用ThreadPoolExecutor線程池來緩解線程的建立問題,可是又會形成BlockingQueue積壓任務的持續增長,一樣消耗了大量資源。
  • 另外,若是您的應用程序大量使用長鏈接的話線程是不會關閉的。這樣系統資源的消耗更容易失控。
  • 那麼,若是你真想單純使用線程解決阻塞的問題,那麼您本身均可以算出來您一個服務器節點能夠一次接受多大的併發了。看來,單純使用線程解決這個問題不是最好的方法。

 4-二、BIO通訊方式深刻分析

在這個系列的博文中,通訊方式/框架將做爲一個重點進行講解。包括NIO的原理,並經過講解Netty的使用、JAVA原生NIO框架的使用,去熟悉這些核心原理。

實際上從上文中咱們能夠看出,BIO的問題關鍵不在因而否使用了多線程(包括線程池)處理此次請求,而在於accept()、read()的操做點都是被阻塞。要測試這個問題,也很簡單。咱們模擬了20個客戶端(用20根線程模擬),利用JAVA的同步計數器CountDownLatch,保證這20個客戶都初始化完成後而後同時向服務器發送請求,而後咱們來觀察下Server這邊接受信息的狀況。

4-2-一、模擬20個客戶端併發請求,服務器端使用單線程:

  •  客戶端代碼(SocketClientDaemon)
package com.example.demo.testBSocket;

import java.util.concurrent.CountDownLatch;

public class SocketClientDaemon {
    public static void main(String[] args) throws Exception {
        Integer clientNumber = 20;
        CountDownLatch countDownLatch = new CountDownLatch(clientNumber);
        //分別開始啓動這20個客戶端
        for (int index = 0; index < clientNumber; index++, countDownLatch.countDown()) {
            SocketClientRequestThread client = new SocketClientRequestThread(countDownLatch, index);
            new Thread(client).start();
        }

        //這個wait不涉及到具體的實現邏輯,只是爲了保證守護線程在啓動全部線程後,進入等待狀態
        synchronized (SocketClientDaemon.class) {
            SocketClientDaemon.class.wait();
        }
    }
}
  •  客戶端代碼(SocketClientRequestThread模擬請求)
package com.example.demo.testBSocket;


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import org.apache.log4j.BasicConfigurator;

/**
 * 一個SocketClientRequestThread線程模擬一個客戶端請求。
 */
public class SocketClientRequestThread implements Runnable {

    static {
        BasicConfigurator.configure();//自動快速地使用缺省 Log4j 環境。
    }
    /**
     * 日誌
     */
    private static final Log LOGGER = LogFactory.getLog(SocketClientRequestThread.class);
    private CountDownLatch countDownLatch;
    /**
     * 這個線層的編號
     */
    private Integer clientIndex;

    /**
     * countDownLatch是java提供的同步計數器
     * 當計數器數值減爲0時,全部受其影響而等待的線程將會被激活。這樣保證模擬併發請求的真實性。
     *
     * @param countDownLatch
     * @param clientIndex
     */
    public SocketClientRequestThread(CountDownLatch countDownLatch, Integer clientIndex) {
        this.countDownLatch = countDownLatch;
        this.clientIndex = clientIndex;
    }

    @Override
    public void run() {
        Socket socket = null;
        OutputStream clientRequest = null;
        InputStream clientResponse = null;

        try {
            socket = new Socket("localhost", 83);
            clientRequest = socket.getOutputStream();
            clientResponse = socket.getInputStream();
            //等待,直到SocketClientDaemon完成全部線程的啓動,而後全部線程一塊兒發送請求。
            this.countDownLatch.await();

            //發送請求信息
            clientRequest.write(("這是第" + this.clientIndex + "個客戶端的請求。").getBytes());
            clientRequest.flush();

            //在這裏等待,直到服務器返回信息
            SocketClientRequestThread.LOGGER.info("第" + this.clientIndex + "個客戶端的請求發送完成,等待服務器返回信息");
            int maxLen = 1024;
            byte[] contextBytes = new byte[maxLen];
            int realLen;
            String message = "";
            //程序執行到這裏,會一直等待服務器返回信息(注意,前提是in和out都不能close,若是close了就收不到服務器的反饋了)
            while ((realLen = clientResponse.read(contextBytes, 0, maxLen)) != -1) {
                message += new String(contextBytes, 0, realLen);
            }
            SocketClientRequestThread.LOGGER.info("接收到來自服務器的信息" + message);


        } catch (Exception e) {
            SocketClientRequestThread.LOGGER.error(e.getMessage(), e);
        } finally {
            try {
                if (clientRequest != null) {
                    clientRequest.close();
                }
                if (clientResponse != null) {
                    clientResponse.close();
                }
            } catch (IOException e) {
                SocketClientRequestThread.LOGGER.error(e.getMessage(), e);
            }
        }
    }
}

 

  • 服務器端(SocketServer1)單個線程
package com.example.demo.testBSocket;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import org.apache.log4j.BasicConfigurator;

public class SocketServer1 {
    static {
        BasicConfigurator.configure();//自動快速地使用缺省 Log4j 環境。
    }

    /**
     * 日誌
     */
    private static final Log LOGGER = LogFactory.getLog(SocketServer1.class);

    public static void main(String[] args) throws Exception {
        ServerSocket serverSocket = new ServerSocket(83);

        try {
            while (true) {
                Socket socket = serverSocket.accept();

                //下面咱們收取信息
                InputStream in = socket.getInputStream();
                OutputStream out = socket.getOutputStream();
                int sourcePort = socket.getPort();
                int maxLen = 2048;
                byte[] contextBytes = new byte[maxLen];
                //這裏也會被阻塞,直到有數據準備好
                int realLen = in.read(contextBytes, 0, maxLen);
                //讀取信息
                String message = new String(contextBytes, 0, realLen);

                //下面打印信息
                SocketServer1.LOGGER.info("服務器收到來自於端口:" + sourcePort + "的信息" + message);

                //下面開始發送信息
                out.write("回發響應信息!".getBytes());

                //關閉
                out.close();
                in.close();
                socket.close();
            }
        } catch (Exception e) {
            SocketServer1.LOGGER.error(e.getMessage(), e);
        } finally {
            if (serverSocket != null) {
                serverSocket.close();
            }
        }

    }
}

 

4-2-二、就像上文所述咱們可使用多線程來優化服務器端的處理過程:

客戶端代碼和上文同樣,最主要的是更改服務器端的代碼:

package com.example.demo.testBSocket;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

public class SocketServer2 {

    static {
        BasicConfigurator.configure();
    }

    private static final Log LOGGER = LogFactory.getLog(SocketServer2.class);

    public static void main(String[] args) throws Exception{
        ServerSocket serverSocket = new ServerSocket(83);

        try {
            while(true) {
                Socket socket = serverSocket.accept();
                //固然業務處理過程能夠交給一個線程(這裏可使用線程池),而且線程的建立是很耗資源的。
                //最終改變不了.accept()只能一個一個接受socket的狀況,而且被阻塞的狀況
                SocketServerThread socketServerThread = new SocketServerThread(socket);
                new Thread(socketServerThread).start();
            }
        } catch(Exception e) {
            SocketServer2.LOGGER.error(e.getMessage(), e);
        } finally {
            if(serverSocket != null) {
                serverSocket.close();
            }
        }
    }
}

/**
 * 固然,接收到客戶端的socket後,業務的處理過程能夠交給一個線程來作。
 * 但仍是改變不了socket被一個一個的作accept()的狀況。
 * @author yinwenjie
 */
class SocketServerThread implements Runnable {

    /**
     * 日誌
     */
    private static final Log LOGGER = LogFactory.getLog(SocketServerThread.class);

    private Socket socket;

    public SocketServerThread (Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        InputStream in = null;
        OutputStream out = null;
        try {
            //下面咱們收取信息
            in = socket.getInputStream();
            out = socket.getOutputStream();
            Integer sourcePort = socket.getPort();
            int maxLen = 1024;
            byte[] contextBytes = new byte[maxLen];
            //使用線程,一樣沒法解決read方法的阻塞問題,
            //也就是說read方法處一樣會被阻塞,直到操做系統有數據準備好
            int realLen = in.read(contextBytes, 0, maxLen);
            //讀取信息
            String message = new String(contextBytes , 0 , realLen);

            //下面打印信息
            SocketServerThread.LOGGER.info("服務器收到來自於端口:" + sourcePort + "的信息:" + message);

            //下面開始發送信息
            out.write("回發響應信息!".getBytes());
        } catch(Exception e) {
            SocketServerThread.LOGGER.error(e.getMessage(), e);
        } finally {
            //試圖關閉
            try {
                if(in != null) {
                    in.close();
                }
                if(out != null) {
                    out.close();
                }
                if(this.socket != null) {
                    this.socket.close();
                }
            } catch (IOException e) {
                SocketServerThread.LOGGER.error(e.getMessage(), e);
            }
        }
    }
}

4-2-三、看看服務器端的執行效果:

4-2-四、問題根源:

那麼重點的問題並非「是否使用了多線程」,而是爲何accept()、read()方法會被阻塞。即:異步IO模式就是爲了解決這樣的併發性存在的。可是爲了說清楚異步IO模式,在介紹IO模式的時候,咱們就要首先了解清楚,什麼是阻塞式同步、非阻塞式同步、多路複用同步模式。

這裏我要特別說明一下,在一篇網文《Java NIO與IO的詳細區別(通俗篇)》中,做者主要講到了本身對非阻塞方式下硬盤操做的理解。按照個人見解,只要有IO存在,就會有阻塞和非阻塞的問題,不管這個IO是網絡的,仍是硬盤的。這就是爲何基本的JAVA NIO框架中會有FileChannel(並且FileChannel在操做系統級別是不支持非阻塞模式的)、DatagramChannel和SocketChannel的緣由。NIO並非爲了解決磁盤讀寫的性能而存在的,它的出現緣由、要解決的問題更爲廣闊;可是另一個方面,文章做者只是表達了本身的思想,沒有必要爭論得「咬文嚼字」。

 API文檔中對於serverSocket.accept()方法的使用描述:

Listens for a connection to be made to this socket an accepts it.The method blocks until a connnection is made.

那麼咱們首先來看看爲何serverSocket.accept()會被阻塞。這裏涉及到阻塞式同步IO的工做原理:

  • 服務器線程發起一個accept動做,詢問操做系統是否有新的socket套接字信息從端口X發送過來。

  • 注意,式詢問操做系統。也就是說socket套接字的IO模式支持是基於操做系統的,那麼天然同步IO/異步IO的支持就是須要操做系統級別的了。以下圖:

 

  • 若是操做系統沒有發現有套接字從指定的端口X來,那麼操做系統就會等待。這樣serverSocket.accept()方法就會一直等待。這就是爲何accept()方法爲何會阻塞:它內部的實現是使用的操做系統級別的同步IO。

阻塞IO和非阻塞IO 這兩個概念是程序級別的。主要描述的是程序級別的。主要描述的是程序請求操做系統IO操做後,若是IO資源沒有準備好,那麼程序該如何處理的問題;前者等待;後者繼續執行(而且使用線程一直輪詢,直到有IO資源準備好了)

同步IO和非同步IO,這兩個概念是操做系統級別的。主要描述的是操做系統在收到程序請求IO操做後,若是IO資源沒有準備好,該如何響應程序的問題;前者是不響應,直到IO資源準備好之後;後者是返回一個標記(好讓程序和本身知道之後的數據往哪裏通知),當IO資源準備好之後,再用事件機制返回給程序。

  原文地址:https://blog.csdn.net/yinwenjie/article/details/48274255

相關文章
相關標籤/搜索