Jedis集羣模式經典實現

Jedis是Redis的Java客戶端,本代碼是Jedis應用的一個範例。html

已同步本人技術博客:siyuanwang.github.io/2019/12/07/…java

Redis經常使用模式

Redis分了了主從模式集羣模式node

主從模式

主從模式即便用一個Redis實例做爲主機(Master),其他的實例做爲備份機(Slave),Master支持寫入和讀取等各類操做,Slave支持讀操做和與Master同步數據。主從模式的核心思想是讀寫分離,數據冗餘存儲和HA,Master節點出現問題,能夠經過Redis Sentinel作到主從切換。git

Sentinel 系統用於管理多個 Redis 服務器(instance), 該系統執行如下三個任務:github

  • 監控(Monitoring): Sentinel 會不斷地檢查你的主服務器和從服務器是否運做正常。
  • 提醒(Notification): 當被監控的某個 Redis 服務器出現問題時, Sentinel 能夠經過 API 向管理員或者其餘應用程序發送通知。
  • 自動故障遷移(Automatic failover): 當一個主服務器不能正常工做時, Sentinel 會開始一次自動故障遷移操做, 它會將失效主服務器的其中一個從服務器升級爲新的主服務器, 並讓失效主服務器的其餘從服務器改成複製新的主服務器; 當客戶端試圖鏈接失效的主服務器時, 集羣也會向客戶端返回新主服務器的地址, 使得集羣可使用新主服務器代替失效服務器。

集羣模式

Redis主從模式雖然很強大,可是其單Master的架構,當遇到單機內存併發流量等瓶頸時便一籌莫展,Redis集羣的出現就是爲了解決主從模式所遇到的問題。在Redis Cluster面世以前,業界爲了解決Redis這個問題,也出現了一些優秀的Redis集羣解決方案,好比TwemproxyCodis,若是你們感興趣,能夠去學習,本文再也不比較各自的優劣。redis

集羣模式數據分佈

數據分佈理論

摘抄自參考文檔3,該文做者已經有了很好的總結。數據庫

分佈式數據庫首先要解決把整個數據集按照分區規則映射到多個節點的問題,每一個節點負責總體數據的一個子集。apache

數據分佈一般有哈希分區順序分區兩種方式,對好比下:服務器

分區方式 特色 相關產品
哈希分區 離散程度好,數據分佈與業務無關,沒法順序訪問 Redis Cluster,Cassandra,Dynamo
順序分區 離散程度易傾斜,數據分佈與業務相關,能夠順序訪問 BigTable,HBase,Hypertable

因爲Redis Cluster採用哈希分區規則,這裏重點討論哈希分區。常見的哈希分區規則有幾種:架構

  • 節點取餘分區:使用特定的數據,如 Redis的鍵或用戶ID,再根據節點數量N使用公式:hash(key)% N計算出 哈希值,用來決定數據 映射 到哪個節點上。這種方式簡單實用,經常使用語數據庫分庫分表,通常採用預分區的方式,提早按預估的數據量規劃好分區數。缺點也很明顯,當節點數量發生變化時,好比發生擴容縮容時,數據節點的映射關係須要從新計算,會致使數據的從新遷移。

  • 一致性哈希分區一致性哈希能夠很好的解決穩定性問題,能夠將全部的存儲節點排列在首尾相接的Hash環上,每一個key在計算Hash後順時針找到臨接的存儲節點存放。當有節點加入退出時,僅影響該節點在hash環上順時針相鄰的後續節點。加入和刪除節點,隻影響哈希環中順時針方向的相鄰的節點,對其餘節點無影響,可是仍是會形成哈希環中部分數據沒法命中。當使用少許節點時,節點變化將大範圍影響哈希環中的數據映射,不適合少許數據節點的分佈式方案普通的一致性哈希分區在增減節點時,須要增長一倍或減去一半節點,才能保證數據和負載的均衡

  • 虛擬槽分區:虛擬槽分區巧妙的使用了哈希空間,使用分散度良好的哈希函數把全部數據映射到一個固定範圍的整數集合中,整數定義爲槽(slot)這個範圍通常遠遠大於節點數,好比Redis Cluster的槽範圍是0~16383。槽是集羣內數據管理和遷移的基本單位。採用大範圍槽的主要目的是爲了方便數據拆分和集羣擴展。每一個節點會負責必定數量的槽。因爲從一個節點將哈希槽移動到另外一個節點並不會中止服務,因此不管添加刪除或者改變某個節點的哈希槽數量,都不會形成集羣不可用的狀態

    Redis虛擬槽分區的特色:

    • 解耦數據和節點之間的關係,簡化了節點擴容和收縮的難度
    • 節點自身維護槽的映射關係,不須要客戶端或者代理服務維護槽分區元數據。
    • 支持節點、槽、鍵之間的映射查詢,用於數據路由、在線伸縮等場景。

Redis集羣的功能限制

Redis 集羣相對 單機 在功能上存在一些限制,須要 開發人員 提早了解,在使用時作好規避。

  • key 批量操做 支持有限。

