CountDownLatch
以及CyclicBarrier
都是Java
裏面的同步工具之一,本文介紹了二者的基本原理以及基本使用方法。java
CountDownLatch
CountDownLatch
是一個同步工具類,常見的使用場景包括:dom
好比考慮這樣一個場景,在一個電商網站中,用戶點擊了首頁,須要一部分的商品,同時顯示它們的價格,那麼,調用的流程應該是:工具
解決這樣的問題可使用串行化或並行化操做,串行化就是逐一計算商品的售價,並返回,並行化就是獲取商品後,並行計算每個商品的售價,最後返回,顯而後一種方案要比前一種要好,那麼這時候就能夠用上CountDownLatch
了。網站
一份簡單的模擬代碼以下:this
import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.IntStream; import static java.util.concurrent.ThreadLocalRandom.current; public class CountDownLatchExample { public static void main(String[] args) throws InterruptedException{ List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList()); //計數器大小爲商品列表的長度 final CountDownLatch latch = new CountDownLatch(list.size()); //線程池 ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); list.forEach(p-> executor.execute(()->{ System.out.println("Product "+p.id+" start calculate price "); try{ //隨機休眠模擬業務操做耗時 TimeUnit.SECONDS.sleep(current().nextInt(10)); p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7)); System.out.println("Product "+p.id+" calculate price completed"); }catch (InterruptedException e){ e.printStackTrace(); }finally { //每完成計算一個商品,將計數器減1,注意須要放在finally中 latch.countDown(); } })); //主線程阻塞直到全部的計數器爲0,也就是等待全部的子任務計算價格完畢 latch.await(); System.out.println("All of prices calculate finished"); //手動終止,否則不會結束運行 executor.shutdown(); } private static class Price{ private final int id; private double price; public Price(int id) { this.id = id; } public int getId() { return id; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } } }
輸出:spa
代碼比較簡單,關鍵地方用上了註釋,能夠看到代碼執行順序以下:線程
值得注意的是計數器減1的操做須要放在finally
中,由於有可能會出現異常,若是出現異常致使計數器不能減小,那麼主線程會一直阻塞。code
另外,CountDownLatch
還有一個await(long timeout,TimeUnit unit)
方法,是帶有超時參數的,也就是說,若是在超時時間內,計數器的值仍是大於0(還有任務沒執行完成),會使得當前線程退出阻塞狀態。隊列
CyclicBarrier
CyclicBarrier
與CountDownLatch
有不少相似的地方,也是一個同步工具類,容許多個線程在執行完相應的操做以後彼此等待到達同一個barrier point
(屏障點)。CyclicBarrier
也適合某個串行化的任務被拆分爲多個並行化任務,這點與CountDownLatch
相似,可是CyclicBarrier
具有的一個更強大的功能是,CyclicBarrier
能夠被重複使用。圖片
先簡單說一下CyclicBarrier
的實現原理:
CyclicBarrier
,傳入一個int
參數,表示分片(parites
),一般意義上來講分片數就是任務的數量await()
,等待其餘線程也到達barrier point
常見的使用方法是設置分片數爲任務數+1,這樣,能夠在主線程中執行await()
,等待全部子任務完成。好比下面是使用CyclicBarrier
實現一樣功能的模擬代碼:
import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.IntStream; import static java.util.concurrent.ThreadLocalRandom.current; public class CountDownLatchExample { public static void main(String[] args) throws InterruptedException,BrokenBarrierException{ List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList()); final CyclicBarrier barrier = new CyclicBarrier(11); ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); list.forEach(p-> executor.execute(()->{ System.out.println("Product "+p.id+" start calculate price "); try{ TimeUnit.SECONDS.sleep(current().nextInt(10)); p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7)); System.out.println("Product "+p.id+" calculate price completed"); }catch (InterruptedException e){ e.printStackTrace(); }finally { try{ barrier.await(); }catch (InterruptedException | BrokenBarrierException e){ e.printStackTrace(); } } })); barrier.await(); System.out.println("All of prices calculate finished"); executor.shutdown(); } private static class Price{ private final int id; private double price; public Price(int id) { this.id = id; } public int getId() { return id; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } } }
輸出相同,代碼大部分類似,不一樣的地方有:
latch.countDown()
替換成了barrier.await()
latch.await()
替換成了barrier.await()
10
await()
方法會等待全部的線程到達barrier point
,上面代碼執行流程簡述以下:
CyclicBarrier
,分片數爲11(子線程數+1)await()
,等待子線程執行完成await()
,等待其餘線程也到達barrier point
注意一個很大的不一樣就是這裏的線程池核心線程數目改爲了 10,那麼,爲何須要10?
由於若是是設置一個小於10的核心線程個數,因爲線程池是會先建立核心線程來執行任務,核心線程滿了以後,放進任務隊列中,而假設只有5個核心線程,那麼:
這樣的話,會出現死鎖,由於計算中的線程須要隊列中的任務到達barrier point
才能結束,而隊列中的任務須要核心線程計算完畢後,才能調度出來計算,這樣死鎖就出現了。
CyclicBarrier
與CountDownLatch
的一個最大不一樣是,CyclicBarrier
能夠被重複使用,原理上來講,await()
會將內部計數器減1,當計數器減爲0時,會自動進行計數器(分片數)重置。好比,在上面的代碼中,因爲趕上促銷活動,須要對商品的價格再次進行計算:
import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.IntStream; import static java.util.concurrent.ThreadLocalRandom.current; public class CountDownLatchExample { public static void main(String[] args) throws InterruptedException,BrokenBarrierException{ List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList()); final CyclicBarrier barrier = new CyclicBarrier(11); ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); list.forEach(p-> executor.execute(()->{ System.out.println("Product "+p.id+" start calculate price."); try{ TimeUnit.SECONDS.sleep(current().nextInt(10)); p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7)); System.out.println("Product "+p.id+" calculate price completed."); }catch (InterruptedException e){ e.printStackTrace(); }finally { try{ barrier.await(); }catch (InterruptedException | BrokenBarrierException e){ e.printStackTrace(); } } })); barrier.await(); System.out.println("All of prices calculate finished."); //複製的一段相同代碼 list.forEach(p-> executor.execute(()->{ System.out.println("Product "+p.id+" start calculate price again."); try{ TimeUnit.SECONDS.sleep(current().nextInt(10)); p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7)); System.out.println("Product "+p.id+" calculate price completed."); }catch (InterruptedException e){ e.printStackTrace(); }finally { try{ barrier.await(); }catch (InterruptedException | BrokenBarrierException e){ e.printStackTrace(); } } })); barrier.await(); System.out.println("All of prices calculate finished again."); executor.shutdown(); } private static class Price{ private final int id; private double price; public Price(int id) { this.id = id; } public int getId() { return id; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } } }
將計算價格的代碼複製一遍,其中沒有手動修改計數器,只是調用await()
,輸出以下:
能夠看到,並無對CycliBarrier
進行相似reset
之類的操做,可是依然能按正常邏輯運行,這是由於await()
內部會維護一個計數器,當計數器爲0的時候,會自動進行重置,下面是await()
在OpenJDK 11
下的源碼:
public int await() throws InterruptedException, BrokenBarrierException { try { return this.dowait(false, 0L); } catch (TimeoutException var2) { throw new Error(var2); } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { ReentrantLock lock = this.lock; lock.lock(); byte var9; try { //... int index = --this.count; if (index != 0) { //計數器不爲0的狀況 //.... } boolean ranAction = false; try { Runnable command = this.barrierCommand; if (command != null) { command.run(); } ranAction = true; this.nextGeneration(); var9 = 0; } finally { if (!ranAction) { this.breakBarrier(); } } } finally { lock.unlock(); } return var9; } private void nextGeneration() { this.trip.signalAll(); this.count = this.parties; this.generation = new CyclicBarrier.Generation(); }
當計數器爲0時,會生成新的Generation
,並將var9
置爲0,最後返回var9
(在這個方法中var9
只有一處賦值,就是代碼中的var9=0
,能夠理解成直接返回0)。
CyclicBarrier
其餘的一些經常使用方法CyclicBarrier(int parties,Runnable barrierAction)
:構造的時候傳入一個Runnable
,表示全部線程到達barrier point
時,會調用該Runnable
await(long timeout,TimeUnit unit)
:與無參的await()
相似,底層調用的是相同的doWait()
,不過增長了超時功能isBroken()
:返回broken
狀態,某個線程因爲執行await
而進入阻塞,此時若是執行了中斷操做(好比interrupt
),那麼isBroken()
會返回true
。須要注意,處於broken
狀態的CyclicBarrier
不能被直接使用,須要調用reset()
進行重置下面是CountDownLatch
與CyclicBarrier
的一些簡單比較,相同點以下:
java.util.concurrent
包下的線程同步工具類不一樣點:
CountDownLatch
的await()
方法會等待計數器歸0,而CyclicBarrier
的await()
會等待其餘線程到達barrier point
CyclicBarrier
內部的計數器是能夠被重置的,可是CountDownLatch
不能夠CyclicBarrier
是由Lock
和Condition
實現的,而CountDownLatch
是由同步控制器AQS
實現的CyclicBarrier
不容許parties
爲0,而CountDownLatch
容許count
爲0