CountDownLatch、CyclicBarrier、Semaphore、Exchanger 的詳細解析


本文主要介紹和對比咱們經常使用的幾種併發工具類,主要涉及 CountDownLatchCyclicBarrierSemaphoreExchanger 相關的內容,若是對多線程相關內容不熟悉,能夠看筆者以前的一些文章:java


  • 介紹 CountDownLatchCyclicBarrier 二者的使用與區別,他們都是等待多線程完成,是一種併發流程的控制手段,
  • 介紹 SemaphoreExchanger 的使用,semaphore 是信號量,能夠用來控制容許的線程數,而 Exchanger 能夠用來交換兩個線程間的數據。

CountDownLatch

  • CountDownLatchJDK5 以後加入的一種併發流程控制工具,它在 java.util.concurrent 包下
  • CountDownLatch 容許一個或多個線程等待其餘線程完成操做,這裏須要注意,是能夠是一個等待也能夠是多個來等待
  • CountDownLatch 的構造函數以下,它接受一個 int 類型的參數做爲計數器,即若是你想等待N 個線程完成,那麼這裏就傳入 N
public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
  • 其中有兩個核心的方法 countDownawait ,其中 當咱們調用 countDown 方法時相應的 N 的值減 1,而 await 方法則會阻塞當前線程,直到 N 的值變爲零。
  • 提及來比較抽象,下面咱們經過實際案例來講明。

多個線程等待一個線程

  • 在咱們生活中最典型的案例就是體育中的跑步,假設如今咱們要進行一場賽跑,那麼全部的選手都須要等待裁判員的起跑命令,這時候,咱們將其抽象化每一個選手對應的是一個線程,而裁判員也是一個線程,那麼就是多個選手的線程再等待裁判員線程的命令來執行
  • 咱們經過 CountDownLatch 來實現這一案例,那麼等待的個數 N 就是上面的裁判線程的個數,即爲 1,
/**
     * @url i-code.onlien
     * 雲棲簡碼
     */
    public static void main(String[] args) throws InterruptedException {
        //模擬跑步比賽,裁判說開始,全部選手開始跑,咱們可使用countDownlatch來實現

        //這裏須要等待裁判說開始,因此時等着一個線程
        CountDownLatch countDownLatch = new CountDownLatch(1);

        new Thread(() ->{
            try {
                System.out.println(Thread.currentThread().getName() +"已準備");
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"開始跑~~");

        },"選手1").start();
        new Thread(() ->{
            try {
                System.out.println(Thread.currentThread().getName() +"已準備");
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"開始跑~~");

        },"選手2").start();

        TimeUnit.SECONDS.sleep(1);
        System.out.println("裁判:預備~~~");
        countDownLatch.countDown();
        System.out.println("裁判:跑~~~");
    }
  • 運行結果以下:

在上述代碼中,咱們首先建立了一個計數爲1 的 CountDownLatch 對象,這表明咱們須要等待的線程數,以後再建立了兩個線程,用來表明選手線程,同時在選手的線程中咱們都調用了 await 方法,讓線程進入阻塞狀態,直到CountDownLatch的計數爲零後再執行後面的內容,在主線程 main 方法中咱們等待 1秒後執行 countDown 方法,這個方法就是減一,此時的 N 則爲零了,那麼選手線程則開始執行後面的內容,總體的輸出如上圖所示面試

一個/多個線程等待多個線程

  • 一樣從咱們生活中的場景來抽象,假設公司要組織出遊,大巴車接送,當湊夠五我的大巴車則發車出發,這裏就是大巴車須要等待這五我的所有到齊才能繼續執行,咱們抽象以後用 CountDownLatch 來實現,那麼的計數個數 N 則爲5,由於要等待這五個,經過代碼實現以下:
public static void main(String[] args) throws InterruptedException {
        /**
         * i-code.online
         * 雲棲簡碼 
         */
        //等待的個數
        CountDownLatch countDownLatch = new CountDownLatch(5);

        for (int i = 0; i < 5; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName() + "從住所出發...");
                try {
                    TimeUnit.SECONDS.sleep((long) (Math.random()*10));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " 到達目的地-----");
                countDownLatch.countDown();
            },"人員-"+i).start();
        }

        System.out.println("大巴正在等待人員中.....");
        countDownLatch.await();
        System.out.println("-----全部人到齊,出發-----");
    }
  • 上面代碼執行結果以下:

從上述代碼中咱們能夠看到,定義了一個計數爲5的 countDownLatch ,以後經過循環建立五個線程,模擬五我的員,當他們到達指定地點後執行 countDown 方法,對計數減一。主線程至關因而大巴車的線程,執行 await 方法進行阻塞,只有當 N 的值減到0後則執行後面的輸出算法

CountDownLatch 主要方法介紹

  • 構造函數:
public CountDownLatch(int count) {  };

它的構造函數是傳入一個參數,該參數 count 是須要倒數的數值。數據庫

  • await() :調用 await() 方法的線程開始等待,直到倒數結束,也就是 count 值爲 0 的時候纔會繼續執行。
  • await(long timeout, TimeUnit unit)await() 有一個重載的方法,裏面會傳入超時參數,這個方法的做用和 await() 相似,可是這裏能夠設置超時時間,若是超時就再也不等待了。
  • countDown():把數值倒數 1,也就是將 count 值減 1,直到減爲 0 時,以前等待的線程會被喚起。

上面的案例介紹了 CountDownLatch 的使用,可是 CountDownLatch 有個特色,那就是不可以重用,好比已經完成了倒數,那可不能夠在下一次繼續去從新倒數呢?是能夠的,一旦倒數到0 則結束了,沒法再次設置循環執行,可是咱們實際需求中有不少場景中須要循環來處理,這時候咱們可使用 CyclicBarrier 來實現編程

