akka-typed(0) - typed-actor, typed messages

   akka 2.6.x正式發佈以來已經有好一段時間了。核心變化是typed-actor的正式啓用,固然persistence,cluster等模塊也有較大變化。一開始從名稱估摸就是把傳統any類型的消息改爲強類型消息,因此想拖一段時間看看到底能對咱們現有基於akka-classic的應用軟件有什麼深層次的影響。不過最近考慮的一些系統架構逼的我不得不當即開始akka-typed的調研,也就是說akka-classic已經沒法或者很困難去實現新的系統架構,且聽我道來:最近在考慮一個微服務中臺。做爲後臺數據服務調用的惟一入口,平臺應該是個分佈式軟件,那麼採用akka-cluster目前是惟一的選擇,畢竟前期搞過不少基於akka-cluster的應用軟件。可是,akka-cluster-sharding只能支持一種entity actor。畢竟,因爲akka-classic的消息是沒有類型的,只能在收到消息後再經過類型模式匹配的方式肯定應該運行的代碼。因此,這個actor必須包括全部的業務邏輯處理運算。也就是說對於一個大型應用來講這就是一塊巨型代碼。還有,若是涉及到維護actor狀態的話,好比persistenceActor,或者綜合類型業務運算,那麼又須要多少種類的數據結構,又怎樣去維護、管理這些結構呢?對我來講這基本上是mission-impossible。實際上logom應該正符合這個中臺的要求:cluster-sharding, CQRS... 抱着一種好奇的心態瞭解了一下lagom源碼,突然恍然大悟:這個東西是基於akka-typed的!想一想看也是:若是咱們能夠把actor和消息類型綁在一塊兒,那麼咱們就能夠經過消息類型對應到某種actor。也就是說基於akka-typed,咱們能夠把綜合性的業務劃分紅多個actor模塊,而後咱們能夠指定那種actor作那些事情。固然,通過了功能細分,actor的設計也簡單了許多。如今這個新的中臺能夠實現前臺應用直接調用對應的actor處理業務了。不用多想了,這注定就是akka應用的未來,還等什麼呢?數據結構

先從一個最簡單的hello程序開始吧:基本上是兩個actor相互交換消息。先用第一個來示範標準的actor構建過程:架構

 

  object HelloActor { sealed trait Request case class Greeting(fromWhom: String, replyTo: ActorRef[Greeter.Greeted]) extends Request def apply(): Behavior[Greeting] = { Behaviors.receive { (ctx, greeter) => ctx.log.info("receive greeting from {}", greeter.fromWhom) greeter.replyTo ! Greeter.Greeted(s"hello ${greeter.fromWhom}!") Behaviors.same } } }

 

akka-typed的actor構建是經過定義它的Behavior行爲實現的。特別的是類型參數Behavior[Greeting],表明這個actor只處理Greeting類型的消息,於是是個typed-actor。akka-typed已經不支持sender()了,在消息裏自帶,如Greeting.replyTo。Behavior定義是經過工廠模式Behaviors實現的,看看Behaviors的定義:app

 

/** * Factories for [[akka.actor.typed.Behavior]]. */
object Behaviors { def setup[T](factory: ActorContext[T] => Behavior[T]): Behavior[T] def withStash[T](capacity: Int)(factory: StashBuffer[T] => Behavior[T]): Behavior[T] def same[T]: Behavior[T] def unhandled[T]: Behavior[T] def stopped[T]: Behavior[T] def stopped[T](postStop: () => Unit): Behavior[T] def empty[T]: Behavior[T] def ignore[T]: Behavior[T] def receive[T](onMessage: (ActorContext[T], T) => Behavior[T]): Receive[T] def receiveMessage[T](onMessage: T => Behavior[T]): Receive[T] def receivePartial[T](onMessage: PartialFunction[(ActorContext[T], T), Behavior[T]]): Receive[T] def receiveMessagePartial[T](onMessage: PartialFunction[T, Behavior[T]]): Receive[T] def receiveSignal[T](handler: PartialFunction[(ActorContext[T], Signal), Behavior[T]]): Behavior[T] def supervise[T](wrapped: Behavior[T]): Supervise[T] def withTimers[T](factory: TimerScheduler[T] => Behavior[T]): Behavior[T] ... }

