前言html
回調,顧名思義,回過頭來調用,詳細的說來就是用戶無需關心內部實現的具體邏輯,只須要在暴露出的回調函數中放入本身的業務邏輯便可。因爲回調機制解耦了框架代碼和業務代碼,因此能夠看作是對面向對象解耦的具體實踐之一。因爲本文的側重點在於講解後端回調,因此對於前端回調甚至於相似JSONP的回調函數類的,利用本章講解的知識進行代入的時候,請斟酌一二,畢竟後端和前端仍是有必定的區別,所謂差之毫釐,可能謬以千里,慎之。因此本章對回調的講解側重於後端,請知悉。前端
回調定義java
說到回調,其實個人理解相似於函數指針的功能,怎麼講呢?由於一個方法,一旦附加了回調入參,那麼用戶在進行調用的時候,這個回調入參是能夠用匿名方法直接替代的。回調的使用必須和方法的簽名保持一致性,下面咱們來看一個JDK實現的例子:git
default boolean removeIf(Predicate<? super E> filter) { Objects.requireNonNull(filter); boolean removed = false; final Iterator<E> each = iterator(); while (each.hasNext()) { if (filter.test(each.next())) { each.remove(); removed = true; } } return removed; }
在JDK中,List結構有一個removeIf的方法,其實現方式如上所示。因爲附帶了具體的註釋講解,我這裏就再也不進行過多的講述。咱們須要着重關注的是其入參:Predicate,由於他就是一個函數式接口,入參爲泛型E,出參爲boolean,其實和Function<? super E, boolean>是等價的。因爲List是一個公共的框架代碼,裏面不可能糅合業務代碼,因此爲了解耦框架代碼和業務代碼,JDK使用了內置的各類函數式接口做爲方法的回調,將具體的業務實踐拋出去,讓用戶本身實現,而它本身只接受用戶返回的結果就好了:只要用戶處理返回true(filter.test(each.next()返回true),那麼我就刪掉當前遍歷的數據;若是用戶處理返回false(filter.test(each.next()返回false),那麼我就保留當前遍歷的數據。是否是很是的nice?github
其實這種完美的協做關係,在JDK類庫中隨處可見,在其餘常常用到的框架中也很常見,諸如Guava,Netty,實在是太多了(這也從側面說明,利用函數式接口解耦框架和業務,是正確的作法),我擷取了部分片斷以下:redis
//將map中的全部entry進行替換 void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) //將map中的entry進行遍歷 void forEach(BiConsumer<? super K, ? super V> action) //map中的entry值若是有,則用新值從新創建映射關係 V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) //Deque遍歷元素 void forEach(Consumer<? super T> action) //Deque按給定條件移除元素 boolean removeIf(Predicate<? super E> filter) //Guava中獲取特定元素 <T> T get(Object key, final Callable<T> valueLoader) //Netty中設置監聽 ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener)
那麼,回過頭來想一想,若是咱們封裝本身的組件,想要封裝的很JDK Style,該怎麼作呢?若是上來直接理解Predicate,Function,Callable,Consumer,我想不少人是有困難的,那就聽我慢慢道來吧。編程
咱們先假設以下一段代碼,這段代碼我相信不少人會很熟悉,不少人也封裝過,這就是咱們的大名鼎鼎的RedisUtils封裝類:後端
/** * key遞增特定的num * @param key Redis中保存的key * @param num 遞增的值 * @return key計算完畢後返回的結果值 */ public Long incrBy(String key,long num) { CallerInfo callerInfo = Profiler.registerInfo("sendCouponService.redis.incrBy", "sendCouponService", false, true); Long result; try { result = jrc.incrBy(key, num); } catch (Exception e) { logger.error("incrBy", null, "sendCouponService.redis.incrBy異常,key={},value={}", e, key, num); Profiler.functionError(callerInfo); throw new RuntimeException("sendCouponService.redis.incrBy異常,key=" + key, e); } finally { Profiler.registerInfoEnd(callerInfo); } return result; }
上面這段代碼只是一個示例,其實還有上百個方法基本上都是這種封裝結構,這種封裝有問題嗎?沒問題!並且封裝方式辛辣老道,一看就是高手所爲,由於既加了監控,又作了異常處理,並且還有錯誤日誌記錄,一旦發生問題,咱們可以第一時間知道哪裏出了問題,哪一個方法出了問題,而後設定對應的應對方法。api
這種封裝方式,若是當作普通的Util來用,徹底沒有問題,可是若是想封裝成組件,則欠缺點什麼,我列舉以下:數據結構
1. 當前代碼寫死了用jrc操做,若是後期切換到jimdb,是否是還得爲jimdb專門寫一套呢?
2. 當前代碼,上百個方法,其實不少地方都是重複的,惟有redis操做那塊不一樣,代碼重複度特別高,一旦擴展新方法,基本上是剖解原有代碼,而後拷貝現有方法,最後改爲新方法。
3. 當前方法,包含的都是redis單操做,若是遇到那種涉及到多個操做組合的(好比先set,而後expire或者更復雜一點),須要添加新方法,本質上這種新方法其實和業務性有關了。
從上面列出的這幾點來看,其實咱們能夠徹底將其打形成一個兼容jrc操做和cluster操做,同時具備良好框架擴展性(策略模式+模板模式)和良好代碼重複度控制(函數式接口回調)的框架。因爲本章涉及內容爲異步回調,因此這裏咱們將講解這種代碼如何保持良好的代碼重複度控制上。至於良好的框架擴展性,若是感興趣的話,我會在後面的章節進行講解。那麼咱們開始進行優化吧。
首先,找出公共操做部分(白色)和非公共操做部分(黃色):
/** * key遞增特定的num * @param key Redis中保存的key * @param num 遞增的值 * @return key計算完畢後返回的結果值 */ public Long incrBy(String key,long num) { CallerInfo callerInfo = Profiler.registerInfo("sendCouponService.redis.incrBy", "sendCouponService", false, true); Long result;
try { result = jrc.incrBy(key, num);
} catch (Exception e) { logger.error("incrBy", null, "sendCouponService.redis.incrBy異常,key={},value={}", e, key, num);
Profiler.functionError(callerInfo);
return null;
} finally { Profiler.registerInfoEnd(callerInfo); } return result; }
經過上面的標記,咱們發現非公共操做部分,有兩類:
1. ump提示語和日誌提示語不一致
2. 操做方法不一致
標記出來了公共操做部分,以後咱們開始封裝公共部分:
/** * 公共模板抽取 * * @param method * @param callable * @param <T> * @return */ public static <T> T invoke(String method) { CallerInfo info = Profiler.registerInfo(method, false, true); try { //TODO 這裏放置不一樣的redis操做方法 } catch (Exception e) { logger.error(method, e); AlarmUtil.alarm(method + e.getCause()); reutrn null; } finally { Profiler.registerInfoEnd(info); } }
可是這裏有個問題,咱們雖然把公共模板抽取出來了,可是TODO標籤裏面的內容怎麼辦呢? 如何把不一樣的redis操做方法傳遞進來呢?
其實在java中,咱們能夠利用接口的方式,將具體的操做代理出去,由外部調用者來實現,聽起來是否是感受又和IOC搭上了點關係,不錯,你想的沒錯,這確實是控制反轉依賴注入的一種作法,經過接口方式將具體的實踐代理出去,這也是進行回調操做的原理。接下來看咱們的改造:
/** * redis操做接口 */ public interface RedisOperation<T>{ //調用redis方法,入參爲空,出參爲T泛型 T invoke(); } /** * redis操做公共模板 * @param method * @param redisOperation * @param <T> * @return */ public static <T> T invoke(String method,RedisOperation redisOperation) { CallerInfo info = Profiler.registerInfo(method, false, true); try { return redisOperation.invoke(); } catch (Exception e) { logger.error(method, e); AlarmUtil.alarm(method + e.getCause()); reutrn null; } finally { Profiler.registerInfoEnd(info); } }
這樣,咱們就打造好了一個公共的redis操做模板,以後就能夠像下面的方式來使用了:
@Override public Long incrby(String key, long val){ String method = "com.jd.marketing.util.RedisUtil.incrby"; RedisOperation<Long> process = () -> { return redisUtils.incrBy(key, val); }; return CacheHelper.invoke(method, process); }
以後的一百多個方法,你也可使用這樣的方式來一一進行包裝,以後你會發現原來RedisUtils封裝完畢,代碼寫了2000行,可是用這種方式以後,代碼只寫了1000行,並且後續有新的聯合操做過來,你只須要在以下代碼段裏面直接把級聯操做添加進去便可:
RedisOperation<Long> process = () -> { //TODO other methods //TODO other methods return redisUtils.incrBy(key, val); };
是否是很方便快捷?在這裏我須要因此下的是,因爲RedisOperation裏面的invoke方法是沒有入參,帶有一個出參結果的調用。因此在回調這裏,我用了匿名錶達式來()->{}來match這種操做。可是若是回調這裏,一個入參,一個出參的話,那麼個人匿名錶達式須要這樣寫 param->{}, 多個入參,那就變成了這樣 (param1, param2, param3)->{} 。因爲這裏並不是重點,我不想過多講解,若是對這種使用方式不熟悉,能夠徹底使用以下的方式來進行書寫也行:
@Override public Long incrby(String key, long val){ String method = "com.jd.marketing.util.RedisUtil.incrby"; RedisOperation<Long> process = () -> incrByOperation(key, val); return CacheHelper.invoke(method, process); } private Long incrByOperation(String key, long val){ return redisUtils.incrBy(key, val); }
其實說到這裏的時候,我就有必要提一下開頭的埋下的線索了。其實以前演示的Netty的代碼:
//Netty中設置監聽
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener)
GenericFutureListener這個接口
就是按照上面的寫法來作的,是否是豁然開朗呢?至於其調用方式,也和上面講解的一致,只要符合接口裏面方法的調用標準就行(入參和出參符合就行), 好比 future –> {}。
說到這裏,咱們可能認爲這樣太麻煩了,本身定義接口,而後注入到框架中,最後用戶本身實現調用方法,一長串。是的,你說的沒錯,這樣確實太麻煩了,JDK因而專門用了一個 FunctionalInterface的annotation來幫咱們作了,因此在JDK中,若是你看到Consumer,Function,Supplier等,帶有@FunctionalInterface標註的接口,那麼就說明他是一個函數式接口,而這種接口是幹什麼的,具體的原理就是我上面講的。下面咱們來梳理梳理這些接口吧。
先看一下咱們的RedisUtils使用JDK自帶的函數式接口的最終封裝效果:
從圖示代碼能夠看出,總體封裝變得簡潔許多,並且咱們用了JDK內置的函數式接口,因此也無需寫其餘多餘的代碼,看上去很清爽,重複代碼基本不見了。並且,因爲JDK提供的其餘的函數式接口有運算操做,好比Predicate.or, Predicate.and操做等,大大增強了封裝的趣味性和樂趣。
下面我將JDK中涉及的經常使用的函數式接口列舉一遍,而後來詳細講解講解吧,列表以下:
Consumer, 提供void accept(T t)回調 Runnable, 提供void run()回調 Callable, 提供V call() throws Exception回調 Supplier, 提供T get()回調 Function, 提供R apply(T t)回調, 有andThen接續操做 Predicate, 提供boolean test(T t)回調, 等價於 Function<T, boolean> BiConsumer, 提供void accept(T t, U u)回調,注意帶Bi的回調接口,代表入參都是雙參數,好比BiPredicate
......
其實還有不少,我這裏就不一一列舉了。感興趣的朋友能夠在這裏找到JDK提供的全部函數式接口。
接下來,咱們來說解其使用示範,以便於明白怎麼樣去使用它。
對於Consumer函數式接口,內部的void accept(T t)回調方法,代表了它只能回調有一個入參,沒有返參的方法。示例以下:
/** * Consumer調用的例子 */ public void ConsumerSample() { LinkedHashMap linkedHashMap = new LinkedHashMap(); linkedHashMap.put("key", "val"); linkedHashMap.forEach((k, v) -> { System.out.println("key" + k + ",val" + v); }); }
對於Callable接口,其實和Supplier接口是同樣的,只是有無Exception拋出的區別,示例以下:
/** * Callable調用的例子 */ public Boolean setnx(String key, String val){ String method = "com.jd.marketing.util.RedisUtil.setnx"; Callable<Boolean> process = () -> { Long rst = redisUtils.setnx(key, val); if(rst == null || rst == 0){ return false; } return true; }; return CacheHelper.invoke(method, process); }
對於Predicate<T>接口,等價於Function<T, Boolean>, 示例以下:
/** * Precidate調用的例子 */ public void PredicateSample() { List list = new ArrayList(); list.add("a") list.removeIf(item -> { return item.equals("a"); }); }
說明一下,Predicate的入參爲一個參數,出參爲boolean,很適合進行條件判斷的場合。在JDK的List數據結構中,因爲removeIf方法沒法耦合進去業務代碼,因此利用Predicate函數式接口將業務邏輯實現部分拋給了用戶自行處理,用戶處理完畢,只要返回給我true,我就刪掉當前的item;返回給我false,我就保留當前的item。解耦作的很是漂亮。那麼List的removeIf實現方式你以爲是怎樣實現的呢?若是我不看JDK代碼的話,我以爲實現方式以下:
public boolean removeIf(Predicate<T> predicate){ final Iterator<T> iterator = getIterator(); while (iterator.hasNext()) { T current = iterator.next(); boolean result = predicate.test(current); if(result){ iterator.remove(); return true; } } return false; }
可是實際你去看看List默認的removeIf實現,源碼大概和我寫的差很少。因此只要理解了函數式接口,咱們也能寫出JDK Style的代碼,酷吧。
CompletableFuture實現異步處理
好了,上面就是函數式接口的總體介紹和使用簡介,不知道你看了以後,理解了多少呢?接下來咱們要講解的異步,徹底基於上面的函數式接口回調,若是以前的都看懂了,下面的講解你將豁然開朗;反之則要悟了。可是正確的方向都已經指出來了,因此入門應該是沒有難度的。
CompletableFuture,很長的一個名字,我對他的印象停留在一次代碼評審會上,當時有人提到了這個類,我只是簡單的記錄下來了,以後去JDK源碼中搜索了一下,看看主要幹什麼的,也沒有怎麼想去看它。結果當我搜到這個類,而後看到Author的時候,我以爲我發現了金礦同樣,因而我決定深刻的研究下去,那個做者的名字就是:
/** * A {@link Future} that may be explicitly completed (setting its * value and status), and may be used as a {@link CompletionStage}, * supporting dependent functions and actions that trigger upon its * completion. * * <p>When two or more threads attempt to * {@link #complete complete}, * {@link #completeExceptionally completeExceptionally}, or * {@link #cancel cancel} * a CompletableFuture, only one of them succeeds. * * <p>In addition to these and related methods for directly * manipulating status and results, CompletableFuture implements * interface {@link CompletionStage} with the following policies: <ul> * * <li>Actions supplied for dependent completions of * <em>non-async</em> methods may be performed by the thread that * completes the current CompletableFuture, or by any other caller of * a completion method.</li> * * <li>All <em>async</em> methods without an explicit Executor * argument are performed using the {@link ForkJoinPool#commonPool()} * (unless it does not support a parallelism level of at least two, in * which case, a new Thread is created to run each task). To simplify * monitoring, debugging, and tracking, all generated asynchronous * tasks are instances of the marker interface {@link * AsynchronousCompletionTask}. </li> * * <li>All CompletionStage methods are implemented independently of * other public methods, so the behavior of one method is not impacted * by overrides of others in subclasses. </li> </ul> * * <p>CompletableFuture also implements {@link Future} with the following * policies: <ul> * * <li>Since (unlike {@link FutureTask}) this class has no direct * control over the computation that causes it to be completed, * cancellation is treated as just another form of exceptional * completion. Method {@link #cancel cancel} has the same effect as * {@code completeExceptionally(new CancellationException())}. Method * {@link #isCompletedExceptionally} can be used to determine if a * CompletableFuture completed in any exceptional fashion.</li> * * <li>In case of exceptional completion with a CompletionException, * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an * {@link ExecutionException} with the same cause as held in the * corresponding CompletionException. To simplify usage in most * contexts, this class also defines methods {@link #join()} and * {@link #getNow} that instead throw the CompletionException directly * in these cases.</li> </ul> * * @author Doug Lea * @since 1.8 */
Doug Lea,Java併發編程的大神級人物,整個JDK裏面的併發編程包,幾乎都是他的做品,很務實的一個老爺子,目前在紐約州立大學奧斯威戈分校執教。好比咱們異常熟悉的AtomicInteger類也是其做品:
/** * An {@code int} value that may be updated atomically. See the * {@link java.util.concurrent.atomic} package specification for * description of the properties of atomic variables. An * {@code AtomicInteger} is used in applications such as atomically * incremented counters, and cannot be used as a replacement for an * {@link java.lang.Integer}. However, this class does extend * {@code Number} to allow uniform access by tools and utilities that * deal with numerically-based classes. * * @since 1.5 * @author Doug Lea */ public class AtomicInteger extends Number implements java.io.Serializable { // ignore code }
想查閱老爺子的最新資料,建議到Wikipedia上查找,裏面有他的博客連接等,我這裏就再也不作過多介紹,回到正題上來,咱們繼續談談CompletableFuture吧。我剛纔貼的關於這個類的描述,都是英文的,並且特別長,咱們不妨貼出中文釋義來,看看具體是個什麼玩意兒:
繼承自Future,帶有明確的結束標記;同時繼承自CompletionStage,支持多函數調用行爲直至完成態。
當兩個以上的線程對CompletableFuture進行complete調用,completeExceptionally調用或者cancel調用,只有一個會成功。
爲了直觀的保持相關方法的狀態和結果,CompletableFuture按照以下原則繼承並實現了CompletionStage接口:
1. 多個同步方法的級聯調用,可能會被當前的CompletableFuture置爲完成態,也可能會被級聯函數中的任何一個方法置爲完成態。
2. 異步方法的執行,默認使用ForkJoinPool來進行(若是當前的並行標記不支持多併發,那麼將會爲每一個任務開啓一個新的線程來進行)。
爲了簡化監控,調試,代碼跟蹤等,全部的異步任務必須繼承自AsynchronousCompletionTask。
3. 全部的CompletionStage方法都是獨立的,overrid子類中的其餘的方法並不會影響當前方法行爲。
CompletableFuture同時也按照以下原則繼承並實現了Future接口:
1. 因爲此類沒法控制完成態(一旦完成,直接返回給調用方),因此cancellation被當作是另外一種帶有異常的完成狀態. 在這種狀況下cancel方法和CancellationException是等價的。
方法isCompletedExceptionally能夠用來監控CompletableFuture在一些異常調用的場景下是否完成。
2. get方法和get(long, TimeUint)方法將會拋出ExecutionException異常,一旦計算過程當中有CompletionException的話。
爲了簡化使用,這個類同時也定義了join()方法和getNow()方法來避免CompletionException的拋出(在CompletionException拋出以前就返回告終果)。
因爲沒有找到中文文檔,因此這裏自行勉強解釋了一番,有些差強人意。
在咱們平常生活中,咱們的不少行爲其實都是要麼有結果的,要麼無結果的。好比說作蛋糕,作出來的蛋糕就是結果,那麼通常咱們用Callable或者Supplier來表明這個行爲,由於這兩個函數式接口的執行,是須要有返回結果的。再好比說吃蛋糕,吃蛋糕這個行爲,是無結果的。由於他僅僅表明咱們去幹了一件事兒,因此會用Consumer或者Runnable來表明吃飯這個行爲。由於這兩個函數式接口的執行,是不返回結果的。有時候我發現家裏沒有作蛋糕的工具,因而我便去外面的蛋糕店委託蛋糕師傅給我作一個,那麼這種委託行爲,其實就是一種異步行爲,會用Future來描述。由於Future神奇的地方在於,可讓一個同步執行的方法編程異步的,就好似委託蛋糕師傅作蛋糕同樣。這樣咱們就能夠在蛋糕師傅給咱們作蛋糕期間去作一些其餘的事兒,好比聽音樂等等。可是因爲Future不具備事件完成告知的能力,因此得須要本身去一遍一遍的問師傅,作好了沒有。而CompletableFuture則具備這種能力,因此總結起來以下:
那麼上面描述的場景,咱們用代碼封裝一下吧:
public static void main(String... args) throws Exception { CompletableFuture .supplyAsync(() -> makeCake()) .thenAccept(cake -> eatCake(cake)); System.out.println("先回家聽音樂,蛋糕作好後給我打電話,我來取..."); Thread.currentThread().join(); } private static Cake makeCake() { System.out.println("我是蛋糕房,開始爲你製做蛋糕..."); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } Cake cake = new Cake(); cake.setName("草莓蛋糕"); cake.setShape("圓形"); cake.setPrice(new BigDecimal(99)); System.out.println("蛋糕製做完畢,請取回..."); return cake; } private static void eatCake(Cake cake) { System.out.println("這個蛋糕是" + cake.getName() + ",我喜歡,開吃..."); }
最後執行結果以下:
我是蛋糕房,開始爲你製做蛋糕... 先回家聽音樂,蛋糕作好後給我打電話,我來取... 蛋糕製做完畢,請取回... 這個蛋糕是草莓蛋糕,我喜歡,開吃...
因爲CompletableFuture的api有50幾個,數量很是多,咱們能夠先將其劃分爲若干大類(摘自理解CompletableFuture,總結的很是好,直接拿來用):
建立類:用於CompletableFuture對象建立,好比:
狀態取值類:用於判斷當前狀態和同步等待取值,好比:
控制類:可用於主動控制CompletableFuture完成行爲,好比:
接續類:CompletableFuture最重要的特性,用於注入回調行爲,好比:
上面的方法很是多,而大多具備類似性,咱們大可沒必要立刻記憶。先來看看幾個通常性的規律,即可輔助記憶(重要):
無參數
,而且無返回值
的,其實就是指定Runnable無參數
,而且有返回值
,其實就是指Supplier有參數
,可是無返回值
,其實就是指Consumer有參數
,可是有返回值
,其實就是指Function以上6條記住以後,就能夠記住60%以上的API了。
先來看一下其具體的使用方式吧(網上有個外國人寫了CompletableFuture的20個例子,我看有中文版了,放到這裏,你們能夠參考下)。
/** * CompletableFuture調用completedFuture方法,代表執行完畢 */ static void sample1() { CompletableFuture cf = CompletableFuture.completedFuture("message"); Assert.assertTrue(cf.isDone()); Assert.assertEquals("message", cf.getNow(null)); }
sample1代碼,能夠看出,若是想讓一個ComopletableFuture執行完畢,最簡單的方式就是調用其completedFuture方法便可。以後就能夠用getNow對其結果進行獲取,若是獲取不到就返回默認值null。
/** * 兩個方法串行執行,後一個方法依賴前一個方法的返回 */ static void sample2() { CompletableFuture cf = CompletableFuture .completedFuture("message") .thenApply(message -> { Assert.assertFalse(Thread.currentThread().isDaemon()); return message.toUpperCase(); }); Assert.assertEquals("MESSAGE", cf.getNow(null)); }
sample2代碼,利用thenApply實現兩個函數串行執行,後一個函數的執行以來前一個函數的返回結果。
/** * 兩個方法並行執行,兩個都執行完畢後,在進行彙總 */ static void sample3() { long start = System.currentTimeMillis(); CompletableFuture cf = CompletableFuture.runAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); CompletableFuture cf1 = CompletableFuture.runAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } }); CompletableFuture.allOf(cf, cf1).whenComplete((v,t)->{ System.out.println("都完成了"); }).join(); long end = System.currentTimeMillis(); System.out.println((end-start)); }
sample3最後的執行結果爲:
都完成了
2087
能夠看到,耗時爲2087毫秒,若是是串行執行,須要耗時3000毫秒,可是並行執行,則以最長執行時間爲準,其實這個特性在進行遠程RPC/HTTP服務調用的時候,將會很是有用,咱們一下子再進行講解如何用它來反哺業務。
/** * 方法執行取消 */ static void sample4(){ CompletableFuture cf = CompletableFuture.supplyAsync(()->{ try { System.out.println("開始執行函數..."); Thread.sleep(2000); System.out.println("執行函數完畢..."); } catch (InterruptedException e) { e.printStackTrace(); } return "ok"; }); CompletableFuture cf2 = cf.exceptionally(throwable -> { return throwable; }); cf2.cancel(true); if(cf2.isCompletedExceptionally()){ System.out.println("成功取消了函數的執行"); } cf2.join(); }
調用結果以下:
開始執行函數... 成功取消了函數的執行 Exception in thread "main" java.util.concurrent.CancellationException at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2263) at com.jd.jsmartredis.article.Article.sample4(Article.java:108) at com.jd.jsmartredis.article.Article.main(Article.java:132)
能夠看到咱們成功的將函數執行中斷,同時因爲cf2返回的會一個throwable的Exception,因此咱們的console界面將其也原封不動的打印了出來。
講解了基本使用以後,如何使用其來反哺咱們的業務呢?咱們就以通用下單爲例吧,來看看通用下單有哪些能夠優化的點。
上圖就是咱們在通用下單接口常常調用的接口,分爲下單地址接口,商品信息接口,京豆接口,因爲這三個接口沒有依賴關係,因此能夠並行的來執行。若是換作是目前的作法,那麼確定是順序執行,假如三個接口獲取都耗時1s的話,那麼三個接口獲取完畢,咱們的耗時爲3s。可是若是改爲異步方式執行的話,那麼將會簡單不少,接下來,咱們開始改造吧。
public Result submitOrder(String pin, CartVO cartVO) { //獲取下單地址 CompletableFuture addressFuture = CompletableFuture.supplyAsync(() -> { AddressResult addressResult = addressRPC.getAddressListByPin(pin); return addressResult; }); //獲取商品信息 CompletableFuture goodsFuture = CompletableFuture.supplyAsync(() -> { GoodsResult goodsResult = goodsRPC.getGoodsInfoByPin(pin, cartVO); return goodsResult; }); //獲取京豆信息 CompletableFuture beanFuture = CompletableFuture.supplyAsync(() -> { JinbeanResult jinbeanResult = JinbeanRPC.getJinbeanByPin(pin); return jinbeanResult; }); CompletableFuture.allOf(addressFuture, goodsFuture, beanFuture).whenComplete((v, throwable) -> { if (throwable == null) { logger.error("獲取地址,商品,京豆信息失敗", throwable); //TODO 嘗試從新獲取 } else { logger.error("獲取地址,商品,京豆信息成功"); } }).join(); AddressResult addressResult = addressFuture.getNow(null); GoodsResult goodsResult = goodsFuture.getNow(null); JinbeanResult jinbeanResult = beanFuture.getNow(null); //TODO 後續處理 }
這樣,咱們利用將普通的RPC執行編程了異步,並且附帶了強大的錯誤處理,是否是很簡單?
可是若是遇到以下圖示的調用結構,CompletableFuture可否很輕鬆的應對呢?
因爲業務變動,須要附帶延保信息,爲了後續從新計算價格,因此必須將延保商品獲取出來,而後計算價格。其實這種既有同步,又有異步的作法,利用CompletableFuture來handle,也是輕鬆天然,代碼以下:
public Result submitOrder(String pin, CartVO cartVO) { //獲取下單地址 CompletableFuture addressFuture = CompletableFuture.supplyAsync(() -> { AddressResult addressResult = addressRPC.getAddressListByPin(pin); return addressResult; }); //獲取商品信息 CompletableFuture goodsFuture = CompletableFuture.supplyAsync(() -> { GoodsResult goodsResult = goodsRPC.getGoodsInfoByPin(pin, cartVO); return goodsResult; }).thenApplyAsync((goodsResult, Map)->{ YanbaoResult yanbaoResult = yanbaoRPC.getYanbaoInfoByGoodID(goodsResult.getGoodId, pin); Map<String, Object> map = new HashMap<>(); map.put("good", goodsResult); map.put("yanbao",yanbaoResult); return map; }); //獲取京豆信息 CompletableFuture beanFuture = CompletableFuture.supplyAsync(() -> { JinbeanResult jinbeanResult = JinbeanRPC.getJinbeanByPin(pin); return jinbeanResult; }); CompletableFuture.allOf(addressFuture, goodsFuture, beanFuture).whenComplete((v, throwable) -> { if (throwable == null) { logger.error("獲取地址,商品-延保,京豆信息失敗", throwable); //TODO 嘗試從新獲取 } else { logger.error("獲取地址,商品-延保,京豆信息成功"); } }).join(); AddressResult addressResult = addressFuture.getNow(null); GoodsResult goodsResult = goodsFuture.getNow(null); JinbeanResult jinbeanResult = beanFuture.getNow(null); //TODO 後續處理 }
這樣咱們就能夠了,固然這種改造給咱們帶來的好處也是顯而易見的,咱們不須要針對全部的接口進行OPS優化,而是針對性能最差的接口進行OPS優化,只要提高了性能最差的接口,那麼總體的性能就上去了。
洋洋灑灑寫了這麼多,但願對你們有用,謝謝。
參考資料: