原文:Real-time Financial Alerts at Rabobank with Apache Kafka’s Streams API
原做者:Jeroen van Disseldorphtml
本文討論使用 Apache Kafka 的 Streams API 向 Rabobank 的客戶發送告警。Rabobank(荷蘭合做銀行)總部位於荷蘭,在全球擁有 900 多個分支機構,48,000 名員工和 681 億歐元的資產。Rabobank 是一家由客戶和銀行組成的合做銀行,一家對社會負責的銀行。其目標是成爲荷蘭金融市場的領導者。Rabobank 還致力於成爲全球食品和農業領域的領先銀行。Rabobank 向全球數百萬客戶提供金融產品和服務。java
在過去的幾年中,Rabobank 一直在爲成爲一家實時的,事件驅動的銀行而進行積極的投資。若是你熟悉銀行的業務流程,應該會明白這並不能一蹴而就。許多銀行業務流程都是在非商用硬件上以批處理做業的形式進行的,所以遷移工做很是艱鉅。但如前所述,Rabobank 接受了這一挑戰
,並定義了一個業務事件總線(BEB,Business Event Bus),用於應用程序之間共享整個組織架構的業務事件。
Rabobank 選擇 Apache Kafka 做爲底層的主要引擎,並編寫了本身的的 BEB 客戶端庫,以方便應用程序開發人員使用簡單的消息生產/消費以及災難恢復等功能。git
Rabobank 採用 Active-Active
的 Kafka 設置,Kafka 集羣在多個數據中心進行對稱鏡像。當數據中心出現故障或由操做人員干預後,BEB 客戶端(包括本文討論的基於 Kafka Streams 的應用程序)會切換到另外一個 Kafka 集羣,而無需從新啓動。實如今災難情景和計劃維護時段內的 24×7 不間斷運行。BEB 客戶端庫爲生產者、消費者以及流式應用提供了這種切換機制。github
Rabo Alerts 是一個由一系列生產、消費、流式消息等微服務組成的系統,基於 BEB 實現。下面討論的全部數據類型和代碼均可以在 GitHub 中找到。本文將在必定程度上簡化源碼清單(如刪除未使用的字段),但這些清單仍反映了生產中實際運行的代碼。apache
Rabo Alerts 服務可讓 Rabobank 的客戶接收其關注的財務事件告警。例如某筆款項從帳戶中扣除或者記入帳戶,以及其它更復雜的事件。客戶能夠根據本身的偏好配置告警,並經過第三方渠道發送:如電子郵件、短信和移動推送通知。值得一提的是,Rabo Alerts 並非一項新的或試用服務,它已經投產十多年,可供數百萬帳戶持有者使用。api
舊的 Rabo Alerts 實現主要是在大型機系統上。全部的處理步驟都是面向批處理的,大型機會根據告警類型派生告警,並每隔幾分鐘發送,但天天只發送幾回。這種實現很是穩定可靠,但 Rabobank 但願解決兩個問題:(1)靈活性不足;(2)告警發送速度慢。服務器
因爲對現有告警進行更改或添加新(更智能)的告警須要很大的工做量,所以舊的 Rabo Alerts 對適應新業務需求的靈活性很低。在過去幾年中,Rabobank 在其在線環境中引入新功能的步伐大幅增長,舊有僵化的告警解決方案變得愈來愈成問題。markdown
告警的傳遞速度也是一個問題,舊的 Rabo Alerts 可能須要 5 分鐘到 4-5 小時才能向用戶發送告警(取決於告警類型和批處理窗口)。若是在十年前,這個速度可能足夠快了,但現在客戶的指望值要高得多。如今 Rabobank 向客戶提供「相關信息」的時間窗口要比過去十年小得多。架構
所以,如何從新設計現有的機制,使其具備更強的擴展性及更快的速度,即是擺在眼前的問題。固然,從新設計的 Rabo Aerts 也須要穩定可靠,以便可以正確地爲現有數百萬的用戶羣提供服務。併發
在過去的一年裏,咱們一直使用 Kafka 及其 Streams API 從新設計和實現告警機制。因爲整個 Rabo Alerts 服務至關龐大,咱們決定從四個簡單但使用率高的告警開始:
這些告警的每個均可以從當前帳戶系統的支付信息流中派生出來。例如:「當個人餘額低於 100 歐元時向我發送短信」或「當有人給我超過 1000 歐元時向我推送消息」(一般用於存款通知)。
如下截圖說明如何經過手機銀行 app 配置 Rabo Alerts ——
咱們的第一步是從新設計告警過程,基本流程以下:
對每一個 AccountEntry 執行如下步驟:
b. 對每一個客戶執行如下步驟:
步驟 1 須要與執行交易的核心銀行系統創建連接。
步驟 2a 須要創建一個查詢表,其中包含全部帳戶的全部客戶權限。
步驟 2b 須要創建一個查詢表,其中包含全部客戶的 Rabo Alert 設置。
該流程的使用及其需求見下圖:
圖中全部白色框都是 Kafka 主題(Topic),其中列出了它們的 Avro 鍵/值數據類型。大部分數據類型都是不言自明的,但如下數據類型值得一提:
CustomerAlertSettings:特定客戶的告警設置,這些設置包括:
CustormerId
表示,由於註冊移動設備的實際列表是在消息發送過程當中肯定的。Outbound
topic 的 Key 承載。藍色框表示獨立的應用程序(或稱微服務),是使用 Spring Boot 實現的可執行 jar,並部署在託管平臺上。它們一塊兒組成了實現 Rabo Alerts 的全部必要功能:
compacted topic
(開啓了 Log Compaction 的 Kafka 主題)發佈每一個客戶的全部自定義告警設置。compacted topic
發佈帳號和受權客戶ID的關係。它是實時的,以便受權的變化在發送告警時能當即生效。compacted topic
完成,但因爲各類不一樣的緣由,此處是經過遠程服務調用的方式實現的。使用 Kafka Streams 編碼實現告警只須要 2 個類。
第一個類是 BalanceAlertsTopology
。這個類使用給定的 KStreamBuilder
定義主要的 Kafka Streams 拓撲(Topology)。它實現了 BEB 的 TopologyFactory
,是一個 BEB 客戶端庫使用的自定義接口,用於在應用程序啓動後或 Kafka 集羣切換(如數據中心切換/故障轉移)時生成新的 Kafka Streams Topology。
KStream<CustomerId, KeyValue<SpecificRecord, OutboundMessage>> addressedMessages = builder.<AccountId, AccountEntry>stream(accountEntryStream) .leftJoin(accountToCustomerIds, (accountEntry, customerIds) -> { if (isNull(customerIds)) { return Collections.<KeyValue<CustomerId, AccountEntry>>emptyList(); } else { return customerIds.getCustomerIds().stream() .map(customerId -> KeyValue.pair(customerId, accountEntry)) .collect(toList()); } }) .flatMap((accountId, accountentryByCustomer) -> accountentryByCustomer) .through(customerIdToAccountEntryStream) .leftJoin(alertSettings, Pair::with) .flatMapValues( (Pair<AccountEntry, CustomerAlertSettings> accountEntryAndSettings) -> BalanceAlertsGenerator.generateAlerts( accountEntryAndSettings.getValue0(), accountEntryAndSettings.getValue1()) ); // Send all Email messages from addressedMessages addressedMessages .filter((e, kv) -> kv.key instanceof EmailAddress) .map((k, v) -> v) .to(emailMessageStream); // Send all Sms messages from addressedMessages addressedMessages .filter((e, kv) -> kv.key instanceof PhoneNumber) .map((k, v) -> v) .to(smsMessageStream); // Send all Push messages from addressedMessages // (CustomerId is later resolved to a list of customer's mobile devices) addressedMessages .filter((e, kv) -> kv.key instanceof CustomerId) .map((k, v) -> v) .to(customerPushMessageStream);
該 Topology 定義瞭如下幾個步驟:
CustomerId
爲 Key,AccountEntry
爲 Value。該 topic 的意思是「這個客戶(Key)的這個 AccountEntry(Value)須要處理」。OutboundMessage
。OutboundMessage
,並將它們分配到各自的渠道 topic。告警消息是在 17 行調用輔助類 BalanceAlertsGenerator
生成的。其主要方法是 generateAlerts()
,該方法獲取一個 AccountEntry,並從具備該帳戶查看權限的客戶中獲取告警配置。如下是它的代碼:
public static List<KeyValue<SpecificRecord, OutboundMessage>> generateAlerts(AccountEntry accountEntry, CustomerAlertSettings settings) { /* 使用告警設置爲一個 AccountEntry 生成完成尋址的告警,步驟以下: * 1) 使用特定帳戶的告警設置,過濾掉不屬於該帳戶的 AccountEntry * 2) 匹配告警設置中的每一項設置,生成適當的消息 * 3) 爲生成的消息添加尋址信息 */ if (settings == null) { return new ArrayList<>(); } return settings.getAccountAlertSettings().stream() .filter(accountAlertSettings -> matchAccount(accountEntry, accountAlertSettings)) .flatMap(accountAlertSettings -> accountAlertSettings.getSettings().stream()) .flatMap(accountAlertSetting -> Stream.of( generateBalanceAbove(accountEntry, accountAlertSetting), generateBalanceBelow(accountEntry, accountAlertSetting), generateCreditedAbove(accountEntry, accountAlertSetting), generateDebitedAbove(accountEntry, accountAlertSetting)) ) .filter(Optional::isPresent).map(Optional::get) .flatMap(messageWithChannels -> mapAddresses(messageWithChannels.getValue0(), settings.getAddresses()) .map(address -> KeyValue.pair(address, messageWithChannels.getValue1()))) .collect(toList()); }
該方法執行如下步驟:
Pair<List, OutboundMessage>
流。KeyValue<address, OutboundMessage>
流。List
返回。這個類的其它輔助方法:
matchAccount()
:經過比較帳號和幣種,來匹配 AccountEntry 和帳戶告警設置。generateBalanceAbove/Below()
:生成 BalanceAbove/Below
告警消息(餘額高於/低於閾值)。generateDebited/CreditedAbove()
:生成 Debited/CreditedAbove
告警消息(超出閾值的借記/貸記)。mapAddresses()
:查找指定渠道列表的客戶告警接收地址。buildMessage()
:構建一個 OutboundMessage
。再加上一些其它額外的類來將這個功能包裝在一個獨立的應用程序中,這就是它的所有功能!
在第一次的初步實現後,咱們進行了測試運行。事實令咱們驚訝,而且指望值高漲。從支付訂單確認到移動設備收到告警的整個過程只須要一到兩秒鐘,且一秒的狀況居多。這個過程還包括了支付工廠所花費的時間(驗證支付訂單,交易處理),所以響應時間可能會依當時的支付工廠工做量而有所不一樣。整個告警鏈——從 AccountEntry 在 Kafka 上發佈,到將消息推送給客戶——一般在 120 毫秒內完成。在發送階段,推送(PUSH)告警是最快的,僅需 100-200 毫秒便可到達客戶的移動設備。電子郵件(EMAIL)和短信(SMS)稍慢,一般在發出消息後的 2-4 秒到達。相比之下,舊有的體系一般須要幾分鐘的時間才能提供告警。
下面的視頻演示了使用個人我的測試帳戶進行告警傳輸的速度。請注意,雖然是測試用的,但這也是一個正常運行的 Rabobank 支付帳戶!
【只是一段演示視頻,markdown 插入視頻比較麻煩,原文看吧,或看下面的文字解說】
首先我在個人設備上啓用了告警,並配置了閾值爲 0 的 DebitedAboveThreshold 告警(「More withdrawn than」)。這意味着超過 0 歐元的任何支付都會向我發送告警。我設置了 PUSH 和 SMS 兩種渠道告警(視頻中未演示),所以告警會經過兩個渠道發給我。保存設置並返回主屏幕後,我開始向個人同事 Joris Meijer 轉帳 1 歐元,並經過指紋驗證。以後付款訂單被髮送到支付工廠進行處理。在訂單確認關閉以前,推送(PUSH)告警已經在屏幕頂部彈出,如通知窗口所示。幾秒鐘後,相同的告警消息也以 SMS 的方式到達。
新機制簡潔而優雅,只須要少數 Java 類組成。這個邏輯大約四個星期寫完,但要使整個拓撲工做須要大約六個月的時間。這主要是由於 Alert Settings Manager
、Account Authorizations Manager
以及 Account Entry Bridge
須要和銀行的其它業務模塊達成一致。
在團隊內部的告警測試以後,須要更完全更大規模的測試。畢竟咱們但願確保客戶不會錯過告警或接收到不應接收告警。咱們選用了 25,000 名 Rabobank 的員工做爲試點小組,對這個新機制進行了爲期兩個月的試用。這樣能夠更好的觀察系統在生產數據及高負載下的運行表現。另外,Rabobank 的員工比付費客戶更能容忍告警失敗(有時確實會失敗)的狀況。在試用期間,咱們優化了告警生成並消除了一些外圍應用的邊界錯誤。
經批准,新體系於 6 月 8 日上線爲數百萬 Rabobank 客戶提供服務。這對咱們來講是很是激動人心的時刻——不只由於它有效,並且由於咱們永遠不可能回頭。咱們經過延遲幾秒而不是幾分鐘或幾小時的告警,有效提高了客戶的指望值。若是因爲某種緣由致使某個組件服務失敗,客戶會當即注意到,由於告警會延遲。所以咱們密切關注這套體系,但到目前爲止,它一直運行良好且可預測。
新體系提供了實時告警,且易於擴展,知足了 Rabobank 對於速度和靈活性的要求。但這裏提到的四種告警類型並非所有。客戶還能夠配置其它約 10 種告警,例如「當我收到來自指定帳戶的付款時提醒我」和「當付款單沒法執行時提醒我」。下一步是將這些告警從大型機遷移到新體系,但這須要鏈接更多的支付系統,例如支付訂單執行引擎。咱們將在將來的幾個月爲此努力,且不會止於此,新的實現也激發了大量新的想法,咱們將很快公開討論(甚至展現)。
Jeroen van Disseldorp 是位於荷蘭的 Confluent 合做夥伴公司 Axual 的創始人。Axual 爲企業應用提供基於 Apache Kafka 的實時數據解決方案。
Email:jeroen@axual.io
Twitter:@axual
LinkedIn:http://linkedin.com/in/dizzl
若是你喜歡本文,可能會但願繼續使用如下資源瞭解有關 Apache Kafka Streams API 的更多信息:
本文討論了一種架構思路的實現案例:數據的流式處理。這種思路比如將數據放到一條流水線中,通過一道道環節的加工處理,每一道環節均可能從數據中提取須要的信息,或向數據中寫入一些特定的內容。而這條流水線中的每一個環節均可能造成一條子流水線,將該環節關心的數據從原始數據中提取出來,放到子流水線中加工處理。當數據從流水線的終點出來後,將會是一個包含更豐富內容的數據,或者其中的有價值內容已經被各個環節提取完畢。
本譯文經原做者受權後,首發於 K棧。轉載請註明原做者以及原文和譯文出處。