相似 msetmget 操做,目前只支持對具備相同 slot 值的 key 執行 批量操做。對於 映射爲不一樣 slot 值的 key 因爲執行 mgetmget 等操做可能存在於多個節點上,所以不被支持。

  • key 事務操做 支持有限。

只支持 key同一節點上事務操做,當多個 key 分佈在 不一樣 的節點上時 沒法 使用事務功能。

  • key 做爲 數據分區 的最小粒度

不能將一個 大的鍵值 對象如 hashlist 等映射到 不一樣的節點

  • 不支持 多數據庫空間

單機 下的 Redis 能夠支持 16 個數據庫(db0 ~ db15),集羣模式 下只能使用 一個 數據庫空間,即 db0

  • 複製結構 只支持一層

從節點 只能複製 主節點,不支持 嵌套樹狀複製 結構。

Jedis經典實現

import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.*;
import redis.clients.util.JedisClusterCRC16;

import java.util.*;

public class RedisCacheDelegate extends AbstractCache implements CacheManager, Cache {
    private static Logger logger = LoggerFactory.getLogger(RedisCacheDelegate.class);
    /** * 集羣節點 */
    private String clusterNodes;
    /** * 重試次數 */
    private int maxAttempts;
    /** * 超時時間,單位是秒 */
    private int timeout;

    private JedisCluster jedisCluster;
    private JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();

    private static RedisCacheDelegate redisCacheDelegate = null;
    private JedisClusterInfoCache cache;

    private final static String WARM_KEY = "warm_key";
    private final static String WARM_VALUE = "value";


    public static RedisCacheDelegate getInstant(CacheProperties cacheProperties) {
        if (redisCacheDelegate == null) {
            synchronized (RedisCacheDelegate.class) {
                if (redisCacheDelegate == null) {
                    redisCacheDelegate = new RedisCacheDelegate(cacheProperties.getNodes(), cacheProperties.getTimeout(), cacheProperties.getMaxAttempts());
                }
            }
        }

        return redisCacheDelegate;
    }

    private RedisCacheDelegate(String clusterNodes, int timeout, int maxAttempts) {
        this.clusterNodes = clusterNodes;
        this.timeout = timeout;
        this.maxAttempts = maxAttempts;
        init();
    }

    private JedisPoolConfig getJedisPoolConfig() {

        //鏈接最長等待時間,默認是-1
        jedisPoolConfig.setMaxWaitMillis(200);
        //鏈接池最大數量
        jedisPoolConfig.setMaxTotal(50);
        //最小閒置個數 閒置超過最小閒置個數但不超過最大閒置個數,則逐步清理閒置直到最小閒置個數
        jedisPoolConfig.setMinIdle(10);
        //最大閒置個數 閒置超過最大閒置個數則直接殺死超過部分
        jedisPoolConfig.setMaxIdle(30);
        //鏈接耗盡等待,等待最長{MaxWaitMillis}毫秒
        jedisPoolConfig.setBlockWhenExhausted(true);
        //是否開啓jmx監控
        jedisPoolConfig.setJmxEnabled(true);
        //是否開啓空閒資源監測
        jedisPoolConfig.setTestWhileIdle(true);
        //空閒資源的檢測週期(單位爲毫秒)
        jedisPoolConfig.setMinEvictableIdleTimeMillis(60000);
        //資源池中資源最小空閒時間(單位爲毫秒),達到此值後空閒資源將被移除
        jedisPoolConfig.setTimeBetweenEvictionRunsMillis(30000);
        //作空閒資源檢測時,每次的採樣數,若是設置爲-1,就是對全部鏈接作空閒監測
        jedisPoolConfig.setNumTestsPerEvictionRun(-1);

        return jedisPoolConfig;
    }

    @Override
    public void init() {
        String[] serverArray = clusterNodes.split(",");
        Set<HostAndPort> nodes = new HashSet<>();
        for (String ipPort : serverArray) {
            String[] ipPortPair = ipPort.split(":");
            nodes.add(new HostAndPort(ipPortPair[0].trim(), Integer.valueOf(ipPortPair[1].trim())));
        }

        jedisCluster = new JedisCluster(nodes, timeout * 1000, maxAttempts, getJedisPoolConfig());
        MetaObject metaObject = SystemMetaObject.forObject(jedisCluster);
        cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache");
        warm();
    }

    /** * warm the jedis pool */
    @Override
    public void warm() {
        set(WARM_KEY, WARM_VALUE, 60);
        for (int i = 0; i < jedisPoolConfig.getMinIdle(); i++) {
            ttl(WARM_KEY);
        }
    }

    @Override
    public void set(String key, String value) {
        jedisCluster.set(key, value);
    }

    @Override
    public void set(String key, String value, int expiredTime) {
        jedisCluster.setex(key, expiredTime, value);
    }

    @Override
    public void mSet(Map<String, String> data) {
        if (data != null && data.size() > 0) {
            data.forEach((key, value) -> jedisCluster.set(key, value));
        }
    }

    @Override
    public void mSetPipLine(Map<String, String> data) {
        setPipLine(data, 0);
    }

