CyclicBarrier是一個同步輔助類,它容許一組線程相互等待,直到到達某個公共屏障點(common barrier point)。經過它能夠完成多個線程之間相互等待,只有當每一個線程都準備就緒後,才能各自繼續往下執行後面的操做。html
CyclicBarrier經過計數器來實現的。當某個線程調用await方法時,該線程進入等待狀態,計數器加1,當計數器的值達到設置的初始值時,全部因調用await進入等待狀態的線程被喚醒,繼續執行後續操做。由於CycliBarrier在釋放等待線程後能夠重用,因此稱爲循環barrier。CycliBarrier支持一個可選的Runnable,在計數器的值到達設定值後(但在釋放全部線程以前),該Runnable運行一次,注,Runnable在每一個屏障點只運行一個。java
CyclicBarrier與CountDownLatch本質上都是依賴於volatile和CAS實現,二者比較以下:app
|CyclicBarrier|CountDownLatch| |:|:| |一個線程(或者多個),等待另外N個線程完成某個事情以後才能執執行|N個線程相互等待,任何一個線程完成以前,全部的線程都必須等待。| |一次性的|能夠重複使用| |基於AQS|基於鎖和Condition|dom
賽跑時,等待全部人都準備好時,才起跑:oop
public static void main(String[] args) throws InterruptedException { //若是將參數改成4,可是下面barrier的計數器的值爲3,這將永遠等待下去 CyclicBarrier barrier = new CyclicBarrier(3); Runnable runnable = ()->{ try { Thread.sleep(1000 * (new Random()).nextInt(8)); System.out.println(Thread.currentThread().getName() + " 準備好了..."); //barrier的計數器會加1而且全部當前線程會進入等待狀態 barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 起跑!"); }; ExecutorService executor = Executors.newFixedThreadPool(3);//將固定線程的大小設置爲1或者2觀察運行結果 executor.submit(new Thread(runnable)); executor.submit(new Thread(runnable)); executor.submit(new Thread(runnable)); executor.shutdown(); }
//parties表示屏障攔截的線程數量,當屏障撤銷時,先執行barrierAction,而後在釋放全部線程 public CyclicBarrier(int parties, Runnable barrierAction) //barrierAction默認爲null public CyclicBarrier(int parties) /* * 當前線程等待直到全部線程都調用了該屏障的await()方法 * 若是當前線程不是將到達的最後一個線程,將會被阻塞。解除阻塞的狀況有如下幾種 * 1)最後一個線程調用await() * 2)當前線程被中斷 3)其餘正在該CyclicBarrier上等待的線程被中斷 4)其餘正在該CyclicBarrier上等待的線程超時 5)其餘某個線程調用該CyclicBarrier的reset()方法 * 若是當前線程在進入此方法時已經設置了該線程的中斷狀態或者在等待時被中斷, * 將拋出InterruptedException,而且清除當前線程的已中斷狀態。 * 若是在線程處於等待狀態時barrier被reset()或者在調用await()時 barrier 被損壞, * 將拋出 BrokenBarrierException 異常。 * 若是任何線程在等待時被中斷,則其餘全部等待線程都將拋出 * BrokenBarrierException 異常,並將 barrier 置於損壞狀態。 * 若是當前線程是最後一個將要到達的線程,而且構造方法中提供了一個非空的屏障操做 * (barrierAction),那麼在容許其餘線程繼續運行以前,當前線程將運行該操做。 * 若是在執行屏障操做過程當中發生異常,則該異常將傳播到當前線程中, * 並將 barrier 置於損壞狀態。 * * 返回值爲當前線程的索引,0表示當前線程是最後一個到達的線程 */ public int await() throws InterruptedException, BrokenBarrierException //在await()的基礎上增長超時機制,若是超出指定的等待時間,則拋出 TimeoutException 異常。若是該時間小於等於零,則此方法根本不會等待。 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException //將屏障重置爲其初始狀態。若是全部參與者目前都在屏障處等待,則它們將返回,同時拋出一個BrokenBarrierException。 public void reset()
對於失敗的同步嘗試,CyclicBarrier 使用了一種要麼所有要麼全不 (all-or-none) 的破壞模式:若是由於中斷、失敗或者超時等緣由,致使線程過早地離開了屏障點,那麼在該屏障點等待的其餘全部線程也將經過 BrokenBarrierException(若是它們幾乎同時被中斷,則用 InterruptedException)以反常的方式離開。this
CyclicBarrier基於ReentrantLock和Condition機制實現。除了getParties()方法,CyclicBarrier的其餘方法都須要獲取鎖。.net
在CyclicBarrier的內部定義了一個Lock對象,每當一個線程調用await方法時,將攔截的線程數減1,而後判斷剩餘攔截數是否爲初始值parties,若是不是,進入Lock對象的條件隊列等待。若是是,執行barrierAction對象的Runnable方法,而後將鎖的條件隊列中的全部線程放入鎖等待隊列中,這些線程會依次的獲取鎖、釋放鎖。線程
public class CyclicBarrier { private final ReentrantLock lock = new ReentrantLock();//重入鎖 private final Condition trip = lock.newCondition();//等待條件 private final int parties;//攔截的線程數量 private final Runnable barrierCommand; //當屏障撤銷時,須要執行的屏障操做 private Generation generation = new Generation(); //當前的Generation。每當屏障失效或者開閘以後都會自動替換掉。從而實現重置的功能 //還能阻塞的線程數(即parties-當前阻塞的線程數),當新建generation或generation被破壞時,count會被重置。由於對Count的操做都是在獲取鎖以後,因此不須要其餘同步措施。 private int count; private static class Generation { boolean broken = false;//當前屏障是否被破壞 }
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation;//保存此時的generation if (g.broken)//若是當前屏障被破壞則拋出異常 throw new BrokenBarrierException(); //判斷線程是否被中斷,若是被中斷,調用breakBarrier()進行屏障破壞處理,並拋出InterruptedException if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //剩餘count遞減,並賦值給線程索引,做爲方法的返回值 int index = --count; //若是線程索引將爲0,說明當前線程是最後一個到達的線程。執行可能存在的屏障操做 barrierCommand,設置下一個Generation。至關於每次開閘以後都進行了一次reset。 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run();//同步執行barrierCommand ranAction = true; nextGeneration();//執行成功設置下一個nextGeneration return 0; } finally { if (!ranAction)//若是barrierCommand執行失敗,進行屏障破壞處理 breakBarrier(); } } //若是當前線程不是最後一個到達的線程 // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await();//若是沒有超時調用Condition的await()方法阻塞 else if (nanos > 0L) nanos = trip.awaitNanos(nanos);//設置超時時間,調用Condition的awaitNanos()方法阻塞 } catch (InterruptedException ie) { //若是當前線程被中斷,則判斷是否有其餘線程已經使屏障破壞。若沒有則進行屏障破壞處理,並拋出異常;不然再次中斷當前線程 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { //這種捕獲了InterruptException以後調用Thread.currentThread().interrupt()是一種通用的方式。其實就是爲了保存中斷狀態,從而讓其餘更高層次的代碼注意到這個中斷。 Thread.currentThread().interrupt(); } } //若是屏障被破壞,當前線程拋BrokenBarrierException if (g.broken) throw new BrokenBarrierException(); //若是已經換代,直接返回index(last thread已經執行的nextGeneration,但當前線程尚未執行到該語句) if (g != generation) return index; //超時,進行屏障破壞處理,並拋TimeoutException if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock();//釋放鎖 } }
//將當前屏障置爲破壞狀態、重置count、並喚醒全部被阻塞的線程。 //必須先獲取鎖,才能調用此方法 private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
//喚醒trip上等待的全部線程,設置下一個Generation private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }
//重置屏障,先進行屏障破壞處理,再設置下一代generation public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }
參考地址:code