Java8——異步編程

Java8——異步編程

異步編程

所謂異步其實就是實現一個無需等待被調用函數的返回值而讓操做繼續運行的方法java

建立任務並執行任務

無參建立

CompletableFuture<String> noArgsFuture = new CompletableFuture<>();

傳入相應任務,無返回值

runAsync方法能夠在後臺執行異步計算,可是此時並無返回值。持有一個Runnable對象。git

CompletableFuture noReturn = CompletableFuture.runAsync(()->{
    //執行邏輯,無返回值
});

傳入相應任務,有返回值

此時咱們看到返回的是CompletableFuture<T>此處的T就是你想要的返回值的類型。其中的Supplier<T>是一個簡單的函數式接口。github

CompletableFuture<String> hasReturn = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        return "hasReturn";
    }
});

此時可使用lambda表達式使上面的邏輯更加清晰編程

CompletableFuture<String> hasReturnLambda = CompletableFuture.supplyAsync(TestFuture::get);

private static String get() {
    return "hasReturnLambda";
}

獲取返回值

異步任務也是有返回值的,當咱們想要用到異步任務的返回值時,咱們能夠調用CompletableFuture get()阻塞,直到有異步任務執行完有返回值才往下執行。異步

咱們將上面的get()方法改造一下,使其停頓十秒時間。ide

private static String get() {
    System.out.println("Begin Invoke getFuntureHasReturnLambda");
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {

    }
    System.out.println("End Invoke getFuntureHasReturnLambda");
    return "hasReturnLambda";
}

而後進行調用異步編程

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> funtureHasReturnLambda = (CompletableFuture<String>) getFuntureHasReturnLambda();
    System.out.println("Main Method Is Invoking");
    funtureHasReturnLambda.get();
    System.out.println("Main Method End");
}

能夠看到輸出以下,只有調用get()方法的時候纔會阻塞當前線程。函數

Main Method Is Invoking
Begin Invoke getFuntureHasReturnLambda
End Invoke getFuntureHasReturnLambda
Main Method End

自定義返回值

除了等待異步任務返回值之外,咱們也能夠在任意時候調用complete()方法來自定義返回值。post

