netty的標準設計流程爲:編碼,解碼,檢測,連接,其餘handler,業務。按照這個流程將rocketmq的netty的實現流程進行細化。java
編碼
NettyEncoder
繼承MessageToByteEncoder,netty的編碼規範要求
將RemotingCommand的請求數據結構數據頭:存數據的長度描述;數據體:存數據的內容
RemotingCommand的功能強大:涉及ByteBuffer的頻繁操做,實現數據協議的轉換
總體的協議格式:<length><header length><header data><body data>
1,4個字節的int型數據存儲數據的總長度
2,4個字節的int型數據存儲報文頭的字節長度
3,存儲的頭部數據內容
4,存儲的報文的數據內容數組
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> { private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); @Override public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception { try { //得到請求體消息頭,封裝爲byte的協議要求 ByteBuffer header = remotingCommand.encodeHeader(); //寫入頭 out.writeBytes(header); //得到消息體 byte[] body = remotingCommand.getBody(); if (body != null) { //寫入協議 out.writeBytes(body); } } catch (Exception e) { log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); if (remotingCommand != null) { log.error(remotingCommand.toString()); } RemotingUtil.closeChannel(ctx.channel()); } } }
解碼
NettyDecoder
繼承LengthFieldBasedFrameDecoder,netty的編碼要求,該實現是基於長度的要求解決拆包粘包問題
將ByteBuf的數據輸入按照長度的要求,數據解碼到ByteBuf的存儲體內
將netty封裝的ByteBuf對象轉換爲java的Nio的ByteBuffer對象:ByteBuf.nioBuffer()
全部的操做都是基於RemotingCommand的decode來實現
解碼的操做過程基於編碼的規範來
1,得到數據的最大有效位置
2,得到第一個int位的值,該值爲數據長度,包括頭數據和報文數據
3,byteBuffer操做得到頭部的數據
4,將數據的最大有效位置減去 4(標識),再減去頭數據的長度,獲得報文的長度
5,根據報文長度,byteBuffer操做得到報文長度網絡
public class NettyDecoder extends LengthFieldBasedFrameDecoder { private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); //基於長度的解碼器,定義長度的標準,友好處理粘包及拆包 private static final int FRAME_MAX_LENGTH = Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216")); public NettyDecoder() { super(FRAME_MAX_LENGTH, 0, 4, 0, 4); } @Override public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf frame = null; try { //基於父類的解碼,共性操做 frame = (ByteBuf) super.decode(ctx, in); if (null == frame) { return null; } //將netty的bytebuf轉換爲java標準的butebuffer ByteBuffer byteBuffer = frame.nioBuffer(); //按照編碼協議,將結果轉換爲對象 return RemotingCommand.decode(byteBuffer); } catch (Exception e) { log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); RemotingUtil.closeChannel(ctx.channel()); } finally { if (null != frame) { frame.release(); } } return null; } }
Idle
netty的系統的IdleStateHandler,操做,只是配置了指定的最大idle時間
能夠實現心跳的檢測數據結構
連接管理
NettyConnectManageHandler
繼承ChannelDuplexHandler,netty的輸入輸入及寫出的標準設計
將重點對服務的註冊,連接激活,連接關閉,連接的檢測,連接的操做異常進行管理
連接激活:若是本機的監聽存在則設置到永久循環隊列中,之間內部業務管理,事件爲連接
激活關閉:若是本機的監聽存在則設置到永久循環隊列中,之間內部業務管理,事件爲關閉
連接檢測:若是本機的監聽存在則設置到永久循環隊列中,之間內部業務管理,事件爲檢測
異常操做:若是本機的監聽存在則設置到永久循環隊列中,之間內部業務管理,事件爲異常
底層是基於無限循環的操做隊列內容,對事件進行操做,基於接口設計,不一樣的服務端有不一樣的業務需求ide
class NettyConnectManageHandler extends ChannelDuplexHandler { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress); super.channelRegistered(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress); super.channelUnregistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress); super.channelActive(ctx); //處理鏈接的事件 if (NettyRemotingServer.this.channelEventListener != null) { NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel())); } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress); super.channelInactive(ctx); //處理鏈接關閉的事件 if (NettyRemotingServer.this.channelEventListener != null) { NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel())); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.ALL_IDLE)) { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress); RemotingUtil.closeChannel(ctx.channel()); if (NettyRemotingServer.this.channelEventListener != null) { //處理檢測的事件 NettyRemotingServer.this .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel())); } } } ctx.fireUserEventTriggered(evt); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress); log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause); //處理異常的事件 if (NettyRemotingServer.this.channelEventListener != null) { NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel())); } RemotingUtil.closeChannel(ctx.channel()); } }
業務處理
NettyServerHandler:服務端;NettyClientHandler:客戶端
都繼承SimpleChannelInboundHandler,基於netty的標準規範,主要實現數據的接受
實現channelRead0方法,實現標準的數據接收
封裝服務端和客戶端共享的方法processMessageReceived來處理數據
處理數據中根據數據類型區分爲請求類型和相應類型,若是是服務端,更多的處理請求數據,若是是發送端更多的是出來響應數據
若是是請求數據,內部將不一樣的事件封裝成事件處理模型,根據事件類型得到對應的事件模型來處理
若是是響應數據,主要是針對前面的請求數據,進行結果的設置this
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { //讀取網絡請求 @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); } }
=========================================================================================編碼
下面將協議的實現細節增長介紹,主要是編碼和解碼部分設計
編碼消息頭的對象netty
public ByteBuffer encodeHeader() { return encodeHeader(this.body != null ? this.body.length : 0); } public ByteBuffer encodeHeader(final int bodyLength) { // 1> header length size //固定長度,頭部長度的設置 int length = 4; // 2> header data length //編碼頭部內容爲byte數組,並得到長度 byte[] headerData; headerData = this.headerEncode(); //增長頭部長度 length += headerData.length; // 3> body data length //增長報文的長度 length += bodyLength; //數據頭的總長度 ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // length 數據的總長度 result.putInt(length); // header length result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // header data result.put(headerData); result.flip(); return result; }
解碼是基於編碼的規範要求和實現進行逆操做code
public static RemotingCommand decode(final ByteBuffer byteBuffer) { //得到消息走長度 int length = byteBuffer.limit(); //得到第一個int位值 int oriHeaderLen = byteBuffer.getInt(); //獲取消息頭內容的長度 int headerLength = getHeaderLength(oriHeaderLen); //得到消息頭內容 byte[] headerData = new byte[headerLength]; byteBuffer.get(headerData); //將消息頭解碼爲對象 RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); //得到報文體的長度 int bodyLength = length - 4 - headerLength; byte[] bodyData = null; if (bodyLength > 0) { bodyData = new byte[bodyLength]; byteBuffer.get(bodyData); } cmd.body = bodyData; return cmd; }