JDK源碼分析之concurrent包(四) -- CyclicBarrier與CountDownLatch

  上一篇咱們主要經過ExecutorCompletionService與FutureTask類的源碼,對Future模型體系的原理作了瞭解,本篇開始解讀concurrent包中的工具類的源碼。首先來看兩個很是實用的工具類CyclicBarrier與CountDownLatch是如何實現的。安全


 

CyclicBarrier

CyclicBarrier直譯過來是「循環屏障」,做用是可使固定數量的線程都達到某個屏障點(調用await方發處)後,才繼續向下執行。關於用法和實例本文就不作過多說明,如今直接進入CyclicBarrier的源碼。app

首先,來看下CyclicBarrier的幾個標誌性的成員變量:框架

 1 private static class Generation {
 2     boolean broken = false;
 3 }
 4 /** The number of parties */
 5 private final int parties;
 6 /* The command to run when tripped */
 7 private final Runnable barrierCommand;
 8 /** The current generation */
 9 private Generation generation = new Generation();
10 
11 /**
12  * Number of parties still waiting. Counts down from parties to 0
13  * on each generation.  It is reset to parties on each new
14  * generation or when broken.
15  */
16 private int count;

這幾個成員變量有如下說明:工具

  • 說明1:parties是final的,在構造時,傳入的固定線程數,不可變;
  • 說明2:count是計數器,若是有線程到達了屏障點,count就減1;
  • 說明3:直到count=0時,其它線程才能夠向下執行;
  • 說明4:barrierCommand是Runnable任務,在全部線程到達屏障點是,就執行barrierCommand,barrierCommand是構造時傳入的,能夠爲空;
  • 說明5:generation比較複雜,是靜態內部類Generation的實例,一個generation對象表明一代的屏障,就是說,若是generation對象不一樣,就表明進入了下一次的屏障,因此說,這個線程屏障是可循環的(Cyclic)。
  • 說明6:另外,generation的惟一的一個名爲broken的成員變量表明屏障是否被破壞掉,破壞的緣由多是線程中斷、失敗或者超時等。若是被破壞,則全部線程都將拋出異常。

瞭解上述成員變量的說明後,基本上就能夠知道了CyclicBarrier的實現原理,下面咱們來看看代碼是如何寫的。其實實現很簡單,咱們只需經過await()方法就能夠說明:oop

1 public int await() throws InterruptedException, BrokenBarrierException {
2     try {
3         return dowait(false, 0L);
4     } catch (TimeoutException toe) {
5         throw new Error(toe); // cannot happen;
6     }
7 }

await()方法調用了真是的執行方法dowait(),這個方法裏涵蓋了全部乾坤:ui

 1 /**
 2  * Main barrier code, covering the various policies.
 3  */
 4 private int dowait(boolean timed, long nanos)
 5     throws InterruptedException, BrokenBarrierException,
 6            TimeoutException {
 7     final ReentrantLock lock = this.lock;
 8     lock.lock();
 9     try {
10         final Generation g = generation;
11 
12         if (g.broken)
13             throw new BrokenBarrierException();
14 
15         if (Thread.interrupted()) {
16             breakBarrier();
17             throw new InterruptedException();
18         }
19 
20        int index = --count;
21        if (index == 0) {  // tripped
22            boolean ranAction = false;
23            try {
24    final Runnable command = barrierCommand;
25                if (command != null)
26                    command.run();
27                ranAction = true;
28                nextGeneration();
29                return 0;
30            } finally {
31                if (!ranAction)
32                    breakBarrier();
33            }
34        }
35 
36         // loop until tripped, broken, interrupted, or timed out
37         for (;;) {
38             try {
39                 if (!timed)
40                     trip.await();
41                 else if (nanos > 0L)
42                     nanos = trip.awaitNanos(nanos);
43             } catch (InterruptedException ie) {
44                 if (g == generation && ! g.broken) {
45                     breakBarrier();
46                                 throw ie;
47                         } else {
48                         // We're about to finish waiting even if we had not
49                         // been interrupted, so this interrupt is deemed to
50                         // "belong" to subsequent execution.
51                         Thread.currentThread().interrupt();
52                         }
53             }
54 
55             if (g.broken)
56                 throw new BrokenBarrierException();
57 
58             if (g != generation)
59                 return index;
60 
61             if (timed && nanos <= 0L) {
62                 breakBarrier();
63                 throw new TimeoutException();
64             }
65         }
66     } finally {
67         lock.unlock();
68     }
69 }

代碼第20行對應「說明2」。this

代碼第21行對應「說明3」。spa

代碼第26行對應「說明4」。線程

代碼第28行對應「說明5」,nextGeneration()方法中使用generation = new Generation();表示屏障已經換代,並喚醒全部線程。nextGeneration()請自行查看源碼。code

代碼第16行第45行等全部調用breakBarrier()方法處,對應「說明6」,表示屏障被破壞,breakBarrier()方法中將generation.broken = true,喚醒全部線程,拋出異常。

最後,代碼第40行處trip.await(),表示持有trip的線程進入等待被喚醒狀態。

另外,實現中還有一個很重要的點,就是第8行的lock和第67行的unlock,保證同步狀態下執行這段邏輯,也就保證了count與generation.broken的線程安全。

以上就是CyclicBarrier(循環使用的屏障)的源碼實現,是否是比較簡單。

CountDownLatch

CountDownLatch直譯過來是「倒計數鎖」。在線程的countDown()動做將計數減至0時,全部的await()處的線程將能夠繼續向下執行。CountDownLatch的功能與CyclicBarrier有一點點像,但實現方式卻很不一樣,下面直接來觀察CountDownLatch的兩個最重要的方法:

1 public void await() throws InterruptedException {
2     sync.acquireSharedInterruptibly(1);
3 }
4 
5 public void countDown() {
6     sync.releaseShared(1);
7 }

能夠看到,這兩個方法實際是由靜態內部類Sync來實現的。這個Sync咱們在上一篇FutureTask的實現中也見過,那咱們就先簡單介紹下Sync到底是用來作什麼的:

Sync extends AbstractQueuedSynchronizer

這個抽象類AbstractQueuedSynchronizer是一個框架,這個框架使用了「共享」與「獨佔」兩張方式經過一個int值來表示狀態的同步器。類中含有一個先進先出的隊列用來存儲等待的線程。

這個類定義了對int值的原子操做的方法,並強制子類定義int的那種狀態是獲取,哪一種狀態是釋放。子類能夠選擇「共享」和「獨佔」的一種或兩種來實現。

共享方式的實現方式是死循環嘗試獲取對象狀態,相似自旋鎖。

獨佔方式的實現方式是經過實現Condition功能的內部的類,保證獨佔鎖。

而咱們正在解讀的CountDownLatch中的內部類Sync是使用的共享方式,對於AbstractQueuedSynchronizer的解讀本篇不打算詳細說明,由於筆者對「獨佔」方式還沒完全弄通,若是之後有機會再來補充。

接下來就直接觀察CountDownLatch.Sync的源碼:

 1 /**
 2  * Synchronization control For CountDownLatch.
 3  * Uses AQS state to represent count.
 4  */
 5 private static final class Sync extends AbstractQueuedSynchronizer {
 6     private static final long serialVersionUID = 4982264981922014374L;
 7 
 8     Sync(int count) {
 9         setState(count);
10     }
11 
12     int getCount() {
13         return getState();
14     }
15 
16     public int tryAcquireShared(int acquires) {
17         return getState() == 0? 1 : -1;
18     }
19 
20     public boolean tryReleaseShared(int releases) {
21         // Decrement count; signal when transition to zero
22         for (;;) {
23             int c = getState();
24             if (c == 0)
25                 return false;
26             int nextc = c-1;
27             if (compareAndSetState(c, nextc))
28                 return nextc == 0;
29         }
30     }
31 }

結合最初列出的await()和countDown()方法,

經過上述代碼第9行能夠看到,CountDownLatch將構造時傳入的用來倒計數的count做爲狀態值。

經過上述代碼第17行能夠看到,CountDownLatch定義了當count=0時表示能夠共享獲取狀態(在await()方法中調用的sync.acquireSharedInterruptibly(1)會死循環嘗試獲取狀態)。

經過上述代碼第26行能夠看到,CountDownLatch定義了當count-1表示一次共享釋放狀態(在countDown()方法中調用的sync.releaseShared(1)會涉及)。

以上就是CountDownLatch的源碼實現。

總結

CyclicBarrier與CountDownLatch有一點類似之處,可是有很大區別。它們的異同我我的總結以下:

相似功能

  • CyclicBarrier與CountDownLatch都是經過計數到達必定標準後,使得在await()處的線程繼續向下執行。

不一樣之處

  • CyclicBarrier的實現是經過線程的等待喚醒;CountDownLatch的實現是經過死循環訪問狀態的自旋機制
  • CyclicBarrier在線程改變計數後不能向下執行(await()改變計數);CountDownLatch在線程改變計數後繼續向下執行(countDown()改變計數)
  • CyclicBarrier的計數能夠被重置,循環使用;CountDownLatch的計數只能使用一次
相關文章
相關標籤/搜索