緩存和分佈式鎖

在這裏插入圖片描述

@[toc]html

1、緩存

一、緩存使用

爲了系統性能的提高,咱們通常都會將部分數據放入緩存中,加速訪問。而db承擔數據落盤工做。java

哪些教據適合放入緩存?node

  • 即時性、數據一致性要求不高的git

  • 訪問量大且更新頻率不高的數據(讀多,寫少)github

舉例:電商類應用,商品分類,商品列表等適合緩存並加一個失效時間(根據數據更新頻率來定),後臺若是發佈一個商品,買家須要5分鐘才能看到新的商品通常仍是能夠接受的。redis

在這裏插入圖片描述

僞代碼邏輯:算法

data = cache.load(id);//從緩存加載數據
if(data == null){
	data = db.loadid);//從數據庫加載數據
	cache.put(id,data);//保存到cache中
}
retum data;
複製代碼

注意:在開發中,凡是放入緩存中的數據都應該指定過時時間,使其能夠在系統即便沒有主動更新數據也能自動觸發數據加載進緩存的流程。避免業務崩潰致使的數據永久不一致題。spring

本地緩存: 適合單體應用 shell

在這裏插入圖片描述

分佈式緩存-本地模式在分佈式下的問題: 緩存一致性問題、拓展性問題、高可用問題 數據庫

在這裏插入圖片描述

分佈式緩存: 能夠解決前面兩個的不足,目前最常使用的是redis

在這裏插入圖片描述

二、整合 redis 做爲緩存

須要建立一個 Spring Boot 項目來整合 Redis。若是尚未安裝 Redis,那麼 Redis 的安裝能夠參考:在CentOS中安裝和使用Docker 這篇內容。或者使用 Windows 版本的 Redis 也是能夠的。

一、配置pom 文件

SpringBoot 項目的 pom 文件中引入 redis 依賴,能夠不用寫版本號,使用SpringBoot的默認配置項:

<!-- 引入redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
複製代碼

引入依賴以後,項目中就會有 RedisAutoConfiguration.java 自動配置類,能夠進行 redis 的自動配置。 RedisAutoConfiguration.java 將 redis 的全部配置屬性都放在 RedisProperties.java 類中。

二、配置application.yml文件:

spring: 
 redis:
 host: 192.168.56.10 # redis地址
 port: 6379 # 端口號,默認爲6379.相同的話也能夠不配
複製代碼

三、測試 redis

RedisAutoConfiguration.java 類中,已經爲咱們提供了RedisTemplate<Object, Object>StringRedisTemplate 兩個類,來封裝 redis 的操做,下面來使用 StringRedisTemplate 測試一下。

在測試類中添加下面的代碼:

@Autowired
StringRedisTemplate stringRedisTemplate;

@Test
public void testStringRedisTemplate(){
    // 操做字符串
    ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();

    // 保存
    ops.set("hello", "world_"+ UUID.randomUUID().toString());

    // 查詢
    String hello = ops.get("hello");
    System.out.println("獲取以前保存的數據:"+hello);
}
複製代碼

測試輸出結果:獲取以前保存的數據:world_90bf25e1-2e84-4f50-b6e2-5eaab32b4175

還能夠經過安裝 redis 可視化工具 RedisDesktopManager 來查看以前保存的數據:

在這裏插入圖片描述

2、緩存失效問題

一、高併發下緩存失效問題-緩存穿透

在這裏插入圖片描述
緩存穿透: 指查詢一個 必定不存在的數據,因爲緩存是不命中,將去查詢數據庫,可是數據庫也無此記錄,咱們沒有將此次查詢的null寫入緩存,這將致使這個不存在的數據每次請求都要到存儲層去查詢,失去了緩存的意義

風險: 利用不存在的數據進行攻擊,數據庫瞬時壓力增大,最終致使崩潰

解決null結果緩存,並加入短暫過時時間

二、高併發下緩存失效問題-緩存雪崩

在這裏插入圖片描述

緩存雪崩: 緩存雪崩是指在咱們設置緩存時key採用了相同的過時時間,致使緩存在某一時刻同時失效,請求所有轉發到DB,DB瞬時 壓力太重雪崩。

解決原有的失效時間基礎上增長一個隨機值,好比1-5分鐘隨機,這樣每個緩存的過時時間的重複率就會下降,就很難引起集體失效的事件。

三、高併發下緩存失效問題-緩存擊穿

在這裏插入圖片描述

緩存擊穿: • 對於一些設置了過時時間的key,若是這些key可能會在某些時間點被超高併發地訪問,是一種很是「熱點」的數據。 • 若是這個key在大量請求同時進來前正好失效,那麼全部對這個key的數據查詢都落到db,咱們稱爲緩存擊穿。

解決加鎖,大量併發只讓一個去查,其餘人等待,查到之後釋放鎖,其餘人獲取到鎖,先查緩存,就會有數據,不用去db

3、緩存數據一致性

4、分佈式鎖

一、分佈式下如何加鎖?

先來看一個本地鎖的例子:咱們有一個商品服務,每個服務都部署在一個獨立的tomcat中,每個服務中都使用一個鎖。假設目前有8個服務,則須要加8把鎖,且這8把鎖相互獨立。

在這裏插入圖片描述
本地鎖,只能鎖住當前進程,因此咱們須要分佈式鎖

二、鎖-時序問題

在加鎖的時候,須要將設置查數據庫和設置緩存這一步同時放入加鎖的方法中,鬥則會出現屢次查詢數據庫的狀況,這是因爲第一次查詢數據庫的時候,數據尚未放入緩存,而設置緩存也是須要時間的,在設置緩存的這段時間內,緩存中尚未數據,就有可能因爲併發較高,致使屢次查詢數據庫,沒有命中緩存,因此就須要就將設置緩存放到加鎖查數據庫的邏輯裏。

在這裏插入圖片描述

三、分佈式鎖演進-基本原理

因爲本地鎖只能鎖住當前進程,若是咱們在進行秒殺活動或者說搶優惠券活動的時候,若是隻剩了1件商品或者1張優惠券,若是使用的是本地鎖,同時多個服務一塊請求獲取數據,就有可能產生「超賣」的現象,爲了不這種狀況的發生,咱們就須要使用分佈式鎖。

咱們能夠同時去一個地方「佔坑(加鎖)」,若是佔到,就執行邏輯。不然就必須等待,直到釋放鎖。 「佔坑(加鎖)」能夠去 redis,也能夠去數據庫,能夠去任何服務都能訪問的地。 若是沒有獲取到鎖,則能夠能夠以自旋的方式進行等待。

在這裏插入圖片描述

一、分佈式鎖演進-V1,setnx("lock","1")

在這裏插入圖片描述

/** * 從數據庫獲取數據,使用redis的分佈式鎖 V1 * 問題: * 一、setnx佔好了位,業務代碼異常或者程序在頁面過程 * 中宕機。沒有執行刪除鎖邏輯,這就形成了死鎖 * 解決: * 設置鎖的自動過時,即便沒有刪除,會自動刪除 * @see CategoryServiceImpl#getCatalogJsonFromDbWithRedisLockV2() * * @return */
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV1() {
    //一、佔分布式鎖,去redis佔鎖,對用redis命令 set lock 1 NX
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1");
    if (lock){
        //加鎖成功,執行業務
        Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
        //刪除鎖
        stringRedisTemplate.delete("lock");
        return dataFromDb;
    }else {
        //加鎖失敗...重試
        //休眠100ms重試
        //自旋的方式
        return getCatalogJsonFromDbWithRedisLockV1();
    }
}
複製代碼

存在問題:

setnx佔好了位,業務代碼異常或者程序在執行過程當中宕機。沒有執行刪除鎖邏輯,這就形成了死鎖

如何解決:

設置鎖的自動過時,即便沒有刪除,會自動刪除

二、分佈式鎖演進-V2,setnx("lock","1")+設置鎖過時時間

在這裏插入圖片描述

/** * 從數據庫獲取數據,使用redis的分佈式鎖 V2 * 問題: * 一、setnx設置好,正要去設置過時時間,宕機。又死鎖了。 * 解決: * 設置過時時間和佔位必須是原子的。redis支持使用 setnx ex 命令 (set lock 1 EX 30 NX 加鎖和設置過時時間在一個語句中完成,設置30秒過時) * @see CategoryServiceImpl#getCatalogJsonFromDbWithRedisLockV3() * @return */
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV2() {
    //一、佔分布式鎖,去redis佔鎖,對用redis命令 set lock 1 NX
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1");
    if (lock){
        //加鎖成功,執行業務
        //二、設置過時時間
        stringRedisTemplate.expire("lock",30, TimeUnit.SECONDS);
        Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
        //刪除鎖
        stringRedisTemplate.delete("lock");
        return dataFromDb;
    }else {
        //加鎖失敗...重試
        //休眠100ms重試
        //自旋的方式
        return getCatalogJsonFromDbWithRedisLockV1();
    }
}
複製代碼

問題:

setnx設置好,正要去設置過時時間,結果忽然斷電,服務宕機。又死鎖了。

解決:

設置過時時間和佔位必須是原子的。redis支持使用 setnx ex 命令

三、分佈式鎖演進-V3,setnx ex 原子操做

在這裏插入圖片描述

/** * 從數據庫獲取數據,使用redis的分佈式鎖 V3 * 問題: * 一、刪除鎖直接刪除的問題? * 若是因爲業務時間很長,鎖本身過時了,咱們直接刪除,有可能把別人正在持有的鎖刪除了。 * 解決: * 佔鎖的時候,值指定爲uuid,每一個人匹配是本身的鎖才刪除。 * @see CategoryServiceImpl#getCatalogJsonFromDbWithRedisLockV4() * @return */
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV3() {
    //一、佔分布式鎖,去redis佔鎖,對用redis命令
    //二、設置過時時間,必須和加鎖是同步的,原子的 set lock 1 EX 30 NX
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1", 30, TimeUnit.SECONDS);
    if (lock){
        //加鎖成功,執行業務
        Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
        //刪除鎖
        stringRedisTemplate.delete("lock");
        return dataFromDb;
    }else {
        //加鎖失敗...重試
        //休眠100ms重試
        //自旋的方式
        return getCatalogJsonFromDbWithRedisLockV1();
    }
}
複製代碼

問題:

刪除鎖直接刪除的問題? 若是因爲業務時間很長,鎖本身過時了,咱們直接刪除,有可能把別人正在持有的鎖刪除了。

解決:

佔鎖的時候,值指定爲 uuid每一個人匹配是本身的鎖才刪除。

四、分佈式鎖演進-V4,setnx ex 原子操做+惟一鎖值

在這裏插入圖片描述

/** * 從數據庫獲取數據,使用redis的分佈式鎖 V4 * 問題: * 一、若是正好判斷是當前值,正要刪除鎖的時候,鎖已通過期,別人已經設置到了新的值。那麼咱們刪除的是別人的鎖 * 解決: * 刪除鎖必須保證原子性。使用redis+Lua腳本完成 * @see CategoryServiceImpl#getCatalogJsonFromDbWithRedisLockV5() * @return */
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV4() {
    //一、佔分布式鎖,去redis佔鎖,對用redis命令
    //二、設置過時時間,必須和加鎖是同步的,原子的 set lock 1 EX 30 NX
    //使用uuid做爲鎖的值
    String uuid = UUID.randomUUID().toString();
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS);
    if (lock){
        //加鎖成功,執行業務
        Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
        //刪除鎖前先進行獲取,判斷是否是本身的鎖編號uuid,是的話再刪除
        String lockValue = stringRedisTemplate.opsForValue().get("lock");
        if (uuid.equals(lockValue)){
            //刪除本身的鎖
            stringRedisTemplate.delete("lock");
        }
        return dataFromDb;
    }else {
        //加鎖失敗...重試
        //休眠100ms重試
        //自旋的方式
        return getCatalogJsonFromDbWithRedisLockV1();
    }
}
複製代碼

問題:

若是正好判斷是當前值,正要刪除鎖的時候,鎖已通過期,別人已經設置到了新的值。那麼咱們刪除的是別人的鎖。

也就是說,業務執行時間大於鎖的過時時間,這個時候,刪除的鎖就不是以前業務的鎖,而是後來業務的鎖。

解決:

刪除鎖必須保證原子性。使用 redis+Lua腳本完成

五、分佈式鎖演進-V5,setnx ex 原子操做+惟一鎖值+Lua腳本刪除鎖保證原子性

在這裏插入圖片描述

/** * 從數據庫獲取數據,使用redis的分佈式鎖 V5 * 保證加鎖【佔位+過時時間】和刪除鎖【判斷+刪除】的原子性。使用redis+Lua腳本完成 * 更難的事情,是鎖的自動續期 * @return */
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV5() {
    //一、佔分布式鎖,去redis佔鎖,對用redis命令
    //二、設置過時時間,必須和加鎖是同步的,原子的 set lock 1 EX 30 NX
    //使用uuid做爲鎖的值
    String uuid = UUID.randomUUID().toString();
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS);
    if (lock){
        log.info("獲取分佈式鎖成功....");
        Map<String, List<Catelog2Vo>> dataFromDb;
        try {
            //加鎖成功,執行業務
            dataFromDb = getDataFromDb();
        } finally {
            //刪除鎖前先進行獲取,判斷是否是本身的鎖編號uuid,是的話再刪除
            //獲取對比值+對比成刪除==原子操做 使用lua腳本解鎖
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            //刪除鎖,刪除成功返回 1,刪除失敗返回 0
            Long lock1 = stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class),
                    Arrays.asList("lock"), uuid);
        }
        return dataFromDb;
    }else {
        log.info("獲取分佈式鎖失敗,等待重試....");
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //加鎖失敗...重試
        //休眠100ms重試
        //自旋的方式
        return getCatalogJsonFromDbWithRedisLockV1();
    }
}
複製代碼

存在問題:

一、更難的事情,是鎖的自動續期

二、使用前面 v1 - v5 的操做太麻煩,加鎖解鎖都須要本身完成,若是有不少鎖則須要寫不少重複的代碼

如何解決:

使用封裝好的 reids 分佈式鎖工具類,下一節來介紹

四、分佈式鎖-Redisson簡介&整合

先來援引一段文檔:

命令 SET resource-name anystring NX EX max-lock-time 是一種用 Redis 來實現鎖機制的簡單方法。

若是上述命令返回OK,那麼客戶端就能夠得到鎖(若是上述命令返回Nil,那麼客戶端能夠在一段時間以後從新嘗試),而且能夠經過DEL命令來釋放鎖。

客戶端加鎖以後,若是沒有主動釋放,會在過時時間以後自動釋放。

能夠經過以下優化使得上面的鎖系統變得更加魯棒:

  • 不要設置固定的字符串,而是設置爲隨機的大字符串,能夠稱爲token。
  • 經過腳步刪除指定鎖的key,而不是DEL命令。

上述優化方法會避免下述場景:a客戶端得到的鎖(鍵key)已經因爲過時時間到了被redis服務器刪除,可是這個時候a客戶端還去執行DEL命令。而b客戶端已經在a設置的過時時間以後從新獲取了這個一樣key的鎖,那麼a執行DEL就會釋放了b客戶端加好的鎖。

解鎖腳本的一個例子將相似於如下:

if redis.call("get",KEYS[1]) == ARGV[1]
then
    return redis.call("del",KEYS[1])
else
    return 0
end
複製代碼

注意: 上這種設計模式並不推薦用來實現redis分佈式鎖。應該參考the Redlock algorithm的實現,由於這個方法只是複雜一點,可是卻能保證更好的使用效果。

一、回顧:單Redis實例實現分佈式鎖的正確方法

在嘗試克服上述單實例設置的限制以前,讓咱們先討論一下在這種簡單狀況下實現分佈式鎖的正確作法,實際上這是一種可行的方案,儘管存在競態,結果仍然是可接受的,另外,這裏討論的單實例加鎖方法也是分佈式加鎖算法的基礎。

獲取鎖使用命令:

SET resource_name my_random_value NX PX 30000
複製代碼

這個命令僅在不存在key的時候才能被執行成功(NX選項),而且這個key有一個30秒的自動失效時間(PX屬性)。這個key的值是「my_random_value」(一個隨機值),這個值在全部的客戶端必須是惟一的,全部同一key的獲取者(競爭者)這個值都不能同樣。

value的值必須是隨機數主要是爲了更安全的釋放鎖,釋放鎖的時候使用腳本告訴Redis:只有key存在而且存儲的值和我指定的值同樣才能告訴我刪除成功。能夠經過如下Lua腳本實現:

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end
複製代碼

使用這種方式釋放鎖能夠避免刪除別的客戶端獲取成功的鎖。舉個例子:客戶端A取得資源鎖,可是緊接着被一個其餘操做阻塞了,當客戶端A運行完畢其餘操做後要釋放鎖時,原來的鎖早已超時而且被Redis自動釋放,而且在這期間資源鎖又被客戶端B再次獲取到。若是僅使用DEL命令將key刪除,那麼這種狀況就會把客戶端B的鎖給刪除掉。使用Lua腳本就不會存在這種狀況,由於腳本僅會刪除value等於客戶端A的value的key(value至關於客戶端的一個簽名)。

這個隨機字符串應該怎麼設置?我認爲它應該是從/dev/urandom產生的一個20字節隨機數,可是我想你能夠找到比這種方法代價更小的方法,只要這個數在你的任務中是惟一的就行。例如一種安全可行的方法是使用/dev/urandom做爲RC4的種子和源產生一個僞隨機流;一種更簡單的方法是把以毫秒爲單位的unix時間和客戶端ID拼接起來,理論上不是徹底安全,可是在多數狀況下能夠知足需求.

key的失效時間,被稱做「鎖定有效期」。它不只是key自動失效時間,並且仍是一個客戶端持有鎖多長時間後能夠被另一個客戶端從新得到。

截至到目前,咱們已經有較好的方法獲取鎖和釋放鎖。基於Redis單實例,假設這個單實例老是可用,這種方法已經足夠安全。如今讓咱們擴展一下,假設Redis沒有老是可用的保障。

二、Redlock算法

在Redis的分佈式環境中,咱們假設有N個Redis master。這些節點徹底互相獨立,不存在主從複製或者其餘集羣協調機制。以前咱們已經描述了在Redis單實例下怎麼安全地獲取和釋放鎖。咱們確保將在每(N)個實例上使用此方法獲取和釋放鎖。在這個樣例中,咱們假設有5個Redis master節點,這是一個比較合理的設置,因此咱們須要在5臺機器上面或者5臺虛擬機上面運行這些實例,這樣保證他們不會同時都宕掉。

爲了取到鎖,客戶端應該執行如下操做:

  1. 獲取當前Unix時間,以毫秒爲單位。
  2. 依次嘗試從N個實例,使用相同的key和隨機值獲取鎖。在步驟2,當向Redis設置鎖時,客戶端應該設置一個網絡鏈接和響應超時時間,這個超時時間應該小於鎖的失效時間。例如你的鎖自動失效時間爲10秒,則超時時間應該在5-50毫秒之間。這樣能夠避免服務器端Redis已經掛掉的狀況下,客戶端還在死死地等待響應結果。若是服務器端沒有在規定時間內響應,客戶端應該儘快嘗試另一個Redis實例。
  3. 客戶端使用當前時間減去開始獲取鎖時間(步驟1記錄的時間)就獲得獲取鎖使用的時間。當且僅當從大多數(這裏是3個節點)的Redis節點都取到鎖,而且使用的時間小於鎖失效時間時,鎖纔算獲取成功。
  4. 若是取到了鎖,key的真正有效時間等於有效時間減去獲取鎖所使用的時間(步驟3計算的結果)。
  5. 若是由於某些緣由,獲取鎖失敗(沒有在至少N/2+1個Redis實例取到鎖或者取鎖時間已經超過了有效時間),客戶端應該在全部的Redis實例上進行解鎖(即使某些Redis實例根本就沒有加鎖成功)。

一、這個算法是異步的麼?

算法基於這樣一個假設:雖然多個進程之間沒有時鐘同步,但每一個進程都以相同的時鐘頻率前進,時間差相對於失效時間來講幾乎能夠忽略不計。這種假設和咱們的真實世界很是接近:每一個計算機都有一個本地時鐘,咱們能夠容忍多個計算機之間有較小的時鐘漂移。

從這點來講,咱們必須再次強調咱們的互相排斥規則:只有在鎖的有效時間(在步驟3計算的結果)範圍內客戶端可以作完它的工做,鎖的安全性才能獲得保證(鎖的實際有效時間一般要比設置的短,由於計算機之間有時鐘漂移的現象)。.

想要了解更多關於須要時鐘漂移間隙的類似系統, 這裏有一個很是有趣的參考: Leases: an efficient fault-tolerant mechanism for distributed file cache consistency.

二、失敗時重試

當客戶端沒法取到鎖時,應該在一個隨機延遲後重試,防止多個客戶端在同時搶奪同一資源的鎖(這樣會致使腦裂,沒有人會取到鎖)。一樣,客戶端取得大部分Redis實例鎖所花費的時間越短,腦裂出現的機率就會越低(必要的重試),因此,理想狀況一下,客戶端應該同時(併發地)向全部Redis發送SET命令。

須要強調,當客戶端從大多數Redis實例獲取鎖失敗時,應該儘快地釋放(部分)已經成功取到的鎖,這樣其餘的客戶端就沒必要非得等到鎖過完「有效時間」才能取到(然而,若是已經存在網絡分裂,客戶端已經沒法和Redis實例通訊,此時就只能等待key的自動釋放了,等於被懲罰了)。

三、釋放鎖

釋放鎖比較簡單,向全部的Redis實例發送釋放鎖命令便可,不用關心以前有沒有從Redis實例成功獲取到鎖.

三、Redisson簡介&整合

1. 概述

Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不只提供了一系列的分佈式的Java經常使用對象,還提供了許多分佈式服務。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者可以將精力更集中地放在處理業務邏輯上。

關於Redisson項目的詳細介紹能夠在官方網站找到。

每一個Redis服務實例都能管理多達1TB的內存。

可以完美的在雲計算環境裏使用,而且支持AWS ElastiCache主備版AWS ElastiCache集羣版Azure Redis Cache阿里雲(Aliyun)的雲數據庫Redis版

如下是Redisson的結構:

若是你如今正在使用其餘的Redis的Java客戶端,那麼Redis命令和Redisson對象匹配列表 可以幫助你輕鬆的將現有代碼遷徙到Redisson框架裏來。

Redisson底層採用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6+以上版本。

二、整合

一、方式一:使用 Redisson

一、在 pom 文件中引入依賴:

<!-- 引入 redisson 依賴 -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.12.5</version>
</dependency>
複製代碼

二、配置 redisson

使用配置類的方式配置

@Configuration
public class MyRedissonConfig {

    /** * 對全部的 Redisson 的使用都是經過 RedissonClient 對象 * @return * @throws IOException */
    @Bean(destroyMethod = "shutdown")
    RedissonClient redisson() throws IOException {
// // 默認鏈接地址 127.0.0.1:6379
// RedissonClient redisson = Redisson.create();

        // 一、建立配置
        Config config = new Config();
        // Redis url should start with redis:// or rediss:// (for SSL connection)
        config.useSingleServer().setAddress("redis://192.168.56.10:6379");

        // 二、根據 Config 建立出 RedissonClient 實例
        RedissonClient redisson = Redisson.create(config);

        return redisson;
    }
}
複製代碼

三、測試

@Autowired
RedissonClient redissonClient;

@Test
public void testRedissonClient(){
    System.out.println(redissonClient);
}
複製代碼

在這裏插入圖片描述

參考文檔:

二、方式二:使用Redisson/Spring Boot Starter

一、在項目中添加依賴項:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.13.1</version>
</dependency>
複製代碼

二、添加配置到application.settings配置文件

# common spring boot settings

spring.redis.database=redis
spring.redis.host=192.168.56.10
spring.redis.port=6379
spring.redis.password=
spring.redis.ssl=
spring.redis.timeout=
spring.redis.cluster.nodes=
spring.redis.sentinel.master=
spring.redis.sentinel.nodes=

# Redisson settings

#path to config - redisson.yaml
spring.redis.redisson.config=classpath:redisson.yaml
複製代碼

三、經過帶有RedissonClient接口或RedisTemplate/ ReactiveRedisTemplate對象的spring bean使用Redisson

