Lagom 官方文檔之隨手記

引言

Lagom是出品Akka的Lightbend公司推出的一個微服務框架,目前最新版本爲1.6.2。Lagom一詞出自瑞典語,意爲「適量」。html

🔗 https://www.lagomframework.com/documentation/1.6.x/scala/Home.htmljava

Lagom框架堅持,微服務是按服務邊界Boundary將系統切分爲若干個組成部分的結果,這意味着要使它們與限界上下文Bounded Context、業務功能和模塊隔離等要求保持一致,才能達到可伸縮性和彈性要求,從而易於部署和管理。所以,在設計微服務時應考慮大小是否「Lagom」,而非是否足夠「Micro」。react

Lagom框架大量使用了Jonas Bonér所著Reactive Microservices Architecture: Design Principles For Distributed Systems一書的設計理念和思想,因此推薦在使用Lagom以前先閱讀此書。git

「船大好頂浪,船小好調頭」——Jonas認爲,將龐大的系統分割爲若干獨立的更小粒度的部分,同時將管理權限適當下放,可使這些獨立的部分更快地作出決斷,以適應外部環境的不斷變化。github

➡️ Getting Start

Lagom開發環境要求:web

  • JDK8
  • sbt 1.2.1以上版本(Lightbend推薦使用sbt,maven次之,故如下均使用sbt。)
  • 可用的互聯網

Hello World

Lagom提供了Giter8模板,方便利用sbt構造一個Hello World項目結構,確保在開發前驗證生成工具和項目已正確配置。Hello World包括Hello與Stream兩個微服務,每一個微服務包括API與實現兩個子項目。Lagom會自動配置諸如持久化、服務定位等基礎設施,而且支持發現並加載微服務的熱更新。算法

sbt new lagom/lagom-scala.g8

hello                   → Project root
 └ hello-api            → hello api project
 └ hello-impl           → hello implementation project
 └ hello-stream-api     → hello-stream api project
 └ hello-stream-impl    → hello-stream implementation project
 └ project              → sbt configuration files
   └ build.properties   → Marker for sbt project
   └ plugins.sbt        → sbt plugins including the declaration for Lagom itself
 └ build.sbt            → Your project build file

使用sbt裏的runAll一次性啓動全部服務後(包括Cassandra、Kafka,如果手動則須要本身一個個啓動包括基礎服務在內的全部服務),在http://localhost:9000/api/hello/World處將獲得顯示了一行「Hello World」的頁面。docker

  • 在Hello API裏,HelloWorldService從Service派生,聲明服務的API以及服務的回覆消息。
  • 在API裏,實現Service.descriptor方法,該方法返回一個Descriptor,用於定義服務的名稱、REST端點路徑、Kafka消息主題,以及REST路徑與服務API方法的映射關係,等等。
  • 在Hello Impl裏,HelloWorldServiceImpl從API裏的HelloWorldService派生,定義服務的API。
  • 在服務API的定義裏,將使用一個支持集羣分片的Sharded、可持久化的Persistent、強類型Typed的Actor——HelloWorldBehavior採用Ask模式進行通訊,按entityRef(id).ask[Message](replyTo => msg(replyTo)).map(reply => ...)的樣式進行ask與map配對。其中,entityRef是該Actor在集羣裏的引用:clusterSharding.entityRefFor(HelloWorldState.typeKey, id)
  • HelloWorldBehavior實質就是一個EventSourcedBehavior,採用了State Pattern,定義了支撐服務的State、Command和Event以及相應的Handler。
  • 在Hello API內部,hello(id)實際是委託useGreeting(id)方法完成的,因此將World換做其餘內容亦可。

服務與網絡地址綁定

使用sbt將服務綁定到特定網絡地址(默認是localhost):數據庫

lazy val biddingImpl = (project in file("biddingImpl"))
  .enablePlugins(LagomScala)
  .settings(lagomServiceAddress := "0.0.0.0")

端口號的分配機制

即使是將服務部署到不一樣的物理主機上,服務的端口號也將保持先後一致(老是使用特定的某個端口號),該端口號基於如下算法:apache

  • 先對項目Project的名稱進行hash。
  • 將hash值投射到默認的端口範圍[49152, 65535]內。
  • 若是沒有其餘項目申請同一個端口號,那麼選定的該端口號將指定給該項目。若是發生衝突,那麼將會按項目名稱的字母順序逐個遞增地分配相鄰的端口號。

一般狀況下,無需關心上述細節,由於不多會發生這樣的衝突。固然,也能夠按下列方式手動指定端口號:

lazy val usersImpl = (project in file("usersImpl"))
  .enablePlugins(LagomScala)
  .settings(lagomServiceHttpPort := 11000)

若是嫌默認的端口範圍[49152, 65535]不合適,能夠指定端口範圍,不過範圍越窄,發生衝突的機率就越大:

lagomServicesPortRange in ThisBuild := PortRange(40000, 45000)

在開發模式下使用HTTPS

開發模式,是指在sbt或者maven支持下,啓動和調試各類服務。在這種環境下,對代碼作出修改後,Lagom將負責後續的編譯和從新加載工做,相應的服務會自動重啓。

在sbt裏進行以下配置,將會使用一個自簽名的證書爲服務啓用HTTPS並指定其端口,確保服務之間能經過HTTPS進行調用,但同時服務網關Service Gateway將仍舊只能使用HTTP:

lagomServiceEnableSsl in ThisBuild := true
lagomServiceHttpsPort := 20443

在客戶端,則建議使用諸如Play-WS或者Akka-HTTP Client API這樣的HTTPS Client框架。

服務定位子與網關

服務定位子

服務定位子Service Locator,是確保用於發現其餘服務並與之聯繫的組件。Lagom內置的缺省Locator有如下特性:

  • 默認地址是localhost,可以使用lagomServiceLocatorAddress in ThisBuild := "0.0.0.0"修改之。
  • 默認端口是9008,可以使用lagomServiceLocatorPort in ThisBuild := 10000修改之。
  • 非Lagom的外部服務須要先註冊:lagomUnmanagedServices in ThisBuild := Map("weather" -> "http://localhost:3333"),而後再使用。

網關

網關Gateway,至關於Service Locator的代理,用於防止對Locator的不當訪問。Lagom內置的缺省Gateway有如下特性:

  • 默認地址是localhost,可以使用lagomServiceGatewayAddress in ThisBuild := "0.0.0.0"修改之。
  • 默認端口是9000,可以使用lagomServiceGatewayPort in ThisBuild := 9010修改之。
  • Lagom提供了一個基於Akka HTTP的網關(akka-http是默認的),一箇舊式的基於Netty的網關,可以使用lagomServiceGatewayImpl in ThisBuild := "netty"指定之。

啓停與禁用

在開發環境下,使用runAll時默認會啓動Locator與Gateway。須要手動啓停時,分別執行lagomServiceLocatorStartlagomServiceLocatorStop任務便可。

如要須要禁用內置的Locater與Gateway,則使用lagomServiceLocatorEnabled in ThisBuild := false禁用之。以後便須要本身提供一個Locator的實現,並須要牢記每一個服務的端口號以創建相互聯繫。

Cassandra服務

Cassandra是Apache提供的一個分佈式、可擴展的NoSQL數據庫。它以KeySpace爲單位,其中包含若干個表Table或者列族Column Family,每一個列族能夠有不一樣的列(至關於RDBMS中的字段)並可自由添加列,每一個行能夠擁有不一樣的列,並支持索引。Cassandra支持的數據類型除常見的原生類型外,爲List、Map和Set提供了直接支持,提供了TTL數據到期自動刪除功能,而且能夠自定義數據類型。

Lagom內置了一個Cassandra服務,做爲Event Sourcing的事件持久化平臺。

  • 默認端口是4000,是爲避免與Cassandra默認端口9042衝突,能夠用lagomCassandraPort in ThisBuild := 9042指定之。
  • 默認狀況下生成的數據會在每次Cassandra服務運行期間被保存,能夠用lagomCassandraCleanOnStart in ThisBuild := true使之在每次服務啓動時清空數據庫。
  • 默認使用文件dev-embedded-cassandra.yaml對Cassandra進行配置,可使用lagomCassandraYamlFile in ThisBuild := Some((baseDirectory in ThisBuild).value / "project" / "cassandra.yaml")另行指定特定YAML配置文件。
  • Cassandra服務將運行在獨立的JVM上,可使用lagomCassandraJvmOptions in ThisBuild := Seq("-Xms256m", "-Xmx1024m", "-Dcassandra.jmx.local.port=4099")指定相應的JVM參數。
  • 默認的日誌將輸出至標準輸出,默認級別爲ERROR,暫時未提供配置措施。若是確需調整日誌配置,則只有鏈接到一個本地的Cassandra實例再修改之。
  • 默認狀況下Cassandra服務會最早啓動,並預留20秒時間完成啓動,可使用lagomCassandraMaxBootWaitingTime in ThisBuild := 0.seconds修改之。
  • 使用lagomCassandraStartlagomCassandraStop手動啓停。
  • 使用lagomCassandraEnabled in ThisBuild := false禁用之,再使用lagomUnmanagedServices in ThisBuild := Map("cas_native" -> "tcp://localhost:9042")註冊本地Cassandra實例便可使用之。

Kafka服務

Kafka是Apache提供的一個分佈式的流處理平臺,簡單講能夠理解爲一個生產者-消費者結構的、集羣條件下的消息隊列Message Queue。每條消息流Stream以主題Topic做爲惟一區別,每一個Topic下能夠有多個相同的分區Partition,分區裏的消息都有一個Offset做爲序號以確保按順序被消費。Kafka依賴ZooKeeper提供的集羣功能部署其節點。

Lagom內置了一個Kafka服務:

  • 默認端口爲9092,能夠用lagomKafkaPort in ThisBuild := 10000修改之
  • 依賴的ZooKeeper端口默認爲2181,能夠用lagomKafkaZookeeperPort in ThisBuild := 9999修改之
  • 默認使用文件kafka-server.properties配置Kafka運行參數,可使用lagomKafkaPropertiesFile in ThisBuild := Some((baseDirectory in ThisBuild).value / "project" / "kafka-server.properties")另行指定配置文件。
  • Kafka服務將運行在獨立的JVM上,可使用lagomKafkaJvmOptions in ThisBuild := Seq("-Xms256m", "-Xmx1024m")指定相應的JVM參數。
  • 默認的日誌將直接輸出至文件,路徑爲<your-project-root>/target/lagom-dynamic-projects/lagom-internal-meta-project-kafka/target/log4j_output,而Kafka的提交日誌將保存在<your-project-root>/target/lagom-dynamic-projects/lagom-internal-meta-project-kafka/target/logs
  • 使用lagomKafkaStartlagomKafkaStop手動啓停。
  • 使用lagomKafkaEnabled in ThisBuild := false禁用之,再使用lagomKafkaAddress in ThisBuild := "localhost:10000"指定Kafka實例便可使用之。若是是本地實例,用lagomKafkaPort in ThisBuild := 10000指定端口便可。

➡️ 編寫Lagom服務

服務描述子

每個Lagom服務都由一個接口進行描述。該接口的內容不只包括接口方法的聲明和實現,同時還定義了接口的元數據如何被映射到底層的傳輸協議。

服務的每一個接口方法都要求返回一個ServiceCall[Request, Response],其中Request或Response能夠是Akka的NotUsed

trait ServiceCall[Request, Response] {
  def invoke(request: Request): Future[Response]
}

每個Lagom服務在覆寫的descriptor中都將返回一個服務描述子Service Descriptor。如下即是聲明瞭一個叫做hello的服務,該服務提供了sayHello的API:

trait HelloService extends Service {
  def sayHello: ServiceCall[String, String]

  override def descriptor = {
    import Service._
    named("hello").withCalls(call(sayHello))
  }
}

Call標識符

每一個服務調用都必須有一個惟一的標識符,以保證調用能最終映射到正確的API方法上。這個標識符能夠是靜態的一個字符串名稱,也能夠在運行時動態生成。默認狀況下,API方法的名稱即該調用的標識符。

強命名的標識符

使用namedCall指定強命名的標識符,服務hello裏的API方法sayHello的調用名爲hello,在REST架構下的相應路徑爲/hello

named("hello").withCalls(namedCall("hello", sayHello))

基於路徑的標識符

使用pathCall指定基於路徑的標識符,相似於字符串中用$引導的內插值,此處用:引導內插變量。Lagom爲此提供了一個隱式的PathParamSerializer,用於從路徑中提取StringIntBoolean或者UUID類型的內插變量。好比如下即是提取了路徑中類型爲long的orderId和類型爲String的itemId值,做爲參數傳遞給API方法。在做參數映射時,默認將按從路徑中從左至右提取的順序進行映射。

💀 這與ASP.NET MVC等一些HTTP框架採用的方法是相似的,因此要注意以構建RESTful應用的思路貫徹學習始終。

def getItem(orderId: Long, itemId: String): ServiceCall[NotUsed, Item]

override def descriptor = {
  import Service._
  named("orders").withCalls(pathCall("/order/:orderId/item/:itemId", getItem _))
}

提取查詢串中的參數時,則使用的?起始、以&分隔的形式:

def getItems(orderId: Long, pageNo: Int, pageSize: Int): ServiceCall[NotUsed, Seq[Item]]

override def descriptor = {
  import Service._
  named("orders").withCalls(pathCall("/order/:orderId/items?pageNo&pageSize", getItems _))
}

在REST架構下,Lagom會努力正確實現上述映射,而且在有Request消息時使用POST方法,不然使用GET方法。

REST標識符

REST標識符用於徹底REST形式的調用,和pathCall很是相似,區別只是REST標識符可指定HTTP調用方法:

def addItem(orderId: Long): ServiceCall[Item, NotUsed]
def getItem(orderId: Long, itemId: String): ServiceCall[NotUsed, Item]
def deleteItem(orderId: Long, itemId: String): ServiceCall[NotUsed, NotUsed]

def descriptor = {
  import Service._
  import com.lightbend.lagom.scaladsl.api.transport.Method
  named("orders").withCalls(
    restCall(Method.POST, "/order/:orderId/item", addItem _),
    restCall(Method.GET, "/order/:orderId/item/:itemId", getItem _),
    restCall(Method.DELETE, "/order/:orderId/item/:itemId", deleteItem _)
  )
}

消息

每一個服務API都須要指定Request和Response的消息類型,能夠用akka.NotUsed做爲佔位符,分爲兩種形式:

  • 嚴格消息Strict Message:這就是用Scala裏的object或者case class表達的常見消息,它們將在內存的緩衝區中被序列化後進行傳輸。若是Request與Response均是這類嚴格消息,則調用將會是同步的,嚴格按請求-回覆的順序進行。
  • 流消息Streamed Message:流消息是一類特殊的消息,其類型爲Akka-Stream裏的Source。Lagom將使用WebSocket協議進行流的傳輸。若是Request與Response均是流消息時,任何一方關閉時,WebSocket將徹底關閉。若是隻有一方是流消息,則嚴格消息仍按序列化方式進行傳輸,而WebSocket將始終保持打開狀態,直到另外一個方向關閉爲止。

如下分別是單向流和雙向流消息的示例:

def tick(interval: Int): ServiceCall[String, Source[String, NotUsed]]

def descriptor = {
  import Service._
  named("clock").withCalls(pathCall("/tick/:interval", tick _))
}

def sayHello: ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]

def descriptor = {
  import Service._
  named("hello").withCalls(call(this.sayHello))
}

消息的序列化

