近兩年,一直在折騰用FP與OO共存的編程語言Scala,採起以函數式編程爲主的方式,結合TDD和BDD的手段,採用Domain Driven Design的方法學,去構造DDDD應用(Domain Driven Design & Distributed)。期間,嘗試了大量的框架:業務領域主要適用Akka、Scalaz等框架,UI和基礎設施方面主要適用Spring、Kafka等框架,測試則主要適用Spock、ScalaTest、Selenium等框架。
兩年的折騰,不能說沒有一點收穫。在覈心的領域模型設計方面,經過嘗試用傳統Akka的Actor包裹聚合,以自定義的Command和Event進行信息交換,用Free Monad構造表達樹實現延遲計算,用Akka Persistence實現Event Store,用Kafka維護命令和事件隊列,讓我依稀看到了以FP的方式實現ES+CQRS架構應用的曙光。
但如此多的框架不只讓人眼花繚亂,還要學習Scala、Groovy、Java等編程語言以及Gradle等Build工具的DSL,若是再加上Cluster、Cloud和Micro Service,這無疑是一個浩大的工程。所幸,伴隨Akka 2.6版本推出的Akka Typed,解決了消息難以管理、框架入侵過甚、節點部署繁複等痛點和難點。特別是在ES和CQRS方面,Akka Typed能夠說是提供了一個完整的解決方案。就如同當初LINQ的出現同樣,給我打開了一扇新的窗口。
如下,即是我學習Akka Typed官方文檔時隨看隨寫留下的筆記(語種Scala,版本2.6.5,在學習過程當中已更新至2.6.6),以備查考。內容和路線均以我個人關注點爲主,因此只是節選且不免有失偏頗。
📌 文中提到的RMP是指Vaughn Vernon撰寫的Reactive Messaging Patterns with the Actor Model一書。它是我學習傳統Actor模型時的重要參考書籍。
📎 在Github上,Paweł Kaczor開發出的akka-ddd框架,是一個很是棒的學習案例。
- akka-ddd:一個基於Akka和EventStore實現的CQRS架構的DDD框架。
- ddd-leaven-akka-v2:使用akka-ddd框架實現的電子商務應用。
🔗 https://doc.akka.io/docs/akka/current/typed/guide/tutorial.html
這是一個在物聯時代,讓用戶能夠藉助遍及家中的溫度監測儀,隨時掌握屋內各個角落溫度狀況的應用。
在IDEA的Editor/General/Auto Import中排除對*javadsl*的自動導入。
須要的依賴:
```
implementation 'org.scala-lang:scala-library:2.13.2'
implementation 'com.typesafe.akka:akka-actor-typed_2.13:2.6.5'
implementation 'ch.qos.logback:logback-classic:1.2.3'
testImplementation 'org.scalatest:scalatest_2.13:3.1.2'
testImplementation 'com.typesafe.akka:akka-actor-testkit-typed_2.13:2.6.5'
```
- 用val child = context.spawn(Behaviors.supervise(child()).onFailure(SupervisorStrategy.restart), name = "child-actor")改變默認的監督策略Stop。傳統Akka採取的方式是override supervisorStrategy,用一個閉包聲明Decider函數(參見RMP-52)。
- 用context.watchWith(watcheeActor, WatcheeTerminated(watcheeActor,...))創建觀察關係。(💀 WatcheeTerminated會被context自動填充嗎?貌似是的。)

爲防止死鎖(你們都搶,結果都吃不上)、飢餓(弱肉強食,弱者總是吃不上)和活鎖(你們都謙讓,結果都很差意思吃),有三種確保死鎖無憂的無阻塞設計,其能力由強到弱以下:無等待(Wait-Freedom)、無鎖(Lock-Freedom)、無障礙(Obstruction-Freedom)。
整個Actor System的體系,如同一個組織,任務老是逐級下派,命令老是下級服從上級。這與常見的分層軟件設計(Layered Software Design)是不一樣的,後者老是千方百計把問題隱藏和解決在本身那一層,而不是交給上級去處理或與其餘人協商。推薦的作法主要包括:
🔗 https://doc.akka.io/docs/akka/current/coordinated-shutdown.html
當應用的全部工做完成後,能夠通知/user監督者中止運行,或者調用ActorSystem.terminate方法,從而經過運行協調關機CoordinatedShutdown來中止全部正在運行的Actor。在此過程當中,你還能夠執行其餘一些清理和掃尾工做。
官方文檔有專章講解Actor的方方面面,本章只是介紹基本概念。
Actor的主要做用包括:向熟識的其餘Actor發送消息,建立新的Actor,指定處理下一條消息的行爲。它做爲一個容器,包括有狀態State、行爲Behavior、郵箱Mailbox、監督策略Supervisor Strategy以及若干的子Actor等內容物,且該容器只能經過指定消息類型的參數化ActorRef進行引用,以確保最基本的隔離:
在Actor終止後,其持有的全部資源將被回收,剩下未處理的消息將轉入Actor System的死信郵箱Dead Letter Mailbox,然後續新傳來的消息也將悉數轉到System的EventStream做爲死信處理。
⚠️ Akka Typed的監管已經從新設計,與傳統Akka有顯著區別
監管的對象是意料以外的失敗(Unexpected Failure),而不是校驗錯誤或者try-catch能處理的預期異常。因此,監管是Actor的額外裝飾,並不屬於Actor消息處理的組成部分。然後者,則是屬於Actor業務邏輯的一部分。
當失敗發生時,監管的策略包括如下三種:
⚡ 要注意的是,引起失敗的那條消息將不會再被處理,並且期間Actor發生的這些變化,在父Actor之外的範圍都是不可知的。
Lifecycle Monitoring一般是指DeathWatch(💀 以前叫Dead Watch,Watch觀察,Monitoring監測,譯爲觀察更爲妥帖)。這是除了父子間的監管關係外,Actor之間另外一種監測關係。因爲Supervision致使的Actor Restart對外是不可知的,因此要用Monitoring在一對Actor之間創建監測關係。但從目的上講兩者是有區別的,Supervision主要爲應對失敗情形,Monitoring主要爲確保另外一方知悉本方已終止運行。
使用context.watch(targetActorRef)及unwatch來創建或撤銷監測關係。當被監測Actor終止時,監測方Actor將收到一條Terminated消息(💀 不是Signal嗎?),而默認的消息處理是拋出一個DeathPactException。
⚡ 要注意的是,監測關係的創建和目標Actor終止時間無關。這就意味着在創建監測關係時,即便目標Actor已經終止,此時監測Actor仍將收到一條Terminated消息。
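下面是一個極簡草圖(Child、Watcher等名稱均爲假設),演示用receiveSignal處理Terminated信號,以避免默認拋出DeathPactException:

```scala
import akka.actor.typed.{ Behavior, Terminated }
import akka.actor.typed.scaladsl.Behaviors

object Child {
  def apply(): Behavior[String] =
    Behaviors.receiveMessage {
      case "stop" => Behaviors.stopped // 收到stop即終止,觸發觀察者的Terminated
      case _      => Behaviors.same
    }
}

object Watcher {
  def apply(): Behavior[String] =
    Behaviors.setup { context =>
      val child = context.spawn(Child(), "child")
      context.watch(child) // 創建觀察關係

      Behaviors
        .receiveMessage[String] { msg =>
          child ! msg
          Behaviors.same
        }
        .receiveSignal {
          case (ctx, Terminated(ref)) =>
            ctx.log.info("{} terminated", ref.path.name)
            Behaviors.stopped
        }
    }
}
```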
🔗 https://doc.akka.io/docs/akka/current/typed/fault-tolerance.html
爲確保Actor之間的內存可見性並防止消息亂序,Akka嚴格遵照如下兩條「發生以前(happens before)」守則:
Delivery翻譯爲「投遞」更爲妥帖,更好模仿郵政業務的妥投等術語。「送達」側重結果,「發送"側重動做自己。
藉助Akka Persistence確保消息妥投。(參見RMP-164)
🔗 https://doc.akka.io/docs/akka/current/typed/reliable-delivery.html
事件溯源的本質,是執行一條Command,衍生出若干條Event,這些Event既是Command產生的反作用,也是改變對象狀態的動因,及其生命週期內不可變的歷史。
Akka Persistence對事件溯源提供了直接支持。
🔗 https://doc.akka.io/docs/akka/current/typed/persistence.html#event-sourcing-concepts
能夠經過自定義郵箱,實現消息投遞的重試。但這多數僅限於本地通信的場景,具體緣由參見🔗 The Rules for In-JVM (Local) Message Sends
沒法妥投的而不是因網絡故障等緣由被丟失了的消息,將被送往名爲/deadLetters的Actor,所以這些消息被稱爲Dead Letter(參見RMP-161)。產生死信的緣由主要是收件人不詳或已經死亡,而死信Actor也主要用於系統調試。
因爲死信不能經過網絡傳遞,因此要蒐集一個集羣內的全部死信,則須要一臺一臺地收集每臺主機本地的死信後再進行彙總。經過在系統的Event Stream上訂閱akka.actor.DeadLetter類型的消息,普通Actor將能夠收到本地的全部死信消息。
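下面是一個簡要草圖(訂閱者的名稱與行爲均爲假設),演示普通Actor如何經過Event Stream訂閱本地死信:

```scala
import akka.actor.DeadLetter
import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.Behaviors

object DeadLetterListener {
  def apply(): Behavior[DeadLetter] =
    Behaviors.setup { context =>
      // 向系統的Event Stream訂閱DeadLetter類型的消息
      context.system.eventStream ! EventStream.Subscribe(context.self)

      Behaviors.receiveMessage { deadLetter =>
        context.log.info(s"Dead letter: ${deadLetter.message} -> ${deadLetter.recipient}")
        Behaviors.same
      }
    }
}
```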
Akka使用Typesafe Config Library管理配置信息。該庫獨立於Akka,也可用於其餘應用的配置信息管理。
Akka的ActorSystem在啓動時,全部的配置信息均會經過解析class path根目錄處的application.conf/.json/.properties等文件而加載入Config對象,並經過合併全部的reference.conf造成後備配置。
⚠️ 若正在編寫的屬於Akka應用程序,則Akka配置信息應寫入application.conf;如果基於Akka的庫,則配置信息應寫入reference.conf。而且,Akka不支持從另外一個庫中覆寫(override)當前庫中的config property。
配置信息既能夠從外部配置文件加載,也可用代碼實現運行時解析,還能夠利用ConfigFactory.load()從不一樣地方加載。
```scala
import akka.actor.typed.ActorSystem
import com.typesafe.config.ConfigFactory

val customConf = ConfigFactory.parseString("""
  akka.log-config-on-start = on
""")
// ConfigFactory.load sandwiches customConfig between default reference
// config and default overrides, and then resolves it.
val system = ActorSystem(rootBehavior, "MySystem", ConfigFactory.load(customConf))
```
一個典型的多項目配置示例:
```
myapp1 {
  akka.loglevel = "WARNING"
  my.own.setting = 43
}
myapp2 {
  akka.loglevel = "ERROR"
  app2.setting = "appname"
}
my.own.setting = 42
my.other.setting = "hello"
```
相應的配置信息加載代碼示例:
```scala
val config = ConfigFactory.load()
val app1 = ActorSystem(rootBehavior, "MyApp1", config.getConfig("myapp1").withFallback(config))
val app2 = ActorSystem(rootBehavior, "MyApp2", config.getConfig("myapp2").withOnlyPath("akka").withFallback(config))
```
🔗 Akka的默認配置列表,長達近千行……
📎 Akka Config Checker是用於查找Akka配置衝突的有力工具。
🏭 com.typesafe.akka:akka-actor-typed:2.6.5
示例HelloWorld是由HelloWorldMain建立一個HelloWorld(即Greeter),在每次ActorSystem要求HelloWorld SayHello的時候,就建立一個SayHello消息所賦名稱對應的HelloWorldBot(因此會有若干個動做相同但名稱不一樣的Bot),而後要求Greeter去向這個Bot問好,最後以Greeter與Bot相互問候數次做爲結束。
示例採用了FP風格,Actor的狀態和行爲均在Singleton對象裏定義,採用了相似傳統Akka receive()的函數Behaviors.receive { (context, message) => ... },以消息類型做爲約束,實現了Actor的互動與組合。在每一個Bot裏,利用消息的遞歸重入維持一個Greeting的計數值,屆滿則用Behaviors.stopped中止響應,不然遞歸重入。
Behaviors.receive {...}與receiveMessage {...}的區別,在於前者將把context帶入閉包。
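如下是一個極簡對比草圖(消息類型String與行爲名稱均爲假設),兩者行爲等價,僅在是否須要context上有所區別:

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

object EchoExamples {
  // receive:閉包同時拿到context與message
  val withContext: Behavior[String] =
    Behaviors.receive { (context, message) =>
      context.log.info("Received: {}", message)
      Behaviors.same
    }

  // receiveMessage:只關心message,不須要context
  val messageOnly: Behavior[String] =
    Behaviors.receiveMessage { message =>
      println(s"Received: $message")
      Behaviors.same
    }
}
```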
這是一個相似聊天室功能的示例,各Actor的職責、定義和聯繫以下表:
Actor | 職責 | Behavior類型 | Command | Event |
---|---|---|---|---|
Main | 建立聊天室ChatRoom和客戶Gabbler,併爲兩者牽線搭橋 | NotUsed | ||
ChatRoom | 建立並管理一組Session | RoomCommand | | |
Session | 負責播發諸如Gabbler這樣的Client的發言 | SessionCommand | | |
Gabbler | 響應Session | SessionEvent | | |
示例先採用FP風格實現。好比ChatRoom在處理GetSession消息時,最後以chatRoom(ses :: sessions)返回一個新的Behavior實例結束,這裏的sessions正是Actor ChatRoom維護的狀態。
示例演示瞭如何限制消息的發件人。好比session及其工廠方法,以及PublishSessionMessage類型均爲chatroom私有,外部不可訪問;在session Behavior的PostMessage分支中,chatroom的ActorRef經過工廠方法傳入session,且類型被限制爲ActorRef[PublishSessionMessage]。這樣外界只能與ChatRoom通訊,而後由ChatRoom在內部將消息轉交Session處理。
處理消息的參數來源於工廠方法的傳入參數,仍是封裝在消息的字段裏,這個示例也分別給出了樣板。💀 在設計通訊協議時,消息定義爲Command仍是Event,消息的主人是誰,處理消息須要的參數如何傳入等等,都是須要考慮的問題。
爲實現程序安全退出,示例在Main的Behavior裏,設置了Dead Watch觀察gabbler,並定義了Behaviors.receiveSignal {...},在收到gabbler處理完MessagePosted消息,因返回Behaviors.stopped而發出的Terminated信號後,以Main自身的Behaviors.stopped做爲結束。
⚡ Behaviors.setup是一個Behavior的工廠方法,該Behavior的實例將在Actor啓動後才建立。而Behaviors.receive雖也是Behavior的工廠方法之一,但Behavior的實例倒是在Actor啓動的那一刻就同時建立的。
Actor是一個須要顯式啓停而且自帶狀態的資源(子Actor與父Actor雖不共生、但定共死),因此回想在GC出現前須要本身管理內存句柄的時代吧。
Actor System是一個高能耗的系統,因此一般一個應用或者一個JVM裏只有一個Actor System。
ActorContext可用做:孵化子Actor並進行監管、觀察其餘Actor以接收Terminated信號、記錄日誌、建立消息適配器、以ask方式與其餘Actor交互,以及獲取自身的引用self。ActorContext自己並非徹底線程安全的,主要有如下限制:
孵化有兩層含義:建立並啓動。
在使用Behaviors.setup啓用SpawnProtocol後,在應用中任何地方都將能夠不直接引用context,改用telling或asking方式完成Actor系統的組裝。其中,Ask方式的使用相似傳統Akka,它將返回Future[ActorRef[XX]]。
⚡ 留意示例代碼裏的幾處泛型約束,由這些Message串起了應用的流程。
// 啓用SpawnProtocol的Actor object HelloWorldMain { def apply(): Behavior[SpawnProtocol.Command] = Behaviors.setup { context => // Start initial tasks // context.spawn(...) SpawnProtocol() } } implicit val system: ActorSystem[SpawnProtocol.Command] = ActorSystem(HelloWorldMain(), "hello") val greeter: Future[ActorRef[HelloWorld.Greet]] = system.ask(SpawnProtocol.Spawn(behavior = HelloWorld(), name = "greeter", props = Props.empty, _)) val greetedBehavior = Behaviors.receive[HelloWorld.Greeted] { (context, message) => context.log.info2("Greeting for {} from {}", message.whom, message.from) Behaviors.stopped } val greetedReplyTo: Future[ActorRef[HelloWorld.Greeted]] = system.ask(SpawnProtocol.Spawn(greetedBehavior, name = "", props = Props.empty, _)) for (greeterRef <- greeter; replyToRef <- greetedReplyTo) { greeterRef ! HelloWorld.Greet("Akka", replyToRef) }
Actor能夠經過返回Behaviors.stopped做爲接替Behavior來中止自身運行。
子Actor能夠在處理完當前消息後,被其父Actor使用ActorContext.stop方法強行關停。
全部子Actor都將伴隨其父Actor關停而關停。
當Actor中止後將會收到一個PostStop信號,能夠用Behaviors.receiveSignal在該信號的處理方法裏完成其餘的清理掃尾工做,或者提早給Behaviors.stopped傳入一個負責掃尾的閉包函數,以實現Actor優雅地關停。(💀 經測試,前者將先於後者執行。)
因爲Terminated信號只帶有被觀察者的ActorRef,因此爲了添加額外的信息,在註冊觀察關係時能夠用context.watchWith(watchee, SpecifiedMessageRef)取代context.watch(watchee)。這樣在Terminated信號觸發時,觀察者將收到預約義的這個SpecifiedMessageRef。
⚡ 註冊、撤銷註冊和Terminated事件的到來,在時序上並不必定嚴格遵照先註冊後Terminated這樣的規則,由於消息是異步的,且有郵箱的存在。
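下面是一個使用watchWith附帶額外信息的簡要草圖(Job、JobTerminated等名稱均爲假設):

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

object Job {
  def apply(): Behavior[String] =
    Behaviors.receiveMessage {
      case "done" => Behaviors.stopped
      case _      => Behaviors.same
    }
}

object JobManager {
  sealed trait Command
  final case class SpawnJob(name: String) extends Command
  // 用自定義消息取代Terminated信號,以攜帶額外的jobName
  final case class JobTerminated(jobName: String) extends Command

  def apply(): Behavior[Command] =
    Behaviors.receive { (context, message) =>
      message match {
        case SpawnJob(name) =>
          val job = context.spawn(Job(), name)
          // job終止時,將收到JobTerminated(name)而非原始的Terminated信號
          context.watchWith(job, JobTerminated(name))
          Behaviors.same
        case JobTerminated(name) =>
          context.log.info("Job {} stopped", name)
          Behaviors.same
      }
    }
}
```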
Actor之間的交互,只能經過彼此的ActorRef[Message]來進行。這些ActorRef和Message,構成了Protocol的所有,既代表了通訊的雙方,也代表了Actor能處理的消息、限制了能發給它的消息類型。
📎 要運行示例代碼,須要導入日誌和Ask模式的支持:
```scala
import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.AskPattern._
```
而且在test/resources文件夾下的logback-test.xml裏配置好日誌:
<?xml version="1.0" encoding="UTF-8"?> <configuration> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>INFO</level> </filter> <encoder> <pattern>[%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg MDC: {%mdc}%n</pattern> </encoder> </appender> <appender name="CapturingAppender" class="akka.actor.testkit.typed.internal.CapturingAppender"/> <logger name="akka.actor.testkit.typed.internal.CapturingAppenderDelegate"> <appender-ref ref="STDOUT"/> </logger> <root level="DEBUG"> <appender-ref ref="CapturingAppender"/> </root> </configuration>
使用異步的、線程安全的tell發出消息,但不保證對方收到消息,也不關心該消息是否被對方處理完畢了。
recipient ! message
發件人發出Request並附上回信地址,並以得到收件人的Response做爲消息妥投並被處理的確信。
先定義Request和Response,隨後sender在發出Request時把self做爲replyTo的ActorRef一併傳出,方便recipient收到Request後回覆Response。
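一個最小化的協議定義草圖(CookieFabric等名稱均爲假設),Request中攜帶replyTo:

```scala
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors

object CookieFabric {
  // Request裏攜帶回信地址replyTo
  final case class Request(query: String, replyTo: ActorRef[Response])
  final case class Response(result: String)

  def apply(): Behavior[Request] =
    Behaviors.receiveMessage { request =>
      // 處理完畢後,向replyTo回覆Response,做爲消息妥投並被處理的確信
      request.replyTo ! Response(s"Here are the cookies for [${request.query}]!")
      Behaviors.same
    }
}
```

發件人只需在發出Request時,把context.self(或其適配器)做爲replyTo一併傳入便可。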
把收件人的Response進行簡單封裝,即做爲發件人可處理的消息類型,從而減小發件人定義Protocol的負擔。
定義收件人recipient的Response類型,再在sender裏定義適配後的Response類型,而後在其Behaviors.setup裏用context.messageAdapter(rsp => WrappedResponse(rsp))註冊一個消息適配器,最後在適配後消息的分支裏取出原始的Response(當初由收件人回覆的),再處理該消息。適配器能匹配預約義的響應類型及其派生類,總以最晚註冊的爲有效版本,屬於sender且與sender同生命週期,因此當適配器觸發異常時將致使其宿主中止。
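下面的草圖(Backend、Frontend等名稱均爲假設)演示消息適配器的註冊與使用:

```scala
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors

object Backend {
  final case class Request(query: String, replyTo: ActorRef[Response])
  final case class Response(result: String)

  def apply(): Behavior[Request] =
    Behaviors.receiveMessage { req =>
      req.replyTo ! Response(s"result of ${req.query}")
      Behaviors.same
    }
}

object Frontend {
  sealed trait Command
  final case class Translate(text: String) extends Command
  // 把Backend.Response封裝成自身可處理的消息類型
  private final case class WrappedResponse(response: Backend.Response) extends Command

  def apply(backend: ActorRef[Backend.Request]): Behavior[Command] =
    Behaviors.setup { context =>
      // 註冊消息適配器:收到Backend.Response時自動轉成WrappedResponse
      val adapter: ActorRef[Backend.Response] =
        context.messageAdapter(rsp => WrappedResponse(rsp))

      Behaviors.receiveMessage {
        case Translate(text) =>
          backend ! Backend.Request(text, adapter)
          Behaviors.same
        case WrappedResponse(rsp) =>
          context.log.info("Got response: {}", rsp.result)
          Behaviors.same
      }
    }
}
```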
把Request-Response本來使用的tell方式改成ask,從而能限定Response發回的時間,超時則視做ask失敗。
在提問人中,經過Behaviors.setup定義隱式的超時時限,以context.ask(recipientRef, request) { case Success(Response(msg)) => AdaptedResponse(msg); case Failure(_) => AdaptedResponse(...) }使用ask方式發出Request,並備妥Response到來時的適配預案(無需再像Adapted Response那樣額外註冊消息適配器),最後用工廠Behaviors.receiveMessage定義適配後響應消息的處理函數。
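一個context.ask的簡要草圖(沿用上一個草圖假設的Backend協議,超時時限等參數亦爲假設):

```scala
import scala.concurrent.duration._
import scala.util.{ Failure, Success }
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import akka.util.Timeout

object AskingFrontend {
  sealed trait Command
  final case class Translate(text: String) extends Command
  private final case class AdaptedResponse(message: String) extends Command

  def apply(backend: ActorRef[Backend.Request]): Behavior[Command] =
    Behaviors.setup { context =>
      // ask須要隱式的超時時限
      implicit val timeout: Timeout = 3.seconds

      Behaviors.receiveMessage {
        case Translate(text) =>
          // 發出Request,並就成功/失敗兩種情形備妥適配後的消息
          context.ask(backend, (ref: ActorRef[Backend.Response]) => Backend.Request(text, ref)) {
            case Success(Backend.Response(result)) => AdaptedResponse(result)
            case Failure(_)                        => AdaptedResponse(s"request for [$text] failed")
          }
          Behaviors.same
        case AdaptedResponse(message) =>
          context.log.info("Translated: {}", message)
          Behaviors.same
      }
    }
}
```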
在Actor System之外直接用ask向某個Actor提問,最終獲得用Future包裝好的回覆。
定義隱式的ActorSystem實例和超時時限,用reply: Future[Response] = recipient.ask(ref => Request).flatMap { case response => Future.successful(response); case another => Future.failed(...) }定義Request-Response的對應關係,再經過system.executionContext啓動執行,最後在reply的回調onComplete { case Success(Response) => ...; case Failure(_) => ...}裏取出Response區別處理。
當不關心收件人的迴應時,在Request裏把回信地址設置爲「什麼也不幹」的ignoreRef,使模式從Request-Response變爲Fire-Forget。

發件人發出Request時,把回覆地址從context.self改爲什麼消息也不處理的context.system.ignoreRef。
因爲ignoreRef將忽略全部發給它的消息,因此使用時必須當心。
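一個極簡草圖(沿用前述假設的CookieFabric協議),把回信地址設爲ignoreRef:

```scala
// 不關心回覆:用system.ignoreRef充當replyTo,Response將被直接丟棄
cookieFabric ! CookieFabric.Request("no reply needed", context.system.ignoreRef)
```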
在Actor內部有Future類型的調用時,使用pipeToSelf獲取回調結果。儘管直接用Future.onComplete也能取出結果,但會所以將Actor的內部狀態暴露給外部線程(在onComplete裏能直接訪問Actor內部狀態),因此並不安全。
在Actor內部,先定義Future調用futureResult,再使用context.pipeToSelf(futureResult) { case Success(_) => WrappedResult(...); case Failure(_) => WrappedResult(...)}將回調結果封裝入WrappedResult消息,最後在WrappedResult消息分支裏再做迴應。
當一份響應須要綜合多個Actor的回覆信息才能做出時,由一個父Actor委託多個子Actor蒐集信息,待信息齊備後才由父Actor彙總發回給Request的請求人,請求人除與父Actor之間的協議外,對其間細節一律不知。這些子Actor僅活在每次會話期間,故名爲「每會話」的子Actor。
由父Actor在Behaviors.setup裏構造實際承擔工做的一組子Actor,在Request處理過程當中構造負責組織協調子Actor的管家Actor(其行爲類型爲Behavior[AnyRef],以保證類型最大程度地兼容)。隨後在管家Actor的Behaviors.setup裏向子Actor發出Request,接着在Behaviors.receiveMessage裏,使用遞歸反覆嘗試從子Actor的Response裏取出結果(生產條件下應該設定子Actor響應超時)。當全部結果都取出後,由管家Actor利用父Actor傳入的replyTo直接向外發出Response,最後中止管家Actor。
這當中的關鍵點包括:一是在管家Actor裏的幾處,使用narrow限定Actor的類型T <: U,這也算是一種妥協,確保消息類型爲子類型T而非父類型U,從而實現更嚴謹的約束;二是利用遞歸配合Option[T]取出子Actor的響應結果。
// 子Actor case class Keys() case class Wallet() // 父Actor object Home { sealed trait Command case class LeaveHome(who: String, replyTo: ActorRef[ReadyToLeaveHome]) extends Command case class ReadyToLeaveHome(who: String, keys: Keys, wallet: Wallet) def apply(): Behavior[Command] = { Behaviors.setup[Command] { context => val keyCabinet: ActorRef[KeyCabinet.GetKeys] = context.spawn(KeyCabinet(), "key-cabinet") val drawer: ActorRef[Drawer.GetWallet] = context.spawn(Drawer(), "drawer") Behaviors.receiveMessage[Command] { case LeaveHome(who, replyTo) => context.spawn(prepareToLeaveHome(who, replyTo, keyCabinet, drawer), s"leaving-$who") Behaviors.same } } } // 管家Actor def prepareToLeaveHome(whoIsLeaving: String, replyTo: ActorRef[ReadyToLeaveHome], keyCabinet: ActorRef[KeyCabinet.GetKeys], drawer: ActorRef[Drawer.GetWallet]): Behavior[NotUsed] = { Behaviors.setup[AnyRef] { context => var wallet: Option[Wallet] = None var keys: Option[Keys] = None keyCabinet ! KeyCabinet.GetKeys(whoIsLeaving, context.self.narrow[Keys]) drawer ! Drawer.GetWallet(whoIsLeaving, context.self.narrow[Wallet]) Behaviors.receiveMessage { case w: Wallet => wallet = Some(w) nextBehavior() case k: Keys => keys = Some(k) nextBehavior() case _ => Behaviors.unhandled } def nextBehavior(): Behavior[AnyRef] = (keys, wallet) match { case (Some(w), Some(k)) => // 已取得全部結果 replyTo ! ReadyToLeaveHome(whoIsLeaving, w, k) Behaviors.stopped case _ => Behaviors.same } }.narrow[NotUsed] } }
本模式很是相似每會話子Actor模式,由聚合器負責收集子Actor迴應的信息,再反饋給委託人Actor。
實現與Per Session Child Actor近似,只是在具體代碼上更具通用性而已。其中,context.spawnAnonymous是起聯結做用的重要步驟。它不只負責孵化聚合器,還要提早準備向子Actor發出Request的閉包,以及將子Actor回覆轉換爲統一格式的映射閉包。聚合器被啓動後,即開始收集子Actor的回覆,收集完成後即自行終止。
// 容許子Actor有不一樣的協議,沒必要向Aggregator妥協 object Hotel1 { final case class RequestQuote(replyTo: ActorRef[Quote]) final case class Quote(hotel: String, price: BigDecimal) } object Hotel2 { final case class RequestPrice(replyTo: ActorRef[Price]) final case class Price(hotel: String, price: BigDecimal) } object HotelCustomer { sealed trait Command final case class AggregatedQuotes(quotes: List[Quote]) extends Command // 將子Actor的回覆封裝成統一的格式 final case class Quote(hotel: String, price: BigDecimal) def apply(hotel1: ActorRef[Hotel1.RequestQuote], hotel2: ActorRef[Hotel2.RequestPrice]): Behavior[Command] = { Behaviors.setup[Command] { context => context.spawnAnonymous( // 這個傳遞給聚合器工廠的sendRequests是銜接聚合器及其委託人的關鍵 Aggregator[Reply, AggregatedQuotes]( sendRequests = { replyTo => hotel1 ! Hotel1.RequestQuote(replyTo) hotel2 ! Hotel2.RequestPrice(replyTo) }, expectedReplies = 2, context.self, aggregateReplies = replies => AggregatedQuotes( replies .map { case Hotel1.Quote(hotel, price) => Quote(hotel, price) case Hotel2.Price(hotel, price) => Quote(hotel, price) } .sortBy(_.price) .toList), timeout = 5.seconds)) Behaviors.receiveMessage { case AggregatedQuotes(quotes) => context.log.info("Best {}", quotes.headOption.getOrElse("Quote N/A")) Behaviors.same } } } } object Aggregator { // 用來兼容不一樣子Actor響應而定義的回覆類型 type Reply = Any sealed trait Command private case object ReceiveTimeout extends Command private case class WrappedReply[R](reply: R) extends Command def apply[Reply: ClassTag, Aggregate]( sendRequests: ActorRef[Reply] => Unit, expectedReplies: Int, replyTo: ActorRef[Aggregate], aggregateReplies: immutable.IndexedSeq[Reply] => Aggregate, timeout: FiniteDuration): Behavior[Command] = { Behaviors.setup { context => context.setReceiveTimeout(timeout, ReceiveTimeout) val replyAdapter = context.messageAdapter[Reply](WrappedReply(_)) // 向子Actor發出Request並蒐集整理回覆信息 sendRequests(replyAdapter) def collecting(replies: immutable.IndexedSeq[Reply]): Behavior[Command] = { Behaviors.receiveMessage { case WrappedReply(reply: Reply) => val newReplies = replies :+ reply if (newReplies.size == expectedReplies) { val result = aggregateReplies(newReplies) replyTo ! result Behaviors.stopped } else collecting(newReplies) case ReceiveTimeout => val aggregate = aggregateReplies(replies) replyTo ! aggregate Behaviors.stopped } } collecting(Vector.empty) } } }
這是聚合器模式的一種變形。相似於集羣條件下,每一個Actor承擔着一樣的工做職責,當其中某個Actor未定期響應時,將工做從這個遲延的Actor手裏交給另外一個Actor負責。
💀 這個例子不夠完整,還須要進一步理解,好比爲何sendRequests須要一個Int參數,若是換做OO風格如何實現。
參考文獻 🔗 Achieving Rapid Response Times in Large Online Services
掐尾器以一個類型爲(Int, ActorRef[Reply]) => Boolean的工廠函數sendRequest,聯結掐尾器和具體承擔工做的Actor。若是sendRequest成功,說明請求已經發送給承擔工做的子Actor,那麼就調度一條由請求超時限定的單個Request的消息,不然就調度一條由最遲交付超時限定的消息。

```scala
object TailChopping {
  sealed trait Command
  private case object RequestTimeout extends Command
  private case object FinalTimeout extends Command
  private case class WrappedReply[R](reply: R) extends Command

  def apply[Reply: ClassTag](
      sendRequest: (Int, ActorRef[Reply]) => Boolean,
      nextRequestAfter: FiniteDuration,
      replyTo: ActorRef[Reply],
      finalTimeout: FiniteDuration,
      timeoutReply: Reply): Behavior[Command] = {
    Behaviors.setup { context =>
      Behaviors.withTimers { timers =>
        val replyAdapter = context.messageAdapter[Reply](WrappedReply(_))

        def waiting(requestCount: Int): Behavior[Command] = {
          Behaviors.receiveMessage {
            case WrappedReply(reply: Reply) =>
              replyTo ! reply
              Behaviors.stopped

            // 單個任務沒能按時完成,另外找人
            case RequestTimeout =>
              sendNextRequest(requestCount + 1)

            // 整個工做交付不了,抱歉
            case FinalTimeout =>
              replyTo ! timeoutReply
              Behaviors.stopped
          }
        }

        def sendNextRequest(requestCount: Int): Behavior[Command] = {
          if (sendRequest(requestCount, replyAdapter)) {
            timers.startSingleTimer(RequestTimeout, nextRequestAfter)
          } else {
            timers.startSingleTimer(FinalTimeout, finalTimeout)
          }
          waiting(requestCount)
        }

        sendNextRequest(1)
      }
    }
  }
}
```
使用定時器,在指定時限到期時給本身發送一條指定的消息。
object Buncher { sealed trait Command final case class ExcitingMessage(message: String) extends Command final case class Batch(messages: Vector[Command]) private case object Timeout extends Command private case object TimerKey def apply(target: ActorRef[Batch], after: FiniteDuration, maxSize: Int): Behavior[Command] = { Behaviors.withTimers(timers => new Buncher(timers, target, after, maxSize).idle()) } } class Buncher( timers: TimerScheduler[Buncher.Command], target: ActorRef[Buncher.Batch], after: FiniteDuration, maxSize: Int) { private def idle(): Behavior[Command] = { Behaviors.receiveMessage[Command] { message => timers.startSingleTimer(TimerKey, Timeout, after) active(Vector(message)) } } def active(buffer: Vector[Command]): Behavior[Command] = { Behaviors.receiveMessage[Command] { // 收到定時器發來的Timeout消息,緩衝區buffer中止接收,將結果回覆給target。 case Timeout => target ! Batch(buffer) idle() // 時限到達前,新建緩衝區並把消息存入,直到緩衝區滿 case m => val newBuffer = buffer :+ m if (newBuffer.size == maxSize) { timers.cancel(TimerKey) target ! Batch(newBuffer) idle() } else active(newBuffer) } } }
調度週期有兩種:一種是FixedDelay:指定先後兩次消息發送的時間間隔;一種是FixedRate:指定兩次任務執行的時間間隔。若是實難選擇,建議使用FixedDelay。(❗ 此處Task等價於一次消息處理過程,可見對Akka裏的各類術語還需進一步規範。)
區別主要在於:Delay不會補償兩次消息間隔之間因各類緣由致使的延誤,先後兩條消息的間隔時間是固定的,而不會關心前一條消息是什麼時候才交付處理的;而Rate會對這之間的延誤進行補償,後一條消息發出的時間會根據前一條消息交付處理的時間而肯定。(💀 換句話說,Delay以發出時間計,Rate以開始處理的時間計。)
長遠來看,Delay方式下的消息處理的頻率一般會略低於指定延遲的倒數,因此更適合短頻快的工做;Rate方式下的消息處理頻率剛好是指定間隔的倒數,因此適合注重完整執行次數的工做。
⚠️ 在Rate方式下,若是任務延遲超出了預設的時間間隔,則將在前一條消息以後當即發送下一條消息。好比scheduleAtFixedRate的間隔爲1秒,而消息處理過程因長時間暫停垃圾回收等緣由形成JVM被掛起30秒鐘,則ActorSystem將快速地連續發送30條消息進行追趕,從而形成短期內的消息爆發,因此通常狀況下Delay方式更被推崇。
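下面的草圖(消息與時間參數均爲假設)對比兩種調度方式的API:

```scala
import scala.concurrent.duration._
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

object Scheduled {
  sealed trait Command
  case object Tick extends Command

  def apply(): Behavior[Command] =
    Behaviors.withTimers { timers =>
      // FixedDelay:以上一條消息交付處理爲基準,間隔固定的延遲,不做補償
      timers.startTimerWithFixedDelay(Tick, 1.second)
      // FixedRate:按固定頻率調度,會對延誤進行補償(二選一,此處僅做API對比)
      // timers.startTimerAtFixedRate(Tick, 1.second)

      Behaviors.receiveMessage {
        case Tick =>
          println("tick")
          Behaviors.same
      }
    }
}
```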
在集羣條件下,一般採用的在Request中傳遞本Shard Actor之ActorRef的方法仍舊適用。但若是該Actor在發出Request後被移動或鈍化(指Actor暫時地關閉本身以節約內存,須要時再重啓),則回覆的Response將會所有發至Dead Letters。此時,引入EntityId做爲標識,取代ActorRef以解決之(參見RMP-68)。缺點是沒法再使用消息適配器。
⚠️ RMP-77:Actor的內部狀態不會隨Actor對象遷移,因此須要相應持久化機制來恢復Actor對象的狀態。
把一般設計中的ActorRef換成EntityId,再使用TypeKey和EntityId定位Actor的引用便可。
object CounterConsumer { sealed trait Command final case class NewCount(count: Long) extends Command val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("example-sharded-response") } object Counter { trait Command case object Increment extends Command final case class GetValue(replyToEntityId: String) extends Command val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("example-sharded-counter") private def apply(): Behavior[Command] = Behaviors.setup { context => counter(ClusterSharding(context.system), 0) } private def counter(sharding: ClusterSharding, value: Long): Behavior[Command] = Behaviors.receiveMessage { case Increment => counter(sharding, value + 1) case GetValue(replyToEntityId) => val replyToEntityRef = sharding.entityRefFor(CounterConsumer.TypeKey, replyToEntityId) replyToEntityRef ! CounterConsumer.NewCount(value) Behaviors.same } }
默認狀況下,當Actor在初始化或處理消息時觸發了異常、失敗,則該Actor將被中止(⚠️ 傳統Akka默認是重啓Actor)。
要區別校驗錯誤與失敗:校驗錯誤Validate Error意味着發給Actor的Command自己就是無效的,因此將其界定爲Protocol規範的內容,由發件人嚴格遵照,這遠甚過收件人發現收到的是無效Command後直接拋出異常。失敗Failure則是因爲Actor不可控的外因致使的,這一般沒法成爲雙方Protocol的一部分,發件人對此也無能爲力。
發生失敗時,一般採起「就讓它崩」的原則。其思路在於,與其花費心思零敲碎打地在局部進行細粒度的修復和內部狀態糾正,不如就讓它崩潰中止,而後利用已有的災備方案,重建一個確定有效的新Actor從新來過。
監管就是一個放置災備方案的好地方。默認監視策略是在引起異常時中止Actor,若是要自定義此策略,則應在spawn子Actor時,使用Behaviors.supervise進行指定。
策略有許多可選參數,也能夠象下面這樣進行嵌套,以應對不一樣的異常類型。
Behaviors.supervise( Behaviors.supervise(behavior) .onFailure[IllegalStateException](SupervisorStrategy.restart)) .onFailure[IllegalArgumentException](SupervisorStrategy.stop)
⚠️ 若Actor被重啓,則傳遞給Behaviors.supervise的Behavior內定義的可變狀態就須要在相似Behaviors.setup這樣的工廠方法中進行初始化。若採用OO風格,則推薦在setup中完成初始化;若採用FP風格,因爲一般不存在函數內的可變量,因此無需如此。
🔗 完整列表參見API指南:SupervisorStrategy
第二個放置災備的地方是Behaviors.setup裏。由於當父Actor重啓時,其Behaviors.setup會再次執行。同時,子Actor會隨父Actor重啓而中止運行,以防止資源泄漏等問題發生。
注意區別如下兩種方式:
這種方式下,每當父Actor重啓時,就會徹底重構一次子Actor,從而老是回到父Actor剛建立時候的樣子。
def child(size: Long): Behavior[String] = Behaviors.receiveMessage(msg => child(size + msg.length)) def parent: Behavior[String] = { Behaviors .supervise[String] { // setup被supervise包裹,意味着每次父Actor重啓,該setup必被從新執行 Behaviors.setup { ctx => val child1 = ctx.spawn(child(0), "child1") val child2 = ctx.spawn(child(0), "child2") Behaviors.receiveMessage[String] { msg => val parts = msg.split(" ") child1 ! parts(0) child2 ! parts(1) Behaviors.same } } } .onFailure(SupervisorStrategy.restart) }
這種方式下,子Actor不會受到父Actor的重啓影響,它們既不會中止,更不會被重建。
def parent2: Behavior[String] = { Behaviors.setup { ctx => // 此setup只會在父Actor建立時運行一次 val child1 = ctx.spawn(child(0), "child1") val child2 = ctx.spawn(child(0), "child2") Behaviors .supervise { // 在父Actor重啓時,只有這段receiveMessage工廠會被執行 Behaviors.receiveMessage[String] { msg => val parts = msg.split(" ") child1 ! parts(0) child2 ! parts(1) Behaviors.same } } // 參數false決定了父Actor重啓時不會中止子Actor .onFailure(SupervisorStrategy.restart.withStopChildren(false)) } }
第三個放置災備方案的地方是在PreRestart信號處理過程裏。和以前提過的PostStop信號同樣,Actor因監管而重啓前,會收到一個PreRestart信號,方便Actor自身在重啓前完成清理掃尾工做(處理這類信號的示例見下方草圖)。
💀 RMP-47的對傳統Akka的描述適用於Akka Typed嗎?
- PreStart:在Actor啓動前觸發
- PostStop:在Actor中止後觸發
- PreRestart:在重啓Actor前觸發,完成任務後會觸發PostStop
- PostRestart:在Actor重啓後觸發,完成任務後會觸發PreStart
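在Akka Typed裏,處理PreRestart與PostStop信號的一個簡要草圖(行爲名稱與清理內容均爲假設)以下:

```scala
import akka.actor.typed.{ Behavior, PostStop, PreRestart, SupervisorStrategy }
import akka.actor.typed.scaladsl.Behaviors

object Resource {
  def apply(): Behavior[String] =
    Behaviors
      .supervise[String] {
        Behaviors
          .receiveMessage[String] {
            case "fail" => throw new RuntimeException("boom") // 觸發重啓
            case _      => Behaviors.same
          }
          .receiveSignal {
            case (context, PreRestart) =>
              context.log.info("即將重啓,先做清理")
              Behaviors.same
            case (context, PostStop) =>
              context.log.info("已中止,釋放資源")
              Behaviors.same
          }
      }
      .onFailure(SupervisorStrategy.restart)
}
```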
在傳統Akka裏,子Actor觸發的異常將被上交給父Actor,由後者決定如何處置。而在Akka Typed裏,提供了更豐富的手段處理這種狀況。
方法就是由父Actor觀察(watch)子Actor,這樣當子Actor因失敗而中止時,父Actor將會收到附上緣由的ChildFailed信號。特別地,ChildFailed信號派生自Terminated,因此若是業務上不須要刻意區分的話,處理Terminated信號便可。
在子Actor觸發異常後,若是它的祖先Actor(不只僅是父親)沒有處理Terminated信號,那麼將會觸發akka.actor.typed.DeathPactException異常。
📎 示例裏用Boss -> MiddleManagement -> Work這樣的層級進行了演示。當Boss發出Fail消息後,MiddleManagement將消息轉發給Work,Work收到Fail消息後拋出異常。因MiddleManagement和Boss均未對Terminated信號進行處理,所以相繼中止。隨後Boss按預約策略重啓,並順次重建MiddleManagement和Work,從而確保測試腳本嘗試在等候200毫秒後從新發送消息Hello成功。
除了經過建立Actor得到其引用外,還能夠經過接線員Receptionist獲取Actor的引用。
Receptionist採用了註冊會員制,註冊過程還是基於Akka Protocol。在Receptionist上註冊後的會員都持有key,方便集羣上的其餘Actor經過key找到它。當發出Find請求後,Receptionist會回覆一個Listing,其中將包括一個由若干符合條件的Actor組成的集合。(⚠️ 同一個key能夠對應多個Actor)
由Receptionist維護的註冊表是動態的,其中的Actor可能因其中止運行、手動從表中註銷或是節點從集羣中刪除而從表中消失。若是須要關注這種動態變化,能夠使用Receptionist.Subscribe(keyOfActor, replyTo)訂閱關注的Actor,Receptionist會在註冊表變化時將Listing消息發送給replyTo。
⚠️ 切記:上述操做均是基於異步消息的,因此操做不是即時產生結果的。可能發出註銷請求了,但Actor還在註冊表裏。
要點:
- 用ServiceKey[Message]("name")建立Key
- 用context.system.receptionist ! Receptionist.Register(key, replyTo)註冊Actor,用Deregister註銷
- 用context.system.receptionist ! Receptionist.Subscribe(key, replyTo)訂閱註冊表變更事件
- 用context.system.receptionist ! Receptionist.Find(key, messageAdapter)查找指定key對應的若干Actor(綜合示例見下方草圖)

在集羣條件下,一個Actor註冊到本地節點的接線員後,其餘節點上的接線員也會經過分佈式數據廣播獲悉,從而保證全部節點都能經過ServiceKey找到相同的Actor們。
但須要注意集羣條件下與本地環境之間的差異:一是在集羣條件下進行的Subscription與Find將只能獲得可達Actor的集合。若是須要得到全部的已註冊Actor(包括不可達的Actor),則得經過Listing.allServiceInstances得到。二是在集羣內各節點之間傳遞的消息,都須要通過序列化。
接線員沒法擴展到任意數量、也達不到異常高吞吐的接轉要求,它一般最多就支持數千至上萬的接轉量。因此,若是應用確實須要超過Akka框架所能提供的接轉服務水平的,就得本身去解決各節點Actor初始化鏈接的難題。
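綜合以上要點的一個簡要草圖(PingService等名稱均爲假設),演示註冊、訂閱與消息收發:

```scala
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
import akka.actor.typed.scaladsl.Behaviors

object PingService {
  val PingServiceKey: ServiceKey[Ping] = ServiceKey[Ping]("pingService")

  final case class Ping(replyTo: ActorRef[Pong.type])
  case object Pong

  def apply(): Behavior[Ping] =
    Behaviors.setup { context =>
      // 向接線員註冊自身
      context.system.receptionist ! Receptionist.Register(PingServiceKey, context.self)

      Behaviors.receiveMessage {
        case Ping(replyTo) =>
          replyTo ! Pong
          Behaviors.same
      }
    }
}

object PingClient {
  def apply(): Behavior[Receptionist.Listing] =
    Behaviors.setup { context =>
      // 訂閱註冊表變更:該Key對應的Actor集合每次變化,都會收到一條Listing
      context.system.receptionist ! Receptionist.Subscribe(PingService.PingServiceKey, context.self)

      Behaviors.receiveMessage { listing =>
        listing.serviceInstances(PingService.PingServiceKey).foreach { ps =>
          ps ! PingService.Ping(context.system.ignoreRef)
        }
        Behaviors.same
      }
    }
}
```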
儘管Actor在任意時刻只能處理一條消息,但這並不妨礙讓多個Actor同時分擔處理消息,這即是Akka的路由功能使然。

路由器自己也是一種Actor,但主要職責是轉發消息而不是處理消息。與傳統Akka同樣,Akka Typed的路由也分爲兩種:池路由與組路由。
在池路由方式下,由Router負責構建並管理全部的Routee。當這些做爲子actor的Routee終止時,Router將會把它從Router中移除。當全部的Routee都移除後,Router自己中止運行。
- 用val pool = Routers.pool(poolSize = 4)(Behaviors.supervise(Worker()).onFailure[Exception](SupervisorStrategy.restart))定義池路由,其中監管策略應是必不可少的內容,被監管的Worker()便是Routee,poolSize則是池中最多能建立並管理的Routee數目。
- 用val router = ctx.spawn(pool, "worker-pool")建立路由器自己。因爲Router自己也是Actor,Routee是其子Actor,所以能夠指定其消息分發器。(💀 Router中以with開頭的API還有很多,須要仔細參考API文檔。)
```scala
// 指定Routee使用默認的Blocking IO消息分發器
val blockingPool = pool.withRouteeProps(routeeProps = DispatcherSelector.blocking())
// 指定Router使用與其父Actor一致的消息分發器
val blockingRouter = ctx.spawn(blockingPool, "blocking-pool", DispatcherSelector.sameAsParent())

// 使用輪循策略分發消息,保證每一個Routee都儘可能得到一樣數量的任務,這是池路由默認策略
// 示例將得到a-b-a-b順序的日誌
val alternativePool = pool.withPoolSize(2).withRoundRobinRouting()
```
📌 在學習Akka Typed的過程當中,應引發重視和警醒的是,不能象傳統Akka同樣執着於定義Actor的Class或Object自己,而應該牢牢圍繞Behavior來思考、認識和設計系統。
在Akka Typed的世界裏,包括Behaviors各式工廠在內的許多API均是以Behavior爲核心進行設計的。而Behavior又與特定類型的Message綁定,這便意味着Behavior與Protocol進行了綁定,因而消息Message及處理消息的Behavior[Message]便構成了完整的Protocol。
與池路由不一樣的是,組路由方式下的Routee均由外界其它Actor產生(自行建立、自行管理),Router只是負責將其編組在一塊兒。
組路由基於ServiceKey和Receptionist,管理着屬於同一個key的若干個Routee。雖然這種方式下對Routee構建和監控將更靈活和便捷,但也意味着組路由將徹底依賴Receptionist維護的註冊表才能工做。在Router啓動之初,當註冊表仍是空白時,發來的消息將做爲akka.actor.Dropped扔到事件流中。當註冊表中註冊有Routee後,若其可達,則消息將順利送達,不然該Routee將被標記爲不可達。
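一個組路由的簡要草圖(Worker與ServiceKey等名稱均爲假設):Routee自行註冊到接線員,Router只按Key把它們編組在一塊兒:

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
import akka.actor.typed.scaladsl.{ Behaviors, Routers }

object GroupWorker {
  val WorkerKey: ServiceKey[String] = ServiceKey[String]("worker")

  def apply(): Behavior[String] =
    Behaviors.setup { context =>
      // Routee自行向接線員註冊
      context.system.receptionist ! Receptionist.Register(WorkerKey, context.self)
      Behaviors.receiveMessage { job =>
        context.log.info("Handling {}", job)
        Behaviors.same
      }
    }
}

object GroupGuardian {
  def apply(): Behavior[Unit] =
    Behaviors.setup[Unit] { context =>
      (1 to 4).foreach(i => context.spawn(GroupWorker(), s"worker-$i"))

      // 組路由只負責按Key把已註冊的Routee編組起來
      val group = Routers.group(GroupWorker.WorkerKey)
      val router = context.spawn(group, "worker-group")

      (1 to 10).foreach(i => router ! s"job-$i")
      Behaviors.empty
    }
}
```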
輪循策略 Round Robin
輪循策略將公平調度各Routee,平均分配任務,因此適合於Routee數目不會常常變化的場合,是池路由的默認策略。它有一個可選的參數preferLocalRoutees,爲true時將強制只使用本地的Routee(默認值爲false)。
隨機策略 Random
隨機策略將隨機選取Routee分配任務,適合Routee數目可能會變化的場合,是組路由的默認策略。它一樣有可選參數preferLocalRoutees。
一致的散列策略 Consistent Hashing
散列策略將基於一張以傳入消息爲鍵的映射表選擇Routee。
🔗 參考文獻:Consistent Hashing
💀 該文只展現瞭如何設計一個ConsistentHash[T]類,並提供add/remove/get等API函數,卻沒講怎麼使用它,因此須要完整示例!
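就此補充一個假設性的用法草圖:它並不是上文ConsistentHash[T]類的用法,而是Akka Typed路由器自帶的withConsistentHashingRouting(虛擬節點數與映射函數等參數均爲假設):

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.{ Behaviors, Routers }

object HashedPool {
  final case class Job(key: String, payload: String)

  def worker(): Behavior[Job] =
    Behaviors.receiveMessage { job =>
      println(s"processing ${job.key}")
      Behaviors.same
    }

  def apply(): Behavior[Job] =
    // 以Job.key做散列鍵,保證同一key的消息總是路由到同一個Routee
    Routers
      .pool(poolSize = 4)(worker())
      .withConsistentHashingRouting(10, (job: Job) => job.key)
}
```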
若是把Routee看做CPU的核心,那天然是多多益善。但因爲Router自己也是一個Actor,因此其Mailbox的承載能力反而會成爲整個路由器的瓶頸,而Akka Typed並未就此提供額外方案,所以遇到須要更高吞吐量的場合則須要本身去解決。
Stash(暫存),是指Actor將當前Behavior暫時還不能處理的消息所有或部分緩存起來,等完成初始化等準備工做或是處理完上一條冪等消息後,再切換至匹配的Behavior,從緩衝區取出消息進行處理的過程。
trait DB { def save(id: String, value: String): Future[Done] def load(id: String): Future[String] } object DataAccess { sealed trait Command final case class Save(value: String, replyTo: ActorRef[Done]) extends Command final case class Get(replyTo: ActorRef[String]) extends Command private final case class InitialState(value: String) extends Command private case object SaveSuccess extends Command private final case class DBError(cause: Throwable) extends Command // 使用Behaviors.withStash(capacity)設置Stash容量 // 隨後切換到初始Behavior start() def apply(id: String, db: DB): Behavior[Command] = { Behaviors.withStash(100) { buffer => Behaviors.setup[Command] { context => new DataAccess(context, buffer, id, db).start() } } } } // 大量使用context.pipeToSelf進行Future交互 class DataAccess( context: ActorContext[DataAccess.Command], buffer: StashBuffer[DataAccess.Command], id: String, db: DB) { import DataAccess._ private def start(): Behavior[Command] = { context.pipeToSelf(db.load(id)) { case Success(value) => InitialState(value) case Failure(cause) => DBError(cause) } Behaviors.receiveMessage { case InitialState(value) => // 完成初始化,轉至Behavior active()開始處理消息 buffer.unstashAll(active(value)) case DBError(cause) => throw cause case other => // 正在處理冪等消息,故暫存後續消息 buffer.stash(other) Behaviors.same } } // Behaviors.receiveMessagePartial():從部分消息處理程序構造一個Behavior // 該行爲將把未定義的消息視爲未處理。 private def active(state: String): Behavior[Command] = { Behaviors.receiveMessagePartial { case Get(replyTo) => replyTo ! state Behaviors.same // 處理冪等的Save消息 case Save(value, replyTo) => context.pipeToSelf(db.save(id, value)) { case Success(_) => SaveSuccess case Failure(cause) => DBError(cause) } // 轉至Behavior saving(),反饋冪等消息處理結果 saving(value, replyTo) } } private def saving(state: String, replyTo: ActorRef[Done]): Behavior[Command] = { Behaviors.receiveMessage { case SaveSuccess => replyTo ! Done // 冪等消息處理結束並已反饋結果,轉至Behavior active()開始處理下一條消息 buffer.unstashAll(active(state)) case DBError(cause) => throw cause case other => buffer.stash(other) Behaviors.same } } }
注意事項
- 暫存的消息數量超出緩衝區容量時,將拋出StashOverflowException異常。因此在往緩衝區裏暫存消息前,應當使用StashBuffer.isFull提早進行檢測。
- unstashAll()將會中止Actor響應新的消息,直到當前暫存的全部消息被處理完畢,但這有可能因長時間佔用消息處理線程而致使其餘Actor陷入飢餓狀態。爲此,可改用方法unstash(numberOfMessages),確保一次只處理有限數量的暫存消息。

有限狀態機:當前處於狀態S,發生E事件後,執行操做A,而後狀態將轉換爲S'。
這部份內容對應傳統Akka的FSM:Finite State Machine,可參考RMP及下文
📎 參考示例:哲學家用餐問題,及其解析:🔗 Dining Hakkers
object Buncher { // 把FSM裏驅動狀態改變的事件,都用Message代替了 sealed trait Event final case class SetTarget(ref: ActorRef[Batch]) extends Event final case class Queue(obj: Any) extends Event case object Flush extends Event private case object Timeout extends Event // 狀態 sealed trait Data case object Uninitialized extends Data final case class Todo(target: ActorRef[Batch], queue: immutable.Seq[Any]) extends Data final case class Batch(obj: immutable.Seq[Any]) // 初始狀態爲Uninitialized,對應初始的Behavior爲idle() def apply(): Behavior[Event] = idle(Uninitialized) private def idle(data: Data): Behavior[Event] = Behaviors.receiveMessage[Event] { message: Event => (message, data) match { case (SetTarget(ref), Uninitialized) => idle(Todo(ref, Vector.empty)) case (Queue(obj), t @ Todo(_, v)) => active(t.copy(queue = v :+ obj)) case _ => Behaviors.unhandled } } // 處於激活狀態時,對應Behavior active() private def active(data: Todo): Behavior[Event] = Behaviors.withTimers[Event] { timers => // 設置超時條件 timers.startSingleTimer(Timeout, 1.second) Behaviors.receiveMessagePartial { case Flush | Timeout => data.target ! Batch(data.queue) idle(data.copy(queue = Vector.empty)) case Queue(obj) => active(data.copy(queue = data.queue :+ obj)) } } }
在Akka Typed裏,因爲Protocol和Behavior的出現,簡化了傳統Akka中有限狀態機FSM的實現。不一樣的狀態下,對應不一樣的Behavior,響應不一樣的請求,成爲Akka Typed的典型做法,這在此前的大量示例裏已經有所展現。
CoordinatedShutdown是一個擴展,經過提早註冊好的任務Task,能夠在系統關閉前完成一些清理掃尾工做,防止資源泄漏等問題產生。
關閉過程當中,默認的各階段(Phase)都定義在下面這個akka.coordinated-shutdown.phases裏,各Task則後續再添加至相應的階段中。
在application.conf配置裏,能夠經過定義不一樣的depends-on來覆蓋缺省的設置。其中,before-service-unbind、before-cluster-shutdown和before-actor-system-terminate是最常被覆蓋的。
各Phase原則上按照被依賴者先於依賴者的順序執行,從而構成一個有向無環圖(Directed Acyclic Graph,DAG),最終全部Phase按DAG的拓撲順序執行。
# CoordinatedShutdown is enabled by default and will run the tasks that # are added to these phases by individual Akka modules and user logic. # # The phases are ordered as a DAG by defining the dependencies between the phases # to make sure shutdown tasks are run in the right order. # # In general user tasks belong in the first few phases, but there may be use # cases where you would want to hook in new phases or register tasks later in # the DAG. # # Each phase is defined as a named config section with the # following optional properties: # - timeout=15s: Override the default-phase-timeout for this phase. # - recover=off: If the phase fails the shutdown is aborted # and depending phases will not be executed. # - enabled=off: Skip all tasks registered in this phase. DO NOT use # this to disable phases unless you are absolutely sure what the # consequences are. Many of the built in tasks depend on other tasks # having been executed in earlier phases and may break if those are disabled. # depends-on=[]: Run the phase after the given phases phases { # The first pre-defined phase that applications can add tasks to. # Note that more phases can be added in the application's # configuration by overriding this phase with an additional # depends-on. before-service-unbind { } # Stop accepting new incoming connections. # This is where you can register tasks that makes a server stop accepting new connections. Already # established connections should be allowed to continue and complete if possible. service-unbind { depends-on = [before-service-unbind] } # Wait for requests that are in progress to be completed. # This is where you register tasks that will wait for already established connections to complete, potentially # also first telling them that it is time to close down. service-requests-done { depends-on = [service-unbind] } # Final shutdown of service endpoints. # This is where you would add tasks that forcefully kill connections that are still around. service-stop { depends-on = [service-requests-done] } # Phase for custom application tasks that are to be run # after service shutdown and before cluster shutdown. before-cluster-shutdown { depends-on = [service-stop] } # Graceful shutdown of the Cluster Sharding regions. # This phase is not meant for users to add tasks to. cluster-sharding-shutdown-region { timeout = 10 s depends-on = [before-cluster-shutdown] } # Emit the leave command for the node that is shutting down. # This phase is not meant for users to add tasks to. cluster-leave { depends-on = [cluster-sharding-shutdown-region] } # Shutdown cluster singletons # This is done as late as possible to allow the shard region shutdown triggered in # the "cluster-sharding-shutdown-region" phase to complete before the shard coordinator is shut down. # This phase is not meant for users to add tasks to. cluster-exiting { timeout = 10 s depends-on = [cluster-leave] } # Wait until exiting has been completed # This phase is not meant for users to add tasks to. cluster-exiting-done { depends-on = [cluster-exiting] } # Shutdown the cluster extension # This phase is not meant for users to add tasks to. cluster-shutdown { depends-on = [cluster-exiting-done] } # Phase for custom application tasks that are to be run # after cluster shutdown and before ActorSystem termination. before-actor-system-terminate { depends-on = [cluster-shutdown] } # Last phase. See terminate-actor-system and exit-jvm above. 
# Don't add phases that depends on this phase because the # dispatcher and scheduler of the ActorSystem have been shutdown. # This phase is not meant for users to add tasks to. actor-system-terminate { timeout = 10 s depends-on = [before-actor-system-terminate] } }
一般應在系統啓動後儘早註冊任務,不然添加得太晚的任務將不會被運行。
向同一個Phase添加的任務將並行執行,沒有前後之分。
下一個Phase一般會等待上一個Phase裏的Task都執行完畢或超時後纔會啓動。能夠爲Phase配置recover = off,從而在Task失敗或超時後,停止整個系統的關機過程。

一般狀況下,使用CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "someTaskName") { ... }向Phase中添加Task,此處的名稱主要用做調試或者日誌。

使用CoordinatedShutdown(system).addCancellableTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "cleanup") { () => Future { ... } }添加可取消的Task,以後能夠用c.cancel()取消Task的執行。

一般狀況下,不須要Actor回覆Task已完成的消息,由於這會拖慢關機進程,直接讓Actor終止運行便可。若是要關注該Task什麼時候完成,能夠使用CoordinatedShutdown(system).addActorTerminationTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "someTaskName", someActor, Some("stop"))添加任務,而且給這個someActor發送一條消息,隨後watch該Actor的終止即可知曉Task完成狀況。

使用ActorSystem.terminate()或val done: Future[Done] = CoordinatedShutdown(system).run(CoordinatedShutdown.UnknownReason)能夠啓動協調關機過程,且屢次調用也只會執行一次。

ActorSystem會在最後一個Phase裏的Task所有執行完畢後關閉,但JVM不必定會中止,除非全部守護進程均已中止運行。經過配置akka.coordinated-shutdown.exit-jvm = on,能夠強制一併關閉JVM。

在集羣條件下,當節點正在從集羣中離開或退出時,將會自動觸發協調關機。並且系統會自動添加Cluster Singleton和Cluster Sharding等正常退出羣集的任務。

默認狀況下,當以kill等方式向JVM進程發送SIGTERM信號使其終止時(SIGINT,即Ctrl-C,不會觸發),CoordinatedShutdown也將運行,該默認行爲能夠經過配置akka.coordinated-shutdown.run-by-jvm-shutdown-hook=off禁用之。

能夠使用CoordinatedShutdown(system).addJvmShutdownHook { ... }添加JVM Hook任務,以保證其在Akka關機前得以執行。
在測試時,若是不但願啓用協調關機,能夠採用如下配置禁用之:
```
# Don't terminate ActorSystem via CoordinatedShutdown in tests
akka.coordinated-shutdown.terminate-actor-system = off
akka.coordinated-shutdown.run-by-actor-system-terminate = off
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off
akka.cluster.run-coordinated-shutdown-when-down = off
```
MessageDispatcher是Akka的心臟,是它驅動着整個ActorSystem的正常運轉,而且爲全部的Actor提供了執行上下文ExecutionContext,方便在其中執行代碼、進行Future回調等等。
默認Dispatcher
每一個ActorSystem都有一個默認的Dispatcher,能夠在akka.actor.default-dispatcher配置中細調,其默認的執行器Executor類型爲「fork-join-executor」,這在絕大多數狀況下都能提供優越的性能,也能夠在akka.actor.default-dispatcher.executor一節中進行設置。
內部專用Dispatcher
爲保護Akka各模塊內部維護的Actor,有一個獨立的內部專用Dispatcher。它能夠在akka.actor.internal-dispatcher配置中細調,也能夠設置akka.actor.internal-dispatcher爲其餘Dispatcher名字(別名)來替換之。
查找指定的Dispatcher
Dispatcher均實現了ExecutionContext接口,因此像這樣val executionContext = context.system.dispatchers.lookup(DispatcherSelector.fromConfig("my-dispatcher"))就可加載不一樣的Dispatcher。
選擇指定的Dispatcher
```scala
// 爲新的Actor使用默認Dispatcher
context.spawn(yourBehavior, "DefaultDispatcher")
context.spawn(yourBehavior, "ExplicitDefaultDispatcher", DispatcherSelector.default())

// 爲不支持Future的阻塞調用(好比訪問一些老式的數據庫),使用blocking Dispatcher
context.spawn(yourBehavior, "BlockingDispatcher", DispatcherSelector.blocking())

// 使用和父Actor同樣的Dispatcher
context.spawn(yourBehavior, "ParentDispatcher", DispatcherSelector.sameAsParent())

// 從配置加載指定的Dispatcher
context.spawn(yourBehavior, "DispatcherFromConfig", DispatcherSelector.fromConfig("your-dispatcher"))
```
```
your-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 32
  }
  throughput = 1
}
```
對比 | Dispatcher | PinnedDispatcher |
---|---|---|
線程池 | 事件驅動,一組Actor共用一個線程池。 | 每一個Actor都擁有專屬的一個線程池,池中只有一個線程。 |
能否被共享 | 沒有限制 | 不可共享 |
郵箱 | 每一個Actor擁有一個 | 每一個Actor擁有一個 |
適用場景 | 是Akka默認的Dispatcher, 支持隔板 | 支持隔板 |
驅動 | 由java.util.concurrent.ExecutorService驅動。使用fork-join-executor、thread-pool-executor或基於akka.dispatcher.ExecutorServiceConfigurator實現的徹底限定類名,可指定其使用的executor。 | 由任意的akka.dispatch.ThreadPoolExecutorConfigurator驅動,默認執行器爲thread-pool-executor。 |
一個Fork-Join執行器示例:
my-dispatcher { # Dispatcher is the name of the event-based dispatcher type = Dispatcher # What kind of ExecutionService to use executor = "fork-join-executor" # Configuration for the fork join pool fork-join-executor { # Min number of threads to cap factor-based parallelism number to parallelism-min = 2 # Parallelism (threads) ... ceil(available processors * factor) parallelism-factor = 2.0 # Max number of threads to cap factor-based parallelism number to parallelism-max = 10 } # Throughput defines the maximum number of messages to be # processed per actor before the thread jumps to the next actor. # Set to 1 for as fair as possible. throughput = 100 }
📎 講解阻塞危害的參考視頻:
Managing Blocking in Akka video,及其示例代碼:https://github.com/raboof/akka-blocking-dispatcher
在使用默認Dispatcher的狀況下,多個Actor共用一個線程池,因此當其中一些Actor因被阻塞而佔用線程後,有可能致使可用線程耗盡,而使其餘同組的Actor陷入線程飢餓狀態。
監測工具推薦:YourKit,VisualVM,Java Mission Control,Lightbend出品的Thread Starvation Detector等等。
示例使用了兩個Actor做對比,在(1 to 100)的循環裏,新建的一個Actor在消息處理函數中sleep 5秒,致使同時新建的另外一個Actor沒法得到線程處理消息而卡住。
針對上述狀況,首先可能想到的象下面這樣,用Future來封裝這樣的長時調用,但這樣的想法實際上過於簡單。由於仍舊使用了由全體Actor共用的ExecutionContext做爲Future的執行上下文,因此隨着應用程序的負載不斷增長,內存和線程都會飛快地被耗光。
object BlockingFutureActor { def apply(): Behavior[Int] = Behaviors.setup { context => implicit val executionContext: ExecutionContext = context.executionContext Behaviors.receiveMessage { i => triggerFutureBlockingOperation(i) Behaviors.same } } def triggerFutureBlockingOperation(i: Int)(implicit ec: ExecutionContext): Future[Unit] = { println(s"Calling blocking Future: $i") Future { Thread.sleep(5000) //block for 5 seconds println(s"Blocking future finished $i") } } }
正確的解決方案,是爲全部的阻塞調用提供一個獨立的Dispatcher,這種技巧被稱做「隔板 bulk-heading」或者「隔離阻塞 isolating blocking」。
在application.conf裏對Dispatcher進行以下配置,其中thread-pool-executor.fixed-pool-size的數值可根據實際負載狀況進行微調:
```
my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 16
  }
  throughput = 1
}
```
隨後,使用該配置替換掉前述代碼第4行加載的默認Dispatcher
implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup(DispatcherSelector.fromConfig("my-blocking-dispatcher"))
以上即是處理響應性應用程序中阻塞問題的推薦方法。對有關Akka HTTP中阻塞調用的相似討論,請參閱🔗 Handling blocking operations in Akka HTTP。
其餘一些建議:
固定的線程池大小
```
blocking-io-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 32
  }
  throughput = 1
}
```
根據CPU核心數設置線程池大小
my-thread-pool-dispatcher { # Dispatcher is the name of the event-based dispatcher type = Dispatcher # What kind of ExecutionService to use executor = "thread-pool-executor" # Configuration for the thread pool thread-pool-executor { # minimum number of threads to cap factor-based core number to core-pool-size-min = 2 # No of core threads ... ceil(available processors * factor) core-pool-size-factor = 2.0 # maximum number of threads to cap factor-based number to core-pool-size-max = 10 } # Throughput defines the maximum number of messages to be # processed per actor before the thread jumps to the next actor. # Set to 1 for as fair as possible. throughput = 100 }
PinnedDispatcher
```
my-pinned-dispatcher {
  executor = "thread-pool-executor"
  type = PinnedDispatcher
}
```
因爲Actor每次得到的不必定都是同一個線程,因此當確有必要時,能夠設置thread-pool-executor.allow-core-timeout=off,以確保始終使用同一線程。
設置線程關閉超時
不管是fork-join-executor仍是thread-pool-executor,線程都將在無人使用時被關閉。若是想設置一個稍長點的時間,可進行以下調整。特別是當該Executor只是做爲執行上下文使用(好比只進行Future調用),而沒有關聯Actor時更應如此,不然默認的1秒將會致使整個線程池過分頻繁地被關閉。
my-dispatcher-with-timeouts { type = Dispatcher executor = "thread-pool-executor" thread-pool-executor { fixed-pool-size = 16 # Keep alive time for threads keep-alive-time = 60s # Allow core threads to time out allow-core-timeout = off } # How long time the dispatcher will wait for new actors until it shuts down shutdown-timeout = 60s }
郵箱是Actor接收待處理消息的隊列,默認是沒有容量上限的。但當Actor的處理消息的速度低於消息送達的速度時,就有必要設置郵箱的容量上限了,這樣當有更多消息到達時,將被轉投至系統的DeadLetter。
若是沒有特別指定,將使用默認的郵箱SingleConsumerOnlyUnboundedMailbox。不然在context.spawn時指定,且配置可從配置文件中動態加載。
```scala
context.spawn(childBehavior, "bounded-mailbox-child", MailboxSelector.bounded(100))

val props = MailboxSelector.fromConfig("my-app.my-special-mailbox")
context.spawn(childBehavior, "from-config-mailbox-child", props)
```

```
my-app {
  my-special-mailbox {
    mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
  }
}
```
非阻塞類型的郵箱
郵箱 | 內部實現 | 有否上限 | 配置名稱 |
---|---|---|---|
SingleConsumerOnlyUnboundedMailbox(默認) | 一個多生產者-單消費者隊列,不能與BalancingDispatcher搭配 | 否 | akka.dispatch.SingleConsumerOnlyUnboundedMailbox |
UnboundedMailbox | 一個java.util.concurrent.ConcurrentLinkedQueue | 否 | unbounded 或 akka.dispatch.UnboundedMailbox |
NonBlockingBoundedMailbox | 一個高效的多生產者-單消費者隊列 | 是 | akka.dispatch.NonBlockingBoundedMailbox |
UnboundedControlAwareMailbox(akka.dispatch.ControlMessage派生的控制消息將被優先投遞) | 兩個java.util.concurrent.ConcurrentLinkedQueue | 否 | akka.dispatch.UnboundedControlAwareMailbox |
UnboundedPriorityMailbox(不保證同優先級消息的投遞順序) | 一個java.util.concurrent.PriorityBlockingQueue | 否 | akka.dispatch.UnboundedPriorityMailbox |
UnboundedStablePriorityMailbox(嚴格按FIFO順序投遞同優先級消息) | 一個使用akka.util.PriorityQueueStabilizer包裝的java.util.concurrent.PriorityBlockingQueue | 否 | akka.dispatch.UnboundedStablePriorityMailbox |
阻塞類型的郵箱:若mailbox-push-timeout-time設置爲非零時將阻塞,不然不阻塞
郵箱 | 內部實現 | 有否上限 | 配置名稱 |
---|---|---|---|
BoundedMailbox | 一個java.util.concurrent.LinkedBlockingQueue | 是 | bounded 或 akka.dispatch.BoundedMailbox |
BoundedPriorityMailbox(不保證同優先級消息的投遞順序) | 一個使用akka.util.BoundedBlockingQueue包裝的java.util.PriorityQueue | 是 | akka.dispatch.BoundedPriorityMailbox |
BoundedStablePriorityMailbox(嚴格按FIFO順序投遞同優先級消息) | 一個使用akka.util.PriorityQueueStabilizer和akka.util.BoundedBlockingQueue包裝的java.util.PriorityQueue | 是 | akka.dispatch.BoundedStablePriorityMailbox |
BoundedControlAwareMailbox(akka.dispatch.ControlMessage派生的控制消息將被優先投遞) | 兩個java.util.concurrent.ConcurrentLinkedQueue,且當塞滿時將阻塞 | 是 | akka.dispatch.BoundedControlAwareMailbox |
若是要本身實現郵箱,則須要從MailboxType派生。該類的構造函數有2個重要參數:一個是ActorSystem.Settings對象,一個是Config的節。後者須要在Dispatcher或者Mailbox的配置中,修改mailbox-type爲自定義MailboxType的徹底限定名。
💀 標記用trait的需求映射指的是什麼?是必須的嗎?
// Marker trait used for mailbox requirements mapping trait MyUnboundedMessageQueueSemantics object MyUnboundedMailbox { // This is the MessageQueue implementation class MyMessageQueue extends MessageQueue with MyUnboundedMessageQueueSemantics { private final val queue = new ConcurrentLinkedQueue[Envelope]() // these should be implemented; queue used as example def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue.offer(handle) def dequeue(): Envelope = queue.poll() def numberOfMessages: Int = queue.size def hasMessages: Boolean = !queue.isEmpty def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = { while (hasMessages) { deadLetters.enqueue(owner, dequeue()) } } } } // This is the Mailbox implementation class MyUnboundedMailbox extends MailboxType with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] { import MyUnboundedMailbox._ // This constructor signature must exist, it will be called by Akka def this(settings: ActorSystem.Settings, config: Config) = { // put your initialization code here this() } // The create method is called to create the MessageQueue final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new MyMessageQueue() }
🏭
com.typesafe.akka:akka-actor-testkit-typed_2.13:2.6.5
org.scalatest:scalatest_2.13:3.1.1
測試能夠是在真實的ActorSystem上進行的異步測試,也能夠是在BehaviorTestKit工具提供的測試專用線程上進行的同步測試。
ScalaTest提供了ActorTestKit做爲真實ActorSystem的替代品,經過混入BeforeAndAfterAll,覆寫其afterAll() = testKit.shutdownTestKit(),可實現測試後關閉ActorSystem。
經過使用一個固定的testKit實例,能夠直接spawn/stop某個Actor(能夠是匿名的Actor),並以這種方式建立臨時的Mock Actor,用以測試某個Actor的行爲是否符合預期。
同時,ScalaTest提供TestProbe用於接受Actor的回覆,並附上一組probe.expectXXX對Actor的活動進行斷言。
固然,更簡便的方式即是繼承ScalaTestWithActorTestKit並混入AnyFeatureSpecLike之類的trait,從而將注意力徹底集中在測試用例自己,而不用關心ActorSystem如何關閉之類的細枝末節。
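一個異步測試的簡要草圖(Echo Actor與測試名稱均爲假設),繼承ScalaTestWithActorTestKit並使用TestProbe:

```scala
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import org.scalatest.wordspec.AnyWordSpecLike

object Echo {
  final case class Ping(message: String, replyTo: ActorRef[Pong])
  final case class Pong(message: String)

  def apply(): Behavior[Ping] =
    Behaviors.receiveMessage {
      case Ping(m, replyTo) =>
        replyTo ! Pong(m)
        Behaviors.same
    }
}

class EchoSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike {
  "An Echo actor" must {
    "reply with the same message" in {
      val probe = createTestProbe[Echo.Pong]() // 充當回信收件人的探針
      val echo = spawn(Echo())                 // 在測試ActorSystem裏孵化被測Actor
      echo ! Echo.Ping("hello", probe.ref)
      probe.expectMessage(Echo.Pong("hello"))
    }
  }
}
```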
ScalaTest的配置從application-test.conf中加載,不然將會自動加載Akka庫自帶的reference.conf配置,而不是應用程序自定義的application.conf。同時,ScalaTest支持用ConfigFactory.load()加載自定義配置文件,或用parseString()直接解析配置字符串,若再附以withFallback()將實現一次性完成配置及其後備的加載。
```scala
ConfigFactory.parseString("""
  akka.loglevel = DEBUG
  akka.log-config-on-start = on
""").withFallback(ConfigFactory.load())
```
爲測試與時間線關係密切的Actor活動,ScalaTest提供了手動的定時器ManualTime,能夠象下面這樣測試指定時間點的活動:
class ManualTimerExampleSpec extends ScalaTestWithActorTestKit(ManualTime.config) with AnyWordSpecLike with LogCapturing { val manualTime: ManualTime = ManualTime() "A timer" must { "schedule non-repeated ticks" in { case object Tick case object Tock val probe = TestProbe[Tock.type]() val behavior = Behaviors.withTimers[Tick.type] { timer => // 10ms後纔會調度消息 timer.startSingleTimer(Tick, 10.millis) Behaviors.receiveMessage { _ => probe.ref ! Tock Behaviors.same } } spawn(behavior) // 在9ms時尚未任何消息 manualTime.expectNoMessageFor(9.millis, probe) // 再通過2ms後,收到Tock消息 manualTime.timePasses(2.millis) probe.expectMessage(Tock) // 在10ms以後再沒有消息傳來 manualTime.expectNoMessageFor(10.seconds, probe) } } }
爲了驗證Actor是否發出了某些日誌事件,ScalaTest提供了LoggingTestKit。
LoggingTestKit .error[IllegalArgumentException] .withMessageRegex(".*was rejected.*expecting ascii input.*") .withCustom { event => event.marker match { case Some(m) => m.getName == "validation" case None => false } } .withOccurrences(2) .expect { ref ! Message("hellö") ref ! Message("hejdå") }
爲了集中有序輸出日誌信息,ScalaTest提供了LogCapturing,把日誌和控制檯輸出信息整理在一塊兒,在測試失敗的時候才一次性輸出,方便分析錯誤緣由。具體示例參見交互模式一章。
ScalaTest提供BehaviorTestKit用於Actor的同步測試。
val testKit = BehaviorTestKit(Hello()) // 建立子Actor testKit.run(Hello.CreateChild("child")) testKit.expectEffect(Spawned(childActor, "child")) // 建立匿名的子Actor testKit.run(Hello.CreateAnonymousChild) testKit.expectEffect(SpawnedAnonymous(childActor)) // 用一個InBox模擬Mailbox,方便測試收到的消息 val inbox = TestInbox[String]() testKit.run(Hello.SayHello(inbox.ref)) inbox.expectMessage("hello") // 測試子Actor的InBox testKit.run(Hello.SayHelloToChild("child")) val childInbox = testKit.childInbox[String]("child") childInbox.expectMessage("hello") // 測試匿名子Actor的InBox testKit.run(Hello.SayHelloToAnonymousChild) val child = testKit.expectEffectType[SpawnedAnonymous[String]] val childInbox = testKit.childInbox(child.ref) childInbox.expectMessage("hello stranger")
在如下一些狀況下,不推薦使用BehaviorTestKit(將來可能會逐步改善):
除了Spawned和SpawnedAnonymous,BehaviorTestKit還支持如下一些Effect:
BehaviorTestKit也支持日誌驗證
val testKit = BehaviorTestKit(Hello()) val inbox = TestInbox[String]("Inboxer") testKit.run(Hello.LogAndSayHello(inbox.ref)) testKit.logEntries() shouldBe Seq(CapturedLogEvent(Level.INFO, "Saying hello to Inboxer"))
現階段的Akka Typed的內部,實質仍是由傳統Akka實現的,但將來將會有所改變。目前兩類Akka有如下一些共存的方式:
在導入命名空間時使用別名,以示區別:
import akka.{ actor => classic }
⚠️ 在監管策略方面,因爲Classic默認爲重啓,而Typed爲中止,因此Akka根據Child來決定實際策略。即若是被建立的Child是Classic,則默認採起重啓策略,不然採起中止策略。
// 導入Typed的Adapter幾乎必不可少 import akka.actor.typed.scaladsl.adapter._ val system = akka.actor.ActorSystem("ClassicToTypedSystem") val typedSystem: ActorSystem[Nothing] = system.toTyped val classicActor = system.actorOf(Classic.props()) class Classic extends classic.Actor with ActorLogging { // context.spawn is an implicit extension method val second: ActorRef[Typed.Command] = context.spawn(Typed(), "second") // context.watch is an implicit extension method context.watch(second) // self can be used as the `replyTo` parameter here because // there is an implicit conversion from akka.actor.ActorRef to // akka.actor.typed.ActorRef // An equal alternative would be `self.toTyped` second ! Typed.Ping(self) override def receive = { case Typed.Pong => log.info(s"$self got Pong from ${sender()}") // context.stop is an implicit extension method context.stop(second) case classic.Terminated(ref) => log.info(s"$self observed termination of $ref") context.stop(self) } }
val system = classic.ActorSystem("TypedWatchingClassic") val typed = system.spawn(Typed.behavior, "Typed") object Typed { final case class Ping(replyTo: akka.actor.typed.ActorRef[Pong.type]) sealed trait Command case object Pong extends Command val behavior: Behavior[Command] = Behaviors.setup { context => // context.actorOf is an implicit extension method val classic = context.actorOf(Classic.props(), "second") // context.watch is an implicit extension method context.watch(classic) // illustrating how to pass sender, toClassic is an implicit extension method classic.tell(Typed.Ping(context.self), context.self.toClassic) Behaviors .receivePartial[Command] { case (context, Pong) => // it's not possible to get the sender, that must be sent in message // context.stop is an implicit extension method context.stop(classic) Behaviors.same } .receiveSignal { case (_, akka.actor.typed.Terminated(_)) => Behaviors.stopped } } }
區別 | 函數式編程風格 | 面向對象風格 |
---|---|---|
組成結構 | Singleton Object | Companion Object + AbstractBehavior[Message]派生類 |
工廠apply() | 在工廠方法裏完成Behavior定義及其餘全部工做 | 在Companion Object工廠方法裏採起Behaviors.setup {context => new MyActor(context)} 這樣的方式構造初始化的Behavior,而後把context和其餘必要參數注入給類的構造函數,完成Behavior的連接 |
Actor擴展類 | 沒有派生,因此只能用Behaviors.same | 從AbstractBehavior[Message]派生實例,因此能夠使用this等同於Behaviors.same |
Behavior | 在Singleton Object裏給Behaviors.receive這樣的工廠方法傳入一個函數(閉包)進行定義 | 覆寫派生類的onMessage函數 |
Context | Context與Message一塊兒傳入給receive | 依賴Behaviors.setup等工廠方法傳遞給派生類,所以每實例對應一個context |
狀態 | 給工廠方法傳入參數(一般會把包括context在內的全部參數封裝成一個相似DTO的Class以適當解耦),返回帶新狀態的Behavior | 在AbstractBehavior實例對象的內部維護全部的可變狀態 |
推薦理由 | | |
推薦作法:
Classic | Typed |
---|---|
akka-actor | akka-actor-typed |
akka-cluster | akka-cluster-typed |
akka-cluster-sharding | akka-cluster-sharding-typed |
akka-cluster-tools | akka-cluster-typed |
akka-distributed-data | akka-cluster-typed |
akka-persistence | akka-persistence-typed |
akka-stream | akka-stream-typed |
akka-testkit | akka-actor-testkit-typed |
Classic | Typed for Scala |
---|---|
akka.actor | akka.actor.typed.scaladsl |
akka.cluster | akka.cluster.typed |
akka.cluster.sharding | akka.cluster.sharding.typed.scaladsl |
akka.persistence | akka.persistence.typed.scaladsl |
🏭 com.typesafe.akka:akka-cluster-typed_2.13:2.6.5
import akka.actor.typed.delivery._
⚠️ 此模塊目前仍不成熟,不建議在生產環境使用。
確保消息至少投遞一次或剛好投遞一次,是此模塊的核心任務,但Akka框架無法自主實現,由於確認收到消息而且處理之,是屬於業務邏輯的職責,因此必須在應用程序的配合下才能徹底實現。並且,將消息妥投到目標郵箱還只是其中一個步驟(不丟失消息),確保目標Actor在消息到達前還沒有崩潰(消息能被處理)也是其中重要的一環。
一個完整的消息妥投方案,包括髮送消息、檢測丟包、重發消息、防止過載、冪等處理等細節,這些工做絕大部分要由消費消息的一方來承擔。好比消息重發,就要由消費者發現有丟包,而後向生產者提出,限流等其餘一些工做亦是如此。Akka提供瞭如下三種模式(留意關於消息重發的細節):
點對點模式適用於2個單一Actor之間的消息妥投。
Worker Pulling,是若干個Worker根據本身的消費進度,主動從一個WorkManager處拉取任務的模式。
有新Worker加入時
🏭 com.typesafe.akka:akka-cluster-sharding-typed_2.13:2.6.5
Sharding,是在集羣進行了分片後的消息妥投模式,將由Producer與Consumer兩端的ShardingController負責總協調,由ShardingController各自的小弟Controller負責點個端點的通訊。
發送消息到另外一個Entity
從另外一個節點上的Producer發送消息(圖中WorkPullingProducerController有誤,應爲ShardingProducerController)
🏭 com.typesafe.akka:akka-persistence-typed_2.13:2.6.5
須要Producer支持消息重發,就意味着Producer得把發出去的消息保存一段時間,直到確信該消息已被處理後才刪除之,因此能暫存消息的即爲耐用的Producer。Akka爲此提供了一個DurableProducerQueue的具體實現EventSourcedProducerQueue。其中,每一個Producer必須對應一個惟一的PersistenceId。
import akka.persistence.typed.delivery.EventSourcedProducerQueue import akka.persistence.typed.PersistenceId val durableQueue = EventSourcedProducerQueue[ImageConverter.ConversionJob](PersistenceId.ofUniqueId("ImageWorkManager")) val durableProducerController = context.spawn( WorkPullingProducerController( producerId = "workManager", workerServiceKey = ImageConverter.serviceKey, durableQueueBehavior = Some(durableQueue)), "producerController")
除了tell模式,Producer還能夠改用ask模式發出消息,此時用askNext代替requestNext,回覆將被包裝在MessageWithConfirmation裏。
context.ask[MessageWithConfirmation[ImageConverter.ConversionJob], Done]( next.askNextTo, askReplyTo => MessageWithConfirmation(ImageConverter.ConversionJob(resultId, from, to, image), askReplyTo)) { case Success(done) => AskReply(resultId, originalReplyTo, timeout = false) case Failure(_) => AskReply(resultId, originalReplyTo, timeout = true) }
對同處一個JVM上的不一樣Actor,消息將直接發送給對方,而對於跨JVM的消息,則須要序列化成一串二進制字節後傳出,再反序列化恢復成消息對象後接收。Akka推薦使用Jackson和Google Protocol Buffers,且使用後者用於其內部消息的序列化,但也容許使用自定義的序列化器。
序列化的相關配置都保存在akka.actor.serializers一節,其中指向各種akka.serialization.Serializer的實現,並使用serialization-bindings爲特定的類綁定序列化器。因爲對象可能同時繼承了某個trait或者class,因此在判斷應使用哪個序列化器時,一般是找其最特化的那一個。若兩者之間沒有繼承關係,則會觸發警告。
akka { actor { serializers { jackson-json = "akka.serialization.jackson.JacksonJsonSerializer" jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer" proto = "akka.remote.serialization.ProtobufSerializer" myown = "docs.serialization.MyOwnSerializer" } serialization-bindings { "docs.serialization.JsonSerializable" = jackson-json "docs.serialization.CborSerializable" = jackson-cbor "com.google.protobuf.Message" = proto "docs.serialization.MyOwnSerializable" = myown } } }
⚠️ 若是待序列化的消息包含在Scala對象中,則爲了引用這些消息,須要使用標準Java類名稱。對於包含在名爲Wrapper對象中名爲Message的消息,正確的引用是Wrapper$Message,而不是Wrapper.Message。
完整的序列化信息包括三個部分:二進制字節串形式的有效載荷payload,序列化器的SerializerId及其適用類的清單manifest,因此它是自描述的,得以跨JVM使用。
而在啓動ActorSystem時,序列化器由SerializationExtension負責初始化,所以序列化器自己不能從其構造函數訪問SerializationExtension,而只能在完成初始化以後遲一點才能訪問它。
import akka.actor._ import akka.actor.typed.scaladsl.Behaviors import akka.cluster.Cluster import akka.serialization._ val system = ActorSystem("example") // Get the Serialization Extension val serialization = SerializationExtension(system) // Have something to serialize val original = "woohoo" // Turn it into bytes, and retrieve the serializerId and manifest, which are needed for deserialization val bytes = serialization.serialize(original).get val serializerId = serialization.findSerializerFor(original).identifier val manifest = Serializers.manifestFor(serialization.findSerializerFor(original), original) // Turn it back into an object val back = serialization.deserialize(bytes, serializerId, manifest).get
全部的序列化器均派生自akka.serialization.Serializer。
class MyOwnSerializer extends Serializer { // If you need logging here, introduce a constructor that takes an ExtendedActorSystem. // class MyOwnSerializer(actorSystem: ExtendedActorSystem) extends Serializer // Get a logger using: // private val logger = Logging(actorSystem, this) // This is whether "fromBinary" requires a "clazz" or not def includeManifest: Boolean = true // Pick a unique identifier for your Serializer, // you've got a couple of billions to choose from, // 0 - 40 is reserved by Akka itself def identifier = 1234567 // "toBinary" serializes the given object to an Array of Bytes def toBinary(obj: AnyRef): Array[Byte] = { // Put the code that serializes the object here //#... Array[Byte]() //#... } // "fromBinary" deserializes the given array, // using the type hint (if any, see "includeManifest" above) def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { // Put your code that deserializes here //#... null //#... } }
SerializerId必須是全局惟一的,該Id能夠編碼指定,也能夠在配置中指定:
akka { actor { serialization-identifiers { "docs.serialization.MyOwnSerializer" = 1234567 } } }
默認狀況下,序列化器使用Class指定其適用目標,但也能夠使用字符串名稱指定,具體參見fromBinary的第2個參數:
class MyOwnSerializer2 extends SerializerWithStringManifest { val CustomerManifest = "customer" val UserManifest = "user" val UTF_8 = StandardCharsets.UTF_8.name() // Pick a unique identifier for your Serializer, // you've got a couple of billions to choose from, // 0 - 40 is reserved by Akka itself def identifier = 1234567 // The manifest (type hint) that will be provided in the fromBinary method // Use `""` if manifest is not needed. def manifest(obj: AnyRef): String = obj match { case _: Customer => CustomerManifest case _: User => UserManifest } // "toBinary" serializes the given object to an Array of Bytes def toBinary(obj: AnyRef): Array[Byte] = { // Put the real code that serializes the object here obj match { case Customer(name) => name.getBytes(UTF_8) case User(name) => name.getBytes(UTF_8) } } // "fromBinary" deserializes the given array, // using the type hint def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = { // Put the real code that deserializes here manifest match { case CustomerManifest => Customer(new String(bytes, UTF_8)) case UserManifest => User(new String(bytes, UTF_8)) } } }
ActorRef都可以使用Jackson進行序列化,但也能夠自定義實現。
其中,要以字符串形式表示ActorRef,應藉助ActorRefResolver實現。它主要有2個方法,分別對應序列化和反序列化:
class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { private val actorRefResolver = ActorRefResolver(system.toTyped) private val PingManifest = "a" private val PongManifest = "b" override def identifier = 41 override def manifest(msg: AnyRef) = msg match { case _: PingService.Ping => PingManifest case PingService.Pong => PongManifest case _ => throw new IllegalArgumentException(s"Can't serialize object of type ${msg.getClass} in [${getClass.getName}]") } override def toBinary(msg: AnyRef) = msg match { case PingService.Ping(who) => actorRefResolver.toSerializationFormat(who).getBytes(StandardCharsets.UTF_8) case PingService.Pong => Array.emptyByteArray case _ => throw new IllegalArgumentException(s"Can't serialize object of type ${msg.getClass} in [${getClass.getName}]") } override def fromBinary(bytes: Array[Byte], manifest: String) = { manifest match { case PingManifest => val str = new String(bytes, StandardCharsets.UTF_8) val ref = actorRefResolver.resolveActorRef[PingService.Pong.type](str) PingService.Ping(ref) case PongManifest => PingService.Pong case _ => throw new IllegalArgumentException(s"Unknown manifest [$manifest]") } } }
一個消息被反序列化爲消息對象,其決定因素只有3個:payload、serializerId和manifest。Akka根據Id選擇Serializer,而後Serializer根據manifest匹配fromBinary,最後fromBinary使用payload解析出消息對象。在這個過程當中,起關鍵做用的manifest並不等價於Serializer綁定的消息類型,因此一個Serializer能夠應用於多個消息類型,這就給換用新的序列化器提供了機會。主要步驟包括兩步:第一步,先發佈註冊了新序列化器、但不改動serialization-bindings的版本,讓全部節點都具有用它反序列化的能力;第二步,再發佈把serialization-bindings改綁到新序列化器的版本,令其開始用於序列化。
爲了在本地測試時確認消息被正常地序列化與反序列化,能夠採起以下配置啓用本地消息的序列化。若是要將某個消息排除出此列,則須要繼承trait akka.actor.NoSerializationVerificationNeeded
,或者在配置akka.actor.no-serialization-verification-needed-class-prefix
指定類名的前綴。
akka { actor { # 啓用本地消息序列化 serialize-messages = on # 啓用Prop序列化 serialize-creators = on } }
🏭 com.typesafe.akka:akka-serialization-jackson_2.13:2.6.6
Jackson支持文本形式的JSON(jackson-json)和二進制形式的CBOR字節串(jackson-cbor)。
在使用Jackson進行序列化前,須要在Akka配置里加入序列化器聲明和綁定聲明,此處用的JSON格式。
akka.actor { serialization-bindings { "com.myservice.MySerializable" = jackson-json } }
而全部要用Jackson序列化的消息也得擴展其trait以做標識。
// 約定的名稱是CborSerializable或者JsonSerializable,此處用MySerializable是爲了演示 trait MySerializable final case class Message(name: String, nr: Int) extends MySerializable
出於安全考慮,不能將Jackson序列化器應用到諸如java.lang.Object、java.io.Serializable、java.util.Comparable等開放類型。
多態類型是指可能有多種不一樣實現的類型,這就致使在反序列化時將面對多種可能的子類型。因此在使用Jackson序列化前,須要用JsonTypeInfo和JsonSubTypes進行註解說明。
⚠️ 切記不能使用@JsonTypeInfo(use = Id.CLASS)
或ObjectMapper.enableDefaultTyping
,這會給多態類型帶來安全隱患。
final case class Zoo(primaryAttraction: Animal) extends MySerializable @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes( Array( new JsonSubTypes.Type(value = classOf[Lion], name = "lion"), new JsonSubTypes.Type(value = classOf[Elephant], name = "elephant"))) sealed trait Animal final case class Lion(name: String) extends Animal final case class Elephant(name: String, age: Int) extends Animal
因爲上述註解只能用於class,因此case class能夠直接使用,但case object就須要採起變通的方法,經過在case object繼承的trait上使用註解@JsonSerialize和@JsonDeserialize,再使用StdSerializer和StdDeserializer實現序列化操做便可。
import com.fasterxml.jackson.core.JsonGenerator import com.fasterxml.jackson.core.JsonParser import com.fasterxml.jackson.databind.DeserializationContext import com.fasterxml.jackson.databind.SerializerProvider import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.fasterxml.jackson.databind.annotation.JsonSerialize import com.fasterxml.jackson.databind.deser.std.StdDeserializer import com.fasterxml.jackson.databind.ser.std.StdSerializer @JsonSerialize(using = classOf[DirectionJsonSerializer]) @JsonDeserialize(using = classOf[DirectionJsonDeserializer]) sealed trait Direction object Direction { case object North extends Direction case object East extends Direction case object South extends Direction case object West extends Direction } class DirectionJsonSerializer extends StdSerializer[Direction](classOf[Direction]) { import Direction._ override def serialize(value: Direction, gen: JsonGenerator, provider: SerializerProvider): Unit = { val strValue = value match { case North => "N" case East => "E" case South => "S" case West => "W" } gen.writeString(strValue) } } class DirectionJsonDeserializer extends StdDeserializer[Direction](classOf[Direction]) { import Direction._ override def deserialize(p: JsonParser, ctxt: DeserializationContext): Direction = { p.getText match { case "N" => North case "E" => East case "S" => South case "W" => West } } } final case class Compass(currentDirection: Direction) extends MySerializable
Jackson默認會將Scala的枚舉類型中的Value序列化爲一個JsonObject,該JsonObject包含一個「value」字段和一個「type」字段(其值是枚舉的徹底限定類名FQCN)。爲此,Jackson爲每一個字段提供了一個註解JsonScalaEnumeration,用於設定字段的類型,它將會把枚舉值序列化爲JsonString。
trait TestMessage object Planet extends Enumeration { type Planet = Value val Mercury, Venus, Earth, Mars, Krypton = Value } // Uses default Jackson serialization format for Scala Enumerations final case class Alien(name: String, planet: Planet.Planet) extends TestMessage // Serializes planet values as a JsonString class PlanetType extends TypeReference[Planet.type] {} // Specifies the type of planet with @JsonScalaEnumeration final case class Superhero(name: String, @JsonScalaEnumeration(classOf[PlanetType]) planet: Planet.Planet) extends TestMessage
參見Event Sourced一節中的Schema Evolution。
Jackson會自動忽略class中不存在的屬性,因此不須要作額外工做。
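下面是一個脫離Akka、僅用jackson-module-scala作的小驗證(ItemAdded及其字段均爲假設的示例),說明舊JSON裏已被刪除的字段會被直接忽略:

```scala
import com.fasterxml.jackson.databind.{ DeserializationFeature, ObjectMapper }
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// 新版本的事件已刪除productId字段
final case class ItemAdded(shoppingCartId: String, quantity: Int)

object RemovedFieldDemo extends App {
  val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
  // Akka的Jackson序列化器默認就關閉了FAIL_ON_UNKNOWN_PROPERTIES(見後文的參考配置)
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

  // 舊版本事件序列化出的JSON仍帶有productId
  val oldJson = """{"shoppingCartId":"c1","productId":"p1","quantity":2}"""
  // 未知屬性被直接忽略,反序列化照常成功
  println(mapper.readValue(oldJson, classOf[ItemAdded])) // ItemAdded(c1,2)
}
```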
若是新增的字段是可選字段,那麼該字段默認值是Option.None,不須要作額外工做。若是是必備字段,那麼須要繼承JacksonMigration並設定其默認值。示例以下:
// Old Event case class ItemAdded(shoppingCartId: String, productId: String, quantity: Int) extends MySerializable // New Event: optional property discount and field note added. // 爲何要區分property與field? case class ItemAdded(shoppingCartId: String, productId: String, quantity: Int, discount: Option[Double], note: String) extends MySerializable { // alternative constructor because `note` should have default value "" when not defined in json @JsonCreator def this(shoppingCartId: String, productId: String, quantity: Int, discount: Option[Double], note: Option[String]) = this(shoppingCartId, productId, quantity, discount, note.getOrElse("")) } // New Event: mandatory field discount added. case class ItemAdded(shoppingCartId: String, productId: String, quantity: Int, discount: Double) extends MySerializable import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.node.DoubleNode import com.fasterxml.jackson.databind.node.ObjectNode import akka.serialization.jackson.JacksonMigration class ItemAddedMigration extends JacksonMigration { // 註明這是第幾個版本,以後還能夠有更新的版本 override def currentVersion: Int = 2 override def transform(fromVersion: Int, json: JsonNode): JsonNode = { val root = json.asInstanceOf[ObjectNode] if (fromVersion <= 1) { root.set("discount", DoubleNode.valueOf(0.0)) } root } }
ItemAddedMigration與ItemAdded的聯繫,須要在配置裏設定,下同:
akka.serialization.jackson.migrations { "com.myservice.event.ItemAdded" = "com.myservice.event.ItemAddedMigration" }
// 將productId重命名爲itemId case class ItemAdded(shoppingCartId: String, itemId: String, quantity: Int) extends MySerializable import akka.serialization.jackson.JacksonMigration import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.node.ObjectNode class ItemAddedMigration extends JacksonMigration { override def currentVersion: Int = 2 override def transform(fromVersion: Int, json: JsonNode): JsonNode = { val root = json.asInstanceOf[ObjectNode] if (fromVersion <= 1) { root.set("itemId", root.get("productId")) root.remove("productId") } root } }
// Old class case class Customer(name: String, street: String, city: String, zipCode: String, country: String) extends MySerializable // New class case class Customer(name: String, shippingAddress: Address, billingAddress: Option[Address]) extends MySerializable //Address class case class Address(street: String, city: String, zipCode: String, country: String) extends MySerializable import akka.serialization.jackson.JacksonMigration import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.node.ObjectNode class CustomerMigration extends JacksonMigration { override def currentVersion: Int = 2 override def transform(fromVersion: Int, json: JsonNode): JsonNode = { val root = json.asInstanceOf[ObjectNode] if (fromVersion <= 1) { val shippingAddress = root.`with`("shippingAddress") shippingAddress.set("street", root.get("street")) shippingAddress.set("city", root.get("city")) shippingAddress.set("zipCode", root.get("zipCode")) shippingAddress.set("country", root.get("country")) root.remove("street") root.remove("city") root.remove("zipCode") root.remove("country") } root } }
// Old class case class OrderAdded(shoppingCartId: String) extends MySerializable // New class case class OrderPlaced(shoppingCartId: String) extends MySerializable class OrderPlacedMigration extends JacksonMigration { override def currentVersion: Int = 2 override def transformClassName(fromVersion: Int, className: String): String = classOf[OrderPlaced].getName override def transform(fromVersion: Int, json: JsonNode): JsonNode = json }
當某個類再也不須要序列化(已從serialization-bindings中移除),而只須要反序列化時,應將其加入白名單whitelist-class-prefix,名單是一組類名或其前綴:
akka.serialization.jackson.whitelist-class-prefix = ["com.myservice.event.OrderAdded", "com.myservice.command"]
Akka默認啓用了如下Jackson模塊:
akka.serialization.jackson { # The Jackson JSON serializer will register these modules. jackson-modules += "akka.serialization.jackson.AkkaJacksonModule" # AkkaTypedJacksonModule optionally included if akka-actor-typed is in classpath jackson-modules += "akka.serialization.jackson.AkkaTypedJacksonModule" // FIXME how does that optional loading work?? # AkkaStreamsModule optionally included if akka-streams is in classpath jackson-modules += "akka.serialization.jackson.AkkaStreamJacksonModule" jackson-modules += "com.fasterxml.jackson.module.paramnames.ParameterNamesModule" jackson-modules += "com.fasterxml.jackson.datatype.jdk8.Jdk8Module" jackson-modules += "com.fasterxml.jackson.datatype.jsr310.JavaTimeModule" jackson-modules += "com.fasterxml.jackson.module.scala.DefaultScalaModule" }
默認的JSON壓縮策略以下:
# Compression settings for the jackson-json binding akka.serialization.jackson.jackson-json.compression { # Compression algorithm. # - off : no compression (it will decompress payloads even it's off) # - gzip : using common java gzip (it's slower than lz4 generally) # - lz4 : using lz4-java algorithm = gzip # If compression is enabled with the `algorithm` setting the payload is compressed # when it's larger than this value. compress-larger-than = 32 KiB }
# 共有配置 akka.serialization.jackson.jackson-json { serialization-features { WRITE_DATES_AS_TIMESTAMPS = off } } akka.serialization.jackson.jackson-cbor { serialization-features { WRITE_DATES_AS_TIMESTAMPS = on } } akka.actor { serializers { jackson-json-message = "akka.serialization.jackson.JacksonJsonSerializer" jackson-json-event = "akka.serialization.jackson.JacksonJsonSerializer" } serialization-identifiers { jackson-json-message = 9001 jackson-json-event = 9002 } serialization-bindings { "com.myservice.MyMessage" = jackson-json-message "com.myservice.MyEvent" = jackson-json-event } } # 爲每一個綁定關係單獨配置 akka.serialization.jackson { jackson-json-message { serialization-features { WRITE_DATES_AS_TIMESTAMPS = on } } jackson-json-event { serialization-features { WRITE_DATES_AS_TIMESTAMPS = off } } }
默認狀況下,Jackson使用manifest裏的徹底限定類名進行序列化,但這比較耗費磁盤空間和IO資源,爲此能夠用type-in-manifest關閉之,使類名再也不出如今manifest裏,而後再使用deserialization-type指定便可,不然Jackson會在綁定關係裏去查找匹配的類型。
Akka Remoting已經實現了manifest的壓縮,因此這部份內容對它沒有什麼實際效果。
akka.actor { serializers { jackson-json-event = "akka.serialization.jackson.JacksonJsonSerializer" } serialization-identifiers { jackson-json-event = 9001 } serialization-bindings { "com.myservice.MyEvent" = jackson-json-event } } # 因爲manifest無關的序列化一般只適用於一個類型,因此一般採起每綁定關係單獨配置的方式 akka.serialization.jackson { jackson-json-event { type-in-manifest = off # Since there is exactly one serialization binding declared for this # serializer above, this is optional, but if there were none or many, # this would be mandatory. deserialization-type = "com.myservice.MyEvent" } }
WRITE_DATES_AS_TIMESTAMPS
和WRITE_DURATIONS_AS_TIMESTAMPS
默認狀況下是被禁用的,這意味着日期與時間字段將按ISO-8601(rfc3339)標準的yyyy-MM-dd'T'HH:mm:ss.SSSZZ
格式,而不是數字數組進行序列化。雖然這樣的互操做性更好,但速度較慢。因此若是不須要ISO格式便可與外部系統進行互操做,那麼能夠做以下配置,以擁有更佳的性能(反序列化不受此設置影響)。
akka.serialization.jackson.serialization-features { WRITE_DATES_AS_TIMESTAMPS = on WRITE_DURATIONS_AS_TIMESTAMPS = on }
akka.serialization.jackson { # Configuration of the ObjectMapper serialization features. # See com.fasterxml.jackson.databind.SerializationFeature # Enum values corresponding to the SerializationFeature and their boolean value. serialization-features { # Date/time in ISO-8601 (rfc3339) yyyy-MM-dd'T'HH:mm:ss.SSSZ format # as defined by com.fasterxml.jackson.databind.util.StdDateFormat # For interoperability it's better to use the ISO format, i.e. WRITE_DATES_AS_TIMESTAMPS=off, # but WRITE_DATES_AS_TIMESTAMPS=on has better performance. WRITE_DATES_AS_TIMESTAMPS = off WRITE_DURATIONS_AS_TIMESTAMPS = off } # Configuration of the ObjectMapper deserialization features. # See com.fasterxml.jackson.databind.DeserializationFeature # Enum values corresponding to the DeserializationFeature and their boolean value. deserialization-features { FAIL_ON_UNKNOWN_PROPERTIES = off } # Configuration of the ObjectMapper mapper features. # See com.fasterxml.jackson.databind.MapperFeature # Enum values corresponding to the MapperFeature and their # boolean values, for example: # # mapper-features { # SORT_PROPERTIES_ALPHABETICALLY = on # } mapper-features {} # Configuration of the ObjectMapper JsonParser features. # See com.fasterxml.jackson.core.JsonParser.Feature # Enum values corresponding to the JsonParser.Feature and their # boolean value, for example: # # json-parser-features { # ALLOW_SINGLE_QUOTES = on # } json-parser-features {} # Configuration of the ObjectMapper JsonParser features. # See com.fasterxml.jackson.core.JsonGenerator.Feature # Enum values corresponding to the JsonGenerator.Feature and # their boolean value, for example: # # json-generator-features { # WRITE_NUMBERS_AS_STRINGS = on # } json-generator-features {} # Configuration of the JsonFactory StreamReadFeature. # See com.fasterxml.jackson.core.StreamReadFeature # Enum values corresponding to the StreamReadFeatures and # their boolean value, for example: # # stream-read-features { # STRICT_DUPLICATE_DETECTION = on # } stream-read-features {} # Configuration of the JsonFactory StreamWriteFeature. # See com.fasterxml.jackson.core.StreamWriteFeature # Enum values corresponding to the StreamWriteFeatures and # their boolean value, for example: # # stream-write-features { # WRITE_BIGDECIMAL_AS_PLAIN = on # } stream-write-features {} # Configuration of the JsonFactory JsonReadFeature. # See com.fasterxml.jackson.core.json.JsonReadFeature # Enum values corresponding to the JsonReadFeatures and # their boolean value, for example: # # json-read-features { # ALLOW_SINGLE_QUOTES = on # } json-read-features {} # Configuration of the JsonFactory JsonWriteFeature. # See com.fasterxml.jackson.core.json.JsonWriteFeature # Enum values corresponding to the JsonWriteFeatures and # their boolean value, for example: # # json-write-features { # WRITE_NUMBERS_AS_STRINGS = on # } json-write-features {} # Additional classes that are allowed even if they are not defined in `serialization-bindings`. # This is useful when a class is not used for serialization any more and therefore removed # from `serialization-bindings`, but should still be possible to deserialize. whitelist-class-prefix = [] # settings for compression of the payload compression { # Compression algorithm. # - off : no compression # - gzip : using common java gzip algorithm = off # If compression is enabled with the `algorithm` setting the payload is compressed # when it's larger than this value. compress-larger-than = 0 KiB } # Whether the type should be written to the manifest. 
# If this is off, then either deserialization-type must be defined, or there must be exactly # one serialization binding declared for this serializer, and the type in that binding will be # used as the deserialization type. This feature will only work if that type either is a # concrete class, or if it is a supertype that uses Jackson polymorphism (ie, the # @JsonTypeInfo annotation) to store type information in the JSON itself. The intention behind # disabling this is to remove extraneous type information (ie, fully qualified class names) when # serialized objects are persisted in Akka persistence or replicated using Akka distributed # data. Note that Akka remoting already has manifest compression optimizations that address this, # so for types that just get sent over remoting, this offers no optimization. type-in-manifest = on # The type to use for deserialization. # This is only used if type-in-manifest is disabled. If set, this type will be used to # deserialize all messages. This is useful if the binding configuration you want to use when # disabling type in manifest cannot be expressed as a single type. Examples of when you might # use this include when changing serializers, so you don't want this serializer used for # serialization and you haven't declared any bindings for it, but you still want to be able to # deserialize messages that were serialized with this serializer, as well as situations where # you only want some sub types of a given Jackson polymorphic type to be serialized using this # serializer. deserialization-type = "" # Specific settings for jackson-json binding can be defined in this section to # override the settings in 'akka.serialization.jackson' jackson-json {} # Specific settings for jackson-cbor binding can be defined in this section to # override the settings in 'akka.serialization.jackson' jackson-cbor {} # Issue #28918 for compatibility with data serialized with JacksonCborSerializer in # Akka 2.6.4 or earlier, which was plain JSON format. jackson-cbor-264 = ${akka.serialization.jackson.jackson-cbor} }
🏭 com.typesafe.akka:akka-persistence-typed_2.13:2.6.5
Akka Persistence爲帶狀態的Actor提供了持久化其狀態以備崩潰後恢復的支持,其本質是持久化Actor相關的事件Event,從而在恢復時利用所有事件或階段性快照重塑(Reconstruct/Replay/Rebuild)Actor。ES在現實生活中最典型的一個例子是會計使用的複式記帳法。
📎 參考書目
MSDN上的 CQRS Journey。
該書以一個用C#編寫的Conference預定售票系統爲例,由淺入深地展現了實現CQRS的各個環節須要關注的重點。書中的配圖和討論很是精彩,而其中提到的Process Manager也是當下實現Saga的流行方式之一。
Randy Shoup所著 Events as First-Class Citizens。
文中的Stitch Fix是一家智能零售商,它經過整合零售、技術、倉儲、數據分析等資源,使用數據分析軟件和機器學習來匹配顧客的服裝定製需求,爲其挑選符合其我的風格、尺寸和偏好的服飾和配飾,提供了良好的消費體驗。
顧客按需訂購服裝或申請每個月、每兩個月或每季度交貨。每一個盒子有五件貨物。若是顧客喜歡配送貨物,能夠選擇以標籤價購買,所有購買享受75%的折扣;若是不喜歡,則免費退貨。若是顧客沒有購買任何貨物,則需支付20美圓的設計費。Stitch Fix的平均商品單價約65美圓,公司指望在每一個盒子中,用戶可以保存2件商品。造型師是兼職,薪水爲每小時15美圓。每小時,造型師會完成4個盒子,這樣能產生較高的毛利率,以覆蓋巨大的開銷及庫存成本。
⚠️ 通用數據保護條例(General Data Protection Regulation,GDPR)要求,必須能根據用戶的要求刪除其我的信息。然而,在一個以Event Sourcing爲基礎的應用裏,要完全刪除或修改帶有我的信息的全部事件是很是困難的,因此改用「數據粉碎」的技術來實現。其原理是給每一個人分配一個惟一的ID,而後以該ID做爲密鑰,對其相關的全部我的數據進行加密。當須要完全刪除該用戶的信息時,直接刪除該ID,便可保證其我的數據沒法被解密,從而達到保護目的。Lightbend爲Akka Persistence提供了相應的工具,以幫助構建具備GDPR功能的系統。
Akka Persistence提供了event sourced actor(又稱爲 persistent actor)做爲實現。這類Actor在收到Command時會先進行檢驗Validate。若是Command各項條件經過了檢驗,則使之做用於當前實體,併產生相應的事件Event,待這些Event被持久化後,以更新實體的狀態結束;不然,實體將直接拒絕Reject該Command。(💀 不應是先更新狀態,而後才持久化事件嗎?貌似先持久化再更新會更靠譜。)
而在重塑Actor時,全部的事件將被加載,並沒有需再校驗地直接用於更新Actor的狀態,直到恢復到最新狀態。
一個典型的EventSourcedBehavior包括ID、初始State,CommandHandler與EventHandler四個組成部分,若是須要傳入ActorContext,則在外層用Behaviors.setup傳入便可:
import akka.persistence.typed.scaladsl.EventSourcedBehavior import akka.persistence.typed.PersistenceId object MyPersistentBehavior { sealed trait Command sealed trait Event final case class State() def apply(): Behavior[Command] = EventSourcedBehavior[Command, Event, State]( // 1. 該Actor的惟一Id persistenceId = PersistenceId.ofUniqueId("abc"), // 2. 初始狀態 emptyState = State(), // 3. Command Handler commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"), // 4. Event Handler eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state")) }
PersistenceId是Event Sourced Actor在其生命週期內惟一的身份標識(想一想聚合Id)。由於Akka Cluster提供的EntityId可能爲多個不一樣類型的Actor共享,因此通常配合EntityTypeKey一塊兒組成惟一的PersistenceId。因此,PersistenceId.apply()用默認的分隔符|
將entityTypeKey.name與entityId兩個字符串鏈接成所需的Id。固然,也能夠用帶separator參數的PersistenceId.apply指定其餘分隔符,或者使用PersistenceId.ofUniqueId直接指定完整的自定義Id(不作任何拼接)。
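下面是一個小示意("ShoppingCart"、"cart-1"等取值均爲假設),展現這幾種構造方式的差異:

```scala
import akka.persistence.typed.PersistenceId

object PersistenceIdDemo extends App {
  // 默認以"|"鏈接:獲得 "ShoppingCart|cart-1"
  val pid1 = PersistenceId("ShoppingCart", "cart-1")
  // 指定其餘分隔符(此處假設用"#"):獲得 "ShoppingCart#cart-1"
  val pid2 = PersistenceId("ShoppingCart", "cart-1", "#")
  // 不作任何拼接,直接使用給定的完整Id:獲得 "cart-1"
  val pid3 = PersistenceId.ofUniqueId("cart-1")

  println(pid1.id, pid2.id, pid3.id)
}
```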
即便在集羣條件下,持同一PersistenceId的Actor在任何時候都只能存在一個,不然就世界大亂了。固然,由於有Recovery,這個Actor能夠被分片甚至遷移到任何一個分片及其節點上。
🔗 摘選自 https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html#persistence-example
sharding.init(Entity(typeKey = HelloWorld.TypeKey) { entityContext => HelloWorld(entityContext.entityId, PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId)) })
一個CommandHandler有2個參數:當前的State、收到的Command,而後返回Effect。Effect由其工廠建立,建立動做包括:persist(持久化一個或多個事件)、none(不持久化任何事件,好比只讀命令)、unhandled(當前狀態下不處理該命令)、stop(中止該Actor)、stash與unstashAll(暫存和取出命令)、reply與noReply(配合強制回覆模式使用,見後文的ReplyEffect)。
在返回Effect的同時,還能夠在該Effect後接反作用SideEffect,好比Effect.persist(...).thenRun(...)。具體包括:thenRun(執行任意反作用)、thenStop(中止該Actor)、thenUnstashAll(處理此前暫存的命令)、thenReply與thenNoReply(向指定Actor回覆或明確不回覆)。
任何SideEffect都最多隻能執行一次。若是持久化失敗,或者Actor直接重啓、中止後再啓動,都不會執行任何反作用。因此一般是響應RecoveryCompleted信號,在其中去執行須要被確認的反作用,這種狀況下,則可能會出現同一個反作用屢次執行的狀況。
反作用都是按註冊的順序同步執行,但也不能避免由於發送消息等而致使操做的併發執行。反作用也可能在事件被持久化以前就被執行,這樣的話,即便持久化失敗致使事件未被保存,反作用也生效了。
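下面用一個假想的Counter作極簡示意(非官方示例),展現Effect工廠與thenRun等反作用的基本用法:

```scala
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior }

object Counter {
  sealed trait Command
  case object Increment extends Command
  case object Stop extends Command

  final case class Incremented(delta: Int)
  final case class State(value: Int)

  def apply(id: String): Behavior[Command] =
    EventSourcedBehavior[Command, Incremented, State](
      persistenceId = PersistenceId.ofUniqueId(id),
      emptyState = State(0),
      commandHandler = (state, cmd) =>
        cmd match {
          case Increment =>
            // 持久化事件,待持久化成功後再執行反作用
            Effect.persist(Incremented(1)).thenRun(newState => println(s"value = ${newState.value}"))
          case Stop =>
            // 不持久化任何事件,直接中止該Actor(也能夠用thenStop()做爲反作用)
            Effect.stop()
        },
      eventHandler = (state, evt) => State(state.value + evt.delta))
}
```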
💀 關於翻譯:Akka用「日記」——Journal指代的Event Store,並與「日誌」Log相區別。雖然我更喜歡用「事件簿」這樣的稱謂,但一來請教了師姐說「日記」更準確,二來電影《Joker》裏作心理諮詢的社工在問Frank時也用的Journal這個詞,因而就此做罷。
一個EventHandler有2個參數:當前State,觸發的Event,而後返回新的State。
⚡ CommandHandler觸發並持久化事件,EventHandler處理事件並更新狀態,因此Actor的狀態實際是在EventHandler裏才真正被改變的!
當事件Event被持久化後,EventHandler將使用它去修改做爲參數傳入的當前狀態State,從而產生新的State。至於State的具體實現,能夠是FP風格的不可變量,也能夠是OO風格的可變量,但一般都會封裝在諸如Class這樣的一個容器裏。
不一樣於Command Handler的是,Event Handler不會產生反作用,因此它將直接用於Actor的重塑Recovery操做上。若是須要在Recovery以後作點什麼,那麼恰當的楔入點包括:CommandHandler最後建立的Effect附加的thenRun(),或者是RecoveryCompleted事件的處理函數裏。
由於不一樣的消息將觸發Actor不一樣的行爲,因此行爲也是Actor狀態的一部分。因此在Recovery時除了恢復數據,還要當心恢復其相應的行爲。儘管行爲是函數,而函數是一等公民,因此行爲理應能夠象數據同樣保存,但困難的地方在於怎麼保存編碼,所以Akka Persistence不提供Behavior的持久化。
面對這個棘手的問題,最容易想到的辦法是根據State定義不一樣的CommandHandler,並隨State變化而切換,從而使Actor成爲一臺有限狀態機。因而,由此獲得的即是由State與Command兩級匹配構成的邏輯,利用繼承定義State的不一樣實現,而後先case State、再case Command,最後根據匹配結果將消息分發至相應的處理函數(處理函數亦相對獨立,以凸顯不一樣的邏輯分支)。而在代碼實現的結構上,就是在一個CommandHandler裏,定義若干個協助完成消息處理的private function。這些處理函數的參數由Handler在case分支裏賦與,返回類型則統一爲與CommandHandler相同的Effect[Event, State]。最後,只須要將這個CommandHandler連殼帶肉交給EventSourcedBehavior工廠便可。
📎 更規範的方式是把Handler定義在State裏,具體參見後續的Handler設計指南。
Request-Response是最多見的通訊模式之一。爲了保證Persistent Actor必定會回覆,EventSourcedBehavior推出了ReplyEffect,從而保證CommandHandler必定會發回Reply。它與Effect的惟一區別是必須用工廠Effect.reply
、Effect.noReply
、Effect.thenReply
或者Effect.thenNoReply
之一建立的結果做爲返回值,而再也不是Effect,不然編譯器會提示類型不匹配的錯誤。
爲此,在定義Command時必須包含一個replyTo屬性,同時得用EventSourcedBehavior.withEnforcedReplies(id, state, cmdHandler, evtHandler)
來建立Behavior。
常見的序列化方案和工具也適用於Akka,推薦使用🔗 Jackson
在序列化時,必須考慮不一樣版本事件之間的向下兼容性,參考綱要演進 Schema Evolution(💀 統一箇中文名真難。architecture 架構,pattern 模式,structure 結構,style 風格/樣式,template 模板,boilerplate 樣板,schema 綱要)
相比Recovery,我更喜歡Replay或者Reconstruct,使用「重塑實體」和「事件重播」在語義上也更生動。
Akka Persistence在Actor啓動或重啓時,將自動地直接使用EventHandler進行Actor的重塑。要注意的是,不要在EventHandler中執行反作用,而應該在重塑完成後,在receiveSignal裏響應RecoveryCompleted信號,在響應程序裏執行反作用。在RecoveryCompleted信號裏帶有重塑後的當前狀態。而即便對於一個新的、尚未任何已記錄事件的Actor,在執行Recovery以後也會觸發RecoveryCompleted信號。
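一個簡單的示意(Command、Event、State沿用前文的佔位定義),展現如何在receiveSignal裏響應RecoveryCompleted:

```scala
import akka.persistence.typed.{ PersistenceId, RecoveryCompleted }
import akka.persistence.typed.scaladsl.EventSourcedBehavior

EventSourcedBehavior[Command, Event, State](
  persistenceId = PersistenceId.ofUniqueId("abc"),
  emptyState = State(),
  commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
  eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
  .receiveSignal {
    case (state, RecoveryCompleted) =>
      // 此處的state即重塑完成後的當前狀態,能夠在這裏執行反作用
      println(s"recovery completed, state = $state")
  }
```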
因爲在重塑完成前,全部新消息將會被Stash,因此爲防止失去響應,Akka提供了最大併發的重塑數,能夠按akka.persistence.max-concurrent-recoveries = 50
的方式進行配置。
在某些狀況下,事件流可能會損壞,而此時多個寫入者(即多個Persistent Actor實例)準備寫入具備相同序列號的不一樣消息,則會引起不一致的衝突。爲此,Akka Persistence提供了Replay Filter,經過消息序列號和寫入者的UUID來檢測並解決消息之間的衝突。具體配置須要寫入配置文件中的以下區段(leveldb視具體插件而不一樣):
💀 理解不能:爲何會有多個Actor實例要寫入有相同序列號的消息?PersistenceId不應是惟一的嗎?消息序列號是什麼鬼?
🔈 Akka Persistence使用單一寫入者原則,即任一時刻,對於任何一個特定的PersistenceId,只有一個EventSourcedBehavior能持久化事件。
akka.persistence.journal.leveldb.replay-filter { mode = repair-by-discard-old }
包括4種策略:repair-by-discard-old(修復衝突,丟棄舊寫入者的衝突事件)、fail(重塑直接失敗)、warn(保留全部事件,僅記錄警告日誌)、off(關閉過濾)。
使用withRecovery()能夠修改重塑的策略,包括禁用自動重塑功能。固然,快照功能能夠單獨禁用,或者只選擇本身須要的那一類快照。
EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId.ofUniqueId("abc"), emptyState = State(), commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"), eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state")) .withRecovery(Recovery.disabled)
在不使用EventAdapter的狀況下,能夠直接使用withTagger爲EventSourcedBehavior中的事件打上標籤(準確說是標籤集),方便將Event根據Tag實現分組,好比屬於不一樣Actor實例但屬相同類型的全部Event,而後在Persistence Query中使用。
EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId.ofUniqueId("abc"), emptyState = State(), commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"), eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state")) .withTagger(_ => Set("tag1", "tag2"))
經過繼承EventAdapter[T, Wrapper]
並安裝到EventSourcedBehavior,能夠自動將事件T轉換爲Wrapper,而後持久化。
case class Wrapper[T](event: T) class WrapperEventAdapter[T] extends EventAdapter[T, Wrapper[T]] { override def toJournal(e: T): Wrapper[T] = Wrapper(e) override def fromJournal(p: Wrapper[T], manifest: String): EventSeq[T] = EventSeq.single(p.event) override def manifest(event: T): String = "" } EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId.ofUniqueId("abc"), emptyState = State(), commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"), eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state")) .eventAdapter(new WrapperEventAdapter[Event])
若Journal存取失敗,則EventSourcedBehavior將中止。該默認行爲能夠經過使用覆寫後的回退策略BackoffSupervisorStrategy
進行改變。普通的Supervisor在此處並不適用,由於事件已經被持久化,單純地重啓Actor並不能一併撤銷日記發生的改變。若是Journal存取失敗發生在重塑Actor的過程當中,則會觸發RecoveryFailed信號,同時Actor將中止或在回退後從新啓動。
但如果Journal在持久化事件時發現錯誤,好比事件沒法被序列化,那麼它會主動拒絕持久化事件。此時該事件一定不會被Journal持久化,而會觸發一個EventRejectedException異常傳遞給EventSourcedBehavior,而後按Supervisor設定的策略進行處理。
EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId.ofUniqueId("abc"), emptyState = State(), commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"), eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state")) .onPersistFailure( SupervisorStrategy.restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1))
在執行Effect.persist或persistAll,上一個unstashAll或者建立快照期間,全部新到來的消息將自動地被暫存,直到全部事件被持久化且全部的反作用執行完畢。同理,在重塑過程當中,新消息也將被暫存,直到重塑過程結束。除了自動暫存以外,在須要的時候,好比須要等候其餘條件一併成立時,也能夠用Effect.stash手動開始暫存,待條件所有齊備後再thenUnstashAll。
設置Stash的消息數量請配置:akka.persistence.typed.stash-capacity = 10000
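下面是一個簡化的示意(假想的Gate示例):在收到Activate以前手動暫存其餘命令,激活後再用thenUnstashAll統一處理:

```scala
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior }

object Gate {
  sealed trait Command
  case object Activate extends Command
  case object Work extends Command

  sealed trait Event
  case object Activated extends Event
  case object Worked extends Event

  final case class State(active: Boolean)

  def apply(id: String): Behavior[Command] =
    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId.ofUniqueId(id),
      emptyState = State(active = false),
      commandHandler = (state, cmd) =>
        cmd match {
          case Activate =>
            // 持久化Activated後,取出此前暫存的全部命令依次處理
            Effect.persist(Activated).thenUnstashAll()
          case Work if !state.active =>
            // 條件還沒有齊備,先手動暫存
            Effect.stash()
          case Work =>
            Effect.persist(Worked)
        },
      eventHandler = (state, evt) =>
        evt match {
          case Activated => state.copy(active = true)
          case Worked    => state
        })
}
```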
⚠️ 因爲Stash的消息都暫存在內存裏,因此在如下狀況發生時,這些消息將丟失:Actor因異常而被中止或重啓、實體被Cluster Sharding鈍化(Passivate)或再平衡(Rebalance)、JVM崩潰或節點宕機。
💀 Akka Persistence爲何沒有爲Mailbox提供一個持久化方案?或者,這應該是ConsumerController的責任?
🔈 參見:耐久的Producer
Akka Persistence使用EventSourcedBehavior,配合Persistence Query的EventsByTag,實現CQRS模式。
與Handler單獨放置的常見方案不一樣,Akka Persistence推薦將CommandHandler與EventHandler都設計在State裏。這樣State即可看成包括了業務邏輯和數據的完整領域對象。而在State剛建立時,除了專門定義一個初始化的State類外,也能夠用Option[State]來代替,這樣Option.None即表明了初始狀態,而Option.Some則是更新後的狀態,而後用case匹配便可。(💀 注意示例代碼裏State用的Option[Account])
完整示例以下:
/** * Bank account example illustrating: * - Option[State] that is starting with None as the initial state * - event handlers in the state classes * - command handlers in the state classes * - replies of various types, using withEnforcedReplies */ object AccountExampleWithOptionState { //#account-entity object AccountEntity { // Command sealed trait Command extends CborSerializable final case class CreateAccount(replyTo: ActorRef[OperationResult]) extends Command final case class Deposit(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command final case class GetBalance(replyTo: ActorRef[CurrentBalance]) extends Command final case class CloseAccount(replyTo: ActorRef[OperationResult]) extends Command // Reply sealed trait CommandReply extends CborSerializable sealed trait OperationResult extends CommandReply case object Confirmed extends OperationResult final case class Rejected(reason: String) extends OperationResult final case class CurrentBalance(balance: BigDecimal) extends CommandReply // Event sealed trait Event extends CborSerializable case object AccountCreated extends Event case class Deposited(amount: BigDecimal) extends Event case class Withdrawn(amount: BigDecimal) extends Event case object AccountClosed extends Event val Zero = BigDecimal(0) // type alias to reduce boilerplate type ReplyEffect = akka.persistence.typed.scaladsl.ReplyEffect[Event, Option[Account]] // State sealed trait Account extends CborSerializable { def applyCommand(cmd: Command): ReplyEffect def applyEvent(event: Event): Account } // State: OpenedAccount case class OpenedAccount(balance: BigDecimal) extends Account { require(balance >= Zero, "Account balance can't be negative") override def applyCommand(cmd: Command): ReplyEffect = cmd match { case Deposit(amount, replyTo) => Effect.persist(Deposited(amount)).thenReply(replyTo)(_ => Confirmed) case Withdraw(amount, replyTo) => if (canWithdraw(amount)) Effect.persist(Withdrawn(amount)).thenReply(replyTo)(_ => Confirmed) else Effect.reply(replyTo)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount")) case GetBalance(replyTo) => Effect.reply(replyTo)(CurrentBalance(balance)) case CloseAccount(replyTo) => if (balance == Zero) Effect.persist(AccountClosed).thenReply(replyTo)(_ => Confirmed) else Effect.reply(replyTo)(Rejected("Can't close account with non-zero balance")) case CreateAccount(replyTo) => Effect.reply(replyTo)(Rejected("Account is already created")) } override def applyEvent(event: Event): Account = event match { case Deposited(amount) => copy(balance = balance + amount) case Withdrawn(amount) => copy(balance = balance - amount) case AccountClosed => ClosedAccount case AccountCreated => throw new IllegalStateException(s"unexpected event [$event] in state [OpenedAccount]") } def canWithdraw(amount: BigDecimal): Boolean = { balance - amount >= Zero } } // State: ClosedAccount case object ClosedAccount extends Account { override def applyCommand(cmd: Command): ReplyEffect = cmd match { case c: Deposit => replyClosed(c.replyTo) case c: Withdraw => replyClosed(c.replyTo) case GetBalance(replyTo) => Effect.reply(replyTo)(CurrentBalance(Zero)) case CloseAccount(replyTo) => replyClosed(replyTo) case CreateAccount(replyTo) => replyClosed(replyTo) } private def replyClosed(replyTo: ActorRef[AccountEntity.OperationResult]): ReplyEffect = Effect.reply(replyTo)(Rejected(s"Account is closed")) override def applyEvent(event: 
Event): Account = throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]") } // when used with sharding, this TypeKey can be used in `sharding.init` and `sharding.entityRefFor`: val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("Account") def apply(persistenceId: PersistenceId): Behavior[Command] = { // type of State is Option[Account] EventSourcedBehavior.withEnforcedReplies[Command, Event, Option[Account]]( persistenceId, None, // use result of case match for the parameter handler. (state, cmd) => state match { case None => onFirstCommand(cmd) case Some(account) => account.applyCommand(cmd) }, // match type Option[Account] declared in withEnforcedReplies. (state, event) => state match { case None => Some(onFirstEvent(event)) case Some(account) => Some(account.applyEvent(event)) }) } def onFirstCommand(cmd: Command): ReplyEffect = { cmd match { case CreateAccount(replyTo) => Effect.persist(AccountCreated).thenReply(replyTo)(_ => Confirmed) case _ => // CreateAccount before handling any other commands Effect.unhandled.thenNoReply() } } def onFirstEvent(event: Event): Account = { event match { case AccountCreated => OpenedAccount(Zero) case _ => throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]") } } } }
快照的初衷,是爲了提升Recovery的效率。在構造EventSourcedBehavior時,有兩種定義建立快照時機的方式:一是藉助withRetention(RetentionCriteria.snapshotEvery(...)),每N條事件建立一份快照;二是藉助snapshotWhen(),在知足特定條件(好比持久化了某類事件)時建立一份快照。
💀 snapshotWhen裏的case匹配什麼鬼?
🔈 參見Akka API
snapshotWhen:在指定狀態和序列號的條件下,當指定事件被持久化後,即建立一份快照。當有多條事件時,則要等全部事件完成持久化後纔會建立快照。
def snapshotWhen(predicate: (State, Event, Long) ⇒ Boolean): EventSourcedBehavior[Command, Event, State]
withRetention:指定保留或刪除快照的策略。默認狀況下,快照不會自動保存和刪除。
def withRetention(criteria: RetentionCriteria): EventSourcedBehavior[Command, Event, State]
如下示例即指定在觸發BookingCompleted事件後建立一份快照:
EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId.ofUniqueId("abc"), emptyState = State(), commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"), eventHandler = (state, evt) => state) .snapshotWhen { case (state, BookingCompleted(_), sequenceNumber) => true case (state, event, sequenceNumber) => false } .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
在重塑Actor時,默認會使用SnapshotSelectionCriteria.Latest
來選擇最新(也即最年輕)的快照,除非使用withRecovery裏的Recovery參數指定其餘策略(好比完全禁用快照):
EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId.ofUniqueId("abc"), emptyState = State(), commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"), eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state")) .withRecovery(Recovery.withSnapshotSelectionCriteria(SnapshotSelectionCriteria.none))
除了默認提供的snapshot-store
插件(akka.persistence.snapshot-store.plugin,須要配置),能夠使用EventSourcedBehavior.withSnapshotPluginId
指定其餘的替代插件。
保存快照可能會失敗,但它不會致使Actor的中止或重啓,只會觸發信號SnapshotCompleted或者SnapshotFailed,並記入日誌Log。
每當有新的快照成功建立時,舊的快照都將根據RetentionCriteria裏設置的條件自動刪除。在上面的例子裏,將在每100條事件時(numberOfEvents = 100)建立一份快照,而後每份序列號小於已保存快照的序列號減去keepNSnapshots * numberOfEvents的快照會被自動刪除(每隔200號刪除以前的快照)。
⚠️ 根據Akka API的說明,若是將EventSourcedBehavior.withRetention和RetentionCriteria.snapshotEvery一塊兒使用,則符合snapshotWhen定義條件而觸發的快照將不會致使舊快照被刪除。此類刪除僅當單獨使用withRetention,且匹配RetentionCriteria中的numberOfEvents設定值時纔會觸發。
在刪除快照時,將會觸發DeleteSnapshotsCompleted或DeleteSnapshotsFailed信號,可藉此進行調試。
💀 除非頭鐵,在一個以Event Sourced爲基礎實現模式的系統裏,誰會沒事刪除事件?!相反,即便是由於應用版本升級而對原有的事件進行改造,那麼在CQRS Journey裏提出的事件版本遷移才理應是更恰當的選擇。而在Akka Persistence裏,這被稱爲綱要演進Schema Evolution。
刪除事件與刪除快照的策略,都是在withRetention裏用RetentionCriteria.withDeleteEventsOnSnapshot
指定的,且同期的事件會先於快照被刪除,而只保留最新版本的快照(💀這便與非EventSourced的應用有何區別?)。但這只是Akka Persistence認爲的刪除,至於底層的Event Store是否真的從數據庫中刪除該事件,則由EventStore的具體實現決定。
EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId.ofUniqueId("abc"), emptyState = State(), commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"), eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state")) .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2).withDeleteEventsOnSnapshot) .receiveSignal { // optionally respond to signals case (state, _: SnapshotFailed) => // react to failure case (state, _: DeleteSnapshotsFailed) => // react to failure case (state, _: DeleteEventsFailed) => // react to failure }
🏭 (看到此處時已更新至2.6.6,要注意這部分和Akka Persistence同樣,在將來版本會有大的變化)
com.typesafe.akka:akka-persistence-typed_2.13:2.6.6
com.typesafe.akka:akka-persistence-testkit_2.13:2.6.6
Akka Persistence提供了EventSourcedBehaviorTestKit幫助進行測試,它按照一次一條命令的方式同步執行並返回結果,方便你斷言其行爲。
使用時,經過加載EventSourcedBehaviorTestKit.config來啓動在內存中模擬的事件存儲和快照功能。
Command、Event以及State的序列化校驗會自動完成,相關的設置能夠在建立EventSourcedBehaviorTestKit時,使用SerializationSettings進行自定義。默認狀況下,它只負責序列化是否可用而不檢查結果是否一致,因此要檢查一致性就要啓用verifyEquality,而且用case class之類的方法實現Command、Event和State的equals。
要測試重塑功能,能夠使用EventSourcedBehaviorTestKit.restart
。完整示例以下:
class AccountExampleDocSpec extends ScalaTestWithActorTestKit(EventSourcedBehaviorTestKit.config) with AnyWordSpecLike with BeforeAndAfterEach with LogCapturing { private val eventSourcedTestKit = EventSourcedBehaviorTestKit[AccountEntity.Command, AccountEntity.Event, AccountEntity.Account]( system, AccountEntity("1", PersistenceId("Account", "1"))) override protected def beforeEach(): Unit = { super.beforeEach() eventSourcedTestKit.clear() } "Account" must { "be created with zero balance" in { val result = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_)) result.reply shouldBe AccountEntity.Confirmed result.event shouldBe AccountEntity.AccountCreated result.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 0 } "handle Withdraw" in { eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_)) val result1 = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _)) result1.reply shouldBe AccountEntity.Confirmed result1.event shouldBe AccountEntity.Deposited(100) result1.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 100 val result2 = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Withdraw(10, _)) result2.reply shouldBe AccountEntity.Confirmed result2.event shouldBe AccountEntity.Withdrawn(10) result2.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 90 } "reject Withdraw overdraft" in { eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_)) eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _)) val result = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Withdraw(110, _)) result.replyOfType[AccountEntity.Rejected] result.hasNoEvents shouldBe true } "handle GetBalance" in { eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_)) eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _)) val result = eventSourcedTestKit.runCommand[AccountEntity.CurrentBalance](AccountEntity.GetBalance(_)) result.reply.balance shouldBe 100 result.hasNoEvents shouldBe true } } }
🏭 com.typesafe.akka:akka-persistence-testkit_2.13:2.6.6
要測試事件是否被成功持久化,則要使用PersistenceTestKit。它提供了兩套工具集:PersistenceTestKit用於事件存儲,SnapshotTestKit用於快照存儲(見下面的初始化示例)。
使用前,須要在用於初始化TestKit的ActorSystem中進行配置:
object TestKitTypedConf { val yourConfiguration = ConfigFactory.defaultApplication() val system = ActorSystem( ??? /*some behavior*/, "test-system", PersistenceTestKitPlugin.config.withFallback(yourConfiguration)) val testKit = PersistenceTestKit(system) } object SnapshotTypedConf { val yourConfiguration = ConfigFactory.defaultApplication() val system = ActorSystem( ??? /*some behavior*/, "test-system", PersistenceTestKitSnapshotPlugin.config.withFallback(yourConfiguration)) val testKit = SnapshotTestKit(system) }
使用PersistenceTestKit,能夠實施如下測試行爲:斷言下一條被持久化的事件或快照(expectNextPersisted)、斷言沒有任何消息被持久化(expectNothingPersisted)、清空存儲(clearAll),以及經過自定義策略模擬存儲的失敗或拒絕(withPolicy,參見後面的示例)。
經過爲事件存儲實現ProcessingPolicy[EventStorage.JournalOperation]
或者爲快照存儲實現ProcessingPolicy[SnapshotStorage.SnapshotOperation]
,而後使用withPolicy()加載,能夠自定義存儲的存取策略,實現更細粒度的控制。
其中,較爲關鍵的是ProcessingPolicy.tryProcess(persistenceId, storageOperation)方法。storageOperation(存儲操做)包括:事件存儲的ReadEvents、WriteEvents、DeleteEvents、ReadSeqNum,以及快照存儲的ReadSnapshot、WriteSnapshot、DeleteSnapshotByMeta、DeleteSnapshotsByCriteria(參見下面的示例)。
而tryProcess的結果則是下列情形之一:ProcessingSuccess(容許該操做正常執行)、StorageFailure(模擬存儲失敗)、Reject(拒絕該操做)。
object PersistenceTestKitSampleSpec { final case class Cmd(data: String) extends CborSerializable final case class Evt(data: String) extends CborSerializable object State { val empty: State = new State } final class State extends CborSerializable { def updated(event: Evt): State = this } } class PersistenceTestKitSampleSpec extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config.withFallback(ConfigFactory.defaultApplication())) with AnyWordSpecLike with BeforeAndAfterEach { val persistenceTestKit = PersistenceTestKit(system) override def beforeEach(): Unit = { persistenceTestKit.clearAll() } "Persistent actor" should { "persist all events" in { val persistenceId = PersistenceId.ofUniqueId("your-persistence-id") val persistentActor = spawn( EventSourcedBehavior[Cmd, Evt, State]( persistenceId, emptyState = State.empty, commandHandler = (_, cmd) => Effect.persist(Evt(cmd.data)), eventHandler = (state, evt) => state.updated(evt))) val cmd = Cmd("data") persistentActor ! cmd val expectedPersistedEvent = Evt(cmd.data) persistenceTestKit.expectNextPersisted(persistenceId.id, expectedPersistedEvent) } } } class SampleEventStoragePolicy extends EventStorage.JournalPolicies.PolicyType { //you can use internal state, it does not need to be thread safe var count = 1 override def tryProcess(persistenceId: String, processingUnit: JournalOperation): ProcessingResult = if (count < 10) { count += 1 //check the type of operation and react with success or with reject or with failure. //if you return ProcessingSuccess the operation will be performed, otherwise not. processingUnit match { case ReadEvents(batch) if batch.nonEmpty => ProcessingSuccess case WriteEvents(batch) if batch.size > 1 => ProcessingSuccess case ReadSeqNum => StorageFailure() case DeleteEvents(_) => Reject() case _ => StorageFailure() } } else { ProcessingSuccess } } class SampleSnapshotStoragePolicy extends SnapshotStorage.SnapshotPolicies.PolicyType { //you can use internal state, it does not need to be thread safe var count = 1 override def tryProcess(persistenceId: String, processingUnit: SnapshotOperation): ProcessingResult = if (count < 10) { count += 1 //check the type of operation and react with success or with reject or with failure. //if you return ProcessingSuccess the operation will be performed, otherwise not. 
processingUnit match { case ReadSnapshot(_, payload) if payload.nonEmpty => ProcessingSuccess case WriteSnapshot(meta, payload) if meta.sequenceNr > 10 => ProcessingSuccess case DeleteSnapshotsByCriteria(_) => StorageFailure() case DeleteSnapshotByMeta(meta) if meta.sequenceNr < 10 => ProcessingSuccess case _ => StorageFailure() } } else { ProcessingSuccess } } class PersistenceTestKitSampleSpecWithPolicy extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config.withFallback(ConfigFactory.defaultApplication())) with AnyWordSpecLike with BeforeAndAfterEach { val persistenceTestKit = PersistenceTestKit(system) override def beforeEach(): Unit = { persistenceTestKit.clearAll() persistenceTestKit.resetPolicy() } "Testkit policy" should { "fail all operations with custom exception" in { val policy = new EventStorage.JournalPolicies.PolicyType { class CustomFailure extends RuntimeException override def tryProcess(persistenceId: String, processingUnit: JournalOperation): ProcessingResult = processingUnit match { case WriteEvents(_) => StorageFailure(new CustomFailure) case _ => ProcessingSuccess } } persistenceTestKit.withPolicy(policy) val persistenceId = PersistenceId.ofUniqueId("your-persistence-id") val persistentActor = spawn( EventSourcedBehavior[Cmd, Evt, State]( persistenceId, emptyState = State.empty, commandHandler = (_, cmd) => Effect.persist(Evt(cmd.data)), eventHandler = (state, evt) => state.updated(evt))) persistentActor ! Cmd("data") persistenceTestKit.expectNothingPersisted(persistenceId.id) } } }
PersistenceTestKit能夠配合ActorTestKit一塊兒使用,但有幾點須要注意。一是對集羣條件下涉及多個節點的測試,得使用單獨的事件和快照存儲。儘管能夠使用Persistence Plugin Proxy,但使用真實的數據庫一般會更好、更現實。二是某些Persistence插件會自動建立數據庫的表,但在多個ActorSystem併發要求建表時就有必定的侷限性了。因此爲協調數據庫的初始化工做,就得使用PersistenceInit工具。
val timeout = 5.seconds val done: Future[Done] = PersistenceInit.initializeDefaultPlugins(system, timeout) Await.result(done, timeout)
💀 這就是前面提到的事件版本遷移了!Schema,好比XML Schema,是描述特定結構的一種方式,翻譯爲「綱要」貌似妥帖一些。
這一章的重點,是介紹不一樣的綱要演進策略,以及如何根據領域模型的實際狀況,在不一樣策略之間做出抉擇。固然,這些策略並非所有或者惟一的選擇,而只是Akka給出的有限方案。其本質,都是爲了保證舊系統下舊綱要格式的事件,在遷移到新系統後仍能被正確讀取,且不會爲了處理這些不一樣版本的同一類型事件,而給業務邏輯帶來額外負擔。這也正是綱要演進要實現的目標。
其中,常見的綱要演進誘因包括:新增字段、重命名字段、刪除或忽略某類事件、拆分粗粒度的事件、分離領域模型與數據模型、改用可讀的存儲樣式等(即後文各節的場景);相應的解決方案,或是調整序列化器,或是利用EventAdapter實現過濾與轉換。
選擇適當的序列化格式很是重要,這不只關乎序列化的性能,還關乎綱要演進的方案肯定和細節的實現。選擇不當的話,序列化的擴展將很是困難,系統中將不得不保留多個不一樣版本的序列化代碼。Akka Persistence推薦的序列化方案主要包括:
🔗 參考文獻:Martin Kleppmann 所著Schema evolution in Avro, Protocol Buffers and Thrift
默認狀況下,Akka Persistence使用Akka Serialization模塊裏基於Google Protocol Buffers實現的一個序列化器。若是Journal插件但願使用其餘類型的序列化器,則須要根據不一樣的數據庫進行挑選。但不管如何,Akka Persistence只是提供一個序列化器的可插拔接口,它不會自動處理消息的序列化。
全部的消息都會被序列化成以下封裝結構:最底層也是最內層的是用黃色標註的有效載荷,它就是消息對象的實例被序列化後的結果,而後序列化器會附加它本身的SerializerId等信息一塊兒組成中間層的PersistentPayload,以後纔是Akka Persistence附加的SequenceNumber、PersistenceId等其餘一些信息包裹組成的最外層。(💀 想像一下對方收到這封信時又是怎麼一層層剝開的就好理解了。外面2層都是框架直接包攬了,只有核心那層須要本身操心。)因此序列化的要點,就在於最內層的消息對象要序列化成什麼樣子。對此,Java內置的序列化器用於調試還算勉強(想一想多層屬性嵌套的情形),生產環境下最好仍是另尋他途。因此,瞭解序列化器的優點和侷限性很重要,這樣才能在進行項目時迅速行動,而且無懼重構模型。
如下是在Akka Serialization裏自定義有效載荷序列化器的示例:
/** * Usually a serializer like this would use a library like: * protobuf, kryo, avro, cap'n proto, flatbuffers, SBE or some other dedicated serializer backend * to perform the actual to/from bytes marshalling. */ final case class Person(name: String, surname: String) class SimplestPossiblePersonSerializer extends SerializerWithStringManifest { val Utf8 = Charset.forName("UTF-8") val PersonManifest = classOf[Person].getName // unique identifier of the serializer // 在反序列化時,這個SerializerId將用於加載同一類型的序列化器,以保證徹底對稱 def identifier = 1234567 // extract manifest to be stored together with serialized object override def manifest(o: AnyRef): String = o.getClass.getName // serialize the object override def toBinary(obj: AnyRef): Array[Byte] = obj match { case p: Person => s"""${p.name}|${p.surname}""".getBytes(Utf8) case _ => throw new IllegalArgumentException(s"Unable to serialize to bytes, class was: ${obj.getClass}!") } // deserialize the object, using the manifest to indicate which logic to apply override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { case PersonManifest => val nameAndSurname = new String(bytes, Utf8) val Array(name, surname) = nameAndSurname.split("[|]") Person(name, surname) case _ => throw new NotSerializableException( s"Unable to deserialize from bytes, manifest was: $manifest! Bytes length: " + bytes.length) } }
相應在application.conf裏的配置:
akka { actor { serializers { person = "docs.persistence.SimplestPossiblePersonSerializer" } serialization-bindings { "docs.persistence.Person" = person } } }
Akka Serialization提供了相應的Jackson示例
適用場景:向已經存在的事件類型裏添加一個新的字段。
解決方案:添加字段是最多見的演進事由之一,只要添加的字段是二進制兼容的(💀 Jackson就是文本兼容的,不是二進制兼容的嗎?),就能很容易在序列化裏實現演進。此處用ProtoBuf示範,爲值機選座增長了一個靠窗或過道的字段seatType,而後給它一個默認值(此處用的SeatType.Unknown),或者能夠用Option[T]包裝,最後用ProtoBuf提供的方法hasSeatType區分新舊事件,再使用SeatType.fromString從字符串析取值。
class ProtobufReadOptional { sealed abstract class SeatType { def code: String } object SeatType { def fromString(s: String) = s match { case Window.code => Window case Aisle.code => Aisle case Other.code => Other case _ => Unknown } case object Window extends SeatType { override val code = "W" } case object Aisle extends SeatType { override val code = "A" } case object Other extends SeatType { override val code = "O" } case object Unknown extends SeatType { override val code = "" } } case class SeatReserved(letter: String, row: Int, seatType: SeatType) /** * Example serializer impl which uses protocol buffers generated classes (proto.*) * to perform the to/from binary marshalling. */ class AddedFieldsSerializerWithProtobuf extends SerializerWithStringManifest { override def identifier = 67876 final val SeatReservedManifest = classOf[SeatReserved].getName override def manifest(o: AnyRef): String = o.getClass.getName override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { case SeatReservedManifest => // use generated protobuf serializer seatReserved(FlightAppModels.SeatReserved.parseFrom(bytes)) case _ => throw new NotSerializableException("Unable to handle manifest: " + manifest) } override def toBinary(o: AnyRef): Array[Byte] = o match { case s: SeatReserved => FlightAppModels.SeatReserved.newBuilder .setRow(s.row) .setLetter(s.letter) .setSeatType(s.seatType.code) .build() .toByteArray } // -- fromBinary helpers -- private def seatReserved(p: FlightAppModels.SeatReserved): SeatReserved = SeatReserved(p.getLetter, p.getRow, seatType(p)) // handle missing field by assigning "Unknown" value private def seatType(p: FlightAppModels.SeatReserved): SeatType = if (p.hasSeatType) SeatType.fromString(p.getSeatType) else SeatType.Unknown } }
相應的ProtoBuf配置FlightAppModels.proto,其中新增長的seatType是optional
。ProtoBuf會根據配置生成一個具體負責Marshall的工具類,optional字段將會賦與一hasXXX的方法:
option java_package = "docs.persistence.proto"; option optimize_for = SPEED; message SeatReserved { required string letter = 1; required uint32 row = 2; optional string seatType = 3; // the new field }
適用場景:解決設計之初不恰當的字段命名,使其更符合業務需求。此處舉例用了SeatReserved中原來的code,如今的seatNr。
解決方案一:使用符合IDL規範(Interface Description Language)的序列化器。這是最簡單有效的方案,也是ProtoBuf和Thrift採起的方案,好比上面的.proto便是用IDL描述的映射結構,而後ProtoBuf再據此描述自動生成工具類。要更名時,只須要維護IDL映射結構,保持字段的ID不變,修改映射的名稱便可。
// protobuf message definition, BEFORE: message SeatReserved { required string code = 1; } // protobuf message definition, AFTER: message SeatReserved { required string seatNr = 1; // field renamed, id remains the same }
解決方案二:手動處理事件版本的遷移。在沒辦法使用IDL方式,好比使用Jackson格式進行序列化時,就只有手動進行轉換,給事件附加一個版本號字段,而後用手寫的EventAdapter進行反序列化的轉換(該EventAdapter在EventSourcedBehavior建立時加載)。在使用Jackson進行增長字段、改變事件結構等狀況下,這樣的方法也是適用的。
class JsonRenamedFieldAdapter extends EventAdapter { import spray.json.JsObject val marshaller = new ExampleJsonMarshaller val V1 = "v1" val V2 = "v2" // this could be done independently for each event type override def manifest(event: Any): String = V2 override def toJournal(event: Any): JsObject = marshaller.toJson(event) override def fromJournal(event: Any, manifest: String): EventSeq = event match { case json: JsObject => EventSeq(marshaller.fromJson(manifest match { case V1 => rename(json, "code", "seatNr") case V2 => json // pass-through case unknown => throw new IllegalArgumentException(s"Unknown manifest: $unknown") })) case _ => val c = event.getClass throw new IllegalArgumentException("Can only work with JSON, was: %s".format(c)) } def rename(json: JsObject, from: String, to: String): JsObject = { val value = json.fields(from) val withoutOld = json.fields - from JsObject(withoutOld + (to -> value)) } }
適用場景:某個事件被認爲是多餘的、毫無價值甚至影響效率的,可是在重塑時卻無法跳過該事件。本例中是乘客按燈呼叫服務的事件CustomerBlinked。
最簡單的方案:因爲事件並不能真正從Journal中完全刪除,因此一般是在重塑時經過忽略特定的事件達到刪除的效果。最簡單的方案,就是在EventAdapter中截留該事件而返回一個空的EventSeq,同時放過其餘類型的事件。該方案的弊端,在於從Storage中讀取事件時,仍須要反序列化這個事件,從而致使效率的損失。
更成熟的方案:在上述方案基礎上,增長了在序列化器端的過濾,使特定事件再也不被反序列化。被忽略的事件被稱爲墓碑Tombstone。
final case class CustomerBlinked(customerId: Long) case object EventDeserializationSkipped class RemovedEventsAwareSerializer extends SerializerWithStringManifest { val utf8 = Charset.forName("UTF-8") override def identifier: Int = 8337 val SkipEventManifestsEvents = Set("docs.persistence.CustomerBlinked" // 其餘被忽略的事件... ) override def manifest(o: AnyRef): String = o.getClass.getName override def toBinary(o: AnyRef): Array[Byte] = o match { case _ => o.toString.getBytes(utf8) // example serialization } override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { case m if SkipEventManifestsEvents.contains(m) => EventDeserializationSkipped case other => new String(bytes, utf8) } } class SkippedEventsAwareAdapter extends EventAdapter { override def manifest(event: Any) = "" override def toJournal(event: Any) = event override def fromJournal(event: Any, manifest: String) = event match { case EventDeserializationSkipped => EventSeq.empty case _ => EventSeq(event) } }
適用場景:這主要是從持久化無關(Persistence Ignorance)的角度,堅持採用POJO(Plain Ordinary Java Object)這樣的case class實現領域模型,並儘可能避免數據模型及數據庫、序列化器等底層細節和框架對領域模型的入侵。
解決方案:建立一個EventAdapter,實現case class與數據庫存取class之間一對一的映射。
/** Domain model - highly optimised for domain language and maybe "fluent" usage */ object DomainModel { final case class Customer(name: String) final case class Seat(code: String) { def bookFor(customer: Customer): SeatBooked = SeatBooked(code, customer) } final case class SeatBooked(code: String, customer: Customer) } /** Data model - highly optimised for schema evolution and persistence */ object DataModel { final case class SeatBooked(code: String, customerName: String) } class DetachedModelsAdapter extends EventAdapter { override def manifest(event: Any): String = "" override def toJournal(event: Any): Any = event match { case DomainModel.SeatBooked(code, customer) => DataModel.SeatBooked(code, customer.name) } override def fromJournal(event: Any, manifest: String): EventSeq = event match { case DataModel.SeatBooked(code, customerName) => EventSeq(DomainModel.SeatBooked(code, DomainModel.Customer(customerName))) } }
適用場景:但願以JSON等更可讀的樣式,而不是一個二進制流的方式來保存事件。這在最近的一些諸如MongoDB、PostgreSQL的NoSQL類型數據庫中應用較爲廣泛。在作這樣的決定前,必需要肯定是存儲格式即是要可讀樣式的,還只是想窺探事件存儲而須要可讀樣式的。若是是後者,Persistence Query也能達到一樣目的,且不會影響存儲效率。
解決方案:建立EventAdapter,將事件轉化爲JSON後交給Journal直接保存。前提是必須有適配Akka Persistence的Journal插件支持,這樣數據庫才能直接識別EventAdapter轉化來的JSon對象,而後存儲它。
// act as-if JSON library class ExampleJsonMarshaller { def toJson(any: Any): JsObject = JsObject() def fromJson(json: JsObject): Any = new Object } class JsonDataModelAdapter extends EventAdapter { override def manifest(event: Any): String = "" val marshaller = new ExampleJsonMarshaller override def toJournal(event: Any): JsObject = marshaller.toJson(event) override def fromJournal(event: Any, manifest: String): EventSeq = event match { case json: JsObject => EventSeq(marshaller.fromJson(json)) case _ => throw new IllegalArgumentException("Unable to fromJournal a non-JSON object! Was: " + event.getClass) } }
替代方案:若是找不到能支持上述方案的Journal插件,使用akka.persistence.journal.AsyncWriteJournal
來本身手動實現一個JSON格式的序列化器,再配合一個EventAdapter實現toJournal與fromJournal也是可行的。
適用場景:隨着領域分析的深刻,須要將原有粗粒度的一個事件切分爲更小粒度的若干事件。此處以「用戶信息改變」爲例,將其切分爲更小粒度的「用戶名改變」「地址改變」等等。
解決方案:依舊是藉助EventAdapter,將Journal裏保存的一個大事件,切分爲若干個小事件,反之亦然。
trait Version1 trait Version2 // V1 event: final case class UserDetailsChanged(name: String, address: String) extends Version1 // corresponding V2 events: final case class UserNameChanged(name: String) extends Version2 final case class UserAddressChanged(address: String) extends Version2 // event splitting adapter: class UserEventsAdapter extends EventAdapter { override def manifest(event: Any): String = "" override def fromJournal(event: Any, manifest: String): EventSeq = event match { case UserDetailsChanged(null, address) => EventSeq(UserAddressChanged(address)) case UserDetailsChanged(name, null) => EventSeq(UserNameChanged(name)) case UserDetailsChanged(name, address) => EventSeq(UserNameChanged(name), UserAddressChanged(address)) case event: Version2 => EventSeq(event) } override def toJournal(event: Any): Any = event }
🏭 com.typesafe.akka:akka-persistence-query_2.13:2.6.6
Akka Persistence公開了一個基於異步流的查詢接口,使CQRS的讀端(Read-Side)能夠利用該接口讀取Journal裏保存的事件,從而執行更新UI等任務。可是,Persistence Query不能徹底勝任讀端的要求,它一般只能協助應用將數據從寫端遷移到讀端,這也是爲了更好的執行效率和可擴展性,而建議讀端與寫端分別使用不一樣類型數據庫的緣由。
💀 寫端一般都是以追加方式寫入帶Id的Event,因此傳統的關係型數據庫MySQL、Oracle或者象LevelDB、Redis這類Key-Value類型的NoSQL數據庫一般會有較好的性能。而在讀端,相似MongoDB這樣文檔類型的NoSQL數據庫會有更大的市場。
考慮到要儘量保持接口的通用性,Akka Persistence沒有過多幹涉Persistence Query的API定義,只要求:每一個讀日記(Read Journal)都必須明確代表其支持的查詢類型,並按最多見的場景預約義了一些查詢類型,而由Journal的插件本身去選擇其中的一部分並實現之。
ReadJournal都屬於Akka社區插件(🔗 Community Plugins),由用戶本身開發並維護。每一個ReadJournal對應一個存儲方案(能夠是數據庫,甚至文本文件),且都有一個固定的Id。能夠用相似readJournalFor[NoopJournal](NoopJournal.identifier)的方式,憑該Id獲取對應的ReadJournal實例;把Id定義成插件伴生對象裏的常量並不是強制,只是推薦作法。
要使用ReadJournal進行查詢,就須要先獲取它的一個實例(此處的Id即爲akka.persistence.query.my-read-journal
):
// obtain read journal by plugin id val readJournal = PersistenceQuery(system).readJournalFor[MyScaladslReadJournal]("akka.persistence.query.my-read-journal") // issue query to journal val source: Source[EventEnvelope, NotUsed] = readJournal.eventsByPersistenceId("user-1337", 0, Long.MaxValue) // materialize stream, consuming events source.runForeach { event => println("Event: " + event) }
Akka Persistence的Read Journal API主要包括如下內容:
persistenceIds():查詢系統中全部活動的Live PersistenceId,每當有新的Id被建立時也將被加入流當中。
currentPersistenceIds():查詢系統中當前的PersistenceId,在執行查詢以後新建立的Id不會被加入流中。
eventsByPersistenceId(id, fromSequenceNr = 0L, toSequenceNr = Long.MaxValue):查詢PersistenceId對應Persistent Actor的事件,這有點相似於重塑過程當中依次獲取事件的活動,但該查詢流也是活動的,因此多數Journal會採用輪詢的方式確保結果是最新的。此處的SequenceNr用於從指定位置開始查詢事件,這也變相地爲「斷點續注」提供了支持。
eventTag(tag, offset):查詢指定Tag的全部事件。事件可能來源於多個PersistenceId,並且不能保證其前後順序,除非Journal在反饋結果時提早排好序。
val NumberOfEntityGroups = 10 def tagEvent(entityId: String, event: Event): Set[String] = { val entityGroup = s"group-${math.abs(entityId.hashCode % NumberOfEntityGroups)}" event match { // OrderCompleted類型的事件會額外多一個標籤 case _: OrderCompleted => Set(entityGroup, "order-completed") case _ => Set(entityGroup) } } def apply(entityId: String): Behavior[Command] = { EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId("ShoppingCart", entityId), emptyState = State(), commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"), eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state")) .withTagger(event => tagEvent(entityId, event)) } // assuming journal is able to work with numeric offsets we can: val completedOrders: Source[EventEnvelope, NotUsed] = readJournal.eventsByTag("order-completed", Offset.noOffset) // find first 10 completed orders: val firstCompleted: Future[Vector[OrderCompleted]] = completedOrders .map(_.event) .collectType[OrderCompleted] .take(10) // cancels the query stream after pulling 10 elements .runFold(Vector.empty[OrderCompleted])(_ :+ _) // start another query, from the known offset val furtherOrders = readJournal.eventsByTag("order-completed", offset = Sequence(10))
Querying auxiliary information: Persistence Query also supports querying auxiliary information attached to a stream (as a materialized value), but the concrete support has to be provided by the journal.
final case class RichEvent(tags: Set[String], payload: Any) // a plugin can provide: order & infinite case class QueryMetadata(deterministicOrder: Boolean, infinite: Boolean) // metadata query API provided by the journal def byTagsWithMeta(tags: Set[String]): Source[RichEvent, QueryMetadata] = ??? // use the above API and inspect the materialized value val query: Source[RichEvent, QueryMetadata] = readJournal.byTagsWithMeta(Set("red", "blue")) query .mapMaterializedValue { meta => println( s"The query is: " + s"ordered deterministically: ${meta.deterministicOrder}, " + s"infinite: ${meta.infinite}") } .map { event => println(s"Event payload: ${event.payload}") } .runWith(Sink.ignore)
Under CQRS, as long as the read and write sides stay eventually consistent, each side can use its own kind of storage. On the read side you can therefore build fixed-structure materialized views, much like database views, and reuse them to speed up queries. Such views do not have to follow normal-form rules; whatever is convenient works, and a document-oriented NoSQL store, for example, is a good fit.
Creating a materialized view with a database that is compatible with the 🔗 JDK9 Reactive Streams interfaces: this requires the read-side data store to support Reactive Streams, in which case the journal's event stream can be fed directly into the read side.
implicit val system = ActorSystem() val readJournal = PersistenceQuery(system).readJournalFor[MyScaladslReadJournal](JournalId) val dbBatchWriter: Subscriber[immutable.Seq[Any]] = ReactiveStreamsCompatibleDBDriver.batchWriter // Using an example (Reactive Streams) Database driver readJournal .eventsByPersistenceId("user-1337", fromSequenceNr = 0L, toSequenceNr = Long.MaxValue) .map(envelope => envelope.event) .map(convertToReadSideTypes) // convert to datatype .grouped(20) // batch inserts into groups of 20 .runWith(Sink.fromSubscriber(dbBatchWriter)) // write batches to read-side database
Creating a materialized view with mapAsync: without Reactive Streams support, implement the transfer from the write-side event store to the read-side database by hand.
// a simulated read-side data store trait ExampleStore { def save(event: Any): Future[Unit] } val store: ExampleStore = ??? readJournal .eventsByTag("bid", NoOffset) .mapAsync(parallelism = 1) { e => store.save(e) } .runWith(Sink.ignore)
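When the read side has to survive restarts, the projection itself can keep track of how far it has come: the example below reads the last saved offset, resumes the eventsByTag query from there, and stores the new offset once each update has been acknowledged by the writer.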
def runQuery(writer: ActorRef[TheOneWhoWritesToQueryJournal.Command])(implicit system: ActorSystem[_]): Unit = { val readJournal = PersistenceQuery(system.toClassic).readJournalFor[MyScaladslReadJournal](JournalId) import system.executionContext implicit val timeout = Timeout(3.seconds) val bidProjection = new MyResumableProjection("bid") bidProjection.latestOffset.foreach { startFromOffset => readJournal .eventsByTag("bid", Sequence(startFromOffset)) .mapAsync(8) { envelope => writer .ask((replyTo: ActorRef[Done]) => TheOneWhoWritesToQueryJournal.Update(envelope.event, replyTo)) .map(_ => envelope.offset) } .mapAsync(1) { offset => bidProjection.saveProgress(offset) } .runWith(Sink.ignore) } } // an Actor performs the actual writes to the query-side store object TheOneWhoWritesToQueryJournal { sealed trait Command final case class Update(payload: Any, replyTo: ActorRef[Done]) extends Command def apply(id: String, store: ExampleStore): Behavior[Command] = { updated(ComplexState(), store) } private def updated(state: ComplexState, store: ExampleStore): Behavior[Command] = { Behaviors.receiveMessage { case command: Update => val newState = updateState(state, command) if (state.readyToSave) store.save(Record(state)) updated(newState, store) } } private def updateState(state: ComplexState, command: Command): ComplexState = { // some complicated aggregation logic here ... state } }
No matter which database is used, any plugin that implements ReadJournal is a query plugin (Query Plugin) and must publish the scenarios and semantics its queries support.
Every ReadJournal plugin must implement akka.persistence.query.ReadJournalProvider, and that provider must be able to create both the Scala and the Java variant of the ReadJournal (akka.persistence.query.scaladsl/javadsl.ReadJournal). The provider class is instantiated through one of the following four constructor forms, tried in this order: (ExtendedActorSystem, Config, String config path), (ExtendedActorSystem, Config), (ExtendedActorSystem), or a no-argument constructor.
If the database only supports querying the current result set, then for these unbounded ("live") event streams the ReadJournal has to poll the store repeatedly for newer events; in that case it is recommended to define the polling interval with a refresh-interval setting in the configuration.
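Such a plugin is then wired up under its Id in configuration, which is also where refresh-interval lives; a minimal sketch whose Id and class name simply mirror the provider example below:

```hocon
akka.persistence.query.my-read-journal {
  # the ReadJournalProvider implementation backing this read journal
  class = "docs.persistence.query.MyReadJournalProvider"
  # how often "live" queries poll the store for new events
  refresh-interval = 3s
}
```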
class MyReadJournalProvider(system: ExtendedActorSystem, config: Config) extends ReadJournalProvider { override val scaladslReadJournal: MyScaladslReadJournal = new MyScaladslReadJournal(system, config) override val javadslReadJournal: MyJavadslReadJournal = new MyJavadslReadJournal(scaladslReadJournal) } class MyScaladslReadJournal(system: ExtendedActorSystem, config: Config) extends akka.persistence.query.scaladsl.ReadJournal with akka.persistence.query.scaladsl.EventsByTagQuery with akka.persistence.query.scaladsl.EventsByPersistenceIdQuery with akka.persistence.query.scaladsl.PersistenceIdsQuery with akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery { private val refreshInterval: FiniteDuration = config.getDuration("refresh-interval", MILLISECONDS).millis /** * You can use `NoOffset` to retrieve all events with a given tag or retrieve a subset of all * events by specifying a `Sequence` `offset`. The `offset` corresponds to an ordered sequence number for * the specific tag. Note that the corresponding offset of each event is provided in the * [[akka.persistence.query.EventEnvelope]], which makes it possible to resume the * stream at a later point from a given offset. * * The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included * in the returned stream. This means that you can use the offset that is returned in `EventEnvelope` * as the `offset` parameter in a subsequent query. */ override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = offset match { case Sequence(offsetValue) => Source.fromGraph(new MyEventsByTagSource(tag, offsetValue, refreshInterval)) case NoOffset => eventsByTag(tag, Sequence(0L)) //recursive case _ => throw new IllegalArgumentException("MyJournal does not support " + offset.getClass.getName + " offsets") } override def eventsByPersistenceId( persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed] = { // implement in a similar way as eventsByTag ??? } override def persistenceIds(): Source[String, NotUsed] = { // implement in a similar way as eventsByTag ??? } override def currentPersistenceIds(): Source[String, NotUsed] = { // implement in a similar way as eventsByTag ??? } // possibility to add more plugin specific queries def byTagsWithMeta(tags: Set[String]): Source[RichEvent, QueryMetadata] = { // implement in a similar way as eventsByTag ??? 
} } class MyJavadslReadJournal(scaladslReadJournal: MyScaladslReadJournal) extends akka.persistence.query.javadsl.ReadJournal with akka.persistence.query.javadsl.EventsByTagQuery with akka.persistence.query.javadsl.EventsByPersistenceIdQuery with akka.persistence.query.javadsl.PersistenceIdsQuery with akka.persistence.query.javadsl.CurrentPersistenceIdsQuery { override def eventsByTag(tag: String, offset: Offset = Sequence(0L)): javadsl.Source[EventEnvelope, NotUsed] = scaladslReadJournal.eventsByTag(tag, offset).asJava override def eventsByPersistenceId( persistenceId: String, fromSequenceNr: Long = 0L, toSequenceNr: Long = Long.MaxValue): javadsl.Source[EventEnvelope, NotUsed] = scaladslReadJournal.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr).asJava override def persistenceIds(): javadsl.Source[String, NotUsed] = scaladslReadJournal.persistenceIds().asJava override def currentPersistenceIds(): javadsl.Source[String, NotUsed] = scaladslReadJournal.currentPersistenceIds().asJava // possibility to add more plugin specific queries def byTagsWithMeta(tags: java.util.Set[String]): javadsl.Source[RichEvent, QueryMetadata] = { import akka.util.ccompat.JavaConverters._ scaladslReadJournal.byTagsWithMeta(tags.asScala.toSet).asJava } } class MyEventsByTagSource(tag: String, offset: Long, refreshInterval: FiniteDuration) extends GraphStage[SourceShape[EventEnvelope]] { private case object Continue val out: Outlet[EventEnvelope] = Outlet("MyEventByTagSource.out") override def shape: SourceShape[EventEnvelope] = SourceShape(out) override protected def initialAttributes: Attributes = Attributes(ActorAttributes.IODispatcher) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with OutHandler { lazy val system = materializer.system private val Limit = 1000 private val connection: java.sql.Connection = ??? private var currentOffset = offset private var buf = Vector.empty[EventEnvelope] private val serialization = SerializationExtension(system) override def preStart(): Unit = { scheduleWithFixedDelay(Continue, refreshInterval, refreshInterval) } override def onPull(): Unit = { query() tryPush() } override def onDownstreamFinish(): Unit = { // close connection if responsible for doing so } private def query(): Unit = { if (buf.isEmpty) { try { buf = Select.run(tag, currentOffset, Limit) } catch { case NonFatal(e) => failStage(e) } } } private def tryPush(): Unit = { if (buf.nonEmpty && isAvailable(out)) { push(out, buf.head) buf = buf.tail } } override protected def onTimer(timerKey: Any): Unit = timerKey match { case Continue => query() tryPush() } object Select { private def statement() = connection.prepareStatement(""" SELECT id, persistence_id, seq_nr, serializer_id, serializer_manifest, payload FROM journal WHERE tag = ? AND id > ? ORDER BY id LIMIT ? """) def run(tag: String, from: Long, limit: Int): Vector[EventEnvelope] = { val s = statement() try { s.setString(1, tag) s.setLong(2, from) s.setLong(3, limit) val rs = s.executeQuery() val b = Vector.newBuilder[EventEnvelope] while (rs.next()) { val deserialized = serialization .deserialize(rs.getBytes("payload"), rs.getInt("serializer_id"), rs.getString("serializer_manifest")) .get currentOffset = rs.getLong("id") b += EventEnvelope( Offset.sequence(currentOffset), rs.getString("persistence_id"), rs.getLong("seq_nr"), deserialized) } b.result() } finally s.close() } } } }
For this part, refer to the Lagom framework. Lagom is a microservices framework developed by Lightbend; it contains extensive, mature implementations of ES and CQRS and can scale effectively with Cluster Sharding.
Lagom: The opinionated microservices framework for moving away from the monolith.
Lagom helps you decompose your legacy monolith and build, test, and deploy entire systems of Reactive microservices.
📌 LevelDB is a key-value NoSQL database from Google that runs locally (it has no network support yet). This example uses LevelDB to show how to tag events with green, black and blue and then run Persistence Query against them.
LevelDB is a fast key-value storage library written at Google that provides an ordered mapping from string keys to string values.
import akka.NotUsed import akka.testkit.AkkaSpec import akka.persistence.query.{ EventEnvelope, PersistenceQuery, Sequence } import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal import akka.stream.scaladsl.Source object LeveldbPersistenceQueryDocSpec { //#tagger import akka.persistence.journal.WriteEventAdapter import akka.persistence.journal.Tagged class MyTaggingEventAdapter extends WriteEventAdapter { val colors = Set("green", "black", "blue") override def toJournal(event: Any): Any = event match { case s: String => var tags = colors.foldLeft(Set.empty[String]) { (acc, c) => if (s.contains(c)) acc + c else acc } if (tags.isEmpty) event else Tagged(event, tags) case _ => event } override def manifest(event: Any): String = "" } //#tagger } class LeveldbPersistenceQueryDocSpec(config: String) extends AkkaSpec(config) { def this() = this("") "LeveldbPersistentQuery" must { "demonstrate how get ReadJournal" in { //#get-read-journal import akka.persistence.query.PersistenceQuery import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier) //#get-read-journal } "demonstrate EventsByPersistenceId" in { //#EventsByPersistenceId val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier) val src: Source[EventEnvelope, NotUsed] = queries.eventsByPersistenceId("some-persistence-id", 0L, Long.MaxValue) val events: Source[Any, NotUsed] = src.map(_.event) //#EventsByPersistenceId } "demonstrate AllPersistenceIds" in { //#AllPersistenceIds val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier) val src: Source[String, NotUsed] = queries.persistenceIds() //#AllPersistenceIds } "demonstrate EventsByTag" in { //#EventsByTag val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier) val src: Source[EventEnvelope, NotUsed] = queries.eventsByTag(tag = "green", offset = Sequence(0L)) //#EventsByTag } } }
The corresponding configuration is as follows:
# Configuration for the LeveldbReadJournal akka.persistence.query.journal.leveldb { # Implementation class of the LevelDB ReadJournalProvider class = "akka.persistence.query.journal.leveldb.LeveldbReadJournalProvider" # Absolute path to the write journal plugin configuration entry that this # query journal will connect to. That must be a LeveldbJournal or SharedLeveldbJournal. # If undefined (or "") it will connect to the default journal as specified by the # akka.persistence.journal.plugin property. write-plugin = "" # The LevelDB write journal is notifying the query side as soon as things # are persisted, but for efficiency reasons the query side retrieves the events # in batches that sometimes can be delayed up to the configured `refresh-interval`. refresh-interval = 3s # How many events to fetch in one query (replay) and keep buffered until they # are delivered downstreams. max-buffer-size = 100 }
Persistence plugins provide the database backing for storing events and snapshots; note the distinction from the query plugins discussed above. The persistence plugins maintained by the Akka team include, for example, akka-persistence-cassandra and akka-persistence-jdbc.
When a Persistent Actor does not override journalPluginId and snapshotPluginId, Akka uses the default journal and snapshot-store plugins configured under akka.persistence.journal.plugin and akka.persistence.snapshot-store.plugin in reference.conf. If these entries are left empty, they must be specified explicitly in application.conf.
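With the typed API, an individual EventSourcedBehavior can likewise bypass the defaults and pick its own plugins; a minimal sketch, assuming the plugin ids "my-journal" and "my-snapshot-store" are defined in your configuration:

```scala
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}

def behavior(id: String): EventSourcedBehavior[String, String, List[String]] =
  EventSourcedBehavior[String, String, List[String]](
    persistenceId = PersistenceId.ofUniqueId(id),
    emptyState = Nil,
    commandHandler = (_, cmd) => Effect.persist(cmd),  // treat the command itself as the event
    eventHandler = (state, evt) => evt :: state)
    .withJournalPluginId("my-journal")                 // instead of akka.persistence.journal.plugin
    .withSnapshotPluginId("my-snapshot-store")         // instead of akka.persistence.snapshot-store.plugin
```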
If the persistence plugins need to be started eagerly at system startup, configure them as follows:
akka { extensions = [akka.persistence.Persistence] persistence { journal { plugin = "akka.persistence.journal.leveldb" auto-start-journals = ["akka.persistence.journal.leveldb"] } snapshot-store { plugin = "akka.persistence.snapshot-store.local" auto-start-snapshot-stores = ["akka.persistence.snapshot-store.local"] } } }
The configuration below enables the LevelDB plugin as the persistence plugin and specifies where the database files are stored (by default the journal folder under the current working directory, and the snapshots folder for snapshots):
# Path to the journal plugin to be used akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" akka.persistence.journal.leveldb.dir = "target/journal" # Path to the snapshot store plugin to be used akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" akka.persistence.snapshot-store.local.dir = "target/snapshots"
Add the LevelDB plugin via Gradle dependency management:
dependencies { implementation 'org.fusesource.leveldbjni:leveldbjni-all:1.8' }
Configure LevelDB's persistence parameters. The main setting here is the compaction interval per persistence id, i.e. how many deleted messages accumulate before a journal compaction is triggered (deletion is not physical; deleted events are only marked with a logical "tombstone" flag), which keeps the database compact:
# Number of deleted messages per persistence id that will trigger journal compaction akka.persistence.journal.leveldb.compaction-intervals { persistence-id-1 = 100 persistence-id-2 = 200 # ... persistence-id-N = 1000 # use wildcards to match unspecified persistence ids, if any "*" = 250 }
Akka ships with a shared LevelDB instance intended for testing; enable it with the following configuration:
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" akka.persistence.journal.leveldb-shared.store.dir = "target/shared"
Before it can be used, it must be initialized by injecting an actor via SharedLeveldbJournal.setStore:
import akka.persistence.journal.leveldb.SharedLeveldbStore trait SharedStoreUsage extends Actor { override def preStart(): Unit = { context.actorSelection("akka://example@127.0.0.1:2552/user/store") ! Identify(1) } def receive = { case ActorIdentity(1, Some(store)) => SharedLeveldbJournal.setStore(store, context.system) } } // after this it can be used normally val store = system.actorOf(Props[SharedLeveldbStore], "store")
Akka also ships with a Persistence Plugin Proxy for testing. By forwarding requests to a target store it combines several journals, so that multiple actors can share the underlying persistence support; when one journal node crashes, a standby node can continue to provide event storage and snapshots for the actors. The proxy is started by instantiating the PersistencePluginProxyExtension extension or by calling PersistencePluginProxy.start.
# the settings go into the corresponding config blocks akka.persistence.journal.proxy { ... } akka.persistence.snapshot-store.proxy { ... } # specify the underlying journal and snapshot store target-journal-plugin = akka.persistence.journal.leveldb target-snapshot-store-plugin = ... # specify the ActorSystem used to initialize the proxy start-target-journal = xxx.xxx.myActor start-target-snapshot-store = ... ## specify the location of the target Actor (can also be set in code via PersistencePluginProxy.setTargetLocation) target-journal-address = target-snapshot-store-address =
To support custom back-end databases for persisting events, Akka Persistence exposes a set of plugin APIs.
🏭
import akka.persistence._
import akka.persistence.journal._
import akka.persistence.snapshot._
🔗 AsyncWriteJournal is in essence an actor as well; it exposes only the following three methods:
/** * Plugin API: asynchronously writes a batch (`Seq`) of persistent messages to the * journal. * * The batch is only for performance reasons, i.e. all messages don't have to be written * atomically. Higher throughput can typically be achieved by using batch inserts of many * records compared to inserting records one-by-one, but this aspect depends on the * underlying data store and a journal implementation can implement it as efficient as * possible. Journals should aim to persist events in-order for a given `persistenceId` * as otherwise in case of a failure, the persistent state may be end up being inconsistent. * * Each `AtomicWrite` message contains the single `PersistentRepr` that corresponds to * the event that was passed to the `persist` method of the `PersistentActor`, or it * contains several `PersistentRepr` that corresponds to the events that were passed * to the `persistAll` method of the `PersistentActor`. All `PersistentRepr` of the * `AtomicWrite` must be written to the data store atomically, i.e. all or none must * be stored. If the journal (data store) cannot support atomic writes of multiple * events it should reject such writes with a `Try` `Failure` with an * `UnsupportedOperationException` describing the issue. This limitation should * also be documented by the journal plugin. * * If there are failures when storing any of the messages in the batch the returned * `Future` must be completed with failure. The `Future` must only be completed with * success when all messages in the batch have been confirmed to be stored successfully, * i.e. they will be readable, and visible, in a subsequent replay. If there is * uncertainty about if the messages were stored or not the `Future` must be completed * with failure. * * Data store connection problems must be signaled by completing the `Future` with * failure. * * The journal can also signal that it rejects individual messages (`AtomicWrite`) by * the returned `immutable.Seq[Try[Unit]]`. It is possible but not mandatory to reduce * number of allocations by returning `Future.successful(Nil)` for the happy path, * i.e. when no messages are rejected. Otherwise the returned `Seq` must have as many elements * as the input `messages` `Seq`. Each `Try` element signals if the corresponding * `AtomicWrite` is rejected or not, with an exception describing the problem. Rejecting * a message means it was not stored, i.e. it must not be included in a later replay. * Rejecting a message is typically done before attempting to store it, e.g. because of * serialization error. * * Data store connection problems must not be signaled as rejections. * * It is possible but not mandatory to reduce number of allocations by returning * `Future.successful(Nil)` for the happy path, i.e. when no messages are rejected. * * Calls to this method are serialized by the enclosing journal actor. If you spawn * work in asynchronous tasks it is alright that they complete the futures in any order, * but the actual writes for a specific persistenceId should be serialized to avoid * issues such as events of a later write are visible to consumers (query side, or replay) * before the events of an earlier write are visible. * A PersistentActor will not send a new WriteMessages request before the previous one * has been completed. * * Please note that the `sender` field of the contained PersistentRepr objects has been * nulled out (i.e. 
set to `ActorRef.noSender`) in order to not use space in the journal * for a sender reference that will likely be obsolete during replay. * * Please also note that requests for the highest sequence number may be made concurrently * to this call executing for the same `persistenceId`, in particular it is possible that * a restarting actor tries to recover before its outstanding writes have completed. In * the latter case it is highly desirable to defer reading the highest sequence number * until all outstanding writes have completed, otherwise the PersistentActor may reuse * sequence numbers. * * This call is protected with a circuit-breaker. */ def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] /** * Plugin API: asynchronously deletes all persistent messages up to `toSequenceNr` * (inclusive). * * This call is protected with a circuit-breaker. * Message deletion doesn't affect the highest sequence number of messages, * journal must maintain the highest sequence number and never decrease it. */ def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] /** * Plugin API * * Allows plugin implementers to use `f pipeTo self` and * handle additional messages for implementing advanced features * */ def receivePluginInternal: Actor.Receive = Actor.emptyBehavior
If you want the journal to support synchronous writes only, just block inside the asynchronous write like this:
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = Future.fromTry(Try { // blocking call here ??? })
The journal must also implement the methods defined in AsyncRecovery for replay and sequence-number recovery:
/** * Plugin API: asynchronously replays persistent messages. Implementations replay * a message by calling `replayCallback`. The returned future must be completed * when all messages (matching the sequence number bounds) have been replayed. * The future must be completed with a failure if any of the persistent messages * could not be replayed. * * The `replayCallback` must also be called with messages that have been marked * as deleted. In this case a replayed message's `deleted` method must return * `true`. * * The `toSequenceNr` is the lowest of what was returned by [[#asyncReadHighestSequenceNr]] * and what the user specified as recovery [[akka.persistence.Recovery]] parameter. * This does imply that this call is always preceded by reading the highest sequence * number for the given `persistenceId`. * * This call is NOT protected with a circuit-breaker because it may take long time * to replay all events. The plugin implementation itself must protect against * an unresponsive backend store and make sure that the returned Future is * completed with success or failure within reasonable time. It is not allowed * to ignore completing the future. * * @param persistenceId persistent actor id. * @param fromSequenceNr sequence number where replay should start (inclusive). * @param toSequenceNr sequence number where replay should end (inclusive). * @param max maximum number of messages to be replayed. * @param recoveryCallback called to replay a single message. Can be called from any * thread. * * @see [[AsyncWriteJournal]] */ def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)( recoveryCallback: PersistentRepr => Unit): Future[Unit] /** * Plugin API: asynchronously reads the highest stored sequence number for the * given `persistenceId`. The persistent actor will use the highest sequence * number after recovery as the starting point when persisting new events. * This sequence number is also used as `toSequenceNr` in subsequent call * to [[#asyncReplayMessages]] unless the user has specified a lower `toSequenceNr`. * Journal must maintain the highest sequence number and never decrease it. * * This call is protected with a circuit-breaker. * * Please also note that requests for the highest sequence number may be made concurrently * to writes executing for the same `persistenceId`, in particular it is possible that * a restarting actor tries to recover before its outstanding writes have completed. * * @param persistenceId persistent actor id. * @param fromSequenceNr hint where to start searching for the highest sequence * number. When a persistent actor is recovering this * `fromSequenceNr` will be the sequence number of the used * snapshot or `0L` if no snapshot is used. */ def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long]
Once the code is written, the journal is enabled through configuration. Just remember never to run the journal's tasks or futures on the default dispatcher, otherwise other actors will be starved:
# Path to the journal plugin to be used akka.persistence.journal.plugin = "my-journal" # My custom journal plugin my-journal { # Class name of the plugin. class = "docs.persistence.MyJournal" # Dispatcher for the plugin actor. plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" }
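If the journal implementation does blocking I/O, an even safer option is to reserve a dispatcher for it; a minimal sketch, assuming the plugin id and class above:

```hocon
# a dedicated dispatcher for the journal's potentially blocking work;
# point the plugin at it with: plugin-dispatcher = "my-journal-dispatcher"
my-journal-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 8
  }
  throughput = 1
}
```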
🔗 SnapshotStore is an actor as well:
/** * Plugin API: asynchronously loads a snapshot. * * If the future `Option` is `None` then all events will be replayed, * i.e. there was no snapshot. If snapshot could not be loaded the `Future` * should be completed with failure. That is important because events may * have been deleted and just replaying the events might not result in a valid * state. * * This call is protected with a circuit-breaker. * * @param persistenceId id of the persistent actor. * @param criteria selection criteria for loading. */ def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] /** * Plugin API: asynchronously saves a snapshot. * * This call is protected with a circuit-breaker. * * @param metadata snapshot metadata. * @param snapshot snapshot. */ def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] /** * Plugin API: deletes the snapshot identified by `metadata`. * * This call is protected with a circuit-breaker. * * @param metadata snapshot metadata. */ def deleteAsync(metadata: SnapshotMetadata): Future[Unit] /** * Plugin API: deletes all snapshots matching `criteria`. * * This call is protected with a circuit-breaker. * * @param persistenceId id of the persistent actor. * @param criteria selection criteria for deleting. */ def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] /** * Plugin API * Allows plugin implementers to use `f pipeTo self` and * handle additional messages for implementing advanced features */ def receivePluginInternal: Actor.Receive = Actor.emptyBehavior
As with the journal, the snapshot-store plugin is enabled through configuration once the code is written:
# Path to the snapshot store plugin to be used akka.persistence.snapshot-store.plugin = "my-snapshot-store" # My custom snapshot store plugin my-snapshot-store { # Class name of the plugin. class = "docs.persistence.MySnapshotStore" # Dispatcher for the plugin actor. plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" }
Akka provides a TCK (Technology Compatibility Kit) for testing plugins.
🏭 com.typesafe.akka:akka-persistence-tck_2.12:2.6.6
Below are the tests required for a journal and a snapshot store respectively. If the plugin needs extra setup, such as starting an embedded database or deleting temporary files, override the beforeAll and afterAll methods:
class MyJournalSpec extends JournalSpec( config = ConfigFactory.parseString("""akka.persistence.journal.plugin = "my.journal.plugin"""")) { override def supportsRejectingNonSerializableObjects: CapabilityFlag = false // or CapabilityFlag.off override def supportsSerialization: CapabilityFlag = true // or CapabilityFlag.on } class MySnapshotStoreSpec extends SnapshotStoreSpec( config = ConfigFactory.parseString(""" akka.persistence.snapshot-store.plugin = "my.snapshot-store.plugin" """)) { override def supportsSerialization: CapabilityFlag = true // or CapabilityFlag.on }
If users cannot be prevented from running actors with the same persistenceId at the same time, the event log may be corrupted by events sharing the same sequence number. It is recommended that the journal still deliver such events during replay and let the replay-filter configuration decide how to handle them.
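The replay filter is configured per journal plugin; a minimal sketch for the LevelDB journal, assuming the standard modes repair-by-discard-old, fail, warn and off:

```hocon
akka.persistence.journal.leveldb.replay-filter {
  # discard the older duplicates when events with the same sequence number show up during replay
  mode = repair-by-discard-old
}
```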
Packaging with Gradle is done mainly through the Jar task of its Java plugin. To make sure multiple reference.conf files are merged correctly, the Gradle 🔗 Shadow plugin is recommended; then write the following in build.gradle:
import com.github.jengelman.gradle.plugins.shadow.transformers.AppendingTransformer plugins { id 'java' id "com.github.johnrengelman.shadow" version "5.0.0" } shadowJar { transform(AppendingTransformer) { resource = 'reference.conf' } with jar }
Inside a Docker container you can use both Akka Remoting and Akka Cluster, but take care to configure the network properly (🔗 Akka behind NAT or in a Docker container) and to size the available CPU, memory and other resources appropriately.
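For Artery-based remoting behind NAT or inside a container, the key settings are the externally reachable (canonical) address versus the locally bound address; a minimal sketch with placeholder values:

```hocon
akka.remote.artery {
  canonical.hostname = "host.example.com"  # address other nodes use to reach this node
  canonical.port = 25520
  bind.hostname = "0.0.0.0"                # address the container actually binds to
  bind.port = 25520
}
```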
🔗 https://doc.akka.io/docs/akka/current/additional/books.html