public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands, AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands, ModuleCommands {
Jedis 繼承了 BinaryJedis,同時實現了一系列的 Commands 接口。BinaryJedis 主要負責和 Redis Server 進行交互;一系列 Commands 接口主要是對 Redis 支持的操作進行分類,例如 BasicCommands 主要包含了 info、flush 等操作,BinaryJedisCommands 主要包含了 get、set 等操作,MultiKeyBinaryCommands 主要包含了一些批量操作的接口,例如 mset 等。
// Minimal usage example: connect to a Redis server on the local machine,
// store a value under "hello" and read it back.
Jedis jed = new Jedis("localhost", 6379);
jed.set("hello", "123");
String out = jed.get("hello");
// Jedis: the (host, port) constructor only forwards its arguments to the superclass constructor.
public Jedis(final String host, final int port) {
  super(host, port);
}

// BinaryJedis (the superclass of Jedis): creates the Client for the given host and port
// and stores it in the client field.
public BinaryJedis(final String host, final int port) {
  client = new Client(host, port);
}
/**
 * Stores {@code value} as the string value of {@code key}. The value can't be longer than
 * 1073741824 bytes (1 GB).
 * <p>
 * Time complexity: O(1)
 * @param key the key to write
 * @param value the string value to associate with the key
 * @return Status code reply
 */
@Override
public String set(final String key, String value) {
  checkIsInMultiOrPipeline();
  // Send the command first, then read the status reply from the same client.
  client.set(key, value);
  final String statusReply = client.getStatusCodeReply();
  return statusReply;
}
/**
 * String overload of SET: encodes both arguments with SafeEncoder and delegates to the
 * byte[] variant.
 */
@Override
public void set(final String key, final String value) {
  final byte[] rawKey = SafeEncoder.encode(key);
  final byte[] rawValue = SafeEncoder.encode(value);
  set(rawKey, rawValue);
}
/** Binary SET: sends Command.SET with the key and value as its two arguments. */
public void set(final byte[] key, final byte[] value) {
  final byte[][] arguments = {key, value};
  sendCommand(Command.SET, arguments);
}
public Connection sendCommand(final ProtocolCommand cmd, final byte[]... args) { try { connect(); Protocol.sendCommand(outputStream, cmd, args); return this; } catch (JedisConnectionException ex) { /* * When client send request which formed by invalid protocol, Redis send back error message * before close connection. We try to read it to provide reason of failure. */ try { String errorMessage = Protocol.readErrorLineIfPossible(inputStream); if (errorMessage != null && errorMessage.length() > 0) { ex = new JedisConnectionException(errorMessage, ex.getCause()); } } catch (Exception e) { /* * Catch any IOException or JedisConnectionException occurred from InputStream#read and just * ignore. This approach is safe because reading error message is optional and connection * will eventually be closed. */ } // Any other exceptions related to connection? broken = true; throw ex; } }
public void connect() { if (!isConnected()) { try { socket = new Socket(); // ->@wjw_add socket.setReuseAddress(true); socket.setKeepAlive(true); // Will monitor the TCP connection is // valid socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to // ensure timely delivery of data socket.setSoLinger(true, 0); // Control calls close () method, // the underlying socket is closed // immediately // <-@wjw_add socket.connect(new InetSocketAddress(host, port), connectionTimeout); socket.setSoTimeout(soTimeout); if (ssl) { if (null == sslSocketFactory) { sslSocketFactory = (SSLSocketFactory)SSLSocketFactory.getDefault(); } socket = (SSLSocket) sslSocketFactory.createSocket(socket, host, port, true); if (null != sslParameters) { ((SSLSocket) socket).setSSLParameters(sslParameters); } if ((null != hostnameVerifier) && (!hostnameVerifier.verify(host, ((SSLSocket) socket).getSession()))) { String message = String.format( "The connection to '%s' failed ssl/tls hostname verification.", host); throw new JedisConnectionException(message); } } outputStream = new RedisOutputStream(socket.getOutputStream()); inputStream = new RedisInputStream(socket.getInputStream()); } catch (IOException ex) { broken = true; throw new JedisConnectionException("Failed connecting to host " + host + ":" + port, ex); } } }
/**
 * Writes a command and its arguments to the stream, using the command's raw byte form.
 *
 * @throws JedisConnectionException if writing to the stream fails
 */
public static void sendCommand(final RedisOutputStream os, final ProtocolCommand command,
    final byte[]... args) {
  final byte[] rawCommand = command.getRaw();
  sendCommand(os, rawCommand, args);
}

/**
 * Writes the request: a header (ASTERISK_BYTE plus the element count, i.e. the arguments plus
 * the command itself), followed by the command and each argument as a length-prefixed chunk
 * (DOLLAR_BYTE, length, bytes, CRLF).
 */
private static void sendCommand(final RedisOutputStream os, final byte[] command,
    final byte[]... args) {
  try {
    // Header: total number of elements.
    os.write(ASTERISK_BYTE);
    os.writeIntCrLf(args.length + 1);

    // The command name.
    os.write(DOLLAR_BYTE);
    os.writeIntCrLf(command.length);
    os.write(command);
    os.writeCrLf();

    // The arguments, in order.
    for (int i = 0; i < args.length; i++) {
      final byte[] current = args[i];
      os.write(DOLLAR_BYTE);
      os.writeIntCrLf(current.length);
      os.write(current);
      os.writeCrLf();
    }
  } catch (IOException writeFailure) {
    throw new JedisConnectionException(writeFailure);
  }
}
/**
 * Flushes pending output, reads one reply and returns it decoded as a String.
 *
 * @return the decoded reply, or {@code null} when the reply read is {@code null}
 */
public String getStatusCodeReply() {
  flush();
  final byte[] rawReply = (byte[]) readProtocolWithCheckingBroken();
  return (rawReply == null) ? null : SafeEncoder.encode(rawReply);
}
/**
 * Reads one reply from the input stream via Protocol.read. If that raises a
 * JedisConnectionException, the connection is flagged as broken and the exception is rethrown.
 */
protected Object readProtocolWithCheckingBroken() {
  try {
    final Object reply = Protocol.read(inputStream);
    return reply;
  } catch (JedisConnectionException connectionFailure) {
    broken = true;
    throw connectionFailure;
  }
}
/** Reads and decodes one reply from the stream. */
public static Object read(final RedisInputStream is) {
  return process(is);
}

/**
 * Dispatches on the first byte of the reply to the matching reply parser.
 *
 * @throws JedisConnectionException if the first byte is not one of the known type markers
 */
private static Object process(final RedisInputStream is) {
  final byte marker = is.readByte();

  if (marker == PLUS_BYTE) {
    return processStatusCodeReply(is);
  }
  if (marker == DOLLAR_BYTE) {
    return processBulkReply(is);
  }
  if (marker == ASTERISK_BYTE) {
    return processMultiBulkReply(is);
  }
  if (marker == COLON_BYTE) {
    return processInteger(is);
  }
  if (marker == MINUS_BYTE) {
    processError(is);
    return null;
  }
  throw new JedisConnectionException("Unknown reply: " + (char) marker);
}
// Pool usage example: borrow a connection, run one command, and always give the connection back.
JedisPoolConfig config = new JedisPoolConfig();
config.setTestOnBorrow(true);
JedisPool pool = new JedisPool(config, hnp.getHost(), hnp.getPort(), 2000, "foobared");
Jedis jedis = pool.getResource();
try {
  jedis.set("foo", "bar");
} finally {
  // Runs even if the command throws, so the borrowed connection is never left checked out.
  jedis.close();
}
public class JedisPoolConfig extends GenericObjectPoolConfig { public JedisPoolConfig() { // defaults to make your life with connection pool easier :) setTestWhileIdle(true); setMinEvictableIdleTimeMillis(60000); setTimeBetweenEvictionRunsMillis(30000); setNumTestsPerEvictionRun(-1); } }
JedisPoolConfig繼承了GenericObjectPoolConfig,GenericObjectPoolConfig是Apache Commons Pool提供的一個對象池的配置。JedisPool使用了Apache Commons Pool來進行連接池的實現。GenericObjectPoolConfig提供了很多的參數,我們可以使用JedisPoolConfig,也可以使用GenericObjectPoolConfig。下面列出一些關鍵的參數:
WHEN_EXHAUSTED_FAIL --> 表示無jedis實例時,直接拋出NoSuchElementException;
WHEN_EXHAUSTED_BLOCK --> 則表示阻塞住,或者達到maxWait時拋出JedisConnectionException;
WHEN_EXHAUSTED_GROW --> 則表示新建一個jedis實例,也就是說設置的maxActive無用;
testWhileIdle:如果爲true,表示有一個idle object evictor線程對idle object進行掃描,如果validate失敗,此object會被從pool中drop掉;這一項只有在timeBetweenEvictionRunsMillis大於0時纔有意義;
timeBetweenEvictionRunsMillis:表示idle object evictor兩次掃描之間要sleep的毫秒數;
numTestsPerEvictionRun:表示idle object evictor每次掃描的最多的對象數;
minEvictableIdleTimeMillis:表示一個對象至少停留在idle狀態的最短時間,然後才能被idle object evictor掃描並驅逐;這一項只有在timeBetweenEvictionRunsMillis大於0時纔有意義;
softMinEvictableIdleTimeMillis:在minEvictableIdleTimeMillis基礎上,加入了至少minIdle個對象已經在pool裏面了。如果爲-1,evictor不會根據idle time驅逐任何對象。如果minEvictableIdleTimeMillis>0,則此項設置無意義,且只有在timeBetweenEvictionRunsMillis大於0時纔有意義;
配置比較多,這裏我不打算詳細的寫Commons Pool的實現機制,只是說說JedisPool是怎麼實現的。
// Convenience constructor: uses Protocol.DEFAULT_DATABASE and no client name.
public JedisPool(final GenericObjectPoolConfig poolConfig, final String host, int port,
    int timeout, final String password) {
  this(poolConfig, host, port, timeout, password, Protocol.DEFAULT_DATABASE, null);
}

// Uses the single timeout value for both connectionTimeout and soTimeout, with SSL disabled
// and no SSL socket factory, SSL parameters or hostname verifier.
public JedisPool(final GenericObjectPoolConfig poolConfig, final String host, int port,
    int timeout, final String password, final int database, final String clientName) {
  this(poolConfig, host, port, timeout, timeout, password, database, clientName, false,
      null, null, null);
}

// Full constructor: wraps all connection settings in a JedisFactory and passes it, together
// with the pool configuration, to the superclass constructor.
public JedisPool(final GenericObjectPoolConfig poolConfig, final String host, int port,
    final int connectionTimeout, final int soTimeout, final String password, final int database,
    final String clientName, final boolean ssl, final SSLSocketFactory sslSocketFactory,
    final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier) {
  super(poolConfig, new JedisFactory(host, port, connectionTimeout, soTimeout, password,
      database, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier));
}
這裏實例化了一個JedisFactory,這個工廠類十分關鍵:它是Commons Pool用來對對象池中的對象進行管理的工廠,所有對象的創建、銷燬、激活和有效性校驗都是在JedisFactory中進行的:
class JedisFactory implements PooledObjectFactory<Jedis> { private final AtomicReference<HostAndPort> hostAndPort = new AtomicReference<HostAndPort>(); private final int connectionTimeout; private final int soTimeout; private final String password; private final int database; private final String clientName; private final boolean ssl; private final SSLSocketFactory sslSocketFactory; private SSLParameters sslParameters; private HostnameVerifier hostnameVerifier; public JedisFactory(final String host, final int port, final int connectionTimeout, final int soTimeout, final String password, final int database, final String clientName, final boolean ssl, final SSLSocketFactory sslSocketFactory, final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier) { this.hostAndPort.set(new HostAndPort(host, port)); this.connectionTimeout = connectionTimeout; this.soTimeout = soTimeout; this.password = password; this.database = database; this.clientName = clientName; this.ssl = ssl; this.sslSocketFactory = sslSocketFactory; this.sslParameters = sslParameters; this.hostnameVerifier = hostnameVerifier; }
JedisFactory實現了PooledObjectFactory接口,PooledObjectFactory是Commons Pool提供的接口。PooledObjectFactory提供了很多的方法:
// Lifecycle callbacks for pooled objects of type T. The surrounding article identifies this as
// the Apache Commons Pool interface; the per-method notes below follow the method names and the
// Commons Pool 2 documentation rather than anything shown in this excerpt.
// NOTE(review): the var1 parameter names suggest this listing was decompiled - confirm.
public interface PooledObjectFactory<T> {
  // Creates a new instance, returned wrapped in a PooledObject.
  PooledObject<T> makeObject() throws Exception;

  // Destroys an instance the pool no longer needs.
  void destroyObject(PooledObject<T> var1) throws Exception;

  // Reports whether the instance is still usable.
  boolean validateObject(PooledObject<T> var1);

  // Re-initialises an instance before the pool hands it out.
  void activateObject(PooledObject<T> var1) throws Exception;

  // Un-initialises an instance when it goes back to the idle pool.
  void passivateObject(PooledObject<T> var1) throws Exception;
}
對象池對對象的管理使用了PooledObjectFactory中的方法,也算做到了「解耦」:自己的東西自己管,Commons Pool 不侵入任何邏輯。
/**
 * JedisPool: borrows a Jedis from the superclass pool and registers this pool on it as its
 * data source before handing it out.
 */
public Jedis getResource() {
  final Jedis borrowed = super.getResource();
  borrowed.setDataSource(this);
  return borrowed;
}

/**
 * Pool: borrows an object from the internal pool.
 *
 * @throws JedisException if borrowing raises NoSuchElementException
 * @throws JedisConnectionException if borrowing fails with any other exception
 */
public T getResource() {
  try {
    final T resource = internalPool.borrowObject();
    return resource;
  } catch (NoSuchElementException noElement) {
    throw new JedisException("Could not get a resource from the pool", noElement);
  } catch (Exception cause) {
    throw new JedisConnectionException("Could not get a resource from the pool", cause);
  }
}
internalPool是一個Commons Pool。我們在獲取jedis的時候調用了Commons Pool的borrowObject,字面上的意思就是借一個連接。同時將JedisPool的引用交給jedis,便於在close的時候進行連接的返還:
/**
 * Releases this instance. Without a data source the client is closed directly. Otherwise the
 * instance is handed back to the data source, as a broken resource when the client reports
 * being broken and as a normal resource otherwise.
 */
@Override
public void close() {
  if (dataSource == null) {
    // Not pool-managed: just close the underlying client.
    client.close();
    return;
  }
  if (client.isBroken()) {
    this.dataSource.returnBrokenResource(this);
    return;
  }
  this.dataSource.returnResource(this);
}
這樣jedis和JedisPool的實現就分析完了,但是Commons Pool對我們還是黑盒的,接下來會寫一個關於Commons Pool實現原理的筆記。同時,JedisPool只能進行單實例的連接操作,但是在數據量大的時候,單實例不能滿足需求,這個時候就需要對實例進行「分片」。Jedis也提供了分片的支持,後面也會總結一個jedis分片的實現。