手把手教你用netty徒手擼一個RedisClient

手把手教你用netty擼一個ZkClientjava

redis通信協議(RESP )是什麼git

上次擼完一個ZkClient以後(手把手教你用netty擼一個ZkClient), 忽然想起我以前寫過一篇redis通信協議的文章(redis通信協議(RESP )是什麼). 既然通信協議都弄清楚了, 那麼擼一個redis的客戶端是否是也是手到擒來?github

說幹就幹, 今天咱們就來手動擼一個redis的客戶端redis

redis key的類型

你們都知道, redis中無論key也好, value也好, 真正存到內容中去時都是byte數組. 可是對於調用方來講(以java來舉例), redis的key類型實際上是有兩種的:數組

一種是String類型的key, 另一種是Object類型的key.bash

jedis應該是java世界裏應用最廣的redis客戶端了. 它雖然是同時支持String類型和Object類型的key. 可是它在把數據發往服務器以前, 都是先把String類型或者Object類型的key序列化成了byte數組後,再發起請求.服務器

然而正如我在redis通信協議(RESP )是什麼那篇文章裏實驗過的, 其實直接把RESP格式的字符串發給redis服務器,它也是能處理的,並且返回的也是RESP格式的字符串.網絡

所以在本文的實現裏面, 我把String類型和Object類型(其實最終都是byte[])的客戶端分開來實現, 分別是RedisStringClient和RedisBinaryClient.socket

RedisStringClient和RedisBinaryClient的處理邏輯基本一致, 惟一的區別是一個是RESP格式的字符串, 一個是RESP格式的byte數組.ide

RedisStringClient的繼承結構

image

RedisBinaryClient的繼承結構

image

能夠看到, 主要的業務邏輯都是在AbstractRedisClient中

public abstract class AbstractRedisClient<T> implements RedisClient<T> {
    private ConnectionPool<T> connectionPool;

    protected AbstractRedisClient(ClientType clientType){
          connectionPool = new ConnectionPool<>(clientType);
    }


    protected <RETURN> RETURN invokeCmd(Cmd<T> cmd, CmdResp<T, RETURN> cmdResp) throws FailedToGetConnectionException{
        RedisConnection<T> connection = null;
        try{
            T data = cmd.build();
            // 從鏈接池中borrow鏈接
            connection = connectionPool.borrowConnection();
            if(connectionPool.checkChannel(connection)){
                // 要鎖定這個鏈接
                connection.lock();
                try{
                    // 發送命令
                    connection.writeAndFlush(data).sync();
                    // 獲取命令的返回結果
                    return cmdResp.parseResp(connection.getResp(RedisConfig.TIME_OUT_MS));
                }catch (Exception e){
                   e.printStackTrace();
                }finally {
                    // 釋放鎖
                    connection.unlock();
                }
            }else{
                throw new FailedToGetConnectionException("can not get connection form connection pool");
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if(connectionPool.checkChannel(connection)){
                // 把鏈接放回到鏈接池
                connectionPool.returnConnection(connection);
            }
        }
        return null;
    }
}
複製代碼

因爲RedisStringClient更容易理解和表述, 因此本文主要介紹RedisStringClient(可是其實介紹過程,RedisBinaryClient的邏輯也差很少全提到了的)

Cmd 和 CmdResp

AbstractRedisClient類的邏輯其實也不復雜:

