Redis分佈式鎖 (圖解-秒懂-史上最全)

文章很長,並且持續更新,建議收藏起來,慢慢讀! Java 高併發 發燒友社羣:瘋狂創客圈(總入口) 奉上如下珍貴的學習資源:javascript


推薦:入大廠 、作架構、大力提高Java 內功 的 精彩博文

入大廠 、作架構、大力提高Java 內功 必備的精彩博文 2021 秋招漲薪1W + 必備的精彩博文
1:Redis 分佈式鎖 (圖解-秒懂-史上最全) 2:Zookeeper 分佈式鎖 (圖解-秒懂-史上最全)
3: Redis與MySQL雙寫一致性如何保證? (面試必備) 4: 面試必備:秒殺超賣 解決方案 (史上最全)
5:面試必備之:Reactor模式 6: 10分鐘看懂, Java NIO 底層原理
7:TCP/IP(圖解+秒懂+史上最全) 8:Feign原理 (圖解)
9:DNS圖解(秒懂 + 史上最全 + 高薪必備) 10:CDN圖解(秒懂 + 史上最全 + 高薪必備)
10: 分佈式事務( 圖解 + 史上最全 + 吐血推薦 )

Java 面試題 30個專題 , 史上最全 , 面試必刷 阿里、京東、美團... 隨意挑、橫着走!!!
1: JVM面試題(史上最強、持續更新、吐血推薦) 2:Java基礎面試題(史上最全、持續更新、吐血推薦
3:架構設計面試題 (史上最全、持續更新、吐血推薦) 4:設計模式面試題 (史上最全、持續更新、吐血推薦)
1七、分佈式事務面試題 (史上最全、持續更新、吐血推薦) 一致性協議 (史上最全)
2九、多線程面試題(史上最全) 30、HR面經,過五關斬六將後,當心陰溝翻船!
9.網絡協議面試題(史上最全、持續更新、吐血推薦) 更多專題, 請參見【 瘋狂創客圈 高併發 總目錄

SpringCloud 精彩博文
nacos 實戰(史上最全) sentinel (史上最全+入門教程)
SpringCloud gateway (史上最全) 更多專題, 請參見【 瘋狂創客圈 高併發 總目錄

跨JVM的線程安全問題

在單體的應用開發場景中,在多線程的環境下,涉及併發同步的時候,爲了保證一個代碼塊在同一時間只能由一個線程訪問,咱們通常能夠使用synchronized語法和ReetrantLock去保證,這其實是本地鎖的方式。html

也就是說,在同一個JVM內部,你們每每採用synchronized或者Lock的方式來解決多線程間的安全問題。但在分佈式集羣工做的開發場景中,在JVM之間,那麼就須要一種更加高級的鎖機制,來處理種跨JVM進程之間的線程安全問題.java

解決方案是:使用分佈式鎖node

總之,對於分佈式場景,咱們能夠使用分佈式鎖,它是控制分佈式系統之間互斥訪問共享資源的一種方式。mysql

好比說在一個分佈式系統中,多臺機器上部署了多個服務,當客戶端一個用戶發起一個數據插入請求時,若是沒有分佈式鎖機制保證,那麼那多臺機器上的多個服務可能進行併發插入操做,致使數據重複插入,對於某些不容許有多餘數據的業務來講,這就會形成問題。而分佈式鎖機制就是爲了解決相似這類問題,保證多個服務之間互斥的訪問共享資源,若是一個服務搶佔了分佈式鎖,其餘服務沒獲取到鎖,就不進行後續操做。git

大體意思以下圖所示(不必定準確):程序員

在這裏插入圖片描述

何爲分佈式鎖?

何爲分佈式鎖?github

  • 當在分佈式模型下,數據只有一份(或有限制),此時須要利用鎖的技術控制某一時刻修改數據的進程數。
  • 用一個狀態值表示鎖,對鎖的佔用和釋放經過狀態值來標識。

分佈式鎖的條件:面試

  • 互斥性。在任意時刻,只有一個客戶端能持有鎖。
  • 不會發生死鎖。即便有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證後續其餘客戶端能加鎖。
  • 具備容錯性。只要大部分的 Redis 節點正常運行,客戶端就能夠加鎖和解鎖。
  • 解鈴還須繫鈴人。加鎖和解鎖必須是同一個客戶端,客戶端本身不能把別人加的鎖給解了。

分佈式鎖的實現:redis

分佈式鎖的實現由不少種,文件鎖、數據庫、redis等等,比較多;分佈式鎖常見的多種實現方式:

  1. 數據庫悲觀鎖、
  2. 數據庫樂觀鎖;
  3. 基於Redis的分佈式鎖;
  4. 基於ZooKeeper的分佈式鎖。

在實踐中,仍是redis作分佈式鎖性能會高一些


數據庫悲觀鎖

所謂悲觀鎖,悲觀鎖是對數據被的修改持悲觀態度(認爲數據在被修改的時候必定會存在併發問題),所以在整個數據處理過程當中將數據鎖定。

悲觀鎖的實現,每每依靠數據庫提供的鎖機制(也只有數據庫層提供的鎖機制才能真正保證數據訪問的排他性,不然,即便在應用層中實現了加鎖機制,也沒法保證外部系統不會修改數據)。

數據庫的行鎖、表鎖、排他鎖等都是悲觀鎖,這裏以行鎖爲例,進行介紹。以咱們經常使用的MySQL爲例,咱們經過使用select...for update語句, 執行該語句後,會在表上加持行鎖,一直到事務提交,解除行鎖。

使用場景舉例:

在秒殺案例中,生成訂單和扣減庫存的操做,能夠經過商品記錄的行鎖,進行保護。們經過使用select...for update語句,在查詢商品表庫存時將該條記錄加鎖,待下單減庫存完成後,再釋放鎖。

示例的SQL以下:

//0.開始事務
begin; 
	
//1.查詢出商品信息

select stockCount from seckill_good where id=1 for update;

//2.根據商品信息生成訂單

insert into seckill_order (id,good_id) values (null,1);

//3.修改商品stockCount減一

update seckill_good set stockCount=stockCount-1 where id=1;

//4.提交事務

commit;

以上,在對id = 1的記錄修改前,先經過for update的方式進行加鎖,而後再進行修改。這就是比較典型的悲觀鎖策略。

若是以上修改庫存的代碼發生併發,同一時間只有一個線程能夠開啓事務並得到id=1的鎖,其它的事務必須等本次事務提交以後才能執行。這樣咱們能夠保證當前的數據不會被其它事務修改。

咱們使用select_for_update,另一定要寫在事務中.

注意:要使用悲觀鎖,咱們必須關閉mysql數據庫中自動提交的屬性,命令set autocommit=0;便可關閉,由於MySQL默認使用autocommit模式,也就是說,當你執行一個更新操做後,MySQL會馬上將結果進行提交。

悲觀鎖的實現,每每依靠數據庫提供的鎖機制。在數據庫中,悲觀鎖的流程以下:

  • 在對記錄進行修改前,先嚐試爲該記錄加上排他鎖(exclusive locking)。
  • 若是加鎖失敗,說明該記錄正在被修改,那麼當前查詢可能要等待或者拋出異常。具體響應方式由開發者根據實際須要決定。
  • 若是成功加鎖,那麼就能夠對記錄作修改,事務完成後就會解鎖了。
  • 其間若是有其餘事務對該記錄作加鎖的操做,都要等待當前事務解鎖或直接拋出異常。

數據庫樂觀鎖

使用樂觀鎖就不須要藉助數據庫的鎖機制了。

樂觀鎖的概念中其實已經闡述了他的具體實現細節:主要就是兩個步驟:衝突檢測和數據更新。其實現方式有一種比較典型的就是Compare and Swap(CAS)技術

CAS是項樂觀鎖技術,當多個線程嘗試使用CAS同時更新同一個變量時,只有其中一個線程能更新變量的值,而其它線程都失敗,失敗的線程並不會被掛起,而是被告知此次競爭中失敗,並能夠再次嘗試。

CAS的實現中,在表中增長一個version字段,操做前先查詢version信息,在數據提交時檢查version字段是否被修改,若是沒有被修改則進行提交,不然認爲是過時數據。

好比前面的扣減庫存問題,經過樂觀鎖能夠實現以下:

//1.查詢出商品信息
			
select stockCount, version from seckill_good where id=1;
			
//2.根據商品信息生成訂單
insert into seckill_order (id,good_id) values (null,1);

//3.修改商品庫存
update seckill_good set stockCount=stockCount-1, version = version+1 where id=1, version=version;

以上,咱們在更新以前,先查詢一下庫存表中當前版本(version),而後在作update的時候,以version 做爲一個修改條件。

當咱們提交更新的時候,判斷數據庫表對應記錄的當前version與第一次取出來的version進行比對,若是數據庫表當前version與第一次取出來的version相等,則予以更新,不然認爲是過時數據。

CAS 樂觀鎖有兩個問題:

(1) CAS 存在一個比較重要的問題,即ABA問題. 解決的辦法是version字段順序遞增。

(2) 樂觀鎖的方式,在高併發時,只有一個線程能執行成功,會形成大量的失敗,這給用戶的體驗顯然是很很差的。


Zookeeper分佈式鎖

除了在數據庫層面加分佈式鎖,一般還能夠使用如下更高性能、更高可用的分佈式鎖:

  • 分佈式緩存(如redis)鎖
  • 分佈式協調(如zookeeper)鎖

有關zookeeper分佈式鎖的原理和實現,具體請參見下面的博客:
Zookeeper 分佈式鎖 (圖解+秒懂+史上最全)

或者閱讀筆者的《Java高併發核心編程(卷1)》

在這裏插入圖片描述


Redis分佈式鎖

本文重點介紹Redis分佈式鎖,分爲兩個維度進行介紹:

(1)基於Jedis手工造輪子分佈式鎖

(2)介紹Redission 分佈式鎖的使用和原理。

分佈式鎖通常有以下的特色:

  • 互斥性: 同一時刻只能有一個線程持有鎖
  • 可重入性: 同一節點上的同一個線程若是獲取了鎖以後可以再次獲取鎖
  • 鎖超時:和J.U.C中的鎖同樣支持鎖超時,防止死鎖
  • 高性能和高可用: 加鎖和解鎖須要高效,同時也須要保證高可用,防止分佈式鎖失效
  • 具有阻塞和非阻塞性:可以及時從阻塞狀態中被喚醒

手工造輪子:基於Jedis 的API實現分佈式鎖

咱們首先講解 Jedis 普通分佈式鎖實現,而且是純手工的模式,從最爲基礎的Redis命令開始。

只有充分了解與分佈式鎖相關的普通Redis命令,才能更好的瞭解高級的Redis分佈式鎖的實現,由於高級的分佈式鎖的實現徹底基於普通Redis命令。

Redis幾種架構

Redis發展到如今,幾種常見的部署架構有:

  • 單機模式;
  • 主從模式;
  • 哨兵模式;
  • 集羣模式;

從分佈式鎖的角度來講, 不管是單機模式、主從模式、哨兵模式、集羣模式,其原理都是類同的。 只是主從模式、哨兵模式、集羣模式的更加的高可用、或者更加高併發。

因此,接下來先基於單機模式,基於Jedis手工造輪子實現本身的分佈式鎖。

首先看兩個命令:

Redis分佈式鎖機制,主要藉助setnx和expire兩個命令完成。

setnx命令:

SETNX 是SET if Not eXists的簡寫。將 key 的值設爲 value,當且僅當 key 不存在; 若給定的 key 已經存在,則 SETNX 不作任何動做。

下面爲客戶端使用示例:

127.0.0.1:6379> set lock "unlock"
OK
127.0.0.1:6379> setnx lock "unlock"
(integer) 0
127.0.0.1:6379> setnx lock "lock"
(integer) 0
127.0.0.1:6379>

expire命令:

expire命令爲 key 設置生存時間,當 key 過時時(生存時間爲 0 ),它會被自動刪除. 其格式爲:

EXPIRE key seconds

下面爲客戶端使用示例:

127.0.0.1:6379> expire lock 10
(integer) 1
127.0.0.1:6379> ttl lock
8

基於Jedis API的分佈式鎖的整體流程:

經過Redis的setnx、expire命令能夠實現簡單的鎖機制:

  • key不存在時建立,並設置value和過時時間,返回值爲1;成功獲取到鎖;
  • 如key存在時直接返回0,搶鎖失敗;
  • 持有鎖的線程釋放鎖時,手動刪除key; 或者過時時間到,key自動刪除,鎖釋放。

線程調用setnx方法成功返回1認爲加鎖成功,其餘線程要等到當前線程業務操做完成釋放鎖後,才能再次調用setnx加鎖成功。

在這裏插入圖片描述

以上簡單redis分佈式鎖的問題:

若是出現了這麼一個問題:若是setnx是成功的,可是expire設置失敗,一旦出現了釋放鎖失敗,或者沒有手工釋放,那麼這個鎖永遠被佔用,其餘線程永遠也搶不到鎖。

因此,須要保障setnx和expire兩個操做的原子性,要麼所有執行,要麼所有不執行,兩者不能分開。

解決的辦法有兩種:

  • 使用set的命令時,同時設置過時時間,再也不單獨使用 expire命令
  • 使用lua腳本,將加鎖的命令放在lua腳本中原子性的執行

簡單加鎖:使用set的命令時,同時設置過時時間

使用set的命令時,同時設置過時時間的示例以下:

127.0.0.1:6379> set unlock "234" EX 100 NX
(nil)
127.0.0.1:6379> 
127.0.0.1:6379> set test "111" EX 100 NX
OK

這樣就完美的解決了分佈式鎖的原子性; set 命令的完整格式:

set key value [EX seconds] [PX milliseconds] [NX|XX]

EX seconds:設置失效時長,單位秒
PX milliseconds:設置失效時長,單位毫秒
NX:key不存在時設置value,成功返回OK,失敗返回(nil)
XX:key存在時設置value,成功返回OK,失敗返回(nil)

使用set命令實現加鎖操做,先展現加鎖的簡單代碼實習,再帶你們慢慢解釋爲何這樣實現。

加鎖的簡單代碼實現

package com.crazymaker.springcloud.standard.lock;


@Slf4j
@Data
@AllArgsConstructor
public class JedisCommandLock {

    private  RedisTemplate redisTemplate;

    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    /**
     * 嘗試獲取分佈式鎖
     * @param jedis Redis客戶端
     * @param lockKey 鎖
     * @param requestId 請求標識
     * @param expireTime 超期時間
     * @return 是否獲取成功
     */
    public static   boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;

    }

  

}

