分佈式鎖的實現分析


一、設計目標

分佈式部署的應用集羣中保證數據更新的互斥性,且程序出現異常時,鎖可以自動釋放,避免死鎖發生。java

二、爲何要使用分佈式鎖

爲了保證分佈式部署的應用集羣中同一時間只有一個客戶端對共享資源進行操做。根據鎖的用途再細分:node

  • 對共享資源的操做是冪等性的,使用分佈式鎖可以避免重複操做,從而提升效率。
  • 對共享資源的操做是非冪等的,好比訂單狀態的修改,若是多個客戶端同時操做,最後的結果可能很遭,使用分佈式鎖可讓各個客戶端分散時間操做。

三、分佈式鎖應具有哪些條件

這也是分佈式鎖的關鍵技術。git

  • 互斥性 這個是分佈式鎖的基本要求,分佈式鎖須要保證不一樣客戶端的不一樣線程之間互斥。
  • 可重入性 支持鎖的重入,減小資源消耗。
  • 鎖超時釋放 獲取鎖的客戶端由於某些緣由而宕機,而未能釋放鎖,其餘客戶端沒法獲取此鎖,鎖超時釋放是爲了不死鎖。
  • 安全性 鎖只能被持有該鎖的用戶刪除,而不能被其餘用戶刪除。
  • 高效與高可用 加鎖與解鎖須要高效,並保證高可用,當部分節點宕機,客戶端仍能獲取鎖或者釋放鎖。
  • 支持阻塞與非阻塞 阻塞就是線程獲取不到鎖一直阻塞,增長超時時間能夠防止一直阻塞。非阻塞則獲取不到鎖不阻塞線程。
  • 支持公平與非公平 公平鎖就是按照加鎖的順序獲取到鎖,非公平鎖即無序。

四、基於Redis

4.一、 從一個問題開始

問題:假設設置失效時間10秒,若是因爲某些緣由致使10秒還沒執行完任務,這時候鎖自動失效,致使其餘線程也會拿到分佈式鎖,怎麼處理? 答:Redisson內部提供了一個監控鎖的看門狗,它的做用是在Redisson實例被關閉前,不斷的延長鎖的有效期。github

4.二、 Redisson實現的分佈式鎖

具體使用能夠參考Redisson官方文檔 這裏貼上我簡單使用的例子:redis

4.2.一、例子
4.2.1.一、 pom

我使用的是springboot,因此直接用了redisson提供的集成包。算法

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.11.4</version>
</dependency>
4.2.1.二、application.properties

我用的redis是官方cluster的3主3從。spring

# common spring boot settings

#spring.redis.database=
#spring.redis.host=
#spring.redis.port=
spring.redis.password=XXXXXXXXXX
#spring.redis.ssl=
#spring.redis.timeout=
spring.redis.cluster.nodes=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
#spring.redis.sentinel.master=
#spring.redis.sentinel.nodes=

# Redisson settings

