上次擼完一個ZkClient以後(手把手教你用netty擼一個ZkClient), 忽然想起我以前寫過一篇redis通信協議的文章(redis通信協議(RESP )是什麼). 既然通信協議都弄清楚了, 那麼擼一個redis的客戶端是否是也是手到擒來?github
說幹就幹, 今天咱們就來手動擼一個redis的客戶端redis
你們都知道, redis中無論key也好, value也好, 真正存到內容中去時都是byte數組. 可是對於調用方來講(以java來舉例), redis的key類型實際上是有兩種的:數組
一種是String類型的key, 另一種是Object類型的key.bash
jedis應該是java世界裏應用最廣的redis客戶端了. 它雖然是同時支持String類型和Object類型的key. 可是它在把數據發往服務器以前, 都是先把String類型或者Object類型的key序列化成了byte數組後,再發起請求.服務器
然而正如我在redis通信協議(RESP )是什麼那篇文章裏實驗過的, 其實直接把RESP格式的字符串發給redis服務器,它也是能處理的,並且返回的也是RESP格式的字符串.網絡
所以在本文的實現裏面, 我把String類型和Object類型(其實最終都是byte[])的客戶端分開來實現, 分別是RedisStringClient和RedisBinaryClient.socket
RedisStringClient和RedisBinaryClient的處理邏輯基本一致, 惟一的區別是一個是RESP格式的字符串, 一個是RESP格式的byte數組.ide
RedisStringClient的繼承結構
RedisBinaryClient的繼承結構
能夠看到, 主要的業務邏輯都是在AbstractRedisClient中
public abstract class AbstractRedisClient<T> implements RedisClient<T> {
private ConnectionPool<T> connectionPool;
protected AbstractRedisClient(ClientType clientType){
connectionPool = new ConnectionPool<>(clientType);
}
protected <RETURN> RETURN invokeCmd(Cmd<T> cmd, CmdResp<T, RETURN> cmdResp) throws FailedToGetConnectionException{
RedisConnection<T> connection = null;
try{
T data = cmd.build();
// 從鏈接池中borrow鏈接
connection = connectionPool.borrowConnection();
if(connectionPool.checkChannel(connection)){
// 要鎖定這個鏈接
connection.lock();
try{
// 發送命令
connection.writeAndFlush(data).sync();
// 獲取命令的返回結果
return cmdResp.parseResp(connection.getResp(RedisConfig.TIME_OUT_MS));
}catch (Exception e){
e.printStackTrace();
}finally {
// 釋放鎖
connection.unlock();
}
}else{
throw new FailedToGetConnectionException("can not get connection form connection pool");
}
}catch (Exception e){
e.printStackTrace();
}finally {
if(connectionPool.checkChannel(connection)){
// 把鏈接放回到鏈接池
connectionPool.returnConnection(connection);
}
}
return null;
}
}
複製代碼
因爲RedisStringClient更容易理解和表述, 因此本文主要介紹RedisStringClient(可是其實介紹過程,RedisBinaryClient的邏輯也差很少全提到了的)
AbstractRedisClient類的邏輯其實也不復雜:
因此實際上主要的邏輯仍是在Cmd和CmdResp兩個接口上.
Cmd接口用來抽象一個redis命令, 例如set, get等.它只有一個方法:
public interface Cmd<PT> {
/**
* 構建RESP 協議
* @return
*/
PT build();
}
複製代碼
Cmd接口下面有一個抽象類AbstractCmd, 它進一步豐富Cmd接口:
public abstract class AbstractCmd<T> implements Cmd<T> {
/**
* 具體是什麼命令, 例如get set等待
*/
protected String cmd;
/**
* 這個命令後面的參數
*/
protected List<T> paramList;
/**
* redis命令
* @return
*/
protected abstract String getCmd();
}
複製代碼
舉個例子說, 若是咱們想實現 set key val, 這個命令, 那麼cmd就是"set", "key"和"val"都是 paramList的元素
CmdResp用來抽象解析redis返回報文的邏輯, 它沒有抽象類,由具體的類(例如SetStringCmd)直接實現該接口
public interface CmdResp<PARAM,RETURN> {
/**
* 解析redis的resp
* @param resp
* @return
*/
RETURN parseResp(PARAM resp);
}
複製代碼
CmdResp有兩個泛型參數PARAM和RETURN:
PARAM其實就是redis返回的RESP的類型,若是是RedisStringClient的話, 那麼redis會返回String類型的RESP報文. 若是是RedisBinaryClient的話, 那麼redis會返回byte數組類型的RESP報文
RETURN實際上是咱們最近想解析出來的數據類型. 這個是根據命令的不一樣而不一樣的, 例如set命令, 是須要返回true 或false的, 因此它Boolean. 而其餘命令可能就不同了,是String或Integer等等
先來看一下redis的set命令的參數是怎麼樣:
set key value [EX seconds] [PX milliseconds] [NX|XX]
複製代碼
能夠看到set的cmd就是"set", 有兩個必選參數key和value, 以及三個可選參數. 所以, 能夠抽象出一個AbstractSetCmd類:
/**
* set命令的抽象類
* 命令參數
* set key value [EX seconds] [PX milliseconds] [NX|XX]
* @author laihaohua
*/
public abstract class AbstractSetCmd<T> extends AbstractCmd<T> {
public AbstractSetCmd(T key, T value, T expireMode, T expireTime, T xmode){
super();
super.paramList = new ArrayList<>();
paramList.add(key);
paramList.add(value);
// 設置了過時時間
if(expireMode != null){
paramList.add(expireMode);
paramList.add(expireTime);
}
// 設置了 XX或NX
if(xmode != null){
paramList.add(xmode);
}
}
@Override
protected String getCmd() {
return super.cmd = "set";
}
}
複製代碼
該抽象類能夠有String 和 Binary兩種實現. 咱們來看看String的實現SetStringCmd
**
* 命令參數
* set key value [EX seconds] [PX milliseconds] [NX|XX]
* @author laihaohua
*/
public class SetStringCmd extends AbstractSetCmd<String> implements CmdResp<String, Boolean> {
/**
* 沒有過時時間
* @param key
* @param value
*/
public SetStringCmd(String key, String value){
this(key, value, null, 0, null);
}
/**
*
* @param key
* @param value
* @param expireMode
* @param expireTime
*/
public SetStringCmd(String key, String value, ExpireMode expireMode, long expireTime){
this(key, value, expireMode, expireTime, null);
}
public SetStringCmd(String key, String value, Xmode xmode){
this(key, value, null, 0, xmode);
}
public SetStringCmd(String key, String value, ExpireMode expireMode, long expireTime, Xmode xmode){
super( key,
value ,
expireMode == null ? null : expireMode.getType(),
String.valueOf(expireTime),
xmode == null ? null : xmode.getType() );
}
/**
* 構建請求參數RESP
* @return
*/
@Override
public String build() {
return CmdBuildUtils.buildString(getCmd(), paramList);
}
/**
* 解析redis返回的RESP
* @param resp
* @return
*/
@Override
public Boolean parseResp(String resp) {
char ch = resp.charAt(0);
// 通常返回 +OK 就是成功
if(ch == SymbolUtils.OK_PLUS.charAt(0)){
return true;
}
// 其餘的都是失敗
return false;
}
}
複製代碼
能夠看到該類的實現也是很是的簡潔的(除了構造方法有點多).
CmdBuildUtils.buildString其實就是把cmd和paramList按順序拼接成RESP格式.
parseResp就更簡潔了.根據RESP協議, 返回"+OK"的就是成功, 不然就是失敗
到這裏, 一個redis命令就完成了, 是否是很是的簡單? 其實GET命令更簡單, 由於它只有一個參數.有興趣的能夠到個人github看看GET命令的實現, 這裏就再也不累贅了.
最後附上它的類繼承結構
有些心細的同窗可能已經發現了, redis的RESP裏面並無像ZkClient那樣有一個xid,那麼當一個客戶端收到一個redis響應的時候, 它怎麼知道是哪次請求的響應呢.
理論上說, redis是單線程模型,處理順序確定是按照接受到的請求的順序的,因此本地把請求隊列化感受就能夠解決問題了.然而因爲網絡延時的存在, 服務端接受到的請求的順序, 實際上是有可能跟發送的順序不同的.
因此我這裏換了一種方式實現(其實也就是jedis的實現方式), 使用鏈接池.
所謂的鏈接,其實就是對應netty裏面的一個channel. 所謂的鏈接池, 其實就是在client初始化的時候, 一次建立批量的channel. 而後在發送一個命令以前, 須要向鏈接池獲取鏈接, 獲取到鏈接後,把這個鏈接鎖定,發送請求,接受響應,再釋放鎖, 把鏈接歸還給鏈接池. 這樣就能夠有效的 解決了上面所說的問題.
redis鏈接池涉及到兩個類:RedisConnection和ConnectionPool
RedisConnection是一個假的代理類, 它內部持有一個NioSocketChannel對象, 並代理裏NioSocketChannel對象的writeAndFlush(msg),disconnect(),close(),isActive()四個方法:
public class RedisConnection<T>{
private NioSocketChannel socketChannel;
private Lock lock = new ReentrantLock();
private SynchronousQueue<T> synchronousQueue;
private String name;
public RedisConnection(String name, NioSocketChannel socketChannel, SynchronousQueue<T> synchronousQueue){
this.name = name;
this.socketChannel = socketChannel;
this.synchronousQueue = synchronousQueue;
}
public void cleanUp(){
}
public T getResp(long timeout) throws InterruptedException {
return synchronousQueue.poll(timeout, TimeUnit.MILLISECONDS);
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public void lock() {
lock.lock();
}
public void unlock() {
lock.unlock();
}
/***********************代理channel的幾個方法*******************************/
public ChannelFuture writeAndFlush(Object msg) {
return socketChannel.writeAndFlush(msg);
}
public void disconnect() {
socketChannel.disconnect();
}
public void close() {
socketChannel.close();
}
public boolean isActive() {
return socketChannel.isActive();
}
}
複製代碼
除了代理channel的方法以外, RedisConnection也用於本身的屬性和方法:
1. name
鏈接的名稱
2. lock
用於鎖定該鏈接
3. synchronousQueue
一個同步隊列, 用於從netty中的handler中獲取redis的返回
4. public T getResp(long timeout)
從synchronousQueue獲取redis的返回內容,在AbstractRedisClient中被調用
複製代碼
ConnectionPool相對就比較簡單了, 它只作了4件事:
就這樣, 一個簡陋的鏈接池就完成了.
跟ZkClient同樣, 這個項目一樣提供了體驗用的單元測試RedisStringClientTest和RedisBinaryClientTest, 直接運行這兩個單元測試便可.
不過須要注意的是, 該client只在單機環境下驗證過, 在哨兵模式和集羣模式下都沒有驗證過, 有可能會有異常狀況.不過只有一個節點的redis-cluster的服務端是有測試經過的
至於redis版本, 理論上是redis2.6以上都是沒有問題的, 由於redis2.6之後就都是統一爲RESP協議了
最後附上github源碼地址:
感興趣的同窗能夠參考一下,共同窗習進步.