在《簡易RPC框架:需求與設計》這篇文章中已經給出了協議的具體細節,協議類型爲二進制協議,以下:java
------------------------------------------------------------------------ | magic (2bytes) | version (1byte) | type (1byte) | reserved (7bits) | ------------------------------------------------------------------------ | status (1byte) | id (8bytes) | body length (4bytes) | ------------------------------------------------------------------------ | | | body ($body_length bytes) | | | ------------------------------------------------------------------------
協議的解碼咱們稱爲 decode,編碼咱們成爲 encode,下文咱們將直接使用 decode 和 encode 術語。react
decode 的本質就是講接收到的一串二進制報文,轉化爲具體的消息對象,在 Java 中,就是將這串二進制報文所包含的信息,用某種類型的對象存儲起來。微信
encode 則是將存儲了信息的對象,轉化爲具備相同含義的一串二進制報文,而後網絡收發模塊再將報文發出去。網絡
不管是 rpc 客戶端仍是服務端,都須要有一個 decode 和 encode 的邏輯。框架
rpc 客戶端與服務端之間的通訊,須要經過發送不一樣類型的消息來實現,例如:client 向 server 端發送的消息,多是請求消息,多是心跳消息,多是認證消息,而 server 向 client 發送的消息,通常就是響應消息。ide
利用 Java 中的枚舉類型,能夠將消息類型進行以下定義:this
/** * 消息類型 * * @author beanlam * @version 1.0 */ public enum MessageType { REQUEST((byte) 0x01), HEARTBEAT((byte) 0x02), CHECKIN((byte) 0x03), RESPONSE( (byte) 0x04), UNKNOWN((byte) 0xFF); private byte code; MessageType(byte code) { this.code = code; } public static MessageType valueOf(byte code) { for (MessageType instance : values()) { if (instance.code == code) { return instance; } } return UNKNOWN; } public byte getCode() { return code; } }
在這個類中設計了 valueOf 方法,方便進行具體的 byte 字節與具體的消息枚舉類型之間的映射和轉換。編碼
client 主動發起的一次 rpc 調用,要麼成功,要麼失敗,server 端有責任告知 client 這次調用的結果,client 也有責任去感知調用失敗的緣由,由於不必定是 server 端形成的失敗,多是由於 client 端在對消息進行預處理的時候,例如序列化,就已經出錯了,這種錯誤也應該做爲一次調用的調用結果返回給 client 調用者。所以引入一個調用狀態,與消息類型同樣,它也藉助了 Java 語言裏的枚舉類型來實現,並實現了方便的 valueOf 方法:atom
/** * 調用狀態 * * @author beanlam * @version 1.0 */ public enum InvocationStatus { OK((byte) 0x01), CLIENT_TIMEOUT((byte) 0x02), SERVER_TIMEOUT( (byte) 0x03), BAD_REQUEST((byte) 0x04), BAD_RESPONSE( (byte) 0x05), SERVICE_NOT_FOUND((byte) 0x06), SERVER_SERIALIZATION_ERROR( (byte) 0x07), CLIENT_SERIALIZATION_ERROR((byte) 0x08), CLIENT_CANCELED( (byte) 0x09), SERVER_BUSY((byte) 0x0A), CLIENT_BUSY( (byte) 0x0B), SERIALIZATION_ERROR((byte) 0x0C), INTERNAL_ERROR( (byte) 0x0D), SERVER_METHOD_INVOKE_ERROR((byte) 0x0E), UNKNOWN((byte) 0xFF); private byte code; InvocationStatus(byte code) { this.code = code; } public static InvocationStatus valueOf(byte code) { for (InvocationStatus instance : values()) { if (code == instance.code) { return instance; } } return UNKNOWN; } public byte getCode() { return code; } }
咱們將 client 往 server 端發送的統一稱爲 rpc 請求消息,一個請求對應着一個響應,所以在 client 和 server 端間流動的信息大致上其實就只有兩種,即要麼是請求,要麼是響應。咱們將會定義兩個類,分別是 RpcRequest 和 RpcResponse 來表明請求消息和響應消息。spa
另外因爲不管是請求消息仍是響應消息,它們都有一些共同的屬性,例如說「調用上下文ID」,或者消息類型。所以會再定義一個 RpcMessage 類,做爲父類。
/** * rpc消息 * * @author beanlam * @version 1.0 */ public class RpcMessage { private MessageType type; private long contextId; private Object data; public long getContextId() { return this.contextId; } public void setContextId(long id) { this.contextId = id; } public Object getData() { return this.data; } public void setData(Object data) { this.data = data; } public void setType(byte code) { this.type = MessageType.valueOf(code); } public MessageType getType() { return this.type; } public void setType(MessageType type) { this.type = type; } @Override public String toString() { return "[messageType=" + type.name() + ", contextId=" + contextId + ", data=" + data + "]"; } }
import java.util.concurrent.atomic.AtomicLong; /** * rpc請求消息 * * @author beanlam * @version 1.0 */ public class RpcRequest extends RpcMessage { private static final AtomicLong ID_GENERATOR = new AtomicLong(0); public RpcRequest() { this(ID_GENERATOR.incrementAndGet()); } public RpcRequest(long contextId) { setContextId(contextId); setType(MessageType.REQUEST); } }
/** * * rpc響應消息 * * @author beanlam * @version 1.0 */ public class RpcResponse extends RpcMessage { private InvocationStatus status = InvocationStatus.OK; public RpcResponse(long contextId) { setContextId(contextId); setType(MessageType.RESPONSE); } public InvocationStatus getStatus() { return this.status; } public void setStatus(InvocationStatus status) { this.status = status; } @Override public String toString() { return "RpcResponse[contextId=" + getContextId() + ", status=" + status.name() + "]"; } }
netty 是一個 NIO 框架,應該這麼說,netty 是一個有良好設計思想的 NIO 框架。一個 NIO 框架必備的要素就是 reactor 線程模型,目前有一些比較優秀並且開源的小型 NIO 框架,例如分庫分表中間件 mycat 實現的一個簡易 NIO 框架,能夠在這裏看到。
netty 的主要特色有:微內核設計、責任鏈模式的業務邏輯處理、內存和資源泄露的檢測等。其中編解碼在 netty 中,都被設計成責任鏈上的一個一個 Handler。
decode 對於 netty 來講,它提供了 ByteToMessageDecoder,它也提供了 MessageToByteEncoder。
藉助 netty 來實現協議編解碼,實際上就是去在這兩個handler裏面實現編解碼的邏輯。
在實現 decode 邏輯時須要注意的一個問題是,因爲二進制報文是在網絡上發送的,所以一個完整的報文可能通過多個分組來發送的,什麼意思呢,就是當有報文進來後,要確認報文是否完整,decode邏輯代碼不能假設收到的報文就是一個完整報文,通常稱這爲「TCP半包問題」。一樣,報文是連着報文發送的,意味着decode代碼邏輯還要負責在一長串二進制序列中,分割出一個一個獨立的報文,這稱之爲「TCP粘包問題」。
netty 自己有提供一些方便的 decoder handler 來處理 TCP 半包和粘包的問題。不過通常狀況下咱們不會直接去用它,由於咱們的協議比較簡單,本身在代碼裏處理一下就能夠了。
完整的 decode 代碼邏輯以下所示:
import cn.com.agree.ats.rpc.message.*; import cn.com.agree.ats.util.logfacade.AbstractPuppetLoggerFactory; import cn.com.agree.ats.util.logfacade.IPuppetLogger; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /** * 協議解碼器 * * @author beanlam * @version 1.0 */ public class ProtocolDecoder extends ByteToMessageDecoder { private static final IPuppetLogger logger = AbstractPuppetLoggerFactory .getInstance(ProtocolDecoder.class); private boolean magicChecked = false; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> list) throws Exception { if (!magicChecked) { if (in.readableBytes() < ProtocolMetaData.MAGIC_LENGTH_IN_BYTES) { return; } magicChecked = true; if (!(in.getShort(in.readerIndex()) == ProtocolMetaData.MAGIC)) { logger.warn( "illegal data received without correct magic number, channel will be close"); ctx.close(); magicChecked = false; //this line of code makes no any sense, but it's good for a warning return; } } if (in.readableBytes() < ProtocolMetaData.HEADER_LENGTH_IN_BYTES) { return; } int bodyLength = in .getInt(in.readerIndex() + ProtocolMetaData.BODY_LENGTH_OFFSET); if (in.readableBytes() < bodyLength + ProtocolMetaData.HEADER_LENGTH_IN_BYTES) { return; } magicChecked = false;// so far the whole packet was received in.readShort(); // skip the magic in.readByte(); // dont care about the protocol version so far byte type = in.readByte(); byte status = in.readByte(); long contextId = in.readLong(); byte[] body = new byte[in.readInt()]; in.readBytes(body); RpcMessage message = null; MessageType messageType = MessageType.valueOf(type); if (messageType == MessageType.RESPONSE) { message = new RpcResponse(contextId); ((RpcResponse) message).setStatus(InvocationStatus.valueOf(status)); } else { message = new RpcRequest(contextId); } message.setType(messageType); message.setData(body); list.add(message); } }
能夠看到,咱們解決半包問題的時候,是判斷有沒有收到咱們指望收到的報文,若是沒有,直接在 decode 方法裏面 return,等有更多的報文被收到的時候,netty 會自動幫咱們調起 decode 方法。而咱們解決粘包問題的思路也很清晰,那就是一次只處理一個報文,不去動後面的報文內容。
還須要注意的是,在 netty 中,對於 ByteBuf 的 get 是不會消費掉報文的,而 read 是會消費掉報文的。當不肯定報文是否收完整的時候,咱們都是用 get開頭的方法去試探性地驗證報文是否接收徹底,當肯定報文接收徹底後,咱們才用 read 開頭的方法去消費這段報文。
直接貼代碼,參考前文提到的協議格式閱讀如下代碼:
/** * * 協議編碼器 * * @author beanlam * @version 1.0 */ public class ProtocolEncoder extends MessageToByteEncoder<RpcMessage> { @Override protected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf out) throws Exception { byte status; byte[] data = (byte[]) rpcMessage.getData(); if (rpcMessage instanceof RpcRequest) { RpcRequest request = (RpcRequest) rpcMessage; status = InvocationStatus.OK.getCode(); } else { RpcResponse response = (RpcResponse) rpcMessage; status = response.getStatus().getCode(); } out.writeShort(ProtocolMetaData.MAGIC); out.writeByte(ProtocolMetaData.VERSION); out.writeByte(rpcMessage.getType().getCode()); out.writeByte(status); out.writeLong(rpcMessage.getContextId()); out.writeInt(data.length); out.writeBytes(data); } }