JDK中爲了處理線程之間的同步問題,除了提供鎖機制以外,還提供了幾個很是有用的併發工具類:CountDownLatch、CyclicBarrier、Semphore、Exchanger、Phaser;
CountDownLatch、CyclicBarrier、Semphore、Phaser 這四個工具類提供一種併發流程的控制手段;而Exchanger工具類則提供了在線程之間交換數據的一種手段。html
CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要作的事情是,讓一組線程到達一個屏障(也能夠叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續幹活。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每一個線程調用await方法告訴CyclicBarrier我已經到達了屏障,而後當前線程被阻塞。java
構造方法摘要
bash
方法名稱 | 說明 |
---|---|
CyclicBarrier(int parties) | 建立一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,但它不會在啓動 barrier 時執行預約義的操做。 |
CyclicBarrier(int parties, Runnable barrierAction) | 建立一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,並在啓動 barrier 時執行給定的屏障操做,該操做由最後一個進入 barrier 的線程執行。 |
方法摘要
多線程
方法名稱 | 說明 |
---|---|
public int await() throws InterruptedException, BrokenBarrierException |
在全部參與者都已經在此 barrier 上調用 await 方法以前,將一直等待。
返回:
到達的當前線程的索引,其中,索引 getParties() - 1 指示將到達的第一個線程,
零指示最後一個到達的線程.
|
public int await(long timeout,TimeUnit unit) throws InterruptedException,BrokenBarrierException, ITimeoutException |
在全部參與者都已經在此屏障上調用 await 方法以前將一直等待,或者超出了指定的等待時間。 |
public void reset() | 將屏障重置爲其初始狀態。若是全部參與者目前都在屏障處等待,則它們將返回,同時拋出一個 BrokenBarrierException。注意,在因爲其餘緣由形成損壞以後,實行重置可能會變得很複雜; |
public boolean isBroken() | 查詢此屏障是否處於損壞狀態。 |
public int getNumberWaiting() | 返回當前在屏障處等待的參與者數目。此方法主要用於調試和斷言。 |
public int getParties() | 返回要求啓動此 barrier 的參與者數目。 |
注意:併發
public static void main(String[] args) {
//設置5個屏障,而且有屏障操做
CyclicBarrier barrier = new CyclicBarrier(5,new Runnable() {
@Override
public void run() {
System.out.println("線程"+Thread.currentThread().getName()+"執行了屏障操做");
}
});
for(int i=0;i<5;i++){
//建立5個線程
Thread thread = new Thread(new MyRunable(barrier),"thread_"+i);
thread.start();
}
}
複製代碼
class MyRunable implements Runnable{
CyclicBarrier barrier;
public MyRunable(CyclicBarrier barrier ){
this.barrier = barrier;
}
@Override
public void run() {
//一系列操做...
System.out.println("線程 "+Thread.currentThread().getName()+" 到達了屏障點!");
try {
int index = barrier.await();
if(index== (barrier.getParties()-1)){
//第一個到達屏障點的線程,執行特殊操做....
System.out.println("全部線程到達屏障點,線程 "+Thread.currentThread().getName()+" 被喚醒!!此線程是第一個到達屏障點");
}else if(index == 0){//最後一個到達屏障點的線程
System.out.println("全部線程到達屏障點,線程 "+Thread.currentThread().getName()+" 被喚醒!!此線程是最後一個到達屏障點");
}else{
System.out.println("全部線程到達屏障點,線程 "+Thread.currentThread().getName()+" 被喚醒!!");
}
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
複製代碼
運行結果:
app
線程 thread_1 到達了屏障點!
線程 thread_4 到達了屏障點!
線程 thread_3 到達了屏障點!
線程 thread_0 到達了屏障點!
線程 thread_2 到達了屏障點!
線程thread_3執行了屏障操做
全部線程到達屏障點,線程 thread_3 被喚醒!!此線程是最後一個到達屏障點
全部線程到達屏障點,線程 thread_0 被喚醒!!
全部線程到達屏障點,線程 thread_4 被喚醒!!
全部線程到達屏障點,線程 thread_1 被喚醒!!此線程是第一個到達屏障點
全部線程到達屏障點,線程 thread_2 被喚醒!!
複製代碼
上面的例子,使用了傳入屏障操做的Runable參數的構造方法,ide
。然而,在實際使用中,工具
,如上面的例子,第一個和最後一個到達屏障點的線程都執行特殊的操做。this
順便說一下,可能會對本例子中前5個輸出的順序 有所疑惑:thread_3 經過awiat()方法返回的索引值,可知 thread_3 是最後一個到達屏障點的,但爲何輸出的順序倒是第三個,而不是最後一個;在這就要真正理解CyclicBarrier,CyclicBarrier 本質上是一把鎖,多個線程在使用CyclicBarrier 對象時,是須要先獲取鎖,即須要互斥訪問,因此調用await( )方法不必定可以立刻獲取鎖。上面的例子,是先打印輸出,再去獲取鎖,因此輸出順序不是到達屏障點的順序。
spa
下面的例子是:CyclicBarrier用於多線程計算數據,最後合併計算結果的場景。好比咱們用一個Excel保存了用戶全部銀行流水,每一個Sheet保存一個賬戶近一年的每筆銀行流水,如今須要統計用戶的日均銀行流水,先用多線程處理每一個sheet裏的銀行流水,都執行完以後,獲得每一個sheet的日均銀行流水,最後,再用barrierAction用這些線程的計算結果,計算出整個Excel的日均銀行流水。
public class BankWaterService implements Runnable {
//建立4個屏障,處理完後執行當前類的run方法
private CyclicBarrier barrier = new CyclicBarrier(4,this);
//假設只有4個sheet,因此只啓動4個線程
private Executor excutor = Executors.newFixedThreadPool(4);
//保存每一個sheet計算出的結果
private ConcurrentHashMap< String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();
private void count(){
for(int i=0;i<4;i++){
excutor.execute(new Runnable() {
@Override
public void run() {
//計算過程.....
//存儲計算結果
sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
try {
//計算完成,插入屏障
barrier.await();
//後續操做,將會使用到四個線程的運行結果....
System.out.println("線程"+Thread.currentThread().getName()+"運行結束,最終的計算結果:"+sheetBankWaterCount.get("result"));
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
@Override
public void run() {
int result = 0;
for(Entry<String, Integer> item : sheetBankWaterCount.entrySet()){
result += item.getValue();
}
sheetBankWaterCount.put("result", result);
}
public static void main(String[] args) {
BankWaterService bankWaterService = new BankWaterService();
bankWaterService.count();
}
}
複製代碼
運行結果:
線程pool-1-thread-4運行結束,最終的計算結果:4
線程pool-1-thread-2運行結束,最終的計算結果:4
線程pool-1-thread-1運行結束,最終的計算結果:4
線程pool-1-thread-3運行結束,最終的計算結果:4
複製代碼
文章源地址:https://www.cnblogs.com/jinggod/p/8494193.html