Lagom經過定義隱式的MessageSerializer,爲call、namedCall、pathCall和restCall提供了消息的序列化支持。對String類型的消息和Play框架JSON格式的消息,Lagom提供了內置的序列化器。除此之外,能夠自定義序列化器。(參考:消息序列化器

使用Play-JSON時,一般是用case class和companion object配合,case class定義消息的結構,companion object定義消息的格式:

case class User(
    id: Long,
    name: String,
    email: Option[String]
)

object User {
  import play.api.libs.json._
  implicit val format: Format[User] = Json.format[User]
}

對應的JSON結果爲:

{
  "id": 12345,
  "name": "John Smith",
  "email": "john.smith@example.org"
}

屬性能夠是Option,這樣若是爲None,則Play-JSON在序列化時不會解析它、反序列化時不會生成該屬性。若是case class還內嵌了其餘case class,則被嵌入的case class也須要定義它的format。

實現服務

服務的實現,即實現以前聲明的服務描述子trait。對服務API中聲明的每一個方法,使用ServiceCall的工廠方法apply,傳入一個Request => Future[Response],返回一個ServiceCall

(💀 注意:Lagom大量使用函數做爲返回值,從而充分發揮了FP組合高階函數的優點。)

class HelloServiceImpl extends HelloService {
  override def sayHello = ServiceCall { name => Future.successful(s"Hello $name!") }
}

使用流消息

當消息不是普通的嚴格消息而是流消息時,須要使用Akka Stream來處理它。對應前面單向流的例子,它的實現以下:

override def tick(intervalMs: Int) = ServiceCall { tickMessage =>
  Future.successful(
    Source
      .tick(
        // 消息被髮送前的延遲
        intervalMs.milliseconds,
        // 消息發送的間隔
        intervalMs.milliseconds,
        // 將被髮送的消息
        tickMessage
      )
      .mapMaterializedValue(_ => NotUsed)
  )
}

處理消息的頭部信息

若是某些時候須要處理消息的頭部信息(一般是HTTP),那麼Lagom提供了ServiceCall的派生類ServerServiceCall做爲支持。ServerServiceCall將ServiceCall裏的Header單獨取出,提供了invokeWithHeaders方法,該方法第一個參數是RequestHeader,另外一個參數纔是Request自己,這樣直接將invoke委託給invokeWithHeaders,從而方便在函數體中使用handleRequestHeaderhandleResponseHeader對頭部信息進行處理(儘管ServiceCall自己也支持這2個處理函數)。

override def sayHello = ServerServiceCall { (requestHeader, name) =>
  val user = requestHeader.principal
    .map(_.getName)
    .getOrElse("No one")
  val response = s"$user wants to say hello to $name"

  val responseHeader = ResponseHeader.Ok.withHeader("Server", "Hello service")

  Future.successful((responseHeader, response))
}

ServerServiceCall工廠方法有一個版本能夠同時處理Request與Response的頭部信息,但也有不帶處理頭部信息的版本。後者雖然看起來與ServiceCall沒什麼區別,從而顯得畫蛇添足,但實際這是爲知足組合服務調用的需求而存在的。

組合服務調用

組合服務調用,相似於把ServerServiceCall經過依賴注入傳遞給須要包裹在外層的日誌、權限、過濾等切面服務,從而實現AOP切入到核心的服務API調用上。

在AOP切入的實現上,Lagom採起了組合高階函數的方法。這就象是抹了一層又一層奶油的生日蛋糕,只有切開了才能看到最裏層真正想要吃到的蛋糕。相比使用共享的線程變量等方法,不只經過類型系統發揮了編譯檢查的優點而更加安全,而且還經過構造表達式樹而提供了延遲計算的功能。

// AOP: Log
def logged[Request, Response](serviceCall: ServerServiceCall[Request, Response]) =
  ServerServiceCall.compose { requestHeader =>
    println(s"Received ${requestHeader.method} ${requestHeader.uri}")
    serviceCall
  }

override def sayHello = logged(ServerServiceCall { name =>
    Future.successful(s"Hello $name!")
  })

// AOP: Authentication
trait UserStorage {
  def lookupUser(username: String): Future[Option[User]]
}

def authenticated[Request, Response](serviceCall: User => ServerServiceCall[Request, Response]) = {
  // composeAsync容許異步地返回要調用的服務API
  ServerServiceCall.composeAsync { requestHeader =>
    // First lookup user
    val userLookup = requestHeader.principal
      .map(principal => userStorage.lookupUser(principal.getName))
      .getOrElse(Future.successful(None))

    // Then, if it exists, apply it to the service call
    userLookup.map {
      case Some(user) => serviceCall(user)
      case None       => throw Forbidden("User must be authenticated to access this service call")
    }
  }
}

override def sayHello = authenticated { user =>
  ServerServiceCall { name =>
    // 注意:由於閉包,此處可訪問經AOP切入帶來的user
    Future.successful(s"$user is saying hello to $name")
  }
}

依賴注入

依賴注入,Dependency Injection,是把本應由服務承擔的建立其自身依賴的責任移交給外部的框架,改由DI框架負責生產對象並維護彼此的關聯,最終造成完整的對象圖(Object Graph),從而達到公示服務所需依賴、消滅代碼中硬編碼的new操做、下降耦合度的目的。

Cake Pattern

在Scala語言裏最多見的一個DI模式,是用trait的Self Type特性實現的蛋糕模式(Thin Cake Pattern)。

更復雜的示例,請參見 🔗 Real-World Scala: Dependency Injection

trait Stuffing {
  val stuffing: String
}

// 使用Self Type特性,限定繼承了Cake的子類,必須同時也繼承了Stuffing
// 此處Stuffing還能夠with其餘trait,從而實現多重注入
trait Cake { this: Stuffing =>
  def flavour: String = this.stuffing
}

object LemonCake extends Cake with Stuffing { ... }

Reader Monad

在FP的世界,實現DI的另外一個選擇是Reader Monad。

case class Reader[R, A](run: R => A) {
  def map[B](f: A => B): Reader[R, B] =
    Reader(r => f(run(r)))

  def flatMap[B](f: A => Reader[R, B]): Reader[R, B] =
    Reader(r => f(run(r)).run(r))
}

def balance(accountNo: String) = Reader((repo: AccountRepository) => repo.balance(accountNo))

DI框架MacWire基本用法

🔗 Dependency Injection in Scala using MacWire
🔗 MacWire使用指南中涉及的參考知識:軌道交通編組站

Lagom使用了MacWire做爲默認的DI框架(固然也能夠換Spring之類的其餘DI框架)。

MacWire主要使用wire[T]進行DI,它會從標註了@Inject的構造子、非私有的主要構造子、Companion Object裏的apply()方法裏查找依賴關係。而後再根據依賴項的類型,按如下順序查找匹配的參數項:

  • 先在當前塊、閉合函數或匿名函數的參數中尋找惟一值;
  • 而後在閉合類型的參數、屬性裏尋找惟一值;
  • 最後在父類型裏尋找惟一值。

使用MacWire相關事項:

  • 若是是隱式參數,MacWire將會跳過它,使它仍按Scala語法關於隱式參數的方法進行處理。

  • 只要類型匹配,此處的值能夠是vallazy val或者用def定義的無參函數。

  • 須要根據條件使用不一樣的依賴項實現時,用相似lazy val component = if (condition) then wire[implementationA] else wire[implementationB]的方法,定義好component的值便可。

  • 須要在依賴項上切入攔截器Interceptor時,先在依賴項定義處聲明一個攔截器def logInterceptor : Interceptor,再用lazy val component = logInterceptor(wire[Component])綁定攔截聲明,最後在使用依賴項進行實現的地方用lazy val logInterceptor = { ... }給出具體實現便可。

    trait ShuntingModule {
      lazy val pointSwitcher: PointSwitcher =
            logEvents(wire[PointSwitcher])
      lazy val trainCarCoupler: TrainCarCoupler =
            logEvents(wire[TrainCarCoupler])
      lazy val trainShunter = wire[TrainShunter]
    
      def logEvents: Interceptor
    }
    
    object TrainStation extends App {
      val modules = new ShuntingModule
          with LoadingModule
          with StationModule {
    
          lazy val logEvents = ProxyingInterceptor { ctx =>
            println("Calling method: " + ctx.method.getName())
            ctx.proceed()
          }
      }
    
      modules.trainStation.prepareAndDispatchNextTrain()
    }
  • 默認狀況下,使用lazy val component = wire[Component]聲明在當前範圍內惟一的依賴項(Singleton)。若是須要在每次調用時都建立新的依賴項(Dependent),則換做def便可。

  • 除Singleton和Dependent兩種生命週期外,MacWire還支持其餘形式的生命週期,方法相似使用攔截器。

    trait StationModule extends ShuntingModule with LoadingModule {
      lazy val trainDispatch: TrainDispatch =
            session(wire[TrainDispatch])
      lazy val trainStation: TrainStation =
            wire[TrainStation]
    
      def session: Scope
    }
    
    object TrainStation extends App {
      val modules = new ShuntingModule
          with LoadingModule
          with StationModule {
    
          lazy val session = new ThreadLocalScope
      }
    
      // implement a filter which attaches the session to the scope
      // use the filter in the server
    
      modules.trainStation.prepareAndDispatchNextTrain()
    }
  • 對須要參數進行構造的依賴項,使用lazy val component = (parameters) => wire[Component]這樣的工廠方法進行聲明,展開後將變成new Component(parameters)。對在trait裏聲明的依賴項,則改用def component(parameters: Parameter) = wire[Component]的方式定義工廠方法。

  • 在具體使用參數化的依賴項時,使用wireWith(real_parameter)代入實際參數。

  • 須要區別同一類依賴項的不一樣實例時,能夠用trait做爲標記tag。一種方法是用new Component(...) with tag細化其子類型,另外一種是在聲明依賴值的類型後面用@@ tag做標記。

    trait Regular
    trait Liquid
    
    class TrainStation(
      trainShunter: TrainShunter,
      regularTrainLoader: TrainLoader with Regular,  
      liquidTrainLoader: TrainLoader with Liquid,
      trainDispatch: TrainDispatch) { ... }
    
    lazy val regularTrainLoader = new TrainLoader(...) with Regular
    lazy val liquidTrainLoader = new TrainLoader(...) with Liquid
    lazy val trainStation = wire[TrainStation]
    
    ///////////////////////////////////////
    
    class TrainStation(
      trainShunter: TrainShunter,
      regularTrainLoader: TrainLoader @@ Regular,  
      liquidTrainLoader: TrainLoader @@ Liquid,
      trainDispatch: TrainDispatch) { ... }
    
    lazy val regularTrainLoader = wire[TrainLoader].taggedWith[Regular]
    lazy val liquidTrainLoader = wire[TrainLoader].taggedWith[Liquid]
    lazy val trainStation = wire[TrainStation]

組裝完整的應用程序

在Lagom中使用MacWire進行DI是可行的,但前述的服務定位子基本上只能知足開發環境的需求,生產環境仍是要換用Akka Discovery Service Locator之類更強大的定位子框架。

在完成組件的拼裝後,還須要一個啓動子啓動應用程序。Lagom爲此提供了Play框架ApplicationLoader的簡化版本LagomApplicationLoader。其中的loadDevModeload是必須的,前者用with LagomDevModeComponents加載Lagom內嵌的服務定位子,用於開發環境;後者能夠指定生產環境下的服務定位子,若是返回NoServiceLoader將意味着不提供服務定位。而describeService是可選的,它主要用於聲明服務的API,爲外部管理框架或者腳本語言提供服務的Meta信息,以方便進行動態配置。服務無數據Metadata,又稱爲ServiceInfo,主要包括服務的名稱以及一個訪問控制列表ACL。一般這些Metadata會由框架自動生成,前提是在使用withCalls聲明描述子時,用withAutoAcl(true)激活便可。

import com.lightbend.lagom.scaladsl.server._
import com.lightbend.lagom.scaladsl.api.ServiceLocator
import com.lightbend.lagom.scaladsl.devmode.LagomDevModeComponents

class HelloApplicationLoader extends LagomApplicationLoader {
  override def loadDevMode(context: LagomApplicationContext) =
    new HelloApplication(context) with LagomDevModeComponents

  override def load(context: LagomApplicationContext) =
    new HelloApplication(context) {
      override def serviceLocator = ServiceLocator.NoServiceLocator
    }

  override def describeService = Some(readDescriptor[HelloService])
}

最後,只需在配置application.conf裏用play.application.loader = com.example.HelloApplicationLoader指定啓動子,即完成了應用程序的裝配。

Lagom預置的組件類型

Lagom根據用途不一樣,一般將組件分爲若干類型:

  • Service Components:定義服務時所須要的基本模板
  • Persistence and Cluster Components:與Akka Cluster、Akka Persistence協做,實現Cluster、ES和CQRS所須要的組件模板
  • Broker API Components:與Kafka協做,進行消息的生產和消費所須要的組件模板
  • Service Locator Components:實現服務定位子的組件模板
  • Third party Components:與諸如Web Service Client之類進行協做的組件模板

🔗 詳細列表:https://www.lagomframework.com/documentation/1.6.x/scala/ScalaComponents.html

消費服務

基本步驟

定義並實現服務後,該服務便可被其餘服務或其餘類型的客戶消費使用。Lagom將根據服務描述子的內容,經過使用ServiceClientimplement宏進行綁定,自動生成一個調用該服務的框架,而後就能夠象調用本地對象的方法同樣invoke服務提供的API了。

// 先綁定HelloService
abstract class MyApplication(context: LagomApplicationContext)
    extends LagomApplication(context)
    with AhcWSComponents {
  lazy val helloService = serviceClient.implement[HelloService]
}

// 而後在另外一個服務MyService裏消費HelloService
class MyServiceImpl(helloService: HelloService)(implicit ec: ExecutionContext) extends MyService {
  override def sayHelloLagom = ServiceCall { _ =>
    val result: Future[String] = helloService.sayHello.invoke("Lagom")

    result.map { response => s"Hello service said: $response" }
  }
}

消費流服務

當服務使用了流消息時,Lagom將在消費端使用帶有最大幀長度參數的WebSocket進行通訊。該參數定義了能夠發送的消息的最大尺寸,能夠在application.conf中配置。

#This configures the websocket clients used by this service.
#This is a global configuration and it is currently not possible to provide different configurations if multiple websocket services are consumed.
lagom.client.websocket {
  #This parameter limits the allowed maximum size for the messages flowing through the WebSocket. A similar limit exists on the server side, see:
  #https://www.playframework.com/documentation/2.6.x/ScalaWebSockets#Configuring-WebSocket-Frame-Length
  frame.maxLength = 65536
}

斷路器

斷路器Circuit Breaker,如同電路保險絲,能夠在服務崩潰時迅速熔斷,從而避免產生更大面積的連鎖反應。

斷路器有如下3種狀態:

lagom

  • 閉合狀態 Closed:一般狀況下,斷路器處於閉合狀態,保證服務調用正常經過。
    • 因爲觸發異常或者調用超過設定的call-timeout,致使失敗計數值增加,在超過設定的max-failures,斷路器跳到斷開狀態。
    • 因爲半開狀態下試探成功,失敗計數值被重置歸零。
  • 斷開狀態 Open:這種狀況下,斷路器處於斷開狀態,服務將始終不可用。
    • 全部的服務調用,都會獲得一個CircuitBreakerOpenException異常而快速失敗。
    • 在超過設定的reset-timeout後,斷路器跳入半開狀態。
  • 半開狀態 Half-Open:這種狀況下,斷路器將嘗試恢復到閉合狀態。
    • 除第一個調用將被容許經過,以試探服務是否恢復可用外,後續調用將繼續被快速失敗所拒絕。
    • 若是試探成功,斷路器將經過重置跳到閉合狀態,恢復服務的可用性;若是試探失敗,斷路器將回到斷開狀態,而後等待下一個reset-timeout週期後再從新進行試探。

Lagom爲全部消費端對服務的調用都默認啓用了斷路器。儘管斷路器在消費端配置並使用,但用於綁定到服務的標識符則要由服務提供者定義和肯定相應粒度。默認狀況下,一個斷路器實例將覆蓋對一個服務全部API的調用。但經過設置斷路器標識符,能夠爲每一個API方法設置惟一的斷路器標識符,以便爲每一個API方法使用單獨的斷路器實例。或者經過在某幾個API方法上設置使用相同的標識符,實現對API調用的斷路保護分組。

下例中,sayHi將使用默認的斷路器,而hiAgain將使用斷路器hello2

def descriptor: Descriptor = {
  import Service._

  named("hello").withCalls(
    namedCall("hi", this.sayHi),
    namedCall("hiAgain", this.hiAgain).withCircuitBreaker(CircuitBreaker.identifiedBy("hello2"))
  )
}

對應的application.conf中配置以下:

lagom.circuit-breaker {
  # will be used by sayHi method
  hello.max-failures = 5

  # will be used by hiAgain method
  hello2 {
    max-failures = 7
    reset-timeout = 30s
  }

  # Change the default call-timeout will be used for both sayHi and hiAgain methods
  default.call-timeout = 5s
}

默認狀況下,Lagom的客戶端會將全部4字頭和5字頭的HTTP響應都映射爲相應的異常,而斷路器會將全部的異常都視做失敗,從而觸發熔斷。經過設置白名單,能夠忽略某些類型的異常。斷路器完整的斷路器配置選項以下:

# Circuit breakers for calls to other services are configured
# in this section. A child configuration section with the same
# name as the circuit breaker identifier will be used, with fallback
# to the `lagom.circuit-breaker.default` section.
lagom.circuit-breaker {
  # Default configuration that is used if a configuration section
  # with the circuit breaker identifier is not defined.
  default {
    # Possibility to disable a given circuit breaker.
    enabled = on

    # Number of failures before opening the circuit.
    max-failures = 10

    # Duration of time after which to consider a call a failure.
    call-timeout = 10s

    # Duration of time in open state after which to attempt to close
    # the circuit, by first entering the half-open state.
    reset-timeout = 15s

    # A whitelist of fqcn of Exceptions that the CircuitBreaker
    # should not consider failures. By default all exceptions are
    # considered failures.
    exception-whitelist = []
  }
}

測試服務

Lightbend推薦使用ScalaTest和Spec2做爲Lagom的測試框架。相似Akka ActorTestKit,可使用Lagom提供的ServiceTest工具包對服務進行測試。

測試單個服務

爲每一個測試建立一個單獨的服務實例,其關鍵步驟包括:

  • 使用Scala的異步測試支持AsyncWordSpec。
  • 使用ServiceTest.withServer(config)(lagomApplication)(testBlock)的結構進行測試。
  • 爲上一步中的lagomApplication混入一個LocalServiceLocator,以啓用默認的定位子服務。
  • 在testBlock中使用一個客戶端進行服務調用。
import com.lightbend.lagom.scaladsl.server.LocalServiceLocator
import com.lightbend.lagom.scaladsl.testkit.ServiceTest
import org.scalatest.AsyncWordSpec
import org.scalatest.Matchers

// 1. 啓用AsyncWordSpec
class HelloServiceSpec extends AsyncWordSpec with Matchers {
  "The HelloService" should {
    // 2. 使用ServiceTest.withServer進行測試
    "say hello" in ServiceTest.withServer(ServiceTest.defaultSetup) { ctx =>
      // 3. 啓用默認的服務定位子
      new HelloApplication(ctx) with LocalServiceLocator
    } { server =>
      // 4. 建立一個客戶端進行服務調用
      val client = server.serviceClient.implement[HelloService]

      client.sayHello.invoke("Alice").map { response =>
        response should ===("Hello Alice!")
      }
    }
  }
}

若要由多個測試共享一個服務實例,則須要使用ServiceTest.startServer()代替withServer(),而後在beforeAll與afterAll中啓動和中止服務。

import com.lightbend.lagom.scaladsl.server.LocalServiceLocator
import com.lightbend.lagom.scaladsl.testkit.ServiceTest
import org.scalatest.AsyncWordSpec
import org.scalatest.Matchers
import org.scalatest.BeforeAndAfterAll

class HelloServiceSpec extends AsyncWordSpec with Matchers with BeforeAndAfterAll {
  lazy val server = ServiceTest.startServer(ServiceTest.defaultSetup) { ctx =>
    new HelloApplication(ctx) with LocalServiceLocator
  }
  lazy val client = server.serviceClient.implement[HelloService]

  "The HelloService" should {
    "say hello" in {
      client.sayHello.invoke("Alice").map { response =>
        response should ===("Hello Alice!")
      }
    }
  }

  protected override def beforeAll() = server

  protected override def afterAll() = server.stop()
}

若要啓用Cluster、PubSub或者Persistence支持,則須要在withServer第1個參數中啓用。若須要調用其餘服務,能夠在第2個參數中構造要調用服務的Stub或者Mock。注意事項包括:

  • 持久化功能只能在Cassandra與JDBC之間兩者擇其一。
  • 使用withCassandra或者withJdbc啓用Persistence後。會自動啓用Cluster
  • 不管自動或手動啓用Cluster,都會啓動PubSub。而PubSub不能手動啓用。
lazy val server = ServiceTest.startServer(ServiceTest.defaultSetup.withCluster) { ctx =>
  new HelloApplication(ctx) with LocalServiceLocator {
    override lazy val greetingService = new GreetingService {
      override def greeting = ServiceCall { _ =>
        Future.successful("Hello")
      }
    }
  }
}

在測試時使用TLS

Lagom沒有爲HTTPS提供客戶端框架,所以只有借用Play-WS、Akka HTTP或者Akka gRPC等框架建立使用SSL鏈接的客戶端。而在服務端,可使用withSsl激活SSL支持,隨後框架將會爲測試端自動打開一個隨機的端口並提供一個javax.net.ssl.SSLContext類型的上下文環境。接下來,客戶端就可使用testServer提供的httpsPortsslContext鏈接到服務端併發送Request。Lagom測試工具提供的證書僅限於CN=localhost,因此該上下文SSLContext也只會信任本地的testServer,這就要求在發送請求時也設置好相應的權限,不然服務器將會拒絕該請求。目前,Lagom還沒法爲測試服務器設置不一樣的SSL證書。

"complete a WS call over HTTPS" in {
  val setup = defaultSetup.withSsl()
  ServiceTest.withServer(setup)(new TestTlsApplication(_)) { server =>
    implicit val actorSystem = server.application.actorSystem
    implicit val ctx         = server.application.executionContext
    // To explicitly use HTTPS on a test you must create a client of your own
    // and make sure it uses the provided SSLContext
    val wsClient = buildCustomWS(server.clientSslContext.get)
    // use `localhost` as authority
    val url = s"https://localhost:${server.playServer.httpsPort.get}/api/sample"
    val response = wsClient.url(url).get().map { _.body[String] }
    whenReady(response, timeout) { r => r should be("sample response") }
  }
}

測試流消息

在測試支持流消息的服務時,須要搭配Akka Streams TestKit進行測試。

"The EchoService" should {
  "echo" in {
    // Use a source that never terminates (concat Source.maybe)
    // so we don't close the upstream, which would close the downstream
    val input = Source(List("msg1", "msg2", "msg3")).concat(Source.maybe)
    client.echo.invoke(input).map { output =>
      val probe = output.runWith(TestSink.probe(server.actorSystem))
      probe.request(10)
      probe.expectNext("msg1")
      probe.expectNext("msg2")
      probe.expectNext("msg3")
      probe.cancel
      succeed
    }
  }
}

測試持久化實體

在服務測試中,能夠經過額外編寫PersistEntityTestDriver,使用持久化實體Persistent Entity進行與數據庫無關的功能測試。

消息序列化器

Play、Akka和Lagom均出自Lightbend,所以Lagom使用Play JSON做爲消息的序列化框架,算是開箱即用了。

選配序列化器

Lagom的序列化器,一般是與服務描述子放在一塊兒的一個MessageSerializer類型的隱式變量。也能夠在withCalls()聲明服務API時,在call、namedCall、pathCall、restCall或者topic方法裏顯式地指定分別用於Request和Response的序列化器。

Lagom經過借用Play的JSON序列化器,對case class進行JSON格式的序列化。該JSON序列化器有一個主要方法是jsValueFormatMessageSerializer,可使用它在case classscompanion object裏指定其餘格式的JSON模板。同時,Lagom的MessageSerializer也爲NotUsedDoneString等類型提供了缺省的非JSON格式的序列化支持,好比MessageSerializer.StringMessageSerializer

trait HelloService extends Service {
  def sayHello: ServiceCall[String, String]

  override def descriptor = {
    import Service._

    named("hello").withCalls(
      call(sayHello)(
        MessageSerializer.StringMessageSerializer,
        MessageSerializer.StringMessageSerializer
      )
    )
  }
}

在companion object裏定義不一樣的JSON格式,隨後在服務描述子中顯式地選擇使用:

import play.api.libs.json._
import play.api.libs.functional.syntax._

case class MyMessage(id: String)

object MyMessage {
  implicit val format: Format[MyMessage] = Json.format
  val alternateFormat: Format[MyMessage] = {
    // 將id映射爲JSON串裏的identifier
    (__ \ "identifier")
      .format[String]
      .inmap(MyMessage.apply, _.id)
  }
}

trait MyService extends Service {
  def getMessage: ServiceCall[NotUsed, MyMessage]
  def getMessageAlternate: ServiceCall[NotUsed, MyMessage]

  override def descriptor = {
    import Service._

    named("my-service").withCalls(
      call(getMessage),
      call(getMessageAlternate)(
        // Request的序列化器
        implicitly[MessageSerializer[NotUsed, ByteString]],
        // Response的序列化器
        MessageSerializer.jsValueFormatMessageSerializer(
          implicitly[MessageSerializer[JsValue, ByteString]],
          // 指定JSON格式
          MyMessage.alternateFormat
        )
      )
    )
  }
}

自定義序列化器

Lagom的trait MessageSerializer也能夠用來實現自定義的序列化器,派生的StrictMessageSerializer和StreamedMessageSerializer分別用於嚴格消息與流消息。其中,嚴格消息的序列化類型爲二進制串ByteString,而流消息的則是Source[ByteString, _]

在實現自定義的序列化器以前,有幾個關鍵性概念:

  • 消息協議 Message Protocols:包括內容類型,字符集和版本等3個可選屬性,這3個屬性將被投射到HTTP頭部信息中的Content-TypeAccept,以及MIME Type Scheme的版本號,或者根據服務的配置方式直接從URL中提取。
  • 內容協商 Content Negotiation:Lagom鏡像了HTTP的內容協商能力,保證服務器一方能根據客戶端發出請求所使用的消息協議,採用對應協議進行通訊。
  • 協商序列化器 Negotiation Serializer:使用內容協商後,MessageSerializer將再也不直接承擔序列化與反序列化職責,改由NegotiatedSerializerNegotiatedDeserializer負責。

內容協商在多數狀況下並非必要的,因此並非必定須要實現的。

第一步:定義不一樣協議對應的序列化與反序列化器
/* ------------ String Protocol ------------ */
import akka.util.ByteString
import com.lightbend.lagom.scaladsl.api.deser.MessageSerializer.NegotiatedSerializer
import com.lightbend.lagom.scaladsl.api.transport.DeserializationException
import com.lightbend.lagom.scaladsl.api.transport.MessageProtocol
import com.lightbend.lagom.scaladsl.api.transport.NotAcceptable
import com.lightbend.lagom.scaladsl.api.transport.UnsupportedMediaType

// 注意:`charset`由構造子傳入,傳遞給MessageProtocol用於定義協議使用的字符集。
class PlainTextSerializer(val charset: String) extends NegotiatedSerializer[String, ByteString] {
  override val protocol = MessageProtocol(Some("text/plain"), Some(charset))

  def serialize(s: String) = ByteString.fromString(s, charset)
}

import com.lightbend.lagom.scaladsl.api.deser.MessageSerializer.NegotiatedDeserializer

class PlainTextDeserializer(val charset: String) extends NegotiatedDeserializer[String, ByteString] {
  def deserialize(bytes: ByteString) =
    bytes.decodeString(charset)
}

/* ------------ JSON Protocol ------------ */
import play.api.libs.json.Json
import play.api.libs.json.JsString

class JsonTextSerializer extends NegotiatedSerializer[String, ByteString] {
  override val protocol = MessageProtocol(Some("application/json"))

  def serialize(s: String) =
    ByteString.fromString(Json.stringify(JsString(s)))
}

import scala.util.control.NonFatal

class JsonTextDeserializer extends NegotiatedDeserializer[String, ByteString] {
  def deserialize(bytes: ByteString) = {
    try {
      Json.parse(bytes.iterator.asInputStream).as[String]
    } catch {
      case NonFatal(e) => throw DeserializationException(e)
    }
  }
}
第二步:集成不一樣協議的序列化器
import com.lightbend.lagom.scaladsl.api.deser.StrictMessageSerializer
import scala.collection.immutable

class TextMessageSerializer extends StrictMessageSerializer[String] {
  // 支持的協議
  override def acceptResponseProtocols = List(
    MessageProtocol(Some("text/plain")),
    MessageProtocol(Some("application/json"))
  )

  /* ------ Serializer -----*/
  // Client發出Request時還不須要協商協議,故使用最簡單的文本協議
  def serializerForRequest = new PlainTextSerializer("utf-8")

  def serializerForResponse(accepted: immutable.Seq[MessageProtocol]) = accepted match {
    case Nil => new PlainTextSerializer("utf-8")
    case protocols =>
      protocols
        .collectFirst {
          case MessageProtocol(Some("text/plain" | "text/*" | "*/*" | "*"), charset, _) =>
            new PlainTextSerializer(charset.getOrElse("utf-8"))
          case MessageProtocol(Some("application/json"), _, _) =>
            new JsonTextSerializer
        }
        .getOrElse {
          throw NotAcceptable(accepted, MessageProtocol(Some("text/plain")))
        }
  }

  /* ------ Deserializer for Client and Service -----*/
  def deserializer(protocol: MessageProtocol) = protocol.contentType match {
    case Some("text/plain") | None =>
      new PlainTextDeserializer(protocol.charset.getOrElse("utf-8"))
    case Some("application/json") =>
      new JsonTextDeserializer
    case _ =>
      // 拋出異常在生產環境並不可取,由於對於諸如WebSocket之類的應用,瀏覽器不容許設置ContentType
      // 此時應返回一個缺省的反序列化器更爲穩當。
      throw UnsupportedMediaType(protocol, MessageProtocol(Some("text/plain")))
  }
}

另外一個用Protocol buffers實現的例子:

import akka.util.ByteString
import com.lightbend.lagom.scaladsl.api.deser.MessageSerializer.NegotiatedDeserializer
import com.lightbend.lagom.scaladsl.api.deser.MessageSerializer.NegotiatedSerializer
import com.lightbend.lagom.scaladsl.api.deser.StrictMessageSerializer
import com.lightbend.lagom.scaladsl.api.transport.MessageProtocol

import scala.collection.immutable

class ProtobufSerializer extends StrictMessageSerializer[Order] {
  private final val serializer = {
    new NegotiatedSerializer[Order, ByteString]() {
      override def protocol: MessageProtocol =
        MessageProtocol(Some("application/octet-stream"))

      def serialize(order: Order) = {
        val builder = ByteString.createBuilder
        order.writeTo(builder.asOutputStream)
        builder.result
      }
    }
  }

  private final val deserializer = {
    new NegotiatedDeserializer[Order, ByteString] {
      override def deserialize(bytes: ByteString) =
        Order.parseFrom(bytes.iterator.asInputStream)
    }
  }

  override def serializerForRequest = serializer
  override def deserializer(protocol: MessageProtocol) = deserializer
  override def serializerForResponse(acceptedMessageProtocols: immutable.Seq[MessageProtocol]) = serializer
}

頭部過濾器

在服務描述子中能夠加入頭部過濾器Header Filter,用於實現協商協議、身份驗證或訪問受權的溝通。過濾器會根據預設的條件,對服務與客戶端雙方通訊的消息進行轉換或修改。

下例是一個典型的過濾器實現。若是沒有特地進行綁定,那麼全部的服務默認都將會使用它,並使用ServicePrincipal來標識帶有服務名稱的客戶端。在客戶端,當Client發出Request時,過濾器將會在頭部附加User-Agent,Lagom默認會自動將服務名稱做爲ServicePrinciple。而在服務端,則會讀取Request中的User-Agent,並將其值設置爲Request的Principle。

⚡ 切記:頭部過濾器僅用於通訊雙方的協商,以肯定雙方採起何種方式進行後續的通訊,而不該用來執行實際的驗證邏輯。驗證邏輯屬於業務邏輯的組成部分,應當放在服務API及其組合當中。

object UserAgentHeaderFilter extends HeaderFilter {
  override def transformClientRequest(request: RequestHeader): RequestHeader = {
    request.principal match {
      case Some(principal: ServicePrincipal) =>
        request.withHeader(HeaderNames.USER_AGENT, principal.serviceName)
      case _ => request
    }
  }

  override def transformServerRequest(request: RequestHeader): RequestHeader = {
    request.getHeader(HeaderNames.USER_AGENT) match {
      case Some(userAgent) =>
        request.withPrincipal(ServicePrincipal.forServiceNamed(userAgent))
      case _ =>
        request
    }
  }

  override def transformServerResponse(response: ResponseHeader,request: RequestHeader): ResponseHeader = response

  override def transformClientResponse(response: ResponseHeader, request: RequestHeader): ResponseHeader = response
}

頭部過濾器的組合

相似服務使用compose進行組合,頭部過濾器也可使用HeaderFilter.composite方法進行組合。

⚡ 注意:過濾器在發送消息與接收消息時適用的順序是恰好相反的。對於Request,越後加入的過濾器越新鮮就越早被運用。

class VerboseFilter(name: String) extends HeaderFilter {
  private val log = LoggerFactory.getLogger(getClass)

  def transformClientRequest(request: RequestHeader) = {
    log.debug(name + " - transforming Client Request")
    request
  }

  def transformServerRequest(request: RequestHeader) = {
    log.debug(name + " - transforming Server Request")
    request
  }

  def transformServerResponse(response: ResponseHeader, request: RequestHeader) = {
    log.debug(name + " - transforming Server Response")
    response
  }

  def transformClientResponse(response: ResponseHeader, request: RequestHeader) = {
    log.debug(name + " - transforming Client Response")
    response
  }
}

/* ----- 按下列順序組合過濾器 ----- */

def descriptor = {
  import Service._
  named("hello")
    .withCalls(
      call(sayHello)
    )
    .withHeaderFilter(
      HeaderFilter.composite(
        new VerboseFilter("Foo"),
        new VerboseFilter("Bar")
      )
    )
}

在服務端,控制檯獲得的輸出將是以下的順序:

[debug] Bar - transforming Server Request
[debug] Foo - transforming Server Request
[debug] Foo - transforming Server Response
[debug] Bar - transforming Server Response

錯誤的處理

Lagom在設計錯誤處理機制時,遵循瞭如下一些原則:

  • 在生產環境下,一個Lagom服務將永遠不會將錯誤的細節暴露給另外一個服務,除非知道這樣作是確定安全的。不然,未經審查和過濾的錯誤信息將有可能由於暴露了服務的內部細節,而被人利用。
  • 在開發環境下,則要儘量地將異常的細節暴露出來,方便發現和定位Bug。
  • 若是可能,一般在客戶端會復刻服務端觸發的異常,以直接反饋服務發生失敗的緣由。
  • 若是可能,一般會將異常映射到HTTP的4字頭或5字頭的協議響應代碼,或者是WebSocket的錯誤關閉代碼。
  • 在斷路器中實現HTTP響應代碼映射,一般被視爲最佳實踐。

異常的序列化

Lagom爲異常提供了trait ExceptionSerializer,用於將異常信息序列化爲JSON等序列化格式,或者是特定的錯誤編碼或響應代碼。ExceptionSerializer會將異常轉換爲RawExceptionMessage,其中包括對應HTTP響應代碼或WebSocket關閉代碼的狀態碼、消息主體,以及一個關於協議的描述子(在HTTP,此處對應響應頭部信息中的Content Type)。

默認的ExceptionSerializer使用Play JSON將異常信息序列化爲JSON格式。除非是在開發模式下,不然它只會返回TransportException派生異常類的詳細信息,最多見的派生類包括NotFoundPolicyViolation。Lagom一般也容許Client拋出這一類的異常,也容許本身建立或實現一個TransportException的派生類實例,不過前提是Client得認識這個異常而且知道如何進行反序列化。

額外的路由

從Lagom 1.5.0版本開始,容許使用Play Router對Lagom的服務進行擴展。該功能在須要將Lagom服務與既存的Play Router進行集成時顯得更爲實用。

在Lagom作DI時,能夠注入額外的Router:

override lazy val lagomServer = serverFor[HelloService](wire[HelloServiceImpl])
  .additionalRouter(wire[SomePlayRouter])

一個關於文件上傳的範例

此例基於ScalaSirdRouter實現,它將爲服務添加一個/api/files的路徑,用於接收支持斷點續傳數據的POST請求。

import play.api.mvc.DefaultActionBuilder
import play.api.mvc.PlayBodyParsers
import play.api.mvc.Results
import play.api.routing.Router
import play.api.routing.sird._

class FileUploadRouter(action: DefaultActionBuilder, parser: PlayBodyParsers) {
  val router = Router.from {
    case POST(p"/api/files") =>
      action(parser.multipartFormData) { request =>
        val filePaths = request.body.files.map(_.ref.getAbsolutePath)
        Results.Ok(filePaths.mkString("Uploaded[", ", ", "]"))
      }
  }
}

override lazy val lagomServer =
  serverFor[HelloService](wire[HelloServiceImpl])
    .additionalRouter(wire[FileUploadRouter].router)

在控制檯使用命令curl -X POST -F "data=@somefile.txt" -v http://localhost:65499/api/files,便可發出上傳請求。

與服務網關有關的注意事項

因爲額外的路由並無在服務描述子中定義,所以當服務使用了服務網關時,這些外部的路由將不會自動被服務網關暴露,所以須要顯式地將它的路徑加入訪問控制列表ACL(Access Control List)裏,而後經過服務網關訪問它們。

trait HelloService extends Service {
  def hello(id: String): ServiceCall[NotUsed, String]

  final override def descriptor = {
    import Service._
    named("hello")
      .withCalls(
        pathCall("/api/hello/:id", hello _).withAutoAcl(true)
      )
      .withAcls(
        // extra ACL to expose additional router endpoint on ServiceGateway
        ServiceAcl(pathRegex = Some("/api/files"))
      )
  }
}

而後在控制檯使用命令curl -X POST -F "data=@somefile.txt" -v http://localhost:9000/api/files訪問它們。

與Lagom客戶端有關的注意事項

因爲額外的路由不是服務API的一部分,因此沒法從Lagom生成的客戶端進行直接的訪問,而只能改用Play-WS之類的客戶端去訪問其暴露的HTTP端點。

編寫可持久化和集羣化的服務

這部分將使用Akka Typed按照DDD的方法實現一個CQRS架構的Lagom服務。

在這個ShoppingCart的示例裏(🔗 完整代碼),使用了Dock做爲容器,包裝了Zookeeper、Kafka和PostGres服務,因此在演示前須要用docker-compose up -d進行初始化。同時,由於Lagom將使用Read-Side ProcessorTopic Producer對AggregateEventTag標記的Event進行消費,因此須要用AkkaTaggerAdapter.fromLagom把AggregateEventTag轉換爲Akka能理解的Tag類型。而在讀端,Lagom提供的ReadSideProcessor,在Cassandra和Relational數據庫插件支持下,能夠爲實現CQRS的讀端提供完整的支持。

⚡ 使用JDBC驅動數據庫存儲Journal時,分片標記數不能超過10。這是該插件已知的一個Bug,若是超過了10,將會致使某些事件被屢次傳遞。

1. 採用相似State Pattern的方式,把State與Handler設計在一塊兒

/* ----- State & Handlers ----- */
final case class ShoppingCart(
    items: Map[String, Int],
    // checkedOutTime defines if cart was checked-out or not:
    // case None, cart is open
    // case Some, cart is checked-out
    checkedOutTime: Option[Instant] = None){
  /* ----- Command Handlers ----- */
  def applyCommand(cmd: ShoppingCartCommand): ReplyEffect[ShoppingCartEvent, ShoppingCart] =
    if (isOpen) {
      cmd match {
        case AddItem(itemId, quantity, replyTo) => onAddItem(itemId, quantity, replyTo)
        case Checkout(replyTo)                  => onCheckout(replyTo)
        case Get(replyTo)                       => onGet(replyTo)
      }
    } else {
      cmd match {
        case AddItem(_, _, replyTo) => Effect.reply(replyTo)(Rejected("Cannot add an item to a checked-out cart"))
        case Checkout(replyTo)      => Effect.reply(replyTo)(Rejected("Cannot checkout a checked-out cart"))
        case Get(replyTo)           => onGet(replyTo)
      }
    }

  private def onAddItem(itemId: String, quantity: Int, replyTo: ActorRef[Confirmation]): ReplyEffect[ShoppingCartEvent, ShoppingCart] = {
    if (items.contains(itemId))
      Effect.reply(replyTo)(Rejected(s"Item '$itemId' was already added to this shopping cart"))
    else if (quantity <= 0)
      Effect.reply(replyTo)(Rejected("Quantity must be greater than zero"))
    else
      Effect
        .persist(ItemAdded(itemId, quantity))
        .thenReply(replyTo)(updatedCart => Accepted(toSummary(updatedCart)))
  }

  private def onCheckout(replyTo: ActorRef[Confirmation]): ReplyEffect[ShoppingCartEvent, ShoppingCart] = {
    if (items.isEmpty)
      Effect.reply(replyTo)(Rejected("Cannot checkout an empty shopping cart"))
    else
      Effect
        .persist(CartCheckedOut(Instant.now()))
        .thenReply(replyTo)(updatedCart => Accepted(toSummary(updatedCart)))
  }

  private def onGet(replyTo: ActorRef[Summary]): ReplyEffect[ShoppingCartEvent, ShoppingCart] = {
    Effect.reply(replyTo)(toSummary(shoppingCart = this))
  }

  private def toSummary(shoppingCart: ShoppingCart): Summary = {
    Summary(shoppingCart.items, shoppingCart.checkedOut)
  }

  /* ----- Event Handlers ----- */
  def applyEvent(evt: ShoppingCartEvent): ShoppingCart =
    evt match {
      case ItemAdded(itemId, quantity)    => onItemAdded(itemId, quantity)
      case CartCheckedOut(checkedOutTime) => onCartCheckedOut(checkedOutTime)
    }

  private def onItemAdded(itemId: String, quantity: Int): ShoppingCart =
    copy(items = items + (itemId -> quantity))

  private def onCartCheckedOut(checkedOutTime: Instant): ShoppingCart = {
    copy(checkedOutTime = Option(checkedOutTime))
  }
}

2. 將Protocol與Factory放在Companion Object裏

Lagom借用Akka Cluster Sharding實現了服務的集羣部署,確保在任意時刻,有且只有一個聚合的實例在集羣內活動,相同類型的多個聚合實例則均勻分佈在各個節點之上。爲此,須要給ShoppingCart設定一個EntityKey,並向其工廠方法傳入EntityContext。

⚡ 在Command-Reply-Event的設計上,要注意區別Reply是回覆調用者的反作用,它能夠是確認Command是否成功執行,或者是返回聚合的內部狀態(要注意區別查詢命令與Read-Side)。而Event纔是因Command而致使聚合發生變化的正做用,是要持久化的。相應的,Effect.Reply()用於聚合未發生變化的場合,而Effect.persist().thenReply()則用於聚合發生變化以後。

/* ----- Factory & Protocols ----- */
object ShoppingCart {
  val empty                                       = ShoppingCart(items = Map.empty)
  val typeKey: EntityTypeKey[ShoppingCartCommand] = EntityTypeKey[ShoppingCartCommand]("ShoppingCart")

  /* ----- Commands ----- */
  trait CommandSerializable

  sealed trait ShoppingCartCommand extends CommandSerializable

  final case class AddItem(itemId: String, quantity: Int, replyTo: ActorRef[Confirmation])
    extends ShoppingCartCommand

  final case class Checkout(replyTo: ActorRef[Confirmation]) extends ShoppingCartCommand

  final case class Get(replyTo: ActorRef[Summary]) extends ShoppingCartCommand

  /* ----- Replies (will not be persisted) ----- */
  sealed trait Confirmation

  final case class Accepted(summary: Summary) extends Confirmation

  final case class Rejected(reason: String) extends Confirmation

  final case class Summary(items: Map[String, Int], checkedOut: Boolean)

  /* ----- Events (will be persisted) ----- */
  sealed trait ShoppingCartEvent extends AggregateEvent[ShoppingCartEvent] {
    override def aggregateTag: AggregateEventTagger[ShoppingCartEvent] = ShoppingCartEvent.Tag
  }

  final case class ItemAdded(itemId: String, quantity: Int) extends ShoppingCartEvent

  final case class CartCheckedOut(eventTime: Instant) extends ShoppingCartEvent

  /* ----- Tag for read-side consuming ----- */
  object ShoppingCartEvent {
    // will produce tags with shard numbers from 0 to 9
    val Tag: AggregateEventShards[ShoppingCartEvent] =
      AggregateEventTag.sharded[ShoppingCartEvent](numShards = 10)
  }

  def apply(entityContext: EntityContext[ShoppingCartCommand]): Behavior[ShoppingCartCommand] = {
    EventSourcedBehavior
      .withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](
        persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
        emptyState = ShoppingCart.empty,
        commandHandler = (cart, cmd) => cart.applyCommand(cmd),
        eventHandler = (cart, evt) => cart.applyEvent(evt)
      )
      // convert tag of Lagom to tag of Akka
      .withTagger(AkkaTaggerAdapter.fromLagom(entityContext, ShoppingCartEvent.Tag))
      // snapshot every 100 events and keep at most 2 snapshots on db
      .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
  }
}

