Redis的客戶端與服務端採用叫作 RESP(Redis Serialization Protocol)的網絡通訊協議交換數據,客戶端和服務器經過 TCP 鏈接來進行數據交互, 服務器默認的端口號爲 6379 。客戶端和服務器發送的命令或數據一概以 \r\n (CRLF)結尾。git
RESP支持五種數據類型:github
狀態回覆(status reply):以「+」開頭,表示正確的狀態信息,」+」後就是具體信息,好比:redis
redis 127.0.0.1:6379> set ss sdf OK
其實它真正回覆的數據是:+OK\r\n
錯誤回覆(error reply):以」-「開頭,表示錯誤的狀態信息,」-「後就是具體信息,好比:數組
redis 127.0.0.1:6379> incr ss (error) ERR value is not an integer or out of range
整數回覆(integer reply):以」:」開頭,表示對某些操做的回覆好比DEL, EXISTS, INCR等等服務器
redis 127.0.0.1:6379> incr aa (integer) 1
批量回復(bulk reply):以」$」開頭,表示下一行的字符串長度,具體字符串在下一行中網絡
多條批量回復(multi bulk reply):以」*」開頭,表示消息體總共有多少行(不包括當前行)」*」是具體行數session
redis 127.0.0.1:6379> get ss "sdf" 客戶端->服務器 *2\r\n $3\r\n get\r\n $2\r\n ss\r\n 服務器->客戶端 $3\r\n sdf\r\n
注:以上寫的都是XX回覆,並非說協議格式只是適用於服務器->客戶端,客戶端->服務器端也一樣使用以上協議格式,其實雙端協議格式的統一更加方便擴展app
回到正題,咱們這裏是經過netty來模擬redis服務器,能夠整理一下思路大概分爲這麼幾步:框架
1.須要一個底層的通訊框架,這裏選擇的是netty4.0.25 2.須要對客戶端穿過來的數據進行解碼(Decoder),其實就是分別處理以上5種數據類型 3.解碼之後咱們封裝成更加利於理解的命令(Command),好比:set<name> foo hello<params> 4.有了命令之後就是處理命令(execute),其實咱們能夠去鏈接正在的redis服務器,不過這裏只是簡單的模擬 5.處理完以後就是封裝回復(Reply),而後編碼(Encoder),須要根據不一樣的命令分別返回之後5種數據類型 6.測試驗證,經過redis-cli去鏈接netty模擬的redis服務器,看可否返回正確的結果
以上思路參考github上的一個項目:https://github.com/spullara/redis-protocol,測試代碼也是在此基礎上作了一個簡化ide
第一步:通訊框架netty
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.0.25.Final</version> </dependency>
第二步:數據類型解碼
public class RedisCommandDecoder extends ReplayingDecoder<Void> { public static final char CR = '\r'; public static final char LF = '\n'; public static final byte DOLLAR_BYTE = '$'; public static final byte ASTERISK_BYTE = '*'; private byte[][] bytes; private int arguments = 0; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (bytes != null) { int numArgs = bytes.length; for (int i = arguments; i < numArgs; i++) { if (in.readByte() == DOLLAR_BYTE) { int l = RedisReplyDecoder.readInt(in); if (l > Integer.MAX_VALUE) { throw new IllegalArgumentException( "Java only supports arrays up to " + Integer.MAX_VALUE + " in size"); } int size = (int) l; bytes[i] = new byte[size]; in.readBytes(bytes[i]); if (in.bytesBefore((byte) CR) != 0) { throw new RedisException("Argument doesn't end in CRLF"); } // Skip CRLF(\r\n) in.skipBytes(2); arguments++; checkpoint(); } else { throw new IOException("Unexpected character"); } } try { out.add(new Command(bytes)); } finally { bytes = null; arguments = 0; } } else if (in.readByte() == ASTERISK_BYTE) { int l = RedisReplyDecoder.readInt(in); if (l > Integer.MAX_VALUE) { throw new IllegalArgumentException( "Java only supports arrays up to " + Integer.MAX_VALUE + " in size"); } int numArgs = (int) l; if (numArgs < 0) { throw new RedisException("Invalid size: " + numArgs); } bytes = new byte[numArgs][]; checkpoint(); decode(ctx, in, out); } else { in.readerIndex(in.readerIndex() - 1); byte[][] b = new byte[1][]; b[0] = in.readBytes(in.bytesBefore((byte) CR)).array(); in.skipBytes(2); out.add(new Command(b, true)); } } }
首先經過接受到以「*」開頭的多條批量類型初始化二維數組byte[][] bytes,以讀取到第一個以\r\n結尾的數據做爲數組的長度,而後再處理以「$」開頭的批量類型。
以上除了處理咱們熟悉的批量和多條批量類型外,還處理了沒有任何標識的數據,其實有一個專門的名字叫Inline命令:
有些時候僅僅是telnet鏈接Redis服務,或者是僅僅向Redis服務發送一個命令進行檢測。雖然Redis協議能夠很容易的實現,可是使用Interactive sessions 並不理想,並且redis-cli也不老是可使用。基於這些緣由,Redis支持特殊的命令來實現上面描述的狀況。這些命令的設計是很人性化的,被稱做Inline 命令。
第三步:封裝command對象
由第二步中能夠看到不論是commandName仍是params都統一放在了字節二維數組裏面,最後封裝在command對象裏面
public class Command { public static final byte[] EMPTY_BYTES = new byte[0]; private final Object name; private final Object[] objects; private final boolean inline; public Command(Object[] objects) { this(null, objects, false); } public Command(Object[] objects, boolean inline) { this(null, objects, inline); } private Command(Object name, Object[] objects, boolean inline) { this.name = name; this.objects = objects; this.inline = inline; } public byte[] getName() { if (name != null) return getBytes(name); return getBytes(objects[0]); } public boolean isInline() { return inline; } private byte[] getBytes(Object object) { byte[] argument; if (object == null) { argument = EMPTY_BYTES; } else if (object instanceof byte[]) { argument = (byte[]) object; } else if (object instanceof ByteBuf) { argument = ((ByteBuf) object).array(); } else if (object instanceof String) { argument = ((String) object).getBytes(Charsets.UTF_8); } else { argument = object.toString().getBytes(Charsets.UTF_8); } return argument; } public void toArguments(Object[] arguments, Class<?>[] types) { for (int position = 0; position < types.length; position++) { if (position >= arguments.length) { throw new IllegalArgumentException( "wrong number of arguments for '" + new String(getName()) + "' command"); } if (objects.length - 1 > position) { arguments[position] = objects[1 + position]; } } } }
全部的數據都放在了Object數組裏面,並且能夠經過getName方法知道Object[0]就是commandName
第四步:執行命令
在經歷瞭解碼和封裝以後,下面須要實現handler類,用來處理消息
public class RedisCommandHandler extends SimpleChannelInboundHandler<Command> { private Map<String, Wrapper> methods = new HashMap<String, Wrapper>(); interface Wrapper { Reply<?> execute(Command command) throws RedisException; } public RedisCommandHandler(final RedisServer rs) { Class<? extends RedisServer> aClass = rs.getClass(); for (final Method method : aClass.getMethods()) { final Class<?>[] types = method.getParameterTypes(); methods.put(method.getName(), new Wrapper() { @Override public Reply<?> execute(Command command) throws RedisException { Object[] objects = new Object[types.length]; try { command.toArguments(objects, types); return (Reply<?>) method.invoke(rs, objects); } catch (Exception e) { return new ErrorReply("ERR " + e.getMessage()); } } }); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override protected void channelRead0(ChannelHandlerContext ctx, Command msg) throws Exception { String name = new String(msg.getName()); Wrapper wrapper = methods.get(name); Reply<?> reply; if (wrapper == null) { reply = new ErrorReply("unknown command '" + name + "'"); } else { reply = wrapper.execute(msg); } if (reply == StatusReply.QUIT) { ctx.close(); } else { if (msg.isInline()) { if (reply == null) { reply = new InlineReply(null); } else { reply = new InlineReply(reply.data()); } } if (reply == null) { reply = ErrorReply.NYI_REPLY; } ctx.write(reply); } } }
在實例化handler的時候傳入了一個RedisServer對象,這個方法是真正用來處理redis命令的,理論上這個對象應該支持redis的全部命令,不過這裏只是測試全部只提供了2個方法:
public interface RedisServer { public BulkReply get(byte[] key0) throws RedisException; public StatusReply set(byte[] key0, byte[] value1) throws RedisException; }
在channelRead0方法中咱們能夠拿到以前封裝好的command方法,而後經過命令名稱執行操做,這裏的RedisServer也很簡單,只是用簡單的hashmap進行臨時的保存數據。
第五步:封裝回復
第四步種咱們能夠看處處理完命令以後,返回了一個Reply對象
public interface Reply<T> { byte[] CRLF = new byte[] { RedisReplyDecoder.CR, RedisReplyDecoder.LF }; T data(); void write(ByteBuf os) throws IOException; }
根據上面提到的5種類型再加上一個inline命令,根據不一樣的數據格式進行拼接,好比StatusReply:
public void write(ByteBuf os) throws IOException { os.writeByte('+'); os.writeBytes(statusBytes); os.writeBytes(CRLF); }
因此對應Decoder的Encoder就很簡單了:
public class RedisReplyEncoder extends MessageToByteEncoder<Reply<?>> { @Override public void encode(ChannelHandlerContext ctx, Reply<?> msg, ByteBuf out) throws Exception { msg.write(out); } }
只須要將封裝好的Reply返回給客戶端就好了
最後一步:測試
啓動類:
public class Main { private static Integer port = 6379; public static void main(String[] args) throws InterruptedException { final RedisCommandHandler commandHandler = new RedisCommandHandler( new SimpleRedisServer()); ServerBootstrap b = new ServerBootstrap(); final DefaultEventExecutorGroup group = new DefaultEventExecutorGroup(1); try { b.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100).localAddress(port) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new RedisCommandDecoder()); p.addLast(new RedisReplyEncoder()); p.addLast(group, commandHandler); } }); ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
ChannelPipeline分別添加了RedisCommandDecoder、RedisReplyEncoder和RedisCommandHandler,同時咱們啓動的端口和Redis服務器端口是同樣的也是6379
打開redis-cli程序:
redis 127.0.0.1:6379> get dsf (nil) redis 127.0.0.1:6379> set dsf dsfds OK redis 127.0.0.1:6379> get dsf "dsfds" redis 127.0.0.1:6379>
從結果能夠看出和正常使用redis服務器沒有差異
總結
這樣作的意義其實就是能夠把它當作一個redis代理,由這個代理服務器去進行sharding處理,客戶端不直接訪問redis服務器,對客戶端來講,後臺redis集羣是徹底透明的。
我的博客:codingo.xyz