源碼分析:CyclicBarrier 之循環柵欄

簡介

CyclicBarrier 是一個同步輔助工具,容許一組線程所有等待彼此達到共同屏障點,且等待的線程被釋放後還能夠從新使用,因此叫作Cyclic(循環的)。java

應用場景

好比出去旅行時,導遊須要等待全部的客人到齊後,導遊纔會給你們講解注意事項等數組

官方示例

在JDK的源碼註釋中,提供了一個簡單的示例demo,稍加修改後就能夠運行app

public class Solver {
    AtomicInteger sum = new AtomicInteger(0);
    // 本身新增的一個標識,true表明全部的計算完成了
    volatile boolean done = false;
    final int N;
    final int[][] data;
    final CyclicBarrier barrier;

    class Worker implements Runnable {
        int myRow;
        Worker(int row) {
            myRow = row;
        }
        @Override
        public void run() {
            while (!done()) {
                int rowSum = Arrays.stream(data[myRow]).sum(); // 計算行的和
                System.out.println("processRow(myRow):" + rowSum);
                sum.addAndGet(rowSum);
                try {
                    barrier.await();
                } catch (InterruptedException ex) {
                    return;
                } catch (BrokenBarrierException ex) {
                    return;
                }
            }
        }
    }

    private boolean done(){
        return done;
    }

    public Solver(int[][] matrix) throws InterruptedException{
        data = matrix;
        N = matrix.length;
        Runnable barrierAction = () -> {
            System.out.println("mergeRows(...):"+sum.get()); // 輸出二維數組的總和
            done = true;
        };
        barrier = new CyclicBarrier(N, barrierAction);

        List<Thread> threads = new ArrayList<Thread>(N);
        for (int i = 0; i < N; i++) {
            Thread thread = new Thread(new Worker(i));
            threads.add(thread);
            thread.start();
        }

        // wait until done
        for (Thread thread : threads){
            thread.join();
        }
    }

    public static void main(String[] args) throws InterruptedException{
        int[][] matrix = {{1,2,3},{4,5,6}};
        Solver solver = new Solver(matrix);
    }
}

源碼分析

主要的屬性

/** 防禦柵欄入口的鎖 */
private final ReentrantLock lock = new ReentrantLock();
/** 等待直到跳閘的條件 */
private final Condition trip = lock.newCondition();
/** 構造方法參數,在障礙被釋放以前必須調用等待的線程數 */
private final int parties;
/* 越過柵欄時運行的命令 */
private final Runnable barrierCommand;
/** 當前的一代,控制CyclicBarrier的循環 */
private Generation generation = new Generation();
/** 記錄仍在等待的參與方線程數量,初始值等於parties */
private int count;

主要內部類

/** 代:屏障的每次使用都表示爲一個生成實例 */
private static class Generation {
	  boolean broken = false; // 標識當前的柵欄已破壞或喚醒,jinglingwang.cn
}

構造方法

一共有兩個構造方法,第一個構造方法僅須要傳入一個int值,表示調用等待的線程數;第二個構造方法多了一個runnable接口,當全部的線程越過柵欄時執行的命令,沒有則爲null;ide

public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction; // Runnable 命令線程
}

await() 方法

每一個須要在柵欄處等待的線程都須要顯式地調用這個方法。工具

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // 調用await方法,0:不超時 
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

dowait() 方法

主要的障礙代碼源碼分析

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    // 當前鎖
    final ReentrantLock lock = this.lock;
    // 加鎖 
    lock.lock();
    try {
        // 當前代
        final Generation g = generation;
        // 檢查當前代的狀態,是否要拋出BrokenBarrierException異常
        if (g.broken)
            throw new BrokenBarrierException();

        // 當前線程被中斷了 
        if (Thread.interrupted()) {
            // 屏障被打破
            breakBarrier();
            throw new InterruptedException();
        }
        // count減一
        int index = --count;
        // index等於0,說明最後一個線程到達了屏障處
        if (index == 0) {  // tripped
            boolean ranAction = false; // 標識Runnable 命令線程是否有執行
            try {
                final Runnable command = barrierCommand; // 第二個構造方法的入參,須要運行的命令線程
                if (command != null)
                    command.run(); // 執行命令線程。by:jinglingwang.cn
                ranAction = true;
                nextGeneration(); // 更新重置整個屏障
                return 0;
            } finally {
                if (!ranAction) 
                    // ranAction 沒有被設置成true;被中斷了
                    breakBarrier();
            }
        }

        // 循環直到跳閘,斷開,中斷或超時
        for (;;) {
            try {
                if (!timed) // 沒有設超時時間,直接調用條件鎖的await方法阻塞等待
                    trip.await();
                else if (nanos > 0L) // 有超時時間
                    nanos = trip.awaitNanos(nanos); //調用條件鎖的await方法阻塞等待一段時間
            } catch (InterruptedException ie) { // 捕獲中斷異常
                if (g == generation && ! g.broken) {
                    breakBarrier(); //被中斷,當前代會被標識成已被破壞
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }
            // 若是上面代碼沒有異常,理論上只有被喚醒後纔會執行到下面的代碼
            // 再次檢查當前代是否已經被破壞
            if (g.broken)
                throw new BrokenBarrierException();
            // 正常來講,最後一個線程在執行上面的代碼時,會調用nextGeneration,從新生成generation
            // 因此線程被喚醒後,這裏條件會成立
            if (g != generation)
                return index;

            // 超時檢查
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException(); //拋出超時異常
            }
        }
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}
/** 重置屏障,回到初始狀態,說明能夠重複使用*/
private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;  // 重置等的參與方線程數量計數,回到最初的狀態
    generation = new Generation();
}
private void breakBarrier() {
    // 標識當前的柵欄狀態
    generation.broken = true; 
    count = parties;
    // 條件鎖,喚醒全部等待的線程,jinglingwang.cn
    trip.signalAll();
}

dowait() 方法過程總結:this

  1. 參與方的多個線程執行邏輯代碼後,分別調用await方法
  2. 線程分別拿到當前鎖,最早得到鎖的N-1個線程,調用條件鎖Conditionawait方法,根據前面條件鎖的源碼分析咱們知道,調用條件鎖的await方法會釋放當前鎖,而後再調用Unsafa類底層 park 阻塞線程。
  3. 當最後一個線程調用await方法時(也就是上面的 if (index == 0) 分支邏輯,count減爲0,屏障打破),會執行命令線程(構造方法的第二個入參Runnable),而後調用nextGeneration方法,喚醒全部的條件鎖等待的N-1個線程(喚醒並不必定立刻執行),而後重置計數與當前代,也就是一個新的屏障了,這也就是爲何能夠重複使用的緣由。
  4. 最後一個線程釋放鎖,N-1線程中的線程陸續得到鎖,釋放鎖,完成整個流程

CyclicBarrier 總結

  1. 支持兩個構造參數:線程數和須要執行的命令線程
  2. CyclicBarrier 是基於ReentrantLock和Condition來實現屏障邏輯的
  3. 先搶到鎖的N-1個線程會調用條件鎖的await方法從而被阻塞
  4. 最後一個得到鎖的線程來喚醒以前的N-1個線程以及來調用命令線程的run方法
  5. 最後一個得到鎖的線程會生成一個新的屏障(new Generation()),也就是能夠重複使用的屏障
  6. 若是線程中有一個線程被中斷,整個屏障被破壞後,全部線程均可能拋出BrokenBarrierException異常
  7. 原文首發地址:https://jinglingwang.cn/archives/cyclicbarrier

CyclicBarrier 與CountDownLatch的區別

  1. CyclicBarrier 是基於重入鎖和條件鎖來實現的
  2. CountDownLatch 是基於AQS的同步功能來實現的
  3. CyclicBarrier 不容許0個線程,會拋出異常
  4. CountDownLatch 容許0個線程,雖然沒什麼*用
  5. CyclicBarrier 阻塞的是N-1個線程,須要每一個線程調用await,以後由最後一個線程來喚醒全部的等待線程,這也就是屏障的意思
  6. CountDownLatch 是計數爲N,阻塞的不必定是N個線程(能夠是一個或多個),由線程顯示調用countDown方法來減計數,計數爲0時,喚醒阻塞的一個線程或多個線程
  7. CyclicBarrier 最後一個線程會重置屏障的參數,生成一個新的Generation,能夠重複使用,不須要從新new CyclicBarrier
  8. CountDownLatch 沒有重置計數的地方,計數爲0後不能夠重複使用,須要從新new CountDownLatch 才能夠再次使用
相關文章
相關標籤/搜索