以前我有篇文章已經簡單介紹了分佈式通訊,有興趣的朋友能夠去看看:html
那麼今天我詳細的說下我對java分佈式系統通訊的理解數據庫
1.集羣模式,將相同應用模塊部署多份後端
2.業務拆分模式,將業務拆分紅多個模塊,並分別部署服務器
3.存儲分佈式session
因爲分佈式概念太大,咱們能夠縮小下討論的範圍:架構
如下分佈式的狹義定義爲:併發
業務拆分,但不限於水平拆分,而是拆分出底層模塊,功能模塊,上層模塊等等異步
一個系統功能繁多,且有層次依賴,那麼咱們須要將其分爲不少模塊,並分別部署 。socket
舉例:
好比咱們如今開發一個相似於錢包的系統,那麼它會有以下功能模塊:用戶模塊(用戶數據),
應用模塊(如手機充值等),業務模塊(處理核心業務),交易模塊(與銀行發生交易),
前置模塊(與客戶端通訊) 等等
咱們會獲得一個系統架構圖:
1) 將系統功能模塊化,且部署在不一樣的地方,對於底層模塊,只要保持接口不變,
上層系統調用底層模塊將不關心其具體實現,且底層模塊作內部邏輯變動,上層系統
都不須要再作發佈,能夠極大限度的解耦合
2) 解耦合以後,能夠複用共同的功能,且業務擴展更爲方便,加快開發和發佈的速度
3) 系統分開部署,充分利用硬件,能夠提升系統性能
4) 減小數據庫鏈接資源的消耗
場景:服務端與服務端的通訊
方案1:基於socket短鏈接
方案2:基於socket長鏈接同步通訊
方案3:基於socket長鏈接異步通訊
定義:
短鏈接:http短鏈接,或者socket短鏈接,是指每次客戶端和服務端通訊的時候,都要新
創建一個socket鏈接,本次通訊完畢後,當即關閉該鏈接,也就是說每次通訊都須要開啓一個新的鏈接 。
傳輸圖以下:
io通訊用mina實現
客戶端示例代碼:
NioSocketConnector connector = new NioSocketConnector(); connector.setConnectTimeoutMillis(CONNECT_TIMEOUT); //設置讀緩衝,傳輸的內容必須小於此緩衝 connector.getSessionConfig().setReadBufferSize(2048*2048); //設置編碼解碼器 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); //設置日誌過濾器 connector.getFilterChain().addLast("logger", new LoggingFilter()); //設置Handler connector.setHandler(new MyClientHandler()); //獲取鏈接,該方法爲異步執行 ConnectFuture future = connector.connect(new InetSocketAddress( HOSTNAME, PORT)); //等待鏈接創建 future.awaitUninterruptibly(); //獲取session IoSession session = future.getSession(); //等待session關閉 session.getCloseFuture().awaitUninterruptibly(); //釋放connector connector.dispose();
下面咱們進行性能測試:
測試場景:
每一個請求的業務處理時間110ms
100個線程併發測試,每一個線程循環請求服務端
測試環境:
客戶端服務器:
Cpu爲4線程 2400mhz
服務端cpu: 4線程 3000Mhz
測試結果:
在通過10分鐘測試以後,穩定狀況下的tps
Tps:554左右
客戶端Cpu:30%
服務端cpu:230%
該方案的優勢:
程序實現起來簡單
該方案的缺點:
1. Socket發送消息時,須要先發送至socket緩衝區,所以系統爲每一個socket分配緩衝區
當緩衝不足時,就達到了最大鏈接數的限制
2. 鏈接數大,也就意味着系統內核調用的越多,socket的accept和close調用
3.每次通訊都從新開啓新的tcp鏈接,握手協議耗時間,tcp是三次握手
4.tcp是慢啓動,TCP 數據傳輸的性能還取決於 TCP 鏈接的使用期(age)。TCP 鏈接會隨着時間進行自我「調諧」,起初會限制鏈接的最大速度,若是數據成功傳輸,會隨着時間的推移提升傳輸的速度。這種調諧被稱爲 TCP 慢啓動(slow start),用於防止因特網的突
然過載和擁塞 。
長鏈接同步的傳輸圖
一個socket鏈接在同一時間只能傳遞一個請求的信息
只有等到response以後,第二個請求才能開始使用這個通道
爲了提升併發性能,能夠提供多個鏈接,創建一個鏈接池,鏈接被使用的時候標誌爲正在使用,
使用完放回鏈接池,標識爲空閒,這和jdbc鏈接池是同樣的。
假設後端服務器,tps是1000,即每秒處理業務數是1000
如今內網傳輸耗時是5毫秒,業務處理一次請求的時間爲150毫秒
那麼一次請求從客戶端發起請求到獲得服務端的響應,總共耗時150毫秒+5毫秒*2
=160毫秒,若是隻有一個鏈接通訊,那麼1秒內只能完成2次業務處理,即tps爲2
若是要使tps達到1000,那麼理論上須要500個鏈接,可是當鏈接數上升的時候,其性能卻在降低,
所以該方案將會下降網站的吞吐量。
實現挑戰:
mina的session.write()和receive消息都是異步的,那麼須要在主線程上阻塞以等待響應的到達。
鏈接池代碼:
/** * 空閒鏈接池 */ private static BlockingQueue<Connection> idlePool = new LinkedBlockingQueue<Connection>(); /** * 使用中的鏈接池 */ public static BlockingQueue<Connection> activePool = new LinkedBlockingQueue<Connection>(); public static Connection getConn() throws InterruptedException{ long time1 = System.currentTimeMillis(); Connection connection = null; connection = idlePool.take(); activePool.add(connection); long time2 = System.currentTimeMillis(); //log.info("獲取鏈接耗時:"+(time2-time1)); return connection; }
客戶端代碼:
public TransInfo send(TransInfo info) throws InterruptedException { Result result = new Result(); //獲取tcp鏈接 Connection connection = ConnectFutureFactory.getConnection(result); ConnectFuture connectFuture = connection.getConnection(); IoSession session = connectFuture.getSession(); session.setAttribute("result", result); //發送信息 session.write(info); //同步阻塞獲取響應 TransInfo synGetInfo = result.synGetInfo(); //此處並非真正關閉鏈接,而是將鏈接放回鏈接池 ConnectFutureFactory.close(connection,result); return synGetInfo; }
阻塞獲取服務端響應代碼:
public synchronized TransInfo synGetInfo() { //等待消息返回 //必需要在同步的狀況下執行 if (!done) { try { wait(); } catch (InterruptedException e) { log.error(e.getMessage(), e); } } return info; } public synchronized void synSetInfo(TransInfo info) { this.info = info; this.done = true; notify(); }
測試場景:
每一個請求的業務處理時間110ms
300個線程300個鏈接併發測試,每一個線程循環請求服務端
測試環境:
客戶端服務器:
Cpu爲4線程 2400mhz
服務端cpu: 4線程 3000Mhz
測試結果:
在通過10分鐘測試以後,穩定狀況下的tps
Tps:2332左右
客戶端Cpu:90%
服務端cpu:250%
從測試結果能夠看出,當鏈接數足夠大的時候,系統性能會下降,開啓的tcp鏈接數越多,那麼
系統開銷將會越大。
通訊圖:
一個socket鏈接在同一時間內傳輸屢次請求的信息,輸入通道接收多條響應消息,消息是連續發出,連續收回的。
業務處理和發消息是異步的,一個業務線程告訴通道發送消息後,再也不佔用通道,而是等待響應到達,而此時其它
業務線程也能夠往該鏈接通道發信息,這樣能夠充分利用通道來進行通訊。
實現挑戰
但該方案使編碼變得複雜,如上圖,請求request1,request2,request3順序發出,可是服務端處理請求並非
排隊的,而是並行處理的,有可能request3先於request1響應給客戶端,那麼一個request將沒法找到他的response,
這時候咱們須要在request和response報文中添加惟一標識,如通訊序列號,在一個通訊通道里面保持惟一,
那麼能夠根據序列號去獲取對應的響應報文。
個人方案是:
1.客戶端獲取一個tcp鏈接
2.調用session.write()發送信息,並將消息的惟一序列號存入一個Result對象
result對象存入一個map
3.同步阻塞獲取結果,線程在result對象進行同步阻塞
4.接收消息,並經過惟一序列號從map裏面獲取result對象,並喚醒阻塞在result對象上的線程
客戶端發送消息示例代碼:
public TransInfo send(TransInfo info) throws InterruptedException { Result result = new Result(); result.setInfo(info); //獲取socket鏈接 ConnectFuture connectFuture = ConnectFutureFactory .getConnection(result); IoSession session = connectFuture.getSession(); //將result放入ConcurrentHashMap ConcurrentHashMap<Long, Result> resultMap = (ConcurrentHashMap<Long, Result>)session.getAttribute("resultMap"); resultMap.put(info.getId(), result); //發送消息 session.write(info); //同步阻塞獲取結果 return result.synGetInfo(); }
同步阻塞和喚醒方法:
public synchronized TransInfo synGetInfo() { //等待消息返回 //必需要在同步的狀況下執行 while (!done) { try { wait(); } catch (InterruptedException e) { log.error(e.getMessage(), e); } } return info; } public synchronized void synSetInfo(TransInfo info) { this.info = info; this.done = true; notify(); }
接收消息示例代碼:
public void messageReceived(IoSession session, Object message) throws Exception { TransInfo info = (TransInfo) message; //根據惟一序列號從resultMap中獲取result ConcurrentHashMap<Long, Result> resultMap = (ConcurrentHashMap<Long, Result>)session.getAttribute("resultMap"); //移除result Result result = resultMap.remove(info.getId()); //喚醒阻塞線程 result.synSetInfo(info); }
測試場景:
每一個請求的業務處理時間110ms
300個線程10個鏈接併發測試,每一個線程循環請求服務端
測試環境:
客戶端服務器:
Cpu爲4線程 2400mhz
服務端cpu: 4線程 3000Mhz
測試結果:
在通過10分鐘測試以後,穩定狀況下的tps
Tps:2600左右
客戶端Cpu:25%
服務端cpu:250%
經測試發現,異步通訊能夠用更少的tcp鏈接實現一樣高效的通訊,極大的減小了系統性能開銷。
今天暫時寫到這裏。
參考 文章
http://www.2cto.com/os/201203/125511.html
wireshark-win32-1.6.5.exe:
http://down.51cto.com/data/685517
RPC與消息隊列的區別
http://oldratlee.com/post/2013-02-01/synchronous-rpc-vs-asynchronous-message
tcp長鏈接與短鏈接的區別
http://www.cnblogs.com/liuyong/archive/2011/07/01/2095487.html
http://blog.chinaunix.net/uid-354915-id-3587924.html
keep-alived詳解
http://wudi.in/archives/446.html
http://www.nowamagic.net/academy/detail/23350305
wireshark抓包詳解
http://www.cnblogs.com/TankXiao/archive/2012/10/10/2711777.html
長鏈接,同步異步參考
http://www.yeolar.com/note/2012/11/10/c10k/
同步隊列:
http://ifeve.com/java-synchronousqueue/
netty: