引言:2018年7月25日,DataPipeline CTO陳肅在第一期公開課上做了題爲《從ETL到ELT,AI時代數據集成的問題與解決方案》的分享,本文根據陳肅分享內容整理而成。html
你們好!很高興今天有機會和你們分享一些數據集成方面的見解和應用經驗。先自我介紹一下。我叫陳肅,博士畢業於中國科學院大學,數據挖掘研究方向。如今北京數見科技(DataPipeline)任 CTO。以前在中國移動研究院任職算法工程師和用戶行爲實驗室技術經理,以後做爲合夥人加入過一家互聯網教育公司,從事智能學習方面的研發工做。
git
在畢業後工做的這多年以來,我大部分時候在作大數據和機器學習相關的應用系統研發工做,數據的整合是其中一個很是重要的環節。加入 DataPipeline 後,公司研發的是一款企業級的數據集成產品,旨在幫助企業一站式解決數據集成和元數據管理問題。github
ELT 和 ETL 是數據集成的兩種基本方式。前者專一於大數據的實時抽取和可靠傳輸,後者則包含了更豐富的數據轉換功能。 因爲今天是和 AI 前線的朋友們一塊兒探討數據集成,我主要結合 AI 應用的場景談談:爲何 ELT 是更適合 AI 應用場景的數據集成方案、採用 Kafka 技術棧來構建 ELT 平臺所具有的優點和問題以及咱們所作的一些優化工做。但願可以對你們的工做和學習有所幫助。
算法
今天個人分享主要內容如上圖:數據庫
首先,我會介紹一下 AI 應用中數據集成的典型場景,ETL 和 ELT 兩種數據集成模式的異同點,以及爲何 AI 應用下更適合採用 ELT 模式。而後,我會花一些篇幅介紹數據集成中須要重點考慮的基本問題,以及咱們所採用的底層平臺——Kafka Connect 在解決這些問題上的優點和侷限。apache
接下來,我會介紹 DataPipeline 對於 Kafka Connect 一些優化。有的是從底層作的優化,例如線程池的優化。有的則是從產品特性上的優化,例如錯誤數據隊列。api
最後,咱們談一談 Kafka Connect 和 Kafka Stream 的結合,以及咱們用 Kafka Stream 作數據質量預警方面的一個應用 Case。緩存
1、AI 應用場景下的數據集成
數據集成是把不一樣來源、格式、特色性質的數據在邏輯上或物理上有機地集中,爲企業提供全面的數據共享。AI 是典型的數據驅動應用,數據集成在其中起着關鍵的基礎性做用。
安全
以一個你們所熟悉的在線推薦服務爲例,一般須要依賴三類數據:用戶的屬性 (年齡、性別、地域、註冊時間等)、商品的屬性(分類、價格、描述等)、用戶產生的各種行爲(登陸、點擊、搜索、加購物車、購買、評論、點贊、收藏、加好友、發私信等)事件數據。微信
隨着微服務框架的流行,這三類數據一般會存在於不一樣的微服務中:「用戶管理服務」儲存着用戶的屬性、好友關係、登陸等數據;「商品管理服務」存儲的商品信息;「訂單服務」存儲着用戶的訂單數據;「支付服務」存儲用戶的支付數據;「評論服務」記錄着用戶的評論和點贊數據。爲了實現一個推薦服務,咱們首先須要讓服務可以訪問到這些數據。這種數據訪問應該是非侵入式的,也就是說不能對原有系統的性能、穩定性、安全性形成額外的負擔。所以,推薦服務不該當直接訪問這些分散的數據源,而是應該經過某種方式將這些數據從各個業務子系統中提取出來,聚集到一個邏輯上集中的數據庫 / 倉庫,而後才能方便地使用機器學習框架(例如 Spark MLlib)來讀取數據、訓練和更新模型。
1. ETL 和 ELT 的區別與聯繫
數據集成包含三個基本的環節:Extract(抽取)、Transform(轉換)、Load(加載)。
抽取是將數據從已有的數據源中提取出來,例如經過 JDBC/Binlog 方式獲取 MySQL 數據庫的增量數據;轉換是對原始數據進行處理,例如將用戶屬性中的手機號替換爲匿名的惟一 ID、計算每一個用戶對商品的平均打分、計算每一個商品的購買數量、將 B 表的數據填充到 A 表中造成新的寬表等;加載是將數據寫入目的地。
根據轉換轉換髮生的順序和位置,數據集成能夠分爲 ETL 和 ELT 兩種模式。ETL 在數據源抽取後首先進行轉換,而後將轉換的結果寫入目的地。ELT 則是在抽取後將結果先寫入目的地,而後由下游應用利用數據庫的聚合分析能力或者外部計算框架,例如 Spark 來完成轉換的步驟。
2.爲何 ELT 更適合 AI 應用場景
爲何說 ELT 更適合 AI 的應用場景呢?
首先這是由 AI 應用對數據轉換的高度靈活性需求決定的。 絕大多數 AI 應用使用的算法模型都包括一個特徵提取和變換的過程。根據算法的不一樣,這個特徵提取多是特徵矩陣的簡單的歸一化或平滑處理,也能夠是用 Aggregation 函數或 One-Hot 編碼進行維度特徵的擴充,甚至特徵提取自己也須要用到其它模型的輸出結果。這使得 AI 模型很難直接利用 ETL 工具內建的轉換功能,來完成特徵提取步驟。此外,企業如今不多會從零構建 AI 應用。當應用包括 Spark/Flink MLlib 在內的機器學習框架時,內建的模型庫自己每每包含了特徵提取和變換的邏輯,這使得在數據提取階段就作複雜變換的必要性進一步下降。
其次,企業常常會基於一樣的數據構建不一樣應用。 以我以前所在的一家在線教育公司爲例,咱們構建了兩個 AI 的應用:其中一個是針對各種課程的推薦應用,主要用於增長用戶的購買轉化率。另一個是自適應學習系統,用於評估用戶的知識掌握程度和題目的難度和區分度,從而爲用戶動態地規劃學習路徑。兩個應用都須要用戶屬性、作題記錄、點擊行爲以及學習資料文本,但採用的具體模型的特徵提取和處理方式徹底不一樣。若是用 ETL 模式,咱們須要從源端抽取兩遍數據。而採用 ELT 模式,全部數據存儲在 HBase 中,不一樣的應用根據模型須要過濾提取出所需的數據子集,在 Spark 集羣完成相應的特徵提取和模型計算,下降了對源端的依賴和訪問頻次。
最後,主流的機器學習框架,例如 Spark MLlib 和 Flink MLlib,對於分佈式、並行化和容錯都有良好的支持,而且易於進行節點擴容。 採用 ELT 模式,咱們能夠避免構建一個專有數據轉換集羣(可能還伴隨着昂貴的 ETL 產品 License 費用),而是用一個通用的、易於建立和維護的分佈式計算集羣來完成全部的工做,有利於下降整體擁有成本,同時提高系統的可維護性和擴展性。
2、從 ETL 和 ELT 面臨的主要問題
採用 ELT 模式,意味着能夠較少的關注數據集成過程當中的複雜轉換,而將重點放在讓數據儘快地傳輸上。然而,一些共性的問題依然須要獲得解決:
1.數據源的異構性: 傳統 ETL 方案中,企業要經過 ETL 工具或者編寫腳本的方式來完成數據源到目的地同步工做。當數據源異構的時候,須要特別考慮 Schema(能夠簡單理解爲數據字段類型)兼容性帶來的影響。不管是 ETL 仍是 ELT,都須要解決這一問題。
2.數據源的動態性: 動態性有兩方面含義。一是如何獲取數據源的增量;二是如何應對數據源端的 Schema 變化,例如增長列和刪除列。
3.任務的可伸縮性: 當面對少許幾個數據源,數據增量不過每日幾百 MB 的時候,ELT 平臺的可伸縮性不是什麼大問題。當 ELT 面對的是成百上千個數據源,或者數據源數據增速很快時,ELT 平臺的任務水平切分和多任務並行處理就成爲一個必備的要求。平臺不只要支持單節點的多任務並行,還須要支持節點的水平擴展。此外,ELT 的上游一般會遇到一些吞吐能力較差的數據源,須要可以對讀取進行限速,避免對現有業務產生影響。
4.任務的容錯性:ELT 平臺某些節點出現故障的時候,失敗的做業必須可以遷移到健康的節點上繼續工做。同時,做業的恢復須要實現斷點重傳,至少不能出現丟失數據,最好可以作到不產生重複的數據。
3、Kafka Connect 的架構
1.Kafka Connect:基於 Kafka 的 ELT 框架
可用於構建 ELT 的開源數據集成平臺方案不止一種,較普遍採用的包括 Kafka Connect、DataX 等,也有公司直接採用 Flink 等流式計算框架。DataPipeline 做爲一家提供企業數據集成產品的公司,咱們在 Kafka Connect 之上踩了許多坑而且也作了許多優化。
4、踩過的坑與優化的點
1.Kafka Connect 應用於ELT的關鍵問題1
下面咱們聊一聊 Kafka Connect 應用過程當中的幾個關鍵問題。
首先是 任務的限速和數據緩存問題。從 Kafka Connect 設計之初,就聽從從源端到目的地解耦性。當 Source 的寫入速度長時間大於 Sink 端的消費速度時,就會產生 Kafka 隊列中消息的堆積。若是 Kafka 的 Topic Retention 參數設置不當,有可能會形成數據在消費前被回收,形成數據丟失。Kafka Connect 框架自己並無提供 Connector 級別的限速措施,須要進行二次開發。
2.Kafka Connect 應用於ELT的關鍵問題2
用戶有多個數據源,或者單一數據源中有大量的表須要進行並行同步時,任務的並行化問題 就產生了。Kafka Connect 的 rebalance 是牽一髮動全身,一個新任務的開始和中止都會致使全部任務的 reload。當任務數不少的時候,整個 Kafka Connect 集羣可能陷入長達數分鐘的 rebalance 過程。
解決的方法,一是用 CDC(Change Data Capture)來捕獲全局的數據增量;二是 在任務內部引入多線程輪詢機制,減小任務數量並提升資源利用率。
3.Kafka Connect 應用於ELT的關鍵問題3
異構數據源同步會遇到 Schema 不匹配 的問題。在須要精確同步的場景下(例如金融機構的異構數據庫同步),一般須要 Case by Case 的去定義映射規則。而在 AI 應用場景下,這個問題並非很突出,模型訓練對於損失一點精度一般是可容忍的,一些數據庫獨有的類型也不經常使用。
4.Kafka Connect 應用於ELT的關鍵問題4
Source 端須要可以檢測到 Schema 的變化,從而生成具備正確 Schema 格式的 Source Record。CDC 模式下,經過解析 DDL 語句能夠獲取到。非 CDC 模式下,須要保存一個快照纔可以獲取到這種變化。
下面我用一些時間對 DataPipeline 所作的優化和產品特性方面的工做。
DataPipeline 是一個底層使用 Kafka Connect 框架的 ELT 產品。首先,咱們在底層上引入了 Manager 來進行全局化的任務管理。Manager 負責管理 Source Connector 和 Sink Connector 的生命週期,與 Kafka Connect 的管理 API 經過 REST 進行交互。
系統的任何運行異常,都會進行統一的處理,並由通知中心發送給任務的負責人和運維工程師。咱們還提供了一個 Dashboard,用於圖形化方式對任務進行生命週期管理、檢索和狀態監控。用戶能夠告別 Kafka Connect 的命令行。
5.DataPipeline的任務並行模型
DataPipeline 在任務並行方面作了一些增強。在 DataPipeline Connector 中,咱們在每一個 Task 內部定義和維護一個線程池,從而可以用較少的 Task 數量達到比較高的並行度,下降了 rebalance 的開銷。 而對於 JDBC 類型的 Connector,咱們額外容許配置鏈接池的大小,減小上游和下游資源的開銷。此外,每一個 Connector 還能夠定義本身限速策略,以適應不一樣的應用環境需求。
6.DataPipeline 的錯誤隊列機制
經過產品中錯誤隊列預警功能,用戶能夠指定面對錯誤數據暫存和處理邏輯,好比錯誤隊列達到某個百分比的時候任務會暫停,這樣的設置能夠保證任務不會因少許異常數據而中斷,被完整記錄下來的異常數據能夠被管理員很是方便地進行追蹤、排查和處理。
相比之前經過日誌來篩查異常數據,這種錯誤隊列可視化功能可以大大提高管理員的工做效率。
7.DataPipeline 的數據轉換
DataPipeline 實現了本身的 動態加載機制。提供了兩種 可視化的轉換器:基本轉換器和高級轉換器。前者提供包括字段過濾、字段替換和字段忽略等功能;後者基於 Java,能夠更加靈活地對數據處理,而且校驗處理結果的 Schema 一致性。DataPipeline 還提供了數據採樣和動態調試能力,方便用戶進行表級別的轉換規則開發。
值得注意的是,Kafka 不只僅是一個消息隊列系統,自己也提供了持久化能力。一個很天然的問題就是:可否不額外引入 Sink 端的外部存儲,直接從 Kafka 中獲取訓練數據?
若是模型自己要用到某個 Topic 的全量數據或者最近一段時間的數據,那麼經過設置合適的 retention 參數,能夠直接將 Kafka 做爲訓練數據的來源。Kafka 的順序讀模式能夠提供很是高的讀取速度;若是模型要根據消息的內容作數據篩選,那麼因爲 Kafka 自己並不提供檢索能力,須要遍歷全部消息,這樣就顯得比較低效了。
當模型用於線上時,可能還須要引入流式計算來完成實時特徵的提取工做。Kafka 自己就提供了這種流式計算能力。
8.流式計算在 ELT 中的做用 - 數據質量預警
DataPipeline 也將流式計算引入到平臺的質量預警功能中。在咱們的將來版本中,用戶能夠定義 Topic 級別的質量預警規則模型,例如「在 5 分鐘時間內,數據記錄的字段 1 均值超過歷史均值記錄的比率超過 70%」爲異常,採起策略爲「告警並暫停同步」。經過這種方式,能夠在 ELT 的過程當中,儘早發現數據中的異常現象,避免大量異常數據進入數據目的地。
5、總結與展望
最後總結一下。數據集成並非什麼新的概念,在過去二十多年間已經普遍應用於各個行業的信息系統。ELT 和 ETL 相比,最大的區別是「重抽取和加載,輕轉換」,從而能夠用更簡單的技術棧、更輕量的方案搭建起一個知足現代企業應用的數據集成平臺。AI 應用內在的特色也使得 ELT 特別適合這個場景。
Kafka Connect 自己是一個業界被普遍採用的 ELT 框架,針對容錯、分佈式、Schema 一致性等方面都提供了良好的支持,同時有大量的社區和商業資源可供參考和選擇。DataPipeline 基於 Kafka Connect 作了大量數據集成場景下的優化,與 Kafka Stream 相結合,可以爲包括 AI 在內的各類應用場景構建起一個完整的數據層支撐方案。
有其它關於數據集成的技術問題,也歡迎一塊兒探討、共同提升。
參考資料
· How to Build and Deploy Scalable Machine Learning in Production with Apache Kafka
https://www.confluent.io/blog/
· Kafka Connect 官方文檔
https://docs.confluent.io/cur...
· Machine Learning + Kafka Streams Examples
https://github.com/kaiwaehner
· PredictionIO- 基於 Spark 的機器學習框架
http://predictionio.apache.org
Q & A
Q1:DataPipeline 避開了數據處理這個過程,並以此提升性能,這個思路很承認。可是有個問題:從數據生產到數據利用的環節中,總要有一步數據處理的步驟的,這個步驟,從產品角度,DataPipeline 是如何考慮的?
A1:ELT 的核心思想就是要利用下游數據存儲性能大幅提高和機器學習應用的靈活性的優點,在數據流轉的過程當中不作過於複雜的計算。若是真的須要作處理,也能夠基於咱們的產品能夠去寫轉換的代碼。但這種處理都是無狀態的。有狀態處理,建議放到下游去作。這樣才更符合 ELT 的理念。
Q2:請問數據的落地是自動的嗎?
A2: 基於原生 Kafka Connector,須要命令行啓動目標端類型的 Sink Connector,指定消費的 topic 列表,經過代碼完成數據落地。基於 DataPipeline 產品,經過界面配置源和目的地後,落地是徹底自動的。
Q3:多線程讀,對源端的數據表或用戶權限有沒有特定的要求?
A3:JDBC 模式的 Source Connector 使用的 RDBMS 用戶,須要具備選擇同步表的 select 權限。CDC 模式的各不相同,參照產品內詳盡的權限配置說明。
Q4:如何保證生產和消費的 EOS 恰好一次語義?
A4: Kafka Connect 下的 Exactly Once Semantic 依賴於具體 Connector 實現,Kafka Connect 框架自己對此只提供了必要非充分的支持。咱們先來看 Source 端:假定 Source Connector 是從 MySQL 的 Binlog 中抽取數據到 Kafka,爲了實現 EOS,首先 Source Connector 在每次提交記錄到 Kafka 的時候,須要原子化的記錄下來對應的 binlog position,這樣才能保證任務異常中斷、重啓後可以從這個 position 繼續讀取。Kafka Connect 框架在 Source 端封裝了 offset storage 的存儲更新邏輯。offset storage 本質上是一個 Kafka 的 topic,利用 Kafka 的事務機制,理論上能夠保證 offset 的修改和消息發送的原子性。再來看 Sink 端:若是 Sink Connector 能夠將數據的輸出和 Offset 的記錄進行原子化操做,那麼同理也可以作到 EOS。但這個原子化操做須要 Sink 端本身用某種機制實現,例如 Confluent 的 HDFS Connector 就用 WAL 日誌保證了寫入的 EOS。
PS.添加DataPipeline君微信:datapipeline2018,拉你進技術討論羣。