Netty模擬redis服務器

Redis的客戶端與服務端採用叫作 RESP(Redis Serialization Protocol)的網絡通訊協議交換數據,客戶端和服務器經過 TCP 鏈接來進行數據交互, 服務器默認的端口號爲 6379 。客戶端和服務器發送的命令或數據一概以 \r\n (CRLF)結尾。git

RESP支持五種數據類型:github

狀態回覆(status reply):以「+」開頭,表示正確的狀態信息,」+」後就是具體信息​,好比:redis

redis 127.0.0.1:6379> set ss sdf
OK

其實它真正回覆的數據是:+OK\r\n
錯誤回覆(error reply):以」-「開頭,表示錯誤的狀態信息,」-「後就是具體信息,好比:數組

redis 127.0.0.1:6379> incr ss
(error) ERR value is not an integer or out of range

整數回覆(integer reply):以」:」開頭,表示對某些操做的回覆好比DEL, EXISTS, INCR等等服務器

redis 127.0.0.1:6379> incr aa
(integer) 1

批量回復(bulk reply):以」$」開頭,表示下一行的字符串長度,具體字符串在下一行中網絡

多條批量回復(multi bulk reply):以」*」開頭,表示消息體總共有多少行(不包括當前行)」*」是具體行數session

redis 127.0.0.1:6379> get ss
"sdf"

客戶端->服務器
*2\r\n
$3\r\n
get\r\n
$2\r\n
ss\r\n
服務器->客戶端
$3\r\n
sdf\r\n

注:以上寫的都是XX回覆,並非說協議格式只是適用於服務器->客戶端,客戶端->服務器端也一樣使用以上協議格式,其實雙端協議格式的統一更加方便擴展app

回到正題,咱們這裏是經過netty來模擬redis服務器,能夠整理一下思路大概分爲這麼幾步:框架

1.須要一個底層的通訊框架,這裏選擇的是netty4.0.25
2.須要對客戶端穿過來的數據進行解碼(Decoder),其實就是分別處理以上5種數據類型
3.解碼之後咱們封裝成更加利於理解的命令(Command),好比:set<name> foo hello<params>
4.有了命令之後就是處理命令(execute),其實咱們能夠去鏈接正在的redis服務器,不過這裏只是簡單的模擬
5.處理完以後就是封裝回復(Reply),而後編碼(Encoder),須要根據不一樣的命令分別返回之後5種數據類型
6.測試驗證,經過redis-cli去鏈接netty模擬的redis服務器,看可否返回正確的結果

以上思路參考github上的一個項目:https://github.com/spullara/redis-protocol,測試代碼也是在此基礎上作了一個簡化ide

第一步:通訊框架netty

<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
	<version>4.0.25.Final</version>
</dependency>

第二步:數據類型解碼

public class RedisCommandDecoder extends ReplayingDecoder<Void> {

	public static final char CR = '\r';
	public static final char LF = '\n';

	public static final byte DOLLAR_BYTE = '$';
	public static final byte ASTERISK_BYTE = '*';

	private byte[][] bytes;
	private int arguments = 0;

	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in,
			List<Object> out) throws Exception {
		if (bytes != null) {
			int numArgs = bytes.length;
			for (int i = arguments; i < numArgs; i++) {
				if (in.readByte() == DOLLAR_BYTE) {
					int l = RedisReplyDecoder.readInt(in);
					if (l > Integer.MAX_VALUE) {
						throw new IllegalArgumentException(
								"Java only supports arrays up to "
										+ Integer.MAX_VALUE + " in size");
					}
					int size = (int) l;
					bytes[i] = new byte[size];
					in.readBytes(bytes[i]);
					if (in.bytesBefore((byte) CR) != 0) {
						throw new RedisException("Argument doesn't end in CRLF");
					}
					// Skip CRLF(\r\n)
					in.skipBytes(2);
					arguments++;
					checkpoint();
				} else {
					throw new IOException("Unexpected character");
				}
			}
			try {
				out.add(new Command(bytes));
			} finally {
				bytes = null;
				arguments = 0;
			}
		} else if (in.readByte() == ASTERISK_BYTE) {
			int l = RedisReplyDecoder.readInt(in);
			if (l > Integer.MAX_VALUE) {
				throw new IllegalArgumentException(
						"Java only supports arrays up to " + Integer.MAX_VALUE
								+ " in size");
			}
			int numArgs = (int) l;
			if (numArgs < 0) {
				throw new RedisException("Invalid size: " + numArgs);
			}
			bytes = new byte[numArgs][];
			checkpoint();
			decode(ctx, in, out);
		} else {
			in.readerIndex(in.readerIndex() - 1);
			byte[][] b = new byte[1][];
			b[0] = in.readBytes(in.bytesBefore((byte) CR)).array();
			in.skipBytes(2);
			out.add(new Command(b, true));
		}
	}

}

