在咱們平常工做中,除了Spring和Mybatis外,用到最多無外乎分佈式緩存框架——Redis。可是不少工做不少年的朋友對Redis還處於一個最基礎的使用和認識。因此我就像把本身對分佈式緩存的一些理解和應用整理一個系列,但願能夠幫助到你們加深對Redis的理解。本系列的文章思路先從Redis的應用開始。再解析Redis的內部實現原理。最後以常常會問到Redist相關的面試題爲結尾。html
爲了實現分佈式鎖,須要確保鎖同時知足如下四個條件:面試
在講解使用Redis實現分佈式鎖的正確姿式以前,咱們有必要來看下錯誤實現方式。redis
首先,爲了保證互斥性和不會發送死鎖2個條件,因此咱們在加鎖操做的時候,須要使用SETNX指令來保證互斥性——只有一個客戶端可以持有鎖。爲了保證不會發送死鎖,須要給鎖加一個過時時間,這樣就能夠保證即便持有鎖的客戶端期間崩潰了也不會一直不釋放鎖。緩存
爲了保證這2個條件,有些人錯誤的實現會用以下代碼來實現加鎖操做:多線程
/** * 實現加鎖的錯誤姿式 * @param jedis * @param lockKey * @param requestId * @param expireTime */ public static void wrongGetLock1(Jedis jedis, String lockKey, String requestId, int expireTime) { Long result = jedis.setnx(lockKey, requestId); if (result == 1) { // 若在這裏程序忽然崩潰,則沒法設置過時時間,將發生死鎖 jedis.expire(lockKey, expireTime); } }
可能一些初學者還沒看出以上實現加鎖操做的錯誤緣由。這樣咱們解釋下。setnx 和expire是兩條Redis指令,不具有原子性,若是程序在執行完setnx以後忽然崩潰,致使沒有設置鎖的過時時間,從而就致使死鎖了。由於這個客戶端持有的全部不會被其餘客戶端釋放,持有鎖的客戶端又崩潰了,也不會主動釋放。從而該鎖永遠不會釋放,致使其餘客戶端也得到不能鎖。從而其餘客戶端一直阻塞。因此針對該代碼正確姿式應該保證setnx和expire原子性。架構
實現加鎖操做的錯誤姿式2。具體實現以下代碼所示併發
/** * 實現加鎖的錯誤姿式2 * @param jedis * @param lockKey * @param expireTime * @return */ public static boolean wrongGetLock2(Jedis jedis, String lockKey, int expireTime) { long expires = System.currentTimeMillis() + expireTime; String expiresStr = String.valueOf(expires); // 若是當前鎖不存在,返回加鎖成功 if (jedis.setnx(lockKey, expiresStr) == 1) { return true; } // 若是鎖存在,獲取鎖的過時時間 String currentValueStr = jedis.get(lockKey); if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) { // 鎖已過時,獲取上一個鎖的過時時間,並設置如今鎖的過時時間 String oldValueStr = jedis.getSet(lockKey, expiresStr); if (oldValueStr != null && oldValueStr.equals(currentValueStr)) { // 考慮多線程併發的狀況,只有一個線程的設置值和當前值相同,它纔有權利加鎖 return true; } } // 其餘狀況,一概返回加鎖失敗 return false; }
這個加鎖操做咋一看沒有毛病對吧。那以上這段代碼的問題毛病出在哪裏呢?框架
1. 因爲客戶端本身生成過時時間,因此須要強制要求分佈式環境下全部客戶端的時間必須同步。dom
2. 當鎖過時的時候,若是多個客戶端同時執行jedis.getSet()方法,雖然最終只有一個客戶端加鎖,可是這個客戶端的鎖的過時時間可能被其餘客戶端覆蓋。不具有加鎖和解鎖必須是同一個客戶端的特性。解決上面這段代碼的方式就是爲每一個客戶端加鎖添加一個惟一標示,已確保加鎖和解鎖操做是來自同一個客戶端。分佈式
分佈式鎖的實現沒法就2個方法,一個加鎖,一個就是解鎖。下面咱們來看下解鎖的錯誤姿式。
錯誤姿式1.
/** * 解鎖錯誤姿式1 * @param jedis * @param lockKey */ public static void wrongReleaseLock1(Jedis jedis, String lockKey) { jedis.del(lockKey); }
上面實現是最簡單直接的解鎖方式,這種不先判斷擁有者而直接解鎖的方式,會致使任何客戶端均可以隨時解鎖。即便這把鎖不是它上鎖的。
錯誤姿式2:
/** * 解鎖錯誤姿式2 * @param jedis * @param lockKey * @param requestId */ public static void wrongReleaseLock2(Jedis jedis, String lockKey, String requestId) { // 判斷加鎖與解鎖是否是同一個客戶端 if (requestId.equals(jedis.get(lockKey))) { // 若在此時,這把鎖忽然不是這個客戶端的,則會誤解鎖 jedis.del(lockKey); }
既然錯誤姿式1中沒有判斷鎖的擁有者,那姿式2中判斷了擁有者,那錯誤緣由又在哪裏呢?答案又是原子性上面。由於判斷和刪除不是一個原子性操做。在併發的時候極可能發生解除了別的客戶端加的鎖。具體場景有:客戶端A加鎖,一段時間以後客戶端A進行解鎖操做時,在執行jedis.del()以前,鎖忽然過時了,此時客戶端B嘗試加鎖成功,而後客戶端A再執行del方法,則客戶端A將客戶端B的鎖給解除了。從而不也不知足加鎖和解鎖必須是同一個客戶端特性。解決思路就是須要保證GET和DEL操做在一個事務中進行,保證其原子性。
剛剛介紹完了錯誤的姿式後,從上面錯誤姿式中,咱們能夠知道,要使用Redis實現分佈式鎖。加鎖操做的正確姿式爲:
解鎖的正確姿式爲:
1. 須要拿加鎖成功的惟一標示要進行解鎖,從而保證加鎖和解鎖的是同一個客戶端
在此我向你們推薦一個架構學習交流圈:830478757 幫助突破瓶頸 提高思惟能力
2. 解鎖操做須要比較惟一標示是否相等,相等再執行刪除操做。這2個操做能夠採用Lua腳本方式使2個命令的原子性。
Redis分佈式鎖實現的正確姿式的實現代碼:
public interface DistributedLock { /** * 獲取鎖 * @author zhi.li * @return 鎖標識 */ String acquire(); /** * 釋放鎖 * @author zhi.li * @param indentifier * @return */ boolean release(String indentifier); } /** * @author zhi.li * @Description * @created 2019/1/1 20:32 */ @Slf4j public class RedisDistributedLock implements DistributedLock{ private static final String LOCK_SUCCESS = "OK"; private static final Long RELEASE_SUCCESS = 1L; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; /** * redis 客戶端 */ private Jedis jedis; /** * 分佈式鎖的鍵值 */ private String lockKey; /** * 鎖的超時時間 10s */ int expireTime = 10 * 1000; /** * 鎖等待,防止線程飢餓 */ int acquireTimeout = 1 * 1000; /** * 獲取指定鍵值的鎖 * @param jedis jedis Redis客戶端 * @param lockKey 鎖的鍵值 */ public RedisDistributedLock(Jedis jedis, String lockKey) { this.jedis = jedis; this.lockKey = lockKey; } /** * 獲取指定鍵值的鎖,同時設置獲取鎖超時時間 * @param jedis jedis Redis客戶端 * @param lockKey 鎖的鍵值 * @param acquireTimeout 獲取鎖超時時間 */ public RedisDistributedLock(Jedis jedis,String lockKey, int acquireTimeout) { this.jedis = jedis; this.lockKey = lockKey; this.acquireTimeout = acquireTimeout; } /** * 獲取指定鍵值的鎖,同時設置獲取鎖超時時間和鎖過時時間 * @param jedis jedis Redis客戶端 * @param lockKey 鎖的鍵值 * @param acquireTimeout 獲取鎖超時時間 * @param expireTime 鎖失效時間 */ public RedisDistributedLock(Jedis jedis, String lockKey, int acquireTimeout, int expireTime) { this.jedis = jedis; this.lockKey = lockKey; this.acquireTimeout = acquireTimeout; this.expireTime = expireTime; } @Override public String acquire() { try { // 獲取鎖的超時時間,超過這個時間則放棄獲取鎖 long end = System.currentTimeMillis() + acquireTimeout; // 隨機生成一個value String requireToken = UUID.randomUUID().toString(); while (System.currentTimeMillis() < end) { String result = jedis.set(lockKey, requireToken, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return requireToken; } try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } catch (Exception e) { log.error("acquire lock due to error", e); } return null; } @Override public boolean release(String identify) { if(identify == null){ return false; } String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = new Object(); try { result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(identify)); if (RELEASE_SUCCESS.equals(result)) { log.info("release lock success, requestToken:{}", identify); return true; }}catch (Exception e){ log.error("release lock due to error",e); }finally { if(jedis != null){ jedis.close(); } } log.info("release lock failed, requestToken:{}, result:{}", identify, result); return false; } } 下面就以秒殺庫存數量爲場景,測試下上面實現的分佈式鎖的效果。具體測試代碼以下: public class RedisDistributedLockTest { static int n = 500; public static void secskill() { System.out.println(--n); } public static void main(String[] args) { Runnable runnable = () -> { RedisDistributedLock lock = null; String unLockIdentify = null; try { Jedis conn = new Jedis("127.0.0.1",6379); lock = new RedisDistributedLock(conn, "test1"); unLockIdentify = lock.acquire(); System.out.println(Thread.currentThread().getName() + "正在運行"); 在此我向你們推薦一個架構學習交流圈:830478757 幫助突破瓶頸 提高思惟能力 secskill(); } finally { if (lock != null) { lock.release(unLockIdentify); } } }; for (int i = 0; i < 10; i++) { Thread t = new Thread(runnable); t.start(); } } }
運行效果以下圖所示。從圖中能夠看出,同一個資源在同一個時刻只能被一個線程獲取,從而保證了庫存數量N的遞減是順序的。
這樣是否是已經完美使用Redis實現了分佈式鎖呢?答案是並無結束。上面的實現代碼只是針對單機的Redis沒問題。可是現實生產中大部分都是集羣的或者是主備的。但上面的實現姿式在集羣或者主備狀況下會有相應的問題。這裏先買一個關子,在後面一篇文章將詳細分析集羣或者主備環境下Redis分佈式鎖的實現方式。