分佈式鎖實現彙總

[TOC]html

分佈式鎖實現彙總

不少時候咱們須要保證同一時間一個方法只能被同一個線程調用,在單機環境中,Java中其實提供了不少併發處理相關的API,可是這些API在分佈式場景中就無能爲力了。也就是說單純的Java Api並不能提供分佈式鎖的能力。java

針對分佈式鎖的實現目前有多種方案:redis

  • 基於數據庫實現分佈式鎖
  • 基於緩存(redis,memcached)實現分佈式鎖
  • 基於Zookeeper實現分佈式鎖

基於數據庫實現分佈式鎖

簡單實現

直接建一張表,裏面記錄鎖定的方法名 時間 便可。
須要加鎖時,就插入一條數據,釋放鎖時就刪除數據。算法

CREATE TABLE `methodLock` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
  `method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '鎖定的方法名',
  `desc` varchar(1024) NOT NULL DEFAULT '備註信息',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存數據時間,自動生成',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='鎖定中的方法';複製代碼

當咱們想要鎖住某個方法時,執行如下SQL:sql

insert into methodLock(method_name,desc) values (‘method_name’,‘desc’)複製代碼

由於咱們對method_name作了惟一性約束,這裏若是有多個請求同時提交到數據庫的話,數據庫會保證只有一個操做能夠成功,那麼咱們就能夠認爲
操做成功的那個線程得到了該方法的鎖,能夠執行方法體內容。
當方法執行完畢以後,想要釋放鎖的話,須要執行如下Sql:數據庫

delete from methodLock where method_name ='method_name'複製代碼
存在的問題
  1. 這把鎖強依賴數據庫的可用性,數據庫是一個單點,一旦數據庫掛掉,會致使業務系統不可用。
  2. 這把鎖沒有失效時間,一旦解鎖操做失敗,就會致使鎖記錄一直在數據庫中,其餘線程沒法再得到到鎖。
  3. 這把鎖只能是非阻塞的,由於數據的insert操做,一旦插入失敗就會直接報錯。沒有得到鎖的線程並不會進入排隊隊列,要想再次得到鎖就要再次觸發得到鎖操做。
  4. 這把鎖是非重入的,同一個線程在沒有釋放鎖以前沒法再次得到該鎖。由於數據中數據已經存在了。
解決辦法
  1. 單點問題能夠用多數據庫實例,同時塞N個表,N/2+1個成功就職務鎖定成功
  2. 寫一個定時任務,隔一段時間清除一次過時的數據。
  3. 寫一個while循環,不斷的重試插入,直到成功。
  4. 在數據庫表中加個字段,記錄當前得到鎖的機器的主機信息和線程信息,那麼下次再獲取鎖的時候先查詢數據庫,若是當前機器的主機信息和線程信息在數據庫能夠查到的話,直接把鎖分配給他就能夠了。
總結

數據庫實現分佈式鎖的優勢: 直接藉助數據庫,容易理解。apache

數據庫實現分佈式鎖的缺點: 會有各類各樣的問題,在解決問題的過程當中會使整個方案變得愈來愈複雜。緩存

操做數據庫須要必定的開銷,性能問題須要考慮。安全

基於緩存實現分佈式鎖

相比於用數據庫來實現分佈式鎖,基於緩存實現的分佈式鎖的性能會更好一些。bash

目前有不少成熟的分佈式產品,包括Redis、memcache、Tair等。

單點實現
步驟
  1. 獲取鎖的使用,使用setnx加鎖,將值設爲當前的時間戳,再使用expire設置一個過時值。
  2. 獲取到鎖則執行同步代碼塊,沒獲取則根據業務場景能夠選擇自旋、休眠、或作一個等待隊列等擁有鎖進程來喚醒(相似Synchronize的同步隊列),在等待時使用ttl去檢查是否有過時值,若是沒有則使用expire設置一個。
  3. 執行完畢後,先根據value的值來判斷是否是本身的鎖,若是是的話則刪除,不是則代表本身的鎖已通過期,不須要刪除。(此時出現因爲過時而致使的多進程同時擁有鎖的問題)
存在的問題
  1. 單點問題。若是單機redis掛掉了,那麼程序會跟着出錯
  2. 若是轉移使用 slave節點,複製不是同步複製,會出現多個程序獲取鎖的狀況
code
public Object around(ProceedingJoinPoint joinPoint) {
        try {
            MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
            Method method = methodSignature.getMethod();

            DLock dLock = method.getAnnotation(DLock.class);
            if (dLock != null) {
                String lockedPrefix = buildLockedPrefix(dLock, method, joinPoint.getArgs());
                long timeOut = dLock.timeOut();
                int expireTime = dLock.expireTime();
                long value = System.currentTimeMillis();
                if (lock(lockedPrefix, timeOut, expireTime, value)) {
                    try {
                        return joinPoint.proceed();
                    } catch (Throwable throwable) {
                        throwable.printStackTrace();
                    } finally {
                        unlock(lockedPrefix, value);
                    }
                } else {
                    recheck(lockedPrefix, expireTime);
                }

            }
        } catch (Exception e) {
            logger.error("DLockAspect around error", e);
        }
        return null;
    }

    /** * 檢查是否設置過超時 * * @param lockedPrefix * @param expireTime */
    public void recheck(String lockedPrefix, int expireTime) {
        try {
            Result<Long> ttl = cacheFactory.getFactory().ttl(getLockedPrefix(lockedPrefix));
            if (ttl.isSuccess() && ttl.getValue() == -1) {
                Result<String> get = cacheFactory.getFactory().get(getLockedPrefix(lockedPrefix));
                //沒有超時設置則設置超時
                if (get.isSuccess() && !StringUtils.isEmpty(get.getValue())) {
                    long oldTime = Long.parseLong(get.getValue());
                    long newTime = expireTime * 1000 - (System.currentTimeMillis() - oldTime);
                    if (newTime < 0) {
                        //已過超時時間 設默認最小超時時間
                        cacheFactory.getFactory().expire(getLockedPrefix(lockedPrefix), MIX_EXPIRE_TIME);
                    } else {
                        //未超過 設置爲剩餘超時時間
                        cacheFactory.getFactory().expire(getLockedPrefix(lockedPrefix), (int) newTime);
                    }
                    logger.info(lockedPrefix + "recheck:" + newTime);
                }
            }
            logger.info(String.format("執行失敗lockedPrefix:%s count:%d", lockedPrefix, count++));
        } catch (Exception e) {
            logger.error("DLockAspect recheck error", e);
        }

    }
        public boolean lock(String lockedPrefix, long timeOut, int expireTime, long value) {
        long millisTime = System.currentTimeMillis();
        try {
            //在timeOut的時間範圍內不斷輪詢鎖
            while (System.currentTimeMillis() - millisTime < timeOut * 1000) {
                //鎖不存在的話,設置鎖並設置鎖過時時間,即加鎖
                Result<Long> result = cacheFactory.getFactory().setnx(getLockedPrefix(lockedPrefix), String.valueOf(value));
                if (result.isSuccess() && result.getValue() == 1) {

                    Result<Long> result1 = cacheFactory.getFactory().expire(getLockedPrefix(lockedPrefix), expireTime);
                    logger.info(lockedPrefix + "locked and expire " + result1.getValue());
                    return true;
                }
                //短暫休眠,避免可能的活鎖
                Thread.sleep(100, RANDOM.nextInt(50000));
            }
        } catch (Exception e) {
            logger.error("lock error " + getLockedPrefix(lockedPrefix), e);
        }

        return false;
    }

    public void unlock(String lockedPrefix, long value) {
        try {
            Result<String> result = cacheFactory.getFactory().get(getLockedPrefix(lockedPrefix));
            String kvValue = result.getValue();
            if (!StringUtils.isEmpty(kvValue) && kvValue.equals(String.valueOf(value))) {
                cacheFactory.getFactory().del(getLockedPrefix(lockedPrefix));

            }
            logger.info(lockedPrefix + "unlock:" + kvValue + "----" + value);
        } catch (Exception e) {
            logger.error("unlock error" + getLockedPrefix(lockedPrefix), e);
        }
    }複製代碼
RedLock

Redlock是Redis的做者antirez給出的集羣模式的Redis分佈式鎖,它基於N個徹底獨立的Redis節點(一般狀況下N能夠設置成5)。

步驟
  1. 獲取當前時間(毫秒數)。
  2. 按順序依次向N個Redis節點執行獲取鎖的操做。這個獲取操做跟前面基於單Redis節點的獲取鎖的過程相同,包含隨機字符串my_random_value,也包含過時時間(好比PX 30000,即鎖的有效時間)。爲了保證在某個Redis節點不可用的時候算法可以繼續運行,這個獲取鎖的操做還有一個超時時間(time out),它要遠小於鎖的有效時間(幾十毫秒量級)。客戶端在向某個Redis節點獲取鎖失敗之後,應該當即嘗試下一個Redis節點。這裏的失敗,應該包含任何類型的失敗,好比該Redis節點不可用,或者該Redis節點上的鎖已經被其它客戶端持有(注:Redlock原文中這裏只提到了Redis節點不可用的狀況,但也應該包含其它的失敗狀況)。
  3. 計算整個獲取鎖的過程總共消耗了多長時間,計算方法是用當前時間減去第1步記錄的時間。若是客戶端從大多數Redis節點(>= N/2+1)成功獲取到了鎖,而且獲取鎖總共消耗的時間沒有超過鎖的有效時間(lock validity time),那麼這時客戶端才認爲最終獲取鎖成功;不然,認爲最終獲取鎖失敗。
  4. 若是最終獲取鎖成功了,那麼這個鎖的有效時間應該從新計算,它等於最初的鎖的有效時間減去第3步計算出來的獲取鎖消耗的時間。
  5. 若是最終獲取鎖失敗了(可能因爲獲取到鎖的Redis節點個數少於N/2+1,或者整個獲取鎖的過程消耗的時間超過了鎖的最初有效時間),那麼客戶端應該當即向全部Redis節點發起釋放鎖的操做。
優化

客戶端1成功鎖住了A, B, C,獲取鎖成功(但D和E沒有鎖住);節點C崩潰重啓了,但客戶端1在C上加的鎖沒有持久化下來,丟失了;節點C重啓後,客戶端2鎖住了C, D, E,獲取鎖成功。客戶端1和客戶端2同時得到了鎖(針對同一資源)。

這個問題能夠延遲節點的恢復時間,時間長度應大於等於一個鎖的過時時間。

存在的問題
  1. 時鐘發生跳躍。這種狀況發生時,直接致使全部的鎖都超時,新的線程能夠成功的獲取鎖,致使多線程同時處理。

關於RedLock的更多內容能夠看:

基於Redis的分佈式鎖真的安全嗎?(上)

基於Redis的分佈式鎖真的安全嗎?(下)

一個比較好的實現:

Redission

Zookeeper鎖

步驟
  1. 每一個客戶端對某個方法加鎖時,在zookeeper上的與該方法對應的指定節點的目錄下,生成一個惟一的瞬時有序節點。
  2. 判斷是否獲取鎖的方式很簡單,只須要判斷有序節點中序號最小的一個。
  3. 當釋放鎖的時候,只需將這個瞬時節點刪除便可。同時,其能夠避免服務宕機致使的鎖沒法釋放,而產生的死鎖問題。
優勢
  1. 無單點問題。ZK是集羣部署的,只要集羣中有半數以上的機器存活,就能夠對外提供服務。

  2. 持有鎖任意長的時間,可自動釋放鎖。使用Zookeeper能夠有效的解決鎖沒法釋放的問題,由於在建立鎖的時候,客戶端會在ZK中建立一個臨時節點,一旦客戶端獲取到鎖以後忽然掛掉(Session鏈接斷開),那麼這個臨時節點就會自動刪除掉。其餘客戶端就能夠再次得到鎖。這避免了基於Redis的鎖對於有效時間(lock validity time)到底設置多長的兩難問題。實際上,基於ZooKeeper的鎖是依靠Session(心跳)來維持鎖的持有狀態的,而Redis不支持Sesion。

  3. 可阻塞。使用Zookeeper能夠實現阻塞的鎖,客戶端能夠經過在ZK中建立順序節點,而且在節點上綁定監聽器,一旦節點有變化,Zookeeper會通知客戶端,客戶端能夠檢查本身建立的節點是否是當前全部節點中序號最小的,若是是,那麼本身就獲取到鎖,即可以執行業務邏輯了。

  4. 可重入。客戶端在建立節點的時候,把當前客戶端的主機信息和線程信息直接寫入到節點中,下次想要獲取鎖的時候和當前最小的節點中的數據比對一下就能夠了。若是和本身的信息同樣,那麼本身直接獲取到鎖,若是不同就再建立一個臨時的順序節點,參與排隊。

問題
  1. 這種作法可能引起羊羣效應,從而下降鎖的性能。
  2. 性能不如緩存。由於每次在建立鎖和釋放鎖的過程當中,都要動態建立、銷燬瞬時節點來實現鎖功能。ZK中建立和刪除節點只能經過Leader服務器來執行,而後將數據同不到全部的Follower機器上。

一個比較好的實現:

Curator

public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
    try {
        return interProcessMutex.acquire(timeout, unit);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return true;
}
public boolean unlock() {
    try {
        interProcessMutex.release();
    } catch (Throwable e) {
        log.error(e.getMessage(), e);
    } finally {
        executorService.schedule(new Cleaner(client, path), delayTimeForClean, TimeUnit.MILLISECONDS);
    }
    return true;
}複製代碼

acquire方法用戶獲取鎖,release方法用於釋放鎖。

總結

使用Zookeeper實現分佈式鎖的優勢: 有效的解決單點問題,不可重入問題,非阻塞問題以及鎖沒法釋放的問題。實現起來較爲簡單。

使用Zookeeper實現分佈式鎖的缺點 : 性能上不如使用緩存實現分佈式鎖。 須要對ZK的原理有所瞭解。

三種方案的比較

從理解的難易程度角度(從低到高): 數據庫 > 緩存 > Zookeeper

從實現的複雜性角度(從低到高): Zookeeper >= 緩存 > 數據庫

從性能角度(從高到低): 緩存 > Zookeeper >= 數據庫

從可靠性角度(從高到低): Zookeeper > 緩存 > 數據庫

相關文章
相關標籤/搜索