最近看了 Redis 的代碼,感受仍是挺簡單的.有衝動想用其它語言實現(抄)一個.原來想用 Python 實現來着.後來想一想試試 Netty.緣由有二java
第一:Java 的NIO 和Netty 的 EventLoop 配合起來和 Redis 的網絡模型很接近.都是 Ractor 模型.甚至 Redis的模型更簡單--只有一個 EventLoop 線程.寫(抄)起來更方便git
第二:Netty 架構挺不錯.借這個機會學習一下.redis
若是咱們從一個很抽象(簡單)的角度看 Redis Server.就是一個監聽在6379的程序, 本質上是一個處理單線線請求的 Hashtable. 而 Redis 的協議也是很是很是的簡單.比 http 協議可簡單多了.bootstrap
如下是這個協議的通常形式:網絡
*<參數數量> CR LF $<參數 1 的字節數量> CR LF <參數 1 的數據> CR LF ... $<參數 N 的字節數量> CR LF <參數 N 的數據> CR LF
這基本就是一個很簡單的有限狀態機.架構
因此我給咱們的命令解析器設置3個狀態.socket
public enum State { NUMBER_OF_ARGS, NUMBER_BYTE_OF_ARGS, ARGS_DATA }
咱們將初始狀態設置NUMBER_OF_ARGS 也就是開始那個綠色的狀態.當有數據到達時.咱們不停的判斷程序的狀態.是哪一個狀態,咱們作啥.ide
while(true){ switch (state()){ case NUMBER_OF_ARGS: //從當前數據中讀取參數個數 break; case NUMBER_BYTE_OF_ARGS: //從數據中讀取參數長度 break; case ARGS_DATA: //按參數長度讀取參數 //判斷參數個數.若是到了最後一個.則跳出,不然狀態轉回NUMBER_BYTE_OF_ARGS break; } }
下面咱們按着咱們上面思路實現一下.oop
package me.yunanw.redisinjava; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.ReplayingDecoder; import java.util.List; /** * Created by yunanw on 2016/10/15. */ public class CommandDecoder extends ReplayingDecoder<CommandDecoder.State> { public enum State { NUMBER_OF_ARGS, NUMBER_BYTE_OF_ARGS, ARGS_DATA } static final char CR = '\r'; static final char LF = '\n'; public CommandDecoder(){ state(State.NUMBER_OF_ARGS); } protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { RedisFrame frame = doDecode(channelHandlerContext,byteBuf,list); if (frame != null){ list.add(frame); } } private RedisFrame doDecode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { RedisFrame frame = null; int currentArgsLen = 0; int argsCount = 0; while(true){ switch (state()){ case NUMBER_OF_ARGS: if (byteBuf.readByte() != '*'){ throw new DecoderException("can not found *"); } argsCount = parseRedisNumber(byteBuf); frame = new RedisFrame(argsCount); checkpoint(State.NUMBER_BYTE_OF_ARGS); break; case NUMBER_BYTE_OF_ARGS: if (byteBuf.readByte() != '$'){ throw new DecoderException("can not found $"); } currentArgsLen = parseRedisNumber(byteBuf); checkpoint(State.ARGS_DATA);; break; case ARGS_DATA: frame.AppendArgs(byteBuf.readBytes(currentArgsLen).array()); if (byteBuf.readByte() != CR || byteBuf.readByte() != LF) throw new DecoderException("can not found CR OR LF"); if ((--argsCount) <=0) return frame; else { checkpoint(State.NUMBER_BYTE_OF_ARGS); } break; default: throw new DecoderException(""); } } } private int parseRedisNumber(ByteBuf byteBuf) { byte readByte = byteBuf.readByte(); boolean negative = readByte == '-'; if (negative) { readByte = byteBuf.readByte(); } int result = 0; do { int digit = readByte - '0'; if (digit >= 0 && digit < 10) { result = (result * 10) + digit; } else { throw new DecoderException("Invalid character in integer"); } } while ((readByte = byteBuf.readByte()) != CR); if ((readByte = byteBuf.readByte()) != LF){ throw new DecoderException("can not found LF"); } return (negative? -result:result); } }
寫到這裏有一個小問題,若是你上面代碼看懂了,你就會發現一個小問題.若是因爲網絡緣由,有時數據能夠並無接收徹底.而咱們的代碼徹底沒有作這方面的考慮? 而 Checkpoint 這是又什麼鬼?學習
第一個問題:
事實上咱們有考慮這個問題.因此咱們繼承了一個相對比較特別Decoder--ReplayingDecoder.咱們看一下ReplayingDecoder的 CallDecode 方法.(這個名字起的很是的直白.你必定明白他是幹啥的)
try { decode(ctx, replayable, out); //省略 } catch (Signal replay) { replay.expect(REPLAY); //省略 // Return to the checkpoint (or oldPosition) and retry. int checkpoint = this.checkpoint; if (checkpoint >= 0) { in.readerIndex(checkpoint); } else { // Called by cleanup() - no need to maintain the readerIndex // anymore because the buffer has been released already. } break; }
Signal replay 是 Netty 中定義的一個錯誤.當咱們讀取錯誤時,Netty 會再等到下次有數據到達時,再試一次Decode 方法.看看能再解析成功.因此咱們就能夠假設置咱們要的數據都已經讀取了.
可是要注意: replaydecoder 的 decode 方法會被反覆調用..因此咱們的代碼中要作好這樣的準備.
二: CheckPoint 就是爲了防止若是每次反覆調用 Decode 時從頭執行,而設置的一個狀態.讓咱們這個 decode 方法有狀態.
好了.如今咱們建立監部分的代碼.這都是套數,直接抄下來就好了
ServerBootstrap bootstrap = new ServerBootstrap(); final DefaultEventExecutorGroup group = new DefaultEventExecutorGroup(1); try { bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .localAddress(port) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new CommandDecoder()); p.addLast(new RedisServerHandler()); } }); // Start the server. ChannelFuture f = bootstrap.bind().sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. group.shutdownGracefully(); }
咱們把 Redis 的協議解析爲RedisFrame 類
package me.yunanw.redisinjava; import java.util.ArrayList; import java.util.List; /** * Created by yunanw on 2016/10/17. */ public class RedisFrame { private int argsCount = 0; List<String> ArgsData = null; public RedisFrame(int argsCount){ this.argsCount = argsCount; this.ArgsData = new ArrayList<String>(argsCount); } public void AppendArgs(byte[] args){ this.ArgsData.add(new String(args)); } public int getCommandCount(){ return ArgsData.size(); } public String GetFristCommand(){ if (ArgsData.size() > 0){ return ArgsData.get(0); } return null; } public String GetCommand(int index){ if (ArgsData.size() > index){ return ArgsData.get(index); } return null; } }
好了.這時你打開 Redis-cli 試試是否是能夠連上咱們的 "假Redis" Server.有意的是---你打開 Redis-cli.他會自動發一個 "Command" 命令.而你無論回覆什麼,它都認爲連上了.