java併發之併發工具

  在JDK的併發包裏提供了幾個很是有用的併發工具類。CountDownLatch、CyclicBarrier和Semaphore工具類提供了一種併發流程控制的手段,Exchanger工具類則提供了在線程間交換數據的一種手段。本章會配合一些應用場景來介紹如何使用這些工具類。java

1,等待多線程完成的CountDownLatch

  CountDownLatch容許一個或多個線程等待其餘線程完成操做
  假若有這樣一個需求:咱們須要解析一個Excel裏多個sheet的數據,此時能夠考慮使用多線程,每一個線程解析一個sheet裏的數據,等到全部的sheet都解析完以後,程序須要提示解析完成(或者彙總結果)。在這個需求中,要實現主線程等待全部線程完成sheet的解析操做,最簡單的作法是使用join()方法,如代碼清單8-1所示。算法

 1 import java.util.Random;
 2 import java.util.concurrent.atomic.AtomicInteger;
 3  
 4 public class JoinCountDownLatchTest {
 5     private static Random sr=new Random(47); 
 6     private static AtomicInteger result=new AtomicInteger(0);
 7     private static int threadCount=10;
 8     private static class Parser implements Runnable{ 
 9         String name;
10         public Parser(String name){
11             this.name=name;
12         }
13         @Override
14         public void run() {
15             int sum=0;
16             int seed=Math.abs(sr.nextInt()) ;
17             Random r=new Random(47); 
18             for(int i=0;i<100;i++){  
19                 sum+=r.nextInt(seed);
20             }  
21             result.addAndGet(sum);
22             System.out.println(name+"線程的解析結果:"+sum);
23         } 
24     }
25     public static void main(String[] args) throws InterruptedException {
26         Thread[] threads=new Thread[threadCount];
27         for(int i=0;i<threadCount;i++){
28             threads[i]=new Thread(new Parser("Parser-"+i));
29         } 
30         for(int i=0;i<threadCount;i++){
31             threads[i].start();
32         } 
33         for(int i=0;i<threadCount;i++){
34             threads[i].join();
35         } 
36         System.out.println("全部線程解析結束!");
37         System.out.println("全部線程的解析結果:"+result);
38     } 
39 }

輸出:數據庫

 

Parser-1線程的解析結果:-2013585201
Parser-0線程的解析結果:1336321192
Parser-2線程的解析結果:908136818
Parser-5線程的解析結果:-1675827227
Parser-3線程的解析結果:1638121055
Parser-4線程的解析結果:1513365118
Parser-6線程的解析結果:489607354
Parser-8線程的解析結果:1513365118
Parser-7線程的解析結果:-1191966831
Parser-9線程的解析結果:-912399159
全部線程解析結束!
全部線程的解析結果:1605138237
  join用於讓當前執行線程等待join線程執行結束。其實現原理是不停檢查join線程是否存活,若是join線程存活則讓當前線程永遠等待。其中,wait(0)表示永遠等待下去,代碼片斷以下。join在內部使用wait進行等待。編程

 1 public class Thread implements Runnable {
 2     ......
 3     public final void join() throws InterruptedException {
 4         join(0);
 5     }
 6     public final synchronized void join(long millis)
 7     throws InterruptedException {
 8         long base = System.currentTimeMillis();
 9         long now = 0;
10  
11         if (millis < 0) {
12             throw new IllegalArgumentException("timeout value is negative");
13         }
14  
15         if (millis == 0) {//執行到這裏
16             while (isAlive()) {
17                 wait(0);//main線程永遠等待join線程
18             }
19         } else {
20             while (isAlive()) {
21                 long delay = millis - now;
22                 if (delay <= 0) {
23                     break;
24                 }
25                 wait(delay);
26                 now = System.currentTimeMillis() - base;
27             }
28         }
29     }
30     ......
31 }
View Code

  直到join線程停止後,線程的this.notifyAll()方法會被調用,調用notifyAll()方法是在JVM裏實現的,因此在JDK裏看不到,你們能夠查看JVM源碼。多線程

  在JDK 1.5以後的併發包中提供的CountDownLatch也能夠實現join的功能,而且比join的功能更多,如代碼清單8-2所示。併發

 1 import java.util.Random;
 2 import java.util.concurrent.CountDownLatch;
 3 import java.util.concurrent.atomic.AtomicInteger;
 4  
 5 public class CountDownLatchTest {
 6     private static Random sr=new Random(47); 
 7     private static AtomicInteger result=new AtomicInteger(0);
 8     private static int threadCount=10;//線程數量
 9     private static CountDownLatch countDown=new CountDownLatch(threadCount);//CountDownLatch
10     private static class Parser implements Runnable{ 
11         String name;
12         public Parser(String name){
13             this.name=name;
14         }
15         @Override
16         public void run() {
17             int sum=0;
18             int seed=Math.abs(sr.nextInt()) ;
19             Random r=new Random(47); 
20             for(int i=0;i<100;i++){  
21                 sum+=r.nextInt(seed);
22             }  
23             result.addAndGet(sum);
24             System.out.println(name+"線程的解析結果:"+sum);
25             countDown.countDown();//注意這裏
26         } 
27     }
28     public static void main(String[] args) throws InterruptedException {
29         Thread[] threads=new Thread[threadCount];
30         for(int i=0;i<threadCount;i++){
31             threads[i]=new Thread(new Parser("Parser-"+i));
32         } 
33         for(int i=0;i<threadCount;i++){
34             threads[i].start();
35         } 
36         /*
37         for(int i=0;i<threadCount;i++){
38             threads[i].join();
39         }*/
40         countDown.await();//將join改成使用CountDownLatch
41         System.out.println("全部線程解析結束!");
42         System.out.println("全部線程的解析結果:"+result);
43     } 
44 }

