Java併發編程系列之CyclicBarrier詳解

簡介

jdk原文java

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released. 

這句話翻譯意思:CyclicBarrier是一個同步輔助類,它容許一組線程相互等待直到全部線程都到達一個公共的屏障點。
在程序中有固定數量的線程,這些線程有時候必須等待彼此,這種狀況下,使用CyclicBarrier頗有幫助。
這個屏障之因此用循環修飾,是由於在全部的線程釋放彼此以後,這個屏障是能夠從新使用的數組

抓住重點:一、容許一組線程相互等待直到達到一個公共屏障點,二、能夠重複使用bash

簡單舉例就是:玩王者榮耀只有全部人進入遊戲以前都必須加載到100%,全部人才能進入遊戲。
與CountDownLatch比較多線程

 

 
image.png

源碼解析

先從構造方法入手併發

/** * Creates a new {@code CyclicBarrier} that will trip when the * given number of parties (threads) are waiting upon it, and * does not perform a predefined action when the barrier is tripped. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier(int parties) { this(parties, null); } /** * Creates a new {@code CyclicBarrier} that will trip when the * given number of parties (threads) are waiting upon it, and which * will execute the given barrier action when the barrier is tripped, * performed by the last thread entering the barrier. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @param barrierAction the command to execute when the barrier is * tripped, or {@code null} if there is no action * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } 

從jdk註釋咱們能夠看出:
第一個構造器:建立一個新的{@code CyclicBarrier},它會在
給定數量的屏障(線程)正在等待它,而且在屏障被觸發時不執行預約義的操做。
第二個構造器:建立一個新的{@code CyclicBarrier},它會在
給定數量的屏障(線程)正在等待它,以及當屏障被觸發時,優先執行barrierAction,方便處理更復雜的業務場景。less


await()方法
調用await方法的線程告訴CyclicBarrier本身已經到達同步點,而後當前線程被阻塞。直到parties個參與線程調用了await方法,CyclicBarrier一樣提供帶超時時間的await和不帶超時時間的await方法:
await()方法裏面最主要就是doawait()dom

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 獲取獨佔鎖 final ReentrantLock lock = this.lock; lock.lock(); try { // 當前先從 final Generation g = generation; // 若是這個線程損壞了,拋出異常 if (g.broken) throw new BrokenBarrierException(); // 若是線程中斷了,拋出異常 if (Thread.interrupted()) { // 將損壞狀態設置爲true // 並通知其餘阻塞在此屏障上的線程 breakBarrier(); throw new InterruptedException(); } // 獲取下標 int index = --count; // 若是是 0,說明最後一個線程調用了該方法 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; // 執行屏障任務 if (command != null) command.run(); ranAction = true; // 更新一代,將count重置,將generation重置 // 喚醒以前等待的線程 nextGeneration(); return 0; } finally { // 若是執行屏障任務的時候失敗了,就將損壞狀態設置爲true if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { // 若是沒有時間限制,則直接等待,直到被喚醒 if (!timed) trip.await(); // 若是有時間限制,則等待指定時間 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 當前線程沒有損壞 if (g == generation && ! g.broken) { // 讓屏障失效 breakBarrier(); throw ie; } else { // 上面條件不知足,說明這個線程不是這代的 // 就不會影響當前這代屏障的執行,因此,就打個中斷標記 Thread.currentThread().interrupt(); } } // 當有任何一個線程中斷了,就會調用breakBarrier方法 // 就會喚醒其餘的線程,其餘線程醒來後,也要拋出異常 if (g.broken) throw new BrokenBarrierException(); // g != generation表示正常換代了,返回當前線程所在屏障的下標 // 若是 g == generation,說明尚未換代,那爲何會醒了? // 由於一個線程能夠使用多個屏障,當別的屏障喚醒了這個線程,就會走到這裏,因此須要判斷是不是當前代。 // 正是由於這個緣由,才須要generation來保證正確。 if (g != generation) return index; // 若是有時間限制,且時間小於等於0,銷燬屏障並拋出異常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { // 釋放獨佔鎖 lock.unlock(); } } 

總結若是該線程不是最後一個調用await方法的線程,則它會一直處於等待狀態,除非發生如下狀況:
最後一個線程到達,即index == 0
某個參與線程等待超時
某個參與線程被中斷
調用了CyclicBarrier的reset()方法。該方法會將屏障重置爲初始狀態ide

Generation描述着CyclicBarrier的更新換代。在CyclicBarrier中,同一批線程屬於同一代。當有parties個線程到達barrier以後,generation就會被更新換代。其中broken標識該當前CyclicBarrier是否已經處於中斷狀態。oop

默認barrier(屏障)是沒有損壞的。當barrier(屏障)損壞了或者有一個線程中斷了,則經過breakBarrier()來終止全部的線程:ui

private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } 

breakBarrier()不只會把broken設置爲true,還會將全部處於等待狀態的線程所有喚醒(singalAll)方法

注意CyclicBarrier使用獨佔鎖來執行await方法,併發性可能不是很高

簡單例子加深印象

/** * @author shuliangzhao * @Title: CyclicBarrierTest * @ProjectName design-parent * @Description: TODO * @date 2019/6/3 0:23 */ public class CyclicBarrierTest { public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N); for(int i=0;i<N;i++) { new Writer(barrier).start(); } /* try { Thread.sleep(25000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("CyclicBarrier重用"); for(int i=0;i<N;i++) { new Writer(barrier).start(); }*/ } static class Writer extends Thread{ private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("線程"+Thread.currentThread().getName()+"正在寫入數據..."); try { Thread.sleep(2000); //以睡眠來模擬寫入數據操做 System.out.println("線程"+Thread.currentThread().getName()+"寫入數據完畢,等待其餘線程寫入完畢"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"全部線程寫入完畢,繼續處理其餘任務..."); } } } 

運行結果

image.png

怎麼用多線程求和

/** * @author shuliangzhao * @Title: CyclicBarrier * @ProjectName design-parent * @Description: TODO * @date 2019/6/3 0:18 */ public class CyclicBarrierExc { //private static final Logger LOGGER = LoggerFactory.getLogger(CyclicBarrierExc.class); public static void main(String[] args) { //數組大小 int size = 50000; //定義數組 int[] numbers = new int[size]; //隨機初始化數組 for (int i = 0; i < size; i++) { numbers[i] = RandomUtils.nextInt(100, 1000); } //多線程計算結果 //定義線程池 ExecutorService executorService = Executors.newFixedThreadPool(5); //定義五個Future去保存子數組計算結果 final int[] results = new int[5]; //定義一個循環屏障,在屏障線程中進行計算結果合併 CyclicBarrier barrier = new CyclicBarrier(5, () -> { int sums = 0; for (int i = 0; i < 5; i++) { sums += results[i]; } System.out.println("多線程計算結果:" + sums); }); //子數組長度 int length = 10000; //定義五個線程去計算 for (int i = 0; i < 5; i++) { //定義子數組 int[] subNumbers = Arrays.copyOfRange(numbers, (i * length), ((i + 1) * length)); //盛放計算結果 int finalI = i; executorService.submit(() -> { for (int j = 0; j < subNumbers.length; j++) { results[finalI] += subNumbers[j]; } //等待其餘線程進行計算 try { barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }); } //關閉線程池 executorService.shutdown(); } }
相關文章
相關標籤/搜索