CyclicBarrier源碼分析

CyclicBarrier源碼分析

CyclicBarrier的做用是讓一組線程互相等待至某個狀態後並行執行(相對外部來講是並行,其實內部仍是串行)java

基本的使用方法是建立一個CyclicBarrier實例,而且指定parties的個數,而後線程依次調用CyclicBarrier的await()方法讓本身進入等待狀態,當最後一個線程進入await()方法時,將會喚醒全部正在等待的線程,並行執行。app

CyclicBarrier雖然也是同步器,可是並不是直接經過AQS來進行實現的,而是藉助了ReentrantLock以及Condition來進行實現。源碼分析


CyclicBarrier的結構

public class CyclicBarrier {
    
    /**
     * 存在一個Generation靜態內部類
     */ 
    private static class Generation {
        boolean broken = false; // 標識CyclicBarrier是否被破壞
    }

    private final ReentrantLock lock = new ReentrantLock();
    
    private final Condition trip = lock.newCondition(); // 與ReentrantLock綁定的Condition實例

    private final int parties; // 用於記錄一共有多少個線程須要等待

    private final Runnable barrierCommand; // 由最後一個進入await()方法的線程進行調用

    private Generation generation = new Generation();

    private int count; // 用於記錄還須要多少個線程進行等待

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }
   
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // ......
    }
    
    // 其餘省略
}

能夠看到CyclicBarrier存在全局的lock、trip、parties、count、barrierCommand以及generation屬性,其中parties屬性用於記錄一共有多少個線程須要等待,而count用於記錄還須要多少個線程進行等待。this

同時CyclicBarrier中定義了一個Generation靜態內部類,該內部類只有一個broken全局屬性,用於標識CyclicBarrier是否被破壞,默認爲false。線程

同時CyclicBarrier的構造方法會初始化全局的parties、count以及barrierCommand屬性(CyclicBarrier初始化後,count的數量等於parties的數量)code


await()方法

因爲當建立CyclicBarrier實例以後,線程須要依次調用CyclicBarrier的await()方法讓本身進入等待狀態,所以從await()方法開始入手。ip

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

await()方法存在兩個重載,區別是一個支持超時,一個不支持超時,最終都會調用dowait()方法(使用timed參數表示是否有超時限制,若是timed參數爲true則須要傳遞具體的超時時間)同步


dowait()方法

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    // 獲取全局的ReentrantLock實例,並進行加鎖           
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 獲取全局的Generation實例,若是Generation中的broken屬性爲true則表示CyclicBarrier已經被破壞,則直接拋出異常(默認是false)
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        // 若是線程已經被設置了中斷標識,則調用breakBarrier()方法,破壞CyclicBarrier
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException(); // 拋出異常
        }

        // index屬性用於記錄還須要多少個線程進行等待
        int index = --count;
        // 若是index等於0,表示當前線程是最後一個進入await()方法的線程,若是barrierCommand不爲空,那麼執行barrierCommand的run()方法,而後調用nextGeneration()方法,喚醒在指定Condition實例中等待的全部線程,並重置CyclicBarrier,而後線程直接返回,作本身的事情
        if (index == 0) {  // 最後一個線程走這個邏輯
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                // 若是在執行barrierCommand的run()方法時拋出異常,那麼ranAction標識爲false,那麼須要調用breakBarrier()方法,破壞CyclicBarrier
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 若是非最後一個線程那麼將會往下執行
        
        // 循環
        for (;;) {
            try {
                // 若是沒有超時限制,那麼直接調用Condition實例的await()方法,讓線程在指定的Condition實例中進行等待,並釋放掉它擁有的鎖
                // 若是有超時限制,那麼調用Condition實例的awaitNanos()方法,至多讓線程在指定的Condition實例中等待指定的時間,該方法返回線程被喚醒後剩餘的毫秒數(超時返回小於等於0),並釋放掉它擁有的鎖
                if (!timed)
                    trip.await();
                else if (nanos > 0L) 
                    nanos = trip.awaitNanos(nanos); 
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            // 當線程被喚醒後將會串行執行如下的邏輯
            
            // 若是發現CyclicBarrier被破壞了,那麼就拋出異常
            if (g.broken)
                throw new BrokenBarrierException();

            // 正常狀況下,當調用了nextGeneration()方法以後,generation引用就指向一個新的Generation實例,所以g!=generation,那麼線程直接返回,作本身的事情
            if (g != generation)
                return index;

            // 若是線程在Condition實例等待的過程當中因爲達到了超時時間而被喚醒了,那麼將會調用breakBarrier()方法,破壞CyclicBarrier
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException(); // 拋出異常
            }
        }
    } finally {
        lock.unlock(); // 解鎖
    }
}

