建立線程的兩種方式,一種是直接繼承Thread,另一種就是實現Runnable接口。這兩種方式都有一個缺陷就是:在執行完任務以後沒法獲取執行結果。若是須要獲取執行結果,就必須經過共享變量或者使用線程通訊的方式來達到效果,這樣使用起來就比較麻煩。而自從Java 1.5開始,就提供了Callable和Future,經過它們能夠在任務執行完畢以後獲得任務執行結果。html
詳述:https://www.cnblogs.com/bjlhx/p/7588971.htmljava
它是一個接口,裏面只聲明瞭一個run()方法:編程
public interface Runnable { public abstract void run(); }
因爲run()方法返回值爲void類型,因此在執行完任務以後沒法返回任何結果。設計模式
Callable接口位於java.util.concurrent包下,在它裏面也只聲明瞭一個方法,只不過這個方法叫作call()。數據結構
public interface Callable<V> { V call() throws Exception; }
是一個泛型接口,call()函數返回的類型就是傳遞進來的V類型。Callable接口能夠看做是Runnable接口的補充,call方法帶有返回值,而且能夠拋出異常。多線程
Future的核心思想是:併發
一個方法,計算過程可能很是耗時,等待方法返回,顯然不明智。能夠在調用方法的時候,立馬返回一個Future,能夠經過Future這個數據結構去控制方法f的計算過程。dom
Future類位於java.util.concurrent包下,它是一個接口:這裏的控制包括:異步
get方法:獲取計算結果(若是還沒計算完,也是必須等待的)這個方法會產生阻塞,會一直等到任務執行完畢才返回;ide
get(long timeout, TimeUnit unit)用來獲取執行結果,若是在指定時間內,還沒獲取到結果,就直接返回null。
cancel方法:還沒計算完,能夠取消計算過程,若是取消任務成功則返回true,若是取消任務失敗則返回false。參數mayInterruptIfRunning表示是否容許取消正在執行卻沒有執行完畢的任務,若是設置true,則表示能夠取消正在執行過程當中的任務。若是任務已經完成,則不管mayInterruptIfRunning爲true仍是false,此方法確定返回false,即若是取消已經完成的任務會返回false;若是任務正在執行,若mayInterruptIfRunning設置爲true,則返回true,若mayInterruptIfRunning設置爲false,則返回false;若是任務尚未執行,則不管mayInterruptIfRunning爲true仍是false,確定返回true。
isDone方法:判斷是否計算完
isCancelled方法:判斷計算是否被取消,方法表示任務是否被取消成功,若是在任務正常完成前被取消成功,則返回 true。
也就是說Future提供了三種功能:
1)判斷任務是否完成;
2)可以中斷任務;
3)可以獲取任務執行結果。
Future就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。必要時能夠經過get方法獲取執行結果,該方法會阻塞直到任務返回結果。
由於Future只是一個接口,因此是沒法直接用來建立對象使用的,所以就有了下面的FutureTask。
使用Callable+Future獲取執行結果:
public class Test { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); Future<Integer> result = executor.submit(task); executor.shutdown(); try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("主線程在執行任務"); try { System.out.println("task運行結果"+result.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("全部任務執行完畢"); } } class Task implements Callable<Integer>{ @Override public Integer call() throws Exception { System.out.println("子線程在進行計算"); Thread.sleep(3000); int sum = 0; for(int i=0;i<100;i++) sum += i; return sum; } }
FutureTask繼承體系中的核心接口是Future。事實上,FutureTask是Future接口的一個惟一實現類。
如何獲取Callable的返回結果:通常是經過FutureTask這個中間媒介來實現的。總體的流程是這樣的:
把Callable實例看成參數,生成一個FutureTask的對象,而後把這個對象看成一個Runnable,做爲參數另起線程。
方式1、使用thread方式
FutureTask實現了Runnable,所以它既能夠經過Thread包裝來直接執行,也能夠提交給ExecuteService來執行。如下使用Thread包裝線程方式啓動
public static void main(String[] args) throws Exception { Callable<Integer> call = () -> { System.out.println("計算線程正在計算結果..."); Thread.sleep(3000); return 1; }; FutureTask<Integer> task = new FutureTask<>(call); Thread thread = new Thread(task); thread.start(); System.out.println("main線程乾點別的..."); Integer result = task.get(); System.out.println("從計算線程拿到的結果爲:" + result); }
方式2、使用 ExecutorService
ExecutorService executor = Executors.newFixedThreadPool(2);線程池方式
public static void main(String[] args) { Callable<String> callable1=()->{ Thread.sleep(2000); return Thread.currentThread().getName(); }; Callable<String> callable2=()->{ Thread.sleep(3000); return Thread.currentThread().getName(); }; FutureTask<String> futureTask1 = new FutureTask<>(callable1);// 將Callable寫的任務封裝到一個由執行者調度的FutureTask對象 FutureTask<String> futureTask2 = new FutureTask<>(callable2); ExecutorService executor = Executors.newFixedThreadPool(2); // 建立線程池並返回ExecutorService實例 executor.execute(futureTask1); // 執行任務 executor.execute(futureTask2); //同時開啓了兩個任務 long startTime = System.currentTimeMillis(); while (true) { try { if(futureTask1.isDone() && futureTask2.isDone()){// 兩個任務都完成 System.out.println("Done"); executor.shutdown(); // 關閉線程池和服務 return; } if(!futureTask1.isDone()){ // 任務1沒有完成,會等待,直到任務完成 System.out.println("FutureTask1 output="+futureTask1.get()); } System.out.println("Waiting for FutureTask2 to complete"); String s = futureTask2.get(200L, TimeUnit.MILLISECONDS); if(s !=null){ System.out.println("FutureTask2 output="+s); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }catch(TimeoutException e){ //do nothing } System.out.println((System.currentTimeMillis()-startTime)); } }
使用Callable+FutureTask獲取執行結果
public class Test { public static void main(String[] args) { //第一種方式 ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); FutureTask<Integer> futureTask = new FutureTask<Integer>(task); executor.submit(futureTask); executor.shutdown(); //第二種方式,注意這種方式和第一種方式效果是相似的,只不過一個使用的是ExecutorService,一個使用的是Thread /*Task task = new Task(); FutureTask<Integer> futureTask = new FutureTask<Integer>(task); Thread thread = new Thread(futureTask); thread.start();*/ try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("主線程在執行任務"); try { System.out.println("task運行結果"+futureTask.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("全部任務執行完畢"); } } class Task implements Callable<Integer>{ @Override public Integer call() throws Exception { System.out.println("子線程在進行計算"); Thread.sleep(3000); int sum = 0; for(int i=0;i<100;i++) sum += i; return sum; } }
原理:內部經過阻塞隊列+FutureTask,實現了任務先完成可優先獲取到,即結果按照完成前後順序排序。
package com.lhx.cloud.futruetask; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; public class CompletionServiceDemo { public static void main(String[] args) { Long start = System.currentTimeMillis(); //開啓5個線程 ExecutorService exs = Executors.newFixedThreadPool(5); try { int taskCount = 10; //結果集 List<Integer> list = new ArrayList<>(); //1.定義CompletionService CompletionService<Integer> completionService = new ExecutorCompletionService<>(exs); List<Future<Integer>> futureList = new ArrayList<>(); //2.添加任務 for(int i=0;i<taskCount;i++){ futureList.add(completionService.submit(new Task(i+1))); } //==================結果歸集=================== //方法1:future是提交時返回的,遍歷queue則按照任務提交順序,獲取結果 // for (Future<Integer> future : futureList) { // System.out.println("===================="); // Integer result = future.get();//線程在這裏阻塞等待該任務執行完畢,按照 // System.out.println("任務result="+result+"獲取到結果!"+new Date()); // list.add(result); // } // //方法2.使用內部阻塞隊列的take() for(int i=0;i<taskCount;i++){ Integer result = completionService.take().get();//採用completionService.take(),內部維護阻塞隊列,任務先完成的先獲取到 System.out.println(LocalDateTime.now()+"---任務i=="+result+"完成!"); list.add(result); } System.out.println("list="+list); System.out.println("總耗時="+(System.currentTimeMillis()-start)); } catch (Exception e) { e.printStackTrace(); } finally { exs.shutdown();//關閉線程池 } } static class Task implements Callable<Integer>{ Integer i; public Task(Integer i) { super(); this.i=i; } @Override public Integer call() throws Exception { if(i==5){ Thread.sleep(5000); }else{ Thread.sleep(1000); } System.out.println("線程:"+Thread.currentThread().getName()+"任務i="+i+",執行完成!"); return i; } } }
Future 接口,用於描述一個異步計算的結果。雖然 Future 以及相關使用方法提供了異步執行任務的能力,可是對於結果的獲取倒是很不方便,只能經過阻塞或者輪詢的方式獲得任務的結果。
阻塞的方式顯然和咱們的異步編程的初衷相違背,輪詢的方式又會耗費無謂的 CPU 資源,並且也不能及時地獲得計算結果,爲何不能用觀察者設計模式呢?即當計算結果完成及時通知監聽者。
Future侷限性,它很難直接表述多個Future 結果之間的依賴性。
CompletionStage表明異步計算過程當中的某一個階段,一個階段完成之後可能會觸發另一個階段
一個階段的計算執行能夠是一個Function,Consumer或者Runnable。好比:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
一個階段的執行多是被單個階段的完成觸發,也多是由多個階段一塊兒觸發
CompletableFuture.compleatedFuture是一個靜態輔助方法,用來返回一個已經計算好的CompletableFuture.
如下四個靜態方法用來爲一段異步執行的代碼建立CompletableFuture對象:
public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
以Async結尾而且沒有指定Executor的方法會使用ForkJoinPool.commonPool() 做爲它的線程池執行異步代碼。
runAsync方法:它以Runnabel函數式接口類型爲參數,因此CompletableFuture的計算結果爲空。
supplyAsync方法以Supplier<U>函數式接口類型爲參數,CompletableFuture的計算結果類型爲U。
注意:這些線程都是Daemon線程,主線程結束Daemon線程不結束,只有JVM關閉時,生命週期終止。
示例:簡單同步用法
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { //長時間的計算任務 try { System.out.println("計算型任務開始"); Thread.sleep(2000); return "計算型任務結束"; } catch (InterruptedException e) { e.printStackTrace(); } return "·00"; }); System.out.println(future.get()); }
當CompletableFuture的計算結果完成,或者拋出異常的時候,能夠執行特定的Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
能夠看到Action的類型是BiConsumer<? super T,? super Throwable>它能夠處理正常的計算結果,或者異常狀況。
方法不以Async結尾,意味着Action使用相同的線程執行,而Async可能會使用其餘線程執行(若是是使用相同的線程池,也可能會被同一個線程選中執行)
示例:
package com.lhx.cloud.futruetask; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class BasicFuture { private static Random rand = new Random(); private static long t = System.currentTimeMillis(); static int getMoreData() { System.out.println("begin to start compute"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("end to compute,passed " + (System.currentTimeMillis()-t)); return rand.nextInt(1000); } public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(BasicFuture::getMoreData); Future<Integer> f = future.whenComplete((v, e) -> { System.out.println(v); System.out.println(e); }); System.out.println(f.get()); }}
CompletableFuture能夠做爲monad(單子)和functor. 因爲回調風格的實現,咱們沒必要由於等待一個計算完成而阻塞着調用線程,而是告訴CompletableFuture當計算完成的時候請執行某個Function. 還能夠串聯起來。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
爲了能獲取任務線程內發生的異常,須要使用 CompletableFuture的completeExceptionally
方法將致使CompletableFuture內發生問題的異常拋出。
這樣,當執行任務發生異常時,調用get()
方法的線程將會收到一個 ExecutionException
異常,該異常接收了一個包含失敗緣由的Exception 參數。
/** * 任務沒有異常 正常執行,而後結束 */ @Test public void test1() throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = new CompletableFuture<>(); new Thread(() -> { // 模擬執行耗時任務 System.out.println("task doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } // 告訴completableFuture任務已經完成 completableFuture.complete("ok"); }).start(); // 獲取任務結果,若是沒有完成會一直阻塞等待 String result = completableFuture.get(); System.out.println("計算結果:" + result); } /** * 線程有異常 正常執行,而後沒法結束,主線程會一直等待 */ @Test public void test2() throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = new CompletableFuture<>(); new Thread(() -> { // 模擬執行耗時任務 System.out.println("task doing..."); try { Thread.sleep(3000); int i=1/0; } catch (InterruptedException e) { e.printStackTrace(); } // 告訴completableFuture任務已經完成 completableFuture.complete("ok"); }).start(); // 獲取任務結果,若是沒有完成會一直阻塞等待 String result = completableFuture.get(); System.out.println("計算結果:" + result); }
/** * 線程有異常 正常執行,而後經過completableFuture.completeExceptionally(e);告訴completableFuture任務發生異常了 * 主線程接收到 程序繼續處理,至結束 */ @Test public void test3() throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = new CompletableFuture<>(); new Thread(() -> { // 模擬執行耗時任務 System.out.println("task doing..."); try { Thread.sleep(3000); int i = 1/0; } catch (Exception e) { // 告訴completableFuture任務發生異常了 completableFuture.completeExceptionally(e); } // 告訴completableFuture任務已經完成 completableFuture.complete("ok"); }).start(); // 獲取任務結果,若是沒有完成會一直阻塞等待 String result = completableFuture.get(); System.out.println("計算結果:" + result); }
allOf
是等待全部任務完成,構造後CompletableFuture完成
anyOf
是隻要有一個任務完成,構造後CompletableFuture就完成
package com.lhx.cloud.futruetask; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CompletableFutureDemo { public static void main(String[] args) { Long start = System.currentTimeMillis(); // 結果集 List<String> list = new ArrayList<>(); ExecutorService executorService = Executors.newFixedThreadPool(10); List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10); // 全流式處理轉換成CompletableFuture[]+組裝成一個無返回值CompletableFuture,join等待執行完畢。返回結果whenComplete獲取 CompletableFuture[] cfs = taskList.stream() .map(integer -> CompletableFuture.supplyAsync(() -> calc(integer), executorService) .thenApply(h -> Integer.toString(h)) .whenComplete((s, e) -> { System.out.println(LocalDateTime.now()+"---任務" + s + "完成!result=" + s + ",異常 e=" + e); list.add(s); }) ).toArray(CompletableFuture[]::new); // 封裝後無返回值,必須本身whenComplete()獲取 CompletableFuture.allOf(cfs).join(); System.out.println("list=" + list + ",耗時=" + (System.currentTimeMillis() - start)); } public static Integer calc(Integer i) { try { if (i == 1) { Thread.sleep(3000);//任務1耗時3秒 } else if (i == 5) { Thread.sleep(5000);//任務5耗時5秒 } else { Thread.sleep(1000);//其它任務耗時1秒 } System.out.println(LocalDateTime.now()+"---task線程:" + Thread.currentThread().getName() + "任務i=" + i + ",完成!" ); } catch (InterruptedException e) { e.printStackTrace(); } return i; } }
描述 | Future | FutureTask | CompletionService | CompletableFuture |
---|---|---|---|---|
原理 | Future接口 | 接口RunnableFuture的惟一實現類,RunnableFuture接口繼承自Future+Runnable | 內部經過阻塞隊列+FutureTask接口 | JDK8實現了Future, CompletionStage兩個接口 |
多任務併發執行 | 支持 | 支持 | 支持 | 支持 |
獲取任務結果的順序 | 按照提交順序獲取結果 | 未知 | 支持任務完成的前後順序 | 支持任務完成的前後順序 |
異常捕捉 | 本身捕捉 | 本身捕捉 | 本身捕捉 | 原生API支持,返回每一個任務的異常 |
建議 | CPU高速輪詢,耗資源,或者阻塞,可使用,但不推薦 | 功能不對口,併發任務這一塊多套一層,不推薦使用 | 推薦使用,沒有JDK8CompletableFuture以前最好的方案 | API極端豐富,配合流式編程,推薦使用! |