Java 8原生API也能夠開發響應式代碼?

asphalt-automobile-automotive-1172105.jpg

前段時間工做上比較忙,這篇文章一直沒來得及寫,本文是閱讀《Java8實戰》的時候,瞭解到Java 8裏已經提供了一個異步非阻塞的接口(CompletableFuture),能夠實現簡單的響應式編程的模式,所以用這篇文章作個梳理。我是帶着下面這幾個問題去學習CompletableFuture這個接口的,java

  1. CompletableFuture是爲了解決什麼問題而設計的?
  2. 它的使用場景是什麼?開源軟件中有實戰使用案例嗎?
  3. CompletableFuture的經常使用API都有哪些?如何使用?
  4. CompletableFuture和RxJava有什麼不一樣?

這篇文章梳理下來,基本上能夠回答前面四個問題,OK,咱們進入正文。面試

基本概念

圖片摘自Dubbo官方博客
RPC(遠程方法調用)的四種方式有:oneway、sync、future和callback,在dubbo或bolt這類通訊框架中,默認使用的是sync模式(同步+阻塞),future和callback都屬於異步模式,不過future模式在get的時候會阻塞,callback模式則不須要等待結果,有結果後服務端會回調請求方。

異步調用這類模式,比較適合的場景是IO密集型場景,要執行不少遠程調用的任務,而且這些調用耗時可能比較久。以openwrite中的一個case爲例:我發佈一篇文章,須要給幾個不一樣的寫做平臺建立文章,這時候我不但願這個過程是順序的,就比較適合用異步調用模式。編程

Future模式除了在get()調用的時候會阻塞外,還有其餘的侷限性,例如:沒有使用Java Lambda表達式的優點,對一連串的異步調用能夠支持,可是寫出來的代碼會比較複雜。後端

CompletableFuture的經常使用API

閱讀CompletableFuture的API的時候,我有一個體會——CompletableFuture之於Future,除了增長了回調這個最重要的特性,其餘的特性有點像Stream對於集合迭代的加強。app

使用CompletableFuture,咱們能夠像Stream同樣使用一部調用,能夠處理一些級聯的異步調用(相似於Stream裏的flatMap)、能夠過濾一些無用的異步調用(anyOf、allOf)。框架

下面這張圖是我按照本身的理解,梳理除了CompletableFuture常見的API,閱讀的時候須要注意下面幾個點:異步

  1. 把握幾個大的分類:建立CompletableFuture、獲取CompletableFuture的執行結果、主動結束CompletableFuture、異步調用任務的組合處理;
  2. 看着方法多,可是有規律可循,例如apply字樣的接口,傳入的方法參數都是有返回值的;
  3. 帶either字樣的,都是多個異步任務有一個知足條件便可的;
  4. 帶executor方法的,都表示該方法能夠用自定義的線程池來優化性能。

CompletableFuture的API

Dubbo項目中的使用案例

Dubbo對於異步化的支持起始在2.6.x中就有提供,是在發佈bean的時候加個屬性配置——async=true,而後利用上下文將異步標識一層層傳遞下去。在以前的公司中有一次排查dubbo(當時咱們用的是dubbox)異步調用的問題,最後查到的緣由就是多個異步調用,上下文裏的信息串了。async

Dubbo 2.7 中使用了 JDK1.8 提供的 CompletableFuture 原生接口對自身的異步化作了改進。CompletableFuture 能夠支持 future 和 callback 兩種調用方式。在Dubbo最新的master代碼中,我知道了Dubbo的異步結果的定義,它的類圖以下,能夠看出AsyncRpcResult是一個CompletableFuture接口的實現。ide

AsyncRpcResult.png

實戰Demo

經過下面的例子,能夠看出CompletableFuture的最大好處——callback特性。首先定義一個接口,其中包括同步接口和該接口的異步版本。性能

public interface AsyncInterfaceExample {

    String computeSomeThine();

    CompletableFuture<String> computeSomeThingAsync();
}
複製代碼

而後定義該接口的實現類,能夠看出,若是要講現有的同步接口異步化,是比較容易的;

public class AsyncInterfaceExampleImpl implements AsyncInterfaceExample {

    @Override
    public String computeSomeThine() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "hello, world";
    }

    @Override
    public CompletableFuture<String> computeSomeThingAsync() {
        return CompletableFuture.supplyAsync(this::computeSomeThine);
    }
}
複製代碼

而後看下咱們的測試case,以下:

public class AsyncInterfaceExampleTest {

    private static String getOtherThing() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "other";
    }

    public static void main(String[] args) {
        AsyncInterfaceExample asyncInterfaceExample = new AsyncInterfaceExampleImpl();

        //case1 同步調用
        long start = System.currentTimeMillis();
        String someThing = asyncInterfaceExample.computeSomeThine();
        String other = getOtherThing();
        System.out.println("cost:" + (System.currentTimeMillis() - start) + " result:" + someThing + other);

        //case2 異步調用,使用回調
        start = System.currentTimeMillis();
        CompletableFuture<String> someThingFuture = asyncInterfaceExample.computeSomeThingAsync();
        other = getOtherThing();

        long finalStart = start;
        String finalOther = other;
        someThingFuture.whenComplete((returnValue, exception) -> {
            if (exception == null) {
                System.out.println(
                    "cost:" + (System.currentTimeMillis() - finalStart) + " result:" + returnValue + finalOther);
            } else {
                exception.printStackTrace();
            }
        });
    }
}
複製代碼

上面這個案例的執行結果以下圖所示:

執行結果
*** 本號專一於後端技術、JVM問題排查和優化、Java面試題、我的成長和自我管理等主題,爲讀者提供一線開發者的工做和成長經驗,期待你能在這裏有所收穫。
javaadu
相關文章
相關標籤/搜索