在多線程開發中咱們使用鎖來避免線程爭奪共享資源。在分佈式系統中,程序在多個節點上運行沒法使用單機鎖來避免資源競爭,所以咱們須要一個鎖服務來避免多個節點上的進程爭奪資源。html
Redis數據庫基於內存,具備高吞吐量、便於執行原子性操做等特色很是適合開發對一致性要求不高的鎖服務。java
本文介紹了簡單分佈式鎖、Redisson分佈式鎖的實現以及解決單點服務的RedLock分佈式鎖概念。node
Redis是一致性較低的數據庫,若對鎖服務的一致性要求較高建議使用zookeeper等中間件開發鎖服務。redis
Redis實現分佈式鎖的原理很是簡單, 節點在訪問共享資源前先查詢redis中是否有該資源對應的鎖記錄, 若不存在鎖記錄則寫入一條鎖記錄(即獲取鎖)隨後訪問共享資源. 若節點查詢到redis中已經存在了資源對應的鎖記錄, 則放棄操做共享資源.數據庫
下面給出一個很是簡單的分佈式鎖示例:安全
import redis.clients.jedis.Jedis; import java.util.Random; import java.util.UUID; public class MyRedisLock { private Jedis jedis; private String lockKey; private String value; private static final Integer DEFAULT_TIMEOUT = 30; private static final String SUFFIX = ":lock"; public MyRedisLock(Jedis jedis) { this.jedis = jedis; } public boolean acquire(String key, long time) throws InterruptedException { Long outdatedTime = System.currentTimeMillis() + time; lockKey = key + SUFFIX; while (true) { if (System.currentTimeMillis() >= outdatedTime) { return false; } value = UUID.randomUUID().toString(); // 1 return "OK".equals(jedis.set(lockKey, value, "NX", DEFAULT_TIMEOUT)); // 2 } } public boolean check() { return value != null && value.equals(jedis.get(lockKey)); // 3 } public boolean release() { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; return 1L.equals(jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(value))); // 3 } }
加鎖後全部對共享資源的操做都應該先檢查當前線程是否仍持有鎖。網絡
在分佈式鎖的實現中有幾點須要注意:多線程
set key value EX seconds NX
命令進行加鎖,不要使用setnx和expire兩個命令加鎖。上文只是提供了簡單示例,還有一些重要功能沒有實現:dom
總結來看實現Redis分佈式鎖有幾點須要注意:異步
這裏咱們以基於Java的Redisson爲例討論一下成熟的Redis分佈式鎖的實現。
redisson實現了java.util.concurrent.locks.Lock
接口,能夠像使用普通鎖同樣使用redisson:
RLock lock = redisson.getLock("key"); lock.lock(); try { // do sth. } finally { lock.unlock(); }
分析一下RLock的實現類org.redisson.RedissonLock
:
@Override public void lock() { try { lockInterruptibly(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } @Override public void lockInterruptibly() throws InterruptedException { lockInterruptibly(-1, null); }
再看等待加鎖的方法lockInterruptibly
:
@Override public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future); try { while (true) { ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().acquire(); } } } finally { unsubscribe(future, threadId); } }
lockInterruptibly
方法會嘗試獲取鎖,若獲取失敗則會訂閱釋放鎖的消息。收到鎖被釋放的通知後再次嘗試獲取鎖,直到成功或者超時。
接下來分析tryAcquire
:
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); // 調用異步得到鎖的實現,使用get(future)實現同步 } private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { // 設置了超時時間 if (leaseTime != -1) { // tryLockInnerAsync 加鎖成功返回 null, 加鎖失敗在 Future 中返回鎖記錄剩餘的有效時間 return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } // 未設置超時時間,嘗試得到無限期的鎖 RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.addListener(new FutureListener<Long>() { @Override public void operationComplete(Future<Long> future) throws Exception { if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // lock acquired if (ttlRemaining == null) { // 避免對共享資源操做完成前鎖就被釋放掉,按期刷新鎖失效的時間 // 默認鎖失效時間的三分之一即進行刷新 scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
tryAcquireAsync
中主要邏輯是無限期鎖的實現,Redisson並不是設置了永久的鎖記錄,而是按期刷新鎖失效的時間。
這種方式避免了持有鎖的進程崩潰沒法釋放鎖致使死鎖。
真正實現獲取鎖邏輯的是tryLockInnerAsync
方法:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync( getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + // 資源未被加鎖 "redis.call('hset', KEYS[1], ARGV[2], 1); " + // 寫入鎖記錄, 鎖記錄是一個hash; key:共享資源名稱, field:鎖實例名稱(Redisson客戶端ID:線程ID), value: 1(value是一個計數器,記錄當前線程獲取該鎖的次數,實現可重入鎖) "redis.call('pexpire', KEYS[1], ARGV[1]); " + // 設置鎖記錄過時時間 "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 若當前線程已經持有該資源的鎖 "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 將鎖計數器加1, "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", // 資源已被其它線程加鎖,加鎖失敗。獲取鎖剩餘生存時間後返回 Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
上述操做使用eval命令執行lua腳本保證了操做的原子性。
解鎖過程相對簡單:
@Override public void unlock() { Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId())); if (opStatus == null) { throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); } if (opStatus) { cancelExpirationRenewal(); } }
unlockInnerAsync
方法實現了具體的解鎖邏輯:
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + // 資源未被加鎖,可能鎖已被超時釋放 "redis.call('publish', KEYS[2], ARGV[1]); " + // 發佈鎖被釋放的消息 "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // 鎖的持有者不是本身,拋出異常 "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 本身持有鎖,由於鎖是可重入的將計數器減1 "if (counter > 0) then " + // 計數器大於0,鎖未被徹底釋放,刷新鎖過時時間 "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + // 鎖被徹底釋放,刪除鎖記錄,發佈鎖被釋放的消息 "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }
基於單點的分佈式鎖沒法解決redis故障的問題. 爲了保證redis的可用性咱們一般採用主從備份的方法, 即 使用一個master實例和至少一個slave實例.
當有寫入請求時先寫入master而後寫入到全部slave, 當master實例故障時選擇一個slave實例升級爲master實例繼續提供服務.
其中存在的問題是, 寫入master和寫入slave存在時間差. 若線程A成功將鎖記錄寫入了master, 隨後在同步寫入slave以前, master故障轉移到slave.
由於slave(新master)中沒有鎖記錄, 所以線程B也能夠成功加鎖, 所以可能出現A和B同時持有鎖的錯誤.
爲了解決redis失效可能形成的問題, redis的做者antirez提出了RedLock實現方案:
客戶端獲取當前時間
客戶端嘗試獲取N個節點的鎖, 每一個節點使用相同的key和value. 請求超時時間要遠小於鎖超時時間, 避免在節點或者網絡故障時浪費時間.
客戶端計算在加鎖時消耗的時間, 只有客戶端成功得到超過一半節點的鎖且總時間小於鎖超時間時才能成功加鎖. 客戶端持有鎖的時間爲鎖超時時間減去加鎖消耗的時間.
若獲取鎖失敗則訪問全部節點, 發起釋放鎖的請求.
釋放鎖時須要向全部Redis節點發出釋放鎖的請求, 緣由在於可能某個Redis實例中成功寫入了鎖記錄, 可是沒有響應沒有到達客戶端.
爲了保證全部鎖記錄都被正確釋放, 因此須要向全部Redis實例發送釋放請求.
關於RedLock的安全性問題, Martin Kleppmann和做者antirez進行了一些討論:
關於這場討論的分析能夠參考: