異步神器CompletableFuture

介紹

上個禮拜咱們線上有個接口比較慢,這個接口在剛開始響應時間是正常的。但隨着數據量的增多,響應時間變慢了。java

這個接口裏面順序調用了2個服務,且2個服務之間沒有數據依賴。我就用CompletableFuture把調用2個服務的過程異步化了一下,響應時間也基本上縮短爲原來的一半,問題解決。web

正好上次分享了函數式接口和Stream的使用,此次就分享一下CompletableFuture,裏面也用到了大量的函數式接口編程

想方便的異步執行任務,就必須放到單獨的線程中。繼承Thread類,實現Runnable都不能拿到任務的執行結果,這時就不得不提建立線程的另外一種方式了,實現Callable接口。微信

@FunctionalInterface
public interface Callable<V{
    call() throws Exception;
}

Callable接口通常配合ExecutorService來使用app

// ExecutorService.java
<T> Future<T> submit(Callable<T> task);
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> result = executor.submit(() -> {
    int sum = 0;
    for (int i = 0; i < 100; i++) {
        sum += i;
    }
    return sum;
});
// 4950
System.out.println(result.get());

咱們從Future中獲取結果dom

public interface Future<V{

 // 取消任務的執行
    boolean cancel(boolean mayInterruptIfRunning);

 // 任務是否已經取消
    boolean isCancelled();

 // 任務是否已經完成
    boolean isDone();

 // 獲取任務執行結果,會阻塞線程
    get() throws InterruptedException, ExecutionException;

 // 超時獲取任務執行結果,會阻塞線程
    get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException
;
}

對於簡單的場景使用Future並無什麼不方便。可是一些複雜的場景就很麻煩, 如2個異步任務,其中一個有結果就直接返回。Future用起來就不方便,由於想獲取結果時,要麼執行future.get()方法,可是這樣會阻塞線程,變成同步操做,要麼輪詢isDone()方法,可是比較耗費CPU資源。異步

Netty和Google guava爲了解決這個問題,在Future的基礎上引入了觀察者模式(即在Future上addListener),當計算結果完成時通知監聽者。編輯器

Java8新增的CompletableFuture則借鑑了Netty等對Future的改造,簡化了異步編程的複雜性,而且提供了函數式編程的能力函數式編程

建立CompletableFuture對象

方法名 描述
completedFuture(U value) 返回一個已經計算好的CompletableFuture
runAsync(Runnable runnable) 使用ForkJoinPool.commonPool()做爲線程池執行任務,沒有返回值
runAsync(Runnable runnable, Executor executor) 使用指定的線程池執行任務,沒有返回值
supplyAsync(Supplier<U> supplier) 使用ForkJoinPool.commonPool()做爲線程池執行任務,有返回值
supplyAsync(Supplier<U> supplier, Executor executor) 使用指定的線程池執行任務,有返回值
@FunctionalInterface
public interface Supplier<T{
    get();
}

Supplier在《用好強大的Stream》中已經介紹過了,是一個能獲取返回值的函數式接口異步編程

CompletableFuture<Integer> intFuture = CompletableFuture.completedFuture(100);
// 100
System.out.println(intFuture.get());

CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> System.out.println("hello"));
// null
System.out.println(voidFuture.get());

CompletableFuture<String> stringFuture = CompletableFuture.supplyAsync(() -> "hello");
// hello
System.out.println(stringFuture.get());

計算結果完成時

方法名
whenComplete(BiConsumer<? super T,? super Throwable> action)
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

由於入參是BiConsumer<? super T,? super Throwable>函數式接口,因此能夠處理正常和異常的計算結果

whenComplete和whenCompleteAsync的區別以下

  1. whenComplete:執行完當前任務的線程繼續執行whenComplete的任務
  2. whenCompleteAsync:把whenCompleteAsync這個任務提交給線程池來執行

CompletableFuture的全部方法的定義和whenComplete都很相似

  1. 方法不以Async結尾意味着使用相同的線程執行
  2. 方法以Async結尾意味着將任務提交到線程池來執行
  3. 方法以Async結尾時能夠用ForkJoinPool.commonPool()做爲線程池,也可使用本身的線程池

後續介紹的全部方法都只寫一種case

CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    return "hello";
}).whenComplete((v, e) -> {
    // hello
    System.out.println(v);
});
// hello
System.out.println(future.get());

轉換,消費,執行

方法名 描述
thenApply 獲取上一個任務的返回,並返回當前任務的值
thenAccept 獲取上一個任務的返回,單純消費,沒有返回值
thenRun 上一個任務執行完成後,開始執行thenRun中的任務
CompletableFuture.supplyAsync(() -> {
    return "hello ";
}).thenAccept(str -> {
    // hello world
    System.out.println(str + "world");
}).thenRun(() -> {
    // task finish
    System.out.println("task finish");
});

組合(兩個任務都完成)

方法名 描述
thenCombine 組合兩個future,獲取兩個future的返回結果,並返回當前任務的返回值
thenAcceptBoth 組合兩個future,獲取兩個future任務的返回結果,而後處理任務,沒有返回值
runAfterBoth 組合兩個future,不須要獲取future的結果,只需兩個future處理完任務後,處理該任務
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    return "歡迎關注 ";
}).thenApply(t -> {
    return t + "微信公衆號 ";
}).thenCombine(CompletableFuture.completedFuture("Java識堂"), (t, u) -> {
    return t + u;
}).whenComplete((t, e) -> {
    // 歡迎關注 微信公衆號 Java識堂
    System.out.println(t);
});

組合(只須要一個任務完成)

方法名 描述
applyToEither 兩個任務有一個執行完成,獲取它的返回值,處理任務並返回當前任務的返回值
acceptEither 兩個任務有一個執行完成,獲取它的返回值,處理任務,沒有返回值
runAfterEither 兩個任務有一個執行完成,不須要獲取future的結果,處理任務,也沒有返回值
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "歡迎關注微信公衆號";
});
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "Java識堂";
});
CompletableFuture future = future1.applyToEither(future2, str -> str);
// 歡迎關注微信公衆號 Java識堂 隨機輸出
System.out.println(future.get());

sleepRandom()爲我寫的一個隨機暫停的函數

多任務組合

方法名 描述
allOf 當全部的CompletableFuture完成後執行計算
anyOf 任意一個CompletableFuture完成後執行計算

allOf的使用

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "歡迎關注";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "微信公衆號";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "Java識堂";
});
// 歡迎關注 微信公衆號 Java識堂
CompletableFuture.allOf(future1, future2, future3)
        .thenApply(v ->
                Stream.of(future1, future2, future3)
                        .map(CompletableFuture::join)
                        .collect(Collectors.joining(" ")))
        .thenAccept(System.out::print);

anyOf的使用

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "歡迎關注";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "微信公衆號";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "Java識堂";
});
CompletableFuture<Object> resultFuture = CompletableFuture.anyOf(future1, future2, future3);
// 歡迎關注 微信公衆號 Java識堂 隨機輸出
System.out.println(resultFuture.get());

異常處理

方法名 描述
exceptionally 捕獲異常,進行處理
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    return 100 / 0;
}).thenApply(num -> {
    return num + 10;
}).exceptionally(throwable -> {
    return 0;
});
// 0
System.out.println(future.get());

固然有一些接口能捕獲異常

CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    String str = null;
    return str.length();
}).whenComplete((v, e) -> {
    if (e == null) {
        System.out.println("正常結果爲" + v);
    } else {
        // 發生異常了java.util.concurrent.CompletionException: java.lang.NullPointerException
        System.out.println("發生異常了" + e.toString());
    }
});

歡迎關注


本文分享自微信公衆號 - Java識堂(erlieStar)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索