在併發編程中最經常使用的手段就是鎖了,好比基於JVM底層的synchronized、基於AQS的ReentrantLock;但是這些鎖都只是侷限於單機,本篇給你們介紹常見的分佈式鎖。redis
基於redis實現的分佈式鎖咱們可使用setnx命令,這個命令的做用是若是指定的key不存在時則set一對k-v,這樣同一時刻就只能有一個請求能夠set這個key達到加鎖的目的,可是這個命令不能同時設置過時時間,這樣可能會致使死鎖。如圖一,請求A和請求B在T1時刻同時發起setnx命令,請求A成功了,而後在T2設置key的過時時間,若是在這以前請求A所在的服務忽然掛了,那這個key就一直存在,這個時候其餘請求就沒法加鎖。算法
使用這個命令能夠在設置一個key的同時設置key的過時時間,NX是當key不存在時進行操做,XX是當key存在時進行操做。業務執行完成以後,這個時候須要手動釋放這個鎖;那麼如何保證釋放鎖的安全性呢?首先要確保釋放的鎖是本身的,咱們能夠利用key對應的value來判斷當前這個key是否是本身設置的,這樣就能保證釋放的鎖是本身的;編程
private Boolean lock(String key, String value) {
return stringRedisTemplate.opsForValue().setIfAbsent(key, value, 10L, TimeUnit.SECONDS);
}
private Boolean unLock(String key, String value) {
String cacheValue = stringRedisTemplate.opsForValue().get(key);
if (!value.equals(cacheValue)) {
return false;
}
return stringRedisTemplate.delete(key);
}
複製代碼
那上面這段代碼就能保證釋放鎖的安全性嗎?這個方法存在的問題在於在判斷了key對應的value與本身的value相等以後,若是這個時候key不爭氣的恰好到期失效了,其餘線程獲取了這個鎖,那麼下面的delete key操做就將其餘線程的鎖釋放掉了。怎麼就那麼多幺蛾子…… 那麼如何保證釋放鎖的原子性呢?安全
Lua腳本我不過多的介紹,有興趣的同窗能夠去了解,直接上代碼bash
private Boolean luaUnLock(String key, String value) {
ScriptSource lua = new ResourceScriptSource(new ClassPathResource("redisUnLock.lua"));
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(lua);
redisScript.setResultType(Boolean.class);
return stringRedisTemplate.execute(redisScript, Collections.singletonList(key), value);
}
複製代碼
redisUnLock.lua數據結構
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
複製代碼
這段腳本比較簡單,就是比較參數key對應的value是否與參數value相等,相等則刪除這個key,在redis中lua腳本可以保證原子性。那麼問題叒來了!這樣就保證了這個分佈式鎖的安全性嗎?如今這個分佈式鎖的問題在於存在業務時間過長致使鎖過時被其餘線程獲取的狀況,此時須要檢測續租鎖來避免這個問題。併發
那麼如何續租呢,主要思路就是用一個線程檢測當前這個業務是否執行完,鎖還有多久過時;若是鎖即將失效時業務尚未執行完那麼就給這個鎖從新設置過時時間。這裏咱們使用redisson的實現,畢竟本身實現的輪子沒那麼靠譜😅。異步
public class RedissonLockerImpl implements RedissonLocker {
@Resource
private RedissonClient redissonClient;
@Override
public void lock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock();
}
@Override
public void unlock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.unlock();
}
}
public void test() {
CountDownLatch count = new CountDownLatch(2);
String lockKey = "LOCK_KEY";
CustomizeThreadPool.threadPool.execute(() -> {
try {
count.await();
redissonLocker.lock(lockKey);
log.info("線程1獲取鎖");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
redissonLocker.unlock(lockKey);
log.info("線程1釋放鎖");
}
});
count.countDown();
CustomizeThreadPool.threadPool.execute(() -> {
try {
count.await();
redissonLocker.lock(lockKey);
log.info("線程2獲取鎖");
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
redissonLocker.unlock(lockKey);
log.info("線程2釋放鎖");
}
});
count.countDown();
}
複製代碼
public void test2() {
String lockKey = "LOCK_KEY";
CustomizeThreadPool.threadPool.execute(() -> {
redissonLocker.lock(lockKey);
redissonLocker.lock(lockKey);
try {
TimeUnit.SECONDS.sleep(25);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
redissonLocker.unlock(lockKey);
}
});
}
複製代碼
怎麼樣,這個API是否是至關簡潔呢,可能有同窗看到以前的代碼會問你這裏加鎖的時候爲何沒有給鎖設置過時時間呢?你加鎖key對應的value呢?經過這兩個例子咱們能夠看到加鎖key對應的value是一個hash結構,第一個屬性對應的就是咱們所說的value,用來判斷是不是本身加的鎖;第二個屬性對應的實際上是加鎖的次數,這和Java中的ReentrantLock同樣是可重入鎖,因此第二個例子裏只作了一次unLock沒辦法釋放鎖。至於這個key對應的value和鎖過時時間在下面的源碼分析介紹。分佈式
package org.redisson.config;
import ..........
public class Config {
// 其餘源碼省略
private long lockWatchdogTimeout;
public Config() {
// 默認的鎖過時時間
this.lockWatchdogTimeout = 30000L;
}
public Config(Config oldConf) {
this.lockWatchdogTimeout = 30000L;
// 若是有讀取配置文件修改的鎖過時時間
this.setLockWatchdogTimeout(oldConf.getLockWatchdogTimeout());
}
}
// 不帶過時時間加鎖
public void lock() {
try {
this.lockInterruptibly();
} catch (InterruptedException var2) {
Thread.currentThread().interrupt();
}
}
public void lockInterruptibly() throws InterruptedException {
this.lockInterruptibly(-1L, (TimeUnit)null);
}
public void lock(long leaseTime, TimeUnit unit) {
try {
this.lockInterruptibly(leaseTime, unit);
} catch (InterruptedException var5) {
Thread.currentThread().interrupt();
}
}
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 獲取當前線程的ID
long threadId = Thread.currentThread().getId();
// 嘗試獲取鎖返回過時時間
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
// 若是加鎖失敗
if (ttl != null) {
// 訂閱解鎖隊列
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
this.commandExecutor.syncSubscription(future);
try {
while(true) {
// 嘗試加鎖
ttl = this.tryAcquire(leaseTime, unit, threadId);
// 加鎖成功則返回
if (ttl == null) {
return;
}
if (ttl >= 0L) {
// 加鎖失敗阻塞
this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
this.getEntry(threadId).getLatch().acquire();
}
}
} finally {
// 取消訂閱解鎖消息
this.unsubscribe(future, threadId);
}
}
}
//嘗試加鎖
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
// 設置了默認的過時時間
if (leaseTime != -1L) {
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 若是調用的是沒有過時時間的lock,則默認時間爲lockWatchdogTimeout
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
public void operationComplete(Future<Long> future) throws Exception {
if (future.isSuccess()) {
Long ttlRemaining = (Long)future.getNow();
// 加鎖成功以後開始一個調度任務
if (ttlRemaining == null) {
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
return ttlRemainingFuture;
}
}
// 調用lua腳本異步加鎖,value由getLockName()生成,uuid+threadId
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
// 若是當前不存在key則加鎖,若是當前存在而且是本身的則加鎖次數加一
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
// 鎖續租定時任務
private void scheduleExpirationRenewal(final long threadId) {
if (!expirationRenewalMap.containsKey(this.getEntryName())) {
// 添加一個回調任務,每1/3鎖過時時間執行一次
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
// 異步重置鎖過時時間
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
// 從加鎖集合移除 RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
if (!future.isSuccess()) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
} else {
// 成功重置鎖時間以後再次調用
if ((Boolean)future.getNow()) {
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
// 保證任務不會被重複建立,取消任務
if (expirationRenewalMap.putIfAbsent(this.getEntryName(), new RedissonLock.ExpirationEntry(threadId, task)) != null) {
task.cancel();
}
}
}
複製代碼
從源碼中咱們能夠了解redisson實現分佈式鎖的大體流程;當咱們沒有設置鎖過時時間的時候,redisson會使用lockWatchdogTimeout時間(默認爲30s)設置爲鎖過時時間;redisson設置的redis數據結構是一個hash,其中一個屬性是鎖的值,由uuid和當前線程id組成,另外一個屬性是加鎖次數用來實現可重入性;當沒有設置鎖過時時間的時候,redisson會每隔1/3鎖過時時間將鎖過時時間重置爲初始值(默認30s時,當過時時間還有20s就會從新設置過時時間爲30s)直到釋放鎖;若是設置了過時時間則不會有鎖續租的功能。加鎖的時候若是當前key不存在則直接設置key,若是存在而且是本身的則將加鎖次數加一。加鎖失敗則訂閱釋放鎖redis channel,線程進入阻塞。釋放鎖先判斷當前是不是本身的鎖,若是是則將當前加鎖次數減一,若是減一以後爲0則刪除key,若是有續租任務則取消續租任務,向redis channel中發一條消息喚醒被阻塞的線程獲取鎖。 ide