首先看一下RemotingCommand的幾個重要屬性:json
private int code; private LanguageCode language = LanguageCode.JAVA; private int version = 0; private int opaque = requestId.getAndIncrement(); private int flag = 0; private String remark; private HashMap<String, String> extFields; private transient CommandCustomHeader customHeader; private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer; private transient byte[] body;
除了static以外,還有body、extfields是transitent,除此以外都是要直接進行序列化的,默認用fastjson直接序列化。app
這裏面的extfields跟customHeader是互相轉換的,也就是序列化的時候用前者傳入,在代碼裏面用反序列化後的customer對象作操做。customHeader是一個接口,他有不少實現類,不一樣的request請求體除了body不一樣以外,還有一個不一樣就在於customHeader不一樣。ide
好比註冊broker的:this
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); requestHeader.setHaServerAddr(haServerAddr); requestHeader.setCompressed(compressed); RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); final byte[] body = requestBody.encode(compressed); final int bodyCrc32 = UtilAll.crc32(body);
這裏的requestHeader屬於RegisterBrokerRequestHeader,就是一種特殊的commandheader,他有本身獨特的屬性,好比brokeraddr、brokerId等等。通通塞入header裏面。netty
再構造須要的body體,把header跟序列化後的body一塊兒:code
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); request.setBody(body);
public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) { RemotingCommand cmd = new RemotingCommand(); cmd.setCode(code); cmd.customHeader = customHeader; setCmdVersion(cmd); return cmd; }
把header、body都放入RemotingCommand裏面,回到command的屬性,只有code、version、remark、opaque纔是通用的屬性,其餘通通經過header、body實現個性化。對於註冊broker來講,個性化的body就是註冊的具體內容。對象
拿到一個完整的command之後,就是如何在netty裏面進行序列化和反序列化了,這裏面的body已經被序列化了,還剩下通用屬性和header屬性。blog
回到encode方法:接口
@Override public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception { try { ByteBuffer header = remotingCommand.encodeHeader(); out.writeBytes(header); byte[] body = remotingCommand.getBody(); if (body != null) { out.writeBytes(body); } } catch (Exception e) { log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); if (remotingCommand != null) { log.error(remotingCommand.toString()); } RemotingUtil.closeChannel(ctx.channel()); } }
簡單來看就是header跟body一塊兒放入out裏面。進入encodeHeader方法:ip
public ByteBuffer encodeHeader() { return encodeHeader(this.body != null ? this.body.length : 0); } public ByteBuffer encodeHeader(final int bodyLength) { // 1> header length size int length = 4; // 2> header data length byte[] headerData; headerData = this.headerEncode(); length += headerData.length; // 3> body data length length += bodyLength; ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // length result.putInt(length); // header length result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // header data result.put(headerData); result.flip(); return result; }
使用LengthFieldBasedFrameDecoder的時候,第一個put的數據就是整體長度:全部header(包括公共的和extrafields的)+ body+markprotocol獲得的長度(全部header長度),這裏的整體長度不須要包括整體長度自己,也就是putint(length)自己不須要額外算4個字節,這裏面多出來的4是因爲markprotocol的長度。
private byte[] headerEncode() { this.makeCustomHeaderToNet(); if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) { return RocketMQSerializable.rocketMQProtocolEncode(this); } else { return RemotingSerializable.encode(this); } }
在makeCustomHeaderToNet方法裏面,經過反射把header的內容所有放入extrafields裏面,這樣再經過encode(this)就能夠直接把header的內容(已經轉換成extrafields)和公共屬性所有序列化成result。
再看看反序列化:
public static RemotingCommand decode(final ByteBuffer byteBuffer) { int length = byteBuffer.limit(); int oriHeaderLen = byteBuffer.getInt(); int headerLength = getHeaderLength(oriHeaderLen); byte[] headerData = new byte[headerLength]; byteBuffer.get(headerData); RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); int bodyLength = length - 4 - headerLength; byte[] bodyData = null; if (bodyLength > 0) { bodyData = new byte[bodyLength]; byteBuffer.get(bodyData); } cmd.body = bodyData; return cmd; }
雖然咱們在encode的時候塞入了整體length,可是LengthFieldBasedFrameDecoder已經幫咱們解析了去掉了,咱們第一個取數據的時候是後面的markeprotocolType獲得的header長度,
根據header長度之後拿到header,他就是除了body以外全部的數據。body能夠經過後面的buff拿到,header反序列化後獲得的extrafields能夠進一步轉換成commandHeader,因爲在公共屬性裏面咱們塞入了code,因而咱們能夠知道應該轉換成具體哪種commandHeader。