首先經過接受到以「*」開頭的多條批量類型初始化二維數組byte[][] bytes,以讀取到第一個以\r\n結尾的數據做爲數組的長度,而後再處理以「$」開頭的批量類型。
以上除了處理咱們熟悉的批量和多條批量類型外,還處理了沒有任何標識的數據,其實有一個專門的名字叫Inline命令:
有些時候僅僅是telnet鏈接Redis服務,或者是僅僅向Redis服務發送一個命令進行檢測。雖然Redis協議能夠很容易的實現,可是使用Interactive sessions 並不理想,並且redis-cli也不老是可使用。基於這些緣由,Redis支持特殊的命令來實現上面描述的狀況。這些命令的設計是很人性化的,被稱做Inline 命令。

第三步:封裝command對象

由第二步中能夠看到不論是commandName仍是params都統一放在了字節二維數組裏面,最後封裝在command對象裏面

public class Command {
	public static final byte[] EMPTY_BYTES = new byte[0];

	private final Object name;
	private final Object[] objects;
	private final boolean inline;

	public Command(Object[] objects) {
		this(null, objects, false);
	}

	public Command(Object[] objects, boolean inline) {
		this(null, objects, inline);
	}

	private Command(Object name, Object[] objects, boolean inline) {
		this.name = name;
		this.objects = objects;
		this.inline = inline;
	}

	public byte[] getName() {
		if (name != null)
			return getBytes(name);
		return getBytes(objects[0]);
	}

	public boolean isInline() {
		return inline;
	}

	private byte[] getBytes(Object object) {
		byte[] argument;
		if (object == null) {
			argument = EMPTY_BYTES;
		} else if (object instanceof byte[]) {
			argument = (byte[]) object;
		} else if (object instanceof ByteBuf) {
			argument = ((ByteBuf) object).array();
		} else if (object instanceof String) {
			argument = ((String) object).getBytes(Charsets.UTF_8);
		} else {
			argument = object.toString().getBytes(Charsets.UTF_8);
		}
		return argument;
	}

	public void toArguments(Object[] arguments, Class<?>[] types) {
		for (int position = 0; position < types.length; position++) {
			if (position >= arguments.length) {
				throw new IllegalArgumentException(
						"wrong number of arguments for '"
								+ new String(getName()) + "' command");
			}
			if (objects.length - 1 > position) {
				arguments[position] = objects[1 + position];
			}
		}
	}

}

全部的數據都放在了Object數組裏面,並且能夠經過getName方法知道Object[0]就是commandName

第四步:執行命令

在經歷瞭解碼和封裝以後,下面須要實現handler類,用來處理消息

public class RedisCommandHandler extends SimpleChannelInboundHandler<Command> {

	private Map<String, Wrapper> methods = new HashMap<String, Wrapper>();

	interface Wrapper {
		Reply<?> execute(Command command) throws RedisException;
	}

