提供了一個基於FIFO隊列,能夠用於構建鎖或者其餘相關同步裝置的基礎框架。該同步器(如下簡稱同步器)利用了一個int來表示狀態,指望它可以成爲實現大部分同步需求的基礎。使用的方法是繼承,子類經過繼承同步器並須要實現它的方法來管理其狀態,管理的方式就是經過相似acquire和release的方式來操縱狀態。然而多線程環境中對狀態的操縱必須確保原子性,所以子類對於狀態的把握,須要使用這個同步器提供的如下三個方法對狀態進行操做:java
子類推薦被定義爲自定義同步裝置的內部類,同步器自身沒有實現任何同步接口,它僅僅是定義了若干acquire之類的方法來供使用。該同步器便可以做爲排他模式也能夠做爲共享模式,當它被定義爲一個排他模式時,其餘線程對其的獲取就被阻止,而共享模式對於多個線程獲取均可以成功。node
同步器是實現鎖的關鍵,利用同步器將鎖的語義實現,而後在鎖的實現中聚合同步器。能夠這樣理解:鎖的API是面向使用者的,它定義了與鎖交互的公共行爲,而每一個鎖須要完成特定的操做也是透過這些行爲來完成的(好比:能夠容許兩個線程進行加鎖,排除兩個以上的線程),可是實現是依託給同步器來完成;同步器面向的是線程訪問和資源控制,它定義了線程對資源是否可以獲取以及線程的排隊等操做。鎖和同步器很好的隔離了兩者所須要關注的領域,嚴格意義上講,同步器能夠適用於除了鎖之外的其餘同步設施上(包括鎖)。
同步器的開始提到了其實現依賴於一個FIFO隊列,那麼隊列中的元素Node就是保存着線程引用和線程狀態的容器,每一個線程對同步器的訪問,均可以看作是隊列中的一個節點。Node的主要包含如下成員變量:安全
1 Node { 2 int waitStatus; 3 Node prev; 4 Node next; 5 Node nextWaiter; 6 Thread thread; 7 }
以上五個成員變量主要負責保存該節點的線程引用,同步等待隊列(如下簡稱sync隊列)的前驅和後繼節點,同時也包括了同步狀態。網絡
屬性名稱 | 描述 |
int waitStatus | 表示節點的狀態。其中包含的狀態有:
|
Node prev | 前驅節點,好比當前節點被取消,那就須要前驅節點和後繼節點來完成鏈接。 |
Node next | 後繼節點。 |
Node nextWaiter | 存儲condition隊列中的後繼節點。 |
Thread thread | 入隊列時的當前線程。 |
節點成爲sync隊列和condition隊列構建的基礎,在同步器中就包含了sync隊列。同步器擁有三個成員變量:sync隊列的頭結點head、sync隊列的尾節點tail和狀態state。對於鎖的獲取,請求造成節點,將其掛載在尾部,而鎖資源的轉移(釋放再獲取)是從頭部開始向後進行。對於同步器維護的狀態state,多個線程對其的獲取將會產生一個鏈式的結構。多線程
實現自定義同步器時,須要使用同步器提供的getState()、setState()和compareAndSetState()方法來操縱狀態的變遷。併發
方法名稱 | 描述 |
protected boolean tryAcquire(int arg) | 排它的獲取這個狀態。這個方法的實現須要查詢當前狀態是否容許獲取,而後再進行獲取(使用compareAndSetState來作)狀態。 |
protected boolean tryRelease(int arg) | 釋放狀態。 |
protected int tryAcquireShared(int arg) | 共享的模式下獲取狀態。 |
protected boolean tryReleaseShared(int arg) | 共享的模式下釋放狀態。 |
protected boolean isHeldExclusively() | 在排它模式下,狀態是否被佔用。 |
實現這些方法必須是非阻塞並且是線程安全的,推薦使用該同步器的父類java.util.concurrent.locks.AbstractOwnableSynchronizer來設置當前的線程。
開始提到同步器內部基於一個FIFO隊列,對於一個獨佔鎖的獲取和釋放有如下僞碼能夠表示。
獲取一個排他鎖。框架
01 while(獲取鎖) { 02 if (獲取到) { 03 退出while循環 04 } else { 05 if(當前線程沒有入隊列) { 06 那麼入隊列 07 } 08 阻塞當前線程 09 } 10 }
釋放一個排他鎖。ide
1 if (釋放成功) { 2 刪除頭結點 3 激活原頭結點的後繼節點 4 }
下面經過一個排它鎖的例子來深刻理解一下同步器的工做原理,而只有掌握同步器的工做原理纔可以更加深刻了解其餘的併發組件。
排他鎖的實現,一次只能一個線程獲取到鎖。工具
01 class Mutex implements Lock, java.io.Serializable { 02 // 內部類,自定義同步器 03 private static class Sync extends AbstractQueuedSynchronizer { 04 // 是否處於佔用狀態 05 protected boolean isHeldExclusively() { 06 return getState() == 1; 07 } 08 // 當狀態爲0的時候獲取鎖 09 public boolean tryAcquire(int acquires) { 10 assert acquires == 1; // Otherwise unused 11 if (compareAndSetState(0, 1)) { 12 setExclusiveOwnerThread(Thread.currentThread()); 13 return true; 14 } 15 return false; 16 } 17 // 釋放鎖,將狀態設置爲0 18 protected boolean tryRelease(int releases) { 19 assert releases == 1; // Otherwise unused 20 if (getState() == 0) throw new IllegalMonitorStateException(); 21 setExclusiveOwnerThread(null); 22 setState(0); 23 return true; 24 } 25 // 返回一個Condition,每一個condition都包含了一個condition隊列 26 Condition newCondition() { return new ConditionObject(); } 27 } 28 // 僅須要將操做代理到Sync上便可 29 private final Sync sync = new Sync(); 30 public void lock() { sync.acquire(1); } 31 public boolean tryLock() { return sync.tryAcquire(1); } 32 public void unlock() { sync.release(1); } 33 public Condition newCondition() { return sync.newCondition(); } 34 public boolean isLocked() { return sync.isHeldExclusively(); } 35 public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } 36 public void lockInterruptibly() throws InterruptedException { 37 sync.acquireInterruptibly(1); 38 } 39 public boolean tryLock(long timeout, TimeUnit unit) 40 throws InterruptedException { 41 return sync.tryAcquireNanos(1, unit.toNanos(timeout)); 42 } 43 }
能夠看到Mutex將Lock接口均代理給了同步器的實現。
使用方將Mutex構造出來以後,調用lock獲取鎖,調用unlock進行解鎖。下面以Mutex爲例子,詳細分析如下同步器的實現邏輯。測試
該方法以排他的方式獲取鎖,對中斷不敏感,完成synchronized語義。
1 public final void acquire(int arg) { 2 if (!tryAcquire(arg) && 3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 4 selfInterrupt(); 5 }
上述邏輯主要包括:
1. 嘗試獲取(調用tryAcquire更改狀態,須要保證原子性);
在tryAcquire方法中使用了同步器提供的對state操做的方法,利用compareAndSet保證只有一個線程可以對狀態進行成功修改,而沒有成功修改的線程將進入sync隊列排隊。
2. 若是獲取不到,將當前線程構形成節點Node並加入sync隊列;
進入隊列的每一個線程都是一個節點Node,從而造成了一個雙向隊列,相似CLH隊列,這樣作的目的是線程間的通訊會被限制在較小規模(也就是兩個節點左右)。
3. 再次嘗試獲取,若是沒有獲取到那麼將當前線程從線程調度器上摘下,進入等待狀態。
使用LockSupport將當前線程unpark,關於LockSupport後續會詳細介紹。
01 private Node addWaiter(Node mode) { 02 Node node = new Node(Thread.currentThread(), mode); 03 // 快速嘗試在尾部添加 04 Node pred = tail; 05 if (pred != null) { 06 node.prev = pred; 07 if (compareAndSetTail(pred, node)) { 08 pred.next = node; 09 return node; 10 } 11 } 12 enq(node); 13 return node; 14 } 15 16 private Node enq(final Node node) { 17 for (;;) { 18 Node t = tail; 19 if (t == null) { // Must initialize 20 if (compareAndSetHead(new Node())) 21 tail = head; 22 } else { 23 node.prev = t; 24 if (compareAndSetTail(t, node)) { 25 t.next = node; 26 return t; 27 } 28 } 29 }
上述邏輯主要包括:
1. 使用當前線程構造Node;
對於一個節點須要作的是將當節點前驅節點指向尾節點(current.prev = tail),尾節點指向它(tail = current),原有的尾節點的後繼節點指向它(t.next = current)而這些操做要求是原子的。上面的操做是利用尾節點的設置來保證的,也就是compareAndSetTail來完成的。
2. 先行嘗試在隊尾添加;
若是尾節點已經有了,而後作以下操做:
(1)分配引用T指向尾節點;
(2)將節點的前驅節點更新爲尾節點(current.prev = tail);
(3)若是尾節點是T,那麼將當尾節點設置爲該節點(tail = current,原子更新);
(4)T的後繼節點指向當前節點(T.next = current)。
注意第3點是要求原子的。
這樣能夠以最短路徑O(1)的效果來完成線程入隊,是最大化減小開銷的一種方式。
3. 若是隊尾添加失敗或者是第一個入隊的節點。
若是是第1個節點,也就是sync隊列沒有初始化,那麼會進入到enq這個方法,進入的線程可能有多個,或者說在addWaiter中沒有成功入隊的線程都將進入enq這個方法。
能夠看到enq的邏輯是確保進入的Node都會有機會順序的添加到sync隊列中,而加入的步驟以下:
(1)若是尾節點爲空,那麼原子化的分配一個頭節點,並將尾節點指向頭節點,這一步是初始化;
(2)而後是重複在addWaiter中作的工做,可是在一個while(true)的循環中,直到當前節點入隊爲止。
進入sync隊列以後,接下來就是要進行鎖的獲取,或者說是訪問控制了,只有一個線程可以在同一時刻繼續的運行,而其餘的進入等待狀態。而每一個線程都是一個獨立的個體,它們自省的觀察,當條件知足的時候(本身的前驅是頭結點而且原子性的獲取了狀態),那麼這個線程可以繼續運行。
01 final boolean acquireQueued(final Node node, int arg) { 02 boolean failed = true; 03 try { 04 boolean interrupted = false; 05 for (;;) { 06 final Node p = node.predecessor(); 07 if (p == head && tryAcquire(arg)) { 08 setHead(node); 09 p.next = null; // help GC 10 failed = false; 11 return interrupted; 12 } 13 if (shouldParkAfterFailedAcquire(p, node) && 14 parkAndCheckInterrupt()) 15 interrupted = true; 16 } 17 } finally { 18 if (failed) 19 cancelAcquire(node); 20 } 21 }
上述邏輯主要包括:
1. 獲取當前節點的前驅節點;
須要獲取當前節點的前驅節點,而頭結點所對應的含義是當前站有鎖且正在運行。
2. 當前驅節點是頭結點而且可以獲取狀態,表明該當前節點佔有鎖;
若是知足上述條件,那麼表明可以佔有鎖,根據節點對鎖佔有的含義,設置頭結點爲當前節點。
3. 不然進入等待狀態。
若是沒有輪到當前節點運行,那麼將當前線程從線程調度器上摘下,也就是進入等待狀態。
這裏針對acquire作一下總結:
1. 狀態的維護;
須要在鎖定時,須要維護一個狀態(int類型),而對狀態的操做是原子和非阻塞的,經過同步器提供的對狀態訪問的方法對狀態進行操縱,而且利用compareAndSet來確保原子性的修改。
2. 狀態的獲取;
一旦成功的修改了狀態,當前線程或者說節點,就被設置爲頭節點。
3. sync隊列的維護。
在獲取資源未果的過程當中條件不符合的狀況下(不應本身,前驅節點不是頭節點或者沒有獲取到資源)進入睡眠狀態,中止線程調度器對當前節點線程的調度。
這時引入的一個釋放的問題,也就是說使睡眠中的Node或者說線程得到通知的關鍵,就是前驅節點的通知,而這一個過程就是釋放,釋放會通知它的後繼節點從睡眠中返回準備運行。
下面的流程圖基本描述了一次acquire所須要經歷的過程:
如上圖所示,其中的斷定退出隊列的條件,斷定條件是否知足和休眠當前線程就是完成了自旋spin的過程。
在unlock方法的實現中,使用了同步器的release方法。相對於在以前的acquire方法中能夠得出調用acquire,保證可以獲取到鎖(成功獲取狀態),而release則表示將狀態設置回去,也就是將資源釋放,或者說將鎖釋放。
1 public final boolean release(int arg) { 2 if (tryRelease(arg)) { 3 Node h = head; 4 if (h != null && h.waitStatus != 0) 5 unparkSuccessor(h); 6 return true; 7 } 8 return false; 9 }
上述邏輯主要包括:
1. 嘗試釋放狀態;
tryRelease可以保證原子化的將狀態設置回去,固然須要使用compareAndSet來保證。若是釋放狀態成功過以後,將會進入後繼節點的喚醒過程。
2. 喚醒當前節點的後繼節點所包含的線程。
經過LockSupport的unpark方法將休眠中的線程喚醒,讓其繼續acquire狀態。
01 private void unparkSuccessor(Node node) { 02 // 將狀態設置爲同步狀態 03 int ws = node.waitStatus; 04 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 獲取當前節點的後繼節點,若是知足狀態,那麼進行喚醒操做
// 若是沒有知足狀態,從尾部開始找尋符合要求的節點並將其喚醒 Node s = node.next; if (s == null || s.waitStatus > 0) { 05 s = null; 06 for (Node t = tail; t != null && t != node; t = t.prev) 07 if (t.waitStatus <= 0) 08 s = t; 09 } 10 if (s != null) 11 LockSupport.unpark(s.thread); 12 }
上述邏輯主要包括,該方法取出了當前節點的next引用,而後對其線程(Node)進行了喚醒,這時就只有一個或合理個數的線程被喚醒,被喚醒的線程繼續進行對資源的獲取與爭奪。
回顧整個資源的獲取和釋放過程:
在獲取時,維護了一個sync隊列,每一個節點都是一個線程在進行自旋,而依據就是本身是不是首節點的後繼而且可以獲取資源;
在釋放時,僅僅須要將資源還回去,而後通知一下後繼節點並將其喚醒。
這裏須要注意,隊列的維護(首節點的更換)是依靠消費者(獲取時)來完成的,也就是說在知足了自旋退出的條件時的一刻,這個節點就會被設置成爲首節點。
tryAcquire是自定義同步器須要實現的方法,也就是自定義同步器非阻塞原子化的獲取狀態,若是鎖該方法通常用於Lock的tryLock實現中,這個特性是synchronized沒法提供的。
該方法提供獲取狀態能力,固然在沒法獲取狀態的狀況下會進入sync隊列進行排隊,這相似acquire,可是和acquire不一樣的地方在於它可以在外界對當前線程進行中斷的時候提早結束獲取狀態的操做,換句話說,就是在相似synchronized獲取鎖時,外界可以對當前線程進行中斷,而且獲取鎖的這個操做可以響應中斷並提早返回。一個線程處於synchronized塊中或者進行同步I/O操做時,對該線程進行中斷操做,這時該線程的中斷標識位被設置爲true,可是線程依舊繼續運行。
若是在獲取一個經過網絡交互實現的鎖時,這個鎖資源忽然進行了銷燬,那麼使用acquireInterruptibly的獲取方式就可以讓該時刻嘗試獲取鎖的線程提早返回。而同步器的這個特性被實現Lock接口中的lockInterruptibly方法。根據Lock的語義,在被中斷時,lockInterruptibly將會拋出InterruptedException來告知使用者。
01 public final void acquireInterruptibly(int arg) 02 throws InterruptedException { 03 if (Thread.interrupted()) 04 throw new InterruptedException(); 05 if (!tryAcquire(arg)) 06 doAcquireInterruptibly(arg); 07 } 08 09 private void doAcquireInterruptibly(int arg) 10 throws InterruptedException { 11 final Node node = addWaiter(Node.EXCLUSIVE); 12 boolean failed = true; 13 try { 14 for (;;) { 15 final Node p = node.predecessor(); 16 if (p == head && tryAcquire(arg)) { 17 setHead(node); 18 p.next = null; // help GC 19 failed = false; 20 return; 21 } 22 // 檢測中斷標誌位 23 if (shouldParkAfterFailedAcquire(p, node) && 24 parkAndCheckInterrupt()) 25 throw new InterruptedException(); 26 } 27 } finally { 28 if (failed) 29 cancelAcquire(node); 30 } 31 }
上述邏輯主要包括:
1. 檢測當前線程是否被中斷;
判斷當前線程的中斷標誌位,若是已經被中斷了,那麼直接拋出異常並將中斷標誌位設置爲false。
2. 嘗試獲取狀態;
調用tryAcquire獲取狀態,若是順利會獲取成功並返回。
3. 構造節點並加入sync隊列;
獲取狀態失敗後,將當前線程引用構造爲節點並加入到sync隊列中。退出隊列的方式在沒有中斷的場景下和acquireQueued相似,當頭結點是本身的前驅節點而且可以獲取到狀態時,便可以運行,固然要將本節點設置爲頭結點,表示正在運行。
4. 中斷檢測。
在每次被喚醒時,進行中斷檢測,若是發現當前線程被中斷,那麼拋出InterruptedException並退出循環。
該方法提供了具有有超時功能的獲取狀態的調用,若是在指定的nanosTimeout內沒有獲取到狀態,那麼返回false,反之返回true。能夠將該方法看作acquireInterruptibly的升級版,也就是在判斷是否被中斷的基礎上增長了超時控制。
針對超時控制這部分的實現,主要須要計算出睡眠的delta,也就是間隔值。間隔能夠表示爲nanosTimeout = 原有nanosTimeout – now(當前時間)+ lastTime(睡眠以前記錄的時間)。若是nanosTimeout大於0,那麼還須要使當前線程睡眠,反之則返回false。
01 private boolean doAcquireNanos(int arg, long nanosTimeout) 02 throws InterruptedException { 03 long lastTime = System.nanoTime(); 04 final Node node = addWaiter(Node.EXCLUSIVE); 05 boolean failed = true; 06 try { 07 for (;;) { 08 final Node p = node.predecessor(); 09 if (p == head && tryAcquire(arg)) { 10 setHead(node); 11 p.next = null; // help GC 12 failed = false; 13 return true; 14 } 15 if (nanosTimeout <= 0) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) 16 LockSupport.parkNanos(this, nanosTimeout); 17 long now = System.nanoTime(); 18 //計算時間,當前時間減去睡眠以前的時間獲得睡眠的時間,而後被 19 //原有超時時間減去,獲得了還應該睡眠的時間 20 nanosTimeout -= now - lastTime; 21 lastTime = now; 22 if (Thread.interrupted()) 23 throw new InterruptedException(); 24 } 25 } finally { 26 if (failed) 27 cancelAcquire(node); 28 } 29 }
上述邏輯主要包括:
1. 加入sync隊列;
將當前線程構形成爲節點Node加入到sync隊列中。
2. 條件知足直接返回;
退出條件判斷,若是前驅節點是頭結點而且成功獲取到狀態,那麼設置本身爲頭結點並退出,返回true,也就是在指定的nanosTimeout以前獲取了鎖。
3. 獲取狀態失敗休眠一段時間;
經過LockSupport.unpark來指定當前線程休眠一段時間。
4. 計算再次休眠的時間;
喚醒後的線程,計算仍須要休眠的時間,該時間表示爲nanosTimeout = 原有nanosTimeout – now(當前時間)+ lastTime(睡眠以前記錄的時間)。其中now – lastTime表示此次睡眠所持續的時間。
5. 休眠時間的斷定。
喚醒後的線程,計算仍須要休眠的時間,並沒有阻塞的嘗試再獲取狀態,若是失敗後查看其nanosTimeout是否大於0,若是小於0,那麼返回徹底超時,沒有獲取到鎖。 若是nanosTimeout小於等於1000L納秒,則進入快速的自旋過程。那麼快速自旋會形成處理器資源緊張嗎?結果是不會,通過測算,開銷看起來很小,幾乎微乎其微。Doug Lea應該測算了在線程調度器上的切換形成的額外開銷,所以在短時1000納秒內就讓當前線程進入快速自旋狀態,若是這時再休眠相反會讓nanosTimeout的獲取時間變得更加不精確。
上述過程能夠以下圖所示:
上述這個圖中能夠理解爲在相似獲取狀態須要排隊的基礎上增長了一個超時控制的邏輯。每次超時的時間就是當前超時剩餘的時間減去睡眠的時間,而在這個超時時間的基礎上進行了判斷,若是大於0那麼繼續睡眠(等待),能夠看出這個超時版本的獲取狀態只是一個近似超時的獲取狀態,所以任何含有超時的調用基本結果就是近似於給定超時。
調用該方法可以以共享模式獲取狀態,共享模式和以前的獨佔模式有所區別。以文件的查看爲例,若是一個程序在對其進行讀取操做,那麼這一時刻,對這個文件的寫操做就被阻塞,相反,這一時刻另外一個程序對其進行一樣的讀操做是能夠進行的。若是一個程序在對其進行寫操做,那麼全部的讀與寫操做在這一時刻就被阻塞,直到這個程序完成寫操做。
以讀寫場景爲例,描述共享和獨佔的訪問模式,以下圖所示:
上圖中,紅色表明被阻塞,綠色表明能夠經過。
01 public final void acquireShared(int arg) { 02 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) { 03 setHeadAndPropagate(node, r); 04 p.next = null; // help GC 05 if (interrupted) 06 selfInterrupt(); 07 failed = false; 08 return; 09 } 10 } 11 if (shouldParkAfterFailedAcquire(p, node) && 12 parkAndCheckInterrupt()) 13 interrupted = true; 14 } 15 } finally { 16 if (failed) 17 cancelAcquire(node); 18 } 19 }
上述邏輯主要包括:
1. 嘗試獲取共享狀態;
調用tryAcquireShared來獲取共享狀態,該方法是非阻塞的,若是獲取成功則馬上返回,也就表示獲取共享鎖成功。
2. 獲取失敗進入sync隊列;
在獲取共享狀態失敗後,當前時刻有多是獨佔鎖被其餘線程所把持,那麼將當前線程構形成爲節點(共享模式)加入到sync隊列中。
3. 循環內判斷退出隊列條件;
若是當前節點的前驅節點是頭結點而且獲取共享狀態成功,這裏和獨佔鎖acquire的退出隊列條件相似。
4. 獲取共享狀態成功;
在退出隊列的條件上,和獨佔鎖之間的主要區別在於獲取共享狀態成功以後的行爲,而若是共享狀態獲取成功以後會判斷後繼節點是不是共享模式,若是是共享模式,那麼就直接對其進行喚醒操做,也就是同時激發多個線程併發的運行。
5. 獲取共享狀態失敗。
經過使用LockSupport將當前線程從線程調度器上摘下,進入休眠狀態。
對於上述邏輯中,節點之間的通知過程以下圖所示:
上圖中,綠色表示共享節點,它們之間的通知和喚醒操做是在前驅節點獲取狀態時就進行的,紅色表示獨佔節點,它的被喚醒必須取決於前驅節點的釋放,也就是release操做,能夠看出來圖中的獨佔節點若是要運行,必須等待前面的共享節點均釋放了狀態才能夠。而獨佔節點若是獲取了狀態,那麼後續的獨佔式獲取和共享式獲取均被阻塞。
調用該方法釋放共享狀態,每次獲取共享狀態acquireShared都會操做狀態,一樣在共享鎖釋放的時候,也須要將狀態釋放。好比說,一個限定必定數量訪問的同步工具,每次獲取都是共享的,可是若是超過了必定的數量,將會阻塞後續的獲取操做,只有當以前獲取的消費者將狀態釋放纔可使阻塞的獲取操做得以運行。
上述邏輯主要就是調用同步器的tryReleaseShared方法來釋放狀態,並同時在doReleaseShared方法中喚醒其後繼節點。
在上述對同步器AbstractQueuedSynchronizer進行了實現層面的分析以後,咱們經過一個例子來加深對同步器的理解:
設計一個同步工具,該工具在同一時刻,只能有兩個線程可以並行訪問,超過限制的其餘線程進入阻塞狀態。
對於這個需求,能夠利用同步器完成一個這樣的設定,定義一個初始狀態,爲2,一個線程進行獲取那麼減1,一個線程釋放那麼加1,狀態正確的範圍在[0,1,2]三個之間,當在0時,表明再有新的線程對資源進行獲取時只能進入阻塞狀態(注意在任什麼時候候進行狀態變動的時候均須要以CAS做爲原子性保障)。因爲資源的數量多於1個,同時能夠有兩個線程佔有資源,所以須要實現tryAcquireShared和tryReleaseShared方法,這裏謝謝luoyuyou和同事小明指正,已經修改了實現。
01 public class TwinsLock implements Lock { 02 private final Sync sync = new Sync(2); 03 04 private static final class Sync extends AbstractQueuedSynchronizer { 05 private static final long serialVersionUID = -7889272986162341211L; 06 07 Sync(int count) { 08 if (count <= 0) { 09 throw new IllegalArgumentException("count must large than zero."); 10 } 11 setState(count); 12 } 13 14 public int tryAcquireShared(int reduceCount) { 15 for (;;) { 16 int current = getState(); 17 int newCount = current - reduceCount; 18 if (newCount < 0 || compareAndSetState(current, newCount)) { 19 return newCount; 20 } 21 } 22 } 23 24 public boolean tryReleaseShared(int returnCount) { 25 for (;;) { 26 int current = getState(); 27 int newCount = current + returnCount; 28 if (compareAndSetState(current, newCount)) { 29 return true; 30 } 31 } 32 } 33 } 34 35 public void lock() { 36 sync.acquireShared(1); 37 } 38 39 public void lockInterruptibly() throws InterruptedException { 40 sync.acquireSharedInterruptibly(1); 41 } 42 43 public boolean tryLock() { 44 return sync.tryAcquireShared(1) >= 0; 45 } 46 47 public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { 48 return sync.tryAcquireSharedNanos(1, unit.toNanos(time)); 49 } 50 51 public void unlock() { 52 sync.releaseShared(1); 53 } 54 55 @Override 56 public Condition newCondition() { 57 return null; 58 } 59 }
這裏咱們編寫一個測試來驗證TwinsLock是否可以正常工做並達到預期。
01 public class TwinsLockTest { 02 03 @Test 04 public void test() { 05 final Lock lock = new TwinsLock(); 06 07 class Worker extends Thread { 08 public void run() { 09 while (true) { 10 lock.lock(); 11 12 try { 13 Thread.sleep(1000L); 14 System.out.println(Thread.currentThread()); 15 Thread.sleep(1000L); 16 } catch (Exception ex) { 17 18 } finally { 19 lock.unlock(); 20 } 21 } 22 } 23 } 24 25 for (int i = 0; i < 10; i++) { 26 Worker w = new Worker(); 27 w.start(); 28 } 29 30 new Thread() { 31 public void run() { 32 while (true) { 33 34 try { 35 Thread.sleep(200L); 36 System.out.println(); 37 } catch (Exception ex) { 38 39 } 40 } 41 } 42 }.start(); 43 44 try { 45 Thread.sleep(20000L); 46 } catch (InterruptedException e) { 47 e.printStackTrace(); 48 } 49 } 50 }
上述測試用例的邏輯主要包括:1. 打印線程Worker在兩次睡眠之間打印自身線程,若是一個時刻只能有兩個線程同時訪問,那麼打印出來的內容將是成對出現。2. 分隔線程不停的打印換行,能讓Worker的輸出看起來更加直觀。該測試的結果是在一個時刻,僅有兩個線程可以得到到鎖,並完成打印,而表象就是打印的內容成對出現。