CountDownLatch與CyclicBarrier的基本使用

1 概述

CountDownLatch以及CyclicBarrier都是Java裏面的同步工具之一,本文介紹了二者的基本原理以及基本使用方法。java

2 CountDownLatch

CountDownLatch是一個同步工具類,常見的使用場景包括:dom

  • 容許一個或多個線程等待一系列的其餘線程結束
  • 在串行化任務中須要進行並行化處理,並等待全部並行化任務結束,串行化任務才能繼續進行

好比考慮這樣一個場景,在一個電商網站中,用戶點擊了首頁,須要一部分的商品,同時顯示它們的價格,那麼,調用的流程應該是:工具

  • 獲取商品
  • 計算售價
  • 返回全部商品的最終售價

解決這樣的問題可使用串行化或並行化操做,串行化就是逐一計算商品的售價,並返回,並行化就是獲取商品後,並行計算每個商品的售價,最後返回,顯而後一種方案要比前一種要好,那麼這時候就能夠用上CountDownLatch了。網站

一份簡單的模擬代碼以下:this

import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.concurrent.ThreadLocalRandom.current;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException{
        List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList());
        //計數器大小爲商品列表的長度
        final CountDownLatch latch = new CountDownLatch(list.size());
        //線程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        list.forEach(p-> executor.execute(()->{
            System.out.println("Product "+p.id+" start calculate price ");
            try{
                //隨機休眠模擬業務操做耗時
                TimeUnit.SECONDS.sleep(current().nextInt(10));
                p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7));
                System.out.println("Product "+p.id+" calculate price completed");
            }catch (InterruptedException e){
                e.printStackTrace();
            }finally {
                //每完成計算一個商品,將計數器減1,注意須要放在finally中
                latch.countDown();
            }
        }));
        //主線程阻塞直到全部的計數器爲0,也就是等待全部的子任務計算價格完畢
        latch.await();
        System.out.println("All of prices calculate finished");
        //手動終止,否則不會結束運行
        executor.shutdown();
    }

    private static class Price{
        private final int id;
        private double price;

        public Price(int id) {
            this.id = id;
        }

        public int getId() {
            return id;
        }

        public double getPrice() {
            return price;
        }

        public void setPrice(double price) {
            this.price = price;
        }
    }
}

輸出:spa

在這裏插入圖片描述

代碼比較簡單,關鍵地方用上了註釋,能夠看到代碼執行順序以下:線程

  • 建立多個任務計算商品的價格
  • 主線程阻塞
  • 計算完成後,將計數器減1
  • 當計數器爲0時,主線程退出阻塞狀態

值得注意的是計數器減1的操做須要放在finally中,由於有可能會出現異常,若是出現異常致使計數器不能減小,那麼主線程會一直阻塞。code

另外,CountDownLatch還有一個await(long timeout,TimeUnit unit)方法,是帶有超時參數的,也就是說,若是在超時時間內,計數器的值仍是大於0(還有任務沒執行完成),會使得當前線程退出阻塞狀態。隊列

3 CyclicBarrier

CyclicBarrierCountDownLatch有不少相似的地方,也是一個同步工具類,容許多個線程在執行完相應的操做以後彼此等待到達同一個barrier point(屏障點)。CyclicBarrier也適合某個串行化的任務被拆分爲多個並行化任務,這點與CountDownLatch相似,可是CyclicBarrier具有的一個更強大的功能是,CyclicBarrier能夠被重複使用。圖片

3.1 等待完成

先簡單說一下CyclicBarrier的實現原理:

  • 初始化CyclicBarrier,傳入一個int參數,表示分片(parites),一般意義上來講分片數就是任務的數量
  • 同時串行化執行多個任務
  • 任務執行完成後,調用await(),等待其餘線程也到達barrier point
  • 當全部線程到達後,繼續以串行化方式運行任務

常見的使用方法是設置分片數爲任務數+1,這樣,能夠在主線程中執行await(),等待全部子任務完成。好比下面是使用CyclicBarrier實現一樣功能的模擬代碼:

import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.concurrent.ThreadLocalRandom.current;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException,BrokenBarrierException{
        List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList());
        final CyclicBarrier barrier = new CyclicBarrier(11);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        list.forEach(p-> executor.execute(()->{
            System.out.println("Product "+p.id+" start calculate price ");
            try{
                TimeUnit.SECONDS.sleep(current().nextInt(10));
                p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7));
                System.out.println("Product "+p.id+" calculate price completed");
            }catch (InterruptedException e){
                e.printStackTrace();
            }finally {
                try{
                    barrier.await();
                }catch (InterruptedException | BrokenBarrierException e){
                    e.printStackTrace();
                }
            }
        }));
        barrier.await();
        System.out.println("All of prices calculate finished");
        executor.shutdown();
    }

    private static class Price{
        private final int id;
        private double price;

        public Price(int id) {
            this.id = id;
        }

        public int getId() {
            return id;
        }

        public double getPrice() {
            return price;
        }

        public void setPrice(double price) {
            this.price = price;
        }
    }
}

輸出相同,代碼大部分類似,不一樣的地方有:

  • latch.countDown()替換成了barrier.await()
  • latch.await()替換成了barrier.await()
  • 線程池的核心線程數替換成了10

await()方法會等待全部的線程到達barrier point,上面代碼執行流程簡述以下:

  • 初始化CyclicBarrier,分片數爲11(子線程數+1)
  • 主線程調用await(),等待子線程執行完成
  • 子線程各自進行商品價格的計算,計算完成後,調用await(),等待其餘線程也到達barrier point
  • 當全部子線程計算完成後,因爲沒有後續操做,因此子線程運行結束,同時因爲主線程還有後續操做,會先輸出提示信息再終止線程池

注意一個很大的不一樣就是這裏的線程池核心線程數目改爲了 10,那麼,爲何須要10?

由於若是是設置一個小於10的核心線程個數,因爲線程池是會先建立核心線程來執行任務,核心線程滿了以後,放進任務隊列中,而假設只有5個核心線程,那麼:

  • 5個線程進行計算價格
  • 另外5個任務放在任務隊列中

這樣的話,會出現死鎖,由於計算中的線程須要隊列中的任務到達barrier point才能結束,而隊列中的任務須要核心線程計算完畢後,才能調度出來計算,這樣死鎖就出現了。

3.2 重複使用

CyclicBarrierCountDownLatch的一個最大不一樣是,CyclicBarrier能夠被重複使用,原理上來講,await()會將內部計數器減1,當計數器減爲0時,會自動進行計數器(分片數)重置。好比,在上面的代碼中,因爲趕上促銷活動,須要對商品的價格再次進行計算:

import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.concurrent.ThreadLocalRandom.current;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException,BrokenBarrierException{
        List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList());
        final CyclicBarrier barrier = new CyclicBarrier(11);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        list.forEach(p-> executor.execute(()->{
            System.out.println("Product "+p.id+" start calculate price.");
            try{
                TimeUnit.SECONDS.sleep(current().nextInt(10));
                p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7));
                System.out.println("Product "+p.id+" calculate price completed.");
            }catch (InterruptedException e){
                e.printStackTrace();
            }finally {
                try{
                    barrier.await();
                }catch (InterruptedException | BrokenBarrierException e){
                    e.printStackTrace();
                }
            }
        }));
        barrier.await();
        System.out.println("All of prices calculate finished.");
        
        //複製的一段相同代碼
        list.forEach(p-> executor.execute(()->{
            System.out.println("Product "+p.id+" start calculate price again.");
            try{
                TimeUnit.SECONDS.sleep(current().nextInt(10));
                p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7));
                System.out.println("Product "+p.id+" calculate price completed.");
            }catch (InterruptedException e){
                e.printStackTrace();
            }finally {
                try{
                    barrier.await();
                }catch (InterruptedException | BrokenBarrierException e){
                    e.printStackTrace();
                }
            }
        }));
        barrier.await();
        System.out.println("All of prices calculate finished again.");
        executor.shutdown();
    }

    private static class Price{
        private final int id;
        private double price;

        public Price(int id) {
            this.id = id;
        }

        public int getId() {
            return id;
        }

        public double getPrice() {
            return price;
        }

        public void setPrice(double price) {
            this.price = price;
        }
    }
}

將計算價格的代碼複製一遍,其中沒有手動修改計數器,只是調用await(),輸出以下:

在這裏插入圖片描述

能夠看到,並無對CycliBarrier進行相似reset之類的操做,可是依然能按正常邏輯運行,這是由於await()內部會維護一個計數器,當計數器爲0的時候,會自動進行重置,下面是await()OpenJDK 11下的源碼:

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return this.dowait(false, 0L);
    } catch (TimeoutException var2) {
        throw new Error(var2);
    }
}
    
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    ReentrantLock lock = this.lock;
    lock.lock();

    byte var9;
    try {
        //...
        int index = --this.count;
        if (index != 0) {
            //計數器不爲0的狀況
            //....
        }

        boolean ranAction = false;

        try {
            Runnable command = this.barrierCommand;
            if (command != null) {
                command.run();
            }

            ranAction = true;
            
            this.nextGeneration();
            var9 = 0;
        } finally {
            if (!ranAction) {
                this.breakBarrier();
            }

        }
    } finally {
        lock.unlock();
    }

    return var9;
}

private void nextGeneration() {
    this.trip.signalAll();
    this.count = this.parties;
    this.generation = new CyclicBarrier.Generation();
}

當計數器爲0時,會生成新的Generation,並將var9置爲0,最後返回var9(在這個方法中var9只有一處賦值,就是代碼中的var9=0,能夠理解成直接返回0)。

3.3 CyclicBarrier其餘的一些經常使用方法

  • CyclicBarrier(int parties,Runnable barrierAction):構造的時候傳入一個Runnable,表示全部線程到達barrier point時,會調用該Runnable
  • await(long timeout,TimeUnit unit):與無參的await()相似,底層調用的是相同的doWait(),不過增長了超時功能
  • isBroken():返回broken狀態,某個線程因爲執行await而進入阻塞,此時若是執行了中斷操做(好比interrupt),那麼isBroken()會返回true。須要注意,處於broken狀態的CyclicBarrier不能被直接使用,須要調用reset()進行重置

4 總結

下面是CountDownLatchCyclicBarrier的一些簡單比較,相同點以下:

  • 都是java.util.concurrent包下的線程同步工具類
  • 均可以用於「主線程阻塞一直等待,直到子任務完成,主線程才繼續執行」的狀況

不一樣點:

  • CountDownLatchawait()方法會等待計數器歸0,而CyclicBarrierawait()會等待其餘線程到達barrier point
  • CyclicBarrier內部的計數器是能夠被重置的,可是CountDownLatch不能夠
  • CyclicBarrier是由LockCondition實現的,而CountDownLatch是由同步控制器AQS實現的
  • 構造時CyclicBarrier不容許parties爲0,而CountDownLatch容許count爲0
相關文章
相關標籤/搜索