簡易RPC框架:基於 netty 的協議編解碼

概述

《簡易RPC框架:需求與設計》這篇文章中已經給出了協議的具體細節,協議類型爲二進制協議,以下:java

------------------------------------------------------------------------
 | magic (2bytes) | version (1byte) |  type (1byte)  | reserved (7bits) | 
 ------------------------------------------------------------------------
 | status (1byte) |    id (8bytes)    |        body length (4bytes)     |
 ------------------------------------------------------------------------
 |                                                                      |
 |                   body ($body_length bytes)                          |
 |                                                                      |
 ------------------------------------------------------------------------

協議的解碼咱們稱爲 decode,編碼咱們成爲 encode,下文咱們將直接使用 decode 和 encode 術語。react

decode 的本質就是講接收到的一串二進制報文,轉化爲具體的消息對象,在 Java 中,就是將這串二進制報文所包含的信息,用某種類型的對象存儲起來。微信

encode 則是將存儲了信息的對象,轉化爲具備相同含義的一串二進制報文,而後網絡收發模塊再將報文發出去。網絡

不管是 rpc 客戶端仍是服務端,都須要有一個 decode 和 encode 的邏輯。框架

消息類型

rpc 客戶端與服務端之間的通訊,須要經過發送不一樣類型的消息來實現,例如:client 向 server 端發送的消息,多是請求消息,多是心跳消息,多是認證消息,而 server 向 client 發送的消息,通常就是響應消息。ide

利用 Java 中的枚舉類型,能夠將消息類型進行以下定義:this

/**
 * 消息類型
 *
 * @author beanlam
 * @version 1.0  
 */ 

public enum MessageType {

    REQUEST((byte) 0x01), HEARTBEAT((byte) 0x02), CHECKIN((byte) 0x03), RESPONSE(
            (byte) 0x04), UNKNOWN((byte) 0xFF);

    private byte code;

    MessageType(byte code) {
        this.code = code;
    }

    public static MessageType valueOf(byte code) {
        for (MessageType instance : values()) {
            if (instance.code == code) {
                return instance;
            }
        }
        return UNKNOWN;
    }

    public byte getCode() {
        return code;
    }
}

在這個類中設計了 valueOf 方法,方便進行具體的 byte 字節與具體的消息枚舉類型之間的映射和轉換。編碼

調用狀態設計

client 主動發起的一次 rpc 調用,要麼成功,要麼失敗,server 端有責任告知 client 這次調用的結果,client 也有責任去感知調用失敗的緣由,由於不必定是 server 端形成的失敗,多是由於 client 端在對消息進行預處理的時候,例如序列化,就已經出錯了,這種錯誤也應該做爲一次調用的調用結果返回給 client 調用者。所以引入一個調用狀態,與消息類型同樣,它也藉助了 Java 語言裏的枚舉類型來實現,並實現了方便的 valueOf 方法:atom

/**

 * 調用狀態

 *

 * @author beanlam

 * @version 1.0

 */

public enum InvocationStatus {

    OK((byte) 0x01), CLIENT_TIMEOUT((byte) 0x02), SERVER_TIMEOUT(

            (byte) 0x03), BAD_REQUEST((byte) 0x04), BAD_RESPONSE(

            (byte) 0x05), SERVICE_NOT_FOUND((byte) 0x06), SERVER_SERIALIZATION_ERROR(

            (byte) 0x07), CLIENT_SERIALIZATION_ERROR((byte) 0x08), CLIENT_CANCELED(

            (byte) 0x09), SERVER_BUSY((byte) 0x0A), CLIENT_BUSY(

            (byte) 0x0B), SERIALIZATION_ERROR((byte) 0x0C), INTERNAL_ERROR(

            (byte) 0x0D), SERVER_METHOD_INVOKE_ERROR((byte) 0x0E), UNKNOWN((byte) 0xFF);

    private byte code;

    InvocationStatus(byte code) {

        this.code = code;

    }

    public static InvocationStatus valueOf(byte code) {

        for (InvocationStatus instance : values()) {

            if (code == instance.code) {

                return instance;

            }

        }

        return UNKNOWN;

    }

