CountDownLatch原理分析

      閉鎖是一種同步工具類,能夠延遲線程進度直到其到達最終終止狀態。閉鎖關閉以後,沒有任何線程能夠經過,當到達結束狀態時,會永遠打開,並容許全部線程經過。
      閉鎖狀態包括一個計數器,該計數器初始化爲一個正數,表示須要等待的事件數量。countDown 方法遞減計數器,表示一個事件已經發生,await 方法等待計數器達到零,表示全部須要等待的事件都已經發生。CountDownLatch 內部也是經過 AQS 來實現的。

CountDownLatch 主要實現方法:java

  1. CountDownLatch 構造時傳入計數器數目,CountDownLatch 將計數器設置到 AQS 的 state 上;
  2. await 等待,若是計數器爲 0, 則成功經過;不然被阻塞,並將線程記錄到 AQS 的等待鎖隊列中,等待喚醒從新嘗試;
  3. countDown 計數器減1,經過 CAS 將計數器減1,若是計數器爲0,則喚醒等待隊列上全部線程,由於 CountDownLatch 此時已經打開,因此全部的線程均可以經過。


CountDownLatch 結構:node

public class CountDownLatch {
    private final CountDownLatch.Sync sync;

    public CountDownLatch(int var1) {
        if(var1 < 0) {
            throw new IllegalArgumentException("count < 0");
        } else {
            this.sync = new CountDownLatch.Sync(var1);
        }
    }

    public void await() throws InterruptedException {
        this.sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long var1, TimeUnit var3) throws InterruptedException {
        return this.sync.tryAcquireSharedNanos(1, var3.toNanos(var1));
    }

    public void countDown() {
        this.sync.releaseShared(1);
    }

    public long getCount() {
        return (long)this.sync.getCount();
    }
}

從 CountDownLatch 中咱們能夠看到 CountDownLatch 的鎖是經過 Sync 這個類完成的,Sync 則繼承自 AQS,AQS 是獨佔鎖和共享鎖的父類,經過繼承 AQS 實現共享鎖。

Sync 結構:ide

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int var1) {
            this.setState(var1);
        }

        int getCount() {
            return this.getState();
        }

	    // 判斷是否可用
        protected int tryAcquireShared(int var1) {
            return this.getState() == 0?1:-1;
        }

        // 計數器減1,並返回是否釋放成功
        protected boolean tryReleaseShared(int var1) {
            int var2;
            int var3;
            do {
                var2 = this.getState();
                if(var2 == 0) {
                    return false;
                }

                var3 = var2 - 1;
            } while(!this.compareAndSetState(var2, var3));

            return var3 == 0;
        }
    }

Sync 實現了 tryAcquireShared 方法,調用 await 時判斷鎖是否可用,若是可用,就直接經過;若是不可用,則將線程記錄在等待隊列上。
Sync 實現了 tryReleaseShared 方法,調用 countDown 方法時,會將 state 減1,若是成功釋放則執行 doReleaseShared 方法喚醒隊首線程,隊首線程喚醒後依次喚醒後續隊首的線程,從而作到喚醒全部線程。

AQS 結構:工具

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements Serializable {
    private static final long serialVersionUID = 7373984972572414691L;
    private transient volatile AbstractQueuedSynchronizer.Node head;
    private transient volatile AbstractQueuedSynchronizer.Node tail;
    private volatile int state;
    static final long spinForTimeoutThreshold = 1000L;
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    protected AbstractQueuedSynchronizer() {
    }
	
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    // 獲取失敗會被阻塞
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    // 設置等待頭部隊列,並傳播喚醒下個頭部線程
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // 喚醒等待隊列隊首線程
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

    private void doReleaseShared() {
        while(true) {
            AbstractQueuedSynchronizer.Node var1 = this.head;
            if(var1 != null && var1 != this.tail) {
                int var2 = var1.waitStatus;
                if(var2 == -1) {
                    if(!compareAndSetWaitStatus(var1, -1, 0)) {
                        continue;
                    }

                    this.unparkSuccessor(var1);
                } else if(var2 == 0 && !compareAndSetWaitStatus(var1, 0, -3)) {
                    continue;
                }
            }

            if(var1 == this.head) {
                return;
            }
        }
    }
}

CountDownLatch 調用 await 方法時,調用 AQS 的 acquireSharedInterruptibly 方法,AQS 則調用子類的 tryAcquireShared 方法,若是獲取成功,則直接返回;若是獲取失敗,則調用調用 AQS 的方法 doAcquireSharedInterruptibly 阻塞並加入到等待隊列等待喚醒。
CountDownLatch 調用 countDown 方法時,調用 AQS 的 releaseShared 方法,AQS 則調用子類的 tryReleaseShared 方法,若是釋放成功,則調用 doReleaseShared 方法喚醒隊列首部線程,線程啓動後,若是 tryAcquireShared 返回值大於等於 0,則經過 setHeadAndPropagate 方法進行傳播,喚醒下一個線程。

CountDownLatch 使用:oop

public class Main {
    public static void main(String[] args) throws InterruptedException {
        final int threadsNumber = 4;
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(threadsNumber);

        for (int i = 0; i < threadsNumber; i++) {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        startGate.await();
                        endGate.countDown();
                        System.out.println(this + "end");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            thread.start();
        }

        startGate.countDown();
        endGate.await();
    }
}
相關文章
相關標籤/搜索