Vert.x Blueprint 系列教程(三) | Micro-Shop 微服務應用實戰

本文章是 Vert.x 藍圖系列 的第三篇教程。全系列:javascript

本系列已發佈至Vert.x官網:Vert.x Blueprint Tutorialsgit


前言

歡迎回到Vert.x 藍圖系列!當今,微服務架構 變得愈來愈流行,開發者們都想嘗試一下微服務應用的開發和架構設計。使人激動的是,Vert.x給咱們提供了一系列用於微服務開發的組件,包括 Service Discovery (服務發現)、Circuit Breaker (斷路器) 以及其它的一些組件。有了Vert.x微服務組件的幫助,咱們就能夠快速利用Vert.x搭建咱們的微服務應用。在這篇藍圖教程中,咱們一塊兒來探索一個利用Vert.x的各個組件開發的 Micro-Shop 微服務應用~github

經過本教程,你將會學習到如下內容:web

  • 微服務架構redis

  • 如何利用Vert.x來開發微服務應用sql

  • 異步開發模式docker

  • 響應式、函數式編程

  • 事件溯源 (Event Sourcing)

  • 經過分佈式 Event Bus 進行異步RPC調用

  • 各類各樣的服務類型(例如REST、數據源、Event Bus服務等)

  • 如何使用服務發現模塊 (Vert.x Service Discovery)

  • 如何使用斷路器模塊 (Vert.x Circuit Breaker)

  • 如何利用Vert.x實現API Gateway

  • 如何進行權限認證 (OAuth 2 + Keycloak)

  • 如何配置及使用 SockJS - Event Bus Bridge

以及其它的一些東西。。。

本教程是 Vert.x 藍圖系列 的第三篇教程,對應的Vert.x版本爲 3.3.2 。本教程中的完整代碼已託管至 GitHub

踏入微服務之門

哈~你必定對「微服務」這個詞很熟悉——至少聽起來很熟悉~愈來愈多的開發者開始擁抱微服務架構,那麼微服務到底是什麼呢?一句話總結一下:

Microservices are small, autonomous services that work together.

咱們來深刻一下微服務的各類特性,來看看微服務爲什麼如此出色:

  • 首先,微服務的重要的一點是「微」。每一個微服務都是獨立的,每一個單獨的微服務組件都注重某一特定的邏輯。在微服務架構中,咱們將傳統的單體應用拆分紅許多互相獨立的組件。每一個組件都由其特定的「邏輯邊界」,所以組件不會過於龐大。不過話又說回來了,每一個組件應該有多小呢?這個問題可很差回答,它一般取決與咱們的業務與負載。正如Sam Newman在其《Building
    Microservices》書中所講的那樣:

We seem to have a very good sense of what is too big, and so it could be argued that once a piece of code no longer feels too big, it’s probably small enough.

所以,當咱們以爲每一個組件不是特別大的時候,組件的大小可能就剛恰好。

  • 在微服務架構中,組件之間能夠經過任意協議進行通訊,好比 HTTPAMQP

  • 每一個組件是獨立的,所以咱們能夠在不一樣的組件中使用不一樣的編程語言,不一樣的技術 —— 這就是所謂的 polyglot support (不錯,Vert.x也是支持多語言的!)

  • 每一個組件都是獨立開發、部署以及發佈的,因此這減小了部署及發佈的難度。

  • 微服務架構一般與分佈式系統如影隨行,因此咱們還須要考慮分佈式系統中的方方面面,包括可用性、彈性以及可擴展性。

  • 微服務架構一般被設計成爲 面向失敗的,由於在分佈式系統中失敗的場景很是複雜,咱們須要有效地處理失敗的手段。

雖然微服務有如此多的優勢,可是不要忘了,微服務可不是銀彈,由於它引入了分佈式系統中所帶來的各類問題,所以設計架構時咱們都要考慮這些狀況。

服務發現

在微服務架構中,每一個組件都是獨立的,它們都不知道其餘組件的位置,可是組件之間又須要通訊,所以咱們必須知道各個組件的位置。然而,把位置信息寫死在代碼中顯然很差,所以咱們須要一種機制能夠動態地記錄每一個組件的位置 —— 這就是 服務發現。有了服務發現模塊,咱們就能夠將服務位置發佈至服務發現模塊中,其它服務就能夠從服務發現模塊中獲取想要調用的服務的位置並進行調用。在調用服務的過程當中,咱們不須要知道對應服務的位置,因此當服務位置或環境變更時,服務調用能夠不受影響,這使得咱們的架構更加靈活。

Vert.x提供了一個服務發現模塊用於發佈和獲取服務記錄。在Vert.x 服務發現模塊,每一個服務都被抽象成一個Record(服務記錄)。服務提供者能夠向服務發現模塊中發佈服務,此時Record會根據底層ServiceDiscoveryBackend的配置存儲在本地Map、分佈式Map或Redis中。服務消費者能夠從服務發現模塊中獲取服務記錄,而且經過服務記錄獲取對應的服務實例而後進行服務調用。目前Vert.x原生支持好幾種服務類型,好比 Event Bus 服務(即服務代理)、HTTP 端點消息源 以及 數據源。固然咱們也能夠實現本身的服務類型,能夠參考相關的文檔。在後面咱們還會詳細講述如何使用服務發現模塊,這裏先簡單作個瞭解。

異步的、響應式的Vert.x

異步與響應式風格都很適合微服務架構,而Vert.x兼具這兩種風格!異步開發模式相信你們已經瞭然於胸了,而若是你們讀過前幾篇藍圖教程的話,響應式風格你們必定不會陌生。有了基於Future以及基於RxJava的異步開發模式,咱們能夠爲所欲爲地對異步過程進行組合和變換,這樣代碼能夠很是簡潔,很是優美!在本藍圖教程中,咱們會見到大量基於Future和RxJava的異步方法。

Mirco Shop 微服務應用

好啦,如今你們應該對微服務架構有了一個大體的瞭解了,下面咱們來說一下本藍圖中的微服務應用。這是一個簡單的 Micro-Shop 微服務應用 (目前只完成了基本功能),人們能夠進行網上購物以及交易。。。當前版本的微服務應用包含下列組件:

  • 帳戶服務:提供用戶帳戶的操做服務,使用MySQL做爲後端存儲。

  • 商品服務:提供商品的操做服務,使用MySQL做爲後端存儲。

  • 庫存服務:提供商品庫存的操做服務,如查詢庫存、增長庫存即減小庫存。使用Redis做爲後端存儲。

  • 網店服務:提供網店的操做即管理服務,使用MongoDB做爲後端存儲。

  • 購物車服務:提供購物車事件的生成以及購物車操做(添加、刪除商品以及結算)服務。咱們經過此服務來說述 事件溯源

  • 訂單服務:訂單服務從Event Bus接收購物車服務發送的訂單請求,接着處理訂單並將訂單發送至下層服務(本例中僅僅簡單地存儲至數據庫中)。

  • Micro Shop 前端:此微服務的前端部分(SPA),目前已整合至API Gateway組件中。

  • 監視儀表板:用於監視微服務系統的狀態以及日誌、統計數據的查看。

  • API Gateway:整個微服務的入口,它負責將收到的請求按照必定的規則分發至對應的組件的REST端點中(至關於反向代理)。它也負責權限認證與管理,負載均衡,心跳檢測以及失敗處理(使用Vert.x Circuit Breaker)。

Micro Shop 微服務架構

咱們來看一下Micro Shop微服務應用的架構:

Microservice Architecture

用戶請求首先通過API Gateway,再經其處理並分發至對應的業務端點。

咱們再來看一下每一個基礎組件內部的結構(基礎組件即圖中最下面的各個業務組件)。

組件結構

每一個基礎組件至少有兩個Verticle:服務Verticle以及REST Verticle。REST Vertice提供了服務對應的REST端點,而且也負責將此端點發布至服務發現層。而服務Verticle則負責發佈其它服務(如Event Bus服務或消息源)而且部署REST Verticle。

每一個基礎組件中都包含對應的服務接口,如商品組件中包含ProductService接口。這些服務接口都是Event Bus 服務,由@ProxyGen註解修飾。上篇藍圖教程中咱們講過,Vert.x Service Proxy能夠自動爲@ProxyGen註解修飾的接口生成服務代理類,所以咱們能夠很方便地在Event Bus上進行異步RPC調用而不用寫額外的代碼。很酷吧!而且有了服務發現組件之後,咱們能夠很是方便地將Event Bus服務發佈至服務發現層,這樣其它組件能夠更方便地調用服務。

Component structure

組件之間的通訊

咱們先來看一下咱們的微服務應用中用到的服務類型:

  • HTTP端點 (e.g. REST 端點以及API Gateway) - 此服務的位置用URL描述

  • Event Bus服務 - 此服務的位置用Event Bus上的一個特定地址描述

  • 事件源 - 事件源服務對應Event Bus上某個地址的事件消費者。此服務的位置用Event Bus上的一個特定地址描述

所以,咱們各個組件之間能夠經過HTTP以及Event Bus(本質是TCP)進行通訊,例如:

Interaction

API Gateway與其它組件經過HTTP進行通訊。

讓咱們開始吧!

好啦,如今開始咱們的微服務藍圖旅程吧!首先咱們從GitHub上clone項目:

git clone https://github.com/sczyh30/vertx-blueprint-microservice.git

在本藍圖教程中,咱們使用 Maven 做爲構建工具。咱們首先來看一下pom.xml配置文件。咱們能夠看到,咱們的藍圖應用由許多模塊構成:

<modules>
  <module>microservice-blueprint-common</module>
  <module>account-microservice</module>
  <module>product-microservice</module>
  <module>inventory-microservice</module>
  <module>store-microservice</module>
  <module>shopping-cart-microservice</module>
  <module>order-microservice</module>
  <module>api-gateway</module>
  <module>cache-infrastructure</module>
  <module>monitor-dashboard</module>
</modules>

每一個模塊表明一個組件。看着配置文件,彷佛有很多組件呢!不要擔憂,咱們將會一一探究這些組件。下面咱們先來看一下全部組件的基礎模塊 - microservice-blueprint-common

微服務基礎模塊

microservice-blueprint-common模塊提供了一些微服務功能相關的輔助類以及輔助Verticle。咱們先來看一下兩個base verticles - BaseMicroserviceVerticleRestAPIVerticle

Base Microservice Verticle

BaseMicroserviceVerticle提供了與微服務相關的初始化函數以及各類各樣的輔助函數。其它每個Verticle都會繼承此Verticle,所以這個基礎Verticle很是重要。

首先咱們來看一下其中的成員變量:

protected ServiceDiscovery discovery;
protected CircuitBreaker circuitBreaker;
protected Set<Record> registeredRecords = new ConcurrentHashSet<>();

discovery以及circuitBreaker分別表明服務發現實例以及斷路器實例,而registeredRecords表明當前已發佈的服務記錄的集合,用於在結束Verticle時註銷服務。

start函數中主要是對服務發現實例和斷路器實例進行初始化,配置文件從config()中獲取。它的實現很是簡單:

@Override
public void start() throws Exception {
  // init service discovery instance
  discovery = ServiceDiscovery.create(vertx, new ServiceDiscoveryOptions().setBackendConfiguration(config()));

  // init circuit breaker instance
  JsonObject cbOptions = config().getJsonObject("circuit-breaker") != null ?
    config().getJsonObject("circuit-breaker") : new JsonObject();
  circuitBreaker = CircuitBreaker.create(cbOptions.getString("name", "circuit-breaker"), vertx,
    new CircuitBreakerOptions()
      .setMaxFailures(cbOptions.getInteger("max-failures", 5))
      .setTimeout(cbOptions.getLong("timeout", 10000L))
      .setFallbackOnFailure(true)
      .setResetTimeout(cbOptions.getLong("reset-timeout", 30000L))
  );
}

下面咱們還提供了幾個輔助函數用於發佈各類各樣的服務。這些函數都是異步的,而且基於Future:

protected Future<Void> publishHttpEndpoint(String name, String host, int port) {
  Record record = HttpEndpoint.createRecord(name, host, port, "/",
    new JsonObject().put("api.name", config().getString("api.name", ""))
  );
  return publish(record);
}

protected Future<Void> publishMessageSource(String name, String address) {
  Record record = MessageSource.createRecord(name, address);
  return publish(record);
}

protected Future<Void> publishJDBCDataSource(String name, JsonObject location) {
  Record record = JDBCDataSource.createRecord(name, location, new JsonObject());
  return publish(record);
}

protected Future<Void> publishEventBusService(String name, String address, Class serviceClass) {
  Record record = EventBusService.createRecord(name, address, serviceClass);
  return publish(record);
}

以前咱們提到過,每一個服務記錄Record表明一個服務,其中服務類型由記錄中的type字段標識。Vert.x原生支持的各類服務接口中都包含着好幾個createRecord方法所以咱們能夠利用這些方法來方便地建立服務記錄。一般狀況下咱們須要給每一個服務都指定一個name,這樣以後咱們就能夠經過名稱來獲取服務了。咱們還能夠經過setMetadata方法來給服務記錄添加額外的元數據。

你可能注意到在publishHttpEndpoint方法中咱們就提供了含有api-name的元數據,以後咱們會了解到,API Gateway在進行反向代理時會用到它。

下面咱們來看一下發布服務的通用方法 —— publish方法:

private Future<Void> publish(Record record) {
  Future<Void> future = Future.future();
  // publish the service
  discovery.publish(record, ar -> {
    if (ar.succeeded()) {
      registeredRecords.add(record);
      logger.info("Service <" + ar.result().getName() + "> published");
      future.complete();
    } else {
      future.fail(ar.cause());
    }
  });
  return future;
}

publish方法中,咱們調用了服務發現實例discoverypublish方法來將服務發佈至服務發現模塊。它一樣也是一個異步方法,當發佈成功時,咱們將此服務記錄存儲至registeredRecords中,輸出日誌而後通知future操做已完成。最後返回對應的future

注意,在Vert.x Service Discovery當前版本(3.3.2)的設計中,服務發佈者須要在必要時手動註銷服務,所以當Verticle結束時,咱們須要將註冊的服務都註銷掉:

@Override
public void stop(Future<Void> future) throws Exception {
  // In current design, the publisher is responsible for removing the service
  List<Future> futures = new ArrayList<>();
  for (Record record : registeredRecords) {
    Future<Void> unregistrationFuture = Future.future();
    futures.add(unregistrationFuture);
    discovery.unpublish(record.getRegistration(), unregistrationFuture.completer());
  }

  if (futures.isEmpty()) {
    discovery.close();
    future.complete();
  } else {
    CompositeFuture.all(futures)
      .setHandler(ar -> {
        discovery.close();
        if (ar.failed()) {
          future.fail(ar.cause());
        } else {
          future.complete();
        }
      });
  }
}

stop方法中,咱們遍歷registeredRecords集合而且嘗試註銷每個服務,並將異步結果future添加至futures列表中。以後咱們調用CompositeFuture.all(futures)來依次獲取每一個異步結果的狀態。all方法返回一個組合的Future,當列表中的全部Future都成功賦值時方爲成功狀態,反之只要有一個異步結果失敗,它就爲失敗狀態。所以,咱們給它綁定一個Handler,當全部服務都被註銷時,服務發現模塊就能夠安全地關閉了,不然結束函數會失敗。

REST API Verticle

RestAPIVerticle抽象類繼承了BaseMicroserviceVerticle抽象類。從名字上就能夠看出,它提供了諸多的用於REST API開發的輔助方法。咱們在其中封裝了諸如建立服務端、開啓Cookie和Session支持,開啓心跳檢測支持(經過HTTP),各類各樣的路由處理封裝以及用於權限驗證的路由處理器。在以後的章節中咱們將會見到這些方法。

好啦,如今咱們已經瞭解了整個藍圖應用中的兩個基礎Verticle,下面是時候探索各個模塊了!在探索邏輯組件以前,咱們先來看一下其中最重要的組件之一 —— API Gateway。

API Gateway

咱們把API Gateway的內容單獨歸爲一篇教程,請見:Vert.x 藍圖 - Micro Shop 微服務實戰 (API Gateway篇)

Event Bus 服務 - 帳戶、網店及商品服務

在Event Bus上進行異步RPC

在以前的 Vert.x Kue 藍圖教程 中咱們已經介紹過Vert.x中的異步RPC(也叫服務代理)了,這裏咱們再來回顧一下,而且說一說如何利用服務發現模塊更方便地進行異步RPC。

傳統的RPC有一個缺點:消費者須要阻塞等待生產者的迴應。這是一種阻塞模型,和Vert.x推崇的異步開發模式不相符。而且,傳統的RPC不是真正面向失敗設計的。還好,Vert.x提供了一種高效的、響應式的RPC —— 異步RPC。咱們不須要等待生產者的迴應,而只須要傳遞一個Handler<AsyncResult<R>>參數給異步方法。這樣當收到生產者結果時,對應的Handler就會被調用,很是方便,這與Vert.x的異步開發模式相符。而且,AsyncResult也是面向失敗設計的。

Vert.x Service Proxy(服務代理組件)能夠自動處理含有@ProxyGen註解的服務接口,生成相應的服務代理類。生成的服務代理類能夠幫咱們將數據封裝好後發送至Event Bus、從Event Bus接收數據,以及對數據進行編碼和解碼,所以咱們能夠省掉很多代碼。咱們須要作的就是遵循@ProxyGen註解的一些限定

好比,這裏有一個Event Bus服務接口:

@ProxyGen
public interface MyService {
  @Fluent
  MyService retrieveData(String id, Handler<AsyncResult<JsonObject>> resultHandler);
}

咱們能夠經過Vert.x Service Proxy組件生成對應的代理類。而後咱們就能夠經過ProxyHelper類的registerService方法將此服務註冊至Event Bus上:

MyService myService = MyService.createService(vertx, config);
ProxyHelper.registerService(MyService.class, vertx, myService, SERVICE_ADDRESS);

有了服務發現組件以後,將服務發佈至服務發現層就很是容易了。好比在咱們的藍圖應用中咱們使用封裝好的方法:

publishEventBusService(SERVICE_NAME, SERVICE_ADDRESS, MyService.class)

OK,如今服務已經成功地發佈至服務發現模塊。如今咱們就能夠經過EventBusService接口的getProxy方法來從服務發現層獲取發佈的Event Bus服務,而且像調用普通異步方法那樣進行異步RPC:

EventBusService.<MyService>getProxy(discovery, new JsonObject().put("name", SERVICE_NAME), ar -> {
  if (ar.succeeded()) {
    MyService myService = ar.result();
    myService.retrieveData(...);
  }
});

幾個服務模塊的通用特性

在咱們的Micro Shop微服務應用中,帳戶、網店及商品服務有幾個通用的特性及約定。如今咱們來解釋一下。

在這三個模塊中,每一個模塊都包含:

  • 一個Event Bus服務接口。此服務接口定義了對實體存儲的各類操做

  • 服務接口的實現

  • REST API Verticle,用於建立服務端並將其發佈至服務發現模塊

  • Main Verticle,用於部署其它的verticles以及將Event Bus服務和消息源發佈至服務發現層

其中,用戶帳戶服務以及商品服務都使用 MySQL 做爲後端存儲,而網店服務則以 MongoDB 做爲後端存儲。這裏咱們只挑兩個典型的服務介紹如何經過Vert.x操做不一樣的數據庫:product-microservicestore-microserviceaccount-microservice的實現與product-microservice很是類似,你們能夠查閱 GitHub 上的代碼。

基於MySQL的商品服務

商品微服務模塊提供了商品的操做功能,包括添加、查詢(搜索)、刪除與更新商品等。其中最重要的是ProductService服務接口以及其實現了。咱們先來看一下此服務接口的定義:

@VertxGen
@ProxyGen
public interface ProductService {

  /**
   * The name of the event bus service.
   */
  String SERVICE_NAME = "product-eb-service";

  /**
   * The address on which the service is published.
   */
  String SERVICE_ADDRESS = "service.product";

  /**
   * Initialize the persistence.
   */
  @Fluent
  ProductService initializePersistence(Handler<AsyncResult<Void>> resultHandler);

  /**
   * Add a product to the persistence.
   */
  @Fluent
  ProductService addProduct(Product product, Handler<AsyncResult<Void>> resultHandler);

  /**
   * Retrieve the product with certain `productId`.
   */
  @Fluent
  ProductService retrieveProduct(String productId, Handler<AsyncResult<Product>> resultHandler);

  /**
   * Retrieve the product price with certain `productId`.
   */
  @Fluent
  ProductService retrieveProductPrice(String productId, Handler<AsyncResult<JsonObject>> resultHandler);

  /**
   * Retrieve all products.
   */
  @Fluent
  ProductService retrieveAllProducts(Handler<AsyncResult<List<Product>>> resultHandler);

  /**
   * Retrieve products by page.
   */
  @Fluent
  ProductService retrieveProductsByPage(int page, Handler<AsyncResult<List<Product>>> resultHandler);

  /**
   * Delete a product from the persistence
   */
  @Fluent
  ProductService deleteProduct(String productId, Handler<AsyncResult<Void>> resultHandler);

  /**
   * Delete all products from the persistence
   */
  @Fluent
  ProductService deleteAllProducts(Handler<AsyncResult<Void>> resultHandler);

}

正如咱們以前所提到的那樣,這個服務接口是一個Event Bus服務,因此咱們須要給它加上@ProxyGen註解。這些方法都是異步的,所以每一個方法都須要接受一個Handler<AsyncResult<T>>參數。當異步操做完成時,對應的Handler會被調用。注意到咱們還給此接口加了@VertxGen註解。上篇藍圖教程中咱們提到過,這是爲了開啓多語言支持(polyglot language support)。Vert.x Codegen註解處理器會自動處理含有@VertxGen註解的類,並生成支持的其它語言的代碼,如Ruby、JS等。。。這是很是適合微服務架構的,由於不一樣的組件能夠用不一樣的語言進行開發!

