你們好,我是walking,原文首發於公衆號編程大道。感謝你打開這篇文章,請認真閱讀下去吧。 今天咱們聊聊redis的一個實際開發的使用場景,那就是大名鼎鼎的分佈式鎖。html
咱們學習 Java 都知道鎖的概念,例如基於 JVM 實現的同步鎖 synchronized,以及 jdk 提供的一套代碼級別的鎖機制 lock,咱們在併發編程中會常常用這兩種鎖去保證代碼在多線程環境下運行的正確性。可是這些鎖機制在分佈式場景下是不適用的,緣由是在分佈式業務場景下,咱們的代碼都是跑在不一樣的JVM甚至是不一樣的機器上,synchronized 和 lock 只能在同一個 JVM 環境下起做用。因此這時候就須要用到分佈式鎖了。java
例如,如今有個場景就是整點搶消費券(疫情的緣由,支付寶最近在8點、12點整點開放搶消費券),消費券有一個固定的量,先到先得,搶完就沒了,線上的服務都是部署多個的,大體架構以下:nginx
因此這個時候咱們就得用分佈式鎖來保證共享資源的訪問的正確性。git
假設不使用分佈式鎖,咱們看看 synchronized 能不能保證?實際上是不能的,咱們來演示一下。github
下面我寫了一個簡單的 springboot 項目來模擬這個搶消費券的場景,代碼很簡單,大體意思是先從 Redis 獲取剩餘消費券數,而後判斷大於0,則減一模擬被某個用戶搶到一個,而後減一後再修改 Redis 的剩餘消費券數量,打印扣減成功,剩餘還有多少,不然扣減失敗,就沒搶到。整塊代碼被 synchronized 包裹,Redis 設置的庫存數量爲50。web
//假設庫存編號是00001
private String key = "stock:00001";
@Autowired
private StringRedisTemplate stringRedisTemplate;
/** * 扣減庫存 synchronized同步鎖 */
@RequestMapping("/deductStock")
public String deductStock(){
synchronized (this){
//獲取當前庫存
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
if(stock>0){
int afterStock = stock-1;
stringRedisTemplate.opsForValue().set(key,afterStock+"");//修改庫存
System.out.println("扣減庫存成功,剩餘庫存"+afterStock);
}else {
System.out.println("扣減庫存失敗");
}
}
return "ok";
}
複製代碼
而後啓動兩個springboot項目,端口分別爲8080,8081,而後在nginx裏配置負載均衡redis
upstream redislock{
server 127.0.0.1:8080;
server 127.0.0.1:8081;
}
server {
listen 80;
server_name 127.0.0.1;
location / {
root html;
index index.html index.htm;
proxy_pass http://redislock;
}
}
複製代碼
而後用jmeter壓測工具進行測試spring
而後咱們看一下控制檯輸出,能夠看到咱們運行的兩個web實例,不少一樣的消費券被不一樣的線程搶到,證實synchronized在這樣的狀況下是不起做用的,因此就須要使用分佈式鎖來保證資源的正確性。編程
在實現分佈式鎖以前,咱們先考慮如何實現,以及都要實現鎖的哪些功能。springboot
一、分佈式特性(部署在多個機器上的實例都可以訪問這把鎖)
二、排他性(同一時間只能有一個線程持有鎖)
三、超時自動釋放的特性(持有鎖的線程須要給定必定的持有鎖的最大時間,防止線程死掉沒法釋放鎖而形成死鎖)
四、...
基於以上列出的分佈式鎖須要擁有的基本特性,咱們思考一下使用Redis該如何實現?
一、第一個分佈式的特性Redis已經支持,多個實例連同一個Redis便可
二、第二個排他性,也就是要實現一個獨佔鎖,可使用Redis的setnx命令實現
三、第三個超時自動釋放特性,Redis能夠針對某個key設置過時時間
四、執行完畢釋放分佈式鎖
科普時間 Redis Setnx 命令 Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在時,爲 key 設置指定的值 語法 redis Setnx 命令基本語法以下: redis 127.0.0.1:6379> SETNX KEY_NAME VALUE 可用版本:>= 1.0.0 返回值:設置成功,返回1, 設置失敗,返回0
@RequestMapping("/stock_redis_lock")
public String stock_redis_lock(){
//底層使用setnx命令
Boolean aTrue = stringRedisTemplate.opsForValue().setIfAbsent(lock_key, "true");
stringRedisTemplate.expire(lock_key,10, TimeUnit.SECONDS);//設置過時時間10秒
if (!aTrue) {//設置失敗則表示沒有拿到分佈式鎖
return "error";//這裏能夠給用戶一個友好的提示
}
//獲取當前庫存
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
if(stock>0){
int afterStock = stock-1;
stringRedisTemplate.opsForValue().set(key,afterStock+"");
System.out.println("扣減庫存成功,剩餘庫存"+afterStock);
}else {
System.out.println("扣減庫存失敗");
}
stringRedisTemplate.delete(lock_key);//執行完畢釋放分佈式鎖
return "ok";
}
複製代碼
仍然設置庫存數量爲50,咱們再用jmeter測試一下,把jmeter的測試地址改成127.0.0.1/stock_redis_lock,一樣的設置再來測一次。
測試了5次沒有出現髒數據,把發送時間改成0,測了5次也沒問題,而後又把線程數改成600,時間爲0 ,循環4次,測了幾回也是正常的。
上面實現分佈式鎖的代碼已是一個較爲成熟的分佈式鎖的實現了,對大多數軟件公司來講都已經知足需求了。可是上面代碼仍是有優化的空間,例如:
1)上面的代碼咱們是沒有考慮異常狀況的,實際狀況下代碼沒有這麼簡單,可能還會有別的不少複雜的操做,都有可能會出現異常,因此咱們釋放鎖的代碼須要放在finally塊裏來保證即便是代碼拋異常了釋放鎖的代碼他依然會被執行。
2)還有,你有沒有注意到,上面咱們的分佈式鎖的代碼的獲取和設置過時時間的代碼是兩步操做第4行和第5行,即非原子操做,就有可能剛執行了第4行還沒來得及執行第5行這臺機器掛了,那麼這個鎖就沒有設置超時時間,其餘線程就一直沒法獲取,除非人工干預,因此這是一步優化的地方,Redis也提供了原子操做,那就是SET key value EX seconds NX
科普時間 SET key value [EX seconds] [PX milliseconds] [NX|XX] 將字符串值 value 關聯到 key 可選參數 從 Redis 2.6.12 版本開始, SET 命令的行爲能夠經過一系列參數來修改:
- EX second :設置鍵的過時時間爲 second 秒。SET key value EX second 效果等同於 SETEX key second value
- PX millisecond :設置鍵的過時時間爲 millisecond 毫秒。SET key value PX millisecond 效果等同於 PSETEX key millisecond value
- NX :只在鍵不存在時,纔對鍵進行設置操做。SET key value NX 效果等同於 SETNX key value
- XX :只在鍵已經存在時,纔對鍵進行設置操做
SpringBoot的StringRedisTemplate也有對應的方法實現,以下代碼:
//假設庫存編號是00001
private String key = "stock:00001";
private String lock_key = "lock_key:00001";
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/stock_redis_lock")
public String stock_redis_lock() {
try {
//原子的設置key及超時時間
Boolean aTrue = stringRedisTemplate.opsForValue().setIfAbsent(lock_key, "true", 30, TimeUnit.SECONDS);
if (!aTrue) {
return "error";
}
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
if (stock > 0) {
int afterStock = stock - 1;
stringRedisTemplate.opsForValue().set(key, afterStock + "");
System.out.println("扣減庫存成功,剩餘庫存" + afterStock);
} else {
System.out.println("扣減庫存失敗");
}
} catch (NumberFormatException e) {
e.printStackTrace();
} finally {
//避免死鎖
stringRedisTemplate.delete(lock_key);
}
return "ok";
}
複製代碼
這樣實現是否就完美了呢?嗯,對於併發量要求不高或者非大併發的場景的話這樣實現已經能夠了。可是對於搶購 ,秒殺這樣的場景,當流量很大,這時候服務器網卡、磁盤IO、CPU負載均可能會達到極限,那麼服務器對於一個請求的的響應時間勢必變得比正常狀況下慢不少,那麼假設就剛纔設置的鎖的超時時間爲10秒,若是某一個線程拿到鎖以後由於某些緣由沒能在10秒內執行完畢鎖就失效了,這時候其餘線程就會搶佔到分佈式鎖去執行業務邏輯,而後以前的線程執行完了,會去執行 finally 裏的釋放鎖的代碼就會把正在佔有分佈式鎖的線程的鎖給釋放掉,實際上剛剛正在佔有鎖的線程還沒執行完,那麼其餘線程就又有機會得到鎖了...這樣整個分佈式鎖就失效了,將會產生意想不到的後果。以下圖模擬了這個場景。
因此這個問題總結一下,就是由於鎖的過時時間設置的不合適或由於某些緣由致使代碼執行時間大於鎖過時時間而致使併發問題以及鎖被別的線程釋放,以致於分佈式鎖混亂。在簡單的說就是兩個問題,1)本身的鎖被別人釋放 2)鎖超時沒法續時間。
第一個問題很好解決,在設置分佈式鎖時,咱們在當前線程中生產一個惟一串將value設置爲這個惟一值,而後在finally塊裏判斷當前鎖的value和本身設置的同樣時再去執行delete,以下:
String uuid = UUID.randomUUID().toString();
try {
//原子的設置key及超時時間,鎖惟一值
Boolean aTrue = stringRedisTemplate.opsForValue().setIfAbsent(lock_key,uuid,30,TimeUnit.SECONDS);
//...
} finally {
//是本身設置的鎖再執行delete
if(uuid.equals(stringRedisTemplate.opsForValue().get(lock_key))){
stringRedisTemplate.delete(lock_key);//避免死鎖
}
}
複製代碼
問題一解決了(設想一下上述代碼還有什麼問題,一下子講),那鎖的超時時間就很關鍵了,不能太大也不能過小,這就須要評估業務代碼的執行時間,好比設置個10秒,20秒。即便是你的鎖設置了合適的超時時間,也避免不了可能會發生上述分析的由於某些緣由代碼沒在正常評估的時間內執行完畢,因此這時候的解決方案就是給鎖續超時時間。大體思路就是,業務線程單獨起一個分線程,定時去監聽業務線程設置的分佈式鎖是否還存在,存在就說明業務線程還沒執行完,那麼就延長鎖的超時時間,若鎖已不存在則業務線程執行完畢,而後就結束本身。
**「鎖續命」**的這套邏輯屬實有點複雜啊,要考慮的問題太多了,稍不注意就會有bug。不要看上面實現分佈式鎖的代碼沒有幾行,就認爲實現起來很簡單,若是說本身去實現的時候沒有實際高併發的經驗,確定也會踩不少坑,例如,
1)鎖的設置和過時時間的設置是非原子操做的,就可能會致使死鎖。
2)還有上面遺留的一個,在finally塊裏判斷鎖是不是本身設置的,是的話再刪除鎖,這兩步操做也不是原子的,假設剛判斷完爲true服務就掛了,那麼刪除鎖的代碼不會執行,就會形成死鎖,即便是設置了過時時間,在沒過時這段時間也會死鎖。因此這裏也是一個注意的點,要保證原子操做的話,Redis提供了執行Lua腳本的功能來保證操做的原子性,具體怎麼使用再也不展開。
因此,**「鎖續命」**的這套邏輯實現起來仍是有點複雜的,好在市面上已經有現成的開源框架幫咱們實現了,那就是Redisson。
實現原理:
一、首先Redisson會嘗試進行加鎖,加鎖的原理也是使用相似Redis的setnx命令原子的加鎖,加鎖成功的話其內部會開啓一個子線程 二、子線程主要負責監聽,其實就是一個定時器,定時監聽主線程是否還持有鎖,持有則將鎖的時間延時,不然結束線程 三、若是加鎖失敗則自旋不斷嘗試加鎖 四、執行完代碼主線程主動釋放鎖
那咱們看一下使用後Redisson後的代碼是什麼樣的。
一、首先在pom.xml文件添加Redisson的maven座標
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.5</version>
</dependency>
複製代碼
二、咱們要拿到Redisson的這個對象,以下配置Bean
@SpringBootApplication
public class RedisLockApplication {
public static void main(String[] args) {
SpringApplication.run(RedisLockApplication.class, args);
}
@Bean
public Redisson redisson(){
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379")
.setDatabase(0);
return (Redisson) Redisson.create(config);
}
}
複製代碼
三、而後咱們獲取Redisson的實例,使用其API進行加鎖釋放鎖操做
//假設庫存編號是00001
private String key = "stock:00001";
private String lock_key = "lock_key:00001";
@Autowired
private StringRedisTemplate stringRedisTemplate;
/** * 使用Redisson實現分佈式鎖 * @return */
@RequestMapping("/stock_redisson_lock")
public String stock_redisson_lock() {
RLock redissonLock = redisson.getLock(lock_key);
try {
redissonLock.lock();
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
if (stock > 0) {
int afterStock = stock - 1;
stringRedisTemplate.opsForValue().set(key, afterStock + "");
System.out.println("扣減庫存成功,剩餘庫存" + afterStock);
} else {
System.out.println("扣減庫存失敗");
}
} catch (NumberFormatException e) {
e.printStackTrace();
} finally {
redissonLock.unlock();
}
return "ok";
}
複製代碼
看這個Redisson的分佈式鎖提供的API是否是很是的簡單?就像Java併發變成裏AQS那套Lock機制同樣,以下獲取一把RedissonLock
RLock redissonLock = redisson.getLock(lock_key);
複製代碼
默認返回的是RedissonLock的對象,該對象實現了RLock接口,而RLock接口繼承了JDK併發編程報包裏的Lock接口
在使用Redisson加鎖時,它也提供了不少API,以下
如今咱們選擇使用的是最簡單的無參lock方法,簡單的點進去跟一下看看他的源碼,咱們找到最終的執行加鎖的代碼以下:
咱們能夠看到其底層使用了Lua腳原本保證原子性,使用Redis的hash結構實現的加鎖,以及可重入鎖。
比咱們本身實現分佈式鎖看起來還要簡單,可是咱們本身寫的鎖功能他都有,咱們沒有的他也有。好比,他實現的分佈式鎖是支持可重入的,也支持可等待,即嘗試等待必定時間,沒拿到鎖就返回false。上述代碼中的redissonLock.lock();是一直等待,內部自旋嘗試加鎖。
Distributed Java locks and synchronizers
Lock
FairLock
MultiLock
RedLock
ReadWriteLock
Semaphore
PermitExpirableSemaphore
CountDownLatch
redisson.org
Redisson提供了豐富的API,內部運用了大量的Lua腳本保證原子操做,篇幅緣由redisson實現鎖的代碼暫不分析了。
注意:在上述示例代碼中,爲了方便演示,查詢redis庫存、修改庫存並不是原子操做,實際這兩部操做也得保證原子行,能夠用redis自帶的Lua腳本功能去實現
到這裏,Redis分佈式鎖實戰基本就講完了,總結一下Redis分佈式鎖吧。
一、若是說是本身實現的話,須要特別注意四點:
二、若是使用現成的分佈式鎖框架Redisson,就須要熟悉一下其經常使用的API以及實現原理,或者選擇其餘開源的分佈式鎖框架,充分考察,選擇適合本身業務需求的便可。
參考
doc.redisfans.com/string/set.…
若是以爲本文對你有幫助的話,請不要吝嗇你的贊哦。
更多幹貨文章歡迎關注公衆號:編程大道
公衆號也有好多關於Redis的技術文,歡迎關注哦
另外walking本人呢也在整理Redis相關的知識點,作成思惟導圖的形式,不過還沒最終整理完,已經整理了好幾天啦,關注公衆號,整理好了會經過公衆號推送給你們~