java 中的CyclicBarrier

CyclicBarrier是由ReentrantLock可重入鎖和Condition共同實現的。java

  • 可循環(Cyclic)使用的屏障(Barrier)
  • 實現原理:
    • 在CyclicBarrier的內部定義了一個Lock對象,
    • 每當一個線程調用CyclicBarrier的await方法時,將剩餘攔截的線程數減1,
    • 而後判斷剩餘攔截數是否爲0,
      • 若是不是,進入Lock對象的條件隊列等待。
      • 若是是,執行barrierAction對象的Runnable方法,
    • 而後將鎖的條件隊列中的全部線程放入鎖等待隊列中,這些線程會依次的獲取鎖、釋放鎖,接着先從await方法返回,
    • 再從CyclicBarrier的await方法中返回。

================================================================ide

一、迴環柵欄,經過它能夠實現讓一組線程等待至某個狀態以後再所有同時執行this

  • 第一個版本比較經常使用,用來掛起當前線程,直至全部線程都到達barrier狀態再同時執行後續任務
  • 第二個版本是讓這些線程等待至必定的時間,若是還有線程沒有到達barrier狀態就直接讓到達barrier的線程執行後續任務
public int await() throws InterruptedException, BrokenBarrierException { };
public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };

二、舉個栗子:線程

public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
        for(int i=0;i<N;i++)
            new Writer(barrier).start();
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {
            System.out.println("線程"+Thread.currentThread().getName()+"正在寫入數據...");
            try {
                Thread.sleep(5000);      //以睡眠來模擬寫入數據操做
                System.out.println("線程"+Thread.currentThread().getName()+"寫入數據完畢,等待其餘線程寫入完畢");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println("全部線程寫入完畢,繼續處理其餘任務...");
        }
    }
}
  • 執行結果

三、當四個線程都到達barrier狀態後,會從四個線程中選擇一個線程去執行Runnable3d

public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N,new Runnable() {
            @Override
            public void run() {
                System.out.println("當前線程"+Thread.currentThread().getName());   
            }
        });
 
        for(int i=0;i<N;i++)
            new Writer(barrier).start();
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {
            System.out.println("線程"+Thread.currentThread().getName()+"正在寫入數據...");
            try {
                Thread.sleep(5000);      //以睡眠來模擬寫入數據操做
                System.out.println("線程"+Thread.currentThread().getName()+"寫入數據完畢,等待其餘線程寫入完畢");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println("全部線程寫入完畢,繼續處理其餘任務...");
        }
    }
}

四、故意讓最後一個線程啓動延遲code

  • 在前面三個線程都達到barrier以後,等待了指定的時間發現第四個線程尚未達到barrier,就拋出異常並繼續執行後面的任務
public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
 
        for(int i=0;i<N;i++) {
            if(i<N-1)
                new Writer(barrier).start();
            else {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                new Writer(barrier).start();
            }
        }
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {
            System.out.println("線程"+Thread.currentThread().getName()+"正在寫入數據...");
            try {
                Thread.sleep(5000);      //以睡眠來模擬寫入數據操做
                System.out.println("線程"+Thread.currentThread().getName()+"寫入數據完畢,等待其餘線程寫入完畢");
                try {
                    cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"全部線程寫入完畢,繼續處理其餘任務...");
        }
    }
}

五、CyclicBarrier是能夠重用的對象

  • 初次的4個線程越過barrier狀態後,又能夠用來進行新一輪的使用
  • CountDownLatch沒法進行重複使用
public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
 
        for(int i=0;i<N;i++) {
            new Writer(barrier).start();
        }
 
        try {
            Thread.sleep(25000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
        System.out.println("CyclicBarrier重用");
 
        for(int i=0;i<N;i++) {
            new Writer(barrier).start();
        }
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {
            System.out.println("線程"+Thread.currentThread().getName()+"正在寫入數據...");
            try {
                Thread.sleep(5000);      //以睡眠來模擬寫入數據操做
                System.out.println("線程"+Thread.currentThread().getName()+"寫入數據完畢,等待其餘線程寫入完畢");
 
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"全部線程寫入完畢,繼續處理其餘任務...");
        }
    }
}
相關文章
相關標籤/搜索