Java併發編程之CyclicBarrier

簡介

CyclicBarrier字面意思是循環屏障,它能夠實現線程間的計數等待。當線程到達屏障點時會依次進入等待狀態,直到最後一個線程進入屏障點時會喚醒等待的線程繼續運行。ide

CyclicBarrier和CountDownLatch相似,區別在於CountDownLatch只能使用一次,當計數器歸零後,CountDownLatch的await等方法都會直接返回。而CyclicBarrier是能夠重複使用的,當計數器歸零後,計數器和CyclicBarrier狀態都會被重置。this

CyclicBarrier的使用

構造方法介紹

CyclicBarrier(int parties):建立CyclicBarrier,指定計數器值(等待線程數量)。線程

CyclicBarrier(int parties, Runnable barrierAction):建立CyclicBarrier,指定計數器值(等待線程數量)和計數器歸零後(最後一個線程到達)要執行的任務。遊戲

核心方法介紹

await():阻塞當前線程,直到計數器歸零被喚醒或者線程被中斷。ip

await(long timeout, TimeUnit unit):阻塞當前線程,直到計數器歸零被喚醒、線程被中斷或者超時返回。get

CyclicBarrier例子

等待全部玩家準備就緒,遊戲纔開始,每一輪遊戲的開始意味着CyclicBarrier已經重置,能夠開始新一輪的計數。同步

public class Demo {
    public static void main(String[] args) {
        //建立CyclicBarrier並指定計數器值爲5,以及計數器爲0後要執行的任務
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
            System.out.println("---遊戲開始---");
            System.out.println("---五票同意,遊戲結束---");
        });

        Runnable runnable = () -> {
            //重複使用CyclicBarrier5次
            for(int i = 0; i < 5; i++){
                System.out.println(Thread.currentThread().getName() + ":準備就緒");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        };

        Thread thread1 = new Thread(runnable, "一號玩家");
        Thread thread2 = new Thread(runnable, "二號玩家");
        Thread thread3 = new Thread(runnable, "三號玩家");
        Thread thread4 = new Thread(runnable, "四號玩家");
        Thread thread5 = new Thread(runnable, "五號玩家");
        thread1.start();
        thread2.start();
        thread3.start();
        thread4.start();
        thread5.start();
    }
}

/*
 * 循環輸出5次
 * 輸出結果:
 * 一號玩家:準備就緒
 * 三號玩家:準備就緒
 * 二號玩家:準備就緒
 * 五號玩家:準備就緒
 * 四號玩家:準備就緒
 * ---遊戲開始---
 * ---五票同意,遊戲結束---
 * 三號玩家:準備就緒
 * 一號玩家:準備就緒
 * 五號玩家:準備就緒
 * ......
 */
破損的CyclicBarrier

在使用CyclicBarrier中,假設總的等待線程數量爲5,如今其中一個線程被中斷了,被中斷的線程將拋出InterruptedException異常,而其餘4個線程將拋出BrokenBarrierException異常。源碼

BrokenBarrierException異常表示當前的CyclicBarrier已經破損,可能不能等待全部線程到齊了,避免其餘線程永久的等待。it

CyclicBarrier的源碼

CyclicBarrier是基於顯式鎖ReentrantLock來實現的,CyclicBarrier不少方法都使用顯式鎖作了同步處理,await方法的等待喚醒也是經過Condition實現的。io

CyclicBarrier的成員變量:

//顯式鎖
private final ReentrantLock lock = new ReentrantLock();
//用於顯式鎖的Condition
private final Condition trip = lock.newCondition();
//線程數量
private final int parties;
//當全部線程到達屏障點後執行的任務
private final Runnable barrierCommand;
//Generation內部有一個broken變量,用於標識CyclicBarrier是否破損
private Generation generation = new Generation();
//用於遞減的線程數量,在每一輪結束後會被重置爲parties
private int count;

await方法裏是調用的dowait方法,dowait方法源碼:

private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
		final Generation g = generation;
		
		//若是CyclicBarrier已破損,則拋出BrokenBarrierException異常
		if (g.broken)
			throw new BrokenBarrierException();

		//若是當前線程已經中斷,則將CyclicBarrier標記爲已破損並拋出InterruptedException異常
		if (Thread.interrupted()) {
			breakBarrier();
			throw new InterruptedException();
		}

		int index = --count;
		//index == 0表示全部線程都到達了屏障點
		if (index == 0) {  // tripped
			boolean ranAction = false;
			try {
				//執行線程到齊後須要執行的任務
				final Runnable command = barrierCommand;
				if (command != null)
					command.run();
				ranAction = true;
				//喚醒全部等待的線程並重置CyclicBarrier
				nextGeneration();
				return 0;
			} finally {
				if (!ranAction)
					breakBarrier();
			}
		}

		//線程沒到齊,阻塞當前線程
		for (;;) {
			try {
				//不帶超時時間的等待
				if (!timed)
					trip.await();
				//帶超時時間的等待	
				else if (nanos > 0L)
					nanos = trip.awaitNanos(nanos);
			} catch (InterruptedException ie) {
				if (g == generation && ! g.broken) {
					breakBarrier();
					throw ie;
				} else {
					// We're about to finish waiting even if we had not
					// been interrupted, so this interrupt is deemed to
					// "belong" to subsequent execution.
					Thread.currentThread().interrupt();
				}
			}

			if (g.broken)
				throw new BrokenBarrierException();

			if (g != generation)
				return index;

			if (timed && nanos <= 0L) {
				breakBarrier();
				throw new TimeoutException();
			}
		}
	} finally {
		lock.unlock();
	}
}

nextGeneration方法:

private void nextGeneration() {
	//喚醒全部等待的線程
	trip.signalAll();
	//重置CyclicBarrier
	count = parties;
	generation = new Generation();
}
相關文章
相關標籤/搜索