AQS同步組件--CyclicBarrier

CyclicBarrier

CyclicBarrier也是一個同步輔助類,它容許一組線程相互等待直到到達某個工做屏障點,經過他能夠完成多線程之間的相互等待。每一個線程都就緒以後才能執行後面的操做。和CountLatch有類似的地方都是經過計數器來實現的。當某個線程執行了await()方法後就進入等待狀態,計數器進行加1操做,當增長後的值達到咱們設定的值後,線程被喚醒,繼續執行後續操做。CyclicBarrier是可重用的計數器,CyclicBarrier的使用場景和CountDownLatch的使用場景很類似,能夠用於多線程計算數據最後總計結果。

CyclicBarrier和CountDownLatch的使用區別:java

  1. CountDownLatch的計數器只能使用一次,CyclicBarrier能夠用reset方法重置。
  2. CountDownLatch是一個線程等待其餘線程完成某個操做後才能繼續執行。也就是一個或個多線程等待其餘的關係,而CyclicBarrier是實現了多個線程之間的相互等待,全部線程都知足了條件以後才能繼續使用。

演示代碼多線程

@Slf4j
public class CyclicBarrierExample1 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}

輸出結果以下:線程

20:43:46.324 [pool-1-thread-1] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 0 is ready
20:43:47.322 [pool-1-thread-2] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 1 is ready
20:43:48.323 [pool-1-thread-3] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 2 is ready
20:43:49.323 [pool-1-thread-4] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 3 is ready
20:43:50.325 [pool-1-thread-5] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 4 is ready
20:43:50.325 [pool-1-thread-5] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 4 continue
20:43:50.325 [pool-1-thread-1] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 0 continue
20:43:50.325 [pool-1-thread-2] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 1 continue
20:43:50.325 [pool-1-thread-4] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 3 continue
20:43:50.325 [pool-1-thread-3] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 2 continue
20:43:51.325 [pool-1-thread-6] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 5 is ready
20:43:52.326 [pool-1-thread-1] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 6 is ready
20:43:53.326 [pool-1-thread-2] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 7 is ready
20:43:54.326 [pool-1-thread-4] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 8 is ready
20:43:55.327 [pool-1-thread-3] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 9 is ready
20:43:55.327 [pool-1-thread-3] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 9 continue
20:43:55.327 [pool-1-thread-6] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 5 continue
20:43:55.327 [pool-1-thread-1] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 6 continue
20:43:55.327 [pool-1-thread-2] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 7 continue
20:43:55.327 [pool-1-thread-4] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 8 continue

咱們定義了 private static CyclicBarrier barrier = new CyclicBarrier(5),每次調用await後加1 知道等於5就一塊兒執行接下來的操做。code

咱們在建立CyclicBarrier的時候是能夠傳入一段runable的,下面看一下代碼同步

@Slf4j
public class CyclicBarrierExample3 {

    private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
        log.info("callback is running");
    });

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}

在線程到達=執行屏障時,線程會優先執行這個runableit

private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
        log.info("callback is running");
    });
相關文章
相關標籤/搜索