摘要: 引言 瞭解Jedis的童鞋可能清楚,Jedis中JedisCluster是不支持pipeline操做的,若是使用了redis集羣,在spring-boot-starter-data-redis中又正好用到的pipeline,那麼會接收到Pipeline is currently not supported for JedisClusterConnection.這樣的報錯。html
瞭解Jedis的童鞋可能清楚,Jedis中JedisCluster
是不支持pipeline操做的,若是使用了redis集羣,在spring-boot-starter-data-redis
中又正好用到的pipeline,那麼會接收到Pipeline is currently not supported for JedisClusterConnection.
這樣的報錯。錯誤來自於org.springframework.data.redis.connection.jedis.JedisClusterConnection
:git
/* * (non-Javadoc) * @see org.springframework.data.redis.connection.RedisConnection#openPipeline() */ @Override public void openPipeline() { throw new UnsupportedOperationException("Pipeline is currently not supported for JedisClusterConnection."); }
org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration
會幫咱們自動配置,不管你redis使用的是standalone、sentinel、cluster配置。這個源碼很容易理解,讀者可自行閱讀,不理解的能夠一塊兒討論。github
spring boot 2.0開始,配置spring-boot-starter-data-redis
將不依賴Jedis,而是依賴Lettuce,在Lettuce中,redis cluster使用pipeline不會有問題。redis
再往下看可能須要讀者具有以下的能力:spring
redis cluster一共有16384個桶(hash slot),用來裝數據,創建集羣的時候每一個集羣節點會負責一些slot的數據存儲,好比我負責0-1000,你負責1001-2000,他負責2001-3000……
數據存儲時,每一個key在存入redis cluster前,會利用CRC16計算出一個值,這個值就是對應redis cluster的hash slot,就知道這個key會被放到哪一個服務器上了。服務器
JedisCluster本質上是使用Jedis來和redis集羣進行打交道的,具體過程是:spring-boot
JedisClusterCRC16.getSlot(key)
JedisClusterConnectionHandler
實例中獲取到該slot對應的Jedis
實例:Jedis connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
redis提供了mset,hmset之類的命令,或者說集合操做可使用sadd key 1 2 3 4 5 6 ..... 10000000000這種一口氣傳一堆數據的命令。
有時候你甚至會發現*mset這種一口氣操做一堆數據的速度更快。那麼這種使用場景會有什麼弊端呢?答案是:阻塞。
操做這一堆數據須要多久,就會阻塞多久。spa
因爲JedisCluster中的全部操做本質上是使用Jedis,而Jedis是支持pipeline操做的,全部,要在redis cluster中使用pipeline是有可能的,只要你操做同一個鍵便可,準確的說,應該是你操做的鍵位於同一臺服務器,更直白的,你操做的鍵是同一個Jedis實例。ok,若是你已經暈了,那你須要回看一下「知識儲備」。
說說筆者的使用場景吧,咱們是把csv文件的一批數據讀到內存中,同一批數據是存儲到同一個key中的,最後的操做會相似於:code
set key member1 set key member2 set key member3 ... set key member100000
操做的是同一個key,能夠利用JedisCluster獲取到該key的Jedis實例,而後利用pipeline操做。
提供一下代碼思路。
RedisConnectionFactory factory = redisTemplate.getConnectionFactory(); RedisConnection redisConnection = factory.getConnection(); JedisClusterConnection jedisClusterConnection = (JedisClusterConnection) redisConnection; // 獲取到原始到JedisCluster鏈接 JedisCluster jedisCluster = jedisClusterConnection.getNativeConnection(); // 經過key獲取到具體的Jedis實例 // 計算hash slot,根據特定的slot能夠獲取到特定的Jedis實例 int slot = JedisClusterCRC16.getSlot(key); /** * 不建議這麼使用,官方在2.10版本已經修復<a href="https://github.com/xetorthio/jedis/pull/1532">此問題</a><br> * 2.10版本中,官方會直接提供JedisCluster#getConnectionFromSlot */ Field field = ReflectionUtils.findField(BinaryJedisCluster.class, null, JedisClusterConnectionHandler.class); field.setAccessible(true); JedisSlotBasedConnectionHandler jedisClusterConnectionHandler = (JedisSlotBasedConnectionHandler) field.get(jedisCluster); Jedis jedis = jedisClusterConnectionHandler.getConnectionFromSlot(slot); // 接下來就是pipeline操做了 Pipeline pipeline = jedis.pipelined(); ... pipeline.syncAndReturnAll();
以上代碼徹底能夠模仿spring-data-redis中RedisTemplate#executePipelined
方法寫成一個通用的方法,供使用者調用。