#path to redisson.yaml or redisson.json
#spring.redis.redisson.config=classpath:redisson.yaml
4.2.1.三、 test
public void contextLoads() {
        //簡單使用測試
//        RBucket<String> bucket = redisson.getBucket("bucket");
//        bucket.set("test");
//        String obj = bucket.get();
//        System.out.println(obj);

        // 得到鎖對象實例
        RLock lock = redisson.getLock("lock");

        // 獲取分佈式鎖,採用默認超時時間30秒
        // 若是負責儲存這個分佈式鎖的Redisson節點宕機之後,
        // 並且這個鎖正好處於鎖住的狀態時,
        // 這個鎖會出現鎖死的狀態。
        // 爲了不這種狀況的發生,Redisson內部提供了一個監控鎖的看門狗,
        // 它的做用是在Redisson實例被關閉前,不斷的延長鎖的有效期。
        // 默認狀況下,看門狗的檢查鎖的超時時間是30秒鐘,
        // 也能夠經過修改Config.lockWatchdogTimeout來另行指定
        lock.lock();
        try {
            Thread.sleep(80000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 釋放鎖
            lock.unlock();
        }

        // 加鎖之後10秒鐘自動解鎖
        // 無需調用unlock方法手動解鎖
        // 這種指定了超時時間的鎖不會走看門狗邏輯,
        // 即會發生任務沒有執行完成時,鎖超時了,其餘進程會獲取到這個分佈式鎖。
        // 儘可能使用第一種方式,走看門狗邏輯。
//        lock.lock(40, TimeUnit.SECONDS);
//        try {
//            Thread.sleep(80000);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
    }
4.2.二、解析

這裏再看一看具體獲取鎖和釋放鎖的核心邏輯:json

4.2.2.一、 獲取鎖

首先,調用了RedissonLock中的Lock方法:安全

@Override
    public void lock() {
        try {
            lock(-1, null, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }

注意這裏第一個入參爲-1。 進入lock方法:springboot

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
		//獲取鎖邏輯
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
		//獲取成功,返回
            return;
        }

        //訂閱鎖
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try {
		    //持續獲取鎖
            while (true) {
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    try {
                        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        getEntry(threadId).getLatch().acquire();
                    } else {
                        getEntry(threadId).getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
		    //最終取消訂閱獲取鎖
            unsubscribe(future, threadId);
        }
    }

再看tryAcquire方法:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(leaseTime, unit, threadId));
    }
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
		//獲取鎖
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        //看門狗邏輯
		ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }

由於leaseTime爲-1,因此首先異步的獲取鎖,以後會走看門狗邏輯。 先看獲取鎖的操做:tryLockInnerAsync

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then "  
                      "redis.call('hset', KEYS[1], ARGV[2], 1); "  
                      "redis.call('pexpire', KEYS[1], ARGV[1]); "  
                      "return nil; "  
                  "end; "  
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "  
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); "  
                      "redis.call('pexpire', KEYS[1], ARGV[1]); "  
                      "return nil; "  
                  "end; "  
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

Redisson 使用 EVAL 命令執行上面的 Lua 腳原本完成獲取鎖的操做:

  • 若是經過 exists 命令發現當前 key 不存在,即鎖沒被佔用,則執行 hset 寫入 Hash 類型數據 key:全局鎖名稱(例如共享資源ID), field:鎖實例名稱(Redisson客戶端ID:線程ID), value:1,並執行 pexpire 對該 key 設置失效時間,返回空值 nil,至此獲取鎖成功。
  • 若是經過 hexists 命令發現 Redis 中已經存在當前 key 和 field 的 Hash 數據,說明當前線程以前已經獲取到鎖,由於這裏的鎖是可重入的,則執行 hincrby 對當前 key field 的值加一,並從新設置失效時間,返回空值,至此重入獲取鎖成功。
  • 最後是鎖已被佔用的狀況,即當前 key 已經存在,可是 Hash 中的 Field 與當前值不一樣,則執行 pttl 獲取鎖的剩餘存活時間並返回,至此獲取鎖失敗。
4.2.2.二、 釋放鎖
@Override
    public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
    }
 @Override
    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<Void>();
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        future.onComplete((opStatus, e) -> {
            if (e != null) {
                cancelExpirationRenewal(threadId);
                result.tryFailure(e);
                return;
            }

            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                          id   " thread-id: "   threadId);
                result.tryFailure(cause);
                return;
            }
            
            cancelExpirationRenewal(threadId);
            result.trySuccess(null);
        });

        return result;
    }

上面opStatus爲null時,會拋出異常,必須由加鎖的線程釋放鎖。 再來看核心方法:unlockInnerAsync

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then "  
                    "return nil;"  
                "end; "  
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); "  
                "if (counter > 0) then "  
                    "redis.call('pexpire', KEYS[1], ARGV[2]); "  
                    "return 0; "  
                "else "  
                    "redis.call('del', KEYS[1]); "  
                    "redis.call('publish', KEYS[2], ARGV[1]); "  
                    "return 1; " 
                "end; "  
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

    }

依然使用 EVAL 命令執行 Lua 腳原本釋放鎖:

  • key 不存在,說明鎖已釋放,直接執行 publish 命令發佈釋放鎖消息並返回 1。
  • key 存在,可是 field 在 Hash 中不存在,說明本身不是鎖持有者,無權釋放鎖,返回 nil。
  • 由於鎖可重入,因此釋放鎖時不能把全部已獲取的鎖全都釋放掉,一次只能釋放一把鎖,所以執行 hincrby 對鎖的值減一。
  • 釋放一把鎖後,若是還有剩餘的鎖,則刷新鎖的失效時間並返回 0;若是剛纔釋放的已是最後一把鎖,則執行 del 命令刪除鎖的 key,併發布鎖釋放消息,返回 1。
  • 上面執行結果返回 nil 的狀況,由於本身不是鎖的持有者,不容許釋放別人的鎖,故拋出異常。
  • 執行結果返回 1 的狀況,該鎖的全部實例都已所有釋放,因此不須要再刷新鎖的失效時間。

上面的代碼解析文本源自:Redisson 分佈式鎖實現分析(一)

4.三、RedLock算法

Redis做者antirez基於分佈式環境下提出了一種更高級的分佈式鎖的實現方式:Redlock。 Redisson中也實現了這種算法,具體能夠參考看8.4章節 這裏簡單描述一下這種算法: 假設有5個互不鏈接的Redis集羣

  • 獲取當前時間,單位毫秒
  • 依次嘗試從5個集羣中獲取相同的鎖。當獲取鎖的時候,客戶端設置一個網絡鏈接和超時時間, 這個超時時間應該小於鎖的失效時間,若是服務端沒有在規定的時間內響應,則嘗試另外一個redis集羣。
  • 客戶端使用當前時間減去開始獲取鎖的時間(第一步記錄的時間),獲得獲取鎖使用的時間。 當且僅當一半以上(這裏爲3)的集羣獲取到鎖,而且使用的時間小於鎖失效時間時,纔算獲取鎖成功。
  • 獲取到鎖以後,設置key真正有效的時間等於有效時間減去獲取鎖花費的時間。
  • 若是獲取鎖失敗了,客戶端應在全部redis集羣上進行解鎖。

貼一段Redisson的小例子:

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同時加鎖:lock1 lock2 lock3
// 紅鎖在大部分節點上加鎖成功就算成功。
lock.lock();
...
lock.unlock();

RedissonRedLock繼承自RedissonMultiLock,具體源碼就再也不繼續分析了。

五、基於Zookeeper

5.一、節點類型

Zookeeper是一個一致性的文件系統,保證了其每一個節點的惟一性。 有4種節點類型:

  • 持久化目錄節點 客戶端與Zookeeper斷開後,該節點依舊存在。
  • 持久化順序編號目錄節點 保持持久化目錄節點的特性外,每一個節點的名稱會被順序編號。
  • 臨時目錄節點 客戶端與Zookeeper斷開後,該節點被刪除。
  • 臨時順序標號目錄節點 保持臨時目錄節點的特性外,每一個節點的名稱會被順序編號。

節點類型.png

順序號是單調遞增的計數器,由父節點維護。

5.二、分佈式鎖原理

分佈式鎖就是利用了Zookeeper的臨時順序標號目錄節點的原理來實現。Locks主節點下面的ID最小的節點得到鎖的權限,其餘客戶端來獲取鎖時,發現本身不是最靠前的,會監視他的前一個鎖節點,當鎖釋放時,相應的節點被刪除,會通知這個等待的客戶端,讓其獲取鎖的權利,至關於造成了一個等待隊列。

分佈式鎖原理.png

Zookeeper分佈式鎖的優勢就是有現成的框架能夠拿來就用,由於有等待隊列,槍鎖的效率也會高。缺點是由於Zookeeper是相似文件系統的數據結構,因此刪除和新增節點的效率會比較低。

參考

《Redis官方文檔》用Redis構建分佈式鎖 8. 分佈式鎖和同步器 Redisson 分佈式鎖實現分析(一) Redis 分佈式鎖的前世此生 Redlock:Redis分佈式鎖最牛逼的實現

tencent.jpg

相關文章
相關標籤/搜索