3. 實現Lagom用於啓動服務的ApplicationLoader,集成造成完整應用

class ShoppingCartLoader extends LagomApplicationLoader {
  override def load(context: LagomApplicationContext): LagomApplication =
    new ShoppingCartApplication(context) with AkkaDiscoveryComponents

  override def loadDevMode(context: LagomApplicationContext): LagomApplication =
    new ShoppingCartApplication(context) with LagomDevModeComponents

  override def describeService = Some(readDescriptor[ShoppingCartService])
}

trait ShoppingCartComponents
    extends LagomServerComponents
    with SlickPersistenceComponents
    with HikariCPComponents
    with AhcWSComponents {
  implicit def executionContext: ExecutionContext

  override lazy val lagomServer: LagomServer =
    serverFor[ShoppingCartService](wire[ShoppingCartServiceImpl])
  override lazy val jsonSerializerRegistry: JsonSerializerRegistry =
    ShoppingCartSerializerRegistry

  // Initialize the sharding for the ShoppingCart aggregate.
  // See https://doc.akka.io/docs/akka/2.6/typed/cluster-sharding.html
  clusterSharding.init(
    Entity(ShoppingCart.typeKey) { entityContext =>
      ShoppingCart(entityContext)
    }
  )
}

abstract class ShoppingCartApplication(context: LagomApplicationContext)
    extends LagomApplication(context)
    with ShoppingCartComponents
    with LagomKafkaComponents {}

