這是java高併發系列第16篇文章。java
假若有這樣一個需求,當咱們須要解析一個Excel裏多個sheet的數據時,能夠考慮使用多線程,每一個線程解析一個sheet裏的數據,等到全部的sheet都解析完以後,程序須要統計解析總耗時。分析一下:解析每一個sheet耗時可能不同,總耗時就是最長耗時的那個操做。spring
咱們可以想到的最簡單的作法是使用join,代碼以下:微信
package com.itsoku.chat13; import java.util.concurrent.TimeUnit; /** * 微信公衆號:javacode2018,獲取年薪50萬課程 */ public class Demo1 { public static class T extends Thread { //休眠時間(秒) int sleepSeconds; public T(String name, int sleepSeconds) { super(name); this.sleepSeconds = sleepSeconds; } @Override public void run() { Thread ct = Thread.currentThread(); long startTime = System.currentTimeMillis(); System.out.println(startTime + "," + ct.getName() + ",開始處理!"); try { //模擬耗時操做,休眠sleepSeconds秒 TimeUnit.SECONDS.sleep(this.sleepSeconds); } catch (InterruptedException e) { e.printStackTrace(); } long endTime = System.currentTimeMillis(); System.out.println(endTime + "," + ct.getName() + ",處理完畢,耗時:" + (endTime - startTime)); } } public static void main(String[] args) throws InterruptedException { long starTime = System.currentTimeMillis(); T t1 = new T("解析sheet1線程", 2); t1.start(); T t2 = new T("解析sheet2線程", 5); t2.start(); t1.join(); t2.join(); long endTime = System.currentTimeMillis(); System.out.println("總耗時:" + (endTime - starTime)); } }
輸出:多線程
1563767560271,解析sheet1線程,開始處理! 1563767560272,解析sheet2線程,開始處理! 1563767562273,解析sheet1線程,處理完畢,耗時:2002 1563767565274,解析sheet2線程,處理完畢,耗時:5002 總耗時:5005
代碼中啓動了2個解析sheet的線程,第一個耗時2秒,第二個耗時5秒,最終結果中總耗時:5秒。上面的關鍵技術點是線程的join()
方法,此方法會讓當前線程等待被調用的線程完成以後才能繼續。能夠看一下join的源碼,內部實際上是在synchronized方法中調用了線程的wait方法,最後被調用的線程執行完畢以後,由jvm自動調用其notifyAll()方法,喚醒全部等待中的線程。這個notifyAll()方法是由jvm內部自動調用的,jdk源碼中是看不到的,須要看jvm源碼,有興趣的同窗能夠去查一下。因此JDK不推薦在線程上調用wait、notify、notifyAll方法。併發
而在JDK1.5以後的併發包中提供的CountDownLatch也能夠實現join的這個功能。jvm
CountDownLatch稱之爲閉鎖,它可使一個或一批線程在閉鎖上等待,等到其餘線程執行完相應操做後,閉鎖打開,這些等待的線程才能夠繼續執行。確切的說,閉鎖在內部維護了一個倒計數器。經過該計數器的值來決定閉鎖的狀態,從而決定是否容許等待的線程繼續執行。ide
經常使用方法:高併發
public CountDownLatch(int count):構造方法,count表示計數器的值,不能小於0,否者會報異常。工具
public void await() throws InterruptedException:調用await()會讓當前線程等待,直到計數器爲0的時候,方法纔會返回,此方法會響應線程中斷操做。this
public boolean await(long timeout, TimeUnit unit) throws InterruptedException:限時等待,在超時以前,計數器變爲了0,方法返回true,否者直到超時,返回false,此方法會響應線程中斷操做。
public void countDown():讓計數器減1
CountDownLatch使用步驟:
await()
,讓當前線程等待countDown()
方法,讓計數器減1await()
方法會返回咱們使用CountDownLatch來完成上面示例中使用join實現的功能,代碼以下:
package com.itsoku.chat13; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * 微信公衆號:javacode2018,獲取年薪50萬課程 */ public class Demo2 { public static class T extends Thread { //休眠時間(秒) int sleepSeconds; CountDownLatch countDownLatch; public T(String name, int sleepSeconds, CountDownLatch countDownLatch) { super(name); this.sleepSeconds = sleepSeconds; this.countDownLatch = countDownLatch; } @Override public void run() { Thread ct = Thread.currentThread(); long startTime = System.currentTimeMillis(); System.out.println(startTime + "," + ct.getName() + ",開始處理!"); try { //模擬耗時操做,休眠sleepSeconds秒 TimeUnit.SECONDS.sleep(this.sleepSeconds); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } long endTime = System.currentTimeMillis(); System.out.println(endTime + "," + ct.getName() + ",處理完畢,耗時:" + (endTime - startTime)); } } public static void main(String[] args) throws InterruptedException { System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + "線程 start!"); CountDownLatch countDownLatch = new CountDownLatch(2); long starTime = System.currentTimeMillis(); T t1 = new T("解析sheet1線程", 2, countDownLatch); t1.start(); T t2 = new T("解析sheet2線程", 5, countDownLatch); t2.start(); countDownLatch.await(); System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + "線程 end!"); long endTime = System.currentTimeMillis(); System.out.println("總耗時:" + (endTime - starTime)); } }
輸出:
1563767580511,main線程 start! 1563767580513,解析sheet1線程,開始處理! 1563767580513,解析sheet2線程,開始處理! 1563767582515,解析sheet1線程,處理完畢,耗時:2002 1563767585515,解析sheet2線程,處理完畢,耗時:5002 1563767585515,main線程 end! 總耗時:5003
從結果中看出,效果和join實現的效果同樣,代碼中建立了計數器爲2的CountDownLatch
,主線程中調用countDownLatch.await();
會讓主線程等待,t一、t2線程中模擬執行耗時操做,最終在finally中調用了countDownLatch.countDown();
,此方法每調用一次,CountDownLatch內部計數器會減1,當計數器變爲0的時候,主線程中的await()會返回,而後繼續執行。注意:上面的countDown()
這個是必需要執行的方法,因此放在finally中執行。
仍是上面的示例,2個線程解析2個sheet,主線程等待2個sheet解析完成。主線程說,我等待2秒,大家仍是沒法處理完成,就不等待了,直接返回。以下代碼:
package com.itsoku.chat13; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * 微信公衆號:javacode2018,獲取年薪50萬課程 */ public class Demo3 { public static class T extends Thread { //休眠時間(秒) int sleepSeconds; CountDownLatch countDownLatch; public T(String name, int sleepSeconds, CountDownLatch countDownLatch) { super(name); this.sleepSeconds = sleepSeconds; this.countDownLatch = countDownLatch; } @Override public void run() { Thread ct = Thread.currentThread(); long startTime = System.currentTimeMillis(); System.out.println(startTime + "," + ct.getName() + ",開始處理!"); try { //模擬耗時操做,休眠sleepSeconds秒 TimeUnit.SECONDS.sleep(this.sleepSeconds); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } long endTime = System.currentTimeMillis(); System.out.println(endTime + "," + ct.getName() + ",處理完畢,耗時:" + (endTime - startTime)); } } public static void main(String[] args) throws InterruptedException { System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + "線程 start!"); CountDownLatch countDownLatch = new CountDownLatch(2); long starTime = System.currentTimeMillis(); T t1 = new T("解析sheet1線程", 2, countDownLatch); t1.start(); T t2 = new T("解析sheet2線程", 5, countDownLatch); t2.start(); boolean result = countDownLatch.await(2, TimeUnit.SECONDS); System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + "線程 end!"); long endTime = System.currentTimeMillis(); System.out.println("主線程耗時:" + (endTime - starTime) + ",result:" + result); } }
輸出:
1563767637316,main線程 start! 1563767637320,解析sheet1線程,開始處理! 1563767637320,解析sheet2線程,開始處理! 1563767639321,解析sheet1線程,處理完畢,耗時:2001 1563767639322,main線程 end! 主線程耗時:2004,result:false 1563767642322,解析sheet2線程,處理完畢,耗時:5002
從輸出結果中能夠看出,線程2耗時了5秒,主線程耗時了2秒,主線程中調用countDownLatch.await(2, TimeUnit.SECONDS);
,表示最多等2秒,無論計數器是否爲0,await方法都會返回,若等待時間內,計數器變爲0了,當即返回true,不然超時後返回false。
有3我的參見跑步比賽,須要先等指令員發指令槍後才能開跑,全部人都跑完以後,指令員喊一聲,你們跑完了。
示例代碼:
package com.itsoku.chat13; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * 微信公衆號:javacode2018,獲取年薪50萬課程 */ public class Demo4 { public static class T extends Thread { //跑步耗時(秒) int runCostSeconds; CountDownLatch commanderCd; CountDownLatch countDown; public T(String name, int runCostSeconds, CountDownLatch commanderCd, CountDownLatch countDown) { super(name); this.runCostSeconds = runCostSeconds; this.commanderCd = commanderCd; this.countDown = countDown; } @Override public void run() { //等待指令員槍響 try { commanderCd.await(); } catch (InterruptedException e) { e.printStackTrace(); } Thread ct = Thread.currentThread(); long startTime = System.currentTimeMillis(); System.out.println(startTime + "," + ct.getName() + ",開始跑!"); try { //模擬耗時操做,休眠runCostSeconds秒 TimeUnit.SECONDS.sleep(this.runCostSeconds); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDown.countDown(); } long endTime = System.currentTimeMillis(); System.out.println(endTime + "," + ct.getName() + ",跑步結束,耗時:" + (endTime - startTime)); } } public static void main(String[] args) throws InterruptedException { System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + "線程 start!"); CountDownLatch commanderCd = new CountDownLatch(1); CountDownLatch countDownLatch = new CountDownLatch(3); long starTime = System.currentTimeMillis(); T t1 = new T("小張", 2, commanderCd, countDownLatch); t1.start(); T t2 = new T("小李", 5, commanderCd, countDownLatch); t2.start(); T t3 = new T("路人甲", 10, commanderCd, countDownLatch); t3.start(); //主線程休眠5秒,模擬指令員準備發槍耗時操做 TimeUnit.SECONDS.sleep(5); System.out.println(System.currentTimeMillis() + ",槍響了,你們開始跑"); commanderCd.countDown(); countDownLatch.await(); long endTime = System.currentTimeMillis(); System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + "全部人跑完了,主線程耗時:" + (endTime - starTime)); } }
輸出:
1563767691087,main線程 start! 1563767696092,槍響了,你們開始跑 1563767696092,小張,開始跑! 1563767696092,小李,開始跑! 1563767696092,路人甲,開始跑! 1563767698093,小張,跑步結束,耗時:2001 1563767701093,小李,跑步結束,耗時:5001 1563767706093,路人甲,跑步結束,耗時:10001 1563767706093,main全部人跑完了,主線程耗時:15004
代碼中,t一、t二、t3啓動以後,都阻塞在commanderCd.await();
,主線程模擬發槍準備操做耗時5秒,而後調用commanderCd.countDown();
模擬發槍操做,此方法被調用之後,阻塞在commanderCd.await();
的3個線程會向下執行。主線程調用countDownLatch.await();
以後進行等待,每一個人跑完以後,調用countDown.countDown();
通知一下countDownLatch
讓計數器減1,最後3我的都跑完了,主線程從countDownLatch.await();
返回繼續向下執行。
package com.itsoku.chat13; import org.springframework.util.CollectionUtils; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; /** * 微信公衆號:javacode2018,獲取年薪50萬課程 */ public class TaskDisposeUtils { //並行線程數 public static final int POOL_SIZE; static { POOL_SIZE = Integer.max(Runtime.getRuntime().availableProcessors(), 5); } /** * 並行處理,並等待結束 * * @param taskList 任務列表 * @param consumer 消費者 * @param <T> * @throws InterruptedException */ public static <T> void dispose(List<T> taskList, Consumer<T> consumer) throws InterruptedException { dispose(true, POOL_SIZE, taskList, consumer); } /** * 並行處理,並等待結束 * * @param moreThread 是否多線程執行 * @param poolSize 線程池大小 * @param taskList 任務列表 * @param consumer 消費者 * @param <T> * @throws InterruptedException */ public static <T> void dispose(boolean moreThread, int poolSize, List<T> taskList, Consumer<T> consumer) throws InterruptedException { if (CollectionUtils.isEmpty(taskList)) { return; } if (moreThread && poolSize > 1) { poolSize = Math.min(poolSize, taskList.size()); ExecutorService executorService = null; try { executorService = Executors.newFixedThreadPool(poolSize); CountDownLatch countDownLatch = new CountDownLatch(taskList.size()); for (T item : taskList) { executorService.execute(() -> { try { consumer.accept(item); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(); } finally { if (executorService != null) { executorService.shutdown(); } } } else { for (T item : taskList) { consumer.accept(item); } } } public static void main(String[] args) throws InterruptedException { //生成1-10的10個數字,放在list中,至關於10個任務 List<Integer> list = Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()); //啓動多線程處理list中的數據,每一個任務休眠時間爲list中的數值 TaskDisposeUtils.dispose(list, item -> { try { long startTime = System.currentTimeMillis(); TimeUnit.SECONDS.sleep(item); long endTime = System.currentTimeMillis(); System.out.println(System.currentTimeMillis() + ",任務" + item + "執行完畢,耗時:" + (endTime - startTime)); } catch (InterruptedException e) { e.printStackTrace(); } }); //上面全部任務處理完畢完畢以後,程序才能繼續 System.out.println(list + "中的任務都處理完畢!"); } }
運行代碼輸出:
1563769828130,任務1執行完畢,耗時:1000 1563769829130,任務2執行完畢,耗時:2000 1563769830131,任務3執行完畢,耗時:3001 1563769831131,任務4執行完畢,耗時:4001 1563769832131,任務5執行完畢,耗時:5001 1563769833130,任務6執行完畢,耗時:6000 1563769834131,任務7執行完畢,耗時:7001 1563769835131,任務8執行完畢,耗時:8001 1563769837131,任務9執行完畢,耗時:9001 1563769839131,任務10執行完畢,耗時:10001 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]中的任務都處理完畢!
TaskDisposeUtils是一個並行處理的工具類,能夠傳入n個任務內部使用線程池進行處理,等待全部任務都處理完成以後,方法纔會返回。好比咱們發送短信,系統中有1萬條短信,咱們使用上面的工具,每次取100條並行發送,待100個都處理完畢以後,再取一批按照一樣的邏輯發送。
java高併發系列連載中,總計估計會有四五十篇文章,能夠關注公衆號:javacode2018,獲取最新文章。