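As mentioned earlier, one of the more important changes in akka-typed is the addition of EventSourcedBehavior: a kind of actor dedicated to the event-sourcing pattern which, together with other kinds of actors, makes it possible to implement CQRS. I will keep calling this new actor a persistentActor, since it is still an actor that maintains its running state. That is, the actor's internal state can be stored in a database, and a set of functions handles the state transitions; this is persistence. Of course, as an actor with EventSourcedBehavior it still needs the usual actor properties, methods, message protocol, supervision and so on. In this post we use an example and some source code to show how EventSourcedBehavior maintains its internal state and how to use it as an actor.
We reuse the shopping cart example from the previous post and add a response mechanism, mainly to report the state of the cart: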
object ItemInfo {
  case class Item(name: String, price: Double)
}

object MyCart {
  import ItemInfo._

  sealed trait Command
  sealed trait Event extends CborSerializable
  sealed trait Response

  //commands
  case class AddItem(item: Item) extends Command
  case object PayCart extends Command
  case class CountItems(replyTo: ActorRef[Response]) extends Command

  //event
  case class ItemAdded(item: Item) extends Event
  case object CartPaid extends Event

  //state
  case class CartLoad(load: List[Item] = Nil)

  //response
  case class PickedItems(items: List[Item]) extends Response
  case object CartEmpty extends Response

  val commandHandler: (CartLoad, Command) => Effect[Event,CartLoad] = { (state, cmd) =>
    cmd match {
      case AddItem(item) =>
        Effect.persist(ItemAdded(item))
      case PayCart =>
        Effect.persist(CartPaid)
      case CountItems(replyTo) =>
        Effect.none.thenRun { cart =>
          cart.load match {
            case Nil => replyTo ! CartEmpty
            case listOfItems => replyTo ! PickedItems(listOfItems)
          }
        }
    }
  }

  val eventHandler: (CartLoad,Event) => CartLoad = { (state,evt) =>
    evt match {
      case ItemAdded(item) => state.copy(load = item :: state.load)
      case CartPaid => state.copy(load = Nil)
    }
  }

  def apply(): Behavior[Command] = EventSourcedBehavior[Command,Event,CartLoad](
    persistenceId = PersistenceId("10","1013"),
    emptyState = CartLoad(),
    commandHandler = commandHandler,
    eventHandler = eventHandler
  )
}

object Shopper {
  import ItemInfo._

  sealed trait Command extends CborSerializable
  case class GetItem(item: Item) extends Command
  case object Settle extends Command
  case object GetCount extends Command
  case class WrappedResponse(res: MyCart.Response) extends Command

  def apply(): Behavior[Command] = Behaviors.setup[Command] { ctx =>
    val shoppingCart = ctx.spawn(MyCart(), "shopping-cart")
    val cartRef: ActorRef[MyCart.Response] = ctx.messageAdapter(WrappedResponse)
    Behaviors.receiveMessage { msg =>
      msg match {
        case GetItem(item) =>
          shoppingCart ! MyCart.AddItem(item)
        case Settle =>
          shoppingCart ! MyCart.PayCart
        case GetCount =>
          shoppingCart ! MyCart.CountItems(cartRef)
        case WrappedResponse(res) => res match {
          case MyCart.PickedItems(items) =>
            ctx.log.info("**************Current Items in Cart: {}*************", items)
          case MyCart.CartEmpty =>
            ctx.log.info("**************shopping cart is empty!***************")
        }
      }
      Behaviors.same
    }
  }
}

object ShoppingCart extends App {
  import ItemInfo._

  val shopper = ActorSystem(Shopper(),"shopper")
  shopper ! Shopper.GetItem(Item("banana",11.20))
  shopper ! Shopper.GetItem(Item("watermelon",4.70))
  shopper ! Shopper.GetCount
  shopper ! Shopper.Settle
  shopper ! Shopper.GetCount

  scala.io.StdIn.readLine()
  shopper.terminate()
}
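EventSourcedBehavior also has a built-in reply mechanism. When the behavior is created with EventSourcedBehavior.withEnforcedReplies and the command handler returns a ReplyEffect, every command must end in a reply to the sender (or an explicit Effect.noReply); otherwise the program does not compile. For example: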
private def withdraw(acc: OpenedAccount, cmd: Withdraw): ReplyEffect[Event, Account] = {
  if (acc.canWithdraw(cmd.amount))
    Effect.persist(Withdrawn(cmd.amount)).thenReply(cmd.replyTo)(_ => Confirmed)
  else
    Effect.reply(cmd.replyTo)(Rejected(s"Insufficient balance ${acc.balance} to be able to withdraw ${cmd.amount}"))
}
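Note that the reply is a side effect chained onto the Effect. With Effect.persist(...).thenReply(...), it runs only after the event has been persisted and applied by the eventHandler, so the function passed to thenReply receives the updated state (the example above ignores it with _). Effect.reply, which persists nothing, replies on the basis of the current state.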
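Speaking of side effects, consider Effect.persist(...).thenRun(produceSideEffect): once the event has been persisted successfully, it is safe to carry out further operations. For example, after an event that affects the inventory count has been persisted, the stock can be deducted from the books right away.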
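The ordering described above can be illustrated with a toy model in plain Scala. This is only a sketch and not Akka's implementation: `ToyEntity`, `persistThenRun`, `Stock` and `StockReserved` are hypothetical names invented for the illustration, and the "journal" is an in-memory vector.

```scala
object PersistThenRunSketch {
  sealed trait Event
  final case class StockReserved(quantity: Int) extends Event

  final case class Stock(available: Int)

  // Same shape as an EventSourcedBehavior event handler: (State, Event) => State
  val eventHandler: (Stock, Event) => Stock = { (state, evt) =>
    evt match {
      case StockReserved(q) => state.copy(available = state.available - q)
    }
  }

  // Hypothetical stand-in for a persistent actor: an in-memory journal plus the current state.
  final class ToyEntity(initial: Stock) {
    private var journal: Vector[Event] = Vector.empty
    private var state: Stock = initial

    // 1. store the event, 2. apply the event handler, 3. run the side effect with the new state
    def persistThenRun(event: Event)(sideEffect: Stock => Unit): Unit = {
      journal = journal :+ event
      state = eventHandler(state, event)
      sideEffect(state)
    }

    def persisted: Vector[Event] = journal
    def current: Stock = state
  }
}
```

In this model the side effect always observes the state after the event has been applied, which is the property that makes it safe to act on the persisted change.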
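In the ShoppingCart example above, MyCart contains no state-transition code such as Behaviors.same. This is because EventSourcedBehavior is a higher-level Behavior: state transitions are built into the eventHandler. Recall its signature, (State, Event) => State; the State it returns is the new state.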
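Because the event handler is a plain function of type (State, Event) => State, the current state is simply the fold of all events over the empty state. A minimal sketch in plain Scala, without Akka; the `replay` helper is a name made up for this illustration:

```scala
object CartReplaySketch {
  final case class Item(name: String, price: Double)

  sealed trait Event
  final case class ItemAdded(item: Item) extends Event
  case object CartPaid extends Event

  final case class CartLoad(load: List[Item] = Nil)

  // Same logic as the article's eventHandler.
  val eventHandler: (CartLoad, Event) => CartLoad = { (state, evt) =>
    evt match {
      case ItemAdded(item) => state.copy(load = item :: state.load)
      case CartPaid        => state.copy(load = Nil)
    }
  }

  // Hypothetical helper: derive the state by applying every event in order.
  def replay(events: List[Event]): CartLoad =
    events.foldLeft(CartLoad())(eventHandler)
}
```

For instance, replaying `ItemAdded(banana)` followed by `CartPaid` gives an empty cart again, with no explicit behavior switch anywhere.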
Events are persisted in the journal. If the journal fails during a persist operation, EventSourcedBehavior has its own supervision strategy for that case, as follows:
def apply(): Behavior[Command] = EventSourcedBehavior[Command,Event,CartLoad](
  persistenceId = PersistenceId("10","1013"),
  emptyState = CartLoad(),
  commandHandler = commandHandler,
  eventHandler = eventHandler
).onPersistFailure(
  SupervisorStrategy
    .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
    .withMaxRestarts(3)
    .withResetBackoffAfter(10.seconds))
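Note that this strategy applies only to onPersistFailure(); wrapping the behavior from the outside with Behaviors.supervise() does not handle persist failures. The actor as a whole still needs a backoff strategy, though, because the commandHandler and eventHandler inside the EventSourcedBehavior may also involve database operations, and a failed operation calls for some kind of backoff restart. We can add a supervision strategy to the actor as follows: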
def apply(): Behavior[Command] =
  Behaviors.supervise(
    // the explicit [Command] lets the compiler infer the type of ctx
    Behaviors.setup[Command] { ctx =>
      EventSourcedBehavior[Command, Event, CartLoad](
        persistenceId = PersistenceId("10", "1013"),
        emptyState = CartLoad(),
        commandHandler = commandHandler,
        eventHandler = eventHandler
      ).onPersistFailure(
        SupervisorStrategy
          .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
          .withMaxRestarts(3)
          .withResetBackoffAfter(10.seconds))
    }
  ).onFailure(
    SupervisorStrategy
      .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
      .withMaxRestarts(3)
      .withResetBackoffAfter(10.seconds)
  )
MyCart is now a safe, resilient actor.
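The restartWithBackoff parameters used above can be read as exponential backoff with jitter. The following is a minimal sketch of how such a delay might be computed, assuming the common scheme of doubling the delay on each restart, capping it at maxBackoff, and then stretching it by a random fraction of up to randomFactor. The function `backoffDelay` and its explicit `jitter` argument are illustrative and not part of the Akka API.

```scala
import scala.concurrent.duration._

object BackoffSketch {
  // jitter is the random draw in [0, 1), passed in explicitly so the function stays deterministic
  def backoffDelay(
      restartCount: Int,
      minBackoff: FiniteDuration,
      maxBackoff: FiniteDuration,
      randomFactor: Double,
      jitter: Double): FiniteDuration = {
    // double the minimum delay for every restart that has already happened
    val exponential = minBackoff.toMillis * math.pow(2, restartCount)
    // never wait longer than maxBackoff (before jitter)
    val capped = math.min(exponential, maxBackoff.toMillis.toDouble)
    // stretch the delay by up to randomFactor to avoid synchronized restarts
    (capped * (1.0 + jitter * randomFactor)).toLong.millis
  }
}
```

Under these assumptions, the settings in the article (10 seconds, 60 seconds, 0.1) give base delays of 10 s, 20 s and 40 s, after which the delay stays capped at 60 s, each lengthened by at most 10%.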
Since this is a persistentActor, managing persistence is naturally a core feature. EventSourcedBehavior lets us monitor the persistence process by handling signals, for example:
// infoN comes from: import akka.actor.typed.scaladsl.LoggerOps
// (it avoids overload and boxing problems when logging several arguments from Scala)
def apply(): Behavior[Command] =
  Behaviors.supervise(
    Behaviors.setup[Command] { ctx =>
      EventSourcedBehavior[Command, Event, CartLoad](
        persistenceId = PersistenceId("10", "1013"),
        emptyState = CartLoad(),
        commandHandler = commandHandler,
        eventHandler = eventHandler
      ).onPersistFailure(
        SupervisorStrategy
          .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
          .withMaxRestarts(3)
          .withResetBackoffAfter(10.seconds)
      ).receiveSignal {
        case (state, RecoveryCompleted) =>
          ctx.log.info("**************Recovery Completed with state: {}***************",state)
        case (state, SnapshotCompleted(meta)) =>
          ctx.log.infoN("**************Snapshot Completed with state: {},id({},{})***************",state,meta.persistenceId, meta.sequenceNr)
        case (state,RecoveryFailed(err)) =>
          ctx.log.error("recovery failed with: {}",err.getMessage)
        case (state,SnapshotFailed(meta,err)) =>
          ctx.log.error("snapshotting failed with: {}",err.getMessage)
      }
    }
  ).onFailure(
    SupervisorStrategy
      .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
      .withMaxRestarts(3)
      .withResetBackoffAfter(10.seconds)
  )
EventSourcedBehavior.receiveSignal takes a partial function:
def receiveSignal(signalHandler: PartialFunction[(State, Signal), Unit]): EventSourcedBehavior[Command, Event, State]
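Since the handler is a PartialFunction[(State, Signal), Unit], it only needs cases for the signals of interest. The sketch below shows that idea in plain Scala with toy types: `ToySignal`, `handler` and `dispatch` are hypothetical names, and the no-op fallback in `dispatch` is an assumption about how unmatched signals are treated, not Akka's actual code.

```scala
import scala.collection.mutable

object SignalHandlerSketch {
  // Toy signals standing in for the EventSourcedSignal hierarchy.
  sealed trait ToySignal
  case object RecoveryDone extends ToySignal
  final case class SnapshotDone(sequenceNr: Long) extends ToySignal
  final case class EventsDeleted(toSequenceNr: Long) extends ToySignal

  final case class CartLoad(load: List[String] = Nil)

  type Handler = PartialFunction[(CartLoad, ToySignal), Unit]

  // A handler that records what it saw; it deliberately has no case for EventsDeleted.
  def handler(log: mutable.Buffer[String]): Handler = {
    case (state, RecoveryDone) =>
      log += s"recovered with ${state.load.size} item(s)"
      ()
    case (_, SnapshotDone(seqNr)) =>
      log += s"snapshot at $seqNr"
      ()
  }

  // Signals without a matching case fall through to a no-op.
  def dispatch(h: Handler, state: CartLoad, signal: ToySignal): Unit =
    h.applyOrElse((state, signal), (_: (CartLoad, ToySignal)) => ())
}
```

Dispatching `EventsDeleted` through this handler does nothing, while the two handled signals each append one line to the log.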
Below is the list of EventSourcedBehavior signals:
sealed trait EventSourcedSignal extends Signal

@DoNotInherit sealed abstract class RecoveryCompleted extends EventSourcedSignal
case object RecoveryCompleted extends RecoveryCompleted {
  def instance: RecoveryCompleted = this
}

final case class RecoveryFailed(failure: Throwable) extends EventSourcedSignal {
  def getFailure(): Throwable = failure
}

final case class SnapshotCompleted(metadata: SnapshotMetadata) extends EventSourcedSignal {
  def getSnapshotMetadata(): SnapshotMetadata = metadata
}

final case class SnapshotFailed(metadata: SnapshotMetadata, failure: Throwable) extends EventSourcedSignal {
  def getFailure(): Throwable = failure
  def getSnapshotMetadata(): SnapshotMetadata = metadata
}

object SnapshotMetadata {

  /**
   * @param persistenceId id of persistent actor from which the snapshot was taken.
   * @param sequenceNr sequence number at which the snapshot was taken.
   * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
   *                  in milliseconds from the epoch of 1970-01-01T00:00:00Z.
   */
  def apply(persistenceId: String, sequenceNr: Long, timestamp: Long): SnapshotMetadata =
    new SnapshotMetadata(persistenceId, sequenceNr, timestamp)
}

/**
 * Snapshot metadata.
 *
 * @param persistenceId id of persistent actor from which the snapshot was taken.
 * @param sequenceNr sequence number at which the snapshot was taken.
 * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
 *                  in milliseconds from the epoch of 1970-01-01T00:00:00Z.
 */
final class SnapshotMetadata(val persistenceId: String, val sequenceNr: Long, val timestamp: Long) {
  override def toString: String =
    s"SnapshotMetadata($persistenceId,$sequenceNr,$timestamp)"
}

final case class DeleteSnapshotsCompleted(target: DeletionTarget) extends EventSourcedSignal {
  def getTarget(): DeletionTarget = target
}

final case class DeleteSnapshotsFailed(target: DeletionTarget, failure: Throwable) extends EventSourcedSignal {
  def getFailure(): Throwable = failure
  def getTarget(): DeletionTarget = target
}

final case class DeleteEventsCompleted(toSequenceNr: Long) extends EventSourcedSignal {
  def getToSequenceNr(): Long = toSequenceNr
}

final case class DeleteEventsFailed(toSequenceNr: Long, failure: Throwable) extends EventSourcedSignal {
  def getFailure(): Throwable = failure
  def getToSequenceNr(): Long = toSequenceNr
}
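Of course, one reason EventSourcedBehavior can repair itself is that it replays persisted events. Replaying the entire event history on every start would be impractical, so snapshots are needed to condense the history: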
// info2 and infoN come from: import akka.actor.typed.scaladsl.LoggerOps
// (they avoid overload and boxing problems when logging several arguments from Scala)
def apply(): Behavior[Command] =
  Behaviors.supervise(
    Behaviors.setup[Command] { ctx =>
      EventSourcedBehavior[Command, Event, CartLoad](
        persistenceId = PersistenceId("10", "1013"),
        emptyState = CartLoad(),
        commandHandler = commandHandler,
        eventHandler = eventHandler
      ).onPersistFailure(
        SupervisorStrategy
          .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
          .withMaxRestarts(3)
          .withResetBackoffAfter(10.seconds)
      ).receiveSignal {
        case (state, RecoveryCompleted) =>
          ctx.log.info("**************Recovery Completed with state: {}***************",state)
        case (state, SnapshotCompleted(meta)) =>
          ctx.log.infoN("**************Snapshot Completed with state: {},id({},{})***************",state,meta.persistenceId, meta.sequenceNr)
        case (state,RecoveryFailed(err)) =>
          ctx.log.error("recovery failed with: {}",err.getMessage)
        case (state,SnapshotFailed(meta,err)) =>
          ctx.log.error("snapshotting failed with: {}",err.getMessage)
      }.snapshotWhen {
        // take a snapshot whenever the cart is paid
        case (state,CartPaid,seqnum) =>
          ctx.log.info2("*****************snapshot taken at: {} with state: {}",seqnum,state)
          true
        case (state,event,seqnum) => false
      }.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
    }
  ).onFailure(
    SupervisorStrategy
      .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
      .withMaxRestarts(3)
      .withResetBackoffAfter(10.seconds)
  )
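To see why a snapshot shortens recovery without changing its result, here is a small model in plain Scala. It is a sketch under simplifying assumptions rather than how Akka stores data: the journal is an in-memory vector of (sequence number, event) pairs, items are reduced to their names, and `Snapshot`, `recoverFromScratch` and `recoverFromSnapshot` are names made up for the illustration.

```scala
object SnapshotRecoverySketch {
  sealed trait Event
  final case class ItemAdded(name: String) extends Event
  case object CartPaid extends Event

  final case class CartLoad(load: List[String] = Nil)

  val eventHandler: (CartLoad, Event) => CartLoad = { (state, evt) =>
    evt match {
      case ItemAdded(name) => state.copy(load = name :: state.load)
      case CartPaid        => state.copy(load = Nil)
    }
  }

  // A snapshot is the state as of a given sequence number.
  final case class Snapshot(sequenceNr: Long, state: CartLoad)

  // The journal as (sequenceNr, event) pairs, numbered from 1.
  type Journal = Vector[(Long, Event)]

  // Full replay: fold every event over the empty state.
  def recoverFromScratch(journal: Journal): CartLoad =
    journal.map(_._2).foldLeft(CartLoad())(eventHandler)

  // Snapshot-based recovery: start from the snapshot and replay only the later events.
  def recoverFromSnapshot(snapshot: Snapshot, journal: Journal): CartLoad =
    journal
      .collect { case (nr, evt) if nr > snapshot.sequenceNr => evt }
      .foldLeft(snapshot.state)(eventHandler)
}
```

Both functions return the same state, but the second one touches only the events recorded after the snapshot. Snapshotting on CartPaid, as in the code above, is convenient in this respect because the state at that point is just an empty cart.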
Below is the source code for this demo:
build.sbt
name := "learn-akka-typed"

version := "0.1"

scalaVersion := "2.13.1"
scalacOptions in Compile ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint")
javacOptions in Compile ++= Seq("-Xlint:unchecked", "-Xlint:deprecation")

val AkkaVersion = "2.6.5"
val AkkaPersistenceCassandraVersion = "1.0.0"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
  "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-cassandra" % AkkaPersistenceCassandraVersion,
  "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
  "ch.qos.logback" % "logback-classic" % "1.2.3"
)
application.conf
akka.actor.allow-java-serialization = on
akka {
  loglevel = DEBUG
  actor {
    serialization-bindings {
      "com.learn.akka.CborSerializable" = jackson-cbor
    }
  }
  # use Cassandra to store both snapshots and the events of the persistent actors
  persistence {
    journal.plugin = "akka.persistence.cassandra.journal"
    snapshot-store.plugin = "akka.persistence.cassandra.snapshot"
  }
}
akka.persistence.cassandra {
  # don't use autocreate in production
  journal.keyspace = "poc"
  journal.keyspace-autocreate = on
  journal.tables-autocreate = on
  snapshot.keyspace = "poc_snapshot"
  snapshot.keyspace-autocreate = on
  snapshot.tables-autocreate = on
}

datastax-java-driver {
  basic.contact-points = ["192.168.11.189:9042"]
  basic.load-balancing-policy.local-datacenter = "datacenter1"
}
ShoppingCart.scala
package com.learn.akka

import akka.actor.typed._
import akka.persistence.typed._
// LoggerOps provides info2/infoN for logging several arguments from Scala
import akka.actor.typed.scaladsl.{Behaviors, LoggerOps}
import akka.persistence.typed.scaladsl._
import scala.concurrent.duration._

// CborSerializable is a marker trait assumed to be defined elsewhere in this package
// (it is bound to jackson-cbor in application.conf)

object ItemInfo {
  case class Item(name: String, price: Double)
}

object MyCart {
  import ItemInfo._

  sealed trait Command
  sealed trait Event extends CborSerializable
  sealed trait Response

  //commands
  case class AddItem(item: Item) extends Command
  case object PayCart extends Command
  case class CountItems(replyTo: ActorRef[Response]) extends Command

  //event
  case class ItemAdded(item: Item) extends Event
  case object CartPaid extends Event

  //state
  case class CartLoad(load: List[Item] = Nil)

  //response
  case class PickedItems(items: List[Item]) extends Response
  case object CartEmpty extends Response

  val commandHandler: (CartLoad, Command) => Effect[Event,CartLoad] = { (state, cmd) =>
    cmd match {
      case AddItem(item) =>
        Effect.persist(ItemAdded(item))
      case PayCart =>
        Effect.persist(CartPaid)
      case CountItems(replyTo) =>
        Effect.none.thenRun { cart =>
          cart.load match {
            case Nil => replyTo ! CartEmpty
            case listOfItems => replyTo ! PickedItems(listOfItems)
          }
        }
    }
  }

  val eventHandler: (CartLoad,Event) => CartLoad = { (state,evt) =>
    evt match {
      case ItemAdded(item) => state.copy(load = item :: state.load)
      case CartPaid => state.copy(load = Nil)
    }
  }

  def apply(): Behavior[Command] =
    Behaviors.supervise(
      Behaviors.setup[Command] { ctx =>
        EventSourcedBehavior[Command, Event, CartLoad](
          persistenceId = PersistenceId("10", "1013"),
          emptyState = CartLoad(),
          commandHandler = commandHandler,
          eventHandler = eventHandler
        ).onPersistFailure(
          SupervisorStrategy
            .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
            .withMaxRestarts(3)
            .withResetBackoffAfter(10.seconds)
        ).receiveSignal {
          case (state, RecoveryCompleted) =>
            ctx.log.info("**************Recovery Completed with state: {}***************",state)
          case (state, SnapshotCompleted(meta)) =>
            ctx.log.infoN("**************Snapshot Completed with state: {},id({},{})***************",state,meta.persistenceId, meta.sequenceNr)
          case (state,RecoveryFailed(err)) =>
            ctx.log.error("recovery failed with: {}",err.getMessage)
          case (state,SnapshotFailed(meta,err)) =>
            ctx.log.error("snapshotting failed with: {}",err.getMessage)
        }.snapshotWhen {
          // take a snapshot whenever the cart is paid
          case (state,CartPaid,seqnum) =>
            ctx.log.info2("*****************snapshot taken at: {} with state: {}",seqnum,state)
            true
          case (state,event,seqnum) => false
        }.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
      }
    ).onFailure(
      SupervisorStrategy
        .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
        .withMaxRestarts(3)
        .withResetBackoffAfter(10.seconds)
    )
}

object Shopper {
  import ItemInfo._

  sealed trait Command extends CborSerializable
  case class GetItem(item: Item) extends Command
  case object Settle extends Command
  case object GetCount extends Command
  case class WrappedResponse(res: MyCart.Response) extends Command

  def apply(): Behavior[Command] = Behaviors.setup[Command] { ctx =>
    val shoppingCart = ctx.spawn(MyCart(), "shopping-cart")
    // adapt MyCart.Response messages into this actor's own protocol
    val cartRef: ActorRef[MyCart.Response] = ctx.messageAdapter(WrappedResponse)
    Behaviors.receiveMessage { msg =>
      msg match {
        case GetItem(item) =>
          shoppingCart ! MyCart.AddItem(item)
        case Settle =>
          shoppingCart ! MyCart.PayCart
        case GetCount =>
          shoppingCart ! MyCart.CountItems(cartRef)
        case WrappedResponse(res) => res match {
          case MyCart.PickedItems(items) =>
            ctx.log.info("**************Current Items in Cart: {}*************", items)
          case MyCart.CartEmpty =>
            ctx.log.info("**************shopping cart is empty!***************")
        }
      }
      Behaviors.same
    }
  }
}

object ShoppingCart extends App {
  import ItemInfo._

  val shopper = ActorSystem(Shopper(),"shopper")
  shopper ! Shopper.GetItem(Item("banana",11.20))
  shopper ! Shopper.GetItem(Item("watermelon",4.70))
  shopper ! Shopper.GetCount
  shopper ! Shopper.Settle
  shopper ! Shopper.GetCount

  scala.io.StdIn.readLine()
  shopper.terminate()
}