能夠看到,咱們加鎖用到了Jedis的set Api

jedis.set(String key, String value, String nxxx, String expx, int time)

這個set()方法一共有五個形參:

  • 第一個爲key,咱們使用key來當鎖,由於key是惟一的。

  • 第二個爲value,咱們傳的是requestId,不少童鞋可能不明白,有key做爲鎖不就夠了嗎,爲何還要用到value?緣由就是咱們在上面講到可靠性時,分佈式鎖要知足第四個條件解鈴還須繫鈴人,經過給value賦值爲requestId,咱們就知道這把鎖是哪一個請求加的了,在解鎖的時候就能夠有依據。

    requestId能夠使用UUID.randomUUID().toString()方法生成。

  • 第三個爲nxxx,這個參數咱們填的是NX,意思是SET IF NOT EXIST,即當key不存在時,咱們進行set操做;若key已經存在,則不作任何操做;

  • 第四個爲expx,這個參數咱們傳的是PX,意思是咱們要給這個key加一個過時的設置,具體時間由第五個參數決定。

  • 第五個爲time,與第四個參數相呼應,表明key的過時時間。

總的來講,執行上面的set()方法就只會致使兩種結果:

  1. 當前沒有鎖(key不存在),那麼就進行加鎖操做,並對鎖設置個有效期,同時value表示加鎖的客戶端。
  2. 已有鎖存在,不作任何操做。

心細的童鞋就會發現了,咱們的加鎖代碼知足前面描述的四個條件中的三個。

  • 首先,set()加入了NX參數,能夠保證若是已有key存在,則函數不會調用成功,也就是隻有一個客戶端能持有鎖,知足互斥性。

  • 其次,因爲咱們對鎖設置了過時時間,即便鎖的持有者後續發生崩潰而沒有解鎖,鎖也會由於到了過時時間而自動解鎖(即key被刪除),不會被永遠佔用(而發生死鎖)。

  • 最後,由於咱們將value賦值爲requestId,表明加鎖的客戶端請求標識,那麼在客戶端在解鎖的時候就能夠進行校驗是不是同一個客戶端。

  • 因爲咱們只考慮Redis單機部署的場景,因此容錯性咱們暫不考慮。

基於Jedis 的API實現簡單解鎖代碼

仍是先展現代碼,再帶你們慢慢解釋爲何這樣實現。

解鎖的簡單代碼實現

package com.crazymaker.springcloud.standard.lock;


@Slf4j
@Data
@AllArgsConstructor
public class JedisCommandLock {

  

    private static final Long RELEASE_SUCCESS = 1L;

    /**
     * 釋放分佈式鎖
     * @param jedis Redis客戶端
     * @param lockKey 鎖
     * @param requestId 請求標識
     * @return 是否釋放成功
     */
    public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {

        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

        if (RELEASE_SUCCESS.equals(result)) {
            return true;
        }
        return false;

    }

}

那麼這段Lua代碼的功能是什麼呢?

其實很簡單,首先獲取鎖對應的value值,檢查是否與requestId相等,若是相等則刪除鎖(解鎖)。

第一行代碼,咱們寫了一個簡單的Lua腳本代碼。

第二行代碼,咱們將Lua代碼傳到jedis.eval()方法裏,並使參數KEYS[1]賦值爲lockKey,ARGV[1]賦值爲requestId。eval()方法是將Lua代碼交給Redis服務端執行。

那麼爲何要使用Lua語言來實現呢?

由於要確保上述操做是原子性的。那麼爲何執行eval()方法能夠確保原子性,源於Redis的特性.

簡單來講,就是在eval命令執行Lua代碼的時候,Lua代碼將被當成一個命令去執行,而且直到eval命令執行完成,Redis纔會執行其餘命

錯誤示例1

最多見的解鎖代碼就是直接使用 jedis.del() 方法刪除鎖,這種不先判斷鎖的擁有者而直接解鎖的方式,會致使任何客戶端均可以隨時進行解鎖,即便這把鎖不是它的。

public static void wrongReleaseLock1(Jedis jedis, String lockKey) {
    jedis.del(lockKey);
}

錯誤示例2

這種解鎖代碼乍一看也是沒問題,甚至我以前也差點這樣實現,與正確姿式差很少,惟一區別的是分紅兩條命令去執行,代碼以下:

public static void wrongReleaseLock2(Jedis jedis, String lockKey, String requestId) {
        
    // 判斷加鎖與解鎖是否是同一個客戶端
    if (requestId.equals(jedis.get(lockKey))) {
        // 若在此時,這把鎖忽然不是這個客戶端的,則會誤解鎖
        jedis.del(lockKey);
    }

}

再造輪子:基於Lua腳本實現分佈式鎖

lua腳本的好處

前面提到,在redis中執行lua腳本,有以下的好處:

那麼爲何要使用Lua語言來實現呢?

由於要確保上述操做是原子性的。那麼爲何執行eval()方法能夠確保原子性,源於Redis的特性.

簡單來講,就是在eval命令執行Lua代碼的時候,Lua代碼將被當成一個命令去執行,而且直到eval命令執行完成,Redis纔會執行其餘命

因此:

大部分的開源框架(如 redission)中的分佈式鎖組件,都是用純lua腳本實現的。

題外話: lua腳本是高併發、高性能的必備腳本語言

有關lua的詳細介紹,請參見如下書籍:

在這裏插入圖片描述

那麼,咱們也來模擬一下

基於純Lua腳本的分佈式鎖的執行流程

加鎖和刪除鎖的操做,使用純lua進行封裝,保障其執行時候的原子性。

基於純Lua腳本實現分佈式鎖的執行流程,大體以下:

在這裏插入圖片描述

加鎖的Lua腳本: lock.lua

--- -1 failed
--- 1 success
---
local key = KEYS[1]
local requestId = KEYS[2]
local ttl = tonumber(KEYS[3])
local result = redis.call('setnx', key, requestId)
if result == 1 then
    --PEXPIRE:以毫秒的形式指定過時時間
    redis.call('pexpire', key, ttl)
else
    result = -1;
    -- 若是value相同,則認爲是同一個線程的請求,則認爲重入鎖
    local value = redis.call('get', key)
    if (value == requestId) then
        result = 1;
        redis.call('pexpire', key, ttl)
    end
end
--  若是獲取鎖成功,則返回 1
return result

解鎖的Lua腳本: unlock.lua:

--- -1 failed
--- 1 success

-- unlock key
local key = KEYS[1]
local requestId = KEYS[2]
local value = redis.call('get', key)
if value == requestId then
    redis.call('del', key);
    return 1;
end
return -1

兩個文件,放在資源文件夾下備用:

在這裏插入圖片描述

在Java中調用lua腳本,完成加鎖操做

下一步,實現Lock接口, 完成JedisLock的分佈式鎖。

其加鎖操做,經過調用 lock.lua腳本完成,代碼以下:

package com.crazymaker.springcloud.standard.lock;

import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.common.util.ThreadUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

@Slf4j
@Data
@AllArgsConstructor
public class JedisLock implements Lock {

    private RedisTemplate redisTemplate;

    RedisScript<Long> lockScript = null;
    RedisScript<Long> unLockScript = null;

    public static final int DEFAULT_TIMEOUT = 2000;
    public static final Long LOCKED = Long.valueOf(1);
    public static final Long UNLOCKED = Long.valueOf(1);
    public static final Long WAIT_GAT = Long.valueOf(200);
    public static final int EXPIRE = 2000;


    String key;
    String lockValue;  // lockValue 鎖的value ,表明線程的uuid

    /**
     * 默認爲2000ms
     */
    long expire = 2000L;

    public JedisLock(String lockKey, String lockValue) {
        this.key = lockKey;
        this.lockValue = lockValue;
    }

    private volatile boolean isLocked = false;

    private Thread thread;

    /**
     * 獲取一個分佈式鎖 , 超時則返回失敗
     *
     * @return 獲鎖成功 - true | 獲鎖失敗 - false
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

        //本地可重入
        if (isLocked && thread == Thread.currentThread()) {
            return true;
        }
        expire = unit != null ? unit.toMillis(time) : DEFAULT_TIMEOUT;
        long startMillis = System.currentTimeMillis();
        Long millisToWait = expire;

        boolean localLocked = false;

        int turn = 1;
        while (!localLocked) {

            localLocked = this.lockInner(expire);
            if (!localLocked) {
                millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
                startMillis = System.currentTimeMillis();
                if (millisToWait > 0L) {
                    /**
                     * 尚未超時
                     */
                    ThreadUtil.sleepMilliSeconds(WAIT_GAT);
                    log.info("睡眠一下,從新開始,turn:{},剩餘時間:{}", turn++, millisToWait);
                } else {
                    log.info("搶鎖超時");
                    return false;
                }
            } else {
                isLocked = true;
                localLocked = true;
            }
        }
        return isLocked;
    }


  
    /**
     * 有返回值的搶奪鎖
     *
     * @param millisToWait
     */
    public boolean lockInner(Long millisToWait) {
        if (null == key) {
            return false;
        }
        try {
            List<String> redisKeys = new ArrayList<>();
            redisKeys.add(key);
            redisKeys.add(lockValue);
            redisKeys.add(String.valueOf(millisToWait));
            Long res = (Long) redisTemplate.execute(lockScript, redisKeys);

            return res != null && res.equals(LOCKED);
        } catch (Exception e) {
            e.printStackTrace();
            throw BusinessException.builder().errMsg("搶鎖失敗").build();
        }

    }

   
}

在Java中調用lua腳本,完成解鎖操做

其解鎖操做,經過調用unlock.lua腳本完成,代碼以下:

package com.crazymaker.springcloud.standard.lock;

import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.common.util.ThreadUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

@Slf4j
@Data
@AllArgsConstructor
public class JedisLock implements Lock {

    private RedisTemplate redisTemplate;

    RedisScript<Long> lockScript = null;
    RedisScript<Long> unLockScript = null;

    //釋放鎖
    @Override
    public void unlock() {
        if (key == null || requestId == null) {
            return;
        }
        try {
            List<String> redisKeys = new ArrayList<>();
            redisKeys.add(key);
            redisKeys.add(requestId);
            Long res = (Long) redisTemplate.execute(unLockScript, redisKeys);

        } catch (Exception e) {
            e.printStackTrace();
            throw BusinessException.builder().errMsg("釋放鎖失敗").build();

        }
    }
  
   
}

編寫RedisLockService用於管理JedisLock

編寫個分佈式鎖服務,用於加載lua腳本,建立 分佈式鎖,代碼以下:

package com.crazymaker.springcloud.standard.lock;

import com.crazymaker.springcloud.common.util.IOUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

@Slf4j
@Data
public class RedisLockService
{

    private RedisTemplate redisTemplate;

    static String lockLua = "script/lock.lua";
    static String unLockLua = "script/unlock.lua";
    static RedisScript<Long> lockScript = null;
    static RedisScript<Long> unLockScript = null;
    {
        String script = IOUtil.loadJarFile(RedisLockService.class.getClassLoader(),lockLua);
//        String script = FileUtil.readString(lockLua, Charset.forName("UTF-8" ));
        if(StringUtils.isEmpty(script))
        {
            log.error("lua load failed:"+lockLua);
        }

        lockScript = new DefaultRedisScript<>(script, Long.class);

//        script = FileUtil.readString(unLockLua, Charset.forName("UTF-8" ));
        script =  IOUtil.loadJarFile(RedisLockService.class.getClassLoader(),unLockLua);
        if(StringUtils.isEmpty(script))
        {
            log.error("lua load failed:"+unLockLua);
        }
        unLockScript = new DefaultRedisScript<>(script, Long.class);

    }

    public RedisLockService(RedisTemplate redisTemplate)
    {
        this.redisTemplate = redisTemplate;
    }


    public Lock getLock(String lockKey, String lockValue) {
        JedisLock lock=new JedisLock(lockKey,lockValue);
        lock.setRedisTemplate(redisTemplate);
        lock.setLockScript(lockScript);
        lock.setUnLockScript(unLockScript);
        return lock;
    }
}

測試用例

接下來,終於能夠上測試用例了

package com.crazymaker.springcloud.lock;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {DemoCloudApplication.class})
// 指定啓動類
public class RedisLockTest {

    @Resource
    RedisLockService redisLockService;

    private ExecutorService pool = Executors.newFixedThreadPool(10);

