CompletableFuture讓你的代碼免受阻塞之苦

前言

如今大部分的CPU都是多核,咱們都知道想要提高咱們應用程序的運行效率,就必須得充分利用多核CPU的計算能力;Java早已經爲咱們提供了多線程的API,可是實現方式略微麻煩,今天咱們就來看看Java8在這方面提供的改善。segmentfault


假設場景

如今你須要爲在線教育平臺提供一個查詢用戶詳情的API,該接口須要返回用戶的基本信息,標籤信息,這兩個信息存放在不一樣位置,須要遠程調用來獲取這兩個信息;爲了模擬遠程調用,咱們須要在代碼裏面延遲 1s;服務器

public interface RemoteLoader {

    String load();

    default void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class CustomerInfoService implements RemoteLoader {

    public String load() {
        this.delay();
        return "基本信息";
    }

}

public class LearnRecordService implements RemoteLoader {

    public String load() {
        this.delay();
        return "學習信息";
    }

}

同步方式實現版本

若是咱們採用同步的方式來完成這個API接口,咱們的實現代碼:網絡

@Test
public void testSync() {
    long start = System.currentTimeMillis();
    List<RemoteLoader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService());
    List<String> customerDetail = remoteLoaders.stream().map(RemoteLoader::load).collect(toList());
    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("總共花費時間:" + (end - start));
}

不出所料,由於調用的兩個接口都是延遲了 1s ,因此結果大於2秒
result多線程


Future實現的版本

接下來咱們把這個例子用Java7提供的Future來實現異步的版本,看下效果如何呢?代碼以下:異步

@Test
public void testFuture() {
    long start = System.currentTimeMillis();
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    List<RemoteLoader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService());
    List<Future<String>> futures = remoteLoaders.stream()
            .map(remoteLoader -> executorService.submit(remoteLoader::load))
            .collect(toList());

    List<String> customerDetail = futures.stream()
            .map(future -> {
                try {
                    return future.get();
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
                return null;
            })
            .filter(Objects::nonNull)
            .collect(toList());
    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("總共花費時間:" + (end - start));
}

此次咱們採用多線程的方式來改造了咱們這個例子,結果仍是比較滿意的,時間大概花費了1s多一點
resultide

注意:這裏我分紅了兩個Stream,如何合在一塊兒用同一個Stream,那麼在用 future.get()的時候會致使阻塞,至關於提交一個任務執行完後才提交下一個任務,這樣達不到異步的效果

這裏咱們能夠看到雖然Future達到了咱們預期的效果,可是若是須要實現將兩個異步的結果進行合併處理就稍微麻一些,這裏就不細說,後面主要看下CompletableFuture在這方面的改進學習


Java8並行流

以上咱們用的是Java8以前提供的方法來實現,接下來咱們來看下Java8中提供的並行流來實習咱們這個例子效果怎樣呢?優化

@Test
public void testParallelStream() {
    long start = System.currentTimeMillis();
    List<RemoteLoader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService());
    List<String> customerDetail = remoteLoaders.parallelStream().map(RemoteLoader::load).collect(toList());
    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("總共花費時間:" + (end - start));
}

運行的結果仍是至關的滿意,花費時間 1s 多點
resultthis

和Java8以前的實現對比,咱們發現整個代碼會更加的簡潔;spa

接下來咱們把咱們的例子改變一下,查詢用戶詳情的接口還須要返回視頻觀看記錄,用戶的標籤信息,購買訂單

public class WatchRecordService implements RemoteLoader {
    @Override
    public String load() {
        this.delay();
        return "觀看記錄";
    }
}

public class OrderService implements RemoteLoader {
    @Override
    public String load() {
        this.delay();
        return "訂單信息";
    }
}

public class LabelService implements RemoteLoader {
    @Override
    public String load() {
        this.delay();
        return "標籤信息";
    }
}

咱們繼續使用Java8提供的並行流來實現,看下運行的結果是否理想

@Test
public void testParallelStream2() {
    long start = System.currentTimeMillis();
    List<RemoteLoader> remoteLoaders = Arrays.asList(
            new CustomerInfoService(),
            new LearnRecordService(),
            new LabelService(),
            new OrderService(),
            new WatchRecordService());
    List<String> customerDetail = remoteLoaders.parallelStream().map(RemoteLoader::load).collect(toList());
    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("總共花費時間:" + (end - start));
}

可是此次運行的結果不是太理想,花費時間超過了2秒


CompletableFuture

基本的用法
@Test
public void testCompletableFuture() {
    CompletableFuture<String> future = new CompletableFuture<>();
    new Thread(() -> {
        doSomething();
        future.complete("Finish");          //任務執行完成後 設置返回的結果
    }).start();
    System.out.println(future.join());      //獲取任務線程返回的結果
}

private void doSomething() {
    System.out.println("doSomething...");
}

這種用法還有個問題,就是任務出現了異常,主線程會無感知,任務線程不會把異常給拋出來;這會致使主線程會一直等待,一般咱們也須要知道出現了什麼異常,作出對應的響應;改進的方式是在任務中try-catch全部的異常,而後調用future.completeExceptionally(e) ,代碼以下:

