Redis分佈式鎖

在多線程開發中咱們使用鎖來避免線程爭奪共享資源。在分佈式系統中,程序在多個節點上運行沒法使用單機鎖來避免資源競爭,所以咱們須要一個鎖服務來避免多個節點上的進程爭奪資源。html

Redis數據庫基於內存,具備高吞吐量、便於執行原子性操做等特色很是適合開發對一致性要求不高的鎖服務。java

本文介紹了簡單分佈式鎖、Redisson分佈式鎖的實現以及解決單點服務的RedLock分佈式鎖概念。node

Redis是一致性較低的數據庫,若對鎖服務的一致性要求較高建議使用zookeeper等中間件開發鎖服務。redis

基於單點Redis的分佈式鎖

Redis實現分佈式鎖的原理很是簡單, 節點在訪問共享資源前先查詢redis中是否有該資源對應的鎖記錄, 若不存在鎖記錄則寫入一條鎖記錄(即獲取鎖)隨後訪問共享資源. 若節點查詢到redis中已經存在了資源對應的鎖記錄, 則放棄操做共享資源.數據庫

下面給出一個很是簡單的分佈式鎖示例:安全

import redis.clients.jedis.Jedis;

import java.util.Random;
import java.util.UUID;


public class MyRedisLock {

    private Jedis jedis;

    private String lockKey;

    private String value;

    private static final Integer DEFAULT_TIMEOUT = 30;

    private static final String SUFFIX = ":lock";

    public MyRedisLock(Jedis jedis) {
        this.jedis = jedis;
    }
    
    public boolean acquire(String key, long time) throws InterruptedException {
        Long outdatedTime = System.currentTimeMillis() + time;
        lockKey = key + SUFFIX;
        while (true) {
            if (System.currentTimeMillis() >= outdatedTime) {
                return false;
            }
            value = UUID.randomUUID().toString(); // 1
            return "OK".equals(jedis.set(lockKey, value, "NX", DEFAULT_TIMEOUT)); // 2
        }
    }

    public boolean check() {
        return value != null && value.equals(jedis.get(lockKey)); // 3
    }

    public boolean release() {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        return 1L.equals(jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(value))); // 3
    }
}

加鎖後全部對共享資源的操做都應該先檢查當前線程是否仍持有鎖。網絡

在分佈式鎖的實現中有幾點須要注意:多線程

  1. 加鎖過程:
    1. 鎖的過時時間應設置到redis中,保證在加鎖客戶端故障的狀況下鎖能夠被自動釋放
    2. 使用set key value EX seconds NX命令進行加鎖,不要使用setnx和expire兩個命令加鎖。
      若setnx執行成功而expire失敗(如執行setnx後客戶端崩潰),則可能形成死鎖。
    3. 鎖記錄的值不能使用固定值。 使用固定值可能致使嚴重錯誤: 線程A的鎖由於超時被釋放, 隨後線程B成功加鎖。 B寫入的鎖記錄與A的鎖記錄沒有區別, 所以A在檢查時會誤判爲本身仍持有鎖。
  2. 解鎖過程:
    1. 解鎖操做使用lua腳本執行get和del兩個操做,爲了保證兩個操做的原子性。若兩個操做不具備原子性則可能出現錯誤時序: 線程A執行get操做判斷本身仍持有鎖 -> 鎖超時釋放 -> 線程B成功加鎖 -> 線程A刪除鎖記錄(線程A認爲刪除了本身的鎖記錄,實際上刪除了線程B的鎖記錄)。

上文只是提供了簡單示例,還有一些重要功能沒有實現:dom

  1. 阻塞加鎖:能夠使用redis的發佈訂閱功能,獲取鎖失敗的線程訂閱鎖被釋放的消息再次嘗試加鎖
  2. 無限期鎖:應寫入有TTL的鎖記錄,設置定時任務在鎖失效前刷新鎖過時的時間。這種方式能夠避免持有鎖的線程崩潰致使的死鎖
  3. 可重入鎖(持有鎖的線程能夠再次加鎖):示例中持有鎖的線程沒法對同一個資源再次加鎖,即不可重入鎖。實現可重入鎖須要鎖記錄由(key:資源標記, value:持有者標記)的鍵值對結構變爲(key:資源標記, field:持有者標記, value:計數器)這樣的hash結構。持有鎖的線程每次重入鎖計數器加1,每次釋放鎖計數器減1,計數器爲0時刪除鎖記錄。

總結來看實現Redis分佈式鎖有幾點須要注意:異步

  1. 加解鎖操做應保證原子性,避免多個線程同時操做出現異常
  2. 應考慮進程崩潰、Redis崩潰、操做成功執行但未收到成功響應等異常情況,避免死鎖
  3. 解鎖操做必須避免 某個線程釋放了不屬於本身的鎖 的異常

Redisson

這裏咱們以基於Java的Redisson爲例討論一下成熟的Redis分佈式鎖的實現。

redisson實現了java.util.concurrent.locks.Lock接口,能夠像使用普通鎖同樣使用redisson:

RLock lock = redisson.getLock("key"); 
lock.lock(); 
try {
    // do sth.
} finally {
    lock.unlock(); 
}

分析一下RLock的實現類org.redisson.RedissonLock:

加鎖操做

