前面一篇文章咱們《Java高併發編程基礎三大利器之CountDownLatch》它有一個缺點,就是它的計數器只可以使用一次,也就是說當計數器(state
)減到爲 0
的時候,若是 再有線程調用去 await
() 方法,該線程會直接經過,不會再起到等待其餘線程執行結果起到同步的做用。爲了解決這個問題CyclicBarrier
就應運而生了。html
CyclicBarrier
是什麼?把它拆開來翻譯就是循環(Cycle
)和屏障(Barrier
)
它的主要做用其實和CountDownLanch
差很少,都是讓一組線程到達一個屏障時被阻塞,直到最後一個線程到達屏障時,屏障會被打開,全部被屏障阻塞的線程纔會繼續執行,不過它是能夠循環執行的,這是它與CountDownLanch
最大的不一樣。CountDownLanch
是隻有當最後一個線程把計數器置爲0
的時候,其餘阻塞的線程纔會繼續執行。學習CyclicBarrier
以前建議先去看看這幾篇文章:java
咱們首先先來看下關於使用CyclicBarrier
的一個demo
:好比遊戲中有個關卡的時候,每次進入下一關的時候都須要進行加載一些地圖、特效背景音樂什麼的只有所有加載完了纔可以進行遊戲:編程
/**demo 來源https://blog.csdn.net/lstcui/article/details/107389371 * 公衆號【java金融】 */ public class CyclicBarrierExample { static class PreTaskThread implements Runnable { private String task; private CyclicBarrier cyclicBarrier; public PreTaskThread(String task, CyclicBarrier cyclicBarrier) { this.task = task; this.cyclicBarrier = cyclicBarrier; } @Override public void run() { for (int i = 0; i < 4; i++) { Random random = new Random(); try { Thread.sleep(random.nextInt(1000)); System.out.println(String.format("關卡 %d 的任務 %s 完成", i, task)); cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } } public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> { System.out.println("本關卡全部的前置任務完成,開始遊戲... ..."); }); new Thread(new PreTaskThread("加載地圖數據", cyclicBarrier)).start(); new Thread(new PreTaskThread("加載人物模型", cyclicBarrier)).start(); new Thread(new PreTaskThread("加載背景音樂", cyclicBarrier)).start(); } } }
輸出結果以下:
咱們能夠看到每次遊戲開始都會等當前關卡把遊戲的人物模型,地圖數據、背景音樂加載完成後纔會開始進行遊戲。而且仍是能夠循環控制的。併發
/** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties; /* The command to run when tripped */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation();
CyclicBarrier
重複利用,每當await
達到最大次數的時候,就會從新new
一個,表示進入了下一個輪迴。裏面只有一個boolean
型屬性,用來表示當前輪迴是否有線程中斷。await
方法app
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } /** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { //獲取barrier當前的 「代」也就是當前循環 final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 每來一個線程調用await方法都會進行減1 int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; // new CyclicBarrier 傳入 的barrierCommand, command.run()這個方法是同步的,若是耗時比較多的話,是否執行的時候須要考慮下是否異步來執行。 if (command != null) command.run(); ranAction = true; // 這個方法1. 喚醒全部阻塞的線程,2. 重置下count(count 每來一個線程都會進行減1)和generation,以便於下次循環。 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { // 進入if條件,說明是不帶超時的await if (!timed) // 當前線程會釋放掉lock,而後進入到trip條件隊列的尾部,而後掛起本身,等待被喚醒。 trip.await(); else if (nanos > 0L) //說明當前線程調用await方法時 是指定了 超時時間的! nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //Node節點在 條件隊列內 時 收到中斷信號時 會拋出中斷異常! //g == generation 成立,說明當前代並無變化。 //! g.broken 當前代若是沒有被打破,那麼當前線程就去打破,而且拋出異常.. if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. //執行到else有幾種狀況? //1.代發生了變化,這個時候就不須要拋出中斷異常了,由於 代已經更新了,這裏喚醒後就走正常邏輯了..只不過設置下 中斷標記。 //2.代沒有發生變化,可是代被打破了,此時也不用返回中斷異常,執行到下面的時候會拋出 brokenBarrier異常。也記錄下中斷標記位。 Thread.currentThread().interrupt(); } } //喚醒後,執行到這裏,有幾種狀況? //1.正常狀況,當前barrier開啓了新的一代(trip.signalAll()) //2.當前Generation被打破,此時也會喚醒全部在trip上掛起的線程 //3.當前線程trip中等待超時,而後主動轉移到 阻塞隊列 而後獲取到鎖 喚醒。 if (g.broken) throw new BrokenBarrierException(); //喚醒後,執行到這裏,有幾種狀況? //1.正常狀況,當前barrier開啓了新的一代(trip.signalAll()) //2.當前線程trip中等待超時,而後主動轉移到 阻塞隊列 而後獲取到鎖 喚醒。 if (g != generation) return index; //喚醒後,執行到這裏,有幾種狀況? //.當前線程trip中等待超時,而後主動轉移到 阻塞隊列 而後獲取到鎖 喚醒。 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
到了這裏咱們是否是能夠知道爲啥CyclicBarrier
能夠進行循環計數?
CyclicBarrier
採用一個內部類Generation
來維護當前循環,每個await
方法都會存儲當前的generation
,獲取到相同generation
對象的屬於同一組,每當count
的次數耗盡就會從新new
一個Generation
而且從新設置count
的值爲parties
,表示進入下一次新的循環。
從這個await
方法咱們是否是能夠知道只要有一個線程被中斷了,當代的 generation
的broken
就會被設置爲true
,因此會致使其餘的線程也會被拋出BrokenBarrierException
。至關於一個失敗其餘也必須失敗,感受有「強一致性「的味道。dom
CountDownLanch
是爲計數器是設置一個值,當屢次執行countdown
後,計數器減爲0
的時候全部線程被喚醒,而後CountDownLanch
失效,只可以使用一次。CyclicBarrier
是當count
爲0
時一樣喚醒所有線程,同時會從新設置count
爲parties
,從新new
一個generation
來實現重複利用。