死磕 java同步系列之CyclicBarrier源碼解析——有圖有真相

問題

(1)CyclicBarrier是什麼?java

(2)CyclicBarrier具備什麼特性?app

(3)CyclicBarrier與CountDownLatch的對比?源碼分析

簡介

CyclicBarrier,迴環柵欄,它會阻塞一組線程直到這些線程同時達到某個條件才繼續執行。它與CountDownLatch很相似,但又不一樣,CountDownLatch須要調用countDown()方法觸發事件,而CyclicBarrier不須要,它就像一個柵欄同樣,當一組線程都到達了柵欄處才繼續往下走。學習

使用方法

public class CyclicBarrierTest {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

        for (int i = 0; i < 3; i++) {
            new Thread(()->{
                System.out.println("before");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("after");
            }).start();
        }
    }
}

這段方法很簡單,使用一個CyclicBarrier使得三個線程保持同步,當三個線程同時到達cyclicBarrier.await();處你們再一塊兒往下運行。this

源碼分析

主要內部類

private static class Generation {
    boolean broken = false;
}

Generation,中文翻譯爲代,一代人的代,用於控制CyclicBarrier的循環使用。線程

好比,上面示例中的三個線程完成後進入下一代,繼續等待三個線程達到柵欄處再一塊兒執行,而CountDownLatch則作不到這一點,CountDownLatch是一次性的,沒法重置其次數。翻譯

主要屬性

// 重入鎖
private final ReentrantLock lock = new ReentrantLock();
// 條件鎖,名稱爲trip,絆倒的意思,多是指線程來了先絆倒,等達到必定數量了再喚醒
private final Condition trip = lock.newCondition();
// 須要等待的線程數量
private final int parties;
// 當喚醒的時候執行的命令
private final Runnable barrierCommand;
// 代
private Generation generation = new Generation();
// 當前這一代還須要等待的線程數
private int count;

經過屬性能夠看到,CyclicBarrier內部是經過重入鎖的條件鎖來實現的,那麼你能夠腦補一下這個場景嗎?code

彤哥來腦補一下:假如初始時count = parties = 3,當第一個線程到達柵欄處,count減1,而後把它加入到Condition的隊列中,第二個線程到達柵欄處也是如此,第三個線程到達柵欄處,count減爲0,調用Condition的signalAll()通知另外兩個線程,而後把它們加入到AQS的隊列中,等待當前線程運行完畢,調用lock.unlock()的時候依次從AQS的隊列中喚醒一個線程繼續運行,也就是說實際上三個線程先依次(排隊)到達柵欄處,再依次往下運行。隊列

以上純屬彤哥腦補的內容,真實狀況是否是如此呢,且日後看。事件

構造方法

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // 初始化parties
    this.parties = parties;
    // 初始化count等於parties
    this.count = parties;
    // 初始化都到達柵欄處執行的命令
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}

構造方法須要傳入一個parties變量,也就是須要等待的線程數。

await()方法

每一個須要在柵欄處等待的線程都須要顯式地調用await()方法等待其它線程的到來。

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // 調用dowait方法,不須要超時
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // 加鎖
    lock.lock();
    try {
        // 當前代
        final Generation g = generation;
        
        // 檢查
        if (g.broken)
            throw new BrokenBarrierException();

        // 中斷檢查
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        
        // count的值減1
        int index = --count;
        // 若是數量減到0了,走這段邏輯(最後一個線程走這裏)
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                // 若是初始化的時候傳了命令,這裏執行
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // 調用下一代方法
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 這個循環只有非最後一個線程能夠走到
        for (;;) {
            try {
                if (!timed)
                    // 調用condition的await()方法
                    trip.await();
                else if (nanos > 0L)
                    // 超時等待方法
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }
            
            // 檢查
            if (g.broken)
                throw new BrokenBarrierException();

            // 正常來講這裏確定不相等
            // 由於上面打破柵欄的時候調用nextGeneration()方法時generation的引用已經變化了
            if (g != generation)
                return index;
            
            // 超時檢查
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
private void nextGeneration() {
    // 調用condition的signalAll()將其隊列中的等待者所有轉移到AQS的隊列中
    trip.signalAll();
    // 重置count
    count = parties;
    // 進入下一代
    generation = new Generation();
}

dowait()方法裏的整個邏輯分紅兩部分:

(1)最後一個線程走上面的邏輯,當count減爲0的時候,打破柵欄,它調用nextGeneration()方法通知條件隊列中的等待線程轉移到AQS的隊列中等待被喚醒,並進入下一代。

(2)非最後一個線程走下面的for循環邏輯,這些線程會阻塞在condition的await()方法處,它們會加入到條件隊列中,等待被通知,當它們喚醒的時候已經更新換「代」了,這時候返回。

圖解

CyclicBarrier

學習過前面的章節,看這個圖很簡單了,看不懂的同窗還須要把推薦的內容好好看看哦^^

總結

(1)CyclicBarrier會使一組線程阻塞在await()處,當最後一個線程到達時喚醒(只是從條件隊列轉移到AQS隊列中)前面的線程你們再繼續往下走;

(2)CyclicBarrier不是直接使用AQS實現的一個同步器;

(3)CyclicBarrier基於ReentrantLock及其Condition實現整個同步邏輯;

彩蛋

CyclicBarrier與CountDownLatch的異同?

(1)二者都能實現阻塞一組線程等待被喚醒;

(2)前者是最後一個線程到達時自動喚醒;

(3)後者是經過顯式地調用countDown()實現的;

(4)前者是經過重入鎖及其條件鎖實現的,後者是直接基於AQS實現的;

(5)前者具備「代」的概念,能夠重複使用,後者只能使用一次;

(6)前者只能實現多個線程到達柵欄處一塊兒運行;

(7)後者不只能夠實現多個線程等待一個線程條件成立,還能實現一個線程等待多個線程條件成立(詳見CountDownLatch那章使用案例);

推薦閱讀

一、死磕 java同步系列之開篇

二、死磕 java魔法類之Unsafe解析

三、死磕 java同步系列之JMM(Java Memory Model)

四、死磕 java同步系列之volatile解析

五、死磕 java同步系列之synchronized解析

六、死磕 java同步系列之本身動手寫一個鎖Lock

七、死磕 java同步系列之AQS起篇

八、死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖

九、死磕 java同步系列之ReentrantLock源碼解析(二)——條件鎖

十、死磕 java同步系列之ReentrantLock VS synchronized

十一、死磕 java同步系列之ReentrantReadWriteLock源碼解析

十二、死磕 java同步系列之Semaphore源碼解析

1三、死磕 java同步系列之CountDownLatch源碼解析

1四、死磕 java同步系列之AQS終篇

1五、死磕 java同步系列之StampedLock源碼解析


歡迎關注個人公衆號「彤哥讀源碼」,查看更多源碼系列文章, 與彤哥一塊兒暢遊源碼的海洋。

qrcode

相關文章
相關標籤/搜索