衆所周知,分佈式鎖在微服務架構中是重頭戲,尤爲是在互聯網公司,基本上企業內部都會有本身的一套分佈式鎖開發框架。本文主要介紹使用Redis如何構建高併發分佈式鎖。java
假設 存在一個SpringBoot的控制器,其扣減庫存的業務邏輯以下:redis
@Autowired private StringRedisTemplate stringRedisTemplate; @RequestMapping(value = "/deduct-stock") public String deductSotck() throws Exception { // 將庫存取出來 int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 判斷庫存夠不夠減 if (stock > 0) { // 將庫存回寫到redis int tmp = stock - 1; stringRedisTemplate.opsForValue().set("stock", tmp.toString()); logger.info("庫存扣減成功"); } else { logger.info("庫存扣減失敗"); } return "finished."; }
不難看出,在應用服務器運行這段代碼的時候就會有線程安全性問題。由於多個線程同時去修改Redis服務中的數據。所以考慮給這段代碼加上一把鎖:api
@Autowired private StringRedisTemplate stringRedisTemplate; @RequestMapping(value = "/deduct-stock") public String deductSotck() throws Exception { synchronized (this) { int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 判斷庫存夠不夠減 if (stock > 0) { // 將庫存回寫到redis int tmp = stock - 1; stringRedisTemplate.opsForValue().set("stock", tmp.toString()); logger.info("庫存扣減成功"); } else { logger.info("庫存扣減失敗"); } } return "finished."; }
這樣一來,當多個HTTP請求來請求數據的時候,多個線程去修改同一數據會有JVM本地鎖來進行合理的資源限制。雖然這樣解決了線程安全性問題,可是這僅僅是JVM級別的鎖,在分佈式的環境下,因爲像這樣的Web應用隨時會進行動態擴容,所以當多個應用的時候,一樣會有線程安全性問題,當上面這段代碼遇到相似下面的架構時仍是會有各類各樣的問題:安全
對於上述的狀況,咱們可使用redis api提供的setnx方法解決:服務器
@Autowired private StringRedisTemplate stringRedisTemplate; @RequestMapping(value = "/deduct-stock") public String deductSotck() throws Exception { // 嘗試獲取鎖 Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", "World"); // 判斷是否得到鎖 if (!flag) { return "error"; } int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 判斷庫存夠不夠減 if (stock > 0) { // 將庫存回寫到redis int tmp = stock - 1; stringRedisTemplate.opsForValue().set("stock", tmp.toString()); logger.info("庫存扣減成功"); } else { logger.info("庫存扣減失敗"); } // 刪除鎖 stringRedisTemplate.delete("Hello"); return "finished."; }
setnx key value
是將key的值設置爲value,當且僅當key不存在的時候。若是設置成功就返回1,不然就返回0。架構
這樣的話,首先嚐試獲取鎖,而後當業務執行完成的時候再刪除鎖。可是仍是有問題的,當獲取鎖的時候拋出異常或者業務執行拋出異常怎麼辦,因此加入異常處理邏輯:併發
@Autowired private StringRedisTemplate stringRedisTemplate; @RequestMapping(value = "/deduct-stock") public String deductSotck() throws Exception { try { // 嘗試獲取鎖 Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", "World"); // 判斷是否得到鎖 if (!flag) { return "error"; } int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 判斷庫存夠不夠減 if (stock > 0) { // 將庫存回寫到redis int tmp = stock - 1; stringRedisTemplate.opsForValue().set("stock", tmp.toString()); logger.info("庫存扣減成功"); } else { logger.info("庫存扣減失敗"); } } finally { // 刪除鎖 stringRedisTemplate.delete("Hello"); } return "finished."; }
通過這樣的修改,看起來沒什麼問題了。可是當程序得到鎖而且開始執行業務邏輯的時候,忽然程序掛掉了或者被一些粗暴的運維工程師給kill,在finally中刪除鎖的邏輯就會得不到執行,所以就會產生死鎖。對於這種狀況,咱們能夠給這個鎖設置一個超時時間:app
@Autowired private StringRedisTemplate stringRedisTemplate; @RequestMapping(value = "/deduct-stock") public String deductSotck() throws Exception { try { // 嘗試獲取鎖 Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", "World"); // 設置超時時間, 根據業務場景估計超時時長 stringRedisTmplate.expire("Hello", 10, TimeUnit.SECONDS); // 判斷是否得到鎖 if (!flag) { return "error"; } int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 判斷庫存夠不夠減 if (stock > 0) { // 將庫存回寫到redis int tmp = stock - 1; stringRedisTemplate.opsForValue().set("stock", tmp.toString()); logger.info("庫存扣減成功"); } else { logger.info("庫存扣減失敗"); } } finally { // 刪除鎖 stringRedisTemplate.delete("Hello"); } return "finished."; }
若是程序這麼來寫,相對來講安全一些了,可是仍是存在問題。試想一下,當獲取鎖成功時,正想給這把鎖設置超時的時候,程序掛掉了,仍是會出現死鎖的,所以在redis較高的版本中提供的setIfAbsent方法中能夠同時設置鎖的超時時間。框架
Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", "World", 10, TimeUnit.SECONDS);
這樣一來,嘗試獲取鎖和設置鎖的超時時間就具有原子性了。實際上通過咱們這一番改造,這在小型企業已經沒有太大的問題, 由於像這種代碼天天也就執行幾百次,並不算作高併發的場景。當這樣的代碼被暴露在超高併發場景下的時候,仍是會存在各類各樣的問題。試想一個場景,當一個HTTP請求請求到控制器的時候,應用獲取到鎖了,超時時間也設置成功了,可是應用的業務邏輯超過了超時時間,咱們這裏的超時時間設置的是10秒,當應用的業務邏輯執行15秒的時候,鎖就被redis服務刪除了。假設剛好此時又有一個HTTP請求來請求控制器,此時應用服務器會再啓動一個線程來獲取鎖,並且還獲取成功了,可是此次的HTTP請求對應的業務邏輯尚未執行完。新來的TTTP請求也在執行,因爲新來的HTTP請求也在執行,由於鎖超時後被刪除,新的HTTP請求也成功獲取鎖了。當原來的HTTP請求對應的業務邏輯執行完成之後,嘗試刪除鎖,這樣正好刪除的是新來的HTTP請求對應的鎖。這個時候redis中又沒有鎖了,這樣第三個HTTP請求又會得到鎖,因此狀況就不妙了。運維
爲了解決上面的問題,咱們能夠將代碼優化爲下面的樣子:
@Autowired private StringRedisTemplate stringRedisTemplate; @RequestMapping(value = "/deduct-stock") public String deductSotck() throws Exception { String clientUuid = UUID.randomUUID().toString(); try { // 嘗試獲取鎖,設置超時時間, 根據業務場景估計超時時長 Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", clientUuid, 10, TimeUnit.SECONDS); // 判斷是否得到鎖 if (!flag) { return "error"; } int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 判斷庫存夠不夠減 if (stock > 0) { // 將庫存回寫到redis int tmp = stock - 1; stringRedisTemplate.opsForValue().set("stock", tmp.toString()); logger.info("庫存扣減成功"); } else { logger.info("庫存扣減失敗"); } } finally { // 刪除鎖的時候判斷是否是本身的鎖 if (clientUuid.equals(stringRedisTemplate.opsForValue().get("Hello"))) { stringRedisTemplate.delete("Hello"); } } return "finished."; }
可是因爲程序的不可預知性,誰也不能保證極端狀況下,同時會有多個線程同時執行這段業務邏輯。咱們能夠在當執行業務邏輯的時候同時開一個定時器線程,每隔幾秒就從新將這把鎖設置爲10秒,也就是給這把鎖進行「續命」。這樣就用擔憂業務邏輯到底執行多長時間了。可是這樣程序的複雜性就會增長,每一個業務邏輯都要寫好多的代碼,所以這裏推薦在分佈式環境下使用redisson。所以咱們使用redisson實現分支線程的代碼:
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.6.5</version> </dependency>
@Bean public Redisson redisson () { Config cfg = new Config(); cfg.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0); return (Redisson) Redisson.create(cfg); }
@Autowired private Redisson redisson;
@Autowired private StringRedisTemplate stringRedisTemplate; @RequestMapping(value = "/deduct-stock") public String deductSotck() throws Exception { // 獲取鎖對象 RLock lock = redisson.getLock("Hello"); try { // 嘗試加鎖, 默認30秒, 自動後臺開一個線程實現鎖的續命 lock.tryLock(); int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 判斷庫存夠不夠減 if (stock > 0) { // 將庫存回寫到redis int tmp = stock - 1; stringRedisTemplate.opsForValue().set("stock", tmp.toString()); logger.info("庫存扣減成功"); } else { logger.info("庫存扣減失敗"); } } finally { // 釋放鎖 lock.unlock(); } return "finished."; }
Redisson分佈式鎖的實現原理以下:
可是這個架構仍是存在問題的,由於redis服務器是主從的架構,當在master節點設置鎖以後,slave節點會馬上同步。可是若是剛在master節點設置上了鎖,slave節點還沒來得及設置,master節點就掛掉了。仍是會產生上一樣的問題,新的線程得到鎖。
所以使用redis構建高併發的分佈式鎖,僅適合單機架構,當使用主從架構的redis時仍是會出現線程安全性問題。