手寫一個Jedis以及JedisPool

Redis、Jedis的用途就不介紹了,不瞭解的能夠先去官網:www.redis.net.cn/tutorial/35… 學習和使用。本文章着重講解如何手動實現一個相似jedis的工具。html

1. 源碼探索

通過源碼的研究,能夠發現Jedis的實現是基於Socket,能夠從Jedis的set(key,value)方法開始追溯:java

public void set(String key, String value) {
    this.set(SafeEncoder.encode(key), SafeEncoder.encode(value));
}
public void set(byte[] key, byte[] value) {
    this.sendCommand(Command.SET, new byte[][]{key, value});
}    
public static void sendCommand(RedisOutputStream os, Protocol.Command command, byte[]... args) {
    sendCommand(os, command.raw, args);
}

private static void sendCommand(RedisOutputStream os, byte[] command, byte[]... args) {
    try {
        os.write((byte)42);
        os.writeIntCrLf(args.length + 1);
        os.write((byte)36);
        os.writeIntCrLf(command.length);
        os.write(command);
        os.writeCrLf();
        byte[][] var3 = args;
        int var4 = args.length;

        for(int var5 = 0; var5 < var4; ++var5) {
            byte[] arg = var3[var5];
            os.write((byte)36);
            os.writeIntCrLf(arg.length);
            os.write(arg);
            os.writeCrLf();
        }

    } catch (IOException var7) {
        throw new JedisConnectionException(var7);
    }
}
複製代碼
System.out.println((char)42);
System.out.println(((char)36));

控制檯:
*
$
複製代碼

能夠大致的看出他得實現過程。把sendCommand方法翻譯一下就是:git

