微言異步回調

前言html

回調,顧名思義,回過頭來調用,詳細的說來就是用戶無需關心內部實現的具體邏輯,只須要在暴露出的回調函數中放入本身的業務邏輯便可。因爲回調機制解耦了框架代碼和業務代碼,因此能夠看作是對面向對象解耦的具體實踐之一。因爲本文的側重點在於講解後端回調,因此對於前端回調甚至於相似JSONP的回調函數類的,利用本章講解的知識進行代入的時候,請斟酌一二,畢竟後端和前端仍是有必定的區別,所謂差之毫釐,可能謬以千里,慎之。因此本章對回調的講解側重於後端,請知悉。前端

回調定義java

說到回調,其實個人理解相似於函數指針的功能,怎麼講呢?由於一個方法,一旦附加了回調入參,那麼用戶在進行調用的時候,這個回調入參是能夠用匿名方法直接替代的。回調的使用必須和方法的簽名保持一致性,下面咱們來看一個JDK實現的例子:git

    default boolean removeIf(Predicate<? super E> filter) {
        Objects.requireNonNull(filter);
        boolean removed = false;
        final Iterator<E> each = iterator();
        while (each.hasNext()) {
            if (filter.test(each.next())) {
                each.remove();
                removed = true;
            }
        }
        return removed;
    }

在JDK中,List結構有一個removeIf的方法,其實現方式如上所示。因爲附帶了具體的註釋講解,我這裏就再也不進行過多的講述。咱們須要着重關注的是其入參:Predicate,由於他就是一個函數式接口,入參爲泛型E,出參爲boolean,其實和Function<? super E, boolean>是等價的。因爲List是一個公共的框架代碼,裏面不可能糅合業務代碼,因此爲了解耦框架代碼和業務代碼,JDK使用了內置的各類函數式接口做爲方法的回調,將具體的業務實踐拋出去,讓用戶本身實現,而它本身只接受用戶返回的結果就好了:只要用戶處理返回true(filter.test(each.next()返回true),那麼我就刪掉當前遍歷的數據;若是用戶處理返回false(filter.test(each.next()返回false),那麼我就保留當前遍歷的數據。是否是很是的nice?github

其實這種完美的協做關係,在JDK類庫中隨處可見,在其餘常常用到的框架中也很常見,諸如Guava,Netty,實在是太多了(這也從側面說明,利用函數式接口解耦框架和業務,是正確的作法),我擷取了部分片斷以下:redis

//將map中的全部entry進行替換
void replaceAll(BiFunction<? super K, ? super V, ? extends V> function)
//將map中的entry進行遍歷
void forEach(BiConsumer<? super K, ? super V> action)
//map中的entry值若是有,則用新值從新創建映射關係
V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction)

//Deque遍歷元素
void forEach(Consumer<? super T> action)
//Deque按給定條件移除元素
boolean removeIf(Predicate<? super E> filter)

//Guava中獲取特定元素
<T> T get(Object key, final Callable<T> valueLoader) 

//Netty中設置監聽
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener)

那麼,回過頭來想一想,若是咱們封裝本身的組件,想要封裝的很JDK Style,該怎麼作呢?若是上來直接理解Predicate,Function,Callable,Consumer,我想不少人是有困難的,那就聽我慢慢道來吧。編程

咱們先假設以下一段代碼,這段代碼我相信不少人會很熟悉,不少人也封裝過,這就是咱們的大名鼎鼎的RedisUtils封裝類:後端

  /**
     * key遞增特定的num
     * @param key Redis中保存的key
     * @param num 遞增的值
     * @return key計算完畢後返回的結果值
     */
    public Long incrBy(String key,long num) {
        CallerInfo callerInfo = Profiler.registerInfo("sendCouponService.redis.incrBy", "sendCouponService", false, true);
        Long result;
        try {
            result = jrc.incrBy(key, num);
        } catch (Exception e) {
            logger.error("incrBy", null, "sendCouponService.redis.incrBy異常,key={},value={}", e, key, num);
            Profiler.functionError(callerInfo);
            throw new RuntimeException("sendCouponService.redis.incrBy異常,key=" + key, e);
        } finally {
            Profiler.registerInfoEnd(callerInfo);
        }
        return result;
    }

