【Akka】在併發程序中使用Future

引言

在Akka中, 一個Future是用來獲取某個併發操做的結果的數據結構。這個操做通常是由Actor運行或由Dispatcher直接運行的. 這個結果可以以同步(堵塞)或異步(非堵塞)的方式訪問。
Future提供了一種簡單的方式來運行並行算法。css

Future直接使用

Future中的一個常見用例是在不需要使用Actor的狀況下併發地運行計算。java


Future有兩種使用方式:git

  1. 堵塞方式(Blocking):該方式下,父actor或主程序中止運行知道所有future完畢各自任務。經過scala.concurrent.Await使用。

  2. 非堵塞方式(Non-Blocking),也稱爲回調方式(Callback):父actor或主程序在運行期間啓動future,future任務和父actor並行運行,當每個future完畢任務,將通知父actor。經過onCompleteonSuccessonFailure方式使用。

運行上下文(ExecutionContext)

爲了運行回調和操做,Futures需要有一個ExecutionContextgithub


假設你在做用域內有一個ActorSystem。它會它本身派發器用做ExecutionContext,你也可以用ExecutionContext伴生對象提供的工廠方法來將Executors和ExecutorServices進行包裹。或者甚至建立本身的實例。
經過導入ExecutionContext.Implicits.global來導入默認的全局運行上下文。算法

你可以把該運行上下文看作是一個線程池,ExecutionContext是在某個線程池運行任務的抽象。markdown


假設在代碼中沒有導入該運行上下文,代碼將沒法編譯。數據結構

堵塞方式

第一個樣例展現怎樣建立一個future,而後經過堵塞方式等待其計算結果。儘管堵塞方式不是一個很是好的使用方法,但是可以說明問題。併發


這個樣例中。經過在將來某個時間計算1+1,當計算結果後再返回。dom

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object FutureBlockDemo extends App{
  implicit val baseTime = System.currentTimeMillis

  // create a Future
  val f = Future {
    Thread.sleep(500)
    1+1
  }
  // this is blocking(blocking is bad)
  val result = Await.result(f, 1 second)
  // 假設Future沒有在Await規定的時間裏返回,
  // 將拋出java.util.concurrent.TimeoutException
  println(result)
  Thread.sleep(1000)
}

代碼解釋:異步

  1. 在上面的代碼中。被傳遞給Future的代碼塊會被缺省的Dispatcher所運行。代碼塊的返回結果會被用來完畢Future。 與從Actor返回的Future不一樣,這個Future擁有正確的類型, 咱們還避免了管理Actor的開銷。
  2. Await.result方法將堵塞1秒時間來等待Future結果返回。假設Future在規定時間內沒有返回,將拋出java.util.concurrent.TimeoutException異常。
  3. 經過導入scala.concurrent.duration._,可以用一種方便的方式來聲明時間間隔,如100 nanos500 millis5 seconds1 minute1 hour3 days

    還可以經過Duration(100, MILLISECONDS)Duration(200, "millis")來建立時間間隔。

非堵塞方式(回調方式)

有時你只需要監聽Future的完畢事件,對其進行響應,不是建立新的Future,而不過產生反作用。


經過onComplete,onSuccess,onFailure三個回調函數來異步運行Future任務,然後二者不過第一項的特例。

使用onComplete的代碼演示樣例:

import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.util.Random

object FutureNotBlock extends App{
  println("starting calculation ...")
  val f = Future {
    Thread.sleep(Random.nextInt(500))
    42
  }

  println("before onComplete")
  f.onComplete{
    case Success(value) => println(s"Got the callback, meaning = $value")
    case Failure(e) => e.printStackTrace
  }

  // do the rest of your work
  println("A ...")
  Thread.sleep(100)
  println("B ....")
  Thread.sleep(100)
  println("C ....")
  Thread.sleep(100)
  println("D ....")
  Thread.sleep(100)
  println("E ....")
  Thread.sleep(100)

  Thread.sleep(2000)
}

使用onSuccess、onFailure的代碼演示樣例:

import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.util.Random

object Test12_FutureOnSuccessAndFailure extends App{
  val f = Future {
    Thread.sleep(Random.nextInt(500))
    if (Random.nextInt(500) > 250) throw new Exception("Tikes!") else 42
  }

