[譯] 流式處理:使用 Apache Kafka 的 Streams API 實現 Rabobank 的實時財務告警

原文:Real-time Financial Alerts at Rabobank with Apache Kafka’s Streams API
原做者:Jeroen van Disseldorphtml

本文討論使用 Apache Kafka 的 Streams API 向 Rabobank 的客戶發送告警。Rabobank(荷蘭合做銀行)總部位於荷蘭,在全球擁有 900 多個分支機構,48,000 名員工和 681 億歐元的資產。Rabobank 是一家由客戶和銀行組成的合做銀行,一家對社會負責的銀行。其目標是成爲荷蘭金融市場的領導者。Rabobank 還致力於成爲全球食品和農業領域的領先銀行。Rabobank 向全球數百萬客戶提供金融產品和服務。java


在過去的幾年中,Rabobank 一直在爲成爲一家實時的,事件驅動的銀行而進行積極的投資。若是你熟悉銀行的業務流程,應該會明白這並不能一蹴而就。許多銀行業務流程都是在非商用硬件上以批處理做業的形式進行的,所以遷移工做很是艱鉅。但如前所述,Rabobank 接受了這一挑戰
,並定義了一個業務事件總線(BEB,Business Event Bus),用於應用程序之間共享整個組織架構的業務事件。
Rabobank 選擇 Apache Kafka 做爲底層的主要引擎,並編寫了本身的的 BEB 客戶端庫,以方便應用程序開發人員使用簡單的消息生產/消費以及災難恢復等功能。git

Rabobank 採用 Active-Active 的 Kafka 設置,Kafka 集羣在多個數據中心進行對稱鏡像。當數據中心出現故障或由操做人員干預後,BEB 客戶端(包括本文討論的基於 Kafka Streams 的應用程序)會切換到另外一個 Kafka 集羣,而無需從新啓動。實如今災難情景和計劃維護時段內的 24×7 不間斷運行。BEB 客戶端庫爲生產者、消費者以及流式應用提供了這種切換機制。github

Rabo Alerts 是一個由一系列生產、消費、流式消息等微服務組成的系統,基於 BEB 實現。下面討論的全部數據類型和代碼均可以在 GitHub 中找到。本文將在必定程度上簡化源碼清單(如刪除未使用的字段),但這些清單仍反映了生產中實際運行的代碼。apache

案例:Rabo Alerts

Rabo Alerts 服務可讓 Rabobank 的客戶接收其關注的財務事件告警。例如某筆款項從帳戶中扣除或者記入帳戶,以及其它更復雜的事件。客戶能夠根據本身的偏好配置告警,並經過第三方渠道發送:如電子郵件、短信和移動推送通知。值得一提的是,Rabo Alerts 並非一項新的或試用服務,它已經投產十多年,可供數百萬帳戶持有者使用。api

面臨的問題

舊的 Rabo Alerts 實現主要是在大型機系統上。全部的處理步驟都是面向批處理的,大型機會根據告警類型派生告警,並每隔幾分鐘發送,但天天只發送幾回。這種實現很是穩定可靠,但 Rabobank 但願解決兩個問題:(1)靈活性不足;(2)告警發送速度慢。服務器

因爲對現有告警進行更改或添加新(更智能)的告警須要很大的工做量,所以舊的 Rabo Alerts 對適應新業務需求的靈活性很低。在過去幾年中,Rabobank 在其在線環境中引入新功能的步伐大幅增長,舊有僵化的告警解決方案變得愈來愈成問題。markdown

告警的傳遞速度也是一個問題,舊的 Rabo Alerts 可能須要 5 分鐘到 4-5 小時才能向用戶發送告警(取決於告警類型和批處理窗口)。若是在十年前,這個速度可能足夠快了,但現在客戶的指望值要高得多。如今 Rabobank 向客戶提供「相關信息」的時間窗口要比過去十年小得多。架構

