前言java
redis的pipeline能夠一次性發送多個命令去執行,在執行大量命令時,能夠減小網絡通訊次數提升效率。可是很惋惜,redis的集羣並不支持pipeline語法(只是不提供相應的方法而已)。不過只要稍稍看下jedis的源碼,就能夠發現雖然沒有現成的輪子,可是卻很好造。node
1、簡介redis
先說下redis集羣的簡單結構和數據的定位規則(見下圖)。redis提供了16384個槽點,併爲每一個節點分配若干槽位,操做redis數據時會根據key進行hash,而後找到對應的節點進行操做,這也解釋了爲何jedisCluster不支持pipeline。由於pipeline中若干個須要操做的key可能位於不一樣的分片,若是想要獲取數據就必須進行一次請求的轉發(可能這個詞不標準,可是好理解,或者稱之爲漂移吧),這與pipeline爲了減小網絡通訊次數的本意衝突。那咱們只要根據key進行hash運算,而後再根據hash值獲取鏈接,接着按照鏈接對全部的key進行分組,保證同一pipeline內全部的key都對應一個節點就行了,最後經過pipeline執行。試試吧,萬一好使了呢算法
2、思路apache
既然知道了緣由和流程,咱們就試下能不能造輪子吧。首先咱們須要hash算法以及根據hash結果獲取JedisPool的方法,很不巧的是,jedis都提供了。網絡
爲了實現上面的功能,咱們須要一個類和兩個屬性,類是JedisClusterCRC16(hash算法),兩個屬性分別是connectionHandler(用於獲取cache)和cache(根據hash值獲取鏈接)測試
有興趣的同窗能夠看下JedisCluster的源碼,它集成自BinaryJedisCluster,在BinaryJedisCluster有個connectionHandler屬性,恰巧它又是protected修飾的,這不分明就是讓你繼承麼。而cache屬性在JedisClusterConnectionHandler中,注意這個類是抽象類,輕易的重寫會致使整個功能都不能用了(若是內部實現不是很瞭解,繼承每每優於重寫,由於能夠super嘛),咱們發現它有一個實現類JedisSlotBasedConnectionHandler,那咱們繼承這個類就行了。詳細的設計以下圖:this
3、實現spa
import lombok.extern.slf4j.Slf4j; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.*; import redis.clients.util.JedisClusterCRC16; /** * @author zhangyining on 18/12/1 001. */ @Slf4j public class JedisPipelineCluster extends JedisCluster { static { cluster = init(); } private static JedisPipelineCluster cluster; public static JedisPipelineCluster getCluster(){ return cluster; } private static JedisPipelineCluster init(){ //todo 鏈接代碼省略... return jedisCluster; } public JedisPool getJedisPoolFromSlot(String redisKey) { return getConnectionHandler().getJedisPoolFromSlot(redisKey); } private JedisPipelineCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts,final GenericObjectPoolConfig poolConfig){ super(jedisClusterNode, timeout, maxAttempts, poolConfig); //繼承能夠添加個性化的方法,仔細看構造方法其實和父類是同樣的 connectionHandler = new JedisSlotAdvancedConnectionHandler(jedisClusterNode, poolConfig, timeout, timeout); } private JedisSlotAdvancedConnectionHandler getConnectionHandler() { return (JedisSlotAdvancedConnectionHandler)this.connectionHandler; } private class JedisSlotAdvancedConnectionHandler extends JedisSlotBasedConnectionHandler { private JedisSlotAdvancedConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout) { super(nodes, poolConfig, connectionTimeout, soTimeout); } private JedisPool getJedisPoolFromSlot(String redisKey) { int slot = JedisClusterCRC16.getSlot(redisKey); JedisPool connectionPool = cache.getSlotPool(slot); if (connectionPool != null) { return connectionPool; } else { renewSlotCache(); connectionPool = cache.getSlotPool(slot); if (connectionPool != null) { return connectionPool; } else { throw new RuntimeException("No reachable node in cluster for slot " + slot); } } } } }
4、測試設計
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.Pipeline; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @author zhangyining on 18/12/1 001. */ public class Test { public static void main(String[] args) { JedisPipelineCluster cluster = JedisPipelineCluster.getCluster(); String[] testKeys = {"USER_FEAT_10013425884935", "USER_FEAT_10006864229638", "USER_FEAT_10008005187846"}; Map<JedisPool, List<String>> poolKeys = new HashMap<>(); for (String key : testKeys) { JedisPool jedisPool = cluster.getJedisPoolFromSlot(key); if (poolKeys.keySet().contains(jedisPool)){ List<String> keys = poolKeys.get(jedisPool); keys.add(key); }else { List<String> keys = new ArrayList<>(); keys.add(key); poolKeys.put(jedisPool, keys); } } for (JedisPool jedisPool : poolKeys.keySet()) { Jedis jedis = jedisPool.getResource(); Pipeline pipeline = jedis.pipelined(); List<String> keys = poolKeys.get(jedisPool); keys.forEach(key ->pipeline.get(key)); List result = pipeline.syncAndReturnAll(); System.out.println(result); jedis.close(); } } }
5、總結
以前看到過有人經過反射來獲取connectionHandler和cache屬性的,我的以爲反射雖然強大,可是明明能夠繼承卻反射,有點怪怪的,看我的習慣吧。總之無論是那種方式,重點是熟悉redis集羣是怎麼分配數據以及執行請求的,剩下的不過是用不一樣的地方話(不用方式的代碼)說出來而已