Java8 - 使用CompletableFuture(一)

Future 的使用 

自Java 5開始添加了Future,用來描述一個異步計算的結果。獲取一個結果時方法較少,要麼經過輪詢isDone(),確認完成後調用get()獲取值,要麼調用get()設置一個超時時間。可是get()方法會阻塞調用線程,這種阻塞的方式顯然和咱們的異步編程的初衷相違背。如:html

package com.common.future;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureRunnerTest {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newSingleThreadExecutor();

        //提交一個 Callable
        Future<Integer> f = es.submit(() -> {
            // 長時間的異步計算
            Thread.sleep(2000L);
            System.out.println("長時間的異步計算");
            return 100;
        });

        // 輪詢
        while (true) {
            System.out.println("f.isDone");
            if (f.isDone()) {
                try {
                    System.out.println(f.get());
                    es.shutdown();
                    break;
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            Thread.sleep(100L);
        }
    }
}

雖然Future以及相關使用方法提供了異步執行任務的能力,可是對於結果的獲取倒是很不方便,只能經過阻塞或者輪詢的方式獲得任務的結果。阻塞的方式顯然和咱們的異步編程的初衷相違背,輪詢的方式又會耗費無謂的CPU資源,並且也不能及時地獲得計算結果。java

Future 的侷限性

  1. 不能手動完成
    當你寫了一個函數,用於經過一個遠程API獲取一個電子商務產品最新價格。由於這個 API 太耗時,你把它容許在一個獨立的線程中,而且從你的函數中返回一個 Future。如今假設這個API服務宕機了,這時你想經過該產品的最新緩存價格手工完成這個Future 。你會發現沒法這樣作。
  2. Future 的結果在非阻塞的狀況下,不能執行更進一步的操做
    Future 不會通知你它已經完成了,它提供了一個阻塞的 get() 方法通知你結果。你沒法給 Future 植入一個回調函數,當 Future 結果可用的時候,用該回調函數自動的調用 Future 的結果。
  3. 多個 Future 不能串聯在一塊兒組成鏈式調用
    有時候你須要執行一個長時間運行的計算任務,而且當計算任務完成的時候,你須要把它的計算結果發送給另一個長時間運行的計算任務等等。你會發現你沒法使用 Future 建立這樣的一個工做流。
  4. 不能組合多個 Future 的結果
    假設你有10個不一樣的Future,你想並行的運行,而後在它們運行未完成後運行一些函數。你會發現你也沒法使用 Future 這樣作。
  5. 沒有異常處理
    Future API 沒有任務的異常處理結構竟然有如此多的限制,幸虧咱們有CompletableFuture,你可使用 CompletableFuture 達到以上全部目的。

CompletableFuture 實現了 Future 和 CompletionStage接口,而且提供了許多關於建立,鏈式調用和組合多個 Future 的便利方法集,並且有普遍的異常處理支持。web

 

建立CompletableFuture對象

在該類中提供了四個靜態方法建立CompletableFuture對象:編程

  1. 使用new 建立CompletableFuture 
  2. 使用completedFuture方法運行一個成功的CompletableFuture
  3. 使用 runAsync() 運行異步計算
  4. 使用 supplyAsync() 運行一個異步任務而且返回結果
package com.common.future;

import java.util.concurrent.*;

public class CompletableFutureRunnerTest {

    // 建立一個固定大小的線程池子
    static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {
        int count = 1;

        @Override
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, "custom-executor-" + count++);
        }
    });

    public static void sleep() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 直接使用new 建立
        CompletableFuture newCompletable = new CompletableFuture();

        // 經過給定的值 建立一個 已經完成的 future
        CompletableFuture<String> cf = CompletableFuture.completedFuture("message");
        System.out.println(cf.isDone());
        System.out.println(cf.getNow(null));

        // 使用 runAsync() 運行異步計算
        CompletableFuture<Void> completableFuture01 = CompletableFuture.runAsync(() -> {
            sleep();
            System.out.println("runAsync...");
        });

        CompletableFuture<Void> completableFuture02 = CompletableFuture.runAsync(() -> {
            sleep();
            System.out.println("runAsync...");
        }, executor);

        // 使用 supplyAsync() 運行一個異步任務而且返回結果
        CompletableFuture<String> completableFuture03 = CompletableFuture.supplyAsync(() -> {
            sleep();
            System.out.println("supplyAsync03...");
            return "hello world";
        });

        System.out.println(completableFuture03.isDone());
        
        // Block and wait for the future to complete
        System.out.println(completableFuture03.get());

        CompletableFuture<String> completableFuture04 = CompletableFuture.supplyAsync(() -> {
            sleep();
            System.out.println("supplyAsync04...");
            return "hello world";
        }, executor);

        System.out.println(completableFuture04.isDone());
        System.out.println(completableFuture04.get());

    }
}

