Java 1.5開始,提供了Callable和Future,經過它們能夠在任務執行完畢以後獲得任務執行結果。java
Future接口能夠構建異步應用,是多線程開發中常見的設計模式。設計模式
當咱們須要調用一個函數方法時。若是這個函數執行很慢,那麼咱們就要進行等待。但有時候,咱們可能並不急着要結果。多線程
所以,咱們可讓被調用者當即返回,讓他在後臺慢慢處理這個請求。對於調用者來講,則能夠先處理一些其餘任務,在真正須要數據的場合再去嘗試獲取須要的數據。app
java.lang.Runnable是一個接口,在它裏面只聲明瞭一個run()方法,run返回值是void,任務執行完畢後沒法返回任何結果dom
public interface Runnable {
public abstract void run(); }
Callable位於java.util.concurrent包下,它也是一個接口,在它裏面也只聲明瞭一個方法叫作call(),這是一個泛型接口,call()函數返回的類型就是傳遞進來的V類型異步
public interface Callable<V> { V call() throws Exception; }
Future就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。必要時能夠經過get方法獲取執行結果,該方法會阻塞直到任務返回結果ide
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
怎麼使用Future和Callable呢?通常狀況下是配合ExecutorService來使用的,在ExecutorService接口中聲明瞭若干個submit方法的重載版本函數
<T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task);
Future+Callable,使用示例以下(採用第一個方法):spa
import java.util.Random;
import java.util.concurrent.*;
/**
* @program: callable
* @description: Test
* @author: Mr.Wang
* @create: 2018-08-12 11:37
**/
public class MyTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> result = executor.submit(new Callable<Integer>() {
public Integer call() throws Exception {
return new Random().nextInt();
}
});
executor.shutdown();
try {
System.out.println("result:" + result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
結果:線程
result:297483790
其它方式:
import java.util.Random;
import java.util.concurrent.*;
/**
* @program: callable
* @description: testfuture
* @author: Mr.Wang
* @create: 2018-08-12 12:11
**/
public class Testfuture {
public static void main(String[] args){
//第一種方式
FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return new Random().nextInt();
}
});
new Thread(task).start();
//第二種方方式
// ExecutorService executor = Executors.newSingleThreadExecutor();
// FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>() {
// @Override
// public Integer call() throws Exception {
// return new Random().nextInt();
// }
// });
// executor.submit(task);
try {
System.out.println("result: "+task.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
result:-358490809
瞭解了Future的使用,這裏就要談談Future的侷限性。Future很難直接表述多個Future 結果之間的依賴性,開發中,咱們常常須要達成如下目的:
首先,CompletableFuture類實現了CompletionStage和Future接口,所以你能夠像Future那樣使用它。
莫急,下面經過例子來一步一步解釋CompletableFuture的使用。
說明:Async結尾的方法都是能夠異步執行的,若是指定了線程池,會在指定的線程池中執行,若是沒有指定,默認會在ForkJoinPool.commonPool()中執行。下面不少方法都是相似的,再也不作特別說明。
四個靜態方法用來爲一段異步執行的代碼建立CompletableFuture對象,方法的參數類型都是函數式接口,因此可使用lambda表達式實現異步任務
runAsync方法:它以Runnabel函數式接口類型爲參數,因此CompletableFuture的計算結果爲空。
supplyAsync方法以Supplier<U>函數式接口類型爲參數,CompletableFuture的計算結果類型爲U。
public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
這些方法的輸入是上一個階段計算後的結果,返回值是通過轉化後結果
例子:
import java.util.concurrent.CompletableFuture; /** * @program: callable * @description: test * @author: Mr.Wang * @create: 2018-08-12 12:36 **/ public class TestCompleteFuture { public static void main(String[] args){ String result = CompletableFuture.supplyAsync(()->{return "Hello ";}).thenApplyAsync(v -> v + "world").join(); System.out.println(result); } }
結果:
Hello world
public CompletionStage<Void> thenAccept(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
這些方法只是針對結果進行消費,入參是Consumer,沒有返回值
例子:
import java.util.concurrent.CompletableFuture; /** * @program: callable * @description: test * @author: Mr.Wang * @create: 2018-08-12 12:36 **/ public class TestCompleteFuture { public static void main(String[] args){ CompletableFuture.supplyAsync(()->{return "Hello ";}).thenAccept(v -> { System.out.println("consumer: " + v);}); } }
結果:
consumer: Hello
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
須要上一階段的返回值,而且other表明的CompletionStage也要返回值以後,把這兩個返回值,進行轉換後返回指定類型的值。
說明:一樣,也存在對兩個CompletionStage結果進行消耗的一組方法,例如thenAcceptBoth,這裏再也不進行示例。
例子:
import java.util.concurrent.CompletableFuture; /** * @program: callable * @description: test * @author: Mr.Wang * @create: 2018-08-12 12:36 **/ public class TestCompleteFuture { public static void main(String[] args){ String result = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "Hello"; }).thenCombine(CompletableFuture.supplyAsync(()->{ try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "world"; }),(s1,s2)->{return s1 + " " + s2;}).join(); System.out.println(result); } }
結果:
Hello world
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
兩種渠道完成同一個事情,就能夠調用這個方法,找一個最快的結果進行處理,最終有返回值。
例子:
import java.util.concurrent.CompletableFuture; /** * @program: callable * @description: test * @author: Mr.Wang * @create: 2018-08-12 12:36 **/ public class TestCompleteFuture { public static void main(String[] args){ String result = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return "Hi Boy"; }).applyToEither(CompletableFuture.supplyAsync(()->{ try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } return "Hi Girl"; }),(s)->{return s;}).join(); System.out.println(result); } }
結果:
Hi Boy
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
例子:
import java.util.concurrent.CompletableFuture; /** * @program: callable * @description: test * @author: Mr.Wang * @create: 2018-08-12 12:36 **/ public class TestCompleteFuture { public static void main(String[] args){ String result = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } if(true) { throw new RuntimeException("exception test!"); } return "Hi Boy"; }).exceptionally(e->{ System.out.println(e.getMessage()); return "Hello world!"; }).join(); System.out.println(result); } }
結果:
java.lang.RuntimeException: exception test!
Hello world!
OK,瞭解了以上使用,基本上就對CompletableFuture比較清楚了。
後面會找個時間說說CompletableFuture實現原理