@Override
public void lock() {
    try {
        lockInterruptibly();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

@Override
public void lockInterruptibly() throws InterruptedException {
    lockInterruptibly(-1, null);
}

再看等待加鎖的方法lockInterruptibly:

@Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try {
            while (true) {
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
    }

lockInterruptibly 方法會嘗試獲取鎖,若獲取失敗則會訂閱釋放鎖的消息。收到鎖被釋放的通知後再次嘗試獲取鎖,直到成功或者超時。

接下來分析tryAcquire:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId)); // 調用異步得到鎖的實現,使用get(future)實現同步
}

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    // 設置了超時時間
    if (leaseTime != -1) {
        // tryLockInnerAsync 加鎖成功返回 null, 加鎖失敗在 Future 中返回鎖記錄剩餘的有效時間
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 未設置超時時間,嘗試得到無限期的鎖
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }
            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                // 避免對共享資源操做完成前鎖就被釋放掉,按期刷新鎖失效的時間
                // 默認鎖失效時間的三分之一即進行刷新
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

tryAcquireAsync中主要邏輯是無限期鎖的實現,Redisson並不是設置了永久的鎖記錄,而是按期刷新鎖失效的時間。

這種方式避免了持有鎖的進程崩潰沒法釋放鎖致使死鎖。

真正實現獲取鎖邏輯的是tryLockInnerAsync方法:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    return commandExecutor.evalWriteAsync(
        getName(),
        LongCodec.INSTANCE, 
        command,
          "if (redis.call('exists', KEYS[1]) == 0) then " + // 資源未被加鎖
              "redis.call('hset', KEYS[1], ARGV[2], 1); " + // 寫入鎖記錄, 鎖記錄是一個hash; key:共享資源名稱, field:鎖實例名稱(Redisson客戶端ID:線程ID), value: 1(value是一個計數器,記錄當前線程獲取該鎖的次數,實現可重入鎖)
              "redis.call('pexpire', KEYS[1], ARGV[1]); " + // 設置鎖記錄過時時間
              "return nil; " +
          "end; " +
          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 若當前線程已經持有該資源的鎖
              "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 將鎖計數器加1, 
              "redis.call('pexpire', KEYS[1], ARGV[1]); " +
              "return nil; " +
          "end; " +
          "return redis.call('pttl', KEYS[1]);", // 資源已被其它線程加鎖,加鎖失敗。獲取鎖剩餘生存時間後返回
        Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

上述操做使用eval命令執行lua腳本保證了操做的原子性。

unlock

解鎖過程相對簡單:

@Override
public void unlock() {
    Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
    if (opStatus == null) {
        throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                + id + " thread-id: " + Thread.currentThread().getId());
    }
    if (opStatus) {
        cancelExpirationRenewal();
    }
}

unlockInnerAsync方法實現了具體的解鎖邏輯:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('exists', KEYS[1]) == 0) then " + // 資源未被加鎖,可能鎖已被超時釋放
                "redis.call('publish', KEYS[2], ARGV[1]); " + // 發佈鎖被釋放的消息
                "return 1; " +
            "end;" +
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // 鎖的持有者不是本身,拋出異常
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 本身持有鎖,由於鎖是可重入的將計數器減1
            "if (counter > 0) then " + // 計數器大於0,鎖未被徹底釋放,刷新鎖過時時間
                "redis.call('pexpire', KEYS[1], ARGV[2]); " + 
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " + // 鎖被徹底釋放,刪除鎖記錄,發佈鎖被釋放的消息
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

RedLock

基於單點的分佈式鎖沒法解決redis故障的問題. 爲了保證redis的可用性咱們一般採用主從備份的方法, 即 使用一個master實例和至少一個slave實例.

當有寫入請求時先寫入master而後寫入到全部slave, 當master實例故障時選擇一個slave實例升級爲master實例繼續提供服務.

其中存在的問題是, 寫入master和寫入slave存在時間差. 若線程A成功將鎖記錄寫入了master, 隨後在同步寫入slave以前, master故障轉移到slave.

由於slave(新master)中沒有鎖記錄, 所以線程B也能夠成功加鎖, 所以可能出現A和B同時持有鎖的錯誤.

爲了解決redis失效可能形成的問題, redis的做者antirez提出了RedLock實現方案:

  1. 客戶端獲取當前時間

  2. 客戶端嘗試獲取N個節點的鎖, 每一個節點使用相同的key和value. 請求超時時間要遠小於鎖超時時間, 避免在節點或者網絡故障時浪費時間.

  3. 客戶端計算在加鎖時消耗的時間, 只有客戶端成功得到超過一半節點的鎖且總時間小於鎖超時間時才能成功加鎖. 客戶端持有鎖的時間爲鎖超時時間減去加鎖消耗的時間.

  4. 若獲取鎖失敗則訪問全部節點, 發起釋放鎖的請求.

釋放鎖時須要向全部Redis節點發出釋放鎖的請求, 緣由在於可能某個Redis實例中成功寫入了鎖記錄, 可是沒有響應沒有到達客戶端.

爲了保證全部鎖記錄都被正確釋放, 因此須要向全部Redis實例發送釋放請求.

關於安全性的討論

關於RedLock的安全性問題, Martin Kleppmann和做者antirez進行了一些討論:

關於這場討論的分析能夠參考:

相關文章
相關標籤/搜索