akka-typed(2) - typed-actor交流方式和交流協議

   akka系統是一個分佈式的消息驅動系統。akka應用由一羣負責不一樣運算工做的actor組成,每一個actor都是被動等待外界的某種消息來驅動本身的做業。因此,通俗點描述:akka應用就是一羣actor相互之間發送消息的系統,每一個actor接收到消息後開始本身負責的工做。對於akka-typed來講,typed-actor只能接收指定類型的消息,因此actor之間的消息交流須要按照消息類型來進行,即須要協議來規範消息交流機制。想一想看,若是用戶須要一個actor作某件事,他必須用這個actor明白的消息類型來發送消息,這就是一種交流協議。app

所謂消息交流方式包括單向和雙向兩類。若是涉及兩個actor之間的消息交換,消息發送方式能夠是單向和雙向的。但若是是從外界向一個actor發送消息,那麼確定只能是單向的發送方式了,由於消息發送兩端只有一端是actor。分佈式

典型的單向消息發送fire-and-forget以下:函數

import akka.actor.typed._ import scaladsl._ object Printer { case class PrintMe(message: String) // 只接收PrintMe類型message
  def apply(): Behavior[PrintMe] = Behaviors.receive { case (context, PrintMe(message)) => context.log.info(message) Behaviors.same } } object FireAndGo extends App { // system就是一個root-actor
val system: ActorRef[Printer.PrintMe] = ActorSystem(Printer(), "fire-and-forget-sample") val printer: ActorRef[Printer.PrintMe] = system // 單向消息發送,printMe類型的消息
printer ! Printer.PrintMe("hello") printer ! Printer.PrintMe("world!") system.asInstanceOf[ActorSystem[Printer.PrintMe]].terminate() }

固然,在現實中一般咱們要求actor去進行某些運算而後返回運算結果。這就涉及到actor之間雙向信息交換了。第一種狀況:兩個actor之間的消息是任意無序的,這是一種典型的無順序request-response模式。就是說一個response不必定是按照request的接收順序返回的,只是它們之間可以交流而已。不過,在akka-typed中這種模式最基本的要求就是發送的消息類型必須符合接收方actor的類型。ui

好了,咱們先對這個模式作個示範。全部actor的定義能夠先從它的消息類型開始。對每一個參加雙向交流的actor來講,能夠從request和response兩種消息來反映它的功能:this

object FrontEnd { sealed trait FrontMessages case class SayHi(who: String) extends FrontMessages } object BackEnd { //先從這個actor的迴應消息開始
     sealed trait Response case class HowAreU(msg: String) extends Response case object Unknown extends Response //可接收消息類型
  sealed trait BackMessages //這個replyTo應該是一個能處理Reponse類型消息的actor
  case class MakeHello(who: String, replyTo: ActorRef[Response]) extends BackMessages }

這個FrontEnd接收SayHi消息後開始工做,不過目前尚未定義返回的消息類型。BackEnd接到MakeHello類型消息後返回response類型消息。從這個角度來說,返回的對方actor必須可以處理Response類型的消息。spa

咱們試試實現這個FrontEnd actor:scala

