java5 中 ,提供了幾個併發工具類 ,Semaphore CountDownLatch CyclicBarrier,在併發編程中很是實用。前二者經過 內部類sync 繼承AQS,使用共享資源的模式,AQS的實現可參考個人另外一篇 AQS 實現分析,前二者根據各自功能需求 , 各自內部實現tryAcquireShared(獲取資源)、tryReleaseShared(釋放)。來定義什麼條件 下來獲取與釋放。而CyclicBarrier內部經過Reentrantlock與Condition組合的方式實現。java
與Reentrantlock相似,也有公平和非公平的機制。這裏就不在分析了,默認是非公平的。編程
經過acquire 獲取併發
Semphore public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); //非獨佔模式 }
AQS public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //當返回值不小於0時,得到資源。 doAcquireSharedInterruptibly(arg); //資源獲取失敗,加入隊列尾部,阻塞 }
從AQS的分析中獲得,當tryAcquireShared 返回值小於0,那麼認爲獲取失敗。而對於tryAcquiredShared的實現中,讓其執行時,其返回值需大於0,對於Semaphore信號量,構造時會設置許可的大小 。表示可以獲取的資源,通常大於1。app
默認是非公平 的,然後會調用以下方法,會返回工具
Semaphore NonFairSync final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
減去本次嘗試的獲取的許可數,當結果小於0,直接返回,而若是不小於0 , cas更新許可,若是更新失敗,代表其它線程也在更新,然後 進入下一次循環,直到可用的小於0或者cas成功。當返回值小於0時,阻塞,等待喚醒資源釋放。返回值大於 0,獲取許可成功,繼續執行。oop
釋放ui
當獲取許可的線程執行完時,必須釋放佔有的許可量,this
Semaphore public void release() { sync.releaseShared(1); //釋放時需CAS更新,獨佔模式不需cas }
AQS public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); //喚醒head的後繼節點 return true; } return false; }
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) //由於是 共享鎖,可能有多個線程釋放 return true; } }
示例:spa
項目中的示例:在開發的項目中,一個數據中心有不少個集羣,集羣中有不少個虛擬機實例,同一個集羣中有多個虛擬機要開機,可是開機前須要使用調度策略,須要更新集羣中的信息,可是隻容許一個集羣只有一個虛擬機能執行策略。這裏就能夠經過Semaphore實現。.net
clusterLockMap.putIfAbsent(cluster.getId(), new Semaphore(1)); //爲每一個集羣初始一個許可 clusterLockMap.get(cluster.getId()).acquire(); //嘗試獲取許可 //執行邏輯 //釋放資源,這裏調用drainPermits的緣由是,在釋放前清空許可, 由於其它 在阻塞的線程若是被中斷了,會將許可值擴大,因此在釋放前,將其清空。 finally { //保證了在任一時刻,只有一個虛擬機能調用策略 synchronized (clusterLockMap.get(cluster.getId())) { clusterLockMap.get(cluster.getId()).drainPermits(); //清空,返回清空數,爲空時直接返回0 clusterLockMap.get(cluster.getId()).release(); } }
上面的示例中若是不加鎖,異常發生時可能致使許可大於一。
相似於計數器,好比經常使用於 一個或幾個線程,要等待其它的線程執行完才能繼續執行。內部只有一個繼承了AQS的sync。只能使用一次。
經常使用方法 await ,用來等待執行信號。
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); //狀態不爲1時,阻塞。等待被喚醒 }
boolean await(long timeout, TimeUnit unit) 超時尚未爲0,返回false
CountDownLatch sync protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
判斷當前是否等於0 ,也就是說 countDown 的調用次數是否等於初始化的數量。
countDown 信號的釋放
CountDownLatch public void countDown() { sync.releaseShared(1); }
CountDownLatch sync protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
p實例:
F1 比賽前須要等待發車信號,全部的車才能出發。 CountDownLatch(1) ,車比如線程,發車前await。
信號發出,countDown。
F1車進入修息區,必須 全部的檢測 燈都爲綠才能出發。車出發前 await,每個必檢項檢測完調用countDown,才能出發(線程才能運行)。
CyclicBarrier 最大的特別 之處就是,在構造時能夠指定一個線程,而且能夠重複使用。
final Runnable barrierCommand;
該線程的做用在於,當調用await時,且許可爲0 時,執行完 barrierCommand的線程 才能繼續執行。
實現細節,類的結構圖以下:
構造方法時,能夠指定barrierCommand,若是不指定,則功能相似於CountDownLatch,也有點計數的意思 。其從成員變量 能夠看出,內部有ReentrantLock的屬性,內部多是經過ReentrantLock的調用實現。接下來分析源碼。
分析源碼前,幾個關鍵變量注意下,
lock 同步鎖,
trip = lock.newCondition()
count 計數器
generation 內部實現的類,用來表示是否新的計數的開始
構造時指定count,線程調用await,count減1 阻塞,直到count等於0,且barrierCommand執行完若是有的話。
調用方法有兩種:
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
內部實現
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); //被中斷,本次計數無效,並將標誌置爲broken throw new InterruptedException(); } int index = --count; // 計數減一 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); //計數爲0時,執行run方法 ranAction = true; nextGeneration(); //本次計數結束,trip.singnalAll(),計數重置。 return 0; } finally { if (!ranAction) breakBarrier(); //出現異常,本次計數無效。喚醒其它await線程, } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); // 阻塞 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); // 限時阻塞,調用await(long timeout, TimeUnit unit), } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) // 是否仍是同一個計數週期內 return index; // 順利執行,返回 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); //只有調用超時await方法,且超時,拋出。 } } } finally { lock.unlock(); } }
generation 的操做
順利執行後,當計數爲0,且週期重置。
private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }
執行時出現異常或者等待超時,喚醒。計數無效,使喚醒的線程檢測到generation的broken 標識爲true,拋出BrokenBarrierException 。
private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
總結:
從這上面的分析能夠得出如下狀況
大部分的併發工具類,是經過構造內部類繼承AQS,並根據工具類設計的功能,實現對應的獲取與釋放資源的方法。來使得 調用線程什麼時候阻塞,什麼時候釋放。如CountDownLatch 就不一樣於Semaphore的設計思路,Semaphore是先嚐試獲取資源,獲取到才能釋放,獲取不到,阻塞。雖然實現方式(繼承AQS)同樣。而CountDownLatch 釋放資源 前沒有經過相應的方法獲取,而是直接將當前的許可數減一,實現方法 tryReleaseShared ,然後根據是否計數爲0決定是否喚醒線程。而對應的獲取資源方法 tryAcquireShared,判斷計數器是否已經到達0來判斷是否執行。或者阻塞。獲取後不須要釋放。
而CyclicBarrier 並無直接經過內部sync繼承AQS的方式,而是經過現有的工具類Reentantlock,與Condition組合來實現功能,且還能重用。