CountDownLatch和CyclicBarrier 傻傻的分不清?超長精美圖文又來了

crescent-4875339_1280.jpg

日拱一兵 | 原創html

  • 你有一個思想,我有一個思想,咱們交換後,一我的就有兩個思想
  • If you can NOT explain it simply, you do NOT understand it well enough

前言

併發編程的三大核心是分工同步互斥。在平常開發中,常常會碰到須要在主線程中開啓多個子線程去並行的執行任務,而且主線程須要等待全部子線程執行完畢再進行彙總的場景,這就涉及到分工與同步的內容了java

在講 有序性可見性,Happens-before來搞定 時,提到過 join() 規則,使用 join() 就能夠簡單的實現上述場景:node

@Slf4j
public class JoinExample {

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                log.info("Thread-1 執行完畢");
            }
        }, "Thread-1");

        Thread thread2 = new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                log.info("Thread-2 執行完畢");
            }
        }, "Thread-2");

        thread1.start();
        thread2.start();

        thread1.join();
        thread2.join();

        log.info("主線程執行完畢");
    }
}

運行結果:編程

整個過程能夠這麼理解api

咱們來查看 join() 的實現源碼:多線程

其實現原理是不停的檢查 join 線程是否存活,若是 join 線程存活,則 wait(0) 永遠的等下去,直至 join 線程終止後,線程的 this.notifyAll() 方法會被調用(該方法是在 JVM 中實現的,JDK 中並不會看到源碼),退出循環恢復主線程執行。很顯然這種循環檢查的方式比較低效併發

除此以外,使用 join() 缺乏不少靈活性,好比實際項目中不多讓本身單首創建線程(緣由在 我會手動建立線程,爲何要使用線程池? 中說過)而是使用 Executor, 這進一步減小了 join() 的使用場景,因此 join() 的使用在多數是停留在 demo 演示上oracle

那如何實現文中開頭提到的場景呢?

CountDownLatch

CountDownLatch, 直譯過來【數量向下門閂】,那確定裏面有計數器的存在了。咱們將上述程序用 CountDownLatch 實現一下,先讓你們有個直觀印象app

@Slf4j
public class CountDownLatchExample {

    private static CountDownLatch countDownLatch = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        // 這裏不推薦這樣建立線程池,最好經過 ThreadPoolExecutor 手動建立線程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        executorService.submit(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                log.info("Thread-1 執行完畢");
                //計數器減1
                countDownLatch.countDown();
            }
        });

        executorService.submit(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                log.info("Thread-2 執行完畢");
                //計數器減1
                countDownLatch.countDown();
            }
        });

        log.info("主線程等待子線程執行完畢");
        log.info("計數器值爲:" + countDownLatch.getCount());
        countDownLatch.await();
        log.info("計數器值爲:" + countDownLatch.getCount());
        log.info("主線程執行完畢");
        executorService.shutdown();
    }
}

運行結果以下:less

結合上述示例的運行結果,相信你也能猜出 CountDownLatch 的實現原理了:

  1. 初始化計數器數值,好比爲2
  2. 子線程執行完則調用 countDownLatch.countDown() 方法將計數器數值減1
  3. 主線程調用 await() 方法阻塞本身,直至計數器數值爲0(即子線程所有執行結束)
不知道你是否注意, countDownLatch.countDown(); 這行代碼能夠寫在子線程執行的任意位置,不像 join() 要徹底等待子線程執行完,這也是 CountDownLatch 靈活性的一種體現

上述的例子仍是過於簡單,Oracle 官網 CountDownLatch 說明 有兩個很是經典的使用場景,示例很簡單,強烈建議查看相關示例代碼,打開使用思路。我將兩個示例代碼以圖片的形式展現在此處:

官網示例1

  • 第一個是開始信號 startSignal,阻止任何工人 Worker 繼續工做,直到司機 Driver 準備好讓他們繼續工做
  • 第二個是完成信號 doneSignal,容許司機 Driver 等待,直到全部的工人 Worker 完成。

官網示例2

另外一種典型的用法是將一個問題分紅 N 個部分 (好比將一個大的 list 拆分紅多分,每一個 Worker 幹一部分),Worker 執行完本身所處理的部分後,計數器減1,當全部子部分完成後,Driver 才繼續向下執行

結合官網示例,相信你已經能夠結合你本身的業務場景解,經過 CountDownLatch 解決一些串行瓶頸來提升運行效率了,會用還遠遠不夠,咱得知道 CountDownLatch 的實現原理

源碼分析

CountDownLatch 是 AQS 實現中的最後一個內容,有了前序文章的知識鋪墊:

當你看到 CountDownLatch 的源碼內容,你會高興的笑起來,內容真是太少了

展開類結構所有內容就這點東西

既然 CountDownLatch 是基於 AQS 實現的,那確定也離不開對同步狀態變量 state 的操做,咱們在初始化的時候就將計數器的值賦值給了state

另外,它能夠多個線程同時獲取,那必定是基於共享式獲取同步變量的用法了,因此它須要經過重寫下面兩個方法控制同步狀態變量 state :

  • tryAcquireShared()
  • tryReleaseShared()

CountDownLatch 暴露給使用者的只有 await()countDown() 兩個方法,前者是阻塞本身,由於只有獲取同步狀態纔會纔會出現阻塞的狀況,那天然是在 await() 的方法內部會用到 tryAcquireShared();有獲取就要有釋放,那後者 countDown() 方法內部天然是要用到 tryReleaseShared() 方法了

PS:若是你對上面這個很天然的推斷理解有困難,強烈建議你看一下前序文章的鋪墊,以防止知識斷層帶來的困擾

await()

先來看 await() 方法, 從方法簽名上看,該方法會拋出 InterruptedException, 因此它是能夠響應中斷的,這個咱們在 Java多線程中斷機制 中明確說明過

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

其內部調用了同步器提供的模版方法 acquireSharedInterruptibly

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
      // 若是監測到中斷標識爲true,會重置標識,而後拋出 InterruptedException
    if (Thread.interrupted())
        throw new InterruptedException();
      // 調用重寫的 tryAcquireShared 方法,該方法結果若是大於零則直接返回,程序繼續向下執行,若是小於零,則會阻塞本身
    if (tryAcquireShared(arg) < 0)
          // state不等於0,則嘗試阻塞本身
        doAcquireSharedInterruptibly(arg);
}

重寫的 tryAcquireShared 方法很是簡單, 就是判斷同步狀態變量 state 的值是否爲 0, 若是爲零 (子線程已經所有執行完畢)則返回1, 不然返回 -1

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

若是子線程沒有所有執行完畢,則會經過 doAcquireSharedInterruptibly 方法阻塞本身,這個方法在 Java AQS共享式獲取同步狀態及Semaphore的應用分析 中已經仔細分析過了,這裏就再也不贅述了

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                  // 再次嘗試獲取同步裝阿嚏,若是大於0,說明子線程所有執行完畢,直接返回
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
              // 阻塞本身
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

await() 方法的實現就是這麼簡單,接下來看看 countDown() 的實現原理

countDown()

public void countDown() {
    sync.releaseShared(1);
}

一樣是調用同步器提供的模版方法 releaseShared

public final boolean releaseShared(int arg) {
      // 調用本身重寫的同步器方法
    if (tryReleaseShared(arg)) {
          // 喚醒調用 await() 被阻塞的線程
        doReleaseShared();
        return true;
    }
    return false;
}

重寫的 tryReleaseShared 一樣很簡單

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
          // 若是當前狀態值爲0,則直接返回 (1)
        if (c == 0)
            return false;
          // 使用 CAS 讓計數器的值減1 (2)
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

代碼 (1) 判斷當前同步狀態值,若是爲0 則直接返回 false;不然執行代碼 (2),使用 CAS 將計數器減1,若是 CAS 失敗,則循環重試,最終返回 nextc == 0 的結果值,若是該值返回 true,說明最後一個線程已調用 countDown() 方法,而後就要喚醒調用 await() 方法被阻塞的線程,一樣因爲分析過 AQS 的模版方法 doReleaseShared 整個釋放同步狀態以及喚醒的過程,因此這裏一樣再也不贅述了

仔細看CountDownLatch重寫的 tryReleaseShared 方法,有一點須要和你們說明:

代碼 (1) if (c == 0) 看似沒什麼用處,其實用處大大滴,若是沒有這個判斷,當計數器值已經爲零了,其餘線程再調用 countDown 方法會將計數器值變爲負值

如今就差 await(long timeout, TimeUnit unit) 方法沒介紹了

await(long timeout, TimeUnit unit)

public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

該方法簽名一樣拋出 InterruptedException,意思可響應中斷。它其實就是 await() 更完善的一個版本,簡單來講就是

