多線程併發執行任務,取結果歸集。終極總結:Future、FutureTask、CompletionService、CompletableFuture

開啓線程執行任務,不論是使用Runnable(無返回值不支持上報異常)仍是Callable(有返回值支持上報異常)接口,均可以輕鬆實現。那麼若是開啓線程池並須要獲取結果歸集的狀況下,如何實現,以及優劣,老司機直接看總結便可。java

任務執行完,結果歸集時,幾種方式:編程

1.Futrue

原理:

以下圖,Future接口封裝了取消,獲取線程結果,以及狀態判斷是否取消,是否完成這幾個方法,都頗有用。多線程

demo:

使用線程池提交Callable接口任務,返回Future接口,添加進list,最後遍歷FutureList且內部使用while輪詢,併發獲取結果併發

 1 package thread;
 2 
 3 import java.util.ArrayList;
 4 import java.util.Date;
 5 import java.util.Iterator;
 6 import java.util.List;
 7 import java.util.concurrent.Callable;
 8 import java.util.concurrent.ExecutorService;
 9 import java.util.concurrent.Executors;
10 import java.util.concurrent.Future;
11 
12 /**
13  * @author denny.zhang
14  * @ClassName: FutureDemo
15  * @Description: Future多線程併發任務結果歸集
16  * @date 2016年11月4日 下午1:50:32
17  */
18 public class FutureDemo {
19 
20     public static void main(String[] args) {
21         Long start = System.currentTimeMillis();
22         //開啓多線程
23         ExecutorService exs = Executors.newFixedThreadPool(10);
24         try {
25             //結果集
26             List<Integer> list = new ArrayList<Integer>();
27             List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
28             //1.高速提交10個任務,每一個任務返回一個Future入list
29             for (int i = 0; i < 10; i++) {
30                 futureList.add(exs.submit(new CallableTask(i + 1)));
31             }
32             Long getResultStart = System.currentTimeMillis();
33             System.out.println("結果歸集開始時間=" + new Date());
34             //2.結果歸集,用迭代器遍歷futureList,高速輪詢(模擬實現了併發),任務完成就移除
35             while(futureList.size()>0){
36                 Iterator<Future<Integer>> iterable = futureList.iterator();
37                 //遍歷一遍
38                 while(iterable.hasNext()){
39                     Future<Integer> future = iterable.next();
40                     //若是任務完成取結果,不然判斷下一個任務是否完成
41                     if (future.isDone() && !future.isCancelled()){
42                         //獲取結果
43                         Integer i = future.get();
44                         System.out.println("任務i=" + i + "獲取完成,移出任務隊列!" + new Date());
45                         list.add(i);
46                         //任務完成移除任務
47                         iterable.remove();
48                     }else{
49                         Thread.sleep(1);//避免CPU高速運轉,這裏休息1毫秒,CPU納秒級別
50                     }
51                 }
52             }
53             System.out.println("list=" + list);
54             System.out.println("總耗時=" + (System.currentTimeMillis() - start) + ",取結果歸集耗時=" + (System.currentTimeMillis() - getResultStart));
55         } catch (Exception e) {
56             e.printStackTrace();
57         } finally {
58             exs.shutdown();
59         }
60     }
61 
62     static class CallableTask implements Callable<Integer> {
63         Integer i;
64 
65         public CallableTask(Integer i) {
66             super();
67             this.i = i;
68         }
69 
70         @Override
71         public Integer call() throws Exception {
72             if (i == 1) {
73                 Thread.sleep(3000);//任務1耗時3秒
74             } else if (i == 5) {
75                 Thread.sleep(5000);//任務5耗時5秒
76             } else {
77                 Thread.sleep(1000);//其它任務耗時1秒
78             }
79             System.out.println("task線程:" + Thread.currentThread().getName() + "任務i=" + i + ",完成!"+ new Date());
80             return i;
81         }
82     }
83 }

如上圖,開啓定長爲10的線程池:ExecutorService exs = Executors.newFixedThreadPool(10);+任務1耗時3秒,任務5耗時5秒,其餘1秒。控制檯打印以下:eclipse