@Autowired
RedissonClient redisson;
@ResponseBody
@GetMapping("/hello")
public String hello() {
    //一、獲取一把鎖,只要鎖的名字同樣,就是同一把鎖
    RLock lock = redisson.getLock("my-lock");
    //二、加鎖
    //lock.lock(); //阻塞式等待。默認加的鎖都是30s時間
    
    // 加鎖之後10秒鐘自動解鎖
    // 無需調用unlock方法手動解鎖;在鎖時間到了之後,不會自動續期,自動解鎖時間必定要大於業務執行時間
    lock.lock(10, TimeUnit.SECONDS);
    try {
        System.out.println("加鎖成功,執行業務..." + Thread.currentThread().getId());
        Thread.sleep(30000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        System.out.println("釋放鎖..." + Thread.currentThread().getId());
        lock.unlock();
    }
    return "hello";
}
複製代碼

參考文檔:

四、分佈式鎖-Redisson-lock鎖測試

基於Redis的Redisson分佈式可重入鎖RLock Java對象實現了java.util.concurrent.locks.Lock接口。同時還提供了異步(Async)反射式(Reactive)RxJava2標準的接口。

RLock lock = redisson.getLock("anyLock");
// 最多見的使用方法
lock.lock();
複製代碼

你們都知道,若是負責儲存這個分佈式鎖的Redisson節點宕機之後,並且這個鎖正好處於鎖住的狀態時,這個鎖會出現鎖死的狀態。爲了不這種狀況的發生,Redisson內部提供了一個監控鎖的看門狗,它的做用是在Redisson實例被關閉前,不斷的延長鎖的有效期。默認狀況下,看門狗的檢查鎖的超時時間是30秒鐘,也能夠經過修改Config.lockWatchdogTimeout來另行指定。

接下來我本身來測試一下,首先實現一個簡單的測試接口:

@ResponseBody
@GetMapping("/hello")
public String hello() {
    //一、獲取一把鎖,只要鎖的名字同樣,就是同一把鎖
    RLock lock = redisson.getLock("my-lock");
    //二、加鎖
    lock.lock(); //阻塞式等待。默認加的鎖都是30s時間
    try {
        //redisson解決了兩個問題:
        //1)、鎖的自動續期,若是業務執行時間超長,運行期間自動給鎖續上新的30s,不用擔憂業務時間長,鎖自動過時被刪掉
        //2)、加鎖的業務只要運行完成,就不會給當前鎖續期,即便不手動解鎖,鎖默認在30s後自動刪除
        System.out.println("加鎖成功,執行業務..." + Thread.currentThread().getId());
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        //三、解鎖 假設當前服務執行時宕機,解鎖代碼沒有運行,redisson會不會出現死鎖?
        System.out.println("釋放鎖..." + Thread.currentThread().getId());
        lock.unlock();
    }
    return "hello";
}
複製代碼

同時啓動兩個不一樣端口的相同服務,記做服務A、B。在請求A、B以後,手動關閉服務A,模擬遭遇宕機解鎖代碼沒有執行的狀況,看最後是否解鎖,服務B是否能夠得到鎖:

在這裏插入圖片描述

在這裏插入圖片描述

從上面的執行結果中,能夠看到,服務宕機,redisson依然解鎖成功。

redisson解決了兩個問題: 1)、鎖的自動續期,若是業務執行時間超長,運行期間自動給鎖續上新的30s,不用擔憂業務時間長,鎖自動過時被刪掉 2)、加鎖的業務只要運行完成,就不會給當前鎖續期,即便不手動解鎖,鎖默認在30s後自動刪除

這些都是基於看門狗實現的,寫一節來了解一下看門狗的實現原理。

參考文檔:

五、分佈式鎖-Redisson-lock看門狗原理-redisson如何解決死鎖

Redisson還經過加鎖的方法提供了leaseTime的參數來指定加鎖的時間。超過這個時間後鎖便自動解開了。

// 加鎖之後10秒鐘自動解鎖
// 無需調用unlock方法手動解鎖
lock.lock(10, TimeUnit.SECONDS);
複製代碼

下面咱們來測試一下,咱們設置10s自動解鎖,設置業務執行時間是30s:

@ResponseBody
@GetMapping("/hello")
public String hello() {
    //一、獲取一把鎖,只要鎖的名字同樣,就是同一把鎖
    RLock lock = redisson.getLock("my-lock");
    //二、加鎖
    // 加鎖之後10秒鐘自動解鎖
    lock.lock(10, TimeUnit.SECONDS);
    try {
        System.out.println("加鎖成功,執行業務..." + Thread.currentThread().getId());
        Thread.sleep(30000); //業務執行時間30s
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        System.out.println("釋放鎖..." + Thread.currentThread().getId());
        lock.unlock();
    }
    return "hello";
}
複製代碼

執行後的效果:

在這裏插入圖片描述
在這裏插入圖片描述

RLock對象徹底符合Java的Lock規範。也就是說只有擁有鎖的進程才能解鎖,其餘進程解鎖則會拋出IllegalMonitorStateException錯誤

問題:lock.lock(10, TimeUnit.SECONDS);在鎖時間到了之後,不會自動續期

  • 一、若是咱們傳遞了鎖的超時時間,就發送給redis執行腳本,進行佔鎖,默認超時就是咱們指定的時間
  • 二、若是咱們未指定鎖的超時時間,就使用 lockWatchdogTimeout = 30000L;【lockWatchdogTimeout看門狗的默認時間】。只要佔鎖成功,就會啓動一個定時任務【從新給鎖設定過時時間,新的過時時間就是看門狗的默認時間】,每隔10s都會自動續期,續成30s,續期時間的間隔是【internalLockLeaseTime(看門狗時間) / 3L】 10s 續期一次

下面來看一下源碼:

先看不設置過時時間的加鎖方法:lock()

public void lock() {
    try {
        //leaseTime:-1,在後邊的判斷會用到;TimeUnit:null;是否可中斷:false
        this.lock(-1L, (TimeUnit)null, false);
    } catch (InterruptedException var2) {
        throw new IllegalStateException();
    }
}

//看一下再點擊來看一下 lock(long leaseTime, TimeUnit unit, boolean interruptibly) 方法的實現
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    //獲取當前線程的id
    long threadId = Thread.currentThread().getId();
    // 嘗試獲取鎖,這個方法是重點,下面進入這個方法中
    Long ttl = this.tryAcquire(leaseTime, unit, threadId);
    ... //略
}

// 查看 tryAcquire 方法
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    // 進入 嘗試獲取異步 tryAcquireAsync 這個方法
    return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}

//查看 嘗試獲取異步 tryAcquireAsync 方法
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    //若是leaseTime不是-1,則進入這個邏輯,根據前面的代碼知道lock()默認leaseTime=-1,因此lock()方法不進這個邏輯,因此設置自動過時時間的方法 lock.lock(10, TimeUnit.SECONDS) 是會進入這個邏輯的
    if (leaseTime != -1L) {
        return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        //獲取一個 RFuture,和java中的Future是相似的, 設置鎖的默認過時時間this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout() 這個是設置默認鎖過時時間,也就是下面Config類中的lockWatchdogTimeout
        RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        //佔鎖成功,進行監聽
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            //沒有拋出異常說明,佔鎖成功
            if (e == null) {
                if (ttlRemaining == null) {
                    //啓動一個定時任務【從新給鎖設定過時時間,新的過時時間就是看門狗的默認時間】,每隔10s都會自動續期,續成30s,下面來看這個方法
                    this.scheduleExpirationRenewal(threadId);
                }

            }
        });
        return ttlRemainingFuture;
    }
}

// 對應上面 this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout() 中的時間
public Config() {
    ...
    this.lockWatchdogTimeout = 30000L;
	...
}

// 時間表到期續訂方法
private void scheduleExpirationRenewal(long threadId) {
    RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
    RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        //進入續期方法
        this.renewExpiration();
    }
}

//續期方法
private void renewExpiration() {
    RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    if (ee != null) {
        Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
                if (ent != null) {
                    Long threadId = ent.getFirstThreadId();
                    if (threadId != null) {
                        RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
                        future.onComplete((res, e) -> {
                            if (e != null) {
                                RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
                            } else {
                                if (res) {
                                    RedissonLock.this.renewExpiration();
                                }

                            }
                        });
                    }
                }
            }
            // this.internalLockLeaseTime / 3L 續期時間,在RedissonLock(CommandAsyncExecutor commandExecutor, String name)方法中能夠看到internalLockLeaseTime就是 lockWatchdogTimeout看門狗的默認時間30s,因此是每隔10s續期一次,續成30s
        }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
        ee.setTimeout(task);
    }
}

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    ...
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    ...
}
複製代碼

再來設置過時時間的加鎖方法:lock.lock(10, TimeUnit.SECONDS)

public void lock(long leaseTime, TimeUnit unit) {
    try {
        this.lock(leaseTime, unit, false);
    } catch (InterruptedException var5) {
        throw new IllegalStateException();
    }
}

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    //一樣是進入tryAcquire嘗試獲取鎖這個方法,和lock()方法同樣
    Long ttl = this.tryAcquire(leaseTime, unit, threadId);
	...
}

//嘗試獲取鎖
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
     // 進入 嘗試獲取異步 tryAcquireAsync 這個方法,和lock()方法同樣
    return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}

// 嘗試獲取異步
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1L) {
        // lock.lock(10, TimeUnit.SECONDS),進入這個邏輯
        return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e == null) {
                if (ttlRemaining == null) {
                    this.scheduleExpirationRenewal(threadId);
                }

            }
        });
        return ttlRemainingFuture;
    }
}

// 嘗試獲取異步,獲得lua腳本
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    this.internalLockLeaseTime = unit.toMillis(leaseTime);
    return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
複製代碼

參考文檔:

六、分佈式鎖-Redisson-讀寫鎖測試

讀寫鎖測試,加讀寫鎖能夠保證必定能讀到最新數據修改期間,寫鎖是一個排它鎖(互斥鎖),讀鎖是一個共享鎖,寫鎖沒釋放,讀寫就必須等待。

  • 讀+讀:至關於無鎖,併發讀,只會在redis中記錄好,全部當前的讀鎖。它們都會同時加鎖成功
  • 寫+讀:等待寫鎖釋放
  • 寫+寫:阻塞方式
  • 讀+寫:有讀鎖,寫也須要等待
  • 只要有寫的存在,都必須等待

實現一個write和read接口,分別用來測試寫鎖和讀鎖:

@GetMapping("/write")
@ResponseBody
public String writeValue(){
    RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");

    String s = "";
    RLock rLock = lock.writeLock();
    try {
        //一、修改數據加寫鎖,讀數據加讀鎖
        rLock.lock();
        s = UUID.randomUUID().toString();
        stringRedisTemplate.opsForValue().set("writeValue", s);
        Thread.sleep(10000);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        rLock.unlock();
    }

    return s;
}

@GetMapping("/read")
@ResponseBody
public String readValue(){
    RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");

    String s = "";
    //獲取讀鎖
    RLock rLock = lock.readLock();
    try {
        rLock.lock();
        s = stringRedisTemplate.opsForValue().get("writeValue");
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        rLock.unlock();
    }

    return s;
}
複製代碼

測試效果:

在這裏插入圖片描述

參考文檔:

七、分佈式鎖-Redisson-閉鎖測試

redisson 的閉鎖和 java 中的 java.util.concurrent.CountDownLatch 是相似的。

測試閉鎖:

  • 一、模擬一個放假鎖門的場景
  • 二、學校一共5個班,只有等5個班都沒人了才能夠鎖學校大門
/** * 測試閉鎖:鎖門方法 */
@GetMapping("/lockdoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
    RCountDownLatch door = redisson.getCountDownLatch("door");
    door.trySetCount(5);
    door.await();//等待閉鎖所有完成

    return "放假了";

}
/** * 模擬班級學生全都離開班級的方法 */
@GetMapping("/go/{id}")
@ResponseBody
public String go(@PathVariable("id") Long id) {
    RCountDownLatch door = redisson.getCountDownLatch("door");
    door.countDown();//每離開一個班就計數減一

    return id + "班的人都走了...";
}
複製代碼

測試效果:

在這裏插入圖片描述

參考文檔:

八、分佈式鎖-Redisson-信號量測試

測試信號量:相似於 java 中的java.util.concurrent.Semaphore

模擬車庫停車:3個車位,同時只能有3輛車停,只有有車位了才能停車

/** * 車庫停車 * @return * @throws InterruptedException */
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
    RSemaphore park = redisson.getSemaphore("park");
    park.acquire();//阻塞式的

    return "ok";
}
/** * 車位上的車離開 */
@GetMapping("/leave")
@ResponseBody
public String leave(){
    RSemaphore park = redisson.getSemaphore("park");
    park.release();//釋放一個車位,釋放一個信號量
    return "ok";
}
複製代碼

應用:

好比信號量也能夠用做分佈式限流的場景,好比同時在線人數只容許100000人等。

參考文檔:

