簡介:CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要作的事情是,讓一組線程到達一個屏障(也能夠叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續幹活。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每一個線程調用await方法告訴CyclicBarrier我已經到達了屏障,而後當前線程被阻塞。java
1 package com.cattsoft.sync; 2 3 import java.util.concurrent.CyclicBarrier; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 7 /** 8 * 表示你們彼此等待,集合後纔開始出發(張家界旅遊,早上等車,大峽谷進去集合(範乾),大峽谷出來集合) 9 * 工做中沒準就有這種應用,當之後的工做中作什麼項目,可能用到這個技術,就要聯想起來 10 * 理論上是必定有這種需求,jdk開發者纔會搞這種技術,只是目前還沒碰到。 11 */ 12 public class CyclicBarrierTest { 13 14 public static void main(String[] args) { 15 ExecutorService executor = Executors.newCachedThreadPool(); 16 final CyclicBarrier cb = new CyclicBarrier(15); 17 for (int i = 0; i <15; i++) { 18 Runnable runnable = new Runnable() { 19 @Override 20 public void run() { 21 try { 22 // 獲取一個信號燈 23 Thread.sleep((long) (Math.random() * 10000)); 24 System.out.println("線程: " + Thread.currentThread().getName() + "即將到達集合地址1,當前已有" 25 + (cb.getNumberWaiting() + 1) + "個已經到達。" 26 + (cb.getNumberWaiting() == 14 ? "都到齊了,繼續往下走" : "正在等候")); 27 cb.await(); // 在這裏集合 28 29 Thread.sleep((long) (Math.random() * 10000)); 30 System.out.println("線程: " + Thread.currentThread().getName() + "即將到達集合地址2,當前已有" 31 + (cb.getNumberWaiting() + 1) + "個已經到達。" 32 + (cb.getNumberWaiting() == 14 ? "都到齊了,繼續往下走" : "正在等候")); 33 cb.await(); // 在這裏集合 34 35 Thread.sleep((long) (Math.random() * 10000)); 36 System.out.println("線程: " + Thread.currentThread().getName() + "即將到達集合地址3,當前已有" 37 + (cb.getNumberWaiting() + 1) + "個已經到達。" 38 + (cb.getNumberWaiting() == 14 ? "都到齊了,繼續往下走" : "正在等候")); 39 cb.await(); // 在這裏集合 40 } catch (Exception e) { 41 e.printStackTrace(); 42 } 43 } 44 }; 45 executor.execute(runnable); 46 } 47 executor.shutdown(); 48 } 49 }
若是把new CyclicBarrier(15)修改爲new CyclicBarrier(16)則主線程和子線程會永遠等待,由於沒有第16個線程執行await方法,即沒有第16個線程到達屏障,因此以前到達屏障的兩個線程都不會繼續執行。多線程
CyclicBarrier還提供一個更高級的構造函數CyclicBarrier(int parties, Runnable barrierAction),用於在線程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。代碼以下:併發
1 public class CyclicBarrierTest2 { 2 3 static CyclicBarrier c = new CyclicBarrier(2, new A()); 4 5 public static void main(String[] args) { 6 new Thread(new Runnable() { 7 8 @Override 9 public void run() { 10 try { 11 c.await(); 12 } catch (Exception e) { 13 14 } 15 System.out.println(1); 16 } 17 }).start(); 18 19 try { 20 c.await(); 21 } catch (Exception e) { 22 23 } 24 System.out.println(2); 25 } 26 27 static class A implements Runnable { 28 29 @Override 30 public void run() { 31 System.out.println(3); 32 } 33 34 } 35 36 }
輸出:dom
1 3 2 1 3 2
CyclicBarrier能夠用於多線程計算數據,最後合併計算結果的應用場景。好比咱們用一個Excel保存了用戶全部銀行流水,每一個Sheet保存一個賬戶近一年的每筆銀行流水,如今須要統計用戶的日均銀行流水,先用多線程處理每一個sheet裏的銀行流水,都執行完以後,獲得每一個sheet的日均銀行流水,最後,再用barrierAction用這些線程的計算結果,計算出整個Excel的日均銀行流水。ide
控制併發線程數函數
Semaphore是一種在多線程環境下使用的設施,該設施負責協調各個線程,以保證它們可以正確、合理的使用公共資源的設施,也是操做系統中用於控制進程同步互斥的量。工具
1 package com.cattsoft.sync; 2 3 import java.util.concurrent.ExecutorService; 4 import java.util.concurrent.Executors; 5 import java.util.concurrent.Semaphore; 6 7 /** 8 * 信號燈同步工具 9 * 與互斥鎖對比,信號燈能夠由其它線程釋放,互斥鎖只能由本身釋放 10 * 模擬場景:如今有3個廁所坑,有5我的要進去,先進去3我的,其它2個等待,有人出來,等待的人再進去 11 * <p> 12 * TTP: 13 * 知識很容易就學會,最大的困難是在之後的工做中遇到困難了,正好能想到用這個工具 14 * 每一個藥能治什麼病,很容易學會,難就難在當看到一個病人之後,能想起來就這個藥,能想到這一步你就是華佗。 15 */ 16 public class SemaphoreTest { 17 public static void main(String[] args) { 18 ExecutorService executor = Executors.newFixedThreadPool(5); 19 20 //可以使用的公共資源數量 21 final Semaphore sp = new Semaphore(3); 22 23 for (int i = 0; i < 5; i++) { 24 Runnable runnable = new Runnable() { 25 @Override 26 public void run() { 27 try { 28 // 獲取一個信號燈 29 sp.acquire(); 30 } catch (InterruptedException e) { 31 e.printStackTrace(); 32 } 33 System.out.println("線程: " + Thread.currentThread().getName() + "進入,當前已有" 34 + (3 - sp.availablePermits()) + "個併發。"); 35 try { 36 Thread.sleep((long) (Math.random() * 10000)); 37 } catch (InterruptedException e) { 38 e.printStackTrace(); 39 } 40 System.out.println("線程" + Thread.currentThread().getName() + "即將離開。"); 41 sp.release(); 42 System.out.println("線程" + Thread.currentThread().getName() + "已離開,當前已有" 43 + (3 - sp.availablePermits()) + "個併發"); 44 } 45 }; 46 executor.execute(runnable); 47 } 48 executor.shutdown(); 49 } 50 }
等待多線程完成,CountDownLatch 容許一個或多個線程等待其餘線程完成操做。ui
假若有這樣一個需求,當咱們須要解析一個Excel裏多個sheet的數據時,能夠考慮使用多線程,每一個線程解析一個sheet裏的數據,等到全部的sheet都解析完以後,程序須要提示解析完成。在這個需求中,要實現主線程等待全部線程完成sheet的解析操做,最簡單的作法是使用join。代碼以下:spa
1 public class JoinCountDownLatchTest { 2 3 public static void main(String[] args) throws InterruptedException { 4 Thread parser1 = new Thread(new Runnable() { 5 @Override 6 public void run() { 7 } 8 }); 9 10 Thread parser2 = new Thread(new Runnable() { 11 @Override 12 public void run() { 13 System.out.println("parser2 finish"); 14 } 15 }); 16 17 parser1.start(); 18 parser2.start(); 19 parser1.join(); 20 parser2.join(); 21 System.out.println("all parser finish"); 22 } 23 24 }
1 package com.cattsoft.sync; 2 3 import java.util.concurrent.CountDownLatch; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 7 /** 8 * 倒計時計數器 9 * 模擬場景: 10 * 三個運動員跑步比賽,聽到一聽哨響,開始跑,所有跑到了,一塊兒公佈成績。 11 */ 12 public class CoutDownLatchTest { 13 public static void main(String[] args) { 14 ExecutorService executor = Executors.newCachedThreadPool(); 15 final CountDownLatch cdOrder = new CountDownLatch(1); 16 final CountDownLatch cdAnswer = new CountDownLatch(3); 17 18 for (int i = 0; i < 3; i++) { 19 Runnable runnable = new Runnable() { 20 @Override 21 public void run() { 22 try { 23 System.out.println("線程"+ Thread.currentThread().getName() + "正在準備接受命令"); 24 cdOrder.await(); 25 System.out.println("線程" + Thread.currentThread().getName() + "已經接受命令"); 26 27 Thread.sleep((long) (Math.random() * 10000)); 28 System.out.println("線程"+ Thread.currentThread().getName() + "迴應此命令的處理結果"); 29 cdAnswer.countDown(); 30 31 } catch (Exception e) { 32 e.printStackTrace(); 33 } 34 } 35 }; 36 executor.execute(runnable); 37 } 38 39 try { 40 Thread.sleep((long) (Math.random() * 10000)); 41 System.out.println("線程"+ Thread.currentThread().getName() + "即將發佈命令"); 42 cdOrder.countDown(); 43 System.out.println("線程"+ Thread.currentThread().getName() + "已經發送命令,正在等待結果"); 44 cdAnswer.await(); 45 System.out.println("線程"+ Thread.currentThread().getName() + "已收到全部響應結果"); 46 } catch (Exception e) { 47 e.printStackTrace(); 48 } 49 executor.shutdown(); 50 } 51 } 52 53 54 Map<String,Integer> linkMap = new LinkedHashMap<>(); 55 for(Entry<String,Integer> newEntry :result){ 56 linkMap.put(newEntry.getKey(), newEntry.getValue()); 57 } 58 for(Map.Entry<String, Integer> mapEntry : linkMap.entrySet()){ 59 System.out.println("key:"+mapEntry.getKey()+" value:"+mapEntry.getValue()); 60 } 61 }
isBroken的使用代碼以下:操作系統
1 import java.util.concurrent.BrokenBarrierException; 2 import java.util.concurrent.CyclicBarrier; 3 4 public class CyclicBarrierTest3 { 5 6 static CyclicBarrier c = new CyclicBarrier(2); 7 8 public static void main(String[] args) throws InterruptedException, BrokenBarrierException { 9 Thread thread = new Thread(new Runnable() { 10 11 @Override 12 public void run() { 13 try { 14 c.await(); 15 } catch (Exception e) { 16 } 17 } 18 }); 19 thread.start(); 20 thread.interrupt(); 21 try { 22 c.await(); 23 } catch (Exception e) { 24 System.out.println(c.isBroken()); 25 } 26 } 27 }