結果歸集開始時間=Fri Jun 01 09:59:33 CST 2018----起始33秒
task線程:pool-1-thread-3任務i=3,完成!Fri Jun 01 09:59:34 CST 2018
task線程:pool-1-thread-2任務i=2,完成!Fri Jun 01 09:59:34 CST 2018
task線程:pool-1-thread-4任務i=4,完成!Fri Jun 01 09:59:34 CST 2018
task線程:pool-1-thread-6任務i=6,完成!Fri Jun 01 09:59:34 CST 2018
task線程:pool-1-thread-7任務i=7,完成!Fri Jun 01 09:59:34 CST 2018
task線程:pool-1-thread-8任務i=8,完成!Fri Jun 01 09:59:34 CST 2018
task線程:pool-1-thread-9任務i=9,完成!Fri Jun 01 09:59:34 CST 2018
task線程:pool-1-thread-10任務i=10,完成!Fri Jun 01 09:59:34 CST 2018
任務i=2獲取完成,移出任務隊列!Fri Jun 01 09:59:34 CST 2018---通常任務耗時1秒,33+1=34,驗證經過!
任務i=3獲取完成,移出任務隊列!Fri Jun 01 09:59:34 CST 2018
任務i=4獲取完成,移出任務隊列!Fri Jun 01 09:59:34 CST 2018
任務i=6獲取完成,移出任務隊列!Fri Jun 01 09:59:34 CST 2018
任務i=7獲取完成,移出任務隊列!Fri Jun 01 09:59:34 CST 2018
任務i=8獲取完成,移出任務隊列!Fri Jun 01 09:59:34 CST 2018
任務i=9獲取完成,移出任務隊列!Fri Jun 01 09:59:34 CST 2018
任務i=10獲取完成,移出任務隊列!Fri Jun 01 09:59:34 CST 2018
task線程:pool-1-thread-1任務i=1,完成!Fri Jun 01 09:59:36 CST 2018
任務i=1獲取完成,移出任務隊列!Fri Jun 01 09:59:36 CST 2018---任務1 耗時3秒 33+3=36,驗證經過!
task線程:pool-1-thread-5任務i=5,完成!Fri Jun 01 09:59:38 CST 2018
任務i=5獲取完成,移出任務隊列!Fri Jun 01 09:59:38 CST 2018---任務5 耗時5秒 33+5=38,驗證經過!
list=[2, 3, 4, 6, 7, 8, 9, 10, 1, 5]--》多執行幾遍,最後2個老是1,5最後加進去的,可實現按照任務完成先手順序獲取結果!
總耗時=5012,取結果歸集耗時=5002---》符合邏輯,10個任務,定長10線程池,其中一個任務耗時3秒,一個任務耗時5秒,因爲併發高速輪訓,耗時取最長5秒

建議:此種方法可實現基本目標,任務並行且按照完成順序獲取結果。使用很廣泛,老小皆宜,就是CPU有消耗,可使用

2.FutureTask

原理:

是接口RunnableFuture的惟一實現類。類圖以下(網上截取來的。。。個人eclipse類圖插件還沒裝好):異步

 

如上圖,可見RunnableFuture接口繼承自Future<V>+Runnable:async

1.Runnable接口,可開啓單個線程執行。 ide

2.Future<v>接口,可接受Callable接口的返回值,futureTask.get()阻塞獲取結果。函數式編程

FutureTask的構造方法有兩種,其實最終都是賦值callable。以下圖:函數

demo:

demo1:兩個步驟:1.開啓單個線程執行任務,2.阻塞等待執行結果,分離這兩步驟,可在這兩步中間穿插別的相關業務邏輯。

 1 /**
 2  * 
 3  * @ClassName:FutureTaskDemo
 4  * @Description:FutureTask彌補了Future必須用線程池提交返回Future的缺陷,實現功能以下:
 5  * 1.Runnable接口,可開啓線程執行。
 6  * 2.Future<v>接口,可接受Callable接口的返回值,futureTask.get()阻塞獲取結果。
 7  * 這兩個步驟:一個開啓線程執行任務,一個阻塞等待執行結果,分離這兩步驟,可在這兩步中間穿插別的相關業務邏輯。
 8  * @author diandian.zhang
 9  * @date 2017年6月16日上午10:36:05
10  */
11 public class FutureTaskContorlDemo {
12     
13     public static void main(String[] args)  {
14         try {
15             System.out.println("=====例如一個統計公司總部和分部的總利潤是否達標100萬==========");
16             //利潤
17             Integer count = 0;
18             //1.定義一個futureTask,假設去遠程http獲取各個分公司業績.
19             FutureTask<Integer> futureTask = new FutureTask<Integer>(new CallableTask());
20             Thread futureTaskThread =  new Thread(futureTask);
21             futureTaskThread.start();
22             System.out.println("futureTaskThread start!"+new Date());
23             
24             //2.主線程先作點別的事
25             System.out.println("主線程查詢總部公司利潤開始時間:"+new Date());
26             Thread.sleep(5000);
27             count+=10;//北京集團總部利潤。
28             System.out.println("主線程查詢總部公司利潤結果時間:"+new Date());
29             
30             //總部已達標100萬利潤,就再也不繼續執行獲取分公司業績任務了
31             if(count>=100){
32                 System.out.println("總部公司利潤達標,取消futureTask!"+new Date());
33                 futureTask.cancel(true);//不須要再去獲取結果,那麼直接取消便可
34             }else{
35                 System.out.println("總部公司利潤未達標,進入阻塞查詢分公司利潤!"+new Date());
36                 //3總部未達標.阻塞獲取,各個分公司結果
37                 Integer i = futureTask.get();//真正執行CallableTask
38                 System.out.println("i="+i+"獲取到結果!"+new Date()+new Date());
39             }
40         } catch (Exception e) {
41             e.printStackTrace();
42         }
43     }
44 
45     /**
46      * 
47      * @ClassName:CallableTask
48      * @Description:一個十分耗時的任務
49      * @author diandian.zhang
50      * @date 2017年6月16日上午10:39:04
51      */
52     static class CallableTask implements Callable<Integer>{
53         @Override
54         public Integer call() throws Exception {
55             System.out.println("CallableTask-call,查詢分公司利潤,執行開始!"+new Date());
56             Thread.sleep(10000);
57             System.out.println("CallableTask-call,查詢分公司利潤,執行完畢!"+new Date());
58             return 10;
59         }
60     }

執行結果以下:

=====例如一個統計公司總部和分部的總利潤是否達標100萬==========
futureTaskThread start!Fri Jun 16 11:14:54 CST 2017----》futureTaskThread 已開始運行
CallableTask-call,查詢分公司利潤,執行開始!Fri Jun 16 11:14:54 CST 2017
主線程查詢總部公司利潤開始時間:Fri Jun 16 11:14:54 CST 2017------》主線程耗時5秒
主線程查詢總部公司利潤結果時間:Fri Jun 16 11:14:59 CST 2017
總部公司利潤未達標,進入阻塞查詢分公司利潤!Fri Jun 16 11:14:59 CST 2017
CallableTask-call,查詢分公司利潤,執行完畢!Fri Jun 16 11:15:04 CST 2017----》futureTaskThread 執行完畢,耗時10秒
i=10獲取到結果!Fri Jun 16 11:15:04 CST 2017Fri Jun 16 11:15:04 CST 2017

如上圖,分離以後,futureTaskThread耗時10秒期間,主線程還穿插的執行了耗時5秒的操做,大大減少總耗時。且可根據業務邏輯實時判斷是否須要繼續執行futureTask

demo2:固然FutureTask同樣能夠併發執行任務並獲取結果,以下:

 1 package thread;
 2 
 3 import java.util.ArrayList;
 4 import java.util.Date;
 5 import java.util.Iterator;
 6 import java.util.List;
 7 import java.util.concurrent.ExecutorService;
 8 import java.util.concurrent.Executors;
 9 import java.util.concurrent.Future;
10 import java.util.concurrent.FutureTask;
11 
12 /**
13  * 
14  * @ClassName:FutureTaskDemo
15  * @Description:FutureTask實現多線程併發執行任務並取結果歸集
16  * @author diandian.zhang
17  * @date 2017年6月16日上午10:36:05
18  */
19 public class FutureTaskDemo {
20     
21     public static void main(String[] args)  {
22         Long start = System.currentTimeMillis();
23         //開啓多線程
24         ExecutorService exs = Executors.newFixedThreadPool(10);
25         try {
26             //結果集
27             List<Integer> list = new ArrayList<Integer>();
28             List<FutureTask<Integer>> futureList = new ArrayList<FutureTask<Integer>>();
29             //啓動線程池,10個任務固定線程數爲5
30             for(int i=0;i<10;i++){
31                  FutureTask<Integer> futureTask = new FutureTask<Integer>(new CallableTask(i+1));
32                 //提交任務,添加返回,Runnable特性
33                 exs.submit(futureTask);
34                 //Future特性
35                 futureList.add(futureTask);
36             }
37             Long getResultStart = System.currentTimeMillis();
38             System.out.println("結果歸集開始時間="+new Date());
39             //結果歸集
40             while(futureList.size()>0){
41                 Iterator<FutureTask<Integer>> iterable = futureList.iterator();
42                 //遍歷一遍
43                 while(iterable.hasNext()){
44                     Future<Integer> future = iterable.next();
45                     if (future.isDone()&& !future.isCancelled()) {
46                         //Future特性
47                         Integer i = future.get();
48                         System.out.println("任務i=" + i + "獲取完成,移出任務隊列!" + new Date());
49                         list.add(i);
50                         //任務完成移除任務
51                         iterable.remove();
52                     }else {
53                         //避免CPU高速輪循,能夠休息一下。
54                         Thread.sleep(1);
55                     }
56                 }
57             }
58 
59             System.out.println("list="+list);
60             System.out.println("總耗時="+(System.currentTimeMillis()-start)+",取結果歸集耗時="+(System.currentTimeMillis()-getResultStart));
61         } catch (Exception e) {
62             e.printStackTrace();
63         } finally {
64             exs.shutdown();
65         }
66     }
67 
68 }
CallableTask:
 1 package thread;
 2 
 3 import java.util.Date;
 4 import java.util.concurrent.Callable;
 5 
 6 /**
 7  * @Description 回調方法
 8  * @author denny
 9  * @date 2018/8/17 下午3:16
10  */
11 public class CallableTask implements Callable<Integer> {
12     Integer i;
13 
14     public CallableTask(Integer i) {
15         super();
16         this.i = i;
17     }
18 
19     @Override
20     public Integer call() throws Exception {
21         if (i == 1) {
22             Thread.sleep(3000);//任務1耗時3秒
23         } else if (i == 5) {
24             Thread.sleep(5000);//任務5耗時5秒
25         } else {
26             Thread.sleep(1000);//其它任務耗時1秒
27         }
28         System.out.println("task線程:" + Thread.currentThread().getName() + "任務i=" + i + ",完成!"+ new Date());
29         return i;
30     }
31 }

結果:

 結果歸集開始時間=Fri Aug 17 15:51:54 CST 2018
task線程:pool-1-thread-2任務i=2,完成!Fri Aug 17 15:51:55 CST 2018
task線程:pool-1-thread-3任務i=3,完成!Fri Aug 17 15:51:55 CST 2018
task線程:pool-1-thread-4任務i=4,完成!Fri Aug 17 15:51:55 CST 2018
task線程:pool-1-thread-6任務i=6,完成!Fri Aug 17 15:51:55 CST 2018
task線程:pool-1-thread-7任務i=7,完成!Fri Aug 17 15:51:55 CST 2018
task線程:pool-1-thread-8任務i=8,完成!Fri Aug 17 15:51:55 CST 2018
task線程:pool-1-thread-10任務i=10,完成!Fri Aug 17 15:51:55 CST 2018
task線程:pool-1-thread-9任務i=9,完成!Fri Aug 17 15:51:55 CST 2018
任務i=8獲取完成,移出任務隊列!Fri Aug 17 15:51:55 CST 2018
任務i=9獲取完成,移出任務隊列!Fri Aug 17 15:51:55 CST 2018
任務i=10獲取完成,移出任務隊列!Fri Aug 17 15:51:55 CST 2018
任務i=2獲取完成,移出任務隊列!Fri Aug 17 15:51:55 CST 2018
任務i=3獲取完成,移出任務隊列!Fri Aug 17 15:51:55 CST 2018
任務i=4獲取完成,移出任務隊列!Fri Aug 17 15:51:55 CST 2018
任務i=6獲取完成,移出任務隊列!Fri Aug 17 15:51:55 CST 2018
任務i=7獲取完成,移出任務隊列!Fri Aug 17 15:51:55 CST 2018
task線程:pool-1-thread-1任務i=1,完成!Fri Aug 17 15:51:57 CST 2018
任務i=1獲取完成,移出任務隊列!Fri Aug 17 15:51:57 CST 2018
task線程:pool-1-thread-5任務i=5,完成!Fri Aug 17 15:51:59 CST 2018
任務i=5獲取完成,移出任務隊列!Fri Aug 17 15:51:59 CST 2018
list=[8, 9, 10, 2, 3, 4, 6, 7, 1, 5]
總耗時=5014,取結果歸集耗時=5007

建議:demo1在特定場合例若有十分耗時的業務但有依賴於其餘業務不必定非要執行的,能夠嘗試使用。demo2多線程併發執行並結果歸集,這裏多套一層FutureTask比較雞肋(直接返回Future簡單明瞭)不建議使用。

3.CompletionService:

原理:內部經過阻塞隊列+FutureTask,實現了任務先完成可優先獲取到,即結果按照完成前後順序排序。

demo:

 1 package thread.future;
 2 
 3 import java.util.ArrayList;
 4 import java.util.Date;
 5 import java.util.List;
 6 import java.util.concurrent.Callable;
 7 import java.util.concurrent.CompletionService;
 8 import java.util.concurrent.ExecutorCompletionService;
 9 import java.util.concurrent.ExecutorService;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.Future;
12 
13 /**
14  * 
15  * @ClassName: CompletionServiceDemo
16  * @Description: CompletionService多線程併發任務結果歸集
17  * @author denny.zhang
18  * @date 2016年11月4日 下午1:50:32
19  *
20  */
21 public class CompletionServiceDemo{
22 
23     public static void main(String[] args)  {
24         Long start = System.currentTimeMillis();
25         //開啓3個線程
26         ExecutorService exs = Executors.newFixedThreadPool(5);
27         try {
28             int taskCount = 10;
29             //結果集
30             List<Integer> list = new ArrayList<Integer>();
31             //1.定義CompletionService
32             CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(exs);  
33             List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
34             //2.添加任務
35             for(int i=0;i<taskCount;i++){
36                 futureList.add(completionService.submit(new Task(i+1)));
37             }
38             //==================結果歸集===================
39             //方法1:future是提交時返回的,遍歷queue則按照任務提交順序,獲取結果
40 //            for (Future<Integer> future : futureList) {
41 //                System.out.println("====================");
42 //                Integer result = future.get();//線程在這裏阻塞等待該任務執行完畢,按照
43 //                System.out.println("任務result="+result+"獲取到結果!"+new Date());
44 //                list.add(result);
45 //            }
46 
47 //            //方法2.使用內部阻塞隊列的take()
48             for(int i=0;i<taskCount;i++){
49                 Integer result = completionService.take().get();//採用completionService.take(),內部維護阻塞隊列,任務先完成的先獲取到
50                 System.out.println("任務i=="+result+"完成!"+new Date());
51                 list.add(result);
52             }
53             System.out.println("list="+list);
54             System.out.println("總耗時="+(System.currentTimeMillis()-start));
55         } catch (Exception e) {
56             e.printStackTrace();
57         } finally {
58             exs.shutdown();//關閉線程池
59         }
60         
61     }
62 
63     static class Task implements Callable<Integer>{
64         Integer i;
65         
66         public Task(Integer i) {
67             super();
68             this.i=i;
69         }
70 
71         @Override
72         public Integer call() throws Exception {
73             if(i==5){
74                 Thread.sleep(5000);
75             }else{
76                 Thread.sleep(1000);
77             }
78             System.out.println("線程:"+Thread.currentThread().getName()+"任務i="+i+",執行完成!");  
79             return i;
80         }
81         
82     }
83 }

打印結果以下:

線程:pool-1-thread-3任務i=3,執行完成!
線程:pool-1-thread-1任務i=1,執行完成!
線程:pool-1-thread-4任務i=4,執行完成!
線程:pool-1-thread-2任務i=2,執行完成!
任務i==3完成!Fri Jun 16 11:39:17 CST 2017
任務i==1完成!Fri Jun 16 11:39:17 CST 2017
任務i==4完成!Fri Jun 16 11:39:17 CST 2017
任務i==2完成!Fri Jun 16 11:39:17 CST 2017
線程:pool-1-thread-4任務i=8,執行完成!
線程:pool-1-thread-3任務i=7,執行完成!
線程:pool-1-thread-1任務i=6,執行完成!
線程:pool-1-thread-2任務i=9,執行完成!
任務i==8完成!Fri Jun 16 11:39:18 CST 2017
任務i==7完成!Fri Jun 16 11:39:18 CST 2017
任務i==6完成!Fri Jun 16 11:39:18 CST 2017
任務i==9完成!Fri Jun 16 11:39:18 CST 2017
線程:pool-1-thread-3任務i=10,執行完成!
任務i==10完成!Fri Jun 16 11:39:19 CST 2017
線程:pool-1-thread-5任務i=5,執行完成!
任務i==5完成!Fri Jun 16 11:39:21 CST 2017
list=[3, 1, 4, 2, 8, 7, 6, 9, 10, 5]---》這裏證明了確實按照執行完成順序排序
總耗時=5004---》符合邏輯,10個任務,定長5線程池執行,取最長時間。

 

建議:使用率也挺高,並且能按照完成前後排序,建議若是有排序需求的優先使用。只是多線程併發執行任務結果歸集,也可使用。

4.CompletableFuture

原理:

4.1.從註釋看

JDK1.8才新加入的一個實現類,實現了Future<T>, CompletionStage<T>2個接口,JDK註釋以下圖:

譯文(沒興趣的能夠跳過):

當一個Future可能須要顯示地完成時,使用CompletionStage接口去支持完成時觸發的函數和操做。當2個以上線程同時嘗試完成、異常完成、取消一個CompletableFuture時,只有一個能成功。

CompletableFuture實現了CompletionStage接口的以下策略:

1.爲了完成當前的CompletableFuture接口或者其餘完成方法的回調函數的線程,提供了非異步的完成操做。

2.沒有顯式入參Executor的全部async方法都使用ForkJoinPool.commonPool()爲了簡化監視、調試和跟蹤,全部生成的異步任務都是標記接口AsynchronousCompletionTask的實例。

3.全部的CompletionStage方法都是獨立於其餘共有方法實現的,所以一個方法的行爲不會受到子類中其餘方法的覆蓋。

CompletableFuture實現了Futurre接口的以下策略:

1.CompletableFuture沒法直接控制完成,因此cancel操做被視爲是另外一種異常完成形式。方法isCompletedExceptionally能夠用來肯定一個CompletableFuture是否以任何異常的方式完成。

2.以一個CompletionException爲例,方法get()和get(long,TimeUnit)拋出一個ExecutionException,對應CompletionException。爲了在大多數上下文中簡化用法,這個類還定義了方法join()和getNow,而不是直接在這些狀況中直接拋出CompletionException。

 

4.2.CompletionStage接口實現流式編程:

JDK8新增接口,此接口包含38個方法...是的,你沒看錯,就是38個方法。這些方法主要是爲了支持函數式編程中流式處理。

 

4.3.CompletableFuture中4個異步執行任務靜態方法

如上圖,其中supplyAsync用於有返回值的任務,runAsync則用於沒有返回值的任務。Executor參數能夠手動指定線程池,不然默認ForkJoinPool.commonPool()系統級公共線程池,注意:這些線程都是Daemon線程,主線程結束Daemon線程不結束,只有JVM關閉時,生命週期終止

4.4.組合CompletableFuture:

thenCombine(): 先完成當前CompletionStage和other 2個CompletionStage任務,而後把結果傳參給BiFunction進行結果合併操做。

 

thenCompose():第一個CompletableFuture執行完畢後,傳遞給下一個CompletionStage做爲入參進行操做。

demo:

JDK CompletableFuture  自帶多任務組合方法allOf和anyOf

allOf是等待全部任務完成,構造後CompletableFuture完成

anyOf是隻要有一個任務完成,構造後CompletableFuture就完成

方式一:循環建立CompletableFuture list,調用sequence()組裝返回一個有返回值的CompletableFuture,返回結果get()獲取

方式二:全流式處理轉換成CompletableFuture[]+allOf組裝成一個無返回值CompletableFuture,join等待執行完畢。返回結果whenComplete獲取。---》推薦

 

  1 package thread;
  2 
  3 import java.util.ArrayList;
  4 import java.util.Date;
  5 import java.util.List;
  6 import java.util.concurrent.CompletableFuture;
  7 import java.util.concurrent.ExecutorService;
  8 import java.util.concurrent.Executors;
  9 import java.util.stream.Collectors;
 10 import java.util.stream.Stream;
 11 
 12 import com.google.common.collect.Lists;
 13 
 14 /**
 15  * 
 16  * @ClassName:CompletableFutureDemo
 17  * @Description:多線程併發任務,取結果歸集
 18  * @author diandian.zhang
 19  * @date 2017年6月14日下午12:44:01
 20  */
 21 public class CompletableFutureDemo {
 22     public static void main(String[] args) {
 23         Long start = System.currentTimeMillis();
 24         //結果集
 25         List<String> list = new ArrayList<String>();
 26         List<String> list2 = new ArrayList<String>();
 27         //定長10線程池
 28         ExecutorService exs = Executors.newFixedThreadPool(10);
 29         List<CompletableFuture<String>> futureList = new ArrayList<>();
 30         final List<Integer> taskList = Lists.newArrayList(2,1,3,4,5,6,7,8,9,10);
 31         try {
 32             ////方式一:循環建立CompletableFuture list,調用sequence()組裝返回一個有返回值的CompletableFuture,返回結果get()獲取
 33             //for(int i=0;i<taskList.size();i++){
 34             //    final int j=i;
 35             //    //異步執行
 36             //    CompletableFuture<String> future = CompletableFuture.supplyAsync(()->calc(taskList.get(j)), exs)
 37             //        //Integer轉換字符串    thenAccept只接受不返回不影響結果
 38             //        .thenApply(e->Integer.toString(e))
 39             //        //如需獲取任務完成前後順序,此處代碼便可
 40             //        .whenComplete((v, e) -> {
 41             //            System.out.println("任務"+v+"完成!result="+v+",異常 e="+e+","+new Date());
 42             //            list2.add(v);
 43             //        })
 44             //        ;
 45             //    futureList.add(future);
 46             //}
 47             ////流式獲取結果:此處是根據任務添加順序獲取的結果
 48             //list = sequence(futureList).get();
 49             
 50             //方式二:全流式處理轉換成CompletableFuture[]+組裝成一個無返回值CompletableFuture,join等待執行完畢。返回結果whenComplete獲取
 51             CompletableFuture[] cfs = taskList.stream().map(object-> CompletableFuture.supplyAsync(()->calc(object), exs)
 52                     .thenApply(h->Integer.toString(h))
 53                     //如需獲取任務完成前後順序,此處代碼便可
 54                     .whenComplete((v, e) -> {
 55                         System.out.println("任務"+v+"完成!result="+v+",異常 e="+e+","+new Date());
 56                         list2.add(v);
 57                     })).toArray(CompletableFuture[]::new);
 58             //等待總任務完成,可是封裝後無返回值,必須本身whenComplete()獲取
 59             CompletableFuture.allOf(cfs).join();
 60             System.out.println("任務完成前後順序,結果list2="+list2+";任務提交順序,結果list="+list+",耗時="+(System.currentTimeMillis()-start));
 61         } catch (Exception e) {
 62             e.printStackTrace();
 63         }finally {
 64             exs.shutdown();
 65         }
 66     }
 67     
 68     public static Integer calc(Integer i){
 69         try {
 70             if(i==1){
 71                 //任務1耗時3秒
 72                 Thread.sleep(3000);
 73             }else if(i==5){
 74                 //任務5耗時5秒
 75                 Thread.sleep(5000);
 76             }else{
 77                 //其它任務耗時1秒
 78                 Thread.sleep(1000);
 79             }
 80             System.out.println("task線程:"+Thread.currentThread().getName()+"任務i="+i+",完成!+"+new Date());  
 81         } catch (InterruptedException e) {
 82             e.printStackTrace();
 83         }
 84         return i;
 85     }
 86     
 87     /**
 88      * 
 89      * @Description 組合多個CompletableFuture爲一個CompletableFuture,全部子任務所有完成,組合後的任務纔會完成。帶返回值,可直接get.
 90      * @param futures List
 91      * @return
 92      * @author diandian.zhang
 93      * @date 2017年6月19日下午3:01:09
 94      * @since JDK1.8
 95      */
 96     public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
 97         //1.構造一個空CompletableFuture,子任務數爲入參任務list size
 98         CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
 99         //2.流式(總任務完成後,每一個子任務join取結果,後轉換爲list)
100         return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
101     }
102    
103     /**
104      * 
105      * @Description Stream流式類型futures轉換成一個CompletableFuture,全部子任務所有完成,組合後的任務纔會完成。帶返回值,可直接get.
106      * @param futures Stream
107      * @return
108      * @author diandian.zhang
109      * @date 2017年6月19日下午6:23:40
110      * @since JDK1.8
111      */
112     public static <T> CompletableFuture<List<T>> sequence(Stream<CompletableFuture<T>> futures) {
113         List<CompletableFuture<T>> futureList = futures.filter(f -> f != null).collect(Collectors.toList());
114         return sequence(futureList);
115     }
116 }

 