5、Spring Cache

一、簡介

Spring 從3.1開始定義了org.springframework.cache.Cacheorg.springframework.cache.CacheManager 接口來統一不一樣的緩存技術;並支持使用 JCache (ISR-107) 註解簡化咱們開發;

Cache 接口爲緩存的組件規範定義,包含緩存的各類操做集合;Cache 接口下 Spring 提供了各類 xxCache 的實現;如 RedisCacheEhCacheCacheConcurrentMapCache等;

每次調用須要緩存功能的方法時, Spring會檢查檢查指定參數的指定的目標方法是否已經被調用過;若是有就直接從緩存中獲取方法調用後的結果,若是沒有就調用方法並緩存結果後返回給用戶。下次調用直接從緩存中獲取。

使用 Spring 緩存抽象時咱們須要關注如下兩點:

  • 一、肯定方法須要被緩存以及他們的緩存策略

  • 二、從緩存中讀取以前緩存存儲的數據

二、基礎概念

CacheManager 管理衆多 Cache。緩存管理器是定義規則的,真正實際上處理緩存的是不一樣的緩存組件。

代碼結構圖:

在這裏插入圖片描述

代碼模塊圖:

在這裏插入圖片描述

三、SpringCache-整合

一、整合

一、引入依賴

spring-boot-starter-cache、spring-boot-starter-data-redis(使用redis做爲緩存就要引入redis的依賴)

<!-- 引入 spring-boot-starter-cache 依賴 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<!-- 引入redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
複製代碼

二、寫配置

(1)、自動配置了那些?

​ org.springframework.boot.autoconfigure.cache.CacheAutoConfiguration 緩存自動配置類 ​ org.springframework.boot.autoconfigure.cache.CacheProperties 全部在xml文件中配置的屬性都封裝在這裏 ​ org.springframework.boot.autoconfigure.cache.CacheConfigurations 獲取每一種類型的緩存 ​ org.springframework.boot.autoconfigure.cache.CacheConfigurations#getConfigurationClass 獲得對應緩存的映射 ​ org.springframework.boot.autoconfigure.cache.CacheType 一個枚舉類,封裝了各類類型的緩存 ​ org.springframework.boot.autoconfigure.cache.RedisCacheConfiguration 使用redis做爲緩存時的各類配置

`CacheAutoConfiguration 會導入 RedisCacheConfiguration`
`RedisCacheConfiguration 會自動裝配好了redis緩存管理器 RedisCacheManager`
複製代碼
(2)、咱們本身須要配置的內容?

配置使用redis做爲緩存。在application.properties、或application.ymlbootstrap.properties配置中心中配置 spring.cache.type=redis

三、測試使用緩存

@Cacheable: Triggers cache population. 觸發緩存保存 @CacheEvict: Triggers cache eviction. 觸發刪除緩存 @CachePut: Updates the cache without interfering with the method execution. 更新緩存,而不影響方法的執行 @Caching: Regroups multiple cache operations to be applied on a method. 從新組合要在一個方法上應用的多個緩存操做 @CacheConfig: Shares some common cache-related settings at class-level. 在類級別共享一些與緩存相關的常見設置

(1)、開啓緩存功能:

在啓動類 XxxApplication 上使用 @EnableCaching 註解,開啓緩存功能

(2)、在方法上使用 @Cacheable 註解

只須要在須要緩存數據的方法上使用註解就能完成緩存操做

// @Cacheable(value = {"category"},key = "'level1Categorys'")
@Cacheable(value = {"category"},key = "#root.method.name") // #root.methodName spel表達式,使用調用的方法名做爲緩存的key
@Override
public List<CategoryEntity> getLevel1Categorys() {

    long l = System.currentTimeMillis();
    List<CategoryEntity> categoryEntities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("cat_level", 1));
    System.out.println("DB耗時:"+(System.currentTimeMillis()-l));
    return categoryEntities;
}
複製代碼

二、註解:

對於緩存聲明,Spring的緩存抽象提供了一組Java註釋:

  • @Cacheable:觸發緩存保存。
  • @CacheEvict:觸發刪除緩存。
  • @CachePut:更新緩存,而不影響方法的執行。
  • @Caching:從新組合要在一個方法上應用的多個緩存操做。
  • @CacheConfig:在類級別共享一些與緩存相關的常見設置。

在業務中每個須要緩存的數據都要指定放到對應的那個名字的緩存中。至關於緩存的分區,通常建議按照業務類型來劃分。

(1)、@Cacheable

表明當前方法的結果須要緩存,若是緩存中有,方法不用調用。若是緩存中沒有,會調用方法,最後將方法的結果放入緩存。

@Cacheable 的默認行爲:

  • 若是緩存中有,方法不用調用
  • key默認自動生成,緩存名字::SimpleKey [](自動生成的key)
  • 緩存的value值,默認使用的是jdk的序列化機制,將序列化後的值存在redis中
  • 默認時間ttl=-1

若是咱們須要自定義屬性,該怎麼作呢?

  • 指定生成的緩存使用的key:key屬性指定,使用spel表達式 SPEL表達式:docs.spring.io/spring/docs…
  • 指定緩存的數據的存活時間:配置文件中修改ttl,spring.cache.redis.time-to-live=3600000
  • 將數據保存爲json格式
// @Cacheable(value = {"category"},key = "'level1Categorys'")
@Cacheable(value = {"category"},key = "#root.method.name") // #root.methodName spel表達式,使用調用的方法名做爲緩存的key
@Override
public List<CategoryEntity> getLevel1Categorys() {

    long l = System.currentTimeMillis();
    List<CategoryEntity> categoryEntities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("cat_level", 1));
    System.out.println("DB耗時:"+(System.currentTimeMillis()-l));
    return categoryEntities;
}
複製代碼

(2)、@CacheEvict

@CacheEvict註解是支持緩存一致性——失效模式的註解。@CachePut是支持緩存一致性——雙寫模式的註解。

要讓一個緩存在更新數據的時候失效,就須要使用@CacheEvict註解:

清空單個緩存: @CacheEvict(value = {"category"},key = "'getLevel1Categorys'")

