一線天色天宇星辰 IT哈哈
CountDownLauch是Java併發包中的一個同步工具集,常被人們稱之爲併發中的計數器,還有一種被成爲閉鎖!
CountDownLauch主要使用在兩種場景,一種被稱爲開關,它容許一個任務完成以前,一個或一組線程持續等待。此種狀況常常被稱之爲閉鎖,通俗的講就是,至關於一扇大門,在大門打開以前全部線程都被阻斷,一旦大門打開,全部線程都將經過,可是一旦大門打開,全部線程都經過了,那麼這個閉鎖的狀態就失效了,門的狀態也就不能變了,只能是打開狀態。另外一種場景常常被稱之爲計數器,它容許將一個任務拆分爲N個小任務,主線程在全部任務完成以前一直等待,每一個任務完成時將計數器減一,直到全部任務完成後取消主線程的阻塞。
咱們來看一下對應CountDownLauch對應的API。java
CountDownLatch維護了一個正數計數器,countDown方法對計數器作減操做,await方法等待計數器達到0。全部await的線程都會阻塞直到計數器爲0或者等待線程中斷或者超時。
咱們分別來看一下對應的一個應用實例:node
package com.yhj.lauth; import java.util.Date; import java.util.concurrent.CountDownLatch; //工人 class Worker extends Thread{ privateintworkNo;//工號 private CountDownLatch startLauch;//啓動器-閉鎖 private CountDownLatch workLauch;//工做進程-計數器 public Worker(int workNo,CountDownLatch startLauch,CountDownLatch workLauch) { this.workNo = workNo; this.startLauch = startLauch; this.workLauch = workLauch; } @Override publicvoid run() { try { System.out.println(new Date()+" - YHJ"+workNo+" 準備就緒!準備開工!"); startLauch.await();//等待老闆發指令 System.out.println(new Date()+" - YHJ"+workNo+" 正在幹活..."); Thread.sleep(100);//每人花100ms幹活 } catch (InterruptedException e) { e.printStackTrace(); }finally{ System.out.println(new Date()+" - YHJ"+workNo+" 工做完成!"); workLauch.countDown(); } } } //測試用例 publicclass CountDownLauthTestCase { publicstaticvoid main(String[] args) throws InterruptedException { int workerCount = 10;//工人數目 CountDownLatch startLauch = new CountDownLatch(1);//閉鎖至關於開關 CountDownLatch workLauch = new CountDownLatch(workerCount);//計數器 System.out.println(new Date()+" - Boss:集合準備開工了!"); for(int i=0;i<workerCount;++i){ new Worker(i, startLauch, workLauch).start(); } System.out.println(new Date()+" - Boss:休息2s後開工!"); Thread.sleep(2000); System.out.println(new Date()+" - Boss:開工!"); startLauch.countDown();//打開開關 workLauch.await();//任務完成後通知Boss System.out.println(new Date()+" - Boss:不錯!任務都完成了!收工回家!"); } } 執行結果: Sat Jun 08 18:59:33 CST 2013 - Boss:集合準備開工了! Sat Jun 08 18:59:33 CST 2013 - YHJ0 準備就緒!準備開工! Sat Jun 08 18:59:33 CST 2013 - YHJ2 準備就緒!準備開工! Sat Jun 08 18:59:33 CST 2013 - YHJ1 準備就緒!準備開工! Sat Jun 08 18:59:33 CST 2013 - YHJ4 準備就緒!準備開工! Sat Jun 08 18:59:33 CST 2013 - Boss:休息2s後開工! Sat Jun 08 18:59:33 CST 2013 - YHJ8 準備就緒!準備開工! Sat Jun 08 18:59:33 CST 2013 - YHJ6 準備就緒!準備開工! Sat Jun 08 18:59:33 CST 2013 - YHJ3 準備就緒!準備開工! Sat Jun 08 18:59:33 CST 2013 - YHJ7 準備就緒!準備開工! Sat Jun 08 18:59:33 CST 2013 - YHJ5 準備就緒!準備開工! Sat Jun 08 18:59:33 CST 2013 - YHJ9 準備就緒!準備開工! Sat Jun 08 18:59:35 CST 2013 - Boss:開工! Sat Jun 08 18:59:35 CST 2013 - YHJ0 正在幹活... Sat Jun 08 18:59:35 CST 2013 - YHJ2 正在幹活... Sat Jun 08 18:59:35 CST 2013 - YHJ1 正在幹活... Sat Jun 08 18:59:35 CST 2013 - YHJ4 正在幹活... Sat Jun 08 18:59:35 CST 2013 - YHJ8 正在幹活... Sat Jun 08 18:59:35 CST 2013 - YHJ6 正在幹活... Sat Jun 08 18:59:35 CST 2013 - YHJ3 正在幹活... Sat Jun 08 18:59:35 CST 2013 - YHJ7 正在幹活... Sat Jun 08 18:59:35 CST 2013 - YHJ5 正在幹活... Sat Jun 08 18:59:35 CST 2013 - YHJ9 正在幹活... Sat Jun 08 18:59:35 CST 2013 - YHJ5 工做完成! Sat Jun 08 18:59:35 CST 2013 - YHJ1 工做完成! Sat Jun 08 18:59:35 CST 2013 - YHJ3 工做完成! Sat Jun 08 18:59:35 CST 2013 - YHJ6 工做完成! Sat Jun 08 18:59:35 CST 2013 - YHJ7 工做完成! Sat Jun 08 18:59:35 CST 2013 - YHJ9 工做完成! Sat Jun 08 18:59:35 CST 2013 - YHJ4 工做完成! Sat Jun 08 18:59:35 CST 2013 - YHJ0 工做完成! Sat Jun 08 18:59:35 CST 2013 - YHJ2 工做完成! Sat Jun 08 18:59:35 CST 2013 - YHJ8 工做完成! Sat Jun 08 18:59:35 CST 2013 - Boss:不錯!任務都完成了!收工回家!
這個示例裏面使用了兩個CountDownLauch,分別構建了兩種場景,第一個startLauch至關於開關,在開啓以前,沒有任何一個線程執行,當開啓以後,全部線程同時能夠執行。第二個workerLauch其實就是一個計數器,當計數器沒有減到零的時候,主線程一直等待,當全部線程執行完畢後,主線程取消阻塞繼續執行!
第二種場景在咱們後面要學習的線程池中常常會用到,咱們後續再討論!
此處還有一個重要的特性,就是
內存一致性效果:線程中調用 countDown() 以前的操做happen-before緊跟在從另外一個線程中對應 await() 成功返回的操做。
場景應用咱們是看到了,那它究竟是基於什麼原理,怎麼實現的呢?
咱們來看下對應的源碼:併發
privatestaticfinalclass Sync extends AbstractQueuedSynchronizer
類的第二行咱們就看到了其內部實現了AQS的一個同步器。咱們重點來看下咱們用到的幾個方法:await和countDown。首先來看await方法app
publicvoid await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
很明顯是直接調用內部從新實現的同步器的獲取共享鎖的方法(前面咱們一直再講獨佔鎖,今天咱們藉此機會把共享鎖的機制一塊兒講掉)。ide
publicfinalvoid acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) thrownew InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
此處若是線程中斷,則直接退出,不然嘗試獲取共享鎖,咱們來看下tryAcquireShared(arg)的實現(此方法由內部類重寫實現):函數
publicint tryAcquireShared(int acquires) { return getState() == 0? 1 : -1; }
所謂共享鎖是說全部共享鎖的線程共享同一個資源,一旦任意一個線程拿到共享資源,那麼全部線程就都擁有的同一份資源。也就是一般狀況下共享鎖只是一個標誌,全部線程都等待這個標識是否知足,一旦知足全部線程都被激活(至關於全部線程都拿到鎖同樣)。這裏的閉鎖CountDownLatch就是基於共享鎖的實現。和明顯這裏的標識就是state等不等於零,而state實際上是有多少個線程在競爭這份資源,咱們前面能夠看到是經過構造函數傳入的一個大於0的數據,所以此時此刻此處返回的永遠是-1。工具
Sync(int count) { setState(count); }
當tryAcquireShared返回的數據小於零,說明沒有獲取到資源,須要阻塞,此時執行代碼doAcquireSharedInterruptibly():學習
privatevoid doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) break; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } // Arrive here only if interrupted cancelAcquire(node); thrownew InterruptedException(); }
這裏首先以共享模式添加一個節點加入到CLH隊列中去,而後檢查當前節點的前繼節點(插入的數據在隊尾),若是前繼節點是頭結點而且當前的計數器爲0的話,則喚醒後繼節點(喚醒後面來說),不然判斷是否須要阻塞,若是須要,則阻塞當前線程!直到被喚醒或被中斷!測試
privatefinalboolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
這裏注意一點,LockSupport.park(Obj)中的參數obj是阻塞的監視對象,而非阻塞的對象,阻塞的對象是當前操做的線程,因此unpack的時候也是應該結算對應的線程!不要搞混了哈!ui
publicstaticvoid park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park(false, 0L); setBlocker(t, null); } publicstaticvoid unpark(Thread thread) { if (thread != null) unsafe.unpark(thread); }
下面咱們來看一下對應的countDown方法的實現
publicvoid countDown() { sync.releaseShared(1); }
首先每執行一次countDown就會執行內部方法的一次釋放鎖的操做!
publicfinalboolean releaseShared(int arg) { if (tryReleaseShared(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); returntrue; } returnfalse; }
若是嘗試成功則設置當前節點爲頭結點,並喚醒對應節點的後繼節點!
publicboolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) returnfalse; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
一樣,釋放鎖的方法也是CountDownLauch內部的同步類本身實現,這個方法自旋檢測當前計數器的數目,若是等於零,說明以前阻塞的線程已經所有釋放了,直接返回false,不然CAS設置當前的計數器,減去countdown的數目,若是設置成功後的數據爲零的話,說明已經所有執行完畢,須要釋放阻塞的線程了,返回true(注意此處精妙的返回nextc == 0),不然返回false。
咱們再來回看releaseShared方法,當tryReleaseShared返回true的時候,說明計數器已經爲零,阻塞的資源須要釋放了!此時執行unparkSuccessor(h)方法喚醒隊列中的頭結點。
此處設計了一個精妙的隊列依次去釋放被阻塞的線程,而不是相似singleAll的方法直接喚醒全部線程。那到底它是怎麼實現的呢?咱們代碼上看只喚醒了頭結點(實際上是頭結點的後繼節點,頭結點只是一個空節點),咱們先來看下unparkSuccessor的實現
privatevoid unparkSuccessor(Node node) { /* * Try to clear status in anticipation of signalling. It is * OK if this fails or if status is changed by waiting thread. */ compareAndSetWaitStatus(node, Node.SIGNAL, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
明顯咱們能夠看到,傳入的參數爲頭結點,經過CAS設置數據後,喚醒了頭結點的後繼結點(注意unpack的是線程而不是阻塞監視器)。而後就返回了!
那剩餘阻塞的線程是怎麼喚醒的呢?咱們再來看下await方法中doAcquireSharedInterruptibly的實現
privatevoid doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); // tag 2 if (r >= 0) { setHeadAndPropagate(node, r); // tag 3 p.next = null; // help GC return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())// tag 1 break; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } // Arrive here only if interrupted cancelAcquire(node); thrownew InterruptedException(); }
前面咱們能夠看到在執行parkAndCheckInterrupt()時進行了阻塞,當咱們喚醒頭結點的後繼節點(第一個進入隊列的節點)時,tag1此行代碼被喚醒,break以後繼續進入自旋,而此時tag2行代碼檢測到計數器已經爲0,所以tryAcquireShared(arg)返回的結果是1(以前返回的都是-1),r大於零,進入tag3代碼,tag3會把當前的線程設置爲頭結點,而後繼續喚醒後續的後繼節點。
privatevoid setHeadAndPropagate(Node node, int propagate) { setHead(node); // tag 4 if (propagate > 0 && node.waitStatus != 0) { /* * Don't bother fully figuring out successor. If it * looks null, call unparkSuccessor anyway to be safe. */ Node s = node.next; if (s == null || s.isShared()) unparkSuccessor(node); // tag 5 } }
後繼節點被喚醒後,則繼續喚醒後面的後繼節點,進而把隊列中的數據依次喚醒!整個CountDownLatch就是這個樣子的。其實有了前面原子操做和AQS的原理及實現,分析CountDownLatch仍是比較容易的。