打造實時數據集成平臺——DataPipeline基於Kafka Connect的應用實踐

導讀:傳統ETL方案讓企業難以承受數據集成之重,基於Kafka Connect構建的新型實時數據集成平臺被寄予厚望。數據庫

在4月21日的Kafka Beijing Meetup第四場活動上,DataPipeline CTO陳肅分享了DataPipeline是如何基於Kafka Connect框架構建實時數據集成平臺的應用實踐。如下內容是基於現場錄音整理的文字,供你們參考。 api

什麼是數據集成?最簡單的應用場景就是:一個數據源,一個數據目的地,數據目的地能夠一個數據倉庫,把關係型數據庫的數據同步到數據倉庫裏,就造成了一次數據集成。微信

企業數據集成面臨的4個挑戰

咱們先來看一個真實的數據集成案例。 架構

G公司是DataPipeline的一個典型客戶,擁有近千個數據源,類型主要包括Oracle、SQL Server、MySQL等。根據業務的須要和現有的基礎設施狀況,這些數據源分別須要同步到不一樣的目的端,類型主要包括MySQL、HDFS、Kafka等。基於以上背景,G公司的具體要求以下:併發

  1. 須要支持約5TB日新增數據量的同步,今年將增加5-10倍。
  2. 這些數據一部分數據源要求實時同步,另外一部分可接受定時同步。
  3. 乏強大的運維人才,現有數據源的業務承載壓力有限,對壓力很是的敏感,要求進行限流。
  4. 從這些數據源到目的地的同步都是Kettle寫腳本實現的,管理起來比較混亂,要求經過一個管理平臺對任務進行集中化的配置和管理。
  5. 上游的數據源和下游的數據目的都不穩定,隨時可能出現各類問題,要求經過一個高可用的平臺以減小數據傳輸中斷的影響。
  6. 當數據同步任務被隨機的暫停/恢復時,要求能夠保證數據的完整性。
  7. 當數據源和目的地隨機出現故障和過載時,要求能夠保證數據的完整性。
  8. 當數據源Schema發生變化時,要求能夠根據業務需求靈活配置目的地策略。

G公司的案例只是當前企業數據集成需求的一個典型應用場景。事實上,不管是互聯網企業仍是傳統企業,在面臨數據集成的時候都會遇到如下4個挑戰:
圖片描述app

1.數據源的異構性:傳統ETL方案中,從數據源到目的地的同步都是腳本實現的,異構數據源就意味着企業要作大量的適配工做。框架

2.數據源的動態性:在數據集成時,上游的數據源端常常會發生各類變化,有些數據源可能被刪掉一些結構,這可能會影響到後續數據分析的結果。 運維

3.任務的可伸縮性:當數據集成只有幾個數據源,系統壓力的問題不太突出。當數據集成面臨的是成百上千個數據源時,多任務並行就須要進行一些限速與緩衝的調度,讓讀寫速度相互匹配。數據庫設計

4.任務的容錯性:當數據在傳輸過程當中出現問題的時候,是否能夠實現斷點重傳,且不產生重複的數據。工具

以上也是DataPipeline要爲企業數據集成過程當中解決的最關鍵的4個問題。

爲何選擇Kafka Connect做爲底層框架

Kafka Connect是一種用於在Kafka和其餘系統之間可擴展的、可靠的流式傳輸數據的工具,能夠更快捷和簡單地將大量數據集合移入和移出Kafka的鏈接器。Kafka Connect爲DataPipeline提供了一個相對成熟穩定的基礎框架,還提供了一些開箱即用的工具,大大地下降研發的投入和提高應用的質量。
圖片描述

下面,咱們看一看Kafka Connect的具體優點。

首先,Kafka Connect提供的是以數據管道爲中心的業務抽象。在Kafka Connect裏有兩個核心概念:Source和Sink。Source負責導入數據到Kafka,Sink負責從Kafka導出數據,它們都被稱爲Connector。好比Source Connector,Sink Connector,其實就是提供了數據讀取和寫入的高度業務抽象,能夠簡化不少生命週期的管理工做。

固然,Source Connector會去初始化Source Task,Sink Connector會去初始化Sink Task。這些都是標準的封裝。對於數據方面,經過Source & Sink Record把數據的結構進行標準化的抽象。另外,企業客戶在作數據集成的時候,數據在不少應用場景下都要求有必定的格式,因此在Kafka Connect裏用Schema Registry & Projector來解決數據格式驗證和兼容性的問題。當數據源產生變化的時候,會生成新的Schema版本,經過不一樣的處理策略用Projector來完成對數據格式的兼容。

