一塊兒在java1.5被引入的併發工具類還有CountDownLatch、CyclicBarrier、Semaphore、ConcurrentHashMap和BlockingQueue,它們都存在於java.util.concurrent包下。html
CountDownLatch這個類可以使一個線程等待其餘線程完成各自的工做後再執行。例如,應用程序的主線程但願在負責啓動框架服務的線程已經啓動全部的框架服務以後再執行。java
CountDownLatch是經過一個計數器來實現的,計數器的初始值爲線程的數量。每當一個線程完成了本身的任務後,計數器的值就會減1。當計數器值到達0時,它表示全部的線程已經完成了任務,而後在閉鎖上等待的線程就能夠恢復執行任務。併發
圖示:框架
僞代碼:dom
//Main thread start //Create CountDownLatch for N threads //Create and start N threads //Main thread wait on latch //N threads completes there tasks are returns //Main thread resume execution
CountDownLatch.java類中定義的構造函數:ide
//Constructs a CountDownLatch initialized with the given count. public void CountDownLatch(int count) {...}
構造器中的計數值(count)實際上就是閉鎖須要等待的線程數量。這個值只能被設置一次,並且CountDownLatch沒有提供任何機制去從新設置這個計數值。函數
與CountDownLatch的第一次交互是主線程等待其餘線程。主線程必須在啓動其餘線程後當即調用CountDownLatch.await()方法。這樣主線程的操做就會在這個方法上阻塞,直到其餘線程完成各自的任務。工具
其餘N 個線程必須引用閉鎖對象,由於他們須要通知CountDownLatch對象,他們已經完成了各自的任務。這種通知機制是經過 CountDownLatch.countDown()方法來完成的;每調用一次這個方法,在構造函數中初始化的count值就減1。因此當N個線程都調 用了這個方法,count的值等於0,而後主線程就能經過await()方法,恢復執行本身的任務。測試
咱們嘗試羅列出在java實時系統中CountDownLatch都有哪些使用場景。我所羅列的都是我所能想到的。若是你有別的可能的使用方法,請在留言裏列出來,這樣會幫助到你們。this
在這個例子中,我模擬了一個應用程序啓動類,它開始時啓動了n個線程類,這些線程將檢查外部系統並通知閉鎖,而且啓動類一直在閉鎖上等待着。一旦驗證和檢查了全部外部服務,那麼啓動類恢復執行。
BaseHealthChecker.java:這個類是一個Runnable,負責全部特定的外部服務健康的檢測。它刪除了重複的代碼和閉鎖的中心控制代碼。
public abstract class BaseHealthChecker implements Runnable { private CountDownLatch _latch; private String _serviceName; private boolean _serviceUp; //Get latch object in constructor so that after completing the task, thread can countDown() the latch public BaseHealthChecker(String serviceName, CountDownLatch latch) { super(); this._latch = latch; this._serviceName = serviceName; this._serviceUp = false; } @Override public void run() { try { verifyService(); _serviceUp = true; } catch (Throwable t) { t.printStackTrace(System.err); _serviceUp = false; } finally { if(_latch != null) { _latch.countDown(); } } } public String getServiceName() { return _serviceName; } public boolean isServiceUp() { return _serviceUp; } //This methos needs to be implemented by all specific service checker public abstract void verifyService(); }
NetworkHealthChecker.java:這個類繼承了BaseHealthChecker,實現了verifyService()方法。DatabaseHealthChecker.java和CacheHealthChecker.java除了服務名和休眠時間外,與NetworkHealthChecker.java是同樣的。
public class NetworkHealthChecker extends BaseHealthChecker { public NetworkHealthChecker (CountDownLatch latch) { super("Network Service", latch); } @Override public void verifyService() { System.out.println("Checking " + this.getServiceName()); try { Thread.sleep(7000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(this.getServiceName() + " is UP"); } }
ApplicationStartupUtil.java:這個類是一個主啓動類,它負責初始化閉鎖,而後等待,直到全部服務都被檢測完。
public class ApplicationStartupUtil { //List of service checkers private static List<BaseHealthChecker> _services; //This latch will be used to wait on private static CountDownLatch _latch; private ApplicationStartupUtil() { } private final static ApplicationStartupUtil INSTANCE = new ApplicationStartupUtil(); public static ApplicationStartupUtil getInstance() { return INSTANCE; } public static boolean checkExternalServices() throws Exception { //Initialize the latch with number of service checkers _latch = new CountDownLatch(3); //All add checker in lists _services = new ArrayList<BaseHealthChecker>(); _services.add(new NetworkHealthChecker(_latch)); _services.add(new CacheHealthChecker(_latch)); _services.add(new DatabaseHealthChecker(_latch)); //Start service checkers using executor framework Executor executor = Executors.newFixedThreadPool(_services.size()); for(final BaseHealthChecker v : _services) { executor.execute(v); } //Now wait till all services are checked _latch.await(); //Services are file and now proceed startup for(final BaseHealthChecker v : _services) { if( ! v.isServiceUp()) { return false; } } return true; } }
如今你能夠寫測試代碼去檢測一下閉鎖的功能了。
public class Main { public static void main(String[] args) { boolean result = false; try { result = ApplicationStartupUtil.checkExternalServices(); } catch (Exception e) { e.printStackTrace(); } System.out.println("External services validation completed !! Result was :: "+ result); } }
輸出以下:
//Output in console: Checking Network Service Checking Cache Service Checking Database Service Database Service is UP Cache Service is UP Network Service is UP External services validation completed !! Result was :: true
主要的方法就是一個:await()。
await() 方法每被調用一次,計數便會減小1,並阻塞住當前線程。當計數減至0時,阻塞解除,全部在此 CyclicBarrier 上面阻塞的線程開始運行。在這以後,若是再次調用 await() 方法,計數就又會變成 N-1,新一輪從新開始,這即是 Cyclic 的含義所在。
在全部參與者都已經在此 barrier 上調用 await方法以前,將一直等待。若是當前線程不是將到達的最後一個線程,出於調度目的,將禁用它,且在發生如下狀況之一前,該線程將一直處於休眠狀態:
CyclicBarrier 的使用並不難,但須要注意它所相關的異常。除了常見的異常,CyclicBarrier.await() 方法會拋出一個獨有的 BrokenBarrierException。這個異常發生在當某個線程在等待本 CyclicBarrier 時被中斷或超時或被重置時,其它一樣在這個 CyclicBarrier 上等待的線程便會受到 BrokenBarrierException。意思就是說,同志們,別等了,有個小夥伴已經掛了,我們若是繼續等有可能會一直等下去,全部各回各家吧。
CyclicBarrier.await() 方法帶有返回值,用來表示當前線程是第幾個到達這個 Barrier 的線程。
和 CountDownLatch 同樣,CyclicBarrier 一樣能夠能夠在構造函數中設定總計數值。與 CountDownLatch 不一樣的是,CyclicBarrier 的構造函數還能夠接受一個 Runnable,會在 CyclicBarrier 被釋放時執行。
/** * Description: 賽跑時,等待全部人都準備好時,才起跑: * * @author shenlongguang * @date: 2017/4/27 下午1:50. */ public class CyclicBarrierTest { public static void main(String[] args) throws IOException, InterruptedException { //若是將參數改成4,可是下面只加入了3個選手,這永遠等待下去 //Waits until all parties have invoked await on this barrier. CyclicBarrier barrier = new CyclicBarrier(5); ExecutorService executor = Executors.newFixedThreadPool(5); executor.submit(new Thread(new Runner(barrier, " No.1"))); executor.submit(new Thread(new Runner(barrier, " No.2"))); executor.submit(new Thread(new Runner(barrier, " No.3"))); executor.submit(new Thread(new Runner(barrier, " No.4"))); executor.submit(new Thread(new Runner(barrier, " No.5"))); executor.shutdown(); } } class Runner implements Runnable { // 一個同步輔助類,它容許一組線程互相等待,直到到達某個公共屏障點 (common barrier point) private CyclicBarrier barrier; private String name; public Runner(CyclicBarrier barrier, String name) { super(); this.barrier = barrier; this.name = name; } @Override public void run() { try { Thread.sleep(1000 * (new Random()).nextInt(8)); System.out.println(System.currentTimeMillis()+ name + " ready..."); // barrier的await方法,在全部參與者都已經在此 barrier 上調用 await 方法以前,將一直等待。 barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(System.currentTimeMillis()+ name + " go!"); } }
console 輸出:
1493272909108 No.5 ready... 1493272910101 No.2 ready... 1493272914101 No.1 ready... 1493272915106 No.3 ready... 1493272915106 No.4 ready... 1493272915106 No.4 go! 1493272915107 No.5 go! 1493272915107 No.3 go! 1493272915107 No.1 go! 1493272915107 No.2 go!
CountDownLatch 適用於一組線程和另外一個主線程之間的工做協做。一個主線程等待一組工做線程的任務完畢才繼續它的執行是使用 CountDownLatch 的主要場景;
CyclicBarrier 用於一組或幾組線程,好比一組線程須要在一個時間點上達成一致,例如同時開始一個工做。另外,CyclicBarrier 的循環特性和構造函數所接受的 Runnable 參數也是 CountDownLatch 所不具有的
CountDownLatch英文原文when-to-use-countdownlatch-java-concurrency
CountDownLatch翻譯文何時使用CountDownLatch
cyclicbarrier示例the-introduction-and-use-of-cyclicbarrier