目錄java
在實際開發中,碰上CPU密集且執行時間很是耗時的任務,一般咱們會選擇將該任務進行分割,以多線程方式同時執行若干個子任務,等這些子任務都執行完後再將所得的結果進行合併。這正是著名的map-reduce思想,不過map-reduce一般被用在分佈式計算的語境下,這裏舉這個例子只是爲了說明對多線程併發執行流程進行控制的重要性,好比某些線程必須等其餘線程執行完後才能開始它的工做。使用jdk中的內置鎖或者重入鎖配合等待通知機制能夠實現這個需求,可是會比較麻煩。由於不論是內置仍是重入鎖,它們關注的重點在於如何協調多線程對共享資源的訪問,而不是協調特定線程的執行次序,完成複雜的併發流程控制。好在JDK在併發包下提供了CountDownLatch,CyclicBarrier,Semaphore等併發工具,可讓咱們站在更高的角度思考並解決這個問題。node
CountDownLatch一般稱之爲閉鎖。它可使一個或一批線程在閉鎖上等待,等到其餘線程執行完相應操做後,閉鎖打開,這些等待的線程才能夠繼續執行。確切的說,閉鎖在內部維護了一個倒計數器。經過該計數器的值來決定閉鎖的狀態,從而決定是否容許等待的線程繼續執行。該計數器的初始值由用戶在建立閉鎖對象時經過傳入的構造參數決定,以下所示服務器
/** * Constructs a {@code CountDownLatch} initialized with the given count. * * @param count the number of times {@link #countDown} must be invoked * before threads can pass through {@link #await} * @throws IllegalArgumentException if {@code count} is negative */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
默認計數器初始值不能小於0,不然將拋出異常。多線程
當計數器的值大於0時,該閉鎖處於關閉狀態,調用閉鎖的await()
方法將致使當前線程在閉鎖上等待。併發
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
可是咱們能夠經過調用閉鎖的countDown()
方法來使閉鎖的計數值減小app
public void countDown() { sync.releaseShared(1); }
每調用一次countDown()
方法都會使閉鎖的計數值減小1,因此閉鎖的計數器準確來講是個倒計數器。當計數值減小到0時,阻塞在閉鎖上的線程將被喚醒從而繼續執行。下面以一個相似map-reduce的例子來對CountDownLatch的用法作講解。分佈式
爲了計算一個CPU密集型的大任務,將該任務分割成10個子任務,交由開啓的10個子線程去執行。當全部子任務執行完畢後,主線程再執行後續的工做。任務的執行時間以線程休眠進行模擬,整個流程以日誌方式進行記錄。完整代碼以下ide
/** * @author: takumiCX * @create: 2018-09-17 **/ class CountDownLatchTest { static CountDownLatch countDownLatch; public static void main(String[] args) throws InterruptedException { int count=10; //初始化計數器值爲10 countDownLatch=new CountDownLatch(count); //開啓10個子線程執行子任務 for(int i=0;i<count;i++){ Thread thread = new Thread(new CountDownThread(countDownLatch,i)); thread.start(); } //主線程等待,直到全部子任務完成 countDownLatch.await(); //模擬主線程執行後續工做 TimeUnit.SECONDS.sleep(1); System.out.println("任務執行完畢!"); } private static class CountDownThread implements Runnable{ CountDownLatch countDownLatch; //子任務序號 int taskNum; public CountDownThread(CountDownLatch countDownLatch, int taskNum) { this.countDownLatch = countDownLatch; this.taskNum = taskNum; } @Override public void run() { try { //模擬子任務的執行 TimeUnit.MILLISECONDS.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); } //任務執行完畢,則調用countDown方法使計數器值減小1 countDownLatch.countDown(); System.out.println("子任務:"+taskNum+" 執行完畢!"); } } }
結果以下所示
工具
能夠看到主線程在全部子任務執行完前必須在閉鎖上等待。當最後一個子任務完成後,它將被喚醒,從而能夠繼續以後的工做。oop
CountDownLatch底層也是經過AQS實現的。和ReentrentLock以獨佔的方式獲取和釋放同步狀態不一樣,CountDownLatch是以共享的方式獲取和釋放同步狀態的。獨佔式和共享式的區別主要有如下幾點:
同步狀態依舊使用AQS中的state值進行表示,在CountDownLatch的語境下表示計數器的值,且只有在state=0時線程才能成功獲取到同步狀態,儘管有些奇怪,不過考慮到CountDownLatch中的計數器是個倒計數器,這麼設定也並不是不可理解。爲了更好的理解CountDownLatch的源碼,從釋放同步狀態的方法countDown()
開始講起
public void countDown() { sync.releaseShared(1); }
正確找到sync的實現類後跟進源碼
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //嘗試在共享模式下釋放同步狀態 doReleaseShared(); return true; } return false; }
tryReleaseShared()嘗試在共享模式下釋放同步狀態,該方法是在AQS中定義的鉤子方法,必須由AQS的實現類本身實現,方法內容以下
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); //獲取同步狀態值 if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) //以CAS方式更新同步狀態值 return nextc == 0; } }
使用死循環+CAS方式將計數值state減小1。僅當更新操做成功且state值被更新爲0時返回true,表示在共享模式下釋放同步狀態成功,接着便會繼續執行doReleaseShared()方法,方法內容以下
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); //喚醒後繼結點中的線程 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
該方法主要完成的工做是喚醒頭結點以後的結點中的線程。那麼其餘在同步隊列中等待的線程使如何被喚醒的?別急,咱們能夠在await()方法中找到答案。
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
找到sync正確的實現類後跟進源碼
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
tryAcquireShared()是在共享模式下嘗試獲取同步狀態,
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
當同步狀態值state=0時返回1,表示獲取同步狀態成功,不然返回-1表示獲取同步狀態失敗。獲取同步狀態失敗的線程顯然應該加入同步等待隊列並在隊列中等待,這部分邏輯咱們在解讀ReentrentLock的源碼時應該已經看過了,不過在共享模式下細節方面有些不一樣
/** * Acquires in shared interruptible mode. * @param arg the acquire argument */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED);//構造結點並加入同步隊列 boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg);//當前驅結點是頭結點時獲取同步狀態 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
第一步天然是構造結點並加入同步隊列尾部,這部分邏輯在addWaiter()
方法中,注意結點類型爲共享類型。以後的邏輯和獨佔模式相似,檢查前驅結點是不是隊列的頭結點,是則嘗試獲取同步狀態,成功則將當前結點設置爲隊列頭結點,失敗則阻塞當前線程並等待喚醒並從新執行以上流程。不過在共享模式下,當前線程在成功獲取同步狀態並設置自身爲頭結點後,還必須作些額外的工做:當後繼結點爲共享類型時,喚醒後繼結點中的線程。
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); //設置當前結點爲隊列頭結點 /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); //喚醒後繼結點的線程 } }
至此,CountDownLatch的原理就搞明白了,它是以AQS的共享模式來實現複雜的併發流程控制的。當其內部的計數器不爲0時,調用其await方法將致使線程加入同步隊列並阻塞。當調用countDown方法使計數器的值爲0時,會喚醒隊列中第一個等待的線程,以後由該線程喚醒後面的線程,以此類推,直到阻塞在閉鎖上的線程都被成功喚醒。
CyclicBarrier一般稱爲循環屏障。它和CountDownLatch很類似,均可以使線程先等待而後再執行。不過CountDownLatch是使一批線程等待另外一批線程執行完後再執行;而CyclicBarrier只是使等待的線程達到必定數目後再讓它們繼續執行。故而CyclicBarrier內部也有一個計數器,計數器的初始值在建立對象時經過構造參數指定,以下所示
public CyclicBarrier(int parties) { this(parties, null); }
每調用一次await()方法都將使阻塞的線程數+1,只有阻塞的線程數達到設定值時屏障纔會打開,容許阻塞的全部線程繼續執行。除此以外,CyclicBarrier還有幾點須要注意的地方:
1.CyclicBarrier的計數器能夠重置而CountDownLatch不行,這意味着CyclicBarrier實例能夠被重複使用而CountDownLatch只能被使用一次。而這也是循環屏障循環二字的語義所在。
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
一旦用戶在建立CyclicBarrier對象時設置了barrierAction參數,則在阻塞線程數達到設定值屏障打開前,會調用barrierAction的run()方法完成用戶自定義的操做。
仍是以多線程分割大任務併發執行的例子來進行講解,不過此次狀況要稍微複雜些。線程在執行完分配給它的子任務後不能當即退出,必須等待全部任務都完成後再執行釋放資源的操做。而主線程在全部子任務都執行完畢後也要執行特定的操做,且該操做在線程釋放資源前。全部操做都以打印日誌的方式進行模擬。代碼以下:
/** * @author: takumiCX * @create: 2018-09-18 **/ public class CyclicBarrierTest { static CyclicBarrier cyclicBarrier; public static void main(String[] args) { int count = 10; //當全部子任務都執行完畢時,barrierAction的run方法會被調用 cyclicBarrier = new CyclicBarrier(count, () -> System.out.println("執行barrierAction操做!")); //開啓多個線程執行子任務 for(int i=0;i<count;i++){ new Thread(new CyclicBarrierThread(cyclicBarrier,i)).start(); } } private static class CyclicBarrierThread implements Runnable { public CyclicBarrier cyclicBarrier; //任務序號 public int taskNum; public CyclicBarrierThread(CyclicBarrier cyclicBarrier, int taskNum) { this.cyclicBarrier = cyclicBarrier; this.taskNum = taskNum; } @Override public void run() { //執行子任務 System.out.println("子任務:"+taskNum+" 執行完畢!"); try { //等待全部子任務執行完成 cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } //釋放資源 System.out.println("線程:"+taskNum+" 釋放資源!"); } } }
開啓10個線程執行子任務,每一個線程執行完子任務後在CyclicBarrier上等待。等到全部子任務完成後,用戶設置自定義的barrierAction操做即被執行,以後屏障正式打開,阻塞的全部線程將完成釋放資源的操做。
結果以下圖所示
CyclicBarrier內部使用ReentrentLock來實現線程同步,而經過Condition來實現線程的阻塞和喚醒。當計數器值爲0時,首先會執行用戶自定義的barrierAction操做。
int index = --count; //計數器值 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; //用戶自定義的barrierAction if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } }
以後再進行阻塞線程的喚醒,以及將計數器重置爲初始值。這部分代碼在nextGeneration()中
private void nextGeneration() { // signal completion of last generation trip.signalAll(); //喚醒全部的阻塞線程 // set up next generation count = parties; //計數器重置爲初始值 generation = new Generation(); }
若是學過操做系統的話,對信號量Semaphore應該不陌生。操做系統中的信號量是這麼一個機構:它維護了必定數目的資源,進程向其請求資源將致使Semaphore中資源數量減小,當資源數量小於0時將會致使當前線程阻塞;而進程釋放資源將致使Semaphore中資源數量增長,當資源數量大於0時會喚醒阻塞的進程。操做系統中使用信號量能夠輕鬆實現進程間的互斥和同步。java在語言層面也支持信號量機制,其工做原理和操做系統中的信號量相似,能夠經過調用
public void acquire(int permits)
或者public boolean tryAcquire(int permits)
請求信號量中的許可(資源)。不事後者在信號量中許可數量不夠時不會阻塞而是當即返回一個失敗結果。固然,也能夠經過public void release()
向信號量歸還資源。
信號量在建立時必須爲其指定能夠用的許可總數,以下所示
public Semaphore(int permits) { sync = new NonfairSync(permits); }
當建立信號量時指定許可總數爲1,則能夠起到獨佔鎖的做用,不過它是不容許線程重入的。同時,它還有公平和非公平模式之分,經過在建立對象時傳入參數進行指定
public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
和ReentrentLock同樣默認是非公平模式。
假設服務器上有一種資源能夠同時供多個用戶進行訪問,出於系統穩定性考慮須要限制同時訪問的用戶的數量,整個過程能夠模擬以下
/** * @author: takumiCX * @create: 2018-09-24 **/ public class SemaphoreTest { public static void main(String[] args) throws InterruptedException { //信號量控制併發數最多爲3 Semaphore semaphore = new Semaphore(3); //同時開啓10個線程 for(int i=1;i<=10;i++){ new Thread(new ReaderThread(semaphore,i)).start(); } } static class ReaderThread implements Runnable{ Semaphore semaphore; //用戶序號 int userIndex; public ReaderThread(Semaphore semaphore, int userIndex) { this.semaphore = semaphore; this.userIndex = userIndex; } @Override public void run() { try { //獲取許可 semaphore.acquire(1); //模擬訪問資源所用的時間 TimeUnit.SECONDS.sleep(1); System.out.println("用戶 "+userIndex+" 訪問資源,時間:"+System.currentTimeMillis()); //釋放許可 semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
使用信號量限制同時併發訪問的線程數爲3,而後開啓10個線程模擬用戶訪問。獲得的結果以下
從結果上能夠清晰的看到,每次最多容許3個用戶同時訪問資源,信號量很好的起到了限流做用。
和CountDownLatch相似,Semaphore底層也是經過AQS的共享模式實現的。它和CountDownLatch的區別只是對於AQS共享模式的鉤子方法tryAcquireShared()
和tryReleaseShared()
的實現不一樣。
以Semaphore的非公平模式爲例,其嘗試釋放同步狀態的邏輯以下
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); //獲取可用許可數 int remaining = available - acquires; //計算被消耗後剩餘的許可數 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
首先會獲取當前可用的許可值(state),根據請求數量計算出剩餘的許可值,若剩餘許可數小於0則直接返回剩餘值表示該操做失敗;不然以CAS方式將state值更新爲計算後的剩餘值,並返回一個大於等於0的數表示成功。經過該方法的返回值能夠知道嘗試獲取同步狀態的操做是否成功,返回值小於0表示沒有足夠的許可,線程將會加入同步隊列並等待;返回值大於等於0則表示許可足夠,則整個獲取許可的流程就結束了。
tryReleaseShared()的實現也很簡單,
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); //獲取當前許可數 int next = current + releases; //計算釋放後的許可總數 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) //cas更新許可值 return true; } }
計算釋放後的許可總數並以CAS方式對state值進行更新。以後將返回上層繼續執行
doReleaseShared()
喚醒頭結點後面結點中的線程,被喚醒的線程將執行tryAcquireShared()
從新嘗試獲取同步狀態,獲取失敗則繼續阻塞,獲取成功將設置當前結點爲隊列頭結點並繼續喚醒後續結點中的線程。