目錄java
在編寫多線程程序時,不免須要對併發流程進行控制,Thread類有join()和yield()等方法,JUC提供了更爲靈活的併發工具類,下面就學習這些工具類的用法以及實現。api
latch意思是門閂,countdown指從上往下數,CountDownLatch容許一個或多個線程等待其餘任務線程完成操做,就像它的字面意思:從大往小數,數到某個值(0)的時候打開門閂。下面是CountDownLatch的api:多線程
//構造器 public CountDownLatch(int count); //調用await()方法的線程會進入等待狀態,它會等待直到count值爲0才繼續執行 public void await(); //和await()相似,只不過等待必定的時間後count值還沒變爲0的話就會繼續執行 public boolean await(long timeout, TimeUnit unit); //計數器減一 public void countDown()
能夠看到經過構造器構造一個計數器,經過調用countDown方法計數減少,await在計數器大於0時線程處於等待狀態,經過下面例子能夠學會CountDownLatch的用法:併發
public class LatchTest { public static void main(String[] args) { //兩個線程,計數器傳入2 final CountDownLatch latch = new CountDownLatch(2); //這兩個線程執行了latch.countDown(),計數器歸0,主線程才被喚醒繼續執行 new Thread(() -> { try { System.out.println("子線程1: "+Thread.currentThread().getName()+"正在執行"); Thread.sleep(3000); System.out.println("子線程1: "+Thread.currentThread().getName()+"執行完畢"); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { System.out.println("子線程2: "+Thread.currentThread().getName()+"正在執行"); Thread.sleep(3000); System.out.println("子線程2: "+Thread.currentThread().getName()+"執行完畢"); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); try { System.out.println("等待2個子線程執行完畢..."); latch.await(); System.out.println("2個子線程已經執行完畢"); System.out.println("繼續執行主線程"); } catch (InterruptedException e) { e.printStackTrace(); } } }
運行結果:
app
CountDownLatch是基於共享鎖實現的,內部類Sync繼承同步器AQS,重點分析CountDownLatch如下三個方法:ide
經過構造函數傳入的參數count設置同步狀態(count必須大於0,不然拋出異常),同步狀態在這裏並不表示線程得到鎖的重入次數,而是表示一個計數器,計數器的大小與任務線程的數目是一致的,函數
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } Sync(int count) { setState(count); }
調用了await的線程會處於等待狀態,直到計數器歸0纔會被喚醒。await方法調用了Sync父類AQS的acquireSharedInterruptibly方法,acquireSharedInterruptibly首先檢查線程有中斷,而後調用tryAcquireShared嘗試獲取共享鎖,獲取成功返回1,失敗返回-1,若失敗調用doAcquireSharedInterruptibly將當前線程加入同步隊列阻塞住,等待計數器爲0喚醒。工具
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
countDown方法將計數器減一,調用了AQS的releaseShared方法,當tryReleaseShared方法返回true執行doReleaseShared方法,這個方法在分析讀寫鎖是介紹過了,就是喚醒同步等列等待獲取鎖的線程,即喚醒調用了await方法等待計數器歸0的線程。oop
public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
Thread類的join方法與CountDownLatch做用相似,join方法的實現原理不停檢查調用join的線程是否存活,若是存活則讓當前線程處於等待狀態,當join線程終止後,會喚醒當前線程。CountDownLatch與join相比更靈活,沒必要非得線程停止只要調用了countDown方法就好了,能夠響應中斷以及可以設置超時等功能。學習
CyclicBarrier是指可循環使用的屏障,它可讓一組線程當他們分別達到了同步點(common barrier point)時被阻塞,直到最後一個線程到達了同步點,屏障纔會開門,讓全部被屏障屏蔽的線程繼續運行。
public class BarrierTest { public static void main(String[] args) { int size = 4; CyclicBarrier barrier = new CyclicBarrier(size); for(int i=0;i<size;i++) new Writer(barrier).start(); } static class Writer extends Thread{ private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("線程"+Thread.currentThread().getName()+" is coming..."); try { //睡眠模擬業務操做 Thread.sleep(5000); System.out.println("線程"+Thread.currentThread().getName()+" is waiting on barrier"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } } } }
運行結果:
public class CyclicBarrier { //CyclicBarrier使用完了能夠重置,每使用一次都會有一個新的Generation對象,broken表示當前屏障是否被損壞 private static class Generation { boolean broken = false; } //重入鎖 private final ReentrantLock lock = new ReentrantLock(); //condition實現線程等待與喚醒 private final Condition trip = lock.newCondition(); //表示線程數,在parties個線程都調用await方法後,barrier纔算是被經過(tripped)了。 private final int parties; //經過構造方法設置一個Runnable對象,用來在全部線程都到達barrier時執行。 private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); //count表示還剩下未到達barrier(未調用await)的線程數量 private int count; //構造函數 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }
await重載的兩種方法都是調用的doWait方法。
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { //獨佔鎖 final ReentrantLock lock = this.lock; lock.lock(); try { //Generation對象 final Generation g = generation; //屏障被破壞,拋出異常 if (g.broken) throw new BrokenBarrierException(); //線程被中斷 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; //最後一個到達同步點的線程 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out //一直循環直到最後一個線程到達同步點、屏障破損(genneration的broken屬性爲true)、中斷或超時 for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //g == generation && !g.broken說明此時當前這一輪還沒結束,而且沒有其它線程執行過 //breakBarrier方法。這種狀況會執行breakBarrier置generation的broken標識爲true並 //喚醒其它線程,以後繼續拋出InterruptedException。 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // 若是g != generation,此時這一輪已經結束,後面返回index做爲到達barrier的次序; // 若是g.broken說明以前已經有其它線程執行了breakBarrier方法,後面會拋出 //BrokenBarrierException。 Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; //超時 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }
從功能上說,CountDownLatch容許一個或多個線程等待其餘線程完成操做,而CyclicBarrier是讓一組線程達到一個公共同步點以後再一塊兒放行;CountDownLatch計數器只能使用一次,CyclicBarrier可使用reset方法重置用以處理某些複雜的業務場景。