每一個方法的含義都在註釋中給出了,這裏就不解釋了。

商品服務接口的實現位於ProductServiceImpl類中。商品信息存儲在MySQL中,所以咱們能夠經過 Vert.x-JDBC 對數據庫進行操做。咱們在 第一篇藍圖教程 中已經詳細講述過Vert.x JDBC的使用細節了,所以這裏咱們就不過多地討論細節了。這裏咱們只關注如何減小代碼量。由於一般簡單數據庫操做的過程都是千篇一概的,所以作個封裝是頗有必要的。

首先來回顧一下每次數據庫操做的過程:

  1. JDBCClient中獲取數據庫鏈接SQLConnection,這是一個異步過程

  2. 執行SQL語句,綁定回調Handler

  3. 最後不要忘記關閉數據庫鏈接以釋放資源

對於正常的CRUD操做來講,它們的實現都很類似,所以咱們封裝了一個JdbcRepositoryWrapper類來實現這些通用邏輯。它位於io.vertx.blueprint.microservice.common.service包中:

JdbcRepositoryWrapper class structure

咱們提供瞭如下的封裝方法:

  • executeNoResult: 執行含參數的SQL語句 (經過updateWithParams方法)。執行結果是不須要的,所以只須要接受一個 Handler<AsyncResult<Void>> 類型的參數。此方法一般用於insert之類的操做。

  • retrieveOne: 執行含參數的SQL語句,用於獲取某一特定實體(經過 queryWithParams方法)。此方法是基於Future的,它返回一個Future<Optional<JsonObject>>類型的異步結果。若是結果集爲空,那麼返回一個空的Optional monad。若是結果集不爲空,則返回第一個結果並用Optional進行包裝。

  • retrieveMany: 獲取多個實體,返回Future<List<JsonObject>>做爲異步結果。

  • retrieveByPage: 與retrieveMany 方法類似,但包含分頁邏輯。

  • retrieveAll: similar to retrieveMany method but does not require query parameters as it simply executes statement such as SELECT * FROM xx_table.

  • removeOne and removeAll: remove entity from the database.

固然這與Spring JPA相比的不足之處在於SQL語句得本身寫,本身封裝也不是很方便。。。考慮到Vert.x JDBC底層也只是使用了Worker線程池包裝了原生的JDBC(不是真正的異步),咱們也能夠結合Spring Data的相關組件來簡化開發。另外,Vert.x JDBC使用C3P0做爲默認的數據庫鏈接池,C3P0的性能我想你們應該都懂。。。所以換成性能更優的HikariCP是頗有必要的。

回到JdbcRepositoryWrapper中來。這層封裝能夠大大地減小代碼量。好比,咱們的ProductServiceImpl實現類就能夠繼承JdbcRepositoryWrapper類,而後利用這些封裝好的方法。看個例子 —— retrieveProduct方法的實現:

@Override
public ProductService retrieveProduct(String productId, Handler<AsyncResult<Product>> resultHandler) {
  this.retrieveOne(productId, FETCH_STATEMENT)
    .map(option -> option.map(Product::new).orElse(null))
    .setHandler(resultHandler);
  return this;
}

咱們惟一須要作的只是將結果變換成須要的類型。是否是很方便呢?

固然這不是惟一方法。在下面的章節中,咱們將會講到一種更Reactive,更Functional的方法 —— 利用Rx版本的Vert.x JDBC。另外,用vertx-sync也是一種不錯的選擇(相似於async/await)。

好啦!看完服務實現,下面輪到REST API了。咱們來看看RestProductAPIVerticle的實現:

public class RestProductAPIVerticle extends RestAPIVerticle {

  public static final String SERVICE_NAME = "product-rest-api";

  private static final String API_ADD = "/add";
  private static final String API_RETRIEVE = "/:productId";
  private static final String API_RETRIEVE_BY_PAGE = "/products";
  private static final String API_RETRIEVE_PRICE = "/:productId/price";
  private static final String API_RETRIEVE_ALL = "/products";
  private static final String API_DELETE = "/:productId";
  private static final String API_DELETE_ALL = "/all";

  private final ProductService service;

  public RestProductAPIVerticle(ProductService service) {
    this.service = service;
  }

  @Override
  public void start(Future<Void> future) throws Exception {
    super.start();
    final Router router = Router.router(vertx);
    // body handler
    router.route().handler(BodyHandler.create());
    // API route handler
    router.post(API_ADD).handler(this::apiAdd);
    router.get(API_RETRIEVE).handler(this::apiRetrieve);
    router.get(API_RETRIEVE_BY_PAGE).handler(this::apiRetrieveByPage);
    router.get(API_RETRIEVE_PRICE).handler(this::apiRetrievePrice);
    router.get(API_RETRIEVE_ALL).handler(this::apiRetrieveAll);
    router.patch(API_UPDATE).handler(this::apiUpdate);
    router.delete(API_DELETE).handler(this::apiDelete);
    router.delete(API_DELETE_ALL).handler(context -> requireLogin(context, this::apiDeleteAll));

    enableHeartbeatCheck(router, config());

    // get HTTP host and port from configuration, or use default value
    String host = config().getString("product.http.address", "0.0.0.0");
    int port = config().getInteger("product.http.port", 8082);

    // create HTTP server and publish REST service
    createHttpServer(router, host, port)
      .compose(serverCreated -> publishHttpEndpoint(SERVICE_NAME, host, port))
      .setHandler(future.completer());
  }

  private void apiAdd(RoutingContext context) {
    try {
      Product product = new Product(new JsonObject(context.getBodyAsString()));
      service.addProduct(product, resultHandler(context, r -> {
        String result = new JsonObject().put("message", "product_added")
          .put("productId", product.getProductId())
          .encodePrettily();
        context.response().setStatusCode(201)
          .putHeader("content-type", "application/json")
          .end(result);
      }));
    } catch (DecodeException e) {
      badRequest(context, e);
    }
  }

  private void apiRetrieve(RoutingContext context) {
    String productId = context.request().getParam("productId");
    service.retrieveProduct(productId, resultHandlerNonEmpty(context));
  }

  private void apiRetrievePrice(RoutingContext context) {
    String productId = context.request().getParam("productId");
    service.retrieveProductPrice(productId, resultHandlerNonEmpty(context));
  }

  private void apiRetrieveByPage(RoutingContext context) {
    try {
      String p = context.request().getParam("p");
      int page = p == null ? 1 : Integer.parseInt(p);
      service.retrieveProductsByPage(page, resultHandler(context, Json::encodePrettily));
    } catch (Exception ex) {
      badRequest(context, ex);
    }
  }

  private void apiRetrieveAll(RoutingContext context) {
    service.retrieveAllProducts(resultHandler(context, Json::encodePrettily));
  }

  private void apiDelete(RoutingContext context) {
    String productId = context.request().getParam("productId");
    service.deleteProduct(productId, deleteResultHandler(context));
  }

  private void apiDeleteAll(RoutingContext context, JsonObject principle) {
    service.deleteAllProducts(deleteResultHandler(context));
  }

}

此Verticle繼承了RestAPIVerticle,所以咱們能夠利用其中諸多的輔助方法。首先來看一下啓動過程,即start方法。首先咱們先調用super.start()來初始化服務發現組件,而後建立Router,綁定BodyHandler以便操做請求正文,而後建立各個API路由並綁定相應的處理函數。接着咱們調用enableHeartbeatCheck方法開啓簡單的心跳檢測支持。最後咱們經過封裝好的createHttpServer建立HTTP服務端,並經過publishHttpEndpoint方法將HTTP端點發布至服務發現模塊。

其中createHttpServer方法很是簡單,咱們只是把vertx.createHttpServer方法變成了基於Future的:

protected Future<Void> createHttpServer(Router router, String host, int port) {
  Future<HttpServer> httpServerFuture = Future.future();
  vertx.createHttpServer()
    .requestHandler(router::accept)
    .listen(port, host, httpServerFuture.completer());
  return httpServerFuture.map(r -> null);
}

至於各個路由處理邏輯如何實現,能夠參考 Vert.x Blueprint - Todo Backend Tutorial 獲取相信信息。

最後咱們打開此微服務模塊中的Main Verticle - ProductVerticle類。正如咱們以前所提到的,它負責發佈服務以及部署REST Verticle。咱們來看一下其start方法:

@Override
public void start(Future<Void> future) throws Exception {
  super.start();

  // create the service instance
  ProductService productService = new ProductServiceImpl(vertx, config()); // (1)
  // register the service proxy on event bus
  ProxyHelper.registerService(ProductService.class, vertx, productService, SERVICE_ADDRESS); // (2)
  // publish the service in the discovery infrastructure
  initProductDatabase(productService) // (3)
    .compose(databaseOkay -> publishEventBusService(ProductService.SERVICE_NAME, SERVICE_ADDRESS, ProductService.class)) // (4)
    .compose(servicePublished -> deployRestService(productService)) // (5)
    .setHandler(future.completer()); // (6)
}

首先咱們建立一個ProductService服務實例(1),而後經過registerService方法將服務註冊至Event Bus(2)。接着咱們初始化數據庫表(3),將商品服務發佈至服務發現層(4)而後部署REST Verticle(5)。這是一系列的異步方法的組合操做,很溜吧!最後咱們將future.completer()綁定至組合後的Future上,這樣當全部異步操做都OK的時候,Future會自動完成。

固然,不要忘記在配置裏指定api.name。以前咱們在 API Gateway章節 提到過,API Gateway的反向代理部分就是經過對應REST服務的 api.name 來進行請求分發的。默認狀況下api.nameproduct:

{
  "api.name": "product"
}

基於MongoDB的網店服務

網店服務用於網店的操做,如開店、關閉、更新數據。正常狀況下,開店都須要人工申請,不過在本藍圖教程中,咱們把這一步簡化掉了。網店服務模塊的結構和商品服務模塊很是類似,因此咱們就不細說了。咱們這裏僅僅瞅一瞅如何使用Vert.x Mongo Client。

使用Vert.x Mongo Client很是簡單,首先咱們須要建立一個MongoClient實例,過程相似於JDBCClient

private final MongoClient client;

public StoreCRUDServiceImpl(Vertx vertx, JsonObject config) {
  this.client = MongoClient.createNonShared(vertx, config);
}

而後咱們就能夠經過它來操做Mongo了。好比咱們想執行存儲(save)操做,咱們能夠這樣寫:

@Override
public void saveStore(Store store, Handler<AsyncResult<Void>> resultHandler) {
  client.save(COLLECTION, new JsonObject().put("_id", store.getSellerId())
      .put("name", store.getName())
      .put("description", store.getDescription())
      .put("openTime", store.getOpenTime()),
    ar -> {
      if (ar.succeeded()) {
        resultHandler.handle(Future.succeededFuture());
      } else {
        resultHandler.handle(Future.failedFuture(ar.cause()));
      }
    }
  );
}

