《RocketMq》1、網絡傳輸篇

你是否想知道一個分佈式系統的網絡傳輸解決方案,那你能夠學習下RocketMQ的網絡傳輸原理,從RocketMQ的Remoting網絡處理部分,能夠學習到如何進行高效的網絡傳輸,這些思想能夠應用到不一樣的業務中。java

 

1、要解決的問題

         其實大部分應用的網絡處理都要解決以下圖所示的問題:json

 

 

 

 

那麼就以RocketMQ的源碼入手,看看它是如何架構如上的結構的。網絡

 

2、RocketMQ-remoting詳解

2.1首先給出其總體的結構圖

 

2.2 編碼解碼數據結構

在RocketMQ中,全部的通信都是使用RemotingCommand這個結構,這個結構的內容以下:架構

 

[java] view plain copyapp

  1. private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND  
  2. // 1, RESPONSE_COMMAND  
  3.   
  4. private static final int RPC_ONEWAY = 1; // 0, RPC  
  5. // 1, Oneway  
  6.   
  7. /** 
  8.  * Header 部分 
  9.  */  
  10. private int code; // 用於標示請求類型,參見RequestCode,ResponseCode  
  11. private LanguageCode language = LanguageCode.JAVA;  
  12. private int version = 0;  
  13. private int opaque = RequestId.getAndIncrement(); // 每一個消息的惟一標誌,request和response經過該字段匹配  
  14. private int flag = 0;  
  15. private String remark;  
  16. private HashMap<String, String> extFields; // 傳輸時使用,CommandCustomHeader轉爲該結構<key,value>後,再統一轉爲json傳輸。所以  

[java] view plain copy異步

  1. CommandCustomHeader只能是String,Int,Long等基礎數據結構,不能是複合數據結構  

private transient CommandCustomHeader customHeader; // 業務邏輯中使用該結構,傳輸時,使用 extFields
/** * Body 部分 */private transient byte[] body;async

 

 

2.2.1 RemotingCommand轉爲網絡傳輸數據分佈式

在MQ中,全部數據傳輸都使用該數據結構進行數據傳輸,當把數據轉爲網絡傳輸時,會將customHeader轉爲HashMap的extFields,再轉爲json串函數

 

2.2.2 傳輸格式:

Length

Header length

Header data

Body

 

2.2.3 編碼過程(重點函數:makeCustomHeaderToNet)

A. 將業務上的CustomHeader轉爲extFields;

B. 而後調用RemotingSerializable的encode,將RemotingCommand的Header部分轉爲byte類型

C. 在按照傳輸格式,將數據轉爲最終的header+body結構進行傳輸

 

2.2.4 解碼過程(重點函數:decodeCommandCustomHeader)

A. 首先將獲取的byteBuffer,按照傳輸格式進行解包,獲得其headerData和bodyData

B. 將HeaderData部分進行decode,解包爲RemotingCommand

C. 業務層調用decodeCommandCustomHeader(m.class)將頭部解析爲對應的m類

 

 

備註:

transient:當序列化類時,有該屬性的變量不進行序列化

 

2.3 通訊層處理

Netty在處理通訊層的事件時,將其NettyEventExecuter的eventQueue中,再起一個線程,不斷地處理存入消息。

2.3.1 Put消息

在Netty的註冊部分,handler在addLast的時候,將NettyConnetManageHandler註冊進去;這裏面對應了connect、disconnect、close、channelRegistered等等事件,對於這些事件,將會調用NettyEventExecuter.putNettyEvent將消息放入Queue中;
2.3.2 Get處理消息
NettyEventExecuter處理線程會不斷從queue中讀取消息進行處理,調用註冊的ChannelEventListener進行處理;
 

2.4 業務層處理

2.4.1 NettyRemotingAbstract:它是做爲NettyRemotingServer和NettyRemotingClient的基類,對發送和接收的公共部分進行了處理

A. 數據結構和基礎函數

