線程間的同步與通訊(7)——CyclicBarrier源碼分析

前言

系列文章目錄java

上一篇 咱們學習了基於AQS共享鎖實現的CountDownLatch,本篇咱們來看看另外一個和它比較像的併發工具CyclicBarrier。segmentfault

與CountdownLatch的區別

將count值遞減的線程

在CountDownLatch中,執行countDown方法的線程和執行await方法的線程不是一類線程。例如,線程M,N須要等待線程A,B,C,D,E執行完成後才能繼續往下執行,則線程A,B,C,D,E執行完成後都將調用countDown方法,使得最後count變爲了0,最後一個將count值減爲0的線程調用的tryReleaseShared方法會成功返回true,從而調用doReleaseShared()喚醒全部在sync queue中等待共享鎖的線程,這裏對應的就是M,N。因此,在CountDownLatch中,執行countDown的線程不會被掛起,調用await方法的線程會阻塞等待共享鎖。數組

而在CyclicBarrier中,將count值遞減的線程和執行await方法的線程是一類線程,它們在執行完遞減count的操做後,若是count值不爲0,則可能同時被掛起。例如,線程A,B,C,D,E須要互相等待,保證全部線程都執行完了以後才能一塊兒經過。 併發

這就好像同一個班級出去春遊,到一個景區後先自由活動,一段時間後在指定的地點集合,而後去下一個景點。這裏這個指定集合的地點就是CyclicBarrier中的barrier,每個人到達後都會執行await方法先將須要繼續等待的人數(count)減1,而後(在條件隊列上)掛起等待,當最後一我的到了以後,發現人已經到到齊了,則他負責執行barrierCommand(例如向班主任彙報人已經到齊),接着就喚醒全部還在等待中的線程,開啓新一代。app

是否能重複使用

CountDownLatch是一次性的,當count值被減爲0後,不會被重置;
而CyclicBarrier在線程經過柵欄後,會開啓新的一代,count值會被重置。函數

鎖的類別與所使用到的隊列

CountDownLatch使用的是共享鎖,count值不爲0時,線程在sync queue中等待,自始至終只牽涉到sync queue,因爲使用共享鎖,喚醒操做沒必要等待鎖釋放後再進行,喚醒操做很迅速。
CyclicBarrier使用的是獨佔鎖,count值不爲0時,線程進入condition queue中等待,當count值降爲0後,將被signalAll()方法喚醒到sync queue中去,而後挨個去爭鎖(由於是獨佔鎖),在前驅節點釋放鎖之後,才能繼續喚醒後繼節點。工具

核心屬性

private static class Generation {
    boolean broken = false;
}

/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();

/**
 * Number of parties still waiting. Counts down from parties to 0
 * on each generation.  It is reset to parties on each new
 * generation or when broken.
 */
private int count;

CyclicBarrier的核心屬性共有6個,咱們將它分爲三組。學習

第一組:this

private final int parties;
private int count;

注意,這兩個屬性都是用來表徵線程的數量,parties表明了參與線程的總數,即須要一同經過barrier的線程數,它是final類型的,由構造函數初始化,在類被建立後就一直不變了;count屬性和CountDownLatch中的count同樣,表明還須要等待的線程數,初始值爲parties,每當一個線程到來就減一,若是該值爲0,則說明全部的線程都到齊了,你們能夠一塊兒經過barrier了。線程

第二組:

private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private Generation generation = new Generation();

這一組表明了CyclicBarrier的基礎實現,即CyclicBarrier是基於獨佔鎖ReentrantLock和條件隊列實現的,而不是共享鎖,全部相互等待的線程都會在一樣的條件隊列trip上掛起,被喚醒後將會被添加到sync queue中去爭取獨佔鎖lock,得到鎖的線程將繼續往下執行。

這裏還有一個Generation對象,從定義上能夠看出,它只有一個boolean類型的broken屬性,關於這個Generation,咱們下面分析源碼的時候再詳細講。

第三組:

private final Runnable barrierCommand;

這是一個Runnable對象,表明了一個任務。當全部線程都到齊後,在它們一同經過barrier以前,就會執行這個對象的run方法,所以,它有點相似於一個鉤子方法。固然這個參數不是必須的,若是線程在經過barrier以前沒有什麼特別須要處理的事情,該值能夠爲null。

構造函數

CyclicBarrier有兩個構造函數:

public CyclicBarrier(int parties) {
    this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

其中,第一個構造函數本質上也是調用了第二個,即若是不傳入Runnable對象,則barrierCommand的值默認爲null。

咱們能夠看出,構造函數就是初始化了partiescountbarrierCommand 三個變量。

輔助方法

要理解CyclicBarrier,首先咱們須要弄明白它的幾個輔助方法。

首先須要理解的是「代」(Generation)的概念,因爲CyclicBarrier是可重複使用的,咱們把每個新的barrier稱爲一「代」。這個怎麼理解呢,打個比方:一個過山車有10個座位,景區經常須要等夠10我的了,纔會去開動過山車。因而咱們經常在欄杆(barrier)外面等,等湊夠了10我的,工做人員就把欄杆打開,讓10我的經過;而後再將欄杆歸位,後面新來的人仍是要在欄杆外等待。這裏,前面已經經過的人就是一「代」,後面再繼續等待的一波人就是另一「代」,欄杆每打開關閉一次,就產生新一的「代」。

在CyclicBarrier,開啓新的一代使用的是nextGeneration方法:

nextGeneration()

private void nextGeneration() {
    // 喚醒當前這一代中全部等待在條件隊列裏的線程
    trip.signalAll();
    // 恢復count值,開啓新的一代
    count = parties;
    generation = new Generation();
}

該方法用於開啓新的「一代」,一般是被最後一個調用await方法的線程調用。在該方法中,咱們的主要工做就是喚醒當前這一代中全部等待在條件隊列裏的線程,將count的值恢復爲parties,以及開啓新的一代。

breakBarrier()

breakBarrier即打破現有的柵欄,讓全部線程經過:

private void breakBarrier() {
    // 標記broken狀態
    generation.broken = true;
    // 恢復count值
    count = parties;
    // 喚醒當前這一代中全部等待在條件隊列裏的線程(由於柵欄已經打破了)
    trip.signalAll();
}

這個breakBarrier怎麼理解呢,繼續拿上面過上車的例子打比方,有時候某個時間段,景區的人比較少,等待過山車的人數湊不夠10我的,眼看後面遲遲沒有人再來,這個時候有的工做人員也會打開柵欄,讓正在等待的人進來坐過山車。這裏工做人員的行爲就是breakBarrier,因爲並非在湊夠10我的的狀況下就開啓了柵欄,咱們就把這一代的broken狀態標記爲true

reset()

reset方法用於將barrier恢復成初始的狀態,它的內部就是簡單地調用了breakBarrier方法和nextGeneration方法。

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

這裏要注意的是,若是在咱們執行該方法時有線程正等待在barrier上,則它將當即返回並拋出BrokenBarrierException異常。
另一點值得注意的是,該方法執行前須要先得到鎖。

await

看完前面的輔助方法以後,接下來咱們就來看CyclicBarrier最核心的await方法,能夠說整個CyclicBarrier最關鍵的只有它了。它也是一個集「countDown」和「阻塞等待」於一體的方法。

await方法有兩種版本,一種帶超時機制,一種不帶,然而從源碼上看,它們最終調用的都是帶超時機制的dowait方法:

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

其中,dowait方法定義以下,它就是整個CyclicBarrier的核心了,咱們直接在代碼中以註釋的形式分析:

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    // 全部執行await方法的線程必須是已經持有了鎖,因此這裏必須先獲取鎖
    lock.lock();
    try {
        final Generation g = generation;

        // 前面說過,調用breakBarrier會將當前「代」的broken屬性設爲true
        // 若是一個正在await的線程發現barrier已經被break了,則將直接拋出BrokenBarrierException異常
        if (g.broken)
            throw new BrokenBarrierException();

        // 若是當前線程被中斷了,則先將柵欄打破,再拋出InterruptedException
        // 這麼作的緣由是,因此等待在barrier的線程都是相互等待的,若是其中一個被中斷了,那其餘的就不用等了。
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        // 當前線程已經來到了柵欄前,先將等待的線程數減一
        int index = --count;
        
        // 若是等待的線程數爲0了,說明全部的parties都到齊了
        // 則能夠喚醒全部等待的線程,讓你們一塊兒經過柵欄,並重置柵欄
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    // 若是建立CyclicBarrier時傳入了barrierCommand
                    // 說明經過柵欄前有一些額外的工做要作
                    command.run(); 
                ranAction = true;
                // 喚醒全部線程,開啓新一代
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 若是count數不爲0,就將當前線程掛起,直到全部的線程到齊,或者超時,或者中斷髮生
        for (;;) {
            try {
                // 若是沒有設定超時機制,則直接調用condition的await方法
                if (!timed)
                    trip.await();  // 當前線程在這裏被掛起
                else if (nanos > 0L)
                    // 若是設了超時,則等待指定的時間
                    nanos = trip.awaitNanos(nanos); // 當前線程在這裏被掛起,超時時間到了就會自動喚醒
            } catch (InterruptedException ie) {
                // 執行到這裏說明線程被中斷了
                // 若是線程被中斷時還處於當前這一「代」,而且當前這一代尚未被broken,則先打破柵欄
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // 注意來到這裏有兩種狀況
                    // 一種是g!=generation,說明新的一代已經產生了,因此咱們沒有必要處理這個中斷,只要再自我中斷一下就好,交給後續的人處理
                    // 一種是g.broken = true, 說明中斷前柵欄已經被打破了,既然中斷髮生時柵欄已經被打破了,也沒有必要再處理這個中斷了
                    Thread.currentThread().interrupt();
                }
            }

            // 注意,執行到這裏是對應於線程從await狀態被喚醒了
            
            // 這裏先檢測broken狀態,能使broken狀態變爲true的,只有breakBarrier()方法,到這裏對應的場景是
            // 1. 其餘執行await方法的線程在掛起前就被中斷了
            // 2. 其餘執行await方法的線程在還處於等待中時被中斷了
            // 2. 最後一個到達的線程在執行barrierCommand的時候發生了錯誤
            // 4. reset()方法被調用
            if (g.broken)
                throw new BrokenBarrierException();

            // 若是線程被喚醒時,新一代已經被開啓了,說明一切正常,直接返回
            if (g != generation)
                return index;

            // 若是是由於超時時間到了被喚醒,則打破柵欄,返回TimeoutException
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

這個await方法雖然包攬了countDown、阻塞線程、喚醒線程、執行barrierCommand任務、開啓新一代,處理中斷等諸多任務,可是代碼自己仍是比較好懂的。

值得注意的是,await方法是有返回值的,表明了線程到達的順序,第一個到達的線程的index爲parties - 1,最後一個到達的線程的index爲0

工具方法

除了重頭戲await方法和它的一些輔助方法,CyclicBarrier還爲咱們提供了一些工具方法:

(1)獲取參與的線程數parties

public int getParties() {
    return parties;
}

parties 在構造完成後就不會被修改了,所以對它的訪問不須要加鎖。

(2)獲取正在等待中的線程數

public int getNumberWaiting() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return parties - count;
    } finally {
        lock.unlock();
    }
}

