CountDownLatch是同步工具類之一,能夠指定一個計數值,在併發環境下由線程進行減1操做,當計數值變爲0以後,被await方法阻塞的線程將會喚醒,實現線程間的同步。java
CountDownLatch是一次性的,計數器的值只能在構造方法中初始化一次,以後沒有任何機制再次對其設置值,當CountDownLatch使用完畢後,它不能再次被使用。node
public static void main(String[] args) { int threadNum = 10; final CountDownLatch countDownLatch = new CountDownLatch(threadNum); for (int i = 0; i < threadNum; i++) { final int tNum = i + 1; new Thread(() -> { System.out.println("thread " + tNum + " start"); Random random = new Random(); try { Thread.sleep(random.nextInt(10000) + 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread " + tNum + " finish"); countDownLatch.countDown();//每個線程執行完就會減一 }).start(); } //主線程啓動10個子線程後阻塞在await方法,須要等子線程都執行完畢,主線程才能喚醒繼續執行。 try { countDownLatch.await();//當前線程被阻塞 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(threadNum + " thread finish"); }
CountDownLatch和ReentrantLock同樣,內部使用Sync繼承AQS。Sync的構造函數接收了計數值並設置爲state。咱們知道AQS的state是一個由子類決定含義的「狀態」。對於ReentrantLock來講,state是線程獲取鎖的次數;對於CountDownLatch來講,則表示計數值的大小。併發
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
Sync(int count) { setState(count);//設置state,表示計數值的大小 }
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted())//判斷當前線程是否被中斷 throw new InterruptedException(); //tryAcquireShared:是否須要阻塞當前線程 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg);//由CountDownLatch實現阻塞當前線程的邏輯。 }
/** * tryAcquireShared方法其實就是判斷一下當前計數器的值,是否爲0。 * 爲0則返回1:表示當前線程不會阻塞在countDownLatch.await() */ protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED);//建立共享的node節點,並加入等待隊列 boolean failed = true; try { for (;;) {//死循環 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg);//判斷state是否爲0 if (r >= 0) { setHeadAndPropagate(node, r);//設置本身爲head,並通知其餘等待的線程 p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
以上對於countDownLatch.await()方法是怎麼「阻塞」當前線程的咱們已經很明白了,即當咱們調用了countDownLatch.await()方法後,你當前線程就會進入了一個死循環當中,在這個死循環裏面,會不斷的進行判斷,經過調用tryAcquireShared方法,不斷判斷咱們上面說的那個計數器,看看它的值是否爲0了(爲0的時候,其實就是咱們調用了足夠屢次數的countDownLatch.countDown()方法的時候),若是是爲0的話,tryAcquireShared就會返回1,代碼會執行if(r>=0){...},而後跳出了循環,也就再也不「阻塞」當前線程了。須要注意的是,說是在不停的循環,其實也並不是在不停的執行for循環裏面的內容,由於在後面調用parkAndCheckInterrupt()方法時,在這個方法裏面是會調用 LockSupport.park(this);,來禁用當前線程的。dom
//釋放鎖的操做,每調用一次,計數值減小1 public void countDown() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {//嘗試釋放鎖,具體實如今CountDownLatch中 doReleaseShared(); return true; } return false; }
protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc))//state的減1操做 return nextc == 0;//當計數值等於0,表明全部子線程都執行完畢,被await阻塞的線程就能夠被喚醒 } }
//喚醒被await阻塞的線程 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //頭節點狀態若是SIGNAL,則狀態重置爲0, if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h);//喚醒下個節點 } //被喚醒的節點狀態會重置成0,在下一次循環中被設置成PROPAGATE狀態, //表明狀態要向後傳播。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);//設置非CANCELLED節點的等待狀態爲0 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);//喚醒節點 }
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared())//若是節點爲共享類型 doReleaseShared();//釋放,下個節點被喚醒後,再次喚醒後續的等待節點,達到共享狀態向後傳播。 } }
參考地址:函數