源碼:html
package java.util.concurrent; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class CyclicBarrier { //使用ReentrantLock可重入獨佔鎖 private final ReentrantLock lock = new ReentrantLock(); //建立一個條件隊列 private final Condition trip = lock.newCondition(); //經過構造器傳入的參數.表示總的等待線程的數量 private final int parties; //當屏障正常打開後運行的程序,經過最後一個調用await的線程來執行 private final Runnable barrierCommand; //當前的Generation。每當屏障失效或者開閘以後都會自動替換掉。從而實現重置的功能 private Generation generation = new Generation(); //實際仍在等待的線程數.當有一個線程到達屏障點,count值就會減一;當一次新的運算開始後,count的值被重置爲parties private int count; //內部類 private static class Generation { boolean broken = false;//表示當前的屏障是否被打破 } //建立一個CyclicBarrier實例,parties指定參與相互等待的線程數 public CyclicBarrier(int parties) { this(parties, null); } //建立一個CyclicBarrier實例,parties指定參與相互等待的線程數 //barrierAction指定當全部線程到達屏障點以後,首先執行的操做,該操做由最後一個進入屏障點的線程執行。 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } //返回參與相互等待的線程數 public int getParties() { return parties; } //該方法被調用時表示當前線程已經到達屏障點,當前線程阻塞進入休眠狀態 //直到全部線程都到達屏障點,當前線程纔會被喚醒 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); } } //該方法被調用時表示當前線程已經到達屏障點,當前線程阻塞進入休眠狀態 //在timeout指定的超時時間內,等待其餘參與線程到達屏障點 //若是超出指定的等待時間,則拋出TimeoutException異常,若是該時間小於等於零,則此方法根本不會等待 public int await(long timeout, TimeUnit unit) throws InterruptedException,BrokenBarrierException,TimeoutException { return dowait(true, unit.toNanos(timeout)); } //該方法被調用時表示當前線程已經到達屏障點,當前線程阻塞進入休眠狀態 //在timeout指定的超時時間內,等待其餘參與線程到達屏障點 //若是超出指定的等待時間,則拋出TimeoutException異常,若是該時間小於等於零,則此方法根本不會等待 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken)//若是當前Generation是處於打破狀態則傳播這個BrokenBarrierExcption throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier();//若是當前線程被中斷則使得當前generation處於打破狀態,重置剩餘count,而且喚醒狀態變量.這時候其餘線程會傳播BrokenBarrierException throw new InterruptedException(); } int index = --count;//嘗試下降當前count if (index == 0) { boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //當全部參與的線程都到達屏障點,當即去喚醒全部處於休眠狀態的線程,恢復執行 nextGeneration(); return 0; } finally { if (!ranAction)//若是運行command失敗也會致使當前屏障被打破 breakBarrier(); } } for (;;) { try { if (!timed) //讓當前執行的線程阻塞,處於休眠狀態 trip.await(); else if (nanos > 0L) //讓當前執行的線程阻塞,在超時時間內處於休眠狀態 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } //喚醒全部處於休眠狀態的線程,恢復執行 //重置count值爲parties //重置中斷狀態爲false private void nextGeneration() { trip.signalAll(); count = parties; generation = new Generation(); } //喚醒全部處於休眠狀態的線程,恢復執行 //重置count值爲parties //重置中斷狀態爲true private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } //判斷此屏障是否處於中斷狀態。 //若是由於構造或最後一次重置而致使中斷或超時,從而使一個或多個參與者擺脫此屏障點,或由於異常而致使某個屏障操做失敗,則返回true;不然返回false public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } //將屏障重置爲其初始狀態 public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { //喚醒全部等待的線程繼續執行,並設置屏障中斷狀態爲true breakBarrier(); //喚醒全部等待的線程繼續執行,並設置屏障中斷狀態爲false nextGeneration(); } finally { lock.unlock(); } } //返回當前在屏障處等待的參與者數目,此方法主要用於調試和斷言 public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } } }
extends Objectjava
一個同步輔助類:它容許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。api
在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 頗有用。函數
由於該 barrier 在釋放等待線程後能夠重用,因此稱它爲循環 的 barrier。this
CyclicBarrier 支持一個可選的 Runnable
命令,在一組線程中的最後一個線程到達以後(但在釋放全部線程以前),該命令只在每一個屏障點運行一次。若在繼續全部參與線程以前更新共享狀態,此屏障操做 頗有用。spa
構造方法摘要.net
CyclicBarrier(int parties) 建立一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,但它不會在啓動 barrier 時執行預約義的操做。 |
CyclicBarrier(int parties, Runnable barrierAction) 建立一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,並在啓動 barrier 時執行給定的屏障操做,該操做由最後一個進入 barrier 的線程執行。 |
方法摘要線程
int |
await() 在全部參與者都已經在此 barrier 上調用 await 方法以前,將一直等待。 |
int |
await(long timeout, TimeUnit unit) 在全部參與者都已經在此屏障上調用 await 方法以前將一直等待,或者超出了指定的等待時間。 |
int |
getNumberWaiting() 返回當前在屏障處等待的參與者數目。 |
int |
getParties() 返回要求啓動此 barrier 的參與者數目。 |
boolean |
isBroken() 查詢此屏障是否處於損壞狀態。 |
void |
reset() 將屏障重置爲其初始狀態。 |
public CyclicBarrier(int parties,Runnable barrierAction)
建立一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,並在啓動 barrier 時執行給定的屏障操做,該操做由最後一個進入 barrier 的線程執行。調試
參數:code
parties
- 在啓動 barrier 前必須調用 await()
的線程數
barrierAction
- 在啓動 barrier 時執行的命令;若是不執行任何操做,則該參數爲 null
拋出:
IllegalArgumentException
- 若是 parties
小於 1
public CyclicBarrier(int parties)
建立一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,但它不會在啓動 barrier 時執行預約義的操做。
參數:
parties
- 在啓動 barrier 前必須調用 await()
的線程數
拋出:
IllegalArgumentException
- 若是 parties
小於 1
public int getParties()
返回要求啓動此 barrier 的參與者數目。
返回:
要求啓動此 barrier 的參與者數目
public int await() throws InterruptedException, BrokenBarrierException
在全部 參與者都已經在此 barrier 上調用 await 方法以前,將一直等待。
若是當前線程不是將到達的最後一個線程,出於調度目的,將禁用它,且在發生如下狀況之一前,該線程將一直處於休眠狀態:
reset()
。若是當前線程:
則拋出 InterruptedException
,而且清除當前線程的已中斷狀態。
若是在線程處於等待狀態時 barrier 被 reset()
,或者在調用 await 時 barrier 被損壞,抑或任意一個線程正處於等待狀態,則拋出 BrokenBarrierException
異常。
若是任何線程在等待時被 中斷,則其餘全部等待線程都將拋出 BrokenBarrierException
異常,並將 barrier 置於損壞狀態。
若是當前線程是最後一個將要到達的線程,而且構造方法中提供了一個非空的屏障操做,則在容許其餘線程繼續運行以前,當前線程將運行該操做。若是在執行屏障操做過程當中發生異常,則該異常將傳播到當前線程中,並將 barrier 置於損壞狀態。
返回:
到達的當前線程的索引,其中,索引 getParties()
- 1 指示將到達的第一個線程,零指示最後一個到達的線程
拋出:
InterruptedException
- 若是當前線程在等待時被中斷
BrokenBarrierException
- 若是 另外一個 線程在當前線程等待時被中斷或超時,或者重置了 barrier,或者在調用 await
時 barrier 被損壞,抑或因爲異常而致使屏障操做(若是存在)失敗。
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
在全部 參與者都已經在此屏障上調用 await 方法以前將一直等待,或者超出了指定的等待時間。
若是當前線程不是將到達的最後一個線程,出於調度目的,將禁用它,且在發生如下狀況之一前,該線程將一直處於休眠狀態:
reset()
。若是當前線程,在如下狀況中的一種時:
則拋出 InterruptedException
,而且清除當前線程的已中斷狀態。
若是超出指定的等待時間,則拋出 TimeoutException
異常。若是該時間小於等於零,則此方法根本不會等待。
若是在線程處於等待狀態時 barrier 被 reset()
,或者在調用 await 時 barrier 被損壞,抑或任意一個線程正處於等待狀態,則拋出 BrokenBarrierException
異常。
若是任何線程在等待時被中斷,則其餘全部等待線程都將拋出 BrokenBarrierException
,並將屏障置於損壞狀態。
若是當前線程是最後一個將要到達的線程,而且構造方法中提供了一個非空的屏障操做,則在容許其餘線程繼續運行以前,當前線程將運行該操做。若是在執行屏障操做過程當中發生異常,則該異常將傳播到當前線程中,並將 barrier 置於損壞狀態。
參數:
timeout
- 等待 barrier 的時間
unit
- 超時參數的時間單位
返回:
到達的當前線程的索引,其中,索引 getParties()
- 1 指示第一個將要到達的線程,零指示最後一個到達的線程
拋出:
InterruptedException
- 若是當前線程在等待時被中斷
TimeoutException
- 若是超出了指定的超時時間
BrokenBarrierException
- 若是 另外一個 線程在當前線程等待時被中斷或超時,或者重置了 barrier,或者調用 await
時 barrier 被損壞,抑或因爲異常而致使屏障操做(若是存在)失敗。
public boolean isBroken()
查詢此屏障是否處於損壞狀態。
返回:
若是屢次調用構造函數或者使用重置函數reset(),在屏障等待的參與者的等待狀態會被中斷或超時,從而拋出異常。由於異常而致使某個屏障操做失敗,則返回 true
;不然返回 false
。
public void reset()
將屏障重置爲其初始狀態。若是全部參與者目前都在屏障處等待,則它們將返回,同時拋出一個 BrokenBarrierException
。
注意:在因爲其餘緣由形成損壞(broken)以後,實行重置可能會變得很複雜;此時須要使用其餘方式從新同步線程,並選擇其中一個線程來執行重置。與爲後續使用建立一個新 barrier 相比,這種方法可能更好一些。
public int getNumberWaiting()
返回當前在屏障處等待的參與者數目。此方法主要用於調試和斷言。
返回:
當前阻塞在 await()
中的參與者數目。
package com.thread; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierTest1 extends Thread { private static int SIZE = 5; private static CyclicBarrier cb; public void run() { try { System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier."); // CyclicBarrier的count減1,若count等於0,則喚醒在屏障處等待的全部線程 cb.await(); System.out.println(Thread.currentThread().getName() + " continued."); } catch (BrokenBarrierException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { cb = new CyclicBarrier(SIZE); // 新建5個任務 for (int i = 0; i < SIZE; i++) new CyclicBarrierTest1().start(); } }
運行結果:
Thread-1 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-4 continued.
Thread-3 continued.
Thread-2 continued.
Thread-1 continued.
Thread-0 continued.
package com.thread; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierTest2 extends Thread { private static int SIZE = 5; private static CyclicBarrier cb; public void run() { try { System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier."); // CyclicBarrier的count減1,若count等於0,則喚醒在屏障處等待的全部線程 cb.await(); System.out.println(Thread.currentThread().getName() + " continued."); } catch (BrokenBarrierException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { cb = new CyclicBarrier(SIZE, new Runnable() { public void run() {//當其餘的線程都已達到barrier,先執行當前任務,再讓其餘線程繼續執行 System.out.println("CyclicBarrier's parties is: " + cb.getParties()); } }); // 新建5個任務 for (int i = 0; i < SIZE; i++) new CyclicBarrierTest1().start(); } }
運行結果:
Thread-0 wait for CyclicBarrier. Thread-2 wait for CyclicBarrier. Thread-1 wait for CyclicBarrier. Thread-4 wait for CyclicBarrier. Thread-3 wait for CyclicBarrier. CyclicBarrier's parties is: 5 Thread-3 continued. Thread-1 continued. Thread-4 continued. Thread-0 continued. Thread-2 continued.