上面示例中的isDone() ,get() 方法都是 繼承於 Future 接口中的,通常狀況下,使用CompletableFuture 不須要調用isDone() 方法判斷是否完成,也不須要調用get 方法獲取異步執行的結果的。緩存

 

CompletableFuture 計算結果完成時的處理

當CompletableFuture的計算結果完成時,有以下三個方法進行處理:異步

CompletableFuture<String> completableFuture03 = CompletableFuture.supplyAsync(() -> {
    sleep();
    System.out.println("supplyAsync03...");
    return "hello world";
});

System.out.println(completableFuture03.isDone());
System.out.println(completableFuture03.get());

completableFuture03.whenComplete((t, ex) -> {
    if (t != null) {
        System.out.println(t);
    } else {
        ex.printStackTrace();
    }
});

completableFuture03.whenCompleteAsync((t, ex) -> {
    if (t != null) {
        System.out.println(t);
    } else {
        ex.printStackTrace();
    }
});

completableFuture03.whenCompleteAsync((t, ex) -> {
    if (t != null) {
        System.out.println(t);
    } else {
        ex.printStackTrace();
    }
}, executor);

若是拋出了異常,對異常的處理以下所示,ide

CompletableFuture<Integer> completableFuture05 = CompletableFuture.supplyAsync(() -> {
    sleep();
    return 1 / 0;
});

completableFuture05.whenComplete((t, ex) -> {
    if (t != null) {
        System.out.println(t);
    } else {
        ex.printStackTrace();
    }
});

這裏首先判斷 t 的值是否爲空,若是爲空直接打印異常的堆棧信息異步編程

java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
	at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.ArithmeticException: / by zero
	at com.common.future.CompletableFutureRunnerTest.lambda$main$4(CompletableFutureRunnerTest.java:93)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
	... 5 more

還有一個專門處理異常狀況的方法,如 exceptionally,函數

CompletableFuture<Integer> completableFuture05 = CompletableFuture.supplyAsync(() -> {
    sleep();
    return 1 / 0;
});

completableFuture05.exceptionally((e) -> {
    e.printStackTrace();
    return 0;
}).whenComplete((t, e) -> {
    if (t != null) {
        System.out.println(t);
    } else {
        e.printStackTrace();
    }
}).join();

 

進行轉換-thenApply 操做

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);

這一組函數的功能是當原來的CompletableFuture計算完後,將結果傳遞給函數fn,將fn的結果做爲新的CompletableFuture計算結果。所以它的功能至關於將CompletableFuture<T>轉換成CompletableFuture<U>網站

不以Async結尾的方法由原來的線程計算,以Async結尾的方法由默認的線程池ForkJoinPool.commonPool()或者指定的線程池executor運行。看下面的例子,

package com.common.future;

import java.util.concurrent.CompletableFuture;

public class ThenApplyTest {

    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        String f = future.thenApplyAsync(i -> i * 10).thenApply(i -> i.toString()).join();
        System.out.println(f);

        CompletableFuture.supplyAsync(() -> "hello world".substring(0, 5))
            .thenApply(String::length)
            .whenComplete((r, t) -> {
                if (t == null) {
                    System.out.println("the length = " + r);
                }
            });

    }
}

 

進行消費-thenAccept & thenRun操做

若是你不想從你的回調函數中返回任何東西,僅僅想在Future完成後運行一些代碼片斷,你可使用thenAccept() 和 thenRun()方法,這些方法常常在調用鏈的最末端的最後一個回調函數中使用。