4. 在服務的實現中在服務的實現中使用Ask模式訪問Actor

服務API的相關操做實際將由幕後的Actor負責實現,所以在服務的實現裏須要訪問Actor的實例,爲此須要經過ClusterSharding.entityRefFor獲取其EntityRef。

class ShoppingCartServiceImpl(
    clusterSharding: ClusterSharding,
    persistentEntityRegistry: PersistentEntityRegistry
  )(implicit ec: ExecutionContext) extends ShoppingCartService {
  def entityRef(id: String): EntityRef[ShoppingCartCommand] =
    clusterSharding.entityRefFor(ShoppingCart.typeKey, id)
}

在得到Reference後,即可使用Ask模式與Actor進行交互。Ask返回的是Future[Response],所以示例將Future[Summary]投射爲ShoppingCartView,方便Read-Side使用。

implicit val timeout = Timeout(5.seconds)

override def get(id: String): ServiceCall[NotUsed, ShoppingCartView] = ServiceCall { _ =>
  entityRef(id)
    .ask(reply => Get(reply))
    .map(cartSummary => asShoppingCartView(id, cartSummary))
}

final case class ShoppingCartItem(itemId: String, quantity: Int)
final case class ShoppingCartView(id: String, items: Seq[ShoppingCartItem], checkedOut: Boolean)