    @Test
    public void testLock() {
        int threads = 10;
        final int[] count = {0};
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                String lockValue = UUID.randomUUID().toString();

                try {
                    Lock lock = redisLockService.getLock("test:lock:1", lockValue);
                    boolean locked = lock.tryLock(10, TimeUnit.SECONDS);

                    if (locked) {
                        for (int j = 0; j < 1000; j++) {
                            count[0]++;
                        }

                        log.info("count = " + count[0]);
                        lock.unlock();
                    } else {
                        System.out.println("搶鎖失敗");
                    }


                } catch (Exception e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("10個線程每一個累加1000爲: = " + count[0]);
        //輸出統計結果
        float time = System.currentTimeMillis() - start;

        System.out.println("運行的時長爲(ms):" + time);
        System.out.println("每一次執行的時長爲(ms):" + time / count[0]);

    }

}

執行用例,結果以下:

2021-05-04 23:02:11.900  INFO 22120 --- [pool-1-thread-7] c.c.springcloud.lock.RedisLockTest       LN:50 count = 6000
2021-05-04 23:02:11.901  INFO 22120 --- [pool-1-thread-1] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,從新開始,turn:3,剩餘時間:9585
2021-05-04 23:02:11.902  INFO 22120 --- [pool-1-thread-1] c.c.springcloud.lock.RedisLockTest       LN:50 count = 7000
2021-05-04 23:02:12.100  INFO 22120 --- [pool-1-thread-4] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,從新開始,turn:3,剩餘時間:9586
2021-05-04 23:02:12.101  INFO 22120 --- [pool-1-thread-5] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,從新開始,turn:3,剩餘時間:9585
2021-05-04 23:02:12.101  INFO 22120 --- [pool-1-thread-8] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,從新開始,turn:3,剩餘時間:9585
2021-05-04 23:02:12.101  INFO 22120 --- [pool-1-thread-4] c.c.springcloud.lock.RedisLockTest       LN:50 count = 8000
2021-05-04 23:02:12.102  INFO 22120 --- [pool-1-thread-8] c.c.springcloud.lock.RedisLockTest       LN:50 count = 9000
2021-05-04 23:02:12.304  INFO 22120 --- [pool-1-thread-5] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,從新開始,turn:4,剩餘時間:9383
2021-05-04 23:02:12.307  INFO 22120 --- [pool-1-thread-5] c.c.springcloud.lock.RedisLockTest       LN:50 count = 10000
10個線程每一個累加1000爲: = 10000
運行的時長爲(ms):827.0
每一次執行的時長爲(ms):0.0827

STW致使的鎖過時問題

下面有一個簡單的使用鎖的例子,在10秒內佔着鎖:

//寫數據到文件
function writeData(filename, data) {
    boolean locked = lock.tryLock(10, TimeUnit.SECONDS);
    if (!locked) {
        throw 'Failed to acquire lock';
    }

    try {
        //將數據寫到文件
        var file = storage.readFile(filename);
        var updated = updateContents(file, data);
        storage.writeFile(filename, updated);
    } finally {
        lock.unlock();
    }
}

問題是:若是在寫文件過程當中,發生了 fullGC,而且其時間跨度較長, 超過了10秒, 那麼,分佈式就自動釋放了。

在此過程當中,client2 搶到鎖,寫了文件。

client1 的fullGC完成後,也繼續寫文件,注意,此時client1 的並無佔用鎖,此時寫入會致使文件數據錯亂,發生線程安全問題。

這就是STW致使的鎖過時問題。

STW致使的鎖過時問題,具體以下圖所示:

在這裏插入圖片描述

STW致使的鎖過時問題,大概的解決方案,有:

1: 模擬CAS樂觀鎖的方式,增長版本號

2:watch dog自動延期機制

1: 模擬CAS樂觀鎖的方式,增長版本號(以下圖中的token)

c

此方案若是要實現,須要調整業務邏輯,與之配合,因此會入侵代碼。

2:watch dog自動延期機制

客戶端1加鎖的鎖key默認生存時間才30秒,若是超過了30秒,客戶端1還想一直持有這把鎖,怎麼辦呢?

簡單!只要客戶端1一旦加鎖成功,就會啓動一個watch dog看門狗,他是一個後臺線程,會每隔10秒檢查一下,若是客戶端1還持有鎖key,那麼就會不斷的延長鎖key的生存時間。

redission,採用的就是這種方案, 此方案不會入侵業務代碼。

爲啥推薦使用Redission

做爲 Java 開發人員,咱們若想在程序中集成 Redis,必須使用 Redis 的第三方庫。目前你們使用的最多的第三方庫是jedis。

和SpringCloud gateway同樣,Redisson也是基於Netty實現的,是更高性能的第三方庫。 因此,這裏推薦你們使用Redission替代 jedis。

在使用Redission以前,建議你們先掌握Netty的知識。

推薦你們閱讀被不少小夥伴評價爲史上最爲易懂的NIO、Netty書籍:《Java高併發核心編程(卷1)》

在這裏插入圖片描述


Redisson簡介

Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不只提供了一系列的分佈式的Java經常使用對象,還實現了可重入鎖(Reentrant Lock)、公平鎖(Fair Lock、聯鎖(MultiLock)、 紅鎖(RedLock)、 讀寫鎖(ReadWriteLock)等,還提供了許多分佈式服務。

img

Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者可以將精力更集中地放在處理業務邏輯上。

img

Redisson與Jedis的對比

1.概況對比

Jedis是Redis的java實現的客戶端,其API提供了比較全面的的Redis命令的支持,Redisson實現了分佈式和可擴展的的java數據結構,和Jedis相比,功能較爲簡單,不支持字符串操做,不支持排序,事物,管道,分區等Redis特性。Redisson的宗旨是促進使用者對Redis的關注分離,從而讓使用者可以將精力更集中的放在處理業務邏輯上。

2.可伸縮性

Jedis使用阻塞的I/O,且其方法調用都是同步的,程序流程要等到sockets處理完I/O才能執行,不支持異步,Jedis客戶端實例不是線程安全的,因此須要經過鏈接池來使用Jedis。

Redisson使用非阻塞的I/O和基於Netty框架的事件驅動的通訊層,其方法調用時異步的。Redisson的API是線程安全的,因此操做單個Redisson鏈接來完成各類操做。

3.第三方框架整合

Redisson在Redis的基礎上實現了java緩存標準規範;Redisson還提供了Spring Session回話管理器的實現。

Redission 的源碼地址:

官網: https://redisson.org/

github: https://github.com/redisson/redisson#quick-start

特性 & 功能:

  • 支持 Redis 單節點(single)模式、哨兵(sentinel)模式、主從(Master/Slave)模式以及集羣(Redis Cluster)模式

  • 程序接口調用方式採用異步執行和異步流執行兩種方式

  • 數據序列化,Redisson 的對象編碼類是用於將對象進行序列化和反序列化,以實現對該對象在 Redis 裏的讀取和存儲

  • 單個集合數據分片,在集羣模式下,Redisson 爲單個 Redis 集合類型提供了自動分片的功能

  • 提供多種分佈式對象,如:Object Bucket,Bitset,AtomicLong,Bloom Filter 和 HyperLogLog 等

  • 提供豐富的分佈式集合,如:Map,Multimap,Set,SortedSet,List,Deque,Queue 等

  • 分佈式鎖和同步器的實現,可重入鎖(Reentrant Lock),公平鎖(Fair Lock),聯鎖(MultiLock),紅鎖(Red Lock),信號量(Semaphonre),可過時性信號鎖(PermitExpirableSemaphore)等

  • 提供先進的分佈式服務,如分佈式遠程服務(Remote Service),分佈式實時對象(Live Object)服務,分佈式執行服務(Executor Service),分佈式調度任務服務(Schedule Service)和分佈式映射概括服務(MapReduce)


Redisson的使用

如何安裝 Redisson

安裝 Redisson 最便捷的方法是使用 Maven 或者 Gradle:

•Maven

<dependency>	
    <groupId>org.redisson</groupId>	
    <artifactId>redisson</artifactId>	
    <version>3.11.4</version>	
</dependency>

•Gradle

compile group: 'org.redisson', name: 'redisson', version: '3.11.4'

目前 Redisson 最新版是 3.11.4,固然你也能夠經過搜索 Maven 中央倉庫 mvnrepository[1] 來找到 Redisson 的各類版本。

獲取RedissonClient對象

RedissonClient有多種模式,主要的模式有:

  • 單節點模式

  • 哨兵模式

  • 主從模式

  • 集羣模式

首先介紹單節點模式。

單節點模式的程序化配置方法,大體以下:

Config config = new Config();
config.useSingleServer().setAddress("redis://myredisserver:6379");
RedissonClient redisson = Redisson.create(config);xxxxxxxxxx Config config = new Config();config.useSingleServer().setAddress("redis://myredisserver:6379");RedissonClient redisson = Redisson.create(config);// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();
SingleServerConfig singleConfig = config.useSingleServer();

SingleServerConfig類的設置參數以下

address(節點地址)

能夠經過host:port的格式來指定節點地址。

subscriptionConnectionMinimumIdleSize(發佈和訂閱鏈接的最小空閒鏈接數)

默認值:1

用於發佈和訂閱鏈接的最小保持鏈接數(長鏈接)。Redisson內部常常經過發佈和訂閱來實現許多功能。長期保持必定數量的發佈訂閱鏈接是必須的。

subscriptionConnectionPoolSize(發佈和訂閱鏈接池大小)

默認值:50

用於發佈和訂閱鏈接的鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。

connectionMinimumIdleSize(最小空閒鏈接數)

默認值:32

最小保持鏈接數(長鏈接)。長期保持必定數量的鏈接有利於提升瞬時寫入反應速度。

connectionPoolSize(鏈接池大小)

默認值:64

鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。

dnsMonitoring(是否啓用DNS監測)

默認值:false

在啓用該功能之後,Redisson將會監測DNS的變化狀況。

dnsMonitoringInterval(DNS監測時間間隔,單位:毫秒)

默認值:5000

監測DNS的變化狀況的時間間隔。

idleConnectionTimeout(鏈接空閒超時,單位:毫秒)

默認值:10000

若是當前鏈接池裏的鏈接數量超過了最小空閒鏈接數,而同時有鏈接空閒時間超過了該數值,那麼這些鏈接將會自動被關閉,並從鏈接池裏去掉。時間單位是毫秒。

connectTimeout(鏈接超時,單位:毫秒)

默認值:10000

同節點創建鏈接時的等待超時。時間單位是毫秒。

timeout(命令等待超時,單位:毫秒)

默認值:3000

等待節點回覆命令的時間。該時間從命令發送成功時開始計時。

retryAttempts(命令失敗重試次數)

默認值:3

若是嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。若是嘗試在此限制以內發送成功,則開始啓用 timeout(命令等待超時) 計時。

retryInterval(命令重試發送時間間隔,單位:毫秒)

默認值:1500

在一條命令發送失敗之後,等待重試發送的時間間隔。時間單位是毫秒。

reconnectionTimeout(從新鏈接時間間隔,單位:毫秒)

默認值:3000

當與某個節點的鏈接斷開時,等待與其從新創建鏈接的時間間隔。時間單位是毫秒。

failedAttempts(執行失敗最大次數)

默認值:3

在某個節點執行相同或不一樣命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表裏清除,直到 reconnectionTimeout(從新鏈接時間間隔) 超時之後再次嘗試。

database(數據庫編號)

默認值:0

嘗試鏈接的數據庫編號。

password(密碼)

默認值:null

用於節點身份驗證的密碼。

subscriptionsPerConnection(單個鏈接最大訂閱數量)

默認值:5

每一個鏈接的最大訂閱數量。

clientName(客戶端名稱)

默認值:null

在Redis節點裏顯示的客戶端名稱。

sslEnableEndpointIdentification(啓用SSL終端識別)

默認值:true

開啓SSL終端識別能力。

sslProvider(SSL實現方式)

默認值:JDK

肯定採用哪一種方式(JDK或OPENSSL)來實現SSL鏈接。

sslTruststore(SSL信任證書庫路徑)

默認值:null

指定SSL信任證書庫的路徑。

sslTruststorePassword(SSL信任證書庫密碼)

默認值:null

指定SSL信任證書庫的密碼。

sslKeystore(SSL鑰匙庫路徑)

默認值:null

指定SSL鑰匙庫的路徑。

sslKeystorePassword(SSL鑰匙庫密碼)

默認值:null

指定SSL鑰匙庫的密碼。

SpringBoot整合Redisson

Redisson有多種模式,首先介紹單機模式的整合。

1、導入Maven依賴

<!-- redisson-springboot -->
   <dependency>
       <groupId>org.redisson</groupId>
       <artifactId>redisson-spring-boot-starter</artifactId>
       <version>3.11.4</version>
   </dependency>

2、核心配置文件

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    database: 0
    timeout: 5000

3、添加配置類

RedissonConfig.java

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RedissonConfig {

    @Autowired
    private RedisProperties redisProperties;

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        String redisUrl = String.format("redis://%s:%s", redisProperties.getHost() + "", redisProperties.getPort() + "");
        config.useSingleServer().setAddress(redisUrl).setPassword(redisProperties.getPassword());
        config.useSingleServer().setDatabase(3);
        return Redisson.create(config);
    }

}

自定義starter

因爲redission能夠有多種模式,處於學習的目的,將多種模式封裝成一個start,能夠學習一下starter的製做。

在這裏插入圖片描述

封裝一個RedissonManager,經過策略模式,根據不一樣的配置類型,建立 RedissionConfig實例,而後建立RedissionClient對象。

在這裏插入圖片描述

使用RBucket操做分佈式對象

Redission模擬了Java的面向對象編程思想,能夠簡單理解爲一切皆爲對象。

每個 Redisson 對象 實現了RObject and RExpirable 兩個interfaces.

Usage example:

RObject object = redisson.get...()

object.sizeInMemory();

object.delete();

object.rename("newname");

object.isExists();

// catch expired event
object.addListener(new ExpiredObjectListener() {
   ...
});

// catch delete event
object.addListener(new DeletedObjectListener() {
   ...
});

每個Redisson 對象的名字,就是 Redis中的 Key.

RMap map = redisson.getMap("mymap");
map.getName(); // = mymap

能夠經過 RKeys 接口操做Redis中的keys.

Usage example:

RKeys keys = redisson.getKeys();

Iterable<String> allKeys = keys.getKeys();

Iterable<String> foundedKeys = keys.getKeysByPattern('key*');

long numOfDeletedKeys = keys.delete("obj1", "obj2", "obj3");

long deletedKeysAmount = keys.deleteByPattern("test?");

String randomKey = keys.randomKey();

long keysAmount = keys.count();

keys.flushall();

keys.flushdb();

Redisson經過RBucket接口表明能夠訪問任何類型的基礎對象,或者普通對象

RBucket有一系列的工具方法,如compareAndSet(),get(),getAndDelete(),getAndSet(),set(),size(),trySet()等等,用於設值/取值/獲取尺寸。

RBucket普通對象的最大大小,爲512兆字節

RBucket<AnyObject> bucket = redisson.getBucket("anyObject");

bucket.set(new AnyObject(1));
AnyObject obj = bucket.get();

bucket.trySet(new AnyObject(3));
bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
bucket.getAndSet(new AnyObject(6));

下面是一個完整的實例:

public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

   @Test
    public void testRBucketExamples() {
        // 默認鏈接上 127.0.0.1:6379
        RedissonClient client = redissonManager.getRedisson();
        // RList 繼承了 java.util.List 接口
        RBucket<String> rstring  = client.getBucket("redission:test:bucket:string");
        rstring.set("this is a string");

        RBucket<UserDTO> ruser  = client.getBucket("redission:test:bucket:user");
        UserDTO dto = new UserDTO();
        dto.setToken(UUID.randomUUID().toString());
        ruser.set(dto);
        System.out.println("string is: " + rstring.get());
        System.out.println("dto is: " + ruser.get());

        client.shutdown();
    }


}

運行上面的代碼時,能夠得到如下輸出:

string is: this is a string
dto is: UserDTO(id=null, userId=null, username=null, password=null, nickname=null, token=183b6eeb-65a8-4b2a-80c6-cf17c08332ce, createTime=null, updateTime=null, headImgUrl=null, mobile=null, sex=null, enabled=null, type=null, openId=null, isDel=false)

在這裏插入圖片描述

使用 RList 操做 Redis 列表

下面的代碼簡單演示瞭如何在 Redisson 中使用 RList 對象。RList 是 Java 的 List 集合的分佈式併發實現。

考慮如下代碼:

public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

    @Test
    public void testListExamples() {
        // 默認鏈接上 127.0.0.1:6379
        RedissonClient client = redissonManager.getRedisson();
        // RList 繼承了 java.util.List 接口
        RList<String> nameList = client.getList("redission:test:nameList");
        nameList.clear();
        nameList.add("張三");
        nameList.add("李四");
        nameList.add("王五");
        nameList.remove(-1);

        System.out.println("List size: " + nameList.size());


        boolean contains = nameList.contains("李四");
        System.out.println("Is list contains name '李四': " + contains);
        nameList.forEach(System.out::println);

        client.shutdown();
    }


}

運行上面的代碼時,能夠得到如下輸出:

List size: 2
Is list contains name '李四': true
張三
李四

在這裏插入圖片描述

使用 RMap 操做 Redis 哈希

Redisson 還包括 RMap,它是 Java Map 集合的分佈式併發實現,考慮如下代碼:

public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

    @Test
    public void testListExamples() {
         // 默認鏈接上 127.0.0.1:6379
        RedissonClient client = redissonManager.getRedisson();
        // RMap 繼承了 java.util.concurrent.ConcurrentMap 接口
        RMap<String, Object> map = client.getMap("redission:test:personalMap");
        map.put("name", "張三");
        map.put("address", "北京");
        map.put("age", new Integer(50));

        System.out.println("Map size: " + map.size());

        boolean contains = map.containsKey("age");
        System.out.println("Is map contains key 'age': " + contains);
        String value = String.valueOf(map.get("name"));
        System.out.println("Value mapped by key 'name': " + value);

        client.shutdown();
    }


}

運行上面的代碼時,將會看到如下輸出:

Map size: 3
Is map contains key 'age': true
Value mapped by key 'name': 張三

在這裏插入圖片描述

執行 Lua腳本

Lua是一種開源、簡單易學、輕量小巧的腳本語言,用標準C語言編寫。

其設計的目的就是爲了嵌入應用程序中,從而爲應用程序提供靈活的擴展和定製功能。

Redis從2.6版本開始支持Lua腳本,Redis使用Lua能夠:

  1. 原子操做。Redis會將整個腳本做爲一個總體執行,不會被中斷。能夠用來批量更新、批量插入
  2. 減小網絡開銷。多個Redis操做合併爲一個腳本,減小網絡時延
  3. 代碼複用。客戶端發送的腳本能夠存儲在Redis中,其餘客戶端能夠根據腳本的id調用。
public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

    @Test
    public void testLuaExamples() {
        // 默認鏈接上 127.0.0.1:6379
        RedissonClient redisson = redissonManager.getRedisson();

        redisson.getBucket("redission:test:foo").set("bar");
        String r = redisson.getScript().eval(RScript.Mode.READ_ONLY,
                "return redis.call('get', 'redission:test:foo')", RScript.ReturnType.VALUE);
        System.out.println("foo: " + r);

        // 經過預存的腳本進行一樣的操做
        RScript s = redisson.getScript();
        // 首先將腳本加載到Redis
        String sha1 = s.scriptLoad("return redis.call('get', 'redission:test:foo')");
        // 返回值 res == 282297a0228f48cd3fc6a55de6316f31422f5d17
        System.out.println("sha1: " + sha1);
        // 再經過SHA值調用腳本
        Future<Object> r1 = redisson.getScript().evalShaAsync(RScript.Mode.READ_ONLY,
                sha1,
                RScript.ReturnType.VALUE,
                Collections.emptyList());
        try {
            System.out.println("res: " + r1.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        client.shutdown();
    }
}

運行上面的代碼時,將會看到如下輸出:

foo: bar
sha1: 282297a0228f48cd3fc6a55de6316f31422f5d17
res: bar

在這裏插入圖片描述

使用 RLock 實現 Redis 分佈式鎖

RLock 是 Java 中可重入鎖的分佈式實現,下面的代碼演示了 RLock 的用法:

public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

 @Test
    public void testLockExamples() {
        // 默認鏈接上 127.0.0.1:6379
        RedissonClient redisson = redissonManager.getRedisson();
        // RLock 繼承了 java.util.concurrent.locks.Lock 接口
        RLock lock = redisson.getLock("redission:test:lock:1");

        final int[] count = {0};
        int threads = 10;
        ExecutorService pool = Executors.newFixedThreadPool(10);
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                for (int j = 0; j < 1000; j++) {
                    lock.lock();

                    count[0]++;
                    lock.unlock();
                }
                countDownLatch.countDown();
            });
        }

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("10個線程每一個累加1000爲: = " + count[0]);
        //輸出統計結果
        float time = System.currentTimeMillis() - start;

        System.out.println("運行的時長爲:" + time);
        System.out.println("每一次執行的時長爲:" + time/count[0]);
    }

}

此代碼將產生如下輸出:

10個線程每一個累加1000爲: = 10000
運行的時長爲:14172.0
每一次執行的時長爲:1.4172

使用 RAtomicLong 實現 Redis 原子操做

RAtomicLong 是 Java 中 AtomicLong 類的分佈式「替代品」,用於在併發環境中保存長值。如下示例代碼演示了 RAtomicLong 的用法:

public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

    @Test
    public void testRAtomicLongExamples() {
        // 默認鏈接上 127.0.0.1:6379
        RedissonClient redisson = redissonManager.getRedisson();
        RAtomicLong atomicLong = redisson.getAtomicLong("redission:test:myLong");
          // 線程數
        final int threads = 10;
        // 每條線程的執行輪數
        final int turns = 1000;
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++)
        {
            pool.submit(() ->
            {
                try
                {
                    for (int j = 0; j < turns; j++)
                    {
                        atomicLong.incrementAndGet();
                    }

                } catch (Exception e)
                {
                    e.printStackTrace();
                }
            });
        }

        ThreadUtil.sleepSeconds(5);
        System.out.println("atomicLong: " + atomicLong.get());
        redisson.shutdown();
    }

}

此代碼的輸出將是:

atomicLong: 10000

在這裏插入圖片描述

整長型累加器(LongAdder)

基於Redis的Redisson分佈式整長型累加器(LongAdder)採用了與java.util.concurrent.atomic.LongAdder相似的接口。經過利用客戶端內置的LongAdder對象,爲分佈式環境下遞增和遞減操做提供了很高得性能。據統計其性能最高比分佈式AtomicLong對象快 12000 倍。

完美適用於分佈式統計計量場景。下面是RLongAdder的使用案例:

RLongAdder atomicLong = redisson.getLongAdder("myLongAdder");
atomicLong.add(12);
atomicLong.increment();
atomicLong.decrement();
atomicLong.sum();

如下示例代碼演示了 RLongAdder 的用法:

public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

    @Test
    public void testRAtomicLongExamples() {
        // 默認鏈接上 127.0.0.1:6379
        RedissonClient redisson = redissonManager.getRedisson();
        RAtomicLong atomicLong = redisson.getAtomicLong("redission:test:myLong");
          // 線程數
        final int threads = 10;
        // 每條線程的執行輪數
        final int turns = 1000;
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++)
        {
            pool.submit(() ->
            {
                try
                {
                    for (int j = 0; j < turns; j++)
                    {
                        atomicLong.incrementAndGet();
                    }

                } catch (Exception e)
                {
                    e.printStackTrace();
                }
            });
        }

        ThreadUtil.sleepSeconds(5);
        System.out.println("atomicLong: " + atomicLong.get());
        redisson.shutdown();
    }

}

此代碼將產生如下輸出:

longAdder: 10000
運行的時長爲:5085.0
每一次執行的時長爲:0.5085

當再也不使用整長型累加器對象的時候應該自行手動銷燬,若是Redisson對象被關閉(shutdown)了,則不用手動銷燬。

RLongAdder atomicLong = ...
atomicLong.destroy();

序列化

Redisson的對象編碼類是用於將對象進行序列化和反序列化,以實現對該對象在Redis裏的讀取和存儲。Redisson提供瞭如下幾種的對象編碼應用,以供你們選擇:

編碼類名稱 說明
org.redisson.codec.JsonJacksonCodec Jackson JSON 編碼 默認編碼
org.redisson.codec.AvroJacksonCodec Avro 一個二進制的JSON編碼
org.redisson.codec.SmileJacksonCodec Smile 另外一個二進制的JSON編碼
org.redisson.codec.CborJacksonCodec CBOR 又一個二進制的JSON編碼
org.redisson.codec.MsgPackJacksonCodec MsgPack 再來一個二進制的JSON編碼
org.redisson.codec.IonJacksonCodec Amazon Ion 亞馬遜的Ion編碼,格式與JSON相似
org.redisson.codec.KryoCodec Kryo 二進制對象序列化編碼
org.redisson.codec.SerializationCodec JDK序列化編碼
org.redisson.codec.FstCodec FST 10倍於JDK序列化性能並且100%兼容的編碼
org.redisson.codec.LZ4Codec LZ4 壓縮型序列化對象編碼
org.redisson.codec.SnappyCodec Snappy 另外一個壓縮型序列化對象編碼
org.redisson.client.codec.JsonJacksonMapCodec 基於Jackson的映射類使用的編碼。可用於避免序列化類的信息,以及用於解決使用byte[]遇到的問題。
org.redisson.client.codec.StringCodec 純字符串編碼(無轉換)
org.redisson.client.codec.LongCodec 純整長型數字編碼(無轉換)
org.redisson.client.codec.ByteArrayCodec 字節數組編碼
org.redisson.codec.CompositeCodec 用來組合多種不一樣編碼在一塊兒

由Redisson默認的編碼器爲二進制編碼器,爲了序列化後的內容可見,須要使用Json文本序列化編碼工具類。Redisson提供了編碼器 JsonJacksonCodec,做爲Json文本序列化編碼工具類。

問題是:JsonJackson在序列化有雙向引用的對象時,會出現無限循環異常。而fastjson在檢查出雙向引用後會自動用引用符$ref替換,終止循環。

因此,一些特殊場景中:用fastjson能 正常序列化到redis,而JsonJackson則拋出無限循環異常。

爲了序列化後的內容可見,因此不用redission其餘自帶的,自行實現fastjson編碼器:

package com.crayon.distributedredissionspringbootstarter.codec;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import org.redisson.client.codec.BaseCodec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;

import java.io.IOException;

public class FastjsonCodec extends BaseCodec {

    private final Encoder encoder = in -> {
        ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
        try {
            ByteBufOutputStream os = new ByteBufOutputStream(out);
            JSON.writeJSONString(os, in, SerializerFeature.WriteClassName);
            return os.buffer();
        } catch (IOException e) {
            out.release();
            throw e;
        } catch (Exception e) {
            out.release();
            throw new IOException(e);
        }
    };

    private final Decoder<Object> decoder = (buf, state) ->
            JSON.parseObject(new ByteBufInputStream(buf), Object.class);

    @Override
    public Decoder<Object> getValueDecoder() {
        return decoder;
    }

    @Override
    public Encoder getValueEncoder() {
        return encoder;
    }
}

替換的方法以下:

*/
@Slf4j
public class StandaloneConfigImpl implements RedissonConfigService {

    @Override
    public Config createRedissonConfig(RedissonConfig redissonConfig) {
        Config config = new Config();
        try {
            String address = redissonConfig.getAddress();
            String password = redissonConfig.getPassword();
            int database = redissonConfig.getDatabase();
            String redisAddr = GlobalConstant.REDIS_CONNECTION_PREFIX.getConstant_value() + address;
            config.useSingleServer().setAddress(redisAddr);
            config.useSingleServer().setDatabase(database);
            //密碼能夠爲空
            if (!StringUtils.isEmpty(password)) {
                config.useSingleServer().setPassword(password);
            }
            log.info("初始化[單機部署]方式Config,redisAddress:" + address);

//            config.setCodec( new FstCodec());
            config.setCodec( new FastjsonCodec());
        } catch (Exception e) {
            log.error("單機部署 Redisson init error", e);
        }
        return config;
    }
}

