繼續 上一篇 《 學習筆記五:線程間的協做與通訊》java
在jdk的併發包裏提供了幾個很是有用的併發工具類。CountDownLatch、CyclicBarrier和Semaphore工具類提供了一種併發流程控制的手段,Exchanger工具類提供了在線程間交換數據的一種手段,以下主要介紹工具類的使用。數據庫
CountDownLatch容許一個或多個線程等待其餘線程完成操做。多線程
CountDownLatch類其實是使用計數器的方式去控制的,當咱們初始化CountDownLatch的時候傳入了一個int變量這個時候在類的內部初始化一個int的變量,每當咱們調用countDownt()方法的時候就使得這個變量的值減1,而對於await()方法則去判斷這個int的變量的值是否爲0,是則表示全部的操做都已經完成,不然繼續等待。併發
構造器中的計數值(count)實際上就是閉鎖須要等待的線程數量。計數器的值必須大於等於0,只是等於0的時候,計數器就是零,調用await方法時不會阻塞當前線程;這個值只能被設置一次,並且CountDownLatch沒有提供任何機制去從新設置這個計數值;一個線程調用countDown方法happen-before,另一個線程調用await方法。app
與CountDownLatch的第一次交互是主線程等待其餘線程。主線程必須在啓動其餘線程後當即調用CountDownLatch.await()方法。這樣主線程的操做就會在這個方法上阻塞,直到其餘線程完成各自的任務。ide
示例:boss等待全部員工來開會,當全部人員都到齊以後,boss宣佈開始會議!!!函數
package com.black.example.mutilThread; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * Created by 10250H on 2018/7/26. */ public class CountDownLatchDemo { //聲明countDownLatch 變量,初始化線程數量(內部計數器) static CountDownLatch countDownLatch = new CountDownLatch(2); public static void main(String[] args) { new Thread(new MyRunner(countDownLatch, "小李", 2000)).start(); new Thread(new MyRunner(countDownLatch, "小張", 4000)).start(); new Thread(new MyRunner(countDownLatch, "小王", 5000)).start(); try { System.out.println("等待員工到來開會。。。。。。。"); //注意這裏是await。主線程將會一直等待在這裏,當全部線程都執行 countDownLatch.countDown();以後當前線程纔會繼續執行 countDownLatch.await(); startMeeting("老闆"); } catch (InterruptedException e) { e.printStackTrace(); } } private static void startMeeting(String name) { System.out.println(name + "說:人齊了。會議開始!!"); } static class MyRunner implements Runnable { CountDownLatch countDownLatch; String name; int time; public MyRunner(CountDownLatch countDownLatch, String name, int time) { this.countDownLatch = countDownLatch; this.name = name; this.time = time; } @Override public void run() { try { System.out.println(name + " 開始出發去公司。"); TimeUnit.SECONDS.sleep(1); System.out.println(name + " 終於到會議室!!!"); countDownLatch.countDown(); System.out.println(name + " 準備好了!!"); } catch (Exception e) { e.printStackTrace(); } } } }運行結果:注意,至因而誰先到會議室,每次運行結果都會不同。由於主線程和子線程的調用時由CPU決定的工具
若是某我的缺席會議,咱們不能讓主線程一直等待,因此可使用另一個帶指定時間的await方法-await(long time,TimeUtil unit)的那個帶指定時間後,就好再也不阻塞當前線程。jion也有相似的方法。學習
CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier),它要作的事情是,讓一組線程到達一個屏障(也能夠叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續運行。ui
CyclicBarrier默認構造方法時CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每一個線程調用await方法告訴CyclicBarrier我已經到達屏障了,而後當前線程被阻塞。 示例代碼以下:
package com.black.example.mutilThread; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; /** * Created by 10250H on 2018/7/26. */ public class CyclicBarrierDemo1 { static CyclicBarrier cyclicBarrier = new CyclicBarrier(2); public static void main(String[] args) { new Thread(new MyThread(1)).start(); new Thread(new MyThread(2)).start(); } static class MyThread implements Runnable{ private int counter; public MyThread(int counter){ this.counter = counter; } @Override public void run() { try { System.out.println("當前值輸出:"+counter); TimeUnit.SECONDS.sleep(1); cyclicBarrier.await(); System.out.println("counter="+counter+",線程="+Thread.currentThread().getName()+",繼續執行"); } catch (InterruptedException e) { e.printStackTrace(); }catch (BrokenBarrierException e) { e.printStackTrace(); } } } }運行結果:由於主線程和子線程的調用時由CPU決定的,兩個線程都有可能先執行,因此會產生不一樣的結果
若是把new CyclicBarrier(2) 修改成new CyclicBarrier(3),則主線程和子線程會永遠等待,不會繼續執行,由於第三個尚未到達屏障,因此以前到達屏障的兩個線程都不會繼續執行。
CyclicBarrier還提供了高級構造函數CyclicBarrier(int parties,Runnable barrier-Action),用於在現場到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景,示例代碼以下:
package com.black.example.mutilThread; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; public class CyclicBarrierDemo2 { static CyclicBarrier cyclicBarrier = new CyclicBarrier(2,new MyThread(3)); public static void main(String[] args){ new Thread(new Runnable() { @Override public void run() { try { System.out.println("通用計數當前值:1"); cyclicBarrier.await(); System.out.println("默認執行通用線程2"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { System.out.println("通用計數當前值:2"); cyclicBarrier.await(); System.out.println("默認執行通用線程2"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }).start(); } static class MyThread implements Runnable{ private int counter; public MyThread(int counter){ this.counter = counter; } @Override public void run() { try { System.out.println("優先執行:"+counter); TimeUnit.SECONDS.sleep(1); System.out.println("counter="+counter+",線程="+Thread.currentThread().getName()+",繼續執行"); } catch (InterruptedException e) { e.printStackTrace(); } } } }運行結果:當兩個線程都到達屏障後,優先執行對象MyThread 的任務。
CyclicBarrier的應用場景
能夠用於多線程計算數據,最後合併計算結果的場景。例如:用一個Excel保存用戶的全部銀行流水,每一個Sheet保存一個帳戶近一年的每筆銀行流水,如今須要統計用戶的日均銀行流水,先用多線程處理每一個sheet裏的銀行流水,都執行完以後,獲得每一個sheet的日均銀行流水。最後再用barrierAction總結出整個Excel的日均銀行流水,示例代碼以下:
package com.black.example.mutilThread; import java.util.Map; import java.util.concurrent.*; public class BankWaterService implements Runnable { //建立4個屏障,處理完以後執行當前類的run方法 private CyclicBarrier cyclicBarrier = new CyclicBarrier(4,this); //假設只有4個sheet,因此啓動4個線程 private Executor executor = Executors.newFixedThreadPool(4); //保存每一個sheet計算出的銀行流水結果 private ConcurrentHashMap<String,Integer> sheetCountMap = new ConcurrentHashMap<String,Integer>(); private void count(){ for (int i=0;i<4;i++){ executor.execute(new Runnable() { @Override public void run() { //1:計算當前sheet的銀行流水數據,計算代碼省略....僞代碼以下: sheetCountMap.put(Thread.currentThread().getName(),1); //計算完成,插入屏障 try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }); } } @Override public void run() { int result=0; //彙總每一個sheet的計算結果 for (Map.Entry<String,Integer> entry:sheetCountMap.entrySet()){ result+=entry.getValue(); } //輸出結果 sheetCountMap.put("result",result); System.out.println("最終結果:"+result); } public static void main(String[] args) { BankWaterService bankWaterService = new BankWaterService(); bankWaterService.count(); } }運行結果:最終結果:4
CountDownLatch計數器只能使用一次,而CyclicBarrier計數器可使用reset()方法重置。
因此CyclicBarrier能夠處理更爲複雜的業務場景,如:計算髮生錯誤,能夠重置計數器,並讓線程從新執行一次。
CyclicBarrier其餘用法及源代碼以下,如:
isBroken() :阻塞的線程是否被中斷,返回值boolean類型
getNumberWaiting() :獲取Cyclic-Barrier阻塞的線程數量
Semaphore(信號量)用來控制同時訪問特定資源的線程數量,經過協調各個線程,以保證合理的使用公共資源。
應用場景
Semaphore能夠用於流量控制,特別是公用資源有限的應用場景,好比數據庫連接。
假若有個需求,要讀幾萬個文件的數據,由於是IO密集型任務,咱們能夠啓動幾十個線程併發讀取,可是若是讀到內存後,還須要存儲到數據庫中,而數據庫的鏈接數只有10個,這時咱們必須控制只有10個線程同時獲取數據庫連接保存數據,不然會報錯沒法獲取數據庫連接。這時可使用Semaphore來作流量控制,代碼示例以下:
package com.black.example.mutilThread; import java.util.concurrent.*; public class SemaphoreDemo { private static final int THREAD_COUNT=30; private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore semaphore = new Semaphore(10);//10個併發 public static void main(String[] args) { for (int i=0;i<THREAD_COUNT;i++){ threadPool.execute(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName()+"-----請求資源"); //請求獲取資源,若是有空閒資源則會當即獲取,進入臨界區,不然將會等待,一直等待到獲取到臨界區資源 semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"----獲取資源,保存數據!"); TimeUnit.SECONDS.sleep(1); semaphore.release();//釋放資源 } catch (InterruptedException e) { e.printStackTrace(); } } }); } } }運行結果:
pool-1-thread-4-----請求資源 pool-1-thread-2-----請求資源 pool-1-thread-4----獲取資源,保存數據! pool-1-thread-1-----請求資源 pool-1-thread-5-----請求資源 pool-1-thread-3-----請求資源 pool-1-thread-5----獲取資源,保存數據! pool-1-thread-8-----請求資源 pool-1-thread-1----獲取資源,保存數據! pool-1-thread-9-----請求資源 pool-1-thread-2----獲取資源,保存數據! pool-1-thread-7-----請求資源 pool-1-thread-9----獲取資源,保存數據! pool-1-thread-10-----請求資源 pool-1-thread-11-----請求資源 pool-1-thread-8----獲取資源,保存數據! pool-1-thread-6-----請求資源 pool-1-thread-3----獲取資源,保存數據! pool-1-thread-13-----請求資源 pool-1-thread-11----獲取資源,保存數據! pool-1-thread-10----獲取資源,保存數據! pool-1-thread-12-----請求資源 pool-1-thread-7----獲取資源,保存數據! pool-1-thread-15-----請求資源 pool-1-thread-14-----請求資源 pool-1-thread-16-----請求資源 pool-1-thread-17-----請求資源 pool-1-thread-18-----請求資源 pool-1-thread-19-----請求資源 pool-1-thread-21-----請求資源 pool-1-thread-20-----請求資源 pool-1-thread-22-----請求資源 pool-1-thread-23-----請求資源 pool-1-thread-24-----請求資源 pool-1-thread-25-----請求資源 pool-1-thread-27-----請求資源 pool-1-thread-26-----請求資源 pool-1-thread-28-----請求資源 pool-1-thread-29-----請求資源 pool-1-thread-30-----請求資源 pool-1-thread-6----獲取資源,保存數據! pool-1-thread-13----獲取資源,保存數據! pool-1-thread-12----獲取資源,保存數據! pool-1-thread-15----獲取資源,保存數據! pool-1-thread-16----獲取資源,保存數據! pool-1-thread-17----獲取資源,保存數據! pool-1-thread-14----獲取資源,保存數據! pool-1-thread-21----獲取資源,保存數據! pool-1-thread-19----獲取資源,保存數據! pool-1-thread-18----獲取資源,保存數據! pool-1-thread-20----獲取資源,保存數據! pool-1-thread-22----獲取資源,保存數據! pool-1-thread-23----獲取資源,保存數據! pool-1-thread-26----獲取資源,保存數據! pool-1-thread-27----獲取資源,保存數據! pool-1-thread-25----獲取資源,保存數據! pool-1-thread-24----獲取資源,保存數據! pool-1-thread-30----獲取資源,保存數據! pool-1-thread-29----獲取資源,保存數據! pool-1-thread-28----獲取資源,保存數據!Semaphore簡單用法以下:
//構造方法,可用的許可證數量,默認使用非公平鎖的方式建立 public Semaphore(int permits){...} //嘗試獲取許可證 public boolean tryAcquire(){...} //嘗試獲取許可證,在指定時間內若獲取不到則返回 public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException{...} //返回信號量中當前可用的許可證數 public int availablePermits(){...} //返回正在等待獲取許可證的線程數(估計值) public final int getQueueLength() {...} //查詢是否有任何線程等待獲取許可證 public final boolean hasQueuedThreads(){...} //減小reduction個許可證,這個方法在使用的子類中頗有用跟蹤不可用資源的信號量 protected void reducePermits(int reduction) {...} //返回全部等待獲取許可證的線程集合 protected Collection<Thread> getQueuedThreads(){...}
上一篇:學習筆記五:線程間的協做與通訊
下一篇:待續....