開啓線程執行任務,不論是使用Runnable(無返回值不支持上報異常)仍是Callable(有返回值支持上報異常)接口,均可以輕鬆實現。那麼若是是開啓線程池並須要獲取結果歸集的狀況下,如何實現,以及優劣,老司機直接看總結便可。java
任務執行完,結果歸集時,幾種方式:編程
以下圖,Future接口封裝了取消,獲取線程結果,以及狀態判斷是否取消,是否完成這幾個方法,都頗有用。多線程
使用線程池提交Callable接口任務,返回Future接口,添加進list,最後遍歷FutureList且內部使用while輪詢,併發獲取結果併發
1 package thread; 2 3 import java.util.ArrayList; 4 import java.util.Date; 5 import java.util.Iterator; 6 import java.util.List; 7 import java.util.concurrent.Callable; 8 import java.util.concurrent.ExecutorService; 9 import java.util.concurrent.Executors; 10 import java.util.concurrent.Future; 11 12 /** 13 * @author denny.zhang 14 * @ClassName: FutureDemo 15 * @Description: Future多線程併發任務結果歸集 16 * @date 2016年11月4日 下午1:50:32 17 */ 18 public class FutureDemo { 19 20 public static void main(String[] args) { 21 Long start = System.currentTimeMillis(); 22 //開啓多線程 23 ExecutorService exs = Executors.newFixedThreadPool(10); 24 try { 25 //結果集 26 List<Integer> list = new ArrayList<Integer>(); 27 List<Future<Integer>> futureList = new ArrayList<Future<Integer>>(); 28 //1.高速提交10個任務,每一個任務返回一個Future入list 29 for (int i = 0; i < 10; i++) { 30 futureList.add(exs.submit(new CallableTask(i + 1))); 31 } 32 Long getResultStart = System.currentTimeMillis(); 33 System.out.println("結果歸集開始時間=" + new Date()); 34 //2.結果歸集,用迭代器遍歷futureList,高速輪詢(模擬實現了併發),任務完成就移除 35 while(futureList.size()>0){ 36 Iterator<Future<Integer>> iterable = futureList.iterator(); 37 //遍歷一遍 38 while(iterable.hasNext()){ 39 Future<Integer> future = iterable.next(); 40 //若是任務完成取結果,不然判斷下一個任務是否完成 41 if (future.isDone() && !future.isCancelled()){ 42 //獲取結果 43 Integer i = future.get(); 44 System.out.println("任務i=" + i + "獲取完成,移出任務隊列!" + new Date()); 45 list.add(i); 46 //任務完成移除任務 47 iterable.remove(); 48 }else{ 49 Thread.sleep(1);//避免CPU高速運轉,這裏休息1毫秒,CPU納秒級別 50 } 51 } 52 } 53 System.out.println("list=" + list); 54 System.out.println("總耗時=" + (System.currentTimeMillis() - start) + ",取結果歸集耗時=" + (System.currentTimeMillis() - getResultStart)); 55 } catch (Exception e) { 56 e.printStackTrace(); 57 } finally { 58 exs.shutdown(); 59 } 60 } 61 62 static class CallableTask implements Callable<Integer> { 63 Integer i; 64 65 public CallableTask(Integer i) { 66 super(); 67 this.i = i; 68 } 69 70 @Override 71 public Integer call() throws Exception { 72 if (i == 1) { 73 Thread.sleep(3000);//任務1耗時3秒 74 } else if (i == 5) { 75 Thread.sleep(5000);//任務5耗時5秒 76 } else { 77 Thread.sleep(1000);//其它任務耗時1秒 78 } 79 System.out.println("task線程:" + Thread.currentThread().getName() + "任務i=" + i + ",完成!"+ new Date()); 80 return i; 81 } 82 } 83 }
如上圖,開啓定長爲10的線程池:ExecutorService exs = Executors.newFixedThreadPool(10);+任務1耗時3秒,任務5耗時5秒,其餘1秒。控制檯打印以下:eclipse
結果歸集開始時間=Fri Jun 01 09:59:33 CST 2018----起始33秒
task線程:pool-1-thread-3任務i=3,完成!Fri Jun 01 09:59:34 CST 2018
task線程:pool-1-thread-2任務i=2,完成!Fri Jun 01 09:59:34 CST 2018
task線程:pool-1-thread-4任務i=4,完成!Fri Jun 01 09:59:34 CST 2018
task線程:pool-1-thread-6任務i=6,完成!Fri Jun 01 09:59:34 CST 2018
task線程:pool-1-thread-7任務i=7,完成!Fri Jun 01 09:59:34 CST 2018
task線程:pool-1-thread-8任務i=8,完成!Fri Jun 01 09:59:34 CST 2018
task線程:pool-1-thread-9任務i=9,完成!Fri Jun 01 09:59:34 CST 2018
task線程:pool-1-thread-10任務i=10,完成!Fri Jun 01 09:59:34 CST 2018
任務i=2獲取完成,移出任務隊列!Fri Jun 01 09:59:34 CST 2018---通常任務耗時1秒,33+1=34,驗證經過!
任務i=3獲取完成,移出任務隊列!Fri Jun 01 09:59:34 CST 2018
任務i=4獲取完成,移出任務隊列!Fri Jun 01 09:59:34 CST 2018
任務i=6獲取完成,移出任務隊列!Fri Jun 01 09:59:34 CST 2018
任務i=7獲取完成,移出任務隊列!Fri Jun 01 09:59:34 CST 2018
任務i=8獲取完成,移出任務隊列!Fri Jun 01 09:59:34 CST 2018
任務i=9獲取完成,移出任務隊列!Fri Jun 01 09:59:34 CST 2018
任務i=10獲取完成,移出任務隊列!Fri Jun 01 09:59:34 CST 2018
task線程:pool-1-thread-1任務i=1,完成!Fri Jun 01 09:59:36 CST 2018
任務i=1獲取完成,移出任務隊列!Fri Jun 01 09:59:36 CST 2018---任務1 耗時3秒 33+3=36,驗證經過!
task線程:pool-1-thread-5任務i=5,完成!Fri Jun 01 09:59:38 CST 2018
任務i=5獲取完成,移出任務隊列!Fri Jun 01 09:59:38 CST 2018---任務5 耗時5秒 33+5=38,驗證經過!
list=[2, 3, 4, 6, 7, 8, 9, 10, 1, 5]--》多執行幾遍,最後2個老是1,5最後加進去的,可實現按照任務完成先手順序獲取結果!
總耗時=5012,取結果歸集耗時=5002---》符合邏輯,10個任務,定長10線程池,其中一個任務耗時3秒,一個任務耗時5秒,因爲併發高速輪訓,耗時取最長5秒
是接口RunnableFuture的惟一實現類。類圖以下(網上截取來的。。。個人eclipse類圖插件還沒裝好):異步
如上圖,可見RunnableFuture接口繼承自Future<V>+Runnable:async
1.Runnable接口,可開啓單個線程執行。 ide
2.Future<v>接口,可接受Callable接口的返回值,futureTask.get()阻塞獲取結果。函數式編程
FutureTask的構造方法有兩種,其實最終都是賦值callable。以下圖:函數
1 /** 2 * 3 * @ClassName:FutureTaskDemo 4 * @Description:FutureTask彌補了Future必須用線程池提交返回Future的缺陷,實現功能以下: 5 * 1.Runnable接口,可開啓線程執行。 6 * 2.Future<v>接口,可接受Callable接口的返回值,futureTask.get()阻塞獲取結果。 7 * 這兩個步驟:一個開啓線程執行任務,一個阻塞等待執行結果,分離這兩步驟,可在這兩步中間穿插別的相關業務邏輯。 8 * @author diandian.zhang 9 * @date 2017年6月16日上午10:36:05 10 */ 11 public class FutureTaskContorlDemo { 12 13 public static void main(String[] args) { 14 try { 15 System.out.println("=====例如一個統計公司總部和分部的總利潤是否達標100萬=========="); 16 //利潤 17 Integer count = 0; 18 //1.定義一個futureTask,假設去遠程http獲取各個分公司業績. 19 FutureTask<Integer> futureTask = new FutureTask<Integer>(new CallableTask()); 20 Thread futureTaskThread = new Thread(futureTask); 21 futureTaskThread.start(); 22 System.out.println("futureTaskThread start!"+new Date()); 23 24 //2.主線程先作點別的事 25 System.out.println("主線程查詢總部公司利潤開始時間:"+new Date()); 26 Thread.sleep(5000); 27 count+=10;//北京集團總部利潤。 28 System.out.println("主線程查詢總部公司利潤結果時間:"+new Date()); 29 30 //總部已達標100萬利潤,就再也不繼續執行獲取分公司業績任務了 31 if(count>=100){ 32 System.out.println("總部公司利潤達標,取消futureTask!"+new Date()); 33 futureTask.cancel(true);//不須要再去獲取結果,那麼直接取消便可 34 }else{ 35 System.out.println("總部公司利潤未達標,進入阻塞查詢分公司利潤!"+new Date()); 36 //3總部未達標.阻塞獲取,各個分公司結果 37 Integer i = futureTask.get();//真正執行CallableTask 38 System.out.println("i="+i+"獲取到結果!"+new Date()+new Date()); 39 } 40 } catch (Exception e) { 41 e.printStackTrace(); 42 } 43 } 44 45 /** 46 * 47 * @ClassName:CallableTask 48 * @Description:一個十分耗時的任務 49 * @author diandian.zhang 50 * @date 2017年6月16日上午10:39:04 51 */ 52 static class CallableTask implements Callable<Integer>{ 53 @Override 54 public Integer call() throws Exception { 55 System.out.println("CallableTask-call,查詢分公司利潤,執行開始!"+new Date()); 56 Thread.sleep(10000); 57 System.out.println("CallableTask-call,查詢分公司利潤,執行完畢!"+new Date()); 58 return 10; 59 } 60 }
執行結果以下:
=====例如一個統計公司總部和分部的總利潤是否達標100萬========== futureTaskThread start!Fri Jun 16 11:14:54 CST 2017----》futureTaskThread 已開始運行 CallableTask-call,查詢分公司利潤,執行開始!Fri Jun 16 11:14:54 CST 2017 主線程查詢總部公司利潤開始時間:Fri Jun 16 11:14:54 CST 2017------》主線程耗時5秒 主線程查詢總部公司利潤結果時間:Fri Jun 16 11:14:59 CST 2017 總部公司利潤未達標,進入阻塞查詢分公司利潤!Fri Jun 16 11:14:59 CST 2017 CallableTask-call,查詢分公司利潤,執行完畢!Fri Jun 16 11:15:04 CST 2017----》futureTaskThread 執行完畢,耗時10秒 i=10獲取到結果!Fri Jun 16 11:15:04 CST 2017Fri Jun 16 11:15:04 CST 2017
如上圖,分離以後,futureTaskThread耗時10秒期間,主線程還穿插的執行了耗時5秒的操做,大大減少總耗時。且可根據業務邏輯實時判斷是否須要繼續執行futureTask。
1 package thread; 2 3 import java.util.ArrayList; 4 import java.util.Date; 5 import java.util.Iterator; 6 import java.util.List; 7 import java.util.concurrent.ExecutorService; 8 import java.util.concurrent.Executors; 9 import java.util.concurrent.Future; 10 import java.util.concurrent.FutureTask; 11 12 /** 13 * 14 * @ClassName:FutureTaskDemo 15 * @Description:FutureTask實現多線程併發執行任務並取結果歸集 16 * @author diandian.zhang 17 * @date 2017年6月16日上午10:36:05 18 */ 19 public class FutureTaskDemo { 20 21 public static void main(String[] args) { 22 Long start = System.currentTimeMillis(); 23 //開啓多線程 24 ExecutorService exs = Executors.newFixedThreadPool(10); 25 try { 26 //結果集 27 List<Integer> list = new ArrayList<Integer>(); 28 List<FutureTask<Integer>> futureList = new ArrayList<FutureTask<Integer>>(); 29 //啓動線程池,10個任務固定線程數爲5 30 for(int i=0;i<10;i++){ 31 FutureTask<Integer> futureTask = new FutureTask<Integer>(new CallableTask(i+1)); 32 //提交任務,添加返回,Runnable特性 33 exs.submit(futureTask); 34 //Future特性 35 futureList.add(futureTask); 36 } 37 Long getResultStart = System.currentTimeMillis(); 38 System.out.println("結果歸集開始時間="+new Date()); 39 //結果歸集 40 while(futureList.size()>0){ 41 Iterator<FutureTask<Integer>> iterable = futureList.iterator(); 42 //遍歷一遍 43 while(iterable.hasNext()){ 44 Future<Integer> future = iterable.next(); 45 if (future.isDone()&& !future.isCancelled()) { 46 //Future特性 47 Integer i = future.get(); 48 System.out.println("任務i=" + i + "獲取完成,移出任務隊列!" + new Date()); 49 list.add(i); 50 //任務完成移除任務 51 iterable.remove(); 52 }else { 53 //避免CPU高速輪循,能夠休息一下。 54 Thread.sleep(1); 55 } 56 } 57 } 58 59 System.out.println("list="+list); 60 System.out.println("總耗時="+(System.currentTimeMillis()-start)+",取結果歸集耗時="+(System.currentTimeMillis()-getResultStart)); 61 } catch (Exception e) { 62 e.printStackTrace(); 63 } finally { 64 exs.shutdown(); 65 } 66 } 67 68 }
CallableTask:
1 package thread; 2 3 import java.util.Date; 4 import java.util.concurrent.Callable; 5 6 /** 7 * @Description 回調方法 8 * @author denny 9 * @date 2018/8/17 下午3:16 10 */ 11 public class CallableTask implements Callable<Integer> { 12 Integer i; 13 14 public CallableTask(Integer i) { 15 super(); 16 this.i = i; 17 } 18 19 @Override 20 public Integer call() throws Exception { 21 if (i == 1) { 22 Thread.sleep(3000);//任務1耗時3秒 23 } else if (i == 5) { 24 Thread.sleep(5000);//任務5耗時5秒 25 } else { 26 Thread.sleep(1000);//其它任務耗時1秒 27 } 28 System.out.println("task線程:" + Thread.currentThread().getName() + "任務i=" + i + ",完成!"+ new Date()); 29 return i; 30 } 31 }
結果:
結果歸集開始時間=Fri Aug 17 15:51:54 CST 2018
task線程:pool-1-thread-2任務i=2,完成!Fri Aug 17 15:51:55 CST 2018
task線程:pool-1-thread-3任務i=3,完成!Fri Aug 17 15:51:55 CST 2018
task線程:pool-1-thread-4任務i=4,完成!Fri Aug 17 15:51:55 CST 2018
task線程:pool-1-thread-6任務i=6,完成!Fri Aug 17 15:51:55 CST 2018
task線程:pool-1-thread-7任務i=7,完成!Fri Aug 17 15:51:55 CST 2018
task線程:pool-1-thread-8任務i=8,完成!Fri Aug 17 15:51:55 CST 2018
task線程:pool-1-thread-10任務i=10,完成!Fri Aug 17 15:51:55 CST 2018
task線程:pool-1-thread-9任務i=9,完成!Fri Aug 17 15:51:55 CST 2018
任務i=8獲取完成,移出任務隊列!Fri Aug 17 15:51:55 CST 2018
任務i=9獲取完成,移出任務隊列!Fri Aug 17 15:51:55 CST 2018
任務i=10獲取完成,移出任務隊列!Fri Aug 17 15:51:55 CST 2018
任務i=2獲取完成,移出任務隊列!Fri Aug 17 15:51:55 CST 2018
任務i=3獲取完成,移出任務隊列!Fri Aug 17 15:51:55 CST 2018
任務i=4獲取完成,移出任務隊列!Fri Aug 17 15:51:55 CST 2018
任務i=6獲取完成,移出任務隊列!Fri Aug 17 15:51:55 CST 2018
任務i=7獲取完成,移出任務隊列!Fri Aug 17 15:51:55 CST 2018
task線程:pool-1-thread-1任務i=1,完成!Fri Aug 17 15:51:57 CST 2018
任務i=1獲取完成,移出任務隊列!Fri Aug 17 15:51:57 CST 2018
task線程:pool-1-thread-5任務i=5,完成!Fri Aug 17 15:51:59 CST 2018
任務i=5獲取完成,移出任務隊列!Fri Aug 17 15:51:59 CST 2018
list=[8, 9, 10, 2, 3, 4, 6, 7, 1, 5]
總耗時=5014,取結果歸集耗時=5007
原理:內部經過阻塞隊列+FutureTask,實現了任務先完成可優先獲取到,即結果按照完成前後順序排序。
1 package thread.future; 2 3 import java.util.ArrayList; 4 import java.util.Date; 5 import java.util.List; 6 import java.util.concurrent.Callable; 7 import java.util.concurrent.CompletionService; 8 import java.util.concurrent.ExecutorCompletionService; 9 import java.util.concurrent.ExecutorService; 10 import java.util.concurrent.Executors; 11 import java.util.concurrent.Future; 12 13 /** 14 * 15 * @ClassName: CompletionServiceDemo 16 * @Description: CompletionService多線程併發任務結果歸集 17 * @author denny.zhang 18 * @date 2016年11月4日 下午1:50:32 19 * 20 */ 21 public class CompletionServiceDemo{ 22 23 public static void main(String[] args) { 24 Long start = System.currentTimeMillis(); 25 //開啓3個線程 26 ExecutorService exs = Executors.newFixedThreadPool(5); 27 try { 28 int taskCount = 10; 29 //結果集 30 List<Integer> list = new ArrayList<Integer>(); 31 //1.定義CompletionService 32 CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(exs); 33 List<Future<Integer>> futureList = new ArrayList<Future<Integer>>(); 34 //2.添加任務 35 for(int i=0;i<taskCount;i++){ 36 futureList.add(completionService.submit(new Task(i+1))); 37 } 38 //==================結果歸集=================== 39 //方法1:future是提交時返回的,遍歷queue則按照任務提交順序,獲取結果 40 // for (Future<Integer> future : futureList) { 41 // System.out.println("===================="); 42 // Integer result = future.get();//線程在這裏阻塞等待該任務執行完畢,按照 43 // System.out.println("任務result="+result+"獲取到結果!"+new Date()); 44 // list.add(result); 45 // } 46 47 // //方法2.使用內部阻塞隊列的take() 48 for(int i=0;i<taskCount;i++){ 49 Integer result = completionService.take().get();//採用completionService.take(),內部維護阻塞隊列,任務先完成的先獲取到 50 System.out.println("任務i=="+result+"完成!"+new Date()); 51 list.add(result); 52 } 53 System.out.println("list="+list); 54 System.out.println("總耗時="+(System.currentTimeMillis()-start)); 55 } catch (Exception e) { 56 e.printStackTrace(); 57 } finally { 58 exs.shutdown();//關閉線程池 59 } 60 61 } 62 63 static class Task implements Callable<Integer>{ 64 Integer i; 65 66 public Task(Integer i) { 67 super(); 68 this.i=i; 69 } 70 71 @Override 72 public Integer call() throws Exception { 73 if(i==5){ 74 Thread.sleep(5000); 75 }else{ 76 Thread.sleep(1000); 77 } 78 System.out.println("線程:"+Thread.currentThread().getName()+"任務i="+i+",執行完成!"); 79 return i; 80 } 81 82 } 83 }
打印結果以下:
線程:pool-1-thread-3任務i=3,執行完成! 線程:pool-1-thread-1任務i=1,執行完成! 線程:pool-1-thread-4任務i=4,執行完成! 線程:pool-1-thread-2任務i=2,執行完成! 任務i==3完成!Fri Jun 16 11:39:17 CST 2017 任務i==1完成!Fri Jun 16 11:39:17 CST 2017 任務i==4完成!Fri Jun 16 11:39:17 CST 2017 任務i==2完成!Fri Jun 16 11:39:17 CST 2017 線程:pool-1-thread-4任務i=8,執行完成! 線程:pool-1-thread-3任務i=7,執行完成! 線程:pool-1-thread-1任務i=6,執行完成! 線程:pool-1-thread-2任務i=9,執行完成! 任務i==8完成!Fri Jun 16 11:39:18 CST 2017 任務i==7完成!Fri Jun 16 11:39:18 CST 2017 任務i==6完成!Fri Jun 16 11:39:18 CST 2017 任務i==9完成!Fri Jun 16 11:39:18 CST 2017 線程:pool-1-thread-3任務i=10,執行完成! 任務i==10完成!Fri Jun 16 11:39:19 CST 2017 線程:pool-1-thread-5任務i=5,執行完成! 任務i==5完成!Fri Jun 16 11:39:21 CST 2017 list=[3, 1, 4, 2, 8, 7, 6, 9, 10, 5]---》這裏證明了確實按照執行完成順序排序 總耗時=5004---》符合邏輯,10個任務,定長5線程池執行,取最長時間。
4.1.從註釋看:
JDK1.8才新加入的一個實現類,實現了Future<T>, CompletionStage<T>2個接口,JDK註釋以下圖:
譯文(沒興趣的能夠跳過):
當一個Future可能須要顯示地完成時,使用CompletionStage接口去支持完成時觸發的函數和操做。當2個以上線程同時嘗試完成、異常完成、取消一個CompletableFuture時,只有一個能成功。
CompletableFuture實現了CompletionStage接口的以下策略:
1.爲了完成當前的CompletableFuture接口或者其餘完成方法的回調函數的線程,提供了非異步的完成操做。
2.沒有顯式入參Executor的全部async方法都使用ForkJoinPool.commonPool()
爲了簡化監視、調試和跟蹤,全部生成的異步任務都是標記接口AsynchronousCompletionTask
的實例。
3.全部的CompletionStage方法都是獨立於其餘共有方法實現的,所以一個方法的行爲不會受到子類中其餘方法的覆蓋。
CompletableFuture實現了Futurre接口的以下策略:
1.CompletableFuture沒法直接控制完成,因此cancel操做被視爲是另外一種異常完成形式。方法isCompletedExceptionally
能夠用來肯定一個CompletableFuture是否以任何異常的方式完成。
2.以一個CompletionException爲例,方法get()和get(long,TimeUnit)拋出一個ExecutionException,對應CompletionException。爲了在大多數上下文中簡化用法,這個類還定義了方法join()和getNow,而不是直接在這些狀況中直接拋出CompletionException。
4.2.CompletionStage接口實現流式編程:
JDK8新增接口,此接口包含38個方法...是的,你沒看錯,就是38個方法。這些方法主要是爲了支持函數式編程中流式處理。
4.3.CompletableFuture中4個異步執行任務靜態方法:
如上圖,其中supplyAsync用於有返回值的任務,runAsync則用於沒有返回值的任務。Executor參數能夠手動指定線程池,不然默認ForkJoinPool.commonPool()系統級公共線程池,注意:這些線程都是Daemon線程,主線程結束Daemon線程不結束,只有JVM關閉時,生命週期終止
。
4.4.組合CompletableFuture:
thenCombine(): 先完成當前CompletionStage和other 2個CompletionStage任務,而後把結果傳參給BiFunction進行結果合併操做。
thenCompose():第一個CompletableFuture執行完畢後,傳遞給下一個CompletionStage做爲入參進行操做。
JDK CompletableFuture 自帶多任務組合方法allOf和anyOf
allOf是等待全部任務完成,構造後CompletableFuture完成
anyOf是隻要有一個任務完成,構造後CompletableFuture就完成
方式一:循環建立CompletableFuture list,調用sequence()組裝返回一個有返回值的CompletableFuture,返回結果get()獲取
方式二:全流式處理轉換成CompletableFuture[]+allOf組裝成一個無返回值CompletableFuture,join等待執行完畢。返回結果whenComplete獲取。---》推薦
1 package thread; 2 3 import java.util.ArrayList; 4 import java.util.Date; 5 import java.util.List; 6 import java.util.concurrent.CompletableFuture; 7 import java.util.concurrent.ExecutorService; 8 import java.util.concurrent.Executors; 9 import java.util.stream.Collectors; 10 import java.util.stream.Stream; 11 12 import com.google.common.collect.Lists; 13 14 /** 15 * 16 * @ClassName:CompletableFutureDemo 17 * @Description:多線程併發任務,取結果歸集 18 * @author diandian.zhang 19 * @date 2017年6月14日下午12:44:01 20 */ 21 public class CompletableFutureDemo { 22 public static void main(String[] args) { 23 Long start = System.currentTimeMillis(); 24 //結果集 25 List<String> list = new ArrayList<String>(); 26 List<String> list2 = new ArrayList<String>(); 27 //定長10線程池 28 ExecutorService exs = Executors.newFixedThreadPool(10); 29 List<CompletableFuture<String>> futureList = new ArrayList<>(); 30 final List<Integer> taskList = Lists.newArrayList(2,1,3,4,5,6,7,8,9,10); 31 try { 32 ////方式一:循環建立CompletableFuture list,調用sequence()組裝返回一個有返回值的CompletableFuture,返回結果get()獲取 33 //for(int i=0;i<taskList.size();i++){ 34 // final int j=i; 35 // //異步執行 36 // CompletableFuture<String> future = CompletableFuture.supplyAsync(()->calc(taskList.get(j)), exs) 37 // //Integer轉換字符串 thenAccept只接受不返回不影響結果 38 // .thenApply(e->Integer.toString(e)) 39 // //如需獲取任務完成前後順序,此處代碼便可 40 // .whenComplete((v, e) -> { 41 // System.out.println("任務"+v+"完成!result="+v+",異常 e="+e+","+new Date()); 42 // list2.add(v); 43 // }) 44 // ; 45 // futureList.add(future); 46 //} 47 ////流式獲取結果:此處是根據任務添加順序獲取的結果 48 //list = sequence(futureList).get(); 49 50 //方式二:全流式處理轉換成CompletableFuture[]+組裝成一個無返回值CompletableFuture,join等待執行完畢。返回結果whenComplete獲取 51 CompletableFuture[] cfs = taskList.stream().map(object-> CompletableFuture.supplyAsync(()->calc(object), exs) 52 .thenApply(h->Integer.toString(h)) 53 //如需獲取任務完成前後順序,此處代碼便可 54 .whenComplete((v, e) -> { 55 System.out.println("任務"+v+"完成!result="+v+",異常 e="+e+","+new Date()); 56 list2.add(v); 57 })).toArray(CompletableFuture[]::new); 58 //等待總任務完成,可是封裝後無返回值,必須本身whenComplete()獲取 59 CompletableFuture.allOf(cfs).join(); 60 System.out.println("任務完成前後順序,結果list2="+list2+";任務提交順序,結果list="+list+",耗時="+(System.currentTimeMillis()-start)); 61 } catch (Exception e) { 62 e.printStackTrace(); 63 }finally { 64 exs.shutdown(); 65 } 66 } 67 68 public static Integer calc(Integer i){ 69 try { 70 if(i==1){ 71 //任務1耗時3秒 72 Thread.sleep(3000); 73 }else if(i==5){ 74 //任務5耗時5秒 75 Thread.sleep(5000); 76 }else{ 77 //其它任務耗時1秒 78 Thread.sleep(1000); 79 } 80 System.out.println("task線程:"+Thread.currentThread().getName()+"任務i="+i+",完成!+"+new Date()); 81 } catch (InterruptedException e) { 82 e.printStackTrace(); 83 } 84 return i; 85 } 86 87 /** 88 * 89 * @Description 組合多個CompletableFuture爲一個CompletableFuture,全部子任務所有完成,組合後的任務纔會完成。帶返回值,可直接get. 90 * @param futures List 91 * @return 92 * @author diandian.zhang 93 * @date 2017年6月19日下午3:01:09 94 * @since JDK1.8 95 */ 96 public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) { 97 //1.構造一個空CompletableFuture,子任務數爲入參任務list size 98 CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); 99 //2.流式(總任務完成後,每一個子任務join取結果,後轉換爲list) 100 return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList())); 101 } 102 103 /** 104 * 105 * @Description Stream流式類型futures轉換成一個CompletableFuture,全部子任務所有完成,組合後的任務纔會完成。帶返回值,可直接get. 106 * @param futures Stream 107 * @return 108 * @author diandian.zhang 109 * @date 2017年6月19日下午6:23:40 110 * @since JDK1.8 111 */ 112 public static <T> CompletableFuture<List<T>> sequence(Stream<CompletableFuture<T>> futures) { 113 List<CompletableFuture<T>> futureList = futures.filter(f -> f != null).collect(Collectors.toList()); 114 return sequence(futureList); 115 } 116 }
方式二返回結果:
task線程:pool-1-thread-1任務i=2,完成!+Mon Jun 19 18:26:17 CST 2017 task線程:pool-1-thread-9任務i=9,完成!+Mon Jun 19 18:26:17 CST 2017 task線程:pool-1-thread-6任務i=6,完成!+Mon Jun 19 18:26:17 CST 2017 task線程:pool-1-thread-8任務i=8,完成!+Mon Jun 19 18:26:17 CST 2017 任務6完成!result=6,異常 e=null,Mon Jun 19 18:26:17 CST 2017 task線程:pool-1-thread-4任務i=4,完成!+Mon Jun 19 18:26:17 CST 2017 task線程:pool-1-thread-7任務i=7,完成!+Mon Jun 19 18:26:17 CST 2017 任務4完成!result=4,異常 e=null,Mon Jun 19 18:26:17 CST 2017 task線程:pool-1-thread-3任務i=3,完成!+Mon Jun 19 18:26:17 CST 2017 任務3完成!result=3,異常 e=null,Mon Jun 19 18:26:17 CST 2017 task線程:pool-1-thread-10任務i=10,完成!+Mon Jun 19 18:26:17 CST 2017 任務10完成!result=10,異常 e=null,Mon Jun 19 18:26:17 CST 2017 任務7完成!result=7,異常 e=null,Mon Jun 19 18:26:17 CST 2017 任務8完成!result=8,異常 e=null,Mon Jun 19 18:26:17 CST 2017 任務2完成!result=2,異常 e=null,Mon Jun 19 18:26:17 CST 2017 任務9完成!result=9,異常 e=null,Mon Jun 19 18:26:17 CST 2017 task線程:pool-1-thread-2任務i=1,完成!+Mon Jun 19 18:26:19 CST 2017---》任務1耗時3秒 任務1完成!result=1,異常 e=null,Mon Jun 19 18:26:19 CST 2017 task線程:pool-1-thread-5任務i=5,完成!+Mon Jun 19 18:26:21 CST 2017---》任務5耗時5秒 任務5完成!result=5,異常 e=null,Mon Jun 19 18:26:21 CST 2017 list2=[6, 4, 3, 10, 7, 8, 2, 9, 1, 5]list=[],耗時=5076---》符合邏輯,10個任務,10個線程併發執行,其中任務1耗時3秒,任務5耗時5秒,耗時取最大值。
本文從原理、demo、建議三個方向分析了經常使用多線程併發,取結果歸集的幾種實現方案,但願對你們有所啓發,整理表格以下:
|
Futrue |
FutureTask |
CompletionService |
CompletableFuture |
原理 | Futrue接口 |
接口RunnableFuture的惟一實現類,RunnableFuture接口繼承自Future<V>+Runnable: | 內部經過阻塞隊列+FutureTask接口 | JDK8實現了Future<T>, CompletionStage<T>2個接口 |
多任務併發執行 | 支持 | 支持 | 支持 | 支持 |
獲取任務結果的順序 | 支持任務完成前後順序 |
未知 | 支持任務完成的前後順序 | 支持任務完成的前後順序 |
異常捕捉 | 本身捕捉 | 本身捕捉 | 本身捕捉 | 源生API支持,返回每一個任務的異常 |
建議 | CPU高速輪詢,耗資源,可使用,但不推薦 |
功能不對口,併發任務這一塊多套一層,不推薦使用。 | 推薦使用,沒有JDK8CompletableFuture以前最好的方案,沒有質疑 | API極端豐富,配合流式編程,速度飛起,推薦使用! |
=====參考=================
1.《JAVA高併發程序設計》