Redis、Jedis的用途就不介紹了,不瞭解的能夠先去官網:www.redis.net.cn/tutorial/35… 學習和使用。本文章着重講解如何手動實現一個相似jedis的工具。html
通過源碼的研究,能夠發現Jedis的實現是基於Socket,能夠從Jedis的set(key,value)方法開始追溯:java
public void set(String key, String value) {
this.set(SafeEncoder.encode(key), SafeEncoder.encode(value));
}
public void set(byte[] key, byte[] value) {
this.sendCommand(Command.SET, new byte[][]{key, value});
}
public static void sendCommand(RedisOutputStream os, Protocol.Command command, byte[]... args) {
sendCommand(os, command.raw, args);
}
private static void sendCommand(RedisOutputStream os, byte[] command, byte[]... args) {
try {
os.write((byte)42);
os.writeIntCrLf(args.length + 1);
os.write((byte)36);
os.writeIntCrLf(command.length);
os.write(command);
os.writeCrLf();
byte[][] var3 = args;
int var4 = args.length;
for(int var5 = 0; var5 < var4; ++var5) {
byte[] arg = var3[var5];
os.write((byte)36);
os.writeIntCrLf(arg.length);
os.write(arg);
os.writeCrLf();
}
} catch (IOException var7) {
throw new JedisConnectionException(var7);
}
}
複製代碼
System.out.println((char)42);
System.out.println(((char)36));
控制檯:
*
$
複製代碼
能夠大致的看出他得實現過程。把sendCommand
方法翻譯一下就是:git
os.write("*".getBytes());
os.write(//數組長度\r\n);
os.write("$".getBytes());
os.write(//命令長度\r\n)
os.write(命令\r\n);
而後進入循環寫入:
$+參數長度\r\n
參數\r\n
複製代碼
整理一下:就是Redis的通訊協議,官方文檔:redis.io/topics/prot… 。github
總結一下就是如下幾點:redis
*3 // 數據一共有三個數組
//數組1
$6 //下行爲6個長度的字符串
APPEND
//數組2
$5 // 下行爲5個長度的字符串
fantj
//數組3
$3 // 下行爲3個長度的字符串
666
複製代碼
經過源碼能夠看到,它在Connection
類中進行sendCommend,那咱們也同樣:數組
負責與Redis的Server端創建鏈接並獲取反饋信息。bash
package com.fantj.jedis.connect;
import com.fantj.jedis.protocol.Protocol;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
/**
* 鏈接類
* 在這裏進行建立鏈接並處理IO請求,用inputStream進行數據回顯,
* 提供OutputStream給協議層,以便讓其給服務端發送命令
*/
public class Connection {
private String host = "localhost";
private int port = 6379;
private Socket socket;
private OutputStream outputStream;
private InputStream inputStream;
public Connection() {
}
public Connection(String host) {
this.host = host;
}
public Connection(String host, int port) {
this.host = host;
this.port = port;
}
public Connection sendCommand(Protocol.Command cmd, byte[]... args) {
connect();
Protocol.sendCommand(outputStream, cmd, args);
return this;
}
private void connect() {
try {
if (socket == null) { //IO複用
socket = new Socket(host, port);
inputStream = socket.getInputStream();
outputStream = socket.getOutputStream();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 操做狀態的返回
* 好比:SET 操做成功返回 +OK
*/
public String getStatus() {
byte[] bytes = new byte[1024];
try {
socket.getInputStream().read(bytes);
} catch (IOException e) {
e.printStackTrace();
}
return new String(bytes);
}
}
複製代碼
而真正的sendCommend
實現是在Protocol
類中實現(Jedis源碼也是這樣):app
負責提供RESP協議支持和拼接。socket
package com.fantj.jedis.protocol;
import java.io.IOException;
import java.io.OutputStream;
/**
* RESP協議
* 詳見; https://redis.io/topics/protocol
*/
public class Protocol {
// jedis後來將這些常量優化爲byte,在os進行寫出的時候對其進行char轉型
private static final String DOLLAR_BYTE = "$";
private static final String ASTERISK_BYTE = "*";
public static final byte PLUS_BYTE = 43;
public static final byte MINUS_BYTE = 45;
public static final byte COLON_BYTE = 58;
private static final String BLANK_BYTE = "\r\n";
/**
* 拼接RESP 並 發送write
*/
public static void sendCommand(OutputStream os,Protocol.Command cmd, byte[] ... args){
// 1. 生成協議 *3 $3 SET $3 key $5 value
StringBuffer stringBuffer = new StringBuffer();
// 1.1 數組長度 *3
stringBuffer.append(ASTERISK_BYTE).append(args.length+1).append(BLANK_BYTE);
// 1.2 命令長度 $3
stringBuffer.append(DOLLAR_BYTE).append(cmd.name().length()).append(BLANK_BYTE);
// 1.3 命令 SET / GET
stringBuffer.append(cmd).append(BLANK_BYTE);
for (byte[] arg: args){
// 1.4 key/value 長度
stringBuffer.append(DOLLAR_BYTE).append(arg.length).append(BLANK_BYTE);
// 1.5 key/value
stringBuffer.append(new String(arg)).append(BLANK_BYTE);
}
// 寫出到服務端
try {
os.write(stringBuffer.toString().getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 定義一個枚舉類 存放命令
*/
public static enum Command{
SET , GET , KEYS, APPEND
}}
複製代碼
對外提供API。ide
package com.fantj.jedis.client;
import com.fantj.jedis.connect.Connection;
import com.fantj.jedis.protocol.Protocol;
/**
* 客戶端
* 給開發人員使用提供API
*/
public class Client {
private Connection connection;
public Client(String host, int port){
connection = new Connection(host,port);
}
public String set(String key, String value) {
connection.sendCommand(Protocol.Command.SET, key.getBytes(), value.getBytes());
return connection.getStatus();
}
public String get(String key) {
connection.sendCommand(Protocol.Command.GET, key.getBytes());
return connection.getStatus();
}
public void set(String key, String value, String nx, String ex, int i) {
connection.sendCommand(Protocol.Command.SET, key.getBytes(), value.getBytes(), nx.getBytes(), ex.getBytes(), String.valueOf(i).getBytes());
}
public void append(String key, String value){
connection.sendCommand(Protocol.Command.APPEND, key.getBytes(),value.getBytes());
}
}
複製代碼
對Client類的進一層封裝,留給開發人員使用。
package com.fantj.jedis.client;
public class Jedis extends Client{
public Jedis(String host, int port) {
super(host, port);
}
@Override
public String set(String key, String value) {
return super.set(key, value);
}
@Override
public String get(String key) {
return super.get(key);
}
@Override
public void set(String key, String value, String nx, String ex, int i) {
super.set(key, value, nx, ex, i);
}
@Override
public void append(String key, String value) {
super.append(key, value);
}
}
複製代碼
/**
* 測試咱們本身寫的客戶端
*/
public class Main {
private Jedis client = new Jedis("www.xxx.top",6380);
@Test
public void set(){
client.set("fantj","fantj");
String result = client.get("fantj");
System.out.println(result);
}
@Test
public void setNx(){
client.set("fantj","fantj","NX","EX",10000);
String result = client.get("fantj");
System.out.println(result);
}
@Test
public void append(){
// client.append("fantj","-2019");
String fantj = client.get("fantj");
System.out.println(fantj);
}
@Test
public void testChar(){
System.out.println((char)42);
System.out.println(((char)36));
}
}
複製代碼
測試均可以經過。
假設我執行append("fantj","666")
這個命令,那客戶端的操做過程:
*3 // 數據一共有三個數組
//數組1
$6 //下行爲6個長度的字符串
APPEND
//數組2
$5 // 下行爲5個長度的字符串
fantj
//數組3
$3 // 下行爲3個長度的字符串
666
複製代碼
StringBuffer sb = new StringBuffer();
// 注意每一個類型表示完後都進行換行
sb.append("*").append("3").append("\r\n");
sb.append("$").append("6").append("\r\n");
sb.append("APPEND").append("\r\n");
sb.append("$").append("5").append("\r\n");
sb.append("fantj").append("\r\n");
sb.append("$").append("3").append("\r\n");
sb.append("666").append("\r\n");
複製代碼
os.write(sb.toString().getBytes());
複製代碼
/**
* 鏈接池契約
*/
public interface Pool<T> {
/**
* 初始化鏈接池
* @param maxTotal 最大鏈接數
* @param maxWaitMillis 最大等待時間
*/
public void init(int maxTotal, long maxWaitMillis);
/**
* 獲取鏈接
* @return 返回jedis對象
*/
public Jedis getResource() throws Exception;
/**
* 釋放鏈接
*/
public void release(T t);
}
複製代碼
實現方式與ExcutorThreadPool工做流程相似,註釋寫的挺全的就不作詳細解讀了。
package com.fantj.jedis.pool;
import com.fantj.jedis.client.Jedis;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class JedisPool implements Pool<Jedis>{
public JedisPool(String url, int port) {
this.url = url;
this.port = port;
init(maxTotal,maxWaitMillis);
}
public JedisPool(String url, int port, int maxTotal, long maxWaitMillis) {
this.url = url;
this.port = port;
this.maxTotal = maxTotal;
this.maxWaitMillis = maxWaitMillis;
}
private String url;
private int port;
private int maxTotal = 20;
private long maxWaitMillis = 1000;
// 空閒的鏈接queue
private LinkedBlockingQueue<Jedis> idleWorkQueue = null;
private Queue<Jedis> activeWorkQueue = null;
// 當前鏈接數量
private AtomicInteger count = new AtomicInteger(0);
@Override
public void init(int maxTotal, long maxWaitMillis) {
maxTotal = maxTotal;
maxWaitMillis = maxWaitMillis;
idleWorkQueue = new LinkedBlockingQueue<>(maxTotal);
activeWorkQueue = new LinkedBlockingQueue<>(maxTotal);
}
@Override
public Jedis getResource() throws Exception {
Jedis jedis = null;
// 1. 記錄開始時間,檢測超時
long startTime = System.currentTimeMillis();
while (true){
// 2. 從空閒隊列中獲取鏈接,若是拿到,一式兩份存放到活動隊列
jedis = idleWorkQueue.poll();
if (jedis != null){
activeWorkQueue.offer(jedis);
return jedis;
}
// 3. 若是失敗,判斷池是否滿,沒滿則建立
if (count.get() < maxTotal){
if (count.incrementAndGet() <= maxTotal){
jedis = new Jedis(url,port);
activeWorkQueue.offer(jedis);
System.out.printf("建立了一個新的鏈接: %s \r\n", jedis.toString());
return jedis;
}else {
count.decrementAndGet();
}
}
// 4. 若是鏈接池滿了,則在超時時間內進行等待
try {
jedis = idleWorkQueue.poll(maxWaitMillis-(System.currentTimeMillis()-startTime), TimeUnit.MILLISECONDS);
if (jedis != null){
activeWorkQueue.offer(jedis);
return jedis;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
// 5. poll可能被中斷,因此在這裏再進行超時判斷
if (maxWaitMillis < (System.currentTimeMillis()-startTime)){
throw new RuntimeException("JedisPool: jedis connect timeout");
}
}
}
@Override
public void release(Jedis jedis) {
if (activeWorkQueue.remove(jedis)){
idleWorkQueue.offer(jedis);
}
}
}
複製代碼
GitHub地址: github.com/fantj2016/e…
總GitHub地址: github.com/fantj2016/j…