分佈式鎖的功能和訴求,咱們已經在Redis分佈式鎖:基於AOP和Redis實現的簡易版分佈式鎖簡單的介紹過了。java
目前自研的Redis分佈式鎖,已可知足大部分場景(非公平+可自動續期+可重入的分佈式鎖),可投入生產環境的單機環境中使用。可是由於是基於Redis單機的環境,只能用於併發量並不高的場景。隨着接入的業務場景擴大,Redis單機已經變得不可靠了,那麼接下來給咱們的選擇只有兩種: 一、Redis單機改成集羣。 二、改用其餘基於一致性算法的實現方式。redis
方案1有先天性的缺陷,redis集羣沒法保證一致性問題,在master節點宕機的瞬間,master和slave節點之間的數據多是不一致的。這將會致使服務a從master節點拿到了鎖a,而後master節點宕機,在slave節點還沒有徹底同步完master的數據以前,服務b將從slave節點上成功拿到一樣的鎖a。算法
而在其餘基於一致性算法的實現方式上,zk和ectd是不錯的選擇。而後考慮到zk已廉頗老矣,咱們選擇了ectd這個後起之秀。緩存
因爲在分佈式鎖的場景內,咱們更關注的是鎖的一致性,而非鎖的可用性,因此cp鎖比ap鎖更可靠。bash
etcd引入了租約的概念,咱們首先須要授予一個租約,而後同時設置租約的有效時間。租約的有效時間咱們能夠用來做爲鎖的有效時間。網絡
而後咱們能夠直接調用etcd的lock功能,在指定的租約上對指定的lockName進行加鎖操做。若是當前沒有其餘線程持有該鎖,則該線程能直接持有鎖。不然須要等待。這裏咱們能夠將timeout的時間設置爲鎖的等待時間來實現競爭鎖失敗等待獲取的過程。固然因爲網絡波動等問題,我建議timeout的時間最少設置爲500ms(或大家認爲合理的數值)。多線程
而後解鎖的過程,咱們放棄了etcd的unlock操做,而直接使用了etcd的revoke操做。之因此沒采用unlock操做,一是由於unlock所須要的參數是上一步lock操做返回的lockKey,咱們並不但願多維護一個字段,二是由於咱們最終會執行revoke操做,而revoke操做會將該租約下的全部key都失效,由於咱們目前目前設計的是一個租約對應一個鎖,不存在會釋放其它業務場景中的鎖的狀況。併發
此外,爲了保證線程在等待獲取鎖的過程當中租約不會過時,因此咱們得爲這個線程設置一個守護線程,在該線程授予租約後就開啓守護線程,按期去判斷是否須要續期。異步
和redis分佈式鎖不同的是,redis分佈式鎖的有效時間是緩存的有效時間,因此能夠在獲取鎖成功後再開啓用於續期的守護線程,而etcd分佈式鎖的有效時間是租約的有效時間,在等待獲取鎖的過程當中可能租約會過時,因此得在獲取租約後就得開啓守護線程。這樣就增長了不少的複雜度。分佈式
##具體實現 原生的etcd是經過Go語言來寫的,直接在java程序中應用會有一點困難,因此咱們直接採用jetcd來做爲etcd的客戶端,這樣在java程序中就可使用代碼方式和etcd服務端通信。
jetcd提供了LeaseClient,咱們能夠直接使用grant功能完成授予租約的操做。
public LockLeaseData getLeaseData(String lockName, Long lockTime) {
try {
LockLeaseData lockLeaseData = new LockLeaseData();
CompletableFuture<LeaseGrantResponse> leaseGrantResponseCompletableFuture = client.getLeaseClient().grant(lockTime);
Long leaseId = leaseGrantResponseCompletableFuture.get(1, TimeUnit.SECONDS).getID();
lockLeaseData.setLeaseId(leaseId);
CpSurvivalClam cpSurvivalClam = new CpSurvivalClam(Thread.currentThread(), leaseId, lockName, lockTime, this);
Thread survivalThread = threadFactoryManager.getThreadFactory().newThread(cpSurvivalClam);
survivalThread.start();
lockLeaseData.setCpSurvivalClam(cpSurvivalClam);
lockLeaseData.setSurvivalThread(survivalThread);
return lockLeaseData;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
return null;
}
}
複製代碼
此外如上所述,咱們在獲取租約後,開啓了CpSurvivalClam的守護線程來按期續期。CpSurvivalClam的實現和咱們在redis分佈式鎖的時候實現大致一致,差異只是將其中的expandLockTime操做改成了etcd中的keepAliveOnce。expandLockTime方法具體以下所示:
/**
* 重置鎖的有效時間
*
* @param leaseId 鎖的租約id
* @return 是否成功重置
*/
public Boolean expandLockTime(Long leaseId) {
try {
CompletableFuture<LeaseKeepAliveResponse> leaseKeepAliveResponseCompletableFuture = client.getLeaseClient().keepAliveOnce(leaseId);
leaseKeepAliveResponseCompletableFuture.get();
return Boolean.TRUE;
} catch (InterruptedException | ExecutionException e) {
return Boolean.FALSE;
}
}
複製代碼
而後jetcd提供了LockClient,咱們直接能夠用lock功能,將leaseId和lockName傳入,咱們會獲得一個在該租約下的lockKey。此外爲了保證加鎖成功後,租約未過時。咱們加了一步timeToLive的操做,用於判斷租約在獲取鎖成功後的是否還存活。若是ttl未大於0,則判斷爲加鎖失敗。
/**
* 在指定的租約上加鎖,若是租約過時,則算加鎖失敗。
*
* @param leaseId 鎖的租約Id
* @param lockName 鎖的名稱
* @param waitTime 加鎖過程當中的的等待時間,單位ms
* @return 是否加鎖成功
*/
public Boolean tryLock(Long leaseId, String lockName, Long waitTime) {
try {
CompletableFuture<LockResponse> lockResponseCompletableFuture = client.getLockClient().lock(ByteSequence.from(lockName, Charset.defaultCharset()), leaseId);
long timeout = Math.max(500, waitTime);
lockResponseCompletableFuture.get(timeout, TimeUnit.MILLISECONDS).getKey();
CompletableFuture<LeaseTimeToLiveResponse> leaseTimeToLiveResponseCompletableFuture = client.getLeaseClient().timeToLive(leaseId, LeaseOption.DEFAULT);
long ttl = leaseTimeToLiveResponseCompletableFuture.get(1, TimeUnit.SECONDS).getTTl();
if (ttl > 0) {
return Boolean.TRUE;
} else {
return Boolean.FALSE;
}
} catch (TimeoutException | InterruptedException | ExecutionException e) {
return Boolean.FALSE;
}
}
複製代碼
解鎖過程,咱們能夠直接使用LeaseClient下的revoke操做,在撤銷租約的同時將該租約下的lock釋放。
/**
* 取消租約,並釋放鎖
*
* @param leaseId 租約id
* @return 是否成功釋放
*/
public Boolean unLock(Long leaseId) {
try {
CompletableFuture<LeaseRevokeResponse> revokeResponseCompletableFuture = client.getLeaseClient().revoke(leaseId);
revokeResponseCompletableFuture.get(1, TimeUnit.SECONDS);
return Boolean.TRUE;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
return Boolean.FALSE;
}
}
複製代碼
而後是統一的CpLock對象,封裝了加解鎖的過程,對外只暴露execute方法,避免使用者忘記解鎖步驟。
public class CpLock {
private String lockName;
private LockEtcdClient lockEtcdClient;
/**
* 分佈式鎖的鎖持有數
*/
private volatile int state;
private volatile transient Thread lockOwnerThread;
/**
* 當前線程擁有的lease對象
*/
private FastThreadLocal<LockLeaseData> lockLeaseDataFastThreadLocal = new FastThreadLocal<>();
/**
* 鎖自動釋放時間,單位s,默認爲30
*/
private static Long LOCK_TIME = 30L;
/**
* 獲取鎖失敗單次等待時間,單位ms,默認爲300
*/
private static Integer SLEEP_TIME_ONCE = 300;
CpLock(String lockName, LockEtcdClient lockEtcdClient) {
this.lockName = lockName;
this.lockEtcdClient = lockEtcdClient;
}
private LockLeaseData getLockLeaseData(String lockName, long lockTime) {
if (lockLeaseDataFastThreadLocal.get() != null) {
return lockLeaseDataFastThreadLocal.get();
} else {
LockLeaseData lockLeaseData = lockEtcdClient.getLeaseData(lockName, lockTime);
lockLeaseDataFastThreadLocal.set(lockLeaseData);
return lockLeaseData;
}
}
final Boolean tryLock(long waitTime) {
final long startTime = System.currentTimeMillis();
final long endTime = startTime + waitTime * 1000;
final long lockTime = LOCK_TIME;
final Thread current = Thread.currentThread();
try {
do {
int c = this.getState();
if (c == 0) {
LockLeaseData lockLeaseData = this.getLockLeaseData(lockName, lockTime);
if (Objects.isNull(lockLeaseData)) {
return Boolean.FALSE;
}
Long leaseId = lockLeaseData.getLeaseId();
if (lockEtcdClient.tryLock(leaseId, lockName, endTime - System.currentTimeMillis())) {
log.info("線程獲取重入鎖成功,cp鎖的名稱爲{}", lockName);
this.setLockOwnerThread(current);
this.setState(c + 1);
return Boolean.TRUE;
}
} else if (lockOwnerThread == Thread.currentThread()) {
if (c + 1 <= 0) {
throw new Error("Maximum lock count exceeded");
}
this.setState(c + 1);
log.info("線程重入鎖成功,cp鎖的名稱爲{},當前LockCount爲{}", lockName, state);
return Boolean.TRUE;
}
int sleepTime = SLEEP_TIME_ONCE;
if (waitTime > 0) {
log.info("線程暫時沒法得到cp鎖,當前已等待{}ms,本次將再等待{}ms,cp鎖的名稱爲{}", System.currentTimeMillis() - startTime, sleepTime, lockName);
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
log.info("線程等待過程當中被中斷,cp鎖的名稱爲{}", lockName, e);
}
}
} while (System.currentTimeMillis() <= endTime);
if (waitTime == 0) {
log.info("線程得到cp鎖失敗,將放棄獲取,cp鎖的名稱爲{}", lockName);
} else {
log.info("線程得到cp鎖失敗,以前共等待{}ms,將放棄等待獲取,cp鎖的名稱爲{}", System.currentTimeMillis() - startTime, lockName);
}
this.stopKeepAlive();
return Boolean.FALSE;
} catch (Exception e) {
log.error("execute error", e);
this.stopKeepAlive();
return Boolean.FALSE;
}
}
/**
* 中止續約,並將租約對象從線程中移除
*/
private void stopKeepAlive() {
LockLeaseData lockLeaseData = lockLeaseDataFastThreadLocal.get();
if (Objects.nonNull(lockLeaseData)) {
lockLeaseData.getCpSurvivalClam().stop();
lockLeaseData.setCpSurvivalClam(null);
lockLeaseData.getSurvivalThread().interrupt();
lockLeaseData.setSurvivalThread(null);
}
lockLeaseDataFastThreadLocal.remove();
}
final void unLock() {
if (lockOwnerThread == Thread.currentThread()) {
int c = this.getState() - 1;
if (c == 0) {
this.setLockOwnerThread(null);
this.setState(c);
LockLeaseData lockLeaseData = lockLeaseDataFastThreadLocal.get();
this.stopKeepAlive();
//unLock操做必須在最後執行,避免其餘線程獲取到鎖時的state等數據不正確
lockEtcdClient.unLock(lockLeaseData.getLeaseId());
log.info("重入鎖LockCount-1,線程已成功釋放鎖,cp鎖的名稱爲{}", lockName);
} else {
this.setState(c);
log.info("重入鎖LockCount-1,cp鎖的名稱爲{},剩餘LockCount爲{}", lockName, c);
}
}
}
public <T> T execute(Supplier<T> supplier, int waitTime) {
Boolean holdLock = Boolean.FALSE;
Preconditions.checkArgument(waitTime >= 0, "waitTime必須爲天然數");
try {
if (holdLock = this.tryLock(waitTime)) {
return supplier.get();
}
return null;
} catch (Exception e) {
log.error("cpLock execute error", e);
return null;
} finally {
if (holdLock) {
this.unLock();
}
}
}
public <T> T execute(Supplier<T> supplier) {
return this.execute(supplier, 0);
}
}
複製代碼
CpLock和以前Redis分佈式鎖中的ApLock實現大致一致。區別主要有:
一、由於咱們是在授予租約的操做中開啓了守護線程,因此在競爭鎖失敗、出現異常和釋放鎖這些場景下,咱們必須得中止守護線程續期。又由於是可重入的場景,咱們又只但願在state爲0的狀況下再去生成租約去競爭鎖。因此避免多種狀況判斷,咱們引入了FastThreadLocal lockLeaseDataFastThreadLocal來保存當前線程的Lease對象。
二、redis分佈式鎖在任何場景下,等待獲取鎖都是經過休眠輪詢的方式實現的,而在etcd場景下,咱們在state爲0時經過etcd自身的等待邏輯來完成等待,在state非0場景下,依然經過休眠輪詢的方式來實現等待。由於可能會存在state從非0轉爲0的狀況,因此咱們的waitTime值是endTime - System.currentTimeMillis(),而非本來傳入的waitTime。這樣可以讓等待時間更接近咱們指望值。
本次更新,咱們實現了基於etcd的cp分佈式鎖,同時也修復了redis分佈式鎖中的一個隱藏問題。
以前的setState操做在unLock以後,這樣在併發場景下會致使一個問題發生。線程a和線程b在競爭獲取鎖a,此時各自的局部變量c和state都爲0,而後線程a在獲取到了鎖以後馬上釋放了鎖,此時先執行了unLock,state仍是1,線程b成功得到鎖,將state重置爲c+1,依然是1,而後線程a執行setState,將stete改成0。此時線程b若是去釋放鎖,執行stete-1操做,變爲了-1。這個問題主要是由於獲取state值和state值修改操做是異步的,而在多線程場景下,分佈式鎖是經過lock控制的,咱們只須要將unLock操做挪到全部賦值以後便可解決這個問題。
目前實現的cp分佈式鎖的版本,已可知足分佈式鎖的絕大部分場景(非公平+可自動續期+可重入+強一致性的分佈式鎖),已可投入生產環境的集羣中使用。後續的計劃中,ap鎖和cp鎖將會分別更新,會優化一些使用場景。也會嘗試去解決公平鎖的問題,以及循環獲取鎖須要等待休眠的問題。
以上計劃已完成,如何實現公平鎖可詳見Etcd分佈式鎖(二):支持公平鎖,避免某些場景下線程長期沒法獲取鎖
本次cp分佈式鎖須要考慮大量的使用場景,目前只進行了小規模的測試,若有考慮不周的地方,還望你們海涵。
一、Redis分佈式鎖:基於AOP和Redis實現的簡易版分佈式鎖
二、Redis分佈式鎖(二):支持鎖的續期,避免鎖超時後致使多個線程得到鎖
三、Redis分佈式鎖(三):支持鎖可重入,避免鎖遞歸調用時死鎖
好了,咱們下一期再見,歡迎你們一塊兒留言討論。同時也歡迎點贊~