CompletableFuture<String> funtureHasReturnLambda = (CompletableFuture<String>) getFuntureHasReturnLambda();
System.out.println("Main Method Is Invoking");
new Thread(()->{
    System.out.println("Thread Is Invoking ");
    try {
        Thread.sleep(1000);
        funtureHasReturnLambda.complete("custome value");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("Thread End ");
}).run();
String value = funtureHasReturnLambda.get();
System.out.println("Main Method End value is "+ value);

咱們能夠發現輸出是新起線程的輸出值,固然這是由於咱們的異步方法設置了等待10秒,若是此時異步方法等待1秒,新起的線程等待10秒,那麼輸出的值就是異步方法中的值了。線程

Main Method Is Invoking
Begin Invoke getFuntureHasReturnLambda
Thread Is Invoking 
Thread End 
Main Method End value is custome value

按順序執行異步任務

若是有一個異步任務的完成須要依賴前一個異步任務的完成,那麼該如何寫呢?是調用get()方法得到返回值之後而後再執行嗎?這樣寫有些麻煩,CompletableFuture 爲咱們提供了方法來完成咱們想要順序執行一些異步任務的需求。thenApply thenAccept thenRun 這三個方法。這三個方法的區別就是。

方法名 是否可得到前一個任務的返回值 是否有返回值
thenApply 能得到
thenAccept 能得到
thenRun 不可得到

因此通常來講thenAccept thenRun 這兩個方法在調用鏈的最末端使用。接下來咱們用真實的例子感覺一下。

//thenApply  可獲取到前一個任務的返回值,也有返回值
CompletableFuture<String> seqFutureOne = CompletableFuture.supplyAsync(()-> "seqFutureOne");
CompletableFuture<String> seqFutureTwo = seqFutureOne.thenApply(name -> name + " seqFutureTwo");
System.out.println(seqFutureTwo.get());


//thenAccept  可獲取到前一個任務的返回值,可是無返回值
CompletableFuture<Void> thenAccept = seqFutureOne
        .thenAccept(name -> System.out.println(name + "thenAccept"));
System.out.println("-------------");
System.out.println(thenAccept.get());

//thenRun 獲取不到前一個任務的返回值,也無返回值
System.out.println("-------------");
CompletableFuture<Void> thenRun = seqFutureOne.thenRun(() -> {
    System.out.println("thenRun");
});
System.out.println(thenRun.get());

返回的信息以下

seqFutureOne seqFutureTwo
seqFutureOnethenAccept
-------------
null
-------------
thenRun
null

thenApply和thenApplyAsync的區別

咱們能夠發現這三個方法都帶有一個後綴爲Async的方法,例如thenApplyAsync。那麼帶Async的方法和不帶此後綴的方法有什麼不一樣呢?咱們就以thenApplythenApplyAsync 兩個方法進行對比,其餘的和這個同樣的。

這兩個方法區別就在於誰去執行這個任務,若是使用thenApplyAsync ,那麼執行的線程是從ForkJoinPool.commonPool()中獲取不一樣的線程進行執行,若是使用thenApply ,若是supplyAsync 方法執行速度特別快,那麼thenApply 任務就是主線程進行執行,若是執行特別慢的話就是和supplyAsync 執行線程同樣。接下來咱們經過例子來看一下,使用sleep方法來反應supplyAsync 執行速度的快慢。

//thenApply和thenApplyAsync的區別
System.out.println("-------------");
CompletableFuture<String> supplyAsyncWithSleep = CompletableFuture.supplyAsync(()->{
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "supplyAsyncWithSleep Thread Id : " + Thread.currentThread();
});
CompletableFuture<String> thenApply = supplyAsyncWithSleep
        .thenApply(name -> name + "------thenApply Thread Id : " + Thread.currentThread());
CompletableFuture<String> thenApplyAsync = supplyAsyncWithSleep
        .thenApplyAsync(name -> name + "------thenApplyAsync Thread Id : " + Thread.currentThread());
System.out.println("Main Thread Id: "+ Thread.currentThread());
System.out.println(thenApply.get());
System.out.println(thenApplyAsync.get());
System.out.println("-------------No Sleep");
CompletableFuture<String> supplyAsyncNoSleep = CompletableFuture.supplyAsync(()->{
    return "supplyAsyncNoSleep Thread Id : " + Thread.currentThread();
});
CompletableFuture<String> thenApplyNoSleep = supplyAsyncNoSleep
        .thenApply(name -> name + "------thenApply Thread Id : " + Thread.currentThread());
CompletableFuture<String> thenApplyAsyncNoSleep = supplyAsyncNoSleep
        .thenApplyAsync(name -> name + "------thenApplyAsync Thread Id : " + Thread.currentThread());
System.out.println("Main Thread Id: "+ Thread.currentThread());
System.out.println(thenApplyNoSleep.get());
System.out.println(thenApplyAsyncNoSleep.get());

咱們能夠看到輸出爲

-------------
Main Thread Id: Thread[main,5,main]
supplyAsyncWithSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main]------thenApply Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main]
supplyAsyncWithSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main]------thenApplyAsync Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main]
-------------No Sleep
Main Thread Id: Thread[main,5,main]
supplyAsyncNoSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-2,5,main]------thenApply Thread Id : Thread[main,5,main]
supplyAsyncNoSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-2,5,main]------thenApplyAsync Thread Id : Thread[ForkJoinPool.commonPool-worker-2,5,main]

能夠看到supplyAsync方法執行速度慢的話thenApply方法執行線程和supplyAsync 執行線程相同,若是supplyAsync 方法執行速度快的話,那麼thenApply方法執行線程和Main方法執行線程相同。

組合CompletableFuture

將兩個CompletableFuture組合到一塊兒有兩個方法

  1. thenCompose():當第一個任務完成時纔會執行第二個操做
  2. thenCombine():兩個異步任務所有完成時纔會執行某些操做

thenCompose() 用法

咱們定義兩個異步任務,假設第二個定時任務須要用到第一個定時任務的返回值。

public static CompletableFuture<String> getTastOne(){
    return CompletableFuture.supplyAsync(()-> "topOne");
}

public static CompletableFuture<String> getTastTwo(String s){
    return CompletableFuture.supplyAsync(()-> s + "  topTwo");
}

咱們利用thenCompose()方法進行編寫

CompletableFuture<String> thenComposeComplet = getTastOne().thenCompose(s -> getTastTwo(s));
System.out.println(thenComposeComplet.get());

輸出就是

topOne  topTwo

若是還記得前面的thenApply()方法的話,應該會想這個利用thenApply()方法也是可以實現相似的功能的。

//thenApply
CompletableFuture<CompletableFuture<String>> thenApply = getTastOne()
        .thenApply(s -> getTastTwo(s));
System.out.println(thenApply.get().get());

可是咱們發現返回值是嵌套返回的一個類型,而想要得到最終的返回值須要調用兩次get()

thenCombine() 用法

例如咱們此時須要計算兩個異步方法返回值的和。求和這個操做是必須是兩個異步方法得出來值的狀況下才能進行計算,所以咱們能夠用thenCombine()方法進行計算。

CompletableFuture<Integer> thenComposeOne = CompletableFuture.supplyAsync(() -> 192);
CompletableFuture<Integer> thenComposeTwo = CompletableFuture.supplyAsync(() -> 196);
CompletableFuture<Integer> thenComposeCount = thenComposeOne
        .thenCombine(thenComposeTwo, (s, y) -> s + y);
System.out.println(thenComposeCount.get());

此時thenComposeOne thenComposeTwo 都完成時纔會調用傳給thenCombine 方法的回調函數。

組合多個CompletableFuture

在上面咱們用thenCompose()thenCombine()兩個方法將兩個CompletableFuture 組裝起來,若是咱們想要將任意數量的CompletableFuture 組合起來呢?可使用下面兩個方法進行組合。

  • allOf():等待全部CompletableFuture 完後之後纔會運行回調函數
  • anyOf():只要其中一個CompletableFuture 完成,那麼就會執行回調函數。注意此時其餘的任務也就不執行了。

接下來演示一下兩個方法的用法

//allOf()
CompletableFuture<Integer> one = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> two = CompletableFuture.supplyAsync(() -> 2);
CompletableFuture<Integer> three = CompletableFuture.supplyAsync(() -> 3);
CompletableFuture<Integer> four = CompletableFuture.supplyAsync(() -> 4);
CompletableFuture<Integer> five = CompletableFuture.supplyAsync(() -> 5);
CompletableFuture<Integer> six = CompletableFuture.supplyAsync(() -> 6);

CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(one, two, three, four, five, six);
voidCompletableFuture.thenApply(v->{
    return Stream.of(one,two,three,four, five, six)
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}).thenAccept(System.out::println);

CompletableFuture<Void> voidCompletableFuture1 = CompletableFuture.runAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (Exception e) {

    }
    System.out.println("1");
});

咱們定義了6個CompletableFuture 等待全部的CompletableFuture等待全部任務完成之後而後將其值輸出。

anyOf()的用法

CompletableFuture<Void> voidCompletableFuture1 = CompletableFuture.runAsync(() -> {
try {
    Thread.sleep(1000);
} catch (Exception e) {

}
System.out.println("voidCompletableFuture1");
});

CompletableFuture<Void> voidCompletableFutur2 = CompletableFuture.runAsync(() -> {
try {
    Thread.sleep(2000);
} catch (Exception e) {

}
System.out.println("voidCompletableFutur2");
});

CompletableFuture<Void> voidCompletableFuture3 = CompletableFuture.runAsync(() -> {
try {
    Thread.sleep(3000);
} catch (Exception e) {

}
System.out.println("voidCompletableFuture3");
});

CompletableFuture<Object> objectCompletableFuture = CompletableFuture
    .anyOf(voidCompletableFuture1, voidCompletableFutur2, voidCompletableFuture3);
objectCompletableFuture.get();

這裏咱們定義了3個CompletableFuture進行一些耗時的任務,此時第一個CompletableFuture會率先完成。打印結果以下。

voidCompletableFuture1

異常處理

咱們瞭解了CompletableFuture 如何異步執行,如何組合不一樣的CompletableFuture ,如何順序執行CompletableFuture 。那麼接下來還有一個重要的一步,就是在執行異步任務時發生異常的話該怎麼辦。咱們先寫個例子。

CompletableFuture.supplyAsync(()->{
    //發生異常
    int i = 10/0;
    return "Success";
}).thenRun(()-> System.out.println("thenRun"))
.thenAccept(v -> System.out.println("thenAccept"));

CompletableFuture.runAsync(()-> System.out.println("CompletableFuture.runAsync"));

執行結果爲,咱們發現只要執行鏈中有一個發生了異常,那麼接下來的鏈條也就不執行了,可是主流程下的其餘CompletableFuture仍是會運行的。

CompletableFuture.runAsync

exceptionally()

咱們可使用exceptionally進行異常的處理

//處理異常

CompletableFuture<String> exceptionally = CompletableFuture.supplyAsync(() -> {
    //發生異常
    int i = 10 / 0;
    return "Success";
}).exceptionally(e -> {
    System.out.println(e);
    return "Exception has Handl";
});
System.out.println(exceptionally.get());

打印以下,能夠發現其接收值是異常信息,也可以返回自定義返回值。

java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
Exception has Handl

handle()

調用handle()方法也可以捕捉到異常而且自定義返回值,他和exceptionally()方法不一樣一點是handle()方法不管發沒發生異常都會被調用。例子以下

System.out.println("-------有異常-------");
CompletableFuture.supplyAsync(()->{
    //發生異常
    int i = 10/0;
    return "Success";
}).handle((response,e)->{
    System.out.println("Exception:" + e);
    System.out.println("Response:" + response);
    return response;
});

System.out.println("-------無異常-------");
CompletableFuture.supplyAsync(()->{
    return "Sucess";
}).handle((response,e)->{
    System.out.println("Exception:" + e);
    System.out.println("Response:" + response);
    return response;
});

打印以下,咱們能夠看到在沒有發生異常的時候handle()方法也被調用了

-------有異常-------
Exception:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
Response:null
-------無異常-------
Exception:null
Response:Sucess

源代碼地址

參考文章

相關文章
相關標籤/搜索