CyclicBarrier是JDK1.5提供容許一組線程等待彼此都達到一個共同的障礙點的同步的工具。CyclicBarrier適用於固定大小線程池,能夠設置一個Runnable任務,當各線程達到共同的障礙點時觸發這個任務。java
//建立線程池 private static ExecutorService executorService = Executors.newFixedThreadPool(10); //建立屏障 static CyclicBarrier cb = new CyclicBarrier(10,new Runnable() { public void run() { System.out.println("到達屏障"); } }); public static void main(String[] args) { //提交任務 for (int i = 0; i < 10; i++) { executorService.submit(new Runnable() { @Override public void run() { try { cb.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }); } }
如上例子建立線程爲10的固定線程池,建立值爲10的屏障,並設置一個Runnable任務。運行main方法提交任務當任務提交到10的時候到達屏障點,會運行Runnable任務並輸出"到達屏障"。app
CyclicBarrier是利用ReentrantLock和Condition對扣減屏障值操做進行加鎖,加鎖後釋放鎖而後阻塞直到屏障值爲0被喚醒。ide
下面來看下CyclicBarrier的成員變量工具
private final ReentrantLock lock = new ReentrantLock();
對扣減屏障值操做進行加鎖用。oop
private final Condition trip = lock.newCondition();
對扣減屏障值操做後阻塞線程用,源碼分析
private final int parties;
屏障值最大值,不可修改。this
private final Runnable barrierCommand;
各線程到達屏障執行的任務。線程
private Generation generation = new Generation();
Generation類型對象,此類型裏成員只有一個boolean類型的變量,做用是判斷屏障是否被打破。code
private int count;
屏障值,操做扣減用。對象
await()源碼以下
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
直接調用dowait(false, 0L)方法,第一個參數表示是否支持等待超時,第二個參數表示超時時長。await()不須要超時這裏傳了false和0L。由於await()不須要超時TimeoutException這個異常也不可能發生。
下面來看下dowait(false, 0L)的源碼
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //加鎖 lock.lock(); try { final Generation g = generation; //1.若是屏障被打破拋出屏障打破異常 if (g.broken) throw new BrokenBarrierException(); //2.若是當前線程被中斷拋出中斷異常 if (Thread.interrupted()) { //3.打破屏障 breakBarrier(); throw new InterruptedException(); } //4.屏障值減一 int index = --count; //5.若是減一之後屏障值等於0,就要喚醒全部的阻塞線程 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; //6.是否配置了任務,若是配置了則執行 if (command != null) command.run(); ranAction = true; //7.若是任務正常運行結束,全部的阻塞線程,並重置屏障值 nextGeneration(); return 0; } finally { //8.若是任務運行出現異常,則打破屏障 if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { //9.若是不支持等待超時,則調用await()一直等待 if (!timed) trip.await(); //10.若是支持等待超時,則等待nanos時間 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //11.若是線程被中斷。若是g == generation不成立說明當前線程已經被喚醒,這裏說明還沒被喚醒的中斷就要打破屏障,不然就標記中斷讓上層處理。 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } //11.若是已經打破屏障,拋出BrokenBarrierException異常 if (g.broken) throw new BrokenBarrierException(); //12.g != generation成立說明已經被激活,這裏正常結束 if (g != generation) return index; //13.若是超時,則打破屏障拋出超時異常。 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { //釋放鎖 lock.unlock(); } }
dowait(false, 0L)主要邏輯就是將屏障值count減1,而後進入等待,直到count等於0到達屏障點被喚醒。 這裏須要注意的是若是一個線程打破屏障,則全部的線程都會被打破拋出BrokenBarrierException異常,而且屏障被打破後若是想繼續使用必須調用reset()方法重置。
下面來看下breakBarrier()方法
private void breakBarrier() { //設置打破屏障狀態 generation.broken = true; //將count設置成原來的值 count = parties; //喚醒全部其餘線程 trip.signalAll(); }
breakBarrier()就是設置打破屏障狀態爲ture,而後喚醒因此其餘阻塞線程,其餘阻塞喚醒後會拋出BrokenBarrierException異常。
下面來看下nextGeneration()方法
private void nextGeneration() { // 喚醒全部其餘線程 trip.signalAll(); // 將count設置成原來的值 count = parties; //初始化屏障狀態 generation = new Generation(); }
nextGeneration()跟breakBarrier()相似,可是nextGeneration()是從新初始化屏障狀態的,因此調用這個方法後CyclicBarrier可重用。
reset()源碼以下
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { //打破屏障 breakBarrier(); // break the current generation //重置CyclicBarrier狀態 nextGeneration(); // start a new generation } finally { lock.unlock(); } }
reset()很簡單,先打破屏障,終止各線程等待狀態使其餘線程拋出BrokenBarrierException異常,而後重置CyclicBarrier狀態,使其可重用。這裏官方推薦不要重用,從新建立一個CyclicBarrier使用,官方給的緣由也比較含糊。