Scala 並行和併發編程-Futures 和 Promises【翻譯】

官網地址 html

本文內容

  • 簡介
  • Futures
  • 阻塞
  • 異常
  • Promises
  • 工具

最近看了《七週七語言:理解多種編程泛型》,介紹了七種語言(四種編程範型)的主要特性:基本語法,集合,並行/併發,其中就有 Scala。你不能期望這種書全面介紹,由於其中任何一門語言都夠寫一本書了~java

我比較關注並行/併發,可是書中關於 Scala 的併發部分——Actor,可代碼編譯不經過,官網標註「Deprecated」,哎,這書點不負責,示例代碼也不寫採用編譯器的版本~編程

Java 8 以前,函數式編程實在太弱了,否則也不會出現像 Scala 這樣在 JVM 上運行、可以與 Java 完美配合的語言(估計,Java 在函數式編程方面的落後,已經讓社區等不急了)。api

本文來自 Scala 官網,完整示例代碼幾乎沒有,大部分是理論,雖然講解得很詳細,但看起來實在有點費勁。所以,你最好找點這方面完整示例再看看。數組

官網其實也有中文翻譯,但倒是機器翻譯的。promise

簡介


Future提供了一套高效非阻塞(non-blocking)的方式完成並行操做。其基本思想很簡單,所謂 Future,指的是一類佔位符對象(placeholder object),用於指代某些還沒有完成計算的結果。通常,由Future的計算結果都是並行執行的,計算完後再使用。以這種方式組織並行任務,即可以寫出高效、異步、非阻塞的並行代碼。服務器

默認狀況,future 和 promise 利用回調(callback)的非阻塞方式,並非採用典型的阻塞方式。爲了在語法和概念層面簡化回調的使用,Scala 提供了 flatMap、foreach 和 filter 等算子(combinator),使得咱們可以以非阻塞的方式對future進行組合。固然,future 對於那些必須使用阻塞的狀況仍然支持阻塞操做,能夠阻塞等待future(不過不鼓勵這樣作)。網絡

一個典型的 future 以下所示:session

val inverseFuture:Future[Matrix]=Future{
 
fatMatrix.inverse()// non-blocking long lasting computation
 
}(executionContext)

或是更經常使用的: 閉包

implicit val ec:ExecutionContext=...
 
val inverseFuture :Future[Matrix]=Future{
 
fatMatrix.inverse()
 
}// ec is implicitly passed

這兩個代碼片斷把 fatMatrix.inverse() 的執行委託給 ExecutionContext,在 inverseFuture 中體現計算結果。

Futures


所謂 Future,是一種用於指代某個還沒有就緒的值的對象。這個值一般是某個計算過程的結果:

  • 若該計算過程還沒有完成,咱們就說該Future未完成;
  • 若該計算過程正常結束,或中途拋出異常,咱們就說該Future已完成。

Future 完成分爲兩種狀況:

  • 當Future帶着某個值而完成時,咱們就說該Future帶着計算結果成功完成。
  • 當Future帶着異常而完成時,計算過程當中拋出的異常,咱們就說Future因異常而失敗。

Future 具備一個重要的屬性——只能被賦值一次。一旦給定了某個值或某個異常,future對象就變成了不可變對象——沒法再被改寫。

建立future對象最簡單的方法是調用future方法,開始異步(asynchronous)計算,並返回保存有計算結果的futrue。一旦該future計算完成,其結果就變的可用。

注意,Future[T] 是一個類型,表示future對象,而future是一個方法,建立和調度一個異步計算,並返回一個帶有計算結果的future對象。

下面經過一個例子來展現。

假設,咱們使用某個社交網絡假想的API獲取某個用戶的朋友列表,咱們將打開一個新對話(session),而後發送一個獲取特定用戶朋友列表的請求。

import scala.concurrent._
 
importExecutionContext.Implicits.global
 
val session = socialNetwork.createSessionFor("user", credentials)
 
val f:Future[List[Friend]]=Future{
 
session.getFriends()
 
}
 

