在咱們平常開發中,不免會遇到要加鎖的情景。例如扣除產品庫存,首先要從數據庫中取出庫存,進行庫存判斷,再減去庫存。這一波操做明顯不符合原子性,若是代碼塊不加鎖,很容易由於併發致使超賣問題。我們的系統若是是單體架構,那咱們使用本地鎖就能夠解決問題。若是是分佈式架構,就須要使用分佈式鎖。java
if (setnx("item_1_lock", 1)) { expire("item_1_lock", 30); try { ... 邏輯 } catch { ... } finally { del("item_1_lock"); } }
這種方法看起來能夠解決問題,可是有必定的風險,由於 SETNX
和 EXPIRE
這波操做是非原子性的,若是 SETNX
成功以後,出現錯誤,致使 EXPIRE
沒有執行,致使鎖沒有設置超時時間造成死鎖。node
針對這種狀況,咱們可使用 lua 腳原本保持操做原子性,保證 SETNX
和 EXPIRE
兩個操做要麼都成功,要麼都不成功。git
if (redis.call('setnx', KEYS[1], ARGV[1]) < 1) then return 0; end; redis.call('expire', KEYS[1], tonumber(ARGV[2])); return 1;
經過這樣的方法,咱們初步解決了競爭鎖的原子性問題,雖然其餘功能還未實現,可是應該不會形成死鎖 🤪🤪🤪。github
if (set("item_1_lock", 1, "NX", "EX", 30)) { try { ... 邏輯 } catch { ... } finally { del("item_1_lock"); } }
改進後的方法不須要藉助 lua 腳本就解決了 SETNX
和 EXPIRE
的原子性問題。如今咱們再仔細琢磨琢磨,若是 A 拿到了鎖順利進入代碼塊執行邏輯,可是因爲各類緣由致使超時自動釋放鎖。在這以後 B 成功拿到了鎖進入代碼塊執行邏輯,但此時若是 A 執行邏輯完畢再來釋放鎖,就會把 B 剛得到的鎖釋放了。就比如用本身家的鑰匙開了別家的門,這是不可接受的。redis
爲了解決這個問題咱們能夠嘗試在 SET
的時候設置一個鎖標識,而後在 DEL
的時候驗證當前鎖是否爲本身的鎖。數據庫
String value = UUID.randomUUID().toString().replaceAll("-", ""); if (set("item_1_lock", value, "NX", "EX", 30)) { try { ... 邏輯 } catch { ... } finally { ... lua 腳本保證原子性 } }
if (redis.call('get', KEYS[1]) == ARGV[1]) then return redis.call('del', KEYS[1]) else return 0 end
到這裏,咱們終於解決了競爭鎖的原子性問題和誤刪鎖問題。可是鎖通常還須要支持可重入、循環等待和超時自動續約等功能點。下面咱們學習使用一個很是好用的包來解決這些問題 👏👏👏。架構
Redission 的鎖,實現了可重入和超時自動續約功能,它都幫咱們封裝好了,咱們只要按照本身的需求調用它的 API 就能夠輕鬆實現上面所提到的幾個功能點。詳細功能能夠查看 Redisson 文檔併發
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.2</version> </dependency>
implementation 'org.redisson:redisson:3.13.2'
用 Maven 或者 Gradle 構建,目前最新版本爲 3.13.2
,也能夠在這裏 Redisson 找到你須要的版本。dom
RedissonClient redissonClient = Redisson.create(); RLock lock = redissonClient.getLock("lock"); boolean res = lock.lock(); if (res) { try { ... 邏輯 } finally { lock.unlock(); } }
Redisson 將底層邏輯所有作了一個封裝 📦,咱們無需關心具體實現,幾行代碼就能使用一把完美的鎖。下面咱們簡單折騰折騰源碼 🤔🤔🤔。maven
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { // 獲取當前線程 id long threadId = Thread.currentThread().getId(); // 嘗試獲取鎖 Long ttl = tryAcquire(leaseTime, unit, threadId); // 獲取成功直接返回 if (ttl == null) { return; } // 獲取失敗,訂閱鎖對應的頻道 RFuture<RedissonLockEntry> future = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } try { while (true) { // 再次嘗試獲取鎖 ttl = tryAcquire(leaseTime, unit, threadId); // 獲取成功直接返回 if (ttl == null) { break; } // 等待 ttl 時間後繼續獲取 if (ttl >= 0) { try { future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { future.getNow().getLatch().acquire(); } else { future.getNow().getLatch().acquireUninterruptibly(); } } } } finally { // 取消頻道訂閱 unsubscribe(future, threadId); } }
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { // 若是設置了鎖過時時間,則按普通方式獲取鎖 if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } // 若是沒有設置鎖過時時間,則開啓自動續約功能,先設置 30 秒過時時間 RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { // 有錯誤直接返回 if (e != null) { return; } // 獲取鎖 if (ttlRemaining == null) { // 開啓自動續約 scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return evalWriteAsync(getName(), LongCodec.INSTANCE, command, /** * 鎖不存在,使用 hincrby 建立新 hash 表以及給鎖計數自增 1,並設置過時時間 * 鎖存在而且屬於當前線程,給鎖計數自增 1,並設置過時時間 * 鎖存在可是不屬於當前線程,返回鎖過時時間 **/ "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise<Void>(); // 解鎖邏輯 RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { // 取消刷新過時時間的定時任務 cancelExpirationRenewal(threadId); if (e != null) { result.tryFailure(e); return; } // 解鎖線程和鎖不是同一個線程,拋錯 if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } result.trySuccess(null); }); return result; }
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, /** * 判斷鎖是否屬於當前線程,不屬於直接返回 * 鎖計數減去 1,若是鎖計數還大於 0,則設置過時時間,不然釋放鎖併發布鎖釋放消息 **/ "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
使用 Redis 作分佈式鎖來解決併發問題仍存在一些困難,也有不少須要注意的點,咱們應該正確評估系統的體量,不能爲了使用某項技術而用。要徹底解決併發問題,仍須要在數據庫層面作功夫。🧐🧐🧐