	public RedisCommandHandler(final RedisServer rs) {
		Class<? extends RedisServer> aClass = rs.getClass();
		for (final Method method : aClass.getMethods()) {
			final Class<?>[] types = method.getParameterTypes();
			methods.put(method.getName(), new Wrapper() {
				@Override
				public Reply<?> execute(Command command) throws RedisException {
					Object[] objects = new Object[types.length];
					try {
						command.toArguments(objects, types);
						return (Reply<?>) method.invoke(rs, objects);
					} catch (Exception e) {
						return new ErrorReply("ERR " + e.getMessage());
					}
				}
			});
		}
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.flush();
	}

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Command msg)
			throws Exception {
		String name = new String(msg.getName());
		Wrapper wrapper = methods.get(name);
		Reply<?> reply;
		if (wrapper == null) {
			reply = new ErrorReply("unknown command '" + name + "'");
		} else {
			reply = wrapper.execute(msg);
		}
		if (reply == StatusReply.QUIT) {
			ctx.close();
		} else {
			if (msg.isInline()) {
				if (reply == null) {
					reply = new InlineReply(null);
				} else {
					reply = new InlineReply(reply.data());
				}
			}
			if (reply == null) {
				reply = ErrorReply.NYI_REPLY;
			}
			ctx.write(reply);
		}
	}
}

在實例化handler的時候傳入了一個RedisServer對象,這個方法是真正用來處理redis命令的,理論上這個對象應該支持redis的全部命令,不過這裏只是測試全部只提供了2個方法:

public interface RedisServer {

	public BulkReply get(byte[] key0) throws RedisException;

	public StatusReply set(byte[] key0, byte[] value1) throws RedisException;

}

在channelRead0方法中咱們能夠拿到以前封裝好的command方法,而後經過命令名稱執行操做,這裏的RedisServer也很簡單,只是用簡單的hashmap進行臨時的保存數據。

第五步:封裝回復

第四步種咱們能夠看處處理完命令以後,返回了一個Reply對象

public interface Reply<T> {

	byte[] CRLF = new byte[] { RedisReplyDecoder.CR, RedisReplyDecoder.LF };

	T data();

	void write(ByteBuf os) throws IOException;
}

根據上面提到的5種類型再加上一個inline命令,根據不一樣的數據格式進行拼接,好比StatusReply:

public void write(ByteBuf os) throws IOException {
	os.writeByte('+');
	os.writeBytes(statusBytes);
	os.writeBytes(CRLF);
}

因此對應Decoder的Encoder就很簡單了:

public class RedisReplyEncoder extends MessageToByteEncoder<Reply<?>> {
	@Override
	public void encode(ChannelHandlerContext ctx, Reply<?> msg, ByteBuf out)
			throws Exception {
		msg.write(out);
	}
}

只須要將封裝好的Reply返回給客戶端就好了

最後一步:測試

啓動類:

public class Main {
	private static Integer port = 6379;

	public static void main(String[] args) throws InterruptedException {
		final RedisCommandHandler commandHandler = new RedisCommandHandler(
				new SimpleRedisServer());

		ServerBootstrap b = new ServerBootstrap();
		final DefaultEventExecutorGroup group = new DefaultEventExecutorGroup(1);
		try {
			b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
					.channel(NioServerSocketChannel.class)
					.option(ChannelOption.SO_BACKLOG, 100).localAddress(port)
					.childOption(ChannelOption.TCP_NODELAY, true)
					.childHandler(new ChannelInitializer<SocketChannel>() {
						@Override
						public void initChannel(SocketChannel ch)
								throws Exception {
							ChannelPipeline p = ch.pipeline();
							p.addLast(new RedisCommandDecoder());
							p.addLast(new RedisReplyEncoder());
							p.addLast(group, commandHandler);
						}
					});

			ChannelFuture f = b.bind().sync();
			f.channel().closeFuture().sync();
		} finally {
			group.shutdownGracefully();
		}
	}
}

ChannelPipeline分別添加了RedisCommandDecoder、RedisReplyEncoder和RedisCommandHandler,同時咱們啓動的端口和Redis服務器端口是同樣的也是6379

打開redis-cli程序:

redis 127.0.0.1:6379> get dsf
(nil)
redis 127.0.0.1:6379> set dsf dsfds
OK
redis 127.0.0.1:6379> get dsf
"dsfds"
redis 127.0.0.1:6379>

從結果能夠看出和正常使用redis服務器沒有差異

總結

這樣作的意義其實就是能夠把它當作一個redis代理,由這個代理服務器去進行sharding處理,客戶端不直接訪問redis服務器,對客戶端來講,後臺redis集羣是徹底透明的。

我的博客:codingo.xyz

相關文章
相關標籤/搜索