上面,首先導入 scala.concurrent 包。而後,經過一個假想的 createSessionFor 方法初始化一個向服務器發送請求 session 變量。這個請求是經過網絡發送的,因此可能耗時很長。調用 getFriends 方法返回 List[Friend]。爲了更好的利用CPU,知道響應到達,不該該阻塞(block)程序的其餘部分,這個計算應該被異步調度。future方法就是這樣作的,它並行地執行指定的計算塊,在這個例子中,向服務器發送請求,等待響應。

一旦服務器響應,future f 中的好友列表將變得可用。

失敗可能會致使一個 exception。在下面的例子中,session 的值未被正確的初始化,因而,future 塊中計算將拋出一個 NullPointerException。這樣,future f 失敗了。

val session =null
 
val f:Future[List[Friend]]=Future{
 
session.getFriends
 
}
 

上面的 import ExecutionContext.Implicits.global 導入默認的全局執行上下文(global execution context)。執行上下文執行提交給他們的任務,你也可把執行上下文看做線程池,這對future方法是必不可少的,由於,它們處理如何和什麼時候執行異步計算。你能夠定義本身的執行上下文,並用 future 使用,但如今,只須要知道你可以經過上面的語句導入默認執行上下文就足夠了。

咱們的例子是基於一個假想的社交網絡 API,計算包含了發送網絡請求和等待響應。下面,假設你有一個文本文件,想找出一個特定詞第一次出現的位置。當磁盤正在檢索此文件時,這個計算過程可能會陷入阻塞,所以,並行執行程序的剩餘部分將頗有意義。

val firstOccurrence:Future[Int]=Future{
 
val source = scala.io.Source.fromFile("e:\scala\myText.txt")
 
source.toSeq.indexOfSlice("myKeyword")
 
}

回調函數

如今,咱們知道如何開始一個異步計算來建立一個新的future值,可是咱們沒有演示一旦此結果變得可用後如何使用。咱們常常對計算結果感興趣而不只僅是它的反作用(side-effects)。

在許多future的實現中,一旦future的客戶端對結果感興趣,它必須阻塞它本身的計算,並等待直到future完成——而後才能使用future的值繼續它本身的計算。雖然這在Scala Future API(在後面會展現)中是容許的,但從性能角度來看更好的辦法是徹底非阻塞,即在future中註冊一個回調。一旦future完成,就異步調用回調。若是當註冊回調,future已經完成,那麼,回調或是異步執行,或在相同的線程中循序執行。

註冊回調最一般的形式,是使用OnComplete方法,即建立一個Try[T] => U 類型的回調函數。若是future成功完成,回調則會應用到Success[T]類型的值中,不然應用到 Failure[T] 類型的值中。

Try[T]Option[T] Either[T, S] 類似,由於它是一個可能持有某種類型值的單子(monda)。然而,它是爲持有一個值或異常對象特殊設計的。Option[T] 既能夠是一個值(如:Some[T])也能夠徹底不是值(如:None),若是Try[T]得到一個值是,那麼它是 Success[T] ,不然爲持有異常的 Failure[T]Failure[T] 有不少信息,不只僅是關於爲何沒有值 None。同時,也能夠把 Try[T] 看做一種特殊版本的 Either[Throwable, T],特別是當左邊值爲一個 Throwable 的情形。

「一個單子(Monad)說白了不過就是自函子範疇上的一個幺半羣而已。」這句話出自Haskell大神Philip Wadler,也是他提議把Monad引入Haskell。

回到咱們社交網絡的例子,假設,咱們想獲取最近的帖子並顯示在屏幕上,能夠經過調用 getRecentPosts 方法,它返回 List[String]:

import scala.util.{Success,Failure}
 
val f:Future[List[String]]=Future{
 
session.getRecentPosts
 
}
 
f onComplete {
 
caseSuccess(posts)=>for(post <- posts) println(post)
 
caseFailure(t)=> println("An error has occured: "+ t.getMessage)
 
}
onComplete 方法容許客戶處理失敗或成功的future 結果。對於成功,onSuccess 回調使用以下:
val f:Future[List[String]]=Future{
session.getRecentPosts
}
f 
onSuccess {
case posts =>for(post <- posts
) 
println(post)
}
對於失敗,onFailure 回調使用以下:
val f:Future[List[String]]=Future{
 
session.getRecentPosts
 
}
 