上面這段代碼只是一個示例,其實還有上百個方法基本上都是這種封裝結構,這種封裝有問題嗎?沒問題!並且封裝方式辛辣老道,一看就是高手所爲,由於既加了監控,又作了異常處理,並且還有錯誤日誌記錄,一旦發生問題,咱們可以第一時間知道哪裏出了問題,哪一個方法出了問題,而後設定對應的應對方法。api

這種封裝方式,若是當作普通的Util來用,徹底沒有問題,可是若是想封裝成組件,則欠缺點什麼,我列舉以下:數據結構

1. 當前代碼寫死了用jrc操做,若是後期切換到jimdb,是否是還得爲jimdb專門寫一套呢?

2. 當前代碼,上百個方法,其實不少地方都是重複的,惟有redis操做那塊不一樣,代碼重複度特別高,一旦擴展新方法,基本上是剖解原有代碼,而後拷貝現有方法,最後改爲新方法。

3. 當前方法,包含的都是redis單操做,若是遇到那種涉及到多個操做組合的(好比先set,而後expire或者更復雜一點),須要添加新方法,本質上這種新方法其實和業務性有關了。

從上面列出的這幾點來看,其實咱們能夠徹底將其打形成一個兼容jrc操做和cluster操做,同時具備良好框架擴展性(策略模式+模板模式)和良好代碼重複度控制(函數式接口回調)的框架。因爲本章涉及內容爲異步回調,因此這裏咱們將講解這種代碼如何保持良好的代碼重複度控制上。至於良好的框架擴展性,若是感興趣的話,我會在後面的章節進行講解。那麼咱們開始進行優化吧。

首先,找出公共操做部分(白色)和非公共操做部分(黃色):

/**
     * key遞增特定的num
     * @param key Redis中保存的key
     * @param num 遞增的值
     * @return key計算完畢後返回的結果值
     */
    public Long incrBy(String key,long num) {
        CallerInfo callerInfo = Profiler.registerInfo("sendCouponService.redis.incrBy", "sendCouponService", false, true);
        Long result;
        try {
            result = jrc.incrBy(key, num);
        } catch (Exception e) {
            logger.error("incrBy", null, "sendCouponService.redis.incrBy異常,key={},value={}", e, key, num);
            Profiler.functionError(callerInfo);
            return null;
        } finally {
            Profiler.registerInfoEnd(callerInfo);
        }
        return result;
    }

經過上面的標記,咱們發現非公共操做部分,有兩類:

1. ump提示語和日誌提示語不一致

2. 操做方法不一致

標記出來了公共操做部分,以後咱們開始封裝公共部分:

/**
     * 公共模板抽取
     *
     * @param method
     * @param callable
     * @param <T>
     * @return
     */
    public static <T> T invoke(String method) {
        CallerInfo info = Profiler.registerInfo(method, false, true);
        try {
            //TODO 這裏放置不一樣的redis操做方法
        } catch (Exception e) {
            logger.error(method, e);
            AlarmUtil.alarm(method + e.getCause());
            reutrn null;
        } finally {
            Profiler.registerInfoEnd(info);
        }
    }

可是這裏有個問題,咱們雖然把公共模板抽取出來了,可是TODO標籤裏面的內容怎麼辦呢? 如何把不一樣的redis操做方法傳遞進來呢?

其實在java中,咱們能夠利用接口的方式,將具體的操做代理出去,由外部調用者來實現,聽起來是否是感受又和IOC搭上了點關係,不錯,你想的沒錯,這確實是控制反轉依賴注入的一種作法,經過接口方式將具體的實踐代理出去,這也是進行回調操做的原理。接下來看咱們的改造:

    /**
     * redis操做接口
     */
    public interface RedisOperation<T>{
        //調用redis方法,入參爲空,出參爲T泛型
        T invoke();
    }

    /**
     *  redis操做公共模板
     * @param method
     * @param  redisOperation
     * @param <T>
     * @return
     */
    public static <T> T invoke(String method,RedisOperation redisOperation) {
        CallerInfo info = Profiler.registerInfo(method, false, true);
        try {
           return  redisOperation.invoke();
        } catch (Exception e) {
            logger.error(method, e);
            AlarmUtil.alarm(method + e.getCause());
            reutrn null;
        } finally {
            Profiler.registerInfoEnd(info);
        }
    }