所以,如何從新設計現有的機制,使其具備更強的擴展性及更快的速度,即是擺在眼前的問題。固然,從新設計的 Rabo Aerts 也須要穩定可靠,以便可以正確地爲現有數百萬的用戶羣提供服務。併發

從小處着手

在過去的一年裏,咱們一直使用 Kafka 及其 Streams API 從新設計和實現告警機制。因爲整個 Rabo Alerts 服務至關龐大,咱們決定從四個簡單但使用率高的告警開始:

  • 餘額高於閾值
  • 餘額低於閾值
  • 超過閾值的貸記(Credit)
  • 超過閾值的借記(Debit)

這些告警的每個均可以從當前帳戶系統的支付信息流中派生出來。例如:「當個人餘額低於 100 歐元時向我發送短信」或「當有人給我超過 1000 歐元時向我推送消息」(一般用於存款通知)。

如下截圖說明如何經過手機銀行 app 配置 Rabo Alerts ——

raboalerts-mobile-banking.png-157.5kB

告警拓撲

咱們的第一步是從新設計告警過程,基本流程以下:

  1. 挖掘來自支付工廠的交易流,產生一連串的 AccountEntry(帳戶會計條目)。注意,每個支付交易老是由兩個 AccountEntry 組成,即借記(Debit)和貸記(Credit)。
  2. 對每一個 AccountEntry 執行如下步驟:

    • a. 將具備帳戶讀取權限的帳號轉換爲一個客戶列表。
    • b. 對每一個客戶執行如下步驟:

      • i. 查看客戶是否爲給定帳號配置了 Rabo Alert。
      • ii. 若是是,檢查此 AccountEntry 是否符合客戶設置的告警條件。
      • iii. 如何符合,經過客戶配置的渠道(電子郵件、短信、消息推送)發送告警。

步驟 1 須要與執行交易的核心銀行系統創建連接。
步驟 2a 須要創建一個查詢表,其中包含全部帳戶的全部客戶權限。
步驟 2b 須要創建一個查詢表,其中包含全部客戶的 Rabo Alert 設置。

該流程的使用及其需求見下圖:

raboalerts-alerting-topology-flow.jpg-77.8kB

圖中全部白色框都是 Kafka 主題(Topic),其中列出了它們的 Avro 鍵/值數據類型。大部分數據類型都是不言自明的,但如下數據類型值得一提:

  • CustomerAlertSettings:特定客戶的告警設置,這些設置包括:

    • CustomerAlertAddresses:客戶用於接收告警消息的渠道及地址列表。移動推送地址此處以 CustormerId 表示,由於註冊移動設備的實際列表是在消息發送過程當中肯定的。
    • CustomerAccountAlertSettings:客戶爲特定帳戶設定的告警配置列表。這個列表指定了客戶但願接收特定帳戶的哪些告警及其閾值。
  • ChannelType:可用的渠道類型枚舉,當前爲 EMAIL、PUSH 和 SMS。
  • AccountEntry:一條支付帳戶的會計記帳。一個記帳條目是一個支付交易的一半,能夠是一個借記條目(Debit),也能夠是一個貸記條目(Credit)。
  • OutboundMessage:發送給客戶的消息內容。包含消息類型和參數,但不包含其尋址。這些信息由 Outbound topic 的 Key 承載。