哨兵模式

哨兵模式即sentinel模式,配置Redis哨兵服務的官方文檔在這裏

哨兵模式實現代碼和單機模式幾乎同樣,惟一的不一樣就是Config的構造.

程序化配置哨兵模式的方法以下

Config config = new Config();
config.useSentinelServers()
    .setMasterName("mymaster")
    // use "rediss://" for SSL connection
    .addSentinelAddress("redis://127.0.0.1:26389", "redis://127.0.0.1:26379")
    .addSentinelAddress("redis://127.0.0.1:26319");

RedissonClient redisson = Redisson.create(config);

Redisson的哨兵模式的使用方法以下:

SentinelServersConfig sentinelConfig = config.useSentinelServers();

SentinelServersConfig配置參數以下

配置Redis哨兵服務的官方文檔在這裏。Redisson的哨兵模式的使用方法以下:SentinelServersConfig sentinelConfig = config.useSentinelServers();

SentinelServersConfig 類的設置參數以下:

dnsMonitoringInterval(DNS監控間隔,單位:毫秒)

默認值:5000

用來指定檢查節點DNS變化的時間間隔。使用的時候應該確保JVM裏的DNS數據的緩存時間保持在足夠低的範圍纔有意義。用-1來禁用該功能。

masterName(主服務器的名稱)

主服務器的名稱是哨兵進程中用來監測主從服務切換狀況的。

addSentinelAddress(添加哨兵節點地址)

能夠經過host:port的格式來指定哨兵節點的地址。多個節點能夠一次性批量添加。

readMode(讀取操做的負載均衡模式)

默認值: SLAVE(只在從服務節點裏讀取)

注:在從服務節點裏讀取的數聽說明已經至少有兩個節點保存了該數據,確保了數據的高可用性。

設置讀取操做選擇節點的模式。可用值爲:SLAVE - 只在從服務節點裏讀取。MASTER - 只在主服務節點裏讀取。MASTER_SLAVE - 在主從服務節點裏均可以讀取。

subscriptionMode(訂閱操做的負載均衡模式)

默認值:SLAVE(只在從服務節點裏訂閱)

設置訂閱操做選擇節點的模式。可用值爲:SLAVE - 只在從服務節點裏訂閱。MASTER - 只在主服務節點裏訂閱。

loadBalancer(負載均衡算法類的選擇)

默認值: org.redisson.connection.balancer.RoundRobinLoadBalancer

在使用多個Redis服務節點的環境裏,能夠選用如下幾種負載均衡方式選擇一個節點:org.redisson.connection.balancer.WeightedRoundRobinBalancer - 權重輪詢調度算法org.redisson.connection.balancer.RoundRobinLoadBalancer - 輪詢調度算法org.redisson.connection.balancer.RandomLoadBalancer - 隨機調度算法

subscriptionConnectionMinimumIdleSize(從節點發布和訂閱鏈接的最小空閒鏈接數)

默認值:1

多從節點的環境裏,每一個 從服務節點裏用於發佈和訂閱鏈接的最小保持鏈接數(長鏈接)。Redisson內部常常經過發佈和訂閱來實現許多功能。長期保持必定數量的發佈訂閱鏈接是必須的。

subscriptionConnectionPoolSize(從節點發布和訂閱鏈接池大小)

默認值:50

多從節點的環境裏,每一個 從服務節點裏用於發佈和訂閱鏈接的鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。

slaveConnectionMinimumIdleSize(從節點最小空閒鏈接數)

默認值:32

多從節點的環境裏,每一個 從服務節點裏用於普通操做( 發佈和訂閱)的最小保持鏈接數(長鏈接)。長期保持必定數量的鏈接有利於提升瞬時讀取反映速度。

slaveConnectionPoolSize(從節點鏈接池大小)

默認值:64

多從節點的環境裏,每一個 從服務節點裏用於普通操做( 發佈和訂閱)鏈接的鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。

masterConnectionMinimumIdleSize(主節點最小空閒鏈接數)

默認值:32

多從節點的環境裏,每一個 主節點的最小保持鏈接數(長鏈接)。長期保持必定數量的鏈接有利於提升瞬時寫入反應速度。

masterConnectionPoolSize(主節點鏈接池大小)

默認值:64

主節點的鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。

idleConnectionTimeout(鏈接空閒超時,單位:毫秒)

默認值:10000

若是當前鏈接池裏的鏈接數量超過了最小空閒鏈接數,而同時有鏈接空閒時間超過了該數值,那麼這些鏈接將會自動被關閉,並從鏈接池裏去掉。時間單位是毫秒。

connectTimeout(鏈接超時,單位:毫秒)

默認值:10000

同任何節點創建鏈接時的等待超時。時間單位是毫秒。

timeout(命令等待超時,單位:毫秒)

默認值:3000

等待節點回覆命令的時間。該時間從命令發送成功時開始計時。

retryAttempts(命令失敗重試次數)

默認值:3

若是嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。若是嘗試在此限制以內發送成功,則開始啓用 timeout(命令等待超時) 計時。

retryInterval(命令重試發送時間間隔,單位:毫秒)

默認值:1500

在一條命令發送失敗之後,等待重試發送的時間間隔。時間單位是毫秒。

reconnectionTimeout(從新鏈接時間間隔,單位:毫秒)

默認值:3000

當與某個節點的鏈接斷開時,等待與其從新創建鏈接的時間間隔。時間單位是毫秒。

failedAttempts(執行失敗最大次數)

默認值:3

在某個節點執行相同或不一樣命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表裏清除,直到 reconnectionTimeout(從新鏈接時間間隔) 超時之後再次嘗試。

database(數據庫編號)

默認值:0

嘗試鏈接的數據庫編號。

password(密碼)

默認值:null

用於節點身份驗證的密碼。

subscriptionsPerConnection(單個鏈接最大訂閱數量)

默認值:5

每一個鏈接的最大訂閱數量。

clientName(客戶端名稱)

默認值:null

在Redis節點裏顯示的客戶端名稱。

sslEnableEndpointIdentification(啓用SSL終端識別)

默認值:true

開啓SSL終端識別能力。

sslProvider(SSL實現方式)

默認值:JDK

肯定採用哪一種方式(JDK或OPENSSL)來實現SSL鏈接。

sslTruststore(SSL信任證書庫路徑)

默認值:null

指定SSL信任證書庫的路徑。

sslTruststorePassword(SSL信任證書庫密碼)

默認值:null

指定SSL信任證書庫的密碼。

sslKeystore(SSL鑰匙庫路徑)

默認值:null

指定SSL鑰匙庫的路徑。

sslKeystorePassword(SSL鑰匙庫密碼)

默認值:null

指定SSL鑰匙庫的密碼。

經過屬性文件,配置的示例以下:

---
sentinelServersConfig:
idleConnectionTimeout: 10000
connectTimeout: 10000
timeout: 3000
retryAttempts: 3
retryInterval: 1500
failedSlaveReconnectionInterval: 3000
failedSlaveCheckInterval: 60000
password: null
subscriptionsPerConnection: 5
clientName: null
loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
subscriptionConnectionMinimumIdleSize: 1
subscriptionConnectionPoolSize: 50
slaveConnectionMinimumIdleSize: 24
slaveConnectionPoolSize: 64
masterConnectionMinimumIdleSize: 24
masterConnectionPoolSize: 64
readMode: "SLAVE"
subscriptionMode: "SLAVE"
sentinelAddresses:
  - "redis://127.0.0.1:26379"
  - "redis://127.0.0.1:26389"
masterName: "mymaster"
database: 0
threads: 16
nettyThreads: 32
codec: !<org.redisson.codec.MarshallingCodec> {}
transportMode: "NIO"

主從模式

介紹配置Redis主從服務組態的文檔在這裏.

程序化配置主從模式的方法以下

Config config = new Config();
config.useMasterSlaveServers()
    // use "rediss://" for SSL connection
    .setMasterAddress("redis://127.0.0.1:6379")
    .addSlaveAddress("redis://127.0.0.1:6389", "redis://127.0.0.1:6332", "redis://127.0.0.1:6419")
    .addSlaveAddress("redis://127.0.0.1:6399");

RedissonClient redisson = Redisson.create(config);

主從模式使用到MasterSlaveServersConfig :

MasterSlaveServersConfig masterSlaveConfig = config.useMasterSlaveServers();

MasterSlaveServersConfig 類的設置參數以下:

dnsMonitoringInterval(DNS監控間隔,單位:毫秒)

默認值:5000

用來指定檢查節點DNS變化的時間間隔。使用的時候應該確保JVM裏的DNS數據的緩存時間保持在足夠低的範圍纔有意義。用-1來禁用該功能。

masterAddress(主節點地址)

能夠經過host:port的格式來指定主節點地址。

addSlaveAddress(添加從主節點地址)

能夠經過host:port的格式來指定從節點的地址。多個節點能夠一次性批量添加。

readMode(讀取操做的負載均衡模式)

默認值: SLAVE(只在從服務節點裏讀取)

注:在從服務節點裏讀取的數聽說明已經至少有兩個節點保存了該數據,確保了數據的高可用性。

設置讀取操做選擇節點的模式。可用值爲:SLAVE - 只在從服務節點裏讀取。MASTER - 只在主服務節點裏讀取。MASTER_SLAVE - 在主從服務節點裏均可以讀取。

subscriptionMode(訂閱操做的負載均衡模式)

默認值:SLAVE(只在從服務節點裏訂閱)

設置訂閱操做選擇節點的模式。可用值爲:SLAVE - 只在從服務節點裏訂閱。MASTER - 只在主服務節點裏訂閱。

loadBalancer(負載均衡算法類的選擇)

默認值: org.redisson.connection.balancer.RoundRobinLoadBalancer

在使用多個Redis服務節點的環境裏,能夠選用如下幾種負載均衡方式選擇一個節點:org.redisson.connection.balancer.WeightedRoundRobinBalancer - 權重輪詢調度算法org.redisson.connection.balancer.RoundRobinLoadBalancer - 輪詢調度算法org.redisson.connection.balancer.RandomLoadBalancer - 隨機調度算法

subscriptionConnectionMinimumIdleSize(從節點發布和訂閱鏈接的最小空閒鏈接數)

默認值:1

多從節點的環境裏,每一個 從服務節點裏用於發佈和訂閱鏈接的最小保持鏈接數(長鏈接)。Redisson內部常常經過發佈和訂閱來實現許多功能。長期保持必定數量的發佈訂閱鏈接是必須的。

subscriptionConnectionPoolSize(從節點發布和訂閱鏈接池大小)

默認值:50

多從節點的環境裏,每一個 從服務節點裏用於發佈和訂閱鏈接的鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。

slaveConnectionMinimumIdleSize(從節點最小空閒鏈接數)

默認值:32

多從節點的環境裏,每一個 從服務節點裏用於普通操做( 發佈和訂閱)的最小保持鏈接數(長鏈接)。長期保持必定數量的鏈接有利於提升瞬時讀取反映速度。

slaveConnectionPoolSize(從節點鏈接池大小)

默認值:64

多從節點的環境裏,每一個 從服務節點裏用於普通操做( 發佈和訂閱)鏈接的鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。

masterConnectionMinimumIdleSize(主節點最小空閒鏈接數)

默認值:32

多從節點的環境裏,每一個 主節點的最小保持鏈接數(長鏈接)。長期保持必定數量的鏈接有利於提升瞬時寫入反應速度。

masterConnectionPoolSize(主節點鏈接池大小)

默認值:64

主節點的鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。

idleConnectionTimeout(鏈接空閒超時,單位:毫秒)

默認值:10000

若是當前鏈接池裏的鏈接數量超過了最小空閒鏈接數,而同時有鏈接空閒時間超過了該數值,那麼這些鏈接將會自動被關閉,並從鏈接池裏去掉。時間單位是毫秒。

connectTimeout(鏈接超時,單位:毫秒)

默認值:10000

同任何節點創建鏈接時的等待超時。時間單位是毫秒。

timeout(命令等待超時,單位:毫秒)

默認值:3000

等待節點回覆命令的時間。該時間從命令發送成功時開始計時。

retryAttempts(命令失敗重試次數)

默認值:3

若是嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。若是嘗試在此限制以內發送成功,則開始啓用 timeout(命令等待超時) 計時。

retryInterval(命令重試發送時間間隔,單位:毫秒)

默認值:1500

在一條命令發送失敗之後,等待重試發送的時間間隔。時間單位是毫秒。

reconnectionTimeout(從新鏈接時間間隔,單位:毫秒)

默認值:3000

當與某個節點的鏈接斷開時,等待與其從新創建鏈接的時間間隔。時間單位是毫秒。

failedAttempts(執行失敗最大次數)

默認值:3

在某個節點執行相同或不一樣命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表裏清除,直到 reconnectionTimeout(從新鏈接時間間隔) 超時之後再次嘗試。

database(數據庫編號)

默認值:0

嘗試鏈接的數據庫編號。

password(密碼)

默認值:null

用於節點身份驗證的密碼。

subscriptionsPerConnection(單個鏈接最大訂閱數量)

默認值:5

每一個鏈接的最大訂閱數量。

clientName(客戶端名稱)

默認值:null

在Redis節點裏顯示的客戶端名稱。

sslEnableEndpointIdentification(啓用SSL終端識別)

默認值:true

開啓SSL終端識別能力。

sslProvider(SSL實現方式)

默認值:JDK

肯定採用哪一種方式(JDK或OPENSSL)來實現SSL鏈接。

sslTruststore(SSL信任證書庫路徑)

默認值:null

指定SSL信任證書庫的路徑。

sslTruststorePassword(SSL信任證書庫密碼)

默認值:null

指定SSL信任證書庫的密碼。

