CyclicBarrier理解

  CyclicBarrier字面意思是「循環的屏障」。實際效果是多個線程完成後會到達這個屏障,令線程阻塞,直到全部的線程都完成後,再喚醒全部線程。那爲何叫「循環的」呢?由於這個類能夠重用。關於重用,咱們等下能夠再源代碼中看到。經過源代碼就能夠理解重用的含義了。app

private static class Generation {
        boolean broken = false;
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    /**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
     */
    private int count;

這是CyclicBarrier這個類的全部屬性,在這裏說明一下,Generation這個內部類,表明的就是屏障,他有一個broken屬性,用來標識這個屏障是否被打破。lock,trip就是以前說過的重入鎖和Condition。所以咱們知道CyclicBarrier底層實際上是經過ReentrantLock來實現的。parties用來標識線程的數量,barrierCommand是當全部得線程都到達屏障以後執行的操做。count表示尚未到達屏障的數量。oop

public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }


    public CyclicBarrier(int parties) {
        this(parties, null);
    }

這個類有兩個構造方法,一種是帶有barrierAction參數的。一種是不帶的。這個barrierAction其實就是當多個線程都到達屏障以後繼續執行的操做。this

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            // 若是屏障已經被打破,拋異常
            if (g.broken)
                throw new BrokenBarrierException();
            // 若是當前線程處於中斷狀態,打破屏障
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            
            int index = --count;
// 若是全部的線程都到達了屏障
if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null)
// 執行barrierCommand command.run(); ranAction
= true;
//這一代結束,生成下一代 nextGeneration();
return 0; } finally { if (!ranAction)
// 若是barrierCommand執行失敗,打破屏障 breakBarrier(); } }
// loop until tripped, broken, interrupted, or timed out for (;;) { try {
// 經過Condition來讓線程阻塞
if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) {
// 若是線程已經處於中斷狀態,若是屏障沒有被打破,打破屏障並拋出異常
if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution.
// 捕獲了中斷異常以後,還要在執行一遍,實際上是爲了保存中斷狀態,讓上層代碼注意到這個中斷 Thread.currentThread().interrupt(); } }
// 若是屏障被打破,拋出異常。
if (g.broken) throw new BrokenBarrierException(); // 若是已經換代。那麼直接返回index if (g != generation) return index; // 超時以後打破屏障而且拋異常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    } 
   
 private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

  整個類的方法仍是比較簡單的,經過代碼咱們就基本上已經知道怎麼使用了,先經過構造方法指定線程數量,以及都達到屏障以後要執行的方法,而後線程完成以後調用await()方法,當全部的線程都調用await()方法後,就會喚醒全部線程,而後執行barrierCommand,而且生成下一代。以前說的重用就是由於會生成新的一代,所以能夠重用。spa

相關文章
相關標籤/搜索