做者:陳肅
整理:周奇,Apache Flink 社區志願者數據庫
本文根據陳肅老師在 Apache Kafka x Flink Meetup 深圳站的分享整理而成,文章首先將從數據融合角度,談一下 DataPipeline 對批流一體架構的見解,以及如何設計和使用一個基礎框架。其次,數據的一致性是進行數據融合時最基礎的問題。若是數據沒法實現一致,即便同步再快,支持的功能再豐富,都沒有意義。安全
另外,DataPipeline 目前使用的基礎框架爲 Kafka Connect。爲實現一致性的語義保證,咱們作了一些額外工做,但願對你們有必定的參考意義。性能優化
最後,會提一些咱們在應用 Kafka Connect 框架時,遇到的一些現實的工程問題,以及應對方法。儘管你們的場景、環境和數據量級不一樣,但也有可能會遇到這些問題。但願對你們的工做有所幫助。服務器
下圖來自 Flink 官網。傳統的數據融合一般基於批模式。在批的模式下,咱們會經過一些週期性運行的 ETL JOB,將數據從關係型數據庫、文件存儲向下遊的目標數據庫進行同步,中間可能有各類類型的轉換。網絡
另外一種是 Data Pipeline 模式。與批模式相比相比, 其最核心的區別是將批量變爲實時:輸入的數據再也不是週期性的去獲取,而是源源不斷的來自於數據庫的日誌、消息隊列的消息。進而經過一個實時計算引擎,進行各類聚合運算,產生輸出結果,而且寫入下游。架構
現代的一些處理框架,包括 Flink、Kafka Streams、Spark,或多或少都可以支持批和流兩種概念。只不過像 Kafka,其原生就是爲流而生,因此若是基於 Kafka Connect 作批流一體,你可能須要對批量的數據處理作一些額外工做,這是我今天重點要介紹的。app
若是問題簡化到你只有一張表,多是一張 MySQL 的表,裏面只有幾百萬行數據,你可能想將其同步到一張 Hive 表中。基於這種狀況,大部分問題都不會遇到。由於結構是肯定的,數據量很小,且沒有所謂的並行化問題。框架
但在一個實際的企業場景下,若是作一個數據融合系統,就不可避免要面臨幾方面的挑戰:機器學習
第一,「動態性」異步
數據源會不斷地發生變化,主要歸因於:表結構的變化,表的增減。針對這些狀況,你須要有一些相應的策略進行處理。
第二,「可伸縮性」
任何一個分佈式系統,必需要提供可伸縮性。由於你不是隻同步一張表,一般會有大量數據同步任務在進行着。如何在一個集羣或多個集羣中進行統一的調度,保證任務並行執行的效率,這是一個要解決的基本問題。
第三,「容錯性」
在任何環境裏你都不能假定服務器是永遠在正常運行的,網絡、磁盤、內存都有可能發生故障。這種狀況下一個 Job 可能會失敗,以後如何進行恢復?狀態可否延續?是否會產生數據的丟失和重複?這都是要考慮的問題。
第四,「異構性」
當咱們作一個數據融合項目時,因爲源和目的地是不同的,好比,源是 MySQL,目的地是 Oracle,可能它們對於一個字段類型定義的標準是有差異的。在同步時,若是忽略這些差別,就會形成一系列的問題。
第五,「一致性」
一致性是數據融合中最基本的問題,即便不考慮數據同步的速度,也要保證數據一致。數據一致性的底線爲:數據先不丟,若是丟了一部分,一般會致使業務沒法使用;在此基礎上更好的狀況是:源和目的地的數據要徹底一致,即所謂的端到端一致性,如何作到呢?
目前在作這樣的平臺時,業界比較公認的有兩種架構:一種是 Lambda 架構,Lambda 架構的核心是按需使用批量和流式的處理框架,分別針對批式和流式數據提供相應的處理邏輯。最終經過一個服務層進行對外服務的輸出。
爲何咱們認爲 Lambda 架構是批流一體化的必然要求?這好像看起來是矛盾的(與之相對,還有一種架構叫 Kappa 架構,即用一個流式處理引擎解決全部問題)。
實際上,這在很大程度來自於現實中用戶的需求。DataPipeline 在剛剛成立時只有一種模式,只支持實時流同步,在咱們看來這是將來的一種趨勢。
但後來發現,不少客戶實際上有批量同步的需求。好比,銀行在天天晚上可能會有一些月結、日結,證券公司也有相似的結算服務。基於一些歷史緣由,或出於對性能、數據庫配置的考慮,可能有的數據庫自己不能開 change log。因此實際上並非全部狀況下都能從源端獲取實時的流數據。
考慮到上述問題,咱們認爲一個產品在支撐數據融合過程當中,必須能同時支撐批量和流式兩種處理模式,且在產品裏面出於性能和穩定性考慮提供不一樣的處理策略,這纔是一個相對來講比較合理的基礎架構。
具體到作這件事,還能夠有兩種基礎的應用模式。假如我須要將數據從 MySQL 同步到 Hive,能夠直接創建一個 ETL 的 JOB(例如基於 Flink),其中封裝全部的處理邏輯,包括從源端讀取數據,而後進行變換寫入目的地。在將代碼編譯好之後,就能夠放到 Flink 集羣上運行,獲得想要的結果。這個集羣環境能夠提供所須要的基礎能力,剛纔提到的包括分佈式,容錯等。
另外一種模式是 ETL JOB 自己輸入輸出實際上都是面對消息隊列的,實際上這是如今最常使用的一種模式。在這種模式下,須要經過一些獨立的數據源和目的地鏈接器,來完成數據到消息隊列的輸入和輸出。ETL JOB 能夠用多種框架實現,包括 Flink、Kafka Streams 等,ETL JOB 只和消息隊列發生數據交換。
DataPipeline 選擇 MQ 模式,主要有幾點考慮:
第一,在咱們產品應用中有一個很是常見的場景:要作數據的一對多分發。數據要進行一次讀取,而後分發到各類不一樣的目的地,這是一個很是適合消息隊列使用的分發模型。
第二,有時會對一次讀取的數據加不一樣的處理邏輯,咱們但願這種處理不要從新對源端產生一次讀取。因此在多數狀況下,都需將數據先讀到消息隊列,而後再配置相應的處理邏輯。
第三,Kafka Connect 就是基於 MQ 模式的,它有大量的開源鏈接器。基於 Kafka Connect 框架,咱們能夠重用這些鏈接器,節省研發的投入。
第四,當你把數據抽取跟寫入目的地,從處理邏輯中獨立出來以後,即可以提供更強大的集成能力。由於你能夠在消息隊列上集成更多的處理邏輯,而無需考慮從新寫整個 Job。
相應而言,若是你選擇將 MQ 做爲全部 JOB 的傳輸通道,就必需要克服幾個缺點:
第一,全部數據的吞吐都通過 MQ,因此 MQ 會成爲一個吞吐瓶頸。
第二,由於是一個徹底的流式架構,因此針對批量同步,你須要引入一些邊界消息來實現一些批量控制。
第三,Kafka 是一個有持久化能力的消息隊列,這意味着數據留存是有極限的。好比,你將源端的讀到 Kafka Topic 裏面,Topic 不會無限的大,有可能會形成數據容量超限,致使一些數據丟失。
第四,當批量同步在中間由於某種緣由被打斷,沒法作續傳時,你須要進行重傳。在重傳過程當中,首先要將數據進行清理,若是基於消息隊列模式,清理過程就會帶來額外的工做。你會面臨兩個困境:要麼清空原有的消息隊列,要麼你創造新的消息隊列。這確定不如像直接使用一些批量同步框架那樣來的直接。
先簡單介紹一下用戶對於數據同步方面的一些基本要求:
第一種需求,批量同步須要以一種事務性的方式完成同步
不管是同步一整塊的歷史數據,仍是同步某一天的增量,該部分數據到目的地,必須是以事務性的方式出現的。而不是在同步一半時,數據就已經在目的地出現了,這可能會影響下游的一些計算邏輯。
第二種需求,流式數據儘量快的完成同步
你們都但願越快越好,但相應的,同步的越快,吞吐量有可能由於你的參數設置出現相應的降低,這可能須要有一個權衡。
第三種需求,批量和流式可能共存於一個 JOB
做爲一個數據融合產品,當用戶在使用DataPipeline時,一般須要將存量數據同步完,後面緊接着去接增量。而後存量與增量之間須要進行一個無縫切換,中間的數據不要丟、也不要多。
**第四種需求,按需靈活選擇一致性語義保證
**
DataPipeline 做爲一個產品,在客戶的環境中,咱們沒法對客戶數據自己的特性提出強制要求。咱們不能要求客戶數據必定要有主鍵或者有惟一性的索引。因此在不一樣場景下,對於一致性語義保證,用戶的要求也不同的:
好比在有主鍵的場景下,通常咱們作到至少有一次就夠了,由於在下游若是對方也是一個相似於關係型數據庫這樣的目的地,其自己就有去重能力,不須要在過程當中間作一個強一致的保證。可是,若是其自己沒有主鍵,或者其下游是一個文件系統,若是不在過程當中間作額外的一致性保證,就有可能在目的地產生多餘的數據,這部分數據對於下游可能會形成很是嚴重的影響。
若是要解決端到端的數據一致性,咱們要處理好幾個基本環節:
**第一,在源端作一個一致性抽取
**
一致性抽取是什麼含義?即當數據從經過數據鏈接器寫入到 MQ 時,和與其對應的 offset 必須是以事務方式進入 MQ 的。
第二,一致性處理
若是你們用過 Flink,Flink 提供了一個端到端一致性處理的能力,它是內部經過 checkpoint 機制,並結合 Sink 端的二階段提交協議,實現從數據讀取處理到寫入的一個端到端事務一致性。其它框架,例如 Spark Streaming 和 Kafka Streams 也有各自的機制來實現一致性處理。
第三,一致性寫入
在 MQ 模式下,一致性寫入,即 consumer offset 跟實際的數據寫入目的時,必須是同時持久化的,要麼全都成功,要麼所有失敗。
第四,一致性銜接
在 DataPipeline 的產品應用中,歷史數據與實時數據的傳輸有時須要在一個任務共同完成。因此產品自己須要有這種一致性銜接的能力,即歷史數據和流式數據,必須可以在一個任務中,由程序自動完成它們之間的切換。
Kafka Connect 如何保證數據同步的一致性?就目前版本,Kafka Connect 只能支持端到端的 at least once,核心緣由在於,在 Kafka Connect 裏面,其 offset 的持久化與數據發送自己是異步完成的。這在很大程度上是爲了提升其吞吐量考慮,但相應產生的問題是,若是使用 Kafka Connect,框架自己只能爲你提供 at least once 的語義保證。
在該模式下,若是沒有經過主鍵或下游應用進行額外地去重,同步過程中的數據會在極端狀況下出現重複,好比源端發送出一批數據已經成功,但 offset 持久化失敗了,這樣在任務恢復以後,以前已經發送成功的數據會再次從新發送一批,而下游對這種現象徹底是不知情的。目的端也是如此,由於 consumer 的 offset 也是異步持久化,就會到致使有可能數據已經持久化到 Sink,但實際上 consumer offset 尚未推動。這是咱們在應用原生的 Kafka Connect 框架裏遇到最大的兩個問題。
DataPipeline 如何解決上述問題?首先,須要用協議的方式保證每一步都作成事務。一旦作成事務,因爲每一個環節都是解耦的,其最終數據就能夠保證一致性。下圖爲二階段提交協議的最基礎版本,接下來爲你們簡單介紹一下。
首先,在二階段提交協議中,對於分佈式事務的參與方,在 DataPipeline 的場景下爲數據寫入與 offset 寫入,這是兩個獨立組件。二者之間的寫入操做由 Coordinator 進行協調。第一步是一個 prepare 階段,每個參與方會將數據寫入到本身的目的地,具體持久化的位置取決於具體應用的實現。
第二步,當 prepare 階段完成以後,Coordinator 會向全部參與者發出 commit 指令,全部參與者在完成 commit 以後,會發出一個 ack,Coordinator 收到 ack 以後,事務就完成了。若是出現失敗,再進行相應的回滾操做。其實在分佈式數據庫的設計領域中,單純應用一個二階段提交協議會出現很是多的問題,例如 Coordinator 自己若是不是高可用的,在過程中就有可能出現事務不一致的問題。
因此應用二階段提交協議,最核心的問題是如何保證 Coordinator 高可用。所幸在你們耳熟能詳的各類框架裏,包括 Kafka 和 Flink,都可以經過分佈式一致協議實現 Coordinator 高可用,這也是爲何咱們可以使用二階段提交來保證事務性。
關於 Kafka 事務消息原理,網上有不少資料,在此簡單說一下可以達到的效果。Kafka 經過二階段提交協議,最終實現了兩個最核心的功能。
第一,一致性抽取
上文提到數據要被髮送進 Kafka,同時 offset 要被持久化到 Kafka,這是對兩個不一樣 Topic 的寫入。經過利用 Kafka 事務性消息,咱們可以保證 offset 的寫入和數據的發送是一個事務。若是 offset 沒有持久化成功,下游是看不到這批數據的,這批數據實際上最終會被丟棄掉。
因此對於源端的發送,咱們對 Kafka Connect 的 Source Worker 作了一些改造,讓其可以提供兩種模式,若是用戶的數據自己是具有主鍵去重能力的,就能夠繼續使用 Kafka Connect 原生的模式。
若是用戶須要強一致時,首先要開啓一個源端的事務發送功能,這就實現了源端的一致性抽取。其能夠保證數據進 Kafka 一端不會出現數據重複。這裏有一個限制,即一旦要開啓一致性抽取,根據 Kafka 必需要將 ack 設置成 all,這意味着一批數據有多少個副本,其必須可以在全部的副本所在的 broker 都已經應答的狀況下,才能夠開始下一批數據的寫入。儘管會形成一些性能上的損失,但爲了實現強一致,你必需要接受這一事實。
**第二,一致性處理
**
事務性消息最先就是爲 Kafka Streams 設計和準備的。能夠寫一段 Kafka Streams 應用,從 Kafka 裏讀取數據,而後完成轉化邏輯,進而將結果再輸出回 Kafka。Sink 端再從 Kafka 中消費數據,寫入目的地。
以前簡要談了一下二階段提交協議的原理,DataPipeline 實現的方式不算很深奧,基本是業界的一種統一方式。其中最核心的點是,咱們將 consumer offset 管理從 Kafka Connect 框架中獨立出來,實現事務一致性提交。另外,在 Sink 端封裝了一個相似於 Flink 的 TwoPhaseCommitSinkFunction 方式,其定義了 Sink 若要實現一個二階段提交所必需要實現的一些功能。
DataPipeline 將 Sink Connector 分爲兩類,一類是 Connector 自己具有了事務能力,好比絕大部分的關係型數據庫,只需將 offset 跟數據同時持久化到目的地便可。額外的可能須要有一張 offset 表來記錄提交的 offset。還有一類 Sink 不具有事務性能力,相似像 FTP、OSS 這些對象存儲,咱們須要去實現一個二階段提交協議,最終才能保證 Sink 端的數據可以達到一致性寫入。
關於批量數據與實時數據如何銜接的問題,主要有兩個關鍵點:
第一,當開始進行一個批量數據同步時,以關係型數據庫爲例,你應該拿到當時一個總體數據的 Snapshot,並在一個事務中同時記錄當時對應的日誌起始值。以 MySQL 爲例,當要獲取一個 Binlog 起始偏移量時,須要開啓一個 START TRANSACTION WITH CONSISTENT SNAPSHOT,這樣才能保證完成全量以後,後期的讀取增量日誌同步不會產生重複數據。
第二,若是採用增量同步模式,則必須根據實際的數據業務領域,採用一種比較靈活的增量表達式,才能避免讀到寫到一半的數據。好比在你的數據中,其 ID 是一個徹底自增,沒有任何重複的可能,此時只需每次單純的大於上一次同步的最後一條記錄便可。
但若是是一個時間戳,不管精度多高,都有可能在數據庫產生相同的時間戳,因此安全的作法是每次迭代時,取比當前時間稍微少一點,保證留出一個安全時間,好比五秒甚至一分鐘,這樣你永遠不會讀到一些時間戳可能會產生衝突的這部分數據,避免遺漏數據。這是一個小技巧,但若是沒有注意,在使用過程當中就會產生各類各樣的問題。
還有一點是上面說起的,如何可以在一個流式框架實現批量同步的一致性,對於全部的流式框架,須要引入一些邊界條件來標誌着一次批量同步的開始和結束。DataPipeline 在每次批量發送開始和結束後,會引入一些控制量信號,而後在 Sink端進行相應處理。一樣爲了保證事務一致性,在 Sink 端處理這種批量同步時,依然要作一些相似於二階段提交這樣的方式,避免在一些極端狀況下出現數據不一致的問題。
上文介紹的是 DataPipeline 如何基於 Kafka Connect 作事務同步一致性的方案。
DataPipeline 在使用 Kafka Connect 過程當中遇到過一些問題,目前大部分已經有一些解決方案,還有少許問題,可能須要將來採用新的方法/框架纔可以更好的解決。
第一,反壓的問題
Kafka Connect 設計的邏輯是但願實現源端和目的端徹底解耦,這種解偶自己是一個很好的特性。但也帶來一些問題,源和目的地的 task 徹底不知道彼此的存在。剛纔我提到 Kafka 有容量限制,不能假定在一個客戶環境裏面,會給你無限的磁盤來作緩衝。一般咱們在客戶那邊默認 Topic 爲 100G 的容量。若是源端讀的過快,大量數據會在 Kafka 裏堆積,目的端沒有及時消費,就有可能出現數據丟失,這是一個很是容易出現的問題。
怎麼解決?DataPipeline 做爲一個產品,在 Kafka Connect 之上,作了控制層,控制層中有像 Manager 這樣的邏輯組件,會監控每個 Topic 消費的 lag,當達到必定閾值時,會對源端進行限速,保證源和目的地儘量匹配。
第二,資源隔離
Connect Worker 集羣沒法對 task 進行資源預留,多個 task 並行運行會相互影響。Worker 的 rest 接口是隊列式的,單個集羣任務過多會致使啓停緩慢。
咱們正在考慮利用外部的資源調度框架,例如 K8s 進行 worker 節點管理;以及經過路由規則將不一樣優先級任務運行在不一樣的 worker 集羣上,實現預分配和共享資源池的靈活配置。
第三,Rebalance
在 2.3 版本之前,Kafka Connect 的 task rebalance 採用 stop-the-world 模式,牽一髮動全身。在 2.3 版本以後,已經作了很是大優化,改成了具備粘性的 rebalance。因此若是使用 Kafka Connect,強烈推薦必定要升級到 2.3 以上的版本,也就是目前的最新版本。
基於 MQ 模式的架構,針對大批量數據的同步,實際上仍是容易出現性能瓶頸。主要瓶頸是在 MQ 的集羣,咱們並不能在客戶環境裏無限優化 Kafka 集羣的性能,由於客戶提供的硬件資源有限。因此一旦客戶給定了硬件資源,Kafka 吞吐的上限就變爲一個固定值。因此針對批量數據的同步,可能將來會考慮用內存隊列替代 MQ。
同時,會採用更加靈活的 Runtime,主要是爲了解決剛纔提到的預分配資源池和共享資源池的統一管理問題。
另外,關於數據質量管理,實際上金融類客戶對數據質量的一致性要求很是高。因此對於一些對數據質量要求很是高的客戶,咱們考慮提供一些後校驗功能,尤爲是針對批量同步。
▼ Apache Flink 社區推薦 ▼
Apache Flink 及大數據領域頂級盛會 Flink Forward Asia 2019 重磅開啓,大會議程精彩上線,瞭解 Flink Forward Asia 2019 的更多信息,請查看:
https://developer.aliyun.com/special/ffa2019
首屆 Apache Flink 極客挑戰賽重磅開啓,聚焦機器學習與性能優化兩大熱門領域,40萬獎金等你拿,加入挑戰請點擊:
https://tianchi.aliyun.com/markets/tianchi/flink2019
本文爲雲棲社區原創內容,未經容許不得轉載。