java 分佈式鎖

實現分佈式鎖的基本思路

採用 redis 和 zookeeper 的方式實現html

  • 採用 redis 實現分佈式鎖的基本思路經過一個單節點的 redis 服務器,而後全部客戶端線程經過設置同一個 key 來實現鎖的獲取,這裏的關鍵是要區分開不一樣節點機器的不一樣線程(例如每一個 JVM 裏面的不一樣線程都經過一個單例對象來獲取鎖,該單例對象中對應一個 UUID,這樣就區分開了不一樣的 JVM,而後一個 JVM 的不一樣線程經過線程 ID 來區分,UUID+線程ID 就區分開了不一樣進程的不一樣線程),對應緩存值須要保存兩部分信息,一個就是線程的惟一標識(UUID+線程ID);另一個就是該線程獲取鎖的次數,用來支持重入鎖的特性,能夠將這兩個信息保存到一個 json 對象中,這樣 key 和 value 就都有了,若是線程設置 key 成功,則表示鎖獲取成功,若是當前 key 已經被設置,則說明鎖已經被人佔用,若是當前 key 對應的 value 裏面的線程標識正好好當前線程標識相同,則鎖重入java

  • 採用 zookeeper 方式實現分佈式鎖的基本思路是利用,ZooKeeper機制規定:同一個目錄下只能有一個惟一的文件名。例如:咱們在Zookeeper目錄 /test 目錄下建立,兩個客戶端建立一個名爲 Lock 節點,只有一個可以成功。利用名稱惟一性,加鎖操做時,只須要全部客戶端一塊兒建立 /test/Lock 節點,只有一個建立成功,成功者得到鎖。解鎖時,只需刪除 /test/Lock 節點,其他客戶端再次進入競爭建立節點,直到全部客戶端都得到鎖。node


RedissonLock 實現分佈式鎖部分源碼介紹

參考 redisson 項目 git

RLock 接口介紹github

注:這裏的 lockInterruptibly 和 tryLock 方法都和 jdk 中的 Lock 接口有區別redis

lockInterruptibly 方法中多了 leaseTime 參數,表示鎖最大持有時間,即客戶申請到鎖以後不論是否手動 unlock 了,超過 leaseTime 設定的時間後都將自動釋放鎖,防止客戶程序異常致使鎖沒法釋放的問題算法

tryLock 方法除了 waitTime 參數外,也多了一個 leaseTime 時間,其原理也是同樣的json


下面是 RedissonLock 裏面的部分源碼分析緩存

lockInterruptibly 獲取鎖的主要入口方法安全

    @Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        Long ttl;
        if (leaseTime != -1) {
            ttl = tryLockInner(leaseTime, unit);
        } else {
            ttl = tryLockInner();
        }
        // 鎖獲取成功,直接返回
        if (ttl == null) {
            return;
        }

        // redis 消息機制
        // "redisson_lock__channel__{" + getName() + "}"; 在該 channel 上訂閱消息,當 unlock 發生時,將 channel 上全部監聽者將收到通知
        Future<RedissonLockEntry> future = subscribe();
        future.sync();

        try {
            while (true) {
                // 再嘗試一次獲取鎖,若是獲取到了就直接返回
                if (leaseTime != -1) {
                    ttl = tryLockInner(leaseTime, unit);
                } else {
                    ttl = tryLockInner();
                }
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // 阻塞等待,當其餘線程調用 unlock 方法時被喚醒,或者 ttl 時間超時
                // 喚醒以後須要從新競爭鎖,由於可能多個線程被同時喚醒,而每次只會有一個線程成功獲取鎖
                RedissonLockEntry entry = getEntry();
                if (ttl >= 0) {
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    entry.getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future);
        }
    }

tryLockInner 方法的內部實現

    Long tryLockInner(long leaseTime, TimeUnit unit) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        // 這裏經過一條 redis 的批處理命令來設置 key(這裏由 redis 的特性來保證整條批處理命令的原子性)
        // 第一個 if 若是 key 不存在,則設置 uuid+線ID 對應的值爲 1,並設值 key 對應的超時時間
        // 第二個 if 爲重入鎖特性的支持,而且刷新 key 的超時時間
        // 不然返回 key 對應超時時間,即鎖獲取失敗
        return commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +   // KEYS[1] == Collections.<Object>singletonList(getName())
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +  // ARGV[2] == getLockName()
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +   // ARGV[1] == internalLockLeaseTime 
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName());
    }