os.write("*".getBytes());
os.write(//數組長度\r\n);
os.write("$".getBytes());
os.write(//命令長度\r\n)
os.write(命令\r\n);
而後進入循環寫入: 
    $+參數長度\r\n
    參數\r\n
複製代碼

整理一下:就是Redis的通訊協議,官方文檔:redis.io/topics/prot…github

總結一下就是如下幾點:redis

*3   // 數據一共有三個數組
//數組1
$6   //下行爲6個長度的字符串
APPEND   
//數組2
$5    // 下行爲5個長度的字符串
fantj
//數組3
$3    // 下行爲3個長度的字符串
666
複製代碼

手寫Jedis

經過源碼能夠看到,它在Connection類中進行sendCommend,那咱們也同樣:數組

Connection.java

負責與Redis的Server端創建鏈接並獲取反饋信息。bash

package com.fantj.jedis.connect;

import com.fantj.jedis.protocol.Protocol;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

/**
 * 鏈接類
 * 在這裏進行建立鏈接並處理IO請求,用inputStream進行數據回顯,
 * 提供OutputStream給協議層,以便讓其給服務端發送命令
 */
public class Connection {
    private String host = "localhost";
    private int port = 6379;
    private Socket socket;
    private OutputStream outputStream;
    private InputStream inputStream;

    public Connection() {
    }

    public Connection(String host) {
        this.host = host;
    }

    public Connection(String host, int port) {
        this.host = host;
        this.port = port;
    }
    public Connection sendCommand(Protocol.Command cmd, byte[]... args) {
        connect();
        Protocol.sendCommand(outputStream, cmd, args);
        return this;
    }

    private void connect() {

        try {
            if (socket == null) {  //IO複用
                socket = new Socket(host, port);
                inputStream = socket.getInputStream();
                outputStream = socket.getOutputStream();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 操做狀態的返回
     * 好比:SET 操做成功返回 +OK
     */
    public String getStatus() {
        byte[] bytes = new byte[1024];
        try {
            socket.getInputStream().read(bytes);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return new String(bytes);
    }
}
複製代碼

而真正的sendCommend實現是在Protocol類中實現(Jedis源碼也是這樣):app

Protocol.java

負責提供RESP協議支持和拼接。socket

package com.fantj.jedis.protocol;

import java.io.IOException;
import java.io.OutputStream;

/**
 * RESP協議
 * 詳見; https://redis.io/topics/protocol
 */
public class Protocol {
    // jedis後來將這些常量優化爲byte,在os進行寫出的時候對其進行char轉型
    private static final String DOLLAR_BYTE = "$";
    private static final String ASTERISK_BYTE = "*";
    public static final byte PLUS_BYTE = 43;
    public static final byte MINUS_BYTE = 45;
    public static final byte COLON_BYTE = 58;
    private static final String BLANK_BYTE = "\r\n";

    /**
     * 拼接RESP 並 發送write
     */
    public static void sendCommand(OutputStream os,Protocol.Command cmd, byte[] ... args){
        // 1. 生成協議 *3 $3 SET $3 key $5 value
        StringBuffer stringBuffer = new StringBuffer();
        // 1.1 數組長度 *3
        stringBuffer.append(ASTERISK_BYTE).append(args.length+1).append(BLANK_BYTE);
        // 1.2 命令長度 $3
        stringBuffer.append(DOLLAR_BYTE).append(cmd.name().length()).append(BLANK_BYTE);
        // 1.3 命令 SET / GET
        stringBuffer.append(cmd).append(BLANK_BYTE);
        for (byte[] arg: args){
            // 1.4 key/value 長度
            stringBuffer.append(DOLLAR_BYTE).append(arg.length).append(BLANK_BYTE);
            // 1.5 key/value
            stringBuffer.append(new String(arg)).append(BLANK_BYTE);
        }
        // 寫出到服務端
        try {
            os.write(stringBuffer.toString().getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        }

    }


    /**
     * 定義一個枚舉類 存放命令
     */
    public static  enum Command{
        SET , GET , KEYS, APPEND
    }}
複製代碼

Client.java

對外提供API。ide

package com.fantj.jedis.client;

import com.fantj.jedis.connect.Connection;
import com.fantj.jedis.protocol.Protocol;

/**
 * 客戶端
 * 給開發人員使用提供API
 */
public class Client {
    private Connection connection;
    public Client(String host, int port){
        connection = new Connection(host,port);
    }

    public String set(String key, String value) {
        connection.sendCommand(Protocol.Command.SET, key.getBytes(), value.getBytes());
        return connection.getStatus();
    }

    public String get(String key) {
        connection.sendCommand(Protocol.Command.GET, key.getBytes());
        return connection.getStatus();
    }


    public void set(String key, String value, String nx, String ex, int i) {
        connection.sendCommand(Protocol.Command.SET, key.getBytes(), value.getBytes(), nx.getBytes(), ex.getBytes(), String.valueOf(i).getBytes());
    }
    public void append(String key, String value){
        connection.sendCommand(Protocol.Command.APPEND, key.getBytes(),value.getBytes());
    }
}
複製代碼

Jedis.java

對Client類的進一層封裝,留給開發人員使用。

package com.fantj.jedis.client;

public class Jedis extends Client{

    public Jedis(String host, int port) {
        super(host, port);
    }

    @Override
    public String set(String key, String value) {
        return super.set(key, value);
    }

    @Override
    public String get(String key) {
        return super.get(key);
    }

    @Override
    public void set(String key, String value, String nx, String ex, int i) {
        super.set(key, value, nx, ex, i);
    }

    @Override
    public void append(String key, String value) {
        super.append(key, value);
    }
}
複製代碼

測試

/**
 * 測試咱們本身寫的客戶端
 */
public class Main {
    private Jedis client = new Jedis("www.xxx.top",6380);
    @Test
    public void set(){
        client.set("fantj","fantj");
        String result = client.get("fantj");
        System.out.println(result);
    }
    @Test
    public void setNx(){
        client.set("fantj","fantj","NX","EX",10000);
        String result = client.get("fantj");
        System.out.println(result);
    }
    @Test
    public void append(){
//        client.append("fantj","-2019");
        String fantj = client.get("fantj");
        System.out.println(fantj);
    }
    @Test
    public void testChar(){
        System.out.println((char)42);
        System.out.println(((char)36));
    }
}
複製代碼

測試均可以經過。

總結一下原理

假設我執行append("fantj","666")這個命令,那客戶端的操做過程:

  1. RESP協議對命令的分析
*3   // 數據一共有三個數組
//數組1
$6   //下行爲6個長度的字符串
APPEND   
//數組2
$5    // 下行爲5個長度的字符串
fantj
//數組3
$3    // 下行爲3個長度的字符串
666
複製代碼
  1. 而後咱們進行拼接
StringBuffer sb = new StringBuffer();
// 注意每一個類型表示完後都進行換行
sb.append("*").append("3").append("\r\n");
sb.append("$").append("6").append("\r\n");
sb.append("APPEND").append("\r\n");
sb.append("$").append("5").append("\r\n");
sb.append("fantj").append("\r\n");
sb.append("$").append("3").append("\r\n");
sb.append("666").append("\r\n");
複製代碼
  1. 而後用os進行寫出
os.write(sb.toString().getBytes());
複製代碼

鏈接池的實現

Pool.java

/**
 * 鏈接池契約
 */
public interface Pool<T> {
    /**
     * 初始化鏈接池
     * @param maxTotal 最大鏈接數
     * @param maxWaitMillis 最大等待時間
     */
    public void init(int maxTotal, long maxWaitMillis);

    /**
     * 獲取鏈接
     * @return 返回jedis對象
     */
    public Jedis getResource() throws Exception;

    /**
     * 釋放鏈接
     */
    public void release(T t);
}
複製代碼

JedisPool.java

實現方式與ExcutorThreadPool工做流程相似,註釋寫的挺全的就不作詳細解讀了。

package com.fantj.jedis.pool;

import com.fantj.jedis.client.Jedis;

import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class JedisPool implements Pool<Jedis>{

    public JedisPool(String url, int port) {
        this.url = url;
        this.port = port;
        init(maxTotal,maxWaitMillis);
    }

    public JedisPool(String url, int port, int maxTotal, long maxWaitMillis) {
        this.url = url;
        this.port = port;
        this.maxTotal = maxTotal;
        this.maxWaitMillis = maxWaitMillis;
    }

    private String url;
    private int port;
    private int maxTotal = 20;
    private long maxWaitMillis = 1000;
    // 空閒的鏈接queue
    private LinkedBlockingQueue<Jedis> idleWorkQueue = null;
    private Queue<Jedis> activeWorkQueue = null;
    // 當前鏈接數量
    private AtomicInteger count = new AtomicInteger(0);
    @Override
    public void init(int maxTotal, long maxWaitMillis) {
        maxTotal = maxTotal;
        maxWaitMillis = maxWaitMillis;
        idleWorkQueue = new LinkedBlockingQueue<>(maxTotal);
        activeWorkQueue = new LinkedBlockingQueue<>(maxTotal);
    }

    @Override
    public Jedis getResource() throws Exception {
        Jedis jedis = null;
        // 1. 記錄開始時間,檢測超時
        long startTime = System.currentTimeMillis();
        while (true){
            // 2. 從空閒隊列中獲取鏈接,若是拿到,一式兩份存放到活動隊列
            jedis = idleWorkQueue.poll();
            if (jedis != null){
                activeWorkQueue.offer(jedis);
                return jedis;
            }
            // 3. 若是失敗,判斷池是否滿,沒滿則建立
            if (count.get() < maxTotal){
                if (count.incrementAndGet() <= maxTotal){
                    jedis = new Jedis(url,port);
                    activeWorkQueue.offer(jedis);
                    System.out.printf("建立了一個新的鏈接: %s \r\n", jedis.toString());
                    return jedis;
                }else {
                    count.decrementAndGet();
                }
            }
            // 4. 若是鏈接池滿了,則在超時時間內進行等待
            try {
                jedis = idleWorkQueue.poll(maxWaitMillis-(System.currentTimeMillis()-startTime), TimeUnit.MILLISECONDS);
                if (jedis != null){
                    activeWorkQueue.offer(jedis);
                    return jedis;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 5. poll可能被中斷,因此在這裏再進行超時判斷
            if (maxWaitMillis < (System.currentTimeMillis()-startTime)){
                throw new RuntimeException("JedisPool: jedis connect timeout");
            }
        }
    }

    @Override
    public void release(Jedis jedis) {
        if (activeWorkQueue.remove(jedis)){
            idleWorkQueue.offer(jedis);
        }
    }
}
複製代碼

GitHub地址: github.com/fantj2016/e…

歡迎關注公衆號

總GitHub地址: github.com/fantj2016/j…

相關文章
相關標籤/搜索