簡介: 獲取更詳細的 Databricks 數據洞察相關信息,可至產品詳情頁查看:https://www.aliyun.com/produc...
做者
高爽,基智科技數據中心負責人
尚子鈞,數據研發工程師sql
北京基智科技有限公司是一家提供智能營銷服務的科技公司。公司願景是基於 AI 和大數據分析爲 B2B 企業提供全流程的智能營銷服務。公司秉承開放,挑戰,專業,創新的價值觀從線索挖掘到 AI 智達、CRM 客戶管理覆蓋客戶全生命週期,實現全渠道的營銷和數據分析決策,幫助企業高效引流,精準拓客,以更低的成本獲取更多的商機。截至目前,基智科技已與包括房產、教育、汽車、企業服務等領域展開普遍合做。安全
在基智科技目前的離線計算任務中,大部分數據源都是來自於業務 DB(MySQL) 。業務 DB 數據接入的準確性、穩定性和及時性,決定着下游整個離線計算 pipeline 的準確性和及時性。最初咱們在 ECS 上搭建了本身的 Hadoop 集羣,天天使用 Sqoop 同步 MySQL 數據,再經由 Spark ETL 任務,落表寫入 Hive ,ES,MongoDB 、MySQL ,經過調用 Service API 作頁籤的展現。架構
咱們的 ETL 任務通常在凌晨1點開始運行,數據處理階段約1h, Load 階段1h+,總體執行時間爲2-3h,下圖爲咱們的 ETL 過程:運維
上面的架構在使用的過程當中如下幾個問題比較突出:dom
隨着業務數據的增加,受 DB 性能瓶頸影響突出。
須要維護多套數據源,數據冗雜,容易造成數據孤島使用不方便。
天級 ETL 任務耗時久,影響下游依賴的產出時間。
數據主要存儲在 HDFS 上,隨着數據的增長,須要增長集羣,成本這一塊也是不小的開銷。
大數據平臺運維成本高。oop
爲了解決天級 ETL 逐漸尖銳的問題,減小資源成本、提早數據產出,咱們決定將T+1級 ETL 任務轉換成T+0實時數據入庫,在保證數據一致的前提下,作到數據落地便可用。性能
考慮過使用 Lambda 架構在離線、實時分別維護一份數據但在實際使用過程當中沒法保證事務性,隨着數據量增大查詢性能低,操做較複雜維護成本比較高等問題最終沒能達到理想化使用。大數據
後來咱們決定選擇數據湖架構,緊接着考察了市場上主流的數據湖架構:Delta Lake(開源和商業版)& Hudi。兩者都支持了 ACID 語義、Upsert、Schema 動態變動、Time Travel 等功能,但也存在差別好比:優化
Delta Lake 優點:阿里雲
Delta Lake 不足:
Hudi 優點:
Hudi 不足:
綜合以上指標,加上咱們以前的平臺就是基於阿里雲平臺搭建,選型時阿里雲還沒有支持 Hudi ,最終咱們選擇了阿里雲 Databricks 數據洞察(商業版 Delta Lake 專業性更強)。同時 Databricks 數據洞察提供全託管服務,可以免去咱們的運維成本。
總體的架構如上圖所示。咱們接入的數據會分爲兩部分,存量歷史數據和實時數據,存量數據使用 Spark 將 MySQL 全量數據導入 Delta Lake 的表中, 實時數據使用 Binlog 採集實時寫入到 Delta Lake 表中,這樣實時數據和歷史數據都同步到同一份表裏面真正實現批流一體操做。
前期在阿里同事的協助下咱們完成了數據遷移的工做,實如今Databricks數據洞察架構下數據開發工做,咱們的前期作的準備以下:
天天作ETL數據清洗,作表的merge操做 ,delta表結構爲:
%sql CREATE TABLE IF NOT EXISTS delta.delta_{table_name}( id bigint, uname string, dom string, email string, update timestamp, created timestamp ) USING delta LOCATION '------/delta/'
%sql MERGE INTO delta.delta_{table_name} AS A USING (SELECT * FROM rds.table_{table_name} where day= date_format (date_sub (current_date,1), 'yyyy-mm-dd') AS B ON A.id=B.id WHEN MATCHED THEN update set A.uname=B.name, A.dom=B.dom, A.email=B.email, A.updated=current_timestamp() WHEN NOT MATCHED THEN INSERT (A.uname,A.dom,A.email,A.update,A.created) values (B.name,B.dom,B.email,current_timestamp(),current_timestamp())
因爲 Delta Lake 的數據僅接入實時數據,對於存量歷史數據咱們是經過 SparkSQL 一次性 Sink Delta Lake 的表中,這樣咱們流和批處理時只維護一張 Delta 表,因此咱們只在最初對這兩部分數據作一次 merge 操做。
同時爲了保證數據的高安全,咱們使用 Databricks Deep Clone 天天會定時更新來維護一張從表以備用。對於每日新增的數據,使用 Deep Clone 一樣只會對新數據 Insert 對須要更新的數據 Update 操做,這樣能夠大大提升執行效率。
CREATE OR REPLACE TABLE delta.delta_{table_name}_clone DEEP CLONE delta.delta_{table_name};
原文連接本文爲阿里雲原創內容,未經容許不得轉載。