public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);

public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor);

CompletableFuture.thenAccept() 持有一個Consumer<T> ,返回一個CompletableFuture<Void>。它能夠訪問CompletableFuture的結果:

// thenAccept() example
CompletableFuture.supplyAsync(() -> {
    return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
    System.out.println("Got product detail from remote service " + product.getName())
});

thenAcceptBoth以及相關方法提供了相似的功能,當兩個CompletionStage都正常完成計算的時候,就會執行提供的action,它用來組合另一個異步的結果。

//thenAccept
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor);

以下面的使用示例,

CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "first";
    }).thenAcceptBoth(CompletableFuture.completedFuture("second"), (first, second) -> System.out.println(first + " : " + second)).join();

雖然thenAccept()能夠訪問CompletableFuture的結果,但thenRun()不能訪Future的結果,它持有一個Runnable返回CompletableFuture<Void>:

// thenRun() example
CompletableFuture.supplyAsync(() -> {
    // Run some computation  
}).thenRun(() -> {
    // Computation Finished.
});

 

組合兩個CompletableFuture

1. 使用thenCompose()組合兩個獨立的future

假設你想從一個遠程API中獲取一個用戶的詳細信息,一旦用戶信息可用,你想從另一個服務中獲取他的貸款利率。
考慮下如下兩個方法getUserDetail() getCreditRating()的實現:

static CompletableFuture<User> getUsersDetail(String userId) {
    return CompletableFuture.supplyAsync(() -> {
        return new User("12312121", "xiaoming", 12);
    });
}

static CompletableFuture<Double> getCreditRating(User user) {
    return CompletableFuture.supplyAsync(() -> {
        if (user.getUserId() == null || user.getUserId().equals("")) {
            return 0.0;
        } else {
            return 0.1;
        }
    });
}

如今讓咱們弄明白當使用了thenApply()後是否會達到咱們指望的結果-

// 若是使用 thenApply ,則返回了一個 最終結果是一個嵌套的CompletableFuture。
CompletableFuture<CompletableFuture<Double>> res =
    getUsersDetail("testUserId").thenApply(user -> getCreditRating(user));

在更早的示例中,Supplier函數傳入thenApply將返回一個簡單的值,可是在本例中,將返回一個CompletableFuture。以上示例的最終結果是一個嵌套的CompletableFuture。
若是你想獲取最終的結果給最頂層future,使用 thenCompose()方法代替-

// 若是你想獲取最終的結果給最頂層future,使用 thenCompose()方法代替-
CompletableFuture<Double> result = getUsersDetail("testUserId")
    .thenCompose(user -> getCreditRating(user));

所以,規則就是-若是你的回調函數返回一個CompletableFuture,可是你想從CompletableFuture鏈中獲取一個直接合並後的結果,這時候你可使用thenCompose()

2. 使用thenCombine()組合兩個獨立的 future

雖然thenCompose()被用於當一個future依賴另一個future的時候用來組合兩個future。thenCombine()被用來當兩個獨立的Future都完成的時候,用來作一些事情。

System.out.println("Retrieving weight.");
CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return 65.0;
});

System.out.println("Retrieving height.");
CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return 177.8;
});

System.out.println("Calculating BMI.");
CompletableFuture<Double> combinedFuture = weightInKgFuture
    .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> {
        Double heightInMeter = heightInCm / 100;
        return weightInKg / (heightInMeter * heightInMeter);
    });

System.out.println("Your BMI is - " + combinedFuture.get());

當兩個Future都完成的時候,傳給``thenCombine()的回調函數將被調用。

 

組合多個CompletableFuture

咱們使用thenCompose() 和 thenCombine()把兩個CompletableFuture組合在一塊兒。如今若是你想組合任意數量的CompletableFuture,應該怎麼作?咱們可使用如下兩個方法組合任意數量的CompletableFuture。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

CompletableFuture.allOf()

CompletableFuture.allOf的使用場景是當你有一個列表的獨立future,而且你想在它們都完成後並行的作一些事情。

假設你想下載一個網站的100個不一樣的頁面。你能夠串行的作這個操做,可是這很是消耗時間。所以你想寫一個函數,傳入一個頁面連接,返回一個CompletableFuture,異步的下載頁面內容。

