在JDK的併發包裏提供了幾個很是有用的併發工具類。CountDownLatch、CyclicBarrier和Semaphore工具類提供了一種併發流程控制的手段,Exchanger工具類則提供了在線程間交換數據的一種手段。本章會配合一些應用場景來介紹如何使用這些工具類。java
CountDownLatch容許一個或多個線程等待其餘線程完成操做。
假若有這樣一個需求:咱們須要解析一個Excel裏多個sheet的數據,此時能夠考慮使用多線程,每一個線程解析一個sheet裏的數據,等到全部的sheet都解析完以後,程序須要提示解析完成(或者彙總結果)。在這個需求中,要實現主線程等待全部線程完成sheet的解析操做,最簡單的作法是使用join()方法,如代碼清單8-1所示。算法
1 import java.util.Random; 2 import java.util.concurrent.atomic.AtomicInteger; 3 4 public class JoinCountDownLatchTest { 5 private static Random sr=new Random(47); 6 private static AtomicInteger result=new AtomicInteger(0); 7 private static int threadCount=10; 8 private static class Parser implements Runnable{ 9 String name; 10 public Parser(String name){ 11 this.name=name; 12 } 13 @Override 14 public void run() { 15 int sum=0; 16 int seed=Math.abs(sr.nextInt()) ; 17 Random r=new Random(47); 18 for(int i=0;i<100;i++){ 19 sum+=r.nextInt(seed); 20 } 21 result.addAndGet(sum); 22 System.out.println(name+"線程的解析結果:"+sum); 23 } 24 } 25 public static void main(String[] args) throws InterruptedException { 26 Thread[] threads=new Thread[threadCount]; 27 for(int i=0;i<threadCount;i++){ 28 threads[i]=new Thread(new Parser("Parser-"+i)); 29 } 30 for(int i=0;i<threadCount;i++){ 31 threads[i].start(); 32 } 33 for(int i=0;i<threadCount;i++){ 34 threads[i].join(); 35 } 36 System.out.println("全部線程解析結束!"); 37 System.out.println("全部線程的解析結果:"+result); 38 } 39 }
輸出:數據庫
Parser-1線程的解析結果:-2013585201
Parser-0線程的解析結果:1336321192
Parser-2線程的解析結果:908136818
Parser-5線程的解析結果:-1675827227
Parser-3線程的解析結果:1638121055
Parser-4線程的解析結果:1513365118
Parser-6線程的解析結果:489607354
Parser-8線程的解析結果:1513365118
Parser-7線程的解析結果:-1191966831
Parser-9線程的解析結果:-912399159
全部線程解析結束!
全部線程的解析結果:1605138237
join用於讓當前執行線程等待join線程執行結束。其實現原理是不停檢查join線程是否存活,若是join線程存活則讓當前線程永遠等待。其中,wait(0)表示永遠等待下去,代碼片斷以下。join在內部使用wait進行等待。編程
1 public class Thread implements Runnable { 2 ...... 3 public final void join() throws InterruptedException { 4 join(0); 5 } 6 public final synchronized void join(long millis) 7 throws InterruptedException { 8 long base = System.currentTimeMillis(); 9 long now = 0; 10 11 if (millis < 0) { 12 throw new IllegalArgumentException("timeout value is negative"); 13 } 14 15 if (millis == 0) {//執行到這裏 16 while (isAlive()) { 17 wait(0);//main線程永遠等待join線程 18 } 19 } else { 20 while (isAlive()) { 21 long delay = millis - now; 22 if (delay <= 0) { 23 break; 24 } 25 wait(delay); 26 now = System.currentTimeMillis() - base; 27 } 28 } 29 } 30 ...... 31 }
直到join線程停止後,線程的this.notifyAll()方法會被調用,調用notifyAll()方法是在JVM裏實現的,因此在JDK裏看不到,你們能夠查看JVM源碼。多線程
在JDK 1.5以後的併發包中提供的CountDownLatch也能夠實現join的功能,而且比join的功能更多,如代碼清單8-2所示。併發
1 import java.util.Random; 2 import java.util.concurrent.CountDownLatch; 3 import java.util.concurrent.atomic.AtomicInteger; 4 5 public class CountDownLatchTest { 6 private static Random sr=new Random(47); 7 private static AtomicInteger result=new AtomicInteger(0); 8 private static int threadCount=10;//線程數量 9 private static CountDownLatch countDown=new CountDownLatch(threadCount);//CountDownLatch 10 private static class Parser implements Runnable{ 11 String name; 12 public Parser(String name){ 13 this.name=name; 14 } 15 @Override 16 public void run() { 17 int sum=0; 18 int seed=Math.abs(sr.nextInt()) ; 19 Random r=new Random(47); 20 for(int i=0;i<100;i++){ 21 sum+=r.nextInt(seed); 22 } 23 result.addAndGet(sum); 24 System.out.println(name+"線程的解析結果:"+sum); 25 countDown.countDown();//注意這裏 26 } 27 } 28 public static void main(String[] args) throws InterruptedException { 29 Thread[] threads=new Thread[threadCount]; 30 for(int i=0;i<threadCount;i++){ 31 threads[i]=new Thread(new Parser("Parser-"+i)); 32 } 33 for(int i=0;i<threadCount;i++){ 34 threads[i].start(); 35 } 36 /* 37 for(int i=0;i<threadCount;i++){ 38 threads[i].join(); 39 }*/ 40 countDown.await();//將join改成使用CountDownLatch 41 System.out.println("全部線程解析結束!"); 42 System.out.println("全部線程的解析結果:"+result); 43 } 44 }
輸出:app
Parser-0線程的解析結果:1336321192
Parser-1線程的解析結果:-2013585201
Parser-2線程的解析結果:-1675827227
Parser-4線程的解析結果:1638121055
Parser-3線程的解析結果:908136818
Parser-5線程的解析結果:1513365118
Parser-7線程的解析結果:489607354
Parser-6線程的解析結果:1513365118
Parser-8線程的解析結果:-1191966831
Parser-9線程的解析結果:-912399159
全部線程解析結束!
全部線程的解析結果:1605138237less
CountDownLatch的構造函數接收一個int類型的參數做爲計數器,若是你想等待N個點完成,這裏就傳入N。
當咱們調用CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法會阻塞當前線程,直到N變成零。因爲countDown方法能夠用在任何地方,因此這裏說的N個點,能夠是N個線程,也能夠是1個線程裏的N個執行步驟。用在多個線程時,只須要把這個CountDownLatch的引用傳遞到線程裏便可。
若是有某個解析sheet的線程處理得比較慢,咱們不可能讓主線程一直等待,因此可使用另一個帶指定時間的await方法——await(long time,TimeUnit unit),這個方法等待特定時間後,就會再也不阻塞當前線程。join也有相似的方法。
注意:計數器必須大於等於0,只是等於0時候,計數器就是零,調用await方法時不會阻塞當前線程。CountDownLatch不可能從新初始化或者修改CountDownLatch對象的內部計數器的值。一個線程調用countDown方法happen-before,另一個線程調用await方法。dom
1 public class CountDownLatch { 2 /**Synchronization control For CountDownLatch. Uses AQS state to represent count.*/ 3 private static final class Sync extends AbstractQueuedSynchronizer { 4 private static final long serialVersionUID = 4982264981922014374L; 5 6 Sync(int count) { 7 setState(count);//初始化同步狀態 8 } 9 10 int getCount() { 11 return getState(); 12 } 13 14 protected int tryAcquireShared(int acquires) { 15 return (getState() == 0) ? 1 : -1; 16 } 17 18 protected boolean tryReleaseShared(int releases) { 19 // Decrement count; signal when transition to zero 20 for (;;) { 21 int c = getState(); 22 if (c == 0) 23 return false; 24 int nextc = c-1; 25 if (compareAndSetState(c, nextc)) 26 return nextc == 0; 27 } 28 } 29 } 30 31 private final Sync sync;//組合一個同步器(AQS) 32 33 public CountDownLatch(int count) { 34 if (count < 0) throw new IllegalArgumentException("count < 0"); 35 this.sync = new Sync(count);//初始化同步狀態 36 } 37 /*Causes the current thread to wait until the latch has counted down to 38 * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.*/ 39 public void await() throws InterruptedException { 40 sync.acquireSharedInterruptibly(1);// 41 } 42 43 public boolean await(long timeout, TimeUnit unit) 44 throws InterruptedException { 45 return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); 46 } 47 public void countDown() { 48 sync.releaseShared(1);//釋放同步狀態 49 } 50 51 public long getCount() { 52 return sync.getCount(); 53 } 54 55 public String toString() { 56 return super.toString() + "[Count = " + sync.getCount() + "]"; 57 } 58 }
CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要作的事情是,讓一組線程到達一個屏障(也能夠叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續運行。ide
CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每一個線程調用await方法告訴CyclicBarrier我已經到達了屏障,而後當前線程被阻塞。但阻塞數到達設置的攔截參數,則線程一塊兒越過屏障。
1 import java.util.Random; 2 import java.util.concurrent.CyclicBarrier; 3 import java.util.concurrent.atomic.AtomicInteger; 4 5 6 public class CyclicBarrierTest { 7 8 private static Random sr=new Random(47); 9 private static AtomicInteger result=new AtomicInteger(0); 10 private static int threadCount=10; 11 //屏障後面執行彙總 12 private static CyclicBarrier barrier=new CyclicBarrier(threadCount,new Accumulate()); 13 private static class Parser implements Runnable{ 14 String name; 15 public Parser(String name){ 16 this.name=name; 17 } 18 @Override 19 public void run() { 20 int sum=0; 21 int seed=Math.abs(sr.nextInt()) ; 22 Random r=new Random(47); 23 for(int i=0;i<(seed%100*100000);i++){ 24 sum+=r.nextInt(seed); 25 } 26 result.addAndGet(sum); 27 System.out.println(System.currentTimeMillis()+"-"+name+"線程的解析結果:"+sum); 28 try { 29 barrier.await(); 30 System.out.println(System.currentTimeMillis()+"-"+name+"線程越過屏障!"); 31 } catch (Exception e) { 32 e.printStackTrace(); 33 } 34 } 35 } 36 static class Accumulate implements Runnable{ 37 @Override 38 public void run() { 39 System.out.println("全部線程解析結束!"); 40 System.out.println("全部線程的解析結果:"+result); 41 } 42 } 43 public static void main(String[] args) throws InterruptedException { 44 Thread[] threads=new Thread[threadCount]; 45 for(int i=0;i<threadCount;i++){ 46 threads[i]=new Thread(new Parser("Parser-"+i)); 47 } 48 for(int i=0;i<threadCount;i++){ 49 threads[i].start(); 50 } 51 } 52 }
輸出:
1471866228774-Parser-4線程的解析結果:631026992
1471866228930-Parser-3線程的解析結果:-372785277
1471866228961-Parser-1線程的解析結果:-938473891
1471866229008-Parser-7線程的解析結果:-396620018
1471866229008-Parser-2線程的解析結果:-1159985406
1471866229024-Parser-5線程的解析結果:-664234808
1471866229070-Parser-6線程的解析結果:556534377
1471866229117-Parser-9線程的解析結果:-844558478
1471866229383-Parser-0線程的解析結果:919864023
1471866229430-Parser-8線程的解析結果:-2104111089
全部線程解析結束!
全部線程的解析結果:-78376279
1471866229430-Parser-8線程越過屏障!
1471866229430-Parser-2線程越過屏障!
1471866229430-Parser-9線程越過屏障!
1471866229430-Parser-7線程越過屏障!
1471866229430-Parser-1線程越過屏障!
1471866229430-Parser-3線程越過屏障!
1471866229430-Parser-0線程越過屏障!
1471866229430-Parser-6線程越過屏障!
1471866229430-Parser-4線程越過屏障!
1471866229430-Parser-5線程越過屏障!
咱們發現,各個線程解析完成的時間不一致,可是越過屏障的時間倒是一致的。
CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可使用reset()方法重置。因此CyclicBarrier能處理更爲複雜的業務場景。例如,若是計算髮生錯誤,能夠重置計數器,並讓線程從新執行一次。
CyclicBarrier還提供其餘有用的方法,好比getNumberWaiting方法能夠得到Cyclic-Barrier阻塞的線程數量。isBroken()方法用來了解阻塞的線程是否被中斷。
Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理的使用公共資源。
多年以來,我都以爲從字面上很難理解Semaphore所表達的含義,只能把它比做是控制流量的紅綠燈。好比××馬路要限制流量,只容許同時有一百輛車在這條路上行使,其餘的都必須在路口等待,因此前一百輛車會看到綠燈,能夠開進這條馬路,後面的車會看到紅燈,不能駛入××馬路,可是若是前一百輛中有5輛車已經離開了××馬路,那麼後面就容許有5輛車駛入馬路,這個例子裏說的車就是線程,駛入馬路就表示線程在執行,離開馬路就表示線程執行完成,看見紅燈就表示線程被阻塞,不能執行。
Semaphore能夠用於作流量控制,特別是公用資源有限的應用場景,好比數據庫鏈接。假若有一個需求,要讀取幾萬個文件的數據,由於都是IO密集型任務,咱們能夠啓動幾十個線程併發地讀取,可是若是讀到內存後,還須要存儲到數據庫中,而數據庫的鏈接數只有10個,這時咱們必須控制只有10個線程同時獲取數據庫鏈接保存數據,不然會報錯沒法獲取數據庫鏈接。這個時候,就可使用Semaphore來作流量控制,如代碼清單8-7所示。
1 import java.util.concurrent.ExecutorService; 2 import java.util.concurrent.Executors; 3 import java.util.concurrent.Semaphore; 4 5 public class SemaphoreTest { 6 private static final int THREAD_COUNT = 30; 7 private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); 8 private static Semaphore s = new Semaphore(10); 9 10 public static void main(String[] args) { 11 for (int i = 0; i < THREAD_COUNT; i++) { 12 threadPool.execute(new Runnable() { 13 @Override 14 public void run() { 15 try { 16 s.acquire(); 17 System.out.println("save data"); 18 s.release(); 19 } catch (InterruptedException e) { 20 } 21 } 22 }); 23 } 24 threadPool.shutdown(); 25 } 26 }
在代碼中,雖然有30個線程在執行,可是隻容許10個併發執行。Semaphore的構造方法Semaphore(int permits)接受一個整型的數字,表示可用的許可證數量。Semaphore(10)表示容許10個線程獲取許可證,也就是最大併發數是10。
Semaphore的用法也很簡單,首先線程使用Semaphore的acquire()方法獲取一個許可證,使用完以後調用release()方法歸還許可證。還能夠用tryAcquire()方法嘗試獲取許可證。
Semaphore還提供一些其餘方法,具體以下。
int availablePermits():返回此信號量中當前可用的許可證數。
int getQueueLength():返回正在等待獲取許可證的線程數。
boolean hasQueuedThreads():是否有線程正在等待獲取許可證。
void reducePermits(int reduction):減小reduction個許可證,是個protected方法。
Collection getQueuedThreads():返回全部等待獲取許可證的線程集合,是個protected方法。
Exchanger(交換者)是一個用於線程間協做的工具類。Exchanger用於進行線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程能夠交換彼此的數據。這兩個線程經過exchange方法交換數據,若是第一個線程先執行exchange()方法,它會一直等待第二個線程也執行exchange方法,當兩個線程都到達同步點時,這兩個線程就能夠交換數據,將本線程生產出來的數據傳遞給對方。
下面來看一下Exchanger的應用場景。
一、Exchanger能夠用於遺傳算法,遺傳算法裏須要選出兩我的做爲交配對象,這時候會交換兩人的數據,並使用交叉規則得出2個交配結果。
二、Exchanger也能夠用於校對工做,好比咱們須要將紙製銀行流水經過人工的方式錄入成電子銀行流水,爲了不錯誤,採用AB崗兩人進行錄入,錄入到Excel以後,系統須要加載這兩個Excel,並對兩個Excel數據進行校對,看看是否錄入一致.
1 import java.util.concurrent.Exchanger; 2 import java.util.concurrent.ExecutorService; 3 import java.util.concurrent.Executors; 4 5 public class ExchangerTest { 6 7 private static final Exchanger<String> exgr = new Exchanger<String>(); 8 private static ExecutorService threadPool = Executors.newFixedThreadPool(2); 9 10 public static void main(String[] args) { 11 threadPool.execute(new Runnable() { 12 @Override 13 public void run() { 14 try { 15 String A = "銀行流水100";// A錄入銀行流水數據 16 String B=exgr.exchange(A); 17 System.out.println("A的視角:A和B數據是否一致:" + A.equals(B) + 18 ",A錄入的是:" + A + ",B錄入是:" + B); 19 } catch (InterruptedException e) { 20 } 21 } 22 }); 23 threadPool.execute(new Runnable() { 24 @Override 25 public void run() { 26 try { 27 String B = "銀行流水200";// B錄入銀行流水數據 28 String A = exgr.exchange(B); 29 System.out.println("B的視角:A和B數據是否一致:" + A.equals(B) + 30 ",A錄入的是:" + A + ",B錄入是:" + B); 31 } catch (InterruptedException e) { 32 } 33 } 34 }); 35 threadPool.shutdown(); 36 } 37 }
輸出:
B的視角:A和B數據是否一致:false,A錄入的是:銀行流水100,B錄入是:銀行流水200
A的視角:A和B數據是否一致:false,A錄入的是:銀行流水100,B錄入是:銀行流水200
若是兩個線程有一個沒有執行exchange()方法,則會一直等待,若是擔憂有特殊狀況發生,避免一直等待,可使用exchange(V x,longtimeout,TimeUnit unit)設置最大等待時長。
內容源自:
《Java併發編程的藝術》
https://blog.csdn.net/sunxianghuang/article/details/52277394