對於平常開發,Redis因爲單線程的併發模型、豐富的數據結構和簡單的API,深受廣大程序員的喜好。Redis提供了多種語言的API,像java、c和python等。以前一直都是使用redis,可是沒有多redis的API有一個系統的認識。忙裏偷閒,擼一下Redis相關的API的實現,如今學習了一下jedis的源碼,來分析一下Redis的讀寫流程。java
:python
代碼是比較簡單的,並且不少類也沒有那麼多的抽象和繼承,實際上是比較好懂的。commands包裏面主要是封裝的redis支持的各類命令。
exception包主要是封裝了一些redis的exception。
在jedis包下的是一些redis的Client。
jedis的代碼結構大體就是上述這些,這裏咱們就以最簡單的jedis類來看一下讀寫的流程。程序員
這裏是jedis的UML圖:redis
public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands, AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands, ModuleCommands {
Jedis 繼承了BinaryJedis 同時實現了一系列的Commands接口,BinaryJedis裏主要和redis Server進行交互,一系列Commands接口主要是對redis支持的接口進行分類,像BasicCommands主要包含了info、flush等操做,BinaryJedisCommands 主要包含了get、set等操做,MultiKeyBinaryCommands主要包含了一些批量操做的接口例如mset等。安全
因爲Jedis實現了各類接口,致使它內部的方法十分的多,這裏咱們使用一個簡單的Demo來學習一下Jedis:數據結構
Jedis jed = new Jedis("locahost",6379); jed.set("hello","123"); String out = jed.get("hello");
首先看Jedis的實例化過程:多線程
public Jedis(final String host, final int port) { super(host, port);} public BinaryJedis(final String host, final int port) { client = new Client(host, port);}
Jedis由於繼承了BinaryJedis,大部分的操做都是在BinaryJedis中實現的,在BinaryJedis的構造方法中就實例化了Client。
Client的繼承結構以下:併發
BinaryJedis中的方法主要是對Client作了代理,Client繼承了BinaryClient,BinaryClient繼承了Connection,實現了Commands接口。Client主要作了一些編解碼的工做,BinaryClient作了Command的發送操做,而全部與redisServer交互的工做由Connection完成。app
首先看Set方法:socket
/** * Set the string value as value of the key. The string can't be longer than 1073741824 bytes (1 * GB). * <p> * Time complexity: O(1) * @param key * @param value * @return Status code reply */ @Override public String set(final String key, String value) { checkIsInMultiOrPipeline(); client.set(key, value); return client.getStatusCodeReply(); }
這裏主要委託給Client進行處理。
@Override public void set(final String key, final String value) { set(SafeEncoder.encode(key), SafeEncoder.encode(value)); }
這裏主要是調用了BinaryClient的set方法。
public void set(final byte[] key, final byte[] value) { sendCommand(Command.SET, key, value); }
這裏主要是委託了Connection的sendCommand方法。接下來到了關鍵部分:
public Connection sendCommand(final ProtocolCommand cmd, final byte[]... args) { try { connect(); Protocol.sendCommand(outputStream, cmd, args); return this; } catch (JedisConnectionException ex) { /* * When client send request which formed by invalid protocol, Redis send back error message * before close connection. We try to read it to provide reason of failure. */ try { String errorMessage = Protocol.readErrorLineIfPossible(inputStream); if (errorMessage != null && errorMessage.length() > 0) { ex = new JedisConnectionException(errorMessage, ex.getCause()); } } catch (Exception e) { /* * Catch any IOException or JedisConnectionException occurred from InputStream#read and just * ignore. This approach is safe because reading error message is optional and connection * will eventually be closed. */ } // Any other exceptions related to connection? broken = true; throw ex; } }
public void connect() { if (!isConnected()) { try { socket = new Socket(); // ->@wjw_add socket.setReuseAddress(true); socket.setKeepAlive(true); // Will monitor the TCP connection is // valid socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to // ensure timely delivery of data socket.setSoLinger(true, 0); // Control calls close () method, // the underlying socket is closed // immediately // <-@wjw_add socket.connect(new InetSocketAddress(host, port), connectionTimeout); socket.setSoTimeout(soTimeout); if (ssl) { if (null == sslSocketFactory) { sslSocketFactory = (SSLSocketFactory)SSLSocketFactory.getDefault(); } socket = (SSLSocket) sslSocketFactory.createSocket(socket, host, port, true); if (null != sslParameters) { ((SSLSocket) socket).setSSLParameters(sslParameters); } if ((null != hostnameVerifier) && (!hostnameVerifier.verify(host, ((SSLSocket) socket).getSession()))) { String message = String.format( "The connection to '%s' failed ssl/tls hostname verification.", host); throw new JedisConnectionException(message); } } outputStream = new RedisOutputStream(socket.getOutputStream()); inputStream = new RedisInputStream(socket.getInputStream()); } catch (IOException ex) { broken = true; throw new JedisConnectionException("Failed connecting to host " + host + ":" + port, ex); } } }
這裏主要使用Socket通訊來實現命令的發送,鏈接使用長鏈接來減少創建鏈接的開銷。並實例化了RedisOutputStream和RedisInputStream。在每一次進行query的時候都會調用connect方法來保證以前鏈接失效以後能新建鏈接並操做成功。
public static void sendCommand(final RedisOutputStream os, final ProtocolCommand command, final byte[]... args) { sendCommand(os, command.getRaw(), args); } private static void sendCommand(final RedisOutputStream os, final byte[] command, final byte[]... args) { try { os.write(ASTERISK_BYTE); os.writeIntCrLf(args.length + 1); os.write(DOLLAR_BYTE); os.writeIntCrLf(command.length); os.write(command); os.writeCrLf(); for (final byte[] arg : args) { os.write(DOLLAR_BYTE); os.writeIntCrLf(arg.length); os.write(arg); os.writeCrLf(); } } catch (IOException e) { throw new JedisConnectionException(e); } }
這裏代碼比較清晰,利用了Protocol提供的一些請求頭來構造一個請求。這裏具體的協議內容就不細解析了,發送完請求以後返回。
以後調用client.getStatusCodeReply();進行返回狀態的獲取:
public String getStatusCodeReply() { flush(); final byte[] resp = (byte[]) readProtocolWithCheckingBroken(); if (null == resp) { return null; } else { return SafeEncoder.encode(resp); } }
首先調用了flush方法,保證以前的寫入能發送出去,以後調用了readProtocolWithCheckingBroken來獲取響應。
protected Object readProtocolWithCheckingBroken() { try { return Protocol.read(inputStream); } catch (JedisConnectionException exc) { broken = true; throw exc; } }
調用Protocol.read進行對RedisInputStream進行讀取,在這過程當中可能會拋出鏈接異常。
public static Object read(final RedisInputStream is) { return process(is);} private static Object process(final RedisInputStream is) { final byte b = is.readByte(); if (b == PLUS_BYTE) { return processStatusCodeReply(is); } else if (b == DOLLAR_BYTE) { return processBulkReply(is); } else if (b == ASTERISK_BYTE) { return processMultiBulkReply(is); } else if (b == COLON_BYTE) { return processInteger(is); } else if (b == MINUS_BYTE) { processError(is); return null; } else { throw new JedisConnectionException("Unknown reply: " + (char) b); } }
最後在read的時候對返回的響應進行了判斷,枚舉出了幾種響應方式,對不一樣的響應進行不一樣的處理。
這裏能夠看出,整個交互過程就是一個Socket通訊過程。按照必定的協議發送請求,以後讀取返回結果。可是這裏也有一個問題就是線程安全問題,顯然Jedis實例是線程不安全的,對於多線程共享jedis實例是會有問題的。同時直接使用jedis不能避免的須要反覆的建立和銷燬Socket,開銷很大。因此就引出了後面的jedisPool的使用。
JedisPool是Jedis提供的一種對Redis的鏈接池,利用鏈接池能夠很好的對Jedis的鏈接作一個很好的掌控,能避免建立和銷燬的開銷,同時能夠進行按期的保活,能避免反覆的建立鏈接。
下面是一個JedisPool例子:
JedisPoolConfig config = new JedisPoolConfig(); config.setTestOnBorrow(true); JedisPool pool = new JedisPool(config, hnp.getHost(), hnp.getPort(), 2000, "foobared"); Jedis jedis = pool.getResource(); jedis.set("foo", "bar"); jedis.close();
能夠看到新建立了一個JedisPoolConfig,用於對JedisPool的配置。這裏沒有使用以前JedisPool的returnResource。由於jedis.close()已經作了相關的returnResource方法。
咱們先看一下JedisPoolConfig是什麼:
public class JedisPoolConfig extends GenericObjectPoolConfig { public JedisPoolConfig() { // defaults to make your life with connection pool easier :) setTestWhileIdle(true); setMinEvictableIdleTimeMillis(60000); setTimeBetweenEvictionRunsMillis(30000); setNumTestsPerEvictionRun(-1); } }
JedisPoolConfig繼承了GenericObjectPoolConfig,GenericObjectPoolConfig是ApacheCommons pool提供的一個對象池的配置。JedisPool使用了ApacheCommons pool來進行鏈接池的實現。GenericObjectPoolConfig提供了不少的參數,咱們可使用JedisPoolConfig也可使用GenericObjectPoolConfig。下面列出一些關鍵的參數:
maxActive:控制一個pool可分配多少個jedis實例,經過pool.getResource()來獲取;若是賦值爲-1,則表示不限制;若是pool已經分配了maxActive個jedis實例,則此時pool的狀態爲exhausted。
maxIdle:控制一個pool最多有多少個狀態爲idle(空閒)的jedis實例;
whenExhaustedAction:表示當pool中的jedis實例都被allocated完時,pool要採起的操做;默認有三種。
WHEN_EXHAUSTED_FAIL --> 表示無jedis實例時,直接拋出NoSuchElementException;
WHEN_EXHAUSTED_BLOCK --> 則表示阻塞住,或者達到maxWait時拋出JedisConnectionException;
WHEN_EXHAUSTED_GROW --> 則表示新建一個jedis實例,也就說設置的maxActive無用;
maxWait:表示當borrow一個jedis實例時,最大的等待時間,若是超過等待時間,則直接拋出JedisConnectionException;
testOnBorrow:在borrow一個jedis實例時,是否提早進行alidate操做;若是爲true,則獲得的jedis實例均是可用的;
testOnReturn:在return給pool時,是否提早進行validate操做;
testWhileIdle:若是爲true,表示有一個idle object evitor線程對idle object進行掃描,若是validate失敗,此object會被從pool中drop掉;這一項只有在timeBetweenEvictionRunsMillis大於0時纔有意義;
timeBetweenEvictionRunsMillis:表示idle object evitor兩次掃描之間要sleep的毫秒數;
numTestsPerEvictionRun:表示idle object evitor每次掃描的最多的對象數;
minEvictableIdleTimeMillis:表示一個對象至少停留在idle狀態的最短期,而後才能被idle object evitor掃描並驅逐;這一項只有在timeBetweenEvictionRunsMillis大於0時纔有意義;
softMinEvictableIdleTimeMillis:在minEvictableIdleTimeMillis基礎上,加入了至少minIdle個對象已經在pool裏面了。若是爲-1,evicted不會根據idle time驅逐任何對象。若是minEvictableIdleTimeMillis>0,則此項設置無心義,且只有在timeBetweenEvictionRunsMillis大於0時纔有意義;
配置比較多,這裏我不打算詳細的寫Commons Pool的實現機制,只是說說JedisPool是怎麼實現的。
JedisPool的實例化過程以下:
public JedisPool(final GenericObjectPoolConfig poolConfig, final String host, int port, int timeout, final String password) { this(poolConfig, host, port, timeout, password, Protocol.DEFAULT_DATABASE, null); } public JedisPool(final GenericObjectPoolConfig poolConfig, final String host, int port, int timeout, final String password, final int database, final String clientName) { this(poolConfig, host, port, timeout, timeout, password, database, clientName, false, null, null, null); } public JedisPool(final GenericObjectPoolConfig poolConfig, final String host, int port, final int connectionTimeout, final int soTimeout, final String password, final int database, final String clientName, final boolean ssl, final SSLSocketFactory sslSocketFactory, final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier) { super(poolConfig, new JedisFactory(host, port, connectionTimeout, soTimeout, password, database, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier)); }
這裏實例化了一個JedisFactory,這個工廠類十分關鍵,這個工廠類是Commons pool來進行多對象池對象進行管理的一個工廠,對於全部對象的建立、銷燬、激活和有效性校驗都是在JedisFactory中進行的:
class JedisFactory implements PooledObjectFactory<Jedis> { private final AtomicReference<HostAndPort> hostAndPort = new AtomicReference<HostAndPort>(); private final int connectionTimeout; private final int soTimeout; private final String password; private final int database; private final String clientName; private final boolean ssl; private final SSLSocketFactory sslSocketFactory; private SSLParameters sslParameters; private HostnameVerifier hostnameVerifier; public JedisFactory(final String host, final int port, final int connectionTimeout, final int soTimeout, final String password, final int database, final String clientName, final boolean ssl, final SSLSocketFactory sslSocketFactory, final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier) { this.hostAndPort.set(new HostAndPort(host, port)); this.connectionTimeout = connectionTimeout; this.soTimeout = soTimeout; this.password = password; this.database = database; this.clientName = clientName; this.ssl = ssl; this.sslSocketFactory = sslSocketFactory; this.sslParameters = sslParameters; this.hostnameVerifier = hostnameVerifier; }
JedisFactory實現了PooledObjectFactory接口,PooledObjectFactory是Commons Pool提供的接口。PooledObjectFactory提供了不少的方法:
public interface PooledObjectFactory<T> { PooledObject<T> makeObject() throws Exception; void destroyObject(PooledObject<T> var1) throws Exception; boolean validateObject(PooledObject<T> var1); void activateObject(PooledObject<T> var1) throws Exception; void passivateObject(PooledObject<T> var1) throws Exception; }
makeObject爲建立對象的方法。
destroyObject爲銷燬對象的方法。
validateObject爲校驗對象有消息的方法。
activateObject爲激活對象的方法。
passivateObject爲鈍化對象的方法。
對於對象池對對象的管理使用了PooledObjectFactory中的方法,也算作到了「解耦」,本身的東西本身管,Commons Pool 不侵入任何邏輯。
在建立好了JedisPool以後呢,在使用的時候利用getResource來獲取jedis的客戶端:
public Jedis getResource() { Jedis jedis = super.getResource(); jedis.setDataSource(this); return jedis; } public T getResource() { try { return internalPool.borrowObject(); } catch (NoSuchElementException nse) { throw new JedisException("Could not get a resource from the pool", nse); } catch (Exception e) { throw new JedisConnectionException("Could not get a resource from the pool", e); } }
internalPool是一個Commons pool。咱們在獲取jedis的時候調用了Commons pool的borrowObject。表面的意思就是借一個連接。同時將JedisPool的引用交給jedis,便於在close的時候進行連接的返還:
@Override public void close() { if (dataSource != null) { if (client.isBroken()) { this.dataSource.returnBrokenResource(this); } else { this.dataSource.returnResource(this); } } else { client.close(); } }
在jedis調用close方法時候,調用dataSource.returnResource進行連接的返還。
這樣jedis和JedisPool的實現就分析完了,可是對於Commons Pool對咱們仍是黑盒的,接下來會寫一個對Commons pool的實現原理的筆記。同時呢對於jedis Pool只能進行單實例的連接操做,可是對於數據量大的時候,單實例不能知足需求。這個時候就須要對實例進行「分片」。Jedis也是提供了分片的支持,後面也會總結一個jedis分片的實現。