object FrontEnd { sealed trait FrontMessages case class SayHi(who: String) extends FrontMessages def apply(backEnd: ActorRef[BackEnd.BackMessages]): Behavior[FrontMessages] = { Behaviors.receive { (ctx,msg) => msg match { case SayHi(who) => ctx.log.info("requested to say hi to {}", who) backEnd ! BackEnd.MakeHello(who, ???) } } }

MakeHello須要一個replyTo,應該是什麼呢?不過它必定是能夠處理Response類型消息的actor。但咱們知道這個replyTo就是FrontEnd,不過FrontEnd只能處理FrontMessages類型消息,應該怎麼辦呢?可不能夠把replyTo直接寫成FrontEnd呢?雖然能夠這麼作,但這個MakeHello消息就只能跟FrontEnd綁死了。若是其它的actor也須要用到這個MakeHello的話就須要另外定義一個了。因此,最好的解決方案就是用某種類型轉換方式來實現。以下:code

import akka.actor.typed._ import scaladsl._ object FrontEnd { sealed trait FrontMessages case class SayHi(who: String) extends FrontMessages case class WrappedBackEndResonse(res: BackEnd.Response) extends FrontMessages def apply(backEnd: ActorRef[BackEnd.BackMessages]): Behavior[FrontMessages] = { Behaviors.setup[FrontMessages] { ctx =>
                                                   //ctx.messageAdapter(ref => WrappedBackEndResonse(ref))
      val backEndRef: ActorRef[BackEnd.Response] = ctx.messageAdapter(WrappedBackEndResonse) Behaviors.receive { (ctx, msg) => msg match { case SayHi(who) => ctx.log.info("requested to say hi to {}", who) backEnd ! BackEnd.MakeHello(who, backEndRef) Behaviors.same //messageAdapter將BackEnd.Response轉換成WrappedBackEndResponse
          case WrappedBackEndResonse(msg) => msg match { case BackEnd.HowAreU(msg) => ctx.log.info(msg) Behaviors.same case BackEnd.Unknown => ctx.log.info("Unable to say hello") Behaviors.same } } } } } }

首先,咱們用ctx.mesageAdapter產生了ActorRef[BackEnd.Response],正是咱們須要提供給MakeHello消息的replyTo。看看這個messageAdapter函數:blog

def messageAdapter[U: ClassTag](f: U => T): ActorRef[U]

若是咱們進行類型替換U -> BackEnd.Response, T -> FrontMessage 那麼:ip

      val backEndRef: ActorRef[BackEnd.Response] = ctx.messageAdapter((response: BackEnd.Response) => WrappedBackEndResonse(response))

實際上這個messageAdapter函數在本地ActorContext範圍內登記了一個從BackEnd.Response類型到FrontMessages的轉換。把接收到的BackEnd.Response當即轉換成WrappedBackEndResponse(response)。

還有一種兩個actor之間的雙向交流模式是 1:1 request-response,即一對一模式。一對一的意思是發送方發送消息後等待迴應消息。這就意味着收信方須要在完成運算任務後當即向發信方發送迴應,不然形成發信方的超時異常。沒法避免的是,這種模式依然會涉及消息類型的轉換,以下:

object FrontEnd { sealed trait FrontMessages case class SayHi(who: String) extends FrontMessages case class WrappedBackEndResonse(res: BackEnd.Response) extends FrontMessages case class ErrorResponse(errmsg: String) extends FrontMessages def apply(backEnd: ActorRef[BackEnd.BackMessages]): Behavior[FrontMessages] = { Behaviors.setup[FrontMessages] { ctx =>
      //ask須要超時上限
 import scala.concurrent.duration._ import scala.util._ implicit val timeOut: Timeout = 3.seconds Behaviors.receive[FrontMessages] { (ctx, msg) => msg match { case SayHi(who) => ctx.log.info("requested to say hi to {}", who) ctx.ask(backEnd,(backEndRef: ActorRef[BackEnd.Response]) => BackEnd.MakeHello(who,backEndRef) ){ case Success(backResponse) => WrappedBackEndResonse(backResponse) case Failure(err) =>ErrorResponse(err.getLocalizedMessage) } Behaviors.same case WrappedBackEndResonse(msg) => msg match { case BackEnd.HowAreU(msg) => ctx.log.info(msg) Behaviors.same case BackEnd.Unknown => ctx.log.info("Unable to say hello") Behaviors.same } case ErrorResponse(errmsg) => ctx.log.info("ask error: {}",errmsg) Behaviors.same } } } } }

彷佛類型轉換是在ask裏實現的,看看這個函數:

  def ask[Req, Res](target: RecipientRef[Req], createRequest: ActorRef[Res] => Req)( mapResponse: Try[Res] => T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit

req -> BackEnd.BackMessages, res -> BackEnd.Response, T -> FrontMessages。如今ask能夠寫成下面這樣:

 

 ctx.ask[BackEnd.BackMessages,BackEnd.Response](backEnd, (backEndRef: ActorRef[BackEnd.Response]) => BackEnd.MakeHello(who,backEndRef) ){ case Success(backResponse:BackEnd.Response) => WrappedBackEndResonse(backResponse) case Failure(err) =>ErrorResponse(err.getLocalizedMessage) }

 

這樣看起來更明白點,也就是說ask把接收的BackEnd.Response轉換成了FrontEnd處理的消息類型WrappedBackEndRespnse,也就是FrontMessages

還有一種ask模式是在actor以外進行的,以下:

object AskDemo extends App { import akka.actor.typed.scaladsl.AskPattern._ import scala.concurrent._ import scala.concurrent.duration._ import akka.util._ import scala.util._ implicit val system: ActorSystem[BackEnd.BackMessages] = ActorSystem(BackEnd(), "front-app") // asking someone requires a timeout if the timeout hits without response // the ask is failed with a TimeoutException
  implicit val timeout: Timeout = 3.seconds val result: Future[BackEnd.Response] = system.asInstanceOf[ActorRef[BackEnd.BackMessages]] .ask[BackEnd.Response]((ref: ActorRef[BackEnd.Response]) => BackEnd.MakeHello("John", ref)) // the response callback will be executed on this execution context
  implicit val ec = system.executionContext result.onComplete { case Success(res)  => res match { case BackEnd.HowAreU(msg) => println(msg) case BackEnd.Unknown => println("Unable to say hello") } case Failure(ex)  => println(s"error: ${ex.getMessage}") } system.terminate() }

 這個ask是在akka.actor.typed.scaladsl.AskPattern包裏。函數款式以下:

   def ask[Res](replyTo: ActorRef[Res] => Req)(implicit timeout: Timeout, scheduler: Scheduler): Future[Res]

 

向ask傳入一個函數ActorRef[BackEnd.Response] => BackEnd.BackMessages,而後返回Future[BackEnd.Response]。這個模式中接收回複方是在ActorContext以外,不存在消息截獲機制,因此不涉及消息類型的轉換。

另外一種單actor雙向消息交換模式,即本身ask本身。在ActorContext內向本身發送消息並提供迴應消息的接收,如pipeToSelf:

 

object PipeFutureTo { trait CustomerDataAccess { def update(value: Customer): Future[Done] } final case class Customer(id: String, version: Long, name: String, address: String) object CustomerRepository { sealed trait Command final case class Update(value: Customer, replyTo: ActorRef[UpdateResult]) extends Command sealed trait UpdateResult final case class UpdateSuccess(id: String) extends UpdateResult final case class UpdateFailure(id: String, reason: String) extends UpdateResult private final case class WrappedUpdateResult(result: UpdateResult, replyTo: ActorRef[UpdateResult]) extends Command private val MaxOperationsInProgress = 10 def apply(dataAccess: CustomerDataAccess): Behavior[Command] = { Behaviors.setup[Command] { ctx =>
          implicit val dispatcher =  ctx.system.dispatchers.lookup(DispatcherSelector.fromConfig("my-dispatcher")) next(dataAccess, operationsInProgress = 0) } } private def next(dataAccess: CustomerDataAccess, operationsInProgress: Int)(implicit ec: ExecutionContextExecutor): Behavior[Command] = { Behaviors.receive { (context, command) => command match { case Update(value, replyTo) =>
            if (operationsInProgress == MaxOperationsInProgress) { replyTo ! UpdateFailure(value.id, s"Max $MaxOperationsInProgress concurrent operations supported") Behaviors.same } else { val futureResult = dataAccess.update(value) context.pipeToSelf(futureResult) { // map the Future value to a message, handled by this actor
                case Success(_) => WrappedUpdateResult(UpdateSuccess(value.id), replyTo) case Failure(e) => WrappedUpdateResult(UpdateFailure(value.id, e.getMessage), replyTo) } // increase operationsInProgress counter
              next(dataAccess, operationsInProgress + 1) } case WrappedUpdateResult(result, replyTo) =>
            // send result to original requestor
            replyTo ! result // decrease operationsInProgress counter
            next(dataAccess, operationsInProgress - 1) } } } } }
相關文章
相關標籤/搜索