這些操做都是異步的,所以你必定很是熟悉這種模式!固然若是不喜歡基於回調的異步模式的話,你也能夠選擇Rx版本的API~

更多關於Vert.x Mongo Client的使用細節,請參考官方文檔

基於Redis的商品庫存服務

商品庫存服務負責操做商品的庫存數量,好比添加庫存、減小庫存以及獲取當前庫存數量。庫存使用Redis來存儲。

與以前的Event Bus服務不一樣,咱們這裏的商品庫存服務是基於Future的,而不是基於回調的。因爲服務代理模塊不支持處理基於Future的服務接口,所以這裏咱們就不用異步RPC了,只發佈一個REST API端點,全部的調用都經過REST進行。

首先來看一下InventoryService服務接口:

public interface InventoryService {

  /**
   * Create a new inventory service instance.
   *
   * @param vertx  Vertx instance
   * @param config configuration object
   * @return a new inventory service instance
   */
  static InventoryService createService(Vertx vertx, JsonObject config) {
    return new InventoryServiceImpl(vertx, config);
  }

  /**
   * Increase the inventory amount of a certain product.
   *
   * @param productId the id of the product
   * @param increase  increase amount
   * @return the asynchronous result of current amount
   */
  Future<Integer> increase(String productId, int increase);

  /**
   * Decrease the inventory amount of a certain product.
   *
   * @param productId the id of the product
   * @param decrease  decrease amount
   * @return the asynchronous result of current amount
   */
  Future<Integer> decrease(String productId, int decrease);

  /**
   * Retrieve the inventory amount of a certain product.
   *
   * @param productId the id of the product
   * @return the asynchronous result of current amount
   */
  Future<Integer> retrieveInventoryForProduct(String productId);

}

接口定義很是簡單,含義都在註釋中給出了。接着咱們再看一下服務的實現類InventoryServiceImpl類。在Redis中,全部的庫存數量都被存儲在inventory:v1命名空間中,並以商品號productId做爲標識。好比商品A123456會被存儲至inventory:v1:A123456鍵值對中。

Vert.x Redis提供了incrbydecrby命令,能夠很方便地實現庫存增長和減小功能,代碼相似。這裏咱們只看庫存增長功能:

@Override
public Future<Integer> increase(String productId, int increase) {
  Future<Long> future = Future.future();
  client.incrby(PREFIX + productId, increase, future.completer());
  return future.map(Long::intValue);
}

因爲庫存數量不會很是大,Integer就足夠了,所以咱們須要經過Long::intValue方法引用來將Long結果變換成Integer類型的。

retrieveInventoryForProduct方法的實現也很是短小精悍:

@Override
public Future<Integer> retrieveInventoryForProduct(String productId) {
  Future<String> future = Future.future();
  client.get(PREFIX + productId, future.completer());
  return future.map(r -> r == null ? "0" : r)
    .map(Integer::valueOf);
}

咱們經過get命令來獲取值。因爲結果是String類型的,所以咱們須要自行將其轉換爲Integer類型。若是結果爲空,咱們就認爲商品沒有庫存,返回0

至於REST Verticle(在此模塊中也爲Main Verticle),其實現模式與前面的大同小異,這裏就不展開說了。不要忘記在config.json中指定api.name:

{
  "api.name": "inventory",
  "redis.host": "redis",
  "inventory.http.address": "inventory-microservice",
  "inventory.http.port": 8086
}

事件溯源 - 購物車服務

好了,如今咱們與基礎服務模塊告一段落了。下面咱們來到了另外一個重要的服務模塊 —— 購物車微服務。此模塊負責購物車的獲取、購物車事件的添加以及結算功能。與傳統的實現不一樣,這裏咱們要介紹一種不一樣的開發模式 —— 事件溯源(Event Sourcing)。

解道Event Sourcing

在傳統的數據存儲模式中,咱們一般直接將數據自己的狀態存儲至數據庫中。這在通常場景中是沒有問題的,但有些時候,咱們不只想獲取到數據,還想獲取數據操做的過程(即此數據是通過怎樣的操做生成的),這時候咱們就能夠利用事件溯源(Event Sourcing)來解決這個問題。

事件溯源保證了數據狀態的變換都以一系列的事件的形式存儲在數據庫中。因此,咱們不只能夠獲取每一個變換的事件,並且能夠經過過去的事件來組合出過去任意時刻的數據狀態!這真是極好的~注意,有一點很重要,咱們不能更改已經保存的事件以及它們的序列 —— 也就是說,事件存儲是隻能添加而不能刪除的,而且須要不可變。是否是感受和數據庫事務日誌的原理差很少呢?

在微服務架構中,事件溯源模式能夠帶來如下的好處:

  • 咱們能夠從過去的事件序列中組建出任意時刻的數據狀態

  • 每一個過去的事件都得以保存,所以這使得補償事務成爲可能

  • 咱們能夠從事件存儲中獲取事件流,而且以異步、響應式風格對其進行變換和處理

  • 事件存儲一樣能夠看成爲數據日誌

事件存儲的選擇也須要好好考慮。Apache Kafka很是適合這種場景,在此版本的Micro Shop微服務中,爲了簡化其實現,咱們簡單地使用了MySQL做爲事件存儲。下個版本咱們將把Kafka整合進來。

注:在實際生產環境中,購物車一般被存儲於Session或緩存內。本章節僅爲介紹事件溯源而使用事件存儲模式。

購物車事件

咱們來看一下表明購物車事件的CartEvent數據對象:

@DataObject(generateConverter = true)
public class CartEvent {

  private Long id;
  private CartEventType cartEventType;
  private String userId;
  private String productId;
  private Integer amount;

  private long createdAt;

  public CartEvent() {
    this.createdAt = System.currentTimeMillis();
  }

  public CartEvent(JsonObject json) {
    CartEventConverter.fromJson(json, this);
  }

  public CartEvent(CartEventType cartEventType, String userId, String productId, Integer amount) {
    this.cartEventType = cartEventType;
    this.userId = userId;
    this.productId = productId;
    this.amount = amount;
    this.createdAt = System.currentTimeMillis();
  }

  public static CartEvent createCheckoutEvent(String userId) {
    return new CartEvent(CartEventType.CHECKOUT, userId, "all", 0);
  }

  public static CartEvent createClearEvent(String userId) {
    return new CartEvent(CartEventType.CLEAR_CART, userId, "all", 0);
  }

  public JsonObject toJson() {
    JsonObject json = new JsonObject();
    CartEventConverter.toJson(this, json);
    return json;
  }

  public static boolean isTerminal(CartEventType eventType) {
    return eventType == CartEventType.CLEAR_CART || eventType == CartEventType.CHECKOUT;
  }
}

一個購物車事件存儲着事件的類型、發生的時間、操做用戶、對應的商品ID以及商品數量變更。在咱們的藍圖應用中,購物車事件一共有四種,它們用CartEventType枚舉類表示:

public enum CartEventType {
  ADD_ITEM, // 添加商品至購物車
  REMOVE_ITEM, // 從購物車中刪除商品
  CHECKOUT, // 結算並清空
  CLEAR_CART // 清空
}

其中CHECKOUTCLEAR_CART事件是對整個購物車實體進行操做,對應的購物車事件參數相似,所以咱們寫了兩個靜態方法來建立這兩種事件。

另外咱們還注意到一個靜態方法isTerminal,它用於檢測當前購物車事件是否爲一個「終結」事件。所謂的「終結」,指的是到此就對整個購物車進行操做(結算或清空)。在從購物車事件流構建出對應的購物車狀態的時候,此方法很是有用。

購物車實體

看完了購物車事件,咱們再來看一下購物車。購物車實體用ShoppingCart數據對象表示,它包含着一個商品列表表示當前購物車中的商品即數量:

private List<ProductTuple> productItems = new ArrayList<>();

其中ProductTuple數據對象包含着商品號、商品賣家ID、單價以及當前購物車中次商品的數目amount

爲了方便,咱們還在ShoppingCart類中放了一個amountMap用於暫時存儲商品數量:

private Map<String, Integer> amountMap = new HashMap<>();

因爲它只是暫時存儲,咱們不但願在對應的JSON數據中看到它,因此把它的getter和setter方法都註解上@GenIgnore

在事件溯源模式中,咱們要從一系列的購物車事件構建對應的購物車狀態,所以咱們須要一個incorporate方法將每一個購物車事件「合併」至購物車內以變動對應的商品數目:

public ShoppingCart incorporate(CartEvent cartEvent) {
  // 此事件必須爲添加或刪除事件
  boolean ifValid = Stream.of(CartEventType.ADD_ITEM, CartEventType.REMOVE_ITEM)
    .anyMatch(cartEventType ->
      cartEvent.getCartEventType().equals(cartEventType));

  if (ifValid) {
    amountMap.put(cartEvent.getProductId(),
      amountMap.getOrDefault(cartEvent.getProductId(), 0) +
        (cartEvent.getAmount() * (cartEvent.getCartEventType()
          .equals(CartEventType.ADD_ITEM) ? 1 : -1)));
  }

  return this;
}

實現卻是比較簡單,咱們首先來檢查要合併的事件是否是添加商品或移除商品事件,若是是的話,咱們就根據事件類型以及對應的數量變動來改變當前購物車中該商品的數量(amountMap)。

使用Rx版本的Vert.x JDBC

咱們如今已經瞭解購物車微服務中的實體類了,下面該看看購物車事件存儲服務了。

以前用callback-based API寫Vert.x JDBC操做總感受心累,還好Vert.x支持與RxJava進行整合,而且幾乎每一個Vert.x組件都有對應的Rx版本!是否是瞬間感受整我的都變得Reactive了呢~(⊙o⊙) 這裏咱們就來使用Rx版本的Vert.x JDBC來寫咱們的購物車事件存儲服務。也就是說,裏面全部的異步方法都將是基於Observable的,頗有FRP風格!

咱們首先定義了一個簡單的CRUD接口SimpleCrudDataSource

public interface SimpleCrudDataSource<T, ID> {

  Observable<Void> save(T entity);

  Observable<T> retrieveOne(ID id);

  Observable<Void> delete(ID id);

}

接着定義了一個CartEventDataSource接口,定義了購物車事件獲取的相關方法:

public interface CartEventDataSource extends SimpleCrudDataSource<CartEvent, Long> {

  Observable<CartEvent> streamByUser(String userId);

}

能夠看到這個接口只有一個方法 —— streamByUser方法會返回某一用戶對應的購物車事件流,這樣後面咱們就能夠對其進行流式變換操做了!

下面咱們來看一下服務的實現類CartEventDataSourceImpl。首先是save方法,它將一個事件存儲至事件數據庫中:

@Override
public Observable<Void> save(CartEvent cartEvent) {
  JsonArray params = new JsonArray().add(cartEvent.getCartEventType().name())
    .add(cartEvent.getUserId())
    .add(cartEvent.getProductId())
    .add(cartEvent.getAmount())
    .add(cartEvent.getCreatedAt() > 0 ? cartEvent.getCreatedAt() : System.currentTimeMillis());
  return client.getConnectionObservable()
    .flatMap(conn -> conn.updateWithParamsObservable(SAVE_STATEMENT, params))
    .map(r -> null);
}

