原文地址: haifeiWu的博客
博客地址:www.hchstudio.cn
歡迎轉載,轉載請註明做者及出處,謝謝!java
近期一直在作網絡協議相關的工做,因此博客也就與之相關的比較多,今天樓主結合 Redis的協議 RESP 看看在 Netty 源碼中是如何實現的。
數組
RESP 是 Redis 序列化協議的簡寫。它是一種直觀的文本協議,優點在於實現很是簡單,解析性能極好。服務器
Redis 協議將傳輸的結構數據分爲 5 種最小單元類型,單元結束時統一加上回車換行符號\r\n,來表示該單元的結束。網絡
關於 RESP 協議的具體介紹感興趣的小夥伴請移步樓主的另外一篇文章Redis協議規範(譯文)ide
以下面代碼中所表示的,Netty中使用對應符號的ASCII碼來表示,感興趣的小夥伴能夠查一下ASCII碼錶來驗證一下。性能
public enum RedisMessageType { // 以 + 開頭的單行字符串 SIMPLE_STRING((byte)43, true), // 以 - 開頭的錯誤信息 ERROR((byte)45, true), // 以 : 開頭的整型數據 INTEGER((byte)58, true), // 以 $ 開頭的多行字符串 BULK_STRING((byte)36, false), // 以 * 開頭的數組 ARRAY_HEADER((byte)42, false), ARRAY((byte)42, false); private final byte value; private final boolean inline; private RedisMessageType(byte value, boolean inline) { this.value = value; this.inline = inline; } public byte value() { return this.value; } public boolean isInline() { return this.inline; } public static RedisMessageType valueOf(byte value) { switch(value) { case 36: return BULK_STRING; case 42: return ARRAY_HEADER; case 43: return SIMPLE_STRING; case 45: return ERROR; case 58: return INTEGER; default: throw new RedisCodecException("Unknown RedisMessageType: " + value); } } }
解碼器,顧名思義,就是將服務器返回的數據根據協議反序列化成易於閱讀的信息。RedisDecoder 就是根據 RESP 將服務端返回的信息反序列化出來。下面是指令的編碼格式this
SET key value => *3\r\n$5\r\nSET\r\n$1\r\nkey\r\n$1\r\nvalue\r\n
指令是一個字符串數組,編碼一個字符串數組,首先須要編碼數組長度*3\r\n。而後依次編碼各個字符串參數。編碼字符串首先須要編碼字符串的長度$5\r\n。而後再編碼字符串的內容SET\r\n。Redis 消息以\r\n做爲分隔符,這樣設計其實挺浪費網絡傳輸流量的,消息內容裏面處處都是\r\n符號。可是這樣的消息可讀性會比較好,便於調試。RESP 協議是犧牲性能換取可讀,易於實現的一個經典例子。編碼
指令解碼器的實現,網絡字節流的讀取存在拆包問題。所拆包問題是指一次Read調用從套件字讀到的字節數組可能只是一個完整消息的一部分。而另一部分則須要發起另一次Read調用纔可能讀到,甚至要發起多個Read調用才能夠讀到完整的一條消息。對於拆包問題感興趣的小夥伴能夠查看樓主的另外一篇文章TCP 粘包問題淺析及其解決方案設計
若是咱們拿部分消息去反序列化成輸入消息對象確定是要失敗的,或者說生成的消息對象是不完整填充的。這個時候咱們須要等待下一次Read調用,而後將這兩次Read調用的字節數組拼起來,嘗試再一次反序列化。調試
問題來了,若是一個輸入消息對象很大,就可能須要多個Read調用和屢次反序列化操做才能完整的解包出一個輸入對象。那這個反序列化的過程就會重複了屢次。
針對這個問題,Netty 中很巧妙的解決了這個問題,以下所示,Netty 中經過 state 屬性來保存當前序列化的狀態,而後下次反序列化的時候就能夠從上次記錄的 state 直接繼續反序列化。這樣就避免了重複的問題。
// 保持當前序列化狀態的字段 private RedisDecoder.State state; public RedisDecoder() { this(65536, FixedRedisMessagePool.INSTANCE); } public RedisDecoder(int maxInlineMessageLength, RedisMessagePool messagePool) { this.toPositiveLongProcessor = new RedisDecoder.ToPositiveLongProcessor(); // 默認初始化狀態爲,反序列化指令類型 this.state = RedisDecoder.State.DECODE_TYPE; if (maxInlineMessageLength > 0 && maxInlineMessageLength <= 536870912) { this.maxInlineMessageLength = maxInlineMessageLength; this.messagePool = messagePool; } else { throw new RedisCodecException("maxInlineMessageLength: " + maxInlineMessageLength + " (expected: <= " + 536870912 + ")"); } } // 解碼器的主要業務邏輯 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { try { // 循環讀取信息,將信息完成的序列化 while(true) { switch(this.state) { case DECODE_TYPE: if (this.decodeType(in)) { break; } return; case DECODE_INLINE: if (this.decodeInline(in, out)) { break; } return; case DECODE_LENGTH: if (this.decodeLength(in, out)) { break; } return; case DECODE_BULK_STRING_EOL: if (this.decodeBulkStringEndOfLine(in, out)) { break; } return; case DECODE_BULK_STRING_CONTENT: if (this.decodeBulkStringContent(in, out)) { break; } return; default: throw new RedisCodecException("Unknown state: " + this.state); } } } catch (RedisCodecException var5) { this.resetDecoder(); throw var5; } catch (Exception var6) { this.resetDecoder(); throw new RedisCodecException(var6); } }
下面代碼中,是針對每種數據類型進行反序列化的具體業務邏輯。有小夥伴可能會想,沒有看到解碼胡數組類型的邏輯呢?實際上在 RESP 協議中數組就是其餘類型的組合,因此徹底能夠循環讀取,按照單個元素解碼。
// 解碼消息類型 private boolean decodeType(ByteBuf in) throws Exception { if (!in.isReadable()) { return false; } else { this.type = RedisMessageType.valueOf(in.readByte()); this.state = this.type.isInline() ? RedisDecoder.State.DECODE_INLINE : RedisDecoder.State.DECODE_LENGTH; return true; } } // 解碼單行字符串,錯誤信息,或者整型數據類型 private boolean decodeInline(ByteBuf in, List<Object> out) throws Exception { ByteBuf lineBytes = readLine(in); if (lineBytes == null) { if (in.readableBytes() > this.maxInlineMessageLength) { throw new RedisCodecException("length: " + in.readableBytes() + " (expected: <= " + this.maxInlineMessageLength + ")"); } else { return false; } } else { out.add(this.newInlineRedisMessage(this.type, lineBytes)); this.resetDecoder(); return true; } } // 解碼消息長度 private boolean decodeLength(ByteBuf in, List<Object> out) throws Exception { ByteBuf lineByteBuf = readLine(in); if (lineByteBuf == null) { return false; } else { long length = this.parseRedisNumber(lineByteBuf); if (length < -1L) { throw new RedisCodecException("length: " + length + " (expected: >= " + -1 + ")"); } else { switch(this.type) { case ARRAY_HEADER: out.add(new ArrayHeaderRedisMessage(length)); this.resetDecoder(); return true; case BULK_STRING: if (length > 536870912L) { throw new RedisCodecException("length: " + length + " (expected: <= " + 536870912 + ")"); } this.remainingBulkLength = (int)length; return this.decodeBulkString(in, out); default: throw new RedisCodecException("bad type: " + this.type); } } } } // 解碼多行字符串 private boolean decodeBulkString(ByteBuf in, List<Object> out) throws Exception { switch(this.remainingBulkLength) { case -1: out.add(FullBulkStringRedisMessage.NULL_INSTANCE); this.resetDecoder(); return true; case 0: this.state = RedisDecoder.State.DECODE_BULK_STRING_EOL; return this.decodeBulkStringEndOfLine(in, out); default: out.add(new BulkStringHeaderRedisMessage(this.remainingBulkLength)); this.state = RedisDecoder.State.DECODE_BULK_STRING_CONTENT; return this.decodeBulkStringContent(in, out); } }
編碼器,顧名思義,就是將對象根據 RESP 協議序列化成字節流發送到服務端。編碼器的實現很是簡單,不用考慮拆包等問題,就是分配一個ByteBuf,而後將將消息輸出對象序列化的字節數組塞到ByteBuf中輸出就能夠了。
下面代碼中就是 encode 方法直接調用 writeRedisMessage 方法,根據消息類型進行寫buffer操做。
@Override protected void encode(ChannelHandlerContext ctx, RedisMessage msg, List<Object> out) throws Exception { try { writeRedisMessage(ctx.alloc(), msg, out); } catch (CodecException e) { throw e; } catch (Exception e) { throw new CodecException(e); } } private void writeRedisMessage(ByteBufAllocator allocator, RedisMessage msg, List<Object> out) { // 判斷消息類型,而後調用寫相應消息的方法。 if (msg instanceof InlineCommandRedisMessage) { writeInlineCommandMessage(allocator, (InlineCommandRedisMessage) msg, out); } else if (msg instanceof SimpleStringRedisMessage) { writeSimpleStringMessage(allocator, (SimpleStringRedisMessage) msg, out); } else if (msg instanceof ErrorRedisMessage) { writeErrorMessage(allocator, (ErrorRedisMessage) msg, out); } else if (msg instanceof IntegerRedisMessage) { writeIntegerMessage(allocator, (IntegerRedisMessage) msg, out); } else if (msg instanceof FullBulkStringRedisMessage) { writeFullBulkStringMessage(allocator, (FullBulkStringRedisMessage) msg, out); } else if (msg instanceof BulkStringRedisContent) { writeBulkStringContent(allocator, (BulkStringRedisContent) msg, out); } else if (msg instanceof BulkStringHeaderRedisMessage) { writeBulkStringHeader(allocator, (BulkStringHeaderRedisMessage) msg, out); } else if (msg instanceof ArrayHeaderRedisMessage) { writeArrayHeader(allocator, (ArrayHeaderRedisMessage) msg, out); } else if (msg instanceof ArrayRedisMessage) { writeArrayMessage(allocator, (ArrayRedisMessage) msg, out); } else { throw new CodecException("unknown message type: " + msg); } }
下面代碼主要是實現對應消息按照 RESP 協議 進行序列化操做,具體就是上面樓主說的,分配一個ByteBuf,而後將將消息輸出對象序列化的字節數組塞到ByteBuf中輸出便可。
private static void writeInlineCommandMessage(ByteBufAllocator allocator, InlineCommandRedisMessage msg, List<Object> out) { writeString(allocator, RedisMessageType.INLINE_COMMAND, msg.content(), out); } private static void writeSimpleStringMessage(ByteBufAllocator allocator, SimpleStringRedisMessage msg, List<Object> out) { writeString(allocator, RedisMessageType.SIMPLE_STRING, msg.content(), out); } private static void writeErrorMessage(ByteBufAllocator allocator, ErrorRedisMessage msg, List<Object> out) { writeString(allocator, RedisMessageType.ERROR, msg.content(), out); } private static void writeString(ByteBufAllocator allocator, RedisMessageType type, String content, List<Object> out) { ByteBuf buf = allocator.ioBuffer(type.length() + ByteBufUtil.utf8MaxBytes(content) + RedisConstants.EOL_LENGTH); type.writeTo(buf); ByteBufUtil.writeUtf8(buf, content); buf.writeShort(RedisConstants.EOL_SHORT); out.add(buf); } private void writeIntegerMessage(ByteBufAllocator allocator, IntegerRedisMessage msg, List<Object> out) { ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH + RedisConstants.EOL_LENGTH); RedisMessageType.INTEGER.writeTo(buf); buf.writeBytes(numberToBytes(msg.value())); buf.writeShort(RedisConstants.EOL_SHORT); out.add(buf); } private void writeBulkStringHeader(ByteBufAllocator allocator, BulkStringHeaderRedisMessage msg, List<Object> out) { final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + (msg.isNull() ? RedisConstants.NULL_LENGTH : RedisConstants.LONG_MAX_LENGTH + RedisConstants.EOL_LENGTH)); RedisMessageType.BULK_STRING.writeTo(buf); if (msg.isNull()) { buf.writeShort(RedisConstants.NULL_SHORT); } else { buf.writeBytes(numberToBytes(msg.bulkStringLength())); buf.writeShort(RedisConstants.EOL_SHORT); } out.add(buf); } private static void writeBulkStringContent(ByteBufAllocator allocator, BulkStringRedisContent msg, List<Object> out) { out.add(msg.content().retain()); if (msg instanceof LastBulkStringRedisContent) { out.add(allocator.ioBuffer(RedisConstants.EOL_LENGTH).writeShort(RedisConstants.EOL_SHORT)); } } private void writeFullBulkStringMessage(ByteBufAllocator allocator, FullBulkStringRedisMessage msg, List<Object> out) { if (msg.isNull()) { ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.NULL_LENGTH + RedisConstants.EOL_LENGTH); RedisMessageType.BULK_STRING.writeTo(buf); buf.writeShort(RedisConstants.NULL_SHORT); buf.writeShort(RedisConstants.EOL_SHORT); out.add(buf); } else { ByteBuf headerBuf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH + RedisConstants.EOL_LENGTH); RedisMessageType.BULK_STRING.writeTo(headerBuf); headerBuf.writeBytes(numberToBytes(msg.content().readableBytes())); headerBuf.writeShort(RedisConstants.EOL_SHORT); out.add(headerBuf); out.add(msg.content().retain()); out.add(allocator.ioBuffer(RedisConstants.EOL_LENGTH).writeShort(RedisConstants.EOL_SHORT)); } } /** * Write array header only without body. Use this if you want to write arrays as streaming. */ private void writeArrayHeader(ByteBufAllocator allocator, ArrayHeaderRedisMessage msg, List<Object> out) { writeArrayHeader(allocator, msg.isNull(), msg.length(), out); } /** * Write full constructed array message. */ private void writeArrayMessage(ByteBufAllocator allocator, ArrayRedisMessage msg, List<Object> out) { if (msg.isNull()) { writeArrayHeader(allocator, msg.isNull(), RedisConstants.NULL_VALUE, out); } else { writeArrayHeader(allocator, msg.isNull(), msg.children().size(), out); for (RedisMessage child : msg.children()) { writeRedisMessage(allocator, child, out); } } } private void writeArrayHeader(ByteBufAllocator allocator, boolean isNull, long length, List<Object> out) { if (isNull) { final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.NULL_LENGTH + RedisConstants.EOL_LENGTH); RedisMessageType.ARRAY_HEADER.writeTo(buf); buf.writeShort(RedisConstants.NULL_SHORT); buf.writeShort(RedisConstants.EOL_SHORT); out.add(buf); } else { final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH + RedisConstants.EOL_LENGTH); RedisMessageType.ARRAY_HEADER.writeTo(buf); buf.writeBytes(numberToBytes(length)); buf.writeShort(RedisConstants.EOL_SHORT); out.add(buf); } }
對於 Netty 源碼,樓主一直是一種敬畏的態度,沒想到今天居然從另外一個方面對 Netty 的冰山一角展開解讀,畢竟萬事開頭難,有了這一次但願以後能夠更順利,在技術成長的道路上一塊兒加油。