簡易RPC框架-私有協議棧

HTTP協議

客戶機與服務端之間的數據交互須要遵照必定的約定,好比協議版本,數據類型,是否有緩存,是否有壓縮等,只有在這些約定的基礎上才能相互之間愉快的工做。css

Netty通訊過程當中的編解碼

這時說的是基於TCP/IP的Netty之間的通訊。TCP/IP協議下客戶端與服務端之間要進行數據交互,通常須要將數據轉換成二進制格式,直接傳java bean是不能支持的。在RPC模式下客戶端在向服務端發起請求前須要將數據作編碼,服務端在接收客戶端發的數據後須要作解碼以後才能正常工做。java

  • 解碼流程

  • 編碼流程

Netty 私有協議棧

爲了更好的控制RPC客戶端與服務端之間的通訊,也能夠編寫私有的協議棧來支撐。git

定義消息體

相似HTTP協議,包含頭信息以及內容信息。github

public class RpcMessage implements Serializable {

    private RpcMessageHeader messageHeader;

    private Object messageBody;

}

頭信息,包含內容體長度,消息類型等信息。能夠根據消息類型來作不一樣的業務,好比區分是心跳信息仍是業務或者是監控之類的信息。web

public class RpcMessageHeader implements Serializable {
    private int length;

    private int type;
   
}

定義解碼器

由於TCP/IP協議容易出現粘包拆包現象,這裏爲了簡單直接選擇繼承組件提供的LengthFieldBasedFrameDecoder,只須要重寫下面的方法便可:緩存

public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame=(ByteBuf)super.decode(ctx,in);
        if(null==frame){
            return null;
        }

        RpcMessage message=new RpcMessage();
        RpcMessageHeader messageHeader=new RpcMessageHeader();
        messageHeader.setLength(frame.readInt());
        message.setMessageHeader(messageHeader);

        byte[] data = new byte[message.getMessageHeader().getLength()];
        frame.readBytes(data);

        Object obj = ProtoStuffSerializeUtil.deserialize(data, genericClass);
        message.setMessageBody(obj);
        return message;
    }

定義編碼器

編碼器繼承MessageToByteEncoder,將對象轉換成字節的編碼器網絡

public class RpcEncoder extends MessageToByteEncoder<RpcMessage>

重點是下面的編碼函數,在ByteBuf中輸出數據長度以及數據體,若有其它須要能夠補充其它的字段,好比消息類型。ide

public void encode(ChannelHandlerContext ctx, RpcMessage in, ByteBuf out) throws Exception {
        if(null==in){
            throw new RpcException("RpcMessage is null");
        }
        if (genericClass.isInstance(in.getMessageBody())) {
            byte[] data = ProtoStuffSerializeUtil.serialize(in.getMessageBody());
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }

ServerHandle

  • 修改服務端執行器消息實體類型爲新定義的RpcMessage
public class RpcServerInvoker extends AbstractInvoker<RpcMessage>
  • 修改服務端回調

從服務端方法獲取到返回的結果後,從新封裝成消息對象(RpcMessage)發送給客戶端。函數

protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage message) {

        this.executor.execute(new Runnable() {
            @Override
            public void run() {
                RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this);
                RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation((RpcRequest) message.getMessageBody()));
                RpcMessage responseMessage=new RpcMessage();
                byte[] data = ProtoStuffSerializeUtil.serialize(response);
                RpcMessageHeader messageHeader=new RpcMessageHeader();
                messageHeader.setLength(data.length);
                responseMessage.setMessageHeader(messageHeader);
                responseMessage.setMessageBody(response);
                channelHandlerContext.writeAndFlush(responseMessage);
            }
        });

    }

ClientHandle

  • 修改客戶端執行器消息實體類型爲新定義的RpcMessage
public class RpcClientInvoker extends AbstractInvoker<RpcMessage>
  • 修改客戶端回調方法

接收的返回結果修改成RpcMessage,從body屬性中獲取原來的RpcResponse對象ui

public void channelRead0(ChannelHandlerContext ctx, RpcMessage message) {
        RpcResponse response=(RpcResponse) message.getMessageBody();
        String requestId = response.getRequestId();
        ResponseFuture responseFuture = pendingRPC.get(requestId);
        if (responseFuture != null) {
            pendingRPC.remove(requestId);
            responseFuture.done(response);
        }
    }
  • 修改發送請求的消息對象,組裝成RpcMessage發送
public ResponseFuture invoke(RpcInvocation invocation) {
        RpcRequest request=this.getRpcRequest();
        ResponseFuture responseFuture = new ResponseFuture(request);
        pendingRPC.put(request.getRequestId(), responseFuture);
        RpcMessage message=new RpcMessage();
        byte[] data = ProtoStuffSerializeUtil.serialize(request);
        RpcMessageHeader messageHeader=new RpcMessageHeader();
        messageHeader.setLength(data.length);
        message.setMessageHeader(messageHeader);
        message.setMessageBody(request);
        channel.writeAndFlush(message);
        return responseFuture;
    }

本文源碼

https://github.com/jiangmin168168/jim-framework

文中代碼是依賴上述項目的,若是有不明白的可下載源碼

引用

  • 文中插圖來自來網絡
  • 文中的思路參考了Netty權威指南
相關文章
相關標籤/搜索