線程工具類

                                                  同步併發工具類

同步屏障CyclicBarrier

簡介:CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要作的事情是,讓一組線程到達一個屏障(也能夠叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續幹活。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每一個線程調用await方法告訴CyclicBarrier我已經到達了屏障,而後當前線程被阻塞。java

 1 package com.cattsoft.sync;
 2 
 3 import java.util.concurrent.CyclicBarrier;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 
 7 /**
 8  * 表示你們彼此等待,集合後纔開始出發(張家界旅遊,早上等車,大峽谷進去集合(範乾),大峽谷出來集合)
 9  * 工做中沒準就有這種應用,當之後的工做中作什麼項目,可能用到這個技術,就要聯想起來
10  * 理論上是必定有這種需求,jdk開發者纔會搞這種技術,只是目前還沒碰到。
11  */
12 public class CyclicBarrierTest {
13 
14     public static void main(String[] args) {
15         ExecutorService executor = Executors.newCachedThreadPool();
16         final CyclicBarrier cb = new CyclicBarrier(15);
17         for (int i = 0; i <15; i++) {
18             Runnable runnable = new Runnable() {
19                 @Override
20                 public void run() {
21                     try {
22                         // 獲取一個信號燈
23                         Thread.sleep((long) (Math.random() * 10000));
24                         System.out.println("線程: " + Thread.currentThread().getName() + "即將到達集合地址1,當前已有"
25                                 + (cb.getNumberWaiting() + 1) + "個已經到達。"
26                                 + (cb.getNumberWaiting() == 14 ? "都到齊了,繼續往下走" : "正在等候"));
27                         cb.await(); // 在這裏集合
28                         
29                         Thread.sleep((long) (Math.random() * 10000));
30                         System.out.println("線程: " + Thread.currentThread().getName() + "即將到達集合地址2,當前已有"
31                                 + (cb.getNumberWaiting() + 1) + "個已經到達。"
32                                 + (cb.getNumberWaiting() == 14 ? "都到齊了,繼續往下走" : "正在等候"));
33                         cb.await(); // 在這裏集合
34                         
35                         Thread.sleep((long) (Math.random() * 10000));
36                         System.out.println("線程: " + Thread.currentThread().getName() + "即將到達集合地址3,當前已有"
37                                 + (cb.getNumberWaiting() + 1) + "個已經到達。"
38                                 + (cb.getNumberWaiting() == 14 ? "都到齊了,繼續往下走" : "正在等候"));
39                         cb.await(); // 在這裏集合
40                     } catch (Exception e) {
41                         e.printStackTrace();
42                     }
43                 }
44             };
45             executor.execute(runnable);
46         }
47         executor.shutdown();
48     }
49 }

若是把new CyclicBarrier(15)修改爲new CyclicBarrier(16)則主線程和子線程會永遠等待,由於沒有第16個線程執行await方法,即沒有第16個線程到達屏障,因此以前到達屏障的兩個線程都不會繼續執行。多線程

CyclicBarrier還提供一個更高級的構造函數CyclicBarrier(int parties, Runnable barrierAction),用於在線程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。代碼以下:併發

 1 public class CyclicBarrierTest2 {
 2 
 3     static CyclicBarrier c = new CyclicBarrier(2, new A());
 4 
 5     public static void main(String[] args) {
 6         new Thread(new Runnable() {
 7 
 8             @Override
 9             public void run() {
10                 try {
11                     c.await();
12                 } catch (Exception e) {
13 
14                 }
15                 System.out.println(1);
16             }
17         }).start();
18 
19         try {
20             c.await();
21         } catch (Exception e) {
22 
23         }
24         System.out.println(2);
25     }
26 
27     static class A implements Runnable {
28 
29         @Override
30         public void run() {
31             System.out.println(3);
32         }
33 
34     }
35 
36 }

輸出:dom

1 3
2 1
3 2

CyclicBarrier的應用場景

CyclicBarrier能夠用於多線程計算數據,最後合併計算結果的應用場景。好比咱們用一個Excel保存了用戶全部銀行流水,每一個Sheet保存一個賬戶近一年的每筆銀行流水,如今須要統計用戶的日均銀行流水,先用多線程處理每一個sheet裏的銀行流水,都執行完以後,獲得每一個sheet的日均銀行流水,最後,再用barrierAction用這些線程的計算結果,計算出整個Excel的日均銀行流水。ide

 

同步信號量Semaphore

控制併發線程數函數

Semaphore是一種在多線程環境下使用的設施,該設施負責協調各個線程,以保證它們可以正確、合理的使用公共資源的設施,也是操做系統中用於控制進程同步互斥的量。工具

以一個停車場是運做爲例。爲了簡單起見,假設停車場只有三個車位,一開始三個車位都是空的。這時若是同時來了五輛車,看門人容許其中三輛不受阻礙的進入,而後放下車攔,剩下的車則必須在入口等待,此後來的車也都不得不在入口處等待。這時,有一輛車離開停車場,看門人得知後,打開車攔,放入一輛,若是又離開兩輛,則又能夠放入兩輛,如此往復。
在這個 停車場系統中,車位是公共資源,每輛車比如一個線程,看門人起的就是 信號量的做用。
更進一步,信號量的特性以下:信號量是一個非負整數(車位數),全部經過它的線程(車輛)都會將該整數減一(經過它固然是爲了使用資源),當該整數值爲零時,全部試圖經過它的線程都將處於等待狀態。在信號量上咱們定義兩種操做: Wait(等待) 和 Release(釋放)。 當一個線程調用Wait(等待)操做時,它要麼經過而後將信號量減一,要麼一直等下去,直到信號量大於一或超時。Release(釋放)其實是在 信號量上執行加操做,對應於車輛離開停車場,該操做之因此叫作「釋放」是由於加操做其實是釋放了由信號量守護的資源。
在java中,還能夠設置該信號量是否採用公平模式,若是以公平方式執行,則線程將會按到達的順序(FIFO)執行,若是是非公平,則能夠後請求的有可能排在隊列的頭部。
 1 package com.cattsoft.sync;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 import java.util.concurrent.Semaphore;
 6 
 7 /**
 8  * 信號燈同步工具
 9  * 與互斥鎖對比,信號燈能夠由其它線程釋放,互斥鎖只能由本身釋放
10  * 模擬場景:如今有3個廁所坑,有5我的要進去,先進去3我的,其它2個等待,有人出來,等待的人再進去
11  * <p>
12  * TTP:
13  * 知識很容易就學會,最大的困難是在之後的工做中遇到困難了,正好能想到用這個工具
14  * 每一個藥能治什麼病,很容易學會,難就難在當看到一個病人之後,能想起來就這個藥,能想到這一步你就是華佗。
15  */
16 public class SemaphoreTest {
17     public static void main(String[] args) {
18         ExecutorService executor = Executors.newFixedThreadPool(5);
19 
20         //可以使用的公共資源數量
21         final Semaphore sp = new Semaphore(3);
22 
23         for (int i = 0; i < 5; i++) {
24             Runnable runnable = new Runnable() {
25                 @Override
26                 public void run() {
27                     try {
28                         // 獲取一個信號燈
29                         sp.acquire();
30                     } catch (InterruptedException e) {
31                         e.printStackTrace();
32                     }
33                     System.out.println("線程: " + Thread.currentThread().getName() + "進入,當前已有"
34                             + (3 - sp.availablePermits()) + "個併發。");
35                     try {
36                         Thread.sleep((long) (Math.random() * 10000));
37                     } catch (InterruptedException e) {
38                         e.printStackTrace();
39                     }
40                     System.out.println("線程" + Thread.currentThread().getName() + "即將離開。");
41                     sp.release();
42                     System.out.println("線程" + Thread.currentThread().getName() + "已離開,當前已有"
43                             + (3 - sp.availablePermits()) + "個併發");
44                 }
45             };
46             executor.execute(runnable);
47         }
48         executor.shutdown();
49     }
50 }

CountDownLatch

等待多線程完成,CountDownLatch 容許一個或多個線程等待其餘線程完成操做。ui

應用場景

假若有這樣一個需求,當咱們須要解析一個Excel裏多個sheet的數據時,能夠考慮使用多線程,每一個線程解析一個sheet裏的數據,等到全部的sheet都解析完以後,程序須要提示解析完成。在這個需求中,要實現主線程等待全部線程完成sheet的解析操做,最簡單的作法是使用join。代碼以下:spa

 1 public class JoinCountDownLatchTest {
 2 
 3     public static void main(String[] args) throws InterruptedException {
 4         Thread parser1 = new Thread(new Runnable() {
 5             @Override
 6             public void run() {
 7             }
 8         });
 9 
10         Thread parser2 = new Thread(new Runnable() {
11             @Override
12             public void run() {
13                 System.out.println("parser2 finish");
14             }
15         });
16 
17         parser1.start();
18         parser2.start();
19         parser1.join();
20         parser2.join();
21         System.out.println("all parser finish");
22     }
23 
24 }

 

 1 package com.cattsoft.sync;
 2 
 3 import java.util.concurrent.CountDownLatch;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 
 7 /**
 8  * 倒計時計數器
 9  * 模擬場景:
10  *  三個運動員跑步比賽,聽到一聽哨響,開始跑,所有跑到了,一塊兒公佈成績。
11  */
12 public class CoutDownLatchTest {
13     public static void main(String[] args) {
14         ExecutorService executor = Executors.newCachedThreadPool();
15         final CountDownLatch cdOrder = new CountDownLatch(1);
16         final CountDownLatch cdAnswer = new CountDownLatch(3);
17 
18         for (int i = 0; i < 3; i++) {
19             Runnable runnable = new Runnable() {
20                 @Override
21                 public void run() {
22                     try {
23                         System.out.println("線程"+ Thread.currentThread().getName() + "正在準備接受命令");
24                         cdOrder.await();
25                         System.out.println("線程" + Thread.currentThread().getName() + "已經接受命令");
26 
27                         Thread.sleep((long) (Math.random() * 10000));
28                         System.out.println("線程"+ Thread.currentThread().getName() + "迴應此命令的處理結果");
29                         cdAnswer.countDown();
30 
31                     } catch (Exception e) {
32                         e.printStackTrace();
33                     }
34                 }
35             };
36             executor.execute(runnable);
37         }
38 
39         try {
40             Thread.sleep((long) (Math.random() * 10000));
41             System.out.println("線程"+ Thread.currentThread().getName() + "即將發佈命令");
42             cdOrder.countDown();
43             System.out.println("線程"+ Thread.currentThread().getName() + "已經發送命令,正在等待結果");
44             cdAnswer.await();
45             System.out.println("線程"+ Thread.currentThread().getName() + "已收到全部響應結果");
46         } catch (Exception e) {
47             e.printStackTrace();
48         }
49         executor.shutdown();
50     }
51 }
52 
53 
54   Map<String,Integer> linkMap = new LinkedHashMap<>();
55              for(Entry<String,Integer> newEntry :result){
56                  linkMap.put(newEntry.getKey(), newEntry.getValue());            
57              }
58              for(Map.Entry<String, Integer> mapEntry : linkMap.entrySet()){
59                  System.out.println("key:"+mapEntry.getKey()+"  value:"+mapEntry.getValue());
60              }
61          } 

 

 

CyclicBarrier和CountDownLatch的區別

  • CountDownLatch的計數器只能使用一次。而CyclicBarrier的計數器可使用reset() 方法重置。因此CyclicBarrier能處理更爲複雜的業務場景,好比若是計算髮生錯誤,能夠重置計數器,並讓線程們從新執行一次。
  • CyclicBarrier還提供其餘有用的方法,好比getNumberWaiting方法能夠得到CyclicBarrier阻塞的線程數量。isBroken方法用來知道阻塞的線程是否被中斷。好比如下代碼執行完以後會返回true。

isBroken的使用代碼以下:操作系統

 1 import java.util.concurrent.BrokenBarrierException;
 2 import java.util.concurrent.CyclicBarrier;
 3 
 4 public class CyclicBarrierTest3 {
 5 
 6     static CyclicBarrier c = new CyclicBarrier(2);
 7 
 8     public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
 9         Thread thread = new Thread(new Runnable() {
10 
11             @Override
12             public void run() {
13                 try {
14                     c.await();
15                 } catch (Exception e) {
16                 }
17             }
18         });
19         thread.start();
20         thread.interrupt();
21         try {
22             c.await();
23         } catch (Exception e) {
24             System.out.println(c.isBroken());
25         }
26     }
27 }
相關文章
相關標籤/搜索