1 前言java
最近在作項目的時候,遇到一個多線程訪問申請一個資源的問題。須要每一個線程都可以最終在有效的時間內申請到或者超時失敗。之前通常這種方式用的是redis作枷鎖機制,每一個線程都去redis加一把共同的鎖,若是枷鎖成功,則執行資源申請操做。而沒有枷鎖成功的線程,則在有效時間內循環嘗試去枷鎖,而且每次木有加鎖成功,則就Thread.sleep()一會。 redis
經過這種方式能夠看出,這裏對於sleep的時間間隔要求設置很嚴格,若是過小,則就會增長大規模的redis請求操做;而若是太長,當資源可用的時候,可是線程若都在sleep,則會出現資源空閒等待,線程不能及時的使用資源。編程
基於上面這種緣由,開始換種用curator的分佈式鎖機制來實現互斥鎖。多線程
2 簡介分佈式
curator的特性,沒必要多說,能夠說是 curator是zookeeper中的相似於guava對於java的意義同樣,提供了豐富的封裝,異常處理,提供了fluent編程模型,提供了master選舉,分佈式鎖,分佈式基數,分佈式barrier,能夠很方便的爲平常生產所使用。ide
此次主要用了curator的InterProcessMutex這種互斥鎖來作,也借於此機會,閱讀了它的代碼,分享一下它的機制,共你們一塊兒來交流學習下。oop
3 InterProcessMutex基本簡介
學習
InterProcessMutex一個可重入鎖,提供分佈式鎖的入口服務。基本的構造過程以下:
this
public InterProcessMutex(CuratorFramework client, String path) { this(client, path, LOCK_NAME, 1, new StandardLockInternalsDriver()); }
構造器內部最終會構造一個
spa
internals = new LockInternals(client, driver, path, lockName, maxLeases);
LockInternals這個是全部申請鎖與釋放鎖的核心實現
4 InterProcessMutex的獲取鎖
InterProcessMutex.internalLock()提供兩種機制來加鎖,
第一種是無限等待,直到獲取到鎖。第二種是有限等待,在規定的時間內獲取鎖,若是木有失敗。
該方法內部簡略以下:
Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData != null ) { // re-entering lockData.lockCount.incrementAndGet(); return true; } String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } return false;
對於同一個線程再次獲取鎖的時候,會判斷當前線程是否已經擁有了,
若是擁有了,則直接作原子操做加1,而後返回true,這樣就實現了可重入鎖。
對於其餘狀況,則都會調用 LockInternals.attemptLock();
4.1 LockInternals.attemptLock()
1)根據傳入的超時作判斷,是否須要millisToWait設置
2)建立臨時順序節點路徑:
ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
假如basepath=/zklock/activityno,這個是活動no的根路徑,這個path是構建InterProcessMutex設置的。
則path是/zklock/activityno/lock-,注意這個path是在basepath下建立的,lock-是curator自動添加的
則建立的順序節點如: /zklock/activityno/_c_f4a49d75-86f8-40b2-8b9c-d813392aa1db-lock-0000000008
尤爲要注意這裏,LockInternals會對全部請求獲取鎖的線程都會建立一個臨時順序節點,節點後綴順序由zk來保證,
同時zk客戶端底層可以保證同一個客戶端發送的請求是按照順序的,這樣就可以保證同一個客戶端先申請鎖建立的後綴序號比後申請的編號小。
3)循環等待嘗試枷鎖internalLockLoop(startMillis, millisToWait, ourPath);
內部核心代碼流程以下:
3.1) 獲取全部的子節點
List<String> children = getSortedChildren();
排序:獲取basepath下全部的子節點,而後截取lock-後面的編號,作升序排序,注意這裏的升序排序,保證了最早申請鎖的排在最開始,公平策略是根據誰先申請那麼你的優先級就應該最高
String sequenceNodeName = ourPath.substring(basePath.length() + 1);
獲取節點名如: _c_f4a49d75-86f8-40b2-8b9c-d813392aa1db-lock-0000000008
3.2 核心獲取鎖的判斷.
根據拿到的全部子節點路徑以及當前子節點去嘗試獲取鎖。
maxLeases表明是租賃個數,對於分佈式互斥鎖,這裏值爲0,保證了只容許租賃一個。
這裏就是獲取鎖的核心實現,若最終獲取成功,則直接return,不然進行無限wait或者有限wait()
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); if ( predicateResults.getsTheLock() ) { haveTheLock = true; }else{ //wait等待,有限或者無限等待,注意這裏用了線程的wait來作等待 client.getData().usingWatcher(watcher).forPath(previousSequencePath); }
根據driver.getsTheLock的結果,若是木有獲取到,則就會watcher返回的path,而後根據傳入的時間來作wait操做。
注意這裏wait是互斥信號量是LockInternals. 自定義的watcher很簡單,一旦監聽的到的節點數據變動或刪除,則就直接notifyFromWatcher();
driver.getsTheLock內部代碼以下:
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception { //先根據子節點名獲取在已經升序的list中的索引位置。 int ourIndex = children.indexOf(sequenceNodeName); validateOurIndex(sequenceNodeName, ourIndex); //比較索引位置,因爲maxLeases=1,則只有ourIndex=0才成立,這樣就能夠用來判斷當前子節點是不是升序第一個節點,而且也有很好的擴展性 //能夠改變maxLeases來容許同時租賃的數量 //注意,這裏升序是根據zk生成的順序編號排序的,申請越早編號越小。 boolean getsTheLock = ourIndex < maxLeases; //若getsTheLock=true,表示獲取到鎖,不然獲取它上一個位置的路徑,注意這個路徑會用來作watche的 String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases); return new PredicateResults(pathToWatch, getsTheLock); }
5 InterProcessMutex的釋放鎖
主要是判斷是不是當前線程,或者非當前線程。最終會根據線程號找到對應的path路徑,而後直接刪除該臨時節點。
6 InterProcessMutex總結
1) curator的InterProcessMutex提供了多種鎖機制,互斥鎖,讀寫鎖,以及可定時數的互斥鎖
2)全部申請鎖都會建立臨時順序節點,保證了都可以有機會去獲取鎖。
3)內部用了線程的wait()和notifyAll()這種等待機制,能夠及時的喚醒最渴望獲得鎖的線程。
避免常規利用Thread.sleep()這種無用的間隔等待機制.
4) 利用redis作鎖的時候,通常都須要作鎖的有效時間限定。而curator則利用了zookeeper的臨時順序節點特性,
一旦客戶端失去鏈接後,則就會自動清除該節點。