A.1 首先保存了RPC處理器:HashMap<Integer/*request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable

A.2 其次保存了全部對外請求ConcurrentHashMap<Integer/* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<Integer,ResponseFuture>(256);

A.3  scanResponseTable:掃描responseTable,將超時的ResponseFuture直接移除

 

B. 發送部分

B.1 invokeSyncImpl: 同步發送,發送時,生成ResponseFuture,放入responseTable中;而後發送後等待設置的timeout(3s)時間,若是對應的ResponseFuture爲空,則報錯;不然返回RemoteCommand進行業務邏輯處理;

B.2 invokeAsyncImpl:異步發送,發送時,生成ResponseFuture,放入responseTable中;若是超過scanResponseTable的timeout (30s),則報錯;不然調用註冊的invokeCallback進行回調處理;

B.3 invokeOnewayImpl:單向發送,不將消息寫入responseTable中,直接返回;

 

C. 接收消息部分

C.1 processRequestCommand:接收消息,做爲Server端,接收的消息是請求,那麼調用processTable對應的事件進行處理

C.2 processResponseCommand:接收消息,做爲Client端,接收的消息是回覆,即接收到Server端的回覆,那麼從responseTable中,首先獲取opaque對應的ResponseFuture,若是這個response是異步回調,則有InvokeCallback,那麼調用invokeBack函數,而後將Response塞入ResponseFuture後返回;

 

 

 

2.4.2 NettyRemotingServer

處理過程以下:

首先全部的入口都在start函數:

若是是input方向,那麼會先調用NettyDecoder->NettyConnectManageHandler->NettyServerHandler

NettyDecoder(底層編碼):會將數據包從byte轉爲RemotingCommand

NettyConnectManageHandler(通訊層事件):會將請求轉入channelRegistered、channelUnregistered、channelActive、channelInactive、userEventTriggered、exceptionCaught,對應的調用NettyRemotingAbstract.putNettyEvent將事件放入Queue中,等待NettyEventExecuter進行處理

NettyServerHandler(業務層事件):調用註冊的<Integer/*request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable,進行業務邏輯處理,當processRequestCommand接收到消息時,進行對應的處理

 

2.4.3 NettyRemotingClient

首先全部的入口都在start函數:

若是是input方向,那麼會先調用NettyDecoder->NettyConnectManageHandler->NettyClientHandler

NettyDecoder:會將數據包從byte轉爲RemotingCommand

NettyConnectManageHandler:會將請求轉入channelRegistered、channelUnregistered、channelActive、channelInactive、userEventTriggered、exceptionCaught,對應的調用NettyRemotingAbstract.putNettyEvent將事件放入Queue中,等待NettyEventExecuter進行處理

NettyClientHandler:調用註冊的<Integer/* request code */,Pair<NettyRequestProcessor, ExecutorService>> processorTable, 進行業務邏輯處理

ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>():一個remotingClient會管理不少個channel

最後,是client端的超時時間處理,若是鏈接超過120s沒有接收到發送和請求,那麼將會斷開鏈接,不然將會是長鏈接的一個保持。

一個實例:在producer,consumer的鏈接保持中,雖然有120s的超時時間,可是他們基本都是長鏈接的一個保持,由於會經過心跳來保持全部的鏈接。

2.5 後臺服務:

1. NettyEventExecutor和ChannelEventListener:主要負責處理connect,disconnect,close等消息。

2. scanResonseTable : 主要負責清理過時超時的response。

3. 異步回調:不算是標準的後臺服務,當採用async的發送方式或sync的回調模式時,會在後臺線程中執行。

 

三. 一些總結與想法

在整個網絡傳輸部分,有以下值得思考借鑑的地方:

3.1 編解碼:編碼要節省資源,經常使用bit,位爲單位進行編碼,最終轉爲json或xml傳輸(固然還能夠選擇probuf等)

3.2 限流的使用,這裏採用semaphore來進行限流處理

3.3 rpchook的設計,發送前,接受後的hook設計

3.4 發送與接收

發送:invokeAsyncImpl, invokeOnewayImpl, invokeSyncImpl

接收:processReceiveMessage, processRequestMessage, processResponseMessage

發送與接收使用opaque和responseFuture進行交互(即ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable),其中,opaque用於標示發送/接收對,responseFuture的countDownLatch字段用於通知客戶端接收到消息,並控制超時時間。

3.5 後臺服務的設計

有不少服務不須要是實時的,須要在一致性和可用性之間找到一個平衡,所以,不少非實時任務能夠採用一個全局的單線程來維護,參考上面2.5的描述。

相關文章
相關標籤/搜索