需求場景:當咱們須要解析一個Excel裏多個sheet的數據,此時能夠考慮使用多線程,每一個線程解析一個sheet裏的數據,等到sheet都解析完以後,程序須要提示解析完成。
固然咱們可使用join方法,join用於讓當前線程等待join線程執行結束。在JDK1.5以後的併發包中提供的CountDownLatch也能夠實現join的功能。java
CountDownLatch容許一個或多個線程等待其餘線程完成操做。
CountDownLatch的構造函數接收一個int類型的參數做爲計數器,若是你想等待N個點完成,就傳入N。當咱們調用CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法會阻塞當前線程,直到N變爲零。因爲countDown方法能夠用在任何地方,因此這裏說的N個點,能夠是N個線程,也能夠是1個線程裏的N個執行步驟。用在多個線程時,只須要把這個CountDownLatch的引用傳遞到線程裏便可。數據庫
public class CountDownLatchTest { static CountDownLatch c = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { // new Thread(new Runnable() { // @Override // public void run() { // System.out.println(1); // c.countDown(); // System.out.println(2); // c.countDown(); // // } // }).start(); // c.await(); // System.out.println(3); new Thread(new Runnable() { @Override public void run() { System.out.println(1); c.countDown(); } }).start(); new Thread(new Runnable() { @Override public void run() { System.out.println(2); c.countDown(); } }).start(); c.await(); System.out.println(3); } }
上面執行結果多是2 1 finished
,也多是1 2 finished
,總之一、2在finished以前輸出。編程
CyclicBarrier要作的事是讓一組線程到達一個屏障(也能夠叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續運行。多線程
public class CyclicBarrierTest { static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(1); } }).start(); try { c.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println(2); } }
由於主線程和子線程的調度是由CPU決定的,兩個線程都有可能先執行,因此輸出1 2,有多是輸出2 1。併發
若是把new CyclicBarrier(2)修改爲new CyclicBarrier(3),則主線程和子線程會永遠等待,由於沒有第三個線程執行await()方法,既沒有第三個線程到達屏障,因此兩個線程都不會繼續執行。ide
CyclicBarrier還提供一個更高級的構造函數CyclicBarrier(int parties, Runnable barrieAction),用於在線程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。函數
public class CyclicBarrierTest1 { static CyclicBarrier c=new CyclicBarrier(2, new Runnable() { @Override public void run() { System.out.println("initialize...."); } }); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println(1); } }).start(); try { c.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println(2); } }
上面會首先輸出initialize....
工具
應用場景:
CyclicBarrier能夠用於多線程計算數據,最後合併計算結果的場景。例如,用一個Excel保存了用戶全部銀行流水,每一個Sheet保存一個帳戶近一年的每筆銀行流水,如今須要統計用戶的日均銀行流水,先用多線程處理每一個sheet裏的銀行流水,都執行完以後,獲得每一個sheet的日均銀行流水,最後,再用barrierAction根據這些線程的計算結果,計算出整個Excel的日均銀行流水。ui
public class CyclicBarrierTest2 implements Runnable{ private CyclicBarrier c = new CyclicBarrier(4, this); private Executor executor = Executors.newFixedThreadPool(4); private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>(); private void count() { for (int i=0;i<4;i++) { executor.execute(new Runnable() { @Override public void run() { try { //模擬計算當前sheet的流水數據。 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } sheetBankWaterCount.put(Thread.currentThread().getName(), 1); try { //銀流計算完成,插入一個屏障 c.await(); } catch (Exception e){ e.printStackTrace(); } } }); } } @Override public void run() { int result=0; //全部線程到達屏障後,彙總每一個sheet計算出的結果 for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) { result+=sheet.getValue(); } sheetBankWaterCount.put("result", result); System.out.println(result); } public static void main(String[] args) { CyclicBarrierTest2 barrierTest2=new CyclicBarrierTest2(); barrierTest2.count(); } }
Semaphore用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理地使用公共資源。
應用場景:
Semaphore能夠用於作流量控制,特別是公共資源有限的應用場景,好比數據庫鏈接。假若有一個需求要讀取幾萬個文件的數據,由於都是IO密集型任務,咱們能夠啓動幾十個線程併發地讀取,可是若是讀到內存中,還須要存儲到數據庫中,而數據庫的鏈接數只有10個,這是咱們必須控制只有10個線程同時獲取數據庫鏈接保存數據,不然會報錯沒法獲取數據庫鏈接。這個時候,就可使用Semaphore來作流量控制。this
public class SemaphoreTest { private static final int THREAD_COUNT=30; private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore s = new Semaphore(10); public static void main(String[] args){ for (int i=0;i<THREAD_COUNT;i++) { threadPool.execute(new Runnable() { @Override public void run() { try { s.acquire(); System.out.println("save data"); s.release(); } catch (Exception e) { e.printStackTrace(); } } }); } threadPool.shutdown(); } }
在代碼中,雖然有30個線程在執行,可是隻容許10個併發執行。Semaphore的構造方法Semaphore(int permits)接受一個整數的數字,表示可用的許可證數量。Semaphore(10)表示容許10個線程獲取許可證,也就是最大併發數10.Semaphore的用法也很簡單,首先線程使用Semaphore的acquire()方法獲取一個許可證,使用完以後調用release()方法歸還許可證。還能夠用tryAcquire()方法嘗試獲取許可證。
Exchanger是一個用於線程間協做的工具類。Exchanger用於進行線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程能夠交換彼此的數據。這兩個線程經過exchanger方法交換數據,若是第一個線程先執行exchange()方法,它會一直等待第二個線程也執行exchange方法,當兩個線程都到達同步點時,這兩個線程就能夠交換數據,將本線程生產出來的數據傳遞給對方。
應用場景:
Exchanger可用與校對工做,好比咱們須要將紙質銀行流水經過人工的方式錄入成電子銀行流水,爲了不錯誤,採用AB崗兩人進行錄入,錄入到Excel以後,系統須要加載這兩個Excel,並對兩個Excel數據進行校對,看看是否錄入一致。
public class ExchangerTest { private static final Exchanger<String> exchanger=new Exchanger<>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args) { threadPool.execute(new Runnable() { @Override public void run() { String a = "銀行流水A"; try { a=exchanger.exchange(a); System.out.println("交換後,a="+a); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadPool.execute(new Runnable() { @Override public void run() { String b = "銀行流水B"; try { b=exchanger.exchange(b); System.out.println("交換後,b="+b); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadPool.shutdown(); } }
上面輸出:
交換後,a=銀行流水B 交換後,b=銀行流水A
若是兩個線程有一個沒有執行exchange()方法,則會一直等待,若是擔憂有特殊狀況發生,避免一直等待,能夠用exchange(V x, long timeout, TimeUnit unit)
設置最大等待時長。
參考:《Java併發編程的藝術》-方騰飛