做者 | Natan Silnitsky前端
在過去一年裏,我一直是數據流團隊的一員,負責 Wix 事件驅動的消息傳遞基礎設施(基於 Kafka)。有超過 1400 個微服務使用這個基礎設施。在此期間,我實現或目擊了事件驅動消息傳遞設計的幾個關鍵模式,這些模式有助於建立一個健壯的分佈式系統,該系統能夠輕鬆地處理不斷增加的流量和存儲需求。git
1消費與投影github
針對那些使用很是普遍、已經成爲瓶頸的服務數據庫
當有遺留服務存儲着大型域對象的數據,這些數據使用又很是普遍,使得該遺留服務成爲瓶頸時,此模式能夠提供幫助。後端
在 Wix,咱們的 MetaSite 服務就面臨着這樣的狀況,它爲 Wix 用戶建立的每一個站點保存了大量的元數據,好比站點版本、站點全部者以及站點上安裝了哪些應用程序——已安裝應用上下文(The Installed Apps Context.)。瀏覽器
這些信息對於 Wix 的許多其餘微服務(團隊)頗有價值,好比 Wix Stores、Wix booking、Wix Restaurants 等等。這個服務被超過 100 萬 RPM 的請求轟炸,它們須要獲取站點元數據的不一樣部分。緩存
從服務的各類 API 能夠明顯看出,它處理了客戶端服務的太多不一樣的關注點。服務器
MetaSite 服務處理大約 1M RPM 的各種請求網絡
咱們想要回答的問題是,如何以最終一致的方式將讀請求從該服務轉移出來?架構
使用 Kafka 建立「物化視圖」
負責這項服務的團隊決定另外建立一個服務,只處理 MetaSite 的一個關注點——來自客戶端服務的「已安裝應用上下文」請求。
首先,他們將全部數據庫的站點元數據對象以流的方式傳輸到 Kafka 主題中,包括新站點建立和站點更新。一致性能夠經過在 Kafka Consumer 中進行 DB 插入來實現,或者經過使用 CDC 產品(如 Debezium)來實現。
其次,他們建立了一個有本身數據庫的「只寫」服務(反向查找寫入器),該服務使用站點元數據對象,但只獲取已安裝應用上下文並寫入數據庫。即將站點元數據的某個「視圖」(已安裝的應用程序)投影到數據庫中。
已安裝應用上下文消費與投影
第三,他們建立了一個「只讀」服務,只接受與已安裝應用上下文相關的請求,經過查詢存儲着「已安裝應用程序」視圖的數據庫來知足請求。
讀寫分離
效果
經過將數據以流的方式傳輸到 Kafka,MetaSite 服務徹底同數據消費者解耦,這大大下降了服務和 DB 的負載。
經過消費來自 Kafka 的數據,併爲特定的上下文建立一個「物化視圖」,反向查找寫入器服務可以建立一個最終一致的數據投影,大幅優化了客戶端服務的查詢需求。
將讀服務與寫服務分開,能夠方便地擴展只讀 DB 副本和服務實例的數量,這些實例能夠處理來自全球多個數據中心的不斷增加的查詢負載。
2端到端事件驅動
針對簡單業務流程的狀態更新
請求 - 應答模型在瀏覽器 - 服務器交互中特別常見。藉助 Kafka 和 WebSocket,咱們就有了一個完整的事件流驅動,包括瀏覽器 - 服務器交互。
這使得交互過程容錯性更好,由於消息在 Kafka 中被持久化,而且能夠在服務重啓時從新處理。該架構還具備更高的可伸縮性和解耦性,由於狀態管理徹底從服務中移除,而且不須要對查詢進行數據聚合和維護。
考慮一下這種狀況,將全部 Wix 用戶的聯繫方式導入 Wix 平臺。
這個過程涉及到兩個服務:Contacts Jobs 服務處理導入請求並建立導入批處理做業,Contacts Importer 執行實際的格式化並存儲聯繫人(有時藉助第三方服務)。
傳統的請求 - 應答方法須要瀏覽器不斷輪詢導入狀態,前端服務須要將狀態更新狀況保存到數據庫表中,並輪詢下游服務以得到狀態更新。
而使用 Kafka 和 WebSocket 管理者服務,咱們能夠實現一個徹底分佈式的事件驅動過程,其中每一個服務都是徹底獨立工做的。
使用 Kafka 和 WebSocket 的 E2E 事件驅動
首先,瀏覽器會根據開始導入請求訂閱 WebSocket 服務。
它須要提供一個 channel-Id,以便 WebSocket 服務可以將通知路由回正確的瀏覽器:
打開 WebSocket 通知「通道」
第二,瀏覽器須要向 Jobs 服務發送一個 HTTP 請求,聯繫人信息使用 CSV 格式,並附加 channel-Id,這樣 Jobs 服務(和下游服務)就可以向 WebSocket 服務發送通知。注意,HTTP 響應將當即返回,沒有任何內容。
第三,Jobs 服務在處理完請求後,會生成並向 Kafka 主題發送做業請求。
HTTP Import 請求和生成的 Import Job 消息
第四,Contacts Importer** 服務消費來自 Kafka 的做業請求,並執行實際的導入任務。當它完成時,它能夠通知 WebSocket 服務做業已經完成,而 WebSocket 服務又通知瀏覽器。
工做已消費、已處理和已完成狀態通知
效果
使用這種設計,在導入過程的各個階段通知瀏覽器變得很簡單,並且不須要保持任何狀態,也不須要任何輪詢。
Kafka 的使用使得導入過程更具彈性和可擴展性,由於多個服務能夠處理來自同一個原始導入 http 請求的做業。
使用 Kafka 複製,很容易將每一個階段放在最合適的數據中心和地理位置。也許導入器服務須要在谷歌 DC 上,以即可以更快地導入谷歌聯繫人。
WebSocket 服務的傳入通知請求也能夠生成到 Kafka,而後複製到 WebSocket 服務所在的數據中心。
3內存 KV 存儲
針對 0 延遲數據訪問
有時,咱們須要動態對應用程序進行持久化配置,但咱們不想爲它建立一個全面的關係數據庫表。
一個選擇是用 HBase/Cassandra/DynamoDB 爲全部應用建立一個大的寬列存儲表,其主鍵包含標識應用域的前綴(例如「store_taxes_」)。
這個解決方案效果很好,可是經過網絡取值存在沒法避免的延遲。它更適合於更大的數據集,而不只僅是配置數據。
另外一種方法是有一個位於內存但一樣具備持久性的鍵 / 值緩存——Redis AOF 提供了這種能力。
Kafka 以壓縮主題的形式爲鍵 / 值存儲提供了相似的解決方案(保留模型確保鍵的最新值不會被刪除)。
在 Wix,咱們將這些壓縮主題用做內存中的 kv-store,咱們在應用程序啓動時加載(消費)來自主題的數據。這有一個 Redis 沒有提供的好處,這個主題還能夠被其餘想要得到更新的用戶使用。
訂閱和查詢
考慮如下用例——兩個微服務使用壓縮主題來作數據維護:Wix Business Manager(幫助 Wix 網站全部者管理他們的業務)使用一個壓縮主題存放支持的國家列表,Wix Bookings(容許安排預定和課程)維護了一個「(Time Zones)」壓縮主題。從這些內存 KV 存儲中檢索值的延遲爲 0。
各內存 KV 存儲以及相應的 Kafka 壓縮主題
Wix Bookings 監聽「國家(Countries)」主題的更新:
Bookings 消費來自壓縮主題 Countries 的更新
當 Wix Business Manager 將另外一個國家添加到「國家」主題時,Wix Bookings 會消費此更新,並自動爲「時區」主題添加一個新的時區。如今,內存 KV 存儲中的「時區」也經過更新增長了新的時區:
South Sudan 的時區被加入壓縮主題
咱們沒有在這裏停下來。Wix Events(供 Wix Users 管理事件傳票和 RSVP)也可使用 Bookings 的時區主題,並在一個國家由於夏令時更改時區時自動更新其內存 kv-store。
兩個內存 KV 存儲消費同一個壓縮主題
4調度並遺忘
當存在須要確保計劃事件最終被處理的需求時
在許多狀況下,須要 Wix 微服務根據某個計劃執行做業。
Wix Payments Subscriptions 服務就是一個例子,它管理基於訂閱的支付(例如瑜伽課程的訂閱)。
對於每月度或年度訂閱用戶,必須經過支付提供程序完成續訂過程。
爲此,Wix 自定義的 Job Scheduler 服務調用由 Payments Subscription 服務預先配置好的 REST 端點。
訂閱續期過程在後臺進行,不須要(人類)用戶參與。這就是爲何最終能夠成功續訂很重要,即便臨時有錯誤——例如第三支付提供程序不可用。
要確保這一過程是徹底彈性的,一種方法是由做業調度器重複請求 Payment Subscriptions 服務(續訂的當前狀態保存在數據庫中),對每一個到期但還沒有續期的訂閱進行輪詢。這將須要數據庫上的悲觀 / 樂觀鎖定,由於同一用戶同一時間可能有多個訂閱續期請求(來自兩個單獨的正在進行的請求)。
更好的方法是首先生成 Kafka 請求。爲何?由於請求的處理將由 Kafka 的消費者順序完成(對於每一個特定的用戶),因此不須要並行工做的同步機制。
此外,一旦消息生成併發送到 Kafka,咱們就能夠經過引入消費者重試來確保它最終會被成功處理。因爲有這些重試,請求調度的頻率可能就會低不少。
在這種狀況下,咱們但願能夠保持處理順序,這樣重試邏輯能夠在兩次嘗試之間(以「指數退避」間隔進行)簡單地休眠。
Wix 開發人員使用咱們自定義的 Greyhound 消費者,所以,他們只需指定一個 BlockingPolicy,並根據須要指定適當的重試間隔。
在某些狀況下,消費者和生產者之間可能會產生延遲,如長時間持續出錯。在這些狀況下,有一個特殊的儀表板用於解除阻塞,並跳過開發人員可使用的消息。
若是消息處理順序不是強制性的,那麼 Greyhound 中還有一個使用「重試主題」的非阻塞重試策略。
當配置重試策略時,Greyhound 消費者將建立與用戶定義的重試間隔同樣多的重試主題。內置的重試生成器將在出錯時生成一條下一個重試主題的消息,該消息帶有一個自定義頭,指定在下一次調用處理程序代碼以前應該延遲多少時間。
還有一個死信隊列,用於重試次數耗盡的狀況。在這種狀況下,消息被放在死信隊列中,由開發人員手動審查。
這種重試機制是受 Uber 這篇文章的啓發。
https://eng.uber.com/reliable-reprocessing/
Wix 最近開放了 Greyhound 的源代碼,不久將提供給測試用戶。要了解更多信息,能夠閱讀 GitHub 上的自述文件。
https://github.com/wix/greyhound#greyhound
總結:
Kafka 容許按順序處理每一個鍵的請求(例如使用 userId 進行續訂),簡化工做進程邏輯;
因爲 Kafka 重試策略的實現大大提升了容錯能力,續期請求的做業調度頻率大大下降。
5事務中的事件
當冪等性很難實現時
考慮下面這個典型的電子商務流程。
Payments 服務生成一個 Order Purchase Completed 事件到 Kafka。如今,Checkout 服務將消費此消息,並生成本身的 Order Checkout Completed 消息,其中包含購物車中的全部商品。
而後,全部下游服務(Delivery、Inventory 和 Invoices)將消費該消息並繼續處理(分別準備發貨、更新庫存和建立發票)。
若是下游服務能夠假設 Order Checkout Completed 事件只由 Checkout 服務生成一次,則此事件驅動流的實現會簡單不少。
爲何?由於屢次處理相同的 Checkout Completed 事件可能致使屢次發貨或庫存錯誤。爲了防止下游服務出現這種狀況,它們將須要存儲去重後的狀態,例如,輪詢一些存儲以確保它們之前沒有處理過這個 Order Id。
一般,這是經過常見的數據庫一致性策略實現的,如悲觀鎖定和樂觀鎖定。
幸運的是,Kafka 爲這種流水線事件流提供了一個解決方案,每一個事件只處理一次,即便當一個服務有一個消費者 - 生產者對(例如 Checkout),它消費一條消息,併產生一條新消息。
簡而言之,當 Checkout 服務處理傳入的 Payment Completed 事件時,它須要將 Checkout Completed 事件的發送過程封裝在一個生產者事務中,它還須要發送消息偏移量(使 Kafka 代理可以跟蹤重複的消息)。
事務期間生成的任何消息將僅在事務完成後纔對下游消費者(Inventory Service)可見。
此外,位於 Kafka 流開始位置的 Payment Service Producer 必須轉變爲冪等(Idempotent)生產者——這意味着代理將丟棄它生成的任何重複消息。
要了解更多信息,請觀看個人視頻「Kafka 中的剛好一次語義」。
https://www.youtube.com/watch?v=7O_UC_i1XY0
6事件聚合
當你想知道整個批次的事件已經被消費時
在上半部分,我描述了在 Wix 將聯繫人導入到 Wix CRM 平臺的業務流程。後端包括兩個服務。一個是做業服務,咱們提供一個 CSV 文件,它會生成做業事件到 Kafka。還有一個聯繫人導入服務,它會消費並執行導入做業。
假設 CSV 文件有時很是大,將工做負載分割成更小的做業,每一個做業中須要導入的聯繫人就會更少,這個過程就會更高效。經過這種方式,這項工做能夠在 Contacts Importer 服務的多個實例中並行。可是,當導入工做被拆分爲許多較小的做業時,該如何知道什麼時候通知最終用戶全部的聯繫人都已導入?
顯然,已完成做業的當前狀態須要持久化,不然,內存中哪些做業已完成的記錄可能會由於隨機的 Kubernetes pod 重啓而丟失。
一種在 Kafka 中進行持久化的方法是使用 Kafka 壓縮主題。這類主題能夠當作是一種流式 KV 存儲。
在咱們的示例中,Contacts Importer 服務(在多個實例中)經過索引消費做業。每當它處理完一些做業,就須要用一個 Job Completed 事件更新 KV 存儲。這些更新能夠同時發生,所以,可能會出現競態條件並致使做業完成計數器失效。
原子 KV 存儲
爲了不競態條件,Contacts Importer 服務將完成事件寫到原子 KV 存儲類型的 Jobs-Completed-Store 中。
原子存儲確保全部做業完成事件將按順序處理。它經過建立一個「Commands」主題和一個「Store」壓縮主題來實現。
順序處理
從下圖能夠看出,原子存儲如何生成每一條新的 Import-job-completed「更新」消息,並以 [Import Request Id]+[total job count] 做爲鍵。藉助鍵,咱們就能夠老是依賴 Kafka 將特定 requestId 的「更新」放在特定的分區中。
接下來,做爲原子存儲的一部分,消費者 - 生產者對將首先偵聽每一個新的更新,而後執行 atomicStore 用戶請求的「命令」——在本例中,將已完成做業數量的值加 1。
端到端更新流示例
讓咱們回到 Contacts Importer 服務流。一旦這個服務實例完成了某些做業的處理,它將更新 Job-Completed KVAtomicStore(例如,請求 Id 爲 YYY 的導入做業 3 已經完成):
Atomic Store 將生成一條新消息到 job-completed-commands 主題,鍵爲 YYY-6,值爲 Job 3 Completed。
接下來,Atomic Store 的消費者 - 生產者對將消費此消息,並增長 KV Store 主題中鍵 YYY-6 的已完成做業計數。
剛好一次處理
注意,「命令」請求處理必須只發生一次,不然完成計數器可能不正確(錯誤增量)。爲消費者 - 生產者對建立一個 Kafka 事務(如上文的模式 4 所述)對於確保統計準確相當重要。
AtomicKVStore 值更新回調
最後,一旦 KV 最新生成的已完成做業計數的值與總數匹配(例如 YYY 導入請求有 6 個已完成做業),就能夠通知用戶(經過 WebSocket,參見本系列文章第一部分的模式 3)導入完成。通知能夠做爲 KV-store 主題生成動做的反作用,即調用用戶提供給 KV 原子存儲的回調。
注意事項:
完成通知邏輯不必定要在 Contacts Importer 服務中,它能夠在任何微服務中,由於這個邏輯徹底獨立於這個過程的其餘部分,只依賴於 Kafka 主題。
不須要進行按期輪詢。整個過程都是事件驅動的,即以管道方式處理事件。
經過使用基於鍵的排序和剛好一次的 Kafka 事務,避免做業完成通知或重複更新之間的競態條件。
Kafka Streams API 很是適合這樣的聚合需求,其特性包括 groupBy(按 Import Request Id 分組), reduce 或 count(已完成做業計數)和 filter (count 等於總做業數),而後是反作用 Webhook 通知。對於 Wix 來講,使用現有的生產者 / 消費者基礎設施更有意義,這對咱們的微服務拓撲影響更小。
7總結
這裏的一些模式比其餘的模式更爲常見,但它們都有相同的原則。經過使用事件驅動的模式,能夠減小樣板代碼(以及輪詢和鎖定原語),增長彈性(減小級聯失敗,處理更多的錯誤和邊緣狀況)。此外,微服務之間的耦合要小得多(生產者不須要知道誰消費了它的數據),擴展也更容易,向主題添加更多分區(和更多服務實例)便可。