客戶機與服務端之間的數據交互須要遵照必定的約定,好比協議版本,數據類型,是否有緩存,是否有壓縮等,只有在這些約定的基礎上才能相互之間愉快的工做。css
這時說的是基於TCP/IP的Netty之間的通訊。TCP/IP協議下客戶端與服務端之間要進行數據交互,通常須要將數據轉換成二進制格式,直接傳java bean是不能支持的。在RPC模式下客戶端在向服務端發起請求前須要將數據作編碼,服務端在接收客戶端發的數據後須要作解碼以後才能正常工做。java
爲了更好的控制RPC客戶端與服務端之間的通訊,也能夠編寫私有的協議棧來支撐。git
相似HTTP協議,包含頭信息以及內容信息。github
public class RpcMessage implements Serializable { private RpcMessageHeader messageHeader; private Object messageBody; }
頭信息,包含內容體長度,消息類型等信息。能夠根據消息類型來作不一樣的業務,好比區分是心跳信息仍是業務或者是監控之類的信息。web
public class RpcMessageHeader implements Serializable { private int length; private int type; }
由於TCP/IP協議容易出現粘包拆包現象,這裏爲了簡單直接選擇繼承組件提供的LengthFieldBasedFrameDecoder,只須要重寫下面的方法便可:緩存
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf frame=(ByteBuf)super.decode(ctx,in); if(null==frame){ return null; } RpcMessage message=new RpcMessage(); RpcMessageHeader messageHeader=new RpcMessageHeader(); messageHeader.setLength(frame.readInt()); message.setMessageHeader(messageHeader); byte[] data = new byte[message.getMessageHeader().getLength()]; frame.readBytes(data); Object obj = ProtoStuffSerializeUtil.deserialize(data, genericClass); message.setMessageBody(obj); return message; }
編碼器繼承MessageToByteEncoder,將對象轉換成字節的編碼器網絡
public class RpcEncoder extends MessageToByteEncoder<RpcMessage>
重點是下面的編碼函數,在ByteBuf中輸出數據長度以及數據體,若有其它須要能夠補充其它的字段,好比消息類型。ide
public void encode(ChannelHandlerContext ctx, RpcMessage in, ByteBuf out) throws Exception { if(null==in){ throw new RpcException("RpcMessage is null"); } if (genericClass.isInstance(in.getMessageBody())) { byte[] data = ProtoStuffSerializeUtil.serialize(in.getMessageBody()); out.writeInt(data.length); out.writeBytes(data); } }
public class RpcServerInvoker extends AbstractInvoker<RpcMessage>
從服務端方法獲取到返回的結果後,從新封裝成消息對象(RpcMessage)發送給客戶端。函數
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage message) { this.executor.execute(new Runnable() { @Override public void run() { RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this); RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation((RpcRequest) message.getMessageBody())); RpcMessage responseMessage=new RpcMessage(); byte[] data = ProtoStuffSerializeUtil.serialize(response); RpcMessageHeader messageHeader=new RpcMessageHeader(); messageHeader.setLength(data.length); responseMessage.setMessageHeader(messageHeader); responseMessage.setMessageBody(response); channelHandlerContext.writeAndFlush(responseMessage); } }); }
public class RpcClientInvoker extends AbstractInvoker<RpcMessage>
接收的返回結果修改成RpcMessage,從body屬性中獲取原來的RpcResponse對象ui
public void channelRead0(ChannelHandlerContext ctx, RpcMessage message) { RpcResponse response=(RpcResponse) message.getMessageBody(); String requestId = response.getRequestId(); ResponseFuture responseFuture = pendingRPC.get(requestId); if (responseFuture != null) { pendingRPC.remove(requestId); responseFuture.done(response); } }
public ResponseFuture invoke(RpcInvocation invocation) { RpcRequest request=this.getRpcRequest(); ResponseFuture responseFuture = new ResponseFuture(request); pendingRPC.put(request.getRequestId(), responseFuture); RpcMessage message=new RpcMessage(); byte[] data = ProtoStuffSerializeUtil.serialize(request); RpcMessageHeader messageHeader=new RpcMessageHeader(); messageHeader.setLength(data.length); message.setMessageHeader(messageHeader); message.setMessageBody(request); channel.writeAndFlush(message); return responseFuture; }
https://github.com/jiangmin168168/jim-framework
文中代碼是依賴上述項目的,若是有不明白的可下載源碼