private def asShoppingCartView(id: String, cartSummary: Summary): ShoppingCartView = {
  ShoppingCartView(
    id,
    cartSummary.items.map((ShoppingCartItem.apply _).tupled).toSeq,
    cartSummary.checkedOut
  )
}

5. 其餘一些須要考慮的細節

  • 分片數與節點數的協調:分片數少於節點數,某些節點將無所事事;分片數多於節點數,節點的工做量須要細心平衡。
  • 實體鈍化:讓全部的聚合實例都始終活在內存裏並非高效的辦法。對較長時間內沒有活動的聚合,可使用實體鈍化功能將其暫時從分片節點上移除,須要它時再從新喚醒並加載便可。
  • 數據的序列化:選擇JSON、二進制串等格式進行序列化時,須要注意的是消息中可能包含ActorRef這樣的字段,因此這部份內容要參考Akka指南。

遷移到Akka Persistence Typed

這部份內容是對比上一節直接使用Akka Persistence Typed創建領域模型的方式,改從傳統的Lagom Persistence遷移到Akka Persistence Typed的角度進行了詳細的分步講解。因此,若是是全新開始設計的Lagom的服務,建議直接使用Akka Persistence Typed進行實現,只有此前用Lagom Persistence實現的服務才須要考慮遷移。

因爲內容主要涉及Akka Typed,可參考個人博客內容:

選擇合適的數據庫平臺

Lagom與下列數據庫平臺兼容:

  • Cassandra
  • PostgreSQL
  • MySQL
  • Oracle
  • H2
  • Microsoft SQL Server
  • Couchbase

參考連接:

Cassandra

Cassandra須要至少3個KeySpace:

  • Journal:存儲Event:cassandra-journal.keyspace = my_service_journal
  • Snapshot:存儲快照:cassandra-snapshot-store.keyspace = my_service_snapshot
  • Offset:存儲Read-Side側最近處理的Event:lagom.persistence.read-side.cassandra.keyspace = my_service_read_side

這只是Lagom官方文檔的一小部份內容,算是對如何使用該框架實現服務的初窺,有興趣的請移步官方網站尋找更多的內容。

相關文章
相關標籤/搜索