@[toc]html
爲了系統性能的提高,咱們通常都會將部分數據放入緩存中,加速訪問。而db承擔數據落盤工做。java
哪些教據適合放入緩存?node
即時性、數據一致性要求不高的git
訪問量大且更新頻率不高的數據(讀多,寫少)github
舉例:電商類應用,商品分類,商品列表等適合緩存並加一個失效時間(根據數據更新頻率來定),後臺若是發佈一個商品,買家須要5分鐘才能看到新的商品通常仍是能夠接受的。redis
僞代碼邏輯:算法
data = cache.load(id);//從緩存加載數據
if(data == null){
data = db.loadid);//從數據庫加載數據
cache.put(id,data);//保存到cache中
}
retum data;
複製代碼
注意:在開發中,凡是放入緩存中的數據都應該指定過時時間,使其能夠在系統即便沒有主動更新數據也能自動觸發數據加載進緩存的流程。避免業務崩潰致使的數據永久不一致題。
spring
本地緩存:
適合單體應用 shell
分佈式緩存-本地模式在分佈式下的問題:
緩存一致性問題、拓展性問題、高可用問題 數據庫
分佈式緩存:
能夠解決前面兩個的不足,目前最常使用的是redis
須要建立一個 Spring Boot 項目來整合 Redis。若是尚未安裝 Redis,那麼 Redis 的安裝能夠參考:在CentOS中安裝和使用Docker 這篇內容。或者使用 Windows 版本的 Redis 也是能夠的。
pom
文件在 SpringBoot
項目的 pom
文件中引入 redis
依賴,能夠不用寫版本號,使用SpringBoot
的默認配置項:
<!-- 引入redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
複製代碼
引入依賴以後,項目中就會有 RedisAutoConfiguration.java
自動配置類,能夠進行 redis 的自動配置。 RedisAutoConfiguration.java
將 redis 的全部配置屬性都放在 RedisProperties.java
類中。
application.yml
文件:spring:
redis:
host: 192.168.56.10 # redis地址
port: 6379 # 端口號,默認爲6379.相同的話也能夠不配
複製代碼
在 RedisAutoConfiguration.java
類中,已經爲咱們提供了RedisTemplate<Object, Object>
和 StringRedisTemplate
兩個類,來封裝 redis 的操做,下面來使用 StringRedisTemplate
測試一下。
在測試類中添加下面的代碼:
@Autowired
StringRedisTemplate stringRedisTemplate;
@Test
public void testStringRedisTemplate(){
// 操做字符串
ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
// 保存
ops.set("hello", "world_"+ UUID.randomUUID().toString());
// 查詢
String hello = ops.get("hello");
System.out.println("獲取以前保存的數據:"+hello);
}
複製代碼
測試輸出結果:
獲取以前保存的數據:world_90bf25e1-2e84-4f50-b6e2-5eaab32b4175
還能夠經過安裝 redis 可視化工具 RedisDesktopManager 來查看以前保存的數據:
緩存穿透
必定不存在
的數據,因爲緩存是不命中,將去查詢數據庫,可是數據庫也無此記錄,咱們沒有將此次查詢的null寫入緩存,這將致使這個不存在的數據每次請求都要到存儲層去查詢,失去了緩存的意義
風險: 利用不存在的數據進行攻擊,數據庫瞬時壓力增大,最終致使崩潰
解決: null結果緩存,並加入短暫過時時間
緩存雪崩
緩存雪崩: 緩存雪崩是指在咱們設置緩存時key採用了相同的過時時間,致使緩存在某一時刻同時失效,請求所有轉發到DB,DB瞬時 壓力太重雪崩。
解決: 原有的失效時間基礎上增長一個隨機值,好比1-5分鐘隨機,這樣每個緩存的過時時間的重複率就會下降,就很難引起集體失效的事件。
緩存擊穿
緩存擊穿: • 對於一些設置了過時時間的key,若是這些key可能會在某些時間點被超高併發地訪問,是一種很是「熱點」的數據。 • 若是這個key在大量請求同時進來前正好失效,那麼全部對這個key的數據查詢都落到db,咱們稱爲緩存擊穿。
解決: 加鎖,大量併發只讓一個去查,其餘人等待,查到之後釋放鎖,其餘人獲取到鎖,先查緩存,就會有數據,不用去db
先來看一個本地鎖的例子:咱們有一個商品服務,每個服務都部署在一個獨立的tomcat中,每個服務中都使用一個鎖。假設目前有8個服務,則須要加8把鎖,且這8把鎖相互獨立。
本地鎖,只能鎖住當前進程,因此咱們須要分佈式鎖
在加鎖的時候,須要將設置查數據庫和設置緩存這一步同時放入加鎖的方法中,鬥則會出現屢次查詢數據庫的狀況,這是因爲第一次查詢數據庫的時候,數據尚未放入緩存,而設置緩存也是須要時間的,在設置緩存的這段時間內,緩存中尚未數據,就有可能因爲併發較高,致使屢次查詢數據庫,沒有命中緩存,因此就須要就將設置緩存放到加鎖查數據庫的邏輯裏。
因爲本地鎖只能鎖住當前進程,若是咱們在進行秒殺活動或者說搶優惠券活動的時候,若是隻剩了1件商品或者1張優惠券,若是使用的是本地鎖,同時多個服務一塊請求獲取數據,就有可能產生「超賣」的現象,爲了不這種狀況的發生,咱們就須要使用分佈式鎖。
咱們能夠同時去一個地方「佔坑(加鎖)」,若是佔到,就執行邏輯。不然就必須等待,直到釋放鎖。 「佔坑(加鎖)」能夠去 redis,也能夠去數據庫,能夠去任何服務都能訪問的地。 若是沒有獲取到鎖,則能夠能夠以自旋的方式進行等待。
/** * 從數據庫獲取數據,使用redis的分佈式鎖 V1 * 問題: * 一、setnx佔好了位,業務代碼異常或者程序在頁面過程 * 中宕機。沒有執行刪除鎖邏輯,這就形成了死鎖 * 解決: * 設置鎖的自動過時,即便沒有刪除,會自動刪除 * @see CategoryServiceImpl#getCatalogJsonFromDbWithRedisLockV2() * * @return */
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV1() {
//一、佔分布式鎖,去redis佔鎖,對用redis命令 set lock 1 NX
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1");
if (lock){
//加鎖成功,執行業務
Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
//刪除鎖
stringRedisTemplate.delete("lock");
return dataFromDb;
}else {
//加鎖失敗...重試
//休眠100ms重試
//自旋的方式
return getCatalogJsonFromDbWithRedisLockV1();
}
}
複製代碼
存在問題:
setnx佔好了位,業務代碼異常或者程序在執行過程當中宕機。沒有執行刪除鎖邏輯,這就形成了
死鎖
如何解決:
設置鎖的自動過時,即便沒有刪除,會自動刪除
設置鎖過時時間
/** * 從數據庫獲取數據,使用redis的分佈式鎖 V2 * 問題: * 一、setnx設置好,正要去設置過時時間,宕機。又死鎖了。 * 解決: * 設置過時時間和佔位必須是原子的。redis支持使用 setnx ex 命令 (set lock 1 EX 30 NX 加鎖和設置過時時間在一個語句中完成,設置30秒過時) * @see CategoryServiceImpl#getCatalogJsonFromDbWithRedisLockV3() * @return */
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV2() {
//一、佔分布式鎖,去redis佔鎖,對用redis命令 set lock 1 NX
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1");
if (lock){
//加鎖成功,執行業務
//二、設置過時時間
stringRedisTemplate.expire("lock",30, TimeUnit.SECONDS);
Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
//刪除鎖
stringRedisTemplate.delete("lock");
return dataFromDb;
}else {
//加鎖失敗...重試
//休眠100ms重試
//自旋的方式
return getCatalogJsonFromDbWithRedisLockV1();
}
}
複製代碼
問題:
setnx設置好,正要去設置過時時間,結果忽然斷電,服務宕機。又死鎖了。
解決:
設置過時時間和佔位必須是原子的
。redis支持使用setnx ex
命令
/** * 從數據庫獲取數據,使用redis的分佈式鎖 V3 * 問題: * 一、刪除鎖直接刪除的問題? * 若是因爲業務時間很長,鎖本身過時了,咱們直接刪除,有可能把別人正在持有的鎖刪除了。 * 解決: * 佔鎖的時候,值指定爲uuid,每一個人匹配是本身的鎖才刪除。 * @see CategoryServiceImpl#getCatalogJsonFromDbWithRedisLockV4() * @return */
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV3() {
//一、佔分布式鎖,去redis佔鎖,對用redis命令
//二、設置過時時間,必須和加鎖是同步的,原子的 set lock 1 EX 30 NX
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1", 30, TimeUnit.SECONDS);
if (lock){
//加鎖成功,執行業務
Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
//刪除鎖
stringRedisTemplate.delete("lock");
return dataFromDb;
}else {
//加鎖失敗...重試
//休眠100ms重試
//自旋的方式
return getCatalogJsonFromDbWithRedisLockV1();
}
}
複製代碼
問題:
刪除鎖直接刪除的問題?
若是因爲業務時間很長,鎖本身過時了,咱們直接刪除,有可能把別人正在持有的鎖刪除了。
解決:
佔鎖的時候,值指定爲
uuid
,每一個人匹配是本身的鎖才刪除。
/** * 從數據庫獲取數據,使用redis的分佈式鎖 V4 * 問題: * 一、若是正好判斷是當前值,正要刪除鎖的時候,鎖已通過期,別人已經設置到了新的值。那麼咱們刪除的是別人的鎖 * 解決: * 刪除鎖必須保證原子性。使用redis+Lua腳本完成 * @see CategoryServiceImpl#getCatalogJsonFromDbWithRedisLockV5() * @return */
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV4() {
//一、佔分布式鎖,去redis佔鎖,對用redis命令
//二、設置過時時間,必須和加鎖是同步的,原子的 set lock 1 EX 30 NX
//使用uuid做爲鎖的值
String uuid = UUID.randomUUID().toString();
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS);
if (lock){
//加鎖成功,執行業務
Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
//刪除鎖前先進行獲取,判斷是否是本身的鎖編號uuid,是的話再刪除
String lockValue = stringRedisTemplate.opsForValue().get("lock");
if (uuid.equals(lockValue)){
//刪除本身的鎖
stringRedisTemplate.delete("lock");
}
return dataFromDb;
}else {
//加鎖失敗...重試
//休眠100ms重試
//自旋的方式
return getCatalogJsonFromDbWithRedisLockV1();
}
}
複製代碼
問題:
若是正好判斷是當前值,正要刪除鎖的時候,鎖已通過期,別人已經設置到了新的值。那麼咱們刪除的是別人的鎖。
也就是說,業務執行時間大於鎖的過時時間,這個時候,刪除的鎖就不是以前業務的鎖,而是後來業務的鎖。
解決:
刪除鎖必須保證原子性
。使用redis+Lua腳本
完成
/** * 從數據庫獲取數據,使用redis的分佈式鎖 V5 * 保證加鎖【佔位+過時時間】和刪除鎖【判斷+刪除】的原子性。使用redis+Lua腳本完成 * 更難的事情,是鎖的自動續期 * @return */
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV5() {
//一、佔分布式鎖,去redis佔鎖,對用redis命令
//二、設置過時時間,必須和加鎖是同步的,原子的 set lock 1 EX 30 NX
//使用uuid做爲鎖的值
String uuid = UUID.randomUUID().toString();
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS);
if (lock){
log.info("獲取分佈式鎖成功....");
Map<String, List<Catelog2Vo>> dataFromDb;
try {
//加鎖成功,執行業務
dataFromDb = getDataFromDb();
} finally {
//刪除鎖前先進行獲取,判斷是否是本身的鎖編號uuid,是的話再刪除
//獲取對比值+對比成刪除==原子操做 使用lua腳本解鎖
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
//刪除鎖,刪除成功返回 1,刪除失敗返回 0
Long lock1 = stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class),
Arrays.asList("lock"), uuid);
}
return dataFromDb;
}else {
log.info("獲取分佈式鎖失敗,等待重試....");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
//加鎖失敗...重試
//休眠100ms重試
//自旋的方式
return getCatalogJsonFromDbWithRedisLockV1();
}
}
複製代碼
存在問題:
一、更難的事情,是鎖的自動續期
二、使用前面 v1 - v5 的操做太麻煩,加鎖解鎖都須要本身完成,若是有不少鎖則須要寫不少重複的代碼
如何解決:
使用封裝好的 reids 分佈式鎖工具類,下一節來介紹
先來援引一段文檔:
命令
SET resource-name anystring NX EX max-lock-time
是一種用 Redis 來實現鎖機制的簡單方法。若是上述命令返回
OK
,那麼客戶端就能夠得到鎖(若是上述命令返回Nil,那麼客戶端能夠在一段時間以後從新嘗試),而且能夠經過DEL命令來釋放鎖。客戶端加鎖以後,若是沒有主動釋放,會在過時時間以後自動釋放。
能夠經過以下優化使得上面的鎖系統變得更加魯棒:
- 不要設置固定的字符串,而是設置爲隨機的大字符串,能夠稱爲token。
- 經過腳步刪除指定鎖的key,而不是DEL命令。
上述優化方法會避免下述場景:a客戶端得到的鎖(鍵key)已經因爲過時時間到了被redis服務器刪除,可是這個時候a客戶端還去執行DEL命令。而b客戶端已經在a設置的過時時間以後從新獲取了這個一樣key的鎖,那麼a執行DEL就會釋放了b客戶端加好的鎖。
解鎖腳本的一個例子將相似於如下:
if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end 複製代碼
注意: 上這種設計模式並不推薦用來實現redis分佈式鎖。應該參考the Redlock algorithm的實現,由於這個方法只是複雜一點,可是卻能保證更好的使用效果。
在嘗試克服上述單實例設置的限制以前,讓咱們先討論一下在這種簡單狀況下實現分佈式鎖的正確作法,實際上這是一種可行的方案,儘管存在競態,結果仍然是可接受的,另外,這裏討論的單實例加鎖方法也是分佈式加鎖算法的基礎。
獲取鎖使用命令:
SET resource_name my_random_value NX PX 30000
複製代碼
這個命令僅在不存在key的時候才能被執行成功(NX選項),而且這個key有一個30秒的自動失效時間(PX屬性)。這個key的值是「my_random_value」(一個隨機值),這個值在全部的客戶端必須是惟一的,全部同一key的獲取者(競爭者)這個值都不能同樣。
value的值必須是隨機數主要是爲了更安全的釋放鎖,釋放鎖的時候使用腳本告訴Redis:只有key存在而且存儲的值和我指定的值同樣才能告訴我刪除成功。能夠經過如下Lua腳本實現:
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
複製代碼
使用這種方式釋放鎖能夠避免刪除別的客戶端獲取成功的鎖。舉個例子:客戶端A取得資源鎖,可是緊接着被一個其餘操做阻塞了,當客戶端A運行完畢其餘操做後要釋放鎖時,原來的鎖早已超時而且被Redis自動釋放,而且在這期間資源鎖又被客戶端B再次獲取到。若是僅使用DEL命令將key刪除,那麼這種狀況就會把客戶端B的鎖給刪除掉。使用Lua腳本就不會存在這種狀況,由於腳本僅會刪除value等於客戶端A的value的key(value至關於客戶端的一個簽名)。
這個隨機字符串應該怎麼設置?我認爲它應該是從/dev/urandom產生的一個20字節隨機數,可是我想你能夠找到比這種方法代價更小的方法,只要這個數在你的任務中是惟一的就行。例如一種安全可行的方法是使用/dev/urandom做爲RC4的種子和源產生一個僞隨機流;一種更簡單的方法是把以毫秒爲單位的unix時間和客戶端ID拼接起來,理論上不是徹底安全,可是在多數狀況下能夠知足需求.
key的失效時間,被稱做「鎖定有效期」。它不只是key自動失效時間,並且仍是一個客戶端持有鎖多長時間後能夠被另一個客戶端從新得到。
截至到目前,咱們已經有較好的方法獲取鎖和釋放鎖。基於Redis單實例,假設這個單實例老是可用,這種方法已經足夠安全。如今讓咱們擴展一下,假設Redis沒有老是可用的保障。
在Redis的分佈式環境中,咱們假設有N個Redis master。這些節點徹底互相獨立,不存在主從複製或者其餘集羣協調機制。以前咱們已經描述了在Redis單實例下怎麼安全地獲取和釋放鎖。咱們確保將在每(N)個實例上使用此方法獲取和釋放鎖。在這個樣例中,咱們假設有5個Redis master節點,這是一個比較合理的設置,因此咱們須要在5臺機器上面或者5臺虛擬機上面運行這些實例,這樣保證他們不會同時都宕掉。
爲了取到鎖,客戶端應該執行如下操做:
算法基於這樣一個假設:雖然多個進程之間沒有時鐘同步,但每一個進程都以相同的時鐘頻率前進,時間差相對於失效時間來講幾乎能夠忽略不計。這種假設和咱們的真實世界很是接近:每一個計算機都有一個本地時鐘,咱們能夠容忍多個計算機之間有較小的時鐘漂移。
從這點來講,咱們必須再次強調咱們的互相排斥規則:只有在鎖的有效時間(在步驟3計算的結果)範圍內客戶端可以作完它的工做,鎖的安全性才能獲得保證(鎖的實際有效時間一般要比設置的短,由於計算機之間有時鐘漂移的現象)。.
想要了解更多關於須要時鐘漂移間隙的類似系統, 這裏有一個很是有趣的參考: Leases: an efficient fault-tolerant mechanism for distributed file cache consistency.
當客戶端沒法取到鎖時,應該在一個隨機延遲後重試,防止多個客戶端在同時搶奪同一資源的鎖(這樣會致使腦裂,沒有人會取到鎖)。一樣,客戶端取得大部分Redis實例鎖所花費的時間越短,腦裂出現的機率就會越低(必要的重試),因此,理想狀況一下,客戶端應該同時(併發地)向全部Redis發送SET命令。
須要強調,當客戶端從大多數Redis實例獲取鎖失敗時,應該儘快地釋放(部分)已經成功取到的鎖,這樣其餘的客戶端就沒必要非得等到鎖過完「有效時間」才能取到(然而,若是已經存在網絡分裂,客戶端已經沒法和Redis實例通訊,此時就只能等待key的自動釋放了,等於被懲罰了)。
釋放鎖比較簡單,向全部的Redis實例發送釋放鎖命令便可,不用關心以前有沒有從Redis實例成功獲取到鎖.
Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不只提供了一系列的分佈式的Java經常使用對象,還提供了許多分佈式服務。其中包括(BitSet
, Set
, Multimap
, SortedSet
, Map
, List
, Queue
, BlockingQueue
, Deque
, BlockingDeque
, Semaphore
, Lock
, AtomicLong
, CountDownLatch
, Publish / Subscribe
, Bloom filter
, Remote service
, Spring cache
, Executor service
, Live Object service
, Scheduler service
) Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者可以將精力更集中地放在處理業務邏輯上。
關於Redisson項目的詳細介紹能夠在官方網站找到。
每一個Redis服務實例都能管理多達1TB的內存。
可以完美的在雲計算環境裏使用,而且支持AWS ElastiCache主備版,AWS ElastiCache集羣版,Azure Redis Cache和阿里雲(Aliyun)的雲數據庫Redis版
如下是Redisson的結構:
若是你如今正在使用其餘的Redis的Java客戶端,那麼Redis命令和Redisson對象匹配列表 可以幫助你輕鬆的將現有代碼遷徙到Redisson框架裏來。
Redisson底層採用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6+以上版本。
一、在 pom 文件中引入依賴:
<!-- 引入 redisson 依賴 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.5</version>
</dependency>
複製代碼
二、配置 redisson
使用配置類的方式配置
@Configuration
public class MyRedissonConfig {
/** * 對全部的 Redisson 的使用都是經過 RedissonClient 對象 * @return * @throws IOException */
@Bean(destroyMethod = "shutdown")
RedissonClient redisson() throws IOException {
// // 默認鏈接地址 127.0.0.1:6379
// RedissonClient redisson = Redisson.create();
// 一、建立配置
Config config = new Config();
// Redis url should start with redis:// or rediss:// (for SSL connection)
config.useSingleServer().setAddress("redis://192.168.56.10:6379");
// 二、根據 Config 建立出 RedissonClient 實例
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}
複製代碼
三、測試
@Autowired
RedissonClient redissonClient;
@Test
public void testRedissonClient(){
System.out.println(redissonClient);
}
複製代碼
參考文檔:
一、在項目中添加依賴項:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.1</version>
</dependency>
複製代碼
二、添加配置到application.settings
配置文件
# common spring boot settings
spring.redis.database=redis
spring.redis.host=192.168.56.10
spring.redis.port=6379
spring.redis.password=
spring.redis.ssl=
spring.redis.timeout=
spring.redis.cluster.nodes=
spring.redis.sentinel.master=
spring.redis.sentinel.nodes=
# Redisson settings
#path to config - redisson.yaml
spring.redis.redisson.config=classpath:redisson.yaml
複製代碼
三、經過帶有RedissonClient
接口或RedisTemplate
/ ReactiveRedisTemplate
對象的spring bean使用Redisson
@Autowired
RedissonClient redisson;
@ResponseBody
@GetMapping("/hello")
public String hello() {
//一、獲取一把鎖,只要鎖的名字同樣,就是同一把鎖
RLock lock = redisson.getLock("my-lock");
//二、加鎖
//lock.lock(); //阻塞式等待。默認加的鎖都是30s時間
// 加鎖之後10秒鐘自動解鎖
// 無需調用unlock方法手動解鎖;在鎖時間到了之後,不會自動續期,自動解鎖時間必定要大於業務執行時間
lock.lock(10, TimeUnit.SECONDS);
try {
System.out.println("加鎖成功,執行業務..." + Thread.currentThread().getId());
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("釋放鎖..." + Thread.currentThread().getId());
lock.unlock();
}
return "hello";
}
複製代碼
參考文檔:
基於Redis的Redisson分佈式可重入鎖RLock
Java對象實現了java.util.concurrent.locks.Lock
接口。同時還提供了異步(Async)、反射式(Reactive)和RxJava2標準的接口。
RLock lock = redisson.getLock("anyLock");
// 最多見的使用方法
lock.lock();
複製代碼
你們都知道,若是負責儲存這個分佈式鎖的Redisson節點宕機之後,並且這個鎖正好處於鎖住的狀態時,這個鎖會出現鎖死的狀態。爲了不這種狀況的發生,Redisson內部提供了一個監控鎖的看門狗,它的做用是在Redisson實例被關閉前,不斷的延長鎖的有效期
。默認狀況下,看門狗的檢查鎖的超時時間是30秒鐘
,也能夠經過修改Config.lockWatchdogTimeout來另行指定。
接下來我本身來測試一下,首先實現一個簡單的測試接口:
@ResponseBody
@GetMapping("/hello")
public String hello() {
//一、獲取一把鎖,只要鎖的名字同樣,就是同一把鎖
RLock lock = redisson.getLock("my-lock");
//二、加鎖
lock.lock(); //阻塞式等待。默認加的鎖都是30s時間
try {
//redisson解決了兩個問題:
//1)、鎖的自動續期,若是業務執行時間超長,運行期間自動給鎖續上新的30s,不用擔憂業務時間長,鎖自動過時被刪掉
//2)、加鎖的業務只要運行完成,就不會給當前鎖續期,即便不手動解鎖,鎖默認在30s後自動刪除
System.out.println("加鎖成功,執行業務..." + Thread.currentThread().getId());
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//三、解鎖 假設當前服務執行時宕機,解鎖代碼沒有運行,redisson會不會出現死鎖?
System.out.println("釋放鎖..." + Thread.currentThread().getId());
lock.unlock();
}
return "hello";
}
複製代碼
同時啓動兩個不一樣端口的相同服務,記做服務A、B。在請求A、B以後,手動關閉服務A,模擬遭遇宕機解鎖代碼沒有執行的狀況,看最後是否解鎖,服務B是否能夠得到鎖:
從上面的執行結果中,能夠看到,服務宕機,redisson依然解鎖成功。
redisson解決了兩個問題:
1)、鎖的自動續期,若是業務執行時間超長,運行期間自動給鎖續上新的30s,不用擔憂業務時間長,鎖自動過時被刪掉
2)、加鎖的業務只要運行完成,就不會給當前鎖續期,即便不手動解鎖,鎖默認在30s後自動刪除
這些都是基於看門狗實現的,寫一節來了解一下看門狗的實現原理。
參考文檔:
Redisson還經過加鎖的方法提供了leaseTime
的參數來指定加鎖的時間。超過這個時間後鎖便自動解開了。
// 加鎖之後10秒鐘自動解鎖
// 無需調用unlock方法手動解鎖
lock.lock(10, TimeUnit.SECONDS);
複製代碼
下面咱們來測試一下,咱們設置10s自動解鎖,設置業務執行時間是30s:
@ResponseBody
@GetMapping("/hello")
public String hello() {
//一、獲取一把鎖,只要鎖的名字同樣,就是同一把鎖
RLock lock = redisson.getLock("my-lock");
//二、加鎖
// 加鎖之後10秒鐘自動解鎖
lock.lock(10, TimeUnit.SECONDS);
try {
System.out.println("加鎖成功,執行業務..." + Thread.currentThread().getId());
Thread.sleep(30000); //業務執行時間30s
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("釋放鎖..." + Thread.currentThread().getId());
lock.unlock();
}
return "hello";
}
複製代碼
執行後的效果:
RLock
對象徹底符合Java的Lock規範。也就是說只有擁有鎖的進程才能解鎖
,其餘進程解鎖則會拋出IllegalMonitorStateException
錯誤
問題:lock.lock(10, TimeUnit.SECONDS);在鎖時間到了之後,不會自動續期
一、若是咱們傳遞了鎖的超時時間,就發送給redis執行腳本,進行佔鎖,默認超時就是咱們指定的時間
二、若是咱們未指定鎖的超時時間,就使用 lockWatchdogTimeout = 30000L;【lockWatchdogTimeout看門狗的默認時間】。只要佔鎖成功,就會啓動一個定時任務【從新給鎖設定過時時間,新的過時時間就是看門狗的默認時間】,每隔10s都會自動續期,續成30s,續期時間的間隔是【internalLockLeaseTime(看門狗時間) / 3L】 10s 續期一次
下面來看一下源碼:
先看不設置過時時間的加鎖方法:lock()
public void lock() {
try {
//leaseTime:-1,在後邊的判斷會用到;TimeUnit:null;是否可中斷:false
this.lock(-1L, (TimeUnit)null, false);
} catch (InterruptedException var2) {
throw new IllegalStateException();
}
}
//看一下再點擊來看一下 lock(long leaseTime, TimeUnit unit, boolean interruptibly) 方法的實現
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
//獲取當前線程的id
long threadId = Thread.currentThread().getId();
// 嘗試獲取鎖,這個方法是重點,下面進入這個方法中
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
... //略
}
// 查看 tryAcquire 方法
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
// 進入 嘗試獲取異步 tryAcquireAsync 這個方法
return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}
//查看 嘗試獲取異步 tryAcquireAsync 方法
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
//若是leaseTime不是-1,則進入這個邏輯,根據前面的代碼知道lock()默認leaseTime=-1,因此lock()方法不進這個邏輯,因此設置自動過時時間的方法 lock.lock(10, TimeUnit.SECONDS) 是會進入這個邏輯的
if (leaseTime != -1L) {
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//獲取一個 RFuture,和java中的Future是相似的, 設置鎖的默認過時時間this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout() 這個是設置默認鎖過時時間,也就是下面Config類中的lockWatchdogTimeout
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
//佔鎖成功,進行監聽
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
//沒有拋出異常說明,佔鎖成功
if (e == null) {
if (ttlRemaining == null) {
//啓動一個定時任務【從新給鎖設定過時時間,新的過時時間就是看門狗的默認時間】,每隔10s都會自動續期,續成30s,下面來看這個方法
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
// 對應上面 this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout() 中的時間
public Config() {
...
this.lockWatchdogTimeout = 30000L;
...
}
// 時間表到期續訂方法
private void scheduleExpirationRenewal(long threadId) {
RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
//進入續期方法
this.renewExpiration();
}
}
//續期方法
private void renewExpiration() {
RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
} else {
if (res) {
RedissonLock.this.renewExpiration();
}
}
});
}
}
}
// this.internalLockLeaseTime / 3L 續期時間,在RedissonLock(CommandAsyncExecutor commandExecutor, String name)方法中能夠看到internalLockLeaseTime就是 lockWatchdogTimeout看門狗的默認時間30s,因此是每隔10s續期一次,續成30s
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
...
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
...
}
複製代碼
再來設置過時時間的加鎖方法:lock.lock(10, TimeUnit.SECONDS)
public void lock(long leaseTime, TimeUnit unit) {
try {
this.lock(leaseTime, unit, false);
} catch (InterruptedException var5) {
throw new IllegalStateException();
}
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
//一樣是進入tryAcquire嘗試獲取鎖這個方法,和lock()方法同樣
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
...
}
//嘗試獲取鎖
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
// 進入 嘗試獲取異步 tryAcquireAsync 這個方法,和lock()方法同樣
return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}
// 嘗試獲取異步
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
// lock.lock(10, TimeUnit.SECONDS),進入這個邏輯
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining == null) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
// 嘗試獲取異步,獲得lua腳本
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
複製代碼
參考文檔:
讀寫鎖測試,加讀寫鎖能夠保證必定能讀到最新數據修改期間,寫鎖是一個排它鎖(互斥鎖),讀鎖是一個共享鎖,寫鎖沒釋放,讀寫就必須等待。
實現一個write和read接口,分別用來測試寫鎖和讀鎖:
@GetMapping("/write")
@ResponseBody
public String writeValue(){
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
String s = "";
RLock rLock = lock.writeLock();
try {
//一、修改數據加寫鎖,讀數據加讀鎖
rLock.lock();
s = UUID.randomUUID().toString();
stringRedisTemplate.opsForValue().set("writeValue", s);
Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return s;
}
@GetMapping("/read")
@ResponseBody
public String readValue(){
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
String s = "";
//獲取讀鎖
RLock rLock = lock.readLock();
try {
rLock.lock();
s = stringRedisTemplate.opsForValue().get("writeValue");
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return s;
}
複製代碼
測試效果:
參考文檔:
redisson 的閉鎖和 java 中的 java.util.concurrent.CountDownLatch
是相似的。
測試閉鎖:
/** * 測試閉鎖:鎖門方法 */
@GetMapping("/lockdoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.trySetCount(5);
door.await();//等待閉鎖所有完成
return "放假了";
}
/** * 模擬班級學生全都離開班級的方法 */
@GetMapping("/go/{id}")
@ResponseBody
public String go(@PathVariable("id") Long id) {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.countDown();//每離開一個班就計數減一
return id + "班的人都走了...";
}
複製代碼
測試效果:
參考文檔:
測試信號量:相似於 java 中的java.util.concurrent.Semaphore
模擬車庫停車:3個車位,同時只能有3輛車停,只有有車位了才能停車
/** * 車庫停車 * @return * @throws InterruptedException */
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
park.acquire();//阻塞式的
return "ok";
}
/** * 車位上的車離開 */
@GetMapping("/leave")
@ResponseBody
public String leave(){
RSemaphore park = redisson.getSemaphore("park");
park.release();//釋放一個車位,釋放一個信號量
return "ok";
}
複製代碼
應用:
好比信號量也能夠用做分佈式限流
的場景,好比同時在線人數只容許100000人等。
參考文檔:
Spring 從3.1開始定義了org.springframework.cache.Cache
和org.springframework.cache.CacheManager
接口來統一不一樣的緩存技術;並支持使用 JCache (ISR-107)
註解簡化咱們開發;
Cache 接口爲緩存的組件規範定義,包含緩存的各類操做集合;Cache 接口下 Spring 提供了各類 xxCache
的實現;如 RedisCache
, EhCacheCache
,ConcurrentMapCache
等;
每次調用須要緩存功能的方法時, Spring會檢查檢查指定參數的指定的目標方法是否已經被調用過;若是有就直接從緩存中獲取方法調用後的結果,若是沒有就調用方法並緩存結果後返回給用戶。下次調用直接從緩存中獲取。
使用 Spring 緩存抽象時咱們須要關注如下兩點:
一、肯定方法須要被緩存以及他們的緩存策略
二、從緩存中讀取以前緩存存儲的數據
CacheManager 管理衆多 Cache。緩存管理器是定義規則的,真正實際上處理緩存的是不一樣的緩存組件。
代碼結構圖:
代碼模塊圖:
spring-boot-starter-cache、spring-boot-starter-data-redis(使用redis做爲緩存就要引入redis的依賴)
<!-- 引入 spring-boot-starter-cache 依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<!-- 引入redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
複製代碼
org.springframework.boot.autoconfigure.cache.CacheAutoConfiguration
緩存自動配置類 org.springframework.boot.autoconfigure.cache.CacheProperties
全部在xml文件中配置的屬性都封裝在這裏 org.springframework.boot.autoconfigure.cache.CacheConfigurations
獲取每一種類型的緩存 org.springframework.boot.autoconfigure.cache.CacheConfigurations
#getConfigurationClass
獲得對應緩存的映射 org.springframework.boot.autoconfigure.cache.CacheType
一個枚舉類,封裝了各類類型的緩存 org.springframework.boot.autoconfigure.cache.RedisCacheConfiguration
使用redis做爲緩存時的各類配置
`CacheAutoConfiguration 會導入 RedisCacheConfiguration`
`RedisCacheConfiguration 會自動裝配好了redis緩存管理器 RedisCacheManager`
複製代碼
配置使用redis做爲緩存。在application.properties
、或application.yml
或bootstrap.properties
或配置中心
中配置 spring.cache.type=redis
@Cacheable
: Triggers cache population. 觸發緩存保存 @CacheEvic
t: Triggers cache eviction. 觸發刪除緩存 @CachePut
: Updates the cache without interfering with the method execution. 更新緩存,而不影響方法的執行 @Caching
: Regroups multiple cache operations to be applied on a method. 從新組合要在一個方法上應用的多個緩存操做 @CacheConfig
: Shares some common cache-related settings at class-level. 在類級別共享一些與緩存相關的常見設置
在啓動類 XxxApplication
上使用 @EnableCaching
註解,開啓緩存功能
@Cacheable
註解只須要在須要緩存數據的方法上使用註解就能完成緩存操做
// @Cacheable(value = {"category"},key = "'level1Categorys'")
@Cacheable(value = {"category"},key = "#root.method.name") // #root.methodName spel表達式,使用調用的方法名做爲緩存的key
@Override
public List<CategoryEntity> getLevel1Categorys() {
long l = System.currentTimeMillis();
List<CategoryEntity> categoryEntities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("cat_level", 1));
System.out.println("DB耗時:"+(System.currentTimeMillis()-l));
return categoryEntities;
}
複製代碼
對於緩存聲明,Spring的緩存抽象提供了一組Java註釋:
@Cacheable
:觸發緩存保存。@CacheEvict
:觸發刪除緩存。@CachePut
:更新緩存,而不影響方法的執行。@Caching
:從新組合要在一個方法上應用的多個緩存操做。@CacheConfig
:在類級別共享一些與緩存相關的常見設置。在業務中每個須要緩存的數據都要指定放到對應的那個名字的緩存中。至關於緩存的分區,通常建議按照業務類型來劃分。
@Cacheable
表明當前方法的結果須要緩存,若是緩存中有,方法不用調用。若是緩存中沒有,會調用方法,最後將方法的結果放入緩存。
@Cacheable
的默認行爲:
若是咱們須要自定義屬性,該怎麼作呢?
- 指定生成的緩存使用的key:key屬性指定,使用spel表達式 SPEL表達式:docs.spring.io/spring/docs…
- 指定緩存的數據的存活時間:配置文件中修改ttl,spring.cache.redis.time-to-live=3600000
- 將數據保存爲json格式
// @Cacheable(value = {"category"},key = "'level1Categorys'")
@Cacheable(value = {"category"},key = "#root.method.name") // #root.methodName spel表達式,使用調用的方法名做爲緩存的key
@Override
public List<CategoryEntity> getLevel1Categorys() {
long l = System.currentTimeMillis();
List<CategoryEntity> categoryEntities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("cat_level", 1));
System.out.println("DB耗時:"+(System.currentTimeMillis()-l));
return categoryEntities;
}
複製代碼
@CacheEvict
@CacheEvict
註解是支持緩存一致性——失效模式
的註解。@CachePut
是支持緩存一致性——雙寫模式
的註解。
要讓一個緩存在更新數據的時候失效,就須要使用@CacheEvict
註解:
清空單個緩存: @CacheEvict(value = {"category"},key = "'getLevel1Categorys'")
/** * 級聯更全部關聯數據 * @CacheEvict:緩存一致性——失效模式 * @CachePut:緩存一致性——雙寫模式 * @param category */
@CacheEvict(value = {"category"},key = "'getLevel1Categorys'") //清空單個緩存
@Transactional // 開啓事務
@Override
public void updateCascade(CategoryEntity category) {
// 一、先更新當前表的內容
this.updateById(category);
//二、更新級聯表的冗餘內容
categoryBrandRelationService.updateCategory(category.getCatId(),category.getName());
}
複製代碼
清空多個緩存:
(1)、同時操做多個緩存:@Caching
@Caching(evict = { //清空多個緩存
@CacheEvict(value = {"category"},key = "'getLevel1Categorys'"),
@CacheEvict(value = {"category"},key = "'getCatalogJson'")
})
@Caching(evict = { //清空多個緩存
@CacheEvict(value = {"category"},key = "'getLevel1Categorys'"),
@CacheEvict(value = {"category"},key = "'getCatalogJson'")
})
@Transactional // 開啓事務
@Override
public void updateCascade(CategoryEntity category) {
// 一、先更新當前表的內容
this.updateById(category);
//二、更新級聯表的冗餘內容
categoryBrandRelationService.updateCategory(category.getCatId(),category.getName());
}
複製代碼
(2) 指定刪除某個分區下的全部數據:@CacheEvict(value = {"category"},allEntries = true)
@CacheEvict(value = {"category"},allEntries = true)
//清空整個分區的緩存
@CacheEvict(value = {"category"},allEntries = true) //清空整個分區的緩存
@Transactional // 開啓事務
@Override
public void updateCascade(CategoryEntity category) {
// 一、先更新當前表的內容
this.updateById(category);
//二、更新級聯表的冗餘內容
categoryBrandRelationService.updateCategory(category.getCatId(),category.getName());
}
複製代碼
因此之後使用緩存能夠定義以下規則:
同一類型
的數據,均可以指定成同一個分區
。分區名默認就是緩存的前綴
,因此不須要須要設置緩存前綴 spring.cache.redis.key-prefix=CACHE_
,這樣緩存中的鍵值就是分區名::方法名
@CacheEvict(value = {"category"}, allEntries = true)
測試效果:
自定義緩存配置,須要定義一個緩存配置類:
@EnableConfigurationProperties(CacheProperties.class) // 讓 CacheProperties 的綁定生效
@Configuration
@EnableCaching // 開啓緩存(配置在 XxxApplication 主類上也能夠)
public class MyCacheConfig {
// @Autowired
// CacheProperties cacheProperties;
/** * 使用緩存配置類後,緩存配置文件中設置的屬性將會失效,好比: * spring.cache.redis.time-to-live=3600000 # 設置緩存存活時間,單位是ms * 因此須要另外在這裏配置從新綁定 * * 一、原來和配置文件綁定的配置類是這樣子的 * @ConfigurationProperties( * prefix = "spring.cache" * ) * public class CacheProperties { * * 二、要讓它生效: * 1)、@EnableConfigurationProperties(CacheProperties.class) 讓 CacheProperties 的綁定生效 * 2)、注入 CacheProperties 或者在配置方法中加上 CacheProperties 參數 * * @return */
@Bean
RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties) {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
//設置key的序列化
config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
//設置value的序列化,使用fastjson
config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericFastJsonRedisSerializer()));
CacheProperties.Redis redisProperties = cacheProperties.getRedis();
// 將配置文件中的全部配置都生效
if (redisProperties.getTimeToLive() != null) {
config = config.entryTtl(redisProperties.getTimeToLive());
}
if (redisProperties.getKeyPrefix() != null) {
config = config.prefixKeysWith(redisProperties.getKeyPrefix());
}
if (!redisProperties.isCacheNullValues()) {
config = config.disableCachingNullValues();
}
if (!redisProperties.isUseKeyPrefix()) {
config = config.disableKeyPrefix();
}
return config;
}
}
複製代碼
其餘的緩存屬性配置:
# 設置緩存存活時間,單位是ms
spring.cache.redis.time-to-live=3600000
# 設置緩存的前綴,若是指定了前綴就使用咱們指定的前綴,不然就默認使用緩存的名字做爲前綴
spring.cache.redis.key-prefix=CACHE_
# 設置是否啓用緩存前綴,默認是true
spring.cache.redis.use-key-prefix=true
# 是否緩存空值,設置爲true能夠防止緩存穿透
spring.cache.redis.cache-null-values=true
複製代碼
自定義測試效果:
在配置文件中配置容許緩存空值,解決緩存穿透問題
# 是否緩存空值,設置爲true能夠防止緩存穿透
spring.cache.redis.cache-null-values=true
複製代碼
原理:
CacheManager
(RedisCacheManager
) --建立-->Cache
(RedisCache
)-->Cache
負責緩存的讀寫操做
不足:
(1)、讀模式
緩存穿透
:查詢一個null數據。解決:緩存空數據
,添加配置 spring.cache.redis.cache-null-values=true
緩存擊穿
:大量併發進來同時查詢一個正好過時的數據。解決:加鎖
。可是Spring-Cache默認put時是不加鎖的,因此沒有辦法解決這個問題。可是能夠設置 sync = true
@Cacheable(value = xxx, key = xxx, sync = true)
,在查緩存的時候調用使用了同步的get方法org.springframework.data.redis.cache.RedisCache#get(java.lang.Object, java.util.concurrent.Callable)
獲取到獲取到空數據時在put中放一份空的數據。緩存雪崩
:大量的key同時過時。解決:加隨機時間
。加上過時時間:spring.cache.redis.time-to-live=3600000
(2)、寫模式(緩存與數據庫一致)
讀寫加鎖
:使用讀多寫少場景讀多寫多
:直接去數據庫查詢總結:
常規數據
(讀多寫少、即時性、一致性要求不高的數據):徹底可使用Spring-Cache;寫模式:只要緩存設置了過時時間就足夠了
特殊數據
:特殊設計
參考: