Java併發編程之CyclicBarrier源碼分析

CyclicBarrier介紹

CyclicBarrier是JDK1.5提供容許一組線程等待彼此都達到一個共同的障礙點的同步的工具。CyclicBarrier適用於固定大小線程池,能夠設置一個Runnable任務,當各線程達到共同的障礙點時觸發這個任務。java

例子

//建立線程池
	private static ExecutorService executorService =  Executors.newFixedThreadPool(10);
	//建立屏障
	static CyclicBarrier cb = new CyclicBarrier(10,new Runnable() {
		public void run() {
			System.out.println("到達屏障");
		}
	});
	public static void main(String[] args)  {
	//提交任務
		for (int i = 0; i < 10; i++) {
			executorService.submit(new Runnable() {
				@Override
				public void run() {
					try {
						cb.await();
					} catch (InterruptedException e) {
						e.printStackTrace();
					} catch (BrokenBarrierException e) {
						e.printStackTrace();
					}
				}
			});
		}
	}

如上例子建立線程爲10的固定線程池,建立值爲10的屏障,並設置一個Runnable任務。運行main方法提交任務當任務提交到10的時候到達屏障點,會運行Runnable任務並輸出"到達屏障"。app

特色

  • (1)可重用,正常狀況下可重複使用,中斷狀況下在調用reset()後重復使用
  • (2)可中斷,運行過程當中可中斷執行
  • (3)支持配置Runnable任務,當達到障礙點時可觸發

與CountDownLatch比較

  • 相同點:都是同步屏障,均可以中斷
  • 不一樣點: CyclicBarrier到達屏障後喚醒所有線程;而CountDownLatch到達屏障後是一個一個傳播喚醒。CyclicBarrier支持配置Runnable任務CountDownLatch不支持;CyclicBarrier可重用CountDownLatch不可重用

CyclicBarrier原理分析

CyclicBarrier是利用ReentrantLock和Condition對扣減屏障值操做進行加鎖,加鎖後釋放鎖而後阻塞直到屏障值爲0被喚醒。ide

下面來看下CyclicBarrier的成員變量工具

private final ReentrantLock lock = new ReentrantLock();

對扣減屏障值操做進行加鎖用。oop

private final Condition trip = lock.newCondition();

對扣減屏障值操做後阻塞線程用,源碼分析

private final int parties;

屏障值最大值,不可修改。this

private final Runnable barrierCommand;

各線程到達屏障執行的任務。線程

private Generation generation = new Generation();

Generation類型對象,此類型裏成員只有一個boolean類型的變量,做用是判斷屏障是否被打破。code

private int count;

屏障值,操做扣減用。對象

await()源碼分析

await()源碼以下

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

直接調用dowait(false, 0L)方法,第一個參數表示是否支持等待超時,第二個參數表示超時時長。await()不須要超時這裏傳了false和0L。由於await()不須要超時TimeoutException這個異常也不可能發生。

下面來看下dowait(false, 0L)的源碼

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
		//加鎖
        lock.lock();
        try {
            final Generation g = generation;
		//1.若是屏障被打破拋出屏障打破異常
            if (g.broken)
                throw new BrokenBarrierException();
		//2.若是當前線程被中斷拋出中斷異常
            if (Thread.interrupted()) {
		//3.打破屏障
                breakBarrier();
                throw new InterruptedException();
            }
		//4.屏障值減一
            int index = --count;
		//5.若是減一之後屏障值等於0,就要喚醒全部的阻塞線程
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
		//6.是否配置了任務,若是配置了則執行
                    if (command != null)
                        command.run();
                    ranAction = true;
		//7.若是任務正常運行結束,全部的阻塞線程,並重置屏障值
                    nextGeneration();
                    return 0;
                } finally {
		//8.若是任務運行出現異常,則打破屏障
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
			//9.若是不支持等待超時,則調用await()一直等待
                    if (!timed)
                        trip.await();
			//10.若是支持等待超時,則等待nanos時間
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
				//11.若是線程被中斷。若是g == generation不成立說明當前線程已經被喚醒,這裏說明還沒被喚醒的中斷就要打破屏障,不然就標記中斷讓上層處理。
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
		//11.若是已經打破屏障,拋出BrokenBarrierException異常
                if (g.broken)
                    throw new BrokenBarrierException();
		//12.g != generation成立說明已經被激活,這裏正常結束
                if (g != generation)
                    return index;
		//13.若是超時,則打破屏障拋出超時異常。
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
		//釋放鎖
            lock.unlock();
        }
    }

dowait(false, 0L)主要邏輯就是將屏障值count減1,而後進入等待,直到count等於0到達屏障點被喚醒。 這裏須要注意的是若是一個線程打破屏障,則全部的線程都會被打破拋出BrokenBarrierException異常,而且屏障被打破後若是想繼續使用必須調用reset()方法重置。

下面來看下breakBarrier()方法

private void breakBarrier() {
	//設置打破屏障狀態
        generation.broken = true;
	//將count設置成原來的值
        count = parties;
	//喚醒全部其餘線程
        trip.signalAll();
    }

breakBarrier()就是設置打破屏障狀態爲ture,而後喚醒因此其餘阻塞線程,其餘阻塞喚醒後會拋出BrokenBarrierException異常。

下面來看下nextGeneration()方法

private void nextGeneration() {
        // 喚醒全部其餘線程
        trip.signalAll();
        // 將count設置成原來的值
        count = parties;
		//初始化屏障狀態
        generation = new Generation();
    }

nextGeneration()跟breakBarrier()相似,可是nextGeneration()是從新初始化屏障狀態的,因此調用這個方法後CyclicBarrier可重用。

reset()源碼分析

reset()源碼以下

public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
		//打破屏障
            breakBarrier();   // break the current generation
		//重置CyclicBarrier狀態
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

reset()很簡單,先打破屏障,終止各線程等待狀態使其餘線程拋出BrokenBarrierException異常,而後重置CyclicBarrier狀態,使其可重用。這裏官方推薦不要重用,從新建立一個CyclicBarrier使用,官方給的緣由也比較含糊。

相關文章
相關標籤/搜索