public static CompletableFuture<String> downloadWebPage(String pageLink) {
    return CompletableFuture.supplyAsync(() -> {
        // Code to download and return the web page's content
        try {
            String html = Jsoup.connect(pageLink).execute().body();
            return html;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return "";
    });
}

如今,當全部的頁面已經下載完畢,你想計算包含關鍵字CompletableFuture頁面的數量。可使用CompletableFuture.allOf()達成目的。

List<String> webPageLinks = Arrays.asList("https://my.oschina.net/xinxingegeya/blog/674006",
    "https://my.oschina.net/xinxingegeya/blog/637773", "https://my.oschina.net/xinxingegeya/blog/2222079");

List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
    .map(webPageLink -> downloadWebPage(webPageLink))
    .collect(Collectors.toList());

// Create a combined Future using allOf()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(pageContentFutures.toArray(new CompletableFuture[0]));

使用CompletableFuture.allOf()的問題是它返回CompletableFuture<Void>。可是咱們能夠經過寫一些額外的代碼來獲取全部封裝的CompletableFuture結果。

// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
    return pageContentFutures.stream()
        .map(pageContentFuture -> pageContentFuture.join())
        .collect(Collectors.toList());
});

花一些時間理解下以上代碼片斷。當全部future完成的時候,咱們調用了future.join(),所以咱們不會在任何地方阻塞。

join()方法和get()方法很是相似,這惟一不一樣的地方是若是最頂層的CompletableFuture完成的時候發生了異常,它會拋出一個未經檢查的異常。

如今讓咱們計算包含關鍵字頁面的數量。

// Count the number of web pages having the "CompletableFuture" keyword.
CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
    return pageContents.stream()
        .filter(pageContent -> pageContent.contains("CompletableFuture"))
        .count();
});

System.out.println("Number of Web Pages having CompletableFuture keyword - " + countFuture.get());

完整的代碼以下,

package common.future;

import org.jsoup.Jsoup;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class MoreCompletableFutureTest {

    public static CompletableFuture<String> downloadWebPage(String pageLink) {
        return CompletableFuture.supplyAsync(() -> {
            // Code to download and return the web page's content
            try {
                String html = Jsoup.connect(pageLink).execute().body();
                return html;
            } catch (IOException e) {
                e.printStackTrace();
            }
            return "";
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        List<String> webPageLinks = Arrays.asList("https://my.oschina.net/xinxingegeya/blog/674006",
            "https://my.oschina.net/xinxingegeya/blog/637773", "https://my.oschina.net/xinxingegeya/blog/2222079");

        List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
            .map(webPageLink -> downloadWebPage(webPageLink))
            .collect(Collectors.toList());

        // Create a combined Future using allOf()
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(pageContentFutures.toArray(new CompletableFuture[0]));

        // When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
        CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
            return pageContentFutures.stream()
                .map(pageContentFuture -> pageContentFuture.join())
                .collect(Collectors.toList());
        });

        // Count the number of web pages having the "CompletableFuture" keyword.
        CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
            return pageContents.stream()
                .filter(pageContent -> pageContent.contains("CompletableFuture"))
                .count();
        });

        System.out.println("Number of Web Pages having CompletableFuture keyword - " + countFuture.get());

    }
}

CompletableFuture.anyOf()

CompletableFuture.anyOf()和其名字介紹的同樣,當任何一個CompletableFuture完成的時候【相同的結果類型】,返回一個新的CompletableFuture。如下示例:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 2";
});

CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 3";
});

CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);

System.out.println(anyOfFuture.get()); // Result of Future 2

在以上示例中,當三個中的任何一個CompletableFuture完成, anyOfFuture就會完成。由於future2的休眠時間最少,所以她最早完成,最終的結果將是future2的結果。

CompletableFuture.anyOf()傳入一個Future可變參數,返回CompletableFuture<Object>。CompletableFuture.anyOf()的問題是若是你的CompletableFuture返回的結果是不一樣類型的,這時候你講會不知道你最終CompletableFuture是什麼類型。

======END======

相關文章
相關標籤/搜索