支持流式處理ACID事務!Flink團隊開源新做Streaming Ledger

流式處理的下一個演化步驟git

 

在 data Artisans,咱們目擊了數據流式處理的瘋狂式增加,從早期階段到快速增加的市場,預計到 2025 年將達到近 500 億美圓的規模。從 Apache Flink 建立之初,咱們就堅信流式處理是一項能夠爲企業的任務關鍵型應用程序提供強大支持的技術。github

隨着愈來愈多的行業採用流式處理,併爲愈來愈多的任務關鍵型應用程序提供支持,這項技術自己也在不斷髮展,以便爲數據和計算的正確性提供更好的保證。數據庫

 

 第 1 步:數據分析的分佈式流式處理(「至少一次保證」)架構

 

第一個分佈式流式處理器的主要目標是數據分析應用程序,提供了一種實時非精確和異步精確的分析方法。這種方式被稱爲「Lambda 架構」,流式處理器基於到達的數據提供不精確的分析結果,同時批處理器以小時或天爲單位提供精確的分析結果。這種流式處理保證被稱爲「至少一次處理」。這是流式處理系統可以提供的最弱的正確性保證,是流式處理技術的第一步。併發

 

 第 2 步:單鍵應用程序的分佈式流式處理(「剛好一次保證」)app

 

Apache Flink 率先採用了真正的有狀態流式處理,提供了大規模的「剛好一次保證」。這樣就能夠經過能過提供強正確性保證的流式處理技術來構建分析類型和事務類型的應用程序,減小對 Lambda 架構和批處理器的需求。框架

現在,市場上有不少流式處理器都提供了強大的一致性保證,但僅適用於特定類型的應用程序,這些應用程序每次只更新單個鍵。也就是說,若是一個應用程序每次只更新單個銀行帳戶餘額,那就能夠經過流式處理來實現,但若是應用程序要將資金從一個銀行帳戶轉移到另外一個銀行帳戶,就難以實現強一致性。機器學習

 

 第 3 步:通常應用程序的分佈式流式處理(「ACID 保證」)異步

 

隨着 Streaming Ledger 成爲 data Artisans 平臺的一部分,用戶如今能夠構建同時讀取或更新多條記錄和多張表的應用程序,並實現 ACID 事務支持。分佈式

Streaming Ledger 在提供這些保證的同時,還能保持流式處理(剛好一次保證)的伸縮能力,並且不會影響應用程序的速度、性能、可伸縮性或可用性。

咱們能夠將 Lambda 架構的至少一次保證視爲最終一致性的一種形式(由於批處理系統最終會遇上來)。Flink 提供的「剛好一次保證」相似於分佈式鍵值存儲系統爲單鍵操做提供的一致性保證,而 Streaming Ledger 提供的保證相似於關係型數據庫提供的 ACID 保證。

咱們相信這是流式處理的下一個演化步驟,它爲以正確、可伸縮和靈活的方式實現基於流式架構的應用程序打開了大門。

 

多鍵和多表事務簡介

 

大部分關係型數據庫管理系統會執行 ACID 事務,每一個事務經過 ACID 語義在串行化的隔離級別下修改數據表,以此來實現完整的數據一致性。根據 ACID 語義,全部事務都是 Atomic、Consistent、Isolated 和 Durable 的。ACID 語義在金融服務或電子商務等行業中扮演着重要的角色。

讓咱們舉一個經典的例子,假設有一個賬戶餘額表,基於這個表將一個賬戶的錢轉到另外一個賬戶上。爲了確保事務的正確性,轉帳操做必須同時修改兩個賬戶或都不作修改(原子性),並且只有在源賬戶有足夠資金時才能進行轉帳(一致性),而且不存在其餘可能致使錯誤結果的操做(隔離、無異常)。任何違反這些條件的操做都會致使資金丟失,最終致使帳戶餘額不正確。

上圖顯示瞭如何在有狀態流式處理器中實現這個示例。其中的賬戶 ID 就是鍵,並且兩個賬戶位於不一樣的分片中。這兩個分片對對方都沒有訪問權限或者對對方的狀態持有一致的視圖,這使得實現這一的框架變得至關複雜,由於須要經過下列方式在兩個分片之間傳遞狀態:

  1. 提供一致的狀態視圖

  2. 可以管理併發修改

  3. 確保修改的原子性

 

超越流式處理框架的剛好一次語義

 

data Artisans Streaming Ledger 基於 Apache Flink 構建,爲跨多個表和單表多行的多個數據流提供執行串行化事務的能力。它能夠被視爲等效於鍵值存儲系統(甚至是跨多個鍵值存儲系統)的多行事務。Streaming Ledger 使用 Flink 的狀態來存儲表,因此就不須要額外的存儲或系統配置。Streaming Ledger 應用程序的構建塊由表、事務事件流、事務函數和可選的結果流組成。

有關更多信息,請下載白皮書(https://data-artisans.com/download-the-data-artisans-streaming-ledger-whitepaper )。

data Artisans Streaming Ledger 爲使用流式處理構建新的應用程序類型打開了一個大門,這類應用程序在之前只能依賴關係型數據庫。如今,數據密集型實時應用程序(如欺詐檢測、機器學習和實時交易訂價)能夠絕不費力地遷移到流式處理平臺上。在下一節中,咱們將經過具體的示例演示使用 data Artisans Streaming Ledger 進行開發的必要步驟。

 

Streaming Ledger 用例演示

 

data Artisans Streaming Ledger 很是適合用於處理涉及多個狀態的用例,它支持對多個狀態進行事務性的修改,這些修改彼此隔離,並遵循串行化一致性原則。

咱們假設有一個實時應用程序,它負責在賬戶和分類賬條目之間識別匯款模式。

這個應用程序須要維護兩張 Flink 狀態表:第一張表叫做「Accounts」,第二張叫做「Asset Ledger」。應用程序消費事務事件流,例如賬戶之間、分類賬條目之間或兩者之間的轉帳。當有事件進入時,不一樣的事務類型會被應用在每一個事件類型上,而後訪問相關行,檢查前置條件,並決定是處理仍是拒絕轉帳操做。對於帳戶之間的轉帳,它會更新表中的各個行。對於帳戶和分類帳之間的轉帳,它會生成結果事件,表示轉帳是被接受仍是被拒絕。下圖顯示了架構細節:

Streaming Ledger 公開了一個易於使用的 API,對於有過流式處理使用經驗的用戶和熟悉關係型數據庫的用戶來講,這個 API 能夠輕鬆上手。在咱們的例子中,咱們使用瞭如下假設:

  • 兩個表:Accounts 和 Asset Ledger

  • 三個事件流:存款、轉帳和餘額查詢

  • 存款時將值寫入 Accounts 和 Asset Ledger 表中

  • 轉帳原子操做在 Accounts 和 Asset Ledger 之間轉移值。

接下來,咱們將逐步使用 Streaming Ledger API 來完成這個示例。Ledger 的 API 是開源的,還包含了一個串行(單節點)的實現(https://github.com/dataArtisans/da-streamingledger )。

如下是經過 Flink DataStream 來建立事件源的方法:

 

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<DepositEvent> deposits = env.addSource(…); 
DataStream<TransferEvent> transfers = env.addSource(…);

And this is how you can define the scope and tables of your programme:

// 定義事務範圍
TransactionalStreams txStreams =
TransactionalStreams.create(「simple example」);

// 定義事務表
TransactionalStreams.State<String, Long> accounts = txStreams.declareState(「accounts」)
        .withKeyType(String.class)
        .withValueType(Long.class);

TransactionalStreams.State<String, Long> asset ledger = 
txStreams.declareState(「AssetLedger」)
       .withKeyType(String.class)
       .withValueType(Long.class);

接下來,你能夠爲每一個流和表指定事務函數,以及選擇將要訪問到的數據行的鍵。

.apply(…) 函數自己包含事務的業務邏輯。

對於被訪問的每一個數據行,爲它們添加一個額外的調用:.on(table,key,name,type):

  • 'table'表示要訪問的表

  • 'key'是一個函數,能夠經過這個函數從輸入事件中獲取這一行數據的鍵

  • 'name'是該行數據的邏輯名稱(稍後可能會用到)

  • 'type’限定對該行的訪問屬性,如只讀、只寫或讀寫訪問。這是一種優化,其中 READ_WRITE 是最通用的選項。

 

// 定義存款事務
txStreams.usingStream(deposits, 「deposits」)
  .apply(new DepositsFunction())
  .on(accounts, DepositEvent::getAccountId, 「account」, READ_WRITE)
  .on(assetledger,DepositEvent::getassetledgerEntryId, 「asset」, READ_WRITE);

// 定義轉帳

// 將句柄保存在結果流中,以備後用
OutputTag<TransferResult> result = txStreams.usingStream(transfers, 「transfers」)
  .apply(new TransferFunction())
  .on(accounts, TransferEvent::getSourceAccountId, 「source-account」, READ_WRITE)
  .on(accounts, TransferEvent::getTargetAccountId, 「target-account」, READ_WRITE)
  .on(assetledger, TransferEvent::getSourceAssetledgerEntryId, 「source-asset」, READ_WRITE)
  .on(Assetledger, TransferEvent::getTargetAssetledgerEntryId, 「target-asset」, READ_WRITE)
  .output();

 

而後實現了包含業務邏輯的事務,決定是否以及如何更新數據行,以及生成哪些結果。

咱們傳給這些事務函數一個狀態訪問對象,訪問對象負責讀取或跟新每一行數據。爲了將狀態訪問與數據行和鍵關聯起來,對這些函數進行了與前一步相同的註解。

爲簡單起見,咱們只給出'TransferFunction'的實現。

 

 

public class TransferFunction extends 
TransactionProcessFunction<TransferEvent, TransferResult> { 
@ProcessTransaction 
public void process(
   TransferEvent txn,
   Context<TransferResult> ctx,
   @State(「source-account」) StateAccess<Long> sourceAccount,
   @State(「target-account」) StateAccess<Long> targetAccount,
   @State(「source-asset」) StateAccess<Long> sourceAsset,
   @State(「target-asset」) StateAccess<Long> targetAsset) {

// 訪問當前值的數據行
long sourceBalance = sourceAccount.read();
long sourceAssetValue = sourceAsset.read();
long targetBalance = targetAccount.read();
long targetAssetValue = targetAsset.read();

// 檢查前置條件: 正餘額和最小余額
if (sourceBalance > txn.getMinAccountBalance()
&& sourceBalance > txn.getAccountTransfer()
&& sourceAssetValue > txn.getAssetledgerEntryTransfer()) {

// 計算新的餘額
long newSourceBalance = sourceBalance - 
txn.getAccountTransfer();
long newTargetBalance = targetBalance + 
txn.getAccountTransfer();
long newSourceAssets = sourceAssetValue - 
txn.getAssetledgerEntryTransfer();
long newTargetAssets = targetAssetValue + 
txn.getAssetledgerEntryTransfer();

// 寫入更新過的值
sourceAccount.write(newSourceBalance);
targetAccount.write(newTargetBalance);
sourceAsset.write(newSourceAssets);
targetAsset.write(newTargetAssets);

// 觸發包含新餘額的正結果事件
ctx.emit(new TransferResult(txn, SUCCESS, 
newSourceBalance, newTargetBalance));
}
else {
// 觸發包含未更新餘額的負結果事件
ctx.emit(new TransferResult(txn, REJECT, 
sourceBalance, targetBalance));
   } 
 }
}

 

結    論

data Artisans Streaming Ledger 將以前依賴關係型數據庫的應用程序帶入到了流式處理時代,進一步擴展了流式處理技術的應用範圍!藉助 Streaming Ledger,咱們正在開啓流式處理的新篇章,咱們很高興如今愈來愈多的任務關鍵型應用程序能夠充分利用流式處理的實時、異步、靈活等優點。

開源項目地址

https://github.com/dataArtisans/da-streamingledger

相關文章
相關標籤/搜索