在Akka中, 一個Future
是用來獲取某個併發操做的結果的數據結構。這個操做通常是由Actor運行或由Dispatcher直接運行的. 這個結果可以以同步(堵塞)或異步(非堵塞)的方式訪問。
Future提供了一種簡單的方式來運行並行算法。css
Future中的一個常見用例是在不需要使用Actor的狀況下併發地運行計算。java
Future有兩種使用方式:git
- 堵塞方式(Blocking):該方式下,父actor或主程序中止運行知道所有future完畢各自任務。經過
scala.concurrent.Await
使用。- 非堵塞方式(Non-Blocking),也稱爲回調方式(Callback):父actor或主程序在運行期間啓動future,future任務和父actor並行運行,當每個future完畢任務,將通知父actor。經過
onComplete
、onSuccess
、onFailure
方式使用。
爲了運行回調和操做,Futures需要有一個ExecutionContext
。github
假設你在做用域內有一個ActorSystem
。它會它本身派發器用做ExecutionContext,你也可以用ExecutionContext伴生對象提供的工廠方法來將Executors和ExecutorServices進行包裹。或者甚至建立本身的實例。
經過導入ExecutionContext.Implicits.global
來導入默認的全局運行上下文。算法
你可以把該運行上下文看作是一個線程池,ExecutionContext是在某個線程池運行任務的抽象。markdown
假設在代碼中沒有導入該運行上下文,代碼將沒法編譯。數據結構
第一個樣例展現怎樣建立一個future,而後經過堵塞方式等待其計算結果。儘管堵塞方式不是一個很是好的使用方法,但是可以說明問題。併發
這個樣例中。經過在將來某個時間計算1+1,當計算結果後再返回。dom
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
object FutureBlockDemo extends App{
implicit val baseTime = System.currentTimeMillis
// create a Future
val f = Future {
Thread.sleep(500)
1+1
}
// this is blocking(blocking is bad)
val result = Await.result(f, 1 second)
// 假設Future沒有在Await規定的時間裏返回,
// 將拋出java.util.concurrent.TimeoutException
println(result)
Thread.sleep(1000)
}
代碼解釋:異步
- 在上面的代碼中。被傳遞給Future的代碼塊會被缺省的
Dispatcher
所運行。代碼塊的返回結果會被用來完畢Future
。 與從Actor返回的Future不一樣,這個Future擁有正確的類型, 咱們還避免了管理Actor的開銷。Await.result
方法將堵塞1秒時間來等待Future結果返回。假設Future在規定時間內沒有返回,將拋出java.util.concurrent.TimeoutException
異常。- 經過導入
scala.concurrent.duration._
,可以用一種方便的方式來聲明時間間隔,如100 nanos
,500 millis
,5 seconds
、1 minute
、1 hour
,3 days
。還可以經過
Duration(100, MILLISECONDS)
,Duration(200, "millis")
來建立時間間隔。
有時你只需要監聽Future
的完畢事件,對其進行響應,不是建立新的Future,而不過產生反作用。
經過onComplete
,onSuccess
,onFailure
三個回調函數來異步運行Future任務,然後二者不過第一項的特例。
使用onComplete的代碼演示樣例:
import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.util.Random
object FutureNotBlock extends App{
println("starting calculation ...")
val f = Future {
Thread.sleep(Random.nextInt(500))
42
}
println("before onComplete")
f.onComplete{
case Success(value) => println(s"Got the callback, meaning = $value")
case Failure(e) => e.printStackTrace
}
// do the rest of your work
println("A ...")
Thread.sleep(100)
println("B ....")
Thread.sleep(100)
println("C ....")
Thread.sleep(100)
println("D ....")
Thread.sleep(100)
println("E ....")
Thread.sleep(100)
Thread.sleep(2000)
}
使用onSuccess、onFailure的代碼演示樣例:
import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.util.Random
object Test12_FutureOnSuccessAndFailure extends App{
val f = Future {
Thread.sleep(Random.nextInt(500))
if (Random.nextInt(500) > 250) throw new Exception("Tikes!") else 42
}
f onSuccess {
case result => println(s"Success: $result")
}
f onFailure {
case t => println(s"Exception: ${t.getMessage}")
}
// do the rest of your work
println("A ...")
Thread.sleep(100)
println("B ....")
Thread.sleep(100)
println("C ....")
Thread.sleep(100)
println("D ....")
Thread.sleep(100)
println("E ....")
Thread.sleep(100)
Thread.sleep(1000)
}
代碼解釋:
上面兩段樣例中,Future結構中隨機延遲一段時間,而後返回結果或者拋出異常。
而後在回調函數中進行相關處理。
先看一下演示樣例:
import scala.concurrent.{Await, Future, future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
object ReturnFuture extends App{
implicit val baseTime = System.currentTimeMillis
// `future` method is another way to create a future
// It starts the computation asynchronously and retures a Future[Int] that
// will hold the result of the computation.
def longRunningComputation(i: Int): Future[Int] = future {
Thread.sleep(100)
i + 1
}
// this does not block
longRunningComputation(11).onComplete {
case Success(result) => println(s"result = $result")
case Failure(e) => e.printStackTrace
}
// keep the jvm from shutting down
Thread.sleep(1000)
}
代碼解釋:
上面代碼中的longRunningComputation返回一個Future[Int]
,而後進行相關的異步操做。
當中future
方法是建立一個future的還有一種方法。它將啓動一個異步計算並且返回包括計算結果的Future[T]
。
一般有兩種方法來從一個Actor獲取迴應: 第一種是發送一個消息actor ! msg
。這樣的方法只在發送者是一個Actor時有效;另一種是經過一個Future。
使用Actor的?
方法來發送消息會返回一個Future。 要等待並獲取結果的最簡單方法是:
import scala.concurrent.Await
import akka.pattern.ask
import scala.concurrent.duration._
import akka.util.Timeout
implicit val timeout = Timeout(5 seconds)
val future = actor ? msg
val result = Await.result(future, timeout.duration).asInstanceOf[String]
如下是使用?
發送消息給actor,並等待迴應的代碼演示樣例:
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.language.postfixOps
import scala.concurrent.duration._
case object AskNameMessage
class TestActor extends Actor {
def receive = {
case AskNameMessage => // respond to the 'ask' request
sender ! "Fred"
case _ => println("that was unexpected")
}
}
object AskDemo extends App{
//create the system and actor
val system = ActorSystem("AskDemoSystem")
val myActor = system.actorOf(Props[TestActor], name="myActor")
// (1) this is one way to "ask" another actor for information
implicit val timeout = Timeout(5 seconds)
val future = myActor ? AskNameMessage
val result = Await.result(future, timeout.duration).asInstanceOf[String]
println(result)
// (2) a slightly different way to ask another actor for information
val future2: Future[String] = ask(myActor, AskNameMessage).mapTo[String]
val result2 = Await.result(future2, 1 second)
println(result2)
system.shutdown
}
代碼解釋:
Await.result(future, timeout.duration).asInstanceOf[String]
會致使當前線程被堵塞,並等待actor經過它的應答來完畢Future
。但是堵塞會致使性能問題。因此是不推薦的。
致堵塞的操做位於
Await.result
和Await.ready
中,這樣就方便定位堵塞的位置。- 還要注意actor返回的Future的類型是
Future[Any]
,這是因爲actor是動態的。 這也是爲何上例中凝視(1)使用了asInstanceOf
。- 在使用非堵塞方式時,最好使用
mapTo
方法來將Future轉換到指望的類型。假設轉換成功。mapTo
方法會返回一個包括結果的新的 Future。假設不成功,則返回ClassCastException
異常。
轉載請註明做者Jason Ding及其出處
Github博客主頁(http://jasonding1354.github.io/)
GitCafe博客主頁(http://jasonding1354.gitcafe.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
簡書主頁(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354進入個人博客主頁