一種簡單實現Redis集羣Pipeline功能的方法及性能測試

上一篇文章《redis pipeline批量處理提升性能》中咱們講到redis pipeline模式在批量數據處理上帶來了很大的性能提高,咱們先來回顧一下pipeline的原理,redis client與server之間採用的是請求應答的模式,以下所示:html

Client: command1 
Server: response1 
Client: command2 
Server: response2 
…

在這種狀況下,若是要完成10個命令,則須要20次交互才能完成。所以,即便redis處理能力很強,仍然會受到網絡傳輸影響,致使吞吐量上不去。而在管道(pipeline)模式下,多個請求能夠變成這樣:java

Client: command1,command2… 
Server: response1,response2…

在這種狀況下,完成命令只須要2次交互。這樣網絡傳輸上可以更加高效,加上redis自己強勁的處理能力,給數據處理帶來極大的性能提高。但實際上遇到的問題是,項目上所用到的是Redis集羣,初始化的時候使用的類是JedisCluster而不是Jedis。去查了JedisCluster的文檔,並無發現提供有像Jedis同樣的獲取Pipeline對象的 pipelined()方法。node

爲何RedisCluster沒法使用pipeline?

咱們知道,Redis 集羣的鍵空間被分割爲 16384 個槽(slot),集羣的最大節點數量也是 16384 個。每一個主節點都負責處理 16384 個哈希槽的其中一部分。具體的redis命令,會根據key計算出一個槽位(slot),而後根據槽位去特定的節點redis上執行操做。以下所示:redis

master1(slave1): 0~5460
master2(slave2):5461~10922
master3(slave3):10923~16383

集羣有三個master節點組成,其中master1分配了 0~5460的槽位,master2分配了 5461~10922的槽位,master3分配了 10923~16383的槽位。算法

一次pipeline會批量執行多個命令,那麼每一個命令都須要根據「key」運算一個槽位(JedisClusterCRC16.getSlot(key)),而後根據槽位去特定的機器執行命令,也就是說一次pipeline操做會使用多個節點的redis鏈接,而目前JedisCluster是沒法支持的。shell

如何基於JedisCluster擴展pipeline?

設計思路

1.首先要根據key計算出這次pipeline會使用到的節點對應的鏈接(也就是jedis對象,一般每一個節點對應一個Pool)。
2.相同槽位的key,使用同一個jedis.pipeline去執行命令。
3.合併這次pipeline全部的response返回。
4.鏈接釋放返回到池中。apache

也就是將一個JedisCluster下的pipeline分解爲每一個單節點下獨立的jedisPipeline操做,最後合併response返回。具體實現就是經過JedisClusterCRC16.getSlot(key)計算key的slot值,經過每一個節點的slot分佈,就知道了哪些key應該在哪些節點上。再獲取這個節點的JedisPool就可使用pipeline進行讀寫了。
實現上面的過程能夠有不少種方式,本文將介紹一種也許是代碼量最少的一種解決方案。網絡

解決方案

上面提到的過程,其實在JedisClusterInfoCache對象中都已經幫助開發人員實現了,可是這個對象在JedisClusterConnectionHandler中爲protected並無對外開放,並且經過JedisCluster的API也沒法拿到JedisClusterConnectionHandler對象。因此經過下面兩個類將這些對象暴露出來,這樣使用getJedisPoolFromSlot就能夠知道每一個key對應的JedisPool了。性能

Maven依賴測試

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

JedisClusterPipeline

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.util.Set;

public class JedisClusterPipeline extends JedisCluster {
    public JedisClusterPipeline(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout, int maxAttempts, String password, final GenericObjectPoolConfig poolConfig) {
        super(jedisClusterNode,connectionTimeout, soTimeout, maxAttempts, password, poolConfig);
        super.connectionHandler = new JedisSlotAdvancedConnectionHandler(jedisClusterNode, poolConfig,
                connectionTimeout, soTimeout ,password);
    }

    public JedisSlotAdvancedConnectionHandler getConnectionHandler() {
        return (JedisSlotAdvancedConnectionHandler)this.connectionHandler;
    }

    /**
     * 刷新集羣信息,當集羣信息發生變動時調用
     * @param
     * @return
     */
    public void refreshCluster() {
        connectionHandler.renewSlotCache();
    }
}

JedisSlotAdvancedConnectionHandler

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSlotBasedConnectionHandler;
import redis.clients.jedis.exceptions.JedisNoReachableClusterNodeException;

import java.util.Set;

public class JedisSlotAdvancedConnectionHandler extends JedisSlotBasedConnectionHandler {