這樣,咱們就打造好了一個公共的redis操做模板,以後就能夠像下面的方式來使用了:

    @Override
    public Long incrby(String key, long val){
        String method = "com.jd.marketing.util.RedisUtil.incrby";
        RedisOperation<Long> process = () -> {
            return redisUtils.incrBy(key, val);
        };
        return CacheHelper.invoke(method, process);
    }

以後的一百多個方法,你也可使用這樣的方式來一一進行包裝,以後你會發現原來RedisUtils封裝完畢,代碼寫了2000行,可是用這種方式以後,代碼只寫了1000行,並且後續有新的聯合操做過來,你只須要在以下代碼段裏面直接把級聯操做添加進去便可:

 RedisOperation<Long> process = () -> {
           //TODO other methods
           //TODO other methods
            return redisUtils.incrBy(key, val);
};

是否是很方便快捷?在這裏我須要因此下的是,因爲RedisOperation裏面的invoke方法是沒有入參,帶有一個出參結果的調用。因此在回調這裏,我用了匿名錶達式來()->{}來match這種操做。可是若是回調這裏,一個入參,一個出參的話,那麼個人匿名錶達式須要這樣寫 param->{}, 多個入參,那就變成了這樣 (param1, param2, param3)->{} 。因爲這裏並不是重點,我不想過多講解,若是對這種使用方式不熟悉,能夠徹底使用以下的方式來進行書寫也行:

 @Override
    public Long incrby(String key, long val){
        String method = "com.jd.marketing.util.RedisUtil.incrby";
        RedisOperation<Long> process = () -> incrByOperation(key, val);
        return CacheHelper.invoke(method, process);
    }
   
   private Long incrByOperation(String key, long val){
       return redisUtils.incrBy(key, val);
   }

其實說到這裏的時候,我就有必要提一下開頭的埋下的線索了。其實以前演示的Netty的代碼:

//Netty中設置監聽
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener)

GenericFutureListener這個接口

就是按照上面的寫法來作的,是否是豁然開朗呢?至於其調用方式,也和上面講解的一致,只要符合接口裏面方法的調用標準就行(入參和出參符合就行), 好比 future –> {}。

說到這裏,咱們可能認爲這樣太麻煩了,本身定義接口,而後注入到框架中,最後用戶本身實現調用方法,一長串。是的,你說的沒錯,這樣確實太麻煩了,JDK因而專門用了一個 FunctionalInterface的annotation來幫咱們作了,因此在JDK中,若是你看到Consumer,Function,Supplier等,帶有@FunctionalInterface標註的接口,那麼就說明他是一個函數式接口,而這種接口是幹什麼的,具體的原理就是我上面講的。下面咱們來梳理梳理這些接口吧。

先看一下咱們的RedisUtils使用JDK自帶的函數式接口的最終封裝效果:

image

從圖示代碼能夠看出,總體封裝變得簡潔許多,並且咱們用了JDK內置的函數式接口,因此也無需寫其餘多餘的代碼,看上去很清爽,重複代碼基本不見了。並且,因爲JDK提供的其餘的函數式接口有運算操做,好比Predicate.or, Predicate.and操做等,大大增強了封裝的趣味性和樂趣。

下面我將JDK中涉及的經常使用的函數式接口列舉一遍,而後來詳細講解講解吧,列表以下:

Consumer, 提供void accept(T t)回調

Runnable, 提供void run()回調

Callable, 提供V call() throws Exception回調

Supplier, 提供T get()回調

Function, 提供R apply(T t)回調, 有andThen接續操做

Predicate, 提供boolean test(T t)回調, 等價於 Function<T, boolean>

BiConsumer, 提供void accept(T t, U u)回調,注意帶Bi的回調接口,代表入參都是雙參數,好比BiPredicate    
......


其實還有不少,我這裏就不一一列舉了。感興趣的朋友能夠在這裏找到JDK提供的全部函數式接口

接下來,咱們來說解其使用示範,以便於明白怎麼樣去使用它。