主線程設定等待超時時間,若是該時間內子線程沒有執行完畢,主線程也會 直接返回

咱們將上面的例子稍稍修改一下你就會明白(主線程超時時間設置爲 2 秒,而子線程要 sleep 5 秒)

@Slf4j
public class CountDownLatchTimeoutExample {

   private static CountDownLatch countDownLatch = new CountDownLatch(2);

   public static void main(String[] args) throws InterruptedException {
      // 這裏不推薦這樣建立線程池,最好經過 ThreadPoolExecutor 手動建立線程池
      ExecutorService executorService = Executors.newFixedThreadPool(2);

      executorService.submit(() -> {
         try {
            Thread.sleep(5000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         } finally {
            log.info("Thread-1 執行完畢");
            //計數器減1
            countDownLatch.countDown();
         }
      });

      executorService.submit(() -> {
         try {
            Thread.sleep(5000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         } finally {
            log.info("Thread-2 執行完畢");
            //計數器減1
            countDownLatch.countDown();
         }
      });

      log.info("主線程等待子線程執行完畢");
      log.info("計數器值爲:" + countDownLatch.getCount());
      countDownLatch.await(2, TimeUnit.SECONDS);
      log.info("計數器值爲:" + countDownLatch.getCount());
      log.info("主線程執行完畢");
      executorService.shutdown();
   }
}

運行結果以下:

形象化的展現上述示例的運行過程

小結

CountDownLatch 的實現原理就是這麼簡單,瞭解了整個實現過程後,你也許發現了使用 CountDownLatch 的一個問題:

計數器減 1 操做是 一次性的,也就是說當計數器減到 0, 再有線程調用 await() 方法,該線程會直接經過, 不會再起到等待其餘線程執行結果起到同步的做用了

爲了解決這個問題,貼心的 Doug Lea 大師早已給咱們準備好相應策略 CyclicBarrier

原本想將 CyclicBarrier 的內容放到下一個章節,可是 CountDownLatch 的內容着實有些少,不夠解渴,另外有對比才有傷害,因此內容沒結束,咱得繼續看 CyclicBarrier

CyclicBarrier

上面簡單說了一下 CyclicBarrier 被創造出來的理由,這裏先看一下它的字面解釋:

概念老是有些抽象,咱們將上面的例子用 CyclicBarrier 再作個改動,先讓你們有個直觀的使用概念

@Slf4j
public class CyclicBarrierExample {

   // 建立 CyclicBarrier 實例,計數器的值設置爲2
   private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

   public static void main(String[] args) {
      ExecutorService executorService = Executors.newFixedThreadPool(2);
      int breakCount = 0;

         // 將線程提交到線程池
      executorService.submit(() -> {
         try {
            log.info(Thread.currentThread() + "第一回合");
            Thread.sleep(1000);
            cyclicBarrier.await();

            log.info(Thread.currentThread() + "第二回合");
            Thread.sleep(2000);
            cyclicBarrier.await();

            log.info(Thread.currentThread() + "第三回合");
         } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
         } 
      });

      executorService.submit(() -> {
         try {
            log.info(Thread.currentThread() + "第一回合");
            Thread.sleep(2000);
            cyclicBarrier.await();

            log.info(Thread.currentThread() + "第二回合");
            Thread.sleep(1000);
            cyclicBarrier.await();

            log.info(Thread.currentThread() + "第三回合");
         } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
         }
      });

      executorService.shutdown();
   }

}

運行結果:

結合程序代碼與運行結果,咱們能夠看出,子線程執行完第一回合後(執行回合所需時間不一樣),都會調用 await() 方法,等全部線程都到達屏障點後,會突破屏障繼而執行第二回合,一樣的道理最終到達第三回合

形象化的展現上述示例的運行過程

看到這裏,你應該明白 CyclicBarrier 的基本用法,但隨之你心裏也應該有了一些疑問:

  1. 怎麼判斷全部線程都到達屏障點的?
  2. 突破某一屏障後,又是怎麼重置 CyclicBarrier 計數器,等待線程再一次突破屏障呢?

帶着這些問題咱們來看一看源碼

源碼分析

一樣先打開 CyclicBarrier 的類結構,展開類所有內容,其實也沒多少內容

從類結構中看到有:

  1. await() 方法,猜想應該和 CountDownLatch 是相似的,都是獲取同步狀態,阻塞本身
  2. ReentrantLock,CyclicBarrier 內部居然也用到了咱們以前講過的 ReentrantLock,猜想這個鎖必定保護 CyclicBarrier 的某個變量,那確定也是基於 AQS 相關知識了
  3. Condition,存在條件,猜想會有等待/通知機制的運用

