Java併發系列[8]----CyclicBarrier源碼分析

現實生活中咱們常常會遇到這樣的情景,在進行某個活動前須要等待人所有都齊了纔開始。例如吃飯時要等全家人都上座了才動筷子,旅遊時要等所有人都到齊了纔出發,比賽時要等運動員都上場後纔開始。在JUC包中爲咱們提供了一個同步工具類可以很好的模擬這類場景,它就是CyclicBarrier類。利用CyclicBarrier類能夠實現一組線程相互等待,當全部線程都到達某個屏障點後再進行後續的操做。下圖演示了這一過程。app

在CyclicBarrier類的內部有一個計數器,每一個線程在到達屏障點的時候都會調用await方法將本身阻塞,此時計數器會減1,當計數器減爲0的時候全部因調用await方法而被阻塞的線程將被喚醒。這就是實現一組線程相互等待的原理,下面咱們先看看CyclicBarrier有哪些成員變量。dom

 1 //同步操做鎖
 2 private final ReentrantLock lock = new ReentrantLock();
 3 //線程攔截器
 4 private final Condition trip = lock.newCondition();
 5 //每次攔截的線程數
 6 private final int parties;
 7 //換代前執行的任務
 8 private final Runnable barrierCommand;
 9 //表示柵欄的當前代
10 private Generation generation = new Generation();
11 //計數器
12 private int count;
13 
14 //靜態內部類Generation
15 private static class Generation {
16     boolean broken = false;
17 }

上面貼出了CyclicBarrier全部的成員變量,能夠看到CyclicBarrier內部是經過條件隊列trip來對線程進行阻塞的,而且其內部維護了兩個int型的變量parties和count,parties表示每次攔截的線程數,該值在構造時進行賦值。count是內部計數器,它的初始值和parties相同,之後隨着每次await方法的調用而減1,直到減爲0就將全部線程喚醒。CyclicBarrier有一個靜態內部類Generation,該類的對象表明柵欄的當前代,就像玩遊戲時表明的本局遊戲,利用它能夠實現循環等待。barrierCommand表示換代前執行的任務,當count減爲0時表示本局遊戲結束,須要轉到下一局。在轉到下一局遊戲以前會將全部阻塞的線程喚醒,在喚醒全部線程以前你能夠經過指定barrierCommand來執行本身的任務。接下來咱們看看它的構造器。ide

 1 //構造器1
 2 public CyclicBarrier(int parties, Runnable barrierAction) {
 3     if (parties <= 0) throw new IllegalArgumentException();
 4     this.parties = parties;
 5     this.count = parties;
 6     this.barrierCommand = barrierAction;
 7 }
 8 
 9 //構造器2
10 public CyclicBarrier(int parties) {
11     this(parties, null);
12 }

CyclicBarrier有兩個構造器,其中構造器1是它的核心構造器,在這裏你能夠指定本局遊戲的參與者數量(要攔截的線程數)以及本局結束時要執行的任務,還能夠看到計數器count的初始值被設置爲parties。CyclicBarrier類最主要的功能就是使先到達屏障點的線程阻塞並等待後面的線程,其中它提供了兩種等待的方法,分別是定時等待和非定時等待。工具

 1 //非定時等待
 2 public int await() throws InterruptedException, BrokenBarrierException {
 3     try {
 4         return dowait(false, 0L);
 5     } catch (TimeoutException toe) {
 6         throw new Error(toe);
 7     }
 8 }
 9 
10 //定時等待
11 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
12     return dowait(true, unit.toNanos(timeout));
13 }

能夠看到不論是定時等待仍是非定時等待,它們都調用了dowait方法,只不過是傳入的參數不一樣而已。下面咱們就來看看dowait方法都作了些什麼。ui

 1 //核心等待方法
 2 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
 3     final ReentrantLock lock = this.lock;
 4     lock.lock();
 5     try {
 6         final Generation g = generation;
 7         //檢查當前柵欄是否被打翻
 8         if (g.broken) {
 9             throw new BrokenBarrierException();
10         }
11         //檢查當前線程是否被中斷
12         if (Thread.interrupted()) {
13             //若是當前線程被中斷會作如下三件事
14             //1.打翻當前柵欄
15             //2.喚醒攔截的全部線程
16             //3.拋出中斷異常
17             breakBarrier();
18             throw new InterruptedException();
19         }
20        //每次都將計數器的值減1
21        int index = --count;
22        //計數器的值減爲0則需喚醒全部線程並轉換到下一代
23        if (index == 0) {
24            boolean ranAction = false;
25            try {
26                //喚醒全部線程前先執行指定的任務
27                final Runnable command = barrierCommand;
28                if (command != null) {
29                    command.run();
30                }
31                ranAction = true;
32                //喚醒全部線程並轉到下一代
33                nextGeneration();
34                return 0;
35            } finally {
36                //確保在任務未成功執行時能將全部線程喚醒
37                if (!ranAction) {
38                    breakBarrier();
39                }
40            }
41        }
42 
43        //若是計數器不爲0則執行此循環
44        for (;;) {
45            try {
46                //根據傳入的參數來決定是定時等待仍是非定時等待
47                if (!timed) {
48                    trip.await();
49                }else if (nanos > 0L) {
50                    nanos = trip.awaitNanos(nanos);
51                }
52            } catch (InterruptedException ie) {
53                //若當前線程在等待期間被中斷則打翻柵欄喚醒其餘線程
54                if (g == generation && ! g.broken) {
55                    breakBarrier();
56                    throw ie;
57                } else {
58                    //若在捕獲中斷異常前已經完成在柵欄上的等待, 則直接調用中斷操做
59                    Thread.currentThread().interrupt();
60                }
61            }
62            //若是線程由於打翻柵欄操做而被喚醒則拋出異常
63            if (g.broken) {
64                throw new BrokenBarrierException();
65            }
66            //若是線程由於換代操做而被喚醒則返回計數器的值
67            if (g != generation) {
68                return index;
69            }
70            //若是線程由於時間到了而被喚醒則打翻柵欄並拋出異常
71            if (timed && nanos <= 0L) {
72                breakBarrier();
73                throw new TimeoutException();
74            }
75        }
76     } finally {
77         lock.unlock();
78     }
79 }

上面貼出的代碼中註釋都比較詳細,咱們只挑一些重要的來說。能夠看到在dowait方法中每次都將count減1,減完後立馬進行判斷看看是否等於0,若是等於0的話就會先去執行以前指定好的任務,執行完以後再調用nextGeneration方法將柵欄轉到下一代,在該方法中會將全部線程喚醒,將計數器的值從新設爲parties,最後會從新設置柵欄代次,在執行完nextGeneration方法以後就意味着遊戲進入下一局。若是計數器此時還不等於0的話就進入for循環,根據參數來決定是調用trip.awaitNanos(nanos)仍是trip.await()方法,這兩方法對應着定時和非定時等待。若是在等待過程當中當前線程被中斷就會執行breakBarrier方法,該方法叫作打破柵欄,意味着遊戲在中途被掐斷,設置generation的broken狀態爲true並喚醒全部線程。同時這也說明在等待過程當中有一個線程被中斷整盤遊戲就結束,全部以前被阻塞的線程都會被喚醒。線程醒來後會執行下面三個判斷,看看是否由於調用breakBarrier方法而被喚醒,若是是則拋出異常;看看是不是正常的換代操做而被喚醒,若是是則返回計數器的值;看看是否由於超時而被喚醒,若是是的話就調用breakBarrier打破柵欄並拋出異常。這裏還須要注意的是,若是其中有一個線程由於等待超時而退出,那麼整盤遊戲也會結束,其餘線程都會被喚醒。下面貼出nextGeneration方法和breakBarrier方法的具體代碼。this

 1 //切換柵欄到下一代
 2 private void nextGeneration() {
 3     //喚醒條件隊列全部線程
 4     trip.signalAll();
 5     //設置計數器的值爲須要攔截的線程數
 6     count = parties;
 7     //從新設置柵欄代次
 8     generation = new Generation();
 9 }
10 
11 //打翻當前柵欄
12 private void breakBarrier() {
13     //將當前柵欄狀態設置爲打翻
14     generation.broken = true;
15     //設置計數器的值爲須要攔截的線程數
16     count = parties;
17     //喚醒全部線程
18     trip.signalAll();
19 }