對於Consumer函數式接口,內部的void accept(T t)回調方法,代表了它只能回調有一個入參,沒有返參的方法。示例以下:

/**
     * Consumer調用的例子
     */
    public void ConsumerSample() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("key", "val");
        linkedHashMap.forEach((k, v) -> {
            System.out.println("key" + k + ",val" + v);
        });
    }

對於Callable接口,其實和Supplier接口是同樣的,只是有無Exception拋出的區別,示例以下:

 /**
     * Callable調用的例子
     */
    public Boolean setnx(String key, String val){
        String method = "com.jd.marketing.util.RedisUtil.setnx";
        Callable<Boolean> process = () -> {
            Long rst = redisUtils.setnx(key, val);
            if(rst == null || rst == 0){
                return false;
            }
            return true;
        };
        return CacheHelper.invoke(method, process);
    }

對於Predicate<T>接口,等價於Function<T, Boolean>, 示例以下:

 /**
     * Precidate調用的例子
     */
    public void PredicateSample() {
        List list = new ArrayList();
        list.add("a")
        list.removeIf(item -> {
            return item.equals("a");
        });
    }

說明一下,Predicate的入參爲一個參數,出參爲boolean,很適合進行條件判斷的場合。在JDK的List數據結構中,因爲removeIf方法沒法耦合進去業務代碼,因此利用Predicate函數式接口將業務邏輯實現部分拋給了用戶自行處理,用戶處理完畢,只要返回給我true,我就刪掉當前的item;返回給我false,我就保留當前的item。解耦作的很是漂亮。那麼List的removeIf實現方式你以爲是怎樣實現的呢?若是我不看JDK代碼的話,我以爲實現方式以下:

 public boolean removeIf(Predicate<T> predicate){
        final Iterator<T> iterator = getIterator();
        while (iterator.hasNext()) {
            T current = iterator.next();
            boolean result = predicate.test(current);
            if(result){
                iterator.remove();
                return true;
            }
        }
        return false;
    }

可是實際你去看看List默認的removeIf實現,源碼大概和我寫的差很少。因此只要理解了函數式接口,咱們也能寫出JDK Style的代碼,酷吧。

CompletableFuture實現異步處理

好了,上面就是函數式接口的總體介紹和使用簡介,不知道你看了以後,理解了多少呢?接下來咱們要講解的異步,徹底基於上面的函數式接口回調,若是以前的都看懂了,下面的講解你將豁然開朗;反之則要悟了。可是正確的方向都已經指出來了,因此入門應該是沒有難度的。

CompletableFuture,很長的一個名字,我對他的印象停留在一次代碼評審會上,當時有人提到了這個類,我只是簡單的記錄下來了,以後去JDK源碼中搜索了一下,看看主要幹什麼的,也沒有怎麼想去看它。結果當我搜到這個類,而後看到Author的時候,我以爲我發現了金礦同樣,因而我決定深刻的研究下去,那個做者的名字就是:

/**
 * A {@link Future} that may be explicitly completed (setting its
 * value and status), and may be used as a {@link CompletionStage},
 * supporting dependent functions and actions that trigger upon its
 * completion.
 *
 * <p>When two or more threads attempt to
 * {@link #complete complete},
 * {@link #completeExceptionally completeExceptionally}, or
 * {@link #cancel cancel}
 * a CompletableFuture, only one of them succeeds.
 *
 * <p>In addition to these and related methods for directly
 * manipulating status and results, CompletableFuture implements
 * interface {@link CompletionStage} with the following policies: <ul>
 *
 * <li>Actions supplied for dependent completions of
 * <em>non-async</em> methods may be performed by the thread that
 * completes the current CompletableFuture, or by any other caller of
 * a completion method.</li>
 *
 * <li>All <em>async</em> methods without an explicit Executor
 * argument are performed using the {@link ForkJoinPool#commonPool()}
 * (unless it does not support a parallelism level of at least two, in
 * which case, a new Thread is created to run each task).  To simplify
 * monitoring, debugging, and tracking, all generated asynchronous
 * tasks are instances of the marker interface {@link
 * AsynchronousCompletionTask}. </li>
 *
 * <li>All CompletionStage methods are implemented independently of
 * other public methods, so the behavior of one method is not impacted
 * by overrides of others in subclasses.  </li> </ul>
 *
 * <p>CompletableFuture also implements {@link Future} with the following
 * policies: <ul>
 *
 * <li>Since (unlike {@link FutureTask}) this class has no direct
 * control over the computation that causes it to be completed,
 * cancellation is treated as just another form of exceptional
 * completion.  Method {@link #cancel cancel} has the same effect as
 * {@code completeExceptionally(new CancellationException())}. Method
 * {@link #isCompletedExceptionally} can be used to determine if a
 * CompletableFuture completed in any exceptional fashion.</li>
 *
 * <li>In case of exceptional completion with a CompletionException,
 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
 * {@link ExecutionException} with the same cause as held in the
 * corresponding CompletionException.  To simplify usage in most
 * contexts, this class also defines methods {@link #join()} and
 * {@link #getNow} that instead throw the CompletionException directly
 * in these cases.</li> </ul>
 *
 * @author Doug Lea
 * @since 1.8
 */