藍色框表示獨立的應用程序(或稱微服務),是使用 Spring Boot 實現的可執行 jar,並部署在託管平臺上。它們一塊兒組成了實現 Rabo Alerts 的全部必要功能:

  • Alert Settings Manager:告警配置管理器。向一個 compacted topic (開啓了 Log Compaction 的 Kafka 主題)發佈每一個客戶的全部自定義告警設置。
  • Account Authorization Manager:帳戶受權管理器。帳戶並非和客戶一對一綁定,而是能夠由不一樣的用戶查看。例如,配偶之間共享帳戶;或企業帳戶針對不一樣員工的不一樣受權。這些狀況下可能會產生任意的帳戶/用戶間的受權關係。該應用程序向一個 compacted topic 發佈帳號和受權客戶ID的關係。它是實時的,以便受權的變化在發送告警時能當即生效。
  • Account Entry Bridge:經過 IBM MQ 從 Rabobank 基於大型機的支付工廠中檢索全部支付流,並轉發到 Kafka 的 topic。
  • Alerting:核心告警服務,參見下文。
  • Device Resolver:設備解析器,輔助應用。從外部系統查找全部客戶的移動設備,並將相同的告警消息寫入各個設備對應的 topic 中(PushId)。客戶移動設備的查找能夠經過一個 compacted topic 完成,但因爲各類不一樣的緣由,此處是經過遠程服務調用的方式實現的。
  • Senders:每個 Sender 消費其綁定的渠道 topic 的消息,併發送給尋址客戶。每種渠道都被分配了各自的 Kafka topic,以使各類渠道的故障能彼此分離。例如,當電子郵件服務器關閉時,告警消息依然能夠經過消息推送的方式發送出去。

廢話少說,放碼過來

使用 Kafka Streams 編碼實現告警只須要 2 個類。

第一個類是 BalanceAlertsTopology。這個類使用給定的 KStreamBuilder 定義主要的 Kafka Streams 拓撲(Topology)。它實現了 BEB 的 TopologyFactory,是一個 BEB 客戶端庫使用的自定義接口,用於在應用程序啓動後或 Kafka 集羣切換(如數據中心切換/故障轉移)時生成新的 Kafka Streams Topology。

KStream<CustomerId, KeyValue<SpecificRecord, OutboundMessage>> addressedMessages =
    builder.<AccountId, AccountEntry>stream(accountEntryStream)
        .leftJoin(accountToCustomerIds, (accountEntry, customerIds) -> {
          if (isNull(customerIds)) {
            return Collections.<KeyValue<CustomerId, AccountEntry>>emptyList();
          } else {
            return customerIds.getCustomerIds().stream()
                .map(customerId -> KeyValue.pair(customerId, accountEntry))
                .collect(toList());
          }
        })
        .flatMap((accountId, accountentryByCustomer) -> accountentryByCustomer)
        .through(customerIdToAccountEntryStream)
        .leftJoin(alertSettings, Pair::with)
        .flatMapValues(
            (Pair<AccountEntry, CustomerAlertSettings> accountEntryAndSettings) ->
                BalanceAlertsGenerator.generateAlerts(
                    accountEntryAndSettings.getValue0(),
                    accountEntryAndSettings.getValue1())
        );

// Send all Email messages from addressedMessages
addressedMessages
    .filter((e, kv) -> kv.key instanceof EmailAddress)
    .map((k, v) -> v)
    .to(emailMessageStream);

// Send all Sms messages from addressedMessages
addressedMessages
    .filter((e, kv) -> kv.key instanceof PhoneNumber)
    .map((k, v) -> v)
    .to(smsMessageStream);

// Send all Push messages from addressedMessages
// (CustomerId is later resolved to a list of customer's mobile devices)
addressedMessages
    .filter((e, kv) -> kv.key instanceof CustomerId)
    .map((k, v) -> v)
    .to(customerPushMessageStream);

該 Topology 定義瞭如下幾個步驟:

  • 1-13 行,從消費 AccountEntry 流開始,當檢索到一個 AccountEntry 時,會查找哪些客戶有權訪問該帳戶,並將結果存儲在一箇中間 topic 中,以 CustomerId 爲 Key,AccountEntry 爲 Value。該 topic 的意思是「這個客戶(Key)的這個 AccountEntry(Value)須要處理」。
  • 14-20 行,針對每一個客戶執行。檢查客戶的告警設置,若是 AccountEntry 符合客戶的告警設置,會要求輔助類生成 OutboundMessage
  • 22-39 行,遍歷全部的 OutboundMessage,並將它們分配到各自的渠道 topic。

