上一篇咱們主要經過ExecutorCompletionService與FutureTask類的源碼,對Future模型體系的原理作了瞭解,本篇開始解讀concurrent包中的工具類的源碼。首先來看兩個很是實用的工具類CyclicBarrier與CountDownLatch是如何實現的。安全
CyclicBarrier直譯過來是「循環屏障」,做用是可使固定數量的線程都達到某個屏障點(調用await方發處)後,才繼續向下執行。關於用法和實例本文就不作過多說明,如今直接進入CyclicBarrier的源碼。app
首先,來看下CyclicBarrier的幾個標誌性的成員變量:框架
1 private static class Generation { 2 boolean broken = false; 3 } 4 /** The number of parties */ 5 private final int parties; 6 /* The command to run when tripped */ 7 private final Runnable barrierCommand; 8 /** The current generation */ 9 private Generation generation = new Generation(); 10 11 /** 12 * Number of parties still waiting. Counts down from parties to 0 13 * on each generation. It is reset to parties on each new 14 * generation or when broken. 15 */ 16 private int count;
這幾個成員變量有如下說明:工具
瞭解上述成員變量的說明後,基本上就能夠知道了CyclicBarrier的實現原理,下面咱們來看看代碼是如何寫的。其實實現很簡單,咱們只需經過await()方法就能夠說明:oop
1 public int await() throws InterruptedException, BrokenBarrierException { 2 try { 3 return dowait(false, 0L); 4 } catch (TimeoutException toe) { 5 throw new Error(toe); // cannot happen; 6 } 7 }
await()方法調用了真是的執行方法dowait(),這個方法裏涵蓋了全部乾坤:ui
1 /** 2 * Main barrier code, covering the various policies. 3 */ 4 private int dowait(boolean timed, long nanos) 5 throws InterruptedException, BrokenBarrierException, 6 TimeoutException { 7 final ReentrantLock lock = this.lock; 8 lock.lock(); 9 try { 10 final Generation g = generation; 11 12 if (g.broken) 13 throw new BrokenBarrierException(); 14 15 if (Thread.interrupted()) { 16 breakBarrier(); 17 throw new InterruptedException(); 18 } 19 20 int index = --count; 21 if (index == 0) { // tripped 22 boolean ranAction = false; 23 try { 24 final Runnable command = barrierCommand; 25 if (command != null) 26 command.run(); 27 ranAction = true; 28 nextGeneration(); 29 return 0; 30 } finally { 31 if (!ranAction) 32 breakBarrier(); 33 } 34 } 35 36 // loop until tripped, broken, interrupted, or timed out 37 for (;;) { 38 try { 39 if (!timed) 40 trip.await(); 41 else if (nanos > 0L) 42 nanos = trip.awaitNanos(nanos); 43 } catch (InterruptedException ie) { 44 if (g == generation && ! g.broken) { 45 breakBarrier(); 46 throw ie; 47 } else { 48 // We're about to finish waiting even if we had not 49 // been interrupted, so this interrupt is deemed to 50 // "belong" to subsequent execution. 51 Thread.currentThread().interrupt(); 52 } 53 } 54 55 if (g.broken) 56 throw new BrokenBarrierException(); 57 58 if (g != generation) 59 return index; 60 61 if (timed && nanos <= 0L) { 62 breakBarrier(); 63 throw new TimeoutException(); 64 } 65 } 66 } finally { 67 lock.unlock(); 68 } 69 }
代碼第20行對應「說明2」。this
代碼第21行對應「說明3」。spa
代碼第26行對應「說明4」。線程
代碼第28行對應「說明5」,nextGeneration()方法中使用generation = new Generation();表示屏障已經換代,並喚醒全部線程。nextGeneration()請自行查看源碼。code
代碼第16行、第45行等全部調用breakBarrier()方法處,對應「說明6」,表示屏障被破壞,breakBarrier()方法中將generation.broken = true,喚醒全部線程,拋出異常。
最後,代碼第40行處trip.await(),表示持有trip的線程進入等待被喚醒狀態。
另外,實現中還有一個很重要的點,就是第8行的lock和第67行的unlock,保證同步狀態下執行這段邏輯,也就保證了count與generation.broken的線程安全。
以上就是CyclicBarrier(循環使用的屏障)的源碼實現,是否是比較簡單。
CountDownLatch直譯過來是「倒計數鎖」。在線程的countDown()動做將計數減至0時,全部的await()處的線程將能夠繼續向下執行。CountDownLatch的功能與CyclicBarrier有一點點像,但實現方式卻很不一樣,下面直接來觀察CountDownLatch的兩個最重要的方法:
1 public void await() throws InterruptedException { 2 sync.acquireSharedInterruptibly(1); 3 } 4 5 public void countDown() { 6 sync.releaseShared(1); 7 }
能夠看到,這兩個方法實際是由靜態內部類Sync來實現的。這個Sync咱們在上一篇FutureTask的實現中也見過,那咱們就先簡單介紹下Sync到底是用來作什麼的:
Sync extends AbstractQueuedSynchronizer
這個抽象類AbstractQueuedSynchronizer是一個框架,這個框架使用了「共享」與「獨佔」兩張方式經過一個int值來表示狀態的同步器。類中含有一個先進先出的隊列用來存儲等待的線程。
這個類定義了對int值的原子操做的方法,並強制子類定義int的那種狀態是獲取,哪一種狀態是釋放。子類能夠選擇「共享」和「獨佔」的一種或兩種來實現。
共享方式的實現方式是死循環嘗試獲取對象狀態,相似自旋鎖。
獨佔方式的實現方式是經過實現Condition功能的內部的類,保證獨佔鎖。
而咱們正在解讀的CountDownLatch中的內部類Sync是使用的共享方式,對於AbstractQueuedSynchronizer的解讀本篇不打算詳細說明,由於筆者對「獨佔」方式還沒完全弄通,若是之後有機會再來補充。
接下來就直接觀察CountDownLatch.Sync的源碼:
1 /** 2 * Synchronization control For CountDownLatch. 3 * Uses AQS state to represent count. 4 */ 5 private static final class Sync extends AbstractQueuedSynchronizer { 6 private static final long serialVersionUID = 4982264981922014374L; 7 8 Sync(int count) { 9 setState(count); 10 } 11 12 int getCount() { 13 return getState(); 14 } 15 16 public int tryAcquireShared(int acquires) { 17 return getState() == 0? 1 : -1; 18 } 19 20 public boolean tryReleaseShared(int releases) { 21 // Decrement count; signal when transition to zero 22 for (;;) { 23 int c = getState(); 24 if (c == 0) 25 return false; 26 int nextc = c-1; 27 if (compareAndSetState(c, nextc)) 28 return nextc == 0; 29 } 30 } 31 }
結合最初列出的await()和countDown()方法,
經過上述代碼第9行能夠看到,CountDownLatch將構造時傳入的用來倒計數的count做爲狀態值。
經過上述代碼第17行能夠看到,CountDownLatch定義了當count=0時表示能夠共享獲取狀態(在await()方法中調用的sync.acquireSharedInterruptibly(1)會死循環嘗試獲取狀態)。
經過上述代碼第26行能夠看到,CountDownLatch定義了當count-1表示一次共享釋放狀態(在countDown()方法中調用的sync.releaseShared(1)會涉及)。
以上就是CountDownLatch的源碼實現。
CyclicBarrier與CountDownLatch有一點類似之處,可是有很大區別。它們的異同我我的總結以下:
相似功能
不一樣之處