  f onSuccess {
    case result => println(s"Success: $result")
  }

  f onFailure {
    case t => println(s"Exception: ${t.getMessage}")
  }

  // do the rest of your work
  println("A ...")
  Thread.sleep(100)
  println("B ....")
  Thread.sleep(100)
  println("C ....")
  Thread.sleep(100)
  println("D ....")
  Thread.sleep(100)
  println("E ....")
  Thread.sleep(100)

  Thread.sleep(1000)
}

代碼解釋:
上面兩段樣例中,Future結構中隨機延遲一段時間,而後返回結果或者拋出異常。
而後在回調函數中進行相關處理。

建立返回Future[T]的方法

先看一下演示樣例:

import scala.concurrent.{Await, Future, future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object ReturnFuture extends App{
  implicit val baseTime = System.currentTimeMillis

  // `future` method is another way to create a future
  // It starts the computation asynchronously and retures a Future[Int] that
  // will hold the result of the computation.
  def longRunningComputation(i: Int): Future[Int] = future {
    Thread.sleep(100)
    i + 1
  }

  // this does not block
  longRunningComputation(11).onComplete {
    case Success(result) => println(s"result = $result")
    case Failure(e) => e.printStackTrace
  }

  // keep the jvm from shutting down
  Thread.sleep(1000)
}

代碼解釋:
上面代碼中的longRunningComputation返回一個Future[Int],而後進行相關的異步操做。
當中future方法是建立一個future的還有一種方法。它將啓動一個異步計算並且返回包括計算結果的Future[T]

Future用於Actor

一般有兩種方法來從一個Actor獲取迴應: 第一種是發送一個消息actor ! msg。這樣的方法只在發送者是一個Actor時有效;另一種是經過一個Future。


使用Actor的?方法來發送消息會返回一個Future。 要等待並獲取結果的最簡單方法是:

import scala.concurrent.Await
import akka.pattern.ask
import scala.concurrent.duration._
import akka.util.Timeout

implicit val timeout = Timeout(5 seconds)
val future = actor ? msg
val result = Await.result(future, timeout.duration).asInstanceOf[String]

如下是使用?發送消息給actor,並等待迴應的代碼演示樣例:

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.language.postfixOps
import scala.concurrent.duration._

case object AskNameMessage

class TestActor extends Actor {
  def receive = {
    case AskNameMessage => // respond to the 'ask' request
      sender ! "Fred"
    case _ => println("that was unexpected")
  }
}
object AskDemo extends App{
  //create the system and actor
  val system = ActorSystem("AskDemoSystem")
  val myActor = system.actorOf(Props[TestActor], name="myActor")

  // (1) this is one way to "ask" another actor for information
  implicit val timeout = Timeout(5 seconds)
  val future = myActor ? AskNameMessage
  val result = Await.result(future, timeout.duration).asInstanceOf[String]
  println(result)

  // (2) a slightly different way to ask another actor for information
  val future2: Future[String] = ask(myActor, AskNameMessage).mapTo[String]
  val result2 = Await.result(future2, 1 second)
  println(result2)

  system.shutdown
}

代碼解釋:

  1. Await.result(future, timeout.duration).asInstanceOf[String]會致使當前線程被堵塞,並等待actor經過它的應答來完畢Future

    但是堵塞會致使性能問題。因此是不推薦的。

    致堵塞的操做位於Await.resultAwait.ready中,這樣就方便定位堵塞的位置。

  2. 還要注意actor返回的Future的類型是Future[Any],這是因爲actor是動態的。 這也是爲何上例中凝視(1)使用了asInstanceOf
  3. 在使用非堵塞方式時,最好使用mapTo方法來將Future轉換到指望的類型。假設轉換成功。mapTo方法會返回一個包括結果的新的 Future。假設不成功,則返回ClassCastException異常。

轉載請註明做者Jason Ding及其出處
Github博客主頁(http://jasonding1354.github.io/)
GitCafe博客主頁(http://jasonding1354.gitcafe.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
簡書主頁(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354進入個人博客主頁

相關文章
相關標籤/搜索