rocketmq之源碼分析netty流程及細節(七)

netty的標準設計流程爲:編碼,解碼,檢測,連接,其餘handler,業務。按照這個流程將rocketmq的netty的實現流程進行細化。java

編碼
    NettyEncoder
        繼承MessageToByteEncoder,netty的編碼規範要求
        將RemotingCommand的請求數據結構數據頭:存數據的長度描述;數據體:存數據的內容
            RemotingCommand的功能強大:涉及ByteBuffer的頻繁操做,實現數據協議的轉換
            總體的協議格式:<length><header length><header data><body data>
            1,4個字節的int型數據存儲數據的總長度
            2,4個字節的int型數據存儲報文頭的字節長度
            3,存儲的頭部數據內容
            4,存儲的報文的數據內容數組

public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            //得到請求體消息頭,封裝爲byte的協議要求
            ByteBuffer header = remotingCommand.encodeHeader();
            //寫入頭
            out.writeBytes(header);
            //得到消息體
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                //寫入協議
                out.writeBytes(body);
            }
        } catch (Exception e) {
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (remotingCommand != null) {
                log.error(remotingCommand.toString());
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }
}

解碼
    NettyDecoder
        繼承LengthFieldBasedFrameDecoder,netty的編碼要求,該實現是基於長度的要求解決拆包粘包問題
        將ByteBuf的數據輸入按照長度的要求,數據解碼到ByteBuf的存儲體內
            將netty封裝的ByteBuf對象轉換爲java的Nio的ByteBuffer對象:ByteBuf.nioBuffer()
            全部的操做都是基於RemotingCommand的decode來實現
            解碼的操做過程基於編碼的規範來
            1,得到數據的最大有效位置
            2,得到第一個int位的值,該值爲數據長度,包括頭數據和報文數據
            3,byteBuffer操做得到頭部的數據
            4,將數據的最大有效位置減去 4(標識),再減去頭數據的長度,獲得報文的長度
            5,根據報文長度,byteBuffer操做得到報文長度網絡

public class NettyDecoder extends LengthFieldBasedFrameDecoder {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    //基於長度的解碼器,定義長度的標準,友好處理粘包及拆包
    private static final int FRAME_MAX_LENGTH =
        Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));

    public NettyDecoder() {
        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
    }

    @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = null;
        try {
            //基於父類的解碼,共性操做
            frame = (ByteBuf) super.decode(ctx, in);
            if (null == frame) {
                return null;
            }

            //將netty的bytebuf轉換爲java標準的butebuffer
            ByteBuffer byteBuffer = frame.nioBuffer();

            //按照編碼協議,將結果轉換爲對象
            return RemotingCommand.decode(byteBuffer);
        } catch (Exception e) {
            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            RemotingUtil.closeChannel(ctx.channel());
        } finally {
            if (null != frame) {
                frame.release();
            }
        }

        return null;
    }
}

Idle
    netty的系統的IdleStateHandler,操做,只是配置了指定的最大idle時間
    能夠實現心跳的檢測數據結構

連接管理
    NettyConnectManageHandler
        繼承ChannelDuplexHandler,netty的輸入輸入及寫出的標準設計
        將重點對服務的註冊,連接激活,連接關閉,連接的檢測,連接的操做異常進行管理
            連接激活:若是本機的監聽存在則設置到永久循環隊列中,之間內部業務管理,事件爲連接
            激活關閉:若是本機的監聽存在則設置到永久循環隊列中,之間內部業務管理,事件爲關閉
            連接檢測:若是本機的監聽存在則設置到永久循環隊列中,之間內部業務管理,事件爲檢測
            異常操做:若是本機的監聽存在則設置到永久循環隊列中,之間內部業務管理,事件爲異常
        底層是基於無限循環的操做隊列內容,對事件進行操做,基於接口設計,不一樣的服務端有不一樣的業務需求ide

class NettyConnectManageHandler extends ChannelDuplexHandler {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
        super.channelRegistered(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
        super.channelUnregistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
        super.channelActive(ctx);

        //處理鏈接的事件
        if (NettyRemotingServer.this.channelEventListener != null) {
            NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
        super.channelInactive(ctx);

        //處理鏈接關閉的事件
        if (NettyRemotingServer.this.channelEventListener != null) {
            NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state().equals(IdleState.ALL_IDLE)) {
                final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
                RemotingUtil.closeChannel(ctx.channel());
                if (NettyRemotingServer.this.channelEventListener != null) {
                    //處理檢測的事件
                    NettyRemotingServer.this
                        .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
                }
            }
        }

        ctx.fireUserEventTriggered(evt);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
        log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);

        //處理異常的事件
        if (NettyRemotingServer.this.channelEventListener != null) {
            NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
        }

        RemotingUtil.closeChannel(ctx.channel());
    }
}

業務處理
    NettyServerHandler:服務端;NettyClientHandler:客戶端
    都繼承SimpleChannelInboundHandler,基於netty的標準規範,主要實現數據的接受
        實現channelRead0方法,實現標準的數據接收
        封裝服務端和客戶端共享的方法processMessageReceived來處理數據
        處理數據中根據數據類型區分爲請求類型和相應類型,若是是服務端,更多的處理請求數據,若是是發送端更多的是出來響應數據
        若是是請求數據,內部將不一樣的事件封裝成事件處理模型,根據事件類型得到對應的事件模型來處理
        若是是響應數據,主要是針對前面的請求數據,進行結果的設置this

class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    //讀取網絡請求
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}

=========================================================================================編碼

下面將協議的實現細節增長介紹,主要是編碼和解碼部分設計

編碼消息頭的對象netty

public ByteBuffer encodeHeader() {
    return encodeHeader(this.body != null ? this.body.length : 0);
}

public ByteBuffer encodeHeader(final int bodyLength) {
    // 1> header length size
    //固定長度,頭部長度的設置
    int length = 4;

    // 2> header data length
    //編碼頭部內容爲byte數組,並得到長度
    byte[] headerData;
    headerData = this.headerEncode();

    //增長頭部長度
    length += headerData.length;

    // 3> body data length
    //增長報文的長度
    length += bodyLength;

    //數據頭的總長度
    ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

    // length 數據的總長度
    result.putInt(length);

    // header length 
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

    // header data
    result.put(headerData);

    result.flip();

    return result;
}

解碼是基於編碼的規範要求和實現進行逆操做code

public static RemotingCommand decode(final ByteBuffer byteBuffer) {
    //得到消息走長度
    int length = byteBuffer.limit();
    //得到第一個int位值
    int oriHeaderLen = byteBuffer.getInt();
    //獲取消息頭內容的長度
    int headerLength = getHeaderLength(oriHeaderLen);

    //得到消息頭內容
    byte[] headerData = new byte[headerLength];
    byteBuffer.get(headerData);

    //將消息頭解碼爲對象
    RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

    //得到報文體的長度
    int bodyLength = length - 4 - headerLength;
    byte[] bodyData = null;
    if (bodyLength > 0) {
        bodyData = new byte[bodyLength];
        byteBuffer.get(bodyData);
    }
    cmd.body = bodyData;

    return cmd;
}
相關文章
相關標籤/搜索