Java異步編程指南

在咱們平時開發中或多或少都會遇到須要調用接口來完成一個功能的需求,這個接口能夠是內部系統也能夠是外部的,而後等到接口返回數據了才能繼續其餘的業務流程,這就是傳統的同步模式。html

同步模式雖然簡單但缺點也很明顯,若是對方服務處理緩慢遲遲未能返回數據,或網絡問題致使響應變長,就會阻塞咱們調用方的線程,致使咱們主流程的耗時latency延長,傳統的解決方式是增長接口的超時timeout設置,防止無限期等待。但即便這樣仍是會佔用CPU資源。java

在咱們作rpc遠程調用,redis,數據庫訪問等比較耗時的網絡請求時常常要面對這樣的問題,這種業務場景咱們能夠引入異步的編程思想,即主流程不須要阻塞等待接口返回數據,而是繼續往下執行,當真正須要這個接口返回結果時再經過回調或阻塞的方式獲取,此時咱們的主流程和異步任務是並行執行的。web

Java中實現異步主要是經過FutureCompletableFutureGuava ListenableFuture以及一些異步響應式框架如RxJava實現。redis

下面咱們主要看下這幾種組件適用的業務場景和須要注意的地方,避免踩坑。數據庫

一. Future

java.util.concurrent.Future是JDK5引入的,用來獲取一個異步計算的結果。你可使用isDone方法檢查計算是否完成,也可使用get阻塞住調用線程,直到計算完成返回結果,你也可使用cancel方法中止任務的執行。編程

image

Future的api說明api

實際開發中咱們通常會結合線程池的submit配合使用,代碼以下:服務器

package com.javakk;

import java.util.concurrent.*;

public class FutureTest {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool(); // 線程池
        Future<String> future = executor.submit(() ->{
            Thread.sleep(200); // 模擬接口調用,耗時200ms
            return "hello world";
        });
        // 在輸出下面異步結果時主線程能夠不阻塞的作其餘事情
        // TODO 其餘業務邏輯

        System.out.println("異步結果:"+future.get()); //主線程獲取異步結果
        // 或者經過下面輪詢的方式
        // while(!future.isDone());
    }
}

// 輸出結果:
異步結果:hello world

簡單的說我有一個任務,提交給了Future,Future替我完成這個任務,這期間我能夠去作別的事情。一段時間以後,我再從Future取出結果。網絡

上面的代碼有2個地方須要注意,在15行不建議使用future.get()方式,而應該使用future.get(long timeout, TimeUnit unit)尤爲是在生產環境必定要設置合理的超時時間,防止程序無限期等待下去。另外就是要考慮異步任務執行過程當中報錯拋出異常的狀況,須要捕獲future的異常信息。併發

經過代碼能夠看出一些簡單的異步場景可使用Future解決,可是對於結果的獲取卻不是很方便,只能經過阻塞或者輪詢的方式獲得任務的結果。阻塞的方式至關於把異步變成了同步,顯然和異步編程的初衷相違背,輪詢的方式又會浪費CPU資源。

Future沒有提供通知的機制,就是回調,咱們沒法知道它什麼時間完成任務

並且在複雜一點的狀況下,好比多個異步任務的場景,一個異步任務依賴上一個異步任務的執行結果,異步任務合併等,Future沒法知足需求。

二.ListenableFuture

Google併發包下的listenableFuture對Java原生的future作了擴展,顧名思義就是使用監聽器模式實現的回調,因此叫可監聽的future。

在咱們公司早期的項目裏(jdk8以前的版本)都是使用listenableFuture來實現異步編程。

要使用listenableFuture還要結合MoreExecutor線程池,MoreExecutor是對Java原生線程池的封裝,好比經常使用的MoreExecutors.listeningDecorator(threadPool); 修改Java原生線程池的submit方法,封裝了future返回listenableFuture

代碼示例以下:

// ListeningExecutorService繼承jdk的ExecutorService接口,重寫了submit方法,修改返回值類型爲ListenableFuture
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
[ListenableFuture](http://javakk.com/tag/listenablefuture "查看更多關於 ListenableFuture 的文章")<String> listenableFuture = executor.submit(() -> {
    Thread.sleep(200); // 模擬接口調用,耗時200ms
    return "hello world";
});

上面的代碼是構造了一個ListenableFuture的異步任務,調用它的結果通常有兩種方式:

基於addListener

listenableFuture.addListener(() -> {
    try {
        System.out.println("異步結果:" + listenableFuture.get());
    } catch (Exception e) {
        e.printStackTrace();
    }
}, executor);

// 輸出結果:
異步結果:hello world

基於addCallback:

Futures.addCallback(listenableFuture, new FutureCallback<String>() {
    @Override
    public void onSuccess(String result) {
        System.out.println("異步結果:" + result);
    }

    @Override
    public void onFailure(Throwable t) {
        t.printStackTrace();
    }
}, executor);

// 輸出結果:
異步結果:hello world

其實兩種方式都是基於回調,具體使用哪一種看業務場景。

  • addListener須要本身代碼裏捕獲處理異常狀況,最好設置超時時間
  • addCallback把正常返回和異常狀況作了分離,方便咱們針對不一樣狀況作處理

另外Futures裏還有不少其餘的api,能夠知足咱們負責場景,好比transform()能夠處理異步任務之間的依賴狀況,allAsList()將多個ListenableFuture合併成一個。

三. CompletableFuture

若是大家公司的jdk是8或以上的版本,那能夠直接使用CompletableFuture類來實現異步編程。

Java8新增的CompletableFuture類借鑑了Google Guava的ListenableFuture,它包含50多個方法,默認使用forkJoinPool線程池,提供了很是強大的Future擴展功能,能夠幫助咱們簡化異步編程的複雜性,結合函數式編程,經過回調的方式處理計算結果,而且提供了轉換和組合CompletableFuture的多種方法,能夠知足大部分異步回調場景。

image

CompletableFuture的api

雖然方法不少但有個特徵:

  • 以Async結尾的方法簽名表示是在異步線程裏執行,沒有以Async結尾的方法則是由主線程調用
  • 若是參數裏有Runnable類型,則沒有返回結果,即純消費的方法
  • 若是參數裏沒有指定executor則默認使用forkJoinPool線程池,指定了則以指定的線程池來執行任務

下面就來看下經常使用的幾種api代碼示例:

轉換 : thenApplyAsync

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() ->
    "hello"
);
// f2依賴f1的結果作轉換
CompletableFuture<String> f2 = f1.thenApplyAsync(t ->
    t + " world"
);
System.out.println("異步結果:" + f2.get());

// 輸出結果:
異步結果:hello world

這裏先說明一下,示例代碼只關注核心功能,若是要實際使用須要考慮超時和異常狀況,你們須要注意。

在上面的代碼中異步任務f2須要異步任務f1的結果才能執行,但對於咱們的主線程來講,無須等到f1返回結果後再調用函數f2,即不會阻塞主流程,而是告訴CompletableFuture當執行完了f1的方法再去執行f2,只有當須要最後的結果時再獲取。

組合 : thenComposeAsync

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() ->
    "hello"
);
// f2雖然依賴f1的結果,但不會等待f1結果返回,而是再包裝成一個future返回
CompletableFuture<String> f2 = f1.thenComposeAsync(t ->
    CompletableFuture.supplyAsync(() ->
            t + " world"
    )
);
// 等到真正調用的時候再執行f2裏的邏輯
System.out.println("異步結果:" + f2.get());

// 輸出結果:
異步結果:hello world

經過代碼註釋能看出thenCompose至關於flatMap,避免CompletableFuture<CompletableFuture<String>>這種寫法。

這也是thenComposethenApply的區別,經過查看api也能看出:

thenApply:

public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}

thenCompose:

public <U> CompletableFuture<U> thenCompose(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(screenExecutor(executor), fn);
}

合併 : thenCombineAsync

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000); // 模擬接口調用耗時1秒
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "hello";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000); // 模擬接口調用耗時1秒
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return " world";
});
CompletableFuture<String> f3 = f1.thenCombineAsync(f2, (t1, t2) ->
    t1 + t2
);
long time = System.currentTimeMillis();
System.out.println("異步結果:" + f3.get());
System.out.println("耗時:" + (System.currentTimeMillis() - time));

// 輸出結果:
異步結果:hello world
耗時:1002

從代碼輸出結果能夠看到兩個異步任務f一、f2是並行執行,彼此無前後依賴順序,thenCombineAsync適合將兩個並行執行的異步任務的結果合併返回成一個新的future。

還有一個相似的方法thenAcceptBoth也是合併兩個future的結果,可是不會返回新的值,內部消費掉了。

二選一 : applyToEitherAsync

Random rand = new Random();
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000 + rand.nextInt(1000)); // 模擬接口調用耗時1秒
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "hello";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000 + rand.nextInt(1000)); // 模擬接口調用耗時1秒
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "world";
});
CompletableFuture<String> f3 = f1.applyToEitherAsync(f2, t -> t);
long time = System.currentTimeMillis();
System.out.println("異步結果:" + f3.get());
System.out.println("耗時:" + (System.currentTimeMillis() - time));

輸出的結果有時候是hello 有時候是world,哪一個future先執行完就根據它的結果計算,取兩個future最早返回的。

這裏要說明一點,若是兩個future是同時返回結果,那麼applyToEitherAsync永遠以第一個future的結果爲準,你們能夠把上面代碼的Thread.sleep註釋掉測試下。

另外acceptEither方法和這個相似,可是沒有返回值。

allOf / anyOf

前面講的compose,combine,either都是處理兩個future的方法,若是是超過2個的可使用allOfanyOf

allOf:

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000); // 模擬接口調用耗時1秒
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "hello";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000); // 模擬接口調用耗時1秒
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "world";
});
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000); // 模擬接口調用耗時1秒
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "java老k";
});
List<CompletableFuture<String>> list = new ArrayList<>();
list.add(f1);
list.add(f2);
list.add(f3);

CompletableFuture<Void> f4 = CompletableFuture.allOf(list.toArray(new CompletableFuture[]{}));
long time = System.currentTimeMillis();
f4.thenRunAsync(() ->
    list.forEach(f -> {
        try {
            System.out.println("異步結果:" + f.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    })
);
f4.get();
System.out.println("耗時:" + (System.currentTimeMillis() - time));
// 輸出結果:
耗時:1004
異步結果:hello
異步結果:world
異步結果:java老k

allOf方法是當全部的CompletableFuture都執行完後執行計算,無返回值。

anyOf:

Random rand = new Random(); // 隨機數
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000 + rand.nextInt(1000)); // 模擬接口調用耗時1秒
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "hello";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000 + rand.nextInt(1000)); // 模擬接口調用耗時1秒
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "world";
});
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000 + rand.nextInt(1000)); // 模擬接口調用耗時1秒
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "java老k";
});

CompletableFuture<Object> f4 = CompletableFuture.anyOf(f1, f2, f3);
long time = System.currentTimeMillis();
System.out.println("異步結果:" + f4.get());
System.out.println("耗時:" + (System.currentTimeMillis() - time));

// 輸出結果:
異步結果:java老k
耗時:1075

屢次執行輸出的結果不同,anyOf方法當任意一個CompletableFuture執行完後就會執行計算。

雖說CompletableFuture更適合I/O場景,但使用時必定要結合具體業務,好比說有些公共方法處理異步任務時須要考慮異常狀況,這時候使用CompletableFuture.handle(BiFunction<? super T, Throwable, ? extends U> fn)更合適,handle方法會處理正常計算值和異常,所以它能夠屏蔽異常,避免異常繼續拋出

CompletableFuture還有一個坑須要注意:若是線上流量比較大的狀況下會出現響應緩慢的問題。

由於CompletableFuture默認使用的線程池是forkJoinPool,當時對一臺使用了CompletableFuture實現異步回調功能的接口作壓測,經過監控系統發現有大量的ForkJoinPool.commonPool-worker-* 線程處於等待狀態,進一步分析dump信息發現是forkJoinPool的makeCommonPool問題,以下圖:

image
image

看到這你們應該清楚了,若是在項目裏沒有設置java.util.concurrent.ForkJoinPool.common.parallelism的值,那麼forkJoinPool線程池的線程數就是(cpu-1),咱們測試環境的機器是2核,這樣實際執行任務的線程數只有1個,當有大量請求過來時,若是有耗時高的io操做,勢必會形成更多的線程等待,進而拖累服務響應時間。

解決方案一個是設置java.util.concurrent.ForkJoinPool.common.parallelism這個值(要在項目啓動時指定),或者指定線程池不使用默認的forkJoinPool。

forkJoinPoll線程池不瞭解的能夠看下這篇文章:線程池ForkJoinPool簡介

線程數如何設置能夠參考《Java併發編程實戰》這本書給出的建議,以下圖:

image

線程池設置線程數公式:

threads = N CPU U CPU (1 + W/C)

其中:

  • N CPU 是處理器的核數
  • U CPU 是指望的CPU利用率(介於0和1之間)
  • W/C是等待時間與計算時間的比率

網上也有這麼區分的:

若是服務是cpu密集型的,設置爲電腦的核數

若是服務是io密集型的,設置爲電腦的核數*2

其實我以爲並不嚴謹,尤爲是io密集型的還要參考QPS和web服務器的配置。

線程池使用不當形成的後果和分析能夠在推薦閱讀裏瞭解。

今天主要講了java實現異步編程的幾種方式,你們能夠結合本身的實際狀況參考,下次有時間會跟你們分享下咱們另一個項目如何使用RxJava實現的全異步化服務。

文章來源:http://javakk.com/225.html

相關文章
相關標籤/搜索