Doug Lea,Java併發編程的大神級人物,整個JDK裏面的併發編程包,幾乎都是他的做品,很務實的一個老爺子,目前在紐約州立大學奧斯威戈分校執教。好比咱們異常熟悉的AtomicInteger類也是其做品:

/**
 * An {@code int} value that may be updated atomically.  See the
 * {@link java.util.concurrent.atomic} package specification for
 * description of the properties of atomic variables. An
 * {@code AtomicInteger} is used in applications such as atomically
 * incremented counters, and cannot be used as a replacement for an
 * {@link java.lang.Integer}. However, this class does extend
 * {@code Number} to allow uniform access by tools and utilities that
 * deal with numerically-based classes.
 *
 * @since 1.5
 * @author Doug Lea
*/
public class AtomicInteger extends Number implements java.io.Serializable {
  // ignore code 
}

想查閱老爺子的最新資料,建議到Wikipedia上查找,裏面有他的博客連接等,我這裏就再也不作過多介紹,回到正題上來,咱們繼續談談CompletableFuture吧。我剛纔貼的關於這個類的描述,都是英文的,並且特別長,咱們不妨貼出中文釋義來,看看具體是個什麼玩意兒:

 繼承自Future,帶有明確的結束標記;同時繼承自CompletionStage,支持多函數調用行爲直至完成態。
 當兩個以上的線程對CompletableFuture進行complete調用,completeExceptionally調用或者cancel調用,只有一個會成功。
 
 爲了直觀的保持相關方法的狀態和結果,CompletableFuture按照以下原則繼承並實現了CompletionStage接口:

 1. 多個同步方法的級聯調用,可能會被當前的CompletableFuture置爲完成態,也可能會被級聯函數中的任何一個方法置爲完成態。

 2. 異步方法的執行,默認使用ForkJoinPool來進行(若是當前的並行標記不支持多併發,那麼將會爲每一個任務開啓一個新的線程來進行)。
    爲了簡化監控,調試,代碼跟蹤等,全部的異步任務必須繼承自AsynchronousCompletionTask。

 3. 全部的CompletionStage方法都是獨立的,overrid子類中的其餘的方法並不會影響當前方法行爲。

 CompletableFuture同時也按照以下原則繼承並實現了Future接口:

 1. 因爲此類沒法控制完成態(一旦完成,直接返回給調用方),因此cancellation被當作是另外一種帶有異常的完成狀態. 在這種狀況下cancel方法和CancellationException是等價的。
    方法isCompletedExceptionally能夠用來監控CompletableFuture在一些異常調用的場景下是否完成。

 2. get方法和get(long, TimeUint)方法將會拋出ExecutionException異常,一旦計算過程當中有CompletionException的話。
    爲了簡化使用,這個類同時也定義了join()方法和getNow()方法來避免CompletionException的拋出(在CompletionException拋出以前就返回告終果)。
 

因爲沒有找到中文文檔,因此這裏自行勉強解釋了一番,有些差強人意。