@Test
public void testCompletableFuture() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = new CompletableFuture<>();
    new Thread(() -> {
        try {
            doSomething();
            future.complete("Finish");
        } catch (Exception e) {
            future.completeExceptionally(e);
        }
    }).start();
    System.out.println(future.get());
}

private void doSomething() {
    System.out.println("doSomething...");
    throw new RuntimeException("Test Exception");
}

從如今來看CompletableFuture的使用過程須要處理的事情不少,不太簡潔,你會以爲看起來很麻煩;可是這只是表象,Java8其實對這個過程進行了封裝,提供了不少簡潔的操做方式;接下來咱們看下如何改造上面的代碼

@Test
public void testCompletableFuture2() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        doSomething();
        return "Finish";
    });
    System.out.println(future.get());
}

這裏咱們採用了supplyAsync,這下看起來簡潔了許多,世界都明亮了; Java8不只提供容許任務返回結果的supplyAsync,還提供了沒有返回值的runAsync;讓咱們能夠更加的關注業務的開發,不須要處理異常錯誤的管理


CompletableFuture異常處理

若是說主線程須要關心任務到底發生了什麼異常,須要對其做出相應操做,這個時候就須要用到exceptionally

@Test
public void testCompletableFuture2() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> {
                doSomething();
                return "Finish";
            })
            .exceptionally(throwable -> "Throwable exception message:" + throwable.getMessage());
    System.out.println(future.get());
}

使用CompletableFuture來完成咱們查詢用戶詳情的API接口
@Test
public void testCompletableFuture3() throws ExecutionException, InterruptedException {
    long start = System.currentTimeMillis();
    List<RemoteLoader> remoteLoaders = Arrays.asList(
            new CustomerInfoService(),
            new LearnRecordService(),
            new LabelService(),
            new OrderService(),
            new WatchRecordService());
    List<CompletableFuture<String>> completableFutures = remoteLoaders
            .stream()
            .map(loader -> CompletableFuture.supplyAsync(loader::load))
            .collect(toList());

    List<String> customerDetail = completableFutures
            .stream()
            .map(CompletableFuture::join)
            .collect(toList());
    
    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("總共花費時間:" + (end - start));
}

這裏依然是採用的兩個Stream來完成的,執行的結果以下:

result

這個結果不太滿意,和並行流的結果差很少,消耗時間 2秒多點;在這種場景下咱們用CompletableFuture作了這麼多工做,可是效果不理想,難道就有沒有其餘的方式可讓它在快一點嗎?

爲了解決這個問題,咱們必須深刻了解下並行流和CompletableFuture的實現原理,它們底層使用的線程池的大小都是CPU的核數Runtime.getRuntime().availableProcessors();那麼咱們來嘗試一下修改線程池的大小,看看效果如何?


自定義線程池,優化CompletableFuture

使用並行流沒法自定義線程池,可是CompletableFuture能夠

@Test
public void testCompletableFuture4() throws ExecutionException, InterruptedException {
    long start = System.currentTimeMillis();
    List<RemoteLoader> remoteLoaders = Arrays.asList(
            new CustomerInfoService(),
            new LearnRecordService(),
            new LabelService(),
            new OrderService(),
            new WatchRecordService());
    
    ExecutorService executorService = Executors.newFixedThreadPool(Math.min(remoteLoaders.size(), 50));
    
    List<CompletableFuture<String>> completableFutures = remoteLoaders
            .stream()
            .map(loader -> CompletableFuture.supplyAsync(loader::load, executorService))
            .collect(toList());

    List<String> customerDetail = completableFutures
            .stream()
            .map(CompletableFuture::join)
            .collect(toList());

    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("總共花費時間:" + (end - start));
}

咱們使用自定義線程池,設置最大的線程池數量50,來看下執行的結果
result

這下執行的結果比較滿意了,1秒多點;理論上來講這個結果能夠一直持續,直到達到線程池的大小50


並行流和CompletableFuture二者該如何選擇

這二者如何選擇主要看任務類型,建議

  1. 若是你的任務是計算密集型的,而且沒有I/O操做的話,那麼推薦你選擇Stream的並行流,實現簡單並行效率也是最高的
  2. 若是你的任務是有頻繁的I/O或者網絡鏈接等操做,那麼推薦使用CompletableFuture,採用自定義線程池的方式,根據服務器的狀況設置線程池的大小,儘量的讓CPU忙碌起來

CompletableFuture的其餘經常使用方法
  1. thenApply、thenApplyAsync: 假如任務執行完成後,還須要後續的操做,好比返回結果的解析等等;能夠經過這兩個方法來完成
  2. thenCompose、thenComposeAsync: 容許你對兩個異步操做進行流水線的操做,當第一個操做完成後,將其結果傳入到第二個操做中
  3. thenCombine、thenCombineAsync:容許你把兩個異步的操做整合;好比把第一個和第二個操做返回的結果作字符串的鏈接操做

總結

  1. Java8並行流的使用方式
  2. CompletableFuture的使用方式、異常處理機制,讓咱們有機會管理任務執行中發送的異常
  3. Java8並行流和CompletableFuture二者該如何選擇
  4. CompletableFuture的經常使用方法

原創不易 轉載請註明出處: https://silently9527.cn/archi...
相關文章
相關標籤/搜索