JUC中的CyclicBarrier提供了一種多線程間的同步機制,可讓多個線程在barrier等待其它線程到達barrier。正如其名CyclicBarrier含義就是能夠循環使用的屏障。java
在CyclicBarrier中用Generation來表明每一輪的Cyclibarrier的運行情況。數據結構
private static class Generation { // broken表示掛否。 boolean broken = false; } private Generation generation = new Generation();
在任意時刻只有一個genration實例是真正表明當前這一輪的運行情況,其餘實例都是跑完或者跑掛的。多線程
CyclicBarrier容許咱們經過構造方法設置一個Runnable對象,用來在全部線程都到達barrier時執行。app
parties表示線程數,在parties個線程都調用await方法後,barrier纔算是被經過(tripped)了。
count表示還剩下未到達barrier(未調用await)的線程數量,count會在新的一輪開啓或者當前這一輪跑掛時重置爲parties。
CyclicBarrier中的trip用於實現線程間的等待與喚醒的通訊,而lock則爲CyclicBarrier中的變量(generation和count)提供可見性保證,爲臨界區的操做提供保護。oop
private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); private final int parties; private int count;
下面分析await方法的源碼。this
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; // 若是已經跑掛了,拋出BrokenBarrierException。 if (g.broken) throw new BrokenBarrierException(); // 檢查中斷標誌位。 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; // 最後一個到達barrier的線程。 if (index == 0) { boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; // 開啓下一輪。 nextGeneration(); return 0; } finally { // 若是action執行時發生了,也會break掉barrier。 if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out /* * 對於其它(不是最後一個)線程,會在trip條件下等待被喚醒。狀況有如下幾類: * 1. 全部線程都到達barrier,併成功執行了barrierAction。 * 2. 有線程執行了breakBarrier方法。 * 3. 線程自己被中斷。 * 4. 超時(若是調用的帶時間限制的await)。 */ for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { /* * g == generation && !g.broken說明此時當前這一輪還沒結束,而且沒有其它線程執行過breakBarrier方法。 * 這種狀況會執行breakBarrier置generation的broken標識爲true並喚醒其它線程,以後繼續拋出InterruptedException。 */ if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { /* * 若是g != generation,此時這一輪已經結束,後面返回index做爲到達barrier的次序; * 若是g.broken說明以前已經有其它線程執行了breakBarrier方法,後面會拋出BrokenBarrierException。 */ Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); // 這一輪已經結束,則返回到達屏障的次序,0表示最後一個,parties-1表示第一個。 if (g != generation) return index; // 判斷是否超時。 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } // 保證調用時持有鎖。 private void nextGeneration() { // 喚醒其它在trip條件下等待的線程。 trip.signalAll(); // 重置count。 count = parties; // 開啓下一輪。 generation = new Generation(); } // 保證調用時持有鎖。 private void breakBarrier() { generation.broken = true; // 重置count。 count = parties; // 喚醒其它在trip條件下等待的線程。 trip.signalAll(); },
CyclicBarrier其它還提供了例如getParties, isBroken, getNumberWaiting, reset等方法,都比較簡單。
其中除了getParties因爲parties被final修飾不可變,其他方法都會先去得到互斥鎖。線程
/** * 獲取當前這一輪是否已經broken。 */ public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } /** * 重置barrier到初始狀態,全部還在等待中的線程最終會拋出BrokenBarrierException。 */ public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } } /** * 得到當前在barrier中等待的線程數。 */ public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } }
總的來講CyclicBarrier的源碼仍是比較簡潔易懂的,經過鎖和條件,實現了在barrier上同步的功能。
經常會拿CyclicBarrier和CountdownLatch比較,CountdownLatch的的計數器到0就完事了,無法再重置恢復。而CyclicBarrier的計數器能夠經過正常的一輪同步重置,也能夠經過reset方法強制重置。CountdownLatch每一個調用await的線程會被阻塞直到其它線程經過countDown方法將計數器減到0;而CyclicBarrier則是有parties-1個線程調用await會阻塞直到最後一個線程調用await方法。此外CyclicBarrier還能夠設置一個barrierAction,至關於一個hook,這也是CountdownLatch不具備的。code