最經常使用的CountDownLatch, CyclicBarrier你知道多少? (Java工程師必會)

CountdownLatch,CyclicBarrier是很是經常使用併發工具類,能夠說是Java工程師必會技能了。不但在項目實戰中常常涉及,並且在編寫壓測程序,多線程demo也是必不可少,因此掌握它們的用法和實現原理很是有必要。java

年年不忘,必有迴響!
點贊走一走,找到女友~nginx

等待多線程完成的CountDownLatch

CountDownLatch容許一個或多個線程等待其餘線程完成操做。也就是說經過使用CountDownLatch工具類,可讓一組線程等待彼此執行完畢後在共同執行下一個操做。具體流程以下圖所示,箭頭表示任務,矩形表示柵欄,當三個任務都到達柵欄時,柵欄後wait的任務纔開始執行。多線程

CountDownLatch維護有個int型的狀態碼,每次調用countDown時狀態值就會減1;調用wait方法的線程會阻塞,直到狀態碼爲0時纔會繼續執行。併發

在多線程協同工做時,可能須要等待其餘線程執行完畢以後,主線程才接着往下執行。首先咱們可能會想到使用線程的join方法(調用join方法的線程優先執行,該線程執行完畢後纔會執行其餘線程),顯然這是能夠完成的。框架

使用Thread.join()方法實現

public class RunningRaceTest {
    public static void main(String[] args) throws InterruptedException {
        Thread runner1 = new Thread(new Runner(), "1號");
        Thread runner2 = new Thread(new Runner(), "2號");
        Thread runner3 = new Thread(new Runner(), "3號");
        Thread runner4 = new Thread(new Runner(), "4號");
        Thread runner5 = new Thread(new Runner(), "5號");
        runner1.start();
        runner2.start();
        runner3.start();
        runner4.start();
        runner5.start();

        runner1.join();
        runner2.join();
        runner3.join();
        runner4.join();
        runner5.join();

        // 裁判等待5名選手準備完畢
        System.out.println("裁判:比賽開始~~");
    }
}

class Runner implements Runnable {
    @Override
    public void run() {
        try {
            int sleepMills = ThreadLocalRandom.current().nextInt(1000);
            Thread.sleep(sleepMills);
            System.out.println(Thread.currentThread().getName() + " 選手已就位, 準備共用時: " + sleepMills + "ms");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Thread.join()徹底能夠實現這個需求,不過存在一個問題,若是調用join的線程一直存活,則當前線程則須要一直等待。這顯然不夠靈活,而且當前線程可能會出現死等的狀況。dom

更加靈活的CountDownLatch

jdk1.5以後的併發包中提供了CountDownLatch併發工具了,也能夠實現join的功能,而且功能更增強大。ide

// 參賽選手線程
class Runner implements Runnable {
    private CountDownLatch countdownLatch;
    
    public Runner(CountDownLatch countdownLatch) {
        this.countdownLatch = countdownLatch;
    }

    @Override
    public void run() {
        try {
            int sleepMills = ThreadLocalRandom.current().nextInt(1000);
            Thread.sleep(sleepMills);
            System.out.println(Thread.currentThread().getName() + " 選手已就位, 準備共用時: " + sleepMills + "ms");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 準備完畢,舉手示意
            countdownLatch.countDown();
        }
    }
}

public class RunningRaceTest {
    public static void main(String[] args) throws InterruptedException {
        // 使用線程池的正確姿式
        int size = 5;
        AtomicInteger counter = new AtomicInteger();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(size, size, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), (r) -> new Thread(r, counter.addAndGet(1) + " 號 "), new ThreadPoolExecutor.AbortPolicy());
        
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < size; i++) {
            threadPoolExecutor.submit(new Runner(countDownLatch));
        }

        // 裁判等待5名選手準備完畢
        countDownLatch.await(); // 爲了不死等,也能夠添加超時時間
        System.out.println("裁判:比賽開始~~");

        threadPoolExecutor.shutdownNow();
    }
}

輸出結果:工具

5 號  選手已就位, 準備共用時: 20ms
4 號  選手已就位, 準備共用時: 156ms
1 號  選手已就位, 準備共用時: 288ms
2 號  選手已就位, 準備共用時: 519ms
3 號  選手已就位, 準備共用時: 945ms
比賽開始~~

同步屏障CyclicBarrier

CyclicBarrier能夠實現CountDownLatch同樣的功能,不一樣的是CountDownLatch屬於一次性對象,聲明後只能使用一次,而CyclicBarrier能夠循環使用ui

從字面意義上來看,CyclicBarrier表示循環的屏障,當一組線程所有都到達屏障時,屏障纔會被移除,不然只能阻塞在屏障處。this

public class RunningRace {
    public static void main(String[] args) {
        // 使用線程池的正確姿式
        int size = 5;
        AtomicInteger counter = new AtomicInteger();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(size, size, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), (r) -> new Thread(r, counter.addAndGet(1) + " 號 "), new ThreadPoolExecutor.AbortPolicy());

        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("裁判:比賽開始~~"));
        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.submit(new Runner(cyclicBarrier));
        }
    }
}

class Runner implements Runnable {
    private CyclicBarrier cyclicBarrier;

    public Runner(CyclicBarrier countdownLatch) {
        this.cyclicBarrier = countdownLatch;
    }

    @Override
    public void run() {
        try {
            int sleepMills = ThreadLocalRandom.current().nextInt(1000);
            Thread.sleep(sleepMills);
            System.out.println(Thread.currentThread().getName() + " 選手已就位, 準備共用時: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

因爲CyclicBarrier能夠循環使用,因此CyclicBarrier的構造方法中能夠傳入一個Runnable參數,在每一輪執行完畢以後就會馬上執行這個Runnable任務

CountDownLatch設計與實現

CountDownLath是基於AQS框架的一種簡單實現,有兩個核心的方法,即await()和countDown(),經過構造方法傳入一個狀態值,調用await()方法時線程會阻塞,直到狀態碼被修改爲0時纔會返回,每次調用countDown()時會將狀態值減1。

wait方法:執行wait方法後,會嘗試獲取同步狀態,若是爲狀態爲0則方法繼續執行,否擇當前線程會被加入到同步隊列中,詳情可見筆者關於AQS的兩篇文章。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 若是狀態碼不爲0,嘗試獲取同步狀態,若是失敗則被加入到同步隊列中
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
// 當狀態碼爲0時返回1,否擇返回-1,這個方法中參數沒有任何用處
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

countDown方法:每次執行countDown方法時,會將狀態碼的值減1.

public void countDown() {
    sync.releaseShared(1);
}

CyclicBarrier的設計與實現

CyclicBarrier與CountDownLatch實現思想相同,也是基於AQS框架實現。不一樣的是CyclicBarrier內部維護一個狀態值藉助基於AQS實現的鎖ReentrantLock來實現狀態值的同步更新,以及AQS除了同步狀態以外的另外一個核心概念條件隊列來完成線程的阻塞。

parties: 和CountdownLatch中的狀態值同樣,用來記錄每次要相互等待的線程數量,只有parties個線程同時到達屏障時,纔會喚醒阻塞的線程。

count臨時計數器: 因爲CyclicBarrier是能夠循環使用的,count能夠理解爲是一個臨時變量,每一輪執行完畢或者被打斷都會重置count爲parties值。

Generation內部類: 只有一個屬性 broken表示當前這一輪執行是否被中斷,若是被中斷後其餘線程再執行await方法會拋出異常(目的是中止本輪線程未執行線程的繼續執行)。

await方法: 當執行await方法時,會同步得對內部的count執行--count操做, 若是count = 0,則執行barrierCommand任務(經過構造方法傳來的Runnable參數)。

reset方法:中斷本輪執行,重置count值,喚醒等待的線程而後開始下一輪,此時本輪正在執行的線程調用await方法會拋出異常。

// await方法實際執行的代碼
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
TimeoutException {
    final ReentrantLock lock = this.lock;
    // 加鎖,保證併發操做的一致性
    lock.lock();
    try {
        // 若是當前這一輪操做被中斷,拋出中斷異常(該異常只是起警示做用,沒有任何其餘信息)
        final Generation g = generation;
        if (g.broken)
            throw new BrokenBarrierException();
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        // 本輪執行的計數器 數值-1
        int index = --count;
        if (index == 0) {  // 計數器值=1, 本輪線程所有到達屏障,執行barrierCommand任務
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();// 喚醒全部等待在條件隊列上的任務
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 若是狀態不等於0,循環等待直到計數器值爲0,本輪執行被打破,線程被中斷,或者等待超時
        for (;;) {
            try {
                if (!timed)
                    // 狀態碼不爲0,將當前線程加入到條件隊列中,進入阻塞狀態
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();// 喚醒全部條件隊列中的線程,重置count的值
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

重置柵欄的狀態

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}
/**
 * Sets current barrier generation as broken and wakes up everyone.
 * Called only while holding lock.
 */
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

當一輪執行完畢以後,既count=0後,CyclicBarrier的臨時狀態會重置爲parties

/**
 * 進入下一輪
 * 喚醒全部等待線程,充值count
 */
private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}

總結

  1. CountDownLatch建立後只能使用一次,而CyclicBarrier能夠循環使用,而且CyclicBarrier功能更完善。
  2. CountDownLatch內部的狀態是基於AQS中的狀態信息,而CyclicBarrier中的狀態值是單獨維護的,使用ReentrantLock加鎖保證併發修改狀態值的數據一致性。
  3. 它們的使用場景:容許一個或多個線程等待其餘線程完成操做, 即當指定數量線程執行完某個操做再繼續執行下一個操做。

相關文章
相關標籤/搜索