如何實現一個 Redis 分佈式鎖?

前言

在咱們平常開發中,不免會遇到要加鎖的情景。例如扣除產品庫存,首先要從數據庫中取出庫存,進行庫存判斷,再減去庫存。這一波操做明顯不符合原子性,若是代碼塊不加鎖,很容易由於併發致使超賣問題。我們的系統若是是單體架構,那咱們使用本地鎖就能夠解決問題。若是是分佈式架構,就須要使用分佈式鎖。node

方案

使用 SETNX 和 EXPIRE 命令

SETNX key value
EXPIRE key seconds
DEL key
if (setnx("item_1_lock", 1)) {
    expire("item_1_lock", 30);
    try {
        ... 邏輯
    } catch {
        ...
    } finally {
        del("item_1_lock");
    }
}

這種方法看起來能夠解決問題,可是有必定的風險,由於 SETNX 和 EXPIRE 這波操做是非原子性的,若是 SETNX 成功以後,出現錯誤,致使 EXPIRE 沒有執行,致使鎖沒有設置超時時間造成死鎖。python

針對這種狀況,咱們可使用 lua 腳原本保持操做原子性,保證 SETNX 和 EXPIRE 兩個操做要麼都成功,要麼都不成功。linux

if (redis.call('setnx', KEYS[1], ARGV[1]) < 1)
then return 0;
end;
redis.call('expire', KEYS[1], tonumber(ARGV[2]));
return 1;

經過這樣的方法,咱們初步解決了競爭鎖的原子性問題,雖然其餘功能還未實現,可是應該不會形成死鎖。redis

Redis 2.6.12 以上可靈活使用 SET 命令

SET key value NX EX 30
DEL key
if (set("item_1_lock", 1, "NX", "EX", 30)) {
    try {
        ... 邏輯
    } catch {
        ...
    } finally {
        del("item_1_lock");
    }
}

改進後的方法不須要藉助 lua 腳本就解決了 SETNX 和 EXPIRE 的原子性問題。如今咱們再仔細琢磨琢磨,若是 A 拿到了鎖順利進入代碼塊執行邏輯,可是因爲各類緣由致使超時自動釋放鎖。數據庫

在這以後 B 成功拿到了鎖進入代碼塊執行邏輯,但此時若是 A 執行邏輯完畢再來釋放鎖,就會把 B 剛得到的鎖釋放了。就比如用本身家的鑰匙開了別家的門,這是不可接受的。架構

爲了解決這個問題咱們能夠嘗試在 SET 的時候設置一個鎖標識,而後在 DEL 的時候驗證當前鎖是否爲本身的鎖。併發

String value = UUID.randomUUID().toString().replaceAll("-", "");
if (set("item_1_lock", value, "NX", "EX", 30)) {
    try {
        ... 邏輯
    } catch {
        ...
    } finally {
        ... lua 腳本保證原子性
    }
}
if (redis.call('get', KEYS[1]) == ARGV[1])
then return redis.call('del', KEYS[1])
else return 0
end

到這裏,咱們終於解決了競爭鎖的原子性問題和誤刪鎖問題。可是鎖通常還須要支持可重入、循環等待和超時自動續約等功能點。下面咱們學習使用一個很是好用的包來解決這些問題。dom

入門 Redisson

Redission 的鎖,實現了可重入和超時自動續約功能,它都幫咱們封裝好了,咱們只要按照本身的需求調用它的 API 就能夠輕鬆實現上面所提到的幾個功能點。詳細功能能夠查看 Redisson 文檔分佈式

在項目中安裝 Redisson

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.2</version>
</dependency>
implementation 'org.redisson:redisson:3.13.2'

用 Maven 或者 Gradle 構建,目前最新版本爲 3.13.2,也能夠在這裏 Redisson 找到你須要的版本。學習

簡單嘗試

RedissonClient redissonClient = Redisson.create();
RLock lock = redissonClient.getLock("lock");
boolean res = lock.lock();
if (res) {
   try {
     ... 邏輯
   } finally {
       lock.unlock();
   }
}

Redisson 將底層邏輯所有作了一個封裝 📦,咱們無需關心具體實現,幾行代碼就能使用一把完美的鎖。下面咱們簡單折騰折騰源碼 。

加鎖

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    if (ttl == null) {
        return;
    }
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }
    try {
        while (true) {
            ttl = tryAcquire(leaseTime, unit, threadId);
            if (ttl == null) {
                break;
            }
            if (ttl >= 0) {
                try {
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    future.getNow().getLatch().acquire();
                } else {
                    future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
}

獲取鎖

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

刪除鎖

public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<Void>();
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    future.onComplete((opStatus, e) -> {
        cancelExpirationRenewal(threadId);
        if (e != null) {
            result.tryFailure(e);
            return;
        }
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }
        result.trySuccess(null);
    });
    return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

總結

使用 Redis 作分佈式鎖來解決併發問題仍存在一些困難,也有不少須要注意的點,咱們應該正確評估系統的體量,不能爲了使用某項技術而用。要徹底解決併發問題,仍須要在數據庫層面作功夫。

福利:豆花同窗爲你們精心整理了一份關於linux和python的學習資料大合集!有須要的小夥伴們,關注豆花我的公衆號:python頭條!回覆關鍵詞「資料合集」便可免費領取!

相關文章
相關標籤/搜索