在前面的的討論裏已經介紹了CQRS讀寫分離模式的一些原理和在akka-typed應用中的實現方式。經過一段時間akka-typed的具體使用對一些經典akka應用的遷移升級,感受最深的是EvenSourcedBehavior和akka-cluster-sharding了。前者是經典akka中persistenceActor的替換,後者是在原有組件基礎上在使用方面的升級版。二者都在使用便捷性方面提供了大幅度的提高。在我看來,cluster-sharding是分佈式應用的核心,若是可以比較容易掌握,對開發正確的分佈式系統有着莫大的裨益。但這篇討論的重點將會集中在EventSourcedBehavior上,由於它是實現CQRS的關鍵。而CQRS又是大數據應用數據採集(輸入)管理最新的一個重要模式。node
EventSourcedBehaviro是akka-typed爲event-sourcing事件源模式提供的開發支持。具體的原理和使用方法在前面的博客裏都介紹過了,在這篇就再也不重複。咱們把時間精力放到對event-sourcing的瞭解和應用上。數據庫
能夠說,event-sourcing是一種數據庫操做的模式。簡單來講:event-sourcing的工做原理是把對數據庫的操做動做保存起來,不直接對數據庫進行即時更新,而是在一個階段以後經過回溯replay這些動做纔對數據庫進行實質的更新。event-sourcing與傳統數據庫操做模式的最大分別就是:event-sourcing對數據庫的更新過程能夠重複,在一個既定的原點開始重演全部動做能夠得出一樣的結果,即一樣的數據庫狀態。在大數據、高併發應用中最難控制的應該就是用戶操做了。用戶可能在任什麼時候間同時對同一項數據進行更新。通用的傳統方式是經過「鎖」來保證數據的正確性,但「鎖」會給系統帶來更多的麻煩如響應慢甚至系統鎖死。而一旦出現系統鎖死重啓後並沒有有效辦法恢復數據庫正確狀態。event-sourcing偏偏就能有針對性的解決這些問題。session
感受到,event-sourcing模式應該能夠避免對「鎖」的使用:在高併發環境裏,event-sourcing系統的每一個用戶在任什麼時候間都有可能對數據庫進行操做。但他們並不直接改變數據庫內容,而是將這些對數據庫操做的動做保存起來。由於用戶保存的是各自的動做,互不關聯,因此不須要任何鎖機制。當系統完成一個階段的工做後,從這個階段的起點開始,把全部用戶的動做按發生時間順序重演並對數據庫進行實質的更新。能夠看到,這個具體的數據庫更新過程是單一用戶的,因此不須要「鎖」了。階段的起點是由數據庫狀態快照來表示。在完成了這個階段全部動做重演後數據庫狀態一次性更新。整個過程便是CQRS讀寫分離模式了,其中:保存動做爲寫部分,動做重演是讀部分。動做重演能夠在以後的任什麼時候間進行,於是讀、寫是徹底分離的。實際上CQRS就是一個數據庫更新管理的狀態機器:從數據起始狀態到終結狀態的一種過程管理方法。下面就用一個實際的應用設計例子來介紹CQRS在應用系統中的具體使用。併發
下面討論一個超市收款機pos軟件的例子:分佈式
收款流程比較簡單:收款員登陸=>掃碼錄入銷售項目=>錄入折扣=>其它操做=>支付=>打小票ide
最終結果是在數據庫產生了一張銷售單,即一組交易數據,是實際反映在交易數據庫裏的。從CQRS流程來解釋:這組銷售數據在開單時爲空,而後在完成全部單據操做後一次性產生,也就是在CQRS模式的讀部分產生的。在這個過程當中一直是寫部分的操做,不影響交易數據庫狀態。固然,咱們還必須在內存裏維護一個模擬的狀態來對每項操做進行控制,如:用戶未登陸時不允許任何操做動做。因此必須有個狀態能表明用戶登陸的,而這個狀態應該能夠經過動做重演來重現,因此用戶登陸也是一個必須保存的動做。如此,每張銷售單在內存裏都應該有一個狀態,這個狀態包括了單據狀態和一個動態的交易項目集合。這個項目集合就表明即將產生的數據庫交易數據。下面是單據狀態的定義:高併發
case class VchStates( opr: String = "", //收款員
num: Int = 1, //當前單號
seq: Int = 1, //當前序號
void: Boolean = false, //取消模式
refd: Boolean = false, //退款模式
susp: Boolean = false, //掛單
canc: Boolean = false, //廢單
due: Boolean = false, //當前餘額
su: String = "", mbr: String = "", disc: Int = 0, //預設折扣,如:會員折扣
mode: Int = 0 //當前操做流程:0=logOff, 1=LogOn, 2=Payment
) extends CborSerializable { ... }
交易項目是交易數據的直接對應:大數據
case class TxnItem( txndate: String = LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd")) , txntime: String = LocalDateTime.now.format(dateTimeFormatter).substring(11) , opr: String = "" //工號
, num: Int = 0 //銷售單號
, seq: Int = 1 //交易序號
, txntype: Int = TXNTYPE.sales //交易類型
, salestype: Int = SALESTYPE.nul //銷售類型
, qty: Int = 1 //交易數量
, price: Int = 0 //單價(分)
, amount: Int = 0 //碼洋(分)
, disc: Int = 0 //折扣率 (%) 100% = 1
, dscamt: Int = 0 //折扣額:負值 net實洋 = amount + dscamt
, member: String = "" //會員卡號
, code: String = "" //編號(商品、部類編號、帳戶編號、卡號...)
, refnum: String = "" //參考號,如退貨單號
, acct: String = "" //帳號
, dpt: String = "" //部類
) extends CborSerializable {
爲了提升系統效率,根據操做動做實時對交易項目進行了更新,如遇到折扣動做時須要更新上一條交易項目的優惠金額等。這也是在讀部分動做重演必須的,由於CQRS的讀部分目的是把正確的交易數據寫到數據庫裏。因此,CQRS的寫部分就表明對內存中這個交易項目集的動態更新過程。ui
單據狀態在結單時用EventSourcedBehavior拿了個snapshot做爲下一單的起始狀態。銷售中途出現異常退出後能夠在上一單狀態快照的基礎上實施動做重演把狀態恢復到出現異常以前。url
因爲每一個階段均可以清晰的用一張銷售單的生命週期來表明,因此在整單操做完成後就能夠進行CQRS的讀部分了。操做結束的方式最明顯的是單據完成支付操做了,以下:
case PaymentMade(acct, dpt, num, ref,amount) =>
if (curItem.txntype != TXNTYPE.voided) { val due = items.totalSales - items.totalPaid val bal = if (items.totalSales > 0) due - curItem.amount else due + curItem.amount log.step(s"#${vchState.num} PaymentMade with input totalSales[${items.totalSales}], totalPaid[${items.totalPaid}], txnItems[${items}].") val vchs = vchState.copy( seq = vchState.seq + 1, due = (if ((items.totalPaid.abs + curItem.amount.abs) >= items.totalSales.abs) false else true), mode = (if (items.totalPaid.abs > 0) 2 else 1) ) val vItems = items.addItem(curItem.copy( salestype = SALESTYPE.ttl, price = due, amount = curItem.amount, dscamt = bal )).txnitems if (replay) { Voucher(vchs, vItems) } else { if (vchs.due) { val vch = Voucher(vchs,vItems) log.step(s"#${vchState.num} PaymentMade with current item: ${vch.items.head}") vch } else { writerInternal.lastVoucher = Voucher(vchs, vItems) if (!writerInternal.afterRecovery) endVoucher(Voucher(vchs,vItems),TXNTYPE.sales) Voucher(vchs.nextVoucher, List()) } } } else { log.step(s"#${vchState.num} PaymentMade with current item: $curItem") Voucher(vchState.copy( seq = vchState.seq + 1) , items.addItem(curItem).txnitems) }
確認了完成支付調用endVoucher. endVoucher啓動讀部分reader, 以下:
def endVoucher(voucher: Voucher, txntype: Int)(implicit writerInternal: WriterInternal,pid:Messages.PID) = { log.step(s"#${writerInternal.lastVoucher.header.num } ending voucher with state: ${writerInternal.lastVoucher.header}, txns: ${writerInternal.lastVoucher.items}") val readerShard = writerInternal.optSharding.get //ClusterSharding(writerInternal.actorContext.system)
val readerRef = readerShard.entityRefFor(POSReader.EntityKey, s"$pid.shopId:$pid.posId") val eseq = EventSourcedBehavior.lastSequenceNumber(writerInternal.optContext.get) val bseq = eseq - writerInternal.listOfActions.size + 1 log.step(s"#${writerInternal.lastVoucher.header.num } sending PerformRead(${pid.shopid}, ${pid.posid},${writerInternal.lastVoucher.header.num},${writerInternal.lastVoucher.header.opr},$bseq,$eseq,$txntype,${writerInternal.expurl},${writerInternal.expacct},${writerInternal.exppass}) ...") // log.step(s"#${writerInternal.lastVoucher.header.num } ending voucher with actions: ${writerInternal.listOfActions}")
readerRef ! Messages.PerformRead(pid.shopid, pid.posid,writerInternal.lastVoucher.header.num,writerInternal.lastVoucher.header.opr,bseq,eseq,txntype,writerInternal.expurl,writerInternal.expacct,writerInternal.exppass) writerInternal.clearListOfAction() log.step(s"#${writerInternal.lastVoucher.header.num } ending voucher with actions: ${writerInternal.listOfActions}") }
reader是在一個sharding上即時構建的一個actor。這個actor的主要功能就是從journal裏讀出這張單全部動做進行重演得出交易項目集後寫進交易數據庫:
def readActions(ctx: ActorContext[Command],vchnum: Int, cshr: String, startSeq: Long, endSeq: Long, trace: Boolean, nodeAddress: String, shopId: String, posId: String, txntype: Int): Future[List[TxnItem]] = { implicit val classicSystem = ctx.system.toClassic implicit val ec = classicSystem.dispatcher implicit var vchState = VchStates().copy(num = vchnum, opr = cshr) implicit var vchItems = VchItems() implicit var curTxnItem = TxnItem() implicit val pid = PID(shopId,posId) implicit val writerInternal = new Messages.WriterInternal(nodeAddress = nodeAddress, pid = pid, trace=trace) log.stepOn = trace log.step(s"POSReader: readActions($vchnum,$cshr,$startSeq,$endSeq,$trace,$nodeAddress,$shopId,$posId), txntype=$txntype") def buildVoucher(actions: List[Any]): List[TxnItem] = { log.step(s"POSReader: read actions: $actions") val (voidtxns,onlytxns) = actions.asInstanceOf[Seq[Action]].pickOut(_.isInstanceOf[Voided]) val listOfActions = actions.reverse zip (LazyList from 1) //zipWithIndex
listOfActions.foreach { case (txn,idx) => txn.asInstanceOf[Action] match { case ti@_ => curTxnItem = EventHandlers.buildTxnItem(ti.asInstanceOf[Action],vchState).copy(opr=cshr) if (!ti.isInstanceOf[Voided]) { if (voidtxns.exists(a => a.asInstanceOf[Voided].seq == idx)) { curTxnItem = curTxnItem.copy(txntype = TXNTYPE.voided, opr = cshr) log.step(s"POSReader: voided txnitem: $curTxnItem") } } val vch = EventHandlers.updateState(ti.asInstanceOf[Action],vchState,vchItems,curTxnItem,true) vchState = vch.header vchItems = vch.txnItems log.step(s"POSReader: built txnitem: ${vchItems.txnitems.head}") } } log.step(s"POSReader: voucher built with state: $vchState, items: ${vchItems.txnitems}") vchItems.txnitems } val query = PersistenceQuery(classicSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) implicit val session = CassandraSessionRegistry(classicSystem).sessionFor("alpakka.cassandra") // issue query to journal
val source: Source[EventEnvelope, NotUsed] = query.currentEventsByPersistenceId(s"${pid.shopid}:${pid.posid}", startSeq, endSeq) // materialize stream, consuming events
val readActions: Future[List[Any]] = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny } for { lst1 <- readActions //read list from Source
lstTxns <- if (lst1.length < (endSeq -startSeq)) //if imcomplete list read again
readActions else FastFuture.successful(lst1) items <- FastFuture.successful( buildVoucher(lstTxns) ) _ <- JournalTxns.writeTxnsToDB(vchnum,txntype,startSeq,endSeq,items) _ <- session.close(ec) } yield items }