《Java 8 in Action》Chapter 11:CompletableFuture:組合式異步編程

某個網站的數據來自Facebook、Twitter和Google,這就須要網站與互聯網上的多個Web服務通訊。但是,你並不但願由於等待某些服務的響應,阻塞應用程序的運行,浪費數十億寶貴的CPU時鐘週期。好比,不要由於等待Facebook的數據,暫停對來自Twitter的數據處理。 java

第7章中介紹的分支/合併框架以及並行流是實現並行處理的寶貴工具;它們將一個操做切分爲多個子操做,在多個不一樣的核、CPU甚至是機器上並行地執行這些子操做。與此相反,若是你的意圖是實現併發,而非並行,或者你的主要目標是在同一個CPU上執行幾個鬆耦合的任務,充分利用CPU的核,讓其足夠忙碌,從而最大化程序的吞吐量,那麼你其實真正想作的是避免由於等待遠程服務的返回,或者對數據庫的查詢,而阻塞線程的執行,浪費寶貴的計算資源,由於這種等待的時間極可能至關長。 git

1. Future接口

Future接口在Java 5中被引入,設計初衷是對未來某個時刻會發生的結果進行建模。它建模了一種異步計算,返回一個執行運算結果的引用,當運算結束後,這個引用被返回給調用方。在Future中觸發那些潛在耗時的操做把調用線程解放出來,讓它能繼續執行其餘有價值的工做,再也不須要等待耗時的操做完成。Future的另外一個優勢是它比更底層的Thread更易用。要使用Future,一般你只須要將耗時的操做封裝在一個Callable對象中,再將它提交給ExecutorService。使用Future以異步的方式執行一個耗時的操做: github

線程能夠在ExecutorService以併發方式調用另外一個線程執行耗時操做的同時,去執行一些其餘的任務。接着,若是你已經運行到沒有異步操做的結果就沒法繼續任何有意義的工做時,能夠調用它的get方法去獲取操做的結果。若是操做已經完成,該方法會馬上返回操做的結果,不然它會阻塞你的線程,直到操做完成,返回相應的結果。若是該長時間運行的操做永遠不返回了會怎樣?Future提供了一個無需任何參數的get方法,推薦使用重載版本的get方法,它接受一個超時的參數,能夠定義線程等待Future結果的最長時間,避免無休止的等待。下圖是Future異步執行線程原理圖。 數據庫

2. 使用CompletableFuture構建異步應用

Future接口有必定的侷限性,好比,咱們很難表述Future結果之間的依賴性。所以咱們引入了CompletableFuture。接下來經過一個「最佳價格查詢器「的應用,它會查詢多個在線商店,依據給定的產品或服務找出最低的價格,來展示CompletableFuture實現異步應用。經過此例你能學到這些:網絡

  • 如何編寫異步API
  • 如何讓使用同步API的代碼變爲非阻塞代碼
  • 如何使用流水線將兩個接續的異步操做合併爲一個異步計算操做
  • 如何以響應式的方式處理異步操做的完成事件

同步API和異步API:併發

  • 同步API其實只是對傳統方法調用的另外一種稱呼:你調用了某個方法,調用方在被調用方運行的過程當中會等待,被調用方運行結束返回,調用方取得被調用方的返回值並繼續運行。即便調用方和被調用方在不一樣的線程中運行,調用方仍是須要等待被調用方結束運行,這就是阻塞式調用這個名詞的由來。
  • 異步API會直接返回,或者至少在被調用方計算完成以前,將它剩餘的計算任務交給另外一個線程去作,該線程和調用方是異步的——這就是非阻塞式調用的由來。執行剩餘計算任務的線程會將它的計算結果返回給調用方。返回的方式要麼是經過回調函數,要麼是由調用方再次執行一個「等待,直到計算完成」的方法調用。

2.1 實戰:實現異步API

2.1.1 同步方法

同步操做中會爲等待同步事件完成而等待1s,這種是沒法接受的,對於程序體驗來講是很是很差的。框架

