使用Akka HTTP構建微服務:CDC方法

歡迎你們前往騰訊雲+社區,獲取更多騰訊海量技術實踐乾貨哦~html

本文來自 雲+社區翻譯社,做者 阿小慶

構建微服務並不容易,特別是當微服務變得愈來愈多時,並且好多微服務可能由不一樣的團隊提供和維護,這些微服務彼此交互而且變化很快。java

文檔、團隊交互和測試是得到成功的三大法寶,可是若是用錯誤的方式進行,它們會產生更多的複雜性,而不是一種優點。git

咱們可使用像Swagger(用於文檔),Docker(用於測試環境),Selenium(用於端到端測試)等工具,可是咱們最終仍是會由於更改API而浪費大量時間,由於他們不是說誰適合來使用它們,或者設置合適的環境來執行集成測試,而是須要生產數據(但願是匿名的),但生產數據可能須要很長時間才能完成。github

對全部這些問題都沒有正確的答案,但我認爲有一件事能夠幫助不少人:首先從用戶角度出發!sql

這是什麼意思?通常狀況下,在開發Web應用程序的時候,從模型和流程定義開始,深刻到軟件開發中,都是使用TDD(測試驅動開發)方法:先寫測試,考慮咱們真正想要的,以及咱們如何使用它; 但微服務(microservices)呢?在這種狀況下,它從消費者開始!消費者但願從其餘服務中得到什麼以及它但願如何互動?docker

這就是我說的消費者驅動的契約(CDC)測試。採用這種方法,消費者本身會定義須要的數據格式以及交互細節,並驅動生成一份契約文件。而後生產者根據契約文件來實現本身的邏輯,並在持續集成環境中持續驗證。數據庫

商業案例

好比,咱們但願在「個人圖書館」實現一項新功能,因此咱們須要介紹類別(Categories),而且咱們想知道其中有多少類別。這個想法是將邏輯分紅兩個服務,一個生產者(Producer)提供全部類別的列表,另外一個消費者(Consumer)對其進行計數。json

img

很是容易,但足以建立一個良好的基礎結構和對CDC的理解。api

技術棧

這篇文章,我選擇了Scala做爲語言,Akka HTTP做爲框架。我認爲這是一項很是好的技術,它能夠知足構建微服務所需的全部基本要求:安全

  • 易於實現
  • 快速
  • 健壯性
  • 很好的支持和文檔記錄

在數據方面,我選擇了Slick做爲庫,將數據庫交互和FlyWay抽象爲數據庫遷移框架。它們既健壯又穩定,屢次使用也沒有問題。

最後,也是很重要的一點,測試支持!我喜歡Scala Test,由於它始終是我在Scala的項目的一部分,但咱們的CDC呢?

對於CDC,有一個很是好的框架,可用於多平臺:Pact

經過Pact,咱們能夠定義咱們的消費者契約文件,並根據微服務接口的提供者和消費者進行驗證。我建議花幾分鐘閱讀官方Pact網站的主頁,這很好地詮釋了它背後的道理。

正如我所說的,Pact適用於不少平臺,在咱們的例子中,用Scala編寫Consumer和Producer,咱們只能使用一個實現:Scala-Pact

操做

爲了簡單起見,我已經建立了一個包含消費者和生產者的SBT項目,但它們能夠很容易被分割並用做模板。你能夠在https://github.com/mariniss/m...找到源代碼。

讓咱們以CDC風格開始咱們的微服務實現!首先,咱們必須定義咱們的項目。咱們能夠輕鬆地使用SBT建立一個新的Scala項目並定義build.sbt,以下所示:

build.sbt

name := "myLibrary-contracts"
version := "0.1"
scalaVersion := "2.12.4"
enablePlugins(ScalaPactPlugin)
libraryDependencies ++= Seq(
 //Common dependencies
 "com.typesafe.akka"  %% "akka-stream"             % "2.4.20",
 "com.typesafe.akka"  %% "akka-http"               % "10.0.11", // Akka HTTP項目的標準依賴關係
 "com.typesafe.akka"  %% "akka-http-spray-json"    % "10.0.11", // 用於JSON序列化和反序列化
 "org.slf4j"           % "slf4j-simple"            % "1.7.25",  // 用於日誌記錄
 "org.scalatest"      %% "scalatest"               % "3.0.1"   % "test",  // 測試框架
 "org.scalamock"      %% "scalamock"               % "4.0.0"   % "test",  // 模擬框架
 "com.typesafe.akka"  %% "akka-stream-testkit"     % "2.4.20"  % "test",
 "com.typesafe.akka"  %% "akka-testkit"            % "2.4.20"  % "test",
 "com.typesafe.akka"  %% "akka-http-testkit"       % "10.0.11" % "test",
 "com.itv"            %% "scalapact-argonaut-6-2"  % "2.2.0"   % "test",
 "com.itv"            %% "scalapact-scalatest"     % "2.2.0"   % "test",
 "com.itv"            %% "scalapact-http4s-0-16-2" % "2.2.0"   % "test",
 //Producer dependencies
 "com.typesafe.slick" %% "slick"                   % "3.2.1",
 "com.typesafe.slick" %% "slick-hikaricp"          % "3.2.1",
 "com.h2database"      % "h2"                      % "1.4.196",
 "org.flywaydb"        % "flyway-core"             % "5.0.7"
)
testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest,
 "-y", "org.scalatest.WordSpec",
 "-y", "org.scalatest.FunSpec")
parallelExecution in Test := false

正如你所看到的,Akka HTTP項目的標準依賴關係(通用於提供者和消費者),spry-json用於JSON序列化和反序列化,SL4J用於日誌記錄,scalatest和scalamock做爲測試和模擬框架,以及Scala協議爲CDC測試。

生產者特定的依賴關係僅用於數據庫支持,如您所見,我使用H2(在內存數據庫中),但您能夠輕鬆地將其替換爲其餘數據庫支持。

測試環境也有特定的配置; 只是由於咱們在同一個項目中同時擁有生產者和客戶端,因此並行執行被禁用,因此若是並行執行(咱們稍後會看到它),咱們可能會在Pact文件生成和使用過程當中遇到問題。另外,我已經用兩種不一樣的格式實現了測試,WordSpec和FunSpec,第一次用於全部的單元測試,第二次用於Pact測試,你能夠按你的想法隨意使用。

消費者(Consumer)操做

如今咱們有了基本的項目結構,咱們能夠開始在消費者方面建立Pact測試,因此咱們能夠定義咱們在給定特定場景/狀態時對提供者(Provider)的指望。

*MyLibraryClientPactSpec.scala*

package com.fm.mylibrary.consumer.pact
import com.fm.mylibrary.consumer.MyLibraryClient
import com.fm.mylibrary.model.Category
import com.fm.mylibrary.model.JsonProtocol._
import com.itv.scalapact.ScalaPactForger._
import org.scalatest.{FunSpec, Matchers}
import spray.json._
class MyLibraryClientPactSpec extends FunSpec with Matchers {
 describe("Connecting to the MyLibrary server") {
   it("should be able to fetch the categories"){
     val categories = List(Category("Java"), Category("DevOps"))
     forgePact
       .between("ScalaConsumer")
       .and("myLibraryServer")
       .addInteraction(
         interaction
           .description("Fetching categories")
           .given("Categories: [Java, DevOps]")
           .uponReceiving(
             method = GET,
             path = "/search/category",
             query = None)
           .willRespondWith(
             status = 200,
             headers = Map("Content-Type" -> "application/json"),
             body = categories.toJson.toString())
       )
       .runConsumerTest { mockConfig =>
         val results = new MyLibraryClient().fetchCategories()
         results.isDefined shouldEqual true
         results.get.size shouldEqual 2
         results.get.forall(c => categories.contains(c)) shouldEqual true
       }
   }
 }
}

Scala-pact很是易於使用,這要歸功於ScalaPactForger對象,能夠經過幾行代碼構建契約定義和指望效果,更詳細地說:

  • 契約參與者的定義: .between("ScalaConsumer") .and("myLibraryServer")
  • 參與者之間的相互做用的定義:
.addInteraction(interaction 
.description("Fetching categories") 
.given("Categories: [Java, DevOps]")
.uponReceiving(method = GET,path = "/search/category",query = None)
.willRespondWith( status = 200, headers = Map("Content-Type" -> "application/json"),body = categories.toJson.toString()))givenuponReceivingwillRespondWith

真正重要的是描述系統狀態,其中交互必須如所描述的那樣工做,由消費者uponReceiving執行的請求和預期的響應。同時考慮到全部HTTP元素必須匹配(方法,url,標題,正文和查詢)

  • 用於驗證消費者契約的實際測試的定義: 此代碼將針對之前的方案運行,虛擬服務器將響應 交互部分中定義的惟一HTTP請求(若是響應爲deined),它將驗證消費者(Consumer)是否將按照協議中的規定進行要求。也能夠在消費者(Consumer)處理的結果值上添加更多的檢查(聲明)。

.runConsumerTest { mockConfig =>

val results = new MyLibraryClient().fetchCategories()

results.isDefined shouldEqual true

results.get.size shouldEqual 2

results.get.forall(c => categories.contains(c)) shouldEqual true}

固然,咱們能夠添加更多場景和交互。咱們也能夠爲許多生產者定義更多的契約。我建議經過「基本路徑」和標準錯誤情景來肯定描述正常使用狀況下所需的基本情景和交互狀況,可是留給單元測試全部詳細的測試,以及與它們的實現相關的各類狀況。

如今,您能夠嘗試編譯並執行測試,但因爲咱們沒有客戶端和模型,因此咱們須要添加基本邏輯來讓測試經過。

我認爲咱們能夠經過兩種方式進行,直接構建客戶端(由於咱們已經進行了測試),或者改進咱們客戶端的定義,建立單元測試並以純TDD方式對其進行處理。咱們來看第二個選項:

MyLibraryClientSpec.scala

package com.fm.mylibrary.consumer
import akka.http.scaladsl.model._
import com.fm.mylibrary.model.Category
import scala.concurrent.Future
class MyLibraryClientSpec extends BaseTestAppClient {
 implicit val myLibraryServerUrl:String = "//test"
 "Fetch categories" must {
   "execute the HTTP request to get all categories and returns them" in {
     val request = HttpRequest(HttpMethods.GET, "//test/search/category")
     val responseEntity = HttpEntity(bytes = """[{"name": "Java"}, {"name": "DevOps"}]""".getBytes,
                                     contentType = ContentTypes.`application/json`)
     val response = HttpResponse(status = StatusCodes.OK, entity = responseEntity)
     requestExecutor.expects(request).returning(Future.successful(response))
     val results = new MyLibraryClient().fetchCategories()
     results.isDefined shouldEqual true
     results.get.size shouldEqual 2
     results.get.contains(Category("Java")) shouldEqual true
     results.get.contains(Category("DevOps")) shouldEqual true
   }
 }
}

很是標準的測試; 咱們但願拋出一個MyLibraryClient函數,該函數使用一個外部函數返回一個「Category」對象列表,該函數接受一個HttpRequest並返回一個HttpResponse。

正如你所看到的,沒有明確提供這種外部依賴; 那是由於我想把它做爲一個「隱含」價值。這是一種幫助建立可測試代碼的方法,但我強烈建議不要使用它,由於它會使代碼難以閱讀,特別是對於那些新的Scala。

我也喜歡定義一個具備全部必要依賴項的特徵來輕鬆構建測試用例:

BaseTestAppClient.scala

package com.fm.mylibrary.consumer
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import akka.testkit.{ImplicitSender, TestKit}
import org.scalamock.scalatest.MockFactory
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
import scala.concurrent.{ExecutionContextExecutor, Future}
class BaseTestAppClient extends TestKit(ActorSystem("BaseTestAppClient"))
           with WordSpecLike
           with ImplicitSender
           with Matchers
           with BeforeAndAfterAll
           with MockFactory {
 implicit val actorSystem: ActorSystem = system
 implicit val materializer: ActorMaterializer = ActorMaterializer()(system)
 implicit val executionContext: ExecutionContextExecutor = system.dispatcher
 implicit val requestExecutor = mockFunction[HttpRequest, Future[HttpResponse]]
 override def afterAll {
   TestKit.shutdownActorSystem(system)
 }
}

它定義了在咱們的測試中使用的actor系統和執行HTTP請求的函數。

如今咱們有了測試,讓咱們來實現一些邏輯:

MyClientLibrary.scala

