文章很長,並且持續更新,建議收藏起來,慢慢讀! Java 高併發 發燒友社羣:瘋狂創客圈(總入口) 奉上如下珍貴的學習資源:javascript
入大廠 、作架構、大力提高Java 內功 必備的精彩博文 | 2021 秋招漲薪1W + 必備的精彩博文 |
---|---|
1:Redis 分佈式鎖 (圖解-秒懂-史上最全) | 2:Zookeeper 分佈式鎖 (圖解-秒懂-史上最全) |
3: Redis與MySQL雙寫一致性如何保證? (面試必備) | 4: 面試必備:秒殺超賣 解決方案 (史上最全) |
5:面試必備之:Reactor模式 | 6: 10分鐘看懂, Java NIO 底層原理 |
7:TCP/IP(圖解+秒懂+史上最全) | 8:Feign原理 (圖解) |
9:DNS圖解(秒懂 + 史上最全 + 高薪必備) | 10:CDN圖解(秒懂 + 史上最全 + 高薪必備) |
10: 分佈式事務( 圖解 + 史上最全 + 吐血推薦 ) |
Java 面試題 30個專題 , 史上最全 , 面試必刷 | 阿里、京東、美團... 隨意挑、橫着走!!! |
---|---|
1: JVM面試題(史上最強、持續更新、吐血推薦) | 2:Java基礎面試題(史上最全、持續更新、吐血推薦 |
3:架構設計面試題 (史上最全、持續更新、吐血推薦) | 4:設計模式面試題 (史上最全、持續更新、吐血推薦) |
1七、分佈式事務面試題 (史上最全、持續更新、吐血推薦) | 一致性協議 (史上最全) |
2九、多線程面試題(史上最全) | 30、HR面經,過五關斬六將後,當心陰溝翻船! |
9.網絡協議面試題(史上最全、持續更新、吐血推薦) | 更多專題, 請參見【 瘋狂創客圈 高併發 總目錄 】 |
SpringCloud 精彩博文 | |
---|---|
nacos 實戰(史上最全) | sentinel (史上最全+入門教程) |
SpringCloud gateway (史上最全) | 更多專題, 請參見【 瘋狂創客圈 高併發 總目錄 】 |
在單體的應用開發場景中,在多線程的環境下,涉及併發同步的時候,爲了保證一個代碼塊在同一時間只能由一個線程訪問,咱們通常能夠使用synchronized語法和ReetrantLock去保證,這其實是本地鎖的方式。html
也就是說,在同一個JVM內部,你們每每採用synchronized或者Lock的方式來解決多線程間的安全問題。但在分佈式集羣工做的開發場景中,在JVM之間,那麼就須要一種更加高級的鎖機制,來處理種跨JVM進程之間的線程安全問題.java
解決方案是:使用分佈式鎖node
總之,對於分佈式場景,咱們能夠使用分佈式鎖,它是控制分佈式系統之間互斥訪問共享資源的一種方式。mysql
好比說在一個分佈式系統中,多臺機器上部署了多個服務,當客戶端一個用戶發起一個數據插入請求時,若是沒有分佈式鎖機制保證,那麼那多臺機器上的多個服務可能進行併發插入操做,致使數據重複插入,對於某些不容許有多餘數據的業務來講,這就會形成問題。而分佈式鎖機制就是爲了解決相似這類問題,保證多個服務之間互斥的訪問共享資源,若是一個服務搶佔了分佈式鎖,其餘服務沒獲取到鎖,就不進行後續操做。git
大體意思以下圖所示(不必定準確):程序員
何爲分佈式鎖?github
分佈式鎖的條件:面試
分佈式鎖的實現:redis
分佈式鎖的實現由不少種,文件鎖、數據庫、redis等等,比較多;分佈式鎖常見的多種實現方式:
在實踐中,仍是redis作分佈式鎖性能會高一些
所謂悲觀鎖,悲觀鎖是對數據被的修改持悲觀態度(認爲數據在被修改的時候必定會存在併發問題),所以在整個數據處理過程當中將數據鎖定。
悲觀鎖的實現,每每依靠數據庫提供的鎖機制(也只有數據庫層提供的鎖機制才能真正保證數據訪問的排他性,不然,即便在應用層中實現了加鎖機制,也沒法保證外部系統不會修改數據)。
數據庫的行鎖、表鎖、排他鎖等都是悲觀鎖,這裏以行鎖爲例,進行介紹。以咱們經常使用的MySQL爲例,咱們經過使用select...for update語句, 執行該語句後,會在表上加持行鎖,一直到事務提交,解除行鎖。
使用場景舉例:
在秒殺案例中,生成訂單和扣減庫存的操做,能夠經過商品記錄的行鎖,進行保護。們經過使用select...for update語句,在查詢商品表庫存時將該條記錄加鎖,待下單減庫存完成後,再釋放鎖。
示例的SQL以下:
//0.開始事務 begin; //1.查詢出商品信息 select stockCount from seckill_good where id=1 for update; //2.根據商品信息生成訂單 insert into seckill_order (id,good_id) values (null,1); //3.修改商品stockCount減一 update seckill_good set stockCount=stockCount-1 where id=1; //4.提交事務 commit;
以上,在對id = 1的記錄修改前,先經過for update的方式進行加鎖,而後再進行修改。這就是比較典型的悲觀鎖策略。
若是以上修改庫存的代碼發生併發,同一時間只有一個線程能夠開啓事務並得到id=1的鎖,其它的事務必須等本次事務提交以後才能執行。這樣咱們能夠保證當前的數據不會被其它事務修改。
咱們使用select_for_update,另一定要寫在事務中.
注意:要使用悲觀鎖,咱們必須關閉mysql數據庫中自動提交的屬性,命令set autocommit=0;便可關閉,由於MySQL默認使用autocommit模式,也就是說,當你執行一個更新操做後,MySQL會馬上將結果進行提交。
悲觀鎖的實現,每每依靠數據庫提供的鎖機制。在數據庫中,悲觀鎖的流程以下:
使用樂觀鎖就不須要藉助數據庫的鎖機制了。
樂觀鎖的概念中其實已經闡述了他的具體實現細節:主要就是兩個步驟:衝突檢測和數據更新。其實現方式有一種比較典型的就是Compare and Swap(CAS)技術。
CAS是項樂觀鎖技術,當多個線程嘗試使用CAS同時更新同一個變量時,只有其中一個線程能更新變量的值,而其它線程都失敗,失敗的線程並不會被掛起,而是被告知此次競爭中失敗,並能夠再次嘗試。
CAS的實現中,在表中增長一個version字段,操做前先查詢version信息,在數據提交時檢查version字段是否被修改,若是沒有被修改則進行提交,不然認爲是過時數據。
好比前面的扣減庫存問題,經過樂觀鎖能夠實現以下:
//1.查詢出商品信息 select stockCount, version from seckill_good where id=1; //2.根據商品信息生成訂單 insert into seckill_order (id,good_id) values (null,1); //3.修改商品庫存 update seckill_good set stockCount=stockCount-1, version = version+1 where id=1, version=version;
以上,咱們在更新以前,先查詢一下庫存表中當前版本(version),而後在作update的時候,以version 做爲一個修改條件。
當咱們提交更新的時候,判斷數據庫表對應記錄的當前version與第一次取出來的version進行比對,若是數據庫表當前version與第一次取出來的version相等,則予以更新,不然認爲是過時數據。
CAS 樂觀鎖有兩個問題:
(1) CAS 存在一個比較重要的問題,即ABA問題. 解決的辦法是version字段順序遞增。
(2) 樂觀鎖的方式,在高併發時,只有一個線程能執行成功,會形成大量的失敗,這給用戶的體驗顯然是很很差的。
除了在數據庫層面加分佈式鎖,一般還能夠使用如下更高性能、更高可用的分佈式鎖:
有關zookeeper分佈式鎖的原理和實現,具體請參見下面的博客:
Zookeeper 分佈式鎖 (圖解+秒懂+史上最全)
或者閱讀筆者的《Java高併發核心編程(卷1)》
本文重點介紹Redis分佈式鎖,分爲兩個維度進行介紹:
(1)基於Jedis手工造輪子分佈式鎖
(2)介紹Redission 分佈式鎖的使用和原理。
咱們首先講解 Jedis 普通分佈式鎖實現,而且是純手工的模式,從最爲基礎的Redis命令開始。
只有充分了解與分佈式鎖相關的普通Redis命令,才能更好的瞭解高級的Redis分佈式鎖的實現,由於高級的分佈式鎖的實現徹底基於普通Redis命令。
Redis發展到如今,幾種常見的部署架構有:
從分佈式鎖的角度來講, 不管是單機模式、主從模式、哨兵模式、集羣模式,其原理都是類同的。 只是主從模式、哨兵模式、集羣模式的更加的高可用、或者更加高併發。
因此,接下來先基於單機模式,基於Jedis手工造輪子實現本身的分佈式鎖。
Redis分佈式鎖機制,主要藉助setnx和expire兩個命令完成。
setnx命令:
SETNX 是SET if Not eXists的簡寫。將 key 的值設爲 value,當且僅當 key 不存在; 若給定的 key 已經存在,則 SETNX 不作任何動做。
下面爲客戶端使用示例:
127.0.0.1:6379> set lock "unlock" OK 127.0.0.1:6379> setnx lock "unlock" (integer) 0 127.0.0.1:6379> setnx lock "lock" (integer) 0 127.0.0.1:6379>
expire命令:
expire命令爲 key 設置生存時間,當 key 過時時(生存時間爲 0 ),它會被自動刪除. 其格式爲:
EXPIRE key seconds
下面爲客戶端使用示例:
127.0.0.1:6379> expire lock 10 (integer) 1 127.0.0.1:6379> ttl lock 8
經過Redis的setnx、expire命令能夠實現簡單的鎖機制:
線程調用setnx方法成功返回1認爲加鎖成功,其餘線程要等到當前線程業務操做完成釋放鎖後,才能再次調用setnx加鎖成功。
以上簡單redis分佈式鎖的問題:
若是出現了這麼一個問題:若是setnx
是成功的,可是expire
設置失敗,一旦出現了釋放鎖失敗,或者沒有手工釋放,那麼這個鎖永遠被佔用,其餘線程永遠也搶不到鎖。
因此,須要保障setnx和expire兩個操做的原子性,要麼所有執行,要麼所有不執行,兩者不能分開。
解決的辦法有兩種:
使用set的命令時,同時設置過時時間的示例以下:
127.0.0.1:6379> set unlock "234" EX 100 NX (nil) 127.0.0.1:6379> 127.0.0.1:6379> set test "111" EX 100 NX OK
這樣就完美的解決了分佈式鎖的原子性; set 命令的完整格式:
set key value [EX seconds] [PX milliseconds] [NX|XX]
EX seconds:設置失效時長,單位秒 PX milliseconds:設置失效時長,單位毫秒 NX:key不存在時設置value,成功返回OK,失敗返回(nil) XX:key存在時設置value,成功返回OK,失敗返回(nil)
使用set命令實現加鎖操做,先展現加鎖的簡單代碼實習,再帶你們慢慢解釋爲何這樣實現。
加鎖的簡單代碼實現
package com.crazymaker.springcloud.standard.lock; @Slf4j @Data @AllArgsConstructor public class JedisCommandLock { private RedisTemplate redisTemplate; private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; /** * 嘗試獲取分佈式鎖 * @param jedis Redis客戶端 * @param lockKey 鎖 * @param requestId 請求標識 * @param expireTime 超期時間 * @return 是否獲取成功 */ public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) { String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; } }
能夠看到,咱們加鎖用到了Jedis的set Api:
jedis.set(String key, String value, String nxxx, String expx, int time)
這個set()方法一共有五個形參:
第一個爲key,咱們使用key來當鎖,由於key是惟一的。
第二個爲value,咱們傳的是requestId,不少童鞋可能不明白,有key做爲鎖不就夠了嗎,爲何還要用到value?緣由就是咱們在上面講到可靠性時,分佈式鎖要知足第四個條件解鈴還須繫鈴人,經過給value賦值爲requestId,咱們就知道這把鎖是哪一個請求加的了,在解鎖的時候就能夠有依據。
requestId能夠使用
UUID.randomUUID().toString()
方法生成。
第三個爲nxxx,這個參數咱們填的是NX,意思是SET IF NOT EXIST,即當key不存在時,咱們進行set操做;若key已經存在,則不作任何操做;
第四個爲expx,這個參數咱們傳的是PX,意思是咱們要給這個key加一個過時的設置,具體時間由第五個參數決定。
第五個爲time,與第四個參數相呼應,表明key的過時時間。
總的來講,執行上面的set()方法就只會致使兩種結果:
- 當前沒有鎖(key不存在),那麼就進行加鎖操做,並對鎖設置個有效期,同時value表示加鎖的客戶端。
- 已有鎖存在,不作任何操做。
心細的童鞋就會發現了,咱們的加鎖代碼知足前面描述的四個條件中的三個。
首先,set()加入了NX參數,能夠保證若是已有key存在,則函數不會調用成功,也就是隻有一個客戶端能持有鎖,知足互斥性。
其次,因爲咱們對鎖設置了過時時間,即便鎖的持有者後續發生崩潰而沒有解鎖,鎖也會由於到了過時時間而自動解鎖(即key被刪除),不會被永遠佔用(而發生死鎖)。
最後,由於咱們將value賦值爲requestId,表明加鎖的客戶端請求標識,那麼在客戶端在解鎖的時候就能夠進行校驗是不是同一個客戶端。
因爲咱們只考慮Redis單機部署的場景,因此容錯性咱們暫不考慮。
仍是先展現代碼,再帶你們慢慢解釋爲何這樣實現。
解鎖的簡單代碼實現:
package com.crazymaker.springcloud.standard.lock; @Slf4j @Data @AllArgsConstructor public class JedisCommandLock { private static final Long RELEASE_SUCCESS = 1L; /** * 釋放分佈式鎖 * @param jedis Redis客戶端 * @param lockKey 鎖 * @param requestId 請求標識 * @return 是否釋放成功 */ public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; } }
那麼這段Lua代碼的功能是什麼呢?
其實很簡單,首先獲取鎖對應的value值,檢查是否與requestId相等,若是相等則刪除鎖(解鎖)。
第一行代碼,咱們寫了一個簡單的Lua腳本代碼。
第二行代碼,咱們將Lua代碼傳到jedis.eval()
方法裏,並使參數KEYS[1]賦值爲lockKey,ARGV[1]賦值爲requestId。eval()方法是將Lua代碼交給Redis服務端執行。
那麼爲何要使用Lua語言來實現呢?
由於要確保上述操做是原子性的。那麼爲何執行eval()方法能夠確保原子性,源於Redis的特性.
簡單來講,就是在eval命令執行Lua代碼的時候,Lua代碼將被當成一個命令去執行,而且直到eval命令執行完成,Redis纔會執行其餘命
錯誤示例1
最多見的解鎖代碼就是直接使用 jedis.del() 方法刪除鎖,這種不先判斷鎖的擁有者而直接解鎖的方式,會致使任何客戶端均可以隨時進行解鎖,即便這把鎖不是它的。
public static void wrongReleaseLock1(Jedis jedis, String lockKey) { jedis.del(lockKey); }
錯誤示例2
這種解鎖代碼乍一看也是沒問題,甚至我以前也差點這樣實現,與正確姿式差很少,惟一區別的是分紅兩條命令去執行,代碼以下:
public static void wrongReleaseLock2(Jedis jedis, String lockKey, String requestId) { // 判斷加鎖與解鎖是否是同一個客戶端 if (requestId.equals(jedis.get(lockKey))) { // 若在此時,這把鎖忽然不是這個客戶端的,則會誤解鎖 jedis.del(lockKey); } }
前面提到,在redis中執行lua腳本,有以下的好處:
那麼爲何要使用Lua語言來實現呢?
由於要確保上述操做是原子性的。那麼爲何執行eval()方法能夠確保原子性,源於Redis的特性.
簡單來講,就是在eval命令執行Lua代碼的時候,Lua代碼將被當成一個命令去執行,而且直到eval命令執行完成,Redis纔會執行其餘命
因此:
大部分的開源框架(如 redission)中的分佈式鎖組件,都是用純lua腳本實現的。
題外話: lua腳本是高併發、高性能的必備腳本語言
有關lua的詳細介紹,請參見如下書籍:
那麼,咱們也來模擬一下
加鎖和刪除鎖的操做,使用純lua進行封裝,保障其執行時候的原子性。
基於純Lua腳本實現分佈式鎖的執行流程,大體以下:
--- -1 failed --- 1 success --- local key = KEYS[1] local requestId = KEYS[2] local ttl = tonumber(KEYS[3]) local result = redis.call('setnx', key, requestId) if result == 1 then --PEXPIRE:以毫秒的形式指定過時時間 redis.call('pexpire', key, ttl) else result = -1; -- 若是value相同,則認爲是同一個線程的請求,則認爲重入鎖 local value = redis.call('get', key) if (value == requestId) then result = 1; redis.call('pexpire', key, ttl) end end -- 若是獲取鎖成功,則返回 1 return result
--- -1 failed --- 1 success -- unlock key local key = KEYS[1] local requestId = KEYS[2] local value = redis.call('get', key) if value == requestId then redis.call('del', key); return 1; end return -1
下一步,實現Lock接口, 完成JedisLock的分佈式鎖。
其加鎖操做,經過調用 lock.lua腳本完成,代碼以下:
package com.crazymaker.springcloud.standard.lock; import com.crazymaker.springcloud.common.exception.BusinessException; import com.crazymaker.springcloud.common.util.ThreadUtil; import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.RedisScript; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @Slf4j @Data @AllArgsConstructor public class JedisLock implements Lock { private RedisTemplate redisTemplate; RedisScript<Long> lockScript = null; RedisScript<Long> unLockScript = null; public static final int DEFAULT_TIMEOUT = 2000; public static final Long LOCKED = Long.valueOf(1); public static final Long UNLOCKED = Long.valueOf(1); public static final Long WAIT_GAT = Long.valueOf(200); public static final int EXPIRE = 2000; String key; String lockValue; // lockValue 鎖的value ,表明線程的uuid /** * 默認爲2000ms */ long expire = 2000L; public JedisLock(String lockKey, String lockValue) { this.key = lockKey; this.lockValue = lockValue; } private volatile boolean isLocked = false; private Thread thread; /** * 獲取一個分佈式鎖 , 超時則返回失敗 * * @return 獲鎖成功 - true | 獲鎖失敗 - false */ @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { //本地可重入 if (isLocked && thread == Thread.currentThread()) { return true; } expire = unit != null ? unit.toMillis(time) : DEFAULT_TIMEOUT; long startMillis = System.currentTimeMillis(); Long millisToWait = expire; boolean localLocked = false; int turn = 1; while (!localLocked) { localLocked = this.lockInner(expire); if (!localLocked) { millisToWait = millisToWait - (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if (millisToWait > 0L) { /** * 尚未超時 */ ThreadUtil.sleepMilliSeconds(WAIT_GAT); log.info("睡眠一下,從新開始,turn:{},剩餘時間:{}", turn++, millisToWait); } else { log.info("搶鎖超時"); return false; } } else { isLocked = true; localLocked = true; } } return isLocked; } /** * 有返回值的搶奪鎖 * * @param millisToWait */ public boolean lockInner(Long millisToWait) { if (null == key) { return false; } try { List<String> redisKeys = new ArrayList<>(); redisKeys.add(key); redisKeys.add(lockValue); redisKeys.add(String.valueOf(millisToWait)); Long res = (Long) redisTemplate.execute(lockScript, redisKeys); return res != null && res.equals(LOCKED); } catch (Exception e) { e.printStackTrace(); throw BusinessException.builder().errMsg("搶鎖失敗").build(); } } }
其解鎖操做,經過調用unlock.lua腳本完成,代碼以下:
package com.crazymaker.springcloud.standard.lock; import com.crazymaker.springcloud.common.exception.BusinessException; import com.crazymaker.springcloud.common.util.ThreadUtil; import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.RedisScript; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @Slf4j @Data @AllArgsConstructor public class JedisLock implements Lock { private RedisTemplate redisTemplate; RedisScript<Long> lockScript = null; RedisScript<Long> unLockScript = null; //釋放鎖 @Override public void unlock() { if (key == null || requestId == null) { return; } try { List<String> redisKeys = new ArrayList<>(); redisKeys.add(key); redisKeys.add(requestId); Long res = (Long) redisTemplate.execute(unLockScript, redisKeys); } catch (Exception e) { e.printStackTrace(); throw BusinessException.builder().errMsg("釋放鎖失敗").build(); } } }
編寫個分佈式鎖服務,用於加載lua腳本,建立 分佈式鎖,代碼以下:
package com.crazymaker.springcloud.standard.lock; import com.crazymaker.springcloud.common.util.IOUtil; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.RedisScript; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @Slf4j @Data public class RedisLockService { private RedisTemplate redisTemplate; static String lockLua = "script/lock.lua"; static String unLockLua = "script/unlock.lua"; static RedisScript<Long> lockScript = null; static RedisScript<Long> unLockScript = null; { String script = IOUtil.loadJarFile(RedisLockService.class.getClassLoader(),lockLua); // String script = FileUtil.readString(lockLua, Charset.forName("UTF-8" )); if(StringUtils.isEmpty(script)) { log.error("lua load failed:"+lockLua); } lockScript = new DefaultRedisScript<>(script, Long.class); // script = FileUtil.readString(unLockLua, Charset.forName("UTF-8" )); script = IOUtil.loadJarFile(RedisLockService.class.getClassLoader(),unLockLua); if(StringUtils.isEmpty(script)) { log.error("lua load failed:"+unLockLua); } unLockScript = new DefaultRedisScript<>(script, Long.class); } public RedisLockService(RedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } public Lock getLock(String lockKey, String lockValue) { JedisLock lock=new JedisLock(lockKey,lockValue); lock.setRedisTemplate(redisTemplate); lock.setLockScript(lockScript); lock.setUnLockScript(unLockScript); return lock; } }
接下來,終於能夠上測試用例了
package com.crazymaker.springcloud.lock; @Slf4j @RunWith(SpringRunner.class) @SpringBootTest(classes = {DemoCloudApplication.class}) // 指定啓動類 public class RedisLockTest { @Resource RedisLockService redisLockService; private ExecutorService pool = Executors.newFixedThreadPool(10); @Test public void testLock() { int threads = 10; final int[] count = {0}; CountDownLatch countDownLatch = new CountDownLatch(threads); long start = System.currentTimeMillis(); for (int i = 0; i < threads; i++) { pool.submit(() -> { String lockValue = UUID.randomUUID().toString(); try { Lock lock = redisLockService.getLock("test:lock:1", lockValue); boolean locked = lock.tryLock(10, TimeUnit.SECONDS); if (locked) { for (int j = 0; j < 1000; j++) { count[0]++; } log.info("count = " + count[0]); lock.unlock(); } else { System.out.println("搶鎖失敗"); } } catch (Exception e) { e.printStackTrace(); } countDownLatch.countDown(); }); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("10個線程每一個累加1000爲: = " + count[0]); //輸出統計結果 float time = System.currentTimeMillis() - start; System.out.println("運行的時長爲(ms):" + time); System.out.println("每一次執行的時長爲(ms):" + time / count[0]); } }
執行用例,結果以下:
2021-05-04 23:02:11.900 INFO 22120 --- [pool-1-thread-7] c.c.springcloud.lock.RedisLockTest LN:50 count = 6000 2021-05-04 23:02:11.901 INFO 22120 --- [pool-1-thread-1] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,從新開始,turn:3,剩餘時間:9585 2021-05-04 23:02:11.902 INFO 22120 --- [pool-1-thread-1] c.c.springcloud.lock.RedisLockTest LN:50 count = 7000 2021-05-04 23:02:12.100 INFO 22120 --- [pool-1-thread-4] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,從新開始,turn:3,剩餘時間:9586 2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-5] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,從新開始,turn:3,剩餘時間:9585 2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-8] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,從新開始,turn:3,剩餘時間:9585 2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-4] c.c.springcloud.lock.RedisLockTest LN:50 count = 8000 2021-05-04 23:02:12.102 INFO 22120 --- [pool-1-thread-8] c.c.springcloud.lock.RedisLockTest LN:50 count = 9000 2021-05-04 23:02:12.304 INFO 22120 --- [pool-1-thread-5] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,從新開始,turn:4,剩餘時間:9383 2021-05-04 23:02:12.307 INFO 22120 --- [pool-1-thread-5] c.c.springcloud.lock.RedisLockTest LN:50 count = 10000 10個線程每一個累加1000爲: = 10000 運行的時長爲(ms):827.0 每一次執行的時長爲(ms):0.0827
下面有一個簡單的使用鎖的例子,在10秒內佔着鎖:
//寫數據到文件 function writeData(filename, data) { boolean locked = lock.tryLock(10, TimeUnit.SECONDS); if (!locked) { throw 'Failed to acquire lock'; } try { //將數據寫到文件 var file = storage.readFile(filename); var updated = updateContents(file, data); storage.writeFile(filename, updated); } finally { lock.unlock(); } }
問題是:若是在寫文件過程當中,發生了 fullGC,而且其時間跨度較長, 超過了10秒, 那麼,分佈式就自動釋放了。
在此過程當中,client2 搶到鎖,寫了文件。
client1 的fullGC完成後,也繼續寫文件,注意,此時client1 的並無佔用鎖,此時寫入會致使文件數據錯亂,發生線程安全問題。
這就是STW致使的鎖過時問題。
STW致使的鎖過時問題,具體以下圖所示:
STW致使的鎖過時問題,大概的解決方案,有:
1: 模擬CAS樂觀鎖的方式,增長版本號
2:watch dog自動延期機制
1: 模擬CAS樂觀鎖的方式,增長版本號(以下圖中的token)
此方案若是要實現,須要調整業務邏輯,與之配合,因此會入侵代碼。
2:watch dog自動延期機制
客戶端1加鎖的鎖key默認生存時間才30秒,若是超過了30秒,客戶端1還想一直持有這把鎖,怎麼辦呢?
簡單!只要客戶端1一旦加鎖成功,就會啓動一個watch dog看門狗,他是一個後臺線程,會每隔10秒檢查一下,若是客戶端1還持有鎖key,那麼就會不斷的延長鎖key的生存時間。
redission,採用的就是這種方案, 此方案不會入侵業務代碼。
做爲 Java 開發人員,咱們若想在程序中集成 Redis,必須使用 Redis 的第三方庫。目前你們使用的最多的第三方庫是jedis。
和SpringCloud gateway同樣,Redisson也是基於Netty實現的,是更高性能的第三方庫。 因此,這裏推薦你們使用Redission替代 jedis。
在使用Redission以前,建議你們先掌握Netty的知識。
推薦你們閱讀被不少小夥伴評價爲史上最爲易懂的NIO、Netty書籍:《Java高併發核心編程(卷1)》
Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不只提供了一系列的分佈式的Java經常使用對象,還實現了可重入鎖(Reentrant Lock)、公平鎖(Fair Lock、聯鎖(MultiLock)、 紅鎖(RedLock)、 讀寫鎖(ReadWriteLock)等,還提供了許多分佈式服務。
Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者可以將精力更集中地放在處理業務邏輯上。
1.概況對比
Jedis是Redis的java實現的客戶端,其API提供了比較全面的的Redis命令的支持,Redisson實現了分佈式和可擴展的的java數據結構,和Jedis相比,功能較爲簡單,不支持字符串操做,不支持排序,事物,管道,分區等Redis特性。Redisson的宗旨是促進使用者對Redis的關注分離,從而讓使用者可以將精力更集中的放在處理業務邏輯上。
2.可伸縮性
Jedis使用阻塞的I/O,且其方法調用都是同步的,程序流程要等到sockets處理完I/O才能執行,不支持異步,Jedis客戶端實例不是線程安全的,因此須要經過鏈接池來使用Jedis。
Redisson使用非阻塞的I/O和基於Netty框架的事件驅動的通訊層,其方法調用時異步的。Redisson的API是線程安全的,因此操做單個Redisson鏈接來完成各類操做。
3.第三方框架整合
Redisson在Redis的基礎上實現了java緩存標準規範;Redisson還提供了Spring Session回話管理器的實現。
github: https://github.com/redisson/redisson#quick-start
支持 Redis 單節點(single)模式、哨兵(sentinel)模式、主從(Master/Slave)模式以及集羣(Redis Cluster)模式
程序接口調用方式採用異步執行和異步流執行兩種方式
數據序列化,Redisson 的對象編碼類是用於將對象進行序列化和反序列化,以實現對該對象在 Redis 裏的讀取和存儲
單個集合數據分片,在集羣模式下,Redisson 爲單個 Redis 集合類型提供了自動分片的功能
提供多種分佈式對象,如:Object Bucket,Bitset,AtomicLong,Bloom Filter 和 HyperLogLog 等
提供豐富的分佈式集合,如:Map,Multimap,Set,SortedSet,List,Deque,Queue 等
分佈式鎖和同步器的實現,可重入鎖(Reentrant Lock),公平鎖(Fair Lock),聯鎖(MultiLock),紅鎖(Red Lock),信號量(Semaphonre),可過時性信號鎖(PermitExpirableSemaphore)等
提供先進的分佈式服務,如分佈式遠程服務(Remote Service),分佈式實時對象(Live Object)服務,分佈式執行服務(Executor Service),分佈式調度任務服務(Schedule Service)和分佈式映射概括服務(MapReduce)
安裝 Redisson 最便捷的方法是使用 Maven 或者 Gradle:
•Maven
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.11.4</version> </dependency>
•Gradle
compile group: 'org.redisson', name: 'redisson', version: '3.11.4'
目前 Redisson 最新版是 3.11.4,固然你也能夠經過搜索 Maven 中央倉庫 mvnrepository[1] 來找到 Redisson 的各類版本。
RedissonClient有多種模式,主要的模式有:
單節點模式
哨兵模式
主從模式
集羣模式
首先介紹單節點模式。
單節點模式的程序化配置方法,大體以下:
Config config = new Config(); config.useSingleServer().setAddress("redis://myredisserver:6379"); RedissonClient redisson = Redisson.create(config);xxxxxxxxxx Config config = new Config();config.useSingleServer().setAddress("redis://myredisserver:6379");RedissonClient redisson = Redisson.create(config);// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();
SingleServerConfig singleConfig = config.useSingleServer();
SingleServerConfig類的設置參數以下:
address(節點地址)
能夠經過
host:port
的格式來指定節點地址。subscriptionConnectionMinimumIdleSize(發佈和訂閱鏈接的最小空閒鏈接數)
默認值:
1
用於發佈和訂閱鏈接的最小保持鏈接數(長鏈接)。Redisson內部常常經過發佈和訂閱來實現許多功能。長期保持必定數量的發佈訂閱鏈接是必須的。
subscriptionConnectionPoolSize(發佈和訂閱鏈接池大小)
默認值:
50
用於發佈和訂閱鏈接的鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。
connectionMinimumIdleSize(最小空閒鏈接數)
默認值:
32
最小保持鏈接數(長鏈接)。長期保持必定數量的鏈接有利於提升瞬時寫入反應速度。
connectionPoolSize(鏈接池大小)
默認值:
64
鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。
dnsMonitoring(是否啓用DNS監測)
默認值:
false
在啓用該功能之後,Redisson將會監測DNS的變化狀況。
dnsMonitoringInterval(DNS監測時間間隔,單位:毫秒)
默認值:
5000
監測DNS的變化狀況的時間間隔。
idleConnectionTimeout(鏈接空閒超時,單位:毫秒)
默認值:
10000
若是當前鏈接池裏的鏈接數量超過了最小空閒鏈接數,而同時有鏈接空閒時間超過了該數值,那麼這些鏈接將會自動被關閉,並從鏈接池裏去掉。時間單位是毫秒。
connectTimeout(鏈接超時,單位:毫秒)
默認值:
10000
同節點創建鏈接時的等待超時。時間單位是毫秒。
timeout(命令等待超時,單位:毫秒)
默認值:
3000
等待節點回覆命令的時間。該時間從命令發送成功時開始計時。
retryAttempts(命令失敗重試次數)
默認值:
3
若是嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。若是嘗試在此限制以內發送成功,則開始啓用 timeout(命令等待超時) 計時。
retryInterval(命令重試發送時間間隔,單位:毫秒)
默認值:
1500
在一條命令發送失敗之後,等待重試發送的時間間隔。時間單位是毫秒。
reconnectionTimeout(從新鏈接時間間隔,單位:毫秒)
默認值:
3000
當與某個節點的鏈接斷開時,等待與其從新創建鏈接的時間間隔。時間單位是毫秒。
failedAttempts(執行失敗最大次數)
默認值:
3
在某個節點執行相同或不一樣命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表裏清除,直到 reconnectionTimeout(從新鏈接時間間隔) 超時之後再次嘗試。
database(數據庫編號)
默認值:
0
嘗試鏈接的數據庫編號。
password(密碼)
默認值:
null
用於節點身份驗證的密碼。
subscriptionsPerConnection(單個鏈接最大訂閱數量)
默認值:
5
每一個鏈接的最大訂閱數量。
clientName(客戶端名稱)
默認值:
null
在Redis節點裏顯示的客戶端名稱。
sslEnableEndpointIdentification(啓用SSL終端識別)
默認值:
true
開啓SSL終端識別能力。
sslProvider(SSL實現方式)
默認值:
JDK
肯定採用哪一種方式(JDK或OPENSSL)來實現SSL鏈接。
sslTruststore(SSL信任證書庫路徑)
默認值:
null
指定SSL信任證書庫的路徑。
sslTruststorePassword(SSL信任證書庫密碼)
默認值:
null
指定SSL信任證書庫的密碼。
sslKeystore(SSL鑰匙庫路徑)
默認值:
null
指定SSL鑰匙庫的路徑。
sslKeystorePassword(SSL鑰匙庫密碼)
默認值:
null
指定SSL鑰匙庫的密碼。
Redisson有多種模式,首先介紹單機模式的整合。
<!-- redisson-springboot --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.11.4</version> </dependency>
spring: redis: host: 127.0.0.1 port: 6379 database: 0 timeout: 5000
RedissonConfig.java
import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.data.redis.RedisProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RedissonConfig { @Autowired private RedisProperties redisProperties; @Bean public RedissonClient redissonClient() { Config config = new Config(); String redisUrl = String.format("redis://%s:%s", redisProperties.getHost() + "", redisProperties.getPort() + ""); config.useSingleServer().setAddress(redisUrl).setPassword(redisProperties.getPassword()); config.useSingleServer().setDatabase(3); return Redisson.create(config); } }
因爲redission能夠有多種模式,處於學習的目的,將多種模式封裝成一個start,能夠學習一下starter的製做。
封裝一個RedissonManager,經過策略模式,根據不一樣的配置類型,建立 RedissionConfig實例,而後建立RedissionClient對象。
Redission模擬了Java的面向對象編程思想,能夠簡單理解爲一切皆爲對象。
每個 Redisson 對象 實現了RObject and RExpirable 兩個interfaces.
Usage example:
RObject object = redisson.get...() object.sizeInMemory(); object.delete(); object.rename("newname"); object.isExists(); // catch expired event object.addListener(new ExpiredObjectListener() { ... }); // catch delete event object.addListener(new DeletedObjectListener() { ... });
每個Redisson 對象的名字,就是 Redis中的 Key.
RMap map = redisson.getMap("mymap"); map.getName(); // = mymap
能夠經過 RKeys 接口操做Redis中的keys.
Usage example:
RKeys keys = redisson.getKeys(); Iterable<String> allKeys = keys.getKeys(); Iterable<String> foundedKeys = keys.getKeysByPattern('key*'); long numOfDeletedKeys = keys.delete("obj1", "obj2", "obj3"); long deletedKeysAmount = keys.deleteByPattern("test?"); String randomKey = keys.randomKey(); long keysAmount = keys.count(); keys.flushall(); keys.flushdb();
Redisson經過RBucket接口表明能夠訪問任何類型的基礎對象,或者普通對象。
RBucket有一系列的工具方法,如compareAndSet(),get(),getAndDelete(),getAndSet(),set(),size(),trySet()等等,用於設值/取值/獲取尺寸。
RBucket普通對象的最大大小,爲512兆字節。
RBucket<AnyObject> bucket = redisson.getBucket("anyObject"); bucket.set(new AnyObject(1)); AnyObject obj = bucket.get(); bucket.trySet(new AnyObject(3)); bucket.compareAndSet(new AnyObject(4), new AnyObject(5)); bucket.getAndSet(new AnyObject(6));
下面是一個完整的實例:
public class RedissionTest { @Resource RedissonManager redissonManager; @Test public void testRBucketExamples() { // 默認鏈接上 127.0.0.1:6379 RedissonClient client = redissonManager.getRedisson(); // RList 繼承了 java.util.List 接口 RBucket<String> rstring = client.getBucket("redission:test:bucket:string"); rstring.set("this is a string"); RBucket<UserDTO> ruser = client.getBucket("redission:test:bucket:user"); UserDTO dto = new UserDTO(); dto.setToken(UUID.randomUUID().toString()); ruser.set(dto); System.out.println("string is: " + rstring.get()); System.out.println("dto is: " + ruser.get()); client.shutdown(); } }
運行上面的代碼時,能夠得到如下輸出:
string is: this is a string dto is: UserDTO(id=null, userId=null, username=null, password=null, nickname=null, token=183b6eeb-65a8-4b2a-80c6-cf17c08332ce, createTime=null, updateTime=null, headImgUrl=null, mobile=null, sex=null, enabled=null, type=null, openId=null, isDel=false)
下面的代碼簡單演示瞭如何在 Redisson 中使用 RList
對象。RList
是 Java 的 List 集合的分佈式併發實現。
考慮如下代碼:
public class RedissionTest { @Resource RedissonManager redissonManager; @Test public void testListExamples() { // 默認鏈接上 127.0.0.1:6379 RedissonClient client = redissonManager.getRedisson(); // RList 繼承了 java.util.List 接口 RList<String> nameList = client.getList("redission:test:nameList"); nameList.clear(); nameList.add("張三"); nameList.add("李四"); nameList.add("王五"); nameList.remove(-1); System.out.println("List size: " + nameList.size()); boolean contains = nameList.contains("李四"); System.out.println("Is list contains name '李四': " + contains); nameList.forEach(System.out::println); client.shutdown(); } }
運行上面的代碼時,能夠得到如下輸出:
List size: 2 Is list contains name '李四': true 張三 李四
Redisson 還包括 RMap,它是 Java Map 集合的分佈式併發實現,考慮如下代碼:
public class RedissionTest { @Resource RedissonManager redissonManager; @Test public void testListExamples() { // 默認鏈接上 127.0.0.1:6379 RedissonClient client = redissonManager.getRedisson(); // RMap 繼承了 java.util.concurrent.ConcurrentMap 接口 RMap<String, Object> map = client.getMap("redission:test:personalMap"); map.put("name", "張三"); map.put("address", "北京"); map.put("age", new Integer(50)); System.out.println("Map size: " + map.size()); boolean contains = map.containsKey("age"); System.out.println("Is map contains key 'age': " + contains); String value = String.valueOf(map.get("name")); System.out.println("Value mapped by key 'name': " + value); client.shutdown(); } }
運行上面的代碼時,將會看到如下輸出:
Map size: 3 Is map contains key 'age': true Value mapped by key 'name': 張三
Lua是一種開源、簡單易學、輕量小巧的腳本語言,用標準C語言編寫。
其設計的目的就是爲了嵌入應用程序中,從而爲應用程序提供靈活的擴展和定製功能。
Redis從2.6版本開始支持Lua腳本,Redis使用Lua能夠:
public class RedissionTest { @Resource RedissonManager redissonManager; @Test public void testLuaExamples() { // 默認鏈接上 127.0.0.1:6379 RedissonClient redisson = redissonManager.getRedisson(); redisson.getBucket("redission:test:foo").set("bar"); String r = redisson.getScript().eval(RScript.Mode.READ_ONLY, "return redis.call('get', 'redission:test:foo')", RScript.ReturnType.VALUE); System.out.println("foo: " + r); // 經過預存的腳本進行一樣的操做 RScript s = redisson.getScript(); // 首先將腳本加載到Redis String sha1 = s.scriptLoad("return redis.call('get', 'redission:test:foo')"); // 返回值 res == 282297a0228f48cd3fc6a55de6316f31422f5d17 System.out.println("sha1: " + sha1); // 再經過SHA值調用腳本 Future<Object> r1 = redisson.getScript().evalShaAsync(RScript.Mode.READ_ONLY, sha1, RScript.ReturnType.VALUE, Collections.emptyList()); try { System.out.println("res: " + r1.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } client.shutdown(); } }
運行上面的代碼時,將會看到如下輸出:
foo: bar sha1: 282297a0228f48cd3fc6a55de6316f31422f5d17 res: bar
RLock 是 Java 中可重入鎖的分佈式實現,下面的代碼演示了 RLock 的用法:
public class RedissionTest { @Resource RedissonManager redissonManager; @Test public void testLockExamples() { // 默認鏈接上 127.0.0.1:6379 RedissonClient redisson = redissonManager.getRedisson(); // RLock 繼承了 java.util.concurrent.locks.Lock 接口 RLock lock = redisson.getLock("redission:test:lock:1"); final int[] count = {0}; int threads = 10; ExecutorService pool = Executors.newFixedThreadPool(10); CountDownLatch countDownLatch = new CountDownLatch(threads); long start = System.currentTimeMillis(); for (int i = 0; i < threads; i++) { pool.submit(() -> { for (int j = 0; j < 1000; j++) { lock.lock(); count[0]++; lock.unlock(); } countDownLatch.countDown(); }); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("10個線程每一個累加1000爲: = " + count[0]); //輸出統計結果 float time = System.currentTimeMillis() - start; System.out.println("運行的時長爲:" + time); System.out.println("每一次執行的時長爲:" + time/count[0]); } }
此代碼將產生如下輸出:
10個線程每一個累加1000爲: = 10000 運行的時長爲:14172.0 每一次執行的時長爲:1.4172
RAtomicLong 是 Java 中 AtomicLong 類的分佈式「替代品」,用於在併發環境中保存長值。如下示例代碼演示了 RAtomicLong 的用法:
public class RedissionTest { @Resource RedissonManager redissonManager; @Test public void testRAtomicLongExamples() { // 默認鏈接上 127.0.0.1:6379 RedissonClient redisson = redissonManager.getRedisson(); RAtomicLong atomicLong = redisson.getAtomicLong("redission:test:myLong"); // 線程數 final int threads = 10; // 每條線程的執行輪數 final int turns = 1000; ExecutorService pool = Executors.newFixedThreadPool(threads); for (int i = 0; i < threads; i++) { pool.submit(() -> { try { for (int j = 0; j < turns; j++) { atomicLong.incrementAndGet(); } } catch (Exception e) { e.printStackTrace(); } }); } ThreadUtil.sleepSeconds(5); System.out.println("atomicLong: " + atomicLong.get()); redisson.shutdown(); } }
此代碼的輸出將是:
atomicLong: 10000
基於Redis的Redisson分佈式整長型累加器(LongAdder)採用了與java.util.concurrent.atomic.LongAdder相似的接口。經過利用客戶端內置的LongAdder對象,爲分佈式環境下遞增和遞減操做提供了很高得性能。據統計其性能最高比分佈式AtomicLong
對象快 12000 倍。
完美適用於分佈式統計計量場景。下面是RLongAdder的使用案例:
RLongAdder atomicLong = redisson.getLongAdder("myLongAdder"); atomicLong.add(12); atomicLong.increment(); atomicLong.decrement(); atomicLong.sum();
如下示例代碼演示了 RLongAdder 的用法:
public class RedissionTest { @Resource RedissonManager redissonManager; @Test public void testRAtomicLongExamples() { // 默認鏈接上 127.0.0.1:6379 RedissonClient redisson = redissonManager.getRedisson(); RAtomicLong atomicLong = redisson.getAtomicLong("redission:test:myLong"); // 線程數 final int threads = 10; // 每條線程的執行輪數 final int turns = 1000; ExecutorService pool = Executors.newFixedThreadPool(threads); for (int i = 0; i < threads; i++) { pool.submit(() -> { try { for (int j = 0; j < turns; j++) { atomicLong.incrementAndGet(); } } catch (Exception e) { e.printStackTrace(); } }); } ThreadUtil.sleepSeconds(5); System.out.println("atomicLong: " + atomicLong.get()); redisson.shutdown(); } }
此代碼將產生如下輸出:
longAdder: 10000 運行的時長爲:5085.0 每一次執行的時長爲:0.5085
當再也不使用整長型累加器對象的時候應該自行手動銷燬,若是Redisson對象被關閉(shutdown)了,則不用手動銷燬。
RLongAdder atomicLong = ... atomicLong.destroy();
Redisson的對象編碼類是用於將對象進行序列化和反序列化,以實現對該對象在Redis裏的讀取和存儲。Redisson提供瞭如下幾種的對象編碼應用,以供你們選擇:
編碼類名稱 | 說明 |
---|---|
org.redisson.codec.JsonJacksonCodec |
Jackson JSON 編碼 默認編碼 |
org.redisson.codec.AvroJacksonCodec |
Avro 一個二進制的JSON編碼 |
org.redisson.codec.SmileJacksonCodec |
Smile 另外一個二進制的JSON編碼 |
org.redisson.codec.CborJacksonCodec |
CBOR 又一個二進制的JSON編碼 |
org.redisson.codec.MsgPackJacksonCodec |
MsgPack 再來一個二進制的JSON編碼 |
org.redisson.codec.IonJacksonCodec |
Amazon Ion 亞馬遜的Ion編碼,格式與JSON相似 |
org.redisson.codec.KryoCodec |
Kryo 二進制對象序列化編碼 |
org.redisson.codec.SerializationCodec |
JDK序列化編碼 |
org.redisson.codec.FstCodec |
FST 10倍於JDK序列化性能並且100%兼容的編碼 |
org.redisson.codec.LZ4Codec |
LZ4 壓縮型序列化對象編碼 |
org.redisson.codec.SnappyCodec |
Snappy 另外一個壓縮型序列化對象編碼 |
org.redisson.client.codec.JsonJacksonMapCodec |
基於Jackson的映射類使用的編碼。可用於避免序列化類的信息,以及用於解決使用byte[] 遇到的問題。 |
org.redisson.client.codec.StringCodec |
純字符串編碼(無轉換) |
org.redisson.client.codec.LongCodec |
純整長型數字編碼(無轉換) |
org.redisson.client.codec.ByteArrayCodec |
字節數組編碼 |
org.redisson.codec.CompositeCodec |
用來組合多種不一樣編碼在一塊兒 |
由Redisson默認的編碼器爲二進制編碼器,爲了序列化後的內容可見,須要使用Json文本序列化編碼工具類。Redisson提供了編碼器 JsonJacksonCodec,做爲Json文本序列化編碼工具類。
問題是:JsonJackson在序列化有雙向引用的對象時,會出現無限循環異常。而fastjson在檢查出雙向引用後會自動用引用符$ref替換,終止循環。
因此,一些特殊場景中:用fastjson能 正常序列化到redis,而JsonJackson則拋出無限循環異常。
爲了序列化後的內容可見,因此不用redission其餘自帶的,自行實現fastjson編碼器:
package com.crayon.distributedredissionspringbootstarter.codec; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufOutputStream; import org.redisson.client.codec.BaseCodec; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Encoder; import java.io.IOException; public class FastjsonCodec extends BaseCodec { private final Encoder encoder = in -> { ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); try { ByteBufOutputStream os = new ByteBufOutputStream(out); JSON.writeJSONString(os, in, SerializerFeature.WriteClassName); return os.buffer(); } catch (IOException e) { out.release(); throw e; } catch (Exception e) { out.release(); throw new IOException(e); } }; private final Decoder<Object> decoder = (buf, state) -> JSON.parseObject(new ByteBufInputStream(buf), Object.class); @Override public Decoder<Object> getValueDecoder() { return decoder; } @Override public Encoder getValueEncoder() { return encoder; } }
替換的方法以下:
*/ @Slf4j public class StandaloneConfigImpl implements RedissonConfigService { @Override public Config createRedissonConfig(RedissonConfig redissonConfig) { Config config = new Config(); try { String address = redissonConfig.getAddress(); String password = redissonConfig.getPassword(); int database = redissonConfig.getDatabase(); String redisAddr = GlobalConstant.REDIS_CONNECTION_PREFIX.getConstant_value() + address; config.useSingleServer().setAddress(redisAddr); config.useSingleServer().setDatabase(database); //密碼能夠爲空 if (!StringUtils.isEmpty(password)) { config.useSingleServer().setPassword(password); } log.info("初始化[單機部署]方式Config,redisAddress:" + address); // config.setCodec( new FstCodec()); config.setCodec( new FastjsonCodec()); } catch (Exception e) { log.error("單機部署 Redisson init error", e); } return config; } }
哨兵模式即sentinel模式,配置Redis哨兵服務的官方文檔在這裏。
哨兵模式實現代碼和單機模式幾乎同樣,惟一的不一樣就是Config的構造.
程序化配置哨兵模式的方法以下:
Config config = new Config(); config.useSentinelServers() .setMasterName("mymaster") // use "rediss://" for SSL connection .addSentinelAddress("redis://127.0.0.1:26389", "redis://127.0.0.1:26379") .addSentinelAddress("redis://127.0.0.1:26319"); RedissonClient redisson = Redisson.create(config);
Redisson的哨兵模式的使用方法以下:
SentinelServersConfig sentinelConfig = config.useSentinelServers();
SentinelServersConfig配置參數以下:
配置Redis哨兵服務的官方文檔在這裏。Redisson的哨兵模式的使用方法以下:
SentinelServersConfig sentinelConfig = config.useSentinelServers();
SentinelServersConfig
類的設置參數以下:dnsMonitoringInterval(DNS監控間隔,單位:毫秒)
默認值:
5000
用來指定檢查節點DNS變化的時間間隔。使用的時候應該確保JVM裏的DNS數據的緩存時間保持在足夠低的範圍纔有意義。用
-1
來禁用該功能。masterName(主服務器的名稱)
主服務器的名稱是哨兵進程中用來監測主從服務切換狀況的。
addSentinelAddress(添加哨兵節點地址)
能夠經過
host:port
的格式來指定哨兵節點的地址。多個節點能夠一次性批量添加。readMode(讀取操做的負載均衡模式)
默認值:
SLAVE
(只在從服務節點裏讀取)注:在從服務節點裏讀取的數聽說明已經至少有兩個節點保存了該數據,確保了數據的高可用性。
設置讀取操做選擇節點的模式。可用值爲:
SLAVE
- 只在從服務節點裏讀取。MASTER
- 只在主服務節點裏讀取。MASTER_SLAVE
- 在主從服務節點裏均可以讀取。subscriptionMode(訂閱操做的負載均衡模式)
默認值:
SLAVE
(只在從服務節點裏訂閱)設置訂閱操做選擇節點的模式。可用值爲:
SLAVE
- 只在從服務節點裏訂閱。MASTER
- 只在主服務節點裏訂閱。loadBalancer(負載均衡算法類的選擇)
默認值:
org.redisson.connection.balancer.RoundRobinLoadBalancer
在使用多個Redis服務節點的環境裏,能夠選用如下幾種負載均衡方式選擇一個節點:
org.redisson.connection.balancer.WeightedRoundRobinBalancer
- 權重輪詢調度算法org.redisson.connection.balancer.RoundRobinLoadBalancer
- 輪詢調度算法org.redisson.connection.balancer.RandomLoadBalancer
- 隨機調度算法subscriptionConnectionMinimumIdleSize(從節點發布和訂閱鏈接的最小空閒鏈接數)
默認值:
1
多從節點的環境裏,每一個 從服務節點裏用於發佈和訂閱鏈接的最小保持鏈接數(長鏈接)。Redisson內部常常經過發佈和訂閱來實現許多功能。長期保持必定數量的發佈訂閱鏈接是必須的。
subscriptionConnectionPoolSize(從節點發布和訂閱鏈接池大小)
默認值:
50
多從節點的環境裏,每一個 從服務節點裏用於發佈和訂閱鏈接的鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。
slaveConnectionMinimumIdleSize(從節點最小空閒鏈接數)
默認值:
32
多從節點的環境裏,每一個 從服務節點裏用於普通操做(非 發佈和訂閱)的最小保持鏈接數(長鏈接)。長期保持必定數量的鏈接有利於提升瞬時讀取反映速度。
slaveConnectionPoolSize(從節點鏈接池大小)
默認值:
64
多從節點的環境裏,每一個 從服務節點裏用於普通操做(非 發佈和訂閱)鏈接的鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。
masterConnectionMinimumIdleSize(主節點最小空閒鏈接數)
默認值:
32
多從節點的環境裏,每一個 主節點的最小保持鏈接數(長鏈接)。長期保持必定數量的鏈接有利於提升瞬時寫入反應速度。
masterConnectionPoolSize(主節點鏈接池大小)
默認值:
64
主節點的鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。
idleConnectionTimeout(鏈接空閒超時,單位:毫秒)
默認值:
10000
若是當前鏈接池裏的鏈接數量超過了最小空閒鏈接數,而同時有鏈接空閒時間超過了該數值,那麼這些鏈接將會自動被關閉,並從鏈接池裏去掉。時間單位是毫秒。
connectTimeout(鏈接超時,單位:毫秒)
默認值:
10000
同任何節點創建鏈接時的等待超時。時間單位是毫秒。
timeout(命令等待超時,單位:毫秒)
默認值:
3000
等待節點回覆命令的時間。該時間從命令發送成功時開始計時。
retryAttempts(命令失敗重試次數)
默認值:
3
若是嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。若是嘗試在此限制以內發送成功,則開始啓用 timeout(命令等待超時) 計時。
retryInterval(命令重試發送時間間隔,單位:毫秒)
默認值:
1500
在一條命令發送失敗之後,等待重試發送的時間間隔。時間單位是毫秒。
reconnectionTimeout(從新鏈接時間間隔,單位:毫秒)
默認值:
3000
當與某個節點的鏈接斷開時,等待與其從新創建鏈接的時間間隔。時間單位是毫秒。
failedAttempts(執行失敗最大次數)
默認值:
3
在某個節點執行相同或不一樣命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表裏清除,直到 reconnectionTimeout(從新鏈接時間間隔) 超時之後再次嘗試。
database(數據庫編號)
默認值:
0
嘗試鏈接的數據庫編號。
password(密碼)
默認值:
null
用於節點身份驗證的密碼。
subscriptionsPerConnection(單個鏈接最大訂閱數量)
默認值:
5
每一個鏈接的最大訂閱數量。
clientName(客戶端名稱)
默認值:
null
在Redis節點裏顯示的客戶端名稱。
sslEnableEndpointIdentification(啓用SSL終端識別)
默認值:
true
開啓SSL終端識別能力。
sslProvider(SSL實現方式)
默認值:
JDK
肯定採用哪一種方式(JDK或OPENSSL)來實現SSL鏈接。
sslTruststore(SSL信任證書庫路徑)
默認值:
null
指定SSL信任證書庫的路徑。
sslTruststorePassword(SSL信任證書庫密碼)
默認值:
null
指定SSL信任證書庫的密碼。
sslKeystore(SSL鑰匙庫路徑)
默認值:
null
指定SSL鑰匙庫的路徑。
sslKeystorePassword(SSL鑰匙庫密碼)
默認值:
null
指定SSL鑰匙庫的密碼。
經過屬性文件,配置的示例以下:
--- sentinelServersConfig: idleConnectionTimeout: 10000 connectTimeout: 10000 timeout: 3000 retryAttempts: 3 retryInterval: 1500 failedSlaveReconnectionInterval: 3000 failedSlaveCheckInterval: 60000 password: null subscriptionsPerConnection: 5 clientName: null loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {} subscriptionConnectionMinimumIdleSize: 1 subscriptionConnectionPoolSize: 50 slaveConnectionMinimumIdleSize: 24 slaveConnectionPoolSize: 64 masterConnectionMinimumIdleSize: 24 masterConnectionPoolSize: 64 readMode: "SLAVE" subscriptionMode: "SLAVE" sentinelAddresses: - "redis://127.0.0.1:26379" - "redis://127.0.0.1:26389" masterName: "mymaster" database: 0 threads: 16 nettyThreads: 32 codec: !<org.redisson.codec.MarshallingCodec> {} transportMode: "NIO"
介紹配置Redis主從服務組態的文檔在這裏.
程序化配置主從模式的方法以下:
Config config = new Config(); config.useMasterSlaveServers() // use "rediss://" for SSL connection .setMasterAddress("redis://127.0.0.1:6379") .addSlaveAddress("redis://127.0.0.1:6389", "redis://127.0.0.1:6332", "redis://127.0.0.1:6419") .addSlaveAddress("redis://127.0.0.1:6399"); RedissonClient redisson = Redisson.create(config);
主從模式使用到MasterSlaveServersConfig :
MasterSlaveServersConfig masterSlaveConfig = config.useMasterSlaveServers();
MasterSlaveServersConfig
類的設置參數以下:
dnsMonitoringInterval(DNS監控間隔,單位:毫秒)
默認值:
5000
用來指定檢查節點DNS變化的時間間隔。使用的時候應該確保JVM裏的DNS數據的緩存時間保持在足夠低的範圍纔有意義。用
-1
來禁用該功能。masterAddress(主節點地址)
能夠經過
host:port
的格式來指定主節點地址。addSlaveAddress(添加從主節點地址)
能夠經過
host:port
的格式來指定從節點的地址。多個節點能夠一次性批量添加。readMode(讀取操做的負載均衡模式)
默認值:
SLAVE
(只在從服務節點裏讀取)注:在從服務節點裏讀取的數聽說明已經至少有兩個節點保存了該數據,確保了數據的高可用性。
設置讀取操做選擇節點的模式。可用值爲:
SLAVE
- 只在從服務節點裏讀取。MASTER
- 只在主服務節點裏讀取。MASTER_SLAVE
- 在主從服務節點裏均可以讀取。subscriptionMode(訂閱操做的負載均衡模式)
默認值:
SLAVE
(只在從服務節點裏訂閱)設置訂閱操做選擇節點的模式。可用值爲:
SLAVE
- 只在從服務節點裏訂閱。MASTER
- 只在主服務節點裏訂閱。loadBalancer(負載均衡算法類的選擇)
默認值:
org.redisson.connection.balancer.RoundRobinLoadBalancer
在使用多個Redis服務節點的環境裏,能夠選用如下幾種負載均衡方式選擇一個節點:
org.redisson.connection.balancer.WeightedRoundRobinBalancer
- 權重輪詢調度算法org.redisson.connection.balancer.RoundRobinLoadBalancer
- 輪詢調度算法org.redisson.connection.balancer.RandomLoadBalancer
- 隨機調度算法subscriptionConnectionMinimumIdleSize(從節點發布和訂閱鏈接的最小空閒鏈接數)
默認值:
1
多從節點的環境裏,每一個 從服務節點裏用於發佈和訂閱鏈接的最小保持鏈接數(長鏈接)。Redisson內部常常經過發佈和訂閱來實現許多功能。長期保持必定數量的發佈訂閱鏈接是必須的。
subscriptionConnectionPoolSize(從節點發布和訂閱鏈接池大小)
默認值:
50
多從節點的環境裏,每一個 從服務節點裏用於發佈和訂閱鏈接的鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。
slaveConnectionMinimumIdleSize(從節點最小空閒鏈接數)
默認值:
32
多從節點的環境裏,每一個 從服務節點裏用於普通操做(非 發佈和訂閱)的最小保持鏈接數(長鏈接)。長期保持必定數量的鏈接有利於提升瞬時讀取反映速度。
slaveConnectionPoolSize(從節點鏈接池大小)
默認值:
64
多從節點的環境裏,每一個 從服務節點裏用於普通操做(非 發佈和訂閱)鏈接的鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。
masterConnectionMinimumIdleSize(主節點最小空閒鏈接數)
默認值:
32
多從節點的環境裏,每一個 主節點的最小保持鏈接數(長鏈接)。長期保持必定數量的鏈接有利於提升瞬時寫入反應速度。
masterConnectionPoolSize(主節點鏈接池大小)
默認值:
64
主節點的鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。
idleConnectionTimeout(鏈接空閒超時,單位:毫秒)
默認值:
10000
若是當前鏈接池裏的鏈接數量超過了最小空閒鏈接數,而同時有鏈接空閒時間超過了該數值,那麼這些鏈接將會自動被關閉,並從鏈接池裏去掉。時間單位是毫秒。
connectTimeout(鏈接超時,單位:毫秒)
默認值:
10000
同任何節點創建鏈接時的等待超時。時間單位是毫秒。
timeout(命令等待超時,單位:毫秒)
默認值:
3000
等待節點回覆命令的時間。該時間從命令發送成功時開始計時。
retryAttempts(命令失敗重試次數)
默認值:
3
若是嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。若是嘗試在此限制以內發送成功,則開始啓用 timeout(命令等待超時) 計時。
retryInterval(命令重試發送時間間隔,單位:毫秒)
默認值:
1500
在一條命令發送失敗之後,等待重試發送的時間間隔。時間單位是毫秒。
reconnectionTimeout(從新鏈接時間間隔,單位:毫秒)
默認值:
3000
當與某個節點的鏈接斷開時,等待與其從新創建鏈接的時間間隔。時間單位是毫秒。
failedAttempts(執行失敗最大次數)
默認值:
3
在某個節點執行相同或不一樣命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表裏清除,直到 reconnectionTimeout(從新鏈接時間間隔) 超時之後再次嘗試。
database(數據庫編號)
默認值:
0
嘗試鏈接的數據庫編號。
password(密碼)
默認值:
null
用於節點身份驗證的密碼。
subscriptionsPerConnection(單個鏈接最大訂閱數量)
默認值:
5
每一個鏈接的最大訂閱數量。
clientName(客戶端名稱)
默認值:
null
在Redis節點裏顯示的客戶端名稱。
sslEnableEndpointIdentification(啓用SSL終端識別)
默認值:
true
開啓SSL終端識別能力。
sslProvider(SSL實現方式)
默認值:
JDK
肯定採用哪一種方式(JDK或OPENSSL)來實現SSL鏈接。
sslTruststore(SSL信任證書庫路徑)
默認值:
null
指定SSL信任證書庫的路徑。
sslTruststorePassword(SSL信任證書庫密碼)
默認值:
null
指定SSL信任證書庫的密碼。
sslKeystore(SSL鑰匙庫路徑)
默認值:
null
指定SSL鑰匙庫的路徑。
sslKeystorePassword(SSL鑰匙庫密碼)
默認值:
null
指定SSL鑰匙庫的密碼。
集羣模式除了適用於Redis集羣環境,也適用於任何雲計算服務商提供的集羣模式,例如AWS ElastiCache集羣版、Azure Redis Cache和阿里雲(Aliyun)的雲數據庫Redis版。
介紹配置Redis集羣組態的文檔在這裏。 Redis集羣組態的最低要求是必須有三個主節點。
集羣模式構造Config以下:
Config config = new Config(); config.useClusterServers() .setScanInterval(2000) // 集羣狀態掃描間隔時間,單位是毫秒 //能夠用"rediss://"來啓用SSL鏈接 .addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001") .addNodeAddress("redis://127.0.0.1:7002"); RedissonClient redisson = Redisson.create(config);
集羣模式使用到ClusterServersConfig :
ClusterServersConfig clusterConfig = config.useClusterServers();
ClusterServersConfig 配置參數以下:
nodeAddresses(添加節點地址)
能夠經過
host:port
的格式來添加Redis集羣節點的地址。多個節點能夠一次性批量添加。scanInterval(集羣掃描間隔時間)
默認值:
1000
對Redis集羣節點狀態掃描的時間間隔。單位是毫秒。
slots(分片數量)
默認值:
231
用於指定數據分片過程當中的分片數量。支持數據分片/框架結構有:集(Set)、映射(Map)、BitSet、Bloom filter, Spring Cache和Hibernate Cache等.readMode(讀取操做的負載均衡模式)
默認值:
SLAVE
(只在從服務節點裏讀取)注:在從服務節點裏讀取的數聽說明已經至少有兩個節點保存了該數據,確保了數據的高可用性。
設置讀取操做選擇節點的模式。可用值爲:
SLAVE
- 只在從服務節點裏讀取。MASTER
- 只在主服務節點裏讀取。MASTER_SLAVE
- 在主從服務節點裏均可以讀取。subscriptionMode(訂閱操做的負載均衡模式)
默認值:
SLAVE
(只在從服務節點裏訂閱)設置訂閱操做選擇節點的模式。可用值爲:
SLAVE
- 只在從服務節點裏訂閱。MASTER
- 只在主服務節點裏訂閱。loadBalancer(負載均衡算法類的選擇)
默認值:
org.redisson.connection.balancer.RoundRobinLoadBalancer
在多Redis服務節點的環境裏,能夠選用如下幾種負載均衡方式選擇一個節點:
org.redisson.connection.balancer.WeightedRoundRobinBalancer
- 權重輪詢調度算法org.redisson.connection.balancer.RoundRobinLoadBalancer
- 輪詢調度算法org.redisson.connection.balancer.RandomLoadBalancer
- 隨機調度算法subscriptionConnectionMinimumIdleSize(從節點發布和訂閱鏈接的最小空閒鏈接數)
默認值:
1
多從節點的環境裏,每一個 從服務節點裏用於發佈和訂閱鏈接的最小保持鏈接數(長鏈接)。Redisson內部常常經過發佈和訂閱來實現許多功能。長期保持必定數量的發佈訂閱鏈接是必須的。
subscriptionConnectionPoolSize(從節點發布和訂閱鏈接池大小)
默認值:
50
多從節點的環境裏,每一個 從服務節點裏用於發佈和訂閱鏈接的鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。
slaveConnectionMinimumIdleSize(從節點最小空閒鏈接數)
默認值:
32
多從節點的環境裏,每一個 從服務節點裏用於普通操做(非 發佈和訂閱)的最小保持鏈接數(長鏈接)。長期保持必定數量的鏈接有利於提升瞬時讀取反映速度。
slaveConnectionPoolSize(從節點鏈接池大小)
默認值:
64
多從節點的環境裏,每一個 從服務節點裏用於普通操做(非 發佈和訂閱)鏈接的鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。
masterConnectionMinimumIdleSize(主節點最小空閒鏈接數)
默認值:
32
多節點的環境裏,每一個 主節點的最小保持鏈接數(長鏈接)。長期保持必定數量的鏈接有利於提升瞬時寫入反應速度。
masterConnectionPoolSize(主節點鏈接池大小)
默認值:
64
多主節點的環境裏,每一個 主節點的鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。
idleConnectionTimeout(鏈接空閒超時,單位:毫秒)
默認值:
10000
若是當前鏈接池裏的鏈接數量超過了最小空閒鏈接數,而同時有鏈接空閒時間超過了該數值,那麼這些鏈接將會自動被關閉,並從鏈接池裏去掉。時間單位是毫秒。
connectTimeout(鏈接超時,單位:毫秒)
默認值:
10000
同任何節點創建鏈接時的等待超時。時間單位是毫秒。
timeout(命令等待超時,單位:毫秒)
默認值:
3000
等待節點回覆命令的時間。該時間從命令發送成功時開始計時。
retryAttempts(命令失敗重試次數)
默認值:
3
若是嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。若是嘗試在此限制以內發送成功,則開始啓用 timeout(命令等待超時) 計時。
retryInterval(命令重試發送時間間隔,單位:毫秒)
默認值:
1500
在一條命令發送失敗之後,等待重試發送的時間間隔。時間單位是毫秒。
reconnectionTimeout(從新鏈接時間間隔,單位:毫秒)
默認值:
3000
當與某個節點的鏈接斷開時,等待與其從新創建鏈接的時間間隔。時間單位是毫秒。
failedAttempts(執行失敗最大次數)
默認值:
3
在某個節點執行相同或不一樣命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表裏清除,直到 reconnectionTimeout(從新鏈接時間間隔) 超時之後再次嘗試。
password(密碼)
默認值:
null
用於節點身份驗證的密碼。
subscriptionsPerConnection(單個鏈接最大訂閱數量)
默認值:
5
每一個鏈接的最大訂閱數量。
clientName(客戶端名稱)
默認值:
null
在Redis節點裏顯示的客戶端名稱。
sslEnableEndpointIdentification(啓用SSL終端識別)
默認值:
true
開啓SSL終端識別能力。
sslProvider(SSL實現方式)
默認值:
JDK
肯定採用哪一種方式(JDK或OPENSSL)來實現SSL鏈接。
sslTruststore(SSL信任證書庫路徑)
默認值:
null
指定SSL信任證書庫的路徑。
sslTruststorePassword(SSL信任證書庫密碼)
默認值:
null
指定SSL信任證書庫的密碼。
sslKeystore(SSL鑰匙庫路徑)
默認值:
null
指定SSL鑰匙庫的路徑。
sslKeystorePassword(SSL鑰匙庫密碼)
默認值:
null
指定SSL鑰匙庫的密碼。
Redis發展到如今,幾種常見的部署架構有:
先介紹,基於單機模式的簡單Redision鎖的使用。
單機模式下,簡單Redision鎖的使用以下:
// 構造redisson實現分佈式鎖必要的Config Config config = new Config(); config.useSingleServer().setAddress("redis://172.29.1.180:5379").setPassword("a123456").setDatabase(0); // 構造RedissonClient RedissonClient redissonClient = Redisson.create(config); // 設置鎖定資源名稱 RLock disLock = redissonClient.getLock("DISLOCK"); //嘗試獲取分佈式鎖 boolean isLock= disLock.tryLock(500, 15000, TimeUnit.MILLISECONDS); if (isLock) { try { //TODO if get lock success, do something; Thread.sleep(15000); } catch (Exception e) { } finally { // 不管如何, 最後都要解鎖 disLock.unlock(); } }
經過代碼可知,通過Redisson的封裝,實現Redis分佈式鎖很是方便,和顯式鎖的使用方法是同樣的。RLock接口繼承了 Lock接口。
咱們再看一下Redis中的value是啥,和前文分析同樣,hash結構, redis 的key就是資源名稱。
hash結構的key就是UUID+threadId,hash結構的value就是重入值,在分佈式鎖時,這個值爲1(Redisson還能夠實現重入鎖,那麼這個值就取決於重入次數了):
172.29.1.180:5379> hgetall DISLOCK 1) "01a6d806-d282-4715-9bec-f51b9aa98110:1" 2) "1"
使用客戶端工具看到的效果以下:
能夠看到,調用getLock()方法後實際返回一個RedissonLock對象
下面來看下tryLock方法,源碼以下:
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(threadId); return false; } current = System.currentTimeMillis(); RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(threadId); return false; } try { time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(threadId); return false; } while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(threadId); return false; } } } finally { unsubscribe(subscribeFuture, threadId); } // return get(tryLockAsync(waitTime, leaseTime, unit)); }
以上代碼使用了異步回調模式,RFuture 繼承了 java.util.concurrent.Future
在RedissonLock對象的lock()方法主要調用tryAcquire()方法
因爲leaseTime == -1,因而走tryLockInnerAsync()方法,這個方法纔是關鍵
首先,看一下evalWriteAsync方法的定義
<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
這和前面的jedis調用lua腳本相似,最後兩個參數分別是keys和params。
單獨將調用的那一段摘出來看,實際調用是這樣的:
commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
結合上面的參數聲明,咱們能夠知道,這裏KEYS[1]就是getName(),ARGV[2]是getLockName(threadId)
假設:
那麼KEYS[1]=DISLOCK,ARGV[2]=01a6d806-d282-4715-9bec-f51b9aa98110:1
所以,這段腳本的意思是
一、判斷有沒有一個叫「DISLOCK」的key
二、若是沒有,則在其下設置一個字段爲「01a6d806-d282-4715-9bec-f51b9aa98110:1」,值爲「1」的鍵值對 ,並設置它的過時時間
三、若是存在,則進一步判斷「01a6d806-d282-4715-9bec-f51b9aa98110:1」是否存在,若存在,則其值加1,並從新設置過時時間
四、返回「DISLOCK」的生存時間(毫秒)
這裏用的數據結構是hash,hash的結構是: key 字段1 值1 字段2 值2 。。。
用在鎖這個場景下,key就表示鎖的名稱,也能夠理解爲臨界資源,字段就表示當前得到鎖的線程
全部競爭這把鎖的線程都要判斷在這個key下有沒有本身線程的字段,若是沒有則不能得到鎖,若是有,則至關於重入,字段值加1(次數)
爲什麼要使用lua語言?
由於一大堆複雜的業務邏輯,能夠經過封裝在lua腳本中發送給redis,保證這段複雜業務邏輯執行的原子性
回顧一下evalWriteAsync方法的定義
<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
注意,其最後兩個參數分別是keys和params。
KEYS[1]表明的是你加鎖的那個key,好比說:
RLock lock = redisson.getLock("DISLOCK");
這裏你本身設置了加鎖的那個鎖key就是「DISLOCK」。
ARGV[1]表明的就是鎖key的默認生存時間
調用的時候,傳遞的參數爲 internalLockLeaseTime ,該值默認30秒。
ARGV[2]表明的是加鎖的客戶端的ID,相似於下面這樣:
01a6d806-d282-4715-9bec-f51b9aa98110:1
lua腳本的第一段if判斷語句,就是用「exists DISLOCK」命令判斷一下,若是你要加鎖的那個鎖key不存在的話,你就進行加鎖。
如何加鎖呢?很簡單,用下面的redis命令:
hset DISLOCK 01a6d806-d282-4715-9bec-f51b9aa98110:1 1
經過這個命令設置一個hash數據結構,這行命令執行後,會出現一個相似下面的數據結構:
DISLOCK: { 8743c9c0-0795-4907-87fd-6c719a6b4586:1 1 }
接着會執行「pexpire DISLOCK 30000」命令,設置DISLOCK這個鎖key的生存時間是30秒(默認)
那麼在這個時候,若是客戶端2來嘗試加鎖,執行了一樣的一段lua腳本,會咋樣呢?
很簡單,第一個if判斷會執行「exists DISLOCK」,發現DISLOCK 這個鎖key已經存在了。
接着第二個if判斷,判斷一下,DISLOCK鎖key的hash數據結構中,是否包含客戶端2的ID,可是明顯不是的,由於那裏包含的是客戶端1的ID。
因此,客戶端2會獲取到pttl DISLOCK返回的一個數字,這個數字表明瞭DISLOCK 這個鎖key的剩餘生存時間。好比還剩15000毫秒的生存時間。
此時客戶端2會進入一個while循環,不停的嘗試加鎖。
若是客戶端1都已經持有了這把鎖了,結果可重入的加鎖會怎麼樣呢?
RLock lock = redisson.getLock("DISLOCK") lock.lock(); //業務代碼 lock.lock(); //業務代碼 lock.unlock(); lock.unlock();
分析上面那段lua腳本。
第一個if判斷確定不成立,「exists DISLOCK」會顯示鎖key已經存在了。
第二個if判斷會成立,由於DISLOCK的hash數據結構中包含的那個ID,就是客戶端1的那個ID,也就是「8743c9c0-0795-4907-87fd-6c719a6b4586:1」
此時就會執行可重入加鎖的邏輯,他會用:
incrby DISLOCK
8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
經過這個命令,對客戶端1的加鎖次數,累加1。
此時DISLOCK數據結構變爲下面這樣:
DISLOCK: { 8743c9c0-0795-4907-87fd-6c719a6b4586:1 2 }
若是執行lock.unlock(),就能夠釋放分佈式鎖,此時的業務邏輯也是很是簡單的。
其實說白了,就是每次都對DISLOCK數據結構中的那個加鎖次數減1。
若是發現加鎖次數是0了,說明這個客戶端已經再也不持有鎖了,此時就會用:
「del DISLOCK」命令,從redis裏刪除這個key。
而後呢,另外的客戶端2就能夠嘗試完成加鎖了。
@Override public void unlock() { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } } // Future<Void> future = unlockAsync(); // future.awaitUninterruptibly(); // if (future.isSuccess()) { // return; // } // if (future.cause() instanceof IllegalMonitorStateException) { // throw (IllegalMonitorStateException)future.cause(); // } // throw commandExecutor.convertException(future); }
再深刻一下,實際調用的是unlockInnerAsync方法
上圖沒有截取完整,完整的源碼以下:
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
咱們仍是假設name=DISLOCK,假設線程ID是1
同理,咱們能夠知道
KEYS[1]是getName(),即KEYS[1]=DISLOCK
KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{DISLOCK}
ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0
ARGV[2]是生存時間
ARGV[3]是getLockName(threadId),即ARGV[3]=8743c9c0-0795-4907-87fd-6c719a6b4586:1
所以,上面腳本的意思是:
一、判斷是否存在一個叫「DISLOCK」的key
二、若是不存在,返回nil
三、若是存在,使用Redis Hincrby 命令用於爲哈希表中的字段值加上指定增量值 -1 ,表明減去1
四、若counter >,返回空,若字段存在,則字段值減1
五、若減完之後,counter > 0 值仍大於0,則返回0
六、減完後,若字段值小於或等於0,則用 publish 命令廣播一條消息,廣播內容是0,並返回1;
能夠猜想,廣播0表示資源可用,即通知那些等待獲取鎖的線程如今能夠得到鎖了
以上是正常狀況下獲取到鎖的狀況,那麼當沒法當即獲取到鎖的時候怎麼辦呢?
再回到前面獲取鎖的位置
@Override public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } // 訂閱 RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future); try { while (true) { ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().acquire(); } } } finally { unsubscribe(future, threadId); } // get(lockAsync(leaseTime, unit)); } protected static final LockPubSub PUBSUB = new LockPubSub(); protected RFuture<RedissonLockEntry> subscribe(long threadId) { return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService()); } protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) { PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService()); }
這裏會訂閱Channel,當資源可用時能夠及時知道,並搶佔,防止無效的輪詢而浪費資源
這裏的channel爲:
redisson_lock__channel:{DISLOCK}
當資源可用用的時候,循環去嘗試獲取鎖,因爲多個線程同時去競爭資源,因此這裏用了信號量,對於同一個資源只容許一個線程得到鎖,其它的線程阻塞
這點,有點兒相似 Zookeeper分佈式鎖:
有關zookeeper分佈式鎖的原理和實現,具體請參見下面的博客:
Zookeeper 分佈式鎖 (圖解+秒懂+史上最全)
客戶端1加鎖的鎖key默認生存時間才30秒,若是超過了30秒,客戶端1還想一直持有這把鎖,怎麼辦呢?
簡單!只要客戶端1一旦加鎖成功,就會啓動一個watch dog看門狗,他是一個後臺線程,會每隔10秒檢查一下,若是客戶端1還持有鎖key,那麼就會不斷的延長鎖key的生存時間。
可是聰明的同窗確定會問:
有效時間設置多長,假如個人業務操做比有效時間長,個人業務代碼還沒執行完,就自動給我解鎖了,不就完蛋了嗎。
這個問題就有點棘手了,在網上也有不少討論:
第一種解決方法就是靠程序員本身去把握,預估一下業務代碼須要執行的時間,而後設置有效期時間比執行時間長一些,保證不會由於自動解鎖影響到客戶端業務代碼的執行。
可是這並非萬全之策,好比網絡抖動這種狀況是沒法預測的,也有可能致使業務代碼執行的時間變長,因此並不安全。
第二種方法,使用監事狗watchDog機制實現鎖的續期。
第二種方法比較靠譜一點,並且無業務入侵。
在Redisson框架實現分佈式鎖的思路,就使用watchDog機制實現鎖的續期。
當加鎖成功後,同時開啓守護線程,默認有效期是30秒,每隔10秒就會給鎖續期到30秒,只要持有鎖的客戶端沒有宕機,就能保證一直持有鎖,直到業務代碼執行完畢由客戶端本身解鎖,若是宕機了天然就在有效期失效後自動解鎖。
這裏,和前面解決 JVM STW的鎖過時問題有點相似,只不過,watchDog自動續期,也沒有徹底解決JVM STW的鎖過時問題。
如何完全解決 JVM STW的鎖過時問題,能夠來瘋狂創客圈的社羣討論。
實際上,redisson加鎖的基本流程圖以下:
這裏專一於介紹watchdog。
首先watchdog的具體思路是 加鎖時,默認加鎖 30秒,每10秒鐘檢查一次,若是存在就從新設置 過時時間爲30秒。
而後設置默認加鎖時間的參數是 lockWatchdogTimeout(監控鎖的看門狗超時,單位:毫秒)
官方文檔描述以下
lockWatchdogTimeout(監控鎖的看門狗超時,單位:毫秒)
默認值:
30000
監控鎖的看門狗超時時間單位爲毫秒。該參數只適用於分佈式鎖的加鎖請求中未明確使用
leaseTimeout
參數的狀況。若是該看門狗未使用lockWatchdogTimeout
去從新調整一個分佈式鎖的lockWatchdogTimeout
超時,那麼這個鎖將變爲失效狀態。這個參數能夠用來避免由Redisson客戶端節點宕機或其餘緣由形成死鎖的狀況。
須要注意的是
1.watchDog 只有在未顯示指定加鎖時間時纔會生效。(這點很重要)
2.lockWatchdogTimeout設定的時間不要過小 ,好比我以前設置的是 100毫秒,因爲網絡直接致使加鎖完後,watchdog去延期時,這個key在redis中已經被刪除了。
在調用lock方法時,會最終調用到tryAcquireAsync。詳細解釋以下:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { //若是指定了加鎖時間,會直接去加鎖 if (leaseTime != -1) { return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } //沒有指定加鎖時間 會先進行加鎖,而且默認時間就是 LockWatchdogTimeout的時間 //這個是異步操做 返回RFuture 相似netty中的future RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); //這裏也是相似netty Future 的addListener,在future內容執行完成後執行 ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining == null) { //這裏是定時執行 當前鎖自動延期的動做 scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
scheduleExpirationRenewal 中會調用renewExpiration。
這裏咱們能夠看到是 啓用了一個timeout定時,去執行延期動做
private void renewExpiration() { Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getName() + " expiration", e); return; } if (res) { //若是 沒有報錯,就再次定時延期 // reschedule itself renewExpiration(); } }); } // 這裏咱們能夠看到定時任務 是 lockWatchdogTimeout 的1/3時間去執行 renewExpirationAsync }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }
最終 scheduleExpirationRenewal會調用到 renewExpirationAsync,
執行下面這段 lua腳本。他主要判斷就是 這個鎖是否在redis中存在,若是存在就進行 pexpire 延期。
protected RFuture<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
1.要使 watchLog機制生效 ,lock時 不要設置 過時時間
2.watchlog的延時時間 能夠由 lockWatchdogTimeout指定默認延時時間,可是不要設置過小。如100
3.watchdog 會每 lockWatchdogTimeout/3時間,去延時。
4.watchdog 經過 相似netty的 Future功能來實現異步延時
5.watchdog 最終仍是經過 lua腳原本進行延時
Redisson框架十分強大,除了前面介紹的 getLock方法獲取的分佈式鎖(輸入可重入鎖的類型),還有不少其餘的分佈式鎖類型。
整體的Redisson框架的分佈式鎖類型,大體以下:
Redisson的分佈式可重入鎖RLock Java對象實現了java.util.concurrent.locks.Lock接口,同時還支持自動過時解鎖。
public void testReentrantLock(RedissonClient redisson){ RLock lock = redisson.getLock("anyLock"); try{ // 1. 最多見的使用方法 //lock.lock(); // 2. 支持過時解鎖功能,10秒鐘之後自動解鎖, 無需調用unlock方法手動解鎖 //lock.lock(10, TimeUnit.SECONDS); // 3. 嘗試加鎖,最多等待3秒,上鎖之後10秒自動解鎖 boolean res = lock.tryLock(3, 10, TimeUnit.SECONDS); if(res){ //成功 // do your business } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
Redisson同時還爲分佈式鎖提供了異步執行的相關方法:
public void testAsyncReentrantLock(RedissonClient redisson){ RLock lock = redisson.getLock("anyLock"); try{ lock.lockAsync(); lock.lockAsync(10, TimeUnit.SECONDS); Future<Boolean> res = lock.tryLockAsync(3, 10, TimeUnit.SECONDS); if(res.get()){ // do your business } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } finally { lock.unlock(); } }
Redisson分佈式可重入公平鎖也是實現了java.util.concurrent.locks.Lock接口的一種RLock對象。在提供了自動過時解鎖功能的同時,保證了當多個Redisson客戶端線程同時請求加鎖時,優先分配給先發出請求的線程。
public void testFairLock(RedissonClient redisson){ RLock fairLock = redisson.getFairLock("anyLock"); try{ // 最多見的使用方法 fairLock.lock(); // 支持過時解鎖功能, 10秒鐘之後自動解鎖,無需調用unlock方法手動解鎖 fairLock.lock(10, TimeUnit.SECONDS); // 嘗試加鎖,最多等待100秒,上鎖之後10秒自動解鎖 boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } finally { fairLock.unlock(); } }
Redisson同時還爲分佈式可重入公平鎖提供了異步執行的相關方法:
RLock fairLock = redisson.getFairLock("anyLock"); fairLock.lockAsync(); fairLock.lockAsync(10, TimeUnit.SECONDS); Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);
Redisson的RedissonMultiLock對象能夠將多個RLock對象關聯爲一個聯鎖,每一個RLock對象實例能夠來自於不一樣的Redisson實例。
public void testMultiLock(RedissonClient redisson1,RedissonClient redisson2, RedissonClient redisson3){ RLock lock1 = redisson1.getLock("lock1"); RLock lock2 = redisson2.getLock("lock2"); RLock lock3 = redisson3.getLock("lock3"); RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); try { // 同時加鎖:lock1 lock2 lock3, 全部的鎖都上鎖成功纔算成功。 lock.lock(); // 嘗試加鎖,最多等待100秒,上鎖之後10秒自動解鎖 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
Redisson的RedissonRedLock對象實現了Redlock介紹的加鎖算法。該對象也能夠用來將多個RLock對象關聯爲一個紅鎖,每一個RLock對象實例能夠來自於不一樣的Redisson實例。
public void testRedLock(RedissonClient redisson1,RedissonClient redisson2, RedissonClient redisson3){ RLock lock1 = redisson1.getLock("lock1"); RLock lock2 = redisson2.getLock("lock2"); RLock lock3 = redisson3.getLock("lock3"); RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3); try { // 同時加鎖:lock1 lock2 lock3, 紅鎖在大部分節點上加鎖成功就算成功。 lock.lock(); // 嘗試加鎖,最多等待100秒,上鎖之後10秒自動解鎖 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
Redisson的分佈式可重入讀寫鎖RReadWriteLock,Java對象實現了java.util.concurrent.locks.ReadWriteLock接口。同時還支持自動過時解鎖。該對象容許同時有多個讀取鎖,可是最多隻能有一個寫入鎖。
RReadWriteLock rwlock = redisson.getLock("anyRWLock"); // 最多見的使用方法 rwlock.readLock().lock(); // 或 rwlock.writeLock().lock(); // 支持過時解鎖功能 // 10秒鐘之後自動解鎖 // 無需調用unlock方法手動解鎖 rwlock.readLock().lock(10, TimeUnit.SECONDS); // 或 rwlock.writeLock().lock(10, TimeUnit.SECONDS); // 嘗試加鎖,最多等待100秒,上鎖之後10秒自動解鎖 boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS); // 或 boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS); ... lock.unlock();
Redisson的分佈式信號量(Semaphore)Java對象RSemaphore採用了與java.util.concurrent.Semaphore類似的接口和用法。
RSemaphore semaphore = redisson.getSemaphore("semaphore"); semaphore.acquire(); //或 semaphore.acquireAsync(); semaphore.acquire(23); semaphore.tryAcquire(); //或 semaphore.tryAcquireAsync(); semaphore.tryAcquire(23, TimeUnit.SECONDS); //或 semaphore.tryAcquireAsync(23, TimeUnit.SECONDS); semaphore.release(10); semaphore.release(); //或 semaphore.releaseAsync();
Redisson的可過時性信號量(PermitExpirableSemaphore)實在RSemaphore對象的基礎上,爲每一個信號增長了一個過時時間。每一個信號能夠經過獨立的ID來辨識,釋放時只能經過提交這個ID才能釋放。
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore"); String permitId = semaphore.acquire(); // 獲取一個信號,有效期只有2秒鐘。 String permitId = semaphore.acquire(2, TimeUnit.SECONDS); // ... semaphore.release(permitId);
Redisson的分佈式閉鎖(CountDownLatch)Java對象RCountDownLatch採用了與java.util.concurrent.CountDownLatch類似的接口和用法。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.trySetCount(1); latch.await(); // 在其餘線程或其餘JVM裏 RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.countDown();
關於Redis分佈式鎖的高可用問題,大體以下:
在master- slave的集羣架構中,就是若是你對某個redis master實例,寫入了DISLOCK這種鎖key的value,此時會異步複製給對應的master slave實例。
可是,這個過程當中一旦發生redis master宕機,主備切換,redis slave變爲了redis master。而此時的主從複製沒有完全完成.....
接着就會致使,客戶端2來嘗試加鎖的時候,在新的redis master上完成了加鎖,而客戶端1也覺得本身成功加了鎖。
此時就會致使多個客戶端對一個分佈式鎖完成了加鎖。
這時系統在業務語義上必定會出現問題,致使髒數據的產生。
因此這個是是redis master-slave架構的主從異步複製致使的redis分佈式鎖的最大缺陷:
在redis master實例宕機的時候,可能致使多個客戶端同時完成加鎖。
RedLock算法思想:
不能只在一個redis實例上建立鎖,應該是在多個redis實例上建立鎖,n / 2 + 1,必須在大多數redis節點上都成功建立鎖,才能算這個總體的RedLock加鎖成功,避免說僅僅在一個redis實例上加鎖而帶來的問題。
這個場景是假設有一個 redis cluster,有 5 個 redis master 實例。而後執行以下步驟獲取一把紅鎖:
RedLock是基於redis實現的分佈式鎖,它可以保證如下特性:
互斥性:在任什麼時候候,只能有一個客戶端可以持有鎖;避免死鎖:
當客戶端拿到鎖後,即便發生了網絡分區或者客戶端宕機,也不會發生死鎖;(利用key的存活時間)
容錯性:只要多數節點的redis實例正常運行,就可以對外提供服務,加鎖或者釋放鎖;
以sentinel模式架構爲例,以下圖所示,有sentinel-1,sentinel-2,sentinel-3總計3個sentinel模式集羣,若是要獲取分佈式鎖,那麼須要向這3個sentinel集羣經過EVAL命令執行LUA腳本,須要3/2+1=2,即至少2個sentinel集羣響應成功,纔算成功的以Redlock算法獲取到分佈式鎖:
提早說明,使用redis分佈式鎖,是追求高性能, 在cap理論中,追求的是 ap 而不是cp。
因此,若是追求高可用,建議使用 zookeeper分佈式鎖。
redis分佈式鎖可能致使的數據不一致性,建議使用業務補償的方式去彌補。因此,不太建議使用紅鎖,可是從學習的層面來講,你們仍是必定要掌握的。
Redisson中有一個MultiLock
的概念,能夠將多個鎖合併爲一個大鎖,對一個大鎖進行統一的申請加鎖以及釋放鎖
而Redisson中實現RedLock就是基於MultiLock
去作的,接下來就具體看看對應的實現吧
先看下官方的代碼使用:
(https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers#84-redlock)
RLock lock1 = redisson1.getLock("lock1"); RLock lock2 = redisson2.getLock("lock2"); RLock lock3 = redisson3.getLock("lock3"); RLock redLock = anyRedisson.getRedLock(lock1, lock2, lock3); // traditional lock method redLock.lock(); // or acquire lock and automatically unlock it after 10 seconds redLock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds boolean res = redLock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { redLock.unlock(); } }
這裏是分別對3個redis實例加鎖,而後獲取一個最後的加鎖結果。
上面示例中使用redLock.lock()或者tryLock()最終都是執行RedissonRedLock
中方法。
RedissonRedLock
繼承自RedissonMultiLock
, 實現了其中的一些方法:
public class RedissonRedLock extends RedissonMultiLock { public RedissonRedLock(RLock... locks) { super(locks); } /** * 鎖能夠失敗的次數,鎖的數量-鎖成功客戶端最小的數量 */ @Override protected int failedLocksLimit() { return locks.size() - minLocksAmount(locks); } /** * 鎖的數量 / 2 + 1,例若有3個客戶端加鎖,那麼最少須要2個客戶端加鎖成功 */ protected int minLocksAmount(final List<RLock> locks) { return locks.size()/2 + 1; } /** * 計算多個客戶端一塊兒加鎖的超時時間,每一個客戶端的等待時間 * remainTime默認爲4.5s */ @Override protected long calcLockWaitTime(long remainTime) { return Math.max(remainTime / locks.size(), 1); } @Override public void unlock() { unlockInner(locks); } }
看到locks.size()/2 + 1
,例如咱們有3個客戶端實例,那麼最少2個實例加鎖成功纔算分佈式鎖加鎖成功。
接着咱們看下lock()
的具體實現
public class RedissonMultiLock implements Lock { final List<RLock> locks = new ArrayList<RLock>(); public RedissonMultiLock(RLock... locks) { if (locks.length == 0) { throw new IllegalArgumentException("Lock objects are not defined"); } this.locks.addAll(Arrays.asList(locks)); } public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long newLeaseTime = -1; if (leaseTime != -1) { // 若是等待時間設置了,那麼將等待時間 * 2 newLeaseTime = unit.toMillis(waitTime)*2; } // time爲當前時間戳 long time = System.currentTimeMillis(); long remainTime = -1; if (waitTime != -1) { remainTime = unit.toMillis(waitTime); } // 計算鎖的等待時間,RedLock中:若是remainTime=-1,那麼lockWaitTime爲1 long lockWaitTime = calcLockWaitTime(remainTime); // RedLock中failedLocksLimit即爲n/2 + 1 int failedLocksLimit = failedLocksLimit(); List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size()); // 循環每一個redis客戶端,去獲取鎖 for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) { RLock lock = iterator.next(); boolean lockAcquired; try { // 調用tryLock方法去獲取鎖,若是獲取鎖成功,則lockAcquired=true if (waitTime == -1 && leaseTime == -1) { lockAcquired = lock.tryLock(); } else { long awaitTime = Math.min(lockWaitTime, remainTime); lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS); } } catch (Exception e) { lockAcquired = false; } // 若是獲取鎖成功,將鎖加入到list集合中 if (lockAcquired) { acquiredLocks.add(lock); } else { // 若是獲取鎖失敗,判斷失敗次數是否等於失敗的限制次數 // 好比,3個redis客戶端,最多隻能失敗1次 // 這裏locks.size = 3, 3-x=1,說明只要成功了2次就能夠直接break掉循環 if (locks.size() - acquiredLocks.size() == failedLocksLimit()) { break; } // 若是最大失敗次數等於0 if (failedLocksLimit == 0) { // 釋放全部的鎖,RedLock加鎖失敗 unlockInner(acquiredLocks); if (waitTime == -1 && leaseTime == -1) { return false; } failedLocksLimit = failedLocksLimit(); acquiredLocks.clear(); // 重置迭代器 重試再次獲取鎖 while (iterator.hasPrevious()) { iterator.previous(); } } else { // 失敗的限制次數減一 // 好比3個redis實例,最大的限制次數是1,若是遍歷第一個redis實例,失敗了,那麼failedLocksLimit會減成0 // 若是failedLocksLimit就會走上面的if邏輯,釋放全部的鎖,而後返回false failedLocksLimit--; } } if (remainTime != -1) { remainTime -= (System.currentTimeMillis() - time); time = System.currentTimeMillis(); if (remainTime <= 0) { unlockInner(acquiredLocks); return false; } } } if (leaseTime != -1) { List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size()); for (RLock rLock : acquiredLocks) { RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS); futures.add(future); } for (RFuture<Boolean> rFuture : futures) { rFuture.syncUninterruptibly(); } } return true; } }
核心代碼都已經加了註釋,實現原理其實很簡單,基於RedLock思想,遍歷全部的Redis客戶端,而後依次加鎖,最後統計成功的次數來判斷是否加鎖成功。
分佈式鎖一旦加了以後,對同一個商品的下單請求,會致使全部下單操做,都必須對同一個商品key加分佈式鎖。
假設一個商品1分鐘6000訂單,每秒的 600個下單操做,假設加鎖以後,釋放鎖以前,查庫存 -> 建立訂單 -> 扣減庫存,每一個IO操做100ms,大概300毫秒。
具體以下圖:
能夠再進行一下優化,將 建立訂單 + 扣減庫存 併發執行,將兩個100ms 減小爲一個100ms,這既是空間換時間的思想,大概200毫秒。
將 建立訂單 + 扣減庫存 批量執行,減小一次IO,也是大概200毫秒。
這個優化方案,有個重要的前提,就是 訂單表和庫存表在相同的庫中,可是,這個前提條件,在數據量大+高併發的場景下,夠嗆。
那麼,一秒內,只能完成多少個商品的秒殺訂單的下單操做呢?
1000毫秒 / 200 =5 個訂單
如何達到每秒600個下單呢? 仍是要從基礎知識裏邊尋找答案?
分段加鎖的思想來源與基礎知識。
我常常在瘋狂創客圈社羣裏邊,對小夥伴們強調 基礎知識的重要性,反覆強調, 《Java 高併發三部曲》 必定要多刷,最好刷三遍。
中 《Java 高併發核心編程 卷2》 介紹了 JUC的 LongAdder 和 ConcurrentHashMap的源碼和底層原理,他們提高性能的辦法是:
空間換時間, 分段加鎖
尤爲是 LongAdder 的實現思想,能夠用於 Redis分佈式鎖 做爲性能提高的手段,將 Redis分佈式鎖 優化爲 Redis分段鎖。
有關LongAdder 的系統化學習,請參見 《Java 高併發核心編程 卷2》
回到前面的場景:
假設一個商品1分鐘6000訂單,每秒的 600個下單操做,假設加鎖以後,釋放鎖以前,查庫存 -> 建立訂單 -> 扣減庫存,通過優化,每一個IO操做100ms,大概200毫秒,一秒鐘5個訂單。
爲了達到每秒600個訂單,能夠將鎖分紅 600 /5 =120 個段, 每一次使用隨機算法,隨機到一個分段, 若是不行,就輪詢下一個分段,具體的流程,大體以下:
缺點:
這個是一個理論的時間預估,沒有扣除 嘗試下一個分段的 時間, 另外,實際上的性能, 會比理論上差,從我們實操案例的測試結果,也能夠證實這點。
尼恩的忠實建議:
理論水平的提高,看看視頻、看看書,只有兩個字,就是須要:多看。
實戰水平的提高,只有兩個字,就是須要:多幹。
參照 LongAdder ,手寫一個Redis分段鎖, 仍是有點複雜,可是很重要,建議你們動手幹一票.
手寫一個Redis分段鎖的實操,是高併發實戰的重要動手實操之一。
有關Redis分段鎖的實操的具體材料、源碼、問題,歡迎來 瘋狂創客圈社羣交流。
高併發Java發燒友社羣 - 瘋狂創客圈 總入口 點擊瞭解詳情:
圖書:《Netty Zookeeper Redis 高併發實戰》 圖書簡介 - 瘋狂創...
圖書:《Netty Zookeeper Redis 高併發實戰》 圖書簡介 - 瘋狂創...
基於Zookeeper 的分佈式鎖實現 - SegmentFault 思否