咱們都知道redis集羣下對於mget、mset、pipeline、事務的支持不太好。html
固然對於mget和mset有這麼幾種方法:node
一、串行遍歷key依次執行(這種就是把批量拆開了)redis
二、使用hash_tag包裝key,在計算key的slot時候,若是key包含{},就會使用第一個{}內部的字符串做爲hash key,這樣就能夠保證擁有一樣{}內部字符串的key就會擁有相同slot(這種方式其實就是把一次批量操做的key所有放到了集羣的一個節點進行操做,屏蔽掉多節點的問題)。算法
三、本身手動進行批量的key作處理,經過CRC16算法對全部的key進行分組(相同slot的分紅一組),而後不一樣的分組keys,使用不一樣的集羣節點進行處理。spring
本文就是適用了第三種方式,經過 pipeline來操做批處理,減小網絡請求次數,加快處理速度。segmentfault
使用jedis封裝的工具類,源碼也是分析的jedis。緩存
對於jedis,集羣的操做使用的JedisCluster類,看下它的繼承實現關係圖:網絡
經過繼承實現管理能夠看到,JedisCluster繼承自 BinaryJedisCluster ,以及實現了其餘接口。咱們再查看下 BinaryJedisCluster源碼。mybatis
在圖中咱們注意到:JedisClusterConnectionHandler 這個類,字面意思redis集羣鏈接處理器,同時這個connectionHandler變量在此類中尚未公共的獲取方法。咱們再日後看,咱們進去到JedisClusterConnectionHandler 源碼看下。app
這個類中有一個內部變量JedisClusterInfoCache cache,看着字面意思是Redis集羣信息緩存,可是JedisClusterConnectionHandler中沒有獲取cache的公共方法,往下看下JedisClusterConnectionHandler中的方法,initializeSlotsCache()
這個變量中存儲了全部的redis集羣節點信息(包含了host和端口),那這裏面有沒有集羣的其餘信息呢?好比slots哈希槽數據,咱們再看下JedisClusterInfoCache的源碼。
看到這裏相信你已經明白了,就是這個cache存儲了redis集羣節點數據和哈希槽對應的節點關係數據,同時這倆變量仍是私有的,往下看下源碼有沒有直接獲取這倆變量的方法呢,答案是看下圖。
只有獲取節點信息(host+ip對應的鏈接池數據集合)的方法,沒有獲取哈希槽對應的節點數據的方法。可是有經過slot獲取節點鏈接池的方法,這個也是jedis中JedisCluster集羣操做類能夠處理集羣操做的關鍵。咱們經過getSlotPool(int slot)方法內部實現能夠知道,slots集合中存儲的關係就是slot對應redis集羣節點鏈接池,這裏就是咱們後面實現集羣下操做的關鍵,先記一下。
反射的知識請自行百度下或者看下其餘人的博客,簡單說,它很強大,能夠經過它獲取這個類或對象中的任何東西,包含私有變量、方法。
直接看下代碼實現:
String clusterNodes="172.16.16.90:16379,172.16.16.90:16380,172.16.16.91:16379,172.16.16.91:16380,172.16.16.92:16379,172.16.16.92:16380"; int redirects=3; int timeOut=2000; String[] serverArray = clusterNodes.split(","); Set<HostAndPort> nodes = new HashSet<HostAndPort>(); for (String ipPort : serverArray) { String[] ipPortPair = ipPort.split(":"); nodes.add(new HostAndPort(ipPortPair[0].trim(), Integer.valueOf(ipPortPair[1].trim()))); } JedisCluster cluster=new JedisCluster(nodes, timeOut, redirects); Field hfield = cluster.getClass().getDeclaredField("connectionHandler");//獲的變量名爲connectionHandler的變量 hfield.setAccessible(true);//打開訪問權限 JedisClusterConnectionHandler connectionHandler = (JedisClusterConnectionHandler)hfield.get(cluster); Field cfield = connectionHandler.getClass().getDeclaredField("cache");//獲的變量名爲cache的變量 cfield.setAccessible(true);//打開訪問權限 JedisClusterInfoCache cache = (JedisClusterInfoCache)cfield.get(connectionHandler); //獲取ip+port對應的鏈接池 Map<String, JedisPool> nodes2 = cache.getNodes();//這個咱們沒怎麼用到 Field field = cache.getClass().getDeclaredField("slots");//獲的變量名爲slots的變量 field.setAccessible(true);//打開訪問權限 //獲取slot對應的鏈接池 Map<Integer, JedisPool> slots=(Map<Integer, JedisPool>)field.get(cache);
可能有人要問:connectionHandler不是屬於BinaryJedisCluster,由於JedisCluster繼承自BinaryJedisCluster,因此一樣能夠獲取到它的內部變量。
若是你的項目框架用的spring(spring boot)+mybatis的話,那更簡單,直接使用mybatis封裝的反射工具操做更方便。
//前面跟上面同樣的,就省略了 //經過Mybatis的反射工具實現 MetaObject metaObject = SystemMetaObject.forObject(cluster); JedisClusterInfoCache cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache"); //獲取ip+port對應的鏈接池 Map<String, JedisPool> nodes2 = cache.getNodes(); //經過反射獲取JedisClusterInfoCache中的slots MetaObject meta = SystemMetaObject.forObject(cache); //獲取slot對應的鏈接池 Map<Integer, JedisPool> slots=(Map<Integer, JedisPool>)meta.getValue("slots");
是否是省了不少的代碼?
答案在spring的JedisConnectionFactory類中。
spring下配置redis集羣是確定要使用JedisConnectionFactory的,那麼能夠經過這個鏈接工廠類得到JedisCluster,可是咱們看到,它又是私有的,同時這個類中的公共方法也沒有直接獲取的。那隻能又得經過反射方式獲取了。
@Configuration @EnableCaching public class RedisCacheConfig extends CachingConfigurerSupport{ protected final static Logger log = LoggerFactory.getLogger(RedisCacheConfig.class); private volatile JedisConnectionFactory jedisConnectionFactory; private volatile RedisTemplate<String, Object> redisTemplate; private volatile RedisCacheManager cacheManager; public RedisCacheConfig() { super(); } public RedisCacheConfig(JedisConnectionFactory jedisConnectionFactory, RedisTemplate<String, Object> redisTemplate, RedisCacheManager cacheManager) { super(); this.jedisConnectionFactory = jedisConnectionFactory; this.redisTemplate = redisTemplate; this.cacheManager = cacheManager; } public JedisConnectionFactory redisConnectionFactory() { return jedisConnectionFactory; } public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory jedisConnectionFactory) { return redisTemplate; } public CacheManager cacheManager(RedisTemplate<?, ?> redisTemplate) { return cacheManager; } //經過反射獲取spring管理的JedisCluster對象 @Bean public JedisCluster jedisCluster(){ MetaObject metaObject = SystemMetaObject.forObject(redisConnectionFactory()); return (JedisCluster)metaObject.getValue("cluster"); } @Bean public KeyGenerator keyGenerator() { return new KeyGenerator() { @Override public Object generate(Object target, Method method, Object... params) { //規定 本類名+方法名+參數名 爲key StringBuilder sb = new StringBuilder(); sb.append(target.getClass().getName()+":"); sb.append(method.getName()+":"); for (Object obj : params) { sb.append(obj.toString()+","); } sb.deleteCharAt(sb.length() - 1); return sb.toString(); } }; } }
咱們能夠直接經過設置一個@Configuration配置類,在這裏面注入jedisConnectionFactory對象,再經過它得到JedisCluster,並設置一個bean供後面使用。像這樣:
@Configuration public class RedisConfig { @Autowired private JedisConnectionFactory jedisConnectionFactory; //經過反射獲取spring管理的JedisCluster對象 @Bean public JedisCluster jedisCluster(){ MetaObject metaObject = SystemMetaObject.forObject(jedisConnectionFactory); return (JedisCluster)metaObject.getValue("cluster"); } }
獲取到了JedisCluster對象,下面就能夠直接進行集羣下pipeline實現各類集羣沒法實現的mget、mset等等。
再次說下原理:經過CRC16算法求出全部須要操做的key對應的slot,再經過slot獲取到對應的節點鏈接池,以鏈接池進行slot分組,進而對相同鏈接池的key劃分到一個組中,而後只須要對相同鏈接池的key集合進行批量操做就能夠了,至關於一個節點下批量操做,同時又使用了pipeline減小了請求,合併了屢次請求,加快了處理速度。
@Component public class JedisClusterUtil implements InitializingBean{ @Autowired private JedisCluster jedisCluster; //存放每一個節點對應的鏈接池 <host+ip , JedisPool> private Map<String, JedisPool> nodes ; //存放每一個哈希槽(slot)對應的鏈接池<slot , JedisPool> private Map<Integer, JedisPool> slots ; @Override public void afterPropertiesSet() throws Exception { // TODO 屬性賦值以後執行 //從而獲取slot和JedisPool直接的映射,經過Ibatis的反射工具實現 MetaObject metaObject = SystemMetaObject.forObject(jedisCluster); //獲取到JedisClusterInfoCache 對象後,在進行批量操做時,就能夠根據key計算其slot值,獲得對應的JedisPool,對key進行分類,而後以pipeline的方式獲取值。 JedisClusterInfoCache cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache"); nodes=cache.getNodes(); //經過反射獲取JedisClusterInfoCache中的slots MetaObject meta = SystemMetaObject.forObject(cache); slots=(Map<Integer, JedisPool>)meta.getValue("slots"); } /** * * * @Title: mget * @Description: 批量獲取 * @param @param keys * @param @return 設定文件 * @return List<Object> 返回類型 * @throws */ public List<Object> mget(List<String> keys){ List<Object> resList = new ArrayList<>(); if (keys == null || keys.isEmpty()) { return resList; } if (keys.size() == 1) { resList.add(jedisCluster.get(keys.get(0))); return resList; } /*放key大於1時*/ //緩存線程池對應執行的key集合 Map<JedisPool, List<String>> jedisPoolMap = getPoolMap(keys); List<String> realKeys=null; JedisPool currentJedisPool = null; Pipeline currentPipeline = null; //接收pipline結果 List<Object> res = new ArrayList<Object>(); //接收key對應的結果 Map<String, Object> resultMap = new HashMap<String, Object>(); for (Map.Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()) { //得到鏈接池 currentJedisPool=entry.getKey(); //得到本鏈接池對應的key集合 realKeys=entry.getValue(); Jedis jedis =null; try { //獲取pipeline jedis = currentJedisPool.getResource(); currentPipeline = jedis.pipelined(); //這裏不能用mget不然將沒法與key相對應了。 // currentPipeline.mget(realKeys.toArray(new String[realKeys.size()])); for (String key : realKeys) { currentPipeline.get(key); } //從pipeline中獲取結果 res = currentPipeline.syncAndReturnAll(); // currentPipeline.close(); for (int i = 0; i < realKeys.size(); i++) { if (null == res.get(i)) { resultMap.put(realKeys.get(i), null); } else { resultMap.put(realKeys.get(i), res.get(i)); } } } finally { realKeys=null; if(currentPipeline!=null){ try { currentPipeline.close(); } catch (IOException e) { e.printStackTrace(); }finally{ currentPipeline=null;//help GC } } if(jedis!=null){ jedis.close();//歸還鏈接 } } } resList = sortList(keys, resultMap); return resList; } /** * * * @Title: mset * @Description: 批量添加 * @param @param map 設定文件 * @return void 返回類型 * @throws */ public void mset(Map<String, String> map){ if (map == null || map.isEmpty()) { return ; } if (map.size() == 1) { for (Map.Entry<String, String> entry : map.entrySet()) { jedisCluster.set(entry.getKey(), entry.getValue()); } return ; } //當內部數據大於1時 Map<JedisPool, List<String>> jedisPoolMap = getPoolMap(new ArrayList<String>(map.keySet())); List<String> realKeys=null; JedisPool currentJedisPool = null; Pipeline currentPipeline = null; for (Map.Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()) { //得到鏈接池 currentJedisPool=entry.getKey(); //得到本鏈接池對應的key集合 realKeys=entry.getValue(); Jedis jedis =null; try { //獲取pipeline jedis = currentJedisPool.getResource(); currentPipeline = jedis.pipelined(); for (String key : realKeys) { currentPipeline.set(key, map.get(key)); } //pipeline執行 currentPipeline.sync(); } finally { realKeys=null; if(currentPipeline!=null){ try { currentPipeline.close(); } catch (IOException e) { e.printStackTrace(); }finally{ currentPipeline=null;//help GC } } if(jedis!=null){ jedis.close();//歸還鏈接 } } } } /** * * * @Title: hmset * @Description: hash數據批量插入 * @param @param hmap 設定文件 * @return void 返回類型 * @throws */ public void hmset(Map<String,Map<String,String>> hmap){ if (hmap == null || hmap.isEmpty()) { return ; } if (hmap.size() == 1) { for (Map.Entry<String, Map<String,String>> entry : hmap.entrySet()) { jedisCluster.hmset(entry.getKey(), entry.getValue()); } return ; } //當內部數據大於1時 Map<JedisPool, List<String>> jedisPoolMap = getPoolMap(new ArrayList<String>(hmap.keySet())); List<String> realKeys=null; JedisPool currentJedisPool = null; Pipeline currentPipeline = null; for (Map.Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()) { //得到鏈接池 currentJedisPool=entry.getKey(); //得到本鏈接池對應的key集合 realKeys=entry.getValue(); Jedis jedis =null; try { //獲取pipeline jedis = currentJedisPool.getResource(); currentPipeline = jedis.pipelined(); for (String key : realKeys) { currentPipeline.hmset(key, hmap.get(key)); } //pipeline執行 currentPipeline.sync(); } finally { realKeys=null; if(currentPipeline!=null){ try { currentPipeline.close(); } catch (IOException e) { e.printStackTrace(); }finally{ currentPipeline=null;//help GC } } if(jedis!=null){ jedis.close();//歸還鏈接 } } } } /** * * * @Title: smadd * @Description: set批量插入 * @param @param smap 設定文件 * @return void 返回類型 * @throws */ public void smadd(Map<String,Set<String>> smap){ if (smap == null || smap.isEmpty()) { return ; } if (smap.size() == 1) { for (Map.Entry<String, Set<String>> entry : smap.entrySet()) { Set<String> value = entry.getValue(); String[] values=new String[value.size()]; value.toArray(values); jedisCluster.sadd(entry.getKey(), values); } return ; } //當內部數據大於1時 Map<JedisPool, List<String>> jedisPoolMap = getPoolMap(new ArrayList<String>(smap.keySet())); List<String> realKeys=null; JedisPool currentJedisPool = null; Pipeline currentPipeline = null; for (Map.Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()) { //得到鏈接池 currentJedisPool=entry.getKey(); //得到本鏈接池對應的key集合 realKeys=entry.getValue(); Jedis jedis =null; try { //獲取pipeline jedis = currentJedisPool.getResource(); currentPipeline = jedis.pipelined(); for (String key : realKeys) { Set<String> value = smap.get(key); String[] values=new String[value.size()]; value.toArray(values); currentPipeline.sadd(key, values); } //pipeline執行 currentPipeline.sync(); } finally { realKeys=null; if(currentPipeline!=null){ try { currentPipeline.close(); } catch (IOException e) { e.printStackTrace(); }finally{ currentPipeline=null;//help GC } } if(jedis!=null){ jedis.close();//歸還鏈接 } } } } /** * * * @Title: getPoolMap * @Description: 獲取鏈接池對應的操做key的集合 * @param @param keys * @param @return 設定文件 * @return Map<JedisPool,List<String>> 返回類型 * @throws */ public Map<JedisPool, List<String>> getPoolMap(List<String> keys){ //緩存線程池對應執行的key集合 Map<JedisPool, List<String>> jedisPoolMap = new HashMap<JedisPool, List<String>>(); JedisPool currentJedisPool = null; List<String> keyList; for (String key : keys) { //計算哈希槽 int crc = JedisClusterCRC16.getSlot(key); //經過哈希槽獲取節點的鏈接 currentJedisPool = slots.get(crc); /** * 因爲JedisPool做爲value保存在JedisClusterInfoCache中的一個map對象中,每一個節點的 * JedisPool在map的初始化階段就是肯定的和惟一的,因此獲取到的每一個節點的JedisPool都是同樣 * 的,能夠做爲map的key * */ if (jedisPoolMap.containsKey(currentJedisPool)) { jedisPoolMap.get(currentJedisPool).add(key); } else { keyList = new ArrayList<String>(); keyList.add(key); jedisPoolMap.put(currentJedisPool, keyList); } } return jedisPoolMap; } private List<Object> sortList(List<String> keys, Map<String, Object> params) { List<Object> resultList = new ArrayList<>(); Iterator<String> it = keys.iterator(); while (it.hasNext()) { String key = it.next(); resultList.add(params.get(key)); } return resultList; } public Map<String, JedisPool> getNodes() { return nodes; } public Map<Integer, JedisPool> getSlots() { return slots; } }
定義一個spring組件,實現InitializingBean接口,重寫afterPropertiesSet()方法,這樣作的目的就是,在bean初始化後,屬性賦值以後執行slots的賦值,這樣能夠全局使用此集合。相信看代碼能夠明白細節。
留下兩個問題:
一、若是redis集羣節點出現增長或者刪除,或者主從節點的變更該如何處理呢?
二、spring-data-redis的RedisTemplate在集羣下操做單個key時能夠直接使用,可是若是用RedisTemplate添加一條數據,用jedis的JedisCluster讀取一條數據,肯能會存在序列化和反序列化問題,這個該如何處理呢?
能夠思考下。
redis集羣操做:https://www.cnblogs.com/tony-zt/p/10185660.html
jedis cluster源碼學習:https://blog.csdn.net/sinat_36553913/article/details/90342053
https://blog.csdn.net/sinat_36553913/article/details/90551403
https://segmentfault.com/a/1190000013535955
https://www.jianshu.com/p/5ca98b5a336b