看看咱們的代碼,在對比對比普通的callback-based的Vert.x JDBC,是否是更加簡潔,更加Reactive呢?咱們能夠很是簡單地經過getConnectionObservable方法獲取數據庫鏈接,而後組合updateWithParamsObservable方法執行對應的含參SQL語句。只須要兩行有木有!而若是用callback-based的風格的話,你只能這麼寫:

client.getConnection(ar -> {
  if (ar.succeeded) {
    SQLConnection connection = ar.result();
    connection.updateWithParams(SAVE_STATEMENT, params, ar2 -> {
      // ...
    })
  } else {
    resultHandler.handle(Future.failedFuture(ar.cause()));
  }
})

所以,使用RxJava是很是愉快的一件事!固然vertx-sync也是一個不錯的選擇。

固然,不要忘記返回的Observablecold 的,所以只有在它被subscribe的時候,數據纔會被髮射。

不過話說回來了,Vert.x JDBC底層本質仍是阻塞型的調用,要實現真正的異步數據庫操做,咱們能夠利用 Vert.x MySQL / PostgreSQL Client 這個組件,底層使用Scala寫的異步數據庫操做庫,不過目前還不是很穩定,你們能夠本身嚐嚐鮮。

下面咱們再來看一下retrieveOne方法,它從數據存儲中獲取特定ID的事件:

@Override
public Observable<CartEvent> retrieveOne(Long id) {
  return client.getConnectionObservable()
    .flatMap(conn -> conn.queryWithParamsObservable(RETRIEVE_STATEMENT, new JsonArray().add(id)))
    .map(ResultSet::getRows)
    .filter(list -> !list.isEmpty())
    .map(res -> res.get(0))
    .map(this::wrapCartEvent);
}

很是簡潔明瞭,就像以前咱們的基於Future的範式類似,所以這裏就再也不詳細解釋了~

下面咱們來看一下里面最重要的方法 —— streamByUser方法:

@Override
public Observable<CartEvent> streamByUser(String userId) {
  JsonArray params = new JsonArray().add(userId).add(userId);
  return client.getConnectionObservable()
    .flatMap(conn -> conn.queryWithParamsObservable(STREAM_STATEMENT, params))
    .map(ResultSet::getRows)
    .flatMapIterable(item -> item) // list merge into observable
    .map(this::wrapCartEvent);
}

其核心在於它的SQL語句STREAM_STATEMENT

SELECT * FROM cart_event c
WHERE c.user_id = ? AND c.created_at > coalesce(
    (SELECT created_at FROM cart_event
       WHERE user_id = ? AND (`type` = "CHECKOUT" OR `type` = "CLEAR_CART")
     ORDER BY cart_event.created_at DESC
     LIMIT 1),
    0)
ORDER BY c.created_at ASC;

此SQL語句執行時會獲取與當前購物車相關的全部購物車事件。注意到咱們有許多用戶,每一個用戶可能會有許多購物車事件,它們屬於不一樣時間的購物車,那麼如何來獲取相關的事件呢?方法是 —— 首先咱們獲取最近一次「終結」事件發生對應的時間,那麼當前購物車相關的購物車事件就是在此終結事件發生後全部的購物車事件。

明白了這一點,咱們再回到streamByUser方法中來。既然此方法是從數據庫中獲取一個事件列表,那麼爲何此方法返回Observable<CartEvent>而不是Observable<List<CartEvent>>呢?咱們來看看其中的奧祕 —— flatMapIterable算子,它將一個序列變換爲一串數據流。因此,這裏的Observable<CartEvent>與Vert.x中的Future以及Java 8中的CompletableFuture就有些不一樣了。CompletableFuture更像是RxJava中的Single,它僅僅發送一個值或一個錯誤信息,而Observable自己則就像是一個數據流,數據源源不斷地從發佈者流向訂閱者。以前retrieveOnesave方法中返回的Observable的使用更像是一個Single,可是在streamByUser方法中,Observable是真真正正的事件數據流。咱們將會在購物車服務ShoppingCartService中處理事件流。

哇!如今你必定又被Rx這種函數響應式風格所吸引了~在下面的部分中,咱們將探索購物車服務及其實現,基於Future,一樣很是Reactive!

根據購物車事件序列構建對應的購物車狀態

咱們首先來看一下ShoppingCartService —— 購物車服務接口,它也是一個Event Bus服務:

@VertxGen
@ProxyGen
public interface ShoppingCartService {

  /**
   * The name of the event bus service.
   */
  String SERVICE_NAME = "shopping-cart-eb-service";

  /**
   * The address on which the service is published.
   */
  String SERVICE_ADDRESS = "service.shopping.cart";

  @Fluent
  ShoppingCartService addCartEvent(CartEvent event, Handler<AsyncResult<Void>> resultHandler);

  @Fluent
  ShoppingCartService getShoppingCart(String userId, Handler<AsyncResult<ShoppingCart>> resultHandler);

}

這裏咱們定義了兩個方法:addCartEvent用於將購物車事件存儲至事件存儲中;getShoppingCart方法用於獲取某個用戶當前購物車的狀態。

下面咱們來看一下其實現類 —— ShoppingCartServiceImpl。首先是addCartEvent方法,它很是簡單:

@Override
public ShoppingCartService addCartEvent(CartEvent event, Handler<AsyncResult<Void>> resultHandler) {
  Future<Void> future = Future.future();
  repository.save(event).toSingle().subscribe(future::complete, future::fail);
  future.setHandler(resultHand
  return this;
}

正如以前咱們所提到的,這裏save方法返回的Observable其實更像個Single,所以咱們將其經過toSingle方法變換爲Single,而後經過subscribe(future::complete, future::fail)將其轉化爲Future以便於給其綁定一個Handler<AsyncResult<Void>>類型的處理函數。

getShoppingCart方法的邏輯位於aggregateCartEvents方法中,此方法很是重要,而且是基於Future的。咱們先來看一下代碼:

private Future<ShoppingCart> aggregateCartEvents(String userId) {
  Future<ShoppingCart> future = Future.future();
  // aggregate cart events into raw shopping cart
  repository.streamByUser(userId) // (1)
    .takeWhile(cartEvent -> !CartEvent.isTerminal(cartEvent.getCartEventType())) // (2)
    .reduce(new ShoppingCart(), ShoppingCart::incorporate) // (3)
    .toSingle()
    .subscribe(future::complete, future::fail); // (4)

  return future.compose(cart ->
    getProductService() // (5)
      .compose(service -> prepareProduct(service, cart)) // (6) prepare product data
      .compose(productList -> generateCurrentCartFromStream(cart, productList)) // (7) prepare product items
  );
}

咱們來詳細地解釋一下。首先咱們先建立個Future,而後先經過repository.streamByUser(userId)方法獲取事件流(1),而後咱們使用takeWhile算子來獲取全部的ADD_ITEMREMOVE_ITEM類型的事件(2)。takeWhile算子在斷定條件變爲假時中止發射新的數據,所以當事件流遇到一個終結事件時,新的事件就再也不往外發送了,以前的事件將會繼續被傳遞。

下面就是產生購物車狀態的過程了!咱們經過reduce算子將事件流來「聚合」成購物車實體(3)。這個過程能夠總結爲如下幾步:首先咱們先建立一個空的購物車,而後依次將各個購物車事件「合併」至購物車實體中。最後聚合而成的購物車實體應該包含一個完整的amountMap

如今此Observable已經包含了咱們想要的初始狀態的購物車了。咱們將其轉化爲Single而後經過subscribe(future::complete, future::fail)轉化爲Future(4)。

如今咱們須要更多的信息以組件一個完整的購物車,因此咱們首先組合getProductService異步方法來從服務發現層獲取商品服務(5),而後經過prepareProduct方法來獲取須要的商品數據(6),最後經過generateCurrentCartFromStream異步方法組合出完整的購物車實體(7)。這裏麪包含了好幾個組合過程,咱們來一一解釋。

首先來看getProductService異步方法。它用於從服務發現層獲取商品服務,而後返回其異步結果:

private Future<ProductService> getProductService() {
  Future<ProductService> future = Future.future();
  EventBusService.getProxy(discovery,
    new JsonObject().put("name", ProductService.SERVICE_NAME),
    future.completer());
  return future;
}

如今咱們獲取到商品服務了,那麼下一步天然是獲取須要的商品數據了。這個過程經過prepareProduct異步方法實現:

private Future<List<Product>> prepareProduct(ProductService service, ShoppingCart cart) {
  List<Future<Product>> futures = cart.getAmountMap().keySet() // (1)
    .stream()
    .map(productId -> {
      Future<Product> future = Future.future();
      service.retrieveProduct(productId, future.completer());
      return future; // (2)
    })
    .collect(Collectors.toList()); // (3)
  return Functional.sequenceFuture(futures); // (4)
}

在此實現中,首先咱們從amountMap中獲取購物車中全部商品的ID(1),而後咱們根據每一個ID異步調用商品服務的retrieveProduct方法而且以Future包裝(2),而後將此流轉化爲List<Future<Product>>類型的列表(3)。咱們這裏想得到的是全部商品的異步結果,即Future<List<Product>>,那麼如何轉換呢?這裏我寫了一個輔助函數sequenceFuture來實現這樣的變換,它位於io.vertx.blueprint.microservice.common.functional包下的Functional類中:

public static <R> Future<List<R>> sequenceFuture(List<Future<R>> futures) {
  return CompositeFutureImpl.all(futures.toArray(new Future[futures.size()])) // (1)
    .map(v -> futures.stream()
        .map(Future::result) // (2)
        .collect(Collectors.toList()) // (3)
    );
}

此方法對於想將一個Future序列變換成單個Future的狀況很是有用。這裏咱們首先調用CompositeFutureImpl類的all方法(1),它返回一個組合的Future,當且僅當序列中全部的Future都成功完成時,它爲成功狀態,不然爲失敗狀態。下面咱們就對此組合Future作變換:獲取每一個Future對應的結果(由於all方法已經強制獲取全部結果),而後歸結成列表(3)。

回到以前的組合中來!如今咱們獲得了咱們須要的商品信息列表List<Product>,接下來就根據這些信息來構建完整的購物車實體了!咱們來看一下generateCurrentCartFromStream方法的實現:

private Future<ShoppingCart> generateCurrentCartFromStream(ShoppingCart rawCart, List<Product> productList) {
  Future<ShoppingCart> future = Future.future();
  // check if any of the product is invalid
  if (productList.stream().anyMatch(e -> e == null)) { // (1)
    future.fail("Error when retrieve products: empty");
    return future;
  }
  // construct the product items
  List<ProductTuple> currentItems = rawCart.getAmountMap().entrySet() // (2)
    .stream()
    .map(item -> new ProductTuple(getProductFromStream(productList, item.getKey()), // (3)
      item.getValue())) // (4) amount value
    .filter(item -> item.getAmount() > 0) // (5) amount must be greater than zero
    .collect(Collectors.toList());

  ShoppingCart cart = rawCart.setProductItems(currentItems); // (6)
  return Future.succeededFuture(cart); // (7)
}

看起來很是混亂的樣子。。。不要擔憂,咱們慢慢來~注意這個方法自己不是異步的,但咱們須要表示此方法成功或失敗兩種狀態(即AsyncResult),因此此方法仍然返回Future。首先咱們建立一個Future,而後經過anyMatch方法檢查商品列表是否合法(1)。若不合法,返回一個失敗的Future;若合法,咱們對每一個商品依次構建出對應的ProductTuple。在(3)中,咱們經過這個構造函數來構建ProductTuple:

public ProductTuple(Product product, Integer amount) {
  this.productId = product.getProductId();
  this.sellerId = product.getSellerId();
  this.price = product.getPrice();
  this.amount = amount;
}

其中第一個參數是對應的商品實體。爲了從列表中獲取對應的商品實體,咱們寫了一個getProductFromStream方法:

private Product getProductFromStream(List<Product> productList, String productId) {
  return productList.stream()
    .filter(product -> product.getProductId().equals(productId))
    .findFirst()
    .get();
}

當每一個商品的ProductTuple都構建完畢的時候,咱們就能夠將列表賦值給對應的購物車實體了(6),而且返回購物車實體結果(7)。如今咱們終於整合出一個完整的購物車了!

結算 - 根據購物車產生訂單

如今咱們已經選好了本身喜好的商品,把購物車填的慢慢噹噹了,下面是時候進行結算了!咱們這裏一樣定義了一個結算服務接口CheckoutService,它只包含一個特定的方法:checkout

@VertxGen
@ProxyGen
public interface CheckoutService {

  /**
   * The name of the event bus service.
   */
  String SERVICE_NAME = "shopping-checkout-eb-service";

  /**
   * The address on which the service is published.
   */
  String SERVICE_ADDRESS = "service.shopping.cart.checkout";

  /**
   * Order event source address.
   */
  String ORDER_EVENT_ADDRESS = "events.service.shopping.to.order";

  /**
   * Create a shopping checkout service instance
   */
  static CheckoutService createService(Vertx vertx, ServiceDiscovery discovery) {
    return new CheckoutServiceImpl(vertx, discovery);
  }

  void checkout(String userId, Handler<AsyncResult<CheckoutResult>> handler);

}

接口很是簡單,下面咱們來看其實現 —— CheckoutServiceImpl類。儘管接口只包含一個checkout方法,但咱們都知道結算過程可不簡單。。。它包含庫存檢測、付款(這裏暫時省掉了)以及生成訂單的邏輯。咱們先來看看checkout方法的源碼:

@Override
public void checkout(String userId, Handler<AsyncResult<CheckoutResult>> resultHandler) {
  if (userId == null) { // (1)
    resultHandler.handle(Future.failedFuture(new IllegalStateException("Invalid user")));
    return;
  }
  Future<ShoppingCart> cartFuture = getCurrentCart(userId); // (2)
  Future<CheckoutResult> orderFuture = cartFuture.compose(cart ->
    checkAvailableInventory(cart).compose(checkResult -> { // (3)
      if (checkResult.getBoolean("res")) { // (3)
        double totalPrice = calculateTotalPrice(cart); // (4)
        // 建立訂單實體
        Order order = new Order().setBuyerId(userId) // (5)
          .setPayId("TEST")
          .setProducts(cart.getProductItems())
          .setTotalPrice(totalPrice);
        // 設置訂單流水號,而後向訂單組件發送訂單並等待迴應
        return retrieveCounter("order") // (6)
          .compose(id -> sendOrderAwaitResult(order.setOrderId(id))) // (7)
          .compose(result -> saveCheckoutEvent(userId).map(v -> result)); // (8)
      } else {
        // 庫存不足,結算失敗
        return Future.succeededFuture(new CheckoutResult()
          .setMessage(checkResult.getString("message"))); // (9)
      }
    })
  );

  orderFuture.setHandler(resultHandler); // (10)
}

好吧,咱們又看到了大量的compose。。。是的,這裏咱們又組合了不少基於Future的異步方法。首先咱們先來判斷給定的userId是否合法(1),若是不合法的話馬上讓Future失敗掉;若用戶合法,咱們就經過getCurrentCart方法獲取給定用戶的當前購物車狀態(2)。這個過程是異步的,因此此方法返回Future<ShoppingCart>類型的異步結果:

private Future<ShoppingCart> getCurrentCart(String userId) {
  Future<ShoppingCartService> future = Future.future();
  EventBusService.getProxy(discovery,
    new JsonObject().put("name", ShoppingCartService.SERVICE_NAME),
    future.completer());
  return future.compose(service -> {
    Future<ShoppingCart> cartFuture = Future.future();
    service.getShoppingCart(userId, cartFuture.completer());
    return cartFuture.compose(c -> {
      if (c == null || c.isEmpty())
        return Future.failedFuture(new IllegalStateException("Invalid shopping cart"));
      else
        return Future.succeededFuture(c);
    });
  });
}

getCurrentCart方法中,咱們經過EventBusService接口的getProxy方法從服務發現層獲取購物車服務;而後咱們調用購物車服務的getShoppingCart方法獲取購物車。這裏咱們還須要檢驗購物車是否爲空,購物車不爲空的話就返回異步結果,爲空的話結算顯然不合適,返回不合法錯誤。

你可能已經注意到了checkout方法會產生一個CheckoutResult類型的異步結果,這表明結算的結果:

@DataObject(generateConverter = true)
public class CheckoutResult {
  private String message; // 結算結果信息
  private Order order; // 若成功,此項爲訂單實體
}

回到咱們的checkout方法中來。如今咱們要從獲取到的cartFuture進行一系列的操做,最終獲得Future<CheckoutResult>類型的結算結果。那麼進行哪些操做呢?首先咱們組合checkAvailableInventory異步方法,它用於獲取商品庫存檢測數據,後面咱們講詳細討論其實現。接着咱們檢查獲取到的商品庫存數據,判斷是否全部庫存都充足(3)。若是不充足的話,咱們直接返回一個CheckoutResult並標記庫存不足的信息(9)。若是庫存充足,咱們就計算出此訂單的總價(4)而後生成訂單Order(5)。訂單用Order數據對象表示,它包含如下信息:

  • 買家ID

  • 每一個所選商品的數量、單價以及賣家ID

  • 商品總價

生成初始訂單以後,咱們須要從計數器服務中生成該訂單的流水號(6),接着經過Event Bus向訂單組件中發送訂單數據,而且等待結帳結果CheckoutResult(7)。這些都作完之後,咱們向事件存儲中添加購物車結算事件(8)。最後咱們向最終獲得的orderFuture綁定resultHandler處理函數(10)。當結帳結果回覆過來的時候,處理函數將會被調用。

下面咱們來解釋一下上面出現過的一些異步過程。首先是最早提到的用於準備庫存數據的checkAvailableInventory方法:

private Future<JsonObject> checkAvailableInventory(ShoppingCart cart) {
  Future<List<JsonObject>> allInventories = getInventoryEndpoint().compose(client -> { // (1)
    List<Future<JsonObject>> futures = cart.getProductItems() // (2)
      .stream()
      .map(product -> getInventory(product, client)) // (3)
      .collect(Collectors.toList());
    return Functional.sequenceFuture(futures); // (4)
  });
  return allInventories.map(inventories -> {
    JsonObject result = new JsonObject();
    // get the list of products whose inventory is lower than the demand amount
    List<JsonObject> insufficient = inventories.stream()
      .filter(item -> item.getInteger("inventory") - item.getInteger("amount") < 0) // (5)
      .collect(Collectors.toList());
    // insufficient inventory exists
    if (insufficient.size() > 0) {
      String insufficientList = insufficient.stream()
        .map(item -> item.getString("id"))
        .collect(Collectors.joining(", ")); // (6)
      result.put("message", String.format("Insufficient inventory available for product %s.", insufficientList))
        .put("res", false); // (7)
    } else {
      result.put("res", true); // (8)
    }
    return result;
  });
}

有點複雜呢。。。首先咱們經過getInventoryEndpoint方法來從服務發現層獲取商品庫存組件對應的REST端點(1)。這是對HttpEndpoint接口的getClient方法的簡單封裝:

private Future<HttpClient> getInventoryEndpoint() {
  Future<HttpClient> future = Future.future();
  HttpEndpoint.getClient(discovery,
    new JsonObject().put("name", "inventory-rest-api"), // service name
    future.completer());
  return future;
}

接着咱們又要組合另外一個Future。在這個過程當中,咱們從購物車中獲取商品列表(2),而後將每一個ProductTuple都變換成對應的商品ID以及對應庫存(3)。以前咱們已經獲取到庫存服務REST端點對應的HttpClient了,下面咱們就能夠經過客戶端來獲取每一個商品的庫存。獲取庫存的過程是在getInventory方法中實現的:

private Future<JsonObject> getInventory(ProductTuple product, HttpClient client) {
  Future<Integer> future = Future.future(); // (A)
  client.get("/" + product.getProductId(), response -> { // (B)
    if (response.statusCode() == 200) { // (C)
      response.bodyHandler(buffer -> {
        try {
          int inventory = Integer.valueOf(buffer.toString()); // (D)
          future.complete(inventory);
        } catch (NumberFormatException ex) {
          future.fail(ex);
        }
      });
    } else {
      future.fail("not_found:" + product.getProductId()); // (E)
    }
  })
    .exceptionHandler(future::fail)
    .end();
  return future.map(inv -> new JsonObject()
    .put("id", product.getProductId())
    .put("inventory", inv)
    .put("amount", product.getAmount())); // (F)
}

過程很是簡潔明瞭。首先咱們先建立一個Future<Integer>來保存庫存數量異步結果(A)。而後咱們調用clientget方法來發送獲取庫存的請求(B)。在對迴應的處理邏輯responseHandler中,若是結果狀態爲 200 OK(C),咱們就能夠經過bodyHandler來解析迴應正文並將其轉換爲Integer類型(D)。這幾個過程都完成後,對應的future會被賦值爲對應的庫存數量;若是結果狀態不正常(好比400或404),那麼咱們就能夠認爲獲取失敗,將future置爲失敗狀態(E)。

只有庫存數量是不夠的(由於咱們不知道庫存對應哪一個商品),所以爲了方便起見,咱們將庫存數量和對應的商品號以及購物車中選定的數量都塞進一個JsonObject中,最後將Future<Integer>變換爲Future<JsonObject>類型的結果(F)。

再回到checkAvailableInventory方法中來。在(3)過程以後,咱們有的到了一個Future列表,因此咱們再次調用Functional.sequenceFuture方法將其變換成Future<List<JsonObject>>類型(4)。如今咱們能夠來檢查每一個庫存是否都充足了!咱們建立了一個列表insufficient專門存儲庫存不足的商品,這是經過filter算子實現的(5)。若是庫存不足的商品列表不爲空,那就是說有商品庫存不足,因此咱們須要獲取每一個庫存不足的商品ID並把其概括成一串信息。這裏咱們經過collect算子實現的:collect(Collectors.joining(", ")) (6)。這個小trick仍是很好使的,好比列表[TST-0001, TST-0002, BK-16623]會被歸結成 "TST-0001, TST-0002, BK-16623" 這樣的字符串。生成庫存不足商品的信息之後,咱們將此信息置於JsonObject中。同時,咱們在此JsonObject中用一個bool型的res來表示商品庫存是否充足,所以這裏咱們將res的值設爲false(7)。

若是以前得到的庫存不足的商品列表爲空,那麼就表明全部商品餘額充足,咱們就將res的值設爲true(8),最後返回異步結果future

再回到那一串組合中。咱們接着經過calculateTotalPrice方法來計算購物車中商品的總價,以便爲訂單生成提供信息。這個過程很簡單:

return cart.getProductItems().stream()
  .map(p -> p.getAmount() * p.getPrice()) // join by product id
  .reduce(0.0d, (a, b) -> a + b);

正如以前在checkout方法中提到的那樣,在建立原始訂單以後,咱們會對結果進行三個組合:retrieveCounter -> sendOrderAwaitResult -> saveCheckoutEvent。咱們來看一下。

咱們首先從緩存組件的計數器服務中生成當前訂單的流水號:

private Future<Long> retrieveCounter(String key) {
  Future<Long> future = Future.future();
  EventBusService.<CounterService>getProxy(discovery,
    new JsonObject().put("name", "counter-eb-service"),
    ar -> {
      if (ar.succeeded()) {
        CounterService service = ar.result();
        service.addThenRetrieve(key, future.completer());
      } else {
        future.fail(ar.cause());
      }
    });
  return future;
}

固然你也能夠直接用數據庫自帶的AUTO INCREMENT計數器,不過當有多臺數據庫服務器的時候,咱們須要保證計數器在集羣內的一致性。

接着咱們經過saveCheckoutEvent方法存儲購物車結算事件,其實現和getCurrentCart方法很是相似。它們都是先從服務發現層中獲取購物車服務,而後再異步調用對應的邏輯:

private Future<Void> saveCheckoutEvent(String userId) {
  Future<ShoppingCartService> future = Future.future();
  EventBusService.getProxy(discovery,
    new JsonObject().put("name", ShoppingCartService.SERVICE_NAME),
    future.completer());
  return future.compose(service -> {
    Future<Void> resFuture = Future.future();
    CartEvent event = CartEvent.createCheckoutEvent(userId);
    service.addCartEvent(event, resFuture.completer());
    return resFuture;
  });
}

向訂單模塊發送訂單

生成訂單流水號之後,如今咱們的訂單實體已是完整的了,能夠向下層訂單服務組件發送了。咱們來看一下其實現 —— sendOrderAwaitResult方法:

private Future<CheckoutResult> sendOrderAwaitResult(Order order) {
  Future<CheckoutResult> future = Future.future();
  vertx.eventBus().send(CheckoutService.ORDER_EVENT_ADDRESS, order.toJson(), reply -> {
    if (reply.succeeded()) {
      future.complete(new CheckoutResult((JsonObject) reply.result().body()));
    } else {
      future.fail(reply.cause());
    }
  });
  return future;
}

咱們將訂單實體發送至Event Bus上的一個特定地址中,這樣在訂單服務組件中,訂單服務就可以從Event Bus上獲取發送的訂單並對其進行處理和分發。注意到咱們調用的send函數同時還接受一個Handler<AsyncResult<Message<T>>>類型的參數,這意味着咱們須要等待消息接收者發送回的回覆消息。這實際上是一種相似於 請求/回覆模式 的消息模式。若是咱們成功地接收到回覆消息,咱們就將其轉化爲訂單結果CheckoutResult而且給future賦值;若是咱們收到了失敗的消息,或者接受消息超時,咱們就將future標記爲失敗。

好啦!在經歷了一系列的「組合」過程以後,咱們終於完成了對checkout方法的探索。是否是感受很Reactive呢?

因爲訂單服務並不知道咱們發送的地址,咱們須要向服務發現層中發佈一個 消息源,這裏的消息源其實就是咱們將訂單發送的位置。訂單就能夠經過服務發現層獲取對應的消費者MessageConsumer,而後今後處接受訂單。咱們將會在CartVerticle中發佈此消息源,不過在看CartVerticle的實現以前,咱們先來瞥一眼購物車服務的REST Verticle。

購物車服務REST API

在購物車服務相關的REST Verticle裏有三個主要的API:

  • GET /cart - 獲取當前用戶的購物車狀態

  • POST /events - 向購物車事件存儲中添加一個新的與當前用戶相關的購物車事件

  • POST /checkout - 發出購物車結算請求

注意這三個API都須要權限(登陸用戶),所以它們的路由處理函數都包裝着requireLogin方法。這一點已經在以前的API Gateway章節中提到過:

// api route handler
router.post(API_CHECKOUT).handler(context -> requireLogin(context, this::apiCheckout));
router.post(API_ADD_CART_EVENT).handler(context -> requireLogin(context, this::apiAddCartEvent));
router.get(API_GET_CART).handler(context -> requireLogin(context, this::apiGetCart));

它們的路由函數實現卻是很是簡單,咱們這裏只看一個apiAddCartEvent方法:

private void apiAddCartEvent(RoutingContext context, JsonObject principal) {
  String userId = Optional.ofNullable(principal.getString("userId"))
    .orElse(TEST_USER); // (1)
  CartEvent cartEvent = new CartEvent(context.getBodyAsJson()); // (2)
  if (validateEvent(cartEvent, userId)) {
    shoppingCartService.addCartEvent(cartEvent, resultVoidHandler(context, 201)); // (3)
  } else {
    context.fail(400); // (4)
  }
}

首先咱們從當前的用戶憑證principal中獲取用戶ID。若是當前用戶憑證中獲取不到ID,那麼咱們就暫時用TEST_USER來替代(1)。而後咱們根據請求正文來建立購物車事件CartEvent(2)。咱們同時須要驗證購物車事件中的用戶與當前做用域內的用戶是否相符。若相符,則調用服務的addCartEvent方法將事件添加至事件存儲中,並在成功時返回 201 狀態(3)。若是請求正文中的購物車事件不合法,咱們就返回 400 Bad Request* 狀態(4)。

Cart Verticle

CartVerticle是購物車服務組件的Main Verticle,用於發佈各類服務。這裏咱們會發布三個服務:

  • shopping-checkout-eb-service: 結算服務,這是一個 Event Bus 服務

  • shopping-cart-eb-service: 購物車服務,這是一個 Event Bus 服務

  • shopping-order-message-source: 發送訂單的消息源,這是一個 消息源服務

同時咱們的CartVerticle也負責部署RestShoppingAPIVerticle。注意不要忘掉設置api.name:

{
  "api.name": "cart"
}

這是購物車部分的UI:

Cart Page

訂單服務

好啦!如今咱們已經提交告終算請求,在底層訂單已經發送至訂單微服務組件中了。因此下一步天然就是訂單服務的責任了 —— 分發訂單以及處理訂單。在當前版本的Micro Shop實現中,咱們僅僅將訂單存儲至數據庫中並變動對應的商品庫存數額。在正常的生產環境中,咱們一般會將訂單push到消息隊列中,而且在下層服務中從消息隊列中pull訂單並進行處理。

訂單存儲服務的實現與以前太相似了,所以這裏就不講OrderService及其實現的細節了。你們能夠自行查看相關代碼

咱們的訂單處理邏輯寫在RawOrderDispatcher這個verticle中,下面咱們就來看一下。

消費消息源發送來的數據

首先咱們須要從消息源中根據服務名稱獲取消息消費者,而後從消費者處獲取發送來的訂單。這能夠經過MessageSource接口的getConsumer方法實現:

@Override
public void start(Future<Void> future) throws Exception {
  super.start();
  MessageSource.<JsonObject>getConsumer(discovery,
    new JsonObject().put("name", "shopping-order-message-source"),
    ar -> {
      if (ar.succeeded()) {
        MessageConsumer<JsonObject> orderConsumer = ar.result();
        orderConsumer.handler(message -> {
          Order wrappedOrder = wrapRawOrder(message.body());
          dispatchOrder(wrappedOrder, message);
        });
        future.complete();
      } else {
        future.fail(ar.cause());
      }
    });
}

獲取到對應的MessageConsumer之後,咱們就能夠經過handler方法給其綁定一個Handler<Message<T>>類型的處理函數,在此處理函數中咱們就能夠對獲取的消息進行各類操做。這裏咱們的message body是JsonObject類型的,因此咱們首先將其轉化爲訂單實體,而後就能夠對其進行分發和處理了。對應的邏輯在dispatchOrder方法中。

「處理」訂單

咱們來看一下dispatchOrder方法中的簡單的「分發處理」邏輯:

private void dispatchOrder(Order order, Message<JsonObject> sender) {
  Future<Void> orderCreateFuture = Future.future();
  orderService.createOrder(order, orderCreateFuture.completer()); // (1)
  orderCreateFuture
    .compose(orderCreated -> applyInventoryChanges(order)) // (2)
    .setHandler(ar -> {
      if (ar.succeeded()) {
        CheckoutResult result = new CheckoutResult("checkout_success", order); // (3)
        sender.reply(result.toJson()); // (4)
        publishLogEvent("checkout", result.toJson(), true); // (5)
      } else {
        sender.fail(5000, ar.cause().getMessage()); // (6)
        ar.cause().printStackTrace();
      }
    });
}

首先咱們先建立一個Future表明向數據庫中添加訂單的異步結果。而後咱們調用訂單服務的createOrder方法將訂單存儲至數據庫中(1)。能夠看到咱們給此方法傳遞的處理函數是orderCreateFuture.completer(),這樣當添加操做結束時,對應的Future就會被賦值。下一步咱們組合一個異步方法 —— applyInventoryChanges方法,用於變動商品庫存數量(2)。若是這兩個過程都成功完成的話,咱們就建立一個表明結算成功的CheckoutResult實體(3),而後調用reply方法向消息發送者回復結算結果(4)。以後咱們向Event Bus發送結算事件來通知日誌組件記錄日誌(5)。若是其中有過程失敗的話,咱們須要對消息發送者sender調用fail方法來通知操做失敗(6)。

很簡單吧?下面咱們來看一下applyInventoryChanges方法的實現,看看如何變動商品庫存數量:

private Future<Void> applyInventoryChanges(Order order) {
  Future<Void> future = Future.future();
  // 從服務發現層獲取REST端點
  Future<HttpClient> clientFuture = Future.future();
  HttpEndpoint.getClient(discovery,
    new JsonObject().put("name", "inventory-rest-api"), // 服務名稱
    clientFuture.completer());
  // 經過調用REST API來變動對應的庫存
  return clientFuture.compose(client -> {
    List<Future> futures = order.getProducts()
      .stream()
      .map(item -> { // 變換成對應的異步結果
        Future<Void> resultFuture = Future.future();
        String url = String.format("/%s/decrease?n=%d", item.getProductId(), item.getAmount());
        client.put(url, response -> {
          if (response.statusCode() == 200) {
            resultFuture.complete();
          } else {
            resultFuture.fail(response.statusMessage());
          }
        })
          .exceptionHandler(resultFuture::fail)
          .end();
        return resultFuture;
      })
      .collect(Collectors.toList());
    // 每一個Future必須都success,生成的組合Future纔會success
    CompositeFuture.all(futures).setHandler(ar -> {
      if (ar.succeeded()) {
        future.complete();
      } else {
        future.fail(ar.cause());
      }
    });
    return future;
  });
}

相信你必定不會對此方法的實現感到陌生,由於它和咱們以前在購物車服務中講的getInventory方法很是相似。咱們首先獲取庫存組件對應的HTTP客戶端,接着對訂單中每一個商品,根據其數額來調用REST API減小對應的庫存。調用REST API獲取結果的過程是異步的,所以這裏咱們又獲得了一個List<Future>。可是這裏咱們並不須要每一個Future的實際結果。咱們只須要每一個Future的狀態,所以這裏僅需調用CompositeFuture.all方法獲取全部Future的組合Future

至於組件中的OrderVerticle,它只作了三件微小的事情:發佈訂單服務、部署用於訂單分發處理的RawOrderDispatcher以及部署REST Verticle。

Micro Shop SPA整合

在咱們的Micro Shop項目中,咱們提供了一個用Angular.js寫的簡單的SPA前端頁面。那麼問題來了,如何將其整合至咱們的微服務中?

注意:當前版本中,爲了方便起見,咱們將SPA部分整合進了api-gateway模塊中。在生產環境下UI部分一般要單獨部署。

有了Vert.x Web的魔力,咱們只須要作的是配置一下路由,讓其能夠處理靜態資源便可!只須要一行:

router.route("/*").handler(StaticHandler.create());

默認狀況下靜態資源映射的目錄是webroot目錄,固然你也能夠在建立StaticHandler的時候來配置映射目錄。

監控儀表板與統計數據

監控儀表板(Monitor Dashboard)一樣也是一個SPA前端應用。在本章節中咱們會涉及到如下內容:

  • 如何配置SockJS - EventBus bridge

  • 如何在瀏覽器中接受來自Event Bus的信息

  • 如何利用 Vert.x Dropwizard Metrics 來獲取Vert.x組件的統計數據

SockJS - Event Bus Bridge

不少時候咱們想要在瀏覽器中接收來自Event Bus的消息並進行處理。聽起來很神奇吧~並且你應該可以想象到,Vert.x支持這麼作!Vert.x提供了 SockJS - Event Bus Bridge 來支持服務的和客戶端(一般是瀏覽器端)經過Event Bus進行通訊。

爲了開啓SockJS - Event Bus Bridge支持,咱們須要配置SockJSHandler以及對應的路由器:

// event bus bridge
SockJSHandler sockJSHandler = SockJSHandler.create(vertx); // (1)
BridgeOptions options = new BridgeOptions()
  .addOutboundPermitted(new PermittedOptions().setAddress("microservice.monitor.metrics")) // (2)
  .addOutboundPermitted(new PermittedOptions().setAddress("events.log"));

sockJSHandler.bridge(options); // (3)
router.route("/eventbus/*").handler(sockJSHandler); // (4)

首先咱們建立一個SockJSHandler (1),它用於處理Event Bus信息。默認狀況下,爲了安全起見,Vert.x不容許任何消息經過Event Bus傳輸至瀏覽器端,所以咱們須要對其進行配置。咱們能夠建立一個BridgeOptions而後設定容許單向傳輸消息的地址。這裏有兩種地址:Outbound 以及 Inbound。Outbound地址容許服務端向瀏覽器端經過Event Bus發送消息,而Inbound地址容許瀏覽器端向服務端經過Event Bus發送消息。這裏咱們只須要兩個Outbound Address:microservice.monitor.metrics用做傳輸統計數據,events.log用做傳輸日誌消息(2)。接着咱們就能夠將配置好的BridgeOptions設置給Bridge(3),最後配置對應的路由。瀏覽器端的SockJS客戶端會使用/eventbus/*路由路徑來進行通訊。

將統計數據發送至Event Bus

在微服務架構中,監控(Monitoring)也是重要的一環。有了Vert.x的各類Metrics組件,如 Vert.x Dropwizard MetricsVert.x Hawkular Metrics,咱們能夠從對應的組件中獲取到統計數據。

這裏咱們使用 Vert.x Dropwizard Metrics。使用方法很簡單,首先建立一個MetricsService實例:

MetricsService service = MetricsService.create(vertx);

接着咱們就能夠調用getMetricsSnapshot方法獲取各類組件的統計數據。此方法接受一個實現了Measured接口的類。Measured接口定義了獲取Metrics Data的一種規範,Vert.x中主要的類,如VertxEventBus都實現了此接口。所以傳入不一樣的Measured實現就能夠獲取不一樣的數據。這裏咱們傳入了Vertx實例來獲取更多的統計數據。獲取的統計數據的格式爲JsonObject

// send metrics message to the event bus
vertx.setPeriodic(metricsInterval, t -> {
  JsonObject metrics = service.getMetricsSnapshot(vertx);
  vertx.eventBus().publish("microservice.monitor.metrics", metrics);
});

咱們設定了一個定時器,每隔一段時間就向microservice.monitor.metrics地址發送當前的統計數據。

若是想了解統計數據都包含什麼,請參考 Vert.x Dropwizard metrics 官方文檔

如今是時候在瀏覽器端接收並展現統計數據以及日誌消息了~

在瀏覽器端接收Event Bus上的消息

爲了在瀏覽器端接收Event Bus上的消息,咱們首先須要這兩個庫: vertx3-eventbus-client以及sockjs。你能夠經過npm或bower來下載這兩個庫。而後咱們就能夠在代碼中建立一個EventBus實例,而後註冊處理函數:

var eventbus = new EventBus('/eventbus');

eventbus.onopen = () => {
  eventbus.registerHandler('microservice.monitor.metrics', (err, message) => {
      $scope.metrics = message.body;
      $scope.$apply();
  });
}

咱們能夠經過message.body來獲取對應的消息數據。

以後咱們將會運行這個儀表板來監視整個微服務應用的狀態。

展現時間!

哈哈,如今咱們已經看完整個Micro Shop微服務的源碼了~看源碼看的也有些累了,如今到了展現時間了!這裏咱們使用Docker Compose來編排容器並運行咱們的微服務應用,很是方便。

注意:建議預留 4GB 內存來運行此微服務應用。

構建項目以及容器

在咱們構建整個項目以前,咱們須要先經過bower獲取api-gatewaymonitor-dashboard這兩個組件中前端代碼對應的依賴。它們的bower.json文件都在對應的src/main/resources/webroot目錄中。咱們分別進入這兩個目錄並執行:

bower install

而後咱們就能夠構建整個項目了:

mvn clean install

構建完項目之後,咱們再來構建容器(須要root權限):

cd docker
sudo ./build.sh

構建完成後,咱們就能夠來運行咱們的微服務應用了:

sudo ./run.sh

當整個微服務初始化完成的時候,咱們就能夠在瀏覽器中瀏覽網店頁面了,默認地址是 https://localhost:8787

第一次運行?

若是咱們是第一次運行此微服務應用(或以前刪除了全部的容器),咱們必須手動配置Keycloak服務器。首先咱們須要在hosts文件中添加一條記錄:

0.0.0.0    keycloak-server

而後咱們須要訪問 http://keycloak-server:8080而後進入管理員登陸頁面。默認狀況下用戶名和密碼都是 admin。進入管理臺以後,咱們須要建立一個 Realm,名字隨意(實例中給的是Vert.x)。而後進入此Realm,而且爲咱們的應用建立一個Client,相似於這樣:

Keycloak configuration

建立完之後,咱們進入 Installation 選項卡中來複制對應的JSON配置文件。咱們須要將複製的內容覆蓋掉api-gateway/src/config/docker.json中對應的配置。好比:

{
  "api.gateway.http.port": 8787,
  "api.gateway.http.address": "localhost",
  "circuit-breaker": {
    "name": "api-gateway-cb",
    "timeout": 10000,
    "max-failures": 5
  },
  // 下面的都是Keycloak相關的配置
  "realm": "Vert.x",
  "realm-public-key": "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAkto9ZZm69cmdA9e7X4NUSo8T4CyvrYzlRiJdhr+LMqELdfN3ghEY0EBpaROiOueva//iUc/KViYGiAHVXEQ3nr3kytF6uZs9iwqkshKvltpxkOm2Qpj/FSRsCyHlB8Ahbt5xBmzH2mI1VDIxmVTdEBze4u6tLoi4ieo72b2q/dz09yrEokRm/sSYqzNgfE0i1JY6DI8C7FaKszKTK5DRGMIAib8wURrTyf8au0iiisKEXOHKEjo/g0uHCFGSOKqPOprNNIWYwedV+qaQa9oSah2IpwNgFNRLtHpvbcanftMLQOQIR0iufIJ+bHrNhH0RISZhTzcGX3pSIBw/HaERwQIDAQAB",
  "auth-server-url": "http://127.0.0.1:8180/auth",
  "ssl-required": "external",
  "resource": "vertx-blueprint",
  "credentials": {
    "secret": "ea99a8e6-f503-4bdb-afbd-9ae322ee7089"
  },
  "use-resource-role-mappings": true
}

咱們還須要建立幾個用戶(User)以便後面經過這些用戶來登陸。

更詳細的Keycloak的配置過程及解釋請參考Paulo的教程: Vertx 3 and Keycloak tutorial,很是詳細。

修改完對應的配置文件以後,咱們必須從新構建api-gateway模塊的容器,而後從新啓動此容器。

歡樂的購物時間!

完成配置以後,咱們就來訪問前端頁面吧!

SPA Frontend

如今咱們能夠訪問 https://localhost:8787/login 進行登陸,它會跳轉至Keycloak的用戶登陸頁面。若是登錄成功,它會自動跳轉回Micro Shop的主頁。如今咱們能夠盡情地享受購物時間了!這真是極好的!

咱們也能夠來訪問Monitor Dashboard,默認地址是 http://localhost:9100

Monitor Dashboard

一顆賽艇!

完結!

不錯不錯!咱們終於到達了微服務旅途的終點!恭喜!咱們很是但願你可以喜歡此藍圖教程,而且掌握到關於Vert.x和微服務的知識 :-)

如下是關於微服務和分佈式系統的一些推薦閱讀材料:

享受微服務的狂歡吧!


My Blog: 「千載絃歌,芳華如夢」 - sczyh30's blog

若是您對Vert.x感興趣,歡迎加入Vert.x中國用戶組QQ羣,一塊兒探討。羣號:515203212

相關文章
相關標籤/搜索