在咱們平常生活中,咱們的不少行爲其實都是要麼有結果的,要麼無結果的。好比說作蛋糕,作出來的蛋糕就是結果,那麼通常咱們用Callable或者Supplier來表明這個行爲,由於這兩個函數式接口的執行,是須要有返回結果的。再好比說吃蛋糕,吃蛋糕這個行爲,是無結果的。由於他僅僅表明咱們去幹了一件事兒,因此會用Consumer或者Runnable來表明吃飯這個行爲。由於這兩個函數式接口的執行,是不返回結果的。有時候我發現家裏沒有作蛋糕的工具,因而我便去外面的蛋糕店委託蛋糕師傅給我作一個,那麼這種委託行爲,其實就是一種異步行爲,會用Future來描述。由於Future神奇的地方在於,可讓一個同步執行的方法編程異步的,就好似委託蛋糕師傅作蛋糕同樣。這樣咱們就能夠在蛋糕師傅給咱們作蛋糕期間去作一些其餘的事兒,好比聽音樂等等。可是因爲Future不具備事件完成告知的能力,因此得須要本身去一遍一遍的問師傅,作好了沒有。而CompletableFuture則具備這種能力,因此總結起來以下:

  • Callable,有結果的同步行爲,好比作蛋糕
  • Runnable,無結果的同步行爲,好比吃蛋糕
  • Future,異步封裝Callable/Runnable,好比委託給蛋糕師傅(其餘線程)去作
  • CompletableFuture,封裝Future,使其擁有回調功能,好比讓師傅主動告訴我蛋糕作好了

那麼上面描述的場景,咱們用代碼封裝一下吧:

 public static void main(String... args) throws Exception {
        CompletableFuture
                .supplyAsync(() -> makeCake())
                .thenAccept(cake -> eatCake(cake));
        System.out.println("先回家聽音樂,蛋糕作好後給我打電話,我來取...");
        Thread.currentThread().join();
    }

    private static Cake makeCake() {
        System.out.println("我是蛋糕房,開始爲你製做蛋糕...");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Cake cake = new Cake();
        cake.setName("草莓蛋糕");
        cake.setShape("圓形");
        cake.setPrice(new BigDecimal(99));
        System.out.println("蛋糕製做完畢,請取回...");
        return cake;
    }

    private static void eatCake(Cake cake) {
        System.out.println("這個蛋糕是" + cake.getName() + ",我喜歡,開吃...");
    }

最後執行結果以下:

我是蛋糕房,開始爲你製做蛋糕...
先回家聽音樂,蛋糕作好後給我打電話,我來取...
蛋糕製做完畢,請取回...
這個蛋糕是草莓蛋糕,我喜歡,開吃...

因爲CompletableFuture的api有50幾個,數量很是多,咱們能夠先將其劃分爲若干大類(摘自理解CompletableFuture,總結的很是好,直接拿來用):

建立類:用於CompletableFuture對象建立,好比:

  • completedFuture
  • runAsync
  • supplyAsync
  • anyOf
  • allOf

狀態取值類:用於判斷當前狀態和同步等待取值,好比:

  • join
  • get
  • getNow
  • isCancelled
  • isCompletedExceptionally
  • isDone

控制類:可用於主動控制CompletableFuture完成行爲,好比:

  • complete
  • completeExceptionally
  • cancel

接續類:CompletableFuture最重要的特性,用於注入回調行爲,好比:

  • thenApply, thenApplyAsync
  • thenAccept, thenAcceptAsync
  • thenRun, thenRunAsync
  • thenCombine, thenCombineAsync
  • thenAcceptBoth, thenAcceptBothAsync
  • runAfterBoth, runAfterBothAsync
  • applyToEither, applyToEitherAsync
  • acceptEither, acceptEitherAsync
  • runAfterEither, runAfterEitherAsync
  • thenCompose, thenComposeAsync
  • whenComplete, whenCompleteAsync
  • handle, handleAsync
  • exceptionally

