![](http://static.javashuo.com/static/loading.gif)
1 前言
在程序中,咱們想要保證一個變量的可見性及原子性,咱們能夠用volatile(對任意單個volatile變量的讀/寫具備原子性,但相似於volatile++這種複合操做不具備原子性)、synchronized、樂觀鎖、悲觀鎖等等來控制。單體應用內能夠這樣作,而如今隨着時代的發展,大多項目都已經告別的單機時代,擁抱微服務時代,這樣的狀況下不少服務須要作集羣,一個應用須要部署到幾臺機器上而後作負載均衡,在併發狀況下使用上面說的機制來保證變量的可見性及原子性就不可行了(以下圖),從而產生了不少分佈式機制(如分佈式事務、分佈式鎖等),主要的做用仍是用來保證數據的一致性:node
![](http://static.javashuo.com/static/loading.gif)
如上圖,假設變量a是剩餘庫存,值爲1,這時候三個用戶進來下單,正好三個請求被分到了三個不一樣的服務節點上面,三個節點 檢查剩餘庫存,發現還有1個,而後都去進行扣減,這樣就致使庫存爲負數,有兩個用戶沒有貨發,就是俗稱的超賣。這種狀況是不被接受的,用戶會和業務撕逼、業務會和你領導吵架,而後你就收拾書包回家了!程序員
在這種場景中,咱們就須要一種方法解決這個問題,這就是分佈式鎖要解決的問題。web
2 分佈式鎖的實現與特性
2.1 分佈式鎖的實現
本地鎖能夠經過語言自己支持,要實現分佈式鎖,就必須依賴中間件,數據庫、redis、zookeeper等,主要有如下幾種實現方式:
1)Memcached:利用 Memcached 的 add 命令。此命令是原子性操做,只有在 key 不存在的狀況下,才能 add 成功,也就意味着線程獲得了鎖。
2)Redis:和 Memcached 的方式相似,利用 Redis 的 setnx 命令。此命令一樣是原子性操做,只有在 key 不存在的狀況下,才能 set 成功。
3)Zookeeper:利用 Zookeeper 的順序臨時節點,來實現分佈式鎖和等待隊列。Zookeeper 設計的初衷,就是爲了實現分佈式鎖服務的。
4)Chubby:Google 公司實現的粗粒度分佈式鎖服務,底層利用了 Paxos 一致性算法。
redis
2.2 分佈式鎖的特性
1)在分佈式系統環境下,一個方法在同一時間只能被一個機器的一個線程執行。
2)高可用的獲取鎖與釋放鎖。
3)高性能的獲取鎖與釋放鎖。
4)具有可重入特性。
5)具有鎖失效機制,防止死鎖。
6)具有非阻塞鎖特性,即沒有獲取到鎖將直接返回獲取鎖失敗。
算法
3 Redisson實現Redis分佈式鎖以及實現原理
3.1 添加依賴
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.12.4</version>
</dependency>
3.2 測試查看
庫存數量100,調用一次減1,小於等於0的時候返回false,表示下單失敗。spring
@Component
public class RedissonLock {
private static Integer inventory = 100;
/**
* 測試
*
* @return true:下單成功 false:下單失敗
*/
public Boolean redisLockTest(){
// 獲取鎖實例
RLock inventoryLock = RedissonService.getRLock("inventory-number");
try {
// 加鎖
inventoryLock.lock();
if (inventory <= 0){
return false;
}
inventory--;
System.out.println("線程名稱:" + Thread.currentThread().getName() + "剩餘數量:" + RedissonLock.inventory);
}catch (Exception e){
e.printStackTrace();
}finally {
// 釋放鎖
inventoryLock.unlock();
}
return true;
}
}
用jmeter進行壓測:數據庫
線程組100執行20秒:微信
響應斷言true爲正確,false爲失敗:數據結構
結果:併發
3.3 獲取鎖的實例
RLock inventoryLock = RedissonService.getRLock("inventory-number");這段就是獲取鎖的實例,inventory-number爲指定鎖名稱,進去getLock(String name)方法以後就能看到獲取鎖的實例就是在RedissonLock構造方法中,初始化一些屬性。
public RLock getLock(String name) {
return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
}
看下RedissonLock的構造函數:
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
//命令執行器
this.commandExecutor = commandExecutor;
//UUID字符串(MasterSlaveConnectionManager類的構造函數 傳入UUID)
this.id = commandExecutor.getConnectionManager().getId();
//內部鎖過時時間(防止死鎖,默認時間爲30s)
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
//uuid+傳進來的鎖名稱
this.entryName = this.id + ":" + name;
//redis消息體
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
內部鎖過時時間(默認30s,若是超過這個時間業務代碼尚未執行完畢,那麼過時時間會自動續約):
3.4 加鎖
inventoryLock.lock();這段代碼表示加鎖,一步一步進去源碼裏面看看,進來首先看到以下lock()方法:
public void lock() {
try {
this.lock(-1L, (TimeUnit)null, false);
} catch (InterruptedException var2) {
throw new IllegalStateException();
}
}
能夠看到這裏設置了一些默認值,而後繼續調用了帶參lock()方法,也是在這裏,完成了加鎖的邏輯,源碼以下:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// 線程ID
long threadId = Thread.currentThread().getId();
// 嘗試獲取鎖
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
// 若是過時時間等於null,則表示獲取到鎖,直接返回,不等於null繼續往下執行
if (ttl != null) {
// 若是獲取鎖失敗,則訂閱到對應這個鎖的channel
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
if (interruptibly) {
// 可中斷訂閱
this.commandExecutor.syncSubscriptionInterrupted(future);
} else {
// 不可中斷訂閱
this.commandExecutor.syncSubscription(future);
}
try {
// 不斷循環
while(true) {
// 再次嘗試獲取鎖
ttl = this.tryAcquire(leaseTime, unit, threadId);
// ttl(過時時間)爲空,說明成功獲取鎖,返回
if (ttl == null) {
return;
}
// ttl(過時時間)大於0 則等待ttl時間後繼續嘗試獲取
if (ttl >= 0L) {
try {
((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException var13) {
if (interruptibly) {
throw var13;
}
((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else if (interruptibly) {
((RedissonLockEntry)future.getNow()).getLatch().acquire();
} else {
((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly();
}
}
} finally {
// 取消對channel的訂閱
this.unsubscribe(future, threadId);
}
}
}
再來看下獲取鎖的tryAcquire方法:
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}
進去看下tryAcquireAsync方法:
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
// 有設置過時時間
if (leaseTime != -1L) {
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 沒有設置過時時間
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining == null) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
-
tryLockInnerAsync方法是真正執行獲取鎖的邏輯,它是一段LUA腳本代碼。在這裏,它使用的是hash數據結構。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,
// 若是鎖不存在,則經過hset設置它的值,並設置過時時間
"if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
// 若是鎖已存在,而且鎖的是當前線程,則經過hincrby給數值遞增1(這裏顯示了redis分佈式鎖的可重入性)
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
// 若是鎖已存在,但並不是本線程,則返回過時時間ttl
return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
KEYS[1]表明的是你加鎖的那個key,好比說:RLock inventoryLock = RedissonService.getRLock("inventory-number");這裏你本身設置了加鎖的那個鎖key就是"inventory-number"。
ARGV[1]表明的就是鎖key的默認生存時間,上面也截圖看了,默認時間爲30秒。
ARGV[2]表明的是加鎖的客戶端的ID,相似於後面這樣: 8743c9c0-0795-4907-87fd-6c719a6b4586:1
上面這段LUA代碼看起來也不是很複雜,其中有三個判斷:
經過exists判斷鎖存不存在,若是鎖不存在,則設置值和過時時間,加鎖成功。
經過hexists判斷,若是鎖已存在,而且鎖的是當前線程,則證實是重入鎖,加鎖成功,ARGV[2]的value+1,原來是1,如今變爲2,固然,釋放的時候也要釋放兩次。
若是鎖已存在,但鎖的不是當前線程,則證實有其餘線程持有鎖。返回當前鎖的過時時間,加鎖失敗
![](http://static.javashuo.com/static/loading.gif)
3.5 解鎖
inventoryLock.unlock();這段代碼表示解鎖,跟剛纔同樣,一步一步進去源碼裏面看看,進來首先看到以下unlock()方法:
public void unlock() {
try {
this.get(this.unlockAsync(Thread.currentThread().getId()));
} catch (RedisException var2) {
if (var2.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException)var2.getCause();
} else {
throw var2;
}
}
}
進去unlockAsync()查看,這是解鎖的方法:
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise();
// 釋放鎖的方法
RFuture<Boolean> future = this.unlockInnerAsync(threadId);
// 添加監聽器 解鎖opStatus:返回值
future.onComplete((opStatus, e) -> {
this.cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
//若是返回null,則證實解鎖的線程和當前鎖不是同一個線程,拋出異常
} else if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
// 解鎖成功
result.trySuccess((Object)null);
}
});
return result;
}
再進去看下釋放鎖的方法:unlockInnerAsync():
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 若是釋放鎖的線程和已存在鎖的線程不是同一個線程,返回null
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end;
// 若是是同一個線程,就經過hincrby減1的方式,釋放一次鎖
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
// 若剩餘次數大於0 ,則刷新過時時間
if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0;
// 其餘就證實鎖已經釋放,刪除key併發布鎖釋放的消息
else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end;
return nil;",
Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});
}
上述代碼是釋放鎖的邏輯。一樣的,它也是有三個判斷:
若是解鎖的線程和當前鎖的線程不是同一個,解鎖失敗,拋出異常。
若是解鎖的線程和當前鎖的線程是同一個,就經過hincrby減1的方式,釋放一次鎖。若剩餘次數還大於0,則證實是重入鎖,再次刷新過時時間。
鎖已不存在,經過publish發佈鎖釋放的消息,解鎖成功
![](http://static.javashuo.com/static/loading.gif)
到這裏就結束了,眼過千百不如手過一遍,本身試試就明白了,各位老闆看到這裏能不能點個贊,鄙人想看看恐怖如斯的二級世界,謝謝各位!
本文分享自微信公衆號 - 一個快樂又痛苦的程序員(AsuraTechnology)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。