併發工具類(二)同步屏障CyclicBarrier

前言

  JDK中爲了處理線程之間的同步問題,除了提供鎖機制以外,還提供了幾個很是有用的併發工具類:CountDownLatch、CyclicBarrier、Semphore、Exchanger、Phaser;
  CountDownLatch、CyclicBarrier、Semphore、Phaser 這四個工具類提供一種併發流程的控制手段;而Exchanger工具類則提供了在線程之間交換數據的一種手段。html

簡介

  CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要作的事情是,讓一組線程到達一個屏障(也能夠叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續幹活。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每一個線程調用await方法告訴CyclicBarrier我已經到達了屏障,而後當前線程被阻塞。java

構造方法摘要
bash

方法名稱 說明
CyclicBarrier(int parties) 建立一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,但它不會在啓動 barrier 時執行預約義的操做。
CyclicBarrier(int parties, Runnable barrierAction) 建立一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,並在啓動 barrier 時執行給定的屏障操做,該操做由最後一個進入 barrier 的線程執行。

方法摘要
多線程

方法名稱 說明
public int await() throws
InterruptedException, BrokenBarrierException
在全部參與者都已經在此 barrier 上調用 await 方法以前,將一直等待。
返回:
到達的當前線程的索引,其中,索引 getParties() - 1 指示將到達的第一個線程,
零指示最後一個到達的線程.
public int await(long timeout,TimeUnit unit) throws
InterruptedException,BrokenBarrierException,
ITimeoutException
在全部參與者都已經在此屏障上調用 await 方法以前將一直等待,或者超出了指定的等待時間。
public void reset() 將屏障重置爲其初始狀態。若是全部參與者目前都在屏障處等待,則它們將返回,同時拋出一個 BrokenBarrierException。注意,在因爲其餘緣由形成損壞以後,實行重置可能會變得很複雜;
public boolean isBroken() 查詢此屏障是否處於損壞狀態。
public int getNumberWaiting() 返回當前在屏障處等待的參與者數目。此方法主要用於調試和斷言。
public int getParties() 返回要求啓動此 barrier 的參與者數目。

注意:併發

  • 對於失敗的同步嘗試,CyclicBarrier 使用了一種要麼所有要麼全不 (all-or-none) 的破壞模式:
    若是由於中斷、失敗或者超時等緣由,致使線程過早地離開了屏障點,那麼在該屏障點等待的其餘全部線程也將經過 BrokenBarrierException(若是它們幾乎同時被中斷,則用 InterruptedException)以反常的方式離開。
  • 內存一致性效果:
    線程中調用 await() 以前的操做 happen-before 那些是屏障操做的一部份的操做,後者依次 happen-before 緊跟在從另外一個線程中對應 await() 成功返回的操做。
@ Example1 屏障操做的例子

public static void main(String[] args) {
    //設置5個屏障,而且有屏障操做
    CyclicBarrier barrier = new CyclicBarrier(5,new Runnable() {
        @Override
        public void run() {
                System.out.println("線程"+Thread.currentThread().getName()+"執行了屏障操做");        
        }
    });
    
    for(int i=0;i<5;i++){
       //建立5個線程
        Thread thread = new Thread(new MyRunable(barrier),"thread_"+i);
        thread.start();
    }
}
複製代碼

class MyRunable implements Runnable{

    CyclicBarrier barrier;
    public MyRunable(CyclicBarrier barrier ){
         this.barrier = barrier; 
    }
    
    @Override
    public void run() {
        //一系列操做...
         System.out.println("線程 "+Thread.currentThread().getName()+" 到達了屏障點!");
         try {
             int index = barrier.await();
            if(index== (barrier.getParties()-1)){
                //第一個到達屏障點的線程,執行特殊操做....
                System.out.println("全部線程到達屏障點,線程 "+Thread.currentThread().getName()+" 被喚醒!!此線程是第一個到達屏障點");
            }else if(index == 0){//最後一個到達屏障點的線程
                System.out.println("全部線程到達屏障點,線程 "+Thread.currentThread().getName()+" 被喚醒!!此線程是最後一個到達屏障點");
            }else{
                System.out.println("全部線程到達屏障點,線程 "+Thread.currentThread().getName()+" 被喚醒!!");
            }
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}
複製代碼

運行結果:
app

線程 thread_1 到達了屏障點!
線程 thread_4 到達了屏障點!
線程 thread_3 到達了屏障點!
線程 thread_0 到達了屏障點!
線程 thread_2 到達了屏障點!
線程thread_3執行了屏障操做
全部線程到達屏障點,線程 thread_3 被喚醒!!此線程是最後一個到達屏障點
全部線程到達屏障點,線程 thread_0 被喚醒!!
全部線程到達屏障點,線程 thread_4 被喚醒!!
全部線程到達屏障點,線程 thread_1 被喚醒!!此線程是第一個到達屏障點
全部線程到達屏障點,線程 thread_2 被喚醒!!
複製代碼

 上面的例子,使用了傳入屏障操做的Runable參數的構造方法,ide

屏障操做是由最後一個到達屏障點的線程執行的,這是不能夠改變的

。然而,在實際使用中,工具

可能會出現由第n個到達屏障點的線程執行特殊的操做(或者說 屏障操做),那麼就可使用 CyclicBarrier.await()進行判斷

,如上面的例子,第一個和最後一個到達屏障點的線程都執行特殊的操做。this

   順便說一下,可能會對本例子中前5個輸出的順序 有所疑惑:thread_3 經過awiat()方法返回的索引值,可知 thread_3 是最後一個到達屏障點的,但爲何輸出的順序倒是第三個,而不是最後一個;在這就要真正理解CyclicBarrier,CyclicBarrier 本質上是一把鎖,多個線程在使用CyclicBarrier 對象時,是須要先獲取鎖,即須要互斥訪問,因此調用await( )方法不必定可以立刻獲取鎖。上面的例子,是先打印輸出,再去獲取鎖,因此輸出順序不是到達屏障點的順序。
spa

@ Example2 應用場景

   下面的例子是:CyclicBarrier用於多線程計算數據,最後合併計算結果的場景。好比咱們用一個Excel保存了用戶全部銀行流水,每一個Sheet保存一個賬戶近一年的每筆銀行流水,如今須要統計用戶的日均銀行流水,先用多線程處理每一個sheet裏的銀行流水,都執行完以後,獲得每一個sheet的日均銀行流水,最後,再用barrierAction用這些線程的計算結果,計算出整個Excel的日均銀行流水。

public class BankWaterService implements Runnable {
    
    //建立4個屏障,處理完後執行當前類的run方法
    private CyclicBarrier barrier = new CyclicBarrier(4,this);
    
    //假設只有4個sheet,因此只啓動4個線程
    private Executor excutor = Executors.newFixedThreadPool(4);
    
    //保存每一個sheet計算出的結果
    private ConcurrentHashMap< String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();
    
    private void count(){
        for(int i=0;i<4;i++){
            excutor.execute(new Runnable() {
                
                @Override
                public void run() {
                    //計算過程.....
                    //存儲計算結果
                    sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
                    try {
                        //計算完成,插入屏障
                        barrier.await();
                        //後續操做,將會使用到四個線程的運行結果....
                        System.out.println("線程"+Thread.currentThread().getName()+"運行結束,最終的計算結果:"+sheetBankWaterCount.get("result"));
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
    

    @Override
    public void run() {
        int result = 0;
        for(Entry<String, Integer> item : sheetBankWaterCount.entrySet()){
            result += item.getValue();
        }
        sheetBankWaterCount.put("result", result);
    }
    
    public static void main(String[] args) {
        BankWaterService bankWaterService = new BankWaterService();
        bankWaterService.count();
    }
}
複製代碼

運行結果:

線程pool-1-thread-4運行結束,最終的計算結果:4
線程pool-1-thread-2運行結束,最終的計算結果:4
線程pool-1-thread-1運行結束,最終的計算結果:4
線程pool-1-thread-3運行結束,最終的計算結果:4
複製代碼

CyclicBarrier和CountDownLatch的區別

  • CountDownLatch: 一個線程(或者多個線程), 等待另外N個線程完成某個事情以後才能執行。而這N個線程經過調用CountDownLatch.countDown()方法 來告知「某件事件」完成,即計數減一。而一個線程(或者多個線程)則經過CountDownLatch.awiat( ) 進入等待狀態,直到 CountDownLatch的計數爲0時,纔會所有被喚醒
  • CyclicBarrier : N個線程相互等待,任何一個線程完成某個事情以前,全部的線程都必須等待。
    CountDownLatch 是計數器, 線程完成一個就記一個, 就像 報數同樣, 只不過是遞減的.
    而CyclicBarrier更像一個水閘, 線程執行就想水流, 在水閘處都會堵住, 等到水滿(線程到齊)了, 纔開始泄流.
  • CountDownLatch只能使用一次,CyclicBarrier則能夠經過reset( )方法重置後,從新使用。因此
    CyclicBarrier能夠用於更復雜的業務場景。
    例如:計算錯誤,能夠重置計數器,並讓線程從新執行一次。

文章源地址:https://www.cnblogs.com/jinggod/p/8494193.html

相關文章
相關標籤/搜索