上面的方法很是多,而大多具備類似性,咱們大可沒必要立刻記憶。先來看看幾個通常性的規律,即可輔助記憶(重要):

  1. 以Async後綴結尾的方法,均是異步方法,對應無Async則是同步方法。
  2. 以Async後綴結尾的方法,必定有兩個重載方法。其一是採用內部forkjoin線程池執行異步,其二是指定一個Executor去運行。
  3. 以run開頭的方法,其方法入參的lambda表達式必定是無參數,而且無返回值的,其實就是指定Runnable
  4. 以supply開頭的方法,其方法入參的lambda表達式必定是無參數,而且有返回值,其實就是指Supplier
  5. 以Accept爲開頭或結尾的方法,其方法入參的lambda表達式必定是有參數,可是無返回值,其實就是指Consumer
  6. 以Apply爲開頭或者結尾的方法,其方法入參的lambda表達式必定是有參數,可是有返回值,其實就是指Function
  7. 帶有either後綴的表示誰先完成則消費誰。

以上6條記住以後,就能夠記住60%以上的API了。

先來看一下其具體的使用方式吧(網上有個外國人寫了CompletableFuture的20個例子,我看有中文版了,放到這裏,你們能夠參考下)。

  /**
     * CompletableFuture調用completedFuture方法,代表執行完畢
     */
    static void sample1() {
        CompletableFuture cf = CompletableFuture.completedFuture("message");
        Assert.assertTrue(cf.isDone());
        Assert.assertEquals("message", cf.getNow(null));
    }

sample1代碼,能夠看出,若是想讓一個ComopletableFuture執行完畢,最簡單的方式就是調用其completedFuture方法便可。以後就能夠用getNow對其結果進行獲取,若是獲取不到就返回默認值null。

 /**
     * 兩個方法串行執行,後一個方法依賴前一個方法的返回
     */
    static void sample2() {
        CompletableFuture cf = CompletableFuture
                .completedFuture("message")
                .thenApply(message -> {
                    Assert.assertFalse(Thread.currentThread().isDaemon());
                    return message.toUpperCase();
                });
        Assert.assertEquals("MESSAGE", cf.getNow(null));
    }

sample2代碼,利用thenApply實現兩個函數串行執行,後一個函數的執行以來前一個函數的返回結果。

/**
     * 兩個方法並行執行,兩個都執行完畢後,在進行彙總
     */
    static void sample3() {

        long start = System.currentTimeMillis();

        CompletableFuture cf = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        CompletableFuture cf1 = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        CompletableFuture.allOf(cf, cf1).whenComplete((v,t)->{
            System.out.println("都完成了");
        }).join();
        long end = System.currentTimeMillis();
        System.out.println((end-start));
    }

sample3最後的執行結果爲:

都完成了
2087

