最近對網絡編程方面比較有興趣,在微服務實踐上也用到了相對主流的RPC
框架如Spring Cloud Gateway
底層也切換爲Reactor-Netty
,像Redisson
底層也是使用Netty
封裝通信協議,最近調研和準備使用的SOFARpc
也是基於Netty
封裝實現了多種協議的兼容。所以,基於Netty
造一個輪子,在SpringBoot
的加持下,實現一個輕量級的RPC
框架。這篇博文介紹的是RPC
框架協議的定義以及對應的編碼解碼處理的實現。java
截止本文(2020-01-12
)編寫完成之時,Netty
的最新版本爲4.1.44.Final
,而SpringBoot
的最新版本爲2.2.2.RELEASE
,所以引入這兩個版本的依賴,加上其餘工具包和序列化等等的支持,pom
文件的核心內容以下:git
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring.boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>${netty.version}</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.10</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.61</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>28.1-jre</version> </dependency> </dependencies>
部分參數的序列化會依賴到FastJson
或者Jackson
,具體看偏好而定。github
爲了提升協議傳輸的效率,須要定製一套高效的RPC
協議,設計協議所需的字段和類型。spring
基礎Packet字段:shell
字段名 | 字段類型 | 字段功能 | 備註 |
---|---|---|---|
magicNumber |
int |
魔數,相似於Java 的字節碼文件的魔數是0xcafebase |
|
version |
int |
版本號 | 預留字段,默認爲1 |
serialNumber |
java.lang.String |
請求流水號 | 十分重要,每一個請求的惟一標識 |
messageType |
MessageType |
消息類型 | 自定義的枚舉類型,見下面的MessageType 類 |
attachments |
Map<String, String> |
附件 | K-V 形式,相似於HTTP 協議中的Header |
// 消息枚舉類型 @RequiredArgsConstructor public enum MessageType { /** * 請求 */ REQUEST((byte) 1), /** * 響應 */ RESPONSE((byte) 2), /** * PING */ PING((byte) 3), /** * PONG */ PONG((byte) 4), /** * NULL */ NULL((byte) 5), ; @Getter private final Byte type; public static MessageType fromValue(byte value) { for (MessageType type : MessageType.values()) { if (type.getType() == value) { return type; } } throw new IllegalArgumentException(String.format("value = %s", value)); } } // 基礎Packet @Data public abstract class BaseMessagePacket implements Serializable { /** * 魔數 */ private int magicNumber; /** * 版本號 */ private int version; /** * 流水號 */ private String serialNumber; /** * 消息類型 */ private MessageType messageType; /** * 附件 - K-V形式 */ private Map<String, String> attachments = new HashMap<>(); /** * 添加附件 */ public void addAttachment(String key, String value) { attachments.put(key, value); } }
請求Packet擴展字段:編程
字段名 | 字段類型 | 字段功能 | 備註 |
---|---|---|---|
interfaceName |
java.lang.String |
接口全類名 | |
methodName |
java.lang.String |
方法名 | |
methodArgumentSignatures |
java.lang.String[] |
方法參數簽名字符串數組 | 存放方法參數類型全類名字符串數組 |
methodArguments |
java.lang.Object[] |
方法參數數組 | 由於未知方法參數類型,因此用Object 表示 |
@EqualsAndHashCode(callSuper = true) @Data public class RequestMessagePacket extends BaseMessagePacket { /** * 接口全類名 */ private String interfaceName; /** * 方法名 */ private String methodName; /** * 方法參數簽名 */ private String[] methodArgumentSignatures; /** * 方法參數 */ private Object[] methodArguments; }
響應Packet擴展字段:json
字段名 | 字段類型 | 字段功能 | 備註 |
---|---|---|---|
errorCode |
java.lang.Long |
響應碼 | |
message |
java.lang.String |
響應消息 | 若是出現異常,message 就是對應的異常信息 |
payload |
java.lang.Object |
消息載荷 | 業務處理返回的消息載荷,定義爲Object 類型 |
@EqualsAndHashCode(callSuper = true) @Data public class ResponseMessagePacket extends BaseMessagePacket { /** * error code */ private Long errorCode; /** * 消息描述 */ private String message; /** * 消息載荷 */ private Object payload; }
須要注意如下幾點:bootstrap
java.lang.String
類型爲例:// 序列化 - 流水號 out.writeInt(packet.getSerialNumber().length()); out.writeCharSequence(packet.getSerialNumber(), ProtocolConstant.UTF_8); // 反序列化 - 流水號 int serialNumberLength = in.readInt(); packet.setSerialNumber(in.readCharSequence(serialNumberLength, ProtocolConstant.UTF_8).toString());
UTF-8
編碼下一個中文字符佔3個字節,這一點能夠抽取一個工具類專門處理字符串的序列化:public enum ByteBufferUtils { // 單例 X; public void encodeUtf8CharSequence(ByteBuf byteBuf, CharSequence charSequence) { int writerIndex = byteBuf.writerIndex(); byteBuf.writeInt(0); int length = ByteBufUtil.writeUtf8(byteBuf, charSequence); byteBuf.setInt(writerIndex, length); } }
Object[]
的序列化和反序列化相對特殊,由於ByteBuf
沒法處理自定義類型的寫入和讀取(這個很好理解,網絡編程就是面向0
和1
的編程):write Object --> ByteBuf#writeInt() && ByteBuf#writeBytes() read Object --> ByteBuf#readInt() && ByteBuf#readBytes() [<== 這個方法返回值是ByteBuf實例]
ByteBuf
的引用,不然有可能致使內存泄漏。自定義協議編碼解碼主要包括四個部分的編碼解碼器:數組
Packet
編碼器:RequestMessagePacketEncoder
,主要用於客戶端把RequestMessagePacket
實例序列化爲二進制序列。Packet
解碼器:RequestMessagePacketDecoder
,主要用於服務端把二進制序列反序列化爲RequestMessagePacket
實例。Packet
編碼器:ResponseMessagePacketEncoder
,主要用於服務端把ResponseMessagePacket
實例序列化爲二進制序列。Packet
解碼器:ResponseMessagePacketDecoder
,主要用於客戶端把二進制序列反序列化爲ResponseMessagePacket
實例。畫個圖描述一下幾個組件的交互流程(省略了部分入站和出站處理器):springboot
序列化器Serializer
的代碼以下:
public interface Serializer { byte[] encode(Object target); Object decode(byte[] bytes, Class<?> targetClass); } // FastJson實現 public enum FastJsonSerializer implements Serializer { // 單例 X; @Override public byte[] encode(Object target) { return JSON.toJSONBytes(target); } @Override public Object decode(byte[] bytes, Class<?> targetClass) { return JSON.parseObject(bytes, targetClass); } }
請求Packet
編碼器RequestMessagePacketEncoder
的代碼以下:
@RequiredArgsConstructor public class RequestMessagePacketEncoder extends MessageToByteEncoder<RequestMessagePacket> { private final Serializer serializer; @Override protected void encode(ChannelHandlerContext context, RequestMessagePacket packet, ByteBuf out) throws Exception { // 魔數 out.writeInt(packet.getMagicNumber()); // 版本 out.writeInt(packet.getVersion()); // 流水號 out.writeInt(packet.getSerialNumber().length()); out.writeCharSequence(packet.getSerialNumber(), ProtocolConstant.UTF_8); // 消息類型 out.writeByte(packet.getMessageType().getType()); // 附件size Map<String, String> attachments = packet.getAttachments(); out.writeInt(attachments.size()); // 附件內容 attachments.forEach((k, v) -> { out.writeInt(k.length()); out.writeCharSequence(k, ProtocolConstant.UTF_8); out.writeInt(v.length()); out.writeCharSequence(v, ProtocolConstant.UTF_8); }); // 接口全類名 out.writeInt(packet.getInterfaceName().length()); out.writeCharSequence(packet.getInterfaceName(), ProtocolConstant.UTF_8); // 方法名 out.writeInt(packet.getMethodName().length()); out.writeCharSequence(packet.getMethodName(), ProtocolConstant.UTF_8); // 方法參數簽名(String[]類型) - 非必須 if (null != packet.getMethodArgumentSignatures()) { int len = packet.getMethodArgumentSignatures().length; // 方法參數簽名數組長度 out.writeInt(len); for (int i = 0; i < len; i++) { String methodArgumentSignature = packet.getMethodArgumentSignatures()[i]; out.writeInt(methodArgumentSignature.length()); out.writeCharSequence(methodArgumentSignature, ProtocolConstant.UTF_8); } } else { out.writeInt(0); } // 方法參數(Object[]類型) - 非必須 if (null != packet.getMethodArguments()) { int len = packet.getMethodArguments().length; // 方法參數數組長度 out.writeInt(len); for (int i = 0; i < len; i++) { byte[] bytes = serializer.encode(packet.getMethodArguments()[i]); out.writeInt(bytes.length); out.writeBytes(bytes); } } else { out.writeInt(0); } } }
請求Packet
解碼器RequestMessagePacketDecoder
的代碼以下:
@RequiredArgsConstructor public class RequestMessagePacketDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext context, ByteBuf in, List<Object> list) throws Exception { RequestMessagePacket packet = new RequestMessagePacket(); // 魔數 packet.setMagicNumber(in.readInt()); // 版本 packet.setVersion(in.readInt()); // 流水號 int serialNumberLength = in.readInt(); packet.setSerialNumber(in.readCharSequence(serialNumberLength, ProtocolConstant.UTF_8).toString()); // 消息類型 byte messageTypeByte = in.readByte(); packet.setMessageType(MessageType.fromValue(messageTypeByte)); // 附件 Map<String, String> attachments = Maps.newHashMap(); packet.setAttachments(attachments); int attachmentSize = in.readInt(); if (attachmentSize > 0) { for (int i = 0; i < attachmentSize; i++) { int keyLength = in.readInt(); String key = in.readCharSequence(keyLength, ProtocolConstant.UTF_8).toString(); int valueLength = in.readInt(); String value = in.readCharSequence(valueLength, ProtocolConstant.UTF_8).toString(); attachments.put(key, value); } } // 接口全類名 int interfaceNameLength = in.readInt(); packet.setInterfaceName(in.readCharSequence(interfaceNameLength, ProtocolConstant.UTF_8).toString()); // 方法名 int methodNameLength = in.readInt(); packet.setMethodName(in.readCharSequence(methodNameLength, ProtocolConstant.UTF_8).toString()); // 方法參數簽名 int methodArgumentSignatureArrayLength = in.readInt(); if (methodArgumentSignatureArrayLength > 0) { String[] methodArgumentSignatures = new String[methodArgumentSignatureArrayLength]; for (int i = 0; i < methodArgumentSignatureArrayLength; i++) { int methodArgumentSignatureLength = in.readInt(); methodArgumentSignatures[i] = in.readCharSequence(methodArgumentSignatureLength, ProtocolConstant.UTF_8).toString(); } packet.setMethodArgumentSignatures(methodArgumentSignatures); } // 方法參數 int methodArgumentArrayLength = in.readInt(); if (methodArgumentArrayLength > 0) { // 這裏的Object[]其實是ByteBuf[] - 後面須要二次加工爲對應類型的實例 Object[] methodArguments = new Object[methodArgumentArrayLength]; for (int i = 0; i < methodArgumentArrayLength; i++) { int byteLength = in.readInt(); methodArguments[i] = in.readBytes(byteLength); } packet.setMethodArguments(methodArguments); } list.add(packet); } }
響應Packet
編碼器ResponseMessagePacketEncoder
的代碼以下:
@RequiredArgsConstructor public class ResponseMessagePacketEncoder extends MessageToByteEncoder<ResponseMessagePacket> { private final Serializer serializer; @Override protected void encode(ChannelHandlerContext ctx, ResponseMessagePacket packet, ByteBuf out) throws Exception { // 魔數 out.writeInt(packet.getMagicNumber()); // 版本 out.writeInt(packet.getVersion()); // 流水號 out.writeInt(packet.getSerialNumber().length()); out.writeCharSequence(packet.getSerialNumber(), ProtocolConstant.UTF_8); // 消息類型 out.writeByte(packet.getMessageType().getType()); // 附件size Map<String, String> attachments = packet.getAttachments(); out.writeInt(attachments.size()); // 附件內容 attachments.forEach((k, v) -> { out.writeInt(k.length()); out.writeCharSequence(k, ProtocolConstant.UTF_8); out.writeInt(v.length()); out.writeCharSequence(v, ProtocolConstant.UTF_8); }); // error code out.writeLong(packet.getErrorCode()); // message String message = packet.getMessage(); ByteBufferUtils.X.encodeUtf8CharSequence(out, message); // payload byte[] bytes = serializer.encode(packet.getPayload()); out.writeInt(bytes.length); out.writeBytes(bytes); } }
響應Packet
解碼器ResponseMessagePacketDecoder
的代碼以下:
public class ResponseMessagePacketDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { ResponseMessagePacket packet = new ResponseMessagePacket(); // 魔數 packet.setMagicNumber(in.readInt()); // 版本 packet.setVersion(in.readInt()); // 流水號 int serialNumberLength = in.readInt(); packet.setSerialNumber(in.readCharSequence(serialNumberLength, ProtocolConstant.UTF_8).toString()); // 消息類型 byte messageTypeByte = in.readByte(); packet.setMessageType(MessageType.fromValue(messageTypeByte)); // 附件 Map<String, String> attachments = Maps.newHashMap(); packet.setAttachments(attachments); int attachmentSize = in.readInt(); if (attachmentSize > 0) { for (int i = 0; i < attachmentSize; i++) { int keyLength = in.readInt(); String key = in.readCharSequence(keyLength, ProtocolConstant.UTF_8).toString(); int valueLength = in.readInt(); String value = in.readCharSequence(valueLength, ProtocolConstant.UTF_8).toString(); attachments.put(key, value); } } // error code packet.setErrorCode(in.readLong()); // message int messageLength = in.readInt(); packet.setMessage(in.readCharSequence(messageLength, ProtocolConstant.UTF_8).toString()); // payload - ByteBuf實例 int payloadLength = in.readInt(); packet.setPayload(in.readBytes(payloadLength)); out.add(packet); } }
核心的編碼解碼器已經編寫完,接着要注意一下TCP
協議二進制包發送的時候只保證了包的發送順序、確認發送以及重傳,沒法保證二進制包是否完整(有些博客也稱此類場景爲粘包、半包等等,其實網絡協議裏面並無定義這些術語,估計是有人杜撰出來),所以這裏採起了定長幀編碼和解碼器LengthFieldPrepender
和LengthFieldBasedFrameDecoder
,簡單來講就是在消息幀的開頭幾位定義了整個幀的長度,讀取到整個長度的消息幀才認爲是一個完整的二進制報文。舉個幾個例子:
|<--------packet frame--------->| | Length Field | Actual Content |
序號 | Length Field | Actual Content |
---|---|---|
0 | 4 | abcd |
1 | 9 | throwable |
2 | 14 | {"name":"doge"} |
客戶端代碼以下:
@Slf4j public class TestProtocolClient { public static void main(String[] args) throws Exception { int port = 9092; EventLoopGroup workerGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(workerGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE); bootstrap.option(ChannelOption.TCP_NODELAY, Boolean.TRUE); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); ch.pipeline().addLast(new LengthFieldPrepender(4)); ch.pipeline().addLast(new RequestMessagePacketEncoder(FastJsonSerializer.X)); ch.pipeline().addLast(new ResponseMessagePacketDecoder()); ch.pipeline().addLast(new SimpleChannelInboundHandler<ResponseMessagePacket>() { @Override protected void channelRead0(ChannelHandlerContext ctx, ResponseMessagePacket packet) throws Exception { Object targetPayload = packet.getPayload(); if (targetPayload instanceof ByteBuf) { ByteBuf byteBuf = (ByteBuf) targetPayload; int readableByteLength = byteBuf.readableBytes(); byte[] bytes = new byte[readableByteLength]; byteBuf.readBytes(bytes); targetPayload = FastJsonSerializer.X.decode(bytes, String.class); byteBuf.release(); } packet.setPayload(targetPayload); log.info("接收到來自服務端的響應消息,消息內容:{}", JSON.toJSONString(packet)); } }); } }); ChannelFuture future = bootstrap.connect("localhost", port).sync(); log.info("啓動NettyClient[{}]成功...", port); Channel channel = future.channel(); RequestMessagePacket packet = new RequestMessagePacket(); packet.setMagicNumber(ProtocolConstant.MAGIC_NUMBER); packet.setVersion(ProtocolConstant.VERSION); packet.setSerialNumber(SerialNumberUtils.X.generateSerialNumber()); packet.setMessageType(MessageType.REQUEST); packet.setInterfaceName("club.throwable.contract.HelloService"); packet.setMethodName("sayHello"); packet.setMethodArgumentSignatures(new String[]{"java.lang.String"}); packet.setMethodArguments(new Object[]{"doge"}); channel.writeAndFlush(packet); future.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }
服務端代碼以下:
@Slf4j public class TestProtocolServer { public static void main(String[] args) throws Exception { int port = 9092; ServerBootstrap bootstrap = new ServerBootstrap(); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); ch.pipeline().addLast(new LengthFieldPrepender(4)); ch.pipeline().addLast(new RequestMessagePacketDecoder()); ch.pipeline().addLast(new ResponseMessagePacketEncoder(FastJsonSerializer.X)); ch.pipeline().addLast(new SimpleChannelInboundHandler<RequestMessagePacket>() { @Override protected void channelRead0(ChannelHandlerContext ctx, RequestMessagePacket packet) throws Exception { log.info("接收到來自客戶端的請求消息,消息內容:{}", JSON.toJSONString(packet)); ResponseMessagePacket response = new ResponseMessagePacket(); response.setMagicNumber(packet.getMagicNumber()); response.setVersion(packet.getVersion()); response.setSerialNumber(packet.getSerialNumber()); response.setAttachments(packet.getAttachments()); response.setMessageType(MessageType.RESPONSE); response.setErrorCode(200L); response.setMessage("Success"); response.setPayload("{\"name\":\"throwable\"}"); ctx.writeAndFlush(response); } }); } }); ChannelFuture future = bootstrap.bind(port).sync(); log.info("啓動NettyServer[{}]成功...", port); future.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
這裏在測試的環境中,最大的消息幀長度暫時定義爲1024。先啓動服務端,再啓動客戶端,見控制檯輸出以下:
// 服務端 22:29:32.596 [main] INFO club.throwable.protocol.TestProtocolServer - 啓動NettyServer[9092]成功... ...省略其餘日誌... 22:29:53.538 [nioEventLoopGroup-3-1] INFO club.throwable.protocol.TestProtocolServer - 接收到來自客戶端的請求消息,消息內容:{"attachments":{},"interfaceName":"club.throwable.contract.HelloService","magicNumber":10086,"messageType":"REQUEST","methodArgumentSignatures":["java.lang.String"],"methodArguments":[{"contiguous":true,"direct":true,"readOnly":false,"readable":true,"writable":false}],"methodName":"sayHello","serialNumber":"7f992c7cf9f445258601def1cac9bec0","version":1} // 客戶端 22:31:28.360 [main] INFO club.throwable.protocol.TestProtocolClient - 啓動NettyClient[9092]成功... ...省略其餘日誌... 22:31:39.320 [nioEventLoopGroup-2-1] INFO club.throwable.protocol.TestProtocolClient - 接收到來自服務端的響應消息,消息內容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"{\"name\":\"throwable\"}","serialNumber":"320808e709b34edbb91ba557780b58ad","version":1}
一個基於Netty
實現的簡單的自定義協議基本完成,可是要編寫一個優秀的RPC
框架,還須要作服務端的宿主類和目標方法查詢、調用,客戶端的動態代理,Netty
的NIO
模式下的同步調用改造,心跳處理,異常處理等等。後面會使用多篇文章逐個問題解決,網絡編程其實挺好玩了,就是編碼量會比較大(゜-゜)つロ
。
Demo
項目:
(e-a-20200112 c-1-d)