1 介紹
這篇博文講介紹如何一步步構建一個基於Redis的分佈式鎖。會從最原始的版本開始,而後根據問題進行調整,最後完成一個較爲合理的分佈式鎖。html
本篇文章會將分佈式鎖的實現分爲兩部分,一個是單機環境,另外一個是集羣環境下的Redis鎖實現。在介紹分佈式鎖的實現以前,先來了解下分佈式鎖的一些信息。java
2 分佈式鎖
2.1 什麼是分佈式鎖?
分佈式鎖是控制分佈式系統或不一樣系統之間共同訪問共享資源的一種鎖實現,若是不一樣的系統或同一個系統的不一樣主機之間共享了某個資源時,每每須要互斥來防止彼此干擾來保證一致性。git
2.2 分佈式鎖須要具有哪些條件
- 互斥性:在任意一個時刻,只有一個客戶端持有鎖。
- 無死鎖:即使持有鎖的客戶端崩潰或者其餘意外事件,鎖仍然能夠被獲取。
- 容錯:只要大部分Redis節點都活着,客戶端就能夠獲取和釋放鎖
2.4 分佈式鎖的實現有哪些?
- 數據庫
- Memcached(add命令)
- Redis(setnx命令)
- Zookeeper(臨時節點)
- 等等
3 單機Redis的分佈式鎖
3.1 準備工做
3.1.1 定義常量類
public class LockConstants { public static final String OK = "OK"; /** NX|XX, NX -- Only set the key if it does not already exist. XX -- Only set the key if it already exist. **/ public static final String NOT_EXIST = "NX"; public static final String EXIST = "XX"; /** expx EX|PX, expire time units: EX = seconds; PX = milliseconds **/ public static final String SECONDS = "EX"; public static final String MILLISECONDS = "PX"; private LockConstants() {} } 複製代碼
3.1.2 定義鎖的抽象類
抽象類RedisLock實現java.util.concurrent包下的Lock接口,而後對一些方法提供默認實現,子類只需實現lock方法和unlock方法便可。代碼以下github
public abstract class RedisLock implements Lock { protected Jedis jedis; protected String lockKey; public RedisLock(Jedis jedis,String lockKey) { this(jedis, lockKey); } public void sleepBySencond(int sencond){ try { Thread.sleep(sencond*1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void lockInterruptibly(){} @Override public Condition newCondition() { return null; } @Override public boolean tryLock() { return false; } @Override public boolean tryLock(long time, TimeUnit unit){ return false; } } 複製代碼
3.2 最基礎的版本1
先來一個最基礎的版本,代碼以下redis
public class LockCase1 extends RedisLock { public LockCase1(Jedis jedis, String name) { super(jedis, name); } @Override public void lock() { while(true){ String result = jedis.set(lockKey, "value", NOT_EXIST); if(OK.equals(result)){ System.out.println(Thread.currentThread().getId()+"加鎖成功!"); break; } } } @Override public void unlock() { jedis.del(lockKey); } } 複製代碼
LockCase1類提供了lock和unlock方法。
其中lock方法也就是在reids客戶端執行以下命令算法
SET lockKey value NX
複製代碼
而unlock方法就是調用DEL命令將鍵刪除。
好了,方法介紹完了。如今來想一想這其中會有什麼問題?
假設有兩個客戶端A和B,A獲取到分佈式的鎖。A執行了一會,忽然A所在的服務器斷電了(或者其餘什麼的),也就是客戶端A掛了。這時出現一個問題,這個鎖一直存在,且不會被釋放,其餘客戶端永遠獲取不到鎖。以下示意圖數據庫
能夠經過設置過時時間來解決這個問題編程
3.3 版本2-設置鎖的過時時間
public void lock() { while(true){ String result = jedis.set(lockKey, "value", NOT_EXIST,SECONDS,30); if(OK.equals(result)){ System.out.println(Thread.currentThread().getId()+"加鎖成功!"); break; } } } 複製代碼
相似的Redis命令以下bash
SET lockKey value NX EX 30
複製代碼
注:要保證設置過時時間和設置鎖具備原子性服務器
這時又出現一個問題,問題出現的步驟以下
- 客戶端A獲取鎖成功,過時時間30秒。
- 客戶端A在某個操做上阻塞了50秒。
- 30秒時間到了,鎖自動釋放了。
- 客戶端B獲取到了對應同一個資源的鎖。
- 客戶端A從阻塞中恢復過來,釋放掉了客戶端B持有的鎖。
示意圖以下
這時會有兩個問題
- 過時時間如何保證大於業務執行時間?
- 如何保證鎖不會被誤刪除?
先來解決如何保證鎖不會被誤刪除這個問題。
這個問題能夠經過設置value爲當前客戶端生成的一個隨機字符串,且保證在足夠長的一段時間內在全部客戶端的全部獲取鎖的請求中都是惟一的。
版本2的完整代碼:Github地址
3.4 版本3-設置鎖的value
抽象類RedisLock增長lockValue字段,lockValue字段的默認值爲UUID隨機值假設當前線程ID。
public abstract class RedisLock implements Lock {
//...
protected String lockValue;
public RedisLock(Jedis jedis,String lockKey) {
this(jedis, lockKey, UUID.randomUUID().toString()+Thread.currentThread().getId());
}
public RedisLock(Jedis jedis, String lockKey, String lockValue) {
this.jedis = jedis;
this.lockKey = lockKey;
this.lockValue = lockValue;
}
//...
}
複製代碼
加鎖代碼
public void lock() { while(true){ String result = jedis.set(lockKey, lockValue, NOT_EXIST,SECONDS,30); if(OK.equals(result)){ System.out.println(Thread.currentThread().getId()+"加鎖成功!"); break; } } } 複製代碼
解鎖代碼
public void unlock() { String lockValue = jedis.get(lockKey); if (lockValue.equals(lockValue)){ jedis.del(lockKey); } } 複製代碼
這時看看加鎖代碼,好像沒有什麼問題啊。
再來看看解鎖的代碼,這裏的解鎖操做包含三步操做:獲取值、判斷和刪除鎖。這時你有沒有想到在多線程環境下的i++
操做?
3.4.1 i++問題
i++
操做也可分爲三個步驟:讀i的值,進行i+1,設置i的值。
若是兩個線程同時對i進行i++操做,會出現以下狀況
- i設置值爲0
- 線程A讀到i的值爲0
- 線程B也讀到i的值爲0
- 線程A執行了+1操做,將結果值1寫入到內存
- 線程B執行了+1操做,將結果值1寫入到內存
- 此時i進行了兩次i++操做,可是結果卻爲1
在多線程環境下有什麼方式能夠避免這類狀況發生?
解決方式有不少種,例如用AtomicInteger、CAS、synchronized等等。
這些解決方式的目的都是要確保i++
操做的原子性。那麼回過頭來看看解鎖,同理咱們也是要確保解鎖的原子性。咱們能夠利用Redis的lua腳原本實現解鎖操做的原子性。
版本3的完整代碼:Github地址
3.5 版本4-具備原子性的釋放鎖
lua腳本內容以下
if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end 複製代碼
這段Lua腳本在執行的時候要把的lockValue做爲ARGV[1]的值傳進去,把lockKey做爲KEYS[1]的值傳進去。如今來看看解鎖的java代碼
public void unlock() { // 使用lua腳本進行原子刪除操做 String checkAndDelScript = "if redis.call('get', KEYS[1]) == ARGV[1] then " + "return redis.call('del', KEYS[1]) " + "else " + "return 0 " + "end"; jedis.eval(checkAndDelScript, 1, lockKey, lockValue); } 複製代碼
好了,解鎖操做也確保了原子性了,那麼是否是單機Redis環境的分佈式鎖到此就完成了?
別忘了版本2-設置鎖的過時時間還有一個,過時時間如何保證大於業務執行時間問題沒有解決。
版本4的完整代碼:Github地址
3.6 版本5-確保過時時間大於業務執行時間
抽象類RedisLock增長一個boolean類型的屬性isOpenExpirationRenewal,用來標識是否開啓定時刷新過時時間。
在增長一個scheduleExpirationRenewal方法用於開啓刷新過時時間的線程。
public abstract class RedisLock implements Lock {
//...
protected volatile boolean isOpenExpirationRenewal = true; /** * 開啓定時刷新 */ protected void scheduleExpirationRenewal(){ Thread renewalThread = new Thread(new ExpirationRenewal()); renewalThread.start(); } /** * 刷新key的過時時間 */ private class ExpirationRenewal implements Runnable{ @Override public void run() { while (isOpenExpirationRenewal){ System.out.println("執行延遲失效時間中..."); String checkAndExpireScript = "if redis.call('get', KEYS[1]) == ARGV[1] then " + "return redis.call('expire',KEYS[1],ARGV[2]) " + "else " + "return 0 end"; jedis.eval(checkAndExpireScript, 1, lockKey, lockValue, "30"); //休眠10秒 sleepBySencond(10); } } } } 複製代碼
加鎖代碼在獲取鎖成功後將isOpenExpirationRenewal置爲true,而且調用scheduleExpirationRenewal方法,開啓刷新過時時間的線程。
public void lock() { while (true) { String result = jedis.set(lockKey, lockValue, NOT_EXIST, SECONDS, 30); if (OK.equals(result)) { System.out.println("線程id:"+Thread.currentThread().getId() + "加鎖成功!時間:"+LocalTime.now()); //開啓定時刷新過時時間 isOpenExpirationRenewal = true; scheduleExpirationRenewal(); break; } System.out.println("線程id:"+Thread.currentThread().getId() + "獲取鎖失敗,休眠10秒!時間:"+LocalTime.now()); //休眠10秒 sleepBySencond(10); } } 複製代碼
解鎖代碼增長一行代碼,將isOpenExpirationRenewal屬性置爲false,中止刷新過時時間的線程輪詢。
public void unlock() { //... isOpenExpirationRenewal = false; } 複製代碼
版本5的完整代碼:Github地址
3.7 測試
測試代碼以下
public void testLockCase5() { //定義線程池 ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 10, 1, TimeUnit.SECONDS, new SynchronousQueue<>()); //添加10個線程獲取鎖 for (int i = 0; i < 10; i++) { pool.submit(() -> { try { Jedis jedis = new Jedis("localhost"); LockCase5 lock = new LockCase5(jedis, lockName); lock.lock(); //模擬業務執行15秒 lock.sleepBySencond(15); lock.unlock(); } catch (Exception e){ e.printStackTrace(); } }); } //當線程池中的線程數爲0時,退出 while (pool.getPoolSize() != 0) {} } 複製代碼
測試結果
或許到這裏基於單機Redis環境的分佈式就介紹完了。可是使用java的同窗有沒有發現一個鎖的重要特性
那就是鎖的重入,那麼分佈式鎖的重入該如何實現呢?這裏就留一個坑了
4 集羣Redis的分佈式鎖
在Redis的分佈式環境中,Redis 的做者提供了RedLock 的算法來實現一個分佈式鎖。
4.1 加鎖
RedLock算法加鎖步驟以下
- 獲取當前Unix時間,以毫秒爲單位。
- 依次嘗試從N個實例,使用相同的key和隨機值獲取鎖。在步驟2,當向Redis設置鎖時,客戶端應該設置一個網絡鏈接和響應超時時間,這個超時時間應該小於鎖的失效時間。例如你的鎖自動失效時間爲10秒,則超時時間應該在5-50毫秒之間。這樣能夠避免服務器端Redis已經掛掉的狀況下,客戶端還在死死地等待響應結果。若是服務器端沒有在規定時間內響應,客戶端應該儘快嘗試另一個Redis實例。
- 客戶端使用當前時間減去開始獲取鎖時間(步驟1記錄的時間)就獲得獲取鎖使用的時間。當且僅當從大多數(這裏是3個節點)的Redis節點都取到鎖,而且使用的時間小於鎖失效時間時,鎖纔算獲取成功。
- 若是取到了鎖,key的真正有效時間等於有效時間減去獲取鎖所使用的時間(步驟3計算的結果)。
- 若是由於某些緣由,獲取鎖失敗(沒有在至少N/2+1個Redis實例取到鎖或者取鎖時間已經超過了有效時間),客戶端應該在全部的Redis實例上進行解鎖(即使某些Redis實例根本就沒有加鎖成功)。
4.2 解鎖
向全部的Redis實例發送釋放鎖命令便可,不用關心以前有沒有從Redis實例成功獲取到鎖.
關於RedLock算法,還有一個小插曲,就是Martin Kleppmann 和 RedLock 做者 antirez的對RedLock算法的互懟。 官網原話以下
Martin Kleppmann analyzed Redlock here. I disagree with the analysis and posted my reply to his analysis here.
更多關於RedLock算法這裏就不在說明,有興趣的能夠到官網閱讀相關文章。
5 總結
這篇文章講述了一個基於Redis的分佈式鎖的編寫過程及解決問題的思路,可是本篇文章實現的分佈式鎖並不適合用於生產環境。java環境有 Redisson 可用於生產環境,可是分佈式鎖仍是Zookeeper會比較好一些(能夠看Martin Kleppmann 和 RedLock的分析)。
Martin Kleppmann對RedLock的分析:martin.kleppmann.com/2016/02/08/…
RedLock 做者 antirez的迴應:antirez.com/news/101
整個項目的地址存放在Github上,有須要的能夠看看:Github地址
原創文章,轉載請註明: 轉載自併發編程網 – ifeve.com本文連接地址: 基於redis的分佈式鎖