f onFailure {
 
case t => println("An error has occured: "+ t.getMessage)
 
}
 
f onSuccess {
 
case posts =>for(post <- posts) println(post)
 
}

onFailure 回調只有在 future 失敗,也就是包含一個異常時纔會執行。

由於部分函數(partial functions)具備 isDefinedAt 方法, 因此,onFailure 方法只有爲了特定 Throwable 而定義纔會觸發。下面的例子,已註冊的 onFailure 回調永遠不會被觸發:

val f =Future{
2/0
}
 
f onFailure {
case npe:NullPointerException=>
println("I'd be amazed if this printed out.")
}
 

部分函數(Partial functions),假設有一個數學函數 f(a,b,c)partial(f,1)返回的是數學函數 f(1,b,c),函數的參數 a 已經被代入。

回到前面例子,查找某個第一次出現的關鍵字,在屏幕上輸出該關鍵字的位置:

val firstOccurrence:Future[Int]=Future{
val source = scala.io.Source.fromFile("myText.txt")
source.toSeq.indexOfSlice("myKeyword")
}
firstOccurrence onSuccess {
case idx => println("The keyword first appears at position: "+ idx)
}
firstOccurrence onFailure {
case t => println("Could not process file: "+ t.getMessage)
}

onComplete,、onSuccess 和 onFailure 方法都具備結果類型 Unit,這意味着這些回調方法不能被連接。注意,這種設計是爲了不鏈式調用可能隱含在已註冊回調上一個順序的執行(同一個 future 中註冊的回調是無序的)。

也就是說,咱們如今應討論論什麼時候調用回調。由於回調須要future 中的值是可用的,只有future完成後才能被調用。然而,不能保證被完成 future 的線程或建立回調的線程調用。反而, 回調有時會在future對象完成後被某個線程調用。咱們能夠說,回調最終會被執行。

更進一步,回調被執行的順序不是預先定義的,甚至在同一個應用程序。事實上,回調也許不是一個接一個連續調用的,但在同一時間併發調用。這意味着,下面例子中,變量 totalA 也許不能從計算的文本中獲得大小寫字母數量的正確值。

@volatilevar totalA =0
val text =Future{
"na"*16+"BATMAN!!!"
}
text onSuccess {
case txt => totalA += txt.count(_ =='a')
}
text onSuccess {
case txt => totalA += txt.count(_ =='A')
}
 

上面,兩個回調可能一個接一個地執行,變量 totalA 獲得的預期值爲 18。然而,它們也多是併發執行,因而,totalA 最終多是16或2,由於+= 不是一個原子操做(即它是由一個讀和一個寫的步驟組成,這樣就可能使其與其餘的讀和寫任意交錯執行)。

考慮到完整性,回調的語義以下:

  1. future 上註冊 onComplete 回調,要確保 future 執行完成後,相應的閉包(closure)最終被調用。
  2. 註冊 onSuccess 或 onFailure 回調,與 onComplete 語義同樣,不一樣的是,只有在 future 成功地或失敗地執行完後,纔會調用。
  3. 在 future 上註冊一個已經完成的回調,將致使回調最終被執行。將最終致使一直處於執行狀態的回調(上面 1 所隱含的)。
  4. 在 future 上註冊多個回調時,這些回調的執行順序是不肯定的。事實上,這些回調多是並行執行的,然而,某個 ExecutionContext 執行可能致使明確的執行順序。
  5. 在某些回調拋出異常時,其餘回調的執行不受影響。
  6. 在某些回調沒法永遠沒法結束時(例如,回調包含一個無限循環),其餘回調可能徹底不會執行。這種狀況下,那些潛在的阻塞的回調須要使用阻塞結構。將在後面「阻塞」小節說明。
  7. 一旦執行完後,回調會從 future 對象中移除,這對垃圾回收機制(GC)很合適。

函數組合(Functional Composition)和For解構(For-Comprehensions)