輸出:app

Parser-0線程的解析結果:1336321192
Parser-1線程的解析結果:-2013585201
Parser-2線程的解析結果:-1675827227
Parser-4線程的解析結果:1638121055
Parser-3線程的解析結果:908136818
Parser-5線程的解析結果:1513365118
Parser-7線程的解析結果:489607354
Parser-6線程的解析結果:1513365118
Parser-8線程的解析結果:-1191966831
Parser-9線程的解析結果:-912399159
全部線程解析結束!
全部線程的解析結果:1605138237less

  CountDownLatch的構造函數接收一個int類型的參數做爲計數器,若是你想等待N個點完成,這裏就傳入N。
  當咱們調用CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法會阻塞當前線程,直到N變成零。因爲countDown方法能夠用在任何地方,因此這裏說的N個點,能夠是N個線程,也能夠是1個線程裏的N個執行步驟。用在多個線程時,只須要把這個CountDownLatch的引用傳遞到線程裏便可。
  若是有某個解析sheet的線程處理得比較慢,咱們不可能讓主線程一直等待,因此可使用另一個帶指定時間的await方法——await(long time,TimeUnit unit),這個方法等待特定時間後,就會再也不阻塞當前線程。join也有相似的方法。
  注意:計數器必須大於等於0,只是等於0時候,計數器就是零,調用await方法時不會阻塞當前線程。CountDownLatch不可能從新初始化或者修改CountDownLatch對象的內部計數器的值。一個線程調用countDown方法happen-before,另一個線程調用await方法。dom

 1 public class CountDownLatch {
 2     /**Synchronization control For CountDownLatch. Uses AQS state to represent count.*/
 3     private static final class Sync extends AbstractQueuedSynchronizer {
 4         private static final long serialVersionUID = 4982264981922014374L;
 5  
 6         Sync(int count) {
 7             setState(count);//初始化同步狀態
 8         }
 9  
10         int getCount() {
11             return getState();
12         }
13  
14         protected int tryAcquireShared(int acquires) {
15             return (getState() == 0) ? 1 : -1;
16         }
17  
18         protected boolean tryReleaseShared(int releases) {
19             // Decrement count; signal when transition to zero
20             for (;;) {
21                 int c = getState();
22                 if (c == 0)
23                     return false;
24                 int nextc = c-1;
25                 if (compareAndSetState(c, nextc))
26                     return nextc == 0;
27             }
28         }
29     }
30  
31     private final Sync sync;//組合一個同步器(AQS)
32  
33     public CountDownLatch(int count) {
34         if (count < 0) throw new IllegalArgumentException("count < 0");
35         this.sync = new Sync(count);//初始化同步狀態
36     }
37     /*Causes the current thread to wait until the latch has counted down to
38      * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.*/
39     public void await() throws InterruptedException {
40         sync.acquireSharedInterruptibly(1);//
41     }
42  
43     public boolean await(long timeout, TimeUnit unit)
44         throws InterruptedException {
45         return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
46     }
47     public void countDown() {
48         sync.releaseShared(1);//釋放同步狀態
49     }
50  
51     public long getCount() {
52         return sync.getCount();
53     }
54  
55     public String toString() {
56         return super.toString() + "[Count = " + sync.getCount() + "]";
57     }
58 }
View Code

2,同步屏障CyclicBarrier

  CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要作的事情是,讓一組線程到達一個屏障(也能夠叫同步點)時被阻塞直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續運行ide

  CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量每一個線程調用await方法告訴CyclicBarrier我已經到達了屏障,而後當前線程被阻塞。但阻塞數到達設置的攔截參數,則線程一塊兒越過屏障。

 1 import java.util.Random; 
 2 import java.util.concurrent.CyclicBarrier;
 3 import java.util.concurrent.atomic.AtomicInteger;
 4  
 5  
 6 public class CyclicBarrierTest {
 7  
 8     private static Random sr=new Random(47); 
 9     private static AtomicInteger result=new AtomicInteger(0);
10     private static int threadCount=10;
11     //屏障後面執行彙總
12     private static CyclicBarrier barrier=new CyclicBarrier(threadCount,new Accumulate());
13     private static class Parser implements Runnable{ 
14         String name;
15         public Parser(String name){
16             this.name=name;
17         }
18         @Override
19         public void run() {
20             int sum=0;
21             int seed=Math.abs(sr.nextInt()) ;
22             Random r=new Random(47); 
23             for(int i=0;i<(seed%100*100000);i++){  
24                 sum+=r.nextInt(seed); 
25             }  
26             result.addAndGet(sum);
27             System.out.println(System.currentTimeMillis()+"-"+name+"線程的解析結果:"+sum);
28             try { 
29                 barrier.await();
30                 System.out.println(System.currentTimeMillis()+"-"+name+"線程越過屏障!");
31             } catch (Exception e) {
32                 e.printStackTrace();
33             }
34         } 
35     }
36     static class Accumulate implements Runnable{ 
37         @Override
38         public void run() { 
39             System.out.println("全部線程解析結束!");
40             System.out.println("全部線程的解析結果:"+result);
41         } 
42     }
43     public static void main(String[] args) throws InterruptedException {
44         Thread[] threads=new Thread[threadCount];
45         for(int i=0;i<threadCount;i++){
46             threads[i]=new Thread(new Parser("Parser-"+i));
47         } 
48         for(int i=0;i<threadCount;i++){
49             threads[i].start();
50         }   
51     } 
52 }

輸出:

 

1471866228774-Parser-4線程的解析結果:631026992
1471866228930-Parser-3線程的解析結果:-372785277
1471866228961-Parser-1線程的解析結果:-938473891
1471866229008-Parser-7線程的解析結果:-396620018
1471866229008-Parser-2線程的解析結果:-1159985406
1471866229024-Parser-5線程的解析結果:-664234808
1471866229070-Parser-6線程的解析結果:556534377
1471866229117-Parser-9線程的解析結果:-844558478
1471866229383-Parser-0線程的解析結果:919864023
1471866229430-Parser-8線程的解析結果:-2104111089
全部線程解析結束!
全部線程的解析結果:-78376279
1471866229430-Parser-8線程越過屏障!
1471866229430-Parser-2線程越過屏障!
1471866229430-Parser-9線程越過屏障!
1471866229430-Parser-7線程越過屏障!
1471866229430-Parser-1線程越過屏障!
1471866229430-Parser-3線程越過屏障!
1471866229430-Parser-0線程越過屏障!
1471866229430-Parser-6線程越過屏障!
1471866229430-Parser-4線程越過屏障!
1471866229430-Parser-5線程越過屏障!
咱們發現,各個線程解析完成的時間不一致,可是越過屏障的時間倒是一致的。

 

CyclicBarrier和CountDownLatch的區別

  CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可使用reset()方法重置。因此CyclicBarrier能處理更爲複雜的業務場景。例如,若是計算髮生錯誤,能夠重置計數器,並讓線程從新執行一次。
  CyclicBarrier還提供其餘有用的方法,好比getNumberWaiting方法能夠得到Cyclic-Barrier阻塞的線程數量。isBroken()方法用來了解阻塞的線程是否被中斷。

3,控制併發線程數的Semaphore

  Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理的使用公共資源。
  多年以來,我都以爲從字面上很難理解Semaphore所表達的含義,只能把它比做是控制流量的紅綠燈。好比××馬路要限制流量,只容許同時有一百輛車在這條路上行使,其餘的都必須在路口等待,因此前一百輛車會看到綠燈,能夠開進這條馬路,後面的車會看到紅燈,不能駛入××馬路,可是若是前一百輛中有5輛車已經離開了××馬路,那麼後面就容許有5輛車駛入馬路,這個例子裏說的車就是線程,駛入馬路就表示線程在執行,離開馬路就表示線程執行完成,看見紅燈就表示線程被阻塞,不能執行。