  1. 調用Cmd類的build方法構建RESP報文
  2. 把RESP報文發送給redis服務端, 等待響應
  3. redis返回後,調用cmdResp.parseResp()方法解析返回的內容.

因此實際上主要的邏輯仍是在Cmd和CmdResp兩個接口上.

Cmd

Cmd接口用來抽象一個redis命令, 例如set, get等.它只有一個方法:

public interface Cmd<PT> {
    /**
     * 構建RESP 協議
     * @return
     */
    PT build();
}
複製代碼

Cmd接口下面有一個抽象類AbstractCmd, 它進一步豐富Cmd接口:

public abstract class AbstractCmd<T> implements Cmd<T> {
  /**
   * 具體是什麼命令, 例如get set等待
   */
  protected String cmd;
  /**
   * 這個命令後面的參數
   */
  protected List<T> paramList;
  /**
   *  redis命令
   * @return
   */
  protected abstract String getCmd();

}
複製代碼

舉個例子說, 若是咱們想實現 set key val, 這個命令, 那麼cmd就是"set", "key"和"val"都是 paramList的元素

CmdResp

CmdResp用來抽象解析redis返回報文的邏輯, 它沒有抽象類,由具體的類(例如SetStringCmd)直接實現該接口

public interface CmdResp<PARAM,RETURN> {
   /**
    * 解析redis的resp
    * @param resp
    * @return
    */
   RETURN parseResp(PARAM resp);
}
複製代碼

CmdResp有兩個泛型參數PARAM和RETURN:

PARAM其實就是redis返回的RESP的類型,若是是RedisStringClient的話, 那麼redis會返回String類型的RESP報文. 若是是RedisBinaryClient的話, 那麼redis會返回byte數組類型的RESP報文

RETURN實際上是咱們最近想解析出來的數據類型. 這個是根據命令的不一樣而不一樣的, 例如set命令, 是須要返回true 或false的, 因此它Boolean. 而其餘命令可能就不同了,是String或Integer等等

SET命令

先來看一下redis的set命令的參數是怎麼樣:

set key value [EX seconds] [PX milliseconds] [NX|XX]

複製代碼

能夠看到set的cmd就是"set", 有兩個必選參數key和value, 以及三個可選參數. 所以, 能夠抽象出一個AbstractSetCmd類:

/**
 *   set命令的抽象類
 *   命令參數
 *  set key value [EX seconds] [PX milliseconds] [NX|XX]
 * @author laihaohua
 */
public abstract class AbstractSetCmd<T> extends AbstractCmd<T> {

    public AbstractSetCmd(T  key, T  value, T expireMode, T expireTime, T xmode){
        super();
        super.paramList = new ArrayList<>();
        paramList.add(key);
        paramList.add(value);
        // 設置了過時時間
        if(expireMode != null){
            paramList.add(expireMode);
            paramList.add(expireTime);
        }
        // 設置了 XX或NX
        if(xmode != null){
            paramList.add(xmode);
        }
    }

    @Override
    protected String getCmd() {
        return super.cmd = "set";
    }
}
複製代碼

該抽象類能夠有String 和 Binary兩種實現. 咱們來看看String的實現SetStringCmd

**
 *  命令參數
 *  set key value [EX seconds] [PX milliseconds] [NX|XX]
 * @author laihaohua
 */
public class SetStringCmd extends AbstractSetCmd<String>  implements CmdResp<String, Boolean> {
    /**
     * 沒有過時時間
     * @param key
     * @param value
     */
    public SetStringCmd(String key, String value){
          this(key, value, null, 0, null);
    }

    /**
     *
     * @param key
     * @param value
     * @param expireMode
     * @param expireTime
     */
    public SetStringCmd(String key, String value, ExpireMode expireMode, long expireTime){
        this(key, value, expireMode, expireTime, null);

    }
    public SetStringCmd(String key, String value, Xmode xmode){
        this(key, value, null, 0, xmode);
    }
    public SetStringCmd(String key, String value, ExpireMode expireMode, long expireTime, Xmode xmode){
        super( key,
               value ,
               expireMode == null ? null : expireMode.getType(),
               String.valueOf(expireTime),
               xmode == null ? null : xmode.getType() );
    }


    /**
     * 構建請求參數RESP
     * @return
     */
    @Override
    public String build() {
        return CmdBuildUtils.buildString(getCmd(), paramList);
    }

