在項目中咱們常常使用spring-data-redis來操做Redis,它封裝了Jedis客戶端來與Redis服務器進行各類命令操做。因爲最近用到了Redis Cluster集羣功能,這裏就分析總結一下Jedis cluster集羣初始化主要過程及源碼。node
jar版本: spring-data-redis-1.8.4-RELEASE.jar、jedis-2.9.0.jargit
測試環境: Redis 3.2.8,八個集羣節點github
applicationContext-redis-cluster.xml 配置文件:redis
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"> <!-- 鏈接池配置. --> <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"> <!-- 鏈接池中最大鏈接數。高版本:maxTotal,低版本:maxActive --> <property name="maxTotal" value="8" /> <!-- 鏈接池中最大空閒的鏈接數. --> <property name="maxIdle" value="4" /> <!-- 鏈接池中最少空閒的鏈接數. --> <property name="minIdle" value="1" /> <!-- 當鏈接池資源耗盡時,調用者最大阻塞的時間,超時將跑出異常。單位,毫秒數;默認爲-1.表示永不超時。高版本:maxWaitMillis,低版本:maxWait --> <property name="maxWaitMillis" value="5000" /> <!-- 鏈接空閒的最小時間,達到此值後空閒鏈接將可能會被移除。負值(-1)表示不移除. --> <property name="minEvictableIdleTimeMillis" value="300000" /> <!-- 對於「空閒連接」檢測線程而言,每次檢測的連接資源的個數。默認爲3 --> <property name="numTestsPerEvictionRun" value="3" /> <!-- 「空閒連接」檢測線程,檢測的週期,毫秒數。若是爲負值,表示不運行「檢測線程」。默認爲-1. --> <property name="timeBetweenEvictionRunsMillis" value="60000" /> <!-- testOnBorrow:向調用者輸出「連接」資源時,是否檢測是有有效,若是無效則從鏈接池中移除,並嘗試獲取繼續獲取。默認爲false。建議保持默認值. --> <!-- testOnReturn:向鏈接池「歸還」連接時,是否檢測「連接」對象的有效性。默認爲false。建議保持默認值. --> <!-- testWhileIdle:向調用者輸出「連接」對象時,是否檢測它的空閒超時;默認爲false。若是「連接」空閒超時,將會被移除。建議保持默認值. --> <!-- whenExhaustedAction:當「鏈接池」中active數量達到閥值時,即「連接」資源耗盡時,鏈接池須要採起的手段, 默認爲1(0:拋出異常。1:阻塞,直到有可用連接資源。2:強制建立新的連接資源) --> </bean> <bean id="n1" class="org.springframework.data.redis.connection.RedisNode"> <constructor-arg value="127.0.0.1" /> <constructor-arg value="6379" type="int" /> </bean> <bean id="n2" class="org.springframework.data.redis.connection.RedisNode"> <constructor-arg value="127.0.0.1" /> <constructor-arg value="6380" type="int" /> </bean> <bean id="n3" class="org.springframework.data.redis.connection.RedisNode"> <constructor-arg value="127.0.0.1" /> <constructor-arg value="6381" type="int" /> </bean> <bean id="redisClusterConfiguration" class="org.springframework.data.redis.connection.RedisClusterConfiguration"> <property name="clusterNodes"> <set> <ref bean="n1" /> <ref bean="n2" /> <ref bean="n3" /> </set> </property> <property name="maxRedirects" value="5" /> </bean> <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"> <constructor-arg ref="redisClusterConfiguration" /> <constructor-arg ref="jedisPoolConfig" /> </bean> <!-- Spring提供的訪問Redis類. --> <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"> <property name="connectionFactory" ref="jedisConnectionFactory" /> <property name="KeySerializer"> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" /> </property> <property name="ValueSerializer"> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" /> </property> <property name="hashKeySerializer"> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" /> </property> <property name="hashValueSerializer"> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" /> </property> </bean> <!-- Redis配置結束 --> </beans>
Jedis與Redis集羣交互時,涉及的類能夠分爲兩類,分別以下:spring
一、Redis集羣信息配置類:緩存
類名 | 說明 |
---|---|
redis.clients.jedis.JedisPoolConfig | 保存Jedis鏈接池配置信息 |
org.springframework.data.redis.connection.RedisNode | 保存Redis集羣節點信息 |
org.springframework.data.redis.connection.RedisClusterConfiguration | 保存Redis集羣配置信息 |
org.springframework.data.redis.connection.jedis.JedisConnectionFactory | Jedis鏈接工廠,負責建立JedisCluster集羣操做類,獲取Redis鏈接對象 |
org.springframework.data.redis.connection.jedis.JedisClusterConnection | 在JedisCluster基礎上實現,根據key類型使用具體的Jedis類與Redis進行交互 |
二、Redis集羣信息操做類:安全
類名 | 說明 |
---|---|
redis.clients.jedis.JedisCluster | 擴展了BinaryJedisCluster類,負責與Redis集羣進行String類型的key交互 |
redis.clients.jedis.BinaryJedisCluster | JedisCluster的父類,負責與Redis集羣進行byte[]類型的key交互 |
redis.clients.jedis.JedisSlotBasedConnectionHandler | JedisClusterConnectionHandler類的子類,負責根據key的slot值獲取Redis鏈接 |
redis.clients.jedis.JedisClusterConnectionHandler | 一個抽象類,負責初始化、重建、重置Redis slot槽緩存 |
redis.clients.jedis.JedisClusterInfoCache | Redis slot緩存類,負責保存、重建和自動發現Redis slot槽與集羣節點的關係 |
從上面的配置文件applicationContext-redis-cluster.xml中咱們聲明瞭JedisConnectionFactory這個類:服務器
<bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"> <constructor-arg ref="redisClusterConfiguration" /> <constructor-arg ref="jedisPoolConfig" /> </bean>
這個類是用來建立、管理和銷燬Jedis與Redis集羣的鏈接的。因爲咱們在Spring配置文件中聲明瞭這個類,所以當應用啓動時,Spring會自動加載該類,Jedis集羣信息初始化的動做也由此開始。該類初始化的方法代碼以下:app
public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory { private JedisPoolConfig poolConfig = new JedisPoolConfig(); private RedisClusterConfiguration clusterConfig; public JedisConnectionFactory(RedisClusterConfiguration clusterConfig, JedisPoolConfig poolConfig) { this.clusterConfig = clusterConfig; this.poolConfig = poolConfig; } /* * (non-Javadoc) * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() */ public void afterPropertiesSet() { if (shardInfo == null) { shardInfo = new JedisShardInfo(hostName, port); if (StringUtils.hasLength(password)) { shardInfo.setPassword(password); } if (timeout > 0) { setTimeoutOn(shardInfo, timeout); } } if (usePool && clusterConfig == null) { this.pool = createPool(); } //若是集羣配置信息不爲空,則建立JedisCluster對象 if (clusterConfig != null) { this.cluster = createCluster(); } } }
在上面的配置文件中,咱們使用構造函數注入的方式初始化了JedisConnectionFactory,因爲該類實現了InitializingBean接口,所以在它被初始化以後會調用afterPropertiesSet()方法,在該方法中會根據clusterConfig集羣配置信息是否爲空來建立JedisCluster對象。createCluster()代碼定義以下:ide
private JedisCluster createCluster() { JedisCluster cluster = createCluster(this.clusterConfig, this.poolConfig); this.clusterCommandExecutor = new ClusterCommandExecutor( new JedisClusterConnection.JedisClusterTopologyProvider(cluster), new JedisClusterConnection.JedisClusterNodeResourceProvider(cluster), EXCEPTION_TRANSLATION); return cluster; } /** * Creates {@link JedisCluster} for given {@link RedisClusterConfiguration} and {@link GenericObjectPoolConfig}. * * @param clusterConfig must not be {@literal null}. * @param poolConfig can be {@literal null}. * @return * @since 1.7 */ protected JedisCluster createCluster(RedisClusterConfiguration clusterConfig, GenericObjectPoolConfig poolConfig) { Assert.notNull(clusterConfig, "Cluster configuration must not be null!"); Set<HostAndPort> hostAndPort = new HashSet<HostAndPort>(); for (RedisNode node : clusterConfig.getClusterNodes()) { hostAndPort.add(new HostAndPort(node.getHost(), node.getPort())); } int redirects = clusterConfig.getMaxRedirects() != null ? clusterConfig.getMaxRedirects().intValue() : 5; return StringUtils.hasText(getPassword()) ? new JedisCluster(hostAndPort, timeout, timeout, redirects, password, poolConfig) : new JedisCluster(hostAndPort, timeout, redirects, poolConfig); }
上面的代碼調用了JedisCluster的構造函數來建立JedisCluster對象,JedisCluster使用super關鍵字調用父類的構造函數:
public JedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts, final GenericObjectPoolConfig poolConfig) { super(jedisClusterNode, timeout, maxAttempts, poolConfig); }
BinaryJedisCluster構造函數:
public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts, final GenericObjectPoolConfig poolConfig) { this.connectionHandler = new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig, timeout); this.maxAttempts = maxAttempts; }
初始化流程到這裏,主要的部分就要浮出水面了。在BinaryJedisCluster類的構造函數中初始化了JedisSlotBasedConnectionHandler類,該類的出現說明Jedis要開始獲取Redis集羣的slot槽和Redis集羣節點信息了,該類也是使用super關鍵字調用父類構造函數來初始化的,它的父類JedisClusterConnectionHandler構造函數以下:
public JedisClusterConnectionHandler(Set<HostAndPort> nodes, final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) { this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password); //這裏是關鍵 initializeSlotsCache(nodes, poolConfig, password); }
JedisClusterConnectionHandler類的構造函數中建立了JedisClusterInfoCache對象,並調用initializeSlotsCache()方法對Redis集羣信息進行初始化。該類的主要方法以下:
public Jedis getConnectionFromNode(HostAndPort node) { return cache.setupNodeIfNotExist(node).getResource(); } public Map<String, JedisPool> getNodes() { return cache.getNodes(); } private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) { for (HostAndPort hostAndPort : startNodes) { Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort()); if (password != null) { jedis.auth(password); } try { cache.discoverClusterNodesAndSlots(jedis); break; } catch (JedisConnectionException e) { // try next nodes } finally { if (jedis != null) { jedis.close(); } } } } public void renewSlotCache() { cache.renewClusterSlots(null); } public void renewSlotCache(Jedis jedis) { cache.renewClusterSlots(jedis); } @Override public void close() { cache.reset(); }
能夠看到,該類主要仍是調用JedisClusterInfoCache對象的方法來完成slot的相關操做。所以咱們重點看一下JedisClusterInfoCache類。
JedisClusterInfoCache類主要負責發送cluster slots命令來獲取Redis集羣節點的槽和Redis集羣節點信信息,並將相應信息保存到Map緩存中。咱們使用redis-cli客戶端工具鏈接上任意一個Redis中的集羣節點,向Redis發送該命令以後,得到的結果以下:
127.0.0.1:6379> cluster slots 1) 1) (integer) 12288 2) (integer) 16383 3) 1) "127.0.0.1" 2) (integer) 6382 3) "65aea5fc4485bc7c0c3c4425fb3f500c562ee243" 4) 1) "127.0.0.1" 2) (integer) 6386 3) "4061e306b094e707b6f4a7c8cd8e82bd61155060" 2) 1) (integer) 4096 2) (integer) 8191 3) 1) "127.0.0.1" 2) (integer) 6380 3) "c6e1b3691b968b009357dcac3349afbcd557fd8c" 4) 1) "127.0.0.1" 2) (integer) 6384 3) "f915c7e6812a7d8fbe637c782ad261cd453022b2" 3) 1) (integer) 0 2) (integer) 4095 3) 1) "127.0.0.1" 2) (integer) 6379 3) "91bb43a956a04a9812e4d6950efebbb2e0f646fd" 4) 1) "127.0.0.1" 2) (integer) 6383 3) "c1d9d907f6905dd826dad774d127b75484ef8ea8" 4) 1) (integer) 8192 2) (integer) 12287 3) 1) "127.0.0.1" 2) (integer) 6381 3) "745936c1192bc1b136fd1f5df842bc1dd517ef36" 4) 1) "127.0.0.1" 2) (integer) 6385 3) "1c07bd8406156122eb4855d2e8b36e785e7901c7"
我如今本地的Redis集羣有八個節點,四個主節點,四個從節點,經過cluster slots命令的結果均可以清楚地看到這些節點信息。這個命令的每一組結果由四個部分組成:起始槽節點、終止槽節點、主節點IP和端口加節點ID、從節點IP和端口加節點ID。
在JedisClusterInfoCache類中,相關的源碼以下:
public class JedisClusterInfoCache { // 保存Redis集羣節點和節點鏈接池信息:key爲節點地址、value爲鏈接池 private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>(); // 保存Redis集羣節點槽和槽所在的主節點鏈接池信息:key爲節點槽、value爲鏈接池 private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>(); // 使用讀寫鎖保證nodes和slots兩個map的寫安全 private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); private final Lock w = rwl.writeLock(); // 重建緩存的標識變量,false爲未進行,true爲正在進行 private volatile boolean rediscovering; private final GenericObjectPoolConfig poolConfig; private int connectionTimeout; private int soTimeout; private String password; // 主節點索引位置標識,遍歷cluster slots結果時使用 private static final int MASTER_NODE_INDEX = 2; public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, int timeout) { this(poolConfig, timeout, timeout, null); } public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, final String password) { this.poolConfig = poolConfig; this.connectionTimeout = connectionTimeout; this.soTimeout = soTimeout; this.password = password; } /** * 在jedis封裝的redis集羣節點信息上發送cluster slots命令,獲取全部集羣節點信息和槽信息 * * @param jedis */ public void discoverClusterNodesAndSlots(Jedis jedis) { w.lock();// 由當前線程得到寫鎖,在當前線程操做未結束以前,其餘線程只能等待 try { reset();// 重置nodes、slots兩個Map,釋放JedisPool鏈接池資源 List<Object> slots = jedis.clusterSlots();// 在redis集羣節點信息上發送cluster slots命令,獲取全部集羣節點信息和槽信息 // 遍歷slots集合,保存Redis集羣節點和節點鏈接池信息到nodes Map中,保存Redis集羣節點槽和槽所在的主節點鏈接池信息到slots Map中 for (Object slotInfoObj : slots) { List<Object> slotInfo = (List<Object>) slotInfoObj; if (slotInfo.size() <= MASTER_NODE_INDEX) { continue; } // 獲取槽節點集合 List<Integer> slotNums = getAssignedSlotArray(slotInfo); // hostInfos int size = slotInfo.size(); // 遍歷slots集合元素中的主從節點信息,保存Redis集羣節點和節點鏈接池信息到nodes Map中,保存Redis集羣節點槽和槽所在的主節點鏈接池信息到slots Map中 for (int i = MASTER_NODE_INDEX; i < size; i++) { List<Object> hostInfos = (List<Object>) slotInfo.get(i); if (hostInfos.size() <= 0) { continue; } // 獲取集羣節點的服務器地址和端口 HostAndPort targetNode = generateHostAndPort(hostInfos); // 保存Redis集羣節點和節點鏈接池信息到nodes Map中 setupNodeIfNotExist(targetNode); // 若是當前遍歷的是主節點信息,則保存Redis集羣節點槽和槽所在的主節點鏈接池信息到slots Map中 if (i == MASTER_NODE_INDEX) { assignSlotsToNode(slotNums, targetNode); } } } } finally { w.unlock();// 釋放寫鎖,使其餘線程使用 } } /** * 重建Cluster集羣節點和Slot槽緩存 * * @param jedis */ public void renewClusterSlots(Jedis jedis) { // 若是重建操做未進行,則開始重建緩存操做 if (!rediscovering) { try { w.lock(); rediscovering = true;// 設重建緩存標識變量的值爲true,表示重建操做正在進行 // 若是封裝redis鏈接信息的jedis對象不爲空,則使用該節點進行重建緩存操做並返回 if (jedis != null) { try { discoverClusterSlots(jedis); return; } catch (JedisException e) { // try nodes from all pools } } // 若是封裝redis鏈接信息的jedis對象爲空,則打亂nodes Map中保存的jedis鏈接池信息,遍歷鏈接池中的節點進行重建緩存操做並返回 for (JedisPool jp : getShuffledNodesPool()) { try { jedis = jp.getResource(); discoverClusterSlots(jedis); return; } catch (JedisConnectionException e) { // try next nodes } finally { if (jedis != null) { jedis.close(); } } } } finally { rediscovering = false;// 設重建緩存標識變量的值爲false,表示重建操做未進行 w.unlock(); } } } /** * 邏輯相似discoverClusterNodesAndSlots方法 * * @param jedis */ private void discoverClusterSlots(Jedis jedis) { List<Object> slots = jedis.clusterSlots(); this.slots.clear(); for (Object slotInfoObj : slots) { List<Object> slotInfo = (List<Object>) slotInfoObj; if (slotInfo.size() <= MASTER_NODE_INDEX) { continue; } List<Integer> slotNums = getAssignedSlotArray(slotInfo); // hostInfos List<Object> hostInfos = (List<Object>) slotInfo.get(MASTER_NODE_INDEX); if (hostInfos.isEmpty()) { continue; } // at this time, we just use master, discard slave information HostAndPort targetNode = generateHostAndPort(hostInfos); assignSlotsToNode(slotNums, targetNode); } } private HostAndPort generateHostAndPort(List<Object> hostInfos) { return new HostAndPort(SafeEncoder.encode((byte[]) hostInfos.get(0)), ((Long) hostInfos.get(1)).intValue()); } /** * 保存Redis集羣節點和節點鏈接池信息到nodes Map中 * * @param node * @return */ public JedisPool setupNodeIfNotExist(HostAndPort node) { w.lock(); try { // 獲取節點key,形式爲"服務器地址:端口" String nodeKey = getNodeKey(node); // 若是節點已存在nodes Map中,則直接返回 JedisPool existingPool = nodes.get(nodeKey); if (existingPool != null) return existingPool; // 建立節點相應的JedisPool鏈接池對象,並保存到nodes Map中,而後返回JedisPool鏈接池對象 JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(), connectionTimeout, soTimeout, password, 0, null, false, null, null, null); nodes.put(nodeKey, nodePool); return nodePool; } finally { w.unlock(); } } /** * 遍歷槽集合,保存Redis集羣節點槽和槽所在的主節點鏈接池信息到slots Map中 * * @param targetSlots * @param targetNode */ public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) { w.lock(); try { JedisPool targetPool = setupNodeIfNotExist(targetNode); for (Integer slot : targetSlots) { slots.put(slot, targetPool); } } finally { w.unlock(); } } /** * 根據節點key獲取JedisPool鏈接池對象 * * @param nodeKey * @return */ public JedisPool getNode(String nodeKey) { r.lock(); try { return nodes.get(nodeKey); } finally { r.unlock(); } } /** * 根據slot槽值獲取JedisPool鏈接池對象 * * @param slot * @return */ public JedisPool getSlotPool(int slot) { r.lock(); try { return slots.get(slot); } finally { r.unlock(); } } /** * 獲取節點信息和節點對象對應的鏈接池信息 * * @return */ public Map<String, JedisPool> getNodes() { r.lock(); try { return new HashMap<String, JedisPool>(nodes); } finally { r.unlock(); } } /** * 獲取nodes Map打亂順序後的Redis集羣節點鏈接池信息 * * @return */ public List<JedisPool> getShuffledNodesPool() { r.lock(); try { List<JedisPool> pools = new ArrayList<JedisPool>(nodes.values()); Collections.shuffle(pools); return pools; } finally { r.unlock(); } } /** * 清空集羣節點集合和槽集合,釋放JedisPool資源 */ public void reset() { w.lock(); try { for (JedisPool pool : nodes.values()) { try { if (pool != null) { pool.destroy(); } } catch (Exception e) { // pass } } nodes.clear(); slots.clear(); } finally { w.unlock(); } } public static String getNodeKey(HostAndPort hnp) { return hnp.getHost() + ":" + hnp.getPort(); } /** * 遍歷槽區間,獲取槽節點集合 * * @param slotInfo * @return */ private List<Integer> getAssignedSlotArray(List<Object> slotInfo) { List<Integer> slotNums = new ArrayList<Integer>(); for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1)).intValue(); slot++) { slotNums.add(slot); } return slotNums; } }
Jedis初始化Redis集羣信息時,先使用JedisConnectionFactory獲取JedisCluster對象,再根據JedisCluster去逐步引出JedisClusterInfoCache對象完成Redis集羣信息的獲取。在這個類中,主要有如下幾點:
下一篇文章剖析Jedis cluster命令執行流程。