參考文章:html
Redis分佈式鎖的正確實現方式java
分佈式鎖看這篇就夠了linux
在這兩篇文章的指引下親測 Redis分佈式鎖redis
分佈式系統必定會存在CAP權衡問題,因此纔會出現分佈式鎖spring
什麼是CAP理論? 數據庫
爲了更好的理解文章,建議閱讀:分佈式系統的CAP理論網絡
此處主要指集羣模式下,多個相同服務同時開啓.多線程
在許多的場景中,咱們爲了保證數據的最終一致性,須要不少的技術方案來支持,好比分佈式事務、分佈式鎖等。不少時候咱們須要保證一個方法在同一時間內只能被同一個線程執行。在單機環境中,經過 Java 提供的併發 API 咱們能夠解決,可是在分佈式環境下,就沒有那麼簡單啦。併發
首先,爲了確保分佈式鎖可用,咱們至少要確保鎖的實現同時知足如下四個條件:dom
分佈式鎖通常有三種實現方式:
本文將介紹第二種方式,基於Redis實現分佈式鎖。
注意: Redis 從2.6.12版本開始 set 命令支持 NX 、 PX 這些參數來達到 setnx 、 setex 、 psetex 命令的效果,文檔參見: http://doc.redisfans.com/string/set.html
Spring Boot 下的 RedisTemplate 並不支持 NX 同時設置過時時間這種 set 操做(具備原子性)
因此這裏咱們須要 Maven 引入支持這種 set 操做的 Jedis 依賴
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency>
併發下單,庫存鎖測試:
建立10個線程,同時啓動下單操做,對庫存操做加入分佈式鎖
測試代碼:
package com.elise.userinfocenter; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.data.redis.RedisProperties; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import redis.clients.jedis.Jedis; import java.util.Collections; @RunWith(SpringRunner.class) @SpringBootTest public class UserInfoCenterApplicationTests { private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; private static final Long RELEASE_SUCCESS = 1L; private int splitPoint = 500; @Autowired private RedisProperties redisConfig; @Test public void redisLock() { ThreadTest[] threadTests = new ThreadTest[10]; for (int i=0; i<10; i++) { threadTests[i] = new ThreadTest(); } for (int i=0; i<10; i++) { threadTests[i].start(); } } private class ThreadTest extends Thread { @Override public void run() { Jedis jedis = new Jedis(redisConfig.getHost(),redisConfig.getPort(),redisConfig.getTimeout()); String requestId = this.getId()+""; int i=0; while (true){ i = ++i; try { if(tryGetDistributedLock(jedis,"lock-test",requestId,2000)) { System.out.println("線程:"+requestId+" 成功得到分佈式鎖!!!"); System.out.println("當前庫存:"+splitPoint); splitPoint = --splitPoint; System.out.println("線程:"+requestId+"下單成功後庫存:"+splitPoint); if(releaseDistributedLock(jedis,"lock-test",requestId)) { System.out.println("線程:"+requestId+" 成功釋放分佈式鎖!!!"); } break; } else { System.out.println("線程:"+requestId+" 第"+i+"次沒法得到分佈式鎖,繼續搶鎖!!!"); } }catch (Exception e) { e.printStackTrace(); } } } } /** * 嘗試獲取分佈式鎖 * @param jedis Redis客戶端 * @param lockKey 鎖 * @param requestId 請求標識 * @param expireTime 超期時間 * @return 是否獲取成功 */ public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) { String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; } /** * 釋放分佈式鎖 * @param jedis Redis客戶端 * @param lockKey 鎖 * @param requestId 請求標識 * @return 是否釋放成功 */ public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; } }
經過上面的代碼+效果圖能夠知道這十個線程啓動以後都開始搶佔 redis分佈式鎖,沒有得到鎖繼續搶鎖,蹭蹭蹭幾下每一個線程都準確無誤滴執行了下單,減小庫存操做,下面具體分析一下加鎖,解鎖代碼
/** * 嘗試獲取分佈式鎖 * @param jedis Redis客戶端 * @param lockKey 鎖 * @param requestId 請求標識 * @param expireTime 超期時間 * @return 是否獲取成功 */ public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) { String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; }
能夠看到,咱們加鎖就一行代碼:jedis.set(String key, String value, String nxxx, String expx, int time)
,這個set()方法一共有五個形參:
第一個爲key,咱們使用key來當鎖,由於key是惟一的。
第二個爲value,咱們傳的是requestId,不少童鞋可能不明白,有key做爲鎖不就夠了嗎,爲何還要用到value?緣由就是咱們在上面講到可靠性時,分佈式鎖要知足第四個條件解鈴還須繫鈴人,經過給value賦值爲requestId,咱們就知道這把鎖是哪一個請求加的了,在解鎖的時候就能夠有依據。requestId可使用UUID.randomUUID().toString()
方法生成。(本測試用例使用的是當前線程ID)
第三個爲nxxx,這個參數咱們填的是NX,意思是SET IF NOT EXIST,即當key不存在時,咱們進行set操做;若key已經存在,則不作任何操做;
第四個爲expx,這個參數咱們傳的是PX,意思是咱們要給這個key加一個過時的設置,具體時間由第五個參數決定。
第五個爲time,與第四個參數相呼應,表明key的過時時間。
總的來講,執行上面的set()方法就只會致使兩種結果:
心細的童鞋就會發現了,咱們的加鎖代碼知足咱們可靠性裏描述的三個條件。
/** * 釋放分佈式鎖 * @param jedis Redis客戶端 * @param lockKey 鎖 * @param requestId 請求標識 * @return 是否釋放成功 */ public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; }
能夠看到,咱們解鎖只須要兩行代碼就搞定了!第一行代碼,咱們寫了一個簡單的Lua腳本代碼。第二行代碼,咱們將Lua代碼傳到jedis.eval()
方法裏,並使參數KEYS[1]賦值爲lockKey,ARGV[1]賦值爲requestId。eval()方法是將Lua代碼交給Redis服務端執行。
那麼這段Lua代碼的功能是什麼呢?其實很簡單,首先獲取鎖對應的value值,檢查是否與requestId相等,若是相等則刪除鎖(解鎖)。那麼爲何要使用Lua語言來實現呢?由於要確保上述操做是原子性的。
簡單來講,就是在eval命令執行Lua代碼的時候,Lua代碼將被當成一個命令去執行,而且直到eval命令執行完成,Redis纔會執行其餘命令。