Lagom是出品Akka的Lightbend公司推出的一個微服務框架,目前最新版本爲1.6.2。Lagom一詞出自瑞典語,意爲「適量」。html
🔗 https://www.lagomframework.com/documentation/1.6.x/scala/Home.htmljava
Lagom框架堅持,微服務是按服務邊界Boundary將系統切分爲若干個組成部分的結果,這意味着要使它們與限界上下文Bounded Context、業務功能和模塊隔離等要求保持一致,才能達到可伸縮性和彈性要求,從而易於部署和管理。所以,在設計微服務時應考慮大小是否「Lagom」,而非是否足夠「Micro」。react
Lagom框架大量使用了Jonas Bonér所著Reactive Microservices Architecture: Design Principles For Distributed Systems一書的設計理念和思想,因此推薦在使用Lagom以前先閱讀此書。git
「船大好頂浪,船小好調頭」——Jonas認爲,將龐大的系統分割爲若干獨立的更小粒度的部分,同時將管理權限適當下放,可使這些獨立的部分更快地作出決斷,以適應外部環境的不斷變化。github
Lagom開發環境要求:web
Lagom提供了Giter8模板,方便利用sbt構造一個Hello World項目結構,確保在開發前驗證生成工具和項目已正確配置。Hello World包括Hello與Stream兩個微服務,每一個微服務包括API與實現兩個子項目。Lagom會自動配置諸如持久化、服務定位等基礎設施,而且支持發現並加載微服務的熱更新。算法
sbt new lagom/lagom-scala.g8 hello → Project root └ hello-api → hello api project └ hello-impl → hello implementation project └ hello-stream-api → hello-stream api project └ hello-stream-impl → hello-stream implementation project └ project → sbt configuration files └ build.properties → Marker for sbt project └ plugins.sbt → sbt plugins including the declaration for Lagom itself └ build.sbt → Your project build file
使用sbt裏的runAll一次性啓動全部服務後(包括Cassandra、Kafka,如果手動則須要本身一個個啓動包括基礎服務在內的全部服務),在http://localhost:9000/api/hello/World
處將獲得顯示了一行「Hello World」的頁面。docker
Service
派生,聲明服務的API以及服務的回覆消息。Service.descriptor
方法,該方法返回一個Descriptor
,用於定義服務的名稱、REST端點路徑、Kafka消息主題,以及REST路徑與服務API方法的映射關係,等等。HelloWorldBehavior
採用Ask模式進行通訊,按entityRef(id).ask[Message](replyTo => msg(replyTo)).map(reply => ...)
的樣式進行ask與map配對。其中,entityRef是該Actor在集羣裏的引用:clusterSharding.entityRefFor(HelloWorldState.typeKey, id)
。hello(id)
實際是委託useGreeting(id)
方法完成的,因此將World換做其餘內容亦可。使用sbt將服務綁定到特定網絡地址(默認是localhost):數據庫
lazy val biddingImpl = (project in file("biddingImpl")) .enablePlugins(LagomScala) .settings(lagomServiceAddress := "0.0.0.0")
即使是將服務部署到不一樣的物理主機上,服務的端口號也將保持先後一致(老是使用特定的某個端口號),該端口號基於如下算法:apache
一般狀況下,無需關心上述細節,由於不多會發生這樣的衝突。固然,也能夠按下列方式手動指定端口號:
lazy val usersImpl = (project in file("usersImpl")) .enablePlugins(LagomScala) .settings(lagomServiceHttpPort := 11000)
若是嫌默認的端口範圍[49152, 65535]不合適,能夠指定端口範圍,不過範圍越窄,發生衝突的機率就越大:
lagomServicesPortRange in ThisBuild := PortRange(40000, 45000)
開發模式,是指在sbt或者maven支持下,啓動和調試各類服務。在這種環境下,對代碼作出修改後,Lagom將負責後續的編譯和從新加載工做,相應的服務會自動重啓。
在sbt裏進行以下配置,將會使用一個自簽名的證書爲服務啓用HTTPS並指定其端口,確保服務之間能經過HTTPS進行調用,但同時服務網關Service Gateway將仍舊只能使用HTTP:
lagomServiceEnableSsl in ThisBuild := true lagomServiceHttpsPort := 20443
在客戶端,則建議使用諸如Play-WS或者Akka-HTTP Client API這樣的HTTPS Client框架。
服務定位子Service Locator,是確保用於發現其餘服務並與之聯繫的組件。Lagom內置的缺省Locator有如下特性:
localhost
,可以使用lagomServiceLocatorAddress in ThisBuild := "0.0.0.0"
修改之。lagomServiceLocatorPort in ThisBuild := 10000
修改之。lagomUnmanagedServices in ThisBuild := Map("weather" -> "http://localhost:3333")
,而後再使用。網關Gateway,至關於Service Locator的代理,用於防止對Locator的不當訪問。Lagom內置的缺省Gateway有如下特性:
localhost
,可以使用lagomServiceGatewayAddress in ThisBuild := "0.0.0.0"
修改之。lagomServiceGatewayPort in ThisBuild := 9010
修改之。lagomServiceGatewayImpl in ThisBuild := "netty"
指定之。在開發環境下,使用runAll時默認會啓動Locator與Gateway。須要手動啓停時,分別執行lagomServiceLocatorStart
和lagomServiceLocatorStop
任務便可。
如要須要禁用內置的Locater與Gateway,則使用lagomServiceLocatorEnabled in ThisBuild := false
禁用之。以後便須要本身提供一個Locator的實現,並須要牢記每一個服務的端口號以創建相互聯繫。
Cassandra是Apache提供的一個分佈式、可擴展的NoSQL數據庫。它以KeySpace爲單位,其中包含若干個表Table或者列族Column Family,每一個列族能夠有不一樣的列(至關於RDBMS中的字段)並可自由添加列,每一個行能夠擁有不一樣的列,並支持索引。Cassandra支持的數據類型除常見的原生類型外,爲List、Map和Set提供了直接支持,提供了TTL數據到期自動刪除功能,而且能夠自定義數據類型。
Lagom內置了一個Cassandra服務,做爲Event Sourcing的事件持久化平臺。
lagomCassandraPort in ThisBuild := 9042
指定之。lagomCassandraCleanOnStart in ThisBuild := true
使之在每次服務啓動時清空數據庫。dev-embedded-cassandra.yaml
對Cassandra進行配置,可使用lagomCassandraYamlFile in ThisBuild := Some((baseDirectory in ThisBuild).value / "project" / "cassandra.yaml")
另行指定特定YAML配置文件。lagomCassandraJvmOptions in ThisBuild := Seq("-Xms256m", "-Xmx1024m", "-Dcassandra.jmx.local.port=4099")
指定相應的JVM參數。lagomCassandraMaxBootWaitingTime in ThisBuild := 0.seconds
修改之。lagomCassandraStart
和lagomCassandraStop
手動啓停。lagomCassandraEnabled in ThisBuild := false
禁用之,再使用lagomUnmanagedServices in ThisBuild := Map("cas_native" -> "tcp://localhost:9042")
註冊本地Cassandra實例便可使用之。Kafka是Apache提供的一個分佈式的流處理平臺,簡單講能夠理解爲一個生產者-消費者結構的、集羣條件下的消息隊列Message Queue。每條消息流Stream以主題Topic做爲惟一區別,每一個Topic下能夠有多個相同的分區Partition,分區裏的消息都有一個Offset做爲序號以確保按順序被消費。Kafka依賴ZooKeeper提供的集羣功能部署其節點。
Lagom內置了一個Kafka服務:
lagomKafkaPort in ThisBuild := 10000
修改之lagomKafkaZookeeperPort in ThisBuild := 9999
修改之kafka-server.properties
配置Kafka運行參數,可使用lagomKafkaPropertiesFile in ThisBuild := Some((baseDirectory in ThisBuild).value / "project" / "kafka-server.properties")
另行指定配置文件。lagomKafkaJvmOptions in ThisBuild := Seq("-Xms256m", "-Xmx1024m")
指定相應的JVM參數。<your-project-root>/target/lagom-dynamic-projects/lagom-internal-meta-project-kafka/target/log4j_output
,而Kafka的提交日誌將保存在<your-project-root>/target/lagom-dynamic-projects/lagom-internal-meta-project-kafka/target/logs
。lagomKafkaStart
和lagomKafkaStop
手動啓停。lagomKafkaEnabled in ThisBuild := false
禁用之,再使用lagomKafkaAddress in ThisBuild := "localhost:10000"
指定Kafka實例便可使用之。若是是本地實例,用lagomKafkaPort in ThisBuild := 10000
指定端口便可。每個Lagom服務都由一個接口進行描述。該接口的內容不只包括接口方法的聲明和實現,同時還定義了接口的元數據如何被映射到底層的傳輸協議。
服務的每一個接口方法都要求返回一個ServiceCall[Request, Response]
,其中Request或Response能夠是Akka的NotUsed
:
trait ServiceCall[Request, Response] { def invoke(request: Request): Future[Response] }
每個Lagom服務在覆寫的descriptor中都將返回一個服務描述子Service Descriptor。如下即是聲明瞭一個叫做hello的服務,該服務提供了sayHello的API:
trait HelloService extends Service { def sayHello: ServiceCall[String, String] override def descriptor = { import Service._ named("hello").withCalls(call(sayHello)) } }
每一個服務調用都必須有一個惟一的標識符,以保證調用能最終映射到正確的API方法上。這個標識符能夠是靜態的一個字符串名稱,也能夠在運行時動態生成。默認狀況下,API方法的名稱即該調用的標識符。
使用namedCall指定強命名的標識符,服務hello裏的API方法sayHello的調用名爲hello,在REST架構下的相應路徑爲/hello
named("hello").withCalls(namedCall("hello", sayHello))
使用pathCall指定基於路徑的標識符,相似於字符串中用$
引導的內插值,此處用:
引導內插變量。Lagom爲此提供了一個隱式的PathParamSerializer
,用於從路徑中提取String
、Int
、Boolean
或者UUID
類型的內插變量。好比如下即是提取了路徑中類型爲long
的orderId和類型爲String
的itemId值,做爲參數傳遞給API方法。在做參數映射時,默認將按從路徑中從左至右提取的順序進行映射。
💀 這與ASP.NET MVC等一些HTTP框架採用的方法是相似的,因此要注意以構建RESTful應用的思路貫徹學習始終。
def getItem(orderId: Long, itemId: String): ServiceCall[NotUsed, Item] override def descriptor = { import Service._ named("orders").withCalls(pathCall("/order/:orderId/item/:itemId", getItem _)) }
提取查詢串中的參數時,則使用的?
起始、以&
分隔的形式:
def getItems(orderId: Long, pageNo: Int, pageSize: Int): ServiceCall[NotUsed, Seq[Item]] override def descriptor = { import Service._ named("orders").withCalls(pathCall("/order/:orderId/items?pageNo&pageSize", getItems _)) }
在REST架構下,Lagom會努力正確實現上述映射,而且在有Request消息時使用POST方法,不然使用GET方法。
REST標識符用於徹底REST形式的調用,和pathCall很是相似,區別只是REST標識符可指定HTTP調用方法:
def addItem(orderId: Long): ServiceCall[Item, NotUsed] def getItem(orderId: Long, itemId: String): ServiceCall[NotUsed, Item] def deleteItem(orderId: Long, itemId: String): ServiceCall[NotUsed, NotUsed] def descriptor = { import Service._ import com.lightbend.lagom.scaladsl.api.transport.Method named("orders").withCalls( restCall(Method.POST, "/order/:orderId/item", addItem _), restCall(Method.GET, "/order/:orderId/item/:itemId", getItem _), restCall(Method.DELETE, "/order/:orderId/item/:itemId", deleteItem _) ) }
每一個服務API都須要指定Request和Response的消息類型,能夠用akka.NotUsed
做爲佔位符,分爲兩種形式:
object
或者case class
表達的常見消息,它們將在內存的緩衝區中被序列化後進行傳輸。若是Request與Response均是這類嚴格消息,則調用將會是同步的,嚴格按請求-回覆的順序進行。Source
。Lagom將使用WebSocket協議進行流的傳輸。若是Request與Response均是流消息時,任何一方關閉時,WebSocket將徹底關閉。若是隻有一方是流消息,則嚴格消息仍按序列化方式進行傳輸,而WebSocket將始終保持打開狀態,直到另外一個方向關閉爲止。如下分別是單向流和雙向流消息的示例:
def tick(interval: Int): ServiceCall[String, Source[String, NotUsed]] def descriptor = { import Service._ named("clock").withCalls(pathCall("/tick/:interval", tick _)) } def sayHello: ServiceCall[Source[String, NotUsed], Source[String, NotUsed]] def descriptor = { import Service._ named("hello").withCalls(call(this.sayHello)) }
Lagom經過定義隱式的MessageSerializer
,爲call、namedCall、pathCall和restCall提供了消息的序列化支持。對String類型的消息和Play框架JSON格式的消息,Lagom提供了內置的序列化器。除此之外,能夠自定義序列化器。(參考:消息序列化器)
使用Play-JSON時,一般是用case class和companion object配合,case class定義消息的結構,companion object定義消息的格式:
case class User( id: Long, name: String, email: Option[String] ) object User { import play.api.libs.json._ implicit val format: Format[User] = Json.format[User] }
對應的JSON結果爲:
{ "id": 12345, "name": "John Smith", "email": "john.smith@example.org" }
屬性能夠是Option,這樣若是爲None,則Play-JSON在序列化時不會解析它、反序列化時不會生成該屬性。若是case class還內嵌了其餘case class,則被嵌入的case class也須要定義它的format。
服務的實現,即實現以前聲明的服務描述子trait。對服務API中聲明的每一個方法,使用ServiceCall的工廠方法apply,傳入一個Request => Future[Response]
,返回一個ServiceCall
。
(💀 注意:Lagom大量使用函數做爲返回值,從而充分發揮了FP組合高階函數的優點。)
class HelloServiceImpl extends HelloService { override def sayHello = ServiceCall { name => Future.successful(s"Hello $name!") } }
當消息不是普通的嚴格消息而是流消息時,須要使用Akka Stream來處理它。對應前面單向流的例子,它的實現以下:
override def tick(intervalMs: Int) = ServiceCall { tickMessage => Future.successful( Source .tick( // 消息被髮送前的延遲 intervalMs.milliseconds, // 消息發送的間隔 intervalMs.milliseconds, // 將被髮送的消息 tickMessage ) .mapMaterializedValue(_ => NotUsed) ) }
若是某些時候須要處理消息的頭部信息(一般是HTTP),那麼Lagom提供了ServiceCall的派生類ServerServiceCall做爲支持。ServerServiceCall將ServiceCall裏的Header單獨取出,提供了invokeWithHeaders方法,該方法第一個參數是RequestHeader,另外一個參數纔是Request自己,這樣直接將invoke委託給invokeWithHeaders,從而方便在函數體中使用handleRequestHeader
和handleResponseHeader
對頭部信息進行處理(儘管ServiceCall自己也支持這2個處理函數)。
override def sayHello = ServerServiceCall { (requestHeader, name) => val user = requestHeader.principal .map(_.getName) .getOrElse("No one") val response = s"$user wants to say hello to $name" val responseHeader = ResponseHeader.Ok.withHeader("Server", "Hello service") Future.successful((responseHeader, response)) }
ServerServiceCall工廠方法有一個版本能夠同時處理Request與Response的頭部信息,但也有不帶處理頭部信息的版本。後者雖然看起來與ServiceCall沒什麼區別,從而顯得畫蛇添足,但實際這是爲知足組合服務調用的需求而存在的。
組合服務調用,相似於把ServerServiceCall經過依賴注入傳遞給須要包裹在外層的日誌、權限、過濾等切面服務,從而實現AOP切入到核心的服務API調用上。
在AOP切入的實現上,Lagom採起了組合高階函數的方法。這就象是抹了一層又一層奶油的生日蛋糕,只有切開了才能看到最裏層真正想要吃到的蛋糕。相比使用共享的線程變量等方法,不只經過類型系統發揮了編譯檢查的優點而更加安全,而且還經過構造表達式樹而提供了延遲計算的功能。
// AOP: Log def logged[Request, Response](serviceCall: ServerServiceCall[Request, Response]) = ServerServiceCall.compose { requestHeader => println(s"Received ${requestHeader.method} ${requestHeader.uri}") serviceCall } override def sayHello = logged(ServerServiceCall { name => Future.successful(s"Hello $name!") }) // AOP: Authentication trait UserStorage { def lookupUser(username: String): Future[Option[User]] } def authenticated[Request, Response](serviceCall: User => ServerServiceCall[Request, Response]) = { // composeAsync容許異步地返回要調用的服務API ServerServiceCall.composeAsync { requestHeader => // First lookup user val userLookup = requestHeader.principal .map(principal => userStorage.lookupUser(principal.getName)) .getOrElse(Future.successful(None)) // Then, if it exists, apply it to the service call userLookup.map { case Some(user) => serviceCall(user) case None => throw Forbidden("User must be authenticated to access this service call") } } } override def sayHello = authenticated { user => ServerServiceCall { name => // 注意:由於閉包,此處可訪問經AOP切入帶來的user Future.successful(s"$user is saying hello to $name") } }
依賴注入,Dependency Injection,是把本應由服務承擔的建立其自身依賴的責任移交給外部的框架,改由DI框架負責生產對象並維護彼此的關聯,最終造成完整的對象圖(Object Graph),從而達到公示服務所需依賴、消滅代碼中硬編碼的new操做、下降耦合度的目的。
在Scala語言裏最多見的一個DI模式,是用trait的Self Type特性實現的蛋糕模式(Thin Cake Pattern)。
更復雜的示例,請參見 🔗 Real-World Scala: Dependency Injection
trait Stuffing { val stuffing: String } // 使用Self Type特性,限定繼承了Cake的子類,必須同時也繼承了Stuffing // 此處Stuffing還能夠with其餘trait,從而實現多重注入 trait Cake { this: Stuffing => def flavour: String = this.stuffing } object LemonCake extends Cake with Stuffing { ... }
在FP的世界,實現DI的另外一個選擇是Reader Monad。
case class Reader[R, A](run: R => A) { def map[B](f: A => B): Reader[R, B] = Reader(r => f(run(r))) def flatMap[B](f: A => Reader[R, B]): Reader[R, B] = Reader(r => f(run(r)).run(r)) } def balance(accountNo: String) = Reader((repo: AccountRepository) => repo.balance(accountNo))
🔗 Dependency Injection in Scala using MacWire
🔗 MacWire使用指南中涉及的參考知識:軌道交通編組站
Lagom使用了MacWire做爲默認的DI框架(固然也能夠換Spring之類的其餘DI框架)。
MacWire主要使用宏wire[T]
進行DI,它會從標註了@Inject
的構造子、非私有的主要構造子、Companion Object裏的apply()
方法裏查找依賴關係。而後再根據依賴項的類型,按如下順序查找匹配的參數項:
使用MacWire相關事項:
若是是隱式參數,MacWire將會跳過它,使它仍按Scala語法關於隱式參數的方法進行處理。
只要類型匹配,此處的值能夠是val
、lazy val
或者用def
定義的無參函數。
須要根據條件使用不一樣的依賴項實現時,用相似lazy val component = if (condition) then wire[implementationA] else wire[implementationB]
的方法,定義好component
的值便可。
須要在依賴項上切入攔截器Interceptor時,先在依賴項定義處聲明一個攔截器def logInterceptor : Interceptor
,再用lazy val component = logInterceptor(wire[Component])
綁定攔截聲明,最後在使用依賴項進行實現的地方用lazy val logInterceptor = { ... }
給出具體實現便可。
trait ShuntingModule { lazy val pointSwitcher: PointSwitcher = logEvents(wire[PointSwitcher]) lazy val trainCarCoupler: TrainCarCoupler = logEvents(wire[TrainCarCoupler]) lazy val trainShunter = wire[TrainShunter] def logEvents: Interceptor } object TrainStation extends App { val modules = new ShuntingModule with LoadingModule with StationModule { lazy val logEvents = ProxyingInterceptor { ctx => println("Calling method: " + ctx.method.getName()) ctx.proceed() } } modules.trainStation.prepareAndDispatchNextTrain() }
默認狀況下,使用lazy val component = wire[Component]
聲明在當前範圍內惟一的依賴項(Singleton)。若是須要在每次調用時都建立新的依賴項(Dependent),則換做def
便可。
除Singleton和Dependent兩種生命週期外,MacWire還支持其餘形式的生命週期,方法相似使用攔截器。
trait StationModule extends ShuntingModule with LoadingModule { lazy val trainDispatch: TrainDispatch = session(wire[TrainDispatch]) lazy val trainStation: TrainStation = wire[TrainStation] def session: Scope } object TrainStation extends App { val modules = new ShuntingModule with LoadingModule with StationModule { lazy val session = new ThreadLocalScope } // implement a filter which attaches the session to the scope // use the filter in the server modules.trainStation.prepareAndDispatchNextTrain() }
對須要參數進行構造的依賴項,使用lazy val component = (parameters) => wire[Component]
這樣的工廠方法進行聲明,展開後將變成new Component(parameters)
。對在trait裏聲明的依賴項,則改用def component(parameters: Parameter) = wire[Component]
的方式定義工廠方法。
在具體使用參數化的依賴項時,使用wireWith(real_parameter)代入實際參數。
須要區別同一類依賴項的不一樣實例時,能夠用trait做爲標記tag。一種方法是用new Component(...) with tag
細化其子類型,另外一種是在聲明依賴值的類型後面用@@ tag
做標記。
trait Regular trait Liquid class TrainStation( trainShunter: TrainShunter, regularTrainLoader: TrainLoader with Regular, liquidTrainLoader: TrainLoader with Liquid, trainDispatch: TrainDispatch) { ... } lazy val regularTrainLoader = new TrainLoader(...) with Regular lazy val liquidTrainLoader = new TrainLoader(...) with Liquid lazy val trainStation = wire[TrainStation] /////////////////////////////////////// class TrainStation( trainShunter: TrainShunter, regularTrainLoader: TrainLoader @@ Regular, liquidTrainLoader: TrainLoader @@ Liquid, trainDispatch: TrainDispatch) { ... } lazy val regularTrainLoader = wire[TrainLoader].taggedWith[Regular] lazy val liquidTrainLoader = wire[TrainLoader].taggedWith[Liquid] lazy val trainStation = wire[TrainStation]
在Lagom中使用MacWire進行DI是可行的,但前述的服務定位子基本上只能知足開發環境的需求,生產環境仍是要換用Akka Discovery Service Locator之類更強大的定位子框架。
在完成組件的拼裝後,還須要一個啓動子啓動應用程序。Lagom爲此提供了Play框架ApplicationLoader
的簡化版本LagomApplicationLoader
。其中的loadDevMode
與load
是必須的,前者用with LagomDevModeComponents
加載Lagom內嵌的服務定位子,用於開發環境;後者能夠指定生產環境下的服務定位子,若是返回NoServiceLoader
將意味着不提供服務定位。而describeService
是可選的,它主要用於聲明服務的API,爲外部管理框架或者腳本語言提供服務的Meta信息,以方便進行動態配置。服務無數據Metadata,又稱爲ServiceInfo,主要包括服務的名稱以及一個訪問控制列表ACL。一般這些Metadata會由框架自動生成,前提是在使用withCalls
聲明描述子時,用withAutoAcl(true)
激活便可。
import com.lightbend.lagom.scaladsl.server._ import com.lightbend.lagom.scaladsl.api.ServiceLocator import com.lightbend.lagom.scaladsl.devmode.LagomDevModeComponents class HelloApplicationLoader extends LagomApplicationLoader { override def loadDevMode(context: LagomApplicationContext) = new HelloApplication(context) with LagomDevModeComponents override def load(context: LagomApplicationContext) = new HelloApplication(context) { override def serviceLocator = ServiceLocator.NoServiceLocator } override def describeService = Some(readDescriptor[HelloService]) }
最後,只需在配置application.conf裏用play.application.loader = com.example.HelloApplicationLoader
指定啓動子,即完成了應用程序的裝配。
Lagom根據用途不一樣,一般將組件分爲若干類型:
🔗 詳細列表:https://www.lagomframework.com/documentation/1.6.x/scala/ScalaComponents.html
定義並實現服務後,該服務便可被其餘服務或其餘類型的客戶消費使用。Lagom將根據服務描述子的內容,經過使用ServiceClient
的implement
宏進行綁定,自動生成一個調用該服務的框架,而後就能夠象調用本地對象的方法同樣invoke
服務提供的API了。
// 先綁定HelloService abstract class MyApplication(context: LagomApplicationContext) extends LagomApplication(context) with AhcWSComponents { lazy val helloService = serviceClient.implement[HelloService] } // 而後在另外一個服務MyService裏消費HelloService class MyServiceImpl(helloService: HelloService)(implicit ec: ExecutionContext) extends MyService { override def sayHelloLagom = ServiceCall { _ => val result: Future[String] = helloService.sayHello.invoke("Lagom") result.map { response => s"Hello service said: $response" } } }
當服務使用了流消息時,Lagom將在消費端使用帶有最大幀長度參數的WebSocket進行通訊。該參數定義了能夠發送的消息的最大尺寸,能夠在application.conf中配置。
#This configures the websocket clients used by this service. #This is a global configuration and it is currently not possible to provide different configurations if multiple websocket services are consumed. lagom.client.websocket { #This parameter limits the allowed maximum size for the messages flowing through the WebSocket. A similar limit exists on the server side, see: #https://www.playframework.com/documentation/2.6.x/ScalaWebSockets#Configuring-WebSocket-Frame-Length frame.maxLength = 65536 }
斷路器Circuit Breaker,如同電路保險絲,能夠在服務崩潰時迅速熔斷,從而避免產生更大面積的連鎖反應。
斷路器有如下3種狀態:
call-timeout
,致使失敗計數值增加,在超過設定的max-failures
,斷路器跳到斷開狀態。CircuitBreakerOpenException
異常而快速失敗。reset-timeout
後,斷路器跳入半開狀態。reset-timeout
週期後再從新進行試探。Lagom爲全部消費端對服務的調用都默認啓用了斷路器。儘管斷路器在消費端配置並使用,但用於綁定到服務的標識符則要由服務提供者定義和肯定相應粒度。默認狀況下,一個斷路器實例將覆蓋對一個服務全部API的調用。但經過設置斷路器標識符,能夠爲每一個API方法設置惟一的斷路器標識符,以便爲每一個API方法使用單獨的斷路器實例。或者經過在某幾個API方法上設置使用相同的標識符,實現對API調用的斷路保護分組。
下例中,sayHi
將使用默認的斷路器,而hiAgain
將使用斷路器hello2
:
def descriptor: Descriptor = { import Service._ named("hello").withCalls( namedCall("hi", this.sayHi), namedCall("hiAgain", this.hiAgain).withCircuitBreaker(CircuitBreaker.identifiedBy("hello2")) ) }
對應的application.conf中配置以下:
lagom.circuit-breaker { # will be used by sayHi method hello.max-failures = 5 # will be used by hiAgain method hello2 { max-failures = 7 reset-timeout = 30s } # Change the default call-timeout will be used for both sayHi and hiAgain methods default.call-timeout = 5s }
默認狀況下,Lagom的客戶端會將全部4字頭和5字頭的HTTP響應都映射爲相應的異常,而斷路器會將全部的異常都視做失敗,從而觸發熔斷。經過設置白名單,能夠忽略某些類型的異常。斷路器完整的斷路器配置選項以下:
# Circuit breakers for calls to other services are configured # in this section. A child configuration section with the same # name as the circuit breaker identifier will be used, with fallback # to the `lagom.circuit-breaker.default` section. lagom.circuit-breaker { # Default configuration that is used if a configuration section # with the circuit breaker identifier is not defined. default { # Possibility to disable a given circuit breaker. enabled = on # Number of failures before opening the circuit. max-failures = 10 # Duration of time after which to consider a call a failure. call-timeout = 10s # Duration of time in open state after which to attempt to close # the circuit, by first entering the half-open state. reset-timeout = 15s # A whitelist of fqcn of Exceptions that the CircuitBreaker # should not consider failures. By default all exceptions are # considered failures. exception-whitelist = [] } }
Lightbend推薦使用ScalaTest和Spec2做爲Lagom的測試框架。相似Akka ActorTestKit,可使用Lagom提供的ServiceTest
工具包對服務進行測試。
爲每一個測試建立一個單獨的服務實例,其關鍵步驟包括:
ServiceTest.withServer(config)(lagomApplication)(testBlock)
的結構進行測試。LocalServiceLocator
,以啓用默認的定位子服務。import com.lightbend.lagom.scaladsl.server.LocalServiceLocator import com.lightbend.lagom.scaladsl.testkit.ServiceTest import org.scalatest.AsyncWordSpec import org.scalatest.Matchers // 1. 啓用AsyncWordSpec class HelloServiceSpec extends AsyncWordSpec with Matchers { "The HelloService" should { // 2. 使用ServiceTest.withServer進行測試 "say hello" in ServiceTest.withServer(ServiceTest.defaultSetup) { ctx => // 3. 啓用默認的服務定位子 new HelloApplication(ctx) with LocalServiceLocator } { server => // 4. 建立一個客戶端進行服務調用 val client = server.serviceClient.implement[HelloService] client.sayHello.invoke("Alice").map { response => response should ===("Hello Alice!") } } } }
若要由多個測試共享一個服務實例,則須要使用ServiceTest.startServer()代替withServer(),而後在beforeAll與afterAll中啓動和中止服務。
import com.lightbend.lagom.scaladsl.server.LocalServiceLocator import com.lightbend.lagom.scaladsl.testkit.ServiceTest import org.scalatest.AsyncWordSpec import org.scalatest.Matchers import org.scalatest.BeforeAndAfterAll class HelloServiceSpec extends AsyncWordSpec with Matchers with BeforeAndAfterAll { lazy val server = ServiceTest.startServer(ServiceTest.defaultSetup) { ctx => new HelloApplication(ctx) with LocalServiceLocator } lazy val client = server.serviceClient.implement[HelloService] "The HelloService" should { "say hello" in { client.sayHello.invoke("Alice").map { response => response should ===("Hello Alice!") } } } protected override def beforeAll() = server protected override def afterAll() = server.stop() }
若要啓用Cluster、PubSub或者Persistence支持,則須要在withServer
第1個參數中啓用。若須要調用其餘服務,能夠在第2個參數中構造要調用服務的Stub或者Mock。注意事項包括:
withCassandra
或者withJdbc
啓用Persistence後。會自動啓用Clusterlazy val server = ServiceTest.startServer(ServiceTest.defaultSetup.withCluster) { ctx => new HelloApplication(ctx) with LocalServiceLocator { override lazy val greetingService = new GreetingService { override def greeting = ServiceCall { _ => Future.successful("Hello") } } } }
Lagom沒有爲HTTPS提供客戶端框架,所以只有借用Play-WS、Akka HTTP或者Akka gRPC等框架建立使用SSL鏈接的客戶端。而在服務端,可使用withSsl激活SSL支持,隨後框架將會爲測試端自動打開一個隨機的端口並提供一個javax.net.ssl.SSLContext
類型的上下文環境。接下來,客戶端就可使用testServer提供的httpsPort
和sslContext
鏈接到服務端併發送Request。Lagom測試工具提供的證書僅限於CN=localhost,因此該上下文SSLContext也只會信任本地的testServer,這就要求在發送請求時也設置好相應的權限,不然服務器將會拒絕該請求。目前,Lagom還沒法爲測試服務器設置不一樣的SSL證書。
"complete a WS call over HTTPS" in { val setup = defaultSetup.withSsl() ServiceTest.withServer(setup)(new TestTlsApplication(_)) { server => implicit val actorSystem = server.application.actorSystem implicit val ctx = server.application.executionContext // To explicitly use HTTPS on a test you must create a client of your own // and make sure it uses the provided SSLContext val wsClient = buildCustomWS(server.clientSslContext.get) // use `localhost` as authority val url = s"https://localhost:${server.playServer.httpsPort.get}/api/sample" val response = wsClient.url(url).get().map { _.body[String] } whenReady(response, timeout) { r => r should be("sample response") } } }
在測試支持流消息的服務時,須要搭配Akka Streams TestKit進行測試。
"The EchoService" should { "echo" in { // Use a source that never terminates (concat Source.maybe) // so we don't close the upstream, which would close the downstream val input = Source(List("msg1", "msg2", "msg3")).concat(Source.maybe) client.echo.invoke(input).map { output => val probe = output.runWith(TestSink.probe(server.actorSystem)) probe.request(10) probe.expectNext("msg1") probe.expectNext("msg2") probe.expectNext("msg3") probe.cancel succeed } } }
在服務測試中,能夠經過額外編寫PersistEntityTestDriver,使用持久化實體Persistent Entity進行與數據庫無關的功能測試。
Play、Akka和Lagom均出自Lightbend,所以Lagom使用Play JSON做爲消息的序列化框架,算是開箱即用了。
Lagom的序列化器,一般是與服務描述子放在一塊兒的一個MessageSerializer類型的隱式變量。也能夠在withCalls()
聲明服務API時,在call、namedCall、pathCall、restCall或者topic方法裏顯式地指定分別用於Request和Response的序列化器。
Lagom經過借用Play的JSON序列化器,對case class
進行JSON格式的序列化。該JSON序列化器有一個主要方法是jsValueFormatMessageSerializer
,可使用它在case classs
的companion object
裏指定其餘格式的JSON模板。同時,Lagom的MessageSerializer也爲NotUsed
、Done
、String
等類型提供了缺省的非JSON格式的序列化支持,好比MessageSerializer.StringMessageSerializer
。
trait HelloService extends Service { def sayHello: ServiceCall[String, String] override def descriptor = { import Service._ named("hello").withCalls( call(sayHello)( MessageSerializer.StringMessageSerializer, MessageSerializer.StringMessageSerializer ) ) } }
在companion object裏定義不一樣的JSON格式,隨後在服務描述子中顯式地選擇使用:
import play.api.libs.json._ import play.api.libs.functional.syntax._ case class MyMessage(id: String) object MyMessage { implicit val format: Format[MyMessage] = Json.format val alternateFormat: Format[MyMessage] = { // 將id映射爲JSON串裏的identifier (__ \ "identifier") .format[String] .inmap(MyMessage.apply, _.id) } } trait MyService extends Service { def getMessage: ServiceCall[NotUsed, MyMessage] def getMessageAlternate: ServiceCall[NotUsed, MyMessage] override def descriptor = { import Service._ named("my-service").withCalls( call(getMessage), call(getMessageAlternate)( // Request的序列化器 implicitly[MessageSerializer[NotUsed, ByteString]], // Response的序列化器 MessageSerializer.jsValueFormatMessageSerializer( implicitly[MessageSerializer[JsValue, ByteString]], // 指定JSON格式 MyMessage.alternateFormat ) ) ) } }
Lagom的trait MessageSerializer也能夠用來實現自定義的序列化器,派生的StrictMessageSerializer和StreamedMessageSerializer分別用於嚴格消息與流消息。其中,嚴格消息的序列化類型爲二進制串ByteString
,而流消息的則是Source[ByteString, _]
。
在實現自定義的序列化器以前,有幾個關鍵性概念:
Content-Type
和Accept
,以及MIME Type Scheme的版本號,或者根據服務的配置方式直接從URL中提取。NegotiatedSerializer
和NegotiatedDeserializer
負責。內容協商在多數狀況下並非必要的,因此並非必定須要實現的。
/* ------------ String Protocol ------------ */ import akka.util.ByteString import com.lightbend.lagom.scaladsl.api.deser.MessageSerializer.NegotiatedSerializer import com.lightbend.lagom.scaladsl.api.transport.DeserializationException import com.lightbend.lagom.scaladsl.api.transport.MessageProtocol import com.lightbend.lagom.scaladsl.api.transport.NotAcceptable import com.lightbend.lagom.scaladsl.api.transport.UnsupportedMediaType // 注意:`charset`由構造子傳入,傳遞給MessageProtocol用於定義協議使用的字符集。 class PlainTextSerializer(val charset: String) extends NegotiatedSerializer[String, ByteString] { override val protocol = MessageProtocol(Some("text/plain"), Some(charset)) def serialize(s: String) = ByteString.fromString(s, charset) } import com.lightbend.lagom.scaladsl.api.deser.MessageSerializer.NegotiatedDeserializer class PlainTextDeserializer(val charset: String) extends NegotiatedDeserializer[String, ByteString] { def deserialize(bytes: ByteString) = bytes.decodeString(charset) } /* ------------ JSON Protocol ------------ */ import play.api.libs.json.Json import play.api.libs.json.JsString class JsonTextSerializer extends NegotiatedSerializer[String, ByteString] { override val protocol = MessageProtocol(Some("application/json")) def serialize(s: String) = ByteString.fromString(Json.stringify(JsString(s))) } import scala.util.control.NonFatal class JsonTextDeserializer extends NegotiatedDeserializer[String, ByteString] { def deserialize(bytes: ByteString) = { try { Json.parse(bytes.iterator.asInputStream).as[String] } catch { case NonFatal(e) => throw DeserializationException(e) } } }
import com.lightbend.lagom.scaladsl.api.deser.StrictMessageSerializer import scala.collection.immutable class TextMessageSerializer extends StrictMessageSerializer[String] { // 支持的協議 override def acceptResponseProtocols = List( MessageProtocol(Some("text/plain")), MessageProtocol(Some("application/json")) ) /* ------ Serializer -----*/ // Client發出Request時還不須要協商協議,故使用最簡單的文本協議 def serializerForRequest = new PlainTextSerializer("utf-8") def serializerForResponse(accepted: immutable.Seq[MessageProtocol]) = accepted match { case Nil => new PlainTextSerializer("utf-8") case protocols => protocols .collectFirst { case MessageProtocol(Some("text/plain" | "text/*" | "*/*" | "*"), charset, _) => new PlainTextSerializer(charset.getOrElse("utf-8")) case MessageProtocol(Some("application/json"), _, _) => new JsonTextSerializer } .getOrElse { throw NotAcceptable(accepted, MessageProtocol(Some("text/plain"))) } } /* ------ Deserializer for Client and Service -----*/ def deserializer(protocol: MessageProtocol) = protocol.contentType match { case Some("text/plain") | None => new PlainTextDeserializer(protocol.charset.getOrElse("utf-8")) case Some("application/json") => new JsonTextDeserializer case _ => // 拋出異常在生產環境並不可取,由於對於諸如WebSocket之類的應用,瀏覽器不容許設置ContentType // 此時應返回一個缺省的反序列化器更爲穩當。 throw UnsupportedMediaType(protocol, MessageProtocol(Some("text/plain"))) } }
另外一個用Protocol buffers實現的例子:
import akka.util.ByteString import com.lightbend.lagom.scaladsl.api.deser.MessageSerializer.NegotiatedDeserializer import com.lightbend.lagom.scaladsl.api.deser.MessageSerializer.NegotiatedSerializer import com.lightbend.lagom.scaladsl.api.deser.StrictMessageSerializer import com.lightbend.lagom.scaladsl.api.transport.MessageProtocol import scala.collection.immutable class ProtobufSerializer extends StrictMessageSerializer[Order] { private final val serializer = { new NegotiatedSerializer[Order, ByteString]() { override def protocol: MessageProtocol = MessageProtocol(Some("application/octet-stream")) def serialize(order: Order) = { val builder = ByteString.createBuilder order.writeTo(builder.asOutputStream) builder.result } } } private final val deserializer = { new NegotiatedDeserializer[Order, ByteString] { override def deserialize(bytes: ByteString) = Order.parseFrom(bytes.iterator.asInputStream) } } override def serializerForRequest = serializer override def deserializer(protocol: MessageProtocol) = deserializer override def serializerForResponse(acceptedMessageProtocols: immutable.Seq[MessageProtocol]) = serializer }
在服務描述子中能夠加入頭部過濾器Header Filter,用於實現協商協議、身份驗證或訪問受權的溝通。過濾器會根據預設的條件,對服務與客戶端雙方通訊的消息進行轉換或修改。
下例是一個典型的過濾器實現。若是沒有特地進行綁定,那麼全部的服務默認都將會使用它,並使用ServicePrincipal來標識帶有服務名稱的客戶端。在客戶端,當Client發出Request時,過濾器將會在頭部附加User-Agent,Lagom默認會自動將服務名稱做爲ServicePrinciple。而在服務端,則會讀取Request中的User-Agent,並將其值設置爲Request的Principle。
⚡ 切記:頭部過濾器僅用於通訊雙方的協商,以肯定雙方採起何種方式進行後續的通訊,而不該用來執行實際的驗證邏輯。驗證邏輯屬於業務邏輯的組成部分,應當放在服務API及其組合當中。
object UserAgentHeaderFilter extends HeaderFilter { override def transformClientRequest(request: RequestHeader): RequestHeader = { request.principal match { case Some(principal: ServicePrincipal) => request.withHeader(HeaderNames.USER_AGENT, principal.serviceName) case _ => request } } override def transformServerRequest(request: RequestHeader): RequestHeader = { request.getHeader(HeaderNames.USER_AGENT) match { case Some(userAgent) => request.withPrincipal(ServicePrincipal.forServiceNamed(userAgent)) case _ => request } } override def transformServerResponse(response: ResponseHeader,request: RequestHeader): ResponseHeader = response override def transformClientResponse(response: ResponseHeader, request: RequestHeader): ResponseHeader = response }
相似服務使用compose進行組合,頭部過濾器也可使用HeaderFilter.composite
方法進行組合。
⚡ 注意:過濾器在發送消息與接收消息時適用的順序是恰好相反的。對於Request,越後加入的過濾器越新鮮就越早被運用。
class VerboseFilter(name: String) extends HeaderFilter { private val log = LoggerFactory.getLogger(getClass) def transformClientRequest(request: RequestHeader) = { log.debug(name + " - transforming Client Request") request } def transformServerRequest(request: RequestHeader) = { log.debug(name + " - transforming Server Request") request } def transformServerResponse(response: ResponseHeader, request: RequestHeader) = { log.debug(name + " - transforming Server Response") response } def transformClientResponse(response: ResponseHeader, request: RequestHeader) = { log.debug(name + " - transforming Client Response") response } } /* ----- 按下列順序組合過濾器 ----- */ def descriptor = { import Service._ named("hello") .withCalls( call(sayHello) ) .withHeaderFilter( HeaderFilter.composite( new VerboseFilter("Foo"), new VerboseFilter("Bar") ) ) }
在服務端,控制檯獲得的輸出將是以下的順序:
[debug] Bar - transforming Server Request [debug] Foo - transforming Server Request [debug] Foo - transforming Server Response [debug] Bar - transforming Server Response
Lagom在設計錯誤處理機制時,遵循瞭如下一些原則:
Lagom爲異常提供了trait ExceptionSerializer
,用於將異常信息序列化爲JSON等序列化格式,或者是特定的錯誤編碼或響應代碼。ExceptionSerializer會將異常轉換爲RawExceptionMessage
,其中包括對應HTTP響應代碼或WebSocket關閉代碼的狀態碼、消息主體,以及一個關於協議的描述子(在HTTP,此處對應響應頭部信息中的Content Type)。
默認的ExceptionSerializer使用Play JSON將異常信息序列化爲JSON格式。除非是在開發模式下,不然它只會返回TransportException
派生異常類的詳細信息,最多見的派生類包括NotFound
和PolicyViolation
。Lagom一般也容許Client拋出這一類的異常,也容許本身建立或實現一個TransportException的派生類實例,不過前提是Client得認識這個異常而且知道如何進行反序列化。
從Lagom 1.5.0版本開始,容許使用Play Router對Lagom的服務進行擴展。該功能在須要將Lagom服務與既存的Play Router進行集成時顯得更爲實用。
在Lagom作DI時,能夠注入額外的Router:
override lazy val lagomServer = serverFor[HelloService](wire[HelloServiceImpl]) .additionalRouter(wire[SomePlayRouter])
此例基於ScalaSirdRouter
實現,它將爲服務添加一個/api/files
的路徑,用於接收支持斷點續傳數據的POST請求。
import play.api.mvc.DefaultActionBuilder import play.api.mvc.PlayBodyParsers import play.api.mvc.Results import play.api.routing.Router import play.api.routing.sird._ class FileUploadRouter(action: DefaultActionBuilder, parser: PlayBodyParsers) { val router = Router.from { case POST(p"/api/files") => action(parser.multipartFormData) { request => val filePaths = request.body.files.map(_.ref.getAbsolutePath) Results.Ok(filePaths.mkString("Uploaded[", ", ", "]")) } } } override lazy val lagomServer = serverFor[HelloService](wire[HelloServiceImpl]) .additionalRouter(wire[FileUploadRouter].router)
在控制檯使用命令curl -X POST -F "data=@somefile.txt" -v http://localhost:65499/api/files
,便可發出上傳請求。
因爲額外的路由並無在服務描述子中定義,所以當服務使用了服務網關時,這些外部的路由將不會自動被服務網關暴露,所以須要顯式地將它的路徑加入訪問控制列表ACL(Access Control List)裏,而後經過服務網關訪問它們。
trait HelloService extends Service { def hello(id: String): ServiceCall[NotUsed, String] final override def descriptor = { import Service._ named("hello") .withCalls( pathCall("/api/hello/:id", hello _).withAutoAcl(true) ) .withAcls( // extra ACL to expose additional router endpoint on ServiceGateway ServiceAcl(pathRegex = Some("/api/files")) ) } }
而後在控制檯使用命令curl -X POST -F "data=@somefile.txt" -v http://localhost:9000/api/files
訪問它們。
因爲額外的路由不是服務API的一部分,因此沒法從Lagom生成的客戶端進行直接的訪問,而只能改用Play-WS之類的客戶端去訪問其暴露的HTTP端點。
這部分將使用Akka Typed按照DDD的方法實現一個CQRS架構的Lagom服務。
在這個ShoppingCart的示例裏(🔗 完整代碼),使用了Dock做爲容器,包裝了Zookeeper、Kafka和PostGres服務,因此在演示前須要用docker-compose up -d
進行初始化。同時,由於Lagom將使用Read-Side Processor
和Topic Producer
對AggregateEventTag標記的Event進行消費,因此須要用AkkaTaggerAdapter.fromLagom把AggregateEventTag轉換爲Akka能理解的Tag類型。而在讀端,Lagom提供的ReadSideProcessor
,在Cassandra和Relational數據庫插件支持下,能夠爲實現CQRS的讀端提供完整的支持。
⚡ 使用JDBC驅動數據庫存儲Journal時,分片標記數不能超過10。這是該插件已知的一個Bug,若是超過了10,將會致使某些事件被屢次傳遞。
/* ----- State & Handlers ----- */ final case class ShoppingCart( items: Map[String, Int], // checkedOutTime defines if cart was checked-out or not: // case None, cart is open // case Some, cart is checked-out checkedOutTime: Option[Instant] = None){ /* ----- Command Handlers ----- */ def applyCommand(cmd: ShoppingCartCommand): ReplyEffect[ShoppingCartEvent, ShoppingCart] = if (isOpen) { cmd match { case AddItem(itemId, quantity, replyTo) => onAddItem(itemId, quantity, replyTo) case Checkout(replyTo) => onCheckout(replyTo) case Get(replyTo) => onGet(replyTo) } } else { cmd match { case AddItem(_, _, replyTo) => Effect.reply(replyTo)(Rejected("Cannot add an item to a checked-out cart")) case Checkout(replyTo) => Effect.reply(replyTo)(Rejected("Cannot checkout a checked-out cart")) case Get(replyTo) => onGet(replyTo) } } private def onAddItem(itemId: String, quantity: Int, replyTo: ActorRef[Confirmation]): ReplyEffect[ShoppingCartEvent, ShoppingCart] = { if (items.contains(itemId)) Effect.reply(replyTo)(Rejected(s"Item '$itemId' was already added to this shopping cart")) else if (quantity <= 0) Effect.reply(replyTo)(Rejected("Quantity must be greater than zero")) else Effect .persist(ItemAdded(itemId, quantity)) .thenReply(replyTo)(updatedCart => Accepted(toSummary(updatedCart))) } private def onCheckout(replyTo: ActorRef[Confirmation]): ReplyEffect[ShoppingCartEvent, ShoppingCart] = { if (items.isEmpty) Effect.reply(replyTo)(Rejected("Cannot checkout an empty shopping cart")) else Effect .persist(CartCheckedOut(Instant.now())) .thenReply(replyTo)(updatedCart => Accepted(toSummary(updatedCart))) } private def onGet(replyTo: ActorRef[Summary]): ReplyEffect[ShoppingCartEvent, ShoppingCart] = { Effect.reply(replyTo)(toSummary(shoppingCart = this)) } private def toSummary(shoppingCart: ShoppingCart): Summary = { Summary(shoppingCart.items, shoppingCart.checkedOut) } /* ----- Event Handlers ----- */ def applyEvent(evt: ShoppingCartEvent): ShoppingCart = evt match { case ItemAdded(itemId, quantity) => onItemAdded(itemId, quantity) case CartCheckedOut(checkedOutTime) => onCartCheckedOut(checkedOutTime) } private def onItemAdded(itemId: String, quantity: Int): ShoppingCart = copy(items = items + (itemId -> quantity)) private def onCartCheckedOut(checkedOutTime: Instant): ShoppingCart = { copy(checkedOutTime = Option(checkedOutTime)) } }
Lagom借用Akka Cluster Sharding實現了服務的集羣部署,確保在任意時刻,有且只有一個聚合的實例在集羣內活動,相同類型的多個聚合實例則均勻分佈在各個節點之上。爲此,須要給ShoppingCart設定一個EntityKey,並向其工廠方法傳入EntityContext。
⚡ 在Command-Reply-Event的設計上,要注意區別Reply是回覆調用者的反作用,它能夠是確認Command是否成功執行,或者是返回聚合的內部狀態(要注意區別查詢命令與Read-Side)。而Event纔是因Command而致使聚合發生變化的正做用,是要持久化的。相應的,Effect.Reply()
用於聚合未發生變化的場合,而Effect.persist().thenReply()
則用於聚合發生變化以後。
/* ----- Factory & Protocols ----- */ object ShoppingCart { val empty = ShoppingCart(items = Map.empty) val typeKey: EntityTypeKey[ShoppingCartCommand] = EntityTypeKey[ShoppingCartCommand]("ShoppingCart") /* ----- Commands ----- */ trait CommandSerializable sealed trait ShoppingCartCommand extends CommandSerializable final case class AddItem(itemId: String, quantity: Int, replyTo: ActorRef[Confirmation]) extends ShoppingCartCommand final case class Checkout(replyTo: ActorRef[Confirmation]) extends ShoppingCartCommand final case class Get(replyTo: ActorRef[Summary]) extends ShoppingCartCommand /* ----- Replies (will not be persisted) ----- */ sealed trait Confirmation final case class Accepted(summary: Summary) extends Confirmation final case class Rejected(reason: String) extends Confirmation final case class Summary(items: Map[String, Int], checkedOut: Boolean) /* ----- Events (will be persisted) ----- */ sealed trait ShoppingCartEvent extends AggregateEvent[ShoppingCartEvent] { override def aggregateTag: AggregateEventTagger[ShoppingCartEvent] = ShoppingCartEvent.Tag } final case class ItemAdded(itemId: String, quantity: Int) extends ShoppingCartEvent final case class CartCheckedOut(eventTime: Instant) extends ShoppingCartEvent /* ----- Tag for read-side consuming ----- */ object ShoppingCartEvent { // will produce tags with shard numbers from 0 to 9 val Tag: AggregateEventShards[ShoppingCartEvent] = AggregateEventTag.sharded[ShoppingCartEvent](numShards = 10) } def apply(entityContext: EntityContext[ShoppingCartCommand]): Behavior[ShoppingCartCommand] = { EventSourcedBehavior .withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart]( persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId), emptyState = ShoppingCart.empty, commandHandler = (cart, cmd) => cart.applyCommand(cmd), eventHandler = (cart, evt) => cart.applyEvent(evt) ) // convert tag of Lagom to tag of Akka .withTagger(AkkaTaggerAdapter.fromLagom(entityContext, ShoppingCartEvent.Tag)) // snapshot every 100 events and keep at most 2 snapshots on db .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2)) } }
class ShoppingCartLoader extends LagomApplicationLoader { override def load(context: LagomApplicationContext): LagomApplication = new ShoppingCartApplication(context) with AkkaDiscoveryComponents override def loadDevMode(context: LagomApplicationContext): LagomApplication = new ShoppingCartApplication(context) with LagomDevModeComponents override def describeService = Some(readDescriptor[ShoppingCartService]) } trait ShoppingCartComponents extends LagomServerComponents with SlickPersistenceComponents with HikariCPComponents with AhcWSComponents { implicit def executionContext: ExecutionContext override lazy val lagomServer: LagomServer = serverFor[ShoppingCartService](wire[ShoppingCartServiceImpl]) override lazy val jsonSerializerRegistry: JsonSerializerRegistry = ShoppingCartSerializerRegistry // Initialize the sharding for the ShoppingCart aggregate. // See https://doc.akka.io/docs/akka/2.6/typed/cluster-sharding.html clusterSharding.init( Entity(ShoppingCart.typeKey) { entityContext => ShoppingCart(entityContext) } ) } abstract class ShoppingCartApplication(context: LagomApplicationContext) extends LagomApplication(context) with ShoppingCartComponents with LagomKafkaComponents {}
服務API的相關操做實際將由幕後的Actor負責實現,所以在服務的實現裏須要訪問Actor的實例,爲此須要經過ClusterSharding.entityRefFor獲取其EntityRef。
class ShoppingCartServiceImpl( clusterSharding: ClusterSharding, persistentEntityRegistry: PersistentEntityRegistry )(implicit ec: ExecutionContext) extends ShoppingCartService { def entityRef(id: String): EntityRef[ShoppingCartCommand] = clusterSharding.entityRefFor(ShoppingCart.typeKey, id) }
在得到Reference後,即可使用Ask模式與Actor進行交互。Ask返回的是Future[Response],所以示例將Future[Summary]
投射爲ShoppingCartView
,方便Read-Side使用。
implicit val timeout = Timeout(5.seconds) override def get(id: String): ServiceCall[NotUsed, ShoppingCartView] = ServiceCall { _ => entityRef(id) .ask(reply => Get(reply)) .map(cartSummary => asShoppingCartView(id, cartSummary)) } final case class ShoppingCartItem(itemId: String, quantity: Int) final case class ShoppingCartView(id: String, items: Seq[ShoppingCartItem], checkedOut: Boolean) private def asShoppingCartView(id: String, cartSummary: Summary): ShoppingCartView = { ShoppingCartView( id, cartSummary.items.map((ShoppingCartItem.apply _).tupled).toSeq, cartSummary.checkedOut ) }
這部份內容是對比上一節直接使用Akka Persistence Typed創建領域模型的方式,改從傳統的Lagom Persistence遷移到Akka Persistence Typed的角度進行了詳細的分步講解。因此,若是是全新開始設計的Lagom的服務,建議直接使用Akka Persistence Typed進行實現,只有此前用Lagom Persistence實現的服務才須要考慮遷移。
因爲內容主要涉及Akka Typed,可參考個人博客內容:
Lagom與下列數據庫平臺兼容:
參考連接:
Cassandra須要至少3個KeySpace:
cassandra-journal.keyspace = my_service_journal
cassandra-snapshot-store.keyspace = my_service_snapshot
lagom.persistence.read-side.cassandra.keyspace = my_service_read_side
這只是Lagom官方文檔的一小部份內容,算是對如何使用該框架實現服務的初窺,有興趣的請移步官方網站尋找更多的內容。