本文主要介紹和對比咱們經常使用的幾種併發工具類,主要涉及 CountDownLatch
、 CyclicBarrier
、 Semaphore
、 Exchanger
相關的內容,若是對多線程相關內容不熟悉,能夠看筆者以前的一些文章:java
CountDownLatch
、CyclicBarrier
二者的使用與區別,他們都是等待多線程完成,是一種併發流程的控制手段,Semaphore
、Exchanger
的使用,semaphore
是信號量,能夠用來控制容許的線程數,而 Exchanger
能夠用來交換兩個線程間的數據。CountDownLatch
是 JDK5
以後加入的一種併發流程控制工具,它在 java.util.concurrent
包下CountDownLatch
容許一個或多個線程等待其餘線程完成操做,這裏須要注意,是能夠是一個等待也能夠是多個來等待CountDownLatch
的構造函數以下,它接受一個 int
類型的參數做爲計數器,即若是你想等待N
個線程完成,那麼這裏就傳入 N
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
countDown
與 await
,其中 當咱們調用 countDown
方法時相應的 N
的值減 1,而 await
方法則會阻塞當前線程,直到 N
的值變爲零。CountDownLatch
來實現這一案例,那麼等待的個數 N
就是上面的裁判線程的個數,即爲 1,/** * @url i-code.onlien * 雲棲簡碼 */ public static void main(String[] args) throws InterruptedException { //模擬跑步比賽,裁判說開始,全部選手開始跑,咱們可使用countDownlatch來實現 //這裏須要等待裁判說開始,因此時等着一個線程 CountDownLatch countDownLatch = new CountDownLatch(1); new Thread(() ->{ try { System.out.println(Thread.currentThread().getName() +"已準備"); countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"開始跑~~"); },"選手1").start(); new Thread(() ->{ try { System.out.println(Thread.currentThread().getName() +"已準備"); countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"開始跑~~"); },"選手2").start(); TimeUnit.SECONDS.sleep(1); System.out.println("裁判:預備~~~"); countDownLatch.countDown(); System.out.println("裁判:跑~~~"); }
在上述代碼中,咱們首先建立了一個計數爲1 的CountDownLatch
對象,這表明咱們須要等待的線程數,以後再建立了兩個線程,用來表明選手線程,同時在選手的線程中咱們都調用了await
方法,讓線程進入阻塞狀態,直到CountDownLatch的計數爲零後再執行後面的內容,在主線程main
方法中咱們等待 1秒後執行countDown
方法,這個方法就是減一,此時的N
則爲零了,那麼選手線程則開始執行後面的內容,總體的輸出如上圖所示
CountDownLatch
來實現,那麼的計數個數 N
則爲5,由於要等待這五個,經過代碼實現以下:public static void main(String[] args) throws InterruptedException { /** * i-code.online * 雲棲簡碼 */ //等待的個數 CountDownLatch countDownLatch = new CountDownLatch(5); for (int i = 0; i < 5; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName() + "從住所出發..."); try { TimeUnit.SECONDS.sleep((long) (Math.random()*10)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 到達目的地-----"); countDownLatch.countDown(); },"人員-"+i).start(); } System.out.println("大巴正在等待人員中....."); countDownLatch.await(); System.out.println("-----全部人到齊,出發-----"); }
從上述代碼中咱們能夠看到,定義了一個計數爲5的countDownLatch
,以後經過循環建立五個線程,模擬五我的員,當他們到達指定地點後執行countDown
方法,對計數減一。主線程至關因而大巴車的線程,執行await
方法進行阻塞,只有當N
的值減到0後則執行後面的輸出
public CountDownLatch(int count) { };
它的構造函數是傳入一個參數,該參數
count
是須要倒數的數值。
await()
:調用 await()
方法的線程開始等待,直到倒數結束,也就是 count
值爲 0
的時候纔會繼續執行。await(long timeout, TimeUnit unit)
:await()
有一個重載的方法,裏面會傳入超時參數,這個方法的做用和 await()
相似,可是這裏能夠設置超時時間,若是超時就再也不等待了。countDown()
:把數值倒數 1
,也就是將 count
值減 1
,直到減爲 0
時,以前等待的線程會被喚起。上面的案例介紹了CountDownLatch
的使用,可是CountDownLatch
有個特色,那就是不可以重用,好比已經完成了倒數,那可不能夠在下一次繼續去從新倒數呢?是能夠的,一旦倒數到0 則結束了,沒法再次設置循環執行,可是咱們實際需求中有不少場景中須要循環來處理,這時候咱們可使用CyclicBarrier
來實現
CyclicBarrier
與 CountDownLatch
比較類似,當等待到必定數量的線程後開始執行某個任務CyclicBarrier
的字面意思是能夠循環使用的屏障,它的功能就是讓一組線程到達一個屏障(同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開會,此時全部被屏障阻塞的線程都將繼續執行。以下演示
public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
CyclicBarrier(int parties)
構造函數提供了int
類型的參數,表明的是須要攔截的線程數量,而每一個線程經過調用 await
方法來告訴 CyclicBarrier
我到達屏障點了,而後阻塞CyclicBarrier(int parties, Runnable barrierAction)
構造函數是爲咱們提供的一個高級方法,加了一個 barrierAction
的參數,這是一個Runnable
類型的,也就是一個線程,它表示當全部線程到達屏障後,清閒觸發 barrierAction
線程執行,再執行各個線程以後的內容CyclicBarrier
的來實現,這裏咱們須要來攔截的線程就是兩個。具體實現 以下:/* CyclicBarrier 與countDownLatch 比較類似,也是等待線程完成, 不過countDownLatch 是await等待其餘的線程經過countDown的數量,達到必定數則執行, 而 CyclicBarrier 則是直接看await的數量,達到必定數量直接所有執行, */ public static void main(String[] args) { //比如情侶約會,無論誰先到都的等另外一個,這裏就是兩個線程, CyclicBarrier cyclicBarrier = new CyclicBarrier(2); new Thread(() ->{ System.out.println("快速收拾,出門~~~"); try { TimeUnit.MILLISECONDS.sleep(500); System.out.println("到了約會地點等待女友前來~~"); cyclicBarrier.await(); System.out.println("女友到來嗨皮出發~~約會"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } },"男友").start(); new Thread(() ->{ System.out.println("慢慢收拾,出門~~~"); try { TimeUnit.MILLISECONDS.sleep(5000); System.out.println("到了約會地點等待男友前來~~"); cyclicBarrier.await(); System.out.println("男友到來嗨皮出發~~約會"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } },"女友").start(); }
上面代碼,相對簡單,建立一個攔截數爲2的屏障,以後建立兩個線程,調用await方法,只有當調用兩次纔會觸發後面的流程。
Runnable
參數的構造函數;和以前 CountDownLatch
的案例類似,公司組織出遊,這時候確定有不少大巴在等待接送,大巴不會等全部的 人都到纔出發,而是每坐滿一輛車就出發一輛,這種場景咱們就可使用 CyclicBarrier
來實現,實現以下:/* CyclicBarrier是可重複使用到,也就是每當幾個知足是再也不等待執行, 好比公司組織出遊,安排了好多輛大把,每坐滿一輛就發車,再也不等待,相似這種場景,實現以下: */ public static void main(String[] args) { //公司人數 int peopleNum = 2000; //每二十五我的一輛車,湊夠二十五則發車~ CyclicBarrier cyclicBarrier = new CyclicBarrier(25,() ->{ //達到25人出發 System.out.println("------------25人數湊齊出發------------"); }); for (int j = 1; j <= peopleNum; j++) { new Thread(new PeopleTask("People-"+j,cyclicBarrier)).start(); } } static class PeopleTask implements Runnable{ private String name; private CyclicBarrier cyclicBarrier; public PeopleTask(String name,CyclicBarrier cyclicBarrier){ this.name = name; this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println(name+"從家裏出發,正在前往聚合地...."); try { TimeUnit.MILLISECONDS.sleep(((int) Math.random()*1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(name+"到達集合地點,等待其餘人.."); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }
相同點:面試
不一樣點:算法
CountDownLatch
的計數器只能使用一次,到到達0後就不能再次使用了,除非新建實例;而 CyclicBarrier
的計數器是能夠複用循環的,因此 CyclicBarrier
能夠用在更復雜的場景,能夠隨時調用 reset
方法來重製攔截數,如計算髮生錯誤時能夠直接充值計數器,讓線程從新執行一次。CyclicBarrier
要等固定數量的線程都到達了屏障位置才能繼續執行,而 CountDownLatch
只需等待數字倒數到 0
,也就是說 CountDownLatch
做用於事件,但 CyclicBarrier
做用於線程;CountDownLatch
是在調用了 countDown
方法以後把數字倒數減 1
,而 CyclicBarrier
是在某線程開始等待後把計數減 1
。CyclicBarrier
有執行動做 barrierAction
,而 CountDownLatch
沒這個功能。Semaphore
(信號量)是用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理的使用公共資源,
acquire
方法)。線程能夠從信號量中去「獲取」一個許可證,一旦線程獲取以後,信號量持有的許可證就轉移過去了,因此信號量手中剩餘的許可證要減一。release
方法),這個許可證至關於被歸還給信號量了,因而信號量中的許可證的可用數量加一。當信號量擁有的許可證數量減到 0 時,若是下個線程還想要得到許可證,那麼這個線程就必須等待,直到以前獲得許可證的線程釋放,它才能獲取。因爲線程在沒有獲取到許可證以前不能進一步去訪問被保護的共享資源,因此這就控制了資源的併發訪問量,這就是總體思路。IO
操做,咱們能夠啓動不少線程可是數據庫的鏈接池是有限制的,假設咱們設置容許五個連接,若是咱們開啓太多線程直接操做則會出現異常,這時候咱們能夠經過信號量來控制,讓一直最多隻有五個線程來獲取鏈接。代碼以下:/* Semaphore 是信號量, 能夠用來控制線程的併發數,能夠協調各個線程,以達到合理的使用公共資源 */ public static void main(String[] args) { //建立10個容量的線程池 final ExecutorService service = Executors.newFixedThreadPool(100); //設置信號量的值5 ,也就是容許五個線程來執行 Semaphore s = new Semaphore(5); for (int i = 0; i < 100; i++) { service.submit(() ->{ try { s.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try { System.out.println("數據庫耗時操做"+Thread.currentThread().getName()); TimeUnit.MILLISECONDS.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "正在執行...."); s.release(); }); } }
如上代碼,建立了一個容量100的線程池,模擬咱們程序中大量的線程,添加一百個任務,讓線程池執行。建立了一個容量爲5的信號量,在線程中咱們調用acquire
來得到信號量的許可,只有得到了才能只能下面的內容否則阻塞。當執行完後釋放該許可,經過release
方法,
private static int count = 0; /* Semaphore 中若是咱們容許的的許可證數量爲1 ,那麼它的效果與鎖類似。 */ public static void main(String[] args) throws InterruptedException { final ExecutorService service = Executors.newFixedThreadPool(10); Semaphore semaphore = new Semaphore(1); for (int i = 0; i < 10000; i++) { service.submit(() ->{ try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "執行了"); count ++; semaphore.release(); }); } service.shutdown(); TimeUnit.SECONDS.sleep(5); System.out.println(count); }
public boolean tryAcquire()
:tryAcquire
和鎖的 trylock
思惟是一致的,是嘗試獲取許可證,至關於看看如今有沒有空閒的許可證,若是有就獲取,若是如今獲取不到也不要緊,沒必要陷入阻塞,能夠去作別的事。public boolean tryAcquire(long timeout, TimeUnit unit)
:是一個重載的方法,它裏面傳入了超時時間。好比傳入了 3 秒鐘,則意味着最多等待 3 秒鐘,若是等待期間獲取到了許可證,則往下繼續執行;若是超時時間到,依然獲取不到許可證,它就認爲獲取失敗,且返回 false。int availablePermits()
:返回此信號量中當前可用的許可證數int getQueueLength()
:返回正在等待許可證的線程數boolean hasQueuedThreads()
:判斷是否有線程正在等待獲取許可證void reducePermits(int reduction)
:減小 reduction
個許可證,是個 protected
方法Collection<Thread> getQueuedThreads()
:返回正在等待獲取許可證的線程集合,是個 protected
方法Exchanger
(交換者)是一個用於線程間協做的工具類,它主要用於進行線程間數據的交換,它有一個同步點,當兩個線程到達同步點時能夠將各自的數據傳給對方,若是一個線程先到達同步點則會等待另外一個到達同步點,到達同步點後調用 exchange
方法能夠傳遞本身的數據而且得到對方的數據。public class ExchangerTest { /* Exchanger 交換, 用於線程間協做的工具類,能夠交換線程間的數據, 其提供一個同步點,當線程到達這個同步點後進行數據間的交互,遺傳算法能夠如此來實現, 以及校對工做也能夠如此來實現 */ public static void main(String[] args) { /* 模擬 兩個工做人員錄入記錄,爲了防止錯誤,二者錄的相同內容,程序僅從校對,看是否有錯誤不一致的 */ //開闢兩個容量的線程池 final ExecutorService service = Executors.newFixedThreadPool(2); Exchanger<InfoMsg> exchanger = new Exchanger<>(); service.submit(() ->{ //模擬數據 線程 A的 InfoMsg infoMsg = new InfoMsg(); infoMsg.content="這是線程A"; infoMsg.id ="10001"; infoMsg.desc = "1"; infoMsg.message = "message"; System.out.println("正在執行其餘..."); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } try { final InfoMsg exchange = exchanger.exchange(infoMsg); System.out.println("線程A 交換數據====== 獲得"+ exchange); if (!exchange.equals(infoMsg)){ System.out.println("數據不一致~~請稽覈"); return; } } catch (InterruptedException e) { e.printStackTrace(); } }); service.submit(() ->{ //模擬數據 線程 B的 InfoMsg infoMsg = new InfoMsg(); infoMsg.content="這是線程B"; infoMsg.id ="10001"; infoMsg.desc = "1"; infoMsg.message = "message"; System.out.println("正在執行其餘..."); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } try { final InfoMsg exchange = exchanger.exchange(infoMsg); System.out.println("線程B 交換數據====== 獲得"+ exchange); if (!exchange.equals(infoMsg)){ System.out.println("數據不一致~~請稽覈"); return; } } catch (InterruptedException e) { e.printStackTrace(); } }); service.shutdown(); } static class InfoMsg{ String id; String name; String message; String content; String desc; @Override public String toString() { return "InfoMsg{" + "id='" + id + '\'' + ", name='" + name + '\'' + ", message='" + message + '\'' + ", content='" + content + '\'' + ", desc='" + desc + '\'' + '}'; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; InfoMsg infoMsg = (InfoMsg) o; return Objects.equals(id, infoMsg.id) && Objects.equals(name, infoMsg.name) && Objects.equals(message, infoMsg.message) && Objects.equals(content, infoMsg.content) && Objects.equals(desc, infoMsg.desc); } @Override public int hashCode() { return Objects.hash(id, name, message, content, desc); } } }
上面代碼運行能夠看到,當咱們線程A/B
到達同步點即調用exchange
後進行數據的交換,拿到對方的數據再與本身的數據對比能夠作到稽覈 的效果
Exchanger
一樣能夠用於遺傳算法中,選出兩個對象進行交互兩個的數據經過交叉規則獲得兩個混淆的結果。Exchanger
中嗨提供了一個方法 public V exchange(V x, long timeout, TimeUnit unit)
主要是用來防止兩個程序中一個一直沒有執行 exchange
而致使另外一個一直陷入等待狀態,這是能夠用這個方法,設置超時時間,超過這個時間則再也不等待。本文由AnonyStar 發佈,可轉載但需聲明原文出處。
歡迎關注微信公帳號 :雲棲簡碼 獲取更多優質文章
更多文章關注筆者博客 : 雲棲簡碼 i-code.online