2.1.2 將同步方法轉換爲異步方法

Java 5引入了java.util.concurrent.Future接口表示一個異步計算(即調用線程能夠繼續運行,不會由於調用方法而阻塞)的結果。這意味着Future是一個暫時還不可知值的處理器,這個值在計算完成後,能夠經過調用它的get方法取得。這種方式下,在進行價格查詢的同時,還能執行一些其餘的任務,好比查詢其餘商店中商品的價格,不會阻塞在那裏等待第一家商店返回請求的結果。最後,若是全部有意義的工做都已經完成,全部要執行的工做都依賴於商品價格時,再調用Future的get方法。執行了這個操做後,要麼得到Future中封裝的值(若是異步任務已經完成),要麼發生阻塞,直到該異步任務完成,指望的值可以訪問。同時,若是某個商品價格計算髮生異常,會將當前線程殺死,從而致使等待get方法返回結果的客戶端永久地被阻塞。客戶端可使用重載版本的get方法,設置超時參數來避免。爲了讓客戶端能瞭解沒法提供請求商品價格的緣由,你須要使用CompletableFuture的completeExceptionally方法將致使CompletableFuture內發生問題的異常拋出。異步

2.1.3 使用工廠方法supplyAsync建立CompletableFuture對象

supplyAsync方法接受一個生產者(Supplier)做爲參數,返回一個CompletableFuture對象,該對象完成異步執行後會讀取調用生產者方法的返回值。生產者方法會交由ForkJoinPool池中的某個執行線程(Executor)運行,可是你也可使用supplyAsync方法的重載版本,傳遞第二個參數指定不一樣的執行線程執行生產者方法。函數

3. 消除代碼阻塞問題

3.1 順序同步請求

3.2 使用並行流對請求進行並行操做

3.3 使用CompletableFuture發起異步請求

CompletableFuture版本的程序彷佛比並行流版本的程序還快那麼一點兒。可是最後這個版本也不太使人滿意。它們看起來不相伯仲,究其緣由都同樣:它們內部採用的是一樣的通用線程池,默認都使用固定數目的線程,具體線程數取決於Runtime.getRuntime().availableProcessors()的返回值。然而,CompletableFuture具備必定的優點,由於它容許你對執行器(Executor)進行配置,尤爲是線程池的大小,讓它以更適合應用需求的方式進行配置,知足程序的要求,而這是並行流API沒法提供的。 順序執行和並行執行的原理對比: 工具

圖11-4的上半部分展現了使用單一流水線處理流的過程,咱們看到,執行的流程(以虛線標識)是順序的。事實上,新的CompletableFuture對象只有在前一個操做徹底結束以後,才能建立。與此相反,圖的下半部分展現瞭如何先將CompletableFutures對象彙集到一個列表中(即圖中以橢圓表示的部分),讓對象們能夠在等待其餘對象完成操做以前就能啓動。

3.4 使用CompletableFuture發起異步請求WithExecutor

3.5 調用結果:

3.6 並行——使用流仍是CompletableFutures

目前爲止,你已經知道對集合進行並行計算有兩種方式:要麼將其轉化爲並行流,利用map這樣的操做開展工做,要麼枚舉出集合中的每個元素,建立新的線程,在CompletableFuture內對其進行操做。後者提供了更多的靈活性,你能夠調整線程池的大小,而這能幫助你確保總體的計算不會由於線程都在等待I/O而發生阻塞。

  • 若是你進行的是計算密集型的操做,而且沒有I/O,那麼推薦使用Stream接口,由於實現簡單,同時效率也多是最高的(若是全部的線程都是計算密集型的,那就沒有必要建立比處理器核數更多的線程)。
  • 若是你並行的工做單元還涉及等待I/O的操做(包括網絡鏈接等待),那麼使用CompletableFuture靈活性更好,你能夠像前文討論的那樣,依據等待/計算,或者 W/C的比率設定須要使用的線程數。這種狀況不使用並行流的另外一個緣由是,處理流的流水線中若是發生I/O等待,流的延遲性會讓咱們很難判斷到底何時觸發了等待。