咱們繼續帶着這些猜想,結合上面的實例代碼一點點來驗證

// 建立 CyclicBarrier 實例,計數器的值設置爲2
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

查看構造函數 (這裏的英文註釋捨不得刪掉,由於說的太清楚了,我來結合註釋來講明一下):

private final int parties;
private int count;

public CyclicBarrier(int parties) {
    this(parties, null);
}

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

根據註釋說明,parties 表明衝破屏障以前要觸發的線程總數,count 自己又是計數器,那問題來了

直接就用 count 不就能夠了嘛?爲啥一樣用於初始化計數器,要維護兩個變量呢?

從 parties 和 count 的變量聲明中,你也能看出一些門道,前者有 final 修飾,初始化後就不能夠改變了,由於 CyclicBarrier 的設計目的是能夠循環利用的,因此始終用 parties 來記錄線程總數,當 count 計數器變爲 0 後,若是沒有 parties 的值賦給它,怎麼進行從新複用再次計數呢,因此這裏維護兩個變量頗有必要

接下來就看看 await() 究竟是怎麼實現的

// 從方法簽名上能夠看出,該方法一樣能夠被中斷,另外還有一個 BrokenBarrierException 異常,咱們一會看
public int await() throws InterruptedException, BrokenBarrierException {
    try {
          // 調用內部 dowait 方法, 第一個參數爲 false,表示不設置超時時間,第二個參數也就沒了意義
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

接下來看看 dowait(false, 0L) 作了哪些事情 (這個方法內容有點多,別擔憂,邏輯並不複雜,請看關鍵代碼註釋)

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // 還記得以前說過的 Lock 標準範式嗎? JDK 內部都是這麼使用的,你必定也要遵循範式
    lock.lock();
    try {
        final Generation g = generation;

          // broken 是靜態內部類 Generation惟一的一個成員變量,用於記錄當前屏障是否被打破,若是打破,則拋出 BrokenBarrierException 異常
          // 這裏感受挺困惑的,咱們要【衝破】屏障,這裏【打破】屏障卻拋出異常,注意我這裏的用詞
        if (g.broken)
            throw new BrokenBarrierException();

          // 若是線程被中斷,則會經過 breakBarrier 方法將 broken 設置爲true,也就是說,若是有線程收到中斷通知,直接就打破屏障,中止 CyclicBarrier, 並喚醒全部線程
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
      
          // ************************************
          // 由於 breakBarrier 方法在這裏會被調用屢次,爲了便於你們理解,我直接將 breakBarrier 代碼插入到這裏
          private void breakBarrier() {
          // 將打破屏障標識 設置爲 true
          generation.broken = true;
          // 重置計數器
          count = parties;
          // 喚醒全部等待的線程
          trip.signalAll();
            }
          // ************************************

                // 每當一個線程調用 await 方法,計數器 count 就會減1
        int index = --count;
          // 當 count 值減到 0 時,說明這是最後一個調用 await() 的子線程,則會突破屏障
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                  // 獲取構造函數中的 barrierCommand,若是有值,則運行該方法
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                  // 激活其餘因調用 await 方法而被阻塞的線程,並重置 CyclicBarrier
                nextGeneration();
              
                // ************************************
                // 爲了便於你們理解,我直接將 nextGeneration 實現插入到這裏
                private void nextGeneration() {
                    // signal completion of last generation
                    trip.signalAll();
                    // set up next generation
                    count = parties;
                    generation = new Generation();
                }
                // ************************************
              
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

          // index 不等於0, 說明當前不是最後一個線程調用 await 方法
        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                  // 沒有設置超時時間
                if (!timed)
                      // 進入條件等待
                    trip.await();
                else if (nanos > 0L)
                      // 不然,判斷超時時間,這個咱們在 AQS 中有說明過,包括爲何最後超時閾值 spinForTimeoutThreshold 再也不比較的緣由,你們會看就好
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                  // 條件等待被中斷,則判斷是否有其餘線程已經使屏障破壞。若沒有則進行屏障破壞處理,並拋出異常;不然再次中斷當前線程

                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

              // 若是新一輪迴環結束,會經過 nextGeneration 方法新建 generation 對象
            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

doWait 就是 CyclicBarrier 的核心邏輯, 能夠看出,該方法入口使用了 ReentrantLock,這也就是爲何 Generation broken 變量沒有被聲明爲 volatile 類型保持可見性,由於對其的更改都是在鎖的內部,一樣在鎖的內部對計數器 count 作更新,也保證了原子性

doWait 方法中,是經過 nextGeneration 方法來從新初始化/重置 CyclicBarrier 狀態的,該類中還有一個 reset() 方法,也是重置 CyclicBarrier 狀態的

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

但 reset() 方法並無在 CyclicBarrier 內部被調用,顯然是給 CyclicBarrier 使用者來調用的,那問題來了

何時調用 reset() 方法呢

正常狀況下,CyclicBarrier 是會被自動重置狀態的,從 reset 的方法實現中能夠看出調用了 breakBarrier

方法,也就是說,調用 reset 會使當前處在等待中的線程最終拋出 BrokenBarrierException 並當即被喚醒,因此說 reset() 只會在你想打破屏障時纔會使用

上述示例,咱們構建 CyclicBarrier 對象時,並無傳遞 barrierCommand 對象, 咱們修改示例傳入一個 barrierCommand 對象,看看會有什麼結果:

// 建立 CyclicBarrier 實例,計數器的值設置爲2
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
   log.info("所有運行結束");
});

運行結果:

從運行結果中來看,每次衝破屏障後都會執行 CyclicBarrier 初始化 barrierCommand 的方法, 這與咱們對 doWait() 方法的分析徹底吻合,從上面的運行結果中能夠看出,最後一個線程是運行 barrierCommand run() 方法的線程,咱們再來形象化的展現一下整個過程

從上圖能夠看出,barrierAction 與每次突破屏障是串行化的執行過程,假如 barrierAction 是很耗時的彙總操做,那這就是能夠優化的點了,咱們繼續修改代碼

// 建立單線程線程池
private static Executor executor = Executors.newSingleThreadExecutor();

// 建立 CyclicBarrier 實例,計數器的值設置爲2
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
   executor.execute(() -> gather());
});

private static void gather() {
   try {
      Thread.sleep(2000);
   } catch (InterruptedException e) {
      e.printStackTrace();
   }
   log.info("所有運行結束");
}

咱們這裏將 CyclicBarrier 的回調函數 barrierAction使用單線程的線程池,這樣最後一個衝破屏障的線程就不用等待 barrierAction 的執行,直接分配個線程池裏的線程異步執行,進一步提高效率

運行結果以下:

咱們再形象化的看一下整個過程:

這裏使用了單一線程池,增長了並行操做,提升了程序運行效率,那問題來了:

若是 barrierAction 很是很是耗時,衝破屏障的任務就可能堆積在單一線程池的等待隊列中,就存在 OOM 的風險,那怎麼辦呢?

這是就要須要必定的限流策略或者使用線程池的拒絕的略等

那把單一線程池換成非單一的固定線程池不就能夠了嘛?好比 fixed(5)

乍一看確實能緩解單線程池可能引發的任務堆積問題,上面代碼咱們看到的 gather() 方法,假如該方法內部沒有使用鎖或者說存在竟態條件,那 CyclicBarrier 的回調函數 barrierAction 使用多線程一定引發結果的不許確

因此在實際使用中還要結合具體的業務場景不斷優化代碼,使之更加健壯

總結

本文講解了 CountDownLatch 和 CyclicBarrier 的經典使用場景以及實現原理,以及在使用過程當中可能會遇到的問題,好比將大的 list 拆分做業就能夠用到前者,讀取多個 Excel 的sheet 頁,最後進行結果彙總就能夠用到後者 (文中完整示例代碼已上傳)

最後,再形象化的比喻一下

  • CountDownLatch 主要用來解決一個線程等待多個線程的場景,能夠類比旅遊團團長要等待全部遊客到齊才能去下一個景點
  • 而 CyclicBarrier 是一組線程之間的相互等待,能夠類比幾個驢友之間的不離不棄,共同到達某個地方,再繼續出發,這樣反覆

靈魂追問

  1. 怎樣拿到 CyclicBarrier 的彙總結果呢?
  2. 線程池中的 Future 特性你有使用過嗎?

接下來,我們就聊聊那些可使用的 Future 特性

參考

  1. Java 併發編程實戰
  2. Java 併發編程的藝術
  3. Java 併發編程之美
  4. When to reset CyclicBarrier in java multithreading

日拱一兵 | 原創

相關文章
相關標籤/搜索