/** * 級聯更全部關聯數據 * @CacheEvict:緩存一致性——失效模式 * @CachePut:緩存一致性——雙寫模式 * @param category */
@CacheEvict(value = {"category"},key = "'getLevel1Categorys'") //清空單個緩存
@Transactional // 開啓事務
@Override
public void updateCascade(CategoryEntity category) {
    // 一、先更新當前表的內容
    this.updateById(category);
    //二、更新級聯表的冗餘內容
    categoryBrandRelationService.updateCategory(category.getCatId(),category.getName());
}
複製代碼

清空多個緩存:

(1)、同時操做多個緩存:@Caching

@Caching(evict = { //清空多個緩存 @CacheEvict(value = {"category"},key = "'getLevel1Categorys'"), @CacheEvict(value = {"category"},key = "'getCatalogJson'") })

@Caching(evict = { //清空多個緩存
    @CacheEvict(value = {"category"},key = "'getLevel1Categorys'"),
    @CacheEvict(value = {"category"},key = "'getCatalogJson'")
})
@Transactional // 開啓事務
@Override
public void updateCascade(CategoryEntity category) {
    // 一、先更新當前表的內容
    this.updateById(category);
    //二、更新級聯表的冗餘內容
    categoryBrandRelationService.updateCategory(category.getCatId(),category.getName());
}
複製代碼

(2) 指定刪除某個分區下的全部數據:@CacheEvict(value = {"category"},allEntries = true)

@CacheEvict(value = {"category"},allEntries = true) //清空整個分區的緩存

@CacheEvict(value = {"category"},allEntries = true) //清空整個分區的緩存
@Transactional // 開啓事務
@Override
public void updateCascade(CategoryEntity category) {
// 一、先更新當前表的內容
this.updateById(category);
//二、更新級聯表的冗餘內容
categoryBrandRelationService.updateCategory(category.getCatId(),category.getName());
}
複製代碼

因此之後使用緩存能夠定義以下規則:

  • 一、存儲同一類型的數據,均可以指定成同一個分區分區名默認就是緩存的前綴,因此不須要須要設置緩存前綴 spring.cache.redis.key-prefix=CACHE_,這樣緩存中的鍵值就是分區名::方法名
  • 二、指定刪除某個分區下的全部數據: @CacheEvict(value = {"category"}, allEntries = true)

測試效果:

在這裏插入圖片描述

四、自定義緩存配置

自定義緩存配置,須要定義一個緩存配置類:

@EnableConfigurationProperties(CacheProperties.class) // 讓 CacheProperties 的綁定生效
@Configuration
@EnableCaching // 開啓緩存(配置在 XxxApplication 主類上也能夠)
public class MyCacheConfig {

// @Autowired
// CacheProperties cacheProperties;

    /** * 使用緩存配置類後,緩存配置文件中設置的屬性將會失效,好比: * spring.cache.redis.time-to-live=3600000 # 設置緩存存活時間,單位是ms * 因此須要另外在這裏配置從新綁定 * * 一、原來和配置文件綁定的配置類是這樣子的 * @ConfigurationProperties( * prefix = "spring.cache" * ) * public class CacheProperties { * * 二、要讓它生效: * 1)、@EnableConfigurationProperties(CacheProperties.class) 讓 CacheProperties 的綁定生效 * 2)、注入 CacheProperties 或者在配置方法中加上 CacheProperties 參數 * * @return */
    @Bean
    RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties) {
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();

        //設置key的序列化
        config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
        //設置value的序列化,使用fastjson
        config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericFastJsonRedisSerializer()));

        CacheProperties.Redis redisProperties = cacheProperties.getRedis();
        // 將配置文件中的全部配置都生效
        if (redisProperties.getTimeToLive() != null) {
            config = config.entryTtl(redisProperties.getTimeToLive());
        }

        if (redisProperties.getKeyPrefix() != null) {
            config = config.prefixKeysWith(redisProperties.getKeyPrefix());
        }

        if (!redisProperties.isCacheNullValues()) {
            config = config.disableCachingNullValues();
        }

        if (!redisProperties.isUseKeyPrefix()) {
            config = config.disableKeyPrefix();
        }

        return config;
    }
}
複製代碼

其餘的緩存屬性配置:

# 設置緩存存活時間,單位是ms
spring.cache.redis.time-to-live=3600000
# 設置緩存的前綴,若是指定了前綴就使用咱們指定的前綴,不然就默認使用緩存的名字做爲前綴
spring.cache.redis.key-prefix=CACHE_
# 設置是否啓用緩存前綴,默認是true
spring.cache.redis.use-key-prefix=true
# 是否緩存空值,設置爲true能夠防止緩存穿透
spring.cache.redis.cache-null-values=true
複製代碼

自定義測試效果:

在這裏插入圖片描述

五、緩存穿透問題解決

在配置文件中配置容許緩存空值,解決緩存穿透問題

# 是否緩存空值,設置爲true能夠防止緩存穿透
spring.cache.redis.cache-null-values=true
複製代碼

六、Spring-Cache的不足

原理:

CacheManagerRedisCacheManager) --建立--> CacheRedisCache)--> Cache負責緩存的讀寫操做

不足:

(1)、讀模式

  • 緩存穿透:查詢一個null數據。解決:緩存空數據,添加配置 spring.cache.redis.cache-null-values=true
  • 緩存擊穿:大量併發進來同時查詢一個正好過時的數據。解決:加鎖。可是Spring-Cache默認put時是不加鎖的,因此沒有辦法解決這個問題。可是能夠設置 sync = true @Cacheable(value = xxx, key = xxx, sync = true),在查緩存的時候調用使用了同步的get方法org.springframework.data.redis.cache.RedisCache#get(java.lang.Object, java.util.concurrent.Callable) 獲取到獲取到空數據時在put中放一份空的數據。
  • 緩存雪崩:大量的key同時過時。解決:加隨機時間。加上過時時間:spring.cache.redis.time-to-live=3600000

(2)、寫模式(緩存與數據庫一致)

  • 1)讀寫加鎖:使用讀多寫少場景
  • 2)`引入Canal:感知到MySQL的更新就去更新緩存
  • 3)讀多寫多:直接去數據庫查詢

總結:

  • 常規數據(讀多寫少、即時性、一致性要求不高的數據):徹底可使用Spring-Cache;寫模式:只要緩存設置了過時時間就足夠了

  • 特殊數據:特殊設計


參考:

  1. www.redis.cn/topics/dist…
  2. github.com/redisson/re…
  3. docs.spring.io/spring/docs…
相關文章
相關標籤/搜索