4. 對多個異步任務進行流水線操做

4.1 案例

經過在shop構成的流上採用流水線方式執行三次map操做,咱們獲得告終果。

  • 第一個操做將每一個shop對象轉換成了一個字符串,該字符串包含了該 shop中指定商品的價格和折扣代碼。
  • 第二個操做對這些字符串進行了解析,在Quote對象中對它們進行轉換。
  • 第三個map會操做聯繫遠程的Discount服務,計算出最終的折扣價格,並返回該價格及提供該價格商品的shop。

代碼如圖:

原理圖:

Java 8的CompletableFuture API提供了名爲thenCompose的方法,它就是專門爲這一目的而設計的,thenCompose方法容許你對兩個異步操做進行流水線,第一個操做完成時,將其結果做爲參數傳遞給第二個操做。換句話說,你能夠建立兩個CompletableFutures對象,對第一個CompletableFuture對象調用thenCompose,並向其傳遞一個函數。當第一個 CompletableFuture執行完畢後,它的結果將做爲該函數的參數,這個函數的返回值是以第一 個CompletableFuture的返回作輸入計算出的第二個CompletableFuture對象。thenCompose方法像CompletableFuture類中的其餘方法同樣,也提供了一個以Async後綴結尾的版本thenComposeAsync。一般而言,名稱中不帶Async的方法和它的前一個任務同樣,在同一個線程中運行;而名稱以Async結尾的方法會將後續的任務提交到一個線程池,因此每一個任務是由不一樣的線程處理的。

4.2 thenCombine方法

將兩個CompletableFuture對象結合起來,不管他們是否存在依賴。thenCombine方法,它接收名爲BiFunction的第二參數,這個參數 定義了當兩個CompletableFuture對象完成計算後,結果如何合併。同thenCompose方法同樣, thenCombine方法也提供有一個Async的版本。這裏,若是使用thenCombineAsync會致使BiFunction中定義的合併操做被提交到線程池中,由另外一個任務以異步的方式執行。

代碼圖:

原理圖:

4.3 響應CompletableFuture的completion事件

Java 8的CompletableFuture經過thenAccept方法提供了這一功能,它接收 CompletableFuture執行完畢後的返回值作參數。thenAccept方法也提供 了一個異步版本,名爲thenAcceptAsync。異步版本的方法會對處理結果的消費者進行調度, 從線程池中選擇一個新的線程繼續執行,再也不由同一個線程完成CompletableFuture的全部任 務。由於你想要避免沒必要要的上下文切換,更重要的是你但願避免在等待線程上浪費時間,儘快響應CompletableFuture的completion事件,因此這裏沒有采用異步版本。

4.3.1 實戰

5. 小結

  • 執行比較操做時,尤爲是那些依賴一個或多個遠程服務的操做,使用異步任務能夠改善程序的性能,加快程序的響應速度。
  • 你應該儘量地爲客戶提供異步API。使用CompletableFuture類提供的特性,你可以輕鬆地實現這一目標。
  • CompletableFuture類還提供了異常管理的機制,讓你有機會拋出/管理異步任務執行中發生的異常。
  • 將同步API的調用封裝到一個CompletableFuture中,你可以以異步的方式使用其結果。
  • 若是異步任務之間相互獨立,或者它們之間某一些的結果是另外一些的輸入,你能夠將這些異步任務構造或者合併成一個。
  • 你能夠爲CompletableFuture註冊一個回調函數,在Future執行完畢或者它們計算的結果可用時,針對性地執行一些程序。
  • 你能夠決定在何時結束程序的運行,是等待由CompletableFuture對象構成的列表中全部的對象都執行完畢,仍是隻要其中任何一個首先完成就停止程序的運行。

Tips

本文同步發表在公衆號,歡迎你們關注!😁 後續筆記歡迎關注獲取第一時間更新!

相關文章
相關標籤/搜索