本文主要對當下開源流行的redis客戶端jedis和spring-data-redis的部分核心源碼進行剖析,記錄一下怎麼去實現一個redis的java客戶端以及在使用redis集羣時客戶端的操做須要注意的要點。java
jedis:v2.9.0、node
spring-data-redis:v2.0.8.RELEASEios
它繼承於org.springframework.data.redis.core.RedisAccessor,主要是設置org.springframework.data.redis.connection.RedisConnectionFactory,並在spring bean初始化完畢對connectionFactory進行爲空校驗;redis
實現的核心接口org.springframework.data.redis.core.RedisOperations主要提供了一些基礎操做,但這個接口並不常用,由於redis的數據結構比較複雜,更多具體的操做都封裝在了ValueOperations、ListOperations等,以及BoundValueOperations、BoundListOperations等這兩類操做中,這兩類操做的主要區別在因而否綁定key了,BoundXX接口繼承了BoundKeyOperations這個對key基礎操做的接口。spring
與redis服務端的交互全部操做幾乎都調用了以下方法:緩存
/** * Executes the given action object within a connection that can be exposed or not. Additionally, the connection can * be pipelined. Note the results of the pipeline are discarded (making it suitable for write-only scenarios). * * @param <T> return type * @param action callback object to execute * @param exposeConnection whether to enforce exposure of the native Redis Connection to callback code * @param pipeline whether to pipeline or not the connection for the execution * @return object returned by the action */ @Nullable public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) { Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it"); Assert.notNull(action, "Callback object must not be null"); RedisConnectionFactory factory = getRequiredConnectionFactory(); RedisConnection conn = null; try { if (enableTransactionSupport) { // only bind resources in case of potential transaction synchronization conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport); } else { conn = RedisConnectionUtils.getConnection(factory); } boolean existingConnection = TransactionSynchronizationManager.hasResource(factory); RedisConnection connToUse = preProcessConnection(conn, existingConnection); boolean pipelineStatus = connToUse.isPipelined(); if (pipeline && !pipelineStatus) { connToUse.openPipeline(); } RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse)); T result = action.doInRedis(connToExpose); // close pipeline if (pipeline && !pipelineStatus) { connToUse.closePipeline(); } // TODO: any other connection processing? return postProcessResult(result, connToUse, existingConnection); } finally { RedisConnectionUtils.releaseConnection(conn, factory); } }
咱們能夠看到:數據結構
Connection conn = null; try { conn = connectionFactory.getConnection(); // handle data // ... } finally { if (conn != null) conn.close();// or release, or disconnect }
/* * (non-Javadoc) * @see org.springframework.data.redis.connection.RedisConnectionFactory#getConnection() */ public RedisConnection getConnection() { if (isRedisClusterAware()) {// 若是clusterConfiguration不爲空,那麼使用集羣模式鏈接 return getClusterConnection(); } Jedis jedis = fetchJedisConnector();// 不然,用Jedis單個節點進行鏈接(是否用鏈接池技術能夠進行設置) String clientName = clientConfiguration.getClientName().orElse(null); JedisConnection connection = (getUsePool() ? new JedisConnection(jedis, pool, getDatabase(), clientName) : new JedisConnection(jedis, null, getDatabase(), clientName)); connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults); return postProcessConnection(connection); }
集羣模式的鏈接創建:app
/* * (non-Javadoc) * @see org.springframework.data.redis.connection.RedisConnectionFactory#getClusterConnection() */ @Override public RedisClusterConnection getClusterConnection() { if (!isRedisClusterAware()) { throw new InvalidDataAccessApiUsageException("Cluster is not configured!"); } return new JedisClusterConnection(cluster, clusterCommandExecutor); }// 將當前類的JedisCluster對象傳遞給一個新的JedisClusterConnection // 而咱們又發現:JedisConnectionFactory implements InitializingBean /* * (non-Javadoc) * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() */ public void afterPropertiesSet() { ............... .............. if (isRedisClusterAware()) { this.cluster = createCluster(); } } // 再看createCluster(): ... private JedisCluster createCluster() { JedisCluster cluster = createCluster(this.clusterConfig, getPoolConfig()); JedisClusterTopologyProvider topologyProvider = new JedisClusterTopologyProvider(cluster); this.clusterCommandExecutor = new ClusterCommandExecutor(topologyProvider, new JedisClusterConnection.JedisClusterNodeResourceProvider(cluster, topologyProvider), EXCEPTION_TRANSLATION); return cluster; } /** * Creates {@link JedisCluster} for given {@link RedisClusterConfiguration} and {@link GenericObjectPoolConfig}. * * @param clusterConfig must not be {@literal null}. * @param poolConfig can be {@literal null}. * @return the actual {@link JedisCluster}. * @since 1.7 */ protected JedisCluster createCluster(RedisClusterConfiguration clusterConfig, GenericObjectPoolConfig poolConfig) { Assert.notNull(clusterConfig, "Cluster configuration must not be null!"); Set<HostAndPort> hostAndPort = new HashSet<>(); for (RedisNode node : clusterConfig.getClusterNodes()) { hostAndPort.add(new HostAndPort(node.getHost(), node.getPort())); } int redirects = clusterConfig.getMaxRedirects() != null ? clusterConfig.getMaxRedirects() : 5; int connectTimeout = getConnectTimeout(); int readTimeout = getReadTimeout(); return StringUtils.hasText(getPassword()) ? new JedisCluster(hostAndPort, connectTimeout, readTimeout, redirects, getPassword(), poolConfig) : new JedisCluster(hostAndPort, connectTimeout, readTimeout, redirects, poolConfig); } // 能夠看出在spring容器初始化完成以後執行了當前類中JedisCluster對象的實例化
單節點模式的鏈接創建:dom
/** * Returns a Jedis instance to be used as a Redis connection. The instance can be newly created or retrieved from a * pool. * * @return Jedis instance ready for wrapping into a {@link RedisConnection}. */ protected Jedis fetchJedisConnector() { try { if (getUsePool() && pool != null) { return pool.getResource(); } Jedis jedis = createJedis(); // force initialization (see Jedis issue #82) jedis.connect(); potentiallySetClientName(jedis); return jedis; } catch (Exception ex) { throw new RedisConnectionFailureException("Cannot get Jedis connection", ex); } } private Jedis createJedis() { if (providedShardInfo) { return new Jedis(getShardInfo()); } // 能夠看出,每次都從新創建新的客戶端鏈接 Jedis jedis = new Jedis(getHostName(), getPort(), getConnectTimeout(), getReadTimeout(), isUseSsl(), clientConfiguration.getSslSocketFactory().orElse(null), // clientConfiguration.getSslParameters().orElse(null), // clientConfiguration.getHostnameVerifier().orElse(null)); Client client = jedis.getClient(); getRedisPassword().map(String::new).ifPresent(client::setPassword); client.setDb(getDatabase()); return jedis; }
再看兩種模式下鏈接的關閉:socket
/* * (non-Javadoc) * @see org.springframework.beans.factory.DisposableBean#destroy() */ public void destroy() { // spring容器銷燬時,鏈接池和jediscluster進行銷燬或關閉(若是有的話) if (getUsePool() && pool != null) { try { pool.destroy(); } catch (Exception ex) { log.warn("Cannot properly close Jedis pool", ex); } pool = null; } if (cluster != null) { try { cluster.close(); } catch (Exception ex) { log.warn("Cannot properly close Jedis cluster", ex); } try { clusterCommandExecutor.destroy(); } catch (Exception ex) { log.warn("Cannot properly close cluster command executor", ex); } } }
除此以外,單節點模式下的關閉:org.springframework.data.redis.connection.jedis.JedisConnection#close;
public void close() throws DataAccessException { super.close(); // return the connection to the pool if (pool != null) { if (!broken) { // reset the connection try { if (dbIndex > 0) { jedis.select(0); } pool.returnResource(jedis); return; } catch (Exception ex) { DataAccessException dae = convertJedisAccessException(ex); if (broken) { pool.returnBrokenResource(jedis); } else { pool.returnResource(jedis); } throw dae; } } else { pool.returnBrokenResource(jedis); return; } } // else close the connection normally (doing the try/catch dance) Exception exc = null; if (isQueueing()) { try { client.quit(); } catch (Exception ex) { exc = ex; } try { client.disconnect(); } catch (Exception ex) { exc = ex; } return; } try { jedis.quit(); } catch (Exception ex) { exc = ex; } try { jedis.disconnect(); } catch (Exception ex) { exc = ex; } if (exc != null) throw convertJedisAccessException(exc); }
集羣模式下的關閉:org.springframework.data.redis.connection.jedis.JedisClusterConnection#close
/* * (non-Javadoc) * @see org.springframework.data.redis.connection.RedisConnection#close() */ @Override public void close() throws DataAccessException { if (!closed && disposeClusterCommandExecutorOnClose) { try { clusterCommandExecutor.destroy(); } catch (Exception ex) { log.warn("Cannot properly close cluster command executor", ex); } } closed = true;// 僅僅是設置了一下狀態,頂多銷燬一下相關的bean,bean銷燬的代碼以下: } ... ... org.springframework.data.redis.connection.ClusterCommandExecutor#destroy /* * (non-Javadoc) * @see org.springframework.beans.factory.DisposableBean#destroy() */ @Override public void destroy() throws Exception { if (executor instanceof DisposableBean) { ((DisposableBean) executor).destroy(); } if (resourceProvider instanceof DisposableBean) { ((DisposableBean) resourceProvider).destroy(); } }
redis.clients.jedis.JedisClusterCommand#runWithRetries: // 這裏又看到了try..finally操做,對鏈接進行創建和釋放 // JedisCluster繼承於BinaryJedisCluster // BinaryJedisCluster又使用抽象類JedisClusterCommand實現的 private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) { if (attempts <= 0) { throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?"); } Jedis connection = null; try { if (asking) { // TODO: Pipeline asking with the original command to make it // faster.... connection = askConnection.get(); connection.asking(); // if asking success, reset asking flag asking = false; } else { if (tryRandomNode) { connection = connectionHandler.getConnection(); } else { connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key)); } } return execute(connection); } catch (JedisNoReachableClusterNodeException jnrcne) { throw jnrcne; } catch (JedisConnectionException jce) { // release current connection before recursion releaseConnection(connection); connection = null; if (attempts <= 1) { //We need this because if node is not reachable anymore - we need to finally initiate slots renewing, //or we can stuck with cluster state without one node in opposite case. //But now if maxAttempts = 1 or 2 we will do it too often. For each time-outed request. //TODO make tracking of successful/unsuccessful operations for node - do renewing only //if there were no successful responses from this node last few seconds this.connectionHandler.renewSlotCache(); //no more redirections left, throw original exception, not JedisClusterMaxRedirectionsException, because it's not MOVED situation throw jce; } return runWithRetries(key, attempts - 1, tryRandomNode, asking); } catch (JedisRedirectionException jre) { // if MOVED redirection occurred, if (jre instanceof JedisMovedDataException) { // it rebuilds cluster's slot cache // recommended by Redis cluster specification this.connectionHandler.renewSlotCache(connection); } // release current connection before recursion or renewing releaseConnection(connection); connection = null; if (jre instanceof JedisAskDataException) { asking = true; askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode())); } else if (jre instanceof JedisMovedDataException) { } else { throw new JedisClusterException(jre); } return runWithRetries(key, attempts - 1, false, asking); } finally { releaseConnection(connection); } }
Jedis的實現:
// 由Jedis extends BinaryJedis查看一下redis.clients.jedis.BinaryJedis中引用了redis.clients.jedis.Client對象進行數據交互; // 由Client extends BinaryClient,且BinaryClient extends Connection能夠知道鏈接的開啓和關閉: // redis.clients.jedis.Connection使用Socket與服務端進行通訊 public void connect() { if (!isConnected()) { try { socket = new Socket(); // ->@wjw_add socket.setReuseAddress(true); socket.setKeepAlive(true); // Will monitor the TCP connection is // valid socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to // ensure timely delivery of data socket.setSoLinger(true, 0); // Control calls close () method, // the underlying socket is closed // immediately // <-@wjw_add socket.connect(new InetSocketAddress(host, port), connectionTimeout); socket.setSoTimeout(soTimeout); if (ssl) { if (null == sslSocketFactory) { sslSocketFactory = (SSLSocketFactory)SSLSocketFactory.getDefault(); } socket = (SSLSocket) sslSocketFactory.createSocket(socket, host, port, true); if (null != sslParameters) { ((SSLSocket) socket).setSSLParameters(sslParameters); } if ((null != hostnameVerifier) && (!hostnameVerifier.verify(host, ((SSLSocket) socket).getSession()))) { String message = String.format( "The connection to '%s' failed ssl/tls hostname verification.", host); throw new JedisConnectionException(message); } } outputStream = new RedisOutputStream(socket.getOutputStream()); inputStream = new RedisInputStream(socket.getInputStream()); } catch (IOException ex) { broken = true; throw new JedisConnectionException(ex); } } } @Override public void close() { disconnect(); } public void disconnect() { if (isConnected()) { try { outputStream.flush(); socket.close(); } catch (IOException ex) { broken = true; throw new JedisConnectionException(ex); } finally { IOUtils.closeQuietly(socket); } } }