CyclicBarrier 是一個同步輔助工具,容許一組線程所有等待彼此達到共同屏障點,且等待的線程被釋放後還能夠從新使用,因此叫作Cyclic(循環的)。java
好比出去旅行時,導遊須要等待全部的客人到齊後,導遊纔會給你們講解注意事項等數組
在JDK的源碼註釋中,提供了一個簡單的示例demo,稍加修改後就能夠運行app
public class Solver { AtomicInteger sum = new AtomicInteger(0); // 本身新增的一個標識,true表明全部的計算完成了 volatile boolean done = false; final int N; final int[][] data; final CyclicBarrier barrier; class Worker implements Runnable { int myRow; Worker(int row) { myRow = row; } @Override public void run() { while (!done()) { int rowSum = Arrays.stream(data[myRow]).sum(); // 計算行的和 System.out.println("processRow(myRow):" + rowSum); sum.addAndGet(rowSum); try { barrier.await(); } catch (InterruptedException ex) { return; } catch (BrokenBarrierException ex) { return; } } } } private boolean done(){ return done; } public Solver(int[][] matrix) throws InterruptedException{ data = matrix; N = matrix.length; Runnable barrierAction = () -> { System.out.println("mergeRows(...):"+sum.get()); // 輸出二維數組的總和 done = true; }; barrier = new CyclicBarrier(N, barrierAction); List<Thread> threads = new ArrayList<Thread>(N); for (int i = 0; i < N; i++) { Thread thread = new Thread(new Worker(i)); threads.add(thread); thread.start(); } // wait until done for (Thread thread : threads){ thread.join(); } } public static void main(String[] args) throws InterruptedException{ int[][] matrix = {{1,2,3},{4,5,6}}; Solver solver = new Solver(matrix); } }
/** 防禦柵欄入口的鎖 */ private final ReentrantLock lock = new ReentrantLock(); /** 等待直到跳閘的條件 */ private final Condition trip = lock.newCondition(); /** 構造方法參數,在障礙被釋放以前必須調用等待的線程數 */ private final int parties; /* 越過柵欄時運行的命令 */ private final Runnable barrierCommand; /** 當前的一代,控制CyclicBarrier的循環 */ private Generation generation = new Generation(); /** 記錄仍在等待的參與方線程數量,初始值等於parties */ private int count;
/** 代:屏障的每次使用都表示爲一個生成實例 */ private static class Generation { boolean broken = false; // 標識當前的柵欄已破壞或喚醒,jinglingwang.cn }
一共有兩個構造方法,第一個構造方法僅須要傳入一個int值,表示調用等待的線程數;第二個構造方法多了一個runnable接口,當全部的線程越過柵欄時執行的命令,沒有則爲null;ide
public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; // Runnable 命令線程 }
每一個須要在柵欄處等待的線程都須要顯式地調用這個方法。工具
public int await() throws InterruptedException, BrokenBarrierException { try { // 調用await方法,0:不超時 return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
主要的障礙代碼源碼分析
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 當前鎖 final ReentrantLock lock = this.lock; // 加鎖 lock.lock(); try { // 當前代 final Generation g = generation; // 檢查當前代的狀態,是否要拋出BrokenBarrierException異常 if (g.broken) throw new BrokenBarrierException(); // 當前線程被中斷了 if (Thread.interrupted()) { // 屏障被打破 breakBarrier(); throw new InterruptedException(); } // count減一 int index = --count; // index等於0,說明最後一個線程到達了屏障處 if (index == 0) { // tripped boolean ranAction = false; // 標識Runnable 命令線程是否有執行 try { final Runnable command = barrierCommand; // 第二個構造方法的入參,須要運行的命令線程 if (command != null) command.run(); // 執行命令線程。by:jinglingwang.cn ranAction = true; nextGeneration(); // 更新重置整個屏障 return 0; } finally { if (!ranAction) // ranAction 沒有被設置成true;被中斷了 breakBarrier(); } } // 循環直到跳閘,斷開,中斷或超時 for (;;) { try { if (!timed) // 沒有設超時時間,直接調用條件鎖的await方法阻塞等待 trip.await(); else if (nanos > 0L) // 有超時時間 nanos = trip.awaitNanos(nanos); //調用條件鎖的await方法阻塞等待一段時間 } catch (InterruptedException ie) { // 捕獲中斷異常 if (g == generation && ! g.broken) { breakBarrier(); //被中斷,當前代會被標識成已被破壞 throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } // 若是上面代碼沒有異常,理論上只有被喚醒後纔會執行到下面的代碼 // 再次檢查當前代是否已經被破壞 if (g.broken) throw new BrokenBarrierException(); // 正常來講,最後一個線程在執行上面的代碼時,會調用nextGeneration,從新生成generation // 因此線程被喚醒後,這裏條件會成立 if (g != generation) return index; // 超時檢查 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); //拋出超時異常 } } } finally { // 釋放鎖 lock.unlock(); } } /** 重置屏障,回到初始狀態,說明能夠重複使用*/ private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; // 重置等的參與方線程數量計數,回到最初的狀態 generation = new Generation(); } private void breakBarrier() { // 標識當前的柵欄狀態 generation.broken = true; count = parties; // 條件鎖,喚醒全部等待的線程,jinglingwang.cn trip.signalAll(); }
dowait() 方法過程總結:this
await
方法Condition
的await
方法,根據前面條件鎖的源碼分析咱們知道,調用條件鎖的await方法會釋放當前鎖,而後再調用Unsafa類底層 park
阻塞線程。nextGeneration
方法,喚醒全部的條件鎖等待的N-1個線程(喚醒並不必定立刻執行),而後重置計數與當前代,也就是一個新的屏障了,這也就是爲何能夠重複使用的緣由。