CountDownLatch所描述的是」在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待「:用給定的計數 初始化 CountDownLatch。因爲調用了 countDown() 方法,因此在當前計數到達零以前,await 方法會一直受阻塞。以後,會釋放全部等待的線程,await 的全部後續調用都將當即返回。CountDownLatch的本質也是一個"共享鎖"node
\併發
CountDownLatch(int count) 構造一個用給定計數初始化的 CountDownLatch。 // 使當前線程在鎖存器倒計數至零以前一直等待,除非線程被中斷。 void await() // 使當前線程在鎖存器倒計數至零以前一直等待,除非線程被中斷或超出了指定的等待時間。 boolean await(long timeout, TimeUnit unit) // 遞減鎖存器的計數,若是計數到達零,則釋放全部等待的線程。 void countDown() // 返回當前計數。 long getCount() // 返回標識此鎖存器及其狀態的字符串。 String toString()
CountDownLatch是經過一個計數器來實現的,當咱們在new 一個CountDownLatch對象的時候須要帶入該計數器值,該值就表示了線程的數量。每當一個線程完成本身的任務後,計數器的值就會減1。當計數器的值變爲0時,就表示全部的線程均已經完成了任務,而後就能夠恢復等待的線程繼續執行了。ide
雖然,CountDownlatch與CyclicBarrier(後續會接受。另一併發工具類)區別:工具
經過上面的結構圖咱們能夠看到,CountDownLatch內部依賴Sync實現,而Sync繼承AQS。CountDownLatch僅提供了一個構造方法:ui
CountDownLatch(int count) : 構造一個用給定計數初始化的 CountDownLatchthis
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
sync爲CountDownLatch的一個內部類,其定義以下:spa
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } //獲取同步狀態 int getCount() { return getState(); } //獲取同步狀態 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //釋放同步狀態 protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
經過這個內部類Sync咱們能夠清楚地看到CountDownLatch是採用共享鎖來實現的。線程
CountDownLatch提供await()方法來使當前線程在鎖存器倒計數至零以前一直等待,除非線程被中斷,定義以下:code
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
await其內部使用AQS的acquireSharedInterruptibly(int arg):對象
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
在內部類Sync中重寫了tryAcquireShared(int arg)方法:
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
getState()獲取同步狀態,其值等於計數器的值,從這裏咱們能夠看到若是計數器值不等於0,則會調用doAcquireSharedInterruptibly(int arg),該方法爲一個自旋方法會嘗試一直去獲取同步狀態:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { /** * 對於CountDownLatch而言,若是計數器值不等於0,那麼r 會一直小於0 */ int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //等待 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
CountDownLatch提供countDown() 方法遞減鎖存器的計數,若是計數到達零,則釋放全部等待的線程。
public void countDown() { sync.releaseShared(1); }
內部調用AQS的releaseShared(int arg)方法來釋放共享鎖同步狀態:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
tryReleaseShared(int arg)方法被CountDownLatch的內部類Sync重寫:
protected boolean tryReleaseShared(int releases) { for (;;) { //獲取鎖狀態 int c = getState(); //c == 0 直接返回,釋放鎖成功 if (c == 0) return false; //計算新「鎖計數器」 int nextc = c-1; //更新鎖狀態(計數器) if (compareAndSetState(c, nextc)) return nextc == 0; } }
CountDownLatch內部經過共享鎖實現。在建立CountDownLatch實例時,須要傳遞一個int型的參數:count,該參數爲計數器的初始值,也能夠理解爲該共享鎖能夠獲取的總次數。當某個線程調用await()方法,程序首先判斷count的值是否爲0,若是不會0的話則會一直等待直到爲0爲止。當其餘線程調用countDown()方法時,則執行釋放共享鎖狀態,使count值 - 1。當在建立CountDownLatch時初始化的count參數,必需要有count線程調用countDown方法纔會使計數器count等於0,鎖纔會釋放,前面等待的線程纔會繼續運行。注意CountDownLatch不能回滾重置。
示例仍然使用開會案例。老闆進入會議室等待5我的所有到達會議室纔會開會。因此這裏有兩個線程老闆等待開會線程、員工到達會議室:
public class CountDownLatchTest { private volatile static CountDownLatch countDownLatch = new CountDownLatch(5); /** * Boss線程,等待員工到達開會 */ static class BossThread extends Thread{ BossThread(String name){ super(name); } @Override public void run() { System.out.println(Thread.currentThread().getName() + ":Boss在會議室等待,總共有" + countDownLatch.getCount() + "我的開會..."); try { //Boss等待 countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ":全部人都已經到齊了,開會吧..."); } } //員工到達會議室 static class EmpleoyeeThread extends Thread{ @Override public void run() { System.out.println(Thread.currentThread().getName() + ",到達會議室...."); //員工到達會議室 count - 1 countDownLatch.countDown(); } } public static void main(String[] args) throws InterruptedException{ //Boss線程啓動 new BossThread("張總").start(); new BossThread("李總").start(); new BossThread("王總").start(); Thread.sleep(1000); for(int i = 0 ; i < 5 ; i++){ new EmpleoyeeThread().start(); } } }
張總:Boss在會議室等待,總共有5我的開會... 李總:Boss在會議室等待,總共有5我的開會... 王總:Boss在會議室等待,總共有5我的開會... Thread-0,到達會議室.... Thread-1,到達會議室.... Thread-2,到達會議室.... Thread-3,到達會議室.... Thread-4,到達會議室.... 張總:全部人都已經到齊了,開會吧... 王總:全部人都已經到齊了,開會吧... 李總:全部人都已經到齊了,開會吧...