深刻淺出Java分佈式系統通訊

什麼是分佈式系統

以前我有篇文章已經簡單介紹了分佈式通訊,有興趣的朋友能夠去看看:html

大型網站系統架構實踐(二)分佈式模塊之間的通訊java

那麼今天我詳細的說下我對java分佈式系統通訊的理解數據庫

1.集羣模式,將相同應用模塊部署多份後端

2.業務拆分模式,將業務拆分紅多個模塊,並分別部署服務器

3.存儲分佈式session

因爲分佈式概念太大,咱們能夠縮小下討論的範圍:架構

如下分佈式的狹義定義爲:併發

業務拆分,但不限於水平拆分,而是拆分出底層模塊,功能模塊,上層模塊等等異步

一個系統功能繁多,且有層次依賴,那麼咱們須要將其分爲不少模塊,並分別部署 。socket

舉例:

好比咱們如今開發一個相似於錢包的系統,那麼它會有以下功能模塊:用戶模塊(用戶數據),

應用模塊(如手機充值等),業務模塊(處理核心業務),交易模塊(與銀行發生交易),

前置模塊(與客戶端通訊) 等等

咱們會獲得一個系統架構圖:

clip_image002

爲何要分佈式

1) 將系統功能模塊化,且部署在不一樣的地方,對於底層模塊,只要保持接口不變,

上層系統調用底層模塊將不關心其具體實現,且底層模塊作內部邏輯變動,上層系統

都不須要再作發佈,能夠極大限度的解耦合

2) 解耦合以後,能夠複用共同的功能,且業務擴展更爲方便,加快開發和發佈的速度

3) 系統分開部署,充分利用硬件,能夠提升系統性能

4) 減小數據庫鏈接資源的消耗

分佈式通訊方案

場景:服務端與服務端的通訊

方案1:基於socket短鏈接

方案2:基於socket長鏈接同步通訊

方案3:基於socket長鏈接異步通訊

tcp短鏈接通訊方案

定義:

短鏈接:http短鏈接,或者socket短鏈接,是指每次客戶端和服務端通訊的時候,都要新

創建一個socket鏈接,本次通訊完畢後,當即關閉該鏈接,也就是說每次通訊都須要開啓一個新的鏈接 。

傳輸圖以下:

clip_image004

io通訊用mina實現

客戶端示例代碼:

NioSocketConnector connector = new NioSocketConnector();
connector.setConnectTimeoutMillis(CONNECT_TIMEOUT);
//設置讀緩衝,傳輸的內容必須小於此緩衝
connector.getSessionConfig().setReadBufferSize(2048*2048);
//設置編碼解碼器
connector.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
//設置日誌過濾器
connector.getFilterChain().addLast("logger", new LoggingFilter());
//設置Handler
connector.setHandler(new MyClientHandler());

//獲取鏈接,該方法爲異步執行
ConnectFuture future = connector.connect(new InetSocketAddress(
        HOSTNAME, PORT));
//等待鏈接創建
future.awaitUninterruptibly();
//獲取session
IoSession session = future.getSession();

//等待session關閉
session.getCloseFuture().awaitUninterruptibly();
//釋放connector
connector.dispose();

下面咱們進行性能測試:

 

測試場景:

每一個請求的業務處理時間110ms

100個線程併發測試,每一個線程循環請求服務端

測試環境:

客戶端服務器:

Cpu爲4線程 2400mhz

服務端cpu: 4線程 3000Mhz

測試結果:

在通過10分鐘測試以後,穩定狀況下的tps

Tps:554左右

客戶端Cpu:30%

服務端cpu:230%

 

該方案的優勢:

程序實現起來簡單

該方案的缺點:

1. Socket發送消息時,須要先發送至socket緩衝區,所以系統爲每一個socket分配緩衝區

當緩衝不足時,就達到了最大鏈接數的限制

2. 鏈接數大,也就意味着系統內核調用的越多,socket的accept和close調用

3.每次通訊都從新開啓新的tcp鏈接,握手協議耗時間,tcp是三次握手

