Notes on the Akka Typed Official Documentation

➡️ Introduction

For the past two years I have been working with Scala, a language where FP and OO coexist, taking a mostly functional approach, combining TDD and BDD, and applying the methodology of Domain-Driven Design to build DDDD applications (Domain-Driven Design + Distributed). Along the way I tried a great many frameworks: Akka and Scalaz for the business domain, Spring and Kafka for UI and infrastructure, and Spock, ScalaTest and Selenium for testing.

Two years of tinkering did yield something. On the core domain-model side, by wrapping aggregates in classic Akka actors, exchanging information through custom Commands and Events, building expression trees with a Free Monad for deferred evaluation, implementing an Event Store with Akka Persistence, and maintaining command and event queues with Kafka, I caught a glimpse of what an ES+CQRS architecture built in FP style could look like.

But this many frameworks is dizzying, and on top of them come the languages Scala, Groovy and Java plus the DSLs of build tools such as Gradle; add Cluster, Cloud and microservices, and it becomes a huge undertaking. Fortunately, Akka Typed, released with Akka 2.6, resolves several pain points: hard-to-manage messages, excessive framework intrusion, and cumbersome node deployment. For ES and CQRS in particular, Akka Typed can fairly be said to offer a complete solution. Like the arrival of LINQ back in the day, it opened a new window for me.

What follows are the notes I jotted down while reading the official Akka Typed documentation (language: Scala; version 2.6.5, updated to 2.6.6 during my study), kept for future reference. The content and its ordering follow my personal interests, so this is a selection and inevitably biased.

📌 RMP in the text refers to Reactive Messaging Patterns with the Actor Model by Vaughn Vernon, an important reference from when I studied the classic Actor model.

📎 On GitHub, Paweł Kaczor's akka-ddd framework is an excellent case study.

➡️ Get Started

Advantages of the Actor model

  • Event-driven: actors communicate only through asynchronous messages and never become directly coupled.
  • Strong isolation: unlike ordinary objects, an actor exposes no callable API, only the messages it supports (its protocol), so no state is shared.
  • Location transparency: the ActorSystem creates actors through a factory and hands out references, so physical location is irrelevant; actors can be started, stopped, moved, restarted, and even recovered from failure.
  • Lightweight: each actor typically costs only a few hundred bytes of overhead, so a single application can easily host millions of concurrent actor instances.

The main Akka modules:

  • Actor library: the core of Akka Typed
  • Remoting: lets actors be deployed across machines
  • Cluster, with Sharding and Singleton: cluster support
  • Persistence: lets actors persist their events; a key ingredient of ES+CQRS
  • Distributed Data: shares data between actors
  • Streams: stream processing on top of actors

Example: Internet of Things (IoT)

🔗 https://doc.akka.io/docs/akka/current/typed/guide/tutorial.html

An application for the IoT age: with temperature sensors spread around the house, users can check the temperature in every corner of their home at any time.

📌 Required preparation

  • In IDEA, exclude *javadsl* from auto-import under Editor/General/Auto Import.

  • The required dependencies:

    implementation 'org.scala-lang:scala-library:2.13.2'
    implementation 'com.typesafe.akka:akka-actor-typed_2.13:2.6.5'
    implementation 'ch.qos.logback:logback-classic:1.2.3'
    
    testImplementation 'org.scalatest:scalatest_2.13:3.1.2'
    testImplementation 'com.typesafe.akka:akka-actor-testkit-typed_2.13:2.6.5'

📝 Note

  • Printing an ActorRef yields the actor's URL, from which the actor's lineage can be read.
  • An actor's lifecycle is always bounded by its parent's. An actor stopping itself should return Behaviors.stopped; a parent can stop a leaf actor with context.stop(childRef).
  • During its lifecycle an actor can receive signals such as PostStop (see RMP-47).
  • Inside an actor, onMessage handles messages and onSignal handles signals.
  • Use val child = context.spawn(Behaviors.supervise(child()).onFailure(SupervisorStrategy.restart), name = "child-actor") to change the default supervision strategy (Stop). Classic Akka instead overrides supervisorStrategy with a closure declaring a Decider function (see RMP-52).
  • "The protocol is the API." In the actor world, protocols replace the interfaces of traditional OOP. With these protocols made of Commands and Events, the actors involved converse through the ActorRef[Command/Event] pointed to by from/replyTo fields (see the sketch after this list).
  • ⚡ Classic Akka builds the whole system around actor instances; Akka Typed builds it around Behavior instances. This is a major shift in perspective.
  • A Command is in the present tense: "what I can do". It is the actor's public API, the service it provides. An Event is in the past tense: "what I care about", an event that triggers the actor's subsequent actions.
  • Message delivery traverses the network, hosts, actor mailboxes and message handlers, and is therefore fragile. There are three main delivery patterns (see RMP-164, guaranteed delivery):
    • At-most-once: fire and forget, keeping no delivery state, so messages may be lost. This is the actors' default: simple and efficient.
    • At-least-once: keep delivery state after sending and retry until the recipient confirms; messages are never lost but may be duplicated.
    • Exactly-once: in addition, the recipient also tracks delivery state so duplicates are rejected; messages are neither lost nor duplicated.
  • Between two directly connected actors, messages are delivered strictly in send order, but delivery itself is not guaranteed.
  • Choosing the right actor granularity is the central difficulty of Akka design. In general, prefer coarse granularity, to avoid the complexity fine granularity introduces. Only add actors when:
    • more actors are needed for higher concurrency;
    • the scenario inherently requires a complex conversation;
    • several pieces of state should be maintained independently by smaller participants, to reduce coupling between the states;
    • failures must be isolated, minimizing mutual interference between participants and the impact of a failure.
  • Use Dead Watch to interact with a stopping actor. The watch relationship is not limited to parent and child; knowing the other side's ActorRef is enough. When the watched actor stops, the watcher receives a Terminated(actorRefOfWatchee) signal. Since no extra information can be attached to that signal, the recommended practice is to wrap it in a message such as WatcheeTerminated, establishing the relationship at creation time with context.watchWith(watcheeActor, WatcheeTerminated(watcheeActor, ...)). (💀 Is WatcheeTerminated filled in automatically by the context? Apparently yes.)
  • Following CQRS, reads and writes should be separated between actors too: put queries into a dedicated actor so they do not disturb the business actor. In the example, the business actor creates the query actor.
  • Queries usually need a timeout, which brings in the actors' built-in scheduler: define timers with Behaviors.withTimers inside the Behaviors.setup factory, then use timers.startSingleTimer in the actor to schedule a message after a given delay.
  • For messages crossing actor protocols, a message adapter is usually needed via context.messageAdapter(). The simplest translation is to wrap the foreign message (usually a response) in one of this actor's own messages.
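
A minimal sketch of the "protocol is the API" idea, under my own naming (Greeter, Greet and Greeted are illustrative, not from the docs): the command carries a replyTo reference, and the event sent back through it is the only way to answer.

    import akka.actor.typed.{ActorRef, Behavior}
    import akka.actor.typed.scaladsl.Behaviors

    object Greeter {
      // the protocol: a present-tense Command carrying the reply address...
      sealed trait Command
      final case class Greet(whom: String, replyTo: ActorRef[Greeted]) extends Command
      // ...and a past-tense Event used as the reply
      final case class Greeted(whom: String)

      def apply(): Behavior[Command] =
        Behaviors.receiveMessage { case Greet(whom, replyTo) =>
          replyTo ! Greeted(whom)
          Behaviors.same
        }
    }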

➡️ Terminology and Concepts

Three non-blocking design guarantees

To prevent deadlock (everyone grabs, nobody eats), starvation (the weak never get to eat) and livelock (everyone defers, nobody dares eat), there are three deadlock-free non-blocking designs, from strongest to weakest:

  • Wait-freedom: every method call completes in a finite number of steps; guarantees freedom from both deadlock and starvation.
  • Lock-freedom: some critical method call completes in a finite number of steps; guarantees freedom from deadlock but not from starvation.
  • Obstruction-freedom: some critical method completes in a finite number of steps when run in isolation for a bounded period or under given conditions; guarantees freedom from deadlock. Every lock-free algorithm is obstruction-free, but not vice versa. Optimistic concurrency control (OCC) is the classic obstruction-free example: at a given moment, when only one participant is attempting the shared operation, it completes.

Actor System

Best practices for structuring the actor hierarchy

An actor system is like an organization: tasks are always delegated downwards, and subordinates obey their superiors. This differs from common layered software design, which tries hard to hide and solve problems within its own layer rather than escalating them or negotiating with peers. Recommended practices:

  • If an actor carries very important data, hand the dangerous work to child actors so the data is not lost when the actor itself crashes: let an independent child handle each request, and clean up when a child fails. (This is known as Erlang's Error Kernel Pattern.)
  • If an actor depends on another actor to do its job, watch that actor to make sure the delegate is still alive.
  • If an actor has accumulated too many different responsibilities, split them out into child actors.

Best practices for designing an actor

  • An actor should be a good colleague: it does its own share of the work independently, disturbs others as little as possible, and never hogs resources. Even when it must access some external resource, it avoids blocking unless there is truly no alternative.
  • Do not pass mutable objects between actors; use immutable messages wherever possible.
  • An actor is designed as a container of behavior plus state, so do not habitually smuggle behavior into messages with closures and similar sugar; sharing mutable state that way breeds all kinds of uncontrollable surprises.
  • The topmost actor of an application is the very core of the error kernel. It should only start the subsystems and carry no other business responsibility; otherwise its supervision burden grows too heavy and hampers failure handling.

Coordinated shutdown

🔗 https://doc.akka.io/docs/akka/current/coordinated-shutdown.html

When all of the application's work is done, you can tell the /user guardian to stop, or call ActorSystem.terminate, stopping all running actors by running CoordinatedShutdown. In that process you can also perform other cleanup and finishing work.

Actor basics

The official documentation has a dedicated chapter covering actors in depth; this chapter only introduces the basic concepts.

An actor's main functions are: sending messages to other actors it knows, creating new actors, and designating the behavior that handles the next message. It is a container holding State, Behavior, a Mailbox, a Supervisor Strategy, and possibly child actors; the container can only be referenced through an ActorRef parameterized by the message type it accepts, which guarantees the most basic isolation:

  • State may be a complex state machine or a simple counter; essentially it is state maintained inside the actor. On restart it reverts to what it was when the actor was first created, or, with Event Sourcing, it can be restored to its pre-failure state.
  • Behavior always corresponds to the message the actor is currently processing, and an actor always has an initial behavior. Over the actor's lifecycle, its behavior may change with its state, switching from one behavior to the next.
    • Since messages are sent to an ActorRef, behind which stands a Behavior able to respond to them, that correspondence must be declared when the actor is created; the Behavior is parameterized just like the ActorRef. This also means the two behaviors an actor switches between must be type-compatible, or they could not fit the same ActorRef. (💀 This is why all of one actor's messages derive from the same trait: the actor handles exactly that family of messages.)
    • A reply to a Command usually carries a replyTo reference to the actor that should be answered, which is also how a third party can be drawn into the current conversation.
  • The Mailbox queues incoming messages in arrival order before handing them to the actor; the default mailbox is a FIFO queue. A message dequeued from the mailbox is always processed by the current behavior; if the behavior cannot handle it, it is treated as a failure.
  • Child actors are always supervised by their parent; they are added to or removed from the context's list on spawn or stop, and these asynchronous operations never block the parent.
  • The Supervisor Strategy defines how to react when an exception occurs. By default Akka Typed stops the failing actor, whereas classic Akka restarts it.

When an actor terminates, all resources it holds are released; remaining unprocessed messages go to the actor system's Dead Letter Mailbox, and new messages arriving afterwards are likewise routed to the system's EventStream as dead letters.

Supervision and Monitoring

⚠️ Supervision has been redesigned in Akka Typed and differs significantly from classic Akka.

Supervision

Supervision deals with unexpected failures, not with validation errors or anticipated exceptions that try-catch can handle. Supervision is therefore an extra decoration on an actor and not part of its message handling, while the latter belongs to the actor's business logic.

When a failure occurs, the supervision strategies are:

  • Resume: keep the actor and its internal state as they are.
  • Restart: discard the actor's internal state and revert it to what it was when first created. In practice the parent replaces the failed child's behavior with a fresh Behavior instance, and the new actor takes over the failed actor's mailbox.
  • Stop: stop the actor permanently.

⚡ Note that the message that triggered the failure is not processed again, and none of these changes inside the actor are visible beyond its parent.

Lifecycle Monitoring

Lifecycle Monitoring usually goes by the name DeathWatch (💀 earlier called Dead Watch; "watch" reads more aptly than "monitor" here). Besides the parent-child supervision relationship, it is the other monitoring relationship between actors. Because a restart caused by supervision is invisible externally, monitoring is used to tie a pair of actors together. The two differ in purpose: supervision mainly responds to failure, monitoring mainly ensures the other side learns that this side has terminated.

Use context.watch(targetActorRef) and unwatch to establish or cancel the relationship. When the watched actor terminates, the watcher receives a Terminated message (💀 isn't it a Signal?), whose default handling is to throw a DeathPactException.

⚡ Note that establishing the relationship is independent of when the target terminates: even if the target is already dead at watch time, the watcher still receives a Terminated message.

What happens when message processing throws

  • The message: it is lost and not returned to the mailbox. You must catch exceptions yourself and build a retry mechanism, while keeping the non-blocking requirement in mind.
  • The mailbox: unaffected; subsequent messages are all kept even if the actor restarts.
  • The actor: if the exception escapes the actor, the actor is suspended, and the parent's supervision strategy decides whether it is resumed, restarted or stopped.

Fault tolerance design

🔗 https://doc.akka.io/docs/akka/current/typed/fault-tolerance.html

Actor references, paths and addresses

Some basic actor references

  • ActorContext.self: the reference to the actor itself
  • PromiseActorRef: an ActorRef created by the ask pattern for its callback
  • DeadLetterActorRef: the ActorRef provided by the default dead-letter service
  • EmptyLocalActorRef: the ActorRef Akka uses when the looked-up actor does not exist. It is equivalent to DeadLetterActorRef, but it retains its path, so it can still be passed around and compared with a reference at the same path to determine whether the latter was obtained before the actor died. (💀 Somewhat like the Null Object pattern.)

Differences between actor references and paths

  • A reference lives and dies with its actor and becomes invalid when the actor's life ends. Even two successive actors at the same path never share a reference, which also means messages sent to the old ActorRef are never forwarded to the new one.
  • A path is merely a name expressing lineage; it has no lifecycle and never becomes invalid.

The two main ways to obtain a reference:

  • Create the actor directly.
  • Look it up among the registered actors through the Receptionist.

Actors and the Java Memory Model

To avoid visibility and reordering problems between actors, Akka strictly observes two "happens before" rules:

  • The actor send rule: the send of a message happens before the receive of that message by the recipient.
  • The actor subsequent processing rule: an actor processes at most one message at any moment; processing of one message happens before processing of the next.

Reliable message delivery

(Terminology note: "delivery" is meant in the postal sense, as in "undeliverable mail"; it stresses the act of conveying, where "receipt" would stress the outcome.)

The two rules Akka message delivery follows

  • A message is delivered at most once. From a business perspective, rather than whether the command went out successfully, what we actually care about is the reply: a reply proves the other side received the command, and if none comes we simply resend the command as a reminder.
  • Between a given sender-receiver pair, send order and receive order always agree (this holds for user-defined messages only, not for parent-child system messages).

What the ACK-RETRY protocol behind Akka messaging consists of

  • an identification scheme that distinguishes messages and their acknowledgements
  • a retry mechanism that resends when the expected acknowledgement has not arrived before the timeout
  • a detection mechanism by which the recipient recognizes duplicate messages and decides to discard them. It can be realized in two ways: use Akka's reliable-delivery module directly and switch the delivery mode to at-least-once, or make message handling idempotent at the business-logic level.

The reliable delivery module

Use Akka Persistence to guarantee delivery (see RMP-164).

🔗 https://doc.akka.io/docs/akka/current/typed/reliable-delivery.html

Event Sourcing

The essence of event sourcing: executing a Command derives one or more Events. The events are at once the command's side effects, the drivers of the object's state changes, and the object's immutable history over its lifetime.

Akka Persistence supports event sourcing directly.

🔗 https://doc.akka.io/docs/akka/current/typed/persistence.html#event-sourcing-concepts

Mailbox with explicit acknowledgement

Message delivery retries can be implemented with a custom mailbox, but this is mostly limited to local communication; for the reasons, see 🔗 The Rules for In-JVM (Local) Message Sends.

Dead letters

Messages that cannot be delivered (as opposed to messages lost through network failures and the like) are sent to the actor named /deadLetters, hence the name Dead Letter (see RMP-161). Dead letters arise mainly when the recipient is unknown or already dead, and the dead-letter actor serves mainly for debugging.

Since dead letters do not travel across the network, collecting all dead letters of a cluster means gathering them host by host and then aggregating. By subscribing to akka.actor.DeadLetter on the system's Event Stream, an ordinary actor receives all local dead letters.

Configuration

Akka manages configuration with the Typesafe Config Library, which is independent of Akka and equally usable for other applications' configuration.

When an ActorSystem starts, all configuration is loaded into a Config object by parsing application.conf/.json/.properties at the root of the class path, with all reference.conf files merged in as the fallback configuration.

⚠️ If you are writing an Akka application, put settings in application.conf; if you are writing an Akka-based library, put them in reference.conf. Moreover, Akka does not support overriding a config property of one library from another library.

Configuration can be loaded from external files, parsed at runtime in code, or loaded from different places via ConfigFactory.load().

import akka.actor.typed.ActorSystem
import com.typesafe.config.ConfigFactory

val customConf = ConfigFactory.parseString("""
  akka.log-config-on-start = on
""")
// ConfigFactory.load sandwiches customConfig between default reference
// config and default overrides, and then resolves it.
val system = ActorSystem(rootBehavior, "MySystem", ConfigFactory.load(customConf))

A typical multi-project configuration example:

myapp1 {
  akka.loglevel = "WARNING"
  my.own.setting = 43
}
myapp2 {
  akka.loglevel = "ERROR"
  app2.setting = "appname"
}
my.own.setting = 42
my.other.setting = "hello"

The corresponding code for loading it:

val config = ConfigFactory.load()
val app1 = ActorSystem(rootBehavior, "MyApp1", config.getConfig("myapp1").withFallback(config))
val app2 = ActorSystem(rootBehavior, "MyApp2", config.getConfig("myapp2").withOnlyPath("akka").withFallback(config))

🔗 Akka's default configuration list runs to nearly a thousand lines...

📎 Akka Config Checker is a handy tool for finding conflicting Akka configuration.


➡️ Actors

🏭 com.typesafe.akka:akka-actor-typed:2.6.5

Actor overview

Hello World

In the HelloWorld example, HelloWorldMain creates one HelloWorld (the greeter). Each time the ActorSystem asks it to SayHello, it spawns a HelloWorldBot named after the SayHello message (so there can be several bots acting the same but bearing different names) and asks the greeter to greet that bot; greeter and bot then exchange a few rounds of greetings and finish.

The example is written in FP style: the actor's state and behavior are defined in a singleton object, using Behaviors.receive { (context, message) => ... }, a function similar to classic Akka's receive(), constrained by the message type, to compose the interacting actors. Each bot keeps a greeting counter by recursively re-entering its behavior on each message: when the limit is reached it stops responding with Behaviors.stopped, otherwise it recurses.

The difference between Behaviors.receive {...} and receiveMessage {...} is that the former brings the context into the closure (a sketch follows).
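
A minimal sketch of that difference (the String protocol is my own): both factories build equivalent behaviors, but only receive hands you the ActorContext.

    import akka.actor.typed.Behavior
    import akka.actor.typed.scaladsl.Behaviors

    // receive brings the context into the closure, e.g. for logging or spawning
    val withContext: Behavior[String] =
      Behaviors.receive[String] { (context, message) =>
        context.log.info("got {}", message)
        Behaviors.same
      }

    // receiveMessage sees only the message
    val withoutContext: Behavior[String] =
      Behaviors.receiveMessage[String] { message =>
        Behaviors.same
      }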

ChatRoom

This example resembles a chat room. Each actor's responsibility, behavior type and protocol:

  • Main: creates the chat room ChatRoom and the client Gabbler and introduces the two. Behavior type: NotUsed.
  • ChatRoom: creates and manages a group of Sessions. Behavior type: RoomCommand.
    • GetSession(screenName: String, replyTo: ActorRef[SessionEvent])
    • PublishSessionMessage(screenName: String, message: String)
  • Session: publishes the posts of a client such as Gabbler. Behavior type: SessionCommand.
    • PostMessage(message: String)
    • NotifyClient(msgEvent: MessagePosted)
  • Gabbler: responds to the Session. Behavior type: SessionEvent.
    • SessionGranted(session: ActorRef[PostMessage])
    • SessionDenied(reason: String)
    • MessagePosted(screenName: String, message: String)

The example is first implemented in FP style. For instance, handling GetSession in ChatRoom ends with chatRoom(ses :: sessions), which returns a new Behavior instance; that sessions list is precisely the state the ChatRoom actor maintains.

The example also shows how to restrict a message's senders. session and its factory method, as well as the PublishSessionMessage type, are private to chatroom and inaccessible from outside; in the PostMessage branch of the session behavior, the chatroom's ActorRef is passed into the session by the factory method, with its type narrowed to ActorRef[PublishSessionMessage]. The outside world can therefore only talk to the ChatRoom, which internally hands the messages to the Sessions.

Whether a message handler's inputs arrive through the factory method's parameters or inside the message's own fields: the example demonstrates both. 💀 When designing a protocol, whether a message is a Command or an Event, which actor owns it, and how the parameters needed to process it are passed in are all questions to weigh.

For a clean exit, the Main behavior sets a Dead Watch on gabbler and defines Behaviors.receiveSignal {...}; when gabbler finishes handling MessagePosted and returns Behaviors.stopped, the resulting Terminated signal lets Main finish with its own Behaviors.stopped.

⚡ Behaviors.setup is a factory for a Behavior whose instance is created only after the actor starts. Behaviors.receive is also a behavior factory, but its behavior instance is created the very moment the actor itself is.

The actor lifecycle

An actor is a resource that must be started and stopped explicitly, and it carries state (a child actor is not born with its parent, but it certainly dies with it), so think back to the era before GC when memory handles had to be managed by hand.

An actor system is a heavyweight construct, so an application or a JVM usually hosts exactly one ActorSystem.

Creating actors

ActorContext

The ActorContext serves to:

  • spawn child actors and establish supervision.
  • watch other actors and receive a Terminated event (signal) when a watched actor stops.
  • log.
  • create message adapters.
  • interact with other actors in request-response fashion.
  • access the actor's own reference self.

The ActorContext itself is not entirely thread-safe; in particular it:

  • must not be accessed from threads running Future callbacks;
  • must not be shared between several actor instances;
  • may only be used on the ordinary message-processing thread.

Spawning child actors

Spawning means two things: creating and starting.

SpawnProtocol

With SpawnProtocol enabled in Behaviors.setup, the actor system can be assembled from anywhere in the application by telling or asking, with no direct reference to a context. The ask variant works like classic Akka and returns a Future[ActorRef[XX]].

⚡ Note the generic constraints in several places of the example code: these messages string the application's flow together.

    // an actor that enables SpawnProtocol
    object HelloWorldMain {
      def apply(): Behavior[SpawnProtocol.Command] =
        Behaviors.setup { context =>
          // Start initial tasks
          // context.spawn(...)
    
          SpawnProtocol()
        }
    }
    
    implicit val system: ActorSystem[SpawnProtocol.Command] =
      ActorSystem(HelloWorldMain(), "hello")

    // ask and the for-comprehension below additionally need:
    //   import scala.concurrent.ExecutionContext
    //   import scala.concurrent.duration._
    //   import akka.util.Timeout
    implicit val ec: ExecutionContext = system.executionContext
    implicit val timeout: Timeout = Timeout(3.seconds)

    val greeter: Future[ActorRef[HelloWorld.Greet]] =
      system.ask(SpawnProtocol.Spawn(behavior = HelloWorld(), name = "greeter", props = Props.empty, _))
    
    val greetedBehavior = Behaviors.receive[HelloWorld.Greeted] { (context, message) =>
      context.log.info2("Greeting for {} from {}", message.whom, message.from)
      Behaviors.stopped
    }
    
    val greetedReplyTo: Future[ActorRef[HelloWorld.Greeted]] =
      system.ask(SpawnProtocol.Spawn(greetedBehavior, name = "", props = Props.empty, _))
    
    for (greeterRef <- greeter; replyToRef <- greetedReplyTo) {
      greeterRef ! HelloWorld.Greet("Akka", replyToRef)
    }

Stopping actors

An actor can stop itself by returning Behaviors.stopped as its next behavior.

A child actor can be forcibly stopped by its parent with ActorContext.stop, taking effect once the child has finished its current message.

All child actors stop when their parent stops.

After an actor stops it receives a PostStop signal. You can finish cleanup in a Behaviors.receiveSignal handler for that signal, or pass a cleanup closure to Behaviors.stopped in advance, so the actor shuts down gracefully. (💀 Tested: the former runs before the latter; see the sketch below.)
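
A small sketch of both hooks together, assuming a trivial String protocol (my example, not from the docs); as noted above, the PostStop signal handler runs before the closure passed to Behaviors.stopped.

    import akka.actor.typed.{Behavior, PostStop}
    import akka.actor.typed.scaladsl.Behaviors

    val worker: Behavior[String] =
      Behaviors
        .receive[String] { (context, message) =>
          if (message == "stop")
            // the cleanup closure runs as part of stopping
            Behaviors.stopped(() => context.log.info("cleanup closure"))
          else
            Behaviors.same
        }
        .receiveSignal { case (context, PostStop) =>
          // handled first, before the closure above
          context.log.info("PostStop signal")
          Behaviors.same
        }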

Watching actors

Since the Terminated signal only carries the watched actor's ActorRef, extra information can be attached by registering the watch with context.watchWith(watchee, SpecifiedMessageRef) instead of context.watch(watchee); when the termination fires, the watcher receives that predefined message (a sketch follows).

⚡ Watch registration, deregistration, and the arrival of the Terminated event do not necessarily follow a strict register-then-Terminated order, because messages are asynchronous and mailboxes sit in between.
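
A watchWith sketch, under hypothetical names: the watcher receives its own WatcheeTerminated message instead of the bare Terminated signal.

    import akka.actor.typed.Behavior
    import akka.actor.typed.scaladsl.Behaviors

    object Watcher {
      sealed trait Command
      final case class WatcheeTerminated(name: String) extends Command

      def apply(watcheeBehavior: Behavior[String]): Behavior[Command] =
        Behaviors.setup { context =>
          val watchee = context.spawn(watcheeBehavior, "watchee")
          // register the watch together with the message to deliver on termination
          context.watchWith(watchee, WatcheeTerminated("watchee"))

          Behaviors.receiveMessage { case WatcheeTerminated(name) =>
            context.log.info("{} terminated", name)
            Behaviors.same
          }
        }
    }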

Interaction patterns

Actors interact only through each other's ActorRef[Message]. These ActorRefs and Messages make up the whole of a Protocol: they identify the two parties in a conversation, declare which messages an actor can handle, and restrict which messages may be sent to it.

📎 To run the example code, import logging and ask-pattern support:

    import akka.actor.typed.scaladsl.LoggerOps
    import akka.actor.typed.scaladsl.AskPattern._

and configure logging in logback-test.xml under the test/resources folder:

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
    
      <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
          <level>INFO</level>
        </filter>
        <encoder>
          <pattern>[%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg MDC: {%mdc}%n</pattern>
        </encoder>
      </appender>
    
      <appender name="CapturingAppender" class="akka.actor.testkit.typed.internal.CapturingAppender"/>
    
      <logger name="akka.actor.testkit.typed.internal.CapturingAppenderDelegate">
        <appender-ref ref="STDOUT"/>
      </logger>
    
      <root level="DEBUG">
        <appender-ref ref="CapturingAppender"/>
      </root>
    </configuration>

⭕ Fire and Forget

Send a message with the asynchronous, thread-safe tell; there is no guarantee the other side receives it, and no interest in whether it finishes processing it.

Essentials

    recipient ! message

Applicable when
  • it does not matter whether the message gets processed;
  • no contingency plan is needed for messages that are not delivered or not processed;
  • message count should be minimized for higher throughput (sending responses roughly doubles the number of messages).
Drawbacks
  • if messages arrive faster than they can be processed, the mailbox bursts;
  • the sender never learns that a message was lost.

⭕ Request & Response

The sender sends a Request carrying a return address, and takes the recipient's Response as confirmation that the message was delivered and processed.

Essentials

Define Request and Response; the sender sends the Request with self as the replyTo ActorRef, so the recipient can answer with a Response on receipt.

Applicable when
  • you need to subscribe to the other actor's responses.
Drawbacks
  • actors rarely define a dedicated Response message just for talking to each other (see Adapted Response);
  • without a Response it is hard to tell whether the Request was not delivered or simply not processed (see the ask pattern);
  • without a mechanism or context that pairs each Request with its Response, the pattern is useless (see the ask pattern, or the per-session child actor pattern).

⭕ Adapted Response

Wrap the recipient's Response into a message type the sender can already handle, sparing the sender from enlarging its own protocol.

Essentials

Define the recipient's Response type, define the adapted response type in the sender, register a message adapter in the sender's Behaviors.setup with context.messageAdapter(rsp => WrappedResponse(rsp)), then, in the branch for the adapted message, unwrap the original Response (the one the recipient replied with) and process it. An adapter matches the registered response class and its subclasses; the most recently registered adapter for a type is the effective one; the adapter belongs to the sender and shares its lifecycle, so an exception thrown in the adapter stops its owner. A sketch follows.

Applicable when
  • translating between two different protocols;
  • subscribing to several kinds of responses returned by one actor.
Drawbacks
  • without a Response it is hard to tell whether the Request was not delivered or not processed (see the ask pattern);
  • only one adapter per response type can be in effect at any time, so several actors must share one adapter;
  • without a mechanism or context pairing Requests with Responses, the pattern is useless.
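
A minimal adapted-response sketch with hypothetical Backend/Frontend protocols: the adapter returned by context.messageAdapter turns the backend's Response into the frontend's own WrappedResponse.

    import akka.actor.typed.{ActorRef, Behavior}
    import akka.actor.typed.scaladsl.Behaviors

    object Backend {
      final case class Request(query: String, replyTo: ActorRef[Response])
      final case class Response(result: String)
    }

    object Frontend {
      sealed trait Command
      final case class Translate(query: String) extends Command
      // the recipient's Response wrapped as one of this actor's own messages
      private final case class WrappedResponse(response: Backend.Response) extends Command

      def apply(backend: ActorRef[Backend.Request]): Behavior[Command] =
        Behaviors.setup { context =>
          // register the adapter once, in setup
          val adapter: ActorRef[Backend.Response] =
            context.messageAdapter(rsp => WrappedResponse(rsp))

          Behaviors.receiveMessage {
            case Translate(query) =>
              backend ! Backend.Request(query, adapter)
              Behaviors.same
            case WrappedResponse(response) =>
              context.log.info("result: {}", response.result)
              Behaviors.same
          }
        }
    }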

⭕ Request-Response with ask between two actors

Replace the tell of Request-Response with ask, bounding the time within which the Response must come back; past the deadline, the ask counts as failed.

Essentials

In the asker, define an implicit timeout in Behaviors.setup; send the request with context.ask(recipientRef, request) { case Success(Response(msg)) => AdaptedResponse(msg); case Failure(_) => AdaptedResponse(...) }, supplying the adaptation for the incoming response on the spot (no separately registered message adapter as in Adapted Response is needed); finally define the handler for the adapted response with the Behaviors.receiveMessage factory (see the sketch below).

Applicable when
  • the asker needs a single response;
  • the asker needs the outcome of the previous request to decide what to do next;
  • the asker must be able to resend the request when no response arrives within the deadline;
  • the asker needs to keep track of its outstanding requests rather than blindly hammering the responder (see RMP-93, the Back Pressure pattern: like a bounded blocking queue, excess requests are simply rejected);
  • the protocol design left out some necessary context that must be attached to the conversation ad hoc (meaning state the asker sets up just before the context.ask call. 💀 Is that safe? What if other code modifies it? If it is truly needed, why not put it in the Request itself?)
Drawbacks
  • one ask yields exactly one response message;
  • the asker sets a deadline for itself that the responder knows nothing about, so at the moment the ask times out, the responder may still be processing the request, or may have only just received it;
  • choosing the timeout is hard: an ill-chosen value produces many false alarms.
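
A sketch of context.ask, reusing the hypothetical Backend protocol from the earlier sketch; the Try-to-Command adaptation is supplied inline, so no separate message adapter is registered.

    import scala.concurrent.duration._
    import scala.util.{Failure, Success}
    import akka.actor.typed.{ActorRef, Behavior}
    import akka.actor.typed.scaladsl.Behaviors
    import akka.util.Timeout

    object Asker {
      sealed trait Command
      private final case class AdaptedResponse(message: String) extends Command

      def apply(backend: ActorRef[Backend.Request]): Behavior[Command] =
        Behaviors.setup { context =>
          // the ask deadline, as an implicit Timeout
          implicit val timeout: Timeout = 3.seconds

          context.ask(backend, Backend.Request("hello", _)) {
            case Success(Backend.Response(result)) => AdaptedResponse(result)
            case Failure(_)                        => AdaptedResponse("request failed")
          }

          Behaviors.receiveMessage { case AdaptedResponse(message) =>
            context.log.info("got {}", message)
            Behaviors.same
          }
        }
    }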

⭕ Request-Response with ask from outside the actor system

Ask an actor directly from outside the ActorSystem and eventually receive the reply wrapped in a Future.

Essentials

Define an implicit ActorSystem instance and timeout; map request to response with reply: Future[Response] = recipient.ask(ref => Request).flatMap { case response => Future.successful(response); case another => Future.failed(...) }; run it on system.executionContext; and handle both outcomes in the reply's callback onComplete { case Success(Response) => ...; case Failure(_) => ... } (see the sketch below).

Applicable when
  • an actor must be queried from outside the actor system.
Drawbacks
  • Future callbacks running on other threads invite all kinds of surprises;
  • one ask yields exactly one response message;
  • the asker sets a deadline for itself that the responder knows nothing about.
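
A sketch of asking from outside the system, with the same hypothetical Backend protocol; AskPattern needs an implicit Timeout and the system's Scheduler.

    import scala.concurrent.Future
    import scala.concurrent.duration._
    import scala.util.{Failure, Success}
    import akka.actor.typed.{ActorRef, ActorSystem, Scheduler}
    import akka.actor.typed.scaladsl.AskPattern._
    import akka.util.Timeout

    // assumed to exist already
    val system: ActorSystem[Nothing] = ???
    val backend: ActorRef[Backend.Request] = ???

    implicit val timeout: Timeout = 3.seconds
    implicit val scheduler: Scheduler = system.scheduler

    val reply: Future[Backend.Response] =
      backend.ask(replyTo => Backend.Request("hello", replyTo))

    reply.onComplete {
      case Success(response) => println(s"got ${response.result}")
      case Failure(ex)       => println(s"ask failed: $ex")
    }(system.executionContext)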

⭕ Ignoring replies

When the recipient's reply is of no interest, set the reply address in the Request to the do-nothing ignoreRef, turning Request-Response into Fire-Forget.

Essentials

When sending the Request, replace the reply address context.self with context.system.ignoreRef, which processes no messages at all.

Applicable when
  • the protocol prescribes a reply type but the sender occasionally does not care about the Response.
Drawbacks

Because ignoreRef swallows every message sent to it, it must be used carefully:

  • used wrongly, it severs an existing conversation between two actors;
  • any external ask routed to ignoreRef is guaranteed to time out;
  • watching ignoreRef is meaningless.

⭕ Piping a Future result to self

When an actor makes a Future-returning call, use pipeToSelf to receive the result. Future.onComplete could also extract it, but that would expose the actor's internal state to an external thread (the onComplete callback can touch the actor's internals directly), which is unsafe.

Essentials

Inside the actor, make the Future call futureResult, then wrap its outcome into a message with context.pipeToSelf(futureResult) { case Success(_) => WrappedResult(...); case Failure(_) => WrappedResult(...) }, and respond in the WrappedResult branch (see the sketch below).

Applicable when
  • accessing external resources such as databases from an actor via Futures;
  • the actor needs the Future's result to finish processing a message;
  • the context present at call time must still be available when the Future completes.
Drawbacks
  • adds an extra layer of message wrapping.
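
A pipeToSelf sketch, with a hypothetical Future-returning lookup standing in for the database call; the Try is converted to a private message and handled on the actor's own thread.

    import scala.concurrent.Future
    import scala.util.{Failure, Success}
    import akka.actor.typed.Behavior
    import akka.actor.typed.scaladsl.Behaviors

    object Repository {
      sealed trait Command
      final case class Lookup(id: String) extends Command
      private final case class WrappedResult(result: String) extends Command

      // fetch stands in for any Future-returning call, e.g. a database query
      def apply(fetch: String => Future[String]): Behavior[Command] =
        Behaviors.setup { context =>
          Behaviors.receiveMessage {
            case Lookup(id) =>
              context.pipeToSelf(fetch(id)) {
                case Success(value) => WrappedResult(value)
                case Failure(ex)    => WrappedResult(s"failed: ${ex.getMessage}")
              }
              Behaviors.same
            case WrappedResult(result) =>
              context.log.info("lookup result: {}", result)
              Behaviors.same
          }
        }
    }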

⭕ Per-session child actor

When one response can only be assembled from the replies of several actors, a parent delegates the gathering to a child; only once everything has been collected does that delegate send the aggregate back to the requester, who knows nothing beyond its protocol with the parent. The child lives only for the duration of one session, hence "per-session" child actor.

Essentials

In Behaviors.setup the parent constructs the group of worker children; while handling a Request it spawns a session child that orchestrates them (with behavior type Behavior[AnyRef] for maximal type compatibility). In its own Behaviors.setup the session child sends requests to the workers, then in Behaviors.receiveMessage recursively keeps trying to extract results from the workers' responses (in production, worker response timeouts should be set). Once all results are in, the session actor uses the replyTo handed down from the parent to send the response out directly, and then stops itself.

The key points: several places in the session actor use narrow to constrain the actor type to T <: U, a deliberate compromise that enforces the stricter subtype T rather than the supertype U; and recursion combined with Option[T] extracts the children's results.

    // worker child actors
    case class Keys()
    case class Wallet()
    
    // parent actor
    object Home {
      sealed trait Command
      case class LeaveHome(who: String, replyTo: ActorRef[ReadyToLeaveHome]) extends Command
      case class ReadyToLeaveHome(who: String, keys: Keys, wallet: Wallet)
    
      def apply(): Behavior[Command] = {
        Behaviors.setup[Command] { context =>
          val keyCabinet: ActorRef[KeyCabinet.GetKeys] = context.spawn(KeyCabinet(), "key-cabinet")
          val drawer: ActorRef[Drawer.GetWallet] = context.spawn(Drawer(), "drawer")
    
          Behaviors.receiveMessage[Command] {
            case LeaveHome(who, replyTo) =>
              context.spawn(prepareToLeaveHome(who, replyTo, keyCabinet, drawer), s"leaving-$who")
              Behaviors.same
          }
        }
      }
    
      // per-session (housekeeper) actor
      def prepareToLeaveHome(whoIsLeaving: String, replyTo: ActorRef[ReadyToLeaveHome],
                             keyCabinet: ActorRef[KeyCabinet.GetKeys], drawer: ActorRef[Drawer.GetWallet]): Behavior[NotUsed] = {
        Behaviors.setup[AnyRef] { context =>
          var wallet: Option[Wallet] = None
          var keys: Option[Keys] = None
    
          keyCabinet ! KeyCabinet.GetKeys(whoIsLeaving, context.self.narrow[Keys])
          drawer ! Drawer.GetWallet(whoIsLeaving, context.self.narrow[Wallet])
    
          Behaviors.receiveMessage {
            case w: Wallet =>
              wallet = Some(w)
              nextBehavior()
            case k: Keys =>
              keys = Some(k)
              nextBehavior()
            case _ =>
              Behaviors.unhandled
          }
    
          def nextBehavior(): Behavior[AnyRef] = (keys, wallet) match {
            case (Some(k), Some(w)) =>
              // all results are in
              replyTo ! ReadyToLeaveHome(whoIsLeaving, k, w)
              Behaviors.stopped
            case _ =>
              Behaviors.same
          }
        }.narrow[NotUsed]
      }
    }
Applicable when
  • the final result must be aggregated from several actors' responses;
  • at-least-once behavior is wanted via retries (the delegated child retries until the result is obtained).
Drawbacks
  • children stop only when the session actor stops, so guard carefully against resource leaks;
  • higher implementation complexity.

⭕ General-purpose response aggregator

Very similar to the per-session child actor pattern: an aggregator collects the children's replies and feeds them back to the delegating actor.

Essentials

The implementation resembles Per-Session Child Actor, just written more generically. context.spawnAnonymous is the connecting step: it spawns the aggregator and is handed both a closure that sends the requests to the children and a mapping closure that converts their replies into a uniform format. Once started, the aggregator collects the children's replies and stops when collection completes.

    // children may keep their own protocols; no need to bend to the Aggregator
    object Hotel1 {
      final case class RequestQuote(replyTo: ActorRef[Quote])
      final case class Quote(hotel: String, price: BigDecimal)
    }
    
    object Hotel2 {
      final case class RequestPrice(replyTo: ActorRef[Price])
      final case class Price(hotel: String, price: BigDecimal)
    }
    
    object HotelCustomer {
      sealed trait Command
      final case class AggregatedQuotes(quotes: List[Quote]) extends Command
    
      // wrap the children's replies in a uniform format
      final case class Quote(hotel: String, price: BigDecimal)
    
      def apply(hotel1: ActorRef[Hotel1.RequestQuote], hotel2: ActorRef[Hotel2.RequestPrice]): Behavior[Command] = {
        Behaviors.setup[Command] { context =>
          context.spawnAnonymous(
            // this sendRequests closure passed to the aggregator factory is what links the aggregator to its principal
            Aggregator[Reply, AggregatedQuotes](
              sendRequests = { replyTo =>
                hotel1 ! Hotel1.RequestQuote(replyTo)
                hotel2 ! Hotel2.RequestPrice(replyTo)
              },
              expectedReplies = 2,
              context.self,
              aggregateReplies = replies =>
                AggregatedQuotes(
                  replies
                    .map {
                      case Hotel1.Quote(hotel, price) => Quote(hotel, price)
                      case Hotel2.Price(hotel, price) => Quote(hotel, price)
                    }
                    .sortBy(_.price)
                    .toList),
              timeout = 5.seconds))
    
          Behaviors.receiveMessage {
            case AggregatedQuotes(quotes) =>
              context.log.info("Best {}", quotes.headOption.getOrElse("Quote N/A"))
              Behaviors.same
          }
        }
      }
    }
    
    object Aggregator {
      // a reply type of Any, to accommodate the children's different responses
      type Reply = Any
    
      sealed trait Command
      private case object ReceiveTimeout extends Command
      private case class WrappedReply[R](reply: R) extends Command
    
      def apply[Reply: ClassTag, Aggregate](
                                             sendRequests: ActorRef[Reply] => Unit,
                                             expectedReplies: Int,
                                             replyTo: ActorRef[Aggregate],
                                             aggregateReplies: immutable.IndexedSeq[Reply] => Aggregate,
                                             timeout: FiniteDuration): Behavior[Command] = {
        Behaviors.setup { context =>
          context.setReceiveTimeout(timeout, ReceiveTimeout)
          val replyAdapter = context.messageAdapter[Reply](WrappedReply(_))
    
          // send the requests to the children, then gather and consolidate the replies
          sendRequests(replyAdapter)
    
          def collecting(replies: immutable.IndexedSeq[Reply]): Behavior[Command] = {
            Behaviors.receiveMessage {
              case WrappedReply(reply: Reply) =>
                val newReplies = replies :+ reply
                if (newReplies.size == expectedReplies) {
                  val result = aggregateReplies(newReplies)
                  replyTo ! result
                  Behaviors.stopped
                } else
                  collecting(newReplies)
    
              case ReceiveTimeout =>
                val aggregate = aggregateReplies(replies)
                replyTo ! aggregate
                Behaviors.stopped
            }
          }
    
        collecting(Vector.empty)
        }
      }
    }
Applicable when
  • the same kind of information must be fetched, in the same way, from several actors spread across several places, and answered in a uniform format;
  • several replies must be aggregated;
  • at-least-once behavior is wanted via retries.
Drawbacks
  • the more generic the message types, the fewer the runtime guarantees;
  • child actors can leak resources;
  • higher implementation complexity.

⭕ Latency tail chopping

A variant of the aggregator pattern. Think of a cluster where every actor can do the same job: when one actor fails to respond in time, the work is taken from the laggard and handed to another actor.

Essentials

💀 This example is not complete and needs more study, for instance why sendRequest takes an Int parameter, and how an OO-style version would look.

Reference: 🔗 Achieving Rapid Response Times in Large Online Services

  • Use Behaviors.withTimers to set up the timers that send the requests to the workers.
  • Set two timeouts: the request timeout is the deadline for a single actor to finish the work, after which the work is handed over; the final timeout is the deadline for the whole job, after which the principal's work cannot be delivered.
  • The sendRequest function (of type (Int, ActorRef[Reply]) => Boolean) links the tail-chopper to the actors actually doing the work. If sendRequest succeeds, the request has been handed to a worker, so schedule a message bounded by the per-request timeout; otherwise schedule one bounded by the final delivery timeout.
    object TailChopping {
      sealed trait Command
      private case object RequestTimeout extends Command
      private case object FinalTimeout extends Command
      private case class WrappedReply[R](reply: R) extends Command
    
      def apply[Reply: ClassTag](
          sendRequest: (Int, ActorRef[Reply]) => Boolean,
          nextRequestAfter: FiniteDuration,
          replyTo: ActorRef[Reply],
          finalTimeout: FiniteDuration,
          timeoutReply: Reply): Behavior[Command] = {
        Behaviors.setup { context =>
          Behaviors.withTimers { timers =>
            val replyAdapter = context.messageAdapter[Reply](WrappedReply(_))
            def waiting(requestCount: Int): Behavior[Command] = {
              Behaviors.receiveMessage {
                case WrappedReply(reply: Reply) =>
                  replyTo ! reply
                  Behaviors.stopped
    
                // a single attempt missed its deadline; find someone else
                case RequestTimeout =>
                  sendNextRequest(requestCount + 1)
    
                // the whole job cannot be delivered in time; apologies
                case FinalTimeout =>
                  replyTo ! timeoutReply
                  Behaviors.stopped
              }
            }
    
            def sendNextRequest(requestCount: Int): Behavior[Command] = {
              if (sendRequest(requestCount, replyAdapter)) {
                timers.startSingleTimer(RequestTimeout, nextRequestAfter)
              } else {
                timers.startSingleTimer(FinalTimeout, finalTimeout)
              }
              waiting(requestCount)
            }

            // kick off the first attempt; the withTimers block evaluates to this behavior
            sendNextRequest(1)
          }
        }
      }
    }
Applicable when
  • fast response is required and unnecessary latency must be cut;
  • the work is always the same repetitive content.
Drawbacks
  • more messages and the same work done several times increase the load on the whole system;
  • the work must be idempotent and repeatable, or it cannot be handed over;
  • the more generic the message types, the fewer the runtime guarantees;
  • child actors can leak resources.

⭕ Scheduling messages to self

Use a timer to send yourself a predefined message when a deadline expires.

Essentials
  • Use Behaviors.withTimers to bind a TimerScheduler to the actor; the scheduler works equally with behaviors created by the setup, receive and receiveMessage factories.
  • Define and start the timer with timers.startSingleTimer; when the delay set there expires, the preset message arrives.
    object Buncher {
      sealed trait Command
      final case class ExcitingMessage(message: String) extends Command
      final case class Batch(messages: Vector[Command])
      private case object Timeout extends Command
      private case object TimerKey
    
      def apply(target: ActorRef[Batch], after: FiniteDuration, maxSize: Int): Behavior[Command] = {
        Behaviors.withTimers(timers => new Buncher(timers, target, after, maxSize).idle())
      }
    }
    
    class Buncher(
        timers: TimerScheduler[Buncher.Command],
        target: ActorRef[Buncher.Batch],
        after: FiniteDuration,
        maxSize: Int) {

      import Buncher._

      private def idle(): Behavior[Command] = {
        Behaviors.receiveMessage[Command] { message =>
          timers.startSingleTimer(TimerKey, Timeout, after)
          active(Vector(message))
        }
      }
    
      def active(buffer: Vector[Command]): Behavior[Command] = {
        Behaviors.receiveMessage[Command] {
          // on the Timeout message from the timer, the buffer stops accepting and the result goes to target
          case Timeout =>
            target ! Batch(buffer)
            idle()
    
          // until the deadline, append incoming messages to the buffer, up to maxSize
          case m =>
            val newBuffer = buffer :+ m
            if (newBuffer.size == maxSize) {
              timers.cancel(TimerKey)
              target ! Batch(newBuffer)
              idle()
            } else
              active(newBuffer)
        }
      }
    }
Notes
  • Every timer has a key. Starting a new timer with the same key cancels the old one, and it is guaranteed that the old timer's expiry message will not fire even if it is already in the mailbox. (💀 Can the timer key be user-defined? Is the stale expiry message filtered out by the framework?)
  • Timers come in periodic (PeriodicTimer) and one-shot (SingleTimer) flavors, with the same parameter shape: timer key, message, and duration; the difference is whether the last parameter is the period or the delay. (⚡ According to the Java API docs, PeriodicTimer is deprecated in favor of startTimerAtFixedRate, which fixes the sending frequency, and startTimerWithFixedDelay, which fixes the gap between two sends; for the difference see the note on scheduling periods below.)
  • The TimerScheduler is itself mutable, because it executes and manages side effects such as registering scheduled tasks. (💀 So not thread-safe?)
  • A TimerScheduler shares its owning actor's lifecycle.
  • Behaviors.withTimers can also be used inside Behaviors.supervise. On actor restart, old timers are cancelled automatically, and the new incarnation is guaranteed not to receive their preset expiry messages.

A special note on scheduling periods

There are two kinds of period. FixedDelay fixes the interval between two successive message sends; FixedRate fixes the interval between two task executions. If the choice is genuinely hard, prefer FixedDelay. (❗ "Task" here equates to one round of message processing; Akka's terminology could use further tightening.)

The main difference: with a fixed delay, delays between messages are not compensated; the interval between two successive messages is always the same, regardless of when the earlier one was actually handed over for processing. With a fixed rate, such delays are compensated: when the next message is sent depends on when the previous one was delivered for processing. (💀 In other words, FixedDelay counts from the send, FixedRate from the start of processing.)

In the long run, with FixedDelay the processing frequency is usually slightly lower than the reciprocal of the delay, so it suits short, frequent work; with FixedRate the frequency is exactly the reciprocal of the interval, so it suits work where the total number of executions matters (a sketch of both APIs follows).

⚠️ With FixedRate, if processing is delayed beyond the preset interval, the next message is sent immediately after the delayed one. For example, with scheduleAtFixedRate at a 1-second interval, if the JVM is suspended for 30 seconds by a long GC pause, the ActorSystem sends 30 messages in rapid succession to catch up, causing a short burst; for that reason FixedDelay is generally preferred.
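
A sketch of the two timer APIs side by side (Tick is my own message):

    import scala.concurrent.duration._
    import akka.actor.typed.Behavior
    import akka.actor.typed.scaladsl.Behaviors

    case object Tick

    val ticker: Behavior[Tick.type] =
      Behaviors.withTimers { timers =>
        // fixed delay: a full second passes after one Tick is delivered before the next is sent
        timers.startTimerWithFixedDelay(Tick, 1.second)
        // fixed rate (alternative): aim at one Tick per second, compensating for delays
        // timers.startTimerAtFixedRate(Tick, 1.second)
        Behaviors.receiveMessage(_ => Behaviors.same)
      }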

⭕ Responding to sharded actors in a cluster

Under clustering, the usual approach of carrying the shard actor's ActorRef in the Request still works. But if that actor is moved or passivated (temporarily shutting itself down to save memory, to be restarted on demand) after sending a Request, the Responses will all end up in Dead Letters. The remedy is to identify the actor by an EntityId instead of an ActorRef (see RMP-68). The drawback is that message adapters can no longer be used.

⚠️ RMP-77: an actor's internal state does not migrate with the actor object, so a corresponding persistence mechanism is needed to restore the actor's state.

Essentials

Replace the ActorRef of the usual design with an EntityId, then locate the actor's reference with the TypeKey plus the EntityId.

    object CounterConsumer {
      sealed trait Command
      final case class NewCount(count: Long) extends Command
      val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("example-sharded-response")
    }
    
    object Counter {
      trait Command
      case object Increment extends Command
      final case class GetValue(replyToEntityId: String) extends Command
      val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("example-sharded-counter")
    
      private def apply(): Behavior[Command] = Behaviors.setup { context =>
          counter(ClusterSharding(context.system), 0)
      }
    
      private def counter(sharding: ClusterSharding, value: Long): Behavior[Command] = Behaviors.receiveMessage {
          case Increment =>
            counter(sharding, value + 1)
          case GetValue(replyToEntityId) =>
            val replyToEntityRef = sharding.entityRefFor(CounterConsumer.TypeKey, replyToEntityId)
            replyToEntityRef ! CounterConsumer.NewCount(value)
            Behaviors.same
      }
    }

Fault tolerance

By default, when an actor throws while initializing or processing a message, the actor is stopped (⚠️ classic Akka restarts it by default).

Distinguish validation errors from failures: a validation error means the Command sent to the actor was itself invalid, so it should be specified as part of the protocol and strictly observed by the sender; that beats having the recipient throw on discovering an invalid Command. A failure is caused by something outside the actor's control; it usually cannot be made part of the protocol, and the sender can do nothing about it.

On failure, the usual principle is "let it crash". The idea: rather than fiddling with fine-grained local repairs and internal state correction, let the actor crash and stop, then use the recovery plan already in place to rebuild a known-good actor and start over.

Supervision

Supervision is a good place to put that recovery plan. The default strategy stops the actor when an exception is raised; to customize it, wrap the behavior with Behaviors.supervise when spawning the child.

Strategies take many optional parameters and can also be nested as below to handle different exception types.

    Behaviors.supervise(
      Behaviors.supervise(behavior)
        .onFailure[IllegalStateException](SupervisorStrategy.restart))
      .onFailure[IllegalArgumentException](SupervisorStrategy.stop)

⚠️ If the actor may be restarted, mutable state defined inside the Behavior passed to Behaviors.supervise must be initialized in a factory such as Behaviors.setup. In OO style, initialize it in setup; in FP style there are usually no mutable variables inside the function, so nothing special is needed.

🔗 For the full list, see the API reference: SupervisorStrategy

Children stop when the parent restarts

The second place for a recovery plan is inside Behaviors.setup, because a parent's Behaviors.setup runs again each time the parent restarts. At the same time, children stop when their parent restarts, preventing resource leaks and similar problems.

Note the difference between the following two arrangements:

⭕ Arrangement one: supervise wraps setup

Here, every restart of the parent rebuilds the children completely, always returning to the state at the parent's creation.

    def child(size: Long): Behavior[String] =
      Behaviors.receiveMessage(msg => child(size + msg.length))
    
    def parent: Behavior[String] = {
      Behaviors
        .supervise[String] {
          // setup is wrapped by supervise, so it runs again on every restart of the parent
          Behaviors.setup { ctx =>
            val child1 = ctx.spawn(child(0), "child1")
            val child2 = ctx.spawn(child(0), "child2")
    
            Behaviors.receiveMessage[String] { msg =>
              val parts = msg.split(" ")
              child1 ! parts(0)
              child2 ! parts(1)
              Behaviors.same
            }
          }
        }
        .onFailure(SupervisorStrategy.restart)
    }
⭕ Arrangement two: setup wraps supervise

Here the children are unaffected by the parent's restart: they are neither stopped nor rebuilt.

    def parent2: Behavior[String] = {
      Behaviors.setup { ctx =>
        // this setup runs only once, when the parent is created
        val child1 = ctx.spawn(child(0), "child1")
        val child2 = ctx.spawn(child(0), "child2")
    
        Behaviors
          .supervise {
            // on parent restart, only this receiveMessage factory is re-run
            Behaviors.receiveMessage[String] { msg =>
              val parts = msg.split(" ")
              child1 ! parts(0)
              child2 ! parts(1)
              Behaviors.same
            }
          }
          // the false flag keeps the children alive across the parent's restarts
          .onFailure(SupervisorStrategy.restart.withStopChildren(false))
      }
    }

The PreRestart signal

The third place for a recovery plan is the PreRestart signal handler. Like the PostStop signal mentioned earlier, an actor about to be restarted by supervision receives a PreRestart signal, letting it finish cleanup before the restart.

💀 Does the RMP-47 description for classic Akka apply to Akka Typed?

  • PreStart: fired before the actor starts
  • PostStop: fired after the actor stops
  • PreRestart: fired before the actor restarts; when done it triggers PostStop
  • PostRestart: fired after the actor restarts; when done it triggers PreStart

Bubbling failures up the family tree

In classic Akka, an exception raised by a child is escalated to the parent, which decides how to deal with it. Akka Typed offers richer means for handling this situation.

The technique is for the parent to watch the child: when the child stops because of a failure, the parent receives a ChildFailed signal carrying the cause. Notably, ChildFailed extends Terminated, so if the business logic does not need the distinction, handling Terminated is enough.

If, after a child fails, its ancestor actors (not just the parent) do not handle the Terminated signal, an akka.actor.typed.DeathPactException is thrown.

📎 The example demonstrates this with a Boss -> MiddleManagement -> Work hierarchy. When Boss sends Fail, MiddleManagement forwards it to Work, which throws on receiving it. Since neither MiddleManagement nor Boss handles the Terminated signal, both stop in turn. Boss then restarts per the configured strategy and rebuilds MiddleManagement and Work in order, so the test script's retry of the Hello message after a 200 ms wait succeeds.

Discovering actors

Besides obtaining an actor's reference by creating the actor, references can also be obtained through the Receptionist.

The Receptionist works by registration, and registration is itself based on an Akka protocol. Every actor registered with the Receptionist holds a key, so other actors in the cluster can find it by that key. A Find request is answered with a Listing containing the set of actors matching the key. (⚠️ One key can map to several actors.)

The registry the Receptionist maintains is dynamic: actors disappear from it when they stop, are manually deregistered, or their node is removed from the cluster. To follow such changes, subscribe with Receptionist.Subscribe(keyOfActor, replyTo); the Receptionist sends replyTo a Listing message whenever the registry changes.

⚠️ Remember: all of the above rides on asynchronous messages, so none of it takes effect instantly. You may have sent a deregistration and still find the actor in the registry.

Essentials (a sketch follows the list):

  • ServiceKey[Message]("name") creates a key
  • context.system.receptionist ! Receptionist.Register(key, replyTo) registers an actor; Deregister removes it
  • context.system.receptionist ! Receptionist.Subscribe(key, replyTo) subscribes to registry changes
  • context.system.receptionist ! Receptionist.Find(key, messageAdapter) finds the actors registered under a key
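
A registration-and-subscription sketch with a hypothetical PingService key:

    import akka.actor.typed.{ActorRef, Behavior}
    import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
    import akka.actor.typed.scaladsl.Behaviors

    object PingService {
      val Key: ServiceKey[Ping] = ServiceKey[Ping]("ping-service")
      final case class Ping(replyTo: ActorRef[Pong.type])
      case object Pong

      def apply(): Behavior[Ping] =
        Behaviors.setup { context =>
          // register under the key so others can find this actor
          context.system.receptionist ! Receptionist.Register(Key, context.self)
          Behaviors.receiveMessage { case Ping(replyTo) =>
            replyTo ! Pong
            Behaviors.same
          }
        }
    }

    object PingSubscriber {
      def apply(): Behavior[Receptionist.Listing] =
        Behaviors.setup { context =>
          // a fresh Listing arrives every time the registry changes
          context.system.receptionist ! Receptionist.Subscribe(PingService.Key, context.self)
          Behaviors.receiveMessage { listing =>
            context.log.info("{} instances registered", listing.serviceInstances(PingService.Key).size)
            Behaviors.same
          }
        }
    }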

The cluster Receptionist

In a cluster, once an actor registers with its local node's Receptionist, the Receptionists on the other nodes learn of it through distributed-data broadcasting, so every node can find the same actors by ServiceKey.

Mind the differences from the local case: a Subscription or Find in a cluster only yields the set of reachable actors. The full set of registered actors, including unreachable ones, is available via Listing.allServiceInstances. Also, messages passed between cluster nodes must be serialized.

Receptionist scalability

The Receptionist does not scale to arbitrary numbers or exceptionally high turnover; it typically supports at most thousands to tens of thousands of registrations. If an application truly needs more than the framework provides, you must solve the initial wiring of actors across nodes yourself.

Routing

Although an actor processes only one message at a time, nothing prevents several actors from processing the same kind of message simultaneously; this is Akka's routing facility. A router is itself an actor, but its main duty is forwarding messages rather than processing them. As in classic Akka, routing in Akka Typed comes in two flavors: pool routers and group routers.

⭕ Pool router

With a pool router, the Router creates and manages all the Routees. When a routee, being a child actor, terminates, the router removes it; once all routees are gone, the router itself stops.

Example essentials
  • Define the pool with val pool = Routers.pool(poolSize = 4)(Behaviors.supervise(Worker()).onFailure[Exception](SupervisorStrategy.restart)); the supervision wrapper should be considered indispensable. The supervised Worker() is the routee, and poolSize is the maximum number of routees the pool creates and manages.
  • Then create the router itself with val router = ctx.spawn(pool, "worker-pool").
  • Messages can now be sent to the router.
  • They end up routed to the routees (here, four Worker instances process the messages).
  • Behaviors.monitor(monitor, behaviorOfMonitee): whenever the monitored behavior receives a new message, a copy is also sent to the monitor.

Since the Router is itself an actor and the Routees are its children, their message dispatchers can be specified. (💀 There are quite a few with*-prefixed APIs on Router; consult the API docs.)

    // give the routees the default blocking IO dispatcher
    val blockingPool = pool.withRouteeProps(routeeProps = DispatcherSelector.blocking())
    // give the router the same dispatcher as its parent actor
    val blockingRouter = ctx.spawn(blockingPool, "blocking-pool", DispatcherSelector.sameAsParent())
    // round-robin routing spreads messages so each routee gets an equal share; it is the pool default
    // this example logs in a-b-a-b order
    val alternativePool = pool.withPoolSize(2).withRoundRobinRouting()

📌 A point worth stressing while learning Akka Typed: do not fixate, as in classic Akka, on defining the actor class or object itself; instead think about, understand and design the system around Behaviors.

In the Akka Typed world, many APIs, including the various Behaviors factories, are designed around Behavior. A Behavior is bound to a specific message type, which means it is bound to a protocol; the messages, plus the Behavior[Message] that processes them, constitute the complete protocol.

⭕ Group router

Unlike a pool router, the routees of a group router are produced externally by other actors (created and managed on their own); the router merely groups them together.

A group router is built on a ServiceKey and the Receptionist and manages the routees registered under one key (a sketch follows). Routee construction and monitoring become more flexible and convenient this way, but the group router also depends entirely on the Receptionist's registry to work. When the router starts and the registry is still blank, incoming messages are thrown onto the event stream as akka.actor.Dropped. Once routees are registered, messages are delivered to them while they are reachable; a routee that is not is marked unreachable.
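
A group-router sketch (WorkerKey and the String protocol are mine): routees register with the Receptionist, and the router is built from the key.

    import akka.actor.typed.Behavior
    import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
    import akka.actor.typed.scaladsl.{Behaviors, Routers}

    object WorkerGroup {
      val WorkerKey: ServiceKey[String] = ServiceKey[String]("worker")

      def apply(workerBehavior: Behavior[String]): Behavior[Unit] =
        Behaviors.setup[Unit] { context =>
          // the routee is created elsewhere and registers itself under the key
          val worker = context.spawn(workerBehavior, "worker-1")
          context.system.receptionist ! Receptionist.Register(WorkerKey, worker)

          // the router finds its routees through the Receptionist
          val router = context.spawn(Routers.group(WorkerKey), "worker-group")
          router ! "hello"
          Behaviors.empty
        }
    }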

Routing strategies

  • Round robin

    Schedules fairly across the routees, distributing tasks evenly, so it suits cases where the number of routees rarely changes; it is the pool router's default. An optional parameter preferLocalRoutees forces use of local routees only when true (default false).

  • Random

    Picks a routee at random, suiting cases where the number of routees may fluctuate; it is the group router's default. It takes the same optional preferLocalRoutees parameter.

  • Consistent hashing

    Selects a routee from a mapping keyed by the incoming message (see the sketch below).

    🔗 Reference: Consistent Hashing

    💀 That article only shows how to design a ConsistentHash[T] class with add/remove/get APIs, not how to use it, so a complete example is needed!
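
A sketch of what the consistent-hashing API on Akka 2.6's group router looks like (assuming GroupRouter.withConsistentHashingRouting; the Job protocol is mine): messages with equal ids always reach the same routee.

    import akka.actor.typed.receptionist.ServiceKey
    import akka.actor.typed.scaladsl.Routers

    final case class Job(id: String, payload: String)
    val JobKey: ServiceKey[Job] = ServiceKey[Job]("hashed-workers")

    // virtualNodesFactor spreads each routee over several points of the hash ring;
    // mapping extracts the hash key from the message
    val hashedGroup =
      Routers.group(JobKey).withConsistentHashingRouting(
        virtualNodesFactor = 10,
        mapping = (job: Job) => job.id)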

On performance

If routees are thought of as CPU cores, then naturally the more the better. But since the router is itself an actor, the capacity of its own mailbox becomes the bottleneck of the whole arrangement, and Akka Typed offers nothing extra here; scenarios needing higher throughput must solve this themselves.

Stash

Stashing means the actor buffers, wholly or in part, the messages its current behavior cannot yet handle; once initialization or the previous idempotent message is finished, it switches to the matching behavior and takes the messages back out of the buffer for processing.

Example essentials

    trait DB {
      def save(id: String, value: String): Future[Done]
      def load(id: String): Future[String]
    }
    
    object DataAccess {
      sealed trait Command
      final case class Save(value: String, replyTo: ActorRef[Done]) extends Command
      final case class Get(replyTo: ActorRef[String]) extends Command
      private final case class InitialState(value: String) extends Command
      private case object SaveSuccess extends Command
      private final case class DBError(cause: Throwable) extends Command
    
      // set the stash capacity with Behaviors.withStash(capacity),
      // then switch to the initial behavior start()
      def apply(id: String, db: DB): Behavior[Command] = {
        Behaviors.withStash(100) { buffer =>
          Behaviors.setup[Command] { context =>
            new DataAccess(context, buffer, id, db).start()
          }
        }
      }
    }
    
    // heavy use of context.pipeToSelf for Future interaction
    class DataAccess(
        context: ActorContext[DataAccess.Command],
        buffer: StashBuffer[DataAccess.Command],
        id: String,
        db: DB) {
      import DataAccess._
    
      private def start(): Behavior[Command] = {
        context.pipeToSelf(db.load(id)) {
          case Success(value) => InitialState(value)
          case Failure(cause) => DBError(cause)
        }
    
        Behaviors.receiveMessage {
          case InitialState(value) =>
            // initialization done: switch to behavior active() and start processing
            buffer.unstashAll(active(value))
          case DBError(cause) =>
            throw cause
          case other =>
            // an idempotent message is being processed, so stash what follows
            buffer.stash(other)
            Behaviors.same
        }
      }
    
      // Behaviors.receiveMessagePartial(): builds a behavior from a partial message handler;
      // messages it does not define are treated as unhandled.
      private def active(state: String): Behavior[Command] = {
        Behaviors.receiveMessagePartial {
          case Get(replyTo) =>
            replyTo ! state
            Behaviors.same
    
          // handle the idempotent Save message
          case Save(value, replyTo) =>
            context.pipeToSelf(db.save(id, value)) {
              case Success(_)     => SaveSuccess
              case Failure(cause) => DBError(cause)
            }
            // switch to behavior saving() to report the outcome of the idempotent message
            saving(value, replyTo)
        }
      }
    
      private def saving(state: String, replyTo: ActorRef[Done]): Behavior[Command] = {
        Behaviors.receiveMessage {
          case SaveSuccess =>
            replyTo ! Done
            // the idempotent message is done and acknowledged; switch to active() for the next message
            buffer.unstashAll(active(state))
          case DBError(cause) =>
            throw cause
          case other =>
            buffer.stash(other)
            Behaviors.same
        }
      }
    }

Notes

  • The stash buffer is provided by Akka, and its size must be set before the behavior object is created; otherwise stashing too many messages overflows memory and triggers a StashOverflowException. Check with StashBuffer.isFull before stashing a message.
  • unstashAll() keeps the actor from responding to new messages until all stashed messages are processed, which may occupy the message-processing thread long enough to starve other actors. Prefer unstash(numberOfMessages), which processes only a bounded number of stashed messages at a time.

A Behavior is a finite state machine

Finite state machine: in state S, when event E occurs, perform action A and transition to state S'.

This corresponds to classic Akka's FSM (Finite State Machine); see RMP and the notes below.

📎 Reference example: the dining philosophers problem, with a walkthrough: 🔗 Dining Hakkers

    object Buncher {
      // the events that drive the FSM's state changes are all replaced by messages
      sealed trait Event
      final case class SetTarget(ref: ActorRef[Batch]) extends Event
      final case class Queue(obj: Any) extends Event
      case object Flush extends Event
      private case object Timeout extends Event
    
      // the states
      sealed trait Data
      case object Uninitialized extends Data
      final case class Todo(target: ActorRef[Batch], queue: immutable.Seq[Any]) extends Data
      final case class Batch(obj: immutable.Seq[Any])
    
      // initial state Uninitialized corresponds to the initial behavior idle()
      def apply(): Behavior[Event] = idle(Uninitialized)
    
      private def idle(data: Data): Behavior[Event] =
        Behaviors.receiveMessage[Event] {
          message: Event => (message, data) match {
            case (SetTarget(ref), Uninitialized) =>
              idle(Todo(ref, Vector.empty))
            case (Queue(obj), t @ Todo(_, v)) =>
              active(t.copy(queue = v :+ obj))
            case _ =>
              Behaviors.unhandled
          }
      }
    
      // in the active state, the corresponding behavior is active()
      private def active(data: Todo): Behavior[Event] =
        Behaviors.withTimers[Event] { timers =>
          // set the timeout condition
          timers.startSingleTimer(Timeout, 1.second)
          Behaviors.receiveMessagePartial {
            case Flush | Timeout =>
              data.target ! Batch(data.queue)
              idle(data.copy(queue = Vector.empty))
            case Queue(obj) =>
              active(data.copy(queue = data.queue :+ obj))
          }
      }
    }

In Akka Typed, the presence of Protocol and Behavior simplifies the classic FSM implementation: different states correspond to different behaviors answering different requests. This has become the idiomatic Akka Typed approach, as plenty of the earlier examples have shown.

Coordinated Shutdown

CoordinatedShutdown is an extension that runs tasks registered in advance, so cleanup and finishing work can happen before the system shuts down, preventing resource leaks and similar problems.

During shutdown, the default phases are all defined in akka.coordinated-shutdown.phases below; tasks are then added to the appropriate phases.

In application.conf, the defaults can be overridden by defining different depends-on sets; before-service-unbind, before-cluster-shutdown and before-actor-system-terminate are the most commonly overridden.

Phases run with dependees before dependents, forming a directed acyclic graph (DAG); all phases ultimately execute in the DAG's topological order.

    # CoordinatedShutdown is enabled by default and will run the tasks that
    # are added to these phases by individual Akka modules and user logic.
    #
    # The phases are ordered as a DAG by defining the dependencies between the phases
    # to make sure shutdown tasks are run in the right order.
    #
    # In general user tasks belong in the first few phases, but there may be use
    # cases where you would want to hook in new phases or register tasks later in
    # the DAG.
    #
    # Each phase is defined as a named config section with the
    # following optional properties:
    # - timeout=15s: Override the default-phase-timeout for this phase.
    # - recover=off: If the phase fails the shutdown is aborted
    #                and depending phases will not be executed.
    # - enabled=off: Skip all tasks registered in this phase. DO NOT use
    #                this to disable phases unless you are absolutely sure what the
    #                consequences are. Many of the built in tasks depend on other tasks
    #                having been executed in earlier phases and may break if those are disabled.
    # depends-on=[]: Run the phase after the given phases
    phases {
    
      # The first pre-defined phase that applications can add tasks to.
      # Note that more phases can be added in the application's
      # configuration by overriding this phase with an additional
      # depends-on.
      before-service-unbind {
      }
    
      # Stop accepting new incoming connections.
      # This is where you can register tasks that makes a server stop accepting new connections. Already
      # established connections should be allowed to continue and complete if possible.
      service-unbind {
        depends-on = [before-service-unbind]
      }
    
      # Wait for requests that are in progress to be completed.
      # This is where you register tasks that will wait for already established connections to complete, potentially
      # also first telling them that it is time to close down.
      service-requests-done {
        depends-on = [service-unbind]
      }
    
      # Final shutdown of service endpoints.
      # This is where you would add tasks that forcefully kill connections that are still around.
      service-stop {
        depends-on = [service-requests-done]
      }
    
      # Phase for custom application tasks that are to be run
      # after service shutdown and before cluster shutdown.
      before-cluster-shutdown {
        depends-on = [service-stop]
      }
    
      # Graceful shutdown of the Cluster Sharding regions.
      # This phase is not meant for users to add tasks to.
      cluster-sharding-shutdown-region {
        timeout = 10 s
        depends-on = [before-cluster-shutdown]
      }
    
      # Emit the leave command for the node that is shutting down.
      # This phase is not meant for users to add tasks to.
      cluster-leave {
        depends-on = [cluster-sharding-shutdown-region]
      }
    
      # Shutdown cluster singletons
      # This is done as late as possible to allow the shard region shutdown triggered in
      # the "cluster-sharding-shutdown-region" phase to complete before the shard coordinator is shut down.
      # This phase is not meant for users to add tasks to.
      cluster-exiting {
        timeout = 10 s
        depends-on = [cluster-leave]
      }
    
      # Wait until exiting has been completed
      # This phase is not meant for users to add tasks to.
      cluster-exiting-done {
        depends-on = [cluster-exiting]
      }
    
      # Shutdown the cluster extension
      # This phase is not meant for users to add tasks to.
      cluster-shutdown {
        depends-on = [cluster-exiting-done]
      }
    
      # Phase for custom application tasks that are to be run
      # after cluster shutdown and before ActorSystem termination.
      before-actor-system-terminate {
        depends-on = [cluster-shutdown]
      }
    
      # Last phase. See terminate-actor-system and exit-jvm above.
      # Don't add phases that depends on this phase because the
      # dispatcher and scheduler of the ActorSystem have been shutdown.
      # This phase is not meant for users to add tasks to.
      actor-system-terminate {
        timeout = 10 s
        depends-on = [before-actor-system-terminate]
      }
    }
• Register tasks as early as possible after system startup; a task added too late will not be run.

• Tasks added to the same phase run in parallel, in no particular order.

• The next phase usually starts only after all tasks of the previous phase have finished or timed out. A phase can be configured with recover = off so that a failed or timed-out task aborts the whole shutdown.

• Normally tasks are added with CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "someTaskName") { ... }; the name serves mainly for debugging and logging.

• Add a cancellable task with CoordinatedShutdown(system).addCancellableTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "cleanup") { () => Future { ... } }, and cancel it later with c.cancel().

• Usually no actor needs to reply that a task has completed, as that would slow the shutdown down; letting the actor terminate is enough. If you do care when the task completes, add it with CoordinatedShutdown(system).addActorTerminationTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "someTaskName", someActor, Some("stop")), which sends someActor the given message; watching that actor's termination then tells you when the task is done.

• Shutdown is started with ActorSystem.terminate() or val done: Future[Done] = CoordinatedShutdown(system).run(CoordinatedShutdown.UnknownReason); calling it several times still runs it only once.

• The ActorSystem shuts down once the tasks of the last phase have all run, but the JVM does not necessarily stop unless all daemon threads have. Setting akka.coordinated-shutdown.exit-jvm = on forces the JVM to shut down as well.

• In a cluster, coordinated shutdown triggers automatically when a node is leaving or being removed from the cluster, and the system automatically adds the tasks that exit Cluster Singleton and Cluster Sharding gracefully.

• By default CoordinatedShutdown also runs when the JVM process is terminated by a SIGTERM (Ctrl-C does not work for SIGINT here); this default can be disabled with akka.coordinated-shutdown.run-by-jvm-shutdown-hook=off.

• JVM shutdown-hook tasks can be added with CoordinatedShutdown(system).addJvmShutdownHook { ... }, guaranteeing they run before Akka shuts down.

• To disable coordinated shutdown in tests, use the following configuration:

  # Don't terminate ActorSystem via CoordinatedShutdown in tests
  akka.coordinated-shutdown.terminate-actor-system = off
  akka.coordinated-shutdown.run-by-actor-system-terminate = off
  akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off
  akka.cluster.run-coordinated-shutdown-when-down = off

Dispatchers

The MessageDispatcher is the heart of Akka: it keeps the whole ActorSystem running and gives every actor an ExecutionContext in which to run code, complete Future callbacks, and so on.

• The default dispatcher

  Every ActorSystem has a default dispatcher, tunable under akka.actor.default-dispatcher. Its default executor type is "fork-join-executor", which performs excellently in the vast majority of cases; the executor can also be set under akka.actor.default-dispatcher.executor.

• The internal dispatcher

  A separate internal dispatcher protects the actors maintained inside the Akka modules. It can be tuned under akka.actor.internal-dispatcher, or replaced by pointing akka.actor.internal-dispatcher at another dispatcher's name (an alias).

• Looking up a dispatcher

  Dispatchers implement the ExecutionContext interface, so a dispatcher can be loaded like this: val executionContext = context.system.dispatchers.lookup(DispatcherSelector.fromConfig("my-dispatcher")).

• Selecting a dispatcher at spawn time

      // spawn actors using the default dispatcher
      context.spawn(yourBehavior, "DefaultDispatcher")
      context.spawn(yourBehavior, "ExplicitDefaultDispatcher", DispatcherSelector.default())

      // use the blocking dispatcher for blocking calls without Future support
      // (for instance, accessing legacy databases)
      context.spawn(yourBehavior, "BlockingDispatcher", DispatcherSelector.blocking())

      // use the same dispatcher as the parent actor
      context.spawn(yourBehavior, "ParentDispatcher", DispatcherSelector.sameAsParent())

      // load the named dispatcher from configuration
      context.spawn(yourBehavior, "DispatcherFromConfig", DispatcherSelector.fromConfig("your-dispatcher"))

  with the corresponding configuration:

      your-dispatcher {
        type = Dispatcher
        executor = "thread-pool-executor"
        thread-pool-executor {
          fixed-pool-size = 32
        }
        throughput = 1
      }

The two kinds of dispatcher

  • Dispatcher
    • Thread pool: event-driven; a group of actors shares one thread pool.
    • Sharing: unrestricted.
    • Mailboxes: one per actor.
    • Use cases: Akka's default dispatcher; supports bulkheading.
    • Driven by: a java.util.concurrent.ExecutorService. The executor can be specified as fork-join-executor, thread-pool-executor, or the fully qualified class name of an akka.dispatcher.ExecutorServiceConfigurator implementation.
  • PinnedDispatcher
    • Thread pool: every actor owns a dedicated pool containing a single thread.
    • Sharing: none.
    • Mailboxes: one per actor.
    • Use cases: supports bulkheading.
    • Driven by: any akka.dispatch.ThreadPoolExecutorConfigurator; the default executor is thread-pool-executor.

A fork-join executor example:

    my-dispatcher {
      # Dispatcher is the name of the event-based dispatcher
      type = Dispatcher
      # What kind of ExecutionService to use
      executor = "fork-join-executor"
      # Configuration for the fork join pool
      fork-join-executor {
        # Min number of threads to cap factor-based parallelism number to
        parallelism-min = 2
        # Parallelism (threads) ... ceil(available processors * factor)
        parallelism-factor = 2.0
        # Max number of threads to cap factor-based parallelism number to
        parallelism-max = 10
      }
      # Throughput defines the maximum number of messages to be
      # processed per actor before the thread jumps to the next actor.
      # Set to 1 for as fair as possible.
      throughput = 100
    }

Custom dispatchers for isolating blocking

📎 A video on the hazards of blocking: Managing Blocking in Akka video, with example code: https://github.com/raboof/akka-blocking-dispatcher

With the default dispatcher, many actors share one thread pool, so when some actors hold threads because they are blocked, the available threads can run out, leaving the other actors in the group starved for threads.

Recommended monitoring tools: YourKit, VisualVM, Java Mission Control, Lightbend's Thread Starvation Detector, and so on.

The example contrasts two actors: in a (1 to 100) loop, one newly spawned actor sleeps 5 seconds in its message handler, leaving another actor spawned at the same time stuck with no thread to process its messages.

The first idea that comes to mind is to wrap the long call in a Future, as below, but that idea is really too simple: the Future still uses the ExecutionContext shared by all actors as its execution context, so as the application's load keeps increasing, memory and threads are devoured at speed.

    object BlockingFutureActor {
      def apply(): Behavior[Int] =
        Behaviors.setup { context =>
          implicit val executionContext: ExecutionContext = context.executionContext
    
          Behaviors.receiveMessage { i =>
            triggerFutureBlockingOperation(i)
            Behaviors.same
          }
        }
    
      def triggerFutureBlockingOperation(i: Int)(implicit ec: ExecutionContext): Future[Unit] = {
        println(s"Calling blocking Future: $i")
        Future {
          Thread.sleep(5000) //block for 5 seconds
          println(s"Blocking future finished $i")
        }
      }
    }

The correct solution is a dedicated dispatcher for all blocking calls, a technique known as "bulkheading" or "isolating blocking".

Configure the dispatcher in application.conf as follows; thread-pool-executor.fixed-pool-size can be fine-tuned for the actual load:

    my-blocking-dispatcher {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor {
        fixed-pool-size = 16
      }
      throughput = 1
    }

Then use this configuration to replace the default dispatcher loaded on line 4 of the code above:

    implicit val executionContext: ExecutionContext =
      context.system.dispatchers.lookup(DispatcherSelector.fromConfig("my-blocking-dispatcher"))

This is the recommended way of dealing with blocking in reactive applications. For a similar discussion of blocking calls in Akka HTTP, see 🔗 Handling blocking operations in Akka HTTP.

Some further advice:

  • Do blocking calls inside Futures, but be sure to cap the number of such calls in flight at any moment; an unbounded number of such tasks will exhaust memory or threads.
  • Do blocking calls inside Futures, giving the thread pool a thread-count ceiling that matches the hardware the application runs on.
  • Dedicate a single thread to managing a set of blocking resources, for example an NIO selector managing several channels, dispatching the resources' events as actor messages.
  • Manage the blocking-call actors with a router, backed by an adequately sized thread pool. This suits single-threaded resources such as legacy databases especially well: one actor per database connection, centrally managed by a router; the number of actors is then dictated by the hardware the database runs on.
  • Configure the thread pool in application.conf and have it instantiated through the ActorSystem.

    Other common dispatcher configurations

    • Fixed thread pool size

      blocking-io-dispatcher {
        type = Dispatcher
        executor = "thread-pool-executor"
        thread-pool-executor {
          fixed-pool-size = 32
        }
        throughput = 1
      }
    • Thread pool sized from the number of CPU cores

      my-thread-pool-dispatcher {
        # Dispatcher is the name of the event-based dispatcher
        type = Dispatcher
        # What kind of ExecutionService to use
        executor = "thread-pool-executor"
        # Configuration for the thread pool
        thread-pool-executor {
          # minimum number of threads to cap factor-based core number to
          core-pool-size-min = 2
          # No of core threads ... ceil(available processors * factor)
          core-pool-size-factor = 2.0
          # maximum number of threads to cap factor-based number to
          core-pool-size-max = 10
        }
        # Throughput defines the maximum number of messages to be
        # processed per actor before the thread jumps to the next actor.
        # Set to 1 for as fair as possible.
        throughput = 100
      }
    • PinnedDispatcher

      my-pinned-dispatcher {
        executor = "thread-pool-executor"
        type = PinnedDispatcher
      }

      Because the actor is not guaranteed to get the same thread every time, set thread-pool-executor.allow-core-timeout=off when it truly matters that the same thread is always used.

    • Thread shutdown timeout
      Both fork-join-executor and thread-pool-executor shut their threads down when nobody is using them. To keep them alive a little longer, adjust the settings as below. This matters especially when the executor is used purely as an execution context (say, only for Future calls) without any actor attached; otherwise the default of 1 second will shut the whole pool down far too eagerly.

      my-dispatcher-with-timeouts {
        type = Dispatcher
        executor = "thread-pool-executor"
        thread-pool-executor {
          fixed-pool-size = 16
          # Keep alive time for threads
          keep-alive-time = 60s
          # Allow core threads to time out
          allow-core-timeout = off
        }
        # How long time the dispatcher will wait for new actors until it shuts down
        shutdown-timeout = 60s
      }

    Mailboxes

    A mailbox is the queue in which an actor's pending messages wait; by default it has no capacity limit. But when an actor processes messages more slowly than they arrive, it can be necessary to cap the mailbox, in which case further incoming messages are diverted to the system's DeadLetter.

    Selecting a specific mailbox

    Unless specified otherwise, the default SingleConsumerOnlyUnboundedMailbox is used. A different mailbox is chosen at context.spawn time, and the settings can be loaded dynamically from the configuration file:

    context.spawn(childBehavior, "bounded-mailbox-child", MailboxSelector.bounded(100))
    
    val props = MailboxSelector.fromConfig("my-app.my-special-mailbox")
    context.spawn(childBehavior, "from-config-mailbox-child", props)
    
    my-app {
      my-special-mailbox {
        mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
      }
    }

    Mailboxes provided by Akka

    • Non-blocking mailboxes

      | Mailbox | Internals | Bounded | Configuration name |
      |---|---|---|---|
      | SingleConsumerOnlyUnboundedMailbox (default) | A multiple-producer single-consumer queue; cannot be used with a BalancingDispatcher | No | akka.dispatch.SingleConsumerOnlyUnboundedMailbox |
      | UnboundedMailbox | A java.util.concurrent.ConcurrentLinkedQueue | No | unbounded or akka.dispatch.UnboundedMailbox |
      | NonBlockingBoundedMailbox | An efficient multiple-producer single-consumer queue | Yes | akka.dispatch.NonBlockingBoundedMailbox |
      | UnboundedControlAwareMailbox (control messages derived from akka.dispatch.ControlMessage are delivered first) | Two java.util.concurrent.ConcurrentLinkedQueue | No | akka.dispatch.UnboundedControlAwareMailbox |
      | UnboundedPriorityMailbox (delivery order of equal-priority messages is not guaranteed) | A java.util.concurrent.PriorityBlockingQueue | No | akka.dispatch.UnboundedPriorityMailbox |
      | UnboundedStablePriorityMailbox (equal-priority messages are delivered in strict FIFO order) | A java.util.concurrent.PriorityBlockingQueue wrapped in akka.util.PriorityQueueStabilizer | No | akka.dispatch.UnboundedStablePriorityMailbox |
    • Blocking mailboxes: these block if mailbox-push-timeout-time is set to a non-zero value, otherwise they do not block

      | Mailbox | Internals | Bounded | Configuration name |
      |---|---|---|---|
      | BoundedMailbox | A java.util.concurrent.LinkedBlockingQueue | Yes | bounded or akka.dispatch.BoundedMailbox |
      | BoundedPriorityMailbox (delivery order of equal-priority messages is not guaranteed) | A java.util.PriorityQueue wrapped in akka.util.BoundedBlockingQueue | Yes | akka.dispatch.BoundedPriorityMailbox |
      | BoundedStablePriorityMailbox (equal-priority messages are delivered in strict FIFO order) | A java.util.PriorityQueue wrapped in akka.util.PriorityQueueStabilizer and akka.util.BoundedBlockingQueue | Yes | akka.dispatch.BoundedStablePriorityMailbox |
      | BoundedControlAwareMailbox (control messages derived from akka.dispatch.ControlMessage are delivered first) | Two java.util.concurrent.ConcurrentLinkedQueue; blocks when full | Yes | akka.dispatch.BoundedControlAwareMailbox |

    Custom mailboxes

    To implement your own mailbox, derive from MailboxType. Its constructor has two important parameters: an ActorSystem.Settings object and a Config section. For the latter, set mailbox-type in the dispatcher's or mailbox's configuration to the fully qualified class name of your custom MailboxType.

    💀 What is this "marker trait used for mailbox requirements mapping"? Is it mandatory?

    import java.util.concurrent.ConcurrentLinkedQueue
    import akka.actor.{ ActorRef, ActorSystem }
    import akka.dispatch.{ Envelope, MailboxType, MessageQueue, ProducesMessageQueue }
    import com.typesafe.config.Config
    
    // Marker trait used for mailbox requirements mapping
    trait MyUnboundedMessageQueueSemantics
    
    object MyUnboundedMailbox {
      // This is the MessageQueue implementation
      class MyMessageQueue extends MessageQueue with MyUnboundedMessageQueueSemantics {
    
        private final val queue = new ConcurrentLinkedQueue[Envelope]()
    
        // these should be implemented; queue used as example
        def enqueue(receiver: ActorRef, handle: Envelope): Unit =
          queue.offer(handle)
        def dequeue(): Envelope = queue.poll()
        def numberOfMessages: Int = queue.size
        def hasMessages: Boolean = !queue.isEmpty
        def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
          while (hasMessages) {
            deadLetters.enqueue(owner, dequeue())
          }
        }
      }
    }
    
    // This is the Mailbox implementation
    class MyUnboundedMailbox extends MailboxType with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {
    
      import MyUnboundedMailbox._
    
      // This constructor signature must exist, it will be called by Akka
      def this(settings: ActorSystem.Settings, config: Config) = {
        // put your initialization code here
        this()
      }
    
      // The create method is called to create the MessageQueue
      final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
        new MyMessageQueue()
    }
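
    A short sketch of putting the custom mailbox to work (the config key my-custom-mailbox and the fully qualified class name docs.MyUnboundedMailbox are assumptions for illustration):

    // in application.conf:
    //   my-custom-mailbox {
    //     mailbox-type = "docs.MyUnboundedMailbox"
    //   }
    val props = MailboxSelector.fromConfig("my-custom-mailbox")
    context.spawn(childBehavior, "custom-mailbox-child", props)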

    Testing

    🏭
    com.typesafe.akka:akka-actor-testkit-typed_2.13:2.6.5
    org.scalatest:scalatest_2.13:3.1.1

    Tests can be asynchronous, run against a real ActorSystem, or synchronous, run on the test-dedicated thread provided by the BehaviorTestKit.

    Asynchronous testing

    • Akka's testkit provides ActorTestKit as a stand-in for a real ActorSystem; by mixing in BeforeAndAfterAll and overriding afterAll() = testKit.shutdownTestKit(), the ActorSystem is shut down once the tests finish.
    
      With a single shared testKit instance you can spawn/stop actors directly (anonymous ones included) and use them as throwaway mock actors, verifying that the actor under test behaves as expected.
    
    • The testkit also provides TestProbe for receiving an actor's replies, together with a family of probe.expectXXX assertions on the actor's activity.
    
    • More conveniently, extend ScalaTestWithActorTestKit and mix in a trait such as AnyFeatureSpecLike, so you can concentrate entirely on the test cases themselves rather than details like how to shut down the ActorSystem.
    
    • The testkit loads its configuration from application-test.conf if present; otherwise it falls back to the reference.conf bundled with the Akka libraries, not your application's own application.conf. You can also load a custom configuration file with ConfigFactory.load(), or parse a configuration string directly with parseString(); chaining withFallback() loads a configuration together with its fallback in one step.

      ConfigFactory.parseString("""
        akka.loglevel = DEBUG
        akka.log-config-on-start = on
        """).withFallback(ConfigFactory.load())
    • To test actor behavior that is tightly coupled to the timeline, the testkit provides a manual timer, ManualTime, which lets you assert what happens at specific points in time:

      class ManualTimerExampleSpec
          extends ScalaTestWithActorTestKit(ManualTime.config)
          with AnyWordSpecLike
          with LogCapturing {
      
        val manualTime: ManualTime = ManualTime()
      
        "A timer" must {
          "schedule non-repeated ticks" in {
            case object Tick
            case object Tock
      
            val probe = TestProbe[Tock.type]()
            val behavior = Behaviors.withTimers[Tick.type] { timer =>
              // the message will not be scheduled until 10 ms have passed
              timer.startSingleTimer(Tick, 10.millis)
              Behaviors.receiveMessage { _ =>
                probe.ref ! Tock
                Behaviors.same
              }
            }
      
            spawn(behavior)
      
            // no message yet at 9 ms
            manualTime.expectNoMessageFor(9.millis, probe)
      
            // after another 2 ms, the Tock arrives
            manualTime.timePasses(2.millis)
            probe.expectMessage(Tock)
      
            // and no further messages arrive after that
            manualTime.expectNoMessageFor(10.seconds, probe)
          }
        }
      }
    • To verify that an actor emitted certain log events, the testkit provides LoggingTestKit.

      LoggingTestKit
        .error[IllegalArgumentException]
        .withMessageRegex(".*was rejected.*expecting ascii input.*")
        .withCustom { event =>
          event.marker match {
            case Some(m) => m.getName == "validation"
            case None    => false
          }
        }
        .withOccurrences(2)
        .expect {
          ref ! Message("hellö")
          ref ! Message("hejdå")
        }
    • To keep log output tidy, the testkit provides LogCapturing, which gathers log and console output and prints it in one block only when a test fails, making failures easier to diagnose. See the interaction-patterns chapter for a concrete example. A combined sketch follows.
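
    Putting the asynchronous pieces together, a minimal sketch of a spec (the Ping/Pong protocol is invented for illustration):

    import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
    import akka.actor.typed.ActorRef
    import akka.actor.typed.scaladsl.Behaviors
    import org.scalatest.wordspec.AnyWordSpecLike
    
    class EchoSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike {
      case class Ping(message: String, replyTo: ActorRef[Pong])
      case class Pong(message: String)
    
      // the behavior under test: echo whatever arrives back to replyTo
      val echo = Behaviors.receiveMessage[Ping] { ping =>
        ping.replyTo ! Pong(ping.message)
        Behaviors.same
      }
    
      "An echo actor" must {
        "reply with what it was sent" in {
          val actor = spawn(echo)             // spawn comes from the testkit
          val probe = createTestProbe[Pong]() // a throwaway mock actor
          actor ! Ping("hello", probe.ref)
          probe.expectMessage(Pong("hello"))
        }
      }
    }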

    Synchronous testing

    • BehaviorTestKit is provided for synchronous testing of actors.

      val testKit = BehaviorTestKit(Hello())
      // create a child actor (childActor is the Behavior expected to be spawned)
      testKit.run(Hello.CreateChild("child"))
      testKit.expectEffect(Spawned(childActor, "child"))
      // create an anonymous child actor
      testKit.run(Hello.CreateAnonymousChild)
      testKit.expectEffect(SpawnedAnonymous(childActor))
      
      // a TestInbox stands in for a mailbox, making received messages easy to assert on
      val inbox = TestInbox[String]()
      testKit.run(Hello.SayHello(inbox.ref))
      inbox.expectMessage("hello")
      // inspect a named child's inbox
      testKit.run(Hello.SayHelloToChild("child"))
      val childInbox = testKit.childInbox[String]("child")
      childInbox.expectMessage("hello")
      // inspect an anonymous child's inbox
      testKit.run(Hello.SayHelloToAnonymousChild)
      val child = testKit.expectEffectType[SpawnedAnonymous[String]]
      val anonymousChildInbox = testKit.childInbox(child.ref)
      anonymousChildInbox.expectMessage("hello stranger")
    • BehaviorTestKit is not recommended in the following situations (this may improve over time):
    
      • Scenarios involving Futures or similar asynchronous callbacks
      • Scenarios involving timers or scheduled messages
      • Scenarios involving EventSourcedBehavior
      • Scenarios involving stubbed actors that must be exercised for real
      • Black-box testing
    • Besides Spawned and SpawnedAnonymous, BehaviorTestKit supports these additional Effects:

      • SpawnedAdapter
      • Stopped
      • Watched
      • WatchedWith
      • Unwatched
      • Scheduled
    • BehaviorTestKit also supports verifying log output

      val testKit = BehaviorTestKit(Hello())
      val inbox = TestInbox[String]("Inboxer")
      testKit.run(Hello.LogAndSayHello(inbox.ref))
      testKit.logEntries() shouldBe Seq(CapturedLogEvent(Level.INFO, "Saying hello to Inboxer"))

    Coexistence of Akka Classic and Typed

    At present Akka Typed is still implemented internally on top of classic Akka, though that will change in the future. For now the two coexist in the following ways:

    • A classic ActorSystem can create typed actors
    • Typed and classic actors can send messages to each other
    • Typed and classic actors can supervise or watch one another
    • A classic actor can be converted into a typed one

    Use an alias when importing the namespaces to keep them apart: import akka.{ actor => classic }

    ⚠️ For supervision, classic defaults to restart while Typed defaults to stop, so Akka decides the effective strategy by looking at the child: if the child being created is classic, the default is restart; otherwise it is stop.

    ⭕ From Classic to Typed

    // importing the Typed adapters is almost always necessary
    import akka.actor.typed.scaladsl.adapter._
    
    val system = akka.actor.ActorSystem("ClassicToTypedSystem")
    val typedSystem: ActorSystem[Nothing] = system.toTyped
    
    val classicActor = system.actorOf(Classic.props())
    class Classic extends classic.Actor with ActorLogging {
      // context.spawn is an implicit extension method
      val second: ActorRef[Typed.Command] = context.spawn(Typed(), "second")
    
      // context.watch is an implicit extension method
      context.watch(second)
    
      // self can be used as the `replyTo` parameter here because
      // there is an implicit conversion from akka.actor.ActorRef to
      // akka.actor.typed.ActorRef
      // An equal alternative would be `self.toTyped`
      second ! Typed.Ping(self)
    
      override def receive = {
        case Typed.Pong =>
          log.info(s"$self got Pong from ${sender()}")
          // context.stop is an implicit extension method
          context.stop(second)
        case classic.Terminated(ref) =>
          log.info(s"$self observed termination of $ref")
          context.stop(self)
      }
    }

    ⭕ From Typed to Classic

    val system = classic.ActorSystem("TypedWatchingClassic")
    val typed = system.spawn(Typed.behavior, "Typed")
    
    object Typed {
      final case class Ping(replyTo: akka.actor.typed.ActorRef[Pong.type])
      sealed trait Command
      case object Pong extends Command
    
      val behavior: Behavior[Command] =
        Behaviors.setup { context =>
          // context.actorOf is an implicit extension method
          val classic = context.actorOf(Classic.props(), "second")
    
          // context.watch is an implicit extension method
          context.watch(classic)
    
          // illustrating how to pass sender, toClassic is an implicit extension method
          classic.tell(Typed.Ping(context.self), context.self.toClassic)
    
          Behaviors
            .receivePartial[Command] {
              case (context, Pong) =>
                // it's not possible to get the sender, that must be sent in message
                // context.stop is an implicit extension method
                context.stop(classic)
                Behaviors.same
            }
            .receiveSignal {
              case (_, akka.actor.typed.Terminated(_)) =>
                Behaviors.stopped
            }
        }
    }

    Style guide: functional vs object-oriented

    | Difference | Functional style | Object-oriented style |
    |---|---|---|
    | Structure | A singleton object | A companion object plus a class derived from AbstractBehavior[Message] |
    | Factory apply() | The factory method defines the Behavior and does all the other work | The companion object's factory builds the initial Behavior via Behaviors.setup { context => new MyActor(context) }, injecting the context and any other parameters into the class constructor |
    | Actor class | There is no subclass, so only Behaviors.same can be used | The class extends AbstractBehavior[Message], so this can be used as the equivalent of Behaviors.same |
    | Behavior | Defined by handing a function (closure) to a factory such as Behaviors.receive inside the singleton object | Defined by overriding onMessage in the subclass |
    | Context | Passed into receive together with the message | Passed to the subclass by factories such as Behaviors.setup, so each instance holds its own context |
    | State | Passed as factory parameters (usually bundled, context included, into a DTO-like class for a bit of decoupling), returning a Behavior that carries the new state | All mutable state is kept inside the AbstractBehavior instance |
    
    Reasons to choose the functional style:
    
    • You are familiar with FP; Akka does not drag in deep theory such as category theory anyway
    • You are used to keeping state in immutable values that can be handed to the next Behavior
    • Behaviors can be made independent of state
    • Implementing a finite state machine as one Behavior per state is more natural in the FP style than in the OO style
    • It lowers the risk of sharing state across threads, e.g. with Futures
    
    Reasons to choose the object-oriented style:
    
    • You are familiar with the OO style and prefer methods over functions
    • You are used to keeping state in mutable variables
    • Existing classic Akka code is easier to upgrade in the OO style
    • Mutable state can perform better than immutable state
    
    (A toy sketch contrasting the two styles follows.)
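
    A minimal sketch contrasting the two styles with a toy counter (all names invented):

    import akka.actor.typed.Behavior
    import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors }
    
    object CounterFp {
      sealed trait Command
      case object Increment extends Command
    
      // FP style: the new state travels inside the returned Behavior
      def apply(count: Int = 0): Behavior[Command] =
        Behaviors.receiveMessage { case Increment => apply(count + 1) }
    }
    
    object CounterOo {
      sealed trait Command
      case object Increment extends Command
    
      def apply(): Behavior[Command] =
        Behaviors.setup(context => new CounterOo(context))
    }
    
    class CounterOo(context: ActorContext[CounterOo.Command])
        extends AbstractBehavior[CounterOo.Command](context) {
      import CounterOo._
      private var count = 0 // OO style: mutable state lives inside the instance
    
      override def onMessage(msg: Command): Behavior[Command] = msg match {
        case Increment =>
          count += 1
          this // `this` plays the role of Behaviors.same
      }
    }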

    Recommended practices:

    • Do not define messages as top-level classes; define them in the companion object together with the Behavior, so that every use carries the object name as a prefix and stays unambiguous.
    • If a protocol is shared by several actors, define the complete protocol in a separate object.
    • Messages such as timer ticks or re-wrapped replies are usually private to the actor and marked private, but they must still derive from trait Command.
    • An alternative way to mark private messages: derive all messages from trait Message, insert an intermediate trait PrivateMessage extending Message, then derive all private messages from PrivateMessage, using the hierarchy to separate public from private messages.
    • The top-level message trait should normally be sealed, so the compiler can flag non-exhaustive case matches.
    • When using the ask pattern for request-response interaction with an actor from outside the ActorSystem, prefer AskPattern.ask() over the ? infix syntax for maximum type safety (between actors it is ActorContext.ask()). A sketch follows this list.
    • Behaviors.setup can be nested to load different kinds of resources; by convention setup goes on the outermost layer, but beware of how supervise interacts with setup.
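
    A sketch of AskPattern.ask from outside the ActorSystem (the Greet/Greeted guardian protocol is assumed):

    import akka.actor.typed.{ ActorRef, ActorSystem, Scheduler }
    import akka.actor.typed.scaladsl.AskPattern._
    import akka.util.Timeout
    import scala.concurrent.Future
    import scala.concurrent.duration._
    
    final case class Greet(whom: String, replyTo: ActorRef[Greeted])
    final case class Greeted(whom: String)
    
    def askFromOutside(system: ActorSystem[Greet]): Future[Greeted] = {
      implicit val timeout: Timeout = 3.seconds
      implicit val scheduler: Scheduler = system.scheduler
      // AskPattern.ask rather than the ? infix: the reply type is pinned down explicitly
      system.ask[Greeted](replyTo => Greet("Akka", replyTo))
    }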

    Migrating from classic Akka

    Dependency changes

    | Classic | Typed |
    |---|---|
    | akka-actor | akka-actor-typed |
    | akka-cluster | akka-cluster-typed |
    | akka-cluster-sharding | akka-cluster-sharding-typed |
    | akka-cluster-tools | akka-cluster-typed |
    | akka-distributed-data | akka-cluster-typed |
    | akka-persistence | akka-persistence-typed |
    | akka-stream | akka-stream-typed |
    | akka-testkit | akka-actor-testkit-typed |

    Package import changes

    | Classic | Typed for Scala |
    |---|---|
    | akka.actor | akka.actor.typed.scaladsl |
    | akka.cluster | akka.cluster.typed |
    | akka.cluster.sharding | akka.cluster.sharding.typed.scaladsl |
    | akka.persistence | akka.persistence.typed.scaladsl |

    ➡️ Cluster

    🏭 com.typesafe.akka:akka-cluster-typed_2.13:2.6.5

    Member state diagram

    [diagram: Member state transitions]

    Reliable Delivery

    import akka.actor.typed.delivery._

    ⚠️ This module is still immature and is not recommended for production use.

    Ensuring that a message is delivered at least once or exactly once is the core job of this module, but the Akka framework cannot achieve it on its own: acknowledging and processing a message is business logic, so full reliability requires the application's cooperation. Moreover, getting the message into the target mailbox is only one part of the job (not losing messages); making sure the target actor has not crashed before the message arrives (so the message can actually be processed) is another essential part.

    A complete reliable-delivery scheme covers sending, loss detection, resending, overload protection, idempotent processing, and more, and the consuming side carries most of that work. Resending, for instance, requires the consumer to notice the loss and request the message again from the producer; the same goes for flow control and the rest. Akka provides the following three patterns (note the details around resending):

    ⭕ Point to Point

    The point-to-point mode delivers messages reliably between two single actors; a condensed code sketch follows the notes below.

    • P: I'm about to scoop a spoonful.
    • C: I'm sitting tight.
    • P: Spoonful ready, open wide!
    • C: All done, one more bite please!

    [diagram: point-to-point message flow]

    • At runtime Akka checks that the Producer and its ProducerController are local actors, for efficiency; the same holds on the Consumer side.
    • The application is responsible for establishing, and keeping alive, the connection between the two controllers, using a ProducerController.RegisterConsumer or ConsumerController.RegisterToProducerController message.
    • The ConsumerController does not hand the next Delivery to the Consumer until the previous message has been processed and Confirmed.
    • The number of in-flight messages between the two controllers is governed by a flow control window managed by the ConsumerController.
    • If either the ProducerController or the ConsumerController crashes, all unconfirmed messages are redelivered (even ones the Consumer has in fact already processed) to guarantee at-least-once delivery; apart from that, messages reach the Consumer strictly in the order the Producer sent them.
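
    A condensed sketch of the wiring, loosely following the FibonacciProducer/FibonacciConsumer example in the Akka docs (the Job protocol and all names are invented):

    import akka.actor.typed.delivery.{ ConsumerController, ProducerController }
    import akka.actor.typed.scaladsl.Behaviors
    import akka.actor.typed.{ ActorRef, Behavior }
    
    object JobConsumer {
      final case class Job(payload: String)
      sealed trait Command
      private case class WrappedDelivery(d: ConsumerController.Delivery[Job]) extends Command
    
      def apply(consumerController: ActorRef[ConsumerController.Command[Job]]): Behavior[Command] =
        Behaviors.setup { context =>
          val deliveryAdapter = context.messageAdapter[ConsumerController.Delivery[Job]](WrappedDelivery(_))
          consumerController ! ConsumerController.Start(deliveryAdapter)
          Behaviors.receiveMessagePartial {
            case WrappedDelivery(delivery) =>
              context.log.info("processing {}", delivery.message.payload)
              delivery.confirmTo ! ConsumerController.Confirmed // no next Delivery until confirmed
              Behaviors.same
          }
        }
    }
    
    object JobProducer {
      sealed trait Command
      private case class WrappedRequestNext(next: ProducerController.RequestNext[JobConsumer.Job]) extends Command
    
      def apply(producerController: ActorRef[ProducerController.Command[JobConsumer.Job]]): Behavior[Command] =
        Behaviors.setup { context =>
          val adapter = context.messageAdapter[ProducerController.RequestNext[JobConsumer.Job]](WrappedRequestNext(_))
          producerController ! ProducerController.Start(adapter)
          var n = 0
          Behaviors.receiveMessagePartial {
            case WrappedRequestNext(next) =>
              n += 1
              next.sendNextTo ! JobConsumer.Job(s"job-$n") // demand driven: send only when asked
              Behaviors.same
          }
        }
    }
    
    // wiring, e.g. in a guardian's Behaviors.setup:
    //   val cc = context.spawn(ConsumerController[JobConsumer.Job](), "consumerController")
    //   context.spawn(JobConsumer(cc), "consumer")
    //   val pc = context.spawn(ProducerController[JobConsumer.Job]("job-producer-1", None), "producerController")
    //   context.spawn(JobProducer(pc), "producer")
    //   cc ! ConsumerController.RegisterToProducerController(pc)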

    ⭕ Worker Pulling

    In worker pulling, a number of workers pull tasks from a single work manager, each at its own pace of consumption.

    • P: I have a pile of work that needs doing.
    • M: No problem, I'll find people.
    • W(C): I'd like to apply.
    • M: You're hired!
    • W1: Give me some work.
    • W2: Me too.
    • M: Here's today's workload, divide it among yourselves!
    • W1: I grabbed 3!
    • W2: I grabbed 4!

    [diagram: worker pulling]

    When a new worker joins:

    [diagram: a new worker joins]

    • The Receptionist registers all workers, and the WorkPullingProducerController picks the workers that will execute tasks from the Receptionist's Listing.
    • Once the WorkPullingProducerController and a worker are connected, the concrete one-to-one delivery is still handled by a ProducerController/ConsumerController pair. A sketch of the worker side follows.
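
    On the worker side, a small sketch (ConversionJob, deliveryAdapter and the key name are assumptions borrowed from the Akka docs example): spawning a ConsumerController with a ServiceKey is what registers the worker with the Receptionist, so no explicit controller-to-controller registration is needed.

    import akka.actor.typed.delivery.ConsumerController
    import akka.actor.typed.receptionist.ServiceKey
    
    // the key that ties the workers and the WorkPullingProducerController together
    val serviceKey = ServiceKey[ConsumerController.Command[ConversionJob]]("ImageConverter")
    
    // inside the worker's Behaviors.setup: the controller registers itself under the key
    val consumerController = context.spawn(ConsumerController(serviceKey), "consumerController")
    consumerController ! ConsumerController.Start(deliveryAdapter)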

    ⭕ Sharding

    🏭 com.typesafe.akka:akka-cluster-sharding-typed_2.13:2.6.5

    Sharding is the reliable-delivery mode for a sharded cluster. Overall coordination is done by the ShardingControllers on the Producer and Consumer sides, while each ShardingController's subordinate controller handles the communication for its own endpoint.

    • P: Hey SPC, I have a batch of shoes in a particular style that needs a contract manufacturer.
    • SPC: Sure, I'll search the world for factories.
    • SCC1: As a Chinese shoe-manufacturing chain, I'm in.
    • SCC2: I only make shirts, sorry.
    • SPC: SCC1, you've got the deal; my man PC will contact you directly.
    • PC: SCC1, the order has been sent to you.
    • SCC1: PC, my man CC is in charge of this order; you two hands-on guys talk directly.
    • CC: OK, I'll put production line C on this style.
    • C: My line is done; goods handed over to you, CC.
    • CC: PC, I've shipped the goods straight to the delivery address on the order.
    • PC: SPC, the goods are ready.
    • SPC: Boss P, the goods are ready, where are you?
    • P: Deliver them here.

    [diagram: sharded reliable delivery]

    Sending a message to another entity:

    [diagram: sending to another entity]

    Sending from a Producer on another node (the WorkPullingProducerController in the figure is wrong; it should be a ShardingProducerController):

    [diagram: producer on another node]

    • Each side, sending and receiving, consists of three parts: the principal (Producer or Consumer), a Controller, and a ShardingController. The ShardingProducerController and ShardingConsumerController pair up to broker the ProducerController and ConsumerController; the two ShardingControllers do not register with each other but find one another via the EntityId.
    • Once the channel is established, a message travels from the ShardingProducerController through the ProducerController to the ShardingConsumerController, which locates the matching ConsumerController and hands the message to the final Consumer. After processing, the Consumer replies to its ConsumerController, which passes the confirmation back to the ProducerController, and the ShardingProducerController finally replies to the Producer.
    • The RequestNext.entitiesWithDemand field identifies a number of Consumer-side actors sharing the same EntityId, so the relationship can be one-to-many.

    ⭕ Durable Producer

    🏭 com.typesafe.akka:akka-persistence-typed_2.13:2.6.5

    For a Producer to support resending, it must keep the messages it has sent until it is certain they have been processed, and delete them only then; a producer that can store messages this way is a durable producer. Akka ships EventSourcedProducerQueue as a concrete implementation of DurableProducerQueue, in which every Producer must have its own unique PersistenceId.

    import akka.persistence.typed.delivery.EventSourcedProducerQueue
    import akka.persistence.typed.PersistenceId
    
    val durableQueue =
      EventSourcedProducerQueue[ImageConverter.ConversionJob](PersistenceId.ofUniqueId("ImageWorkManager"))
    val durableProducerController = context.spawn(
      WorkPullingProducerController(
        producerId = "workManager",
        workerServiceKey = ImageConverter.serviceKey,
        durableQueueBehavior = Some(durableQueue)),
      "producerController")

    ⭕ Using the ask pattern instead

    Besides tell, the Producer can also send with ask; use askNext instead of requestNext, and the reply comes wrapped in a MessageWithConfirmation.

    context.ask[MessageWithConfirmation[ImageConverter.ConversionJob], Done](
      next.askNextTo,
      askReplyTo => MessageWithConfirmation(ImageConverter.ConversionJob(resultId, from, to, image), askReplyTo)) {
      case Success(done) => AskReply(resultId, originalReplyTo, timeout = false)
      case Failure(_)    => AskReply(resultId, originalReplyTo, timeout = true)
    }

    Serialization

    Actors in the same JVM get their messages delivered directly; messages that cross JVMs must be serialized into a binary byte string for transport and deserialized back into message objects on receipt. Akka recommends Jackson and Google Protocol Buffers, uses the latter for its own internal messages, and also allows custom serializers.

    Using serializers

    Configuring serializers

    Serialization settings live under akka.actor.serializers, which points at implementations of akka.serialization.Serializer; serialization-bindings then binds serializers to specific classes. Because an object may also extend some trait or class, the most specific binding is chosen when deciding which serializer applies; if the candidates have no inheritance relationship, a warning is issued.

    akka {
      actor {
        serializers {
          jackson-json = "akka.serialization.jackson.JacksonJsonSerializer"
          jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer"
          proto = "akka.remote.serialization.ProtobufSerializer"
          myown = "docs.serialization.MyOwnSerializer"
        }
    
        serialization-bindings {
          "docs.serialization.JsonSerializable" = jackson-json
          "docs.serialization.CborSerializable" = jackson-cbor
          "com.google.protobuf.Message" = proto
          "docs.serialization.MyOwnSerializable" = myown
        }
      }
    }

    ⚠️ If a message to be serialized is nested inside a Scala object, you must use the standard Java class name to reference it: for a message named Message inside an object named Wrapper, the correct reference is Wrapper$Message, not Wrapper.Message.

    Using serializers programmatically

    A complete serialized representation has three parts: the payload as a binary byte string, the SerializerId, and the manifest naming the applicable class. Being self-describing, it can be used across JVMs.

    When the ActorSystem starts, serializers are initialized by the SerializationExtension, so a serializer cannot access the SerializationExtension from its own constructor; it may only do so a little later, after its initialization has completed.

    import akka.actor._
    import akka.actor.typed.scaladsl.Behaviors
    import akka.cluster.Cluster
    import akka.serialization._
    
    val system = ActorSystem("example")
    
    // Get the Serialization Extension
    val serialization = SerializationExtension(system)
    
    // Have something to serialize
    val original = "woohoo"
    
    // Turn it into bytes, and retrieve the serializerId and manifest, which are needed for deserialization
    val bytes = serialization.serialize(original).get
    val serializerId = serialization.findSerializerFor(original).identifier
    val manifest = Serializers.manifestFor(serialization.findSerializerFor(original), original)
    
    // Turn it back into an object
    val back = serialization.deserialize(bytes, serializerId, manifest).get

    Custom serializers

    Creating a serializer

    All serializers derive from akka.serialization.Serializer.

    class MyOwnSerializer extends Serializer {
      // If you need logging here, introduce a constructor that takes an ExtendedActorSystem.
      // class MyOwnSerializer(actorSystem: ExtendedActorSystem) extends Serializer
      // Get a logger using:
      // private val logger = Logging(actorSystem, this)
    
      // This is whether "fromBinary" requires a "clazz" or not
      def includeManifest: Boolean = true
    
      // Pick a unique identifier for your Serializer,
      // you've got a couple of billions to choose from,
      // 0 - 40 is reserved by Akka itself
      def identifier = 1234567
    
      // "toBinary" serializes the given object to an Array of Bytes
      def toBinary(obj: AnyRef): Array[Byte] = {
        // Put the code that serializes the object here
        //#...
        Array[Byte]()
        //#...
      }
    
      // "fromBinary" deserializes the given array,
      // using the type hint (if any, see "includeManifest" above)
      def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
        // Put your code that deserializes here
        //#...
        null
        //#...
      }
    }

    The SerializerId must be globally unique; it can be set in code, as above, or in configuration:

    akka {
      actor {
        serialization-identifiers {
          "docs.serialization.MyOwnSerializer" = 1234567
        }
      }
    }
    
    Using a StringManifest to identify classes

    By default a serializer identifies its target with a Class, but a string name can be used instead; see the second parameter of fromBinary:

    class MyOwnSerializer2 extends SerializerWithStringManifest {
      val CustomerManifest = "customer"
      val UserManifest = "user"
      val UTF_8 = StandardCharsets.UTF_8.name()
    
      // Pick a unique identifier for your Serializer,
      // you've got a couple of billions to choose from,
      // 0 - 40 is reserved by Akka itself
      def identifier = 1234567
    
      // The manifest (type hint) that will be provided in the fromBinary method
      // Use `""` if manifest is not needed.
      def manifest(obj: AnyRef): String =
        obj match {
          case _: Customer => CustomerManifest
          case _: User     => UserManifest
        }
    
      // "toBinary" serializes the given object to an Array of Bytes
      def toBinary(obj: AnyRef): Array[Byte] = {
        // Put the real code that serializes the object here
        obj match {
          case Customer(name) => name.getBytes(UTF_8)
          case User(name)     => name.getBytes(UTF_8)
        }
      }
    
      // "fromBinary" deserializes the given array,
      // using the type hint
      def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
        // Put the real code that deserializes here
        manifest match {
          case CustomerManifest =>
            Customer(new String(bytes, UTF_8))
          case UserManifest =>
            User(new String(bytes, UTF_8))
        }
      }
    }
    
    Serializing ActorRefs

    Any ActorRef can be serialized with Jackson, but a custom implementation is also possible.

    To represent an ActorRef as a string, use the ActorRefResolver. It has two main methods, one for serialization and one for resolution:

    • def toSerializationFormat[T](ref: ActorRef[T]): String
    • def resolveActorRef[T](serializedActorRef: String): ActorRef[T]
    
    class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
      private val actorRefResolver = ActorRefResolver(system.toTyped)
    
      private val PingManifest = "a"
      private val PongManifest = "b"
    
      override def identifier = 41
    
      override def manifest(msg: AnyRef) = msg match {
        case _: PingService.Ping => PingManifest
        case PingService.Pong    => PongManifest
        case _ =>
          throw new IllegalArgumentException(s"Can't serialize object of type ${msg.getClass} in [${getClass.getName}]")
      }
    
      override def toBinary(msg: AnyRef) = msg match {
        case PingService.Ping(who) =>
          actorRefResolver.toSerializationFormat(who).getBytes(StandardCharsets.UTF_8)
        case PingService.Pong =>
          Array.emptyByteArray
        case _ =>
          throw new IllegalArgumentException(s"Can't serialize object of type ${msg.getClass} in [${getClass.getName}]")
      }
    
      override def fromBinary(bytes: Array[Byte], manifest: String) = {
        manifest match {
          case PingManifest =>
            val str = new String(bytes, StandardCharsets.UTF_8)
            val ref = actorRefResolver.resolveActorRef[PingService.Pong.type](str)
            PingService.Ping(ref)
          case PongManifest =>
            PingService.Pong
          case _ =>
            throw new IllegalArgumentException(s"Unknown manifest [$manifest]")
        }
      }
    }

    Rolling Updates

    Deserializing a message back into an object depends on exactly three things: the payload, the serializerId, and the manifest. Akka picks the Serializer by id, the Serializer matches fromBinary by manifest, and fromBinary finally parses the payload into the message object. The manifest, which does the crucial work here, is not equivalent to the message type the serializer is bound to, so one serializer can serve several message types, and that is what makes switching to a new serializer possible. The procedure takes two steps, plus an optional third:

    • Step one: add the Serializer definition only to akka.actor.serializers, not yet to akka.actor.serialization-bindings, then perform a rolling update. This registers the serializer and prepares the switch.
    • Step two: add the new Serializer to akka.actor.serialization-bindings, then perform another rolling update. Old nodes keep serializing with the old serializer, while new nodes switch to the new one, which can still deserialize the old format.
    • Step three (optional): remove the old Serializer entirely, since the new one now handles both the old and the new formats.

    Verification

    To verify in local tests that messages serialize and deserialize correctly, enable serialization of local messages as below. To exempt a message, extend trait akka.actor.NoSerializationVerificationNeeded or list its class-name prefix in akka.actor.no-serialization-verification-needed-class-prefix.

    akka {
      actor {
        # enable serialization of local messages
        serialize-messages = on
    
        # enable serialization of Props
        serialize-creators = on
      }
    }

    Serialization with Jackson

    🏭 com.typesafe.akka:akka-serialization-jackson_2.12:2.6.6

    Jackson supports both textual JSON (jackson-json) and the binary CBOR byte format (jackson-cbor).

    Preparation

    Before serializing with Jackson, declare and bind the serializer in the Akka configuration; the JSON flavor is used here.

    akka.actor {
      serialization-bindings {
        "com.myservice.MySerializable" = jackson-json
      }
    }

    Every message to be serialized with Jackson must also extend a marker trait:

    // the conventional names are CborSerializable or JsonSerializable; MySerializable is used here for demonstration
    trait MySerializable
    
    final case class Message(name: String, nr: Int) extends MySerializable

    Security requirements

    For security reasons, the Jackson serializer must not be bound to open-ended types such as java.lang.Object, java.io.Serializable, or java.util.Comparable.

    Annotations

    Ordinary polymorphic types

    A polymorphic type is one with several possible implementations, which means deserialization faces several candidate subtypes. Before serializing such types with Jackson, annotate them with JsonTypeInfo and JsonSubTypes.

    • @JsonTypeInfo enables polymorphic type handling. Its attributes:
      • use: which kind of type identifier to use:
        • JsonTypeInfo.Id.CLASS: the fully qualified class name
        • JsonTypeInfo.Id.MINIMAL_CLASS: the class name without the package, when base class and subclasses share a package
        • JsonTypeInfo.Id.NAME: a logical, explicitly given name
        • JsonTypeInfo.Id.CUSTOM: a custom identifier, paired with @JsonTypeIdResolver
        • JsonTypeInfo.Id.NONE: no identifier
      • include (optional): how the identifier is embedded:
        • JsonTypeInfo.As.PROPERTY: as a sibling property of the data
        • JsonTypeInfo.As.EXISTING_PROPERTY: as a property that already exists on the POJO
        • JsonTypeInfo.As.EXTERNAL_PROPERTY: as an external property
        • JsonTypeInfo.As.WRAPPER_OBJECT: as a wrapper object
        • JsonTypeInfo.As.WRAPPER_ARRAY: as a wrapper array
      • property (optional): the property name for the identifier. Only effective when use is JsonTypeInfo.Id.CLASS (default @class if unset), JsonTypeInfo.Id.MINIMAL_CLASS (default @c), or JsonTypeInfo.Id.NAME (default @type), and include is PROPERTY, EXISTING_PROPERTY, or EXTERNAL_PROPERTY.
      • defaultImpl (optional): the fallback type to deserialize into when the type identifier is missing or invalid.
      • visible (optional): whether the type identifier's value is passed through to the deserializer as part of the JSON stream; defaults to false, i.e. Jackson processes and removes the identifier from the JSON before handing it to the JsonDeserializer.
    • @JsonSubTypes lists the subtypes of a given class. It is consulted only when a subtype cannot be detected otherwise, and is normally placed on the base class together with @JsonTypeInfo. Its value is an array of @JsonSubTypes.Type[], enumerating the subtypes (value) and their identifier values (name, matching the property declared in @JsonTypeInfo; optional, in which case @JsonTypeName on the subclass must supply it).
    • @JsonTypeName goes on a subclass to supply the type identifier's value for it.

    ⚠️ Never use @JsonTypeInfo(use = Id.CLASS) or ObjectMapper.enableDefaultTyping; with polymorphic types they are a security risk.

    final case class Zoo(primaryAttraction: Animal) extends MySerializable
    
    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
    @JsonSubTypes(
      Array(
        new JsonSubTypes.Type(value = classOf[Lion], name = "lion"),
        new JsonSubTypes.Type(value = classOf[Elephant], name = "elephant")))
    sealed trait Animal
    
    final case class Lion(name: String) extends Animal
    
    final case class Elephant(name: String, age: Int) extends Animal
    
    ADTs built from a trait and case objects

    The annotations above apply only to classes, so case classes work directly, but case objects need a workaround: annotate the trait the case objects extend with @JsonSerialize and @JsonDeserialize, then implement the conversion with a StdSerializer and a StdDeserializer.

    import com.fasterxml.jackson.core.JsonGenerator
    import com.fasterxml.jackson.core.JsonParser
    import com.fasterxml.jackson.databind.DeserializationContext
    import com.fasterxml.jackson.databind.SerializerProvider
    import com.fasterxml.jackson.databind.annotation.JsonDeserialize
    import com.fasterxml.jackson.databind.annotation.JsonSerialize
    import com.fasterxml.jackson.databind.deser.std.StdDeserializer
    import com.fasterxml.jackson.databind.ser.std.StdSerializer
    
    @JsonSerialize(using = classOf[DirectionJsonSerializer])
    @JsonDeserialize(using = classOf[DirectionJsonDeserializer])
    sealed trait Direction
    
    object Direction {
      case object North extends Direction
      case object East extends Direction
      case object South extends Direction
      case object West extends Direction
    }
    
    class DirectionJsonSerializer extends StdSerializer[Direction](classOf[Direction]) {
      import Direction._
    
      override def serialize(value: Direction, gen: JsonGenerator, provider: SerializerProvider): Unit = {
        val strValue = value match {
          case North => "N"
          case East  => "E"
          case South => "S"
          case West  => "W"
        }
        gen.writeString(strValue)
      }
    }
    
    class DirectionJsonDeserializer extends StdDeserializer[Direction](classOf[Direction]) {
      import Direction._
    
      override def deserialize(p: JsonParser, ctxt: DeserializationContext): Direction = {
        p.getText match {
          case "N" => North
          case "E" => East
          case "S" => South
          case "W" => West
        }
      }
    }
    
    final case class Compass(currentDirection: Direction) extends MySerializable
    
    Enumerations

    By default Jackson serializes a value of a Scala Enumeration as a JsonObject containing a "value" field and a "type" field (whose value is the enumeration's fully qualified class name). Jackson therefore offers a per-field annotation, JsonScalaEnumeration, to declare the field's type, which makes the enumeration value serialize as a JsonString.

    trait TestMessage
    
    object Planet extends Enumeration {
      type Planet = Value
      val Mercury, Venus, Earth, Mars, Krypton = Value
    }
    
    // Uses default Jackson serialization format for Scala Enumerations
    final case class Alien(name: String, planet: Planet.Planet) extends TestMessage
    
    // Serializes planet values as a JsonString
    class PlanetType extends TypeReference[Planet.type] {}
    
    // Specifies the type of planet with @JsonScalaEnumeration
    final case class Superhero(name: String, @JsonScalaEnumeration(classOf[PlanetType]) planet: Planet.Planet) extends TestMessage

    Schema Evolution

    See also Schema Evolution in the Event Sourcing section.

    Removing a field

    Jackson automatically ignores properties that do not exist in the class, so nothing extra is needed.

    Adding a field

    If the new field is optional, its default is Option.None and nothing more is needed. If it is mandatory, extend JacksonMigration and supply its default value. For example:

    // Old Event
    case class ItemAdded(shoppingCartId: String, productId: String, quantity: Int) extends MySerializable
    
    // New Event: optional property discount and field note added.
    // (why the distinction between a property and a field here?)
    case class ItemAdded(shoppingCartId: String, productId: String, quantity: Int, discount: Option[Double], note: String)
        extends MySerializable {
      // alternative constructor because `note` should have default value "" when not defined in json
      @JsonCreator
      def this(shoppingCartId: String, productId: String, quantity: Int, discount: Option[Double], note: Option[String]) =
        this(shoppingCartId, productId, quantity, discount, note.getOrElse(""))
    }
    
    // New Event: mandatory field discount added.
    case class ItemAdded(shoppingCartId: String, productId: String, quantity: Int, discount: Double) extends MySerializable
    
    import com.fasterxml.jackson.databind.JsonNode
    import com.fasterxml.jackson.databind.node.DoubleNode
    import com.fasterxml.jackson.databind.node.ObjectNode
    import akka.serialization.jackson.JacksonMigration
    
    class ItemAddedMigration extends JacksonMigration {
      // declare which version this is; newer versions can follow later
      override def currentVersion: Int = 2
    
      override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
        val root = json.asInstanceOf[ObjectNode]
        if (fromVersion <= 1) {
          root.set("discount", DoubleNode.valueOf(0.0))
        }
        root
      }
    }

    The link between ItemAddedMigration and ItemAdded must be declared in configuration (likewise below):

    akka.serialization.jackson.migrations {
      "com.myservice.event.ItemAdded" = "com.myservice.event.ItemAddedMigration"
    }
    
    Renaming a field
    
    // rename productId to itemId
    case class ItemAdded(shoppingCartId: String, itemId: String, quantity: Int) extends MySerializable
    
    import akka.serialization.jackson.JacksonMigration
    import com.fasterxml.jackson.databind.JsonNode
    import com.fasterxml.jackson.databind.node.ObjectNode
    
    class ItemAddedMigration extends JacksonMigration {
      override def currentVersion: Int = 2
    
      override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
        val root = json.asInstanceOf[ObjectNode]
        if (fromVersion <= 1) {
          root.set("itemId", root.get("productId"))
          root.remove("productId")
        }
        root
      }
    }
    
    Restructuring a class
    
    // Old class
    case class Customer(name: String, street: String, city: String, zipCode: String, country: String) extends MySerializable
    
    // New class
    case class Customer(name: String, shippingAddress: Address, billingAddress: Option[Address]) extends MySerializable
    
    //Address class
    case class Address(street: String, city: String, zipCode: String, country: String) extends MySerializable
    
    import akka.serialization.jackson.JacksonMigration
    import com.fasterxml.jackson.databind.JsonNode
    import com.fasterxml.jackson.databind.node.ObjectNode
    
    class CustomerMigration extends JacksonMigration {
      override def currentVersion: Int = 2
    
      override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
        val root = json.asInstanceOf[ObjectNode]
        if (fromVersion <= 1) {
          val shippingAddress = root.`with`("shippingAddress")
          shippingAddress.set("street", root.get("street"))
          shippingAddress.set("city", root.get("city"))
          shippingAddress.set("zipCode", root.get("zipCode"))
          shippingAddress.set("country", root.get("country"))
          root.remove("street")
          root.remove("city")
          root.remove("zipCode")
          root.remove("country")
        }
        root
      }
    }
    
    Renaming a class
    
    // Old class
    case class OrderAdded(shoppingCartId: String) extends MySerializable
    
    // New class
    case class OrderPlaced(shoppingCartId: String) extends MySerializable
    
    class OrderPlacedMigration extends JacksonMigration {
      override def currentVersion: Int = 2
    
      override def transformClassName(fromVersion: Int, className: String): String = classOf[OrderPlaced].getName
    
      override def transform(fromVersion: Int, json: JsonNode): JsonNode = json
    }
    
    Removing a serialization binding

    When a class no longer needs to be serialized, only deserialized, add it to the deserialization whitelist, a list of class names or prefixes:

    akka.serialization.jackson.whitelist-class-prefix =
      ["com.myservice.event.OrderAdded", "com.myservice.command"]

    Jackson modules

    Akka enables the following Jackson modules by default:

    akka.serialization.jackson {
      # The Jackson JSON serializer will register these modules.
      jackson-modules += "akka.serialization.jackson.AkkaJacksonModule"
      # AkkaTypedJacksonModule optionally included if akka-actor-typed is in classpath
      jackson-modules += "akka.serialization.jackson.AkkaTypedJacksonModule"
      // FIXME how does that optional loading work??
      # AkkaStreamsModule optionally included if akka-streams is in classpath
      jackson-modules += "akka.serialization.jackson.AkkaStreamJacksonModule"
      jackson-modules += "com.fasterxml.jackson.module.paramnames.ParameterNamesModule"
      jackson-modules += "com.fasterxml.jackson.datatype.jdk8.Jdk8Module"
      jackson-modules += "com.fasterxml.jackson.datatype.jsr310.JavaTimeModule"
      jackson-modules += "com.fasterxml.jackson.module.scala.DefaultScalaModule"
    }
    
    JSON compression

    The default JSON compression policy:

    # Compression settings for the jackson-json binding
    akka.serialization.jackson.jackson-json.compression {
      # Compression algorithm.
      # - off  : no compression (it will decompress payloads even it's off)
      # - gzip : using common java gzip (it's slower than lz4 generally)
      # - lz4 : using lz4-java
      algorithm = gzip
    
      # If compression is enabled with the `algorithm` setting the payload is compressed
      # when it's larger than this value.
      compress-larger-than = 32 KiB
    }
    
    Per-binding configuration
    
    # shared settings
    akka.serialization.jackson.jackson-json {
      serialization-features {
        WRITE_DATES_AS_TIMESTAMPS = off
      }
    }
    akka.serialization.jackson.jackson-cbor {
      serialization-features {
        WRITE_DATES_AS_TIMESTAMPS = on
      }
    }
    
    akka.actor {
      serializers {
        jackson-json-message = "akka.serialization.jackson.JacksonJsonSerializer"
        jackson-json-event   = "akka.serialization.jackson.JacksonJsonSerializer"
      }
      serialization-identifiers {
        jackson-json-message = 9001
        jackson-json-event = 9002
      }
      serialization-bindings {
        "com.myservice.MyMessage" = jackson-json-message
        "com.myservice.MyEvent" = jackson-json-event
      }
    }
    
    # per-binding overrides
    akka.serialization.jackson {
      jackson-json-message {
        serialization-features {
          WRITE_DATES_AS_TIMESTAMPS = on
        }
      }
      jackson-json-event {
        serialization-features {
          WRITE_DATES_AS_TIMESTAMPS = off
        }
      }
    }
    
    Manifest-independent serialization

    By default Jackson puts the fully qualified class name into the manifest, which costs disk space and IO. You can switch that off with type-in-manifest, so the class name no longer appears in the manifest, and then name the target type with deserialization-type; otherwise Jackson looks for a matching type among the serialization bindings.

    Akka Remoting already compresses manifests, so for messages that only travel over remoting this brings no real gain.

    akka.actor {
      serializers {
        jackson-json-event = "akka.serialization.jackson.JacksonJsonSerializer"
      }
      serialization-identifiers {
        jackson-json-event = 9001
      }
      serialization-bindings {
        "com.myservice.MyEvent" = jackson-json-event
      }
    }
    # since manifest-independent serialization usually applies to exactly one type, per-binding configuration is the norm
    akka.serialization.jackson {
      jackson-json-event {
        type-in-manifest = off
        # Since there is exactly one serialization binding declared for this
        # serializer above, this is optional, but if there were none or many,
        # this would be mandatory.
        deserialization-type = "com.myservice.MyEvent"
      }
    }
    
    Date and time formats

    WRITE_DATES_AS_TIMESTAMPS and WRITE_DURATIONS_AS_TIMESTAMPS are disabled by default, which means date and time fields are serialized in the ISO-8601 (rfc3339) yyyy-MM-dd'T'HH:mm:ss.SSSZZ format rather than as numeric arrays. That gives better interoperability but is slower, so if you do not need the ISO format to interoperate with external systems, configure the following for better performance (deserialization is unaffected by this setting).

    akka.serialization.jackson.serialization-features {
      WRITE_DATES_AS_TIMESTAMPS = on
      WRITE_DURATIONS_AS_TIMESTAMPS = on
    }

    Other available settings

    akka.serialization.jackson {
      # Configuration of the ObjectMapper serialization features.
      # See com.fasterxml.jackson.databind.SerializationFeature
      # Enum values corresponding to the SerializationFeature and their boolean value.
      serialization-features {
        # Date/time in ISO-8601 (rfc3339) yyyy-MM-dd'T'HH:mm:ss.SSSZ format
        # as defined by com.fasterxml.jackson.databind.util.StdDateFormat
        # For interoperability it's better to use the ISO format, i.e. WRITE_DATES_AS_TIMESTAMPS=off,
        # but WRITE_DATES_AS_TIMESTAMPS=on has better performance.
        WRITE_DATES_AS_TIMESTAMPS = off
        WRITE_DURATIONS_AS_TIMESTAMPS = off
      }
    
      # Configuration of the ObjectMapper deserialization features.
      # See com.fasterxml.jackson.databind.DeserializationFeature
      # Enum values corresponding to the DeserializationFeature and their boolean value.
      deserialization-features {
        FAIL_ON_UNKNOWN_PROPERTIES = off
      }
    
      # Configuration of the ObjectMapper mapper features.
      # See com.fasterxml.jackson.databind.MapperFeature
      # Enum values corresponding to the MapperFeature and their
      # boolean values, for example:
      #
      # mapper-features {
      #   SORT_PROPERTIES_ALPHABETICALLY = on
      # }
      mapper-features {}
    
      # Configuration of the ObjectMapper JsonParser features.
      # See com.fasterxml.jackson.core.JsonParser.Feature
      # Enum values corresponding to the JsonParser.Feature and their
      # boolean value, for example:
      #
      # json-parser-features {
      #   ALLOW_SINGLE_QUOTES = on
      # }
      json-parser-features {}
    
      # Configuration of the ObjectMapper JsonParser features.
      # See com.fasterxml.jackson.core.JsonGenerator.Feature
      # Enum values corresponding to the JsonGenerator.Feature and
      # their boolean value, for example:
      #
      # json-generator-features {
      #   WRITE_NUMBERS_AS_STRINGS = on
      # }
      json-generator-features {}
    
      # Configuration of the JsonFactory StreamReadFeature.
      # See com.fasterxml.jackson.core.StreamReadFeature
      # Enum values corresponding to the StreamReadFeatures and
      # their boolean value, for example:
      #
      # stream-read-features {
      #   STRICT_DUPLICATE_DETECTION = on
      # }
      stream-read-features {}
    
      # Configuration of the JsonFactory StreamWriteFeature.
      # See com.fasterxml.jackson.core.StreamWriteFeature
      # Enum values corresponding to the StreamWriteFeatures and
      # their boolean value, for example:
      #
      # stream-write-features {
      #   WRITE_BIGDECIMAL_AS_PLAIN = on
      # }
      stream-write-features {}
    
      # Configuration of the JsonFactory JsonReadFeature.
      # See com.fasterxml.jackson.core.json.JsonReadFeature
      # Enum values corresponding to the JsonReadFeatures and
      # their boolean value, for example:
      #
      # json-read-features {
      #   ALLOW_SINGLE_QUOTES = on
      # }
      json-read-features {}
    
      # Configuration of the JsonFactory JsonWriteFeature.
      # See com.fasterxml.jackson.core.json.JsonWriteFeature
      # Enum values corresponding to the JsonWriteFeatures and
      # their boolean value, for example:
      #
      # json-write-features {
      #   WRITE_NUMBERS_AS_STRINGS = on
      # }
      json-write-features {}
    
      # Additional classes that are allowed even if they are not defined in `serialization-bindings`.
      # This is useful when a class is not used for serialization any more and therefore removed
      # from `serialization-bindings`, but should still be possible to deserialize.
      whitelist-class-prefix = []
    
      # settings for compression of the payload
      compression {
        # Compression algorithm.
        # - off  : no compression
        # - gzip : using common java gzip
        algorithm = off
    
        # If compression is enabled with the `algorithm` setting the payload is compressed
        # when it's larger than this value.
        compress-larger-than = 0 KiB
      }
    
      # Whether the type should be written to the manifest.
      # If this is off, then either deserialization-type must be defined, or there must be exactly
      # one serialization binding declared for this serializer, and the type in that binding will be
      # used as the deserialization type. This feature will only work if that type either is a
      # concrete class, or if it is a supertype that uses Jackson polymorphism (ie, the
      # @JsonTypeInfo annotation) to store type information in the JSON itself. The intention behind
      # disabling this is to remove extraneous type information (ie, fully qualified class names) when
      # serialized objects are persisted in Akka persistence or replicated using Akka distributed
      # data. Note that Akka remoting already has manifest compression optimizations that address this,
      # so for types that just get sent over remoting, this offers no optimization.
      type-in-manifest = on
    
      # The type to use for deserialization.
      # This is only used if type-in-manifest is disabled. If set, this type will be used to
      # deserialize all messages. This is useful if the binding configuration you want to use when
      # disabling type in manifest cannot be expressed as a single type. Examples of when you might
      # use this include when changing serializers, so you don't want this serializer used for
      # serialization and you haven't declared any bindings for it, but you still want to be able to
      # deserialize messages that were serialized with this serializer, as well as situations where
      # you only want some sub types of a given Jackson polymorphic type to be serialized using this
      # serializer.
      deserialization-type = ""
    
      # Specific settings for jackson-json binding can be defined in this section to
      # override the settings in 'akka.serialization.jackson'
      jackson-json {}
    
      # Specific settings for jackson-cbor binding can be defined in this section to
      # override the settings in 'akka.serialization.jackson'
      jackson-cbor {}
    
      # Issue #28918 for compatibility with data serialized with JacksonCborSerializer in
      # Akka 2.6.4 or earlier, which was plain JSON format.
      jackson-cbor-264 = ${akka.serialization.jackson.jackson-cbor}
    }

    ➡️ Persistence

    Event Sourcing

    🏭 com.typesafe.akka:akka-persistence-typed_2.13:2.6.5

    Akka Persistence lets stateful actors persist their state so it survives a crash. In essence it persists the actor's events, so that on recovery the actor can be reconstructed (replayed, rebuilt) from the full event history or from a periodic snapshot. The most classic real-life example of ES is the double-entry bookkeeping used by accountants.

    📎 References

    • CQRS Journey on MSDN
    
      Using a conference ticketing system written in C#, the book walks step by step through the concerns of every stage of a CQRS implementation. The illustrations and discussions are excellent, and the Process Manager it describes remains one of the popular ways to implement Sagas today.
    
    • Events as First-Class Citizens by Randy Shoup
    
      Stitch Fix, featured in the article, is a smart retailer that combines retail, technology, warehousing, and data analysis, using analytics software and machine learning to match customers' clothing-customization needs, picking garments and accessories that fit each customer's style, size, and preferences, for a pleasant shopping experience.
    
      Customers order clothing on demand or request delivery monthly, bimonthly, or quarterly. Each box holds five items. If customers like the items, they can buy them at the tag price, with a discount for keeping the whole box; returns are otherwise free. If a customer keeps nothing, a $20 styling fee applies. The average item price is about $65, and the company expects a customer to keep two items per box. Stylists are part-timers paid $15 per hour and complete four boxes an hour, which yields a gross margin high enough to cover the heavy overhead and inventory costs.

    ⚠️ The General Data Protection Regulation (GDPR) requires that personal data be erasable on the user's request. In an Event Sourcing application, however, thoroughly deleting or amending every event carrying personal data is very hard, so "data shredding" is used instead: give every person a unique id and use it as the key to encrypt all of their personal data; when the data must be erased for good, delete the id, and the personal data can never be decrypted again. Lightbend provides tooling around Akka Persistence to help build GDPR-capable systems.

    Akka Persistence's implementation is the event sourced actor (also called a persistent actor). On receiving a Command it first validates it; if the command passes, it is applied to the current entity and produces Events, and once those events are persisted the entity's state update completes. Otherwise the entity simply rejects the command. (💀 Shouldn't the state be updated first and the events persisted afterwards? On reflection, persist-then-update seems the more reliable order.)

    During recovery, all events are loaded and applied directly, without re-validation, to update the actor's state until it has caught up to the latest state.

    A typical EventSourcedBehavior has four parts: an id, the initial State, a CommandHandler, and an EventHandler. If an ActorContext is needed, wrap it in Behaviors.setup on the outside (see the sketch after the snippet):

    import akka.persistence.typed.scaladsl.EventSourcedBehavior
    import akka.persistence.typed.PersistenceId
    
    object MyPersistentBehavior {
      sealed trait Command
      sealed trait Event
      final case class State()
    
      def apply(): Behavior[Command] =
        EventSourcedBehavior[Command, Event, State](
          // 1. the actor's unique id
          persistenceId = PersistenceId.ofUniqueId("abc"),
          // 2. the initial state
          emptyState = State(),
          // 3. Command Handler
          commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
          // 4. Event Handler
          eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
    }
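
    If the ActorContext is needed (say, for logging inside the handlers), a sketch of the Behaviors.setup wrapping, reusing the Command/Event/State types above (Behaviors and Effect imports assumed):

    def withContext(): Behavior[Command] =
      Behaviors.setup { context =>
        EventSourcedBehavior[Command, Event, State](
          persistenceId = PersistenceId.ofUniqueId("abc"),
          emptyState = State(),
          commandHandler = { (_, _) =>
            context.log.info("command received") // the context captured by setup stays usable here
            Effect.none
          },
          eventHandler = (state, _) => state)
      }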

    PersistenceId

    The PersistenceId is the event sourced actor's unique identity over its entire lifecycle (think of an aggregate id). Because an EntityId provided by Akka Cluster may be shared by actors of different types, it is usually combined with the EntityTypeKey to form a unique PersistenceId: PersistenceId.apply() joins the entityType.name and entityId strings with the default separator |. Alternatively, PersistenceId.ofUniqueId takes the given string as-is, for ids with a custom format or separator.

    Even in a cluster, at most one actor with a given PersistenceId may exist at any moment, otherwise chaos would ensue. Thanks to recovery, that actor can nevertheless be sharded, and even migrated to any shard on any node.
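
    A small sketch of the two factories:

    import akka.persistence.typed.PersistenceId
    
    val byTypeAndId = PersistenceId("ShoppingCart", "cart-42") // "ShoppingCart|cart-42", default separator |
    val verbatim = PersistenceId.ofUniqueId("cart-42")         // the given string is used as-is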

    🔗 Excerpt from https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html#persistence-example

    sharding.init(Entity(typeKey = HelloWorld.TypeKey) { entityContext =>
      HelloWorld(entityContext.entityId, PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId))
    })

    Command Handler

    A CommandHandler takes two parameters, the current State and the incoming Command, and returns an Effect. Effects are created with its factory, which offers:

    • persist: atomically persist the events produced by handling the command; if persisting one of them fails, all of them fail. If the underlying event store does not support writing several events in one operation, the CommandHandler may refuse to persist multiple events at once by throwing an EventRejectedException (usually carrying an UnsupportedOperationException), leaving it to a parent actor to supervise.
    • none: do nothing, e.g. for a read-only query command.
    • unhandled: signal that the command does not apply in the current state.
    • stop: stop the actor.
    • stash: stash the current command.
    • unstashAll: process every command stashed with Effect.stash.
    • reply: send a reply to the actor the command came from.

    After returning an Effect you can chain side effects onto it, e.g. Effect.persist(...).thenRun(...). The available ones (a sketch follows the list):

    • thenRun: run a side-effecting function.
    • thenStop: stop the actor.
    • thenUnstashAll: process every command stashed with Effect.stash.
    • thenReply: send a reply to the actor the command came from.
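
    A minimal sketch of these factories in action (the toy banking protocol below is invented for illustration):

    import akka.persistence.typed.scaladsl.Effect
    
    sealed trait Command
    final case class Deposit(amount: BigDecimal) extends Command
    case object Close extends Command
    
    sealed trait Event
    final case class Deposited(amount: BigDecimal) extends Event
    case object Closed extends Event
    
    final case class State(balance: BigDecimal)
    
    val commandHandler: (State, Command) => Effect[Event, State] = {
      case (_, Deposit(amount)) =>
        // persist the event, then run a side effect against the updated state
        Effect.persist(Deposited(amount)).thenRun(s => println(s"balance is now ${s.balance}"))
      case (state, Close) if state.balance == 0 =>
        Effect.persist(Closed).thenStop()
      case (_, Close) =>
        Effect.unhandled // an account with a balance cannot be closed
    }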

    Any side effect runs at most once. If persistence fails, or the actor restarts, or is stopped and started again, no side effect runs at all. Side effects that must be guaranteed are therefore usually performed in response to the RecoveryCompleted signal, in which case the same side effect may end up executing more than once.

    Side effects run synchronously in registration order, but concurrency cannot be ruled out once messages are being sent. A side effect may even run before the event is persisted, in which case, should persistence fail, the side effect has taken place even though the event was never stored.

    💀 On terminology: Akka uses "Journal" (日記) for the event store, distinguishing it from an ordinary Log (日誌). I would rather call it an "event book" (事件簿), but my senior classmate assures me 日記 is the more accurate rendering, and the social worker doing the counselling in Joker also asks Frank about his "journal", so 日記 it is.

    Event Handler

    An EventHandler takes two parameters, the current State and the triggering Event, and returns the new State.

    ⚡ The CommandHandler emits and persists events; the EventHandler applies them and updates the state. The actor's state therefore only truly changes inside the EventHandler!

    Once an event has been persisted, the EventHandler uses it to transform the current state, passed in as a parameter, into the new state. The State itself may be an FP-style immutable value or an OO-style mutable one, but it is usually wrapped in a container such as a class.

    Unlike the CommandHandler, the EventHandler produces no side effects, which is what makes it directly usable for the actor's recovery. If something must happen after recovery, the right hooks are a thenRun() attached to the final Effect built by the CommandHandler, or the handler for the RecoveryCompleted signal.
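
    Continuing the toy protocol from the sketch above, the matching event handler is a pure (State, Event) => State function with no side effects:

    val eventHandler: (State, Event) => State = {
      case (state, Deposited(amount)) => state.copy(balance = state.balance + amount)
      case (state, Closed)            => state
    }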

    Changing an actor's behavior

    Because different messages trigger different behaviors, behavior is part of the actor's state too, so recovery must carefully restore not just the data but also the matching behavior. A behavior is a function, and functions are first-class citizens, so in principle a behavior could be stored like data; the hard part is how to encode it, which is why Akka Persistence does not persist Behaviors.

    The natural answer to this thorny problem is to define a different CommandHandler per State and switch along with the State, turning the actor into a finite state machine. The resulting logic is a two-level match over State and Command: define the State variants through inheritance, case-match first on the State and then on the Command, and dispatch to the corresponding handler function (each handler kept separate to highlight its branch of the logic). Structurally, the code is one CommandHandler plus several private functions assisting with the processing; the handler supplies their parameters in its case branches, and they all share the CommandHandler's return type, Effect[Event, State]. Finally, hand the whole CommandHandler, shell and flesh, to the EventSourcedBehavior factory.

    📎 The more disciplined approach is to define the handlers in the State itself; see the handler design guide later in this section.

    Enforced replies

    Request-response is one of the most common interaction patterns. To guarantee that a persistent actor always replies, EventSourcedBehavior introduces the ReplyEffect, which forces the CommandHandler to send a reply. Its only difference from Effect is that the return value must be built with one of the factories Effect.reply, Effect.noReply, Effect.thenReply, or Effect.thenNoReply; returning a plain Effect no longer type-checks.

    To use it, every Command must carry a replyTo field, and the behavior must be created with EventSourcedBehavior.withEnforcedReplies(id, state, cmdHandler, evtHandler). A sketch follows.
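
    A sketch of withEnforcedReplies (toy protocol assumed; note the replyTo field on the command):

    import akka.actor.typed.ActorRef
    import akka.persistence.typed.PersistenceId
    import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior }
    
    case object Ack
    final case class Increment(replyTo: ActorRef[Ack.type])
    sealed trait Event
    case object Incremented extends Event
    final case class State(n: Int)
    
    def apply(): EventSourcedBehavior[Increment, Event, State] =
      EventSourcedBehavior.withEnforcedReplies[Increment, Event, State](
        PersistenceId.ofUniqueId("counter-1"),
        State(0),
        commandHandler = (_, cmd) =>
          // omitting thenReply here would be a compile error rather than a runtime surprise
          Effect.persist(Incremented).thenReply(cmd.replyTo)(_ => Ack),
        eventHandler = (state, _) => state.copy(n = state.n + 1))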

    Serialization

    The usual serialization approaches and tools apply to Akka Persistence as well; 🔗 Jackson is recommended.

    When serializing, you must consider backward compatibility between different versions of events; see Schema Evolution (綱要演進) above. (💀 Settling on one Chinese term per word is hard: architecture 架構, pattern 模式, structure 結構, style 風格/樣式, template 模板, boilerplate 樣板, schema 綱要.)

    Recovery

    Compared to "Recovery" I prefer "Replay" or "Reconstruct"; "reshaping the entity" (重塑實體) and "replaying events" (事件重播) are also semantically more vivid.

    When an actor starts or restarts, Akka Persistence automatically replays events straight through the EventHandler to reconstruct it. Do not perform side effects in the EventHandler; perform them after recovery finishes, in the receiveSignal handler for RecoveryCompleted, which carries the recovered current state. Even a brand-new actor with no recorded events triggers RecoveryCompleted once its recovery has run.

    Because all new messages are stashed until recovery completes, Akka caps the number of concurrent recoveries to prevent unresponsiveness; configure it as akka.persistence.max-concurrent-recoveries = 50.

    Replay Filter

    In some situations the event stream can become corrupted, with multiple writers (i.e. several instances of the same persistent actor) writing different messages bearing the same sequence number, which leads to conflicting inconsistencies. For this, Akka Persistence provides the replay filter, which detects and resolves the conflicts using the sequence number and the writer's UUID. Configure it in the following section of the config file (leveldb varies by plugin):

    💀 Hard to grasp: why would multiple actor instances write messages with the same sequence number? Isn't the PersistenceId supposed to be unique? And what is this message sequence number anyway?
    🔈 Akka Persistence follows the single-writer principle: at any moment, for any given PersistenceId, only one EventSourcedBehavior may persist events.

    akka.persistence.journal.leveldb.replay-filter {
      mode = repair-by-discard-old
    }

    There are four modes:

    • repair-by-discard-old: discard the old writer's events and log a warning. In any case the event with the highest sequence number is always replayed, so new events cannot corrupt your existing event log.
    • fail: fail the replay outright and log an error.
    • warn: keep emitting the messages and log a warning.
    • off: disable the feature.
    
    Disabling recovery and snapshots entirely

    withRecovery() changes the recovery policy, including disabling automatic recovery altogether. Snapshotting can of course be disabled separately, or narrowed to just the kinds of snapshots you want.

    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId.ofUniqueId("abc"),
      emptyState = State(),
      commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
      eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
      .withRecovery(Recovery.disabled)

    Tags

    Without resorting to an EventAdapter, withTagger tags the events of an EventSourcedBehavior directly (with a set of tags, strictly speaking). Tags let you group events, e.g. all events of one type belonging to different actor instances, for later use in Persistence Query.

    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId.ofUniqueId("abc"),
      emptyState = State(),
      commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
      eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
      .withTagger(_ => Set("tag1", "tag2"))

    Event adapters

    By extending EventAdapter[T, Wrapper] and installing it on the EventSourcedBehavior, every event T is automatically converted into a Wrapper before being persisted.

    case class Wrapper[T](event: T)
    class WrapperEventAdapter[T] extends EventAdapter[T, Wrapper[T]] {
      override def toJournal(e: T): Wrapper[T] = Wrapper(e)
      override def fromJournal(p: Wrapper[T], manifest: String): EventSeq[T] = EventSeq.single(p.event)
      override def manifest(event: T): String = ""
    }
    
    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId.ofUniqueId("abc"),
      emptyState = State(),
      commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
      eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
      .eventAdapter(new WrapperEventAdapter[Event])

    Handling journal failures

    If the journal fails to store events, the EventSourcedBehavior stops. This default can be changed by supplying an overriding backoff strategy, a BackoffSupervisorStrategy. An ordinary supervisor is not appropriate here, because events may already have been persisted, and simply restarting the actor cannot undo changes already written to the journal. If the journal failure occurs while the actor is being recovered, a RecoveryFailed signal is emitted and the actor stops, or restarts after the backoff.

    If, however, the journal detects an error while persisting an event, say the event cannot be serialized, it actively rejects it. The event is then definitely not persisted, an EventRejectedException is raised and passed to the EventSourcedBehavior, and the supervisor's configured strategy decides what happens next.

    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId.ofUniqueId("abc"),
      emptyState = State(),
      commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
      eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
      .onPersistFailure(
        SupervisorStrategy.restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1))

    Stashing messages

    While Effect.persist or persistAll, a preceding unstashAll, or snapshotting is in progress, all newly arriving messages are stashed automatically, until all events are persisted and all side effects have completed. Likewise, during recovery new messages are stashed until recovery finishes. Besides this automatic stashing, you can also stash manually with Effect.stash when needed, for example while waiting for some other conditions to hold, and release everything with thenUnstashAll once they all do.

    Configure the stash capacity with: akka.persistence.typed.stash-capacity = 10000
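
    A sketch of manual stashing, with hypothetical names (Activate/Activated, Deposit/Deposited, and state.active are made up): commands that arrive too early are parked with Effect.stash and released by thenUnstashAll once the enabling event is persisted.

    def commandHandler(state: State, cmd: Command): Effect[Event, State] =
      cmd match {
        case Activate =>
          // persist the enabling event, then replay everything stashed so far
          Effect.persist(Activated).thenUnstashAll()
        case _: Deposit if !state.active =>
          Effect.stash() // not ready yet, park the command
        case Deposit(amount) =>
          Effect.persist(Deposited(amount))
      }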

    ⚠️ Stashed messages live in memory, so they are lost when:

    • the actor is passivated or rebalanced by Cluster Sharding;
    • the actor is stopped or restarted because handling a command or running a side effect threw an exception;
    • the actor throws while persisting events (although with an onPersistFailure backoff strategy defined, the stashed commands are kept and processed later).

    💀 Why doesn't Akka Persistence provide a persistence scheme for the mailbox? Or should that be the ConsumerController's responsibility?
    🔈 See the section on the durable Producer.

    CQRS

    Akka Persistence implements the CQRS pattern with the EventSourcedBehavior, combined with Persistence Query's EventsByTag.

    Handler design guidelines

    Unlike the common arrangement of keeping the handlers separate, Akka Persistence recommends designing both the CommandHandler and the EventHandler into the State, so that the State can serve as a complete domain object carrying both business logic and data. For the just-created case, instead of defining a dedicated initial State class, you can also use Option[State]: Option.None stands for the initial state, Option.Some for the updated state, and a case match dispatches between them. (💀 Note that the sample code uses Option[Account] as the State.)

    The complete example:

    /**
     * Bank account example illustrating:
     * - Option[State] that is starting with None as the initial state
     * - event handlers in the state classes
     * - command handlers in the state classes
     * - replies of various types, using withEnforcedReplies
     */
    object AccountExampleWithOptionState {
    
      //#account-entity
      object AccountEntity {
        // Command
        sealed trait Command extends CborSerializable
        final case class CreateAccount(replyTo: ActorRef[OperationResult]) extends Command
        final case class Deposit(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command
        final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command
        final case class GetBalance(replyTo: ActorRef[CurrentBalance]) extends Command
        final case class CloseAccount(replyTo: ActorRef[OperationResult]) extends Command
    
        // Reply
        sealed trait CommandReply extends CborSerializable
        sealed trait OperationResult extends CommandReply
        case object Confirmed extends OperationResult
        final case class Rejected(reason: String) extends OperationResult
        final case class CurrentBalance(balance: BigDecimal) extends CommandReply
    
        // Event
        sealed trait Event extends CborSerializable
        case object AccountCreated extends Event
        case class Deposited(amount: BigDecimal) extends Event
        case class Withdrawn(amount: BigDecimal) extends Event
        case object AccountClosed extends Event
    
        val Zero = BigDecimal(0)
    
        // type alias to reduce boilerplate
        type ReplyEffect = akka.persistence.typed.scaladsl.ReplyEffect[Event, Option[Account]]
    
        // State
        sealed trait Account extends CborSerializable {
          def applyCommand(cmd: Command): ReplyEffect
          def applyEvent(event: Event): Account
        }
        // State: OpenedAccount
        case class OpenedAccount(balance: BigDecimal) extends Account {
          require(balance >= Zero, "Account balance can't be negative")
    
          override def applyCommand(cmd: Command): ReplyEffect =
            cmd match {
              case Deposit(amount, replyTo) =>
                Effect.persist(Deposited(amount)).thenReply(replyTo)(_ => Confirmed)
    
              case Withdraw(amount, replyTo) =>
                if (canWithdraw(amount))
                  Effect.persist(Withdrawn(amount)).thenReply(replyTo)(_ => Confirmed)
                else
                  Effect.reply(replyTo)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount"))
    
              case GetBalance(replyTo) =>
                Effect.reply(replyTo)(CurrentBalance(balance))
    
              case CloseAccount(replyTo) =>
                if (balance == Zero)
                  Effect.persist(AccountClosed).thenReply(replyTo)(_ => Confirmed)
                else
                  Effect.reply(replyTo)(Rejected("Can't close account with non-zero balance"))
    
              case CreateAccount(replyTo) =>
                Effect.reply(replyTo)(Rejected("Account is already created"))
    
            }
    
          override def applyEvent(event: Event): Account =
            event match {
              case Deposited(amount) => copy(balance = balance + amount)
              case Withdrawn(amount) => copy(balance = balance - amount)
              case AccountClosed     => ClosedAccount
              case AccountCreated    => throw new IllegalStateException(s"unexpected event [$event] in state [OpenedAccount]")
            }
    
          def canWithdraw(amount: BigDecimal): Boolean = {
            balance - amount >= Zero
          }
        }
    
        // State: ClosedAccount
        case object ClosedAccount extends Account {
          override def applyCommand(cmd: Command): ReplyEffect =
            cmd match {
              case c: Deposit =>
                replyClosed(c.replyTo)
              case c: Withdraw =>
                replyClosed(c.replyTo)
              case GetBalance(replyTo) =>
                Effect.reply(replyTo)(CurrentBalance(Zero))
              case CloseAccount(replyTo) =>
                replyClosed(replyTo)
              case CreateAccount(replyTo) =>
                replyClosed(replyTo)
            }
    
          private def replyClosed(replyTo: ActorRef[AccountEntity.OperationResult]): ReplyEffect =
            Effect.reply(replyTo)(Rejected(s"Account is closed"))
    
          override def applyEvent(event: Event): Account =
            throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
        }
    
        // when used with sharding, this TypeKey can be used in `sharding.init` and `sharding.entityRefFor`:
        val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("Account")
    
        def apply(persistenceId: PersistenceId): Behavior[Command] = {
          // type of State is Option[Account]
          EventSourcedBehavior.withEnforcedReplies[Command, Event, Option[Account]](
            persistenceId,
            None,
            // use result of case match for the parameter handler.
            (state, cmd) =>
              state match {
                case None          => onFirstCommand(cmd)
                case Some(account) => account.applyCommand(cmd)
              },
            // match type Option[Account] declared in withEnforcedReplies.
            (state, event) =>
              state match {
                case None          => Some(onFirstEvent(event))
                case Some(account) => Some(account.applyEvent(event))
              })
        }
    
        def onFirstCommand(cmd: Command): ReplyEffect = {
          cmd match {
            case CreateAccount(replyTo) =>
              Effect.persist(AccountCreated).thenReply(replyTo)(_ => Confirmed)
            case _ =>
              // CreateAccount before handling any other commands
              Effect.unhandled.thenNoReply()
          }
        }
    
        def onFirstEvent(event: Event): Account = {
          event match {
            case AccountCreated => OpenedAccount(Zero)
            case _              => throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]")
          }
        }
      }
    }

    Snapshots

    Snapshots exist to make recovery faster. Accordingly, when constructing the EventSourcedBehavior you can use .snapshotWhen() to define the two occasions for taking one: every N events, or whenever a particular condition is met.

    💀 What is the case match inside snapshotWhen about?
    🔈 See the Akka API:

    • snapshotWhen: take a snapshot once the given event has been persisted, for the given state and sequence number; when several events are persisted together, the snapshot is taken only after all of them have been persisted.
      def snapshotWhen(predicate: (State, Event, Long) ⇒ Boolean): EventSourcedBehavior[Command, Event, State]

    • withRetention: set the policy for keeping and deleting snapshots; by default, snapshots are neither saved nor deleted automatically.
      def withRetention(criteria: RetentionCriteria): EventSourcedBehavior[Command, Event, State]

    The following example takes a snapshot once a BookingCompleted event has been persisted:

    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId.ofUniqueId("abc"),
      emptyState = State(),
      commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
      eventHandler = (state, evt) => state)
      .snapshotWhen {
        case (state, BookingCompleted(_), sequenceNumber) => true
        case (state, event, sequenceNumber)               => false
      }
      .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))

    When recovering an actor, the newest (youngest) snapshot is selected by default via SnapshotSelectionCriteria.Latest, unless a different policy is given through the Recovery parameter of withRecovery (for example, disabling snapshots completely):

    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId.ofUniqueId("abc"),
      emptyState = State(),
      commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
      eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
      .withRecovery(Recovery.withSnapshotSelectionCriteria(SnapshotSelectionCriteria.none))

    Besides the default snapshot-store plugin (akka.persistence.snapshot-store.plugin, which must be configured), an alternative plugin can be selected with EventSourcedBehavior.withSnapshotPluginId.

    Saving a snapshot may fail, but that neither stops nor restarts the actor; it only triggers a SnapshotCompleted or SnapshotFailed signal and writes to the log.

    Deleting snapshots

    Whenever a new snapshot is created successfully, old snapshots are deleted automatically according to the conditions set in the RetentionCriteria. In the example above, a snapshot is taken every 100 events (numberOfEvents = 100), and every snapshot whose sequence number is smaller than the saved snapshot's sequence number minus keepNSnapshots * numberOfEvents is deleted automatically (every 200 sequence numbers, the earlier snapshots are dropped).

    ⚠️ According to the Akka API docs, if EventSourcedBehavior.withRetention is used together with RetentionCriteria.snapshotEvery, snapshots triggered by the snapshotWhen predicate do not cause old snapshots to be deleted. Such deletion is only triggered when withRetention is used on its own and the numberOfEvents set in the RetentionCriteria is matched.

    Deleting snapshots triggers a DeleteSnapshotsCompleted or DeleteSnapshotsFailed signal, which can be used for debugging.

    Deleting events

    💀 Unless you are exceptionally stubborn, who would ever delete events in a system built on Event Sourcing?! Even when events must be reworked because the application's version has moved on, the event version migration proposed in the CQRS Journey is the more appropriate choice. In Akka Persistence this is called Schema Evolution.

    The policies for deleting events and deleting snapshots are both given in withRetention, via RetentionCriteria.withDeleteEventsOnSnapshot; the events of a period are deleted before its snapshots, and only the latest snapshot version is kept (💀 how is that any different from an application that is not event-sourced?). This is deletion only as far as Akka Persistence is concerned; whether the underlying event store really removes the events from the database is up to the EventStore's concrete implementation.

    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId.ofUniqueId("abc"),
      emptyState = State(),
      commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
      eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
      .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2).withDeleteEventsOnSnapshot)
      .receiveSignal { // optionally respond to signals
        case (state, _: SnapshotFailed)        => // react to failure
        case (state, _: DeleteSnapshotsFailed) => // react to failure
        case (state, _: DeleteEventsFailed)    => // react to failure
      }

    Testing persistent actors

    🏭 (I had updated to 2.6.6 by the time I reached this part; note that, like Akka Persistence itself, this area will change substantially in future versions.)
    com.typesafe.akka:akka-persistence-typed_2.12:2.6.6
    com.typesafe.akka:akka-persistence-testkit_2.12:2.6.6

    Unit testing

    Akka Persistence provides the EventSourcedBehaviorTestKit to help with testing. It executes one command at a time, synchronously, and returns the result, making it convenient to assert on the behavior.

    To use it, load EventSourcedBehaviorTestKit.config, which starts in-memory emulations of the event store and the snapshot facility.

    Serialization of Commands, Events and State is verified automatically; the checks can be customized with SerializationSettings when the EventSourcedBehaviorTestKit is created. By default it only verifies that serialization works, not that the result is equal to the original; to check equality, enable verifyEquality and implement equals for Command, Event and State, for instance by using case classes.
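
    For example, a sketch of switching on the equality check (assuming the SerializationSettings builder exposed by the TestKit):

    import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit.SerializationSettings

    private val strictTestKit =
      EventSourcedBehaviorTestKit[AccountEntity.Command, AccountEntity.Event, AccountEntity.Account](
        system,
        AccountEntity("1", PersistenceId("Account", "1")),
        // additionally verify that serialized-then-deserialized objects equal the originals
        SerializationSettings.enabled.withVerifyEquality(true))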

    To test recovery, use EventSourcedBehaviorTestKit.restart. The complete example:

    class AccountExampleDocSpec
        extends ScalaTestWithActorTestKit(EventSourcedBehaviorTestKit.config)
        with AnyWordSpecLike
        with BeforeAndAfterEach
        with LogCapturing {
    
      private val eventSourcedTestKit =
        EventSourcedBehaviorTestKit[AccountEntity.Command, AccountEntity.Event, AccountEntity.Account](
          system,
          // note: this spec comes from the docs variant whose AccountEntity.apply takes
          // (accountNumber, persistenceId) and whose State is Account, not Option[Account]
          AccountEntity("1", PersistenceId("Account", "1")))
    
      override protected def beforeEach(): Unit = {
        super.beforeEach()
        eventSourcedTestKit.clear()
      }
    
      "Account" must {
        "be created with zero balance" in {
          val result = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_))
          result.reply shouldBe AccountEntity.Confirmed
          result.event shouldBe AccountEntity.AccountCreated
          result.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 0
        }
    
        "handle Withdraw" in {
          eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_))
    
          val result1 = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _))
          result1.reply shouldBe AccountEntity.Confirmed
          result1.event shouldBe AccountEntity.Deposited(100)
          result1.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 100
    
          val result2 = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Withdraw(10, _))
          result2.reply shouldBe AccountEntity.Confirmed
          result2.event shouldBe AccountEntity.Withdrawn(10)
          result2.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 90
        }
    
        "reject Withdraw overdraft" in {
          eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_))
          eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _))
    
          val result = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Withdraw(110, _))
          result.replyOfType[AccountEntity.Rejected]
          result.hasNoEvents shouldBe true
        }
    
        "handle GetBalance" in {
          eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_))
          eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _))
    
          val result = eventSourcedTestKit.runCommand[AccountEntity.CurrentBalance](AccountEntity.GetBalance(_))
          result.reply.balance shouldBe 100
          result.hasNoEvents shouldBe true
        }
      }
    }

    Persistence storage testing

    🏭 com.typesafe.akka:akka-persistence-testkit_2.12:2.6.6

    To test whether events were actually persisted, use the PersistenceTestKit. It offers two tool sets:

    • the PersistenceTestKit class for events, with the matching PersistenceTestKitPlugin emulating the event store;
    • the SnapshotTestKit class for snapshots, with the matching PersistenceTestKitSnapshotPlugin emulating the snapshot store.

    Before use, the plugin must be configured into the ActorSystem that initializes the TestKit:

    object TestKitTypedConf {
      val yourConfiguration = ConfigFactory.defaultApplication()
      val system = ActorSystem(
        ??? /*some behavior*/,
        "test-system",
        PersistenceTestKitPlugin.config.withFallback(yourConfiguration))
    
      val testKit = PersistenceTestKit(system)
    }
    
    object SnapshotTypedConf {
      val yourConfiguration = ConfigFactory.defaultApplication()
      val system = ActorSystem(
        ??? /*some behavior*/,
        "test-system",
        PersistenceTestKitSnapshotPlugin.config.withFallback(yourConfiguration))
    
      val testKit = SnapshotTestKit(system)
    }

    With the PersistenceTestKit you can:

    • check whether a given event is the next one about to be persisted;
    • check whether a given event has been persisted;
    • read a sequence of persisted events, to inspect them one by one;
    • clear all persisted events;
    • read all persisted events;
    • reject the persistence of a given event (snapshots cannot be rejected);
    • put events into the storage, to test recovery;
    • throw an exception when a given event is persisted, read or deleted;
    • customize the access policy of the underlying storage (see the sketch right after this list).
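
    A few of these operations in use, as a sketch (EventA and EventB are hypothetical events):

    val pid = "your-persistence-id"
    persistenceTestKit.expectNextPersisted(pid, EventA)      // the next persisted event must be EventA
    val events = persistenceTestKit.persistedInStorage(pid)  // read back everything persisted so far
    persistenceTestKit.rejectNextPersisted(pid)              // the journal will reject the next write
    persistenceTestKit.persistForRecovery(pid, List(EventB)) // seed the journal to exercise recovery
    persistenceTestKit.clearAll()                            // wipe the in-memory storage
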
    Custom storage policies

    You can customize the storage access policy for finer-grained control: implement ProcessingPolicy[EventStorage.JournalOperation] for the event storage, or ProcessingPolicy[SnapshotStorage.SnapshotOperation] for the snapshot storage, and load it with withPolicy().

    The pivotal method is ProcessingPolicy.tryProcess(persistenceId, storageOperation). The storage operations include:

    • Event Storage
      • ReadEvents
      • WriteEvents
      • DeleteEvents
      • ReadSeqNum
    • Snapshot Storage
      • ReadSnapshot
      • WriteSnapshot
      • DeleteSnapshotByCriteria
      • DeleteSnapshotByMeta: the meta consists of the SequenceNumber and TimeStamp

    And the result of tryProcess is one of the following:

    • ProcessingSuccess: all events were stored, read or deleted successfully.
    • StorageFailure: simulate a thrown exception.
    • Reject: simulate a rejected access.

    object PersistenceTestKitSampleSpec {
      final case class Cmd(data: String) extends CborSerializable
      final case class Evt(data: String) extends CborSerializable
      object State {
        val empty: State = new State
      }
      final class State extends CborSerializable {
        def updated(event: Evt): State = this
      }
    }
    
    class PersistenceTestKitSampleSpec
        extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config.withFallback(ConfigFactory.defaultApplication()))
        with AnyWordSpecLike
        with BeforeAndAfterEach {
    
      val persistenceTestKit = PersistenceTestKit(system)
    
      override def beforeEach(): Unit = {
        persistenceTestKit.clearAll()
      }
    
      "Persistent actor" should {
        "persist all events" in {
          val persistenceId = PersistenceId.ofUniqueId("your-persistence-id")
          val persistentActor = spawn(
            EventSourcedBehavior[Cmd, Evt, State](
              persistenceId,
              emptyState = State.empty,
              commandHandler = (_, cmd) => Effect.persist(Evt(cmd.data)),
              eventHandler = (state, evt) => state.updated(evt)))
          val cmd = Cmd("data")
    
          persistentActor ! cmd
    
          val expectedPersistedEvent = Evt(cmd.data)
          persistenceTestKit.expectNextPersisted(persistenceId.id, expectedPersistedEvent)
        }
      }
    }
    
    class SampleEventStoragePolicy extends EventStorage.JournalPolicies.PolicyType {
      //you can use internal state, it does not need to be thread safe
      var count = 1
    
      override def tryProcess(persistenceId: String, processingUnit: JournalOperation): ProcessingResult =
        if (count < 10) {
          count += 1
          //check the type of operation and react with success or with reject or with failure.
          //if you return ProcessingSuccess the operation will be performed, otherwise not.
          processingUnit match {
            case ReadEvents(batch) if batch.nonEmpty => ProcessingSuccess
            case WriteEvents(batch) if batch.size > 1 =>
              ProcessingSuccess
            case ReadSeqNum      => StorageFailure()
            case DeleteEvents(_) => Reject()
            case _               => StorageFailure()
          }
        } else {
          ProcessingSuccess
        }
    }
    
    class SampleSnapshotStoragePolicy extends SnapshotStorage.SnapshotPolicies.PolicyType {
      //you can use internal state, it does not need to be thread safe
      var count = 1
    
      override def tryProcess(persistenceId: String, processingUnit: SnapshotOperation): ProcessingResult =
        if (count < 10) {
          count += 1
          //check the type of operation and react with success or with reject or with failure.
          //if you return ProcessingSuccess the operation will be performed, otherwise not.
          processingUnit match {
            case ReadSnapshot(_, payload) if payload.nonEmpty =>
              ProcessingSuccess
            case WriteSnapshot(meta, payload) if meta.sequenceNr > 10 =>
              ProcessingSuccess
            case DeleteSnapshotsByCriteria(_) => StorageFailure()
            case DeleteSnapshotByMeta(meta) if meta.sequenceNr < 10 =>
              ProcessingSuccess
            case _ => StorageFailure()
          }
        } else {
          ProcessingSuccess
        }
    }
    
    class PersistenceTestKitSampleSpecWithPolicy
        extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config.withFallback(ConfigFactory.defaultApplication()))
        with AnyWordSpecLike
        with BeforeAndAfterEach {
    
      val persistenceTestKit = PersistenceTestKit(system)
    
      override def beforeEach(): Unit = {
        persistenceTestKit.clearAll()
        persistenceTestKit.resetPolicy()
      }
    
      "Testkit policy" should {
        "fail all operations with custom exception" in {
          val policy = new EventStorage.JournalPolicies.PolicyType {
    
            class CustomFailure extends RuntimeException
    
            override def tryProcess(persistenceId: String, processingUnit: JournalOperation): ProcessingResult =
              processingUnit match {
                case WriteEvents(_) => StorageFailure(new CustomFailure)
                case _              => ProcessingSuccess
              }
          }
    
          persistenceTestKit.withPolicy(policy)
    
          val persistenceId = PersistenceId.ofUniqueId("your-persistence-id")
          val persistentActor = spawn(
            EventSourcedBehavior[Cmd, Evt, State](
              persistenceId,
              emptyState = State.empty,
              commandHandler = (_, cmd) => Effect.persist(Evt(cmd.data)),
              eventHandler = (state, evt) => state.updated(evt)))
    
          persistentActor ! Cmd("data")
          persistenceTestKit.expectNothingPersisted(persistenceId.id)
        }
      }
    }

    Integration testing

    The PersistenceTestKit can be used together with the ActorTestKit, with a few caveats. First, tests that involve several cluster nodes need separate event and snapshot storage per node; the Persistence Plugin Proxy can be used, but a real database is usually better and more realistic. Second, some persistence plugins create the database tables automatically, which runs into limits when several ActorSystems demand table creation concurrently; to coordinate database initialization, use the PersistenceInit utility.

    val timeout = 5.seconds
    val done: Future[Done] = PersistenceInit.initializeDefaultPlugins(system, timeout)
    Await.result(done, timeout)

    Schema Evolution

    💀 So this is the event version migration mentioned earlier! A schema, as in XML Schema, is a way of describing a particular structure; 「綱要」 seems a fitting Chinese rendering.

    The focus of this chapter is the different schema evolution strategies, and how to choose between them given the realities of the domain model. These strategies are neither exhaustive nor the only choices; they are just the options Akka offers. In essence they all ensure that events written under the old schema in the old system stay consistent after migrating to the new one, without burdening the business logic with reconciling several versions of the same event type. The goals of schema evolution are therefore:

    • keep the system running without a large-scale migration of event versions;
    • keep old and new event versions compatible, so that even old events can be presented uniformly in the new shape;
    • during recovery or queries, upgrade old event versions to the latest transparently, so the business logic never has to consider compatibility across multiple versions of an event.

    Typical triggers for schema evolution include the following; in each case, the solution may be filtering through an EventAdapter:

    • a field is added to an event;
    • an existing field is removed or renamed;
    • an event is removed from the protocol;
    • one event is split into several finer-grained events.

    Choosing the right serialization format

    Choosing the serialization format well matters a great deal: it bears not only on serialization performance but also on which schema evolution scheme is feasible and how its details are implemented. Choose poorly and extending the serialization becomes very hard, leaving the system stuck carrying several versions of serialization code. The schemes Akka Persistence recommends are mainly:

    • Jackson: the option Akka strongly recommends.
    • Google Protocol Buffers: gives finer control, but requires handling more of the mapping details between the serialization and the domain model.
    • Apache Thrift and Avro: mainly binary serialization formats.

    🔗 Reference: Martin Kleppmann, Schema evolution in Avro, Protocol Buffers and Thrift.

    By default, Akka Persistence uses a serializer from the Akka Serialization module that is based on Google Protocol Buffers. A journal plugin that wants a different kind of serializer must pick one to suit its database. Either way, Akka Persistence merely provides a pluggable serializer interface; it does not handle message serialization automatically.

    Every message is serialized into the following envelope structure. At the bottom, the innermost layer is the payload, the serialized message instance itself; the serializer then attaches its own SerializerId and related information to form the middle layer, the PersistentPayload; only then does Akka Persistence wrap it with the SequenceNumber, PersistenceId and other information into the outermost layer. (💀 Picture the receiver peeling this letter open layer by layer: the outer two layers are handled wholesale by the framework, only the core layer is your concern.) The crux of serialization is therefore what the innermost message object serializes into. Java's built-in serializer is barely passable for debugging (think of multiply nested properties); in production you had better look elsewhere. Knowing your serializer's strengths and limitations matters: it lets you move quickly on a project and refactor the model without fear.

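    The original docs illustrate the nesting with a figure; roughly, as a sketch (field names abbreviated, not the exact wire format):

    // outermost layer, added by Akka Persistence:
    //   persistenceId, sequenceNr, writerUuid, ...
    //     middle layer (PersistentPayload), added by the serialization infrastructure:
    //       serializerId, manifest, ...
    //         innermost layer: the payload, i.e. your event serialized to bytes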

    Here is an example of a custom payload serializer under Akka Serialization:

    /**
     * Usually a serializer like this would use a library like:
     * protobuf, kryo, avro, cap'n proto, flatbuffers, SBE or some other dedicated serializer backend
     * to perform the actual to/from bytes marshalling.
     */
    
    final case class Person(name: String, surname: String)
    
    class SimplestPossiblePersonSerializer extends SerializerWithStringManifest {
      val Utf8 = Charset.forName("UTF-8")
      val PersonManifest = classOf[Person].getName
    
      // unique identifier of the serializer
      // on deserialization, this SerializerId is used to load the same serializer, keeping both sides symmetric
      def identifier = 1234567
    
      // extract manifest to be stored together with serialized object
      override def manifest(o: AnyRef): String = o.getClass.getName
    
      // serialize the object
      override def toBinary(obj: AnyRef): Array[Byte] = obj match {
        case p: Person => s"""${p.name}|${p.surname}""".getBytes(Utf8)
        case _         => throw new IllegalArgumentException(s"Unable to serialize to bytes, class was: ${obj.getClass}!")
      }
    
      // deserialize the object, using the manifest to indicate which logic to apply
      override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
        manifest match {
          case PersonManifest =>
            val nameAndSurname = new String(bytes, Utf8)
            val Array(name, surname) = nameAndSurname.split("[|]")
            Person(name, surname)
          case _ =>
            throw new NotSerializableException(
              s"Unable to deserialize from bytes, manifest was: $manifest! Bytes length: " + bytes.length)
        }
    }

    And the corresponding configuration in application.conf:

    akka {
      actor {
        serializers {
          person = "docs.persistence.SimplestPossiblePersonSerializer"
        }
    
        serialization-bindings {
          "docs.persistence.Person" = person
        }
      }
    }

    Akka Serialization provides a corresponding Jackson example.

    ⭕ Case 1: Adding a field

    Scenario: add a new field to an event type that already exists.

    Solution: adding a field is one of the most common reasons to evolve, and as long as the added field is binary-compatible (💀 isn't Jackson text-compatible rather than binary-compatible?), the evolution is easy to achieve in the serializer. The demonstration uses ProtoBuf: seat selection at check-in gains a window-or-aisle field, seatType, which is given a default value (here SeatType.Unknown) or could be wrapped in Option[T]; ProtoBuf's generated hasSeatType then tells old events from new, and SeatType.fromString extracts the value from the string.

    class ProtobufReadOptional {
      sealed abstract class SeatType { def code: String }
      object SeatType {
        def fromString(s: String) = s match {
          case Window.code => Window
          case Aisle.code  => Aisle
          case Other.code  => Other
          case _           => Unknown
        }
        case object Window extends SeatType { override val code = "W" }
        case object Aisle extends SeatType { override val code = "A" }
        case object Other extends SeatType { override val code = "O" }
        case object Unknown extends SeatType { override val code = "" }
      }
    
      case class SeatReserved(letter: String, row: Int, seatType: SeatType)
    
      /**
       * Example serializer impl which uses protocol buffers generated classes (proto.*)
       * to perform the to/from binary marshalling.
       */
      class AddedFieldsSerializerWithProtobuf extends SerializerWithStringManifest {
        override def identifier = 67876
    
        final val SeatReservedManifest = classOf[SeatReserved].getName
    
        override def manifest(o: AnyRef): String = o.getClass.getName
    
        override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
          manifest match {
            case SeatReservedManifest =>
              // use generated protobuf serializer
              seatReserved(FlightAppModels.SeatReserved.parseFrom(bytes))
            case _ =>
              throw new NotSerializableException("Unable to handle manifest: " + manifest)
          }
    
        override def toBinary(o: AnyRef): Array[Byte] = o match {
          case s: SeatReserved =>
            FlightAppModels.SeatReserved.newBuilder
              .setRow(s.row)
              .setLetter(s.letter)
              .setSeatType(s.seatType.code)
              .build()
              .toByteArray
        }
    
        // -- fromBinary helpers --
    
        private def seatReserved(p: FlightAppModels.SeatReserved): SeatReserved =
          SeatReserved(p.getLetter, p.getRow, seatType(p))
    
        // handle missing field by assigning "Unknown" value
        private def seatType(p: FlightAppModels.SeatReserved): SeatType =
          if (p.hasSeatType) SeatType.fromString(p.getSeatType) else SeatType.Unknown
      }
    }

    The corresponding ProtoBuf definition, FlightAppModels.proto, in which the newly added seatType is optional. ProtoBuf generates the concrete marshalling helper class from this definition, and an optional field gets a hasXXX method:

    option java_package = "docs.persistence.proto";
    option optimize_for = SPEED;
    
    message SeatReserved {
      required string letter   = 1;
      required uint32 row      = 2;
      optional string seatType = 3; // the new field
    }

    ⭕ Case 2: Renaming a field

    Scenario: fix a field name that was ill-chosen at the start of the design so it better matches the business. The example uses SeatReserved, whose field code becomes seatNr.

    Solution 1: use a serializer that follows an IDL (Interface Description Language). This is the simplest and most effective approach, and the one ProtoBuf and Thrift take; the .proto above is exactly such an IDL-described mapping structure, from which ProtoBuf generates the helper classes. To rename, just maintain the IDL mapping: keep the field's id unchanged and modify the mapped name.


    // protobuf message definition, BEFORE:
    message SeatReserved {
      required string code = 1;
    }
    
    // protobuf message definition, AFTER:
    message SeatReserved {
      required string seatNr = 1; // field renamed, id remains the same
    }

    Solution 2: migrate event versions by hand. When the IDL route is unavailable, for example when serializing with Jackson, the only way is manual conversion: attach a version number field to the event, and convert during deserialization with a hand-written EventAdapter (installed when the EventSourcedBehavior is created). The same approach also works with Jackson for adding fields, changing the event structure, and so on.


    class JsonRenamedFieldAdapter extends EventAdapter {
      import spray.json.JsObject
    
      val marshaller = new ExampleJsonMarshaller
    
      val V1 = "v1"
      val V2 = "v2"
    
      // this could be done independently for each event type
      override def manifest(event: Any): String = V2
    
      override def toJournal(event: Any): JsObject =
        marshaller.toJson(event)
    
      override def fromJournal(event: Any, manifest: String): EventSeq = event match {
        case json: JsObject =>
          EventSeq(marshaller.fromJson(manifest match {
            case V1      => rename(json, "code", "seatNr")
            case V2      => json // pass-through
            case unknown => throw new IllegalArgumentException(s"Unknown manifest: $unknown")
          }))
        case _ =>
          val c = event.getClass
          throw new IllegalArgumentException("Can only work with JSON, was: %s".format(c))
      }
    
      def rename(json: JsObject, from: String, to: String): JsObject = {
        val value = json.fields(from)
        val withoutOld = json.fields - from
        JsObject(withoutOld + (to -> value))
      }
    }

    ⭕ Case 3: Removing an event by ignoring it

    Scenario: an event is judged redundant, worthless, or even harmful to efficiency, yet replay offers no way to skip it. In this example it is CustomerBlinked, a passenger pressing the call light.

    The simplest scheme: since events cannot really be purged from the journal for good, deletion is usually approximated by ignoring particular events during replay. The simplest way is to intercept the event in the EventAdapter and return an empty EventSeq, while letting every other kind of event through, as sketched below. The drawback is that the event still has to be deserialized when read from storage, costing some efficiency.

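    A sketch of that simplest scheme (the adapter name is made up; CustomerBlinked is the event defined in the full example below):

    class DropBlinkedEventsAdapter extends EventAdapter {
      override def manifest(event: Any): String = ""
      override def toJournal(event: Any): Any = event

      override def fromJournal(event: Any, manifest: String): EventSeq = event match {
        case _: CustomerBlinked => EventSeq.empty // swallowed during replay
        case _                  => EventSeq(event)
      }
    }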

    The more mature scheme: on top of the above, add filtering on the serializer side, so that the affected events are no longer even deserialized. The ignored events are called tombstones.


    final case class CustomerBlinked(customerId: Long)
    
    case object EventDeserializationSkipped
    
    class RemovedEventsAwareSerializer extends SerializerWithStringManifest {
      val utf8 = Charset.forName("UTF-8")
      override def identifier: Int = 8337
    
      val SkipEventManifestsEvents = Set("docs.persistence.CustomerBlinked"
                                         // other events to be skipped ...
      )
    
      override def manifest(o: AnyRef): String = o.getClass.getName
    
      override def toBinary(o: AnyRef): Array[Byte] = o match {
        case _ => o.toString.getBytes(utf8) // example serialization
      }
    
      override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
        manifest match {
          case m if SkipEventManifestsEvents.contains(m) =>
            EventDeserializationSkipped
    
          case other => new String(bytes, utf8)
        }
    }
    
    class SkippedEventsAwareAdapter extends EventAdapter {
      override def manifest(event: Any) = ""
      override def toJournal(event: Any) = event
    
      override def fromJournal(event: Any, manifest: String) = event match {
        case EventDeserializationSkipped => EventSeq.empty
        case _                           => EventSeq(event)
      }
    }

    ⭕ Case 4: Detaching the domain model from the data model

    Scenario: this is chiefly about persistence ignorance, keeping the domain model as plain case classes (in the POJO spirit) and guarding it against intrusion from the data model, the database, the serializer and other low-level details and frameworks.

    Solution: create an EventAdapter that maps one-to-one between the domain case classes and the storage classes.

    /** Domain model - highly optimised for domain language and maybe "fluent" usage */
    object DomainModel {
      final case class Customer(name: String)
      final case class Seat(code: String) {
        def bookFor(customer: Customer): SeatBooked = SeatBooked(code, customer)
      }
    
      final case class SeatBooked(code: String, customer: Customer)
    }
    
    /** Data model - highly optimised for schema evolution and persistence */
    object DataModel {
      final case class SeatBooked(code: String, customerName: String)
    }
    
    class DetachedModelsAdapter extends EventAdapter {
      override def manifest(event: Any): String = ""
    
      override def toJournal(event: Any): Any = event match {
        case DomainModel.SeatBooked(code, customer) =>
          DataModel.SeatBooked(code, customer.name)
      }
      override def fromJournal(event: Any, manifest: String): EventSeq = event match {
        case DataModel.SeatBooked(code, customerName) =>
          EventSeq(DomainModel.SeatBooked(code, DomainModel.Customer(customerName)))
      }
    }

    ⭕ Case 5: Storing events in a human-readable form

    Scenario: store events in a more readable shape such as JSON, rather than as a binary stream. This is fairly common with recent databases such as MongoDB and PostgreSQL. Before deciding, be certain whether the storage format itself must be readable, or you merely want to peek into the event store and need readability for that; in the latter case, Persistence Query serves the same purpose without hurting storage efficiency.

    Solution: create an EventAdapter that turns events into JSON and hands them to the journal to store directly. The premise is a journal plugin adapted to Akka Persistence, so that the database can recognize the JSON objects produced by the EventAdapter and store them.

    // act as-if JSON library
    class ExampleJsonMarshaller {
      def toJson(any: Any): JsObject = JsObject()
      def fromJson(json: JsObject): Any = new Object
    }
    
    class JsonDataModelAdapter extends EventAdapter {
      override def manifest(event: Any): String = ""
    
      val marshaller = new ExampleJsonMarshaller
    
      override def toJournal(event: Any): JsObject =
        marshaller.toJson(event)
    
      override def fromJournal(event: Any, manifest: String): EventSeq = event match {
        case json: JsObject =>
          EventSeq(marshaller.fromJson(json))
        case _ =>
          throw new IllegalArgumentException("Unable to fromJournal a non-JSON object! Was: " + event.getClass)
      }
    }

    Alternative: if no journal plugin supports the above, it is also feasible to hand-build a JSON serializer on akka.persistence.journal.AsyncWriteJournal, paired with an EventAdapter implementing toJournal and fromJournal.

    ⭕ Case 6: Splitting an event into finer-grained events

    Scenario: as the domain analysis deepens, one coarse-grained event needs to be split into several finer-grained ones. The example splits "user details changed" into the finer "user name changed", "user address changed", and so on.

    Solution: once again an EventAdapter, splitting the one big event saved in the journal into several small ones, and vice versa.


    trait Version1
    trait Version2
    
    // V1 event:
    final case class UserDetailsChanged(name: String, address: String) extends Version1
    
    // corresponding V2 events:
    final case class UserNameChanged(name: String) extends Version2
    final case class UserAddressChanged(address: String) extends Version2
    
    // event splitting adapter:
    class UserEventsAdapter extends EventAdapter {
      override def manifest(event: Any): String = ""
    
      override def fromJournal(event: Any, manifest: String): EventSeq = event match {
        case UserDetailsChanged(null, address) => EventSeq(UserAddressChanged(address))
        case UserDetailsChanged(name, null)    => EventSeq(UserNameChanged(name))
        case UserDetailsChanged(name, address) =>
          EventSeq(UserNameChanged(name), UserAddressChanged(address))
        case event: Version2 => EventSeq(event)
      }
    
      override def toJournal(event: Any): Any = event
    }

    Persistence Query

    🏭 com.typesafe.akka:akka-persistence-query_2.12:2.6.6

    Akka Persistence exposes an asynchronous, stream-based query interface, through which the read side of CQRS can read the events saved in the journal and carry out tasks such as updating a UI. Persistence Query cannot fully satisfy the read side on its own, however; it usually just helps the application move data from the write side to the read side. That is also why, for better execution efficiency and scalability, the read and write sides are advised to use databases of different kinds.

    💀 The write side normally appends events carrying ids, so traditional relational databases such as MySQL and Oracle, or key-value NoSQL stores such as LevelDB and Redis, usually perform well there. On the read side, document-oriented NoSQL databases like MongoDB have the bigger market.

    To keep the interface as general as possible, Akka Persistence does not over-specify the Persistence Query API. It only requires that every read journal state explicitly which query types it supports, and it predefines query types for the most common scenarios, from which each journal plugin picks the subset it implements.

    Read journals

    ReadJournals are all Akka community plugins (🔗 Community Plugins), developed and maintained by their own users. Each ReadJournal corresponds to one storage scheme (a database, or even a text file) and carries a fixed id. The id can be obtained in the style of readJournalFor[NoopJournal](NoopJournal.identifier); that is not mandatory, just the recommended practice.

    To query with a ReadJournal, first obtain an instance of it (here the id is akka.persistence.query.my-read-journal):

    // obtain read journal by plugin id
    val readJournal =
      PersistenceQuery(system).readJournalFor[MyScaladslReadJournal]("akka.persistence.query.my-read-journal")
    
    // issue query to journal
    val source: Source[EventEnvelope, NotUsed] =
      readJournal.eventsByPersistenceId("user-1337", 0, Long.MaxValue)
    
    // materialize stream, consuming events
    source.runForeach { event =>
      println("Event: " + event)
    }

    The Read Journal API of Akka Persistence mainly comprises the following:

    • persistenceIds(): query all live PersistenceIds in the system; whenever a new id is created it is added to the stream as well.

    • currentPersistenceIds(): query the PersistenceIds currently in the system; ids created after the query has started are not added to the stream.

    • eventsByPersistenceId(id, fromSequenceNr = 0L, toSequenceNr = Long.MaxValue): query the events of the persistent actor behind a PersistenceId. This resembles fetching events one after another during recovery, but the query stream is live too, so most journals poll to keep the result current. The SequenceNr starts the query from a given position, which incidentally supports resumable projections.

    • eventsByTag(tag, offset): query all events carrying a given tag. The events may originate from many PersistenceIds, and their order is not guaranteed unless the journal sorts the results before handing them back.

      val NumberOfEntityGroups = 10
      
      def tagEvent(entityId: String, event: Event): Set[String] = {
        val entityGroup = s"group-${math.abs(entityId.hashCode % NumberOfEntityGroups)}"
        event match {
          // an OrderCompleted event gets one extra tag
          case _: OrderCompleted => Set(entityGroup, "order-completed")
          case _                 => Set(entityGroup)
        }
      }
      
      def apply(entityId: String): Behavior[Command] = {
        EventSourcedBehavior[Command, Event, State](
          persistenceId = PersistenceId("ShoppingCart", entityId),
          emptyState = State(),
          commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
          eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
          .withTagger(event => tagEvent(entityId, event))
      }
      
      // assuming journal is able to work with numeric offsets we can:
      
      val completedOrders: Source[EventEnvelope, NotUsed] =
        readJournal.eventsByTag("order-completed", Offset.noOffset)
      
      // find first 10 completed orders:
      val firstCompleted: Future[Vector[OrderCompleted]] =
        completedOrders
          .map(_.event)
          .collectType[OrderCompleted]
          .take(10) // cancels the query stream after pulling 10 elements
          .runFold(Vector.empty[OrderCompleted])(_ :+ _)
      
      // start another query, from the known offset
      val furtherOrders = readJournal.eventsByTag("order-completed", offset = Sequence(10))
    • Querying auxiliary metadata: Persistence Query also supports querying metadata attached to a stream, though the specifics are left to the journal implementation.

      final case class RichEvent(tags: Set[String], payload: Any)
      
      // a plugin can provide: order & infinite
      case class QueryMetadata(deterministicOrder: Boolean, infinite: Boolean)
      
      // the journal-provided query API exposing the metadata
      def byTagsWithMeta(tags: Set[String]): Source[RichEvent, QueryMetadata] = ???
      
      // run the query above and inspect the materialized value
      val query: Source[RichEvent, QueryMetadata] = readJournal.byTagsWithMeta(Set("red", "blue"))
      
      query
        .mapMaterializedValue { meta =>
          println(
            s"The query is: " +
            s"ordered deterministically: ${meta.deterministicOrder}, " +
            s"infinite: ${meta.infinite}")
        }
        .map { event =>
          println(s"Event payload: ${event.payload}")
        }
        .runWith(Sink.ignore)

    Performance and denormalization

    Under CQRS, as long as the read and write sides stay eventually consistent, each may have a storage scheme of its own shape. Building on that, the read side can keep fixed-structure materialized views for repeated use, much like database views, to raise query efficiency. Such views need not strictly obey database normalization; whatever is convenient goes, and a document-style NoSQL store is a fine fit.

    • Building materialized views with a database compatible with the 🔗 JDK9 Reactive Streams interfaces: the read-side store must support Reactive Streams, so the journal can pour write-side data straight into it.

      implicit val system = ActorSystem()
      
      val readJournal = PersistenceQuery(system).readJournalFor[MyScaladslReadJournal](JournalId)
      val dbBatchWriter: Subscriber[immutable.Seq[Any]] = ReactiveStreamsCompatibleDBDriver.batchWriter
      
      // Using an example (Reactive Streams) Database driver
      readJournal
        .eventsByPersistenceId("user-1337", fromSequenceNr = 0L, toSequenceNr = Long.MaxValue)
        .map(envelope => envelope.event)
        .map(convertToReadSideTypes) // convert to datatype
        .grouped(20) // batch inserts into groups of 20
        .runWith(Sink.fromSubscriber(dbBatchWriter)) // write batches to read-side database
    • Building materialized views with mapAsync: without Reactive Streams support, implement the conversion from the write-side event database to the read-side database yourself.

      // a stand-in for the read-side database
      trait ExampleStore {
        def save(event: Any): Future[Unit]
      }
      
      val store: ExampleStore = ???
      
      readJournal
        .eventsByTag("bid", NoOffset)
        .mapAsync(parallelism = 1) { e =>
          store.save(e)
        }
        .runWith(Sink.ignore)
      • Resumable projections: keep the Offset or SequenceNumber saved, and the projection can resume where it left off.
      def runQuery(writer: ActorRef[TheOneWhoWritesToQueryJournal.Command])(implicit system: ActorSystem[_]): Unit = {
        val readJournal =
          PersistenceQuery(system.toClassic).readJournalFor[MyScaladslReadJournal](JournalId)
      
        import system.executionContext
        implicit val timeout = Timeout(3.seconds)
      
        val bidProjection = new MyResumableProjection("bid")
      
        bidProjection.latestOffset.foreach { startFromOffset =>
          readJournal
            .eventsByTag("bid", Sequence(startFromOffset))
            .mapAsync(8) { envelope =>
              writer
                .ask((replyTo: ActorRef[Done]) => TheOneWhoWritesToQueryJournal.Update(envelope.event, replyTo))
                .map(_ => envelope.offset)
            }
            .mapAsync(1) { offset =>
              bidProjection.saveProgress(offset)
            }
            .runWith(Sink.ignore)
        }
      }
      
      // an Actor that performs the actual writes to the query side
      object TheOneWhoWritesToQueryJournal {
        sealed trait Command
        final case class Update(payload: Any, replyTo: ActorRef[Done]) extends Command
      
        def apply(id: String, store: ExampleStore): Behavior[Command] = {
          updated(ComplexState(), store)
        }
      
        private def updated(state: ComplexState, store: ExampleStore): Behavior[Command] = {
          Behaviors.receiveMessage {
            case command: Update =>
              val newState = updateState(state, command)
              if (state.readyToSave) store.save(Record(state))
              updated(newState, store)
          }
        }
      
        private def updateState(state: ComplexState, command: Command): ComplexState = {
          // some complicated aggregation logic here ...
          state
        }
      }

    Query plugins

    Whatever database is used, any plugin that implements ReadJournal counts as a query plugin, and all of them must publish the scenarios and semantics their queries are suited for.

    Read journal plugins

    Every ReadJournal plugin must implement akka.persistence.query.ReadJournalProvider, and the provider must be able to create both the Scala and the Java ReadJournal instances (akka.persistence.query.scaladsl/javadsl.ReadJournal). Its constructor takes one of the following four forms:

    • (ExtendedActorSystem, com.typesafe.config.Config, pathOfConfig): the Config holds the plugin-related configuration of the ActorSystem, while the path points at the plugin's own configuration
    • (ExtendedActorSystem, com.typesafe.config.Config)
    • (ExtendedActorSystem)
    • ()

    If the database only supports querying the current result set, then for unbounded event streams like these the ReadJournal must poll, repeatedly trying to read newer events; in that case it is recommended to define the polling interval in the configuration with refresh-interval.
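
    A sketch of such a plugin entry (the id matches the lookup shown earlier; the provider class name is assumed):

    akka.persistence.query.my-read-journal {
      # implementation class of the ReadJournalProvider
      class = "docs.persistence.query.MyReadJournalProvider"
      # how often to poll the store for newer events
      refresh-interval = 3s
    }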

    class MyReadJournalProvider(system: ExtendedActorSystem, config: Config) extends ReadJournalProvider {
    
      override val scaladslReadJournal: MyScaladslReadJournal =
        new MyScaladslReadJournal(system, config)
    
      override val javadslReadJournal: MyJavadslReadJournal =
        new MyJavadslReadJournal(scaladslReadJournal)
    }
    
    class MyScaladslReadJournal(system: ExtendedActorSystem, config: Config)
        extends akka.persistence.query.scaladsl.ReadJournal
        with akka.persistence.query.scaladsl.EventsByTagQuery
        with akka.persistence.query.scaladsl.EventsByPersistenceIdQuery
        with akka.persistence.query.scaladsl.PersistenceIdsQuery
        with akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery {
    
      private val refreshInterval: FiniteDuration =
        config.getDuration("refresh-interval", MILLISECONDS).millis
    
      /**
       * You can use `NoOffset` to retrieve all events with a given tag or retrieve a subset of all
       * events by specifying a `Sequence` `offset`. The `offset` corresponds to an ordered sequence number for
       * the specific tag. Note that the corresponding offset of each event is provided in the
       * [[akka.persistence.query.EventEnvelope]], which makes it possible to resume the
       * stream at a later point from a given offset.
       *
       * The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included
       * in the returned stream. This means that you can use the offset that is returned in `EventEnvelope`
       * as the `offset` parameter in a subsequent query.
       */
      override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = offset match {
        case Sequence(offsetValue) =>
          Source.fromGraph(new MyEventsByTagSource(tag, offsetValue, refreshInterval))
        case NoOffset => eventsByTag(tag, Sequence(0L)) //recursive
        case _ =>
          throw new IllegalArgumentException("MyJournal does not support " + offset.getClass.getName + " offsets")
      }
    
      override def eventsByPersistenceId(
          persistenceId: String,
          fromSequenceNr: Long,
          toSequenceNr: Long): Source[EventEnvelope, NotUsed] = {
        // implement in a similar way as eventsByTag
        ???
      }
    
      override def persistenceIds(): Source[String, NotUsed] = {
        // implement in a similar way as eventsByTag
        ???
      }
    
      override def currentPersistenceIds(): Source[String, NotUsed] = {
        // implement in a similar way as eventsByTag
        ???
      }
    
      // possibility to add more plugin specific queries
    
      def byTagsWithMeta(tags: Set[String]): Source[RichEvent, QueryMetadata] = {
        // implement in a similar way as eventsByTag
        ???
      }
    
    }
    
    class MyJavadslReadJournal(scaladslReadJournal: MyScaladslReadJournal)
        extends akka.persistence.query.javadsl.ReadJournal
        with akka.persistence.query.javadsl.EventsByTagQuery
        with akka.persistence.query.javadsl.EventsByPersistenceIdQuery
        with akka.persistence.query.javadsl.PersistenceIdsQuery
        with akka.persistence.query.javadsl.CurrentPersistenceIdsQuery {
    
      override def eventsByTag(tag: String, offset: Offset = Sequence(0L)): javadsl.Source[EventEnvelope, NotUsed] =
        scaladslReadJournal.eventsByTag(tag, offset).asJava
    
      override def eventsByPersistenceId(
          persistenceId: String,
          fromSequenceNr: Long = 0L,
          toSequenceNr: Long = Long.MaxValue): javadsl.Source[EventEnvelope, NotUsed] =
        scaladslReadJournal.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr).asJava
    
      override def persistenceIds(): javadsl.Source[String, NotUsed] =
        scaladslReadJournal.persistenceIds().asJava
    
      override def currentPersistenceIds(): javadsl.Source[String, NotUsed] =
        scaladslReadJournal.currentPersistenceIds().asJava
    
      // possibility to add more plugin specific queries
    
      def byTagsWithMeta(tags: java.util.Set[String]): javadsl.Source[RichEvent, QueryMetadata] = {
        import akka.util.ccompat.JavaConverters._
        scaladslReadJournal.byTagsWithMeta(tags.asScala.toSet).asJava
      }
    }
    
    class MyEventsByTagSource(tag: String, offset: Long, refreshInterval: FiniteDuration)
        extends GraphStage[SourceShape[EventEnvelope]] {
    
      private case object Continue
      val out: Outlet[EventEnvelope] = Outlet("MyEventByTagSource.out")
      override def shape: SourceShape[EventEnvelope] = SourceShape(out)
    
      override protected def initialAttributes: Attributes = Attributes(ActorAttributes.IODispatcher)
    
      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
        new TimerGraphStageLogic(shape) with OutHandler {
          lazy val system = materializer.system
          private val Limit = 1000
          private val connection: java.sql.Connection = ???
          private var currentOffset = offset
          private var buf = Vector.empty[EventEnvelope]
          private val serialization = SerializationExtension(system)
    
          override def preStart(): Unit = {
            scheduleWithFixedDelay(Continue, refreshInterval, refreshInterval)
          }
    
          override def onPull(): Unit = {
            query()
            tryPush()
          }
    
          override def onDownstreamFinish(): Unit = {
            // close connection if responsible for doing so
          }
    
          private def query(): Unit = {
            if (buf.isEmpty) {
              try {
                buf = Select.run(tag, currentOffset, Limit)
              } catch {
                case NonFatal(e) =>
                  failStage(e)
              }
            }
          }
    
          private def tryPush(): Unit = {
            if (buf.nonEmpty && isAvailable(out)) {
              push(out, buf.head)
              buf = buf.tail
            }
          }
    
          override protected def onTimer(timerKey: Any): Unit = timerKey match {
            case Continue =>
              query()
              tryPush()
          }
    
          object Select {
            private def statement() =
              connection.prepareStatement("""
                SELECT id, persistence_id, seq_nr, serializer_id, serializer_manifest, payload
                FROM journal WHERE tag = ? AND id > ?
                ORDER BY id LIMIT ?
          """)
    
            def run(tag: String, from: Long, limit: Int): Vector[EventEnvelope] = {
              val s = statement()
              try {
                s.setString(1, tag)
                s.setLong(2, from)
                s.setLong(3, limit)
                val rs = s.executeQuery()
    
                val b = Vector.newBuilder[EventEnvelope]
                while (rs.next()) {
                  val deserialized = serialization
                    .deserialize(rs.getBytes("payload"), rs.getInt("serializer_id"), rs.getString("serializer_manifest"))
                    .get
                  currentOffset = rs.getLong("id")
                  b += EventEnvelope(
                    Offset.sequence(currentOffset),
                    rs.getString("persistence_id"),
                    rs.getLong("seq_nr"),
                    deserialized)
                }
                b.result()
              } finally s.close()
            }
          }
        }
    }

    Scaling out

    For this part, refer to the Lagom framework. Lagom is a microservices framework developed by Lightbend; it embodies a great deal of ES and CQRS implementation, with mature solutions in place, and can scale out effectively with the help of Cluster Sharding.

    Lagom: The opinionated microservices framework for moving away from the monolith.
    Lagom helps you decompose your legacy monolith and build, test, and deploy entire systems of Reactive microservices.

    A Persistence Query example with LevelDB

    📌 LevelDB is a key-value NoSQL database from Google, local only (no network support yet). This sample uses LevelDB to demonstrate tagging events green, black and blue, then running Persistence Query over them.

    LevelDB is a fast key-value storage library written at Google that provides an ordered mapping from string keys to string values.

    import akka.NotUsed
    import akka.testkit.AkkaSpec
    import akka.persistence.query.{ EventEnvelope, PersistenceQuery, Sequence }
    import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
    import akka.stream.scaladsl.Source
    
    object LeveldbPersistenceQueryDocSpec {
      //#tagger
      import akka.persistence.journal.WriteEventAdapter
      import akka.persistence.journal.Tagged
    
      class MyTaggingEventAdapter extends WriteEventAdapter {
        val colors = Set("green", "black", "blue")
        override def toJournal(event: Any): Any = event match {
          case s: String =>
            var tags = colors.foldLeft(Set.empty[String]) { (acc, c) =>
              if (s.contains(c)) acc + c else acc
            }
            if (tags.isEmpty) event
            else Tagged(event, tags)
          case _ => event
        }
    
        override def manifest(event: Any): String = ""
      }
      //#tagger
    }
    
    class LeveldbPersistenceQueryDocSpec(config: String) extends AkkaSpec(config) {
    
      def this() = this("")
    
      "LeveldbPersistentQuery" must {
        "demonstrate how get ReadJournal" in {
          //#get-read-journal
          import akka.persistence.query.PersistenceQuery
          import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
    
          val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
          //#get-read-journal
        }
    
        "demonstrate EventsByPersistenceId" in {
          //#EventsByPersistenceId
          val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
    
          val src: Source[EventEnvelope, NotUsed] =
            queries.eventsByPersistenceId("some-persistence-id", 0L, Long.MaxValue)
    
          val events: Source[Any, NotUsed] = src.map(_.event)
          //#EventsByPersistenceId
        }
    
        "demonstrate AllPersistenceIds" in {
          //#AllPersistenceIds
          val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
    
          val src: Source[String, NotUsed] = queries.persistenceIds()
          //#AllPersistenceIds
        }
    
        "demonstrate EventsByTag" in {
          //#EventsByTag
          val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
    
          val src: Source[EventEnvelope, NotUsed] =
            queries.eventsByTag(tag = "green", offset = Sequence(0L))
          //#EventsByTag
        }
      }
    }

    The corresponding configuration:

    # Configuration for the LeveldbReadJournal
    akka.persistence.query.journal.leveldb {
      # Implementation class of the LevelDB ReadJournalProvider
      class = "akka.persistence.query.journal.leveldb.LeveldbReadJournalProvider"
      
      # Absolute path to the write journal plugin configuration entry that this
      # query journal will connect to. That must be a LeveldbJournal or SharedLeveldbJournal.
      # If undefined (or "") it will connect to the default journal as specified by the
      # akka.persistence.journal.plugin property.
      write-plugin = ""
      
      # The LevelDB write journal is notifying the query side as soon as things
      # are persisted, but for efficiency reasons the query side retrieves the events
      # in batches that sometimes can be delayed up to the configured `refresh-interval`.
      refresh-interval = 3s
      
      # How many events to fetch in one query (replay) and keep buffered until they
      # are delivered downstreams.
      max-buffer-size = 100
    }

    Persistence Plugins

    Persistence plugins provide the database support for storing events and snapshots; take care to distinguish them from query plugins. The persistence plugins maintained by the Akka team include:

    • akka-persistence-cassandra
    • akka-persistence-couchbase
    • akka-persistence-jdbc

    When a persistent actor does not override journalPluginId and snapshotPluginId, Akka uses the default journal and snapshot-store plugins configured under akka.persistence.journal.plugin and akka.persistence.snapshot-store.plugin in reference.conf. If those entries are left empty, they must be specified explicitly in application.conf.
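
    A sketch of overriding them per behavior (the plugin ids are assumed configuration paths):

    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId.ofUniqueId("abc"),
      emptyState = State(),
      commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
      eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
      .withJournalPluginId("my-journal-plugin")
      .withSnapshotPluginId("my-snapshot-plugin")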

    If the persistence plugins need to be started eagerly rather than on demand, configure them as follows:

    akka {
      extensions = [akka.persistence.Persistence]
    
      persistence {
        journal {
          plugin = "akka.persistence.journal.leveldb"
          auto-start-journals = ["akka.persistence.journal.leveldb"]
        }
    
        snapshot-store {
          plugin = "akka.persistence.snapshot-store.local"
          auto-start-snapshot-stores = ["akka.persistence.snapshot-store.local"]
        }
      }
    }

    Using the LevelDB plugin

    The configuration selects the LevelDB plugin as the persistence plugin and sets where the database files live (by default the journal folder under the current working directory, and the snapshots folder for snapshots):

    # Path to the journal plugin to be used
    akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
    akka.persistence.journal.leveldb.dir = "target/journal"
    
    # Path to the snapshot store plugin to be used
    akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
    akka.persistence.snapshot-store.local.dir = "target/snapshots"

    Adding the LevelDB plugin in Gradle

dependencies {
  implementation 'org.fusesource.leveldbjni:leveldbjni-all:1.8'
}

Set LevelDB's persistence parameters. The main setting here controls, per persistence id, how many deleted messages trigger journal compaction. Deletion is logical rather than physical: events are flagged as "tombstones", and compaction is what actually shrinks the database:

    # Number of deleted messages per persistence id that will trigger journal compaction
    akka.persistence.journal.leveldb.compaction-intervals {
      persistence-id-1 = 100
      persistence-id-2 = 200
      # ...
      persistence-id-N = 1000
      # use wildcards to match unspecified persistence ids, if any
      "*" = 250
    }

A shared LevelDB for testing only

Akka ships with a shareable LevelDB instance intended for testing; enable it like this:

    akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
    akka.persistence.journal.leveldb-shared.store.dir = "target/shared"

Before use, an actor must inject the store via SharedLeveldbJournal.setStore to complete the initialization:

import akka.actor.{ Actor, ActorIdentity, Identify, Props }
import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore }

trait SharedStoreUsage extends Actor {
  override def preStart(): Unit = {
    // Locate the shared store actor on the node that hosts it
    context.actorSelection("akka://example@127.0.0.1:2552/user/store") ! Identify(1)
  }

  def receive = {
    case ActorIdentity(1, Some(store)) =>
      SharedLeveldbJournal.setStore(store, context.system)
  }
}

// After that, it can be used as normal
val store = system.actorOf(Props[SharedLeveldbStore](), "store")

A persistence plugin proxy for testing only

Akka also ships with a Persistence Plugin Proxy for testing. By forwarding requests to a shared target journal, it lets multiple actors share the same underlying persistence backing; if a journal node crashes, a standby node can keep providing event storage and snapshots. The proxy is started either by instantiating the PersistencePluginProxyExtension extension or by calling PersistencePluginProxy.start.

# These settings go inside the corresponding configuration blocks:
akka.persistence.journal.proxy { ... }
akka.persistence.snapshot-store.proxy { ... }

# The underlying journal / snapshot store to forward to
target-journal-plugin = akka.persistence.journal.leveldb
target-snapshot-store-plugin = ...

# Whether this ActorSystem is the one that starts the target store
# (exactly one system should set this to on)
start-target-journal = on
start-target-snapshot-store = ...

## Location of the target store (can also be set in code via PersistencePluginProxy.setTargetLocation)
target-journal-address =
target-snapshot-store-address =
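A minimal sketch of wiring this up in code (the system name and address are illustrative):

import akka.actor.{ ActorSystem, Address }
import akka.persistence.journal.PersistencePluginProxy

implicit val system: ActorSystem = ActorSystem("example")

// Start the proxy on this node...
PersistencePluginProxy.start(system)
// ...and point it at the node that actually hosts the shared store.
PersistencePluginProxy.setTargetLocation(system, Address("akka", "example", "127.0.0.1", 2552))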

Building a persistence backend

To support custom database backends for persisted events and snapshots, Akka Persistence exposes a set of APIs.

    🏭
    import akka.persistence._
    import akka.persistence.journal._
    import akka.persistence.snapshot._

The journal plugin API: AsyncWriteJournal

🔗 AsyncWriteJournal is itself an actor; it exposes only the following three methods:

    /**
       * Plugin API: asynchronously writes a batch (`Seq`) of persistent messages to the
       * journal.
       *
       * The batch is only for performance reasons, i.e. all messages don't have to be written
       * atomically. Higher throughput can typically be achieved by using batch inserts of many
       * records compared to inserting records one-by-one, but this aspect depends on the
       * underlying data store and a journal implementation can implement it as efficient as
       * possible. Journals should aim to persist events in-order for a given `persistenceId`
       * as otherwise in case of a failure, the persistent state may be end up being inconsistent.
       *
       * Each `AtomicWrite` message contains the single `PersistentRepr` that corresponds to
       * the event that was passed to the `persist` method of the `PersistentActor`, or it
       * contains several `PersistentRepr` that corresponds to the events that were passed
       * to the `persistAll` method of the `PersistentActor`. All `PersistentRepr` of the
       * `AtomicWrite` must be written to the data store atomically, i.e. all or none must
       * be stored. If the journal (data store) cannot support atomic writes of multiple
       * events it should reject such writes with a `Try` `Failure` with an
       * `UnsupportedOperationException` describing the issue. This limitation should
       * also be documented by the journal plugin.
       *
       * If there are failures when storing any of the messages in the batch the returned
       * `Future` must be completed with failure. The `Future` must only be completed with
       * success when all messages in the batch have been confirmed to be stored successfully,
       * i.e. they will be readable, and visible, in a subsequent replay. If there is
       * uncertainty about if the messages were stored or not the `Future` must be completed
       * with failure.
       *
       * Data store connection problems must be signaled by completing the `Future` with
       * failure.
       *
       * The journal can also signal that it rejects individual messages (`AtomicWrite`) by
       * the returned `immutable.Seq[Try[Unit]]`. It is possible but not mandatory to reduce
       * number of allocations by returning `Future.successful(Nil)` for the happy path,
       * i.e. when no messages are rejected. Otherwise the returned `Seq` must have as many elements
       * as the input `messages` `Seq`. Each `Try` element signals if the corresponding
       * `AtomicWrite` is rejected or not, with an exception describing the problem. Rejecting
       * a message means it was not stored, i.e. it must not be included in a later replay.
       * Rejecting a message is typically done before attempting to store it, e.g. because of
       * serialization error.
       *
       * Data store connection problems must not be signaled as rejections.
       *
       * It is possible but not mandatory to reduce number of allocations by returning
       * `Future.successful(Nil)` for the happy path, i.e. when no messages are rejected.
       *
       * Calls to this method are serialized by the enclosing journal actor. If you spawn
       * work in asynchronous tasks it is alright that they complete the futures in any order,
       * but the actual writes for a specific persistenceId should be serialized to avoid
       * issues such as events of a later write are visible to consumers (query side, or replay)
       * before the events of an earlier write are visible.
       * A PersistentActor will not send a new WriteMessages request before the previous one
       * has been completed.
       *
       * Please note that the `sender` field of the contained PersistentRepr objects has been
       * nulled out (i.e. set to `ActorRef.noSender`) in order to not use space in the journal
       * for a sender reference that will likely be obsolete during replay.
       *
       * Please also note that requests for the highest sequence number may be made concurrently
       * to this call executing for the same `persistenceId`, in particular it is possible that
       * a restarting actor tries to recover before its outstanding writes have completed. In
       * the latter case it is highly desirable to defer reading the highest sequence number
       * until all outstanding writes have completed, otherwise the PersistentActor may reuse
       * sequence numbers.
       *
       * This call is protected with a circuit-breaker.
       */
      def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]
    
      /**
       * Plugin API: asynchronously deletes all persistent messages up to `toSequenceNr`
       * (inclusive).
       *
       * This call is protected with a circuit-breaker.
       * Message deletion doesn't affect the highest sequence number of messages,
       * journal must maintain the highest sequence number and never decrease it.
       */
      def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit]
    
      /**
       * Plugin API
       *
       * Allows plugin implementers to use `f pipeTo self` and
       * handle additional messages for implementing advanced features
       *
       */
      def receivePluginInternal: Actor.Receive = Actor.emptyBehavior

If the journal should only support synchronous writes, simply block inside the asynchronous write method:

    def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
      Future.fromTry(Try {
        // blocking call here
        ???
      })

A journal must also implement the methods defined in AsyncRecovery for replay and sequence number recovery:

    /**
       * Plugin API: asynchronously replays persistent messages. Implementations replay
       * a message by calling `replayCallback`. The returned future must be completed
       * when all messages (matching the sequence number bounds) have been replayed.
       * The future must be completed with a failure if any of the persistent messages
       * could not be replayed.
       *
       * The `replayCallback` must also be called with messages that have been marked
       * as deleted. In this case a replayed message's `deleted` method must return
       * `true`.
       *
       * The `toSequenceNr` is the lowest of what was returned by [[#asyncReadHighestSequenceNr]]
       * and what the user specified as recovery [[akka.persistence.Recovery]] parameter.
       * This does imply that this call is always preceded by reading the highest sequence
       * number for the given `persistenceId`.
       *
       * This call is NOT protected with a circuit-breaker because it may take long time
       * to replay all events. The plugin implementation itself must protect against
       * an unresponsive backend store and make sure that the returned Future is
       * completed with success or failure within reasonable time. It is not allowed
       * to ignore completing the future.
       *
       * @param persistenceId persistent actor id.
       * @param fromSequenceNr sequence number where replay should start (inclusive).
       * @param toSequenceNr sequence number where replay should end (inclusive).
       * @param max maximum number of messages to be replayed.
       * @param recoveryCallback called to replay a single message. Can be called from any
       *                       thread.
       *
       * @see [[AsyncWriteJournal]]
       */
      def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
          recoveryCallback: PersistentRepr => Unit): Future[Unit]
    
    
      /**
       * Plugin API: asynchronously reads the highest stored sequence number for the
       * given `persistenceId`. The persistent actor will use the highest sequence
       * number after recovery as the starting point when persisting new events.
       * This sequence number is also used as `toSequenceNr` in subsequent call
       * to [[#asyncReplayMessages]] unless the user has specified a lower `toSequenceNr`.
       * Journal must maintain the highest sequence number and never decrease it.
       *
       * This call is protected with a circuit-breaker.
       *
       * Please also note that requests for the highest sequence number may be made concurrently
       * to writes executing for the same `persistenceId`, in particular it is possible that
       * a restarting actor tries to recover before its outstanding writes have completed.
       *
       * @param persistenceId persistent actor id.
       * @param fromSequenceNr hint where to start searching for the highest sequence
       *                       number. When a persistent actor is recovering this
       *                       `fromSequenceNr` will be the sequence number of the used
       *                       snapshot or `0L` if no snapshot is used.
       */
      def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long]
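As an illustration only (not a production implementation), these two methods could be satisfied by an in-memory store along these lines:

import scala.collection.immutable
import scala.concurrent.Future
import akka.persistence.PersistentRepr

// Illustrative in-memory backing store: persisted events per persistenceId.
class InMemoryRecovery(store: Map[String, immutable.Seq[PersistentRepr]]) {

  def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
      recoveryCallback: PersistentRepr => Unit): Future[Unit] =
    Future.successful {
      store
        .getOrElse(persistenceId, immutable.Seq.empty)
        .filter(r => r.sequenceNr >= fromSequenceNr && r.sequenceNr <= toSequenceNr)
        .take(math.min(max, Int.MaxValue).toInt)
        .foreach(recoveryCallback)
    }

  def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
    Future.successful(
      store.getOrElse(persistenceId, immutable.Seq.empty).foldLeft(0L)((h, r) => math.max(h, r.sequenceNr)))
}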

Once the code is done, the journal is enabled through configuration. But remember: never run the journal's tasks or futures on the default dispatcher, or other actors will starve:

    # Path to the journal plugin to be used
    akka.persistence.journal.plugin = "my-journal"
    
    # My custom journal plugin
    my-journal {
      # Class name of the plugin.
      class = "docs.persistence.MyJournal"
      # Dispatcher for the plugin actor.
      plugin-dispatcher = "akka.actor.default-dispatcher"
    }
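For example (the names below are illustrative), the journal's potentially blocking work can be given its own thread pool and the plugin pointed at it:

# A dedicated dispatcher for the journal's (potentially blocking) work
my-journal-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 16
  }
}

my-journal {
  class = "docs.persistence.MyJournal"
  plugin-dispatcher = "my-journal-dispatcher"
}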

The snapshot plugin API: SnapshotStore

🔗 SnapshotStore is also an actor:

    /**
       * Plugin API: asynchronously loads a snapshot.
       *
       * If the future `Option` is `None` then all events will be replayed,
       * i.e. there was no snapshot. If snapshot could not be loaded the `Future`
       * should be completed with failure. That is important because events may
       * have been deleted and just replaying the events might not result in a valid
       * state.
       *
       * This call is protected with a circuit-breaker.
       *
       * @param persistenceId id of the persistent actor.
       * @param criteria selection criteria for loading.
       */
      def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]
    
    
      /**
       * Plugin API: asynchronously saves a snapshot.
       *
       * This call is protected with a circuit-breaker.
       *
       * @param metadata snapshot metadata.
       * @param snapshot snapshot.
       */
      def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit]
    
    
      /**
       * Plugin API: deletes the snapshot identified by `metadata`.
       *
       * This call is protected with a circuit-breaker.
       *
       * @param metadata snapshot metadata.
       */
      def deleteAsync(metadata: SnapshotMetadata): Future[Unit]
    
    
      /**
       * Plugin API: deletes all snapshots matching `criteria`.
       *
       * This call is protected with a circuit-breaker.
       *
       * @param persistenceId id of the persistent actor.
       * @param criteria selection criteria for deleting.
       */
      def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit]
    
    
      /**
       * Plugin API
       * Allows plugin implementers to use `f pipeTo self` and
       * handle additional messages for implementing advanced features
       */
      def receivePluginInternal: Actor.Receive = Actor.emptyBehavior

As with the journal, the snapshot store plugin is enabled through configuration once the code is done:

    # Path to the snapshot store plugin to be used
    akka.persistence.snapshot-store.plugin = "my-snapshot-store"
    
    # My custom snapshot store plugin
    my-snapshot-store {
      # Class name of the plugin.
      class = "docs.persistence.MySnapshotStore"
      # Dispatcher for the plugin actor.
      plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
    }

A plugin development aid: the TCK

Akka provides a TCK (Technology Compatibility Kit) for testing plugins.

🏭 com.typesafe.akka:akka-persistence-tck_2.13:2.6.6

Below are the mandatory tests for journal and snapshot plugins respectively. If a plugin needs extra setup, such as starting a mock database or deleting temporary files, override beforeAll and afterAll (see the sketch after these specs):

    class MyJournalSpec
        extends JournalSpec(
          config = ConfigFactory.parseString("""akka.persistence.journal.plugin = "my.journal.plugin"""")) {
    
      override def supportsRejectingNonSerializableObjects: CapabilityFlag =
        false // or CapabilityFlag.off
    
      override def supportsSerialization: CapabilityFlag =
        true // or CapabilityFlag.on
    }
    
    class MySnapshotStoreSpec
        extends SnapshotStoreSpec(
          config = ConfigFactory.parseString("""
        akka.persistence.snapshot-store.plugin = "my.snapshot-store.plugin"
        """)) {
    
      override def supportsSerialization: CapabilityFlag =
        true // or CapabilityFlag.on
    }
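A sketch of such setup/teardown hooks (startEmbeddedDatabase and deleteTempFiles are hypothetical helpers):

import akka.persistence.CapabilityFlag
import akka.persistence.journal.JournalSpec
import com.typesafe.config.ConfigFactory

class MyJournalWithSetupSpec
    extends JournalSpec(
      config = ConfigFactory.parseString("""akka.persistence.journal.plugin = "my.journal.plugin"""")) {

  override def supportsRejectingNonSerializableObjects: CapabilityFlag = false

  override def beforeAll(): Unit = {
    super.beforeAll() // keep the TCK's own initialization
    // startEmbeddedDatabase()   // hypothetical helper
  }

  override def afterAll(): Unit = {
    // deleteTempFiles()         // hypothetical helper
    super.afterAll()
  }
}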

Corrupted event journals

If you cannot prevent users from running multiple actors with the same persistenceId at the same time, the event journal may be corrupted by events carrying the same sequence number. It is recommended that the journal keep delivering such events during replay and let a replay-filter decide how to handle them.
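The filter is configured per journal under replay-filter; a sketch of the fallback settings (mode can be repair-by-discard-old, fail, warn, or off):

akka.persistence.journal-plugin-fallback.replay-filter {
  # What to do with replayed events from an old writer that overlap a newer one
  mode = repair-by-discard-old
  # How many events to buffer while deciding
  window-size = 100
}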


➡️ Miscellaneous

Packaging

Packaging with Gradle relies mainly on the Jar task of its Java plugin. To ensure that multiple reference.conf files are merged correctly, the 🔗 Shadow plugin is recommended; in build.gradle, write:

    import com.github.jengelman.gradle.plugins.shadow.transformers.AppendingTransformer
    
    plugins {
        id 'java'
        id "com.github.johnrengelman.shadow" version "5.0.0"
    }
    
    shadowJar {
        transform(AppendingTransformer) {
            resource = 'reference.conf'
        }
        with jar
    }

Publishing as a Docker image

Akka Remoting and Akka Cluster can both run inside Docker containers, but the network must be configured properly (🔗 Akka behind NAT or in a Docker container), and the available CPU, memory and other resources should be tuned accordingly.
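The core of that network setup is telling Artery which address other nodes should see versus which address the container actually binds (the values below are illustrative):

akka.remote.artery {
  canonical.hostname = "203.0.113.10"  # the external (NAT) address other nodes use
  canonical.port = 25520
  bind.hostname = "0.0.0.0"            # the address bound inside the container
  bind.port = 25520
}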

Books and Videos

    🔗 https://doc.akka.io/docs/akka/current/additional/books.html

    Akka API

    🔗 https://doc.akka.io/api/akka/2.6/index.html
