本文例子基於:5.0.4java
Redis Cluster集羣高可用方案,去中心化,最基本三主多從,主從切換相似Sentinel,關於Sentinel內容能夠查看編者另一篇【Redis從入門到放棄系列(九) Sentinel】.node
在Redis Cluster中,只存在index爲0的數據庫,並且其實Redis做爲單線程,若是在同一個實例上建立多個庫的話,也是須要上下文切換的.redis
因爲Redis Cluster是採用16384個slot來劃分數據的,也就是說你當前插入的數據會存在不一樣的節點上,簡而言之不支持比較複雜的多建操做(能夠對key打上hash tags來解決).數據庫
咱們說Cluster是按照16384個slot來劃分數據的,那麼是如何來肯定一個key落在那個節點上呢?服務器
//計算slot HASH_SLOT = CRC16(key) mod 16384
每一個節點會擁有一部分的slot,經過上述獲取到具體key的slot即知道應該去哪兒找對應的節點啦.但是在網絡中,一切都會有不存穩定因素,網絡抖動.網絡
當在Cluster中存在網絡抖動的時候,當時間過長,有可能產生下線,其實原理跟Sentinel裏面講的很類似,由於都是依賴Gossip協議來實現的.能夠經過如下配置來設置肯定下線的時間.多線程
//節點持續timeout的時間,才認定該節點出現故障,須要進行主從切換, cluster-node-timeout //做爲上面timeout的係數來放大時間 cluster-replica-validity-factor
因爲數據是按照16384個slot去劃分的,那麼當咱們在請求某個key到錯誤的節點,這時候key不在該節點上,Redis會向咱們發送一個錯誤運維
-MOVED 3999 127.0.0.1:6381
該消息是提示咱們該key應該是存在127.0.0.1
這臺服務器上面的3999slot,這時候就須要咱們的redis客戶端去糾正本地的slot映射表,而後請求對應的地址.dom
當咱們在增長或者刪除某個節點的時候,其實就只是將slot從某個節點移動到另一個節點.可使用一下命令來完成這一件事ide
在遷移的時候,redis節點會存在兩種狀態,一種是MIGRATING和IMPORTING,用於將slot從一個節點遷移到另一個節點.
public class RedisUtils { private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; private static final Long RELEASE_SUCCESS = 1L; private final ThreadLocal<String> requestId = new ThreadLocal<>(); private final static ExecutorService executorService = new ThreadPoolExecutor( //核心線程數量 1, //最大線程數量 8, //當線程空閒時,保持活躍的時間 1000, //時間單元 ,毫秒級 TimeUnit.MILLISECONDS, //線程任務隊列 new LinkedBlockingQueue<>(1024), //建立線程的工廠 new RedisTreadFactory("redis-batch")); @Autowired private JedisCluster jedisCluster; public String set(String key, String value) { return jedisCluster.set(key, value); } public String get(String key) { return jedisCluster.get(key); } public Map<String, String> getBatchKey(List<String> keys) { Map<Jedis, List<String>> nodeKeyListMap = jedisKeys(keys); //結果集 Map<String, String> resultMap = new HashMap<>(); CompletionService<Map<String,String>> batchService = new ExecutorCompletionService(executorService); nodeKeyListMap.forEach((k,v)->{ batchService.submit(new BatchGetTask(k,v)); }); nodeKeyListMap.forEach((k,v)->{ try { resultMap.putAll(batchService.take().get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); return resultMap; } public boolean lock(String lockKey, long expireTime){ String uuid = UUID.randomUUID().toString(); requestId.set(uuid); String result = jedisCluster.set(lockKey, uuid, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); return LOCK_SUCCESS.equals(result); } public boolean unLock(String lockKey){ String uuid = requestId.get(); String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedisCluster.eval(script, Collections.singletonList(lockKey), Collections.singletonList(uuid)); requestId.remove(); return RELEASE_SUCCESS.equals(result); } private Map<Jedis, List<String>> jedisKeys(List<String> keys){ Map<Jedis, List<String>> nodeKeyListMap = new HashMap<>(); for (String key : keys) { //計算slot int slot = JedisClusterCRC16.getSlot(key); Jedis jedis = jedisCluster.getConnectionFromSlot(slot); if (nodeKeyListMap.containsKey(jedis)) { nodeKeyListMap.get(jedis).add(key); } else { nodeKeyListMap.put(jedis, Arrays.asList(key)); } } return nodeKeyListMap; } public long delBatchKey(List<String> keys){ Map<Jedis, List<String>> nodeKeyListMap = jedisKeys(keys); CompletionService<Long> batchService = new ExecutorCompletionService(executorService); nodeKeyListMap.forEach((k,v)->{ batchService.submit(new BatchDelTask(k,v)); }); Long result = 0L; for (int i=0;i<nodeKeyListMap.size();i++){ try { result += batchService.take().get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } return result; } class BatchGetTask implements Callable<Map<String,String>>{ private Jedis jedis; private List<String> keys; private BatchGetTask(Jedis jedis, List<String> keys) { this.jedis = jedis; this.keys = keys; } @Override public Map<String, String> call() throws Exception { Map<String, String> resultMap = new HashMap<>(); String[] keyArray = keys.toArray(new String[]{}); try { List<String> nodeValueList = jedis.mget(keyArray); for (int i = 0; i < keys.size(); i++) { resultMap.put(keys.get(i),nodeValueList.get(i)); } }finally { jedis.close(); } return resultMap; } } class BatchDelTask implements Callable<Long>{ private Jedis jedis; private List<String> keys; private BatchDelTask(Jedis jedis, List<String> keys) { this.jedis = jedis; this.keys = keys; } @Override public Long call() throws Exception { String[] keyArray = keys.toArray(new String[]{}); try { return jedis.del(keyArray); }finally { jedis.close(); } } } static class RedisTreadFactory implements ThreadFactory{ private final AtomicInteger threadNumber = new AtomicInteger(0); private final String namePredix; public RedisTreadFactory(String namePredix) { this.namePredix = namePredix +"-"; } @Override public Thread newThread(Runnable r) { Thread t = new Thread( r,namePredix + threadNumber.getAndIncrement()); if (t.isDaemon()) t.setDaemon(true); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } } }
Redis從入門到放棄系列終於完結啦!!!!!!!!!!!
寫博客,真的是很是耗時間,真的,原本星期六日要寫的,然而由於某些問題而沒有寫出來(PS:純粹是由於打遊戲.hhhh),終於在今天痛定思痛,頂着脖子酸的壓力(PS:貼着狗皮膏藥在擼碼),終於完結了.
感謝各位看官那麼辛苦看我碼字,真心感謝.
但願寫的東西對各位看官有啓發.