好久以前有講過併發編程中的鎖併發編程的鎖機制:synchronized和lock。在單進程的系統中,當存在多個線程能夠同時改變某個變量時,就須要對變量或代碼塊作同步,使其在修改這種變量時可以線性執行消除併發修改變量。而同步的本質是經過鎖來實現的。爲了實現多個線程在一個時刻同一個代碼塊只能有一個線程可執行,那麼須要在某個地方作個標記,這個標記必須每一個線程都能看到,當標記不存在時能夠設置該標記,其他後續線程發現已經有標記了則等待擁有標記的線程結束同步代碼塊取消標記後再去嘗試設置標記。html
分佈式環境下,數據一致性問題一直是一個比較重要的話題,而又不一樣於單進程的狀況。分佈式與單機狀況下最大的不一樣在於其不是多線程而是多進程。多線程因爲能夠共享堆內存,所以能夠簡單的採起內存做爲標記存儲位置。而進程之間甚至可能都不在同一臺物理機上,所以須要將標記存儲在一個全部進程都能看到的地方。java
常見的是秒殺場景,訂單服務部署了多個實例。如秒殺商品有4個,第一個用戶購買3個,第二個用戶購買2個,理想狀態下第一個用戶能購買成功,第二個用戶提示購買失敗,反之亦可。而實際可能出現的狀況是,兩個用戶都獲得庫存爲4,第一個用戶買到了3個,更新庫存以前,第二個用戶下了2個商品的訂單,更新庫存爲2,致使出錯。redis
在上面的場景中,商品的庫存是共享變量,面對高併發情形,須要保證對資源的訪問互斥。在單機環境中,Java中其實提供了不少併發處理相關的API,可是這些API在分佈式場景中就無能爲力了。也就是說單純的Java Api並不能提供分佈式鎖的能力。分佈式系統中,因爲分佈式系統的分佈性,即多線程和多進程而且分佈在不一樣機器中,synchronized和lock這兩種鎖將失去原有鎖的效果,須要咱們本身實現分佈式鎖。sql
常見的鎖方案以下:數據庫
下面咱們簡單介紹下這幾種鎖的實現。apache
基於數據庫的鎖實現也有兩種方式,一是基於數據庫表,另外一種是基於數據庫排他鎖。編程
基於數據庫表增刪是最簡單的方式,首先建立一張鎖的表主要包含下列字段:方法名,時間戳等字段。緩存
具體使用的方法,當須要鎖住某個方法時,往該表中插入一條相關的記錄。這邊須要注意,方法名是有惟一性約束的,若是有多個請求同時提交到數據庫的話,數據庫會保證只有一個操做能夠成功,那麼咱們就能夠認爲操做成功的那個線程得到了該方法的鎖,能夠執行方法體內容。bash
執行完畢,須要delete該記錄。服務器
固然,筆者這邊只是簡單介紹一下。對於上述方案能夠進行優化,如應用主從數據庫,數據之間雙向同步。一旦掛掉快速切換到備庫上;作一個定時任務,每隔必定時間把數據庫中的超時數據清理一遍;使用while循環,直到insert成功再返回成功,雖然並不推薦這樣作;還能夠記錄當前得到鎖的機器的主機信息和線程信息,那麼下次再獲取鎖的時候先查詢數據庫,若是當前機器的主機信息和線程信息在數據庫能夠查到的話,直接把鎖分配給他就能夠了,實現可重入鎖。
咱們還能夠經過數據庫的排他鎖來實現分佈式鎖。基於MySql的InnoDB引擎,可使用如下方法來實現加鎖操做:
public void lock(){
connection.setAutoCommit(false)
int count = 0;
while(count < 4){
try{
select * from lock where lock_name=xxx for update;
if(結果不爲空){
//表明獲取到鎖
return;
}
}catch(Exception e){
}
//爲空或者拋異常的話都表示沒有獲取到鎖
sleep(1000);
count++;
}
throw new LockException();
}
複製代碼
在查詢語句後面增長for update,數據庫會在查詢過程當中給數據庫表增長排他鎖。當某條記錄被加上排他鎖以後,其餘線程沒法再在該行記錄上增長排他鎖。其餘沒有獲取到鎖的就會阻塞在上述select語句上,可能的結果有2種,在超時以前獲取到了鎖,在超時以前仍未獲取到鎖。
得到排它鎖的線程便可得到分佈式鎖,當獲取到鎖以後,能夠執行方法的業務邏輯,執行完方法以後,釋放鎖connection.commit()
。
存在的問題主要是性能不高和sql超時的異常。
上面兩種方式都是依賴數據庫的一張表,一種是經過表中的記錄的存在狀況肯定當前是否有鎖存在,另一種是經過數據庫的排他鎖來實現分佈式鎖。
基於zookeeper臨時有序節點能夠實現的分佈式鎖。每一個客戶端對某個方法加鎖時,在zookeeper上的與該方法對應的指定節點的目錄下,生成一個惟一的瞬時有序節點。 判斷是否獲取鎖的方式很簡單,只須要判斷有序節點中序號最小的一個。 當釋放鎖的時候,只需將這個瞬時節點刪除便可。同時,其能夠避免服務宕機致使的鎖沒法釋放,而產生的死鎖問題。
提供的第三方庫有curator,具體使用讀者能夠自行去看一下。Curator提供的InterProcessMutex是分佈式鎖的實現。acquire方法獲取鎖,release方法釋放鎖。另外,鎖釋放、阻塞鎖、可重入鎖等問題均可以有有效解決。講下阻塞鎖的實現,客戶端能夠經過在ZK中建立順序節點,而且在節點上綁定監聽器,一旦節點有變化,Zookeeper會通知客戶端,客戶端能夠檢查本身建立的節點是否是當前全部節點中序號最小的,若是是就獲取到鎖,即可以執行業務邏輯。
最後,Zookeeper實現的分佈式鎖其實存在一個缺點,那就是性能上可能並無緩存服務那麼高。由於每次在建立鎖和釋放鎖的過程當中,都要動態建立、銷燬瞬時節點來實現鎖功能。ZK中建立和刪除節點只能經過Leader服務器來執行,而後將數據同不到全部的Follower機器上。併發問題,可能存在網絡抖動,客戶端和ZK集羣的session鏈接斷了,zk集羣覺得客戶端掛了,就會刪除臨時節點,這時候其餘客戶端就能夠獲取到分佈式鎖了。
相對於基於數據庫實現分佈式鎖的方案來講,基於緩存來實如今性能方面會表現的更好一點,存取速度快不少。並且不少緩存是能夠集羣部署的,能夠解決單點問題。基於緩存的鎖有好幾種,如memcached、redis、本文下面主要講解基於redis的分佈式實現。
使用redis的SETNX實現分佈式鎖,多個進程執行如下Redis命令:
SETNX lock.id <current Unix time + lock timeout + 1>
複製代碼
SETNX是將 key 的值設爲 value,當且僅當 key 不存在。若給定的 key 已經存在,則 SETNX 不作任何動做。
SETNX實現分佈式鎖,可能會存在死鎖的狀況。與單機模式下的鎖相比,分佈式環境下不只須要保證進程可見,還須要考慮進程與鎖之間的網絡問題。某個線程獲取了鎖以後,斷開了與Redis 的鏈接,鎖沒有及時釋放,競爭該鎖的其餘線程都會hung,產生死鎖的狀況。
在使用 SETNX 得到鎖時,咱們將鍵 lock.id 的值設置爲鎖的有效時間,線程得到鎖後,其餘線程還會不斷的檢測鎖是否已超時,若是超時,等待的線程也將有機會得到鎖。然而,鎖超時,咱們不能簡單地使用 DEL 命令刪除鍵 lock.id 以釋放鎖。
考慮如下狀況:
- A已經首先得到了鎖 lock.id,而後線A斷線。B,C都在等待競爭該鎖;
- B,C讀取lock.id的值,比較當前時間和鍵 lock.id 的值來判斷是否超時,發現超時;
- B執行 DEL lock.id命令,並執行 SETNX lock.id 命令,並返回1,B得到鎖;
- C因爲各剛剛檢測到鎖已超時,執行 DEL lock.id命令,將B剛剛設置的鍵 lock.id 刪除,執行 SETNX lock.id命令,並返回1,即C得到鎖。
上面的步驟很明顯出現了問題,致使B,C同時獲取了鎖。在檢測到鎖超時後,線程不能直接簡單地執行 DEL 刪除鍵的操做以得到鎖。
對於上面的步驟進行改進,問題是出在刪除鍵的操做上面,那麼獲取鎖以後應該怎麼改進呢? 首先看一下redis的GETSET這個操做,GETSET key value
,將給定 key 的值設爲 value ,並返回 key 的舊值(old value)。利用這個操做指令,咱們改進一下上述的步驟。
- A已經首先得到了鎖 lock.id,而後線A斷線。B,C都在等待競爭該鎖;
- B,C讀取lock.id的值,比較當前時間和鍵 lock.id 的值來判斷是否超時,發現超時;
- B檢測到鎖已超時,即當前的時間大於鍵 lock.id 的值,B會執行
GETSET lock.id <current Unix timestamp + lock timeout + 1>
設置時間戳,經過比較鍵 lock.id 的舊值是否小於當前時間,判斷進程是否已得到鎖;- B發現GETSET返回的值小於當前時間,則執行 DEL lock.id命令,並執行 SETNX lock.id 命令,並返回1,B得到鎖;
- C執行GETSET獲得的時間大於當前時間,則繼續等待。
在線程釋放鎖,即執行 DEL lock.id 操做前,須要先判斷鎖是否已超時。若是鎖已超時,那麼鎖可能已由其餘線程得到,這時直接執行 DEL lock.id 操做會致使把其餘線程已得到的鎖釋放掉。
public boolean lock(long acquireTimeout, TimeUnit timeUnit) throws InterruptedException {
acquireTimeout = timeUnit.toMillis(acquireTimeout);
long acquireTime = acquireTimeout + System.currentTimeMillis();
//使用J.U.C的ReentrantLock
threadLock.tryLock(acquireTimeout, timeUnit);
try {
//循環嘗試
while (true) {
//調用tryLock
boolean hasLock = tryLock();
if (hasLock) {
//獲取鎖成功
return true;
} else if (acquireTime < System.currentTimeMillis()) {
break;
}
Thread.sleep(sleepTime);
}
} finally {
if (threadLock.isHeldByCurrentThread()) {
threadLock.unlock();
}
}
return false;
}
public boolean tryLock() {
long currentTime = System.currentTimeMillis();
String expires = String.valueOf(timeout + currentTime);
//設置互斥量
if (redisHelper.setNx(mutex, expires) > 0) {
//獲取鎖,設置超時時間
setLockStatus(expires);
return true;
} else {
String currentLockTime = redisUtil.get(mutex);
//檢查鎖是否超時
if (Objects.nonNull(currentLockTime) && Long.parseLong(currentLockTime) < currentTime) {
//獲取舊的鎖時間並設置互斥量
String oldLockTime = redisHelper.getSet(mutex, expires);
//舊值與當前時間比較
if (Objects.nonNull(oldLockTime) && Objects.equals(oldLockTime, currentLockTime)) {
//獲取鎖,設置超時時間
setLockStatus(expires);
return true;
}
}
return false;
}
}
複製代碼
lock調用tryLock方法,參數爲獲取的超時時間與單位,線程在超時時間內,獲取鎖操做將自旋在那裏,直到該自旋鎖的保持者釋放了鎖。
tryLock方法中,主要邏輯以下:
public boolean unlock() {
//只有鎖的持有線程才能解鎖
if (lockHolder == Thread.currentThread()) {
//判斷鎖是否超時,沒有超時纔將互斥量刪除
if (lockExpiresTime > System.currentTimeMillis()) {
redisHelper.del(mutex);
logger.info("刪除互斥量[{}]", mutex);
}
lockHolder = null;
logger.info("釋放[{}]鎖成功", mutex);
return true;
} else {
throw new IllegalMonitorStateException("沒有獲取到鎖的線程沒法執行解鎖操做");
}
}
複製代碼
在上面獲取鎖的實現下,其實此處的釋放鎖函數能夠不須要了,有興趣的讀者能夠結合上面的代碼看下爲何?有想法能夠留言哦!
本文主要講解了基於redis分佈式鎖的實現,在分佈式環境下,數據一致性問題一直是一個比較重要的話題,而synchronized和lock鎖在分佈式環境已經失去了做用。常見的鎖的方案有基於數據庫實現分佈式鎖、基於緩存實現分佈式鎖、基於Zookeeper實現分佈式鎖,簡單介紹了每種鎖的實現特色;而後,文中探索了一下redis鎖的實現方案;最後,本文給出了基於Java實現的redis分佈式鎖,讀者能夠自行驗證一下。