採用 redis 和 zookeeper 的方式實現html
採用 redis 實現分佈式鎖的基本思路經過一個單節點的 redis 服務器,而後全部客戶端線程經過設置同一個 key 來實現鎖的獲取,這裏的關鍵是要區分開不一樣節點機器的不一樣線程(例如每一個 JVM 裏面的不一樣線程都經過一個單例對象來獲取鎖,該單例對象中對應一個 UUID,這樣就區分開了不一樣的 JVM,而後一個 JVM 的不一樣線程經過線程 ID 來區分,UUID+線程ID 就區分開了不一樣進程的不一樣線程),對應緩存值須要保存兩部分信息,一個就是線程的惟一標識(UUID+線程ID);另一個就是該線程獲取鎖的次數,用來支持重入鎖的特性,能夠將這兩個信息保存到一個 json 對象中,這樣 key 和 value 就都有了,若是線程設置 key 成功,則表示鎖獲取成功,若是當前 key 已經被設置,則說明鎖已經被人佔用,若是當前 key 對應的 value 裏面的線程標識正好好當前線程標識相同,則鎖重入java
採用 zookeeper 方式實現分佈式鎖的基本思路是利用,ZooKeeper機制規定:同一個目錄下只能有一個惟一的文件名。例如:咱們在Zookeeper目錄 /test 目錄下建立,兩個客戶端建立一個名爲 Lock 節點,只有一個可以成功。利用名稱惟一性,加鎖操做時,只須要全部客戶端一塊兒建立 /test/Lock 節點,只有一個建立成功,成功者得到鎖。解鎖時,只需刪除 /test/Lock 節點,其他客戶端再次進入競爭建立節點,直到全部客戶端都得到鎖。node
參考 redisson 項目 git
RLock 接口介紹github
注:這裏的 lockInterruptibly 和 tryLock 方法都和 jdk 中的 Lock 接口有區別redis
lockInterruptibly 方法中多了 leaseTime 參數,表示鎖最大持有時間,即客戶申請到鎖以後不論是否手動 unlock 了,超過 leaseTime 設定的時間後都將自動釋放鎖,防止客戶程序異常致使鎖沒法釋放的問題算法
tryLock 方法除了 waitTime 參數外,也多了一個 leaseTime 時間,其原理也是同樣的json
下面是 RedissonLock 裏面的部分源碼分析緩存
lockInterruptibly 獲取鎖的主要入口方法安全
@Override public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { Long ttl; if (leaseTime != -1) { ttl = tryLockInner(leaseTime, unit); } else { ttl = tryLockInner(); } // 鎖獲取成功,直接返回 if (ttl == null) { return; } // redis 消息機制 // "redisson_lock__channel__{" + getName() + "}"; 在該 channel 上訂閱消息,當 unlock 發生時,將 channel 上全部監聽者將收到通知 Future<RedissonLockEntry> future = subscribe(); future.sync(); try { while (true) { // 再嘗試一次獲取鎖,若是獲取到了就直接返回 if (leaseTime != -1) { ttl = tryLockInner(leaseTime, unit); } else { ttl = tryLockInner(); } // lock acquired if (ttl == null) { break; } // 阻塞等待,當其餘線程調用 unlock 方法時被喚醒,或者 ttl 時間超時 // 喚醒以後須要從新競爭鎖,由於可能多個線程被同時喚醒,而每次只會有一個線程成功獲取鎖 RedissonLockEntry entry = getEntry(); if (ttl >= 0) { entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { entry.getLatch().acquire(); } } } finally { unsubscribe(future); } }
tryLockInner 方法的內部實現
Long tryLockInner(long leaseTime, TimeUnit unit) { internalLockLeaseTime = unit.toMillis(leaseTime); // 這裏經過一條 redis 的批處理命令來設置 key(這裏由 redis 的特性來保證整條批處理命令的原子性) // 第一個 if 若是 key 不存在,則設置 uuid+線ID 對應的值爲 1,並設值 key 對應的超時時間 // 第二個 if 爲重入鎖特性的支持,而且刷新 key 的超時時間 // 不然返回 key 對應超時時間,即鎖獲取失敗 return commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "if (redis.call('exists', KEYS[1]) == 0) then " + // KEYS[1] == Collections.<Object>singletonList(getName()) "redis.call('hset', KEYS[1], ARGV[2], 1); " + // ARGV[2] == getLockName() "redis.call('pexpire', KEYS[1], ARGV[1]); " + // ARGV[1] == internalLockLeaseTime "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName()); }
unlock 方法的內部實現
@Override public void unlock() { // 這裏須要先理解 redis 中緩存的 key 和 value 的結構,key 對應的就是鎖的名稱,全部節點的全部線程都是採用同一個 key (同一把鎖) // value 對應的是一個對象的結構,其中有兩個屬性 {UUID+線程ID, 獲取鎖的次數} Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + // 若是對應的 key 已經不存在,說明 key 已經超時,redis 自動刪除了該 key,getChannelName() 發佈 unlockMessage 通知其餘在阻塞等待獲取鎖的線程 "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // key 存在,可是已經不是被當前線程佔有 "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // getLockName() 其實等於 UUID+線程ID,將這個值-1,若是結果大於0,說明鎖存在重入,則從新刷新鎖超時時間 "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + // -1以後=0,則正常刪除 key,而且發佈 unlockMessage 事件 "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName()); if (opStatus == null) { // IllegalMonitorStateException 表示當前調用 unlock 方法的線程不是持有 lock 的線程 throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); } if (opStatus) { cancelExpirationRenewal(); } }
RedissonLock 使用起來很是簡單,若是須要詳細瞭解 RedissonLock 的使用,能夠看看 Redisson 項目中相關的測試用例
public void testLock() { RLock lock = redisson.getLock("lockName"); try { // ..... } finally { if(lock.isHeldByCurrentThread()) lock.unlock(); } } // redisson 建立示例代碼,通常實現成單例模式 Config config = new Config(); config.useSingleServer().setAddress("127.0.0.1:6379"); RedissonClient redisson = Redisson.create(config);
本文詳細介紹了採用 RedissonLock 方式實現分佈式鎖的相關源碼和使用方式,可是這種方式也存在一個問題是,用於實現分佈式鎖的獲取和釋放是一個單實例的 redis 實例,若是該實例宕機,系統中全部的分佈式鎖獲取程序都沒法正常工做,那麼第一個問題:
如何經過一個集羣環境的 redis 實例來實現分佈式鎖的管理,以實現分佈式鎖的高可用性
一個可行的解決思路以下(參考:http://www.open-open.com/lib/view/open1415107259996.html):
咱們假設有 N 個 Redis 主節點。這些節點是相互獨立的,所以咱們不使用複製或其餘隱式同步機制。咱們已經描述過在單實例狀況下如何安全地獲取鎖。咱們也指出此算法將使用這種方法從單實例獲取和釋放鎖。在如下示例中,咱們設置N=5(這是個比較適中的值),這樣咱們須要在不一樣物理機或虛擬機上運行 5 個 Redis 主節點,以確保它們的出錯是儘量獨立的。
爲了獲取鎖,客戶端執行如下操做:
獲取當前時間,以毫秒爲單位。
以串行的方式嘗試從全部的N個實例中獲取鎖,使用的是相同的key值和相同的隨機value值。在從每一個實例獲取鎖時,客戶端會設置一個鏈接超時,其時長相比鎖的自動釋放時間要短得多。例如,若鎖的自動釋放時間是10秒,那麼鏈接超時大概設在5到50毫秒之間。這能夠避免當Redis節點掛掉時,會長時間堵住客戶端:若是某個節點沒及時響應,就應該儘快轉到下個節點。
客戶端計算獲取全部鎖耗費的時長,方法是使用當前時間減去步驟1中的時間戳。當且僅當客戶端能從多數節點(至少3個)中得到鎖,而且耗費的時長小於鎖的有效期時,可認爲鎖已經得到了。
若是鎖得到了,它的最終有效時長將從新計算爲其原時長減去步驟3中獲取鎖耗費的時長。
若是鎖獲取失敗了(要麼是沒有鎖住N/2+1個節點,要麼是鎖的最終有效時長爲負數),客戶端會對全部實例進行解鎖操做(即便對那些沒有加鎖成功的實例也同樣)。