package com.fm.mylibrary.consumer
import akka.actor.ActorSystem
import akka.http.scaladsl.client.RequestBuilding
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import com.fm.mylibrary.model.Category
import com.fm.mylibrary.model.JsonProtocol._
import scala.concurrent.{ExecutionContextExecutor, Future}
class MyLibraryClient(implicit val myLibraryServerUrl: String,
                      implicit val actorSystem: ActorSystem,
                      implicit val materializer: ActorMaterializer,
                      implicit val executionContext: ExecutionContextExecutor,
                      implicit val requestExecutor: HttpRequest => Future[HttpResponse]) extends BaseHttpClient {
 def fetchCategories(): Option[List[Category]] = executeSyncRequest(
   RequestBuilding.Get(s"$myLibraryServerUrl/search/category"),
   response =>
     if(response.status == StatusCodes.OK)
       Unmarshal(response.entity).to[Option[List[Category]]]
     else
       Future.successful(None)
 )
}

Category.scala

package com.fm.mylibrary.model
case class Category (name: String)

這個相對容易實現。而且我使用了隱式聲明依賴關係,但能夠顯性地提升代碼的可讀性。

接下來我建立了一個特徵,它爲每一個HTTP客戶端(如今只有一個)定義了基本組件,並具備一個以同步方式執行HTTP請求的功能:

BaseHttpClient.scala

package com.fm.mylibrary.consumer
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContextExecutor, Future}
import scala.language.postfixOps
trait BaseHttpClient {
 implicit def actorSystem: ActorSystem
 implicit def materializer: ActorMaterializer
 implicit def executionContext: ExecutionContextExecutor
 implicit def requestExecutor: HttpRequest => Future[HttpResponse]
 val awaitTime: FiniteDuration = 5000 millis
 def executeSyncRequest[T](request: HttpRequest, responseHandler: HttpResponse => Future[T]): T = {
   val response: Future[T] = requestExecutor(request).flatMap({ response =>
     responseHandler(response)
   })
   Await.result(response, awaitTime)
 }
}

如今咱們很好地執行單元測試,若是咱們沒有犯錯誤,咱們應該獲得一個成功的執行。隨意添加更多測試並重構客戶端以便根據您的喜愛調整結構(您能夠在此處找到更多測試)。

咱們也能夠嘗試執行Pact test(MyLibraryClientPactSpec),但它會失敗,由於它應該執行一個真正的HTTP調用,scala-pact框架將啓動一個真實的HTTP服務器,接受和響應協議中描述的請求。

咱們差很少完成了咱們想要的實現,它基本上是定義了actor系統和執行HTTP調用的函數的元素:

MyLibraryAppClient.scala

package com.fm.mylibrary.consumer.app
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import scala.concurrent.{ExecutionContextExecutor, Future}
object MyLibraryAppClient {
 implicit val actorSystem: ActorSystem = ActorSystem()
 implicit val materializer: ActorMaterializer = ActorMaterializer()
 implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher
 implicit val requestExecutor: HttpRequest => Future[HttpResponse] = Http().singleRequest(_)
}

它是一個對象,因此咱們能夠將它導入到任何咱們必須使用咱們的客戶端的地方,正如您在Pact測試中看到的那樣: import com.fm.mylibrary.consumer.app.MyLibraryAppClient._

固然,您可使用其餘方法,但請在選擇時保持一致,並避免在相同或相似項目中使用不一樣的方法/結構。

咱們終於能夠執行協議測試了!若是你很幸運,你應該獲得這樣的輸出:

> Adding interactions:
> - Interaction(None,Some(Categories: [Java, DevOps]),Fetching categories,InteractionRequest(Some(GET),Some(/search/category),None,None,None,None),InteractionResponse(Some(200),Some(Map(Content-Type -> application/json)),Some([{"name":"Java"},{"name":"DevOps"}]),None))
[ScalaTest-run-running-MyLibraryClientPactSpec] INFO org.http4s.blaze.channel.nio1.NIO1SocketServerGroup - Service bound to address /127.0.0.1:55653
> ScalaPact stub running at: http://localhost:55653
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.ServerChannelGroup - Connection to /127.0.0.1:55666 accepted at Tue Feb 13 11:43:08 GMT 2018.
[http4s-blaze-client-1] INFO org.http4s.client.PoolManager - Shutting down connection pool: allocated=1 idleQueue.size=1 waitQueue.size=0
[DEBUG] [02/13/2018 11:43:09.376] [ScalaTest-run-running-MyLibraryClientPactSpec] [EventStream(akka://default)] logger log1-Logging$DefaultLogger started
[DEBUG] [02/13/2018 11:43:09.377] [ScalaTest-run-running-MyLibraryClientPactSpec] [EventStream(akka://default)] Default Loggers started
[DEBUG] [02/13/2018 11:43:09.595] [ScalaTest-run-running-MyLibraryClientPactSpec] [AkkaSSLConfig(akka://default)] Initializing AkkaSSLConfig extension...
[DEBUG] [02/13/2018 11:43:09.598] [ScalaTest-run-running-MyLibraryClientPactSpec] [AkkaSSLConfig(akka://default)] buildHostnameVerifier: created hostname verifier: com.typesafe.sslconfig.ssl.DefaultHostnameVerifier@db2cd5
[DEBUG] [02/13/2018 11:43:09.834] [default-akka.actor.default-dispatcher-5] [default/Pool(shared->http://localhost:55653)] (Re-)starting host connection pool to localhost:55653
[DEBUG] [02/13/2018 11:43:10.123] [default-akka.actor.default-dispatcher-5] [default/Pool(shared->http://localhost:55653)] InputBuffer (max-open-requests = 32) now filled with 1 request after enqueuing GET /search/category Empty
[DEBUG] [02/13/2018 11:43:10.127] [default-akka.actor.default-dispatcher-2] [default/Pool(shared->http://localhost:55653)] [0] Unconnected -> Loaded(1)
[DEBUG] [02/13/2018 11:43:10.137] [default-akka.actor.default-dispatcher-2] [default/Pool(shared->http://localhost:55653)] [0] <unconnected> Establishing connection...
[DEBUG] [02/13/2018 11:43:10.167] [default-akka.actor.default-dispatcher-2] [default/Pool(shared->http://localhost:55653)] [0] <unconnected> pushing request to connection: GET /search/category Empty
[DEBUG] [02/13/2018 11:43:10.179] [default-akka.actor.default-dispatcher-2] [akka://default/system/IO-TCP/selectors/$a/0] Resolving localhost before connecting
[DEBUG] [02/13/2018 11:43:10.200] [default-akka.actor.default-dispatcher-5] [akka://default/system/IO-DNS] Resolution request for localhost from Actor[akka://default/system/IO-TCP/selectors/$a/0#871918912]
[DEBUG] [02/13/2018 11:43:10.209] [default-akka.actor.default-dispatcher-5] [akka://default/system/IO-TCP/selectors/$a/0] Attempting connection to [localhost/127.0.0.1:55653]
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.ServerChannelGroup - Connection to /127.0.0.1:55669 accepted at Tue Feb 13 11:43:10 GMT 2018.
[DEBUG] [02/13/2018 11:43:10.212] [default-akka.actor.default-dispatcher-5] [akka://default/system/IO-TCP/selectors/$a/0] Connection established to [localhost:55653]
[DEBUG] [02/13/2018 11:43:10.291] [default-akka.actor.default-dispatcher-5] [default/Pool(shared->http://localhost:55653)] [0] </127.0.0.1:55669->localhost:55653> Received response: GET /search/category Empty -> 200 OK Strict(35 bytes)
[DEBUG] [02/13/2018 11:43:10.296] [default-akka.actor.default-dispatcher-8] [default/Pool(shared->http://localhost:55653)] [0] </127.0.0.1:55669->localhost:55653> Finished reading response entity for GET /search/category Empty -> 200 OK Strict(35 bytes)
[DEBUG] [02/13/2018 11:43:10.298] [default-akka.actor.default-dispatcher-5] [default/Pool(shared->http://localhost:55653)] [0] Loaded(1) -> Idle
[ScalaTest-run-running-MyLibraryClientPactSpec] INFO org.http4s.blaze.channel.ServerChannel - Closing NIO1 channel /127.0.0.1:55653 at Tue Feb 13 11:43:10 GMT 2018
[ScalaTest-run-running-MyLibraryClientPactSpec] INFO org.http4s.blaze.channel.nio1.NIO1SocketServerGroup - Closing NIO1SocketServerGroup
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.nio1.SelectorLoop - Shutting down SelectorLoop blaze-nio-fixed-selector-pool-0
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.nio1.SelectorLoop - Shutting down SelectorLoop blaze-nio-fixed-selector-pool-1
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.nio1.SelectorLoop - Shutting down SelectorLoop blaze-nio-fixed-selector-pool-2
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.nio1.SelectorLoop - Shutting down SelectorLoop blaze-nio-fixed-selector-pool-3
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.nio1.SelectorLoop - Shutting down SelectorLoop blaze-nio-fixed-selector-pool-4
[DEBUG] [02/13/2018 11:43:10.355] [default-akka.actor.default-dispatcher-3] [default/Pool(shared->http://localhost:55653)] [0] </127.0.0.1:55669->localhost:55653> connection was closed by peer while no requests were in flight
[DEBUG] [02/13/2018 11:43:10.360] [default-akka.actor.default-dispatcher-3] [default/Pool(shared->http://localhost:55653)] [0] Idle -> Unconnected
Process finished with exit code 0

我已經使用IntelliJ IDEA CE來執行測試,可是您能夠直接使用這些命令來使用sbt:

  • sbt test:它執行擴展了FunSpec和WordSpec的全部測試(如在build.sbt定義)
  • sbt pactTest:它執行全部pacts測試

該測試驗證了消費者協議,並生成提供者必須遵照的契約/協議。你能夠找到它們,它們是遵循特定Pact結構的JSON文件。生成的應該是這樣的:target/pacts

ScalaConsumer_myLibraryServer.json

{
 "provider" : {
   "name" : "myLibraryServer"
 },
 "consumer" : {
   "name" : "ScalaConsumer"
 },
 "interactions" : [
   {
     "request" : {
       "method" : "GET",
       "path" : "/search/category"
     },
     "description" : "Fetching categories",
     "response" : {
       "status" : 200,
       "headers" : {
         "Content-Type" : "application/json"
       },
       "body" : [
         {
           "name" : "Java"
         },
         {
           "name" : "DevOps"
         }
       ]
     },
     "providerState" : "Categories: [Java, DevOps]"
   }
 ]
}

正如你所看到的,這很是簡單,兩個參與者(提供者和消費者)的定義與可能的交互。

迄今爲止已經很好好。但您能夠添加更多的邏輯,更多的客戶端,更多的契約,更多的服務等.Git倉庫中的項目還包含一個小型服務,其中包含業務邏輯,計算類別的詳細任務。這裏是代碼: CategoriesServiceSpec.scala

package com.fm.mylibrary.consumer.service
import com.fm.mylibrary.consumer.MyLibraryClient
import com.fm.mylibrary.model.Category
import org.scalamock.scalatest.MockFactory
import org.scalatest.{Matchers, WordSpec}
class CategoriesServiceSpec extends WordSpec with Matchers with MockFactory {
 private val mockMyLibraryClient = mock[MyLibraryClient]
 private val service = new CategoriesService(mockMyLibraryClient)
 "Count Categories" must {
   "return the number of all categories fetched form MyLibrary" in {
     val javaCategory = Category("Java")
     val devopsCategory = Category("DevOps")
     (mockMyLibraryClient.fetchCategories _).expects().returning(Some(List(javaCategory, devopsCategory)))
     val result = service.countCategories()
     result shouldBe 2
   }
   "return 0 in case of the fetch form MyLibrary fails" in {
     (mockMyLibraryClient.fetchCategories _).expects().returning(None)
     val result = service.countCategories()
     result shouldBe 0
   }
 }
}

CategoriesService.scala

package com.fm.mylibrary.consumer.service
import com.fm.mylibrary.consumer.MyLibraryClient
class CategoriesService(val myLibraryClient: MyLibraryClient) extends {
 def countCategories(): Int = myLibraryClient.fetchCategories() match {
   case None => 0
   case Some(categories) =>
     categories.size
 }
}

我沒有使用任何依賴注入框架,由於我相信,若是微服務須要一個DI框架,那會使它變得很是龐大而複雜,可是若是你不像我這樣想,能夠隨意使用它。我過去使用過Google Guice,看起來至關不錯。

生產者(Provider)實現

一旦咱們用契約文件定義了咱們的消費者(Consumer),咱們就能夠轉移到生產者並使用消費者產生的關聯來實現它。

與往常同樣,咱們從測試開始。至於生產者,咱們將有兩種類型的測試,一種是驗證協議,另外一種是詳細驗證業務邏輯(單元測試)。服務器的實現一般比客戶端要大得多,因此我認爲最好從單元測試開始,一旦咱們有了一個完整的應用程序,咱們就能夠建立測試來驗證pact(或契約)。

另外,我老是建議採用增量方法(即便是小型項目),因此在這種狀況下,咱們能夠構建一個服務器來公開一個API並返回兩個類別的靜態列表(如Pact文件中定義的),而後添加配置支持,數據庫支持,遷移支持等。

在這裏,咱們將對咱們的API進行單元測試:

CategoriesRoutesSpec.scala

package com.fm.mylibrary.producer
import com.fm.mylibrary.model.Category
import com.fm.mylibrary.model.JsonProtocol._
class CategoriesRoutesSpec extends BaseTestAppServer {
 "The service" should {
   "return an empty JSon array if there are no categories" in {
     Get("/search/category") ~> routes ~> check {
       responseAs[List[Category]] shouldBe List(Category("DevOps"), Category("Java"))
     }
   }
 }
}

以及具備全部測試依賴性的基本測試類BaseTestAppServer:

BaseTestAppServer.scala

package com.fm.mylibrary.producer
import akka.http.scaladsl.testkit.ScalatestRouteTest
import org.scalamock.scalatest.MockFactory
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
import scala.concurrent.ExecutionContextExecutor
class BaseTestAppServer extends WordSpec
           with ScalatestRouteTest
           with Matchers
           with MockFactory
           with Routes
           with BeforeAndAfterAll {
 implicit val executionContext: ExecutionContextExecutor = system.dispatcher
}

該測試是使用Akka HTTP Route TestKit實現的,您能夠在這裏找到官方文檔,它容許在這種格式的路由上構建測試:

REQUEST ~> ROUTE ~> check {
    ASSERTIONS 
}

BaseTestAppServer的類包含基本的依賴WordSpecScalatestRouteTestMatchersMockFactoryBeforeAndAfterAll和定義應用程序的路由的性狀:Routes

固然它不會編譯也不會傳遞,由於尚未實現,因此讓咱們定義咱們的路由:

Routes.scala

package com.fm.mylibrary.producer
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.Materializer
import com.fm.mylibrary.model.Category
import scala.concurrent.ExecutionContext
import spray.json._
import com.fm.mylibrary.model.JsonProtocol._
trait Routes {
 implicit val materializer: Materializer
 implicit val executionContext: ExecutionContext
 val searchRoutes: Route = {
   pathPrefix("search" / "category") {
     get {
       complete(
         List(Category("DevOps"), Category("Java")).toJson
       )
     }
   }
 }
 val routes: Route = searchRoutes
}

我爲json編組/解組使用了spray-json,而且它須要定義用於轉換的協議(或格式),您能夠在代碼import com.fm.mylibrary.model.JsonProtocol._中看到此對象的導入:; 還須要導入其中import spray.json._提供轉換的全部功能; 在這種狀況下,我正在使用toJson尋找它將要轉換的特定對象的協議(或格式)的隱式定義。

JsonProtocol.scala

package com.fm.mylibrary.model
import spray.json._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
object JsonProtocol extends SprayJsonSupport with DefaultJsonProtocol {
 implicit val categoryFormat = jsonFormat1(Category)
}

沒有必要爲對象定義轉換器ListArrayOptions,等等,由於它們是由DefaultJsonProtocol中的,spry-json提供。

還有其餘相似的庫,如ArgonautJSON4S,能夠按你想法評估全部這些庫,並選擇最適合您需求的庫。

若是咱們再次執行測試,咱們如今應該獲得一條綠線。再次,添加更多的測試,以涵蓋每個案例。在此以前,爲了檢查咱們的服務是否符合消費者契約,咱們必須完成定義Akka HTTP應用程序的基本服務:

MyLibraryAppServer.scala

package com.fm.mylibrary.producer.app
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.directives.DebuggingDirectives
import akka.stream.ActorMaterializer
import com.fm.mylibrary.producer.Routes
import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}
object MyLibraryAppServer extends App
         with Routes
         with DebuggingDirectives {
 implicit val actorSystem: ActorSystem = ActorSystem()
 implicit val materializer: ActorMaterializer = ActorMaterializer()
 implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher
 val log = actorSystem.log
 def startApplication(): Unit = {
   Http().bindAndHandle(handler = logRequestResult("log")(routes), interface = "localhost", port = 9000).onComplete {
     case Success(b) => log.info(s"application is up and running at ${b.localAddress.getHostName}:${b.localAddress.getPort}")
     case Failure(e) => log.error(s"could not start application: {}", e.getMessage)
   }
 }
 def stopApplication(): Unit = {
   actorSystem.terminate()
 }
 startApplication()
}

這個類定義了兩個方法,一個是啓動咱們的服務器所必需的,另外一個是中止服務器的方法,它還定義了將在路由處理中使用的actor系統和執行上下文。

它擴展了提供主要方法的特徵scala.App,因此你能夠執行這個類,它將啓動一個提供定義路由的http服務器。

但首先,讓咱們來檢查一下協議是否被知足,咱們能夠很容易地用這樣的測試類來驗證它:

MyLibraryServerPactSpec.scala

package com.fm.mylibrary.producer.pact
import com.fm.mylibrary.producer.app.MyLibraryAppServer
import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}
import com.itv.scalapact.ScalaPactVerify._
class MyLibraryServerPactSpec extends FunSpec with Matchers with BeforeAndAfterAll {
 override def beforeAll() {
   MyLibraryAppServer.main(Array())
 }
 override def afterAll() {
   MyLibraryAppServer.stopApplication()
 }
 describe("Verifying MyLibrary server") {
   it("should be able to respect the contract"){
     verifyPact
       .withPactSource(loadFromLocal("target/pacts"))
       .noSetupRequired
       .runVerificationAgainst("localhost", 9999)
   }
 }
}

它使用能夠以像相似forgePact方式使用的對象verifyPact,Pact文件的來源target/pacts在咱們的例子中定義(但能夠是共享位置或Pact Broker),設置執行所需的數據或環境所需的最終代碼全部交互,而後是服務器正在偵聽請求的主機和端口。

所以,根據Consumer測試,咱們但願scala-pact執行真正的HTTP調用,因此咱們須要設置應用程序以處理此調用。咱們能夠經過多種方式作到這一點,我爲我選擇了安全和簡單的解決方案,即在生產中啓動服務器,調用以前執行測試MyLibraryAppServer的主要方法,而且以後關閉它。若是應用程序很簡單,咱們可使用這種方法,若是不是這樣,咱們能夠爲這種測試實現特定的測試運行器,但我建議儘量與生產案例相似。

執行測試,咱們應該獲得一個pass和一個這樣的輸出:

[DEBUG] [02/13/2018 16:45:09.053] [ScalaTest-run] [EventStream(akka://default)] logger log1-Logging$DefaultLogger started
[DEBUG] [02/13/2018 16:45:09.054] [ScalaTest-run] [EventStream(akka://default)] Default Loggers started
[DEBUG] [02/13/2018 16:45:09.110] [ScalaTest-run] [AkkaSSLConfig(akka://default)] Initializing AkkaSSLConfig extension...
[DEBUG] [02/13/2018 16:45:09.112] [ScalaTest-run] [AkkaSSLConfig(akka://default)] buildHostnameVerifier: created hostname verifier: com.typesafe.sslconfig.ssl.DefaultHostnameVerifier@1bb571c
[DEBUG] [02/13/2018 16:45:10.244] [default-akka.actor.default-dispatcher-3] [akka://default/system/IO-TCP/selectors/$a/0] Successfully bound to /127.0.0.1:9000
[INFO] [02/13/2018 16:45:10.256] [default-akka.actor.default-dispatcher-3] [akka.actor.ActorSystemImpl(default)] application is up and running at 127.0.0.1:9000
Attempting to use local pact files at: 'target/pacts'
Looking for pact files in: target/pacts
Found directory: C:\Dev\git-1.0.6\home\src-rnd\myLibrary-contracts\target\pacts
Loading pact file: ScalaConsumer_myLibraryServer.json
Verifying against 'localhost' on port '9000' with a timeout of 2 second(s).
--------------------
Attempting to run provider state: Categories: [Java, DevOps]
Provider state ran successfully
--------------------
[DEBUG] [02/13/2018 16:45:10.883] [default-akka.actor.default-dispatcher-4] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [02/13/2018 16:45:11.146] [default-akka.actor.default-dispatcher-2] [akka.actor.ActorSystemImpl(default)] log: Response for
  Request : HttpRequest(HttpMethod(GET),http://localhost:9000/search/category,List(Host: localhost:9000, User-Agent: scala-pact/0.16.2, Timeout-Access: <function1>),HttpEntity.Strict(none/none,ByteString()),HttpProtocol(HTTP/1.1))
  Response: Complete(HttpResponse(200 OK,List(),HttpEntity.Strict(application/json,[{"name":"DevOps"},{"name":"Java"}]),HttpProtocol(HTTP/1.1)))
[http4s-blaze-client-1] INFO org.http4s.client.PoolManager - Shutting down connection pool: allocated=1 idleQueue.size=1 waitQueue.size=0
[DEBUG] [02/13/2018 16:45:11.262] [default-akka.actor.default-dispatcher-2] [akka://default/system/IO-TCP/selectors/$a/1] Closing connection due to IO error java.io.IOException: An existing connection was forcibly closed by the remote host
Results for pact between ScalaConsumer and myLibraryServer
 - [  OK  ] Fetching categories
[DEBUG] [02/13/2018 16:45:11.391] [default-akka.actor.default-dispatcher-9] [EventStream] shutting down: StandardOutLogger started
[DEBUG] [02/13/2018 16:45:11.391] [default-akka.actor.default-dispatcher-7] [akka://default/system/IO-TCP/selectors/$a/0] Monitored actor [Actor[akka://default/user/StreamSupervisor-0/$a#-487633161]] terminated
Process finished with exit code 0

若是你不能執行,請確保在其中包含協議文件。target/pactsMyLibraryClientPactSpec

消費者協議彷佛受到尊重,因此咱們能夠繼續實現,添加外部配置文件,數據庫支持和數據庫遷移支持。

添加外部配置是很容易的,只須要在建立文件下,配置它全部的配置值,即:application.confsrc/main/resources

application.conf

akka {
 loglevel = DEBUG
}
http {
 interface = "0.0.0.0"
 port = 9000
}
database = {
 url = "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1"
 driver = org.h2.Driver
 connectionPool = disabled
 keepAliveConnection = true
 user = "sa"
 password = ""
}

而後,您能夠建立一個處理它的特徵,從而加載配置和相應的命名常量:

Config.scala

package com.fm.mylibrary.producer
import com.typesafe.config.ConfigFactory
trait Config {
 private val config = ConfigFactory.load()
 private val httpConfig = config.getConfig("http")
 private val databaseConfig = config.getConfig("database")
 val httpInterface: String = httpConfig.getString("interface")
 val httpPort: Int = httpConfig.getInt("port")
 val databaseUrl: String = databaseConfig.getString("url")
 val databaseUser: String = databaseConfig.getString("user")
 val databasePassword: String = databaseConfig.getString("password")
}

默認狀況下,ConfigFactory.load()src/main/resources/application.conf該位置加載配置

咱們也能夠將測試的配置版本放在:src/test/resources

application.conf

akka {
 loglevel = DEBUG
}
http {
 interface = "localhost"
 port = 9999
}
database = {
 url = "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1"
 driver = org.h2.Driver
 connectionPool = disabled
 keepAliveConnection = true
 user = "sa"
 password = ""
}

在這種狀況下沒有太大的不一樣,由於我正在使用內存數據庫。

在主類中使用它很是容易; 只需將其添加爲類特徵,並將靜態值替換爲相應的常量便可:

MyLibraryAppServer.scala

package com.fm.mylibrary.producer.app
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.directives.DebuggingDirectives
import akka.stream.ActorMaterializer
import com.fm.mylibrary.producer.{Config, Routes}
import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}
object MyLibraryAppServer extends App
         with Routes
         with Config
         with DebuggingDirectives {
 implicit val actorSystem: ActorSystem = ActorSystem()
 implicit val materializer: ActorMaterializer = ActorMaterializer()
 implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher
 val log = actorSystem.log
 def startApplication(): Unit = {
   Http().bindAndHandle(handler = logRequestResult("log")(routes), interface = httpInterface, port = httpPort).onComplete {
     case Success(b) => log.info(s"application is up and running at ${b.localAddress.getHostName}:${b.localAddress.getPort}")
     case Failure(e) => log.error(s"could not start application: {}", e.getMessage)
   }
 }
 def stopApplication(): Unit = {
   actorSystem.terminate()
 }
 startApplication()
}

您也能夠在Pact測試中使用該配置,以便使用正確的服務器地址:

MyLibraryServerPactSpec.scala

package com.fm.mylibrary.producer.pact
import com.fm.mylibrary.producer.Config
import com.fm.mylibrary.producer.app.MyLibraryAppServer
import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}
import com.itv.scalapact.ScalaPactVerify._
class MyLibraryServerPactSpec extends FunSpec with Matchers with BeforeAndAfterAll with Config {
 override def beforeAll() {
   MyLibraryAppServer.main(Array())
 }
 override def afterAll() {
   MyLibraryAppServer.stopApplication()
 }
 describe("Verifying MyLibrary server") {
   it("should be able to respect the contract"){
     verifyPact
       .withPactSource(loadFromLocal("target/pacts"))
       .noSetupRequired
       .runVerificationAgainst(httpInterface, httpPort)
   }
 }
}

如今咱們終於能夠經過遷移來添加數據庫支持。

首先,咱們必須定義咱們的實體(或表),在咱們的例子中,咱們只須要一個:Category

CategoryEntity.scala

package com.fm.mylibrary.producer.entity
import com.fm.mylibrary.model.Category
import slick.jdbc.H2Profile.api._
trait CategoryEntity {
 class Categories(tag: Tag) extends Table[Category](tag, "CATEGORY") {
   def name = column[String]("NAME", O.PrimaryKey)
   def * = name  <> (Category.apply, Category.unapply)
 }
 protected val categories = TableQuery[Categories]
}

這是一個標準的光滑表格定義; 你能夠看到這個表只有一列也是主鍵,它和類的類別有關Table[Category]

它能夠從Category類中實例化,如定義:def * = name <> (Category.apply, Category.unapply),確保模型類同時實現了apply和unapply,最簡單的方法是定義模型類的案例類

最後一條指令是定義TableQuery對象,該對象對於該表執行任何類型的查詢都是必需的。讓咱們來定義咱們的任何數據庫交互的主要入口點,我已經實現了它能夠被任何類須要數據庫訪問使用的特徵:

DatabaseSupport.scala

package com.fm.mylibrary.producer.db
import slick.jdbc.H2Profile
import slick.jdbc.H2Profile.api._
trait DatabaseSupport {
 val db: H2Profile.backend.Database = Database.forConfig("database")
 def closeDB(): Unit = db.close
}

咱們如今能夠定義在類別表DAO上操做所必需的圖層。我已經在CategoryEntity的相同的文件中建立了它,可是若是您想要使用不一樣的包,則能夠將它移動到不一樣的文件中:

CategoryEntity.scala

package com.fm.mylibrary.producer.entity
import com.fm.mylibrary.model.Category
import com.fm.mylibrary.producer.db.DatabaseSupport
import slick.jdbc.H2Profile.api._
import scala.concurrent.Future
trait CategoryEntity {
 class Categories(tag: Tag) extends Table[Category](tag, "CATEGORY") {
   def name = column[String]("NAME", O.PrimaryKey)
   def * = name  <> (Category.apply, Category.unapply)
 }
 protected val categories = TableQuery[Categories]
}
class CategoryDAO extends CategoryEntity with DatabaseSupport {
 def insertOrUpdate(category: Category): Future[Int] =
       db.run(categories.insertOrUpdate(category))
 def findAll(): Future[Seq[Category]] =
       db.run(categories.result)
}

CategoryDAO同時擴展DatabaseSupportCategoryEntity,首先是要得到分類表查詢的對象,第二個是要獲得數據庫實例用來執行查詢。

我只實現了兩種方法,對咱們的測試來講已經足夠了。正如您所看到的,我使用Slick提供的基本方法,而且因爲實體Categories和模型Category相互關聯,所以DAO能夠直接返回模型而不顯式轉換。您能夠在官方文檔中找到更多關於如何在Slick中實現實體和DAO的示例和信息。

若是他們實現庫提供的標準查詢,我一般不會實現DAO測試,我沒有看到測試外部庫方法的任何一點,而且它們已經被路由測試覆蓋了。可是,若是DAO實現了涉及多個表的複雜查詢,我強烈建議對全部可能的案例進行單元測試。

爲了如今開始咱們的應用程序,須要一個帶有分類表的數據庫,而且咱們能夠手動完成,或者讓機器爲咱們完成工做。因此咱們能夠實現一個數據庫遷移,它可以在啓動時應用任何須要的數據庫更改來執行應用程序。

正如咱們爲數據庫支持所作的那樣,咱們能夠實現一個提供執行遷移功能的特性:

DatabaseMigrationSupport.scala

package com.fm.mylibrary.producer.db
import com.fm.mylibrary.producer.Config
import org.flywaydb.core.Flyway
trait DatabaseMigrationSupport extends Config {
 private val flyway = new Flyway()
 flyway.setDataSource(databaseUrl, databaseUser, databasePassword)
 def migrateDB(): Unit = {
   flyway.migrate()
 }
 def reloadSchema(): Unit = {
   flyway.clean()
   flyway.migrate()
 }
}

這暴露了兩種方法,一種是增量遷移,一種是從新執行整個遷移。它使用特徵來獲取數據庫鏈接信息。Config

默認狀況下,Flayway會在src/main/resources/db/migration中查找遷移的sql腳本文件,它須要具備特定名稱格式的文件:

img

官方遷移文檔獲取更多信息。

因此,咱們的第一個遷移腳本是建立分類表:

V1__Create_Category.sql

CREATE TABLE category (
 name VARCHAR(255) NOT NULL PRIMARY KEY
);

咱們能夠在服務器啓動時執行它:

MyLibraryAppServer.scala

package com.fm.mylibrary.producer.app
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.directives.DebuggingDirectives
import akka.stream.ActorMaterializer
import com.fm.mylibrary.producer.db.DatabaseMigrationSupport
import com.fm.mylibrary.producer.{Config, Routes}
import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}
object MyLibraryAppServer extends App
         with Routes
         with Config
         with DatabaseMigrationSupport
         with DebuggingDirectives {
 implicit val actorSystem: ActorSystem = ActorSystem()
 implicit val materializer: ActorMaterializer = ActorMaterializer()
 implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher
 val log = actorSystem.log
 def startApplication(): Unit = {
   migrateDB()
   Http().bindAndHandle(handler = logRequestResult("log")(routes), interface = httpInterface, port = httpPort).onComplete {
     case Success(b) => log.info(s"application is up and running at ${b.localAddress.getHostName}:${b.localAddress.getPort}")
     case Failure(e) => log.error(s"could not start application: {}", e.getMessage)
   }
 }
 def stopApplication(): Unit = {
   actorSystem.terminate()
 }
 startApplication()
}

咱們在HTTP綁定以前添加了DatabaseMigrationSupport和migrateDB()的調用。

最後一件事是將咱們的新數據源與業務邏輯關聯起來,改變路線以便從DB中檢索類別:

Routes.scala

package com.fm.mylibrary.producer
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.Materializer
import com.fm.mylibrary.producer.entity.CategoryDAO
import scala.concurrent.ExecutionContext
import spray.json._
import com.fm.mylibrary.model.JsonProtocol._
trait Routes {
 implicit val materializer: Materializer
 implicit val executionContext: ExecutionContext
 private val categoryEntityDAO = new CategoryDAO()
 val searchRoutes: Route = {
   pathPrefix("search" / "category") {
     get {
       complete(
         categoryEntityDAO.findAll()
             .map(_.toJson)
       )
     }
   }
 }
 val routes: Route = searchRoutes
}

咱們剛剛調用dao中的findAll方法替換了靜態列表。

你能夠看到dao在trait中被實例化,若是邏輯變得更復雜,我建議將它做爲必需的參數(隱式或類屬性)移動,以便從外部注入它們。在咱們如今的狀況下,沒有必要,由於邏輯很是簡單,在測試方面,咱們使用的是內存數據庫,因此沒有必要對它進行模擬。

回到測試路徑上,它會失敗,由於沒有數據,因此咱們要添加它們。咱們能夠很容易地用一種方法的特徵來實現,這個特徵實現了一個方法,添加了幾個類別::

MockData.data

package com.fm.mylibrary.producer.db
import com.fm.mylibrary.model.Category
import com.fm.mylibrary.producer.entity.CategoryDAO
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration.Duration
trait MockData {
 implicit val executionContext: ExecutionContext
 def addMockCategories(): Unit = {
   val categoryEntityDAO = new CategoryDAO()
   val setupFuture = for {
     c1 <- categoryEntityDAO.insertOrUpdate(Category("Java"))
     c2 <- categoryEntityDAO.insertOrUpdate(Category("DevOps"))
   } yield c1 + c2
   Await.result(setupFuture, Duration.Inf)
 }
}

將它添加進來,以便咱們可使用路由測試和Pact測試輕鬆驗證應用程序:BaseAppServerTestAppMyLibraryAppServer

MyLibraryAppServer.scala

package com.fm.mylibrary.producer.app
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.directives.DebuggingDirectives
import akka.stream.ActorMaterializer
import com.fm.mylibrary.producer.db.{DatabaseMigrationSupport, MockData}
import com.fm.mylibrary.producer.{Config, Routes}
import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}
object MyLibraryAppServer extends App
         with Routes
         with Config
         with DatabaseMigrationSupport
         with MockData
         with DebuggingDirectives {
 implicit val actorSystem: ActorSystem = ActorSystem()
 implicit val materializer: ActorMaterializer = ActorMaterializer()
 implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher
 val log = actorSystem.log
 def startApplication(): Unit = {
   migrateDB()
   addMockCategories()
   Http().bindAndHandle(handler = logRequestResult("log")(routes), interface = httpInterface, port = httpPort).onComplete {
     case Success(b) => log.info(s"application is up and running at ${b.localAddress.getHostName}:${b.localAddress.getPort}")
     case Failure(e) => log.error(s"could not start application: {}", e.getMessage)
   }
 }
 def stopApplication(): Unit = {
   actorSystem.terminate()
 }
 startApplication()
}

BaseTestAppServer.scala

package com.fm.mylibrary.producer
import akka.http.scaladsl.testkit.ScalatestRouteTest
import com.fm.mylibrary.producer.db.{DatabaseMigrationSupport, MockData}
import org.scalamock.scalatest.MockFactory
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
import scala.concurrent.ExecutionContextExecutor
class BaseTestAppServer extends WordSpec
           with ScalatestRouteTest
           with Matchers
           with MockFactory
           with DatabaseMigrationSupport
           with MockData
           with Routes
           with BeforeAndAfterAll {
 implicit val executionContext: ExecutionContextExecutor = system.dispatcher
 override def beforeAll(): Unit = {
   migrateDB()
   addMockCategories()
 }
}

若是咱們執行全部測試,咱們應該沒有問題; 你能夠用sbt test命令來作到這一點

若是咱們啓動服務器,用sbt run命令,並執行GET /search/category,咱們應該獲得咱們的兩個類別:

img

總結

消費者驅動的契約測試是一項很是棒的技術,能夠節省不少時間和與集成測試相關的問題。

全部的實現都是「以契約爲中心」的,因此它意味着咱們強制首先考慮如何讓消費者得到特定的服務,而且咱們必須提供特定的服務,而後咱們不須要設置基礎設施來執行集成測試服務。

另外一方面,Scala協議沒有很好的文檔記錄,所以設置複雜測試會頗有挑戰性,而我發現的惟一方法是瀏覽它的示例和源代碼。

咱們已經看到了一個很是簡單的例子,不多在真實環境中使用,可是但願您能夠將它用做下一個微服務的起點。

更多關於CDC和Pact

我已經向你展現了Pact的最基本用法,對於一個真正的環境來講這多是不夠的,由於有許多團隊,每一個團隊都與許多生產者和消費者進行「併發」工做,其中通訊很是重要,以及自動化和用於解決它的工具。

在CDC和Pact的狀況下,您必須自動執行契約處理(發佈/驗證),並將其與CI / CD(持續集成/持續交付)流程相連接,以便在沒有相關生產商的狀況下客戶沒法投入生產尊重他們的契約,若是違反了某些契約,任何生產者都不能生產。

因此,我強烈建議您將Pact的官方文檔和介紹人Pact Broker帶入您的CI / CD流程,它是一個提供如下功能的應用程序(來自官方文檔):

  • 經過獨立部署您的服務並避免集成測試的瓶頸,您能夠快速,放心地利用客戶價值
  • 解決了如何在消費者和提供者項目之間共享契約驗證結果的問題
  • 告訴您能夠將應用程序的哪一個版本安全地部署在一塊兒,自動地將您的合同版本部署在一塊兒
  • 容許您確保多個消費者版本和提供者版本之間的向後兼容性(例如,在移動或多租戶環境中)
  • 提供保證爲最新的應用程序的API文檔
  • 向您展現您的服務如何互動的真實例子
  • 容許您可視化服務之間的關係

您能夠隨時提出任何問題,若是您須要建議,我將很是樂意提供幫助。

擴展閱讀:https://www.cnblogs.com/jinji...

問答
微服務架構:跨服務數據共享如何實現?
相關閱讀
在微服務之間進行通訊 
服務集成時需避免的兩個錯誤
基於 RabbitMQ 和 AMQP 進行消息傳遞

此文已由做者受權騰訊雲+社區發佈,原文連接:https://cloud.tencent.com/dev...

歡迎你們前往騰訊雲+社區或關注雲加社區微信公衆號(QcloudCommunity),第一時間獲取更多海量技術實踐乾貨哦~

相關文章
相關標籤/搜索