    public JedisSlotAdvancedConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout,String password) {
        super(nodes, poolConfig, connectionTimeout, soTimeout, password);
    }

    public JedisPool getJedisPoolFromSlot(int slot) {
        JedisPool connectionPool = cache.getSlotPool(slot);
        if (connectionPool != null) {
            // It can't guaranteed to get valid connection because of node
            // assignment
            return connectionPool;
        } else {
            renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state
            connectionPool = cache.getSlotPool(slot);
            if (connectionPool != null) {
                return connectionPool;
            } else {
                throw new JedisNoReachableClusterNodeException("No reachable node in cluster for slot " + slot);
            }
        }
    }
}

編寫測試類,向redis集羣寫入10000條數據,分別測試調用普通JedisCluster模式和調用上面實現的JedisCluster Pipeline模式的性能對比,測試類以下:

import redis.clients.jedis.*;
import redis.clients.util.JedisClusterCRC16;
import java.io.UnsupportedEncodingException;
import java.util.*;

public class PipelineTest {
    public static void main(String[] args) throws UnsupportedEncodingException {
        PipelineTest client = new PipelineTest();
        Set<HostAndPort> nodes = new HashSet<>();
        nodes.add(new HostAndPort("node1",20249));
        nodes.add(new HostAndPort("node2",20508));
        nodes.add(new HostAndPort("node3",20484));
        String redisPassword = "123456";
        //測試
        client.jedisCluster(nodes,redisPassword);
        client.clusterPipeline(nodes,redisPassword);
    }
    //普通JedisCluster 批量寫入測試
    public void jedisCluster(Set<HostAndPort> nodes,String redisPassword) throws UnsupportedEncodingException {
        JedisCluster jc = new JedisCluster(nodes, 2000, 2000,100,redisPassword, new JedisPoolConfig());
        List<String> setKyes = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            setKyes.add("single"+i);
        }
        long start = System.currentTimeMillis();
        for(int j = 0;j < setKyes.size();j++){
            jc.setex(setKyes.get(j),100,"value"+j);
        }
        System.out.println("JedisCluster total time:"+(System.currentTimeMillis() - start));
    }
    //JedisCluster Pipeline 批量寫入測試
    public void clusterPipeline(Set<HostAndPort> nodes,String redisPassword) {
        JedisClusterPipeline jedisClusterPipeline = new JedisClusterPipeline(nodes, 2000, 2000,10,redisPassword, new JedisPoolConfig());
        JedisSlotAdvancedConnectionHandler jedisSlotAdvancedConnectionHandler = jedisClusterPipeline.getConnectionHandler();
        Map<JedisPool, List<String>> poolKeys = new HashMap<>();
        List<String> setKyes = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
                setKyes.add("pipeline"+i);
        }
        long start = System.currentTimeMillis();
        //查詢出 key 所在slot ,經過 slot 獲取 JedisPool ,將key 按 JedisPool 分組
        jedisClusterPipeline.refreshCluster();
        for(int j = 0;j < setKyes.size();j++){
            String key = setKyes.get(j);
            int slot = JedisClusterCRC16.getSlot(key);
            JedisPool jedisPool = jedisSlotAdvancedConnectionHandler.getJedisPoolFromSlot(slot);
            if (poolKeys.keySet().contains(jedisPool)){
                List<String> keys = poolKeys.get(jedisPool);
                keys.add(key);
            }else {
                List<String> keys = new ArrayList<>();
                keys.add(key);
                poolKeys.put(jedisPool, keys);
            }
        }
        //調用Jedis pipeline進行單點批量寫入
        for (JedisPool jedisPool : poolKeys.keySet()) {
            Jedis jedis = jedisPool.getResource();
            Pipeline pipeline = jedis.pipelined();
            List<String> keys = poolKeys.get(jedisPool);
            for(int i=0;i<keys.size();i++){
                pipeline.setex(keys.get(i),100, "value" + i);
            }
            pipeline.sync();//同步提交
            jedis.close();
        }
        System.out.println("JedisCluster Pipeline total time:"+(System.currentTimeMillis() - start));
    }
}

測試結果以下:

JedisCluster total time:29147
JedisCluster Pipeline total time:190

結論:對於批量操做,JedisCluster Pipeline有明顯的性能提高。

總結

本文旨在介紹一種在Redis集羣模式下提供Pipeline批量操做的功能。基本思路就是根據redis cluster對數據哈希取模的算法,先計算數據存放的slot位置, 而後根據不一樣的節點將數據分紅多批,對不一樣批的數據進行單點pipeline處理。 可是須要注意的是,因爲集羣模式存在節點的動態添加刪除,且client不能實時感知(只有在執行命令時纔可能知道集羣發生變動),所以,該實現不保證必定成功,建議在批量操做以前調用 refreshCluster() 方法從新獲取集羣信息。應用須要保證不論成功仍是失敗都會調用close() 方法,不然可能會形成泄露。若是失敗須要應用本身去重試,所以每一個批次執行的命令數量須要控制,防止失敗後重試的數量過多。 基於以上說明,建議在集羣環境較穩定(增減節點不會過於頻繁)的狀況下使用,且容許失敗或有對應的重試策略。

相關文章
相關標籤/搜索