redis集羣使用pipeline

前言java

redis的pipeline能夠一次性發送多個命令去執行,在執行大量命令時,能夠減小網絡通訊次數提升效率。可是很惋惜,redis的集羣並不支持pipeline語法(只是不提供相應的方法而已)。不過只要稍稍看下jedis的源碼,就能夠發現雖然沒有現成的輪子,可是卻很好造。node

1、簡介redis

先說下redis集羣的簡單結構和數據的定位規則(見下圖)。redis提供了16384個槽點,併爲每一個節點分配若干槽位,操做redis數據時會根據key進行hash,而後找到對應的節點進行操做,這也解釋了爲何jedisCluster不支持pipeline。由於pipeline中若干個須要操做的key可能位於不一樣的分片,若是想要獲取數據就必須進行一次請求的轉發(可能這個詞不標準,可是好理解,或者稱之爲漂移吧),這與pipeline爲了減小網絡通訊次數的本意衝突。那咱們只要根據key進行hash運算,而後再根據hash值獲取鏈接,接着按照鏈接對全部的key進行分組,保證同一pipeline內全部的key都對應一個節點就行了,最後經過pipeline執行。試試吧,萬一好使了呢算法

2、思路apache

既然知道了緣由和流程,咱們就試下能不能造輪子吧。首先咱們須要hash算法以及根據hash結果獲取JedisPool的方法,很不巧的是,jedis都提供了。網絡

爲了實現上面的功能,咱們須要一個類和兩個屬性,類是JedisClusterCRC16(hash算法),兩個屬性分別是connectionHandler(用於獲取cache)和cache(根據hash值獲取鏈接)測試

有興趣的同窗能夠看下JedisCluster的源碼,它集成自BinaryJedisCluster,在BinaryJedisCluster有個connectionHandler屬性,恰巧它又是protected修飾的,這不分明就是讓你繼承麼。而cache屬性在JedisClusterConnectionHandler中,注意這個類是抽象類,輕易的重寫會致使整個功能都不能用了(若是內部實現不是很瞭解,繼承每每優於重寫,由於能夠super嘛),咱們發現它有一個實現類JedisSlotBasedConnectionHandler,那咱們繼承這個類就行了。詳細的設計以下圖:this

3、實現spa

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.*;
import redis.clients.util.JedisClusterCRC16;

/**
 * @author zhangyining on 18/12/1 001.
 */
@Slf4j
public class JedisPipelineCluster extends JedisCluster {

    static {
        cluster = init();
    }

    private static JedisPipelineCluster cluster;

    public static JedisPipelineCluster getCluster(){
        return cluster;
    }
    private static JedisPipelineCluster init(){
        //todo 鏈接代碼省略...
        return jedisCluster;
    }

    public JedisPool getJedisPoolFromSlot(String redisKey) {
        return getConnectionHandler().getJedisPoolFromSlot(redisKey);
    }

    private JedisPipelineCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts,final GenericObjectPoolConfig poolConfig){
        super(jedisClusterNode, timeout, maxAttempts, poolConfig);
        //繼承能夠添加個性化的方法,仔細看構造方法其實和父類是同樣的
        connectionHandler = new JedisSlotAdvancedConnectionHandler(jedisClusterNode, poolConfig,
                timeout, timeout);
    }

    private JedisSlotAdvancedConnectionHandler getConnectionHandler() {
        return (JedisSlotAdvancedConnectionHandler)this.connectionHandler;
    }

    private class JedisSlotAdvancedConnectionHandler extends JedisSlotBasedConnectionHandler {

        private JedisSlotAdvancedConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout) {
            super(nodes, poolConfig, connectionTimeout, soTimeout);
        }
      
        private JedisPool getJedisPoolFromSlot(String redisKey) {
            int slot = JedisClusterCRC16.getSlot(redisKey);
            JedisPool connectionPool = cache.getSlotPool(slot);
            if (connectionPool != null) {
                return connectionPool;
            } else {
                renewSlotCache();
                connectionPool = cache.getSlotPool(slot);
                if (connectionPool != null) {
                    return connectionPool;
                } else {
                    throw new RuntimeException("No reachable node in cluster for slot " + slot);
                }
            }
        }
    }
}

 

4、測試設計

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author zhangyining on 18/12/1 001.
 */
public class Test {

    public static void main(String[] args) {
        JedisPipelineCluster cluster = JedisPipelineCluster.getCluster();
        String[] testKeys = {"USER_FEAT_10013425884935", "USER_FEAT_10006864229638", "USER_FEAT_10008005187846"};

        Map<JedisPool, List<String>> poolKeys = new HashMap<>();

        for (String key : testKeys) {
            JedisPool jedisPool = cluster.getJedisPoolFromSlot(key);
            if (poolKeys.keySet().contains(jedisPool)){
                List<String> keys = poolKeys.get(jedisPool);
                keys.add(key);
            }else {
                List<String> keys = new ArrayList<>();
                keys.add(key);
                poolKeys.put(jedisPool, keys);
            }
        }

        for (JedisPool jedisPool : poolKeys.keySet()) {
            Jedis jedis = jedisPool.getResource();
            Pipeline pipeline = jedis.pipelined();

            List<String> keys = poolKeys.get(jedisPool);
            keys.forEach(key ->pipeline.get(key));
            List result = pipeline.syncAndReturnAll();
            System.out.println(result);
            jedis.close();
        }
    }
}

 5、總結

以前看到過有人經過反射來獲取connectionHandler和cache屬性的,我的以爲反射雖然強大,可是明明能夠繼承卻反射,有點怪怪的,看我的習慣吧。總之無論是那種方式,重點是熟悉redis集羣是怎麼分配數據以及執行請求的,剩下的不過是用不一樣的地方話(不用方式的代碼)說出來而已

相關文章
相關標籤/搜索