上面的構建函數除返回Behavior[T]外還有Receive[T]和Supervise[T],這兩個類型是什麼?它們仍是Behavior[T]:分佈式

 trait Receive[T] extends Behavior[T] { def receiveSignal(onSignal: PartialFunction[(ActorContext[T], Signal), Behavior[T]]): Behavior[T] } def supervise[T](wrapped: Behavior[T]): Supervise[T] =
    new Supervise[T](wrapped) private final val ThrowableClassTag = ClassTag(classOf[Throwable]) final class Supervise[T] private[akka] (val wrapped: Behavior[T]) extends AnyVal { /** Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws. */ def onFailure[Thr <: Throwable: ClassTag](strategy: SupervisorStrategy): Behavior[T] = { val tag = classTag[Thr] val effectiveTag = if (tag == ClassTag.Nothing) ThrowableClassTag else tag Supervisor(Behavior.validateAsInitial(wrapped), strategy)(effectiveTag) } }

注意,Supervise.onFailure返回了Behavior[T]。函數

helloActor的Behavior是經過Behaviors.receive構建的。還能夠用setup,receiveMessage來構建。注意:構建函數的入參數也是Behavior[T],因此這些構造器能夠一層層嵌套着使用。setup,receive爲函數內層提供了ActorContext, withTimers提供TimerScheduler[T]。那麼我能夠把HelloActor的功能再完善點,加個監管策略SupervisorStrategy:微服務

  object HelloActor { sealed trait Request case class Greeting(fromWhom: String, replyTo: ActorRef[Greeter.Greeted]) extends Request def apply(): Behavior[Greeting] = { Behaviors.supervise( Behaviors.receive[Greeting] { (ctx, greeter) => ctx.log.info("receive greeting from {}", greeter.fromWhom) greeter.replyTo ! Greeter.Greeted(s"hello ${greeter.fromWhom}!") Behaviors.same } ).onFailure(SupervisorStrategy.restartWithBackoff(10.seconds, 1.minute, 0.20)) } }

在akka-typed裏,actor監管已經從父輩轉到自身。再就是增長了BackOff-SupervisorStrategy,不須要獨立的BackOffSupervisor actor了。post

再看看另外一個Greeter:ui

 object Greeter { sealed trait Response case class Greeted(hello: String) extends Response def apply(): Behavior[Greeted] = { Behaviors.setup ( ctx => Behaviors.receiveMessage { message => ctx.log.info(message.hello) Behaviors.same } ) } }

這個跟HelloActor沒什麼不一樣,不過用了setup,receiveMessage套裝。值得注意的是Greeter負責處理Greeted消息,這是一個不帶sender ActorRef的類型,意味着處理這類消息後不須要答覆消息發送者。spa

而後還須要一個actor來構建上面兩個actor實例,啓動對話:scala

 object GreetStarter { sealed trait Command case class SayHiTo(whom: String) extends Command case class RepeatedGreeting(whom: String, interval: FiniteDuration) extends Command def apply(): Behavior[Command] = { Behaviors.setup[Command] { ctx => val helloActor = ctx.spawn(HelloActor(), "hello-actor") val greeter = ctx.spawn(Greeter(), "greeter") Behaviors.withTimers { timer =>
          new GreetStarter( helloActor,greeter,ctx,timer) .repeatGreeting(1,3) } } } } class GreetStarter private ( helloActor: ActorRef[HelloActor.Greeting], greeter: ActorRef[Greeter.Greeted], ctx: ActorContext[GreetStarter.Command], timer: TimerScheduler[GreetStarter.Command]){ import GreetStarter._ private def repeatGreeting(count: Int, max: Int): Behavior[Command] = Behaviors.receiveMessage { msg => msg match { case RepeatedGreeting(whom, interval) => ctx.log.info2("start greeting to {} with interval {}", whom, interval) timer.startSingleTimer(SayHiTo(whom), interval) Behaviors.same case SayHiTo(whom) => ctx.log.info2("{}th time greeting to {}",count,whom) if (max == count) Behaviors.stopped else { helloActor ! HelloActor.Greeting(whom, greeter) repeatGreeting(count + 1, max) } } } }

上面這個例子有點複雜,邏輯也有些問題,主要是爲了示範一種函數式actor構建模式及actor狀態轉換虛構出來的。akka-typed已經再也不支持become方法了。

最後,須要一個至關於main這麼一個頂層的程序:

 def main(args: Array[String]) { val man: ActorSystem[GreetStarter.Command] = ActorSystem(GreetStarter(), "greetDemo") man ! GreetStarter.RepeatedGreeting("Tiger",5.seconds) man ! GreetStarter.RepeatedGreeting("Peter",5.seconds) man ! GreetStarter.RepeatedGreeting("Susanna",5.seconds) }

akka-classic的頂級actor,即: /users是由系統默認建立的。akka-typed須要用戶提供這個頂層actor。這個是在ActorSystem的第一個參數指定的。咱們再看看akka-typed的ActorSystem的構建函數:

object ActorSystem { /** * Scala API: Create an ActorSystem */ def apply[T](guardianBehavior: Behavior[T], name: String): ActorSystem[T] = createInternal(name, guardianBehavior, Props.empty, ActorSystemSetup.create(BootstrapSetup())) /** * Scala API: Create an ActorSystem */ def apply[T](guardianBehavior: Behavior[T], name: String, config: Config): ActorSystem[T] = createInternal(name, guardianBehavior, Props.empty, ActorSystemSetup.create(BootstrapSetup(config))) /** * Scala API: Create an ActorSystem */ def apply[T](guardianBehavior: Behavior[T], name: String, config: Config, guardianProps: Props): ActorSystem[T] = createInternal(name, guardianBehavior, guardianProps, ActorSystemSetup.create(BootstrapSetup(config))) ... }

其中一個apply與akka-classic的ActorSystem構建方式很類似:

 def main(args: Array[String]) { val config = ConfigFactory.load("application.conf") val man: ActorSystem[GreetStarter.Command] = ActorSystem(GreetStarter(), "greetDemo",config) man ! GreetStarter.RepeatedGreeting("Tiger",5.seconds) man ! GreetStarter.RepeatedGreeting("Peter",5.seconds) man ! GreetStarter.RepeatedGreeting("Susanna",5.seconds) }

下面是本次討論的完整源代碼:

build.sbt

name := "learn-akka-typed"

version := "0.1"

scalaVersion := "2.13.2"

lazy val akkaVersion = "2.6.5"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor-typed"            % akkaVersion,
  "ch.qos.logback"     % "logback-classic"             % "1.2.3"
)

fork in Test := true

Lesson01.scala

import akka.actor.typed._
import scaladsl._
import scala.concurrent.duration._
import com.typesafe.config._
object Lesson01 {

  object HelloActor {
    sealed trait Request
    case class Greeting(fromWhom: String, replyTo: ActorRef[Greeter.Greeted]) extends Request

    def apply(): Behavior[Greeting] = {
      Behaviors.supervise(
        Behaviors.receive[Greeting] { (ctx, greeter) =>
          ctx.log.info("receive greeting from {}", greeter.fromWhom)
          greeter.replyTo ! Greeter.Greeted(s"hello ${greeter.fromWhom}!")
          Behaviors.same
        }
      ).onFailure(SupervisorStrategy.restartWithBackoff(10.seconds, 1.minute, 0.20))
    }
  }

  object Greeter {

    sealed trait Response
    case class Greeted(hello: String) extends Response

    def apply(): Behavior[Greeted] = {
      Behaviors.setup ( ctx =>
        Behaviors.receiveMessage { message =>
          ctx.log.info(message.hello)
          Behaviors.same
        }
      )
    }
  }

  object GreetStarter {
    sealed trait Command
    case class SayHiTo(whom: String) extends Command
    case class RepeatedGreeting(whom: String, interval: FiniteDuration) extends Command

    def apply(): Behavior[Command] = {
      Behaviors.setup[Command] { ctx =>
        val helloActor = ctx.spawn(HelloActor(), "hello-actor")
        val greeter = ctx.spawn(Greeter(), "greeter")
        Behaviors.withTimers { timer =>
          new GreetStarter(
            helloActor,greeter,ctx,timer)
            .repeatGreeting(1,3)
        }
      }
    }
  }
  class GreetStarter private (
     helloActor: ActorRef[HelloActor.Greeting],
     greeter: ActorRef[Greeter.Greeted],
     ctx: ActorContext[GreetStarter.Command],
     timer: TimerScheduler[GreetStarter.Command]){
    import GreetStarter._

    private def repeatGreeting(count: Int, max: Int): Behavior[Command] =
       Behaviors.receiveMessage { msg =>
         msg match {
           case RepeatedGreeting(whom, interval) =>
             ctx.log.info2("start greeting to {} with interval {}", whom, interval)
             timer.startSingleTimer(SayHiTo(whom), interval)
             Behaviors.same
           case SayHiTo(whom) =>
             ctx.log.info2("{}th time greeting to {}",count,whom)
             if (max == count)
               Behaviors.stopped
             else {
               helloActor ! HelloActor.Greeting(whom, greeter)
               repeatGreeting(count + 1, max)
             }
         }
       }
  }


  def main(args: Array[String]) {
    val config = ConfigFactory.load("application.conf")
    val man: ActorSystem[GreetStarter.Command] = ActorSystem(GreetStarter(), "greetDemo",config)
    man ! GreetStarter.RepeatedGreeting("Tiger",5.seconds)
    man ! GreetStarter.RepeatedGreeting("Peter",5.seconds)
    man ! GreetStarter.RepeatedGreeting("Susanna",5.seconds)
  }
}
相關文章
相關標籤/搜索