上面咱們已經經過源碼將CyclicBarrier的原理基本都講清楚了,下面咱們就經過一個賽馬的例子來深刻掌握它的使用。spa

 1 class Horse implements Runnable {
 2     
 3     private static int counter = 0;
 4     private final int id = counter++;
 5     private int strides = 0;
 6     private static Random rand = new Random(47);
 7     private static CyclicBarrier barrier;
 8     
 9     public Horse(CyclicBarrier b) { barrier = b; }
10     
11     @Override
12     public void run() {
13         try {
14             while(!Thread.interrupted()) {
15                 synchronized(this) {
16                     //賽馬每次隨機跑幾步
17                     strides += rand.nextInt(3);
18                 }
19                 barrier.await();
20             }
21         } catch(Exception e) {
22             e.printStackTrace();
23         }
24     }
25     
26     public String tracks() {
27         StringBuilder s = new StringBuilder();
28         for(int i = 0; i < getStrides(); i++) {
29             s.append("*");
30         }
31         s.append(id);
32         return s.toString();
33     }
34     
35     public synchronized int getStrides() { return strides; }
36     public String toString() { return "Horse " + id + " "; }
37     
38 }
39 
40 public class HorseRace implements Runnable {
41     
42     private static final int FINISH_LINE = 75;
43     private static List<Horse> horses = new ArrayList<Horse>();
44     private static ExecutorService exec = Executors.newCachedThreadPool();
45     
46     @Override
47     public void run() {
48         StringBuilder s = new StringBuilder();
49         //打印賽道邊界
50         for(int i = 0; i < FINISH_LINE; i++) {
51             s.append("=");
52         }
53         System.out.println(s);
54         //打印賽馬軌跡
55         for(Horse horse : horses) {
56             System.out.println(horse.tracks());
57         }
58         //判斷是否結束
59         for(Horse horse : horses) {
60             if(horse.getStrides() >= FINISH_LINE) {
61                 System.out.println(horse + "won!");
62                 exec.shutdownNow();
63                 return;
64             }
65         }
66         //休息指定時間再到下一輪
67         try {
68             TimeUnit.MILLISECONDS.sleep(200);
69         } catch(InterruptedException e) {
70             System.out.println("barrier-action sleep interrupted");
71         }
72     }
73     
74     public static void main(String[] args) {
75         CyclicBarrier barrier = new CyclicBarrier(7, new HorseRace());
76         for(int i = 0; i < 7; i++) {
77             Horse horse = new Horse(barrier);
78             horses.add(horse);
79             exec.execute(horse);
80         }
81     }
82     
83 }

該賽馬程序主要是經過在控制檯不停的打印各賽馬的當前軌跡,以此達到動態顯示的效果。整場比賽有多個輪次,每一輪次各個賽馬都會隨機走上幾步而後調用await方法進行等待,當全部賽馬走完一輪的時候將會執行任務將全部賽馬的當前軌跡打印到控制檯上。這樣每一輪下來各賽馬的軌跡都在不停的增加,當其中某個賽馬的軌跡最早增加到指定的值的時候將會結束整場比賽,該賽馬成爲整場比賽的勝利者!程序的運行結果以下:線程

至此咱們不免會將CyclicBarrier與CountDownLatch進行一番比較。這兩個類均可以實現一組線程在到達某個條件以前進行等待,它們內部都有一個計數器,當計數器的值不斷的減爲0的時候全部阻塞的線程將會被喚醒。有區別的是CyclicBarrier的計數器由本身控制,而CountDownLatch的計數器則由使用者來控制,在CyclicBarrier中線程調用await方法不只會將本身阻塞還會將計數器減1,而在CountDownLatch中線程調用await方法只是將本身阻塞而不會減小計數器的值。另外,CountDownLatch只能攔截一輪,而CyclicBarrier能夠實現循環攔截。通常來講用CyclicBarrier能夠實現CountDownLatch的功能,而反之則不能,例如上面的賽馬程序就只能使用CyclicBarrier來實現。總之,這兩個類的異同點大體如此,至於什麼時候使用CyclicBarrier,什麼時候使用CountDownLatch,還須要讀者本身去拿捏。3d

相關文章
相關標籤/搜索