基於AQS構建CountDownLatch、CyclicBarrier和Semaphore

前言

本文先用 CountDownLatch 將共享模式說清楚,而後順着把其餘 AQS 相關的類 CyclicBarrier、Semaphore 的源碼一塊兒過一下。java

CountDownLatch

CountDownLatch 這個類是比較典型的 AQS 的共享模式的使用,這是一個高頻使用的類。latch 的中文意思是門栓、柵欄,具體怎麼解釋我就不廢話了,你們隨意,看兩個例子就知道在哪裏用、怎麼用了。node

  • 使用例子

咱們看下 Doug Lea 在 java doc 中給出的例子,這個例子很是實用,咱們常常會寫這個代碼。架構

假設咱們有 N ( N > 0 ) 個任務,那麼咱們會用 N 來初始化一個 CountDownLatch,而後將這個 latch 的引用傳遞到各個線程中,在每一個線程完成了任務後,調用 latch.countDown() 表明完成了一個任務。併發

調用 latch.await() 的方法的線程會阻塞,直到全部的任務完成。app

class Driver2 { // ...
    void main() throws InterruptedException {
        CountDownLatch doneSignal = new CountDownLatch(N);
        Executor e = Executors.newFixedThreadPool(8);

        // 建立 N 個任務,提交給線程池來執行
        for (int i = 0; i < N; ++i) // create and start threads
            e.execute(new WorkerRunnable(doneSignal, i));

        // 等待全部的任務完成,這個方法纔會返回
        doneSignal.await();           // wait for all to finish
    }
}

class WorkerRunnable implements Runnable {
    private final CountDownLatch doneSignal;
    private final int i;

    WorkerRunnable(CountDownLatch doneSignal, int i) {
        this.doneSignal = doneSignal;
        this.i = i;
    }

    public void run() {
        try {
            doWork(i);
            // 這個線程的任務完成了,調用 countDown 方法
            doneSignal.countDown();
        } catch (InterruptedException ex) {
        } // return;
    }

    void doWork() { ...}
}

因此說 CountDownLatch 很是實用,咱們經常會將一個比較大的任務進行拆分,而後開啓多個線程來執行,等全部線程都執行完了之後,再往下執行其餘操做。這裏例子中,只有 main 線程調用了 await 方法。ide

咱們再來看另外一個例子,這個例子很典型,用了兩個 CountDownLatch:oop

class Driver { // ...
    void main() throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);

        for (int i = 0; i < N; ++i) // create and start threads
            new Thread(new Worker(startSignal, doneSignal)).start();

        // 這邊插入一些代碼,確保上面的每一個線程先啓動起來,才執行下面的代碼。
        doSomethingElse();            // don't let run yet
        // 由於這裏 N == 1,因此,只要調用一次,那麼全部的 await 方法均可以經過
        startSignal.countDown();      // let all threads proceed
        doSomethingElse();
        // 等待全部任務結束
        doneSignal.await();           // wait for all to finish
    }
}

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }

    public void run() {
        try {
            // 爲了讓全部線程同時開始任務,咱們讓全部線程先阻塞在這裏
            // 等你們都準備好了,再打開這個門栓
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException ex) {
        } // return;
    }

    void doWork() { ...}
}

這個例子中,doneSignal 同第一個例子的使用,咱們說說這裏的 startSignal。N 個新開啓的線程都調用了startSignal.await() 進行阻塞等待,它們阻塞在柵欄上,只有當條件知足的時候(startSignal.countDown()),它們才能同時經過這個柵欄。若是始終只有一個線程調用 await 方法等待任務完成,那麼 CountDownLatch 就會簡單不少,因此以後的源碼分析讀者必定要在腦海中構建出這麼一個場景:有 m 個線程是作任務的,有 n 個線程在某個柵欄上等待這 m 個線程作完任務,直到全部 m 個任務完成後,n 個線程同時經過柵欄。源碼分析

  • 源碼分析
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
// 老套路了,內部封裝一個 Sync 類繼承自 AQS
private static final class Sync extends AbstractQueuedSynchronizer {
    Sync(int count) {
        // 這樣就 state == count 了
        setState(count);
    }
    ...
}

代碼都是套路,先分析套路:AQS 裏面的 state 是一個整數值,這邊用一個 int count 參數其實初始化就是設置了這個值,全部調用了 await 方法的等待線程會掛起,而後有其餘一些線程會作 state = state - 1 操做,當 state 減到 0 的同時,那個線程會負責喚醒調用了 await 方法的全部線程。都是套路啊,只是 Doug Lea 的套路很深,代碼很巧妙,否則咱們也沒有要分析源碼的必要。ui

對於 CountDownLatch,咱們僅僅須要關心兩個方法,一個是 countDown() 方法,另外一個是 await() 方法。countDown() 方法每次調用都會將 state 減 1,直到 state 的值爲 0;而 await 是一個阻塞方法,當 state 減爲 0 的時候,await 方法纔會返回。await 能夠被多個線程調用,讀者這個時候腦子裏要有個圖:全部調用了 await 方法的線程阻塞在 AQS 的阻塞隊列中,等待條件知足(state == 0),將線程從隊列中一個個喚醒過來。this

咱們用如下程序來分析源碼,t1 和 t2 負責調用 countDown() 方法,t3 和 t4 調用 await 方法阻塞:

public class CountDownLatchDemo {

    public static void main(String[] args) {

        CountDownLatch latch = new CountDownLatch(2);

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException ignore) {
                }
                // 休息 5 秒後(模擬線程工做了 5 秒),調用 countDown()
                latch.countDown();
            }
        }, "t1");

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException ignore) {
                }
                // 休息 10 秒後(模擬線程工做了 10 秒),調用 countDown()
                latch.countDown();
            }
        }, "t2");

        t1.start();
        t2.start();

        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // 阻塞,等待 state 減爲 0
                    latch.await();
                    System.out.println("線程 t3 從 await 中返回了");
                } catch (InterruptedException e) {
                    System.out.println("線程 t3 await 被中斷");
                    Thread.currentThread().interrupt();
                }
            }
        }, "t3");
        Thread t4 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // 阻塞,等待 state 減爲 0
                    latch.await();
                    System.out.println("線程 t4 從 await 中返回了");
                } catch (InterruptedException e) {
                    System.out.println("線程 t4 await 被中斷");
                    Thread.currentThread().interrupt();
                }
            }
        }, "t4");

        t3.start();
        t4.start();
    }
}

上述程序,大概在過了 10 秒左右的時候,會輸出:

線程 t3 從 await 中返回了
線程 t4 從 await 中返回了
// 這兩條輸出,順序不是絕對的
// 後面的分析,咱們假設 t3 先進入阻塞隊列

接下來,咱們按照流程一步一步走:先 await 等待,而後被喚醒,await 方法返回。

首先,咱們來看 await() 方法,它表明線程阻塞,等待 state 的值減爲 0。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 這也是老套路了,我在第二篇的中斷那一節說過了
    if (Thread.interrupted())
        throw new InterruptedException();
    // t3 和 t4 調用 await 的時候,state 都大於 0。
    // 也就是說,這個 if 返回 true,而後往裏看
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
// 只有當 state == 0 的時候,這個方法纔會返回 1
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

從方法名咱們就能夠看出,這個方法是獲取共享鎖,而且此方法是可中斷的(中斷的時候拋出 InterruptedException 退出這個方法)。

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 1. 入隊
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 同上,只要 state 不等於 0,那麼這個方法返回 -1
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 2
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

咱們再一步步看具體的流程。首先,咱們看 countDown() 方法:

public void countDown() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    // 只有當 state 減爲 0 的時候,tryReleaseShared 才返回 true
    // 不然只是簡單的 state = state - 1 那麼 countDown 方法就結束了
    if (tryReleaseShared(arg)) {
        // 喚醒 await 的線程
        doReleaseShared();
        return true;
    }
    return false;
}
// 這個方法很簡單,用自旋的方法實現 state 減 1
protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

countDown 方法就是每次調用都將 state 值減 1,若是 state 減到 0 了,那麼就調用下面的方法進行喚醒阻塞隊列中的線程:

// 調用這個方法的時候,state == 0
// 這個方法先不要看全部的代碼,按照思路往下到我寫註釋的地方,其餘的以後還會仔細分析
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // t3 入隊的時候,已經將頭節點的 waitStatus 設置爲 Node.SIGNAL(-1) 了
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 就是這裏,喚醒 head 的後繼節點,也就是阻塞隊列中的第一個節點
                // 在這裏,也就是喚醒 t3
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // todo
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

一旦 t3 被喚醒後,咱們繼續回到 await 的這段代碼,parkAndCheckInterrupt 返回,咱們先不考慮中斷的狀況:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r); // 2. 這裏是下一步
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 1. 喚醒後這個方法返回
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

接下來,t3 會進到 setHeadAndPropagate(node, r) 這個方法,先把 head 給佔了,而後喚醒隊列中其餘的線程:

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);

    // 下面說的是,喚醒當前 node 以後的節點,即 t3 已經醒了,立刻喚醒 t4
    // 相似的,若是 t4 後面還有 t5,那麼 t4 醒了之後,立刻將 t5 給喚醒了
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // 又是這個方法,只是如今的 head 已經不是原來的空節點了,是 t3 的節點了
            doReleaseShared();
    }
}

又回到這個方法了,那麼接下來,咱們好好分析 doReleaseShared 這個方法,咱們根據流程,頭節點 head 此時是 t3 節點了:

// 調用這個方法的時候,state == 0
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 1. h == null: 說明阻塞隊列爲空
        // 2. h == tail: 說明頭結點多是剛剛初始化的頭節點,
        //   或者是普通線程節點,可是此節點既然是頭節點了,那麼表明已經被喚醒了,阻塞隊列沒有其餘節點了
        // 因此這兩種狀況不須要進行喚醒後繼節點
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // t4 將頭節點(此時是 t3)的 waitStatus 設置爲 Node.SIGNAL(-1) 了
            if (ws == Node.SIGNAL) {
                // 這裏 CAS 失敗的場景請看下面的解讀
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 就是這裏,喚醒 head 的後繼節點,也就是阻塞隊列中的第一個節點
                // 在這裏,也就是喚醒 t4
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     // 這個 CAS 失敗的場景是:執行到這裏的時候,恰好有一個節點入隊,入隊會將這個 ws 設置爲 -1
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 若是到這裏的時候,前面喚醒的線程已經佔領了 head,那麼再循環
        // 不然,就是 head 沒變,那麼退出循環,
        // 退出循環是否是意味着阻塞隊列中的其餘節點就不喚醒了?固然不是,喚醒的線程以後仍是會調用這個方法的
        if (h == head)                   // loop if head changed
            break;
    }
}

咱們分析下最後一個 if 語句,而後才能解釋第一個 CAS 爲何可能會失敗:

h == head:說明頭節點尚未被剛剛用 unparkSuccessor 喚醒的線程(這裏能夠理解爲 t4)佔有,此時 break 退出循環。
h != head:頭節點被剛剛喚醒的線程(這裏能夠理解爲 t4)佔有,那麼這裏從新進入下一輪循環,喚醒下一個節點(這裏是 t4 )。咱們知道,等到 t4 被喚醒後,實際上是會主動喚醒 t五、t六、t7...,那爲何這裏要進行下一個循環來喚醒 t5 呢?我以爲是出於吞吐量的考慮。
知足上面的 2 的場景,那麼咱們就能知道爲何上面的 CAS 操做 compareAndSetWaitStatus(h, Node.SIGNAL, 0) 會失敗了?

由於當前進行 for 循環的線程到這裏的時候,可能剛剛喚醒的線程 t4 也剛恰好到這裏了,那麼就有可能 CAS 失敗了。

for 循環第一輪的時候會喚醒 t4,t4 醒後會將本身設置爲頭節點,若是在 t4 設置頭節點後,for 循環才跑到 if (h == head),那麼此時會返回 false,for 循環會進入下一輪。t4 喚醒後也會進入到這個方法裏面,那麼 for 循環第二輪和 t4 就有可能在這個 CAS 相遇,那麼就只會有一個成功了。

CyclicBarrier

字面意思是「可重複使用的柵欄」,CyclicBarrier 相比 CountDownLatch 來講,要簡單不少,其源碼沒有什麼高深的地方,它是 ReentrantLock 和 Condition 的組合使用。看以下示意圖,CyclicBarrier 和 CountDownLatch 是否是很像,只是 CyclicBarrier 能夠有不止一個柵欄,由於它的柵欄(Barrier)能夠重複使用(Cyclic)。
首先,CyclicBarrier 的源碼實現和 CountDownLatch 截然不同,CountDownLatch 基於 AQS 的共享模式的使用,而 CyclicBarrier 基於 Condition 來實現。

由於 CyclicBarrier 的源碼相對來講簡單許多,讀者只要熟悉了前面關於 Condition 的分析,那麼這裏的源碼是毫無壓力的,就是幾個特殊概念罷了。

廢話結束,先上基本屬性和構造方法:

public class CyclicBarrier {
    // 咱們說了,CyclicBarrier 是能夠重複使用的,咱們把每次從開始使用到穿過柵欄當作"一代"
    private static class Generation {
        boolean broken = false;
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    // CyclicBarrier 是基於 Condition 的
    // Condition 是「條件」的意思,CyclicBarrier 的等待線程經過 barrier 的「條件」是你們都到了柵欄上
    private final Condition trip = lock.newCondition();

    // 參與的線程數
    private final int parties;

    // 若是設置了這個,表明越過柵欄以前,要執行相應的操做
    private final Runnable barrierCommand;