unlock 方法的內部實現

    @Override
    public void unlock() {
        // 這裏須要先理解 redis 中緩存的 key 和 value 的結構,key 對應的就是鎖的名稱,全部節點的全部線程都是採用同一個 key (同一把鎖)
        // value 對應的是一個對象的結構,其中有兩個屬性 {UUID+線程ID, 獲取鎖的次數}
        Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('exists', KEYS[1]) == 0) then " +   // 若是對應的 key 已經不存在,說明 key 已經超時,redis 自動刪除了該 key,getChannelName() 發佈 unlockMessage 通知其餘在阻塞等待獲取鎖的線程
                            "redis.call('publish', KEYS[2], ARGV[1]); " +
                            "return 1; " +
                        "end;" +
                        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +  // key 存在,可是已經不是被當前線程佔有
                            "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +  // getLockName() 其實等於 UUID+線程ID,將這個值-1,若是結果大於0,說明鎖存在重入,則從新刷新鎖超時時間
                        "if (counter > 0) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                            "return 0; " +
                        "else " +
                            "redis.call('del', KEYS[1]); " +  // -1以後=0,則正常刪除 key,而且發佈 unlockMessage 事件
                            "redis.call('publish', KEYS[2], ARGV[1]); " +
                            "return 1; "+
                        "end; " +
                        "return nil;",
                        Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName());
        if (opStatus == null) {
            // IllegalMonitorStateException 表示當前調用 unlock 方法的線程不是持有 lock 的線程
            throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        if (opStatus) {
            cancelExpirationRenewal();
        }
    }


RedissonLock 分佈式鎖的基本使用

RedissonLock 使用起來很是簡單,若是須要詳細瞭解 RedissonLock 的使用,能夠看看 Redisson 項目中相關的測試用例

public void testLock() {
	RLock lock = redisson.getLock("lockName");
	try {
		// .....
	} finally {
		if(lock.isHeldByCurrentThread())
			lock.unlock();
	}
}

// redisson 建立示例代碼,通常實現成單例模式
Config config = new Config();
config.useSingleServer().setAddress("127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);


其餘問題探討

本文詳細介紹了採用 RedissonLock 方式實現分佈式鎖的相關源碼和使用方式,可是這種方式也存在一個問題是,用於實現分佈式鎖的獲取和釋放是一個單實例的 redis 實例,若是該實例宕機,系統中全部的分佈式鎖獲取程序都沒法正常工做,那麼第一個問題:

如何經過一個集羣環境的 redis 實例來實現分佈式鎖的管理,以實現分佈式鎖的高可用性

一個可行的解決思路以下(參考:http://www.open-open.com/lib/view/open1415107259996.html):

咱們假設有 N 個 Redis 主節點。這些節點是相互獨立的,所以咱們不使用複製或其餘隱式同步機制。咱們已經描述過在單實例狀況下如何安全地獲取鎖。咱們也指出此算法將使用這種方法從單實例獲取和釋放鎖。在如下示例中,咱們設置N=5(這是個比較適中的值),這樣咱們須要在不一樣物理機或虛擬機上運行 5 個 Redis 主節點,以確保它們的出錯是儘量獨立的。

爲了獲取鎖,客戶端執行如下操做:

  1. 獲取當前時間,以毫秒爲單位。

  2. 以串行的方式嘗試從全部的N個實例中獲取鎖,使用的是相同的key值和相同的隨機value值。在從每一個實例獲取鎖時,客戶端會設置一個鏈接超時,其時長相比鎖的自動釋放時間要短得多。例如,若鎖的自動釋放時間是10秒,那麼鏈接超時大概設在5到50毫秒之間。這能夠避免當Redis節點掛掉時,會長時間堵住客戶端:若是某個節點沒及時響應,就應該儘快轉到下個節點。

  3. 客戶端計算獲取全部鎖耗費的時長,方法是使用當前時間減去步驟1中的時間戳。當且僅當客戶端能從多數節點(至少3個)中得到鎖,而且耗費的時長小於鎖的有效期時,可認爲鎖已經得到了。

  4. 若是鎖得到了,它的最終有效時長將從新計算爲其原時長減去步驟3中獲取鎖耗費的時長。

  5. 若是鎖獲取失敗了(要麼是沒有鎖住N/2+1個節點,要麼是鎖的最終有效時長爲負數),客戶端會對全部實例進行解鎖操做(即便對那些沒有加鎖成功的實例也同樣)。

相關文章
相關標籤/搜索