儘管前文所展現的回調機制已經足夠把future的結果和後繼計算結合起來的,可是有時回調機制並不易於使用,且容易形成冗餘的代碼。能夠經過一個例子來講明。假設,咱們有一個用於進行貨幣交易服務的API,想要在有盈利的時候購進一些美圓。讓咱們先來看看怎樣用回調來解決這個問題:

val rateQuote =Future{
connection.getCurrentValue(USD)
}
rateQuote onSuccess {case quote =>
val purchase =Future{
if(isProfitable(quote)) connection.buy(amount, quote)
else throw new Exception("not profitable")
}
purchase onSuccess {
case _ => println("Purchased "+ amount +" USD")
}
}
 

首先,咱們建立一個名爲rateQuote的future對象並得到當前的匯率。從服務器返回匯率且該future對象成功完成以後,在onSuccess回調判斷買仍是不買。因此,咱們建立了另外一個名爲purchase的future對象,用來在可盈利的狀況下作出購買決定,並在發送一個請求。最後,一旦purchase運行結束,會輸出一條通知消息。

這確實是可行,但有兩點緣由使這種作法並不方便。其一,不得不使用onSuccess,且不得不在其中嵌套另外一個purchase future對象。試想一下,若是在purchase執行完成以後咱們可能想要賣掉一些其餘的貨幣。這時將不得不在onSuccess的回調中重複這個模式,從而可能使代碼過分嵌套,冗長且難以理解。

其二,purchase只是定義在局部範圍,只能被來自onSuccess內部的回調響應。也就是說,這個應用的其餘部分看不到purchase,並且不能爲它註冊其餘的onSuccess回調,好比說賣掉些其餘貨幣。

爲解決上述的兩個問題,futures提供了組合器(combinators)使之具備更多易用的組合。映射(map)是最基本的組合器之一。試想,給定一個future對象和一個經過映射來得到該future值的函數,映射方法將建立一個新Future對象,一旦原來的Future成功完成了計算,新Future會經過該返回值來完成本身的計算。你可以像理解容器(collections)的map同樣來理解future的map。

讓咱們用map的方法來重構一下前面的例子:

val rateQuote =Future{
 
connection.getCurrentValue(USD)
 
}
 
val purchase = rateQuote map { quote =>
 
if(isProfitable(quote)) connection.buy(amount, quote)
 
elsethrownewException("not profitable")
 
}
 
purchase onSuccess {
 
case _ => println("Purchased "+ amount +" USD")
 
}
 

經過對rateQuote的映射咱們減小了一次onSuccess回調,更重要的是避免了嵌套。這時若是咱們決定出售一些貨幣就能夠再次使用purchase方法上的映射了。

但是,若是isProfitable方法返回了false將會發生些什麼?會引起異常?這種狀況下,purchase的確會因異常而失敗。不只僅如此,想象一下,連接的中斷和getCurrentValue方法拋出異常會使rateQuote的操做失敗。在這些狀況下,映射將不會返回任何值,而purchase也會自動以和rateQuote相同的異常而執行失敗。

總之,若是原Future的計算成功完成了,那麼返回的Future將會使用原Future的映射值來完成計算。若是映射函數拋出了異常,則Future也會帶着該異常完成計算。若是原Future因爲異常而計算失敗,那麼返回的Future也會包含相同的異常。這種異常的傳導方式也一樣適用於其餘的組合器(combinators)。

Future的這個設計目的是使之可以在 For-comprehensions 下使用。也正是由於這,Future還擁有flatMap,filter和foreach等組合器。其中,flatMap方法能夠構造一個函數,能夠把值映射到一個姑且稱爲g的新future,而後返回一個隨g的完成而完成的Future對象。

讓咱們假設,想把一些美圓兌換成法郎。必須爲這兩種貨幣報價,而後再在這兩個報價的基礎上肯定交易。下面是一個在 for-comprehensions 中使用flatMap和withFilter的例子:

val usdQuote =Future{ connection.getCurrentValue(USD)}
 
val chfQuote =Future{ connection.getCurrentValue(CHF)}
 
val purchase =for{
 
usd <- usdQuote
 
chf <- chfQuote
 
if isProfitable(usd, chf)
 
}yield connection.buy(amount, chf)
 
purchase onSuccess {
 
case _ => println("Purchased "+ amount +" CHF")
 
}
 

purchase只有當usdQuote和chfQuote都完成計算後才能完成,它以其餘兩個Future的計算值爲前提,因此它本身的計算不能更早開始。

上面的 for-comprhension 將被轉換爲:

val purchase = usdQuote flatMap {
 
usd =>
 
chfQuote
 
.withFilter(chf => isProfitable(usd, chf))
 
.map(chf => connection.buy(amount, chf))
 
}
 

這的確是比for-comprehension稍微難以把握一些,可是咱們這樣分析有助於更容易的理解flatMap操做。FlatMap操做會把自身的值映射到其餘future對象上,並隨着該對象計算完成的返回值一塊兒完成計算。在例子中,flatMap用usdQuote的值把chfQuote的值映射到第三個futrue對象裏,該對象用於發送必定量法郎的購入請求。只有當經過映射返回的第三個future對象完成了計算,purchase才能完成計算。

這可能有些難以置信,但幸運的是faltMap操做在for-comprhensions模式之外不多使用,由於for-comprehensions自己更容易理解和使用。

再說說filter,它能夠用於建立一個新的future對象,該對象只有在知足某些特定條件的前提下才會獲得原始future的計算值,不然就會拋出一個NoSuchElementException的異常而失敗。調用了filter的future,其效果與直接調用withFilter徹底同樣。

collect 和 filter 組合器之間的關係有些相似容器(collections)API裏那些方法之間的關係。

值得注意的是,調用foreach組合器並不會在計算值可用的時候阻塞當前的進程去獲取計算值。偏偏相反,只有當future對象成功計算完成後,foreach所迭代的函數纔可以被異步執行。這意味着foreach與onSuccess回調意義徹底相同。

因爲Future trait(trait 相似 java 中的接口 interface)從概念上看包含兩種類型的返回值(計算結果和異常),因此組合器會有一個處理異常的需求。

好比,咱們準備在rateQuote的基礎上決定購入必定量的貨幣,那麼connection.buy方法須要知道購入的數量和指望的報價值,最終完成購買的數量將會被返回。假如報價值恰恰在這個時候改變了,那麼,buy方法將會拋出一個QuoteChangedExecption,而且不會作任何交易。若是咱們想讓咱們的Future對象返回0,而不是拋出那個該死的異常,那咱們須要使用recover組合器:

val purchase:Future[Int]= rateQuote map {
 
quote => connection.buy(amount, quote)
 
} recover {
 
caseQuoteChangedException()=>0
 
}
 

recover可以建立一個新future對象,該對象當計算完成時持有和原future對象同樣的值。若是執行不成功,則偏函數的參數會被傳遞給使原Future失敗的那個Throwable異常。若是它把Throwable映射到了某個值,那麼新的Future就會成功完成並返回該值。若是偏函數沒有定義在Throwable中,那麼最終產生結果的future也會失敗並返回一樣的Throwable。

組合器recoverWith可以建立一個新future對象,當原future對象成功完成計算時,新future對象包含有和原future對象相同的計算結果。若原future失敗或異常,偏函數將會返回形成原future失敗的相同的Throwable異常。若是此時Throwable又被映射給了別的future,那麼新Future就會完成並返回這個future的結果。recoverWith 和 recover的關係與flatMap和map之間的關係很像。

fallbackTo組合器生成的future對象能夠在該原future成功完成計算時返回結果,若是原future失敗或異常返回future參數對象的成功值。在原future和參數future都失敗的狀況下,新future對象會完成並返回原future對象拋出的異常。正以下面例子,本想打印美圓的匯率,但在獲取美圓匯率失敗的狀況下會打印法郎的匯率:

val usdQuote =Future{
connection.getCurrentValue(USD)
} map {
usd =>"Value: "+ usd +"$"
}
 
val chfQuote =Future{
connection.getCurrentValue(CHF)
} map {
chf =>"Value: "+ chf +"CHF"
}
 
val anyQuote = usdQuote fallbackTo chfQuote
anyQuote onSuccess { println(_)}

組合器andThen的用法是出於純粹的side-effecting目的。經andThen返回的新Future,不管原Future成功或失敗都會返回與原Future同樣的結果。一旦原Future完成並返回結果,andThen後跟的代碼塊就會被調用,且新Future將返回與原Future同樣的結果,這確保了多個andThen調用的順序執行。正以下例所示,這段代碼能夠從社交網站上把近期發出的帖子收集到一個可變集合裏,而後把它們都打印在屏幕上:

val allposts =mutable.Set[String]()
Future{
session.getRecentPosts
} andThen {
posts => allposts ++= posts
} andThen {
posts =>
clearAll()
for(post <- allposts) render(post)
}

綜上所述,Future的組合器功能是純函數式的,每種組合器都會返回一個與原Future相關的新Future對象。

投影(Projections)

爲了確保 for-comprehensions 可以返回異常,futures也提供了投影(projections)。若是原future對象失敗了,失敗的投影(projection)會返回一個帶有Throwable類型返回值的future對象。若是原Future成功了,失敗的投影(projection)會拋出一個NoSuchElementException異常。下面是一個在屏幕上打印出異常的例子:

val f =Future{
 
2/0
 
}
 
for(exc <- f.failed) println(exc)
下面例子不會在屏幕上打印出任何東西:
val f =Future{
4/2
}
for(exc <- f.failed) println(exc)

Future 擴展

更多的實用方法對Futures API進行了擴展支持,這將爲不少外部框架提供更多專業工具。

阻塞


future 通常都是異步的,不會阻塞潛在的執行線程。然而,在某些狀況下,阻塞是必要的。咱們區分了兩種阻塞執行線程的形式:future 內的阻塞,以及 future 外的阻塞,等待直到 future 完成。

Future 內的阻塞

As seen with the global ExecutionContext, it is possible to notify an ExecutionContext of a blocking call with the blocking construct. The implementation is however at the complete discretion of the ExecutionContext. While some ExecutionContext such as ExecutionContext.global implement blocking by means of a ManagedBlocker, some execution contexts such as the fixed thread pool:

正如全局 ExecutionContext,它能夠通知一個具備阻塞結構的阻塞調用的 ExecutionContext。然而,實現是很慎重的。當某些 ExecutionContext 經過 ManagedBlocker 實現阻塞,一些執行上下文,如固定的線程池:

ExecutionContext.fromExecutor(Executors.newFixedThreadPool(x))

下面代碼將什麼都不作:

implicit val ec =ExecutionContext.fromExecutor(
Executors.newFixedThreadPool(4))
Future{
blocking { blockingStuff()}
}

下面效果同樣:

Future{ blockingStuff()}

阻塞的代碼也可能拋出異常。在這種狀況下,這個異常會轉發給調用者。

future 外阻塞

正如前面所說,在 future 上阻塞是不鼓勵的,由於會出現性能和死鎖。回調(Callbacks)和組合器(combinators)纔是首選方法,但在某些狀況中阻塞也是須要的,而且Futures 和 Promises API 也支持。

在以前的併發交易例子中,在最後有一處用到阻塞來肯定是否全部的futures都已完成。下面是如何使用block來處理一個future結果的例子:

import scala.concurrent._
import scala.concurrent.duration._
 
def main(args:Array[String]){
val rateQuote =Future{
connection.getCurrentValue(USD)
}
 
 
val purchase = rateQuote map { quote =>
if(isProfitable(quote)) connection.buy(amount, quote)
elsethrownewException("not profitable")
}
 
Await.result(purchase,0 nanos)
}

在這種狀況下,future失敗了,調用者轉發出了該future 失敗的異常。它包含了失敗的投影(projection)——阻塞該結果,將會形成一個NoSuchElementException異常,若原future對象被成功完成。

相反的,調用Await.ready來等待,知道這個future完成,但獲不到結果。一樣,若是 future 失敗,調用不會拋出異常的方法。

Future trait 用ready()result()方法實現了Awaitable trait。這些方法不能被客戶端直接調用——它們只能被執行上下文調用。

異常


