java.util.Concurrent.CyclicBarrier 源碼

類圖

源碼html

package java.util.concurrent;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;


public class CyclicBarrier {
    //使用ReentrantLock可重入獨佔鎖
    private final ReentrantLock lock = new ReentrantLock();
    //建立一個條件隊列
    private final Condition trip = lock.newCondition();
    //經過構造器傳入的參數.表示總的等待線程的數量
    private final int parties;
    //當屏障正常打開後運行的程序,經過最後一個調用await的線程來執行
    private final Runnable barrierCommand;
    //當前的Generation。每當屏障失效或者開閘以後都會自動替換掉。從而實現重置的功能
    private Generation generation = new Generation();
    //實際仍在等待的線程數.當有一個線程到達屏障點,count值就會減一;當一次新的運算開始後,count的值被重置爲parties
    private int count;

    //內部類
    private static class Generation {
        boolean broken = false;//表示當前的屏障是否被打破
    }

    //建立一個CyclicBarrier實例,parties指定參與相互等待的線程數
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    //建立一個CyclicBarrier實例,parties指定參與相互等待的線程數
    //barrierAction指定當全部線程到達屏障點以後,首先執行的操做,該操做由最後一個進入屏障點的線程執行。
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    //返回參與相互等待的線程數
    public int getParties() {
        return parties;
    }

    //該方法被調用時表示當前線程已經到達屏障點,當前線程阻塞進入休眠狀態
    //直到全部線程都到達屏障點,當前線程纔會被喚醒
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe);
        }
    }

    //該方法被調用時表示當前線程已經到達屏障點,當前線程阻塞進入休眠狀態
    //在timeout指定的超時時間內,等待其餘參與線程到達屏障點
    //若是超出指定的等待時間,則拋出TimeoutException異常,若是該時間小於等於零,則此方法根本不會等待
    public int await(long timeout, TimeUnit unit) throws  InterruptedException,BrokenBarrierException,TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    //該方法被調用時表示當前線程已經到達屏障點,當前線程阻塞進入休眠狀態
    //在timeout指定的超時時間內,等待其餘參與線程到達屏障點
    //若是超出指定的等待時間,則拋出TimeoutException異常,若是該時間小於等於零,則此方法根本不會等待
    private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)//若是當前Generation是處於打破狀態則傳播這個BrokenBarrierExcption
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();//若是當前線程被中斷則使得當前generation處於打破狀態,重置剩餘count,而且喚醒狀態變量.這時候其餘線程會傳播BrokenBarrierException
                throw new InterruptedException();
            }

            int index = --count;//嘗試下降當前count
            if (index == 0) {
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //當全部參與的線程都到達屏障點,當即去喚醒全部處於休眠狀態的線程,恢復執行
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)//若是運行command失敗也會致使當前屏障被打破
                        breakBarrier();
                }
            }


            for (;;) {
                try {
                    if (!timed)
                        //讓當前執行的線程阻塞,處於休眠狀態
                        trip.await();
                    else if (nanos > 0L)
                        //讓當前執行的線程阻塞,在超時時間內處於休眠狀態
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    //喚醒全部處於休眠狀態的線程,恢復執行
    //重置count值爲parties
    //重置中斷狀態爲false
    private void nextGeneration() {
        trip.signalAll();
        count = parties;
        generation = new Generation();
    }

    //喚醒全部處於休眠狀態的線程,恢復執行
    //重置count值爲parties
    //重置中斷狀態爲true
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    //判斷此屏障是否處於中斷狀態。
    //若是由於構造或最後一次重置而致使中斷或超時,從而使一個或多個參與者擺脫此屏障點,或由於異常而致使某個屏障操做失敗,則返回true;不然返回false
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    //將屏障重置爲其初始狀態
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //喚醒全部等待的線程繼續執行,並設置屏障中斷狀態爲true
            breakBarrier();
            //喚醒全部等待的線程繼續執行,並設置屏障中斷狀態爲false
            nextGeneration();
        } finally {
            lock.unlock();
        }
    }

    //返回當前在屏障處等待的參與者數目,此方法主要用於調試和斷言
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}

類 CyclicBarrier

    extends Objectjava

    一個同步輔助類:它容許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。api

    在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 頗有用。函數

    由於該 barrier 在釋放等待線程後能夠重用,因此稱它爲循環 的 barrier。this

    CyclicBarrier 支持一個可選的 Runnable 命令,在一組線程中的最後一個線程到達以後(但在釋放全部線程以前),該命令只在每一個屏障點運行一次。若在繼續全部參與線程以前更新共享狀態,此屏障操做 頗有用。spa

 

構造方法摘要.net

