在併發編程的場景中,最多見的一個case是某個任務的執行,須要等到多個線程都執行完畢以後才能夠進行,CountDownLatch能夠很好解決這個問題前端
下面將主要從使用和實現原理兩個方面進行說明,圍繞點以下java
同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待node
比較有意思的是,CountDownLatch
並未繼承自其餘的類or接口,在jdk中這樣的類並很少見(多半是我孤陋寡聞)編程
在使用以前,得先了解下其定義的幾個方法多線程
// 構造器,必須指定一個大於零的計數 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } // 線程阻塞,直到計數爲0的時候喚醒;能夠響應線程中斷退出阻塞 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // 線程阻塞一段時間,若是計數依然不是0,則返回false;不然返回true public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } // 計數-1 public void countDown() { sync.releaseShared(1); } // 獲取計數 public long getCount() { return sync.getCount(); }
也就幾個接口,基本上都是比較常見的了,須要注意的是不要把 await()
和 Object#wait()
方法弄混了,不然就gg思密達了...併發
依然以講解 ReentrantLock
中的例子來講明,多線程實現累加框架
線程1實現 10加到100 線程2實現 100加到200 線程3實現 線程1和線程2計算結果的和
實現以下oop
public class CountDownLatchDemo { private CountDownLatch countDownLatch; private int start = 10; private int mid = 100; private int end = 200; private volatile int tmpRes1, tmpRes2; private int add(int start, int end) { int sum = 0; for (int i = start; i <= end; i++) { sum += i; } return sum; } private int sum(int a, int b) { return a + b; } public void calculate() { countDownLatch = new CountDownLatch(2); Thread thread1 = new Thread(() -> { try { // 確保線程3先與1,2執行,因爲countDownLatch計數不爲0而阻塞 Thread.sleep(100); System.out.println(Thread.currentThread().getName() + " : 開始執行"); tmpRes1 = add(start, mid); System.out.println(Thread.currentThread().getName() + " : calculate ans: " + tmpRes1); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }, "線程1"); Thread thread2 = new Thread(() -> { try { // 確保線程3先與1,2執行,因爲countDownLatch計數不爲0而阻塞 Thread.sleep(100); System.out.println(Thread.currentThread().getName() + " : 開始執行"); tmpRes2 = add(mid + 1, end); System.out.println(Thread.currentThread().getName() + " : calculate ans: " + tmpRes2); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }, "線程2"); Thread thread3 = new Thread(()-> { try { System.out.println(Thread.currentThread().getName() + " : 開始執行"); countDownLatch.await(); int ans = sum(tmpRes1, tmpRes2); System.out.println(Thread.currentThread().getName() + " : calculate ans: " + ans); } catch (InterruptedException e) { e.printStackTrace(); } }, "線程3"); thread3.start(); thread1.start(); thread2.start(); } public static void main(String[] args) throws InterruptedException { CountDownLatchDemo demo = new CountDownLatchDemo(); demo.calculate(); Thread.sleep(1000); } }
輸出學習
線程3 : 開始執行 線程1 : 開始執行 線程2 : 開始執行 線程1 : calculate ans: 5005 線程2 : calculate ans: 15050 線程3 : calculate ans: 20055
看了上面的定義和Demo以後,使用就會簡單一點了,通常流程如ui
CountDownLatch countDown = new CountDownLatch(2)
countDown.countDown()
countDown.await()
實現阻塞同步注意
countDown()
計數-1方法;必須有線程顯示調用了 await()
方法(沒有這個就沒有必要使用CountDownLatch了)await(long, TimeUnit)
來替代直接使用await()
方法,至少不會形成阻塞死只能重啓的狀況有興趣的小夥伴能夠對比下這個實現與 《Java併發學習之ReentrantLock的工做原理及使用姿式》中的demo,明顯感受使用CountDownLatch
優雅得多(後面有機會介紹用更有意思的Fork/Join來實現累加)
前面給了一個demo演示如何用,那這個東西在實際的業務場景中是否會用到呢?
由於確實在一個業務場景中使用到了,否則也就不會單獨撈出這一節...
電商的詳情頁,由衆多的數據拼裝組成,如能夠分紅一下幾個模塊
上面的幾個模塊信息,都是從不一樣的服務獲取信息,且彼此沒啥關聯;因此爲了提升響應,徹底能夠作成併發獲取數據,如
可是最終拼裝數據並返回給前端,須要等到上面的全部信息都獲取完畢以後,才能返回,這個場景就很是的適合 CountDownLatch
來作了
CountDownLatch#await(long, TimeUnit)
等待全部的模塊信息返回CountDownLatch#countDown()
進行計數-1同ReentrantLock同樣,依然是藉助AQS的雙端隊列,來實現原子的計數-1,線程阻塞和喚醒
前面《Java併發學習之ReentrantLock的工做原理及使用姿式》 介紹了AQS的結構,方便查看,下面直接貼出
AQS是一個用於構建鎖和同步容器的框架。事實上concurrent包內許多類都是基於AQS構建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解決了在實現同步容器時設計的大量細節問題
AQS使用一個FIFO的隊列表示排隊等待鎖的線程,隊列頭節點稱做「哨兵節點」或者「啞節點」,它不與任何線程關聯。其餘的節點與等待線程關聯,每一個節點維護一個等待狀態waitStatus
private transient volatile Node head; private transient volatile Node tail; private volatile int state; static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; //取值爲 CANCELLED, SIGNAL, CONDITION, PROPAGATE 之一 volatile int waitStatus; volatile Node prev; volatile Node next; // Link to next node waiting on condition, // or the special value SHARED volatile Thread thread; Node nextWaiter; }
CountDownLatch內部實現了AQS,並覆蓋了tryAcquireShared()
和tryReleaseShared()
兩個方法,下面說明幹嗎用的
經過前面的使用,清楚了計數器的構造必須指定計數值,這個直接初始化了 AQS內部的state變量
Sync(int count) { setState(count); }
後續的計數-1/判斷是否可用都是基於sate進行的
// 計數-1 public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 首先嚐試釋放鎖 doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) //若是計數已經爲0,則返回失敗 return false; int nextc = c-1; // 原子操做實現計數-1 if (compareAndSetState(c, nextc)) return nextc == 0; } } // 喚醒被阻塞的線程 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { // 隊列非空,表示有線程被阻塞 int ws = h.waitStatus; if (ws == Node.SIGNAL) { // 頭結點若是爲SIGNAL,則喚醒頭結點下個節點上關聯的線程,並出隊 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // 沒有線程被阻塞,直接跳出 break; } }
上面截出計數減1的完整調用鏈
tryReleaseShared
,實現計數-1doReleaseShared
疑問一: 看到這個實現,是否是隻要countDownLatch的計數爲0了,全部被阻塞的線程都會被執行?
改下上面的demo,新增線程4,實現線程2的結果-線程1的結果
public class CountDownLatchDemo { // ...省略重複 private int sub(int a, int b) { return a - b; } public void calculate() { countDownLatch = new CountDownLatch(2); Thread thread1 = // ... ; Thread thread2 = // ...; Thread thread3 = new Thread(()-> { try { System.out.println(Thread.currentThread().getName() + " : 開始執行"); countDownLatch.await(); System.out.println(Thread.currentThread().getName() + " : 喚醒"); Thread.sleep(100); // 確保線程4先執行完相減 int ans = sum(tmpRes1, tmpRes2); System.out.println(Thread.currentThread().getName() + " : calculate ans: " + ans); } catch (InterruptedException e) { e.printStackTrace(); } }, "線程3"); Thread thread4 = new Thread(()-> { try { System.out.println(Thread.currentThread().getName() + " : 開始執行"); countDownLatch.await(); System.out.println(Thread.currentThread().getName() + " : 喚醒"); int ans = sub(tmpRes2, tmpRes1); Thread.sleep(200); // 保證線程3先輸出執行結果,以驗證線程3和線程4是否併發執行 System.out.println(Thread.currentThread().getName() + " : calculate ans: " + ans); } catch (InterruptedException e) { e.printStackTrace(); } }, "線程4"); thread3.start(); thread4.start(); thread1.start(); thread2.start(); } public static void main(String[] args) throws InterruptedException { CountDownLatchDemo demo = new CountDownLatchDemo(); demo.calculate(); Thread.sleep(1000); } }
輸出以下
線程4 : 開始執行 線程3 : 開始執行 線程2 : 開始執行 線程2 : calculate ans: 15050 線程1 : 開始執行 線程1 : calculate ans: 5005 線程3 : 喚醒 線程4 : 喚醒 線程3 : calculate ans: 20055 線程4 : calculate ans: 10045
上面的實現中,線程3中sleep一段時間,確保線程4的計算會優先執行;線程4計算完成以後的sleep時間,以保證線程3計算完成並輸出結果,而後線程4才輸出結果;結合輸出,這個指望是準確的,也就是說,線程3和線程4被喚醒後是併發執行的,沒有前後阻塞順序
即CountDownLatch計數爲0以後,全部被阻塞的線程都會被喚醒,且彼此相對獨立,不會出現獨佔鎖阻塞的問題
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) // 若線程中端,直接拋異常 throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // 計數爲0時,表示獲取鎖成功 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 阻塞,併入隊 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); // 入隊 boolean failed = true; try { for (;;) { // 獲取前驅節點 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { // 獲取鎖成功,設置隊列頭爲node節點 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) // 線程掛起 && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
阻塞的邏輯相對簡單
countDown()
計數-1方法;必須有線程顯示調用了await()
方法(沒有這個就沒有必要使用CountDownLatch了)await(long, TimeUnit)
來替代直接使用await()
方法,至少不會形成阻塞死只能重啓的狀況await
方法,當計數爲0後,全部被阻塞的線程都會被喚醒await內部實現流程:
countDown內部實現流程:
tryReleaseShared
,實現計數-1doReleaseShared