    private void setPipLine(Map<String, String> data, int expiredTime) {
        if (data.size() < 1) {
            return;
        }

        //保存地址+端口和命令的映射
        Map<JedisPool, Map<String, String>> jedisPoolMap = new HashMap<>();
        JedisPool currentJedisPool = null;

        for (String key : data.keySet()) {
            //計算哈希槽
            int crc = JedisClusterCRC16.getSlot(key);
            //經過哈希槽獲取節點的鏈接
            currentJedisPool = cache.getSlotPool(crc);

            if (jedisPoolMap.containsKey(currentJedisPool)) {
                jedisPoolMap.get(currentJedisPool).put(key, data.get(key));
            } else {
                Map<String, String> inner = new HashMap<>();
                inner.put(key, data.get(key));
                jedisPoolMap.put(currentJedisPool, inner);
            }
        }
        //保存結果
        Map<String, String> map = null;
        //執行
        for (Map.Entry<JedisPool, Map<String, String>> entry : jedisPoolMap.entrySet()) {
            try {
                currentJedisPool = entry.getKey();
                map = entry.getValue();
                Jedis jedis = currentJedisPool.getResource();
                //獲取pipeline
                Pipeline currentPipeline = jedis.pipelined();
                // NX是不存在時才set, XX是存在時才set, EX是秒,PX是毫秒
                if (expiredTime > 0) {
                    map.forEach((k, v) -> currentPipeline.setex(k, expiredTime, v));
                } else {
                    map.forEach((k, v) -> currentPipeline.set(k, v));
                }
                //從pipeline中獲取結果
                currentPipeline.sync();
                currentPipeline.close();
                jedis.close();
            } catch (Exception e) {
                logger.error("setPipline error.", e);
            }

        }
    }

    @Override
    public void mSet(Map<String, String> data, int expiredTime) {
        if (data != null && data.size() > 0) {
            data.forEach((key, value) -> jedisCluster.setex(key, expiredTime, value));
        }
    }

    @Override
    public void mSetPipLine(Map<String, String> data, int expiredTime) {
        setPipLine(data, expiredTime);
    }

    @Override
    public String get(String key) {
        return jedisCluster.get(key);
    }

    @Override
    public List<String> mGet(List<String> keys) {
        if (keys.size() < 1) {
            return null;
        }
        List<String> result = new ArrayList<>(keys.size());
        for (String key : keys) {
            result.add(jedisCluster.get(key));
        }
        return result;
    }

    @Override
    public List<String> mGetPipLine(List<String> key) {
        return getPipLine(key);
    }

    @Override
    public long ttl(String key) {
        return jedisCluster.ttl(key);
    }

    private List<String> getPipLine(List<String> keys) {
        if (keys.size() < 1) {
            return null;
        }
        List<String> result = new ArrayList<>(keys.size());
        Map<String, String> resultMap = new HashMap<>(keys.size());
        if (keys.size() == 1) {
            result.add(jedisCluster.get(keys.get(0)));
            return result;
        }

        //保存地址+端口和命令的映射
        Map<JedisPool, List<String>> jedisPoolMap = new HashMap<>();

        List<String> keyList = null;
        JedisPool currentJedisPool = null;
        Pipeline currentPipeline = null;

        for (String key : keys) {
            //cuteculate hash
            int crc = JedisClusterCRC16.getSlot(key);
            //經過哈希槽獲取節點的鏈接
            currentJedisPool = cache.getSlotPool(crc);

            if (jedisPoolMap.containsKey(currentJedisPool)) {
                jedisPoolMap.get(currentJedisPool).add(key);
            } else {
                keyList = new ArrayList<>();
                keyList.add(key);
                jedisPoolMap.put(currentJedisPool, keyList);
            }
        }

        //保存結果
        List<Object> res;
        //執行
        for (Map.Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()) {
            try {
                currentJedisPool = entry.getKey();
                keyList = entry.getValue();
                //獲取pipeline
                Jedis jedis = currentJedisPool.getResource();
                currentPipeline = jedis.pipelined();
                for (String key : keyList) {
                    currentPipeline.get(key);
                }
                //從pipeline中獲取結果
                res = currentPipeline.syncAndReturnAll();
                currentPipeline.close();
                jedis.close();
                for (int i = 0; i < keyList.size(); i++) {
                    resultMap.put(keyList.get(i), res.get(i) == null ? null : res.get(i).toString());
                }
            } catch (Exception e) {
                logger.error("getPipLine error.", e);
            }
        }
        //sort
        for (String key : keys) {
            result.add(resultMap.containsKey(key) ? resultMap.get(key) : null);
        }

        return result;
    }

    @Override
    public void destroy() {
        try {
            jedisCluster.close();
        } catch (Exception e) {

        }
    }
}
複製代碼

參考文檔

一、玩轉Redis集羣之Cluster

二、Redis哨兵模式實現主從切換

三、深刻剖析Redis系列(三) - Redis集羣模式搭建與原理詳解

相關文章
相關標籤/搜索