閉鎖是一種同步工具類,能夠延遲線程進度直到其到達最終終止狀態。閉鎖關閉以後,沒有任何線程能夠經過,當到達結束狀態時,會永遠打開,並容許全部線程經過。
閉鎖狀態包括一個計數器,該計數器初始化爲一個正數,表示須要等待的事件數量。countDown 方法遞減計數器,表示一個事件已經發生,await 方法等待計數器達到零,表示全部須要等待的事件都已經發生。CountDownLatch 內部也是經過 AQS 來實現的。
CountDownLatch 主要實現方法:java
CountDownLatch 結構:node
public class CountDownLatch { private final CountDownLatch.Sync sync; public CountDownLatch(int var1) { if(var1 < 0) { throw new IllegalArgumentException("count < 0"); } else { this.sync = new CountDownLatch.Sync(var1); } } public void await() throws InterruptedException { this.sync.acquireSharedInterruptibly(1); } public boolean await(long var1, TimeUnit var3) throws InterruptedException { return this.sync.tryAcquireSharedNanos(1, var3.toNanos(var1)); } public void countDown() { this.sync.releaseShared(1); } public long getCount() { return (long)this.sync.getCount(); } }
從 CountDownLatch 中咱們能夠看到 CountDownLatch 的鎖是經過 Sync 這個類完成的,Sync 則繼承自 AQS,AQS 是獨佔鎖和共享鎖的父類,經過繼承 AQS 實現共享鎖。
Sync 結構:ide
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int var1) { this.setState(var1); } int getCount() { return this.getState(); } // 判斷是否可用 protected int tryAcquireShared(int var1) { return this.getState() == 0?1:-1; } // 計數器減1,並返回是否釋放成功 protected boolean tryReleaseShared(int var1) { int var2; int var3; do { var2 = this.getState(); if(var2 == 0) { return false; } var3 = var2 - 1; } while(!this.compareAndSetState(var2, var3)); return var3 == 0; } }
Sync 實現了 tryAcquireShared 方法,調用 await 時判斷鎖是否可用,若是可用,就直接經過;若是不可用,則將線程記錄在等待隊列上。
Sync 實現了 tryReleaseShared 方法,調用 countDown 方法時,會將 state 減1,若是成功釋放則執行 doReleaseShared 方法喚醒隊首線程,隊首線程喚醒後依次喚醒後續隊首的線程,從而作到喚醒全部線程。
AQS 結構:工具
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements Serializable { private static final long serialVersionUID = 7373984972572414691L; private transient volatile AbstractQueuedSynchronizer.Node head; private transient volatile AbstractQueuedSynchronizer.Node tail; private volatile int state; static final long spinForTimeoutThreshold = 1000L; private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; protected AbstractQueuedSynchronizer() { } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // 獲取失敗會被阻塞 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); // 設置等待頭部隊列,並傳播喚醒下個頭部線程 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } // 喚醒等待隊列隊首線程 private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } private void doReleaseShared() { while(true) { AbstractQueuedSynchronizer.Node var1 = this.head; if(var1 != null && var1 != this.tail) { int var2 = var1.waitStatus; if(var2 == -1) { if(!compareAndSetWaitStatus(var1, -1, 0)) { continue; } this.unparkSuccessor(var1); } else if(var2 == 0 && !compareAndSetWaitStatus(var1, 0, -3)) { continue; } } if(var1 == this.head) { return; } } } }
CountDownLatch 調用 await 方法時,調用 AQS 的 acquireSharedInterruptibly 方法,AQS 則調用子類的 tryAcquireShared 方法,若是獲取成功,則直接返回;若是獲取失敗,則調用調用 AQS 的方法 doAcquireSharedInterruptibly 阻塞並加入到等待隊列等待喚醒。
CountDownLatch 調用 countDown 方法時,調用 AQS 的 releaseShared 方法,AQS 則調用子類的 tryReleaseShared 方法,若是釋放成功,則調用 doReleaseShared 方法喚醒隊列首部線程,線程啓動後,若是 tryAcquireShared 返回值大於等於 0,則經過 setHeadAndPropagate 方法進行傳播,喚醒下一個線程。
CountDownLatch 使用:oop
public class Main { public static void main(String[] args) throws InterruptedException { final int threadsNumber = 4; final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(threadsNumber); for (int i = 0; i < threadsNumber; i++) { Thread thread = new Thread(new Runnable() { @Override public void run() { try { startGate.await(); endGate.countDown(); System.out.println(this + "end"); } catch (Exception e) { e.printStackTrace(); } } }); thread.start(); } startGate.countDown(); endGate.await(); } }