告警消息是在 17 行調用輔助類 BalanceAlertsGenerator 生成的。其主要方法是 generateAlerts(),該方法獲取一個 AccountEntry,並從具備該帳戶查看權限的客戶中獲取告警配置。如下是它的代碼:

public static List<KeyValue<SpecificRecord, OutboundMessage>> generateAlerts(AccountEntry accountEntry,
                                                                             CustomerAlertSettings settings) {
      /* 使用告警設置爲一個 AccountEntry 生成完成尋址的告警,步驟以下:
      *  1) 使用特定帳戶的告警設置,過濾掉不屬於該帳戶的 AccountEntry
      *  2) 匹配告警設置中的每一項設置,生成適當的消息
      *  3) 爲生成的消息添加尋址信息
      */

  if (settings == null) {
    return new ArrayList<>();
  }

  return settings.getAccountAlertSettings().stream()
      .filter(accountAlertSettings -> matchAccount(accountEntry, accountAlertSettings))
      .flatMap(accountAlertSettings -> accountAlertSettings.getSettings().stream())
      .flatMap(accountAlertSetting -> Stream.of(
          generateBalanceAbove(accountEntry, accountAlertSetting),
          generateBalanceBelow(accountEntry, accountAlertSetting),
          generateCreditedAbove(accountEntry, accountAlertSetting),
          generateDebitedAbove(accountEntry, accountAlertSetting))
      )
      .filter(Optional::isPresent).map(Optional::get)
      .flatMap(messageWithChannels -> mapAddresses(messageWithChannels.getValue0(), settings.getAddresses())
          .map(address -> KeyValue.pair(address, messageWithChannels.getValue1())))
      .collect(toList());
}

該方法執行如下步驟:

  • 13 行,流化全部帳戶相關的告警設置(一個帳戶一個對象)。
  • 14 行,將告警設置中的帳號和 AccountEntry 中的帳號進行匹配。
  • 15 行,流化告警設置中的各項設置。
  • 16-21 行,構造要發送的一系列消息,以及用於發送消息的渠道列表(這裏對每種告警類型都使用了單獨的方法),結果是一個 Pair<List, OutboundMessage> 流。
  • 22 行,過濾空結果。
  • 23-24 行,爲指定渠道查找客戶地址,並返回一個 KeyValue<address, OutboundMessage> 流。
  • 25 行,收集全部結果,並做爲 List 返回。

這個類的其它輔助方法:

  • matchAccount():經過比較帳號和幣種,來匹配 AccountEntry 和帳戶告警設置。
  • generateBalanceAbove/Below():生成 BalanceAbove/Below 告警消息(餘額高於/低於閾值)。
  • generateDebited/CreditedAbove():生成 Debited/CreditedAbove 告警消息(超出閾值的借記/貸記)。
  • mapAddresses():查找指定渠道列表的客戶告警接收地址。
  • buildMessage():構建一個 OutboundMessage

再加上一些其它額外的類來將這個功能包裝在一個獨立的應用程序中,這就是它的所有功能!

第一次測試運行

在第一次的初步實現後,咱們進行了測試運行。事實令咱們驚訝,而且指望值高漲。從支付訂單確認到移動設備收到告警的整個過程只須要一到兩秒鐘,且一秒的狀況居多。這個過程還包括了支付工廠所花費的時間(驗證支付訂單,交易處理),所以響應時間可能會依當時的支付工廠工做量而有所不一樣。整個告警鏈——從 AccountEntry 在 Kafka 上發佈,到將消息推送給客戶——一般在 120 毫秒內完成。在發送階段,推送(PUSH)告警是最快的,僅需 100-200 毫秒便可到達客戶的移動設備。電子郵件(EMAIL)和短信(SMS)稍慢,一般在發出消息後的 2-4 秒到達。相比之下,舊有的體系一般須要幾分鐘的時間才能提供告警。

下面的視頻演示了使用個人我的測試帳戶進行告警傳輸的速度。請注意,雖然是測試用的,但這也是一個正常運行的 Rabobank 支付帳戶!

