CyclicBarrier源碼解讀

1. 簡介

JUC中的CyclicBarrier提供了一種多線程間的同步機制,可讓多個線程在barrier等待其它線程到達barrier。正如其名CyclicBarrier含義就是能夠循環使用的屏障。java

2. 源碼解讀

2.1 數據結構

2.1.1 Generation

在CyclicBarrier中用Generation來表明每一輪的Cyclibarrier的運行情況。數據結構

private static class Generation {
    // broken表示掛否。
    boolean broken = false;
}
private Generation generation = new Generation();

在任意時刻只有一個genration實例是真正表明當前這一輪的運行情況,其餘實例都是跑完或者跑掛的。多線程

2.1.2 barrierCommand

CyclicBarrier容許咱們經過構造方法設置一個Runnable對象,用來在全部線程都到達barrier時執行。app

2.1.3 其它

parties表示線程數,在parties個線程都調用await方法後,barrier纔算是被經過(tripped)了。
count表示還剩下未到達barrier(未調用await)的線程數量,count會在新的一輪開啓或者當前這一輪跑掛時重置爲parties。
CyclicBarrier中的trip用於實現線程間的等待與喚醒的通訊,而lock則爲CyclicBarrier中的變量(generation和count)提供可見性保證,爲臨界區的操做提供保護。oop

private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();

private final int parties;
private int count;

2.2 await方法

下面分析await方法的源碼。this

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
                   BrokenBarrierException,
                   TimeoutException {
            return dowait(true, unit.toNanos(timeout));
}

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        // 若是已經跑掛了,拋出BrokenBarrierException。
        if (g.broken)
            throw new BrokenBarrierException();

        // 檢查中斷標誌位。
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        // 最後一個到達barrier的線程。
        if (index == 0) {
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // 開啓下一輪。
                nextGeneration();
                return 0;
            } finally {
                // 若是action執行時發生了,也會break掉barrier。
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        /*
         * 對於其它(不是最後一個)線程,會在trip條件下等待被喚醒。狀況有如下幾類:
         * 1. 全部線程都到達barrier,併成功執行了barrierAction。
         * 2. 有線程執行了breakBarrier方法。
         * 3. 線程自己被中斷。
         * 4. 超時(若是調用的帶時間限制的await)。
         */
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                /*
                 * g == generation && !g.broken說明此時當前這一輪還沒結束,而且沒有其它線程執行過breakBarrier方法。
                 * 這種狀況會執行breakBarrier置generation的broken標識爲true並喚醒其它線程,以後繼續拋出InterruptedException。
                 */
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    /*
                     * 若是g != generation,此時這一輪已經結束,後面返回index做爲到達barrier的次序;
                     * 若是g.broken說明以前已經有其它線程執行了breakBarrier方法,後面會拋出BrokenBarrierException。
                     */
                    Thread.currentThread().interrupt();
                }

            }

            if (g.broken)
                throw new BrokenBarrierException();

            // 這一輪已經結束,則返回到達屏障的次序,0表示最後一個,parties-1表示第一個。
            if (g != generation)
                return index;

            // 判斷是否超時。
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

// 保證調用時持有鎖。
private void nextGeneration() {
    // 喚醒其它在trip條件下等待的線程。
    trip.signalAll();
    // 重置count。
    count = parties;
    // 開啓下一輪。
    generation = new Generation();
}

// 保證調用時持有鎖。
private void breakBarrier() {
    generation.broken = true;
    // 重置count。
    count = parties;
    // 喚醒其它在trip條件下等待的線程。
    trip.signalAll();
},

2.3 其它方法

CyclicBarrier其它還提供了例如getParties, isBroken, getNumberWaiting, reset等方法,都比較簡單。
其中除了getParties因爲parties被final修飾不可變,其他方法都會先去得到互斥鎖。線程

/**
 * 獲取當前這一輪是否已經broken。
 */
public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
}

/**
 * 重置barrier到初始狀態,全部還在等待中的線程最終會拋出BrokenBarrierException。
 */
public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}


/**
 * 得到當前在barrier中等待的線程數。
 */
public int getNumberWaiting() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return parties - count;
    } finally {
        lock.unlock();
    }
}

3. 總結

總的來講CyclicBarrier的源碼仍是比較簡潔易懂的,經過鎖和條件,實現了在barrier上同步的功能。
經常會拿CyclicBarrier和CountdownLatch比較,CountdownLatch的的計數器到0就完事了,無法再重置恢復。而CyclicBarrier的計數器能夠經過正常的一輪同步重置,也能夠經過reset方法強制重置。CountdownLatch每一個調用await的線程會被阻塞直到其它線程經過countDown方法將計數器減到0;而CyclicBarrier則是有parties-1個線程調用await會阻塞直到最後一個線程調用await方法。此外CyclicBarrier還能夠設置一個barrierAction,至關於一個hook,這也是CountdownLatch不具備的。code

相關文章
相關標籤/搜索