方式二返回結果:

task線程:pool-1-thread-1任務i=2,完成!+Mon Jun 19 18:26:17 CST 2017
task線程:pool-1-thread-9任務i=9,完成!+Mon Jun 19 18:26:17 CST 2017
task線程:pool-1-thread-6任務i=6,完成!+Mon Jun 19 18:26:17 CST 2017
task線程:pool-1-thread-8任務i=8,完成!+Mon Jun 19 18:26:17 CST 2017
任務6完成!result=6,異常 e=null,Mon Jun 19 18:26:17 CST 2017
task線程:pool-1-thread-4任務i=4,完成!+Mon Jun 19 18:26:17 CST 2017
task線程:pool-1-thread-7任務i=7,完成!+Mon Jun 19 18:26:17 CST 2017
任務4完成!result=4,異常 e=null,Mon Jun 19 18:26:17 CST 2017
task線程:pool-1-thread-3任務i=3,完成!+Mon Jun 19 18:26:17 CST 2017
任務3完成!result=3,異常 e=null,Mon Jun 19 18:26:17 CST 2017
task線程:pool-1-thread-10任務i=10,完成!+Mon Jun 19 18:26:17 CST 2017
任務10完成!result=10,異常 e=null,Mon Jun 19 18:26:17 CST 2017
任務7完成!result=7,異常 e=null,Mon Jun 19 18:26:17 CST 2017
任務8完成!result=8,異常 e=null,Mon Jun 19 18:26:17 CST 2017
任務2完成!result=2,異常 e=null,Mon Jun 19 18:26:17 CST 2017
任務9完成!result=9,異常 e=null,Mon Jun 19 18:26:17 CST 2017
task線程:pool-1-thread-2任務i=1,完成!+Mon Jun 19 18:26:19 CST 2017---》任務1耗時3秒
任務1完成!result=1,異常 e=null,Mon Jun 19 18:26:19 CST 2017
task線程:pool-1-thread-5任務i=5,完成!+Mon Jun 19 18:26:21 CST 2017---》任務5耗時5秒
任務5完成!result=5,異常 e=null,Mon Jun 19 18:26:21 CST 2017
list2=[6, 4, 3, 10, 7, 8, 2, 9, 1, 5]list=[],耗時=5076---》符合邏輯,10個任務,10個線程併發執行,其中任務1耗時3秒,任務5耗時5秒,耗時取最大值。