當線程進入dowait()方法後,須要獲取鎖,若是當前線程並不是最後一個進入await()方法的線程,那麼將會在指定的Condition實例中進行等待,而後釋放掉它擁有的鎖,若是當前線程是最後一個進入await()方法的線程(index==0,表示還須要0個線程進行等待),若是barrierCommand不爲空,那麼將會執行barrierCommand的run()方法,最後調用nextGeneration()方法。源碼

若是在執行dowait()方法的過程當中,線程已經被設置了中斷標識,或者最後一個線程在執行barrierCommand的run()方法時拋出異常,或者在指定Condition實例等待的線程因爲達到了超時時間而被喚醒,那麼都會調用breakBarrier()方法。it


nextGeneration()方法

private void nextGeneration() {
    // 喚醒在指定Condition實例中等待的全部線程
    trip.signalAll();
    // 將count的數量設置成parties
    count = parties;
    // 將generation引用指向一個新的Generation實例
    generation = new Generation();
}

nextGeneration()方法用於指向下一個Generation,該方法將會喚醒在指定Condition實例中等待的全部線程,而後將count的數量設置成parties,恢復成CyclicBarrier初始化後的狀態,最後將generation引用指向一個新的Generation實例。


breakBarrier()方法

private void breakBarrier() {
    // 將Generation實例的broken屬性設置爲true,表示CyclicBarrier已經被破壞
    generation.broken = true;
    // 將count的數量設置回parties
    count = parties;
    // 喚醒在指定Condition實例中等待的全部線程
    trip.signalAll();
}

breakBarrier()方法用於破壞CyclicBarrier,將Generation實例的broken屬性設置爲true,表示CyclicBarrier已經被破壞,而後將count的數量設置成parties,最後喚醒在指定Condition實例中等待的全部線程。


reset()方法

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier(); // 感受是多餘的
        nextGeneration(); // 指向下一個Generation
    } finally {
        lock.unlock();
    }
}

reset()方法用於重置CyclicBarrier,其根本是將generation引用指向一個新的Generation實例。


流程總結

1.當建立了一個CyclicBarrier實例以後,線程須要依次調用CyclicBarrier的await()方法,讓本身進入等待狀態。

2.await()方法又會調用dowait()方法,當線程進入dowait()方法後,須要獲取鎖,若是當前線程並不是最後一個進入await()方法的線程,那麼將會在指定的Condition實例中進行等待,而後釋放掉它擁有的鎖,若是當前線程是最後一個進入await()方法的線程(index==0,表示還須要0個線程進行等待),若是barrierCommand不爲空,那麼將會執行barrierCommand的run()方法,最後調用nextGeneration()方法。

3.nextGeneration()方法用於指向下一個Generation,該方法將會喚醒在指定Condition實例中等待的全部線程,而後將count的數量設置成parties,恢復成CyclicBarrier初始化後的狀態,最後將generation引用指向一個新的Generation實例,當最後一個線程執行完nextGeneration()方法後,將會直接返回,作本身的事情,最後釋放掉它擁有的鎖。

4.當被喚醒的線程依次獲取到鎖後,將會繼續往下執行,若是判斷到generation引用已經指向一個新的Generation實例,那麼直接返回,作本身的事情,最後釋放掉它擁有鎖。

5.若是在執行dowait()方法的過程當中,線程已經被設置了中斷標識,或者最後一個線程在執行barrierCommand的run()方法時拋出異常,或者在指定Condition實例等待的線程因爲達到了超時時間而被喚醒,那麼都會調用breakBarrier()方法,破壞CyclicBarrier,將Generation實例的broken屬性設置爲true,表示CyclicBarrier已經被破壞,而後將count的數量設置成parties,最後喚醒在指定Condition實例中等待的全部線程。

6.當被喚醒的線程依次獲取到鎖後,將會繼續往下執行,若是判斷到Generation實例的broken屬性被設置了true,也就是CyclicBarrier已經被破壞,那麼將會直接拋出異常,最後釋放掉它擁有的鎖。

7.當CyclicBarrier被破壞後是不可以進行復用的,由於Generation的broken屬性已經被設置成true,所以須要先調用一次reset()方法進行重置。


FAQ

爲何說CyclicBarrier是能夠複用的?

由於當最後一個線程進入await()方法,將會調用nextGeneration()方法,該方法除了喚醒在指定Condition中等待的線程以外,還會將count的數量設置成parties,恢復成CyclicBarrier初始化後的狀態,同時將generation引用指向一個新的Generation實例,所以CyclicBarrier是能夠複用的,同時須要注意的是,若是CyclicBarrier已經被破壞,那麼須要先調用一次reset()方法以後纔可以進行復用。

相關文章
相關標籤/搜索