通俗易懂的JUC源碼剖析-CyclicBarrier

前言

咱們知道,CountDownLatch的計數器是一次性的,它不能重置。也就是說,當count值變爲0時,再調用await()方法會當即返回,不會阻塞。
本文要說的CyclicBarrier就是一種能夠重置計數器的線程同步工具類。CyclicBarrier字面意思是「迴環屏障」,它可讓一組線程所有到達一個狀態後再所有同時往下執行。之因此叫回環是由於當全部線程執行完畢,並重置CyclicBarrier的狀態後它能夠被重用。而之因此叫屏障是由於當某個線程調用await方法後就會被阻塞,這個阻塞點就稱爲屏障,等其餘全部線程都調用了await方法後,這組線程就會一塊兒衝破屏障,並往下執行。java

使用場景

兩個子任務分別執行本身的工做,等它們都執行完後,主任務彙總子任務的結果,並作一些處理,處理完成後兩個子任務又繼續作其餘事情。示例代碼:編程

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
        try {
            System.out.println("main task merge subtask result begin");
            // simulate merge work
            Thread.sleep(5000);
            System.out.println("main task merge subtask result finished");
        } catch (InterruptedException e) {
            // ignore
        }
    });
    public static void main(String[] args) {
        Thread thread1 = new Thread(() -> {
            try {
                Thread.sleep(4000);
                System.out.println("thread1 finished its work");
                cyclicBarrier.await();
                System.out.println("thread1 continue work");
            } catch (InterruptedException | BrokenBarrierException e) {
                // ignore
            }
        });
        Thread thread2 = new Thread(() -> {
            try {
                Thread.sleep(5000);
                System.out.println("thread2 finished its work");
                cyclicBarrier.await();
                System.out.println("thread2 continue work");
            } catch (InterruptedException | BrokenBarrierException e) {
                // ignore
            }
        });
        thread1.start();
        thread2.start();
    }
}

輸出結果:
image.png多線程

能夠看到,線程1和線程2調用await()時,會被阻塞,等主線程任務完成後,線程1和線程2就會衝破屏障,繼續往下執行。這裏的主線程合併工做是可選的,也就是說能夠直接new CyclicBarric(int parties),這種狀況下就沒有到達屏障後的合併工做,會直接在所有線程到達屏障後同時衝破屏障往下執行。能夠比喻成舉辦同窗聚會的場景。有20我的參加聚會,第1我的到達集合地點後要等其餘人,第2個,第3個,...第19我的也須要等,當最後一我的到的時候,所有的20我的就能夠出發去嗨皮了。併發

上面介紹的是「屏障」的應用場景,再來看個「迴環」的應用場景。app

假設一個任務由階段1,階段2,階段3這三個階段組成,每一個線程都串行的依次執行階段1,2,3。當多個線程執行任務時,必須保證等全部線程都執行完階段1後,才能執行階段2,一樣地,也必須保證全部線程都執行完階段2後,才能執行階段3。示例代碼:工具

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo2 {
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
    public static void main(String[] args) {
        Thread thread1 = new Thread(() -> {
            try {
                System.out.println("thread1 step 1");
                cyclicBarrier.await();
                System.out.println("thread1 step 2");
                cyclicBarrier.await();
                System.out.println("thread1 step 3");
            } catch (InterruptedException | BrokenBarrierException e) {
                // ignore
            }
        });
        Thread thread2 = new Thread(() -> {
            try {
                System.out.println("thread2 step 1");
                cyclicBarrier.await();
                System.out.println("thread2 step 2");
                cyclicBarrier.await();
                System.out.println("thread2 step 3");
            } catch (InterruptedException | BrokenBarrierException e) {
                // ignore
            }
        });
        thread1.start();
        thread2.start();
 }
}

輸出結果以下:
image.png
能夠看到,實現了這種同階段等待的效果。oop

實現原理

先來看看重要屬性:this

private static class Generation {
    // 屏障是否被打破
    boolean broken = false;
}
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
/**
 * Number of parties still waiting. Counts down from parties to 0 on each generation.
 * It is reset to parties on each new generation or when broken. 
 */
private int count;

能夠看到,CyclicBarrier裏用了獨佔鎖ReentrantLock實現多線程間的計數器同步,parties表示當多少個線程到達屏障後,衝破屏障往下執行,而count表示當前還剩餘多少個線程還未到達屏障,當全部線程都衝破屏障後,它又會在新一輪(new generation)被重置爲parties的值。也就是說,count和Generation是用來實現重置效果的。spa

再看看構造方法的屬性賦值:線程

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

再來看看關鍵方法:
await()

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // false表示不設置超時
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

dowait()方法代碼以下:

// timed:是否超時等待, nanos:超時時間
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
 TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
        if (g.broken)
               throw new BrokenBarrierException();
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        int index = --count;
        // 若是index爲0,表示全部線程都已到達了屏障,此時去執行初始化時設定的barrierCommand(若是有的話)
        if (index == 0) {  // tripped
           boolean ranAction = false;
           try {
               final Runnable command = barrierCommand;
               if (command != null)
                   command.run();
               ranAction = true;
               // 喚醒其餘線程,並重置進行下一輪
               nextGeneration();
               // 返回
               return 0;
           } finally {
               if (!ranAction)
                   breakBarrier();
           }
        }
        // 不然須要等其餘線程都達到屏障
        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
             try {
                 // 區分超時等待與不超時等待
                 if (!timed)
                    trip.await();
                 else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
             } catch (InterruptedException ie) {
                   if (g == generation && ! g.broken) {
                       breakBarrier();
                       throw ie;
                   } else {
                    // We're about to finish waiting even if we had not
 // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. 
                       Thread.currentThread().interrupt();
                   }
            }
            if (g.broken)
                throw new BrokenBarrierException();
            // g != generation 說明被喚醒後已重置了輪次,說明全部線程均已到達線程屏障,能夠返回了。
            if (g != generation)
                return index;
            // 等待超時,拋出超時異常    
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

其中,nextGeneration()方法以下:

private void nextGeneration() {
    // signal completion of last generation
    // 喚醒等待在trip條件(即屏障)上的其餘全部線程
    trip.signalAll();
    // set up next generation
    // 重置count的值爲初始值parties
    count = parties;
    // 重置當前輪次
    generation = new Generation();
}

參考資料:《Java併發編程之美》

相關文章
相關標籤/搜索