![](http://static.javashuo.com/static/loading.gif)
介紹
上個禮拜咱們線上有個接口比較慢,這個接口在剛開始響應時間是正常的。但隨着數據量的增多,響應時間變慢了。java
這個接口裏面順序調用了2個服務,且2個服務之間沒有數據依賴。我就用CompletableFuture把調用2個服務的過程異步化了一下,響應時間也基本上縮短爲原來的一半,問題解決。web
![](http://static.javashuo.com/static/loading.gif)
正好上次分享了函數式接口和Stream的使用,此次就分享一下CompletableFuture,裏面也用到了大量的函數式接口編程
想方便的異步執行任務,就必須放到單獨的線程中。繼承Thread類,實現Runnable都不能拿到任務的執行結果,這時就不得不提建立線程的另外一種方式了,實現Callable接口。微信
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
Callable接口通常配合ExecutorService來使用app
// ExecutorService.java
<T> Future<T> submit(Callable<T> task);
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> result = executor.submit(() -> {
int sum = 0;
for (int i = 0; i < 100; i++) {
sum += i;
}
return sum;
});
// 4950
System.out.println(result.get());
咱們從Future中獲取結果dom
public interface Future<V> {
// 取消任務的執行
boolean cancel(boolean mayInterruptIfRunning);
// 任務是否已經取消
boolean isCancelled();
// 任務是否已經完成
boolean isDone();
// 獲取任務執行結果,會阻塞線程
V get() throws InterruptedException, ExecutionException;
// 超時獲取任務執行結果,會阻塞線程
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
對於簡單的場景使用Future並無什麼不方便。可是一些複雜的場景就很麻煩, 如2個異步任務,其中一個有結果就直接返回。Future用起來就不方便,由於想獲取結果時,要麼執行future.get()方法,可是這樣會阻塞線程,變成同步操做,要麼輪詢isDone()方法,可是比較耗費CPU資源。異步
Netty和Google guava爲了解決這個問題,在Future的基礎上引入了觀察者模式(即在Future上addListener),當計算結果完成時通知監聽者。編輯器
Java8新增的CompletableFuture則借鑑了Netty等對Future的改造,簡化了異步編程的複雜性,而且提供了函數式編程的能力函數式編程
建立CompletableFuture對象
方法名 | 描述 |
---|---|
completedFuture(U value) | 返回一個已經計算好的CompletableFuture |
runAsync(Runnable runnable) | 使用ForkJoinPool.commonPool()做爲線程池執行任務,沒有返回值 |
runAsync(Runnable runnable, Executor executor) | 使用指定的線程池執行任務,沒有返回值 |
supplyAsync(Supplier<U> supplier) | 使用ForkJoinPool.commonPool()做爲線程池執行任務,有返回值 |
supplyAsync(Supplier<U> supplier, Executor executor) | 使用指定的線程池執行任務,有返回值 |
@FunctionalInterface
public interface Supplier<T> {
T get();
}
Supplier在《用好強大的Stream》中已經介紹過了,是一個能獲取返回值的函數式接口異步編程
CompletableFuture<Integer> intFuture = CompletableFuture.completedFuture(100);
// 100
System.out.println(intFuture.get());
CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> System.out.println("hello"));
// null
System.out.println(voidFuture.get());
CompletableFuture<String> stringFuture = CompletableFuture.supplyAsync(() -> "hello");
// hello
System.out.println(stringFuture.get());
計算結果完成時
方法名 |
---|
whenComplete(BiConsumer<? super T,? super Throwable> action) |
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) |
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) |
由於入參是BiConsumer<? super T,? super Throwable>函數式接口,因此能夠處理正常和異常的計算結果
whenComplete和whenCompleteAsync的區別以下
-
whenComplete:執行完當前任務的線程繼續執行whenComplete的任務 -
whenCompleteAsync:把whenCompleteAsync這個任務提交給線程池來執行
CompletableFuture的全部方法的定義和whenComplete都很相似
-
方法不以Async結尾意味着使用相同的線程執行 -
方法以Async結尾意味着將任務提交到線程池來執行 -
方法以Async結尾時能夠用ForkJoinPool.commonPool()做爲線程池,也可使用本身的線程池
後續介紹的全部方法都只寫一種case
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return "hello";
}).whenComplete((v, e) -> {
// hello
System.out.println(v);
});
// hello
System.out.println(future.get());
轉換,消費,執行
方法名 | 描述 |
---|---|
thenApply | 獲取上一個任務的返回,並返回當前任務的值 |
thenAccept | 獲取上一個任務的返回,單純消費,沒有返回值 |
thenRun | 上一個任務執行完成後,開始執行thenRun中的任務 |
CompletableFuture.supplyAsync(() -> {
return "hello ";
}).thenAccept(str -> {
// hello world
System.out.println(str + "world");
}).thenRun(() -> {
// task finish
System.out.println("task finish");
});
組合(兩個任務都完成)
方法名 | 描述 |
---|---|
thenCombine | 組合兩個future,獲取兩個future的返回結果,並返回當前任務的返回值 |
thenAcceptBoth | 組合兩個future,獲取兩個future任務的返回結果,而後處理任務,沒有返回值 |
runAfterBoth | 組合兩個future,不須要獲取future的結果,只需兩個future處理完任務後,處理該任務 |
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return "歡迎關注 ";
}).thenApply(t -> {
return t + "微信公衆號 ";
}).thenCombine(CompletableFuture.completedFuture("Java識堂"), (t, u) -> {
return t + u;
}).whenComplete((t, e) -> {
// 歡迎關注 微信公衆號 Java識堂
System.out.println(t);
});
組合(只須要一個任務完成)
方法名 | 描述 |
---|---|
applyToEither | 兩個任務有一個執行完成,獲取它的返回值,處理任務並返回當前任務的返回值 |
acceptEither | 兩個任務有一個執行完成,獲取它的返回值,處理任務,沒有返回值 |
runAfterEither | 兩個任務有一個執行完成,不須要獲取future的結果,處理任務,也沒有返回值 |
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "歡迎關注微信公衆號";
});
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "Java識堂";
});
CompletableFuture future = future1.applyToEither(future2, str -> str);
// 歡迎關注微信公衆號 Java識堂 隨機輸出
System.out.println(future.get());
sleepRandom()爲我寫的一個隨機暫停的函數
多任務組合
方法名 | 描述 |
---|---|
allOf | 當全部的CompletableFuture完成後執行計算 |
anyOf | 任意一個CompletableFuture完成後執行計算 |
allOf的使用
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "歡迎關注";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "微信公衆號";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "Java識堂";
});
// 歡迎關注 微信公衆號 Java識堂
CompletableFuture.allOf(future1, future2, future3)
.thenApply(v ->
Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" ")))
.thenAccept(System.out::print);
anyOf的使用
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "歡迎關注";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "微信公衆號";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "Java識堂";
});
CompletableFuture<Object> resultFuture = CompletableFuture.anyOf(future1, future2, future3);
// 歡迎關注 微信公衆號 Java識堂 隨機輸出
System.out.println(resultFuture.get());
異常處理
方法名 | 描述 |
---|---|
exceptionally | 捕獲異常,進行處理 |
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 100 / 0;
}).thenApply(num -> {
return num + 10;
}).exceptionally(throwable -> {
return 0;
});
// 0
System.out.println(future.get());
固然有一些接口能捕獲異常
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
String str = null;
return str.length();
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("正常結果爲" + v);
} else {
// 發生異常了java.util.concurrent.CompletionException: java.lang.NullPointerException
System.out.println("發生異常了" + e.toString());
}
});
歡迎關注
![](http://static.javashuo.com/static/loading.gif)
本文分享自微信公衆號 - Java識堂(erlieStar)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。