    /**
     * 解析redis返回的RESP
     * @param resp
     * @return
     */
    @Override
    public Boolean parseResp(String resp) {
        char ch = resp.charAt(0);
        // 通常返回 +OK 就是成功
        if(ch == SymbolUtils.OK_PLUS.charAt(0)){
              return true;
        }
        // 其餘的都是失敗
        return false;
    }
}
複製代碼

能夠看到該類的實現也是很是的簡潔的(除了構造方法有點多).

CmdBuildUtils.buildString其實就是把cmd和paramList按順序拼接成RESP格式.

parseResp就更簡潔了.根據RESP協議, 返回"+OK"的就是成功, 不然就是失敗

到這裏, 一個redis命令就完成了, 是否是很是的簡單? 其實GET命令更簡單, 由於它只有一個參數.有興趣的能夠到個人github看看GET命令的實現, 這裏就再也不累贅了.

最後附上它的類繼承結構

image

redis 鏈接池

有些心細的同窗可能已經發現了, redis的RESP裏面並無像ZkClient那樣有一個xid,那麼當一個客戶端收到一個redis響應的時候, 它怎麼知道是哪次請求的響應呢.

理論上說, redis是單線程模型,處理順序確定是按照接受到的請求的順序的,因此本地把請求隊列化感受就能夠解決問題了.然而因爲網絡延時的存在, 服務端接受到的請求的順序, 實際上是有可能跟發送的順序不同的.

因此我這裏換了一種方式實現(其實也就是jedis的實現方式), 使用鏈接池.

所謂的鏈接,其實就是對應netty裏面的一個channel. 所謂的鏈接池, 其實就是在client初始化的時候, 一次建立批量的channel. 而後在發送一個命令以前, 須要向鏈接池獲取鏈接, 獲取到鏈接後,把這個鏈接鎖定,發送請求,接受響應,再釋放鎖, 把鏈接歸還給鏈接池. 這樣就能夠有效的 解決了上面所說的問題.

redis鏈接池涉及到兩個類:RedisConnection和ConnectionPool

RedisConnection

RedisConnection是一個假的代理類, 它內部持有一個NioSocketChannel對象, 並代理裏NioSocketChannel對象的writeAndFlush(msg),disconnect(),close(),isActive()四個方法:

public class RedisConnection<T>{
    private NioSocketChannel socketChannel;
    private Lock lock = new ReentrantLock();
    private SynchronousQueue<T> synchronousQueue;
    private String name;
    public RedisConnection(String name, NioSocketChannel socketChannel, SynchronousQueue<T> synchronousQueue){
        this.name = name;
           this.socketChannel = socketChannel;
           this.synchronousQueue = synchronousQueue;
    }
    public void cleanUp(){
    }
    public T getResp(long timeout) throws InterruptedException {
        return synchronousQueue.poll(timeout, TimeUnit.MILLISECONDS);
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void lock() {
        lock.lock();
    }
    public void unlock() {
         lock.unlock();
    }

    /***********************代理channel的幾個方法*******************************/
    public ChannelFuture writeAndFlush(Object msg) {
        return socketChannel.writeAndFlush(msg);
    }

    public void disconnect() {
        socketChannel.disconnect();
    }

    public void close() {
        socketChannel.close();
    }

    public boolean isActive() {
       return socketChannel.isActive();
    }
}
複製代碼

除了代理channel的方法以外, RedisConnection也用於本身的屬性和方法:

1. name 
     鏈接的名稱
  2. lock
     用於鎖定該鏈接
  3. synchronousQueue
     一個同步隊列, 用於從netty中的handler中獲取redis的返回
  4. public T getResp(long timeout) 
     從synchronousQueue獲取redis的返回內容,在AbstractRedisClient中被調用
複製代碼

ConnectionPool

ConnectionPool相對就比較簡單了, 它只作了4件事:

  1. 初始化時, 生成必定量的RedisConnection對象
  2. 提供檢測RedisConnection對象是否活躍的方法
  3. 提供borrowConnection方法
  4. 提供returnConnection方法

就這樣, 一個簡陋的鏈接池就完成了.

運行代碼

跟ZkClient同樣, 這個項目一樣提供了體驗用的單元測試RedisStringClientTest和RedisBinaryClientTest, 直接運行這兩個單元測試便可.

不過須要注意的是, 該client只在單機環境下驗證過, 在哨兵模式和集羣模式下都沒有驗證過, 有可能會有異常狀況.不過只有一個節點的redis-cluster的服務端是有測試經過的

至於redis版本, 理論上是redis2.6以上都是沒有問題的, 由於redis2.6之後就都是統一爲RESP協議了

源碼

最後附上github源碼地址:

github.com/NorthWard/a…

感興趣的同窗能夠參考一下,共同窗習進步.

相關文章
相關標籤/搜索