你是否想知道一個分佈式系統的網絡傳輸解決方案,那你能夠學習下RocketMQ的網絡傳輸原理,從RocketMQ的Remoting網絡處理部分,能夠學習到如何進行高效的網絡傳輸,這些思想能夠應用到不一樣的業務中。java
其實大部分應用的網絡處理都要解決以下圖所示的問題:json
那麼就以RocketMQ的源碼入手,看看它是如何架構如上的結構的。網絡
2.2 編碼解碼數據結構
在RocketMQ中,全部的通信都是使用RemotingCommand這個結構,這個結構的內容以下:架構
[java] view plain copyapp
[java] view plain copy異步
private transient CommandCustomHeader customHeader; // 業務邏輯中使用該結構,傳輸時,使用 extFields
/** * Body 部分 */private transient byte[] body;async
2.2.1 RemotingCommand轉爲網絡傳輸數據分佈式
在MQ中,全部數據傳輸都使用該數據結構進行數據傳輸,當把數據轉爲網絡傳輸時,會將customHeader轉爲HashMap的extFields,再轉爲json串函數
2.2.2 傳輸格式:
Length |
Header length |
Header data |
Body |
2.2.3 編碼過程(重點函數:makeCustomHeaderToNet)
A. 將業務上的CustomHeader轉爲extFields;
B. 而後調用RemotingSerializable的encode,將RemotingCommand的Header部分轉爲byte類型
C. 在按照傳輸格式,將數據轉爲最終的header+body結構進行傳輸
2.2.4 解碼過程(重點函數:decodeCommandCustomHeader)
A. 首先將獲取的byteBuffer,按照傳輸格式進行解包,獲得其headerData和bodyData
B. 將HeaderData部分進行decode,解包爲RemotingCommand
C. 業務層調用decodeCommandCustomHeader(m.class)將頭部解析爲對應的m類
備註:
transient:當序列化類時,有該屬性的變量不進行序列化
Netty在處理通訊層的事件時,將其NettyEventExecuter的eventQueue中,再起一個線程,不斷地處理存入消息。
2.3.1 Put消息
在Netty的註冊部分,handler在addLast的時候,將NettyConnetManageHandler註冊進去;這裏面對應了connect、disconnect、close、channelRegistered等等事件,對於這些事件,將會調用NettyEventExecuter.putNettyEvent將消息放入Queue中;
2.3.2 Get處理消息
NettyEventExecuter處理線程會不斷從queue中讀取消息進行處理,調用註冊的ChannelEventListener進行處理;
2.4.1 NettyRemotingAbstract:它是做爲NettyRemotingServer和NettyRemotingClient的基類,對發送和接收的公共部分進行了處理
A. 數據結構和基礎函數
A.1 首先保存了RPC處理器:HashMap<Integer/*request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable
A.2 其次保存了全部對外請求ConcurrentHashMap<Integer/* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer,ResponseFuture>(256);
A.3 scanResponseTable:掃描responseTable,將超時的ResponseFuture直接移除
B. 發送部分
B.1 invokeSyncImpl: 同步發送,發送時,生成ResponseFuture,放入responseTable中;而後發送後等待設置的timeout(3s)時間,若是對應的ResponseFuture爲空,則報錯;不然返回RemoteCommand進行業務邏輯處理;
B.2 invokeAsyncImpl:異步發送,發送時,生成ResponseFuture,放入responseTable中;若是超過scanResponseTable的timeout (30s),則報錯;不然調用註冊的invokeCallback進行回調處理;
B.3 invokeOnewayImpl:單向發送,不將消息寫入responseTable中,直接返回;
C. 接收消息部分
C.1 processRequestCommand:接收消息,做爲Server端,接收的消息是請求,那麼調用processTable對應的事件進行處理
C.2 processResponseCommand:接收消息,做爲Client端,接收的消息是回覆,即接收到Server端的回覆,那麼從responseTable中,首先獲取opaque對應的ResponseFuture,若是這個response是異步回調,則有InvokeCallback,那麼調用invokeBack函數,而後將Response塞入ResponseFuture後返回;
2.4.2 NettyRemotingServer
處理過程以下:
首先全部的入口都在start函數:
若是是input方向,那麼會先調用NettyDecoder->NettyConnectManageHandler->NettyServerHandler
NettyDecoder(底層編碼):會將數據包從byte轉爲RemotingCommand
NettyConnectManageHandler(通訊層事件):會將請求轉入channelRegistered、channelUnregistered、channelActive、channelInactive、userEventTriggered、exceptionCaught,對應的調用NettyRemotingAbstract.putNettyEvent將事件放入Queue中,等待NettyEventExecuter進行處理
NettyServerHandler(業務層事件):調用註冊的<Integer/*request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable,進行業務邏輯處理,當processRequestCommand接收到消息時,進行對應的處理
2.4.3 NettyRemotingClient
首先全部的入口都在start函數:
若是是input方向,那麼會先調用NettyDecoder->NettyConnectManageHandler->NettyClientHandler
NettyDecoder:會將數據包從byte轉爲RemotingCommand
NettyConnectManageHandler:會將請求轉入channelRegistered、channelUnregistered、channelActive、channelInactive、userEventTriggered、exceptionCaught,對應的調用NettyRemotingAbstract.putNettyEvent將事件放入Queue中,等待NettyEventExecuter進行處理
NettyClientHandler:調用註冊的<Integer/* request code */,Pair<NettyRequestProcessor, ExecutorService>> processorTable, 進行業務邏輯處理
ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>():一個remotingClient會管理不少個channel
最後,是client端的超時時間處理,若是鏈接超過120s沒有接收到發送和請求,那麼將會斷開鏈接,不然將會是長鏈接的一個保持。
一個實例:在producer,consumer的鏈接保持中,雖然有120s的超時時間,可是他們基本都是長鏈接的一個保持,由於會經過心跳來保持全部的鏈接。
2.5 後臺服務:
1. NettyEventExecutor和ChannelEventListener:主要負責處理connect,disconnect,close等消息。
2. scanResonseTable : 主要負責清理過時超時的response。
3. 異步回調:不算是標準的後臺服務,當採用async的發送方式或sync的回調模式時,會在後臺線程中執行。
在整個網絡傳輸部分,有以下值得思考借鑑的地方:
3.1 編解碼:編碼要節省資源,經常使用bit,位爲單位進行編碼,最終轉爲json或xml傳輸(固然還能夠選擇probuf等)
3.2 限流的使用,這裏採用semaphore來進行限流處理
3.3 rpchook的設計,發送前,接受後的hook設計
3.4 發送與接收
發送:invokeAsyncImpl, invokeOnewayImpl, invokeSyncImpl
接收:processReceiveMessage, processRequestMessage, processResponseMessage
發送與接收使用opaque和responseFuture進行交互(即ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable),其中,opaque用於標示發送/接收對,responseFuture的countDownLatch字段用於通知客戶端接收到消息,並控制超時時間。
3.5 後臺服務的設計
有不少服務不須要是實時的,須要在一致性和可用性之間找到一個平衡,所以,不少非實時任務能夠採用一個全局的單線程來維護,參考上面2.5的描述。