Redis 批量操做之 pipeline

業務場景

最近項目中場景須要get一批key的value,由於redis的get操做(不僅僅是get命令)是阻塞的,若是循環取值的話,就算是內網,耗時也是巨大的。因此想到了redis的pipeline命令。java

pipeline簡介

非pipeline:client一個請求,redis server一個響應,期間client阻塞redis

Pipeline:redis的管道命令,容許client將多個請求依次發給服務器(redis的客戶端,如jedisCluster,lettuce等都實現了對pipeline的封裝),過程當中而不須要等待請求的回覆,在最後再一併讀取結果便可。緩存

單機版

單機版比較簡單,直接上代碼服務器

//換成真實的redis實例
Jedis jedis = new Jedis();
//獲取管道
Pipeline p = jedis.pipelined();
for (int i = 0; i < 10000; i++) {
    p.get(i + "");
}
//獲取結果
List<Object> results = p.syncAndReturnAll();

複製代碼

集羣版

由於 JedisCluster 自己不支持 pipeline ,因此咱們須要對 JedisCluster 進行一些封裝。ide

仍是同樣,直接上代碼ui

import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.*;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.exceptions.JedisRedirectionException;
import redis.clients.util.JedisClusterCRC16;
import redis.clients.util.SafeEncoder;

import java.io.Closeable;
import java.lang.reflect.Field;
import java.util.*;
import java.util.function.BiConsumer;



@Slf4j
public class JedisClusterPipeline extends PipelineBase implements Closeable {


    /** * 用於獲取 JedisClusterInfoCache */
    private JedisSlotBasedConnectionHandler connectionHandler;
    /** * 根據hash值獲取鏈接 */
    private JedisClusterInfoCache clusterInfoCache;

    /** * 也能夠去繼承JedisCluster和JedisSlotBasedConnectionHandler來提供訪問接口 * JedisCluster繼承於BinaryJedisCluster * 在BinaryJedisCluster,connectionHandler屬性protected修飾的,因此須要反射 * * * 而 JedisClusterInfoCache 屬性在JedisClusterConnectionHandler中,可是這個類是抽象類, * 但它有一個實現類JedisSlotBasedConnectionHandler */
    private static final Field FIELD_CONNECTION_HANDLER;
    private static final Field FIELD_CACHE;
    static {
        FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler");
        FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");
    }


    /** * 根據順序存儲每一個命令對應的Client */
    private Queue<Client> clients = new LinkedList<>();
    /** * 用於緩存鏈接 * 一次pipeline過程當中使用到的jedis緩存 */
    private Map<JedisPool, Jedis> jedisMap = new HashMap<>();
    /** * 是否有數據在緩存區 */
    private boolean hasDataInBuf = false;