    public byte getCode() {

        return code;

    }

}

消息實體設計

咱們將 client 往 server 端發送的統一稱爲 rpc 請求消息,一個請求對應着一個響應,所以在 client 和 server 端間流動的信息大致上其實就只有兩種,即要麼是請求,要麼是響應。咱們將會定義兩個類,分別是 RpcRequest 和 RpcResponse 來表明請求消息和響應消息。spa

另外因爲不管是請求消息仍是響應消息,它們都有一些共同的屬性,例如說「調用上下文ID」,或者消息類型。所以會再定義一個 RpcMessage 類,做爲父類。

消息類的關係

RpcMessage

/**

 * rpc消息

 *

 * @author beanlam

 * @version 1.0

 */

public class RpcMessage {

    private MessageType type;

    private long contextId;

    private Object data;

    public long getContextId() {

        return this.contextId;

    }

    public void setContextId(long id) {

        this.contextId = id;

    }

    public Object getData() {

        return this.data;

    }

    public void setData(Object data) {

        this.data = data;

    }

    public void setType(byte code) {

        this.type = MessageType.valueOf(code);

    }

    public MessageType getType() {

        return this.type;

    }

    public void setType(MessageType type) {

        this.type = type;

    }

    @Override

    public String toString() {

        return "[messageType=" + type.name() + ", contextId=" + contextId + ", data="

                + data + "]";

    }

}

RpcRequest

import java.util.concurrent.atomic.AtomicLong;

/**

 * rpc請求消息

 *

 * @author beanlam

 * @version 1.0

 */

public class RpcRequest extends RpcMessage {

    private static final AtomicLong ID_GENERATOR = new AtomicLong(0);

    public RpcRequest() {

        this(ID_GENERATOR.incrementAndGet());

    }

    public RpcRequest(long contextId) {

        setContextId(contextId);

        setType(MessageType.REQUEST);

    }

}

RpcResponse

/**

 *

 * rpc響應消息

 *

 * @author beanlam

 * @version 1.0

 */

public class RpcResponse extends RpcMessage {

    private InvocationStatus status = InvocationStatus.OK;

    public RpcResponse(long contextId) {

        setContextId(contextId);

        setType(MessageType.RESPONSE);

    }

    public InvocationStatus getStatus() {

        return this.status;

    }

    public void setStatus(InvocationStatus status) {

        this.status = status;

    }

    @Override

    public String toString() {

        return "RpcResponse[contextId=" + getContextId() + ", status=" + status.name() + "]";

    }

}

netty 編解碼介紹

netty 是一個 NIO 框架,應該這麼說,netty 是一個有良好設計思想的 NIO 框架。一個 NIO 框架必備的要素就是 reactor 線程模型,目前有一些比較優秀並且開源的小型 NIO 框架,例如分庫分表中間件 mycat 實現的一個簡易 NIO 框架,能夠在這裏看到。

netty 的主要特色有:微內核設計、責任鏈模式的業務邏輯處理、內存和資源泄露的檢測等。其中編解碼在 netty 中,都被設計成責任鏈上的一個一個 Handler。

decode 對於 netty 來講,它提供了 ByteToMessageDecoder,它也提供了 MessageToByteEncoder。

藉助 netty 來實現協議編解碼,實際上就是去在這兩個handler裏面實現編解碼的邏輯。

decode

在實現 decode 邏輯時須要注意的一個問題是,因爲二進制報文是在網絡上發送的,所以一個完整的報文可能通過多個分組來發送的,什麼意思呢,就是當有報文進來後,要確認報文是否完整,decode邏輯代碼不能假設收到的報文就是一個完整報文,通常稱這爲「TCP半包問題」。一樣,報文是連着報文發送的,意味着decode代碼邏輯還要負責在一長串二進制序列中,分割出一個一個獨立的報文,這稱之爲「TCP粘包問題」。

netty 自己有提供一些方便的 decoder handler 來處理 TCP 半包和粘包的問題。不過通常狀況下咱們不會直接去用它,由於咱們的協議比較簡單,本身在代碼裏處理一下就能夠了。