應用場景

  Semaphore能夠用於作流量控制,特別是公用資源有限的應用場景,好比數據庫鏈接。假若有一個需求,要讀取幾萬個文件的數據,由於都是IO密集型任務,咱們能夠啓動幾十個線程併發地讀取,可是若是讀到內存後,還須要存儲到數據庫中,而數據庫的鏈接數只有10個,這時咱們必須控制只有10個線程同時獲取數據庫鏈接保存數據,不然會報錯沒法獲取數據庫鏈接。這個時候,就可使用Semaphore來作流量控制,如代碼清單8-7所示。

 1 import java.util.concurrent.ExecutorService;
 2 import java.util.concurrent.Executors;
 3 import java.util.concurrent.Semaphore;
 4  
 5 public class SemaphoreTest { 
 6     private static final int THREAD_COUNT = 30;
 7     private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
 8     private static Semaphore s = new Semaphore(10);
 9     
10     public static void main(String[] args) {
11         for (int i = 0; i < THREAD_COUNT; i++) {
12             threadPool.execute(new Runnable() {
13                 @Override
14                 public void run() {
15                     try { 
16  s.acquire(); 17                         System.out.println("save data");
18  s.release();
19                     } catch (InterruptedException e) {
20                     }
21                 }
22             });
23         }
24         threadPool.shutdown();
25     }
26 }

  在代碼中,雖然有30個線程在執行,可是隻容許10個併發執行。Semaphore的構造方法Semaphore(int permits)接受一個整型的數字,表示可用的許可證數量。Semaphore(10)表示容許10個線程獲取許可證,也就是最大併發數是10。

  Semaphore的用法也很簡單,首先線程使用Semaphore的acquire()方法獲取一個許可證,使用完以後調用release()方法歸還許可證。還能夠用tryAcquire()方法嘗試獲取許可證。

其餘方法

Semaphore還提供一些其餘方法,具體以下。
int availablePermits():返回此信號量中當前可用的許可證數。
int getQueueLength():返回正在等待獲取許可證的線程數。
boolean hasQueuedThreads():是否有線程正在等待獲取許可證。
void reducePermits(int reduction):減小reduction個許可證,是個protected方法。
Collection getQueuedThreads():返回全部等待獲取許可證的線程集合,是個protected方法。

4,線程間交換數據的Exchanger

  Exchanger(交換者)是一個用於線程間協做的工具類。Exchanger用於進行線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程能夠交換彼此的數據。這兩個線程經過exchange方法交換數據,若是第一個線程先執行exchange()方法,它會一直等待第二個線程也執行exchange方法,當兩個線程都到達同步點時,這兩個線程就能夠交換數據,將本線程生產出來的數據傳遞給對方。
下面來看一下Exchanger的應用場景。

  一、Exchanger能夠用於遺傳算法,遺傳算法裏須要選出兩我的做爲交配對象,這時候會交換兩人的數據,並使用交叉規則得出2個交配結果。

  二、Exchanger也能夠用於校對工做,好比咱們須要將紙製銀行流水經過人工的方式錄入成電子銀行流水,爲了不錯誤,採用AB崗兩人進行錄入,錄入到Excel以後,系統須要加載這兩個Excel,並對兩個Excel數據進行校對,看看是否錄入一致.

 1 import java.util.concurrent.Exchanger;
 2 import java.util.concurrent.ExecutorService;
 3 import java.util.concurrent.Executors;
 4  
 5 public class ExchangerTest {
 6  
 7     private static final Exchanger<String> exgr = new Exchanger<String>();
 8     private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
 9  
10     public static void main(String[] args) {
11         threadPool.execute(new Runnable() {
12             @Override
13             public void run() {
14                 try {
15                     String A = "銀行流水100";// A錄入銀行流水數據
16                     String B=exgr.exchange(A);
17                     System.out.println("A的視角:A和B數據是否一致:" + A.equals(B) + 
18 ",A錄入的是:" + A + ",B錄入是:" + B);
19                 } catch (InterruptedException e) {
20                 }
21             }
22         });
23         threadPool.execute(new Runnable() {
24             @Override
25             public void run() {
26                 try {
27                     String B = "銀行流水200";// B錄入銀行流水數據
28                     String A = exgr.exchange(B);
29                     System.out.println("B的視角:A和B數據是否一致:" + A.equals(B) + 
30 ",A錄入的是:" + A + ",B錄入是:" + B);
31                 } catch (InterruptedException e) {
32                 }
33             }
34         });
35         threadPool.shutdown();
36     }
37 }

輸出:

B的視角:A和B數據是否一致:false,A錄入的是:銀行流水100,B錄入是:銀行流水200
A的視角:A和B數據是否一致:false,A錄入的是:銀行流水100,B錄入是:銀行流水200

若是兩個線程有一個沒有執行exchange()方法,則會一直等待,若是擔憂有特殊狀況發生,避免一直等待,可使用exchange(V x,longtimeout,TimeUnit unit)設置最大等待時長。

內容源自:

《Java併發編程的藝術》

 

https://blog.csdn.net/sunxianghuang/article/details/52277394

相關文章
相關標籤/搜索