單機環境下咱們能夠經過JAVA的Synchronized和Lock來實現進程內部的鎖,可是隨着分佈式應用和集羣環境的出現,系統資源的競爭從單進程多線程的競爭變成了多進程的競爭,這時候就須要分佈式鎖來保證。
實現分佈式鎖如今主流的方式大體有如下三種
1. 基於數據庫的索引和行鎖
2. 基於Redis的單線程原子操做:setNX
3. 基於Zookeeper的臨時有序節點
這篇文章咱們用Redis來實現,會基於現有的各類鎖實現來分析,最後分享Redission的鎖源碼分析來看下分佈式鎖的開源實現
複製代碼
1、 經過setNx和getSet來實現redis
這是如今網上大部分版本的實現方式,筆者以前項目裏面用到分佈式鎖也是經過這樣的方式實現數據庫
public boolean lock(Jedis jedis, String lockName, Integer expire) {
//返回是否設置成功
//setNx加鎖
long now = System.currentTimeMillis();
boolean result = jedis.setnx(lockName, String.valueOf(now + expire * 1000)) == 1;
if (!result) {
//防止死鎖的容錯
String timestamp = jedis.get(lockName);
if (timestamp != null && Long.parseLong(timestamp) < now) {
//不經過del方法來刪除鎖。而是經過同步的getSet
String oldValue = jedis.getSet(lockName, String.valueOf(now + expire));
if (oldValue != null && oldValue.equals(timestamp)) {
result = true;
jedis.expire(lockName, expire);
}
}
}
if (result) {
jedis.expire(lockName, expire);
}
return result;
}
複製代碼
代碼分析:緩存
經過setNx命令老保證操做的原子性,獲取到鎖,而且把過時時間設置到value裏面安全
經過expire方法設置過時時間,若是設置過時時間失敗的話,再經過value的時間戳來和當前時間戳比較,防止出現死鎖bash
經過getSet命令在發現鎖過時未被釋放的狀況下,避免刪除了在這個過程當中有可能被其他的線程獲取到了鎖服務器
存在問題多線程
2、 經過Redis高版本的原子命令併發
jedis的set命令能夠自帶複雜參數,經過這些參數能夠實現原子的分佈式鎖命令異步
jedis.set(lockName, "", "NX", "PX", expireTime);
複製代碼
代碼分析分佈式
redis的set命令能夠攜帶複雜參數,第一個是鎖的key,第二個是value,能夠存放獲取鎖的客戶端ID,經過這個校驗是否當前客戶端獲取到了鎖,第三個參數取值NX/XX,第四個參數 EX|PX,第五個就是時間
NX:若是不存在就設置這個key XX:若是存在就設置這個key
EX:單位爲秒,PX:單位爲毫秒
這個命令實質上就是把咱們以前的setNx和expire命令合併成一個原子操做命令,不須要咱們考慮set失敗或者expire失敗的狀況
1、 經過Redis的del命令
public boolean unlock(Jedis jedis, String lockName) {
jedis.del(lockName);
return true;
}
複製代碼
代碼分析
經過redis的del命令能夠直接刪除鎖,可能會出現誤刪其餘線程已經存在的鎖的狀況
2、 Redis的del檢查
public static void unlock2(Jedis jedis, String lockKey, String requestId) {
// 判斷加鎖與解鎖是否是同一個客戶端
if (requestId.equals(jedis.get(lockKey))) {
// 若在此時,這把鎖忽然不是這個客戶端的,則會誤解鎖
jedis.del(lockKey);
}
}
複製代碼
代碼分析
新增了requestId客戶端ID的判斷,但因爲不是原子操做,在多個進程下面的併發競爭狀況下,沒法保證安全
3、 Redis的LUA腳本
public static boolean unlock3(Jedis jedis, String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(""));
if (1L == (long) result) {
return true;
}
return false;
}
複製代碼
代碼分析
經過Lua腳原本保證操做的原子性,其實就是把以前的先判斷再刪除合併成一個原子性的腳本命令,邏輯就是,先經過get判斷value是否是相等,若相等就刪除,不然就直接return
Redission是redis官網推薦的一個redis客戶端,除了基於redis的基礎的CURD命令之外,重要的是就是Redission提供了方便好用的分佈式鎖API
複製代碼
1、 基本用法
RedissonClient redissonClient = RedissonTool.getInstance();
RLock distribute_lock = redissonClient.getLock("distribute_lock");
try {
boolean result = distribute_lock.tryLock(3, 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (distribute_lock.isLocked()) {
distribute_lock.unlock();
}
}
複製代碼
代碼流程
2、 具體實現
咱們經過tryLock來分析redission分佈式的實現,lock方法跟tryLock差很少,只不過沒有最長等待時間的設置,會自旋循環等待鎖的釋放,直到獲取鎖爲止
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
//獲取當前線程ID,用於實現可重入鎖
final long threadId = Thread.currentThread().getId();
//嘗試獲取鎖
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
//等待時間結束,返回獲取失敗
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
//訂閱鎖的隊列,等待鎖被其他線程釋放後通知
final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (subscribeFuture.isSuccess()) {
unsubscribe(subscribeFuture, threadId);
}
}
});
}
acquireFailed(threadId);
return false;
}
try {
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// waiting for message,等待訂閱的隊列消息
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
複製代碼
代碼分析
首先tryAcquire嘗試獲取鎖,若返回ttl爲null,說明獲取到鎖了
判斷等待時間是否過時,若是過時,直接返回獲取鎖失敗
經過Redis的Channel訂閱監聽隊列,subscribe內部經過信號量semaphore,再經過await方法阻塞,內部實際上是用CountDownLatch來實現阻塞,獲取subscribe異步執行的結果,來保證訂閱成功,再判斷是否到了等待時間
再次嘗試申請鎖和等待時間的判斷,循環阻塞在這裏等待鎖釋放的消息RedissonLockEntry也維護了一個semaphore的信號量
不管是否釋放鎖,最終都要取消訂閱這個隊列消息
redission內部的getEntryName是客戶端實例ID+鎖名稱來保證多個實例下的鎖可重入
tryAcquire獲取鎖
redisssion獲取鎖的核心代碼,內部實際上是異步調用,可是用get方法阻塞了
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
複製代碼
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
複製代碼
tryLockInnerAsync方法內部是基於Lua腳原本獲取鎖的
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
複製代碼
Redission避免死鎖的解決方案:
Redission爲了不鎖未被釋放,採用了一個特殊的解決方案,若未設置過時時間的話,redission默認的過時時間是30s,同時未避免鎖在業務未處理完成以前被提早釋放,Redisson在獲取到鎖且默認過時時間的時候,會在當前客戶端內部啓動一個定時任務,每隔internalLockLeaseTime/3的時間去刷新key的過時時間,這樣既避免了鎖提早釋放,同時若是客戶端宕機的話,這個鎖最多存活30s的時間就會自動釋放(刷新過時時間的定時任務進程也宕機)
// lock acquired,獲取到鎖的時候設置按期更新時間的任務
if (ttlRemaining) {
scheduleExpirationRenewal(threadId);
}
//expirationRenewalMap的併發安全MAP記錄設置過的緩存,避免併發狀況下重複設置任務,internalLockLeaseTime / 3的時間後從新設置過時時間
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
複製代碼
unlock解鎖
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
複製代碼
Redission的unlock解鎖也是基於Lua腳本實現的,內部邏輯是先判斷鎖是否存在,不存在說明已經被釋放了,發佈鎖釋放消息後返回,鎖存在再判斷當前線程是否鎖擁有者,不是的話,無權釋放返回,解鎖的話,會減去重入的次數,從新更新過時時間,若重入數撿完,刪除當前key,發佈鎖釋放消息
主要基於Redis來設計和實現分佈式鎖,經過經常使用的設計思路引伸到Redission的實現,不管是設計思路仍是代碼健壯性Redission的設計都是優秀的,值得學習,下一步會講解關於Zookeeper的分佈式鎖實現和相關開源源碼分析。