Akka-CQRS(6)- read-side

前面咱們全面介紹了在akka-cluster環境下實現的CQRS寫端write-side。簡單來講就是把發生事件描述做爲對象嚴格按發生時間順序寫入數據庫。這些事件對象通常是按照二進制binary方式如blob存入數據庫的。cassandra-plugin的表結構以下:數據庫

CREATE KEYSPACE IF NOT EXISTS akka WITH REPLICATION = { 'class' : 'SimpleStrategy','replication_factor':1 }; CREATE TABLE IF NOT EXISTS akka.messages ( used boolean static, persistence_id text, partition_nr bigint, sequence_nr bigint, timestamp timeuuid, timebucket text, writer_uuid text, ser_id int, ser_manifest text, event_manifest text, event blob, meta_ser_id int, meta_ser_manifest text, meta blob, message blob, tags set<text>, PRIMARY KEY ((persistence_id, partition_nr), sequence_nr, timestamp, timebucket)) WITH gc_grace_seconds =864000 AND compaction = { 'class' : 'SizeTieredCompactionStrategy', 'enabled' : true, 'tombstone_compaction_interval' : 86400, 'tombstone_threshold' : 0.2, 'unchecked_tombstone_compaction' : false, 'bucket_high' : 1.5, 'bucket_low' : 0.5, 'max_threshold' : 32, 'min_threshold' : 4, 'min_sstable_size' : 50 }; 

事件對象是存放在event裏的,是個blob類型字段。下面是個典型的寫動做示範: 安全

 val receiveCommand: Receive = { case Cmd(data) => persist(ActionGo) { event => updateState(event) } } 

這些事件描述的存寫即寫這個ActionGo時不會影響到實際業務數據狀態。真正發生做用,改變當前業務數據狀態的是在讀端read-side。也就是說在另外一個線程裏有個程序也按時間順序把這些二進制格式的對象讀出來、恢復成某種結構如ActionGo類型、而後按照結構內的操做指令對業務數據進行實際操做處理,這時纔會產生對業務數據的影響。作個假設:若是這些事件不會依賴時間順序的話是否是能夠偷偷懶直接用一種pub/sub模式把reader放在訂閱subscriber端,以下: app

 

//寫端 import DistributedPubSubMediator.Publish val mediator = DistributedPubSub(context.system).mediator val receiveCommand: Receive = { case Cmd(data) => persist(DataUpdated) { event => updateState(event) mediator ! Publish(persistentId, event,sendOneMessageToEachGroup = true) } } //讀端 val mediator = DistributedPubSub(context.system).mediator mediator ! Subscribe(persistentId, self) def receive = { case DataUpdated: Event ⇒ updateDataTables() } 

 

這種pub/sub模式的特色是消息收發雙方耦合度很是鬆散,但同時也存在訂閱方sub即reader十分難以控制的問題,並且能夠確定的是訂閱到達消息沒法保證是按發出時間順序接收的,咱們沒法控制akka傳遞消息的過程。由於業務邏輯中一個動做的發生時間順序每每會對周圍業務數據產生不一樣的影響,因此如今只能考慮事件源event-sourcing這種模式了。es方式的CQRS是經過數據庫表做爲讀寫間隔實現寫端程序和讀端程序的分離。寫端只管往數據庫寫數據操做指令,讀端從同一數據庫位置讀出指令進行實質的數據處理操做,因此讀寫過程當中會產生必定的延遲,讀端須要不斷從數據庫抽取pull事件。而具體pull的時段間隔如何設定也是一個比較棘手的問題。不管如何,akka提供了Persistence-Query做爲一種CQRS讀端工具。咱們先從一個簡單的cassandra-persistence-query用例開始: 負載均衡

// obtain read journal by plugin id val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) // issue query to journal val source: Source[EventEnvelope, NotUsed] = readJournal.eventsByPersistenceId("user-1337", 0, Long.MaxValue) // materialize stream, consuming events implicit val mat = ActorMaterializer() source.runForeach { pack => updateDatabase(pack.event) } 

eventsByPersistenceId(...)構建了一個akka-stream的Source[EventEnvelope,_]。這個EventEnvelope類定義以下: ide

/** * Event wrapper adding meta data for the events in the result stream of * [[akka.persistence.query.scaladsl.EventsByTagQuery]] query, or similar queries. */ final case class EventEnvelope( offset: Offset, persistenceId: String, sequenceNr: Long, event: Any) 

上面這個event字段就是從數據庫讀取的事件對象。EventEnvelope是以流元素的形式從數據庫中提出。eventsByPersistenceId(...)啓動了一個數據流,而後akka-persistence-query會按refresh-interval時間間隔重複運算這個流stream。refresh-interval能夠在配置文件中設置,以下面的cassandra-plugin配置: 函數

cassandra-query-journal { # Implementation class of the Cassandra ReadJournalProvider class = "akka.persistence.cassandra.query.CassandraReadJournalProvider" # Absolute path to the write journal plugin configuration section write-plugin = "cassandra-journal" # New events are retrieved (polled) with this interval. refresh-interval = 3s ... } 

以上描述的是一種接近實時的讀取模式。通常來說,爲了實現高效、安全的事件存寫,咱們會盡可能簡化事件結構,這樣就會高几率出現一個業務操做單位須要多個事件來描述,那麼若是在完成一項業務操做單元的全部事件存寫後纔開始讀端的動做不就簡單多了嗎?並且還比較容易控制。雖然這樣會形成某種延遲,但若是以業務操做爲衡量單位,這種延遲應該是很正常的,能夠接受的。如今每當完成一項業務的全部事件存寫後在讀端一次性成批把事件讀出來而後進行實質的數據操做,應當可取。akka-persistence-query提供了下面這個函數: 工具

 

 

  /**    * Same type of query as `eventsByPersistenceId` but the event stream    * is completed immediately when it reaches the end of the "result set". Events that are    * stored after the query is completed are not included in the event stream.    */   override def currentEventsByPersistenceId(       persistenceId: String,       fromSequenceNr: Long,       toSequenceNr: Long): Source[EventEnvelope, NotUsed] = ... 

咱們能夠run這個stream把數據讀入一個集合裏,而後能夠在任何一個線程裏用這個集合演算業務邏輯(如咱們前面提到的寫端狀態轉變維護過程),能夠用router/routee模式來實現一種在集羣節點中負載均衡式的分配reader-actor做業節點。post

下一篇準備對應前面的CQRS Writer Actor 示範裏的akka-cluster-pos進行rCQRS-Reader-Actor示範。ui

相關文章
相關標籤/搜索