第二,Kafka Connect具備良好的可伸縮性、與容錯性。這些特性是與Kafka是一脈相承的。在流式處理和批量處理模式裏,更多取決於Source端如何去讀取數據,Kafka Connect自然支持流式處理和批量傳輸方式。單節點和集羣水平擴展功能都是由Kafka Connect框架直接支持。而任務恢復和狀態保持方面,目的端任務的寫入進度信息經過Kafka Connect框架自動管理、源端任務能夠根據須要往Kafka裏面放讀取進度信息,節省不少精力去管理任務重啓後的進度。

對於數據集成這樣一個通用的應用場景裏,你們確定都不但願重複發明輪子。目前,在Kafka Connect生態系統下,擁有能夠直接使用的Connector共84個,絕大部分都是開源的。其中,一部分是Kafka官方提供的,另一些是Confluent認證的,還有一些是第三方提供的。根據需求適當裁剪後,這些Connector均可以應用到本身的系統平臺中。

DataPipeline解決哪些數據集成的核心難題

基於Kafka Connect 框架,DataPipeline已經完成了不少優化和提高工做,能夠很好地解決當前企業數據集成面臨的不少核心難題。

1.任務的獨立性與全局性。
從Kafka設計之初,就聽從從源端到目的的解耦性。下游能夠有不少個Consumer,若是不是具備這種解耦性,消費端很難擴展。企業作數據集成任務的時候,須要源端到目的端的協同性,由於企業最終但願把握的是從源端到目的端的數據同步擁有一個可控的週期,並可以持續保持增量同步。在這個過程當中,源端和目的端相互獨立的話,會帶來一個問題,源端和目的端速度不匹配,一快一慢,形成數據堆積現象嚴重。因此,企業用戶在創建一個數據任務以後,咱們但願對任務進行緩衝的控制,避免數據丟失。

2.任務並行化的方式。

若是企業客戶有1000張數據表須要創建數據集成的任務,就要考慮用什麼方式進行任務切分最佳。其中一種方式是把1000張表切分紅若干個任務。這種狀況下,Source Task的負載很難作到均衡,Sink Task能夠消費多個Topics,依然存在負載不均的問題,每一個任務負載多少張表實際上是很難均衡的。每增長一個任務都會觸發Rebalance機制。能夠想象,每一張表都經過Source Connector和Sink Connector初始化一個源端和目的端任務,會大大增長Rebalance的開銷。

3.異構數據的映射。

在給企業客戶作數據集成的時候,50%概率都會遇到一些髒活累活——異構數據源的映射(Mapping)。這個映射對不少互聯網公司來講不是那麼嚴重什麼事兒,由於數據庫設計的都比較符合規範,對字段的命名方式等都會比較「優雅」(統一)。可是在傳統企業裏,因爲不少業務系統都會外包,還有一些意識的緣由,致使數據庫設計的沒有那麼規範和統一。用Kafka Connect作數據集成的時候,須要儘量作到異構數據精準的還原,尤爲金融行業客戶對此要求比較高。另外,當確實遇到數據之間不匹配的狀況時,能夠在業務數據之間進行比較合理的映射。

另外,源端的Source Record包含了每一列的基本數據類型(INT1六、STRING等)以及可選的meta信息(例如「name」)。目的端處理Sink Record的時候,須要依據基本數據類型以及meta信息決定映射關係。

4.Schema變化的處理策略。

給企業作數據集成的時候,須要根據數據源Schema的變化給出對應的處理策略。基於Kafka Connect框架,咱們提供瞭如下幾種處理策略:
(1)Backward Compatibility:可以使用最新的Schema一致訪問全部數據,e.g. 刪除列、添加具備默認值的列。
(2)Forward Compatibility:可以使用最舊的Schema一致訪問全部數據,e.g. 刪除具備默認值的列。
(3)Full Compatibility:可任意使用新舊Schema訪問全部數據。

Kafka Connect推薦使用Backward Compatibility,這也是Schema Registry的默認值。另外,企業用戶還會提出源端刪除列,目的端須要忽略,源端添加具備默認值列,目的端須要跟隨等需求,都以Task爲單位進行配置和實現。

DataPipeline基於Kafka Connect作了哪些提高

在不斷知足當前企業客戶數據集成需求的同時,DataPipeline也基於Kafka Connect 框架作了不少很是重要的提高。

1.系統架構層面。
DataPipeline引入DataPipeline Manager的概念,主要用於優化Source和Sink的全局化生命週期管理。當任務出現異常時,能夠實現對目的端和全局生命週期的管理。例如,處理源端到目的端讀取速率不匹配以及暫停等狀態的協同。
圖片描述

爲了增強系統的健壯性,咱們把Connector任務的參數保存在ZooKeeper中,方便任務重啓後讀取配置信息。

DataPipeline Connector經過JMX Client將統計信息上報Dashboard。在Connector中在技術上進行一些封裝,把一些通用信息,好比說Connector歷史讀取信息,跟管理相關的信息都採集到Dashboard裏面,提供給客戶。

2.任務並行模式。
DataPipeline在任務並行方面作了一些增強。咱們在具體服務客戶的時候也遇到這樣的問題,須要同步數十張表。在DataPipeline Connector中,咱們容許每一個Task內部能夠定義和維護一個線程池,經過控制線程併發數,而且每一個Task容許設置行級別的IO控制。而對於JDBC類型的Task,咱們額外容許配置鏈接池的大小,減小上游和下游資源的開銷。
圖片描述

3.規則引擎。
DataPipeline在基於Kafka Connect作應用時的基本定位是數據集成。數據集成過程當中,不該當對數據進行大量的計算,可是又不可避免地要對一些字段進行過濾,因此在產品中咱們也在考慮怎樣提供一種融合性。
圖片描述

雖然Kafka Connect提供了一個Transformation接口能夠與Source Connector和Sink Connector進行協同,對數據進行基本的轉換。但這是以Connector爲基本單位的,企業客戶須要編譯後部署到全部集羣的節點,而且缺少良好的可視化動態編譯調試環境支持。

基於這種狀況,DataPipeline產品提供了兩種可視化配置環境:基本編碼引擎(Basic Code Engine)和高級編碼引擎(Advanced Code Engine)。前者提供包括字段過濾、字段替換和字段忽略等功能,後者基於Groovy能夠更加靈活地對數據處理、而且校驗處理結果的Schema一致性。對於高級編碼引擎,DataPipeline還提供了數據採樣和動態調試能力。

4.錯誤隊列機制。
咱們在服務企業客戶的過程當中也看到,用戶源端的數據永遠不會很「乾淨」。不「乾淨」的數據可能來自幾個方面,好比當文件類型數據源中的「髒記錄」、規則引擎處理特定數據產生未預期的異常、由於目的端Schema不匹配致使某些值沒法寫入等各類緣由。
圖片描述

面對這些狀況,企業客戶要麼把任務停下來,要麼把數據暫存到某處後續再處理。而DataPipeline採起的是第二種方式,經過產品中錯誤隊列預警功能指定面對錯誤隊列的策略,支持預警和中斷策略的設置和實施等,好比錯誤隊列達到某個百分比的時候任務會暫停,這樣的設置能夠保證任務不會因少許異常數據而中斷,被完整記錄下來的異常數據能夠被管理員很是方便地進行追蹤、排查和處理。企業客戶認爲,相比之前經過日誌來篩查異常數據,這種錯誤隊列可視化設置功能大大提高管理員的工做效率。

在作數據集成的過程當中,確實不該該對原始數據自己作過多的變換和計算。傳統ETL方案把數據進行大量的變換以後,雖然會產生比較高效的輸出結果,可是當用戶業務需求發生變化時,還須要從新創建一個數據管道再進行一次原始數據的傳輸。這種作法並不適應當前大數據分析的需求。

基於這種考慮,DataPipeline會建議客戶先作少許的清洗,儘可能保持數據的原貌。可是,這並非說,咱們不重視數據質量。將來的重要工做之一,DataPipeline將基於Kafka Streaming將流式計算用於數據質量管理,它不對數據最終輸出的結果負責,而是從業務角度去分析數據在交換過程當中是否發生了改變,經過滑動窗口去判斷到底數據發生了什麼問題,判斷條件是是否超出必定比例歷史均值的記錄數,一旦達到這個條件將進一步觸發告警並暫停同步任務。

總結一下,DataPipeline通過不斷地努力,很好地解決了企業數據集成過程須要解決異構性、動態性、可伸縮性和容錯性等方面的問題;基於Kafka Connect的良好基礎支撐構建了成熟的企業級數據集成平臺;基於Kafka Connect進行二次封裝和擴展,優化了應用Kafka Connect時面臨的挑戰:包括Schema映射和演進,任務並行策略和全局化管理等。將來,Datapipeline將會基於流式計算進一步增強數據質量管理。

PS.添加DataPipeline君微信:datapipeline2018,拉你進技術討論羣。

相關文章
相關標籤/搜索