【只是一段演示視頻,markdown 插入視頻比較麻煩,原文看吧,或看下面的文字解說】

首先我在個人設備上啓用了告警,並配置了閾值爲 0 的 DebitedAboveThreshold 告警(「More withdrawn than」)。這意味着超過 0 歐元的任何支付都會向我發送告警。我設置了 PUSH 和 SMS 兩種渠道告警(視頻中未演示),所以告警會經過兩個渠道發給我。保存設置並返回主屏幕後,我開始向個人同事 Joris Meijer 轉帳 1 歐元,並經過指紋驗證。以後付款訂單被髮送到支付工廠進行處理。在訂單確認關閉以前,推送(PUSH)告警已經在屏幕頂部彈出,如通知窗口所示。幾秒鐘後,相同的告警消息也以 SMS 的方式到達。

回顧

新機制簡潔而優雅,只須要少數 Java 類組成。這個邏輯大約四個星期寫完,但要使整個拓撲工做須要大約六個月的時間。這主要是由於 Alert Settings ManagerAccount Authorizations Manager 以及 Account Entry Bridge 須要和銀行的其它業務模塊達成一致。

在團隊內部的告警測試以後,須要更完全更大規模的測試。畢竟咱們但願確保客戶不會錯過告警或接收到不應接收告警。咱們選用了 25,000 名 Rabobank 的員工做爲試點小組,對這個新機制進行了爲期兩個月的試用。這樣能夠更好的觀察系統在生產數據及高負載下的運行表現。另外,Rabobank 的員工比付費客戶更能容忍告警失敗(有時確實會失敗)的狀況。在試用期間,咱們優化了告警生成並消除了一些外圍應用的邊界錯誤。

經批准,新體系於 6 月 8 日上線爲數百萬 Rabobank 客戶提供服務。這對咱們來講是很是激動人心的時刻——不只由於它有效,並且由於咱們永遠不可能回頭。咱們經過延遲幾秒而不是幾分鐘或幾小時的告警,有效提高了客戶的指望值。若是因爲某種緣由致使某個組件服務失敗,客戶會當即注意到,由於告警會延遲。所以咱們密切關注這套體系,但到目前爲止,它一直運行良好且可預測。

下一步

新體系提供了實時告警,且易於擴展,知足了 Rabobank 對於速度和靈活性的要求。但這裏提到的四種告警類型並非所有。客戶還能夠配置其它約 10 種告警,例如「當我收到來自指定帳戶的付款時提醒我」和「當付款單沒法執行時提醒我」。下一步是將這些告警從大型機遷移到新體系,但這須要鏈接更多的支付系統,例如支付訂單執行引擎。咱們將在將來的幾個月爲此努力,且不會止於此,新的實現也激發了大量新的想法,咱們將很快公開討論(甚至展現)。

關於做者

Jeroen van Disseldorp 是位於荷蘭的 Confluent 合做夥伴公司 Axual 的創始人。Axual 爲企業應用提供基於 Apache Kafka 的實時數據解決方案。
Email:jeroen@axual.io
Twitter:@axual
LinkedIn:http://linkedin.com/in/dizzl

關於 Apache Kafka Streams API

若是你喜歡本文,可能會但願繼續使用如下資源瞭解有關 Apache Kafka Streams API 的更多信息:

譯者按

本文討論了一種架構思路的實現案例:數據的流式處理。這種思路比如將數據放到一條流水線中,通過一道道環節的加工處理,每一道環節均可能從數據中提取須要的信息,或向數據中寫入一些特定的內容。而這條流水線中的每一個環節均可能造成一條子流水線,將該環節關心的數據從原始數據中提取出來,放到子流水線中加工處理。當數據從流水線的終點出來後,將會是一個包含更豐富內容的數據,或者其中的有價值內容已經被各個環節提取完畢。

關於譯文

本譯文經原做者受權後,首發於 K棧。轉載請註明原做者以及原文譯文出處。

clipboard.png

相關文章
相關標籤/搜索