注意,這裏加了鎖,由於count方法可能會被多個線程同時修改。

(3)判斷當前barrier是否已經broken

public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
}

注意,這裏一樣要加鎖,由於broken屬性可能被多個線程同時訪問或修改。

實戰

爲了學以至用,接下來咱們就來看看怎麼使用這個併發工具,java官方文檔爲咱們提供了一個使用的範例:

class Solver {
    final int N;
    final float[][] data;
    final CyclicBarrier barrier;

    class Worker implements Runnable {
        int myRow;

        Worker(int row) {
            myRow = row;
        }

        public void run() {
            while (!done()) {
                processRow(myRow);

                try {
                    barrier.await();
                } catch (InterruptedException ex) {
                    return;
                } catch (BrokenBarrierException ex) {
                    return;
                }
            }
        }
    }

    public Solver(float[][] matrix) {
        data = matrix;
        N = matrix.length;
        Runnable barrierAction =
                new Runnable() {
                    public void run() {
                        mergeRows(...);
                    }
                };
        barrier = new CyclicBarrier(N, barrierAction);

        List<Thread> threads = new ArrayList<Thread>(N);
        for (int i = 0; i < N; i++) {
            Thread thread = new Thread(new Worker(i));
            threads.add(thread);
            thread.start();
        }

        // wait until done
        for (Thread thread : threads)
            thread.join();
    }
}

在這個例子中,咱們爲傳入的matrix數組的每一行都建立了一個線程進行處理,使用了CyclicBarrier來保證只有全部的線程都處理完以後,纔會調用mergeRows(...)方法來合併結果。只要有一行沒有處理完,全部的線程都會在barrier.await()處等待,最後一個執行完的線程將會負責喚醒全部等待的線程。

總結

  • CyclicBarrier實現了相似CountDownLatch的邏輯,它可使得一組線程之間相互等待,直到全部的線程都到齊了以後再繼續往下執行。
  • CyclicBarrier基於條件隊列和獨佔鎖來實現,而非共享鎖。
  • CyclicBarrier可重複使用,在全部線程都到齊了一塊兒經過後,將會開啓新的一代。
  • CyclicBarrier使用了「all-or-none breakage model」,全部互相等待的線程,要麼一塊兒經過barrier,要麼一個都不要經過,若是有一個線程由於中斷,失敗或者超時而過早的離開了barrier,則該barrier會被broken掉,全部等待在該barrier上的線程都會拋出BrokenBarrierException(或者InterruptedException)。

(完)

系列文章目錄

相關文章
相關標籤/搜索