CyclicBarrier(int parties) 
          建立一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,但它不會在啓動 barrier 時執行預約義的操做。
CyclicBarrier(int parties, Runnable barrierAction) 
          建立一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,並在啓動 barrier 時執行給定的屏障操做,該操做由最後一個進入 barrier 的線程執行。

 

方法摘要線程

 int await() 
          在全部參與者都已經在此 barrier 上調用 await 方法以前,將一直等待。
 int await(long timeout, TimeUnit unit) 
          在全部參與者都已經在此屏障上調用 await 方法以前將一直等待,或者超出了指定的等待時間。
 int getNumberWaiting() 
          返回當前在屏障處等待的參與者數目。
 int getParties() 
          返回要求啓動此 barrier 的參與者數目。
 boolean isBroken() 
          查詢此屏障是否處於損壞狀態。
 void reset() 
          將屏障重置爲其初始狀態。

 

CyclicBarrier:將使count等於parties

public CyclicBarrier(int parties,Runnable barrierAction)

    建立一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,並在啓動 barrier 時執行給定的屏障操做,該操做由最後一個進入 barrier 的線程執行。調試

    參數:code

    parties - 在啓動 barrier 前必須調用 await() 的線程數

    barrierAction - 在啓動 barrier 時執行的命令;若是不執行任何操做,則該參數爲 null

    拋出:

    IllegalArgumentException - 若是 parties 小於 1

 

CyclicBarrier:將使count等於parties

public CyclicBarrier(int parties)

    建立一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,但它不會在啓動 barrier 時執行預約義的操做。

    參數:

    parties - 在啓動 barrier 前必須調用 await() 的線程數

    拋出:

    IllegalArgumentException - 若是 parties 小於 1

 

getParties:獲得parties的值

public int getParties()

    返回要求啓動此 barrier 的參與者數目。

    返回:

        要求啓動此 barrier 的參與者數目

 

await:count減1,若count等於0,則喚醒等待的線程;不等於0,則掛起等待

public int await() throws InterruptedException, BrokenBarrierException

    在全部 參與者都已經在此 barrier 上調用 await 方法以前,將一直等待。

    若是當前線程不是將到達的最後一個線程,出於調度目的,將禁用它,且在發生如下狀況之一前,該線程將一直處於休眠狀態:

  • 最後一個線程到達;或者
  • 其餘某個線程中斷當前線程;或者
  • 其餘某個線程中斷另外一個等待線程;或者
  • 其餘某個線程在等待 barrier 時超時;或者
  • 其餘某個線程在此 barrier 上調用 reset()

    若是當前線程:

  • 在進入此方法時已經設置了該線程的中斷狀態;或者
  • 在等待時被中斷

    則拋出 InterruptedException,而且清除當前線程的已中斷狀態。

    若是在線程處於等待狀態時 barrier 被 reset(),或者在調用 await 時 barrier 被損壞,抑或任意一個線程正處於等待狀態,則拋出 BrokenBarrierException 異常。

    若是任何線程在等待時被 中斷,則其餘全部等待線程都將拋出 BrokenBarrierException 異常,並將 barrier 置於損壞狀態。

    若是當前線程是最後一個將要到達的線程,而且構造方法中提供了一個非空的屏障操做,則在容許其餘線程繼續運行以前,當前線程將運行該操做。若是在執行屏障操做過程當中發生異常,則該異常將傳播到當前線程中,並將 barrier 置於損壞狀態。

    返回:

        到達的當前線程的索引,其中,索引 getParties() - 1 指示將到達的第一個線程,零指示最後一個到達的線程

    拋出:

    InterruptedException - 若是當前線程在等待時被中斷

    BrokenBarrierException - 若是 另外一個 線程在當前線程等待時被中斷或超時,或者重置了 barrier,或者在調用 await 時 barrier 被損壞,抑或因爲異常而致使屏障操做(若是存在)失敗。

 

await:count減1,若count等於0,則喚醒等待的線程;不等於0,則掛起等待指定時間。在過了指定時間以後,count仍不等於0,則拋出異常。