sslKeystore(SSL鑰匙庫路徑)

默認值:null

指定SSL鑰匙庫的路徑。

sslKeystorePassword(SSL鑰匙庫密碼)

默認值:null

指定SSL鑰匙庫的密碼。

集羣模式

集羣模式除了適用於Redis集羣環境,也適用於任何雲計算服務商提供的集羣模式,例如AWS ElastiCache集羣版Azure Redis Cache阿里雲(Aliyun)的雲數據庫Redis版

介紹配置Redis集羣組態的文檔在這裏。 Redis集羣組態的最低要求是必須有三個主節點。

集羣模式構造Config以下:

Config config = new Config();
config.useClusterServers()
    .setScanInterval(2000) // 集羣狀態掃描間隔時間,單位是毫秒
    //能夠用"rediss://"來啓用SSL鏈接
    .addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")
    .addNodeAddress("redis://127.0.0.1:7002");
RedissonClient redisson = Redisson.create(config);

集羣模式使用到ClusterServersConfig :

ClusterServersConfig clusterConfig = config.useClusterServers();

ClusterServersConfig 配置參數以下

nodeAddresses(添加節點地址)

能夠經過host:port的格式來添加Redis集羣節點的地址。多個節點能夠一次性批量添加。

scanInterval(集羣掃描間隔時間)

默認值: 1000

對Redis集羣節點狀態掃描的時間間隔。單位是毫秒。

slots(分片數量)

默認值: 231用於指定數據分片過程當中的分片數量。支持數據分片/框架結構有:集(Set)映射(Map)BitSetBloom filter, Spring CacheHibernate Cache等.

readMode(讀取操做的負載均衡模式)

默認值: SLAVE(只在從服務節點裏讀取)

注:在從服務節點裏讀取的數聽說明已經至少有兩個節點保存了該數據,確保了數據的高可用性。

設置讀取操做選擇節點的模式。可用值爲:SLAVE - 只在從服務節點裏讀取。MASTER - 只在主服務節點裏讀取。MASTER_SLAVE - 在主從服務節點裏均可以讀取。

subscriptionMode(訂閱操做的負載均衡模式)

默認值:SLAVE(只在從服務節點裏訂閱)

設置訂閱操做選擇節點的模式。可用值爲:SLAVE - 只在從服務節點裏訂閱。MASTER - 只在主服務節點裏訂閱。

loadBalancer(負載均衡算法類的選擇)

默認值: org.redisson.connection.balancer.RoundRobinLoadBalancer

在多Redis服務節點的環境裏,能夠選用如下幾種負載均衡方式選擇一個節點:org.redisson.connection.balancer.WeightedRoundRobinBalancer - 權重輪詢調度算法org.redisson.connection.balancer.RoundRobinLoadBalancer - 輪詢調度算法org.redisson.connection.balancer.RandomLoadBalancer - 隨機調度算法

subscriptionConnectionMinimumIdleSize(從節點發布和訂閱鏈接的最小空閒鏈接數)

默認值:1

多從節點的環境裏,每一個 從服務節點裏用於發佈和訂閱鏈接的最小保持鏈接數(長鏈接)。Redisson內部常常經過發佈和訂閱來實現許多功能。長期保持必定數量的發佈訂閱鏈接是必須的。

subscriptionConnectionPoolSize(從節點發布和訂閱鏈接池大小)

默認值:50

多從節點的環境裏,每一個 從服務節點裏用於發佈和訂閱鏈接的鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。

slaveConnectionMinimumIdleSize(從節點最小空閒鏈接數)

默認值:32

多從節點的環境裏,每一個 從服務節點裏用於普通操做( 發佈和訂閱)的最小保持鏈接數(長鏈接)。長期保持必定數量的鏈接有利於提升瞬時讀取反映速度。

slaveConnectionPoolSize(從節點鏈接池大小)

默認值:64

多從節點的環境裏,每一個 從服務節點裏用於普通操做( 發佈和訂閱)鏈接的鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。

masterConnectionMinimumIdleSize(主節點最小空閒鏈接數)

默認值:32

多節點的環境裏,每一個 主節點的最小保持鏈接數(長鏈接)。長期保持必定數量的鏈接有利於提升瞬時寫入反應速度。

masterConnectionPoolSize(主節點鏈接池大小)

默認值:64

多主節點的環境裏,每一個 主節點的鏈接池最大容量。鏈接池的鏈接數量自動彈性伸縮。

idleConnectionTimeout(鏈接空閒超時,單位:毫秒)

默認值:10000

若是當前鏈接池裏的鏈接數量超過了最小空閒鏈接數,而同時有鏈接空閒時間超過了該數值,那麼這些鏈接將會自動被關閉,並從鏈接池裏去掉。時間單位是毫秒。

connectTimeout(鏈接超時,單位:毫秒)

默認值:10000

同任何節點創建鏈接時的等待超時。時間單位是毫秒。

timeout(命令等待超時,單位:毫秒)

默認值:3000

等待節點回覆命令的時間。該時間從命令發送成功時開始計時。

retryAttempts(命令失敗重試次數)

默認值:3

若是嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。若是嘗試在此限制以內發送成功,則開始啓用 timeout(命令等待超時) 計時。

retryInterval(命令重試發送時間間隔,單位:毫秒)

默認值:1500

在一條命令發送失敗之後,等待重試發送的時間間隔。時間單位是毫秒。

reconnectionTimeout(從新鏈接時間間隔,單位:毫秒)

默認值:3000

當與某個節點的鏈接斷開時,等待與其從新創建鏈接的時間間隔。時間單位是毫秒。

failedAttempts(執行失敗最大次數)

默認值:3

在某個節點執行相同或不一樣命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表裏清除,直到 reconnectionTimeout(從新鏈接時間間隔) 超時之後再次嘗試。

password(密碼)

默認值:null

用於節點身份驗證的密碼。

subscriptionsPerConnection(單個鏈接最大訂閱數量)

默認值:5

每一個鏈接的最大訂閱數量。

clientName(客戶端名稱)

默認值:null

在Redis節點裏顯示的客戶端名稱。

sslEnableEndpointIdentification(啓用SSL終端識別)

默認值:true

開啓SSL終端識別能力。

sslProvider(SSL實現方式)

默認值:JDK

肯定採用哪一種方式(JDK或OPENSSL)來實現SSL鏈接。

sslTruststore(SSL信任證書庫路徑)

默認值:null

指定SSL信任證書庫的路徑。

sslTruststorePassword(SSL信任證書庫密碼)

默認值:null

指定SSL信任證書庫的密碼。

sslKeystore(SSL鑰匙庫路徑)

默認值:null

指定SSL鑰匙庫的路徑。

sslKeystorePassword(SSL鑰匙庫密碼)

默認值:null

指定SSL鑰匙庫的密碼。

簡單Redision鎖的原理

Redis發展到如今,幾種常見的部署架構有:

  1. 單機模式;
  2. 哨兵模式;
  3. 集羣模式;

先介紹,基於單機模式的簡單Redision鎖的使用。

簡單Redision鎖的使用

單機模式下,簡單Redision鎖的使用以下:

// 構造redisson實現分佈式鎖必要的Config
Config config = new Config();
config.useSingleServer().setAddress("redis://172.29.1.180:5379").setPassword("a123456").setDatabase(0);
// 構造RedissonClient
RedissonClient redissonClient = Redisson.create(config);
// 設置鎖定資源名稱
RLock disLock = redissonClient.getLock("DISLOCK");
//嘗試獲取分佈式鎖
boolean isLock= disLock.tryLock(500, 15000, TimeUnit.MILLISECONDS);
if (isLock) {
   try {
        //TODO if get lock success, do something;
        Thread.sleep(15000);

   } catch (Exception e) {
   } finally {
    // 不管如何, 最後都要解鎖
    disLock.unlock();
   }
}

經過代碼可知,通過Redisson的封裝,實現Redis分佈式鎖很是方便,和顯式鎖的使用方法是同樣的。RLock接口繼承了 Lock接口。

咱們再看一下Redis中的value是啥,和前文分析同樣,hash結構, redis 的key就是資源名稱。

hash結構的key就是UUID+threadId,hash結構的value就是重入值,在分佈式鎖時,這個值爲1(Redisson還能夠實現重入鎖,那麼這個值就取決於重入次數了):

172.29.1.180:5379> hgetall DISLOCK
1) "01a6d806-d282-4715-9bec-f51b9aa98110:1"
2) "1"

使用客戶端工具看到的效果以下:

在這裏插入圖片描述

getLock()方法

img

能夠看到,調用getLock()方法後實際返回一個RedissonLock對象

tryLock方法

下面來看下tryLock方法,源碼以下:

@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

以上代碼使用了異步回調模式,RFuture 繼承了 java.util.concurrent.Future , CompletionStage 兩大接口,異步回調模式的基礎知識,請參見 《Java高併發核心編程 卷2 》

在這裏插入圖片描述

tryAcquire()方法

在RedissonLock對象的lock()方法主要調用tryAcquire()方法

img

tryLockInnerAsync

img

因爲leaseTime == -1,因而走tryLockInnerAsync()方法,這個方法纔是關鍵

img

首先,看一下evalWriteAsync方法的定義

<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

這和前面的jedis調用lua腳本相似,最後兩個參數分別是keys和params。

單獨將調用的那一段摘出來看,實際調用是這樣的:

commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

結合上面的參數聲明,咱們能夠知道,這裏KEYS[1]就是getName(),ARGV[2]是getLockName(threadId)

假設:

  • 前面獲取鎖時傳的name是「DISLOCK」,
  • 假設調用的線程ID是1,
  • 假設成員變量UUID類型的id是01a6d806-d282-4715-9bec-f51b9aa98110

那麼KEYS[1]=DISLOCK,ARGV[2]=01a6d806-d282-4715-9bec-f51b9aa98110:1

所以,這段腳本的意思是

  一、判斷有沒有一個叫「DISLOCK」的key

  二、若是沒有,則在其下設置一個字段爲「01a6d806-d282-4715-9bec-f51b9aa98110:1」,值爲「1」的鍵值對 ,並設置它的過時時間

  三、若是存在,則進一步判斷「01a6d806-d282-4715-9bec-f51b9aa98110:1」是否存在,若存在,則其值加1,並從新設置過時時間

  四、返回「DISLOCK」的生存時間(毫秒)

原理:加鎖機制

這裏用的數據結構是hash,hash的結構是: key 字段1 值1 字段2 值2 。。。

用在鎖這個場景下,key就表示鎖的名稱,也能夠理解爲臨界資源,字段就表示當前得到鎖的線程

全部競爭這把鎖的線程都要判斷在這個key下有沒有本身線程的字段,若是沒有則不能得到鎖,若是有,則至關於重入,字段值加1(次數)

在這裏插入圖片描述

Lua腳本的詳解

爲什麼要使用lua語言?

由於一大堆複雜的業務邏輯,能夠經過封裝在lua腳本中發送給redis,保證這段複雜業務邏輯執行的原子性

在這裏插入圖片描述

回顧一下evalWriteAsync方法的定義

<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

注意,其最後兩個參數分別是keys和params。

關於 lua腳本的參數解釋:

KEYS[1]表明的是你加鎖的那個key,好比說:

RLock lock = redisson.getLock("DISLOCK");

這裏你本身設置了加鎖的那個鎖key就是「DISLOCK」。

ARGV[1]表明的就是鎖key的默認生存時間

調用的時候,傳遞的參數爲 internalLockLeaseTime ,該值默認30秒。

ARGV[2]表明的是加鎖的客戶端的ID,相似於下面這樣:

01a6d806-d282-4715-9bec-f51b9aa98110:1

lua腳本的第一段if判斷語句,就是用「exists DISLOCK」命令判斷一下,若是你要加鎖的那個鎖key不存在的話,你就進行加鎖。

如何加鎖呢?很簡單,用下面的redis命令:

hset DISLOCK 01a6d806-d282-4715-9bec-f51b9aa98110:1 1

經過這個命令設置一個hash數據結構,這行命令執行後,會出現一個相似下面的數據結構:

DISLOCK:
    {
        8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
    }

接着會執行「pexpire DISLOCK 30000」命令,設置DISLOCK這個鎖key的生存時間是30秒(默認)

鎖互斥機制

那麼在這個時候,若是客戶端2來嘗試加鎖,執行了一樣的一段lua腳本,會咋樣呢?

很簡單,第一個if判斷會執行「exists DISLOCK」,發現DISLOCK 這個鎖key已經存在了。

接着第二個if判斷,判斷一下,DISLOCK鎖key的hash數據結構中,是否包含客戶端2的ID,可是明顯不是的,由於那裏包含的是客戶端1的ID。

因此,客戶端2會獲取到pttl DISLOCK返回的一個數字,這個數字表明瞭DISLOCK 這個鎖key的剩餘生存時間。好比還剩15000毫秒的生存時間。

此時客戶端2會進入一個while循環,不停的嘗試加鎖。

可重入加鎖機制

若是客戶端1都已經持有了這把鎖了,結果可重入的加鎖會怎麼樣呢?

RLock lock = redisson.getLock("DISLOCK")
lock.lock();
//業務代碼
lock.lock();
//業務代碼
lock.unlock();
lock.unlock();

分析上面那段lua腳本。

第一個if判斷確定不成立,「exists DISLOCK」會顯示鎖key已經存在了。

第二個if判斷會成立,由於DISLOCK的hash數據結構中包含的那個ID,就是客戶端1的那個ID,也就是「8743c9c0-0795-4907-87fd-6c719a6b4586:1」

此時就會執行可重入加鎖的邏輯,他會用:

incrby DISLOCK

8743c9c0-0795-4907-87fd-6c719a6b4586:1 1

經過這個命令,對客戶端1的加鎖次數,累加1。

此時DISLOCK數據結構變爲下面這樣:

DISLOCK:
    {
        8743c9c0-0795-4907-87fd-6c719a6b4586:1 2
    }

釋放鎖機制

若是執行lock.unlock(),就能夠釋放分佈式鎖,此時的業務邏輯也是很是簡單的。

其實說白了,就是每次都對DISLOCK數據結構中的那個加鎖次數減1。

若是發現加鎖次數是0了,說明這個客戶端已經再也不持有鎖了,此時就會用:

「del DISLOCK」命令,從redis裏刪除這個key。

而後呢,另外的客戶端2就能夠嘗試完成加鎖了。

unlock 源碼

@Override
    public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
        
//        Future<Void> future = unlockAsync();
//        future.awaitUninterruptibly();
//        if (future.isSuccess()) {
//            return;
//        }
//        if (future.cause() instanceof IllegalMonitorStateException) {
//            throw (IllegalMonitorStateException)future.cause();
//        }
//        throw commandExecutor.convertException(future);
    }

再深刻一下,實際調用的是unlockInnerAsync方法

unlockInnerAsync方法

在這裏插入圖片描述

原理:Redision 解鎖機制

上圖沒有截取完整,完整的源碼以下:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

    }

咱們仍是假設name=DISLOCK,假設線程ID是1

同理,咱們能夠知道

KEYS[1]是getName(),即KEYS[1]=DISLOCK

KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{DISLOCK}

ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0

ARGV[2]是生存時間

ARGV[3]是getLockName(threadId),即ARGV[3]=8743c9c0-0795-4907-87fd-6c719a6b4586:1

所以,上面腳本的意思是:

  一、判斷是否存在一個叫「DISLOCK」的key

  二、若是不存在,返回nil

  三、若是存在,使用Redis Hincrby 命令用於爲哈希表中的字段值加上指定增量值 -1 ,表明減去1

  四、若counter >,返回空,若字段存在,則字段值減1

  五、若減完之後,counter > 0 值仍大於0,則返回0

  六、減完後,若字段值小於或等於0,則用 publish 命令廣播一條消息,廣播內容是0,並返回1;

能夠猜想,廣播0表示資源可用,即通知那些等待獲取鎖的線程如今能夠得到鎖了

在這裏插入圖片描述

經過redis Channel 解鎖訂閱

以上是正常狀況下獲取到鎖的狀況,那麼當沒法當即獲取到鎖的時候怎麼辦呢?

再回到前面獲取鎖的位置

@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }

    //    訂閱
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);

    try {
        while (true) {
            ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
//        get(lockAsync(leaseTime, unit));
}


protected static final LockPubSub PUBSUB = new LockPubSub();

protected RFuture<RedissonLockEntry> subscribe(long threadId) {
    return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}

protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
    PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}

這裏會訂閱Channel,當資源可用時能夠及時知道,並搶佔,防止無效的輪詢而浪費資源

這裏的channel爲:

redisson_lock__channel:{DISLOCK}

在這裏插入圖片描述

在這裏插入圖片描述

當資源可用用的時候,循環去嘗試獲取鎖,因爲多個線程同時去競爭資源,因此這裏用了信號量,對於同一個資源只容許一個線程得到鎖,其它的線程阻塞

這點,有點兒相似 Zookeeper分佈式鎖:

有關zookeeper分佈式鎖的原理和實現,具體請參見下面的博客:
Zookeeper 分佈式鎖 (圖解+秒懂+史上最全)

watch dog自動延期機制

客戶端1加鎖的鎖key默認生存時間才30秒,若是超過了30秒,客戶端1還想一直持有這把鎖,怎麼辦呢?

簡單!只要客戶端1一旦加鎖成功,就會啓動一個watch dog看門狗,他是一個後臺線程,會每隔10秒檢查一下,若是客戶端1還持有鎖key,那麼就會不斷的延長鎖key的生存時間。


使用watchDog機制實現鎖的續期

可是聰明的同窗確定會問:

有效時間設置多長,假如個人業務操做比有效時間長,個人業務代碼還沒執行完,就自動給我解鎖了,不就完蛋了嗎。

這個問題就有點棘手了,在網上也有不少討論:

第一種解決方法就是靠程序員本身去把握,預估一下業務代碼須要執行的時間,而後設置有效期時間比執行時間長一些,保證不會由於自動解鎖影響到客戶端業務代碼的執行。

可是這並非萬全之策,好比網絡抖動這種狀況是沒法預測的,也有可能致使業務代碼執行的時間變長,因此並不安全。

第二種方法,使用監事狗watchDog機制實現鎖的續期。

第二種方法比較靠譜一點,並且無業務入侵。

在Redisson框架實現分佈式鎖的思路,就使用watchDog機制實現鎖的續期。

當加鎖成功後,同時開啓守護線程,默認有效期是30秒,每隔10秒就會給鎖續期到30秒,只要持有鎖的客戶端沒有宕機,就能保證一直持有鎖,直到業務代碼執行完畢由客戶端本身解鎖,若是宕機了天然就在有效期失效後自動解鎖。

這裏,和前面解決 JVM STW的鎖過時問題有點相似,只不過,watchDog自動續期,也沒有徹底解決JVM STW的鎖過時問題。

如何完全解決 JVM STW的鎖過時問題,能夠來瘋狂創客圈的社羣討論。

redisson watchdog 使用和原理

實際上,redisson加鎖的基本流程圖以下:

在這裏插入圖片描述

這裏專一於介紹watchdog。

首先watchdog的具體思路是 加鎖時,默認加鎖 30秒,每10秒鐘檢查一次,若是存在就從新設置 過時時間爲30秒。

而後設置默認加鎖時間的參數是 lockWatchdogTimeout(監控鎖的看門狗超時,單位:毫秒)

官方文檔描述以下

lockWatchdogTimeout(監控鎖的看門狗超時,單位:毫秒)

默認值:30000

監控鎖的看門狗超時時間單位爲毫秒。該參數只適用於分佈式鎖的加鎖請求中未明確使用leaseTimeout參數的狀況。若是該看門狗未使用lockWatchdogTimeout去從新調整一個分佈式鎖的lockWatchdogTimeout超時,那麼這個鎖將變爲失效狀態。這個參數能夠用來避免由Redisson客戶端節點宕機或其餘緣由形成死鎖的狀況。

須要注意的是

1.watchDog 只有在未顯示指定加鎖時間時纔會生效。(這點很重要)

2.lockWatchdogTimeout設定的時間不要過小 ,好比我以前設置的是 100毫秒,因爲網絡直接致使加鎖完後,watchdog去延期時,這個key在redis中已經被刪除了。

tryAcquireAsync原理

在調用lock方法時,會最終調用到tryAcquireAsync。詳細解釋以下:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    //若是指定了加鎖時間,會直接去加鎖
        if (leaseTime != -1) {
            return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
    //沒有指定加鎖時間 會先進行加鎖,而且默認時間就是 LockWatchdogTimeout的時間
    //這個是異步操做 返回RFuture 相似netty中的future
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                                                commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                                                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
       //這裏也是相似netty Future 的addListener,在future內容執行完成後執行
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            if (ttlRemaining == null) {
            //這裏是定時執行 當前鎖自動延期的動做
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }

scheduleExpirationRenewal 中會調用renewExpiration。

renewExpiration執行延期動做

這裏咱們能夠看到是 啓用了一個timeout定時,去執行延期動做

private void renewExpiration() {
   
      
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    
                    if (res) {
                     	//若是 沒有報錯,就再次定時延期
                     // reschedule itself
                     
                        renewExpiration();
                    }
                });
            }
            // 這裏咱們能夠看到定時任務 是 lockWatchdogTimeout 的1/3時間去執行 renewExpirationAsync
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

最終 scheduleExpirationRenewal會調用到 renewExpirationAsync,

renewExpirationAsync

執行下面這段 lua腳本。他主要判斷就是 這個鎖是否在redis中存在,若是存在就進行 pexpire 延期。

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getName()),
                internalLockLeaseTime, getLockName(threadId));
    }

watchLog總結

1.要使 watchLog機制生效 ,lock時 不要設置 過時時間

2.watchlog的延時時間 能夠由 lockWatchdogTimeout指定默認延時時間,可是不要設置過小。如100

3.watchdog 會每 lockWatchdogTimeout/3時間,去延時。

4.watchdog 經過 相似netty的 Future功能來實現異步延時

5.watchdog 最終仍是經過 lua腳原本進行延時

Redisson框架的分佈式鎖

Redisson框架十分強大,除了前面介紹的 getLock方法獲取的分佈式鎖(輸入可重入鎖的類型),還有不少其餘的分佈式鎖類型。

整體的Redisson框架的分佈式鎖類型,大體以下:

  • 可重入鎖
  • 公平鎖
  • 聯鎖
  • 紅鎖
  • 讀寫鎖
  • 信號量
  • 可過時信號量
  • 閉鎖(/倒數閂)

1.可重入鎖(Reentrant Lock)

Redisson的分佈式可重入鎖RLock Java對象實現了java.util.concurrent.locks.Lock接口,同時還支持自動過時解鎖。

public void testReentrantLock(RedissonClient redisson){
	RLock lock = redisson.getLock("anyLock");
	try{
		// 1. 最多見的使用方法
		//lock.lock();
		// 2. 支持過時解鎖功能,10秒鐘之後自動解鎖, 無需調用unlock方法手動解鎖
		//lock.lock(10, TimeUnit.SECONDS);
		// 3. 嘗試加鎖,最多等待3秒,上鎖之後10秒自動解鎖
		boolean res = lock.tryLock(3, 10, TimeUnit.SECONDS);
		if(res){ //成功
		// do your business
		}
	} catch (InterruptedException e) {
		e.printStackTrace();
	} finally {
		lock.unlock();
	}
}

Redisson同時還爲分佈式鎖提供了異步執行的相關方法:

public void testAsyncReentrantLock(RedissonClient redisson){
	RLock lock = redisson.getLock("anyLock");
	try{
		lock.lockAsync();
		lock.lockAsync(10, TimeUnit.SECONDS);
		Future<Boolean> res = lock.tryLockAsync(3, 10, TimeUnit.SECONDS);
		if(res.get()){
		// do your business
		}
	} catch (InterruptedException e) {
		e.printStackTrace();
	} catch (ExecutionException e) {
		e.printStackTrace();
	} finally {
		lock.unlock();
	}
}

2.公平鎖(Fair Lock)

Redisson分佈式可重入公平鎖也是實現了java.util.concurrent.locks.Lock接口的一種RLock對象。在提供了自動過時解鎖功能的同時,保證了當多個Redisson客戶端線程同時請求加鎖時,優先分配給先發出請求的線程。

public void testFairLock(RedissonClient redisson){
	RLock fairLock = redisson.getFairLock("anyLock");
	try{
		// 最多見的使用方法
		fairLock.lock();
		// 支持過時解鎖功能, 10秒鐘之後自動解鎖,無需調用unlock方法手動解鎖
		fairLock.lock(10, TimeUnit.SECONDS);
		// 嘗試加鎖,最多等待100秒,上鎖之後10秒自動解鎖
		boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
	} catch (InterruptedException e) {
		e.printStackTrace();
	} finally {
		fairLock.unlock();
	}
}

Redisson同時還爲分佈式可重入公平鎖提供了異步執行的相關方法:

RLock fairLock = redisson.getFairLock("anyLock");
fairLock.lockAsync();
fairLock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);

3.聯鎖(MultiLock)

Redisson的RedissonMultiLock對象能夠將多個RLock對象關聯爲一個聯鎖,每一個RLock對象實例能夠來自於不一樣的Redisson實例。

public void testMultiLock(RedissonClient redisson1,RedissonClient redisson2, RedissonClient redisson3){
	RLock lock1 = redisson1.getLock("lock1");
	RLock lock2 = redisson2.getLock("lock2");
	RLock lock3 = redisson3.getLock("lock3");
	RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
	try {
		// 同時加鎖:lock1 lock2 lock3, 全部的鎖都上鎖成功纔算成功。
		lock.lock();
		// 嘗試加鎖,最多等待100秒,上鎖之後10秒自動解鎖
		boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
	} catch (InterruptedException e) {
		e.printStackTrace();
	} finally {
		lock.unlock();
	}
}

4.紅鎖(RedLock)

Redisson的RedissonRedLock對象實現了Redlock介紹的加鎖算法。該對象也能夠用來將多個RLock對象關聯爲一個紅鎖,每一個RLock對象實例能夠來自於不一樣的Redisson實例。

public void testRedLock(RedissonClient redisson1,RedissonClient redisson2, RedissonClient redisson3){
	RLock lock1 = redisson1.getLock("lock1");
	RLock lock2 = redisson2.getLock("lock2");
	RLock lock3 = redisson3.getLock("lock3");
	RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
	try {
		// 同時加鎖:lock1 lock2 lock3, 紅鎖在大部分節點上加鎖成功就算成功。
		lock.lock();
		// 嘗試加鎖,最多等待100秒,上鎖之後10秒自動解鎖
		boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
	} catch (InterruptedException e) {
		e.printStackTrace();
	} finally {
		lock.unlock();
	}
}

5.讀寫鎖(ReadWriteLock)

Redisson的分佈式可重入讀寫鎖RReadWriteLock,Java對象實現了java.util.concurrent.locks.ReadWriteLock接口。同時還支持自動過時解鎖。該對象容許同時有多個讀取鎖,可是最多隻能有一個寫入鎖。

RReadWriteLock rwlock = redisson.getLock("anyRWLock");
// 最多見的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
// 支持過時解鎖功能
// 10秒鐘之後自動解鎖
// 無需調用unlock方法手動解鎖
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// 嘗試加鎖,最多等待100秒,上鎖之後10秒自動解鎖
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

6.信號量(Semaphore)

Redisson的分佈式信號量(Semaphore)Java對象RSemaphore採用了與java.util.concurrent.Semaphore類似的接口和用法。

RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();

7.可過時性信號量(PermitExpirableSemaphore)

Redisson的可過時性信號量(PermitExpirableSemaphore)實在RSemaphore對象的基礎上,爲每一個信號增長了一個過時時間。每一個信號能夠經過獨立的ID來辨識,釋放時只能經過提交這個ID才能釋放。

RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 獲取一個信號,有效期只有2秒鐘。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);

8.閉鎖/倒數閂(CountDownLatch)

Redisson的分佈式閉鎖(CountDownLatch)Java對象RCountDownLatch採用了與java.util.concurrent.CountDownLatch類似的接口和用法。

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其餘線程或其餘JVM裏
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();

redis分佈式鎖的高可用

關於Redis分佈式鎖的高可用問題,大體以下:

在master- slave的集羣架構中,就是若是你對某個redis master實例,寫入了DISLOCK這種鎖key的value,此時會異步複製給對應的master slave實例。

可是,這個過程當中一旦發生redis master宕機,主備切換,redis slave變爲了redis master。而此時的主從複製沒有完全完成.....

接着就會致使,客戶端2來嘗試加鎖的時候,在新的redis master上完成了加鎖,而客戶端1也覺得本身成功加了鎖。

此時就會致使多個客戶端對一個分佈式鎖完成了加鎖。

這時系統在業務語義上必定會出現問題,致使髒數據的產生。

因此這個是是redis master-slave架構的主從異步複製致使的redis分佈式鎖的最大缺陷:

在redis master實例宕機的時候,可能致使多個客戶端同時完成加鎖。

高可用的RedLock(紅鎖)原理

RedLock算法思想:

不能只在一個redis實例上建立鎖,應該是在多個redis實例上建立鎖,n / 2 + 1,必須在大多數redis節點上都成功建立鎖,才能算這個總體的RedLock加鎖成功,避免說僅僅在一個redis實例上加鎖而帶來的問題。

這個場景是假設有一個 redis cluster,有 5 個 redis master 實例。而後執行以下步驟獲取一把紅鎖:

  1. 獲取當前時間戳,單位是毫秒;
  2. 跟上面相似,輪流嘗試在每一個 master 節點上建立鎖,過時時間較短,通常就幾十毫秒;
  3. 嘗試在大多數節點上創建一個鎖,好比 5 個節點就要求是 3 個節點 n / 2 + 1;
  4. 客戶端計算創建好鎖的時間,若是創建鎖的時間小於超時時間,就算創建成功了;
  5. 要是鎖創建失敗了,那麼就依次以前創建過的鎖刪除;
  6. 只要別人創建了一把分佈式鎖,你就得不斷輪詢去嘗試獲取鎖。

img

RedLock是基於redis實現的分佈式鎖,它可以保證如下特性:

  • 互斥性:在任什麼時候候,只能有一個客戶端可以持有鎖;避免死鎖:

  • 當客戶端拿到鎖後,即便發生了網絡分區或者客戶端宕機,也不會發生死鎖;(利用key的存活時間)

  • 容錯性:只要多數節點的redis實例正常運行,就可以對外提供服務,加鎖或者釋放鎖;

以sentinel模式架構爲例,以下圖所示,有sentinel-1,sentinel-2,sentinel-3總計3個sentinel模式集羣,若是要獲取分佈式鎖,那麼須要向這3個sentinel集羣經過EVAL命令執行LUA腳本,須要3/2+1=2,即至少2個sentinel集羣響應成功,纔算成功的以Redlock算法獲取到分佈式鎖:

Redisson

高可用的紅鎖會致使性能下降

提早說明,使用redis分佈式鎖,是追求高性能, 在cap理論中,追求的是 ap 而不是cp。

因此,若是追求高可用,建議使用 zookeeper分佈式鎖。

redis分佈式鎖可能致使的數據不一致性,建議使用業務補償的方式去彌補。因此,不太建議使用紅鎖,可是從學習的層面來講,你們仍是必定要掌握的。

實現原理

Redisson中有一個MultiLock的概念,能夠將多個鎖合併爲一個大鎖,對一個大鎖進行統一的申請加鎖以及釋放鎖

而Redisson中實現RedLock就是基於MultiLock 去作的,接下來就具體看看對應的實現吧

RedLock使用案例

先看下官方的代碼使用:
(https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers#84-redlock)

RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");

RLock redLock = anyRedisson.getRedLock(lock1, lock2, lock3);

// traditional lock method
redLock.lock();

// or acquire lock and automatically unlock it after 10 seconds
redLock.lock(10, TimeUnit.SECONDS);

// or wait for lock aquisition up to 100 seconds 
// and automatically unlock it after 10 seconds
boolean res = redLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       redLock.unlock();
   }
}

這裏是分別對3個redis實例加鎖,而後獲取一個最後的加鎖結果。

RedissonRedLock實現原理

上面示例中使用redLock.lock()或者tryLock()最終都是執行RedissonRedLock中方法。

RedissonRedLock 繼承自RedissonMultiLock, 實現了其中的一些方法:

public class RedissonRedLock extends RedissonMultiLock {
    public RedissonRedLock(RLock... locks) {
        super(locks);
    }

    /**
     * 鎖能夠失敗的次數,鎖的數量-鎖成功客戶端最小的數量
     */
    @Override
    protected int failedLocksLimit() {
        return locks.size() - minLocksAmount(locks);
    }
    
    /**
     * 鎖的數量 / 2 + 1,例若有3個客戶端加鎖,那麼最少須要2個客戶端加鎖成功
     */
    protected int minLocksAmount(final List<RLock> locks) {
        return locks.size()/2 + 1;
    }

    /** 
     * 計算多個客戶端一塊兒加鎖的超時時間,每一個客戶端的等待時間
     * remainTime默認爲4.5s
     */
    @Override
    protected long calcLockWaitTime(long remainTime) {
        return Math.max(remainTime / locks.size(), 1);
    }
    
    @Override
    public void unlock() {
        unlockInner(locks);
    }

}

看到locks.size()/2 + 1 ,例如咱們有3個客戶端實例,那麼最少2個實例加鎖成功纔算分佈式鎖加鎖成功。

接着咱們看下lock()的具體實現

RedissonMultiLock實現原理

public class RedissonMultiLock implements Lock {

    final List<RLock> locks = new ArrayList<RLock>();

    public RedissonMultiLock(RLock... locks) {
        if (locks.length == 0) {
            throw new IllegalArgumentException("Lock objects are not defined");
        }
        this.locks.addAll(Arrays.asList(locks));
    }

    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long newLeaseTime = -1;
        if (leaseTime != -1) {
            // 若是等待時間設置了,那麼將等待時間 * 2
            newLeaseTime = unit.toMillis(waitTime)*2;
        }
        
        // time爲當前時間戳
        long time = System.currentTimeMillis();
        long remainTime = -1;
        if (waitTime != -1) {
            remainTime = unit.toMillis(waitTime);
        }
        // 計算鎖的等待時間,RedLock中:若是remainTime=-1,那麼lockWaitTime爲1
        long lockWaitTime = calcLockWaitTime(remainTime);
        
        // RedLock中failedLocksLimit即爲n/2 + 1
        int failedLocksLimit = failedLocksLimit();
        List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
        // 循環每一個redis客戶端,去獲取鎖
        for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
            RLock lock = iterator.next();
            boolean lockAcquired;
            try {
                // 調用tryLock方法去獲取鎖,若是獲取鎖成功,則lockAcquired=true
                if (waitTime == -1 && leaseTime == -1) {
                    lockAcquired = lock.tryLock();
                } else {
                    long awaitTime = Math.min(lockWaitTime, remainTime);
                    lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
                }
            } catch (Exception e) {
                lockAcquired = false;
            }
            
            // 若是獲取鎖成功,將鎖加入到list集合中
            if (lockAcquired) {
                acquiredLocks.add(lock);
            } else {
                // 若是獲取鎖失敗,判斷失敗次數是否等於失敗的限制次數
                // 好比,3個redis客戶端,最多隻能失敗1次
                // 這裏locks.size = 3, 3-x=1,說明只要成功了2次就能夠直接break掉循環
                if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                    break;
                }

                // 若是最大失敗次數等於0
                if (failedLocksLimit == 0) {
                    // 釋放全部的鎖,RedLock加鎖失敗
                    unlockInner(acquiredLocks);
                    if (waitTime == -1 && leaseTime == -1) {
                        return false;
                    }
                    failedLocksLimit = failedLocksLimit();
                    acquiredLocks.clear();
                    // 重置迭代器 重試再次獲取鎖
                    while (iterator.hasPrevious()) {
                        iterator.previous();
                    }
                } else {
                    // 失敗的限制次數減一
                    // 好比3個redis實例,最大的限制次數是1,若是遍歷第一個redis實例,失敗了,那麼failedLocksLimit會減成0
                    // 若是failedLocksLimit就會走上面的if邏輯,釋放全部的鎖,而後返回false
                    failedLocksLimit--;
                }
            }
            
            if (remainTime != -1) {
                remainTime -= (System.currentTimeMillis() - time);
                time = System.currentTimeMillis();
                if (remainTime <= 0) {
                    unlockInner(acquiredLocks);
                    return false;
                }
            }
        }

        if (leaseTime != -1) {
            List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());
            for (RLock rLock : acquiredLocks) {
                RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
                futures.add(future);
            }
            
            for (RFuture<Boolean> rFuture : futures) {
                rFuture.syncUninterruptibly();
            }
        }
        
        return true;
    }
}

核心代碼都已經加了註釋,實現原理其實很簡單,基於RedLock思想,遍歷全部的Redis客戶端,而後依次加鎖,最後統計成功的次數來判斷是否加鎖成功。

Redis分段鎖

普通Redis分佈式鎖的性能瓶頸問題

分佈式鎖一旦加了以後,對同一個商品的下單請求,會致使全部下單操做,都必須對同一個商品key加分佈式鎖。

假設一個商品1分鐘6000訂單,每秒的 600個下單操做,假設加鎖以後,釋放鎖以前,查庫存 -> 建立訂單 -> 扣減庫存,每一個IO操做100ms,大概300毫秒。

具體以下圖:

在這裏插入圖片描述

能夠再進行一下優化,將 建立訂單 + 扣減庫存 併發執行,將兩個100ms 減小爲一個100ms,這既是空間換時間的思想,大概200毫秒。

在這裏插入圖片描述

將 建立訂單 + 扣減庫存 批量執行,減小一次IO,也是大概200毫秒。

這個優化方案,有個重要的前提,就是 訂單表和庫存表在相同的庫中,可是,這個前提條件,在數據量大+高併發的場景下,夠嗆。

那麼,一秒內,只能完成多少個商品的秒殺訂單的下單操做呢?

1000毫秒 / 200 =5 個訂單

如何達到每秒600個下單呢? 仍是要從基礎知識裏邊尋找答案?

分段加鎖的思想來源

分段加鎖的思想來源與基礎知識。

我常常在瘋狂創客圈社羣裏邊,對小夥伴們強調 基礎知識的重要性,反覆強調, 《Java 高併發三部曲》 必定要多刷,最好刷三遍。

《Java 高併發核心編程 卷2》 介紹了 JUC的 LongAdder 和 ConcurrentHashMap的源碼和底層原理,他們提高性能的辦法是:

空間換時間, 分段加鎖

尤爲是 LongAdder 的實現思想,能夠用於 Redis分佈式鎖 做爲性能提高的手段,將 Redis分佈式鎖 優化爲 Redis分段鎖。

有關LongAdder 的系統化學習

有關LongAdder 的系統化學習,請參見 《Java 高併發核心編程 卷2》

在這裏插入圖片描述

使用Redis分段鎖提高秒殺的併發性能

回到前面的場景:

假設一個商品1分鐘6000訂單,每秒的 600個下單操做,假設加鎖以後,釋放鎖以前,查庫存 -> 建立訂單 -> 扣減庫存,通過優化,每一個IO操做100ms,大概200毫秒,一秒鐘5個訂單。

爲了達到每秒600個訂單,能夠將鎖分紅 600 /5 =120 個段, 每一次使用隨機算法,隨機到一個分段, 若是不行,就輪詢下一個分段,具體的流程,大體以下:

在這裏插入圖片描述

缺點:

這個是一個理論的時間預估,沒有扣除 嘗試下一個分段的 時間, 另外,實際上的性能, 會比理論上差,從我們實操案例的測試結果,也能夠證實這點。

實戰: 手寫一個Redis分段鎖

尼恩的忠實建議:

  • 理論水平的提高,看看視頻、看看書,只有兩個字,就是須要:多看。

  • 實戰水平的提高,只有兩個字,就是須要:多幹。

參照 LongAdder ,手寫一個Redis分段鎖, 仍是有點複雜,可是很重要,建議你們動手幹一票.

手寫一個Redis分段鎖的實操,是高併發實戰的重要動手實操之一。

有關Redis分段鎖的實操的具體材料、源碼、問題,歡迎來 瘋狂創客圈社羣交流。

高併發Java發燒友社羣 - 瘋狂創客圈 總入口 點擊瞭解詳情

文章核心內容和源碼來源

圖書:《Netty Zookeeper Redis 高併發實戰》 圖書簡介 - 瘋狂創...

參考文檔:

圖書:《Netty Zookeeper Redis 高併發實戰》 圖書簡介 - 瘋狂創...

Distributed locks with Redis

how-to-do-distributed-locking

redisson watchdog 使用和原理

zookeeper實現分佈式鎖_java_腳本之家

基於Zookeeper分佈式鎖實現 - SegmentFault 思否

分佈式鎖用 Redis 仍是 Zookeeper - 知乎

ZooKeeper分佈式鎖的實現原理 - 菜鳥奮鬥史 - 博客園

http://www.javashuo.com/article/p-rfbajnnj-ed.html

相關文章
相關標籤/搜索