JUC 中倒數計數器 CountDownLatch 的使用與原理分析,當須要等待多個線程執行完畢後在作一件事情時候 CountDownLatch 是比調用線程的 join 方法更好的選擇,CountDownLatch 與 線程的 join 方法區別是什麼?java
平常開發中常常會遇到須要在主線程中開啓多線程去並行執行任務,而且主線程須要等待全部子線程執行完畢後再進行彙總的場景,它的內部提供了一個計數器,在構造閉鎖時必須指定計數器的初始值,且計數器的初始值必須大於0。另外它還提供了一個countDown方法來操做計數器的值,每調用一次countDown方法計數器都會減1,直到計數器的值減爲0時就表明條件已成熟,全部因調用await方法而阻塞的線程都會被喚醒。這就是CountDownLatch的內部機制,看起來很簡單,無非就是阻塞一部分線程讓其在達到某個條件以後再執行。可是CountDownLatch的應用場景卻比較普遍,只要你腦洞夠大利用它就能夠玩出各類花樣。最多見的一個應用場景是開啓多個線程同時執行某個任務,等到全部任務都執行完再統計彙總結果。下圖動態演示了閉鎖阻塞線程的整個過程。多線程
在CountDownLatch出現以前通常都是使用線程的join()方法來實現,可是join不夠靈活,不可以知足不一樣場景的需求。接下來咱們看看CountDownLatch的原理實現。ide
一.CountDownLatch原理探究函數
從CountDownLatch的名字能夠猜想內部應該有個計數器,而且這個計數器是遞減的,下面就經過源碼看看JDK開發組是什麼時候初始化計數器,什麼時候遞減的,計數器變爲 0 的時候作了什麼操做,多個線程是如何經過計時器值實現同步的,首先咱們先看看CountDownLatch內部結構,類圖以下:測試
從類圖能夠知道CountDownLatch內部仍是使用AQS實現的,經過下面構造函數初始化計數器的值,可知其實是把計數器的值賦值給了AQS的state,也就是這裏AQS的狀態值來表示計數器值。ui
構造函數源碼以下:this
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } Sync(int count) { setState(count); }
接下來主要看一下CountDownLatch中幾個重要的方法內部是如何調用AQS來實現功能的。atom
1.void await()方法,當前線程調用了CountDownLatch對象的await方法後,當前線程會被阻塞,直到下面的狀況之一纔會返回:(1)當全部線程都調用了CountDownLatch對象的countDown方法後,spa
也就是說計時器值爲 0 的時候。(2)其餘線程調用了當前線程的interrupt()方法中斷了當前線程,當前線程會拋出InterruptedException異常後返回。接下來讓咱們看看await()方法內部是如何調用線程
AQS的方法的,源碼以下:
//CountDownLatch的await()方法 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } //AQS的獲取共享資源時候可被中斷的方法 public final void acquireSharedInterruptibly(int arg)throws InterruptedException { //若是線程被中斷則拋異常 if (Thread.interrupted()) throw new InterruptedException(); //嘗試看當前是否計數值爲0,爲0則直接返回,否者進入AQS的隊列等待 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } //sync類實現的AQS的接口 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
從上面代碼能夠看到await()方法委託sync調用了AQS的acquireSharedInterruptibly方法,該方法的特色是線程獲取資源的時候能夠被中斷,而且獲取到的資源是共享資源,這裏爲何要調用AQS的這個方法,而不是調用獨佔鎖的accquireInterruptibly方法呢?這是由於這裏狀態值須要的並非非 0 即 1 的效果,而是和初始化時候指定的計數器值有關係,好比你初始化的時候計數器值爲 8 ,那麼state的值應該就有 0 到 8 的狀態,而不是隻有 0 和 1 的獨佔效果。
這裏await()方法調用acquireSharedInterruptibly的時候傳遞的是 1 ,就是說明要獲取一個資源,而這裏計數器值是資源總數,也就是意味着是讓總的資源數減 1 ,acquireSharedInterruptibly內部首先判斷若是當前線程被中斷了則拋出異常,不然調用sync實現的tryAcquireShared方法看當前狀態值(計數器值)是否爲 0 ,是則當前線程的await()方法直接返回,不然調用AQS的doAcquireSharedInterruptibly讓當前線程阻塞。另外調用tryAcquireShared的方法僅僅是檢查當前狀態值是否是爲 0 ,並無調用CAS讓當前狀態值減去 1 。
2.boolean await(long timeout, TimeUnit unit),當線程調用了 CountDownLatch 對象的該方法後,當前線程會被阻塞,直到下面的狀況之一發生纔會返回: (1)當全部線程都調用了 CountDownLatch 對象的 countDown 方法後,也就是計時器值爲 0 的時候,這時候返回 true; (2) 設置的 timeout 時間到了,由於超時而返回 false; (3)其它線程調用了當前線程的 interrupt()方法中斷了當前線程,當前線程會拋出 InterruptedException 異常後返回。源碼以下:
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
3.void countDown() 當前線程調用了該方法後,會遞減計數器的值,遞減後若是計數器爲 0 則會喚醒全部調用await 方法而被阻塞的線程,不然什麼都不作,接下來看一下countDown()方法內部是如何調用AQS的方法的,源碼以下:
//CountDownLatch的countDown()方法 public void countDown() { //委託sync調用AQS的方法 sync.releaseShared(1); } //AQS的方法 public final boolean releaseShared(int arg) { //調用sync實現的tryReleaseShared if (tryReleaseShared(arg)) { //AQS的釋放資源方法 doReleaseShared(); return true; } return false; }
如上面代碼能夠知道CountDownLatch的countDown()方法是委託sync調用了AQS的releaseShared方法,後者調用了sync 實現的AQS的tryReleaseShared,源碼以下:
//syn的方法 protected boolean tryReleaseShared(int releases) { //循環進行cas,直到當前線程成功完成cas使計數值(狀態值state)減一併更新到state for (;;) { int c = getState(); //若是當前狀態值爲0則直接返回(1) if (c == 0) return false; //CAS設置計數值減一(2) int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
如上代碼能夠看到首先獲取當前狀態值(計數器值),代碼(1)若是當前狀態值爲 0 則直接返回 false ,則countDown()方法直接返回;不然執行代碼(2)使用CAS設置計數器減一,CAS失敗則循環重試,不然若是當前計數器爲 0 則返回 true 。返回 true 後,說明當前線程是最後一個調用countDown()方法的線程,那麼該線程除了讓計數器減一外,還須要喚醒調用CountDownLatch的await 方法而被阻塞的線程。這裏的代碼(1)貌似是多餘的,其實否則,之因此添加代碼 (1) 是爲了防止計數器值爲 0 後,其餘線程又調用了countDown方法,若是沒有代碼(1),狀態值就會變成負數。
4.long getCount() 獲取當前計數器的值,也就是 AQS 的 state 的值,通常在 debug 測試時候使用,源碼以下:
public long getCount() { return sync.getCount(); } int getCount() { return getState(); }
如上代碼可知內部仍是調用了 AQS 的 getState 方法來獲取 state 的值(計數器當前值)。
到目前爲止原理理解的差很少了,接下來用一個例子進行講解CountDownLatch的用法,例子以下:
package com.hjc; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; /** * Created by cong on 2018/7/6. */ public class CountDownLatchTest { private static AtomicInteger id = new AtomicInteger(); // 建立一個CountDownLatch實例,管理計數爲ThreadNum private static volatile CountDownLatch countDownLatch = new CountDownLatch(3); public static void main(String[] args) throws InterruptedException { Thread threadOne = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(3000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("【玩家" + id.getAndIncrement() + "】已入場"); countDownLatch.countDown(); } }); Thread threadTwo = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("【玩家" + id.getAndIncrement() + "】已入場"); countDownLatch.countDown(); } }); Thread threadThree = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("【玩家" + id.getAndIncrement() + "】已入場"); countDownLatch.countDown(); } }); // 啓動子線程 threadOne.start(); threadTwo.start(); threadThree.start(); System.out.println("等待鬥地主玩家進場"); // 等待子線程執行完畢,返回 countDownLatch.await(); System.out.println("鬥地主玩家已經滿人,開始發牌....."); } }
運行結果以下:
如上代碼,建立了一個 CountDownLatch 實例,由於有兩個子線程因此構造函數參數傳遞爲 3,主線程調用 countDownLatch.await()方法後會被阻塞。子線程執行完畢後調用 countDownLatch.countDown() 方法讓 countDownLatch 內部的計數器減一,等全部子線程執行完畢調用 countDown()後計數器會變爲 0,這時候主線程的 await()纔會返回。
若是把上面的代碼中Thread.sleep和countDownLatch.await()的代碼註釋掉,運行幾遍,運行結果就可能會出現以下結果,以下圖:
能夠看到在註釋掉latch.await()這行以後,就不能保證在全部玩家入場後纔開始發牌了。
總結:CountDownLatch 與 join 方法的區別,一個區別是調用一個子線程的 join()方法後,該線程會一直被阻塞直到該線程運行完畢,而 CountDownLatch 則使用計數器容許子線程運行完畢或者運行中時候遞減計數,也就是 CountDownLatch 能夠在子線程運行任什麼時候候讓 await 方法返回而不必定必須等到線程結束;另外使用線程池來管理線程時候通常都是直接添加 Runable 到線程池這時候就沒有辦法在調用線程的 join 方法了,countDownLatch 相比 Join 方法讓咱們對線程同步有更靈活的控制。