信號量,見文知義,經常使用於併發控制中的限流做用,我理解是限定數量的共享鎖機制。該共享資源最多同時可以讓n個線程訪問,超過n個線程就阻塞等待,若有資源空閒, 喚醒其餘等待線程(喚醒又分公平與非公平,默認非公平)好比一條四車道大橋,每次僅能併發經過4輛汽車,而在高峯期時100輛車涌入,此次須要一個信號燈來限制車輛,每次最多放行4輛車,在車輛經過後再放行。在併發環境下,每輛車就是如一個線程,4車道大橋就若有限的資源,須要控制線程的數量,在這種業務場景下,靠鎖同步的機制(如synchronized)力有不逮,java併發包中提供Semaphore類能夠幫助解決此類場景。java
應用場景: 一、資源控制:控制數據庫鏈接數,若有多個IO操做,每一個須要操做數據庫寫入,數據庫鏈接數有限,控制鏈接數據庫數量。 例:100個線程執行IO,只有10個mysql鏈接,最多同時能夠有10個線程獲取到鏈接,不然會報錯沒法獲取鏈接,這時可用信號量控制。 二、可當同步鎖使用,設置信號量通道等於1。
容許一個或多個線程等待其餘線程完成操做後再執行。其內部維護一個計數器,設置初始值給state,每調用 countDown()方法一次,state數量減1,調用await()方法的線程被阻塞,須要等待state減小爲0時纔可被喚醒繼續執行。mysql
應用場景: 一、一個任務要統計公司一星期的財務流水總額,每次須要讀取5張Excel表統計流水彙總,如何快速地統計出來? 可使用CountDownLatch,先開始5個線程併發地分別統計每張表的流水額度總和,當5個線程統計結束,再彙總總額。 二、開發對外接口,要求響應快,而該接口內部邏輯複雜,涉及多個服務的調用,並依賴這些獨立服務響應結果進行下一步操做,這時能夠考慮CountDownLatch或者 CyclicBarrier,併發調用多個服務,獲取這些結果後才進行下一步,縮短處理時間。
循環屏障,柵欄類; CyclicBarrier 可以讓多個線程相互等待,當全部線程都達到後再喚醒全部線程繼續執行。如導遊設定目標點,全部遊客到這集合,先到的遊客等待其餘遊客,當全部的遊客都到了後,你們再一塊兒出發。sql
CyclicBarrier與CountDownLatch區別: 1) CyclicBarrier 在應用場景中,多個線程之間相互等待,線程之間在業務上可能會更有依賴性 ; CountDownLatch是每一個執行 coutDown方法的線程之間能夠沒有依賴性,而是執行await方法的線程更依賴這些執行coutDown的線程。 2)CountDownLatch計數器只能使用一次,Semaphore能夠屢次使用,能夠重置使用。 3)CyclicBarrier 能夠處理更復雜的業務場景,如線程都達到屏障後,能夠在構造函數中 CyclicBarrier(int parties, Runnable barrierAction) 傳入線程barrierAction,當達到屏障觸發條件時,能夠比其餘等待線程優先執行,處理業務。 4)CyclicBarrier還有不少方法,如查看當前到達屏障被阻塞的線程數量 getNumberWaiting()。 5)CountDownLatch 是每一個線程執行完減1操做,當計數器爲0時,才喚醒等待線程,阻塞線程只有1個或者多個;CyclicBarrier是讓全部線程到達屏障處就被阻塞了,當全部線程都到達時,喚醒全部被阻塞線程繼續執行,阻塞線程有多個,線程之間是相互等待。
Semaphore(int permits) //默認非公平 //是否公平嘗試獲取許可證 Semaphore(int permits, boolean fair)
acquire() //從信號量那去獲取一個許可,若是沒有剩餘的就被阻塞 acquire(int permits)//也可一次獲取多個許可 release() //釋放一個許可,將其返還給信號量,給其餘線程使用。 release(int permits) //一次釋放多個許可
package Semaphore; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * @author zdd * 2019/11/27 6:43 * Description: 信號量測試 */ public class SemaphoreService { private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private Semaphore semaphore = new Semaphore(4); public void doSomeThing() throws InterruptedException { //1,獲取一個許可 semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "--start--" +getFormatDate()); // 停頓1s TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName() + "--end--" +getFormatDate()); //2,用完釋放一個許可,可供其餘線程使用 semaphore.release(); } public static void main(String[] args) { SemaphoreService semaphoreService = new SemaphoreService(); for (int i = 0; i < 10; i++) { WorkThread workThread = new WorkThread(i+"",semaphoreService); workThread.start(); } } public String getFormatDate() { return sdf.format(new Date()); } // 工做線程類 - 靜態內部類 static class WorkThread extends Thread { SemaphoreService semaphoreService; //構造參數傳入線程名稱及SemaphoreService public WorkThread(String name, SemaphoreService semaphoreService) { this.semaphoreService = semaphoreService; setName(name); } @Override public void run() { try { semaphoreService.doSomeThing(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
1.線程最多4個同時執行數據庫
//1,設置信號量數爲4,執行結果以下圖 private Semaphore semaphore = new Semaphore(4);
2.線程依次執行,實現同步併發
//1,設置信號量數爲1,執行結果以下圖 private Semaphore semaphore = new Semaphore(1);
countDown() 計數器減1 await() 阻塞等待,直到計數器爲0喚醒繼續執行 await(long timeout, TimeUnit unit) 阻塞等待,在等待設定時間計時器還沒到減爲0,也不會再繼續等待了。
package countDownLatch; import javax.swing.plaf.IconUIResource; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author zdd * 2019/11/29 7:19 下午 * Description: 班長召集士兵集合拉練案例 */ public class CountDownLatchSoldierTest { private static final Integer THREAD_COUNT = 10; //等待士兵10人 private static CountDownLatch countDownLatch = new CountDownLatch(10); public static void main(String[] args) throws InterruptedException { //1,開10個線程模擬士兵簽到,採用線程池建立 ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT); for (int i = 0; i < 10; i++) { //2,提交10個線程執行 executorService.submit(()->{ try{ System.out.println(Thread.currentThread().getName()+ ": 士兵報告"); }finally { //3,計數器減1 countDownLatch.countDown(); } }); } //3,主線程作班長,負責等待全部士兵到齊,開始拉練 countDownLatch.await(); System.out.println("班長: 集合完畢,開始5千米越野! "); //4,關閉線程池 executorService.shutdown(); } }
計算器未減到0,主線程持續等待。ide
package countDownLatch; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * zdd * 2019/11/5 10:50 * Description:測試計數器使用 */ public class CountDownLatchTest { static int number = 2; //1,設置計數器的值爲 2 static CountDownLatch countDownLatch = new CountDownLatch(number); public static void main(String[] args) throws InterruptedException { //2,開啓一個線程,傳入計數器 new Thread(()-> { try { //睡2s,模擬執行業務 TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("調用countDown前, 當前count 值爲:"+countDownLatch.getCount()); //3,調用遞減 1次 countDownLatch.countDown(); System.out.println("調用countDown後, 當前count 值爲:"+countDownLatch.getCount()); } },"worker").start(); //狀況一、主線程開始執行 ,等待1s以後,若是還沒到條件,也不等了哦 // countDownLatch.await(1,TimeUnit.SECONDS); //狀況二、阻塞等待,若是計數未到0,一直阻塞等待 countDownLatch.await(); System.out.println("主線程繼續執行"); } }
主線程阻塞等待,執行結果以下:
函數
CyclicBarrier(int parties) //parties爲須要等待線程的數量 //barrierAction,在全部其餘線程到達後,優先執行的線程。可根據業務添加 CyclicBarrier(int parties, Runnable barrierAction)
package CyclicBarrier; import java.util.Map; import java.util.concurrent.*; /** * @author zdd * 2019/11/28 9:20 * Description: 開啓4個線程分別統計sheet表,使用柵欄類實現同步統計計算,最後計算總和;此處也可用 * CountDownLatch實現 */ public class BankAccountService { private ConcurrentHashMap<String,Integer> concurrentHashMap = new ConcurrentHashMap<>(); // 線程數 private final static int THREAD_COUNT = 4; private CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() { @Override public void run() { Integer sumcount = 0; //彙總每一個sheet計算結果 for (Map.Entry<String, Integer> entry: concurrentHashMap.entrySet()) { sumcount+=entry.getValue(); } System.out.println("優先執行, 求和計算完畢,總和爲:"+ sumcount); } }); public void count() { ExecutorService executor = Executors.newFixedThreadPool(4); for (int i = 0; i < THREAD_COUNT; i++) { executor.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"到達屏障!"); concurrentHashMap.put(Thread.currentThread().getName(),1); try { cyclicBarrier.await(); System.out.println(Thread.currentThread().getName()+"繼續執行!"); } catch (Exception e) { e.printStackTrace(); } } }); } executor.shutdown(); } public static void main(String[] args) { BankAccountService bankAccountService = new BankAccountService(); bankAccountService.count(); } }
運行結果以下:測試
// 將線程數改成2,阻塞等待其餘線程 private final static int THREAD_COUNT = 2;