完整的 decode 代碼邏輯以下所示:

import cn.com.agree.ats.rpc.message.*;

import cn.com.agree.ats.util.logfacade.AbstractPuppetLoggerFactory;

import cn.com.agree.ats.util.logfacade.IPuppetLogger;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**

 * 協議解碼器

 *

 * @author beanlam

 * @version 1.0

 */

public class ProtocolDecoder extends ByteToMessageDecoder {

    private static final IPuppetLogger logger = AbstractPuppetLoggerFactory

            .getInstance(ProtocolDecoder.class);

    private boolean magicChecked = false;

    @Override

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> list)

            throws Exception {

        if (!magicChecked) {

            if (in.readableBytes() < ProtocolMetaData.MAGIC_LENGTH_IN_BYTES) {

                return;

            }

            magicChecked = true;

            if (!(in.getShort(in.readerIndex()) == ProtocolMetaData.MAGIC)) {

                logger.warn(

                        "illegal data received without correct magic number, channel will be close");

                ctx.close();

                magicChecked = false; //this line of code makes no any sense, but it's good for a warning

                return;

            }

        }

        if (in.readableBytes() < ProtocolMetaData.HEADER_LENGTH_IN_BYTES) {

            return;

        }

        int bodyLength = in

                .getInt(in.readerIndex() + ProtocolMetaData.BODY_LENGTH_OFFSET);

        if (in.readableBytes() < bodyLength + ProtocolMetaData.HEADER_LENGTH_IN_BYTES) {

            return;

        }

        magicChecked = false;// so far the whole packet was received

        in.readShort(); // skip the magic

        in.readByte(); // dont care about the protocol version so far

        byte type = in.readByte();

        byte status = in.readByte();

        long contextId = in.readLong();

        byte[] body = new byte[in.readInt()];

        in.readBytes(body);

        RpcMessage message = null;

        MessageType messageType = MessageType.valueOf(type);

        if (messageType == MessageType.RESPONSE) {

            message = new RpcResponse(contextId);

            ((RpcResponse) message).setStatus(InvocationStatus.valueOf(status));

        } else {

            message = new RpcRequest(contextId);

        }

        message.setType(messageType);

        message.setData(body);

        list.add(message);

    }

}

能夠看到,咱們解決半包問題的時候,是判斷有沒有收到咱們指望收到的報文,若是沒有,直接在 decode 方法裏面 return,等有更多的報文被收到的時候,netty 會自動幫咱們調起 decode 方法。而咱們解決粘包問題的思路也很清晰,那就是一次只處理一個報文,不去動後面的報文內容。

還須要注意的是,在 netty 中,對於 ByteBuf 的 get 是不會消費掉報文的,而 read 是會消費掉報文的。當不肯定報文是否收完整的時候,咱們都是用 get開頭的方法去試探性地驗證報文是否接收徹底,當肯定報文接收徹底後,咱們才用 read 開頭的方法去消費這段報文。

encode

直接貼代碼,參考前文提到的協議格式閱讀如下代碼:

/**

 *

 * 協議編碼器

 *

 * @author beanlam

 * @version 1.0

 */

public class ProtocolEncoder extends MessageToByteEncoder<RpcMessage> {

    @Override

    protected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf out)

            throws Exception {

        byte status;

        byte[] data = (byte[]) rpcMessage.getData();

        if (rpcMessage instanceof RpcRequest) {

            RpcRequest request = (RpcRequest) rpcMessage;

            status = InvocationStatus.OK.getCode();

        } else {

            RpcResponse response = (RpcResponse) rpcMessage;

            status = response.getStatus().getCode();

        }

        out.writeShort(ProtocolMetaData.MAGIC);

        out.writeByte(ProtocolMetaData.VERSION);

        out.writeByte(rpcMessage.getType().getCode());

        out.writeByte(status);

        out.writeLong(rpcMessage.getContextId());

        out.writeInt(data.length);

        out.writeBytes(data);

    }

}

掃一掃關注個人微信公衆號

相關文章
相關標籤/搜索