前面介紹了事件源(EventSource)和集羣(cluster),如今到了討論CQRS的時候了。CQRS即讀寫分離模式,由獨立的寫方程序和讀方程序組成,具體原理在之前的博客裏介紹過了。akka-typed應該天然支持CQRS模式,最起碼自己提供了對寫方編程的支持,這點從EventSourcedBehavior 能夠知道。akka-typed提供了新的EventSourcedBehavior-Actor,極大方便了對persistentActor的應用開發,但同時也給編程者形成了一些限制。如手工改變狀態會更困難了、EventSourcedBehavior不支持多層式的persist,也就是說經過persist某些特定的event而後在event-handler程序裏進行狀態處理是不可能的了。我這裏有個例子,是個購物車應用:當完成支付後須要取個快照(snapshot),下面是這個snapshot的代碼:java
snapshotWhen { (state,evt,seqNr) => CommandHandler.takeSnapshot(state,evt,seqNr) } ... def takeSnapshot(state: Voucher, evt: Events.Action, lstSeqNr: Long)(implicit pid: PID) = { if (evt.isInstanceOf[Events.PaymentMade] || evt.isInstanceOf[Events.VoidVoucher.type] || evt.isInstanceOf[Events.SuspVoucher.type]) if (state.items.isEmpty) { log.step(s"#${state.header.num} taking snapshot at [$lstSeqNr] ...") true } else
false
else
false
}
判斷event類型是沒有問題的,由於正是當前的事件,但另外一個條件是購物車必須是清空了的。這個有點爲難,由於這個狀態要依賴這幾個event運算的結果才能肯定,也就是下一步,但肯定結果又須要對購物車內容進行計算,好像是個死循環。在akka-classic裏咱們能夠在判斷了event運算結果後,若是須要改變狀態就再persist一個特殊的event,而後在這個event的handler進行狀態處理。沒辦法,EventSourcedBehavior不支持多層persist,只有這樣作:node
case PaymentMade(acct, dpt, num, ref,amount) => ... writerInternal.lastVoucher = Voucher(vchs, vItems) endVoucher(Voucher(vchs,vItems),TXNTYPE.sales) Voucher(vchs.nextVoucher, List()) ...
我只能先吧當前狀態保存下來、進行結單運算、而後清空購物車,這樣snapshot就能夠順利進行了。數據庫
好了,akka的讀方編程是經過PersistentQuery實現的。reader的做用就是把event從數據庫讀出來後再恢復成具體的數據格式。咱們從reader的調用瞭解一下這個應用裏reader的實現細節:編程
val readerShard = writerInternal.optSharding.get val readerRef = readerShard.entityRefFor(POSReader.EntityKey, s"$pid.shopId:$pid.posId") readerRef ! Messages.PerformRead(pid.shopid, pid.posid,writerInternal.lastVoucher.header.num,writerInternal.lastVoucher.header.opr,bseq,eseq,txntype,writerInternal.expurl,writerInternal.expacct,writerInternal.exppass)
能夠看到這個reader是一個集羣分片,sharding-entity。想法是每單完成購買後發個消息給一個entity、這個entity再完成reader功能後自動終止,當即釋放出佔用的資源。reader-actor的定義以下:json
object POSReader extends LogSupport { val EntityKey: EntityTypeKey[Command] = EntityTypeKey[Command]("POSReader") def apply(nodeAddress: String, trace: Boolean): Behavior[Command] = { log.stepOn = trace implicit var pid: PID = PID("","") Behaviors.supervise( Behaviors.setup[Command] { ctx => Behaviors.withTimers { timer =>
implicit val ec = ctx.executionContext Behaviors.receiveMessage { case PerformRead(shopid, posid, vchnum, opr, bseq, eseq, txntype, xurl, xacct, xpass) => pid = PID(shopid, posid) log.step(s"POSReader: PerformRead($shopid,$posid,$vchnum,$opr,$bseq,$eseq,$txntype,$xurl,$xacct,$xpass)")(PID(shopid, posid)) val futReadSaveNExport = for { txnitems <- ActionReader.readActions(ctx, vchnum, opr, bseq, eseq, trace, nodeAddress, shopid, posid, txntype) _ <- ExportTxns.exportTxns(xurl, xacct, xpass, vchnum, txntype == Events.TXNTYPE.suspend, { if(txntype == Events.TXNTYPE.voidall) txnitems.map (_.copy(txntype=Events.TXNTYPE.voidall)) else txnitems }, trace)(ctx.system.toClassic, pid) } yield () ctx.pipeToSelf(futReadSaveNExport) { case Success(_) => { timer.startSingleTimer(ReaderFinish(shopid, posid, vchnum), readInterval.seconds) StopReader } case Failure(err) => log.error(s"POSReader: Error: ${err.getMessage}") timer.startSingleTimer(ReaderFinish(shopid, posid, vchnum), readInterval.seconds) StopReader } Behaviors.same case StopReader => Behaviors.same case ReaderFinish(shopid, posid, vchnum) => Behaviors.stopped( () => log.step(s"POSReader: {$shopid,$posid} finish reading voucher#$vchnum and stopped")(PID(shopid, posid)) ) } } } ).onFailure(SupervisorStrategy.restart) }
reader就是一個普通的actor。值得注意的是讀方程序多是一個龐大複雜的程序,確定須要分割成多個模塊,因此咱們能夠按照流程順序進行模塊功能切分:這樣下面的模塊可能會須要上面模塊產生的結果才能繼續。記住,在actor中絕對避免阻塞線程,全部的模塊都返回Future, 而後用for-yield串起來。上面咱們用了ctx.pipeToSelf 在Future運算完成後發送ReaderFinish消息給本身,通知本身中止。api
在這個例子裏咱們把reader任務分紅:session
一、從數據庫讀取事件app
二、事件重演一次產生狀態數據(購物車內容)ide
三、將造成的購物車內容做爲交易單據項目存入數據庫post
四、向用戶提供的restapi輸出交易數據
event讀取是經過cassandra-persistence-plugin實現的:
val query = PersistenceQuery(classicSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) // issue query to journal
val source: Source[EventEnvelope, NotUsed] = query.currentEventsByPersistenceId(s"${pid.shopid}:${pid.posid}", startSeq, endSeq) // materialize stream, consuming events
val readActions: Future[List[Any]] = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny }
這部分比較簡單:定義一個PersistenceQuery,用它產生一個Source,而後run這個Source獲取Future[List[Any]]。
重演事件產生交易數據:
def buildVoucher(actions: List[Any]): List[TxnItem] = { log.step(s"POSReader: read actions: $actions") val (voidtxns,onlytxns) = actions.asInstanceOf[Seq[Action]].pickOut(_.isInstanceOf[Voided]) val listOfActions = onlytxns.reverse zip (LazyList from 1) //zipWithIndex
listOfActions.foreach { case (txn,idx) => txn.asInstanceOf[Action] match { case Voided(_) =>
case ti@_ => curTxnItem = EventHandlers.buildTxnItem(ti.asInstanceOf[Action],vchState).copy(opr=cshr) if(voidtxns.exists(a => a.asInstanceOf[Voided].seq == idx)) { curTxnItem = curTxnItem.copy(txntype = TXNTYPE.voided, opr=cshr) log.step(s"POSReader: voided txnitem: $curTxnItem") } val vch = EventHandlers.updateState(ti.asInstanceOf[Action],vchState,vchItems,curTxnItem,true) vchState = vch.header vchItems = vch.txnItems log.step(s"POSReader: built txnitem: ${vchItems.txnitems.head}") } } log.step(s"POSReader: voucher built with state: $vchState, items: ${vchItems.txnitems}") vchItems.txnitems }
重演List[Event],產生了List[TxnItem]。
向數據庫裏寫List[TxnItem]:
def writeTxnsToDB(vchnum: Int, txntype: Int, bseq: Long, eseq: Long, txns: List[TxnItem])( implicit system: akka.actor.ActorSystem, session: CassandraSession, pid: PID): Future[Seq[TxnItem]] = ???
注意返回結果類型Future[Seq[TxnItem]]。咱們用for-yield把這幾個動做串起來:
val txnitems: Future[List[Events.TxnItem]] = for { lst1 <- readActions //read list from Source
lstTxns <- if (lst1.length < (endSeq -startSeq)) //if imcomplete list read again
readActions else FastFuture.successful(lst1) items <- FastFuture.successful( buildVoucher(lstTxns) ) _ <- JournalTxns.writeTxnsToDB(vchnum,txntype,startSeq,endSeq,items) _ <- session.close(ec) } yield items
注意返回結果類型Future[Seq[TxnItem]]。咱們用for-yield把這幾個動做串起來:
val txnitems: Future[List[Events.TxnItem]] = for { lst1 <- readActions //read list from Source
lstTxns <- if (lst1.length < (endSeq -startSeq)) //if imcomplete list read again
readActions else FastFuture.successful(lst1) items <- FastFuture.successful( buildVoucher(lstTxns) ) _ <- JournalTxns.writeTxnsToDB(vchnum,txntype,startSeq,endSeq,items) _ <- session.close(ec) } yield items
注意:這個for返回的Future[List[TxnItem]],是提供給restapi輸出功能的。在那裏List[TxnItem]會被轉換成json做爲post的包嵌數據。
如今全部子任務的返回結果類型都是Future了。咱們能夠再用for來把它們串起來:
val futReadSaveNExport = for { txnitems <- ActionReader.readActions(ctx, vchnum, opr, bseq, eseq, trace, nodeAddress, shopid, posid, txntype) _ <- ExportTxns.exportTxns(xurl, xacct, xpass, vchnum, txntype == Events.TXNTYPE.suspend, { if(txntype == Events.TXNTYPE.voidall) txnitems.map (_.copy(txntype=Events.TXNTYPE.voidall)) else txnitems }, trace)(ctx.system.toClassic, pid) } yield ()
說到EventSourcedBehavior,由於用了cassandra-plugin,突然想起配置文件裏新舊有很大區別。如今這個application.conf是這樣的:
akka { loglevel = INFO actor { provider = cluster serialization-bindings { "com.datatech.pos.cloud.CborSerializable" = jackson-cbor } } remote { artery { canonical.hostname = "192.168.11.189" canonical.port = 0 } } cluster { seed-nodes = [ "akka://cloud-pos-server@192.168.11.189:2551"] sharding { passivate-idle-entity-after = 5 m } } # use Cassandra to store both snapshots and the events of the persistent actors persistence { journal.plugin = "akka.persistence.cassandra.journal" snapshot-store.plugin = "akka.persistence.cassandra.snapshot" } } akka.persistence.cassandra { # don't use autocreate in production
journal.keyspace = "poc2g" journal.keyspace-autocreate = on journal.tables-autocreate = on snapshot.keyspace = "poc2g_snapshot" snapshot.keyspace-autocreate = on snapshot.tables-autocreate = on } datastax-java-driver { basic.contact-points = ["192.168.11.189:9042"] basic.load-balancing-policy.local-datacenter = "datacenter1" }
akka.persitence.cassandra段落裏能夠定義keyspace名稱,這樣新舊版本應用能夠共用一個cassandra,同時在線。