akka-typed中已經沒有PersistentActor了。取而代之的是帶有EventSourcedBehavior的actor,也就是一種專門支持EventSource模式的actor。EventSource的原理和做用在以前的博客裏已經有了比較詳細的介紹,這裏就再也不重複了。本篇直接從EventsourcedBehavior actor的具體應用開始介紹。支持EventSource應用的基本數據類型包括 指令Command, 事件Event,狀態State。EventSourcing其實就是一個有限狀態機fsm finite-state-machine,執行Command,產生Event,改變State,終而復始。下面是一個簡單的EventSource類型定義:java
trait CborSerializable {} object Cart { case class Item(name: String, price: Double) sealed trait Command extends CborSerializable sealed trait Event //commands
case class AddItem(item: Item) extends Command case object PayCart extends Command //event
case class ItemAdded(item: Item) extends Event case object CartPaid extends Event //state
case class CartLoad(load: Set[Item] = Set.empty) val commandHandler: (CartLoad, Command) => Effect[Event,CartLoad] = { (state, cmd) => cmd match { case AddItem(item) => Effect.persist(ItemAdded(item)) case PayCart => Effect.persist(CartPaid) } } val eventHandler: (CartLoad,Event) => CartLoad = { (state,evt) => evt match { case ItemAdded(item) => val sts = state.copy(load = state.load+item) println(s"current cart loading: ${sts}") sts case CartPaid => val sts = state.copy(load = Set.empty) println(s"current cart loading: ${sts.load}") sts } } def apply(): Behavior[Command] = EventSourcedBehavior[Command,Event,CartLoad]( persistenceId = PersistenceId("10","1012"), emptyState = CartLoad(), commandHandler = commandHandler, eventHandler = eventHandler ) } object EventSource extends App { import Cart._ val cart = ActorSystem(Cart(),"shopping-cart") cart ! Cart.AddItem(Item("banana",11.20)) cart ! Cart.AddItem(Item("watermelon",4.70)) scala.io.StdIn.readLine() cart.terminate() }
首先要搞清楚幾件事:EvenSourcedBehavior定義了一個actor。從Behavior[Command]這個結果類型來看,這個actor能夠接收並處理Command類型的消息。既然是個actor那麼應該具有了receiveMessage,receiveSignal這兩項基本能力,但咱們又不用本身來定義這些功能。怎麼回事呢?看看EventSourcedBehavior的源代碼吧:app
object EventSourcedBehavior { ... def apply[Command, Event, State]( persistenceId: PersistenceId, emptyState: State, commandHandler: (State, Command) => Effect[Event, State], eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State] = { val loggerClass = LoggerClass.detectLoggerClassFromStack(classOf[EventSourcedBehavior[_, _, _]], logPrefixSkipList) EventSourcedBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler, loggerClass) } ... }
這個EventSourcedBehavior就是某種Behavior。它的全部特殊功能看來應該是在EventSourcedBehaviorsImpl裏實現的:ide
private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( persistenceId: PersistenceId, emptyState: State, commandHandler: EventSourcedBehavior.CommandHandler[Command, Event, State], eventHandler: EventSourcedBehavior.EventHandler[State, Event], loggerClass: Class[_], ... ) extends EventSourcedBehavior[Command, Event, State] { ... Behaviors .supervise { Behaviors.setup[Command] { _ => val eventSourcedSetup = new BehaviorSetup( ctx.asInstanceOf[ActorContext[InternalProtocol]], persistenceId, emptyState, commandHandler, eventHandler, WriterIdentity.newIdentity(), actualSignalHandler, tagger, eventAdapter, snapshotAdapter, snapshotWhen, recovery, retention, holdingRecoveryPermit = false, settings = settings, stashState = stashState) // needs to accept Any since we also can get messages from the journal // not part of the user facing Command protocol
def interceptor: BehaviorInterceptor[Any, InternalProtocol] = new BehaviorInterceptor[Any, InternalProtocol] { import BehaviorInterceptor._ override def aroundReceive( ctx: typed.TypedActorContext[Any], msg: Any, target: ReceiveTarget[InternalProtocol]): Behavior[InternalProtocol] = { val innerMsg = msg match { case res: JournalProtocol.Response => InternalProtocol.JournalResponse(res) case res: SnapshotProtocol.Response => InternalProtocol.SnapshotterResponse(res) case RecoveryPermitter.RecoveryPermitGranted => InternalProtocol.RecoveryPermitGranted case internal: InternalProtocol => internal // such as RecoveryTickEvent
case cmd: Command @unchecked => InternalProtocol.IncomingCommand(cmd) } target(ctx, innerMsg) } override def aroundSignal( ctx: typed.TypedActorContext[Any], signal: Signal, target: SignalTarget[InternalProtocol]): Behavior[InternalProtocol] = { if (signal == PostStop) { eventSourcedSetup.cancelRecoveryTimer() // clear stash to be GC friendly
stashState.clearStashBuffers() } target(ctx, signal) } override def toString: String = "EventSourcedBehaviorInterceptor" } Behaviors.intercept(() => interceptor)(RequestingRecoveryPermit(eventSourcedSetup)).narrow } } .onFailure[JournalFailureException](supervisionStrategy) }
EventSourcedBehaviorImpl仍是一種Behavior[Command],它又是經過一個BehaviorInterceptor實現的。BehaviorInterceptor.aroundReceive和BehaviorInterceptor.aroundSignal能夠代替receiveMessage和receiveSignal的工做,這點從這兩個函數的結果類型能夠獲得一些驗證:函數
/* @tparam Outer The outer message type – the type of messages the intercepting behavior will accept * @tparam Inner The inner message type - the type of message the wrapped behavior accepts * * @see [[BehaviorSignalInterceptor]] */
abstract class BehaviorInterceptor[Outer, Inner](val interceptMessageClass: Class[Outer]) { import BehaviorInterceptor._ ... /** * Intercept a message sent to the running actor. Pass the message on to the next behavior * in the stack by passing it to `target.apply`, return `Behaviors.same` without invoking `target` * to filter out the message. * * @return The behavior for next message or signal */ def aroundReceive(ctx: TypedActorContext[Outer], msg: Outer, target: ReceiveTarget[Inner]): Behavior[Inner] /** * Override to intercept a signal sent to the running actor. Pass the signal on to the next behavior * in the stack by passing it to `target.apply`. * * @return The behavior for next message or signal * * @see [[BehaviorSignalInterceptor]] */ def aroundSignal(ctx: TypedActorContext[Outer], signal: Signal, target: SignalTarget[Inner]): Behavior[Inner] ... }
另外,對於EventSourcedBehavior來講,收到Command, 處理Command方式應該是經過外部提供的這個commandHandler來實現纔是最值得注意的:測試
final class HandlingCommands(state: RunningState[S]) extends AbstractBehavior[InternalProtocol](setup.context) with WithSeqNrAccessible { def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match { case IncomingCommand(c: C @unchecked) => onCommand(state, c) case JournalResponse(r) => onDeleteEventsJournalResponse(r, state.state) case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, state.state) case get: GetState[S @unchecked] => onGetState(get) case _ => Behaviors.unhandled } override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] = { case PoisonPill =>
if (isInternalStashEmpty && !isUnstashAllInProgress) Behaviors.stopped else new HandlingCommands(state.copy(receivedPoisonPill = true)) case signal =>
if (setup.onSignal(state.state, signal, catchAndLog = false)) this
else Behaviors.unhandled } def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol] = { val effect = setup.commandHandler(state.state, cmd) applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
} ... }
上面這段代碼已經足夠說明了。根據commandHandler和eventHandler的函數類型能夠得出EventSourcedBehavior處理流程 (State, Command) => (State, Event) => new State, 最終輸出new State:ui
object EventSourcedBehavior { type CommandHandler[Command, Event, State] = (State, Command) => Effect[Event, State] type EventHandler[State, Event] = (State, Event) => State ... }
commandHandler返回Effect[Event,State]類型結果,也就是說處理Command過程就是產生Event過程,下面是Effect的各類選項:this
object Effect { /** * Persist a single event * * Side effects can be chained with `thenRun` */ def persist[Event, State](event: Event): EffectBuilder[Event, State] = Persist(event) /** * Persist multiple events * * Side effects can be chained with `thenRun` */ def persist[Event, A <: Event, B <: Event, State](evt1: A, evt2: B, events: Event*): EffectBuilder[Event, State] = persist(evt1 :: evt2 :: events.toList) /** * Persist multiple events * * Side effects can be chained with `thenRun` */ def persist[Event, State](events: im.Seq[Event]): EffectBuilder[Event, State] = PersistAll(events) /** * Do not persist anything * * Side effects can be chained with `thenRun` */ def none[Event, State]: EffectBuilder[Event, State] = PersistNothing.asInstanceOf[EffectBuilder[Event, State]] /** * This command is not handled, but it is not an error that it isn't. * * Side effects can be chained with `thenRun` */ def unhandled[Event, State]: EffectBuilder[Event, State] = Unhandled.asInstanceOf[EffectBuilder[Event, State]] /** * Stop this persistent actor * Side effects can be chained with `thenRun` */ def stop[Event, State](): EffectBuilder[Event, State] = none.thenStop() /** * Stash the current command. Can be unstashed later with [[Effect.unstashAll]]. * * Note that the stashed commands are kept in an in-memory buffer, so in case of a crash they will not be * processed. They will also be discarded if the actor is restarted (or stopped) due to that an exception was * thrown from processing a command or side effect after persisting. The stash buffer is preserved for persist * failures if a backoff supervisor strategy is defined with [[EventSourcedBehavior.onPersistFailure]]. * * Side effects can be chained with `thenRun` */ def stash[Event, State](): ReplyEffect[Event, State] = Stash.asInstanceOf[EffectBuilder[Event, State]].thenNoReply() /** * Unstash the commands that were stashed with [[Effect.stash]]. * * It's allowed to stash messages while unstashing. Those newly added * commands will not be processed by this `unstashAll` effect and have to be unstashed * by another `unstashAll`. * * @see [[EffectBuilder.thenUnstashAll]] */ def unstashAll[Event, State](): Effect[Event, State] = CompositeEffect(none.asInstanceOf[EffectBuilder[Event, State]], SideEffect.unstashAll[State]()) /** * Send a reply message to the command. The type of the * reply message must conform to the type specified by the passed replyTo `ActorRef`. * * This has the same semantics as `cmd.replyTo.tell`. * * It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten * when the `EventSourcedBehavior` is created with [[EventSourcedBehavior.withEnforcedReplies]]. When * `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]]. * The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help * finding mistakes. */ def reply[ReplyMessage, Event, State](replyTo: ActorRef[ReplyMessage])( replyWithMessage: ReplyMessage): ReplyEffect[Event, State] = none[Event, State].thenReply[ReplyMessage](replyTo)(_ => replyWithMessage) /** * When [[EventSourcedBehavior.withEnforcedReplies]] is used there will be compilation errors if the returned effect * isn't a [[ReplyEffect]]. This `noReply` can be used as a conscious decision that a reply shouldn't be * sent for a specific command or the reply will be sent later. */ def noReply[Event, State]: ReplyEffect[Event, State] = none.thenNoReply() }
接着用handleEvent來根據產生的Event更新State,以下:spa
@tailrec def applyEffects( msg: Any, state: RunningState[S], effect: Effect[E, S], sideEffects: immutable.Seq[SideEffect[S]] = Nil): Behavior[InternalProtocol] = { if (setup.log.isDebugEnabled && !effect.isInstanceOf[CompositeEffect[_, _]]) setup.log.debugN( s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size) effect match { case CompositeEffect(eff, currentSideEffects) =>
// unwrap and accumulate effects
applyEffects(msg, state, eff, currentSideEffects ++ sideEffects) case Persist(event) =>
// apply the event before persist so that validation exception is handled before persisting // the invalid event, in case such validation is implemented in the event handler. // also, ensure that there is an event handler for each single event
val newState = state.applyEvent(setup, event) val eventToPersist = adaptEvent(event) val eventAdapterManifest = setup.eventAdapter.manifest(event) val newState2 = internalPersist(setup.context, msg, newState, eventToPersist, eventAdapterManifest) val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr) persistingEvents(newState2, state, numberOfEvents = 1, shouldSnapshotAfterPersist, sideEffects) case PersistAll(events) =>
if (events.nonEmpty) { // apply the event before persist so that validation exception is handled before persisting // the invalid event, in case such validation is implemented in the event handler. // also, ensure that there is an event handler for each single event
var seqNr = state.seqNr val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, NoSnapshot: SnapshotAfterPersist)) { case ((currentState, snapshot), event) => seqNr += 1 val shouldSnapshot =
if (snapshot == NoSnapshot) setup.shouldSnapshot(currentState.state, event, seqNr) else snapshot (currentState.applyEvent(setup, event), shouldSnapshot) } val eventsToPersist = events.map(evt => (adaptEvent(evt), setup.eventAdapter.manifest(evt))) val newState2 = internalPersistAll(setup.context, msg, newState, eventsToPersist) persistingEvents(newState2, state, events.size, shouldSnapshotAfterPersist, sideEffects) } else { // run side-effects even when no events are emitted
tryUnstashOne(applySideEffects(sideEffects, state)) } case _: PersistNothing.type => tryUnstashOne(applySideEffects(sideEffects, state)) case _: Unhandled.type => import akka.actor.typed.scaladsl.adapter._ setup.context.system.toClassic.eventStream .publish(UnhandledMessage(msg, setup.context.system.toClassic.deadLetters, setup.context.self.toClassic)) tryUnstashOne(applySideEffects(sideEffects, state)) case _: Stash.type => stashUser(IncomingCommand(msg)) tryUnstashOne(applySideEffects(sideEffects, state)) } }
好了,基本原理都在這了,再挖下去會更骯髒。爲上面的例子設了個運行環境,主要是測試persistence-cassandra-plugin的正確設置,以下:scala
build.sbtdebug
name := "learn-akka-typed" version := "0.1" scalaVersion := "2.13.1" scalacOptions in Compile ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint") javacOptions in Compile ++= Seq("-Xlint:unchecked", "-Xlint:deprecation") val AkkaVersion = "2.6.5" val AkkaPersistenceCassandraVersion = "1.0.0" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion, "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion, "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion, "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion, "com.typesafe.akka" %% "akka-persistence-cassandra" % AkkaPersistenceCassandraVersion, "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion, "ch.qos.logback" % "logback-classic" % "1.2.3" )
application.conf
akka.actor.allow-java-serialization = on akka { loglevel = DEBUG actor { serialization-bindings { "com.learn.akka.CborSerializable" = jackson-cbor } } # use Cassandra to store both snapshots and the events of the persistent actors persistence { journal.plugin = "akka.persistence.cassandra.journal" snapshot-store.plugin = "akka.persistence.cassandra.snapshot" } } akka.persistence.cassandra { # don't use autocreate in production journal.keyspace = "poc" journal.keyspace-autocreate = on journal.tables-autocreate = on snapshot.keyspace = "poc_snapshot" snapshot.keyspace-autocreate = on snapshot.tables-autocreate = on } datastax-java-driver { basic.contact-points = ["192.168.11.189:9042"] basic.load-balancing-policy.local-datacenter = "datacenter1" }