4.tcp是慢啓動,TCP 數據傳輸的性能還取決於 TCP 鏈接的使用期(age)。TCP 鏈接會隨着時間進行自我「調諧」,起初會限制鏈接的最大速度,若是數據成功傳輸,會隨着時間的推移提升傳輸的速度。這種調諧被稱爲 TCP 慢啓動(slow start),用於防止因特網的突

然過載和擁塞 。

 

tcp長鏈接同步通訊

長鏈接同步的傳輸圖

clip_image008

一個socket鏈接在同一時間只能傳遞一個請求的信息

只有等到response以後,第二個請求才能開始使用這個通道

爲了提升併發性能,能夠提供多個鏈接,創建一個鏈接池,鏈接被使用的時候標誌爲正在使用,

使用完放回鏈接池,標識爲空閒,這和jdbc鏈接池是同樣的。

假設後端服務器,tps是1000,即每秒處理業務數是1000

如今內網傳輸耗時是5毫秒,業務處理一次請求的時間爲150毫秒

那麼一次請求從客戶端發起請求到獲得服務端的響應,總共耗時150毫秒+5毫秒*2

=160毫秒,若是隻有一個鏈接通訊,那麼1秒內只能完成2次業務處理,即tps爲2

若是要使tps達到1000,那麼理論上須要500個鏈接,可是當鏈接數上升的時候,其性能卻在降低,

所以該方案將會下降網站的吞吐量。

實現挑戰:

mina的session.write()和receive消息都是異步的,那麼須要在主線程上阻塞以等待響應的到達。

鏈接池代碼:

/**
* 空閒鏈接池
*/
private static BlockingQueue<Connection> idlePool = new LinkedBlockingQueue<Connection>();
    
/**
* 使用中的鏈接池
*/
public static BlockingQueue<Connection> activePool = new LinkedBlockingQueue<Connection>();

public static Connection getConn() throws InterruptedException{
    long time1 = System.currentTimeMillis();
    Connection connection = null;
    connection = idlePool.take();            
    activePool.add(connection);
    long time2 = System.currentTimeMillis();
    //log.info("獲取鏈接耗時:"+(time2-time1));
    return connection;
}

客戶端代碼:

public TransInfo send(TransInfo info) throws InterruptedException {
    Result result = new Result();
    //獲取tcp鏈接
    Connection connection = ConnectFutureFactory.getConnection(result);
    ConnectFuture connectFuture = connection.getConnection();
    IoSession session = connectFuture.getSession();
    session.setAttribute("result", result);
    //發送信息
    session.write(info);
    //同步阻塞獲取響應
    TransInfo synGetInfo = result.synGetInfo();
    //此處並非真正關閉鏈接,而是將鏈接放回鏈接池
    ConnectFutureFactory.close(connection,result);
    return synGetInfo;
}

阻塞獲取服務端響應代碼:

public synchronized TransInfo synGetInfo() {
    //等待消息返回
    //必需要在同步的狀況下執行
    if (!done) {
        try {                    
            wait();
        } catch (InterruptedException e) {
            log.error(e.getMessage(), e);
        }
    }
    return info;
}

public synchronized void synSetInfo(TransInfo info) {
    this.info = info;
    this.done = true;
    notify();
}

測試場景:

每一個請求的業務處理時間110ms

300個線程300個鏈接併發測試,每一個線程循環請求服務端

測試環境:

客戶端服務器:

Cpu爲4線程 2400mhz

服務端cpu: 4線程 3000Mhz

測試結果:

在通過10分鐘測試以後,穩定狀況下的tps

Tps:2332左右

客戶端Cpu:90%

服務端cpu:250%

從測試結果能夠看出,當鏈接數足夠大的時候,系統性能會下降,開啓的tcp鏈接數越多,那麼

系統開銷將會越大。 

tcp長鏈接異步通訊

通訊圖:

image

一個socket鏈接在同一時間內傳輸屢次請求的信息,輸入通道接收多條響應消息,消息是連續發出,連續收回的。

業務處理和發消息是異步的,一個業務線程告訴通道發送消息後,再也不佔用通道,而是等待響應到達,而此時其它

業務線程也能夠往該鏈接通道發信息,這樣能夠充分利用通道來進行通訊。

實現挑戰

但該方案使編碼變得複雜,如上圖,請求request1,request2,request3順序發出,可是服務端處理請求並非

排隊的,而是並行處理的,有可能request3先於request1響應給客戶端,那麼一個request將沒法找到他的response,

這時候咱們須要在request和response報文中添加惟一標識,如通訊序列號,在一個通訊通道里面保持惟一,

那麼能夠根據序列號去獲取對應的響應報文。

個人方案是:

1.客戶端獲取一個tcp鏈接

2.調用session.write()發送信息,並將消息的惟一序列號存入一個Result對象

result對象存入一個map 

3.同步阻塞獲取結果,線程在result對象進行同步阻塞

4.接收消息,並經過惟一序列號從map裏面獲取result對象,並喚醒阻塞在result對象上的線程

客戶端發送消息示例代碼:

public TransInfo send(TransInfo info) throws InterruptedException {
    Result result = new Result();
    result.setInfo(info);
    //獲取socket鏈接
    ConnectFuture connectFuture = ConnectFutureFactory
        .getConnection(result);
    IoSession session = connectFuture.getSession();
    //將result放入ConcurrentHashMap
    ConcurrentHashMap<Long, Result> resultMap = (ConcurrentHashMap<Long, Result>)session.getAttribute("resultMap");
    resultMap.put(info.getId(), result);
    //發送消息
    session.write(info);
    //同步阻塞獲取結果
    return result.synGetInfo();
}

同步阻塞和喚醒方法:

public synchronized TransInfo synGetInfo() {
    //等待消息返回
    //必需要在同步的狀況下執行
    while (!done) {
        try {                    
            wait();
        } catch (InterruptedException e) {
            log.error(e.getMessage(), e);
        }
    }
    return info;
}

public synchronized void synSetInfo(TransInfo info) {
    this.info = info;
    this.done = true;
    notify();
}

接收消息示例代碼:

public void messageReceived(IoSession session, Object message)
        throws Exception {
    TransInfo info = (TransInfo) message;    
    //根據惟一序列號從resultMap中獲取result
    ConcurrentHashMap<Long, Result> resultMap = (ConcurrentHashMap<Long, Result>)session.getAttribute("resultMap");
    //移除result
    Result result = resultMap.remove(info.getId());        
    //喚醒阻塞線程
    result.synSetInfo(info);
}

測試場景:

每一個請求的業務處理時間110ms

300個線程10個鏈接併發測試,每一個線程循環請求服務端

測試環境:

客戶端服務器:

Cpu爲4線程 2400mhz

服務端cpu: 4線程 3000Mhz

測試結果:

在通過10分鐘測試以後,穩定狀況下的tps

Tps:2600左右

客戶端Cpu:25%

服務端cpu:250%

經測試發現,異步通訊能夠用更少的tcp鏈接實現一樣高效的通訊,極大的減小了系統性能開銷。

今天暫時寫到這裏。

 

參考 文章

http://www.2cto.com/os/201203/125511.html

wireshark-win32-1.6.5.exe:

http://down.51cto.com/data/685517

RPC與消息隊列的區別

http://oldratlee.com/post/2013-02-01/synchronous-rpc-vs-asynchronous-message

tcp長鏈接與短鏈接的區別

http://www.cnblogs.com/liuyong/archive/2011/07/01/2095487.html

http://blog.chinaunix.net/uid-354915-id-3587924.html

keep-alived詳解

http://wudi.in/archives/446.html

http://www.nowamagic.net/academy/detail/23350305

wireshark抓包詳解

http://www.cnblogs.com/TankXiao/archive/2012/10/10/2711777.html

長鏈接,同步異步參考

http://www.yeolar.com/note/2012/11/10/c10k/

同步隊列:

http://ifeve.com/java-synchronousqueue/

netty:

http://www.infoq.com/cn/articles/netty-reliability

相關文章
相關標籤/搜索