    /** * 根據jedisCluster實例生成對應的JedisClusterPipeline * 經過此方式獲取pipeline進行操做的話必須調用close()關閉管道 * 調用本類裏pipelineXX方法則不用close(),但建議最好仍是在finally裏調用一下close() * @param * @return */
    public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) {
        JedisClusterPipeline pipeline = new JedisClusterPipeline();
        pipeline.setJedisCluster(jedisCluster);
        return pipeline;
    }


    public JedisClusterPipeline() {
    }

    public void setJedisCluster(JedisCluster jedis) {
        connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER);
        clusterInfoCache = getValue(connectionHandler, FIELD_CACHE);
    }

    /** * 刷新集羣信息,當集羣信息發生變動時調用 * @param * @return */
    public void refreshCluster() {
        connectionHandler.renewSlotCache();
    }

    /** * 同步讀取全部數據. 與syncAndReturnAll()相比,sync()只是沒有對數據作反序列化 */
    public void sync() {
        innerSync(null);
    }

    /** * 同步讀取全部數據 並按命令順序返回一個列表 * * @return 按照命令的順序返回全部的數據 */
    public List<Object> syncAndReturnAll() {
        List<Object> responseList = new ArrayList<>();

        innerSync(responseList);

        return responseList;
    }


    @Override
    public void close() {
        clean();

        clients.clear();

        for (Jedis jedis : jedisMap.values()) {
            if (hasDataInBuf) {
                flushCachedData(jedis);
            }

            jedis.close();
        }

        jedisMap.clear();

        hasDataInBuf = false;
    }

    private void flushCachedData(Jedis jedis) {
        try {
            jedis.getClient().getAll();
        } catch (RuntimeException ex) {
        }
    }

    @Override
    protected Client getClient(String key) {
        byte[] bKey = SafeEncoder.encode(key);

        return getClient(bKey);
    }

    @Override
    protected Client getClient(byte[] key) {
        Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));

        Client client = jedis.getClient();
        clients.add(client);

        return client;
    }

    private Jedis getJedis(int slot) {
        JedisPool pool = clusterInfoCache.getSlotPool(slot);

        // 根據pool從緩存中獲取Jedis
        Jedis jedis = jedisMap.get(pool);
        if (null == jedis) {
            jedis = pool.getResource();
            jedisMap.put(pool, jedis);
        }

        hasDataInBuf = true;
        return jedis;
    }



    public static void pipelineSetEx(String[] keys, String[] values, int[] exps,JedisCluster jedisCluster) {
        operate(new Command() {
            @Override
            public List execute() {
                JedisClusterPipeline p = pipelined(jedisCluster);
                for (int i = 0, len = keys.length; i < len; i++) {
                    p.setex(keys[i], exps[i], values[i]);
                }
                return p.syncAndReturnAll();
            }
        });
    }

    public static List<Map<String, String>> pipelineHgetAll(String[] keys,JedisCluster jedisCluster) {
        return operate(new Command() {
            @Override
            public List execute() {
                JedisClusterPipeline p = pipelined(jedisCluster);
                for (int i = 0, len = keys.length; i < len; i++) {
                    p.hgetAll(keys[i]);
                }
                return p.syncAndReturnAll();
            }
        });
    }



    public static List<Boolean> pipelineSismember(String[] keys, String members,JedisCluster jedisCluster) {
        return operate(new Command() {
            @Override
            public List execute() {
                JedisClusterPipeline p = pipelined(jedisCluster);
                for (int i = 0, len = keys.length; i < len; i++) {
                    p.sismember(keys[i], members);
                }
                return p.syncAndReturnAll();
            }
        });
    }



    public static <O> List pipeline(BiConsumer<O, JedisClusterPipeline> function, O obj,JedisCluster jedisCluster) {
        return operate(new Command() {
            @Override
            public List execute() {
                JedisClusterPipeline jcp = JedisClusterPipeline.pipelined(jedisCluster);
                function.accept(obj, jcp);
                return jcp.syncAndReturnAll();
            }
        });
    }



    private void innerSync(List<Object> formatted) {
        HashSet<Client> clientSet = new HashSet<>();

        try {
            for (Client client : clients) {
                // 在sync()調用時實際上是不須要解析結果數據的,可是若是不調用get方法,發生了JedisMovedDataException這樣的錯誤應用是不知道的,所以須要調用get()來觸發錯誤。
                // 其實若是Response的data屬性能夠直接獲取,能夠省掉解析數據的時間,然而它並無提供對應方法,要獲取data屬性就得用反射,不想再反射了,因此就這樣了
                Object data = generateResponse(client.getOne()).get();
                if (null != formatted) {
                    formatted.add(data);
                }

                // size相同說明全部的client都已經添加,就不用再調用add方法了
                if (clientSet.size() != jedisMap.size()) {
                    clientSet.add(client);
                }
            }
        } catch (JedisRedirectionException jre) {
            if (jre instanceof JedisMovedDataException) {
                // if MOVED redirection occurred, rebuilds cluster's slot cache,
                // recommended by Redis cluster specification
                refreshCluster();
            }

            throw jre;
        } finally {
            if (clientSet.size() != jedisMap.size()) {
                // 全部尚未執行過的client要保證執行(flush),防止放回鏈接池後後面的命令被污染
                for (Jedis jedis : jedisMap.values()) {
                    if (clientSet.contains(jedis.getClient())) {
                        continue;
                    }

                    flushCachedData(jedis);
                }
            }

            hasDataInBuf = false;
            close();
        }
    }


    private static Field getField(Class<?> cls, String fieldName) {
        try {
            Field field = cls.getDeclaredField(fieldName);
            field.setAccessible(true);

            return field;
        } catch (NoSuchFieldException | SecurityException e) {
            throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e);
        }
    }

    @SuppressWarnings({"unchecked" })
    private static <T> T getValue(Object obj, Field field) {
        try {
            return (T)field.get(obj);
        } catch (IllegalArgumentException | IllegalAccessException e) {
            log.error("get value fail", e);

            throw new RuntimeException(e);
        }
    }

    private static <T> T operate(Command command) {
        try  {
            return command.execute();
        } catch (Exception e) {
            log.error("redis operate error");
            throw new RuntimeException(e);
        }
    }

    interface Command {
        /** * 具體執行命令 * * @param <T> * @return */
        <T> T execute();
    }

}


複製代碼

使用demothis

public Object testPipelineOperate() {
        // String[] keys = {"dylan1","dylan2"};
        // String[] values = {"dylan1-v1","dylan2-v2"};
        // int[] exps = {100,200};
        // JedisClusterPipeline.pipelineSetEx(keys, values, exps, jedisCluster);
        long start = System.currentTimeMillis();

        List<String> keyList = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            keyList.add(i + "");
        }
        // List<String> pipeline = JedisClusterPipeline.pipeline(this::getValue, keyList, jedisCluster);
        // List<String> pipeline = JedisClusterPipeline.pipeline(this::getHashValue, keyList, jedisCluster);
        String[] keys = {"dylan-test1", "dylan-test2"};

        List<Map<String, String>> all = JedisClusterPipeline.pipelineHgetAll(keys, jedisCluster);
        long end = System.currentTimeMillis();
        System.out.println("testPipelineOperate cost:" + (end-start));

        return Response.success(all);
    }

複製代碼
相關文章
相關標籤/搜索