建議:CompletableFuture知足併發執行,順序完成先手順序獲取的目標。並且支持每一個任務的異常返回,配合流式編程,用起來速度飛起。JDK源生支持,API豐富,推薦使用

5.總結:

本文從原理、demo、建議三個方向分析了經常使用多線程併發,取結果歸集的幾種實現方案,但願對你們有所啓發,整理表格以下:

 

Futrue

FutureTask

CompletionService

CompletableFuture

原理

Futrue接口

接口RunnableFuture的惟一實現類,RunnableFuture接口繼承自Future<V>+Runnable: 內部經過阻塞隊列+FutureTask接口 JDK8實現了Future<T>, CompletionStage<T>2個接口
多任務併發執行 支持 支持 支持 支持
獲取任務結果的順序

支持任務完成前後順序

 未知  支持任務完成的前後順序 支持任務完成的前後順序
異常捕捉  本身捕捉  本身捕捉  本身捕捉 源生API支持,返回每一個任務的異常
建議 CPU高速輪詢,耗資源,可使用,但不推薦
 功能不對口,併發任務這一塊多套一層,不推薦使用。  推薦使用,沒有JDK8CompletableFuture以前最好的方案,沒有質疑

  API極端豐富,配合流式編程,速度飛起,推薦使用!

 

 

 

 

 

 

 

 

 

 

 

 

 

=====參考=================

1.《JAVA高併發程序設計》

2.使用Java 8的CompletableFuture實現函數式的回調

3.Java CompletableFuture 詳解

相關文章
相關標籤/搜索