當異步計算拋出未處理的異常時,與那些計算相關的futures就失敗了。失敗的futures存儲了一個Throwable的實例,而不是返回值。Futures提供onFailure回調方法,它用一個PartialFunction去表示一個Throwable。下列特殊異常的處理方式不一樣:

scala.runtime.NonLocalReturnControl[_] –此異常保存了一個與返回相關聯的值。一般狀況下,在方法體中的返回結構被調用去拋出這個異常。相關聯的值將會存儲到future或一個promise中,而不是一直保存在這個異常中。

ExecutionException-當由於一個未處理的中斷異常、錯誤或者scala.util.control.ControlThrowable致使計算失敗時會被存儲起來。這種狀況下,ExecutionException會爲此具備未處理的異常。這些異常會在執行失敗的異步計算線程中從新拋出。這樣作的目的,是爲了防止正常狀況下沒有被客戶端代碼處理過的那些關鍵的、與控制流相關的異常繼續傳播下去,同時告知客戶端其中的future對象是計算失敗的。

更精確的語義描述請參見 [NonFatal]。

Promises


到目前爲止,咱們僅考慮了經過異步計算的方式建立future對象來使用future的方法。儘管如此,futures也可使用promises來建立。

若是說futures是爲了一個尚未存在的結果,而當成一種只讀佔位符的對象類型去建立,那麼promise就被認爲是一個可寫的,能夠實現一個future的單一賦值容器。這就是說,promise經過這種success方法能夠成功去實現一個帶有值的future。相反的,由於一個失敗的promise經過failure方法就會實現一個帶有異常的future。

一個promise p經過p.future方式返回future。 這個futrue對象被指定到promise p。根據這種實現方式,可能就會出現p.future與p相同的狀況。

考慮下面的生產者 - 消費者的例子,其中一個計算產生一個值,並把它轉移到另外一個使用該值的計算。這個傳遞中的值經過一個promise來完成。

import scala.concurrent.{ future, promise }
import scala.concurrent.ExecutionContext.Implicits.global
 
val p = promise[T]
val f = p.future
 
val producer = future {
  val r = produceSomething()
  p success r
  continueDoingSomethingUnrelated()
}
 
val consumer = future {
  startDoingSomething()
  f onSuccess {
case r => doSomethingWithResult()
}
}

這裏,咱們建立了一個promise,並利用它的future方法得到一個 Future。而後,開始兩個異步計算。第一種,作些計算,將結果放在 r 中,經過執行promise p,這個值被用來完成future對象f。第二個,也作某寫計算,而後讀取實現了 future f的計算結果值 r。注意,在生產者完成執行 continueDoingSomethingUnrelated() 方法以前,消費者能夠得到這個結果值。

正如前面提到的,promises 具備單賦值語義。所以,它們僅能被實現一次。在一個已經計算完成的promise或者failed的promise上調用success方法將會拋出一個IllegalStateException 異常。

下面例子顯示瞭如何失敗一個 promise。

val p = promise[T]
val f = p.future
 
val producer = future {
  val r = someComputation
if(isInvalid(r))
    p failure (newIllegalStateException)
else{
    val q = doSomeMoreComputation(r)
    p success q
}
}

如上,生產者計算出一箇中間結果 r,並判斷它的有效性。若是無效,它會經過返回一個異常實現 promise p 的方式失敗這個 promise,關聯的future f是 failed。不然,生產者會繼續它的計算,最終使用一個有效的結果值實現 future f,同時實現 promise p。

Promises也能經過一個 complete 方法來實現,這個方法採用了一個 potential value Try[T],這個值要麼是一個類型爲 Failure[Throwable] 的失敗結果,要麼是一個類型爲 Success[T] 的成功結果。

相似 success 方法,在一個已經完成的 promise 對象上調用 failure 和 complete 方法一樣會拋出一個 IllegalStateException 異常。

前面所述的promises和futures方法的一個優勢是,這些方法是單一操做的,而且是沒有反作用(side-effects)的,所以程序是肯定性的(deterministic)。肯定性意味着,若是該程序沒有拋出異常(future的計算值被得到),不管並行的程序如何調度,那麼程序的結果將會永遠是同樣的。