public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException

    在全部 參與者都已經在此屏障上調用 await 方法以前將一直等待,或者超出了指定的等待時間。

    若是當前線程不是將到達的最後一個線程,出於調度目的,將禁用它,且在發生如下狀況之一前,該線程將一直處於休眠狀態:

  • 最後一個線程到達;
  • 超出指定的超時時間;
  • 其餘某個線程中斷當前線程;
  • 其餘某個線程中斷另外一個等待線程;
  • 其餘某個線程在等待 barrier 時超時;
  • 其餘某個線程在此 barrier 上調用 reset()

    若是當前線程,在如下狀況中的一種時:

  • 在進入此方法時已經設置了該線程的中斷狀態;
  • 在等待時被中斷

    則拋出 InterruptedException,而且清除當前線程的已中斷狀態。

    若是超出指定的等待時間,則拋出 TimeoutException 異常。若是該時間小於等於零,則此方法根本不會等待。

    若是在線程處於等待狀態時 barrier 被 reset(),或者在調用 await 時 barrier 被損壞,抑或任意一個線程正處於等待狀態,則拋出 BrokenBarrierException 異常。

    若是任何線程在等待時被中斷,則其餘全部等待線程都將拋出 BrokenBarrierException,並將屏障置於損壞狀態。

    若是當前線程是最後一個將要到達的線程,而且構造方法中提供了一個非空的屏障操做,則在容許其餘線程繼續運行以前,當前線程將運行該操做。若是在執行屏障操做過程當中發生異常,則該異常將傳播到當前線程中,並將 barrier 置於損壞狀態。

    參數:

    timeout - 等待 barrier 的時間

    unit - 超時參數的時間單位

    返回:

        到達的當前線程的索引,其中,索引 getParties() - 1 指示第一個將要到達的線程,零指示最後一個到達的線程

    拋出:

    InterruptedException - 若是當前線程在等待時被中斷

    TimeoutException - 若是超出了指定的超時時間

    BrokenBarrierException - 若是 另外一個 線程在當前線程等待時被中斷或超時,或者重置了 barrier,或者調用 await 時 barrier 被損壞,抑或因爲異常而致使屏障操做(若是存在)失敗。

 

isBroken:判斷屏障是否損壞

public boolean isBroken()

    查詢此屏障是否處於損壞狀態。

    返回:

        若是屢次調用構造函數或者使用重置函數reset(),在屏障等待的參與者的等待狀態會被中斷或超時,從而拋出異常。由於異常而致使某個屏障操做失敗,則返回 true;不然返回 false

 

reset:屏障狀態重置爲true,count等於parties。

public void reset()

    將屏障重置爲其初始狀態。若是全部參與者目前都在屏障處等待,則它們將返回,同時拋出一個 BrokenBarrierException

    注意:在因爲其餘緣由形成損壞(broken)以後,實行重置可能會變得很複雜;此時須要使用其餘方式從新同步線程,並選擇其中一個線程來執行重置。與爲後續使用建立一個新 barrier 相比,這種方法可能更好一些。

 

getNumberWaiting:獲得當前在屏障處等待的參與數

public int getNumberWaiting()

    返回當前在屏障處等待的參與者數目。此方法主要用於調試和斷言。

    返回:

        當前阻塞在 await() 中的參與者數目。

 

使用實例:

  • 1.新建5個線程,這5個線程達到必定的條件時,它們才繼續日後運行。
package com.thread;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest1 extends Thread {
    private static int SIZE = 5;
    private static CyclicBarrier cb;

    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");
            // CyclicBarrier的count減1,若count等於0,則喚醒在屏障處等待的全部線程
            cb.await();
            System.out.println(Thread.currentThread().getName() + " continued.");
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        cb = new CyclicBarrier(SIZE);
        // 新建5個任務
        for (int i = 0; i < SIZE; i++)
            new CyclicBarrierTest1().start();
    }
}

    運行結果:

Thread-1 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-4 continued.
Thread-3 continued.
Thread-2 continued.
Thread-1 continued.
Thread-0 continued.

 

  • 2.新建5個線程,當這5個線程達到必定的條件時,執行某項任務。
package com.thread;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest2 extends Thread {
    private static int SIZE = 5;
    private static CyclicBarrier cb;

    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");
            // CyclicBarrier的count減1,若count等於0,則喚醒在屏障處等待的全部線程
            cb.await();
            System.out.println(Thread.currentThread().getName() + " continued.");
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        cb = new CyclicBarrier(SIZE, new Runnable() {
            public void run() {//當其餘的線程都已達到barrier,先執行當前任務,再讓其餘線程繼續執行
                System.out.println("CyclicBarrier's parties is: " + cb.getParties());
            }
        });
        // 新建5個任務
        for (int i = 0; i < SIZE; i++)
            new CyclicBarrierTest1().start();
    }
}

    運行結果:

Thread-0 wait for CyclicBarrier. Thread-2 wait for CyclicBarrier. Thread-1 wait for CyclicBarrier. Thread-4 wait for CyclicBarrier. Thread-3 wait for CyclicBarrier. CyclicBarrier's parties is: 5 Thread-3 continued. Thread-1 continued. Thread-4 continued. Thread-0 continued. Thread-2 continued.

相關文章
相關標籤/搜索