Jedis是Redis的Java客戶端,本代碼是Jedis應用的一個範例。html
已同步本人技術博客:siyuanwang.github.io/2019/12/07/…java
Redis分了了主從模式和集羣模式。node
主從模式即便用一個Redis實例做爲主機(Master),其他的實例做爲備份機(Slave),Master支持寫入和讀取等各類操做,Slave支持讀操做和與Master同步數據。主從模式的核心思想是讀寫分離,數據冗餘存儲和HA,Master節點出現問題,能夠經過Redis Sentinel作到主從切換。git
Sentinel 系統用於管理多個 Redis 服務器(instance), 該系統執行如下三個任務:github
Redis主從模式雖然很強大,可是其單Master的架構,當遇到單機內存、併發、流量等瓶頸時便一籌莫展,Redis集羣的出現就是爲了解決主從模式所遇到的問題。在Redis Cluster面世以前,業界爲了解決Redis這個問題,也出現了一些優秀的Redis集羣解決方案,好比Twemproxy和Codis,若是你們感興趣,能夠去學習,本文再也不比較各自的優劣。redis
摘抄自參考文檔3,該文做者已經有了很好的總結。數據庫
分佈式數據庫首先要解決把整個數據集按照分區規則映射到多個節點的問題,每一個節點負責總體數據的一個子集。apache
數據分佈一般有哈希分區和順序分區兩種方式,對好比下:服務器
分區方式 | 特色 | 相關產品 |
---|---|---|
哈希分區 | 離散程度好,數據分佈與業務無關,沒法順序訪問 | Redis Cluster,Cassandra,Dynamo |
順序分區 | 離散程度易傾斜,數據分佈與業務相關,能夠順序訪問 | BigTable,HBase,Hypertable |
因爲Redis Cluster採用哈希分區規則,這裏重點討論哈希分區。常見的哈希分區規則有幾種:架構
節點取餘分區:使用特定的數據,如 Redis的鍵或用戶ID,再根據節點數量N使用公式:hash(key)% N
計算出 哈希值,用來決定數據 映射 到哪個節點上。這種方式簡單實用,經常使用語數據庫分庫分表,通常採用預分區的方式,提早按預估的數據量規劃好分區數。缺點也很明顯,當節點數量發生變化時,好比發生擴容或縮容時,數據節點的映射關係須要從新計算,會致使數據的從新遷移。
一致性哈希分區:一致性哈希能夠很好的解決穩定性問題,能夠將全部的存儲節點排列在首尾相接的Hash環上,每一個key在計算Hash後順時針找到臨接的存儲節點存放。當有節點加入或退出時,僅影響該節點在hash環上順時針相鄰的後續節點。加入和刪除節點,隻影響哈希環中順時針方向的相鄰的節點,對其餘節點無影響,可是仍是會形成哈希環中部分數據沒法命中。當使用少許節點時,節點變化將大範圍影響哈希環中的數據映射,不適合少許數據節點的分佈式方案。普通的一致性哈希分區在增減節點時,須要增長一倍或減去一半節點,才能保證數據和負載的均衡。
虛擬槽分區:虛擬槽分區巧妙的使用了哈希空間,使用分散度良好的哈希函數把全部數據映射到一個固定範圍的整數集合中,整數定義爲槽(slot)。這個範圍通常遠遠大於節點數,好比Redis Cluster的槽範圍是0~16383。槽是集羣內數據管理和遷移的基本單位。採用大範圍槽的主要目的是爲了方便數據拆分和集羣擴展。每一個節點會負責必定數量的槽。因爲從一個節點將哈希槽移動到另外一個節點並不會中止服務,因此不管添加刪除或者改變某個節點的哈希槽數量,都不會形成集羣不可用的狀態。
Redis虛擬槽分區的特色:
Redis
集羣相對 單機 在功能上存在一些限制,須要 開發人員 提早了解,在使用時作好規避。
key
批量操做 支持有限。相似 mset
、mget
操做,目前只支持對具備相同 slot
值的 key
執行 批量操做。對於 映射爲不一樣 slot
值的 key
因爲執行 mget
、mget
等操做可能存在於多個節點上,所以不被支持。
key
事務操做 支持有限。只支持 多 key
在 同一節點上 的 事務操做,當多個 key
分佈在 不一樣 的節點上時 沒法 使用事務功能。
key
做爲 數據分區 的最小粒度不能將一個 大的鍵值 對象如 hash
、list
等映射到 不一樣的節點。
單機 下的 Redis
能夠支持 16
個數據庫(db0 ~ db15
),集羣模式 下只能使用 一個 數據庫空間,即 db0
。
從節點 只能複製 主節點,不支持 嵌套樹狀複製 結構。
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.*;
import redis.clients.util.JedisClusterCRC16;
import java.util.*;
public class RedisCacheDelegate extends AbstractCache implements CacheManager, Cache {
private static Logger logger = LoggerFactory.getLogger(RedisCacheDelegate.class);
/** * 集羣節點 */
private String clusterNodes;
/** * 重試次數 */
private int maxAttempts;
/** * 超時時間,單位是秒 */
private int timeout;
private JedisCluster jedisCluster;
private JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
private static RedisCacheDelegate redisCacheDelegate = null;
private JedisClusterInfoCache cache;
private final static String WARM_KEY = "warm_key";
private final static String WARM_VALUE = "value";
public static RedisCacheDelegate getInstant(CacheProperties cacheProperties) {
if (redisCacheDelegate == null) {
synchronized (RedisCacheDelegate.class) {
if (redisCacheDelegate == null) {
redisCacheDelegate = new RedisCacheDelegate(cacheProperties.getNodes(), cacheProperties.getTimeout(), cacheProperties.getMaxAttempts());
}
}
}
return redisCacheDelegate;
}
private RedisCacheDelegate(String clusterNodes, int timeout, int maxAttempts) {
this.clusterNodes = clusterNodes;
this.timeout = timeout;
this.maxAttempts = maxAttempts;
init();
}
private JedisPoolConfig getJedisPoolConfig() {
//鏈接最長等待時間,默認是-1
jedisPoolConfig.setMaxWaitMillis(200);
//鏈接池最大數量
jedisPoolConfig.setMaxTotal(50);
//最小閒置個數 閒置超過最小閒置個數但不超過最大閒置個數,則逐步清理閒置直到最小閒置個數
jedisPoolConfig.setMinIdle(10);
//最大閒置個數 閒置超過最大閒置個數則直接殺死超過部分
jedisPoolConfig.setMaxIdle(30);
//鏈接耗盡等待,等待最長{MaxWaitMillis}毫秒
jedisPoolConfig.setBlockWhenExhausted(true);
//是否開啓jmx監控
jedisPoolConfig.setJmxEnabled(true);
//是否開啓空閒資源監測
jedisPoolConfig.setTestWhileIdle(true);
//空閒資源的檢測週期(單位爲毫秒)
jedisPoolConfig.setMinEvictableIdleTimeMillis(60000);
//資源池中資源最小空閒時間(單位爲毫秒),達到此值後空閒資源將被移除
jedisPoolConfig.setTimeBetweenEvictionRunsMillis(30000);
//作空閒資源檢測時,每次的採樣數,若是設置爲-1,就是對全部鏈接作空閒監測
jedisPoolConfig.setNumTestsPerEvictionRun(-1);
return jedisPoolConfig;
}
@Override
public void init() {
String[] serverArray = clusterNodes.split(",");
Set<HostAndPort> nodes = new HashSet<>();
for (String ipPort : serverArray) {
String[] ipPortPair = ipPort.split(":");
nodes.add(new HostAndPort(ipPortPair[0].trim(), Integer.valueOf(ipPortPair[1].trim())));
}
jedisCluster = new JedisCluster(nodes, timeout * 1000, maxAttempts, getJedisPoolConfig());
MetaObject metaObject = SystemMetaObject.forObject(jedisCluster);
cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache");
warm();
}
/** * warm the jedis pool */
@Override
public void warm() {
set(WARM_KEY, WARM_VALUE, 60);
for (int i = 0; i < jedisPoolConfig.getMinIdle(); i++) {
ttl(WARM_KEY);
}
}
@Override
public void set(String key, String value) {
jedisCluster.set(key, value);
}
@Override
public void set(String key, String value, int expiredTime) {
jedisCluster.setex(key, expiredTime, value);
}
@Override
public void mSet(Map<String, String> data) {
if (data != null && data.size() > 0) {
data.forEach((key, value) -> jedisCluster.set(key, value));
}
}
@Override
public void mSetPipLine(Map<String, String> data) {
setPipLine(data, 0);
}
private void setPipLine(Map<String, String> data, int expiredTime) {
if (data.size() < 1) {
return;
}
//保存地址+端口和命令的映射
Map<JedisPool, Map<String, String>> jedisPoolMap = new HashMap<>();
JedisPool currentJedisPool = null;
for (String key : data.keySet()) {
//計算哈希槽
int crc = JedisClusterCRC16.getSlot(key);
//經過哈希槽獲取節點的鏈接
currentJedisPool = cache.getSlotPool(crc);
if (jedisPoolMap.containsKey(currentJedisPool)) {
jedisPoolMap.get(currentJedisPool).put(key, data.get(key));
} else {
Map<String, String> inner = new HashMap<>();
inner.put(key, data.get(key));
jedisPoolMap.put(currentJedisPool, inner);
}
}
//保存結果
Map<String, String> map = null;
//執行
for (Map.Entry<JedisPool, Map<String, String>> entry : jedisPoolMap.entrySet()) {
try {
currentJedisPool = entry.getKey();
map = entry.getValue();
Jedis jedis = currentJedisPool.getResource();
//獲取pipeline
Pipeline currentPipeline = jedis.pipelined();
// NX是不存在時才set, XX是存在時才set, EX是秒,PX是毫秒
if (expiredTime > 0) {
map.forEach((k, v) -> currentPipeline.setex(k, expiredTime, v));
} else {
map.forEach((k, v) -> currentPipeline.set(k, v));
}
//從pipeline中獲取結果
currentPipeline.sync();
currentPipeline.close();
jedis.close();
} catch (Exception e) {
logger.error("setPipline error.", e);
}
}
}
@Override
public void mSet(Map<String, String> data, int expiredTime) {
if (data != null && data.size() > 0) {
data.forEach((key, value) -> jedisCluster.setex(key, expiredTime, value));
}
}
@Override
public void mSetPipLine(Map<String, String> data, int expiredTime) {
setPipLine(data, expiredTime);
}
@Override
public String get(String key) {
return jedisCluster.get(key);
}
@Override
public List<String> mGet(List<String> keys) {
if (keys.size() < 1) {
return null;
}
List<String> result = new ArrayList<>(keys.size());
for (String key : keys) {
result.add(jedisCluster.get(key));
}
return result;
}
@Override
public List<String> mGetPipLine(List<String> key) {
return getPipLine(key);
}
@Override
public long ttl(String key) {
return jedisCluster.ttl(key);
}
private List<String> getPipLine(List<String> keys) {
if (keys.size() < 1) {
return null;
}
List<String> result = new ArrayList<>(keys.size());
Map<String, String> resultMap = new HashMap<>(keys.size());
if (keys.size() == 1) {
result.add(jedisCluster.get(keys.get(0)));
return result;
}
//保存地址+端口和命令的映射
Map<JedisPool, List<String>> jedisPoolMap = new HashMap<>();
List<String> keyList = null;
JedisPool currentJedisPool = null;
Pipeline currentPipeline = null;
for (String key : keys) {
//cuteculate hash
int crc = JedisClusterCRC16.getSlot(key);
//經過哈希槽獲取節點的鏈接
currentJedisPool = cache.getSlotPool(crc);
if (jedisPoolMap.containsKey(currentJedisPool)) {
jedisPoolMap.get(currentJedisPool).add(key);
} else {
keyList = new ArrayList<>();
keyList.add(key);
jedisPoolMap.put(currentJedisPool, keyList);
}
}
//保存結果
List<Object> res;
//執行
for (Map.Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()) {
try {
currentJedisPool = entry.getKey();
keyList = entry.getValue();
//獲取pipeline
Jedis jedis = currentJedisPool.getResource();
currentPipeline = jedis.pipelined();
for (String key : keyList) {
currentPipeline.get(key);
}
//從pipeline中獲取結果
res = currentPipeline.syncAndReturnAll();
currentPipeline.close();
jedis.close();
for (int i = 0; i < keyList.size(); i++) {
resultMap.put(keyList.get(i), res.get(i) == null ? null : res.get(i).toString());
}
} catch (Exception e) {
logger.error("getPipLine error.", e);
}
}
//sort
for (String key : keys) {
result.add(resultMap.containsKey(key) ? resultMap.get(key) : null);
}
return result;
}
@Override
public void destroy() {
try {
jedisCluster.close();
} catch (Exception e) {
}
}
}
複製代碼