在一些狀況下,客戶端也許但願可以只在 promie 沒有完成的狀況下完成該 promise的計算(例如,若是有多個HTTP請求被多個不一樣的futures對象來執行,而且客戶端只關心一個HTTP應答,該應答對應於第一個完成該 promise的 future)。所以,future 提供了 tryComplete,trySuccess 和 tryFailure 方法。客戶端須要意識到調用這些的結果是不肯定的,調用的結果將依賴程序的調度。

completeWith 方法將用另一個 future 完成 promise 計算。當該future結束的時候,該promise對象獲得那個future對象一樣的值,以下的程序將打印 1:

val f = future {1}
val p = promise[Int]
 
p completeWith f
 
p.future onSuccess {
case x => println(x)
}

當讓一個 promise 以異常失敗的時候,三種子類型的 Throwable 異常被分別的處理。若是中斷該promise的可拋出(Throwable)一場是scala.runtime.NonLocalReturnControl,那麼該promise將以對應的值結束;若是是一個Error的實例,InterruptedException 或者scala.util.control.ControlThrowable,那麼該可拋出(Throwable)異常將會封裝一個 ExecutionException 異常,該 ExectionException 將會讓該 promise 以失敗結束。

經過使用promises,futures的onComplete方法和future的構造方法,你可以實現前文描述的任何函數式競爭組合器(compition combinators)。假設,你想實現一個新的組合器,該組合器首先使用兩個future對象 f 和 g,併產生第三個future,該 future 是經過 f 或者 g 完成的,但只在成功完成的狀況下。

下面是關於這個的例子:

def first[T](f:Future[T], g:Future[T]):Future[T]={
  val p = promise[T]
 
  f onSuccess {
case x => p.trySuccess(x)
}
 
  g onSuccess {
case x => p.trySuccess(x)
}
 
  p.future

注意,在這種實現方式中,若是f與g都不成功,那麼 first(f, g)將永遠不會完成(即返回一個值或者返回一個異常)。

工具(Utilities)


爲了簡化在併發應用中處理時序的問題,scala.concurrent 引入了 Duration 抽象。Duration 不是被做爲另一個一般的時間抽象存在的。他是爲了用在併發庫中,Duration位於scala.concurrent包中。

Duration是表示時間長短的基礎類,其能夠是有限的或者無限的。有限的duration用FiniteDuration類來表示,並經過時間長度(length)java.util.concurrent.TimeUnit來構造。無限的durations,一樣擴展了Duration,只在兩種狀況下存在,Duration.Inf Duration.MinusInf。庫中一樣提供了一些Durations的子類用來作隱式的轉換,這些子類不該被直接使用。

抽象的 Duration 類包含了以下方法:

  1. 轉換到不一樣的四件單位(toNanos, toMicros, toMillis, toSeconds, toMinutes, toHours, toDays and toUnit(unit: TimeUnit));
  2. durations 比較(<, <=, > 和 >=);
  3. 算術運算符(+, -, *, / 和 unary_-);
  4. Minimum and maximum between this duration and the one supplied in the argument (min, max).
  5. 檢查 duration 是否無限(isFinite)。

Duration 可以用以下方法實例化(instantiated):

  1. 隱式的經過Int和Long類型轉換得來 val d = 100 millis
  2. 經過傳遞一個Long lengthjava.util.concurrent.TimeUnit。例如val d = Duration(100, MILLISECONDS)
  3. 經過傳遞一個字符串來表示時間區間,例如 val d = Duration("1.2 µs")

Duration也提供了unapply方法,能夠被用於模式匹配構造(pattern matching constructs)中,例如:

import scala.concurrent.duration._
import java.util.concurrent.TimeUnit._
 
// instantiation
val d1 =Duration(100, MILLISECONDS)// from Long and TimeUnit
val d2 =Duration(100,"millis")// from Long and String
val d3 =100 millis // implicitly from Long, Int or Double
val d4 =Duration("1.2 µs")// from String
 
// pattern matching
val Duration(length, unit)=5 millis

參考資料


相關文章
相關標籤/搜索