    // 當前所處的「代」
    private Generation generation = new Generation();

    // 尚未到柵欄的線程數,這個值初始爲 parties,而後遞減
    // 尚未到柵欄的線程數 = parties - 已經到柵欄的數量
    private int count;

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

我用一圖來描繪下 CyclicBarrier 裏面的一些概念:
圖片描述
看圖咱們也知道了,CyclicBarrier 的源碼最重要的就是 await() 方法了。

首先,先看怎麼開啓新的一代:

// 開啓新的一代,當最後一個線程到達柵欄上的時候,調用這個方法來喚醒其餘線程,同時初始化「下一代」
private void nextGeneration() {
    // 首先,須要喚醒全部的在柵欄上等待的線程
    trip.signalAll();
    // 更新 count 的值
    count = parties;
    // 從新生成「新一代」
    generation = new Generation();
}

看看怎麼打破一個柵欄:

private void breakBarrier() {
    // 設置狀態 broken 爲 true
    generation.broken = true;
    // 重置 count 爲初始值 parties
    count = parties;
    // 喚醒全部已經在等待的線程
    trip.signalAll();
}

這兩個方法以後用獲得,如今開始分析最重要的等待經過柵欄方法 await 方法:

// 不帶超時機制
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
// 帶超時機制,若是超時拋出 TimeoutException 異常
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

繼續往裏看:

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
    final ReentrantLock lock = this.lock;
    // 先要獲取到鎖,而後在 finally 中要記得釋放鎖
    // 若是記得 Condition 部分的話,咱們知道 condition 的 await 會釋放鎖,signal 的時候須要從新獲取鎖
    lock.lock();
    try {
        final Generation g = generation;
        // 檢查柵欄是否被打破,若是被打破,拋出 BrokenBarrierException 異常
        if (g.broken)
            throw new BrokenBarrierException();
        // 檢查中斷狀態,若是中斷了,拋出 InterruptedException 異常
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        // index 是這個 await 方法的返回值
        // 注意到這裏,這個是從 count 遞減後獲得的值
        int index = --count;

        // 若是等於 0,說明全部的線程都到柵欄上了,準備經過
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                // 若是在初始化的時候,指定了經過柵欄前須要執行的操做,在這裏會獲得執行
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                // 若是 ranAction 爲 true,說明執行 command.run() 的時候,沒有發生異常退出的狀況
                ranAction = true;
                // 喚醒等待的線程,而後開啓新的一代
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    // 進到這裏,說明執行指定操做的時候,發生了異常,那麼須要打破柵欄
                    // 以前咱們說了,打破柵欄意味着喚醒全部等待的線程,設置 broken 爲 true,重置 count 爲 parties
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        // 若是是最後一個線程調用 await,那麼上面就返回了
        // 下面的操做是給那些不是最後一個到達柵欄的線程執行的
        for (;;) {
            try {
                // 若是帶有超時機制,調用帶超時的 Condition 的 await 方法等待,直到最後一個線程調用 await
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 若是到這裏,說明等待的線程在 await(是 Condition 的 await)的時候被中斷
                if (g == generation && ! g.broken) {
                    // 打破柵欄
                    breakBarrier();
                    // 打破柵欄後,從新拋出這個 InterruptedException 異常給外層調用的方法
                    throw ie;
                } else {
                    // 到這裏,說明 g != generation, 說明新的一代已經產生,即最後一個線程 await 執行完成,
                    // 那麼此時沒有必要再拋出 InterruptedException 異常,記錄下來這個中斷信息便可
                    // 或者是柵欄已經被打破了,那麼也不該該拋出 InterruptedException 異常,
                    // 而是以後拋出 BrokenBarrierException 異常
                    Thread.currentThread().interrupt();
                }
            }

              // 喚醒後,檢查柵欄是不是「破的」
            if (g.broken)
                throw new BrokenBarrierException();

            // 這個 for 循環除了異常,就是要從這裏退出了
            // 咱們要清楚,最後一個線程在執行完指定任務(若是有的話),會調用 nextGeneration 來開啓一個新的代
            // 而後釋放掉鎖,其餘線程從 Condition 的 await 方法中獲得鎖並返回,而後到這裏的時候,其實就會知足 g != generation 的
            // 那何時不知足呢?barrierCommand 執行過程當中拋出了異常,那麼會執行打破柵欄操做,
            // 設置 broken 爲true,而後喚醒這些線程。這些線程會從上面的 if (g.broken) 這個分支拋 BrokenBarrierException 異常返回
            // 固然,還有最後一種可能,那就是 await 超時,此種狀況不會從上面的 if 分支異常返回,也不會從這裏返回,會執行後面的代碼
            if (g != generation)
                return index;

            // 若是醒來發現超時了,打破柵欄,拋出異常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

好了,我想我應該講清楚了吧,我好像幾乎沒有漏掉任何一行代碼吧?

下面開始收尾工做。

首先,咱們看看怎麼獲得有多少個線程到了柵欄上,處於等待狀態:

public int getNumberWaiting() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return parties - count;
    } finally {
        lock.unlock();
    }
}

判斷一個柵欄是否被打破了,這個很簡單,直接看 broken 的值便可:

public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
}

前面咱們在說 await 的時候也幾乎說清楚了,何時柵欄會被打破,總結以下:

1.中斷,咱們說了,若是某個等待的線程發生了中斷,那麼會打破柵欄,同時拋出 InterruptedException 異常;
2.超時,打破柵欄,同時拋出 TimeoutException 異常;
3.指定執行的操做拋出了異常,這個咱們前面也說過。
最後,咱們來看看怎麼重置一個柵欄:

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

咱們設想一下,若是初始化時,指定了線程 parties = 4,前面有 3 個線程調用了 await 等待,在第 4 個線程調用 await 以前,咱們調用 reset 方法,那麼會發生什麼?

首先,打破柵欄,那意味着全部等待的線程(3個等待的線程)會喚醒,await 方法會經過拋出 BrokenBarrierException 異常返回。而後開啓新的一代,重置了 count 和 generation,至關於一切歸零了。

怎麼樣,CyclicBarrier 源碼很簡單吧。

Semaphore

有了 CountDownLatch 的基礎後,分析 Semaphore 會簡單不少。Semaphore 是什麼呢?它相似一個資源池(讀者能夠類比線程池),每一個線程須要調用 acquire() 方法獲取資源,而後才能執行,執行完後,須要 release 資源,讓給其餘的線程用。

大概你們也能夠猜到,Semaphore 其實也是 AQS 中共享鎖的使用,由於每一個線程共享一個池嘛。

套路解讀:建立 Semaphore 實例的時候,須要一個參數 permits,這個基本上能夠肯定是設置給 AQS 的 state 的,而後每一個線程調用 acquire 的時候,執行 state = state - 1,release 的時候執行 state = state + 1,固然,acquire 的時候,若是 state = 0,說明沒有資源了,須要等待其餘線程 release。

構造方法:

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

這裏和 ReentrantLock 相似,用了公平策略和非公平策略。

看 acquire 方法:

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

這幾個方法也是老套路了,你們基本都懂了吧,這邊多了兩個能夠傳參的 acquire 方法,不過你們也都懂的吧,若是咱們須要一次獲取超過一個的資源,會用得着這個的。

咱們接下來看不拋出 InterruptedException 異常的 acquireUninterruptibly() 方法吧:

public void acquireUninterruptibly() {
    sync.acquireShared(1);
}
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

前面說了,Semaphore 分公平策略和非公平策略,咱們對比一下兩個 tryAcquireShared 方法:

// 公平策略:
protected int tryAcquireShared(int acquires) {
    for (;;) {
        // 區別就在因而不是會先判斷是否有線程在排隊,而後才進行 CAS 減操做
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
// 非公平策略:
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

也是老套路了,因此從源碼分析角度的話,咱們其實不太須要關心是否是公平策略仍是非公平策略,它們的區別每每就那麼一兩行。

咱們再回到 acquireShared 方法,

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

因爲 tryAcquireShared(arg) 返回小於 0 的時候,說明 state 已經小於 0 了(沒資源了),此時 acquire 不能立馬拿到資源,須要進入到阻塞隊列等待,雖然貼了不少代碼,不在意多這點了:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

這個方法我就不介紹了,線程掛起後等待有資源被 release 出來。接下來,咱們就要看 release 的方法了:

// 任務介紹,釋放一個資源
public void release() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        // 溢出,固然,咱們通常也不會用這麼大的數
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

tryReleaseShared 方法老是會返回 true,而後是 doReleaseShared,這個也是咱們熟悉的方法了,我就貼下代碼,不分析了,這個方法用於喚醒全部的等待線程:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

Semphore 的源碼確實很簡單,基本上都是分析過的老代碼的組合使用了。

總結

寫到這裏,終於把 AbstractQueuedSynchronizer 基本上說完了,對於 Java 併發,Doug Lea 真的是神同樣的存在。往後咱們還會接觸到不少 Doug Lea 的代碼,但願咱們你們均可以朝着大神的方向不斷打磨本身的技術,少一些高大上的架構,多一些實實在在的優秀代碼吧。

(全文完)

相關文章
相關標籤/搜索