CyclicBarrier源碼探究 (JDK 1.8)

CyclicBarrier也叫回環柵欄,可以實現讓一組線程運行到柵欄處並阻塞,等到全部線程都到達柵欄時再一塊兒執行的功能。「迴環」意味着CyclicBarrier能夠屢次重複使用,相比於CountDownLatch只能使用一次,CyclicBarrier能夠節省許多資源,而且還能夠在構造器中傳入任務,當柵欄條件知足時執行這個任務。CyclicBarrier是使用了ReentrantLock,主要方法在執行時都會加鎖,所以併發性能不是很高。安全

1.相關字段

//重入鎖,CyclicBarrier內部經過重入鎖實現線程安全
    private final ReentrantLock lock = new ReentrantLock();
    //線程阻塞時的等待條件
    private final Condition trip = lock.newCondition();
    //須要等待的線程數
    private final int parties;
    //柵欄打開以後首先執行的任務
    private final Runnable barrierCommand;
    //記錄當前的分代標記
    private Generation generation = new Generation();
    //當前還須要等待多少個線程運行到柵欄位置
    private int count;

須要注意的是generation字段,用於標記柵欄當前處在哪一代。當知足必定的條件時(例如調用了reset方法,或者柵欄打開等),柵欄狀態會切換到下一代,實際就是new一個新的Generation對象,這是CyclicBarrier的內部類,代碼很是簡單,以下:併發

private static class Generation {
        boolean broken = false;   //標記柵欄是否被破壞
    }

實際使用的過程當中,會利用generation字段判斷當前是否在同一個分代,而使用broker字段判斷柵欄是否被破壞。app

2.構造函數

CyclicBarrier有兩個重載的構造函數,構造函數只是對上述的相關字段進行初始化,以下:函數

public CyclicBarrier(int parties) {
        this(parties, null);
    }

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

3.核心方法

  • await
    await是開發時最經常使用到的方法了,同CountDownLatch同樣,CyclicBarrier也提供了兩個await方法,一個不帶參數,一個帶有超時參數,其內部只是簡單調用了一下dowait方法:
public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

接下來看看相當重要的dowait方法:oop

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        //加劇入鎖
        lock.lock();
        try {
            //首先獲取年齡代信息
            final Generation g = generation;
            //若是柵欄狀態被破壞,拋出異常,例如先啓動的線程調用了breakBarrier方法,後啓動的線程就可以看到g.broker=true
            if (g.broken)
                throw new BrokenBarrierException();
            //檢測線程的中斷狀態,若是線程設置了中斷狀態,則經過breakBarrier設置柵欄爲已破壞狀態,並喚醒其餘線程
            //若是這裏可以檢測到中斷狀態,那隻多是在await方法外部設置的
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //每調用一次await,就將須要等待的線程數減1
            int index = --count;
            //index=0表示這是最後一個到達的線程,由該線程執行下面的邏輯
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    //若是在構造器中傳入了第二個任務參數,就在放開柵欄前先執行這個任務
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //正常結束,須要喚醒阻塞的線程,並換代
                    nextGeneration();
                    return 0;
                } finally {
                    //try代碼塊若是正常執行,ranAction就必定等於true,而try代碼塊惟一可能發生異常的地方就是command.run(),
                    //所以這裏爲了保證在任務執行失敗時,將柵欄標記爲已破壞,喚醒阻塞線程
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    //沒有設置超時標記,就加入等待隊列
                    if (!timed)
                        trip.await();
                    //設置了超時標記,但目前尚未超時,則繼續等待
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //若是線程等待的過程當中被中斷,會執行到這裏
                    //g == generation表示當前還在同一個年齡分代中,!g.broker表示當前柵欄狀態沒有被破壞
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        //上面的條件不知足,說明:1)g!=generation,說明線程執行到這裏時已經換代了
                        //2)沒有換代,可是柵欄被破壞了
                        //不管哪一種狀況,都只是簡單地設置一下當前線程的中斷狀態
                        Thread.currentThread().interrupt();
                    }
                }
                //柵欄被破壞,拋出異常
                //注意,在breakBarrier方法中會喚醒全部等待條件的線程,這些線程會執行到這裏,判斷柵欄已經被破壞,都會拋出異常
                if (g.broken)
                    throw new BrokenBarrierException();
                //距離上一次設置g變量的值已通過去很長時間了,在執行過程當中generation可能已經發生改變,
                //當前線程仍是前幾代的,不須要再循環阻塞了,直接返回上一代剩餘須要等待的線程數
                //注意:代碼中breakBarrier方法和nextGeneration方法都會喚醒阻塞的線程,可是breakBarrier在上一個判斷就被攔截了,
                //所以走到這裏的有三種狀況:
                //a)最後一個線程正常執行,柵欄打開致使其餘線程被喚醒;不屬於當前代的線程直接返回,
                //屬於當前代的則可能由於沒到柵欄開放條件要繼續循環阻塞
                //b)柵欄被重置(調用了reset方法),此時g!=negeration,全都直接返回
                //c)線程等待超時了,不屬於當前代的返回就能夠了,屬於當前代的則要設置generation.broken = true
                if (g != generation)
                    return index;
                //若是線程等待超時,標記柵欄爲破壞狀態並拋出異常,若是還沒超時,則自旋後又從新阻塞
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            //別忘了解鎖
            lock.unlock();
        }
    }

dowait的方法邏輯是:每個調用await方法的線程都會將計數count1,最後一個線程將count減爲0時,順帶還要執行barrierCommand指定的任務,並將generation切換到下一代,固然,最重要的仍是要喚醒以前在柵欄處阻塞的線程。因爲trip對應的Condition對象沒有任何地方會修改,所以trip.signalAll()會喚醒全部在該條件上等待的線程,若是線程在等待的過程當中,其餘線程將generation更新到下一代,就會出現被喚醒的線程中有部分還屬於以前那一代的狀況。
接下來將會對dowait用到的一些方法進行簡單介紹。性能

  • breakBarrier
    dowait方法有四個地方調用了breakBarrier,從名字能夠看出,該方法會將generation.broken設置爲true,除此以外,還會還原count的值,而且喚醒全部被阻塞的線程:
private void breakBarrier() {
        generation.broken = true;
        count = parties;
        //喚醒全部的阻塞線程
        trip.signalAll();
    }

縱觀CyclicBarrier源碼,generation.broken統一在breakBarrier方法中被設置爲true,而一旦將generation.broken設置爲true以後,代碼中檢查到這個狀態以後都會拋出異常,柵欄就沒辦法再使用了(能夠手動調用reset進行重置),而源碼中會在如下幾種狀況調用breakBarrier方法:
1) 當前線程被中斷
2)經過構造器傳入的任務執行失敗
3) 條件等待時被中斷
4) 線程等待超時
5) 顯式調用reset方法this

  • nextGeneration
private void nextGeneration() {
        // 喚醒全部的阻塞線程
        trip.signalAll();
        // 開啓下一代
        count = parties;
        generation = new Generation();
    }
  • reset
    reset方法主要是結束這一代,並切換到下一代
public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

介紹到這裏,整個CyclicBarrier已經差很少介紹完了,可是內部的流程遠遠沒有這麼簡單,由於很大一部分邏輯封裝在AbstractQueuedSynchronizer中,這個類定義了阻塞的線程如何加入等待隊列,又如何被喚醒,所以若是想要深刻了解線程等待的邏輯,還須要仔細研究AbstractQueuedSynchronizer才行。本文不會對這部份內容進行介紹,後面有時間的話將會專門對其進行介紹。線程

相關文章
相關標籤/搜索