--------------------- app
package com.ysma.test; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.*; /** * 在最後一個線程抵達而且其餘線程也都抵達或者broken了的時候,整個阻塞就盤活了,不在阻塞 * @since 1.5 * @see CountDownLatch * @author Doug Lea 又是這哥們寫的,保留這個註釋 */ public class CyclicBarrier { /**一個柵欄就是一代,Generation變化一次就表明柵欄完成了一次*/ private static class Generation { boolean broken = false; } /** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties; /* The command to run when tripped */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); private int count; /** * Updates state on barrier trip and wakes up everyone. * Called only while holding lock. */ private void nextGeneration() { // 喚醒通知上一代已經完成 trip.signalAll(); // 重置計數器開啓新時代 count = parties; generation = new Generation(); } /** * 設置當前代中斷,喚醒全部 * Called only while holding lock. */ private void breakBarrier() { generation.broken = true;//因故中斷,標識一下 count = parties;//重置計數器,喚醒全部阻塞線程; trip.signalAll();//PS:並無開啓新時代! } /** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock();//獲取或等待獲取資源,ysma-1 try { final Generation g = generation; if (g.broken)//任一broken則break全部 throw new BrokenBarrierException(); if (Thread.interrupted()) {//獲取資源後發現本身被中斷了 breakBarrier(); throw new InterruptedException(); } int index = --count;//獲取資源,計數器減一 if (index == 0) { // 達到臨界點,執行barrierCommand,nextGeneration,結束=>放行全部線程 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await();//不設置超時,wait,釋放cpu else if (nanos > 0L) nanos = trip.awaitNanos(nanos);//wait指定時間 } catch (InterruptedException ie) { if (g == generation && ! g.broken) {//發生異常,判斷本身爲第一個發起中斷者 breakBarrier(); throw ie; } else {//發生異常,本身非第一個中斷者 Thread.currentThread().interrupt(); } } if (g.broken)//被喚醒後,檢測中斷標誌broken throw new BrokenBarrierException(); if (g != generation)//若是柵欄已經開啓了下一代,結束,放行 return index; if (timed && nanos <= 0L) {//被喚醒後,發現超時了,broken中斷 breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock();//解鎖,釋放資源 } } /**構造器,略*/ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } /**構造器,略*/ public CyclicBarrier(int parties) { this(parties, null); } /**獲取資源/線程數*/ public int getParties() { return parties; } /**不限制等待*/ public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } } /**限時等待*/ public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } /** * 查詢柵欄是否已經broken了 * PS:重入鎖方式進入查看 */ public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } /**重置 * 重入鎖方式進入,break柵欄,開啓新時代 * */ public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } } /**獲取還有多少資源沒有就緒*/ public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } } }