CyclicBarrier字面意思是循環屏障,它能夠實現線程間的計數等待。當線程到達屏障點時會依次進入等待狀態,直到最後一個線程進入屏障點時會喚醒等待的線程繼續運行。ide
CyclicBarrier和CountDownLatch相似,區別在於CountDownLatch只能使用一次,當計數器歸零後,CountDownLatch的await等方法都會直接返回。而CyclicBarrier是能夠重複使用的,當計數器歸零後,計數器和CyclicBarrier狀態都會被重置。this
CyclicBarrier(int parties):建立CyclicBarrier,指定計數器值(等待線程數量)。線程
CyclicBarrier(int parties, Runnable barrierAction):建立CyclicBarrier,指定計數器值(等待線程數量)和計數器歸零後(最後一個線程到達)要執行的任務。遊戲
await():阻塞當前線程,直到計數器歸零被喚醒或者線程被中斷。ip
await(long timeout, TimeUnit unit):阻塞當前線程,直到計數器歸零被喚醒、線程被中斷或者超時返回。get
等待全部玩家準備就緒,遊戲纔開始,每一輪遊戲的開始意味着CyclicBarrier已經重置,能夠開始新一輪的計數。同步
public class Demo { public static void main(String[] args) { //建立CyclicBarrier並指定計數器值爲5,以及計數器爲0後要執行的任務 CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> { System.out.println("---遊戲開始---"); System.out.println("---五票同意,遊戲結束---"); }); Runnable runnable = () -> { //重複使用CyclicBarrier5次 for(int i = 0; i < 5; i++){ System.out.println(Thread.currentThread().getName() + ":準備就緒"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }; Thread thread1 = new Thread(runnable, "一號玩家"); Thread thread2 = new Thread(runnable, "二號玩家"); Thread thread3 = new Thread(runnable, "三號玩家"); Thread thread4 = new Thread(runnable, "四號玩家"); Thread thread5 = new Thread(runnable, "五號玩家"); thread1.start(); thread2.start(); thread3.start(); thread4.start(); thread5.start(); } } /* * 循環輸出5次 * 輸出結果: * 一號玩家:準備就緒 * 三號玩家:準備就緒 * 二號玩家:準備就緒 * 五號玩家:準備就緒 * 四號玩家:準備就緒 * ---遊戲開始--- * ---五票同意,遊戲結束--- * 三號玩家:準備就緒 * 一號玩家:準備就緒 * 五號玩家:準備就緒 * ...... */
在使用CyclicBarrier中,假設總的等待線程數量爲5,如今其中一個線程被中斷了,被中斷的線程將拋出InterruptedException異常,而其餘4個線程將拋出BrokenBarrierException異常。源碼
BrokenBarrierException異常表示當前的CyclicBarrier已經破損,可能不能等待全部線程到齊了,避免其餘線程永久的等待。it
CyclicBarrier是基於顯式鎖ReentrantLock來實現的,CyclicBarrier不少方法都使用顯式鎖作了同步處理,await方法的等待喚醒也是經過Condition實現的。io
CyclicBarrier的成員變量:
//顯式鎖 private final ReentrantLock lock = new ReentrantLock(); //用於顯式鎖的Condition private final Condition trip = lock.newCondition(); //線程數量 private final int parties; //當全部線程到達屏障點後執行的任務 private final Runnable barrierCommand; //Generation內部有一個broken變量,用於標識CyclicBarrier是否破損 private Generation generation = new Generation(); //用於遞減的線程數量,在每一輪結束後會被重置爲parties private int count;
await方法裏是調用的dowait方法,dowait方法源碼:
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; //若是CyclicBarrier已破損,則拋出BrokenBarrierException異常 if (g.broken) throw new BrokenBarrierException(); //若是當前線程已經中斷,則將CyclicBarrier標記爲已破損並拋出InterruptedException異常 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; //index == 0表示全部線程都到達了屏障點 if (index == 0) { // tripped boolean ranAction = false; try { //執行線程到齊後須要執行的任務 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //喚醒全部等待的線程並重置CyclicBarrier nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } //線程沒到齊,阻塞當前線程 for (;;) { try { //不帶超時時間的等待 if (!timed) trip.await(); //帶超時時間的等待 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
nextGeneration方法:
private void nextGeneration() { //喚醒全部等待的線程 trip.signalAll(); //重置CyclicBarrier count = parties; generation = new Generation(); }