CyclicBarrier

  • CyclicBarrierCountDownLatch 比較類似,當等待到必定數量的線程後開始執行某個任務
  • CyclicBarrier 的字面意思是能夠循環使用的屏障,它的功能就是讓一組線程到達一個屏障(同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開會,此時全部被屏障阻塞的線程都將繼續執行。以下演示

  • 上圖中能夠看到,到線程到達屏障後阻塞,直到最後一個也到達後,則所有放行
  • 首先咱們來看下它的構造函數,以下:
public CyclicBarrier(int parties) {
        this(parties, null);
    }

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
  • CyclicBarrier(int parties) 構造函數提供了int 類型的參數,表明的是須要攔截的線程數量,而每一個線程經過調用 await 方法來告訴 CyclicBarrier 我到達屏障點了,而後阻塞
  • CyclicBarrier(int parties, Runnable barrierAction) 構造函數是爲咱們提供的一個高級方法,加了一個 barrierAction 的參數,這是一個Runnable類型的,也就是一個線程,它表示當全部線程到達屏障後,清閒觸發 barrierAction 線程執行,再執行各個線程以後的內容

案例

  • 假設你要和你女友約會,約定了一個時間地點,那麼無論大家誰先到都會等待另外一個到纔會出發取約會~ 那麼這時候咱們經過CyclicBarrier 的來實現,這裏咱們須要來攔截的線程就是兩個。具體實現 以下:
/*
    CyclicBarrier 與countDownLatch 比較類似,也是等待線程完成,
    不過countDownLatch 是await等待其餘的線程經過countDown的數量,達到必定數則執行,
    而 CyclicBarrier 則是直接看await的數量,達到必定數量直接所有執行,
     */
    public static void main(String[] args) {
        //比如情侶約會,無論誰先到都的等另外一個,這裏就是兩個線程,
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

        new Thread(() ->{
            System.out.println("快速收拾,出門~~~");
            try {
                TimeUnit.MILLISECONDS.sleep(500);
                System.out.println("到了約會地點等待女友前來~~");
                cyclicBarrier.await();
                System.out.println("女友到來嗨皮出發~~約會");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }

        },"男友").start();
        new Thread(() ->{
            System.out.println("慢慢收拾,出門~~~");
            try {
                TimeUnit.MILLISECONDS.sleep(5000);
                System.out.println("到了約會地點等待男友前來~~");
                cyclicBarrier.await();
                System.out.println("男友到來嗨皮出發~~約會");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        },"女友").start();

    }
  • 代碼執行結果以下:

上面代碼,相對簡單,建立一個攔截數爲2的屏障,以後建立兩個線程,調用await方法,只有當調用兩次纔會觸發後面的流程。微信

  • 咱們再寫一個案例sh,使用含有Runnable 參數的構造函數;和以前 CountDownLatch 的案例類似,公司組織出遊,這時候確定有不少大巴在等待接送,大巴不會等全部的 人都到纔出發,而是每坐滿一輛車就出發一輛,這種場景咱們就可使用 CyclicBarrier 來實現,實現以下:
/*
    CyclicBarrier是可重複使用到,也就是每當幾個知足是再也不等待執行,
    好比公司組織出遊,安排了好多輛大把,每坐滿一輛就發車,再也不等待,相似這種場景,實現以下:
     */

    public static void main(String[] args) {
        //公司人數
        int peopleNum = 2000;
        //每二十五我的一輛車,湊夠二十五則發車~
        CyclicBarrier cyclicBarrier = new CyclicBarrier(25,() ->{
            //達到25人出發
            System.out.println("------------25人數湊齊出發------------");
        });

        for (int j = 1; j <= peopleNum; j++) {
            new Thread(new PeopleTask("People-"+j,cyclicBarrier)).start();
        }

    }

    static class PeopleTask implements Runnable{

        private String name;
        private  CyclicBarrier cyclicBarrier;
        public PeopleTask(String name,CyclicBarrier cyclicBarrier){
            this.name = name;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println(name+"從家裏出發,正在前往聚合地....");
            try {
                TimeUnit.MILLISECONDS.sleep(((int) Math.random()*1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name+"到達集合地點,等待其餘人..");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

        }
    }

CyclicBarrier 和 CountDownLatch 的異同

相同點:多線程

  • 都能阻塞一個或一組線程,直到某個預設的條件達成發生,再統一出發

不一樣點:併發

  • 可重複性:CountDownLatch 的計數器只能使用一次,到到達0後就不能再次使用了,除非新建實例;而 CyclicBarrier 的計數器是能夠複用循環的,因此 CyclicBarrier 能夠用在更復雜的場景,能夠隨時調用 reset方法來重製攔截數,如計算髮生錯誤時能夠直接充值計數器,讓線程從新執行一次。
  • 做用對象:CyclicBarrier 要等固定數量的線程都到達了屏障位置才能繼續執行,而 CountDownLatch 只需等待數字倒數到 0,也就是說 CountDownLatch 做用於事件,但 CyclicBarrier 做用於線程;CountDownLatch 是在調用了 countDown 方法以後把數字倒數減 1,而 CyclicBarrier 是在某線程開始等待後把計數減 1
  • 執行動做:CyclicBarrier 有執行動做 barrierAction,而 CountDownLatch 沒這個功能。

Semaphore

  • Semaphore (信號量)是用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理的使用公共資源,

  • 從圖中能夠看出,信號量的一個最主要的做用就是,來控制那些須要限制併發訪問量的資源。具體來說,信號量會維護「許可證」的計數,而線程去訪問共享資源前,必須先拿到許可證(acquire 方法)。線程能夠從信號量中去「獲取」一個許可證,一旦線程獲取以後,信號量持有的許可證就轉移過去了,因此信號量手中剩餘的許可證要減一。
  • 同理,線程也能夠「釋放」一個許可證,若是線程釋放了許可證(release 方法),這個許可證至關於被歸還給信號量了,因而信號量中的許可證的可用數量加一。當信號量擁有的許可證數量減到 0 時,若是下個線程還想要得到許可證,那麼這個線程就必須等待,直到以前獲得許可證的線程釋放,它才能獲取。因爲線程在沒有獲取到許可證以前不能進一步去訪問被保護的共享資源,因此這就控制了資源的併發訪問量,這就是總體思路。

案例

  • 如咱們平時開發中典型的數據庫操做,這是一個密集IO 操做,咱們能夠啓動不少線程可是數據庫的鏈接池是有限制的,假設咱們設置容許五個連接,若是咱們開啓太多線程直接操做則會出現異常,這時候咱們能夠經過信號量來控制,讓一直最多隻有五個線程來獲取鏈接。代碼以下:
/*
        Semaphore 是信號量, 能夠用來控制線程的併發數,能夠協調各個線程,以達到合理的使用公共資源
     */

    public static void main(String[] args) {
        //建立10個容量的線程池
        final ExecutorService service = Executors.newFixedThreadPool(100);
        //設置信號量的值5 ,也就是容許五個線程來執行
        Semaphore s = new Semaphore(5);
        for (int i = 0; i < 100; i++) {
            service.submit(() ->{
                try {
                    s.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    System.out.println("數據庫耗時操做"+Thread.currentThread().getName());
                    TimeUnit.MILLISECONDS.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "正在執行....");
                s.release();
            });
        }

    }

如上代碼,建立了一個容量100的線程池,模擬咱們程序中大量的線程,添加一百個任務,讓線程池執行。建立了一個容量爲5的信號量,在線程中咱們調用 acquire 來得到信號量的許可,只有得到了才能只能下面的內容否則阻塞。當執行完後釋放該許可,經過 release 方法,dom

  • 經過上面的演示,有沒有以爲很是眼熟,對,就是和咱們以前接觸過的鎖很類似,只是鎖是隻容許一個線程訪問,那咱們能不能將信號量的容量設置爲1呢? 這固然是能夠的,當咱們設置爲1時其實就和咱們的鎖的功能是一致的,以下代碼:
private static int count = 0;
    /*
        Semaphore 中若是咱們容許的的許可證數量爲1 ,那麼它的效果與鎖類似。
     */
    public static void main(String[] args) throws InterruptedException {
        final ExecutorService service = Executors.newFixedThreadPool(10);

        Semaphore semaphore = new Semaphore(1);
        for (int i = 0; i < 10000; i++) {
            service.submit(() ->{
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "執行了");
                count ++;
                semaphore.release();
            });
        }
        service.shutdown();
        TimeUnit.SECONDS.sleep(5);
        System.out.println(count);

    }

其餘主要方法介紹

  • public boolean tryAcquire()tryAcquire 和鎖的 trylock 思惟是一致的,是嘗試獲取許可證,至關於看看如今有沒有空閒的許可證,若是有就獲取,若是如今獲取不到也不要緊,沒必要陷入阻塞,能夠去作別的事。
  • public boolean tryAcquire(long timeout, TimeUnit unit):是一個重載的方法,它裏面傳入了超時時間。好比傳入了 3 秒鐘,則意味着最多等待 3 秒鐘,若是等待期間獲取到了許可證,則往下繼續執行;若是超時時間到,依然獲取不到許可證,它就認爲獲取失敗,且返回 false。
  • int availablePermits():返回此信號量中當前可用的許可證數
  • int getQueueLength():返回正在等待許可證的線程數
  • boolean hasQueuedThreads():判斷是否有線程正在等待獲取許可證
  • void reducePermits(int reduction):減小 reduction 個許可證,是個 protected 方法
  • Collection<Thread> getQueuedThreads():返回正在等待獲取許可證的線程集合,是個 protected 方法

Exchanger

  • Exchanger(交換者)是一個用於線程間協做的工具類,它主要用於進行線程間數據的交換,它有一個同步點,當兩個線程到達同步點時能夠將各自的數據傳給對方,若是一個線程先到達同步點則會等待另外一個到達同步點,到達同步點後調用 exchange 方法能夠傳遞本身的數據而且得到對方的數據。
  • 咱們假設如今須要錄入一些重要的帳單信息,爲了保證準備,讓兩我的分別錄入,以後再進行對比後是否一致,防止錯誤繁盛。下面經過代碼來演示:
public class ExchangerTest {

    /*
    Exchanger 交換, 用於線程間協做的工具類,能夠交換線程間的數據,
    其提供一個同步點,當線程到達這個同步點後進行數據間的交互,遺傳算法能夠如此來實現,
    以及校對工做也能夠如此來實現
     */

    public static void main(String[] args) {
        /*
        模擬 兩個工做人員錄入記錄,爲了防止錯誤,二者錄的相同內容,程序僅從校對,看是否有錯誤不一致的
         */

        //開闢兩個容量的線程池
        final ExecutorService service = Executors.newFixedThreadPool(2);

        Exchanger<InfoMsg> exchanger = new Exchanger<>();

        service.submit(() ->{
            //模擬數據 線程 A的
            InfoMsg infoMsg = new InfoMsg();
            infoMsg.content="這是線程A";
            infoMsg.id ="10001";
            infoMsg.desc = "1";
            infoMsg.message = "message";
            System.out.println("正在執行其餘...");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                final InfoMsg exchange = exchanger.exchange(infoMsg);
                System.out.println("線程A 交換數據====== 獲得"+ exchange);
                if (!exchange.equals(infoMsg)){
                    System.out.println("數據不一致~~請稽覈");
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        service.submit(() ->{
            //模擬數據 線程 B的
            InfoMsg infoMsg = new InfoMsg();
            infoMsg.content="這是線程B";
            infoMsg.id ="10001";
            infoMsg.desc = "1";
            infoMsg.message = "message";
            System.out.println("正在執行其餘...");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                final InfoMsg exchange = exchanger.exchange(infoMsg);
                System.out.println("線程B 交換數據====== 獲得"+ exchange);
                if (!exchange.equals(infoMsg)){
                    System.out.println("數據不一致~~請稽覈");
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        service.shutdown();
    }

    static class InfoMsg{
        String id;
        String name;
        String message;
        String content;
        String desc;

        @Override
        public String toString() {
            return "InfoMsg{" +
                    "id='" + id + '\'' +
                    ", name='" + name + '\'' +
                    ", message='" + message + '\'' +
                    ", content='" + content + '\'' +
                    ", desc='" + desc + '\'' +
                    '}';
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            InfoMsg infoMsg = (InfoMsg) o;
            return Objects.equals(id, infoMsg.id) &&
                    Objects.equals(name, infoMsg.name) &&
                    Objects.equals(message, infoMsg.message) &&
                    Objects.equals(content, infoMsg.content) &&
                    Objects.equals(desc, infoMsg.desc);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, name, message, content, desc);
        }
    }
}
  • 運行結果以下:

上面代碼運行能夠看到,當咱們線程 A/B 到達同步點即調用 exchange 後進行數據的交換,拿到對方的數據再與本身的數據對比能夠作到稽覈 的效果ide

  • Exchanger 一樣能夠用於遺傳算法中,選出兩個對象進行交互兩個的數據經過交叉規則獲得兩個混淆的結果。
  • Exchanger 中嗨提供了一個方法 public V exchange(V x, long timeout, TimeUnit unit) 主要是用來防止兩個程序中一個一直沒有執行 exchange 而致使另外一個一直陷入等待狀態,這是能夠用這個方法,設置超時時間,超過這個時間則再也不等待。


本文由AnonyStar 發佈,可轉載但需聲明原文出處。
歡迎關注微信公帳號 :雲棲簡碼 獲取更多優質文章
更多文章關注筆者博客 :雲棲簡碼 i-code.online

相關文章
相關標籤/搜索