能夠看到,耗時爲2087毫秒,若是是串行執行,須要耗時3000毫秒,可是並行執行,則以最長執行時間爲準,其實這個特性在進行遠程RPC/HTTP服務調用的時候,將會很是有用,咱們一下子再進行講解如何用它來反哺業務。

 /**
     * 方法執行取消
     */
    static void sample4(){
        CompletableFuture cf = CompletableFuture.supplyAsync(()->{
            try {
                System.out.println("開始執行函數...");
                Thread.sleep(2000);
                System.out.println("執行函數完畢...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "ok";
        });
        CompletableFuture cf2 = cf.exceptionally(throwable -> {
            return throwable;
        });
        cf2.cancel(true);
        if(cf2.isCompletedExceptionally()){
            System.out.println("成功取消了函數的執行");
        }
        cf2.join();

    }

調用結果以下:

開始執行函數...
成功取消了函數的執行
Exception in thread "main" java.util.concurrent.CancellationException
    at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2263)
    at com.jd.jsmartredis.article.Article.sample4(Article.java:108)
    at com.jd.jsmartredis.article.Article.main(Article.java:132)

能夠看到咱們成功的將函數執行中斷,同時因爲cf2返回的會一個throwable的Exception,因此咱們的console界面將其也原封不動的打印了出來。

講解了基本使用以後,如何使用其來反哺咱們的業務呢?咱們就以通用下單爲例吧,來看看通用下單有哪些能夠優化的點。

image

上圖就是咱們在通用下單接口常常調用的接口,分爲下單地址接口,商品信息接口,京豆接口,因爲這三個接口沒有依賴關係,因此能夠並行的來執行。若是換作是目前的作法,那麼確定是順序執行,假如三個接口獲取都耗時1s的話,那麼三個接口獲取完畢,咱們的耗時爲3s。可是若是改爲異步方式執行的話,那麼將會簡單不少,接下來,咱們開始改造吧。

public Result submitOrder(String pin, CartVO cartVO) {

        //獲取下單地址
        CompletableFuture addressFuture = CompletableFuture.supplyAsync(() -> {
            AddressResult addressResult = addressRPC.getAddressListByPin(pin);
            return addressResult;
        });

        //獲取商品信息
        CompletableFuture goodsFuture = CompletableFuture.supplyAsync(() -> {
            GoodsResult goodsResult = goodsRPC.getGoodsInfoByPin(pin, cartVO);
            return goodsResult;
        });

        //獲取京豆信息
        CompletableFuture beanFuture = CompletableFuture.supplyAsync(() -> {
            JinbeanResult jinbeanResult = JinbeanRPC.getJinbeanByPin(pin);
            return jinbeanResult;
        });

        CompletableFuture.allOf(addressFuture, goodsFuture, beanFuture).whenComplete((v, throwable) -> {
            if (throwable == null) {
                logger.error("獲取地址,商品,京豆信息失敗", throwable);
                //TODO 嘗試從新獲取
            } else {
                logger.error("獲取地址,商品,京豆信息成功");
            }
        }).join();

        AddressResult addressResult = addressFuture.getNow(null);
        GoodsResult goodsResult = goodsFuture.getNow(null);
        JinbeanResult jinbeanResult = beanFuture.getNow(null);
        
        //TODO 後續處理
    }

這樣,咱們利用將普通的RPC執行編程了異步,並且附帶了強大的錯誤處理,是否是很簡單?

可是若是遇到以下圖示的調用結構,CompletableFuture可否很輕鬆的應對呢?

image

因爲業務變動,須要附帶延保信息,爲了後續從新計算價格,因此必須將延保商品獲取出來,而後計算價格。其實這種既有同步,又有異步的作法,利用CompletableFuture來handle,也是輕鬆天然,代碼以下:

 public Result submitOrder(String pin, CartVO cartVO) {

        //獲取下單地址
        CompletableFuture addressFuture = CompletableFuture.supplyAsync(() -> {
            AddressResult addressResult = addressRPC.getAddressListByPin(pin);
            return addressResult;
        });

        //獲取商品信息
        CompletableFuture goodsFuture = CompletableFuture.supplyAsync(() -> {
            GoodsResult goodsResult = goodsRPC.getGoodsInfoByPin(pin, cartVO);
            return goodsResult;
        }).thenApplyAsync((goodsResult, Map)->{
            YanbaoResult yanbaoResult = yanbaoRPC.getYanbaoInfoByGoodID(goodsResult.getGoodId, pin);
            Map<String, Object> map = new HashMap<>();
            map.put("good", goodsResult);
            map.put("yanbao",yanbaoResult);
            return map;
        });

        //獲取京豆信息
        CompletableFuture beanFuture = CompletableFuture.supplyAsync(() -> {
            JinbeanResult jinbeanResult = JinbeanRPC.getJinbeanByPin(pin);
            return jinbeanResult;
        });

        CompletableFuture.allOf(addressFuture, goodsFuture, beanFuture).whenComplete((v, throwable) -> {
            if (throwable == null) {
                logger.error("獲取地址,商品-延保,京豆信息失敗", throwable);
                //TODO 嘗試從新獲取
            } else {
                logger.error("獲取地址,商品-延保,京豆信息成功");
            }
        }).join();

        AddressResult addressResult = addressFuture.getNow(null);
        GoodsResult goodsResult = goodsFuture.getNow(null);
        JinbeanResult jinbeanResult = beanFuture.getNow(null);

        //TODO 後續處理
    }

這樣咱們就能夠了,固然這種改造給咱們帶來的好處也是顯而易見的,咱們不須要針對全部的接口進行OPS優化,而是針對性能最差的接口進行OPS優化,只要提高了性能最差的接口,那麼總體的性能就上去了。

洋洋灑灑寫了這麼多,但願對你們有用,謝謝。

參考資料:

理解CompletableFuture

CompletableFuture 詳解

JDK中CompletableFuture的源碼

相關文章
相關標籤/搜索