DataX在有贊大數據平臺的實踐

1、需求

有贊大數據技術應用的早期,咱們使用 Sqoop 做爲數據同步工具,知足了 MySQL 與 Hive 之間數據同步的平常開發需求。mysql

隨着公司業務發展,數據同步的場景愈來愈多,主要是 MySQL、Hive 與文本文件之間的數據同步,Sqoop 已經不能徹底知足咱們的需求。在2017年初,咱們已經沒法忍受 Sqoop 給咱們帶來的折磨,準備改造咱們的數據同步工具。當時有這麼些很最痛的需求:sql

  • 屢次因 MySQL 變動引發的數據同步異常。MySQL 須要支持讀寫分離與分表分庫模式,並且要兼容可能的數據庫遷移、節點宕機以及主從切換
  • 有很多異常是由表結構變動致使。MySQL 或 Hive 的表結構均可能發生變動,須要兼容多數的表結構不一致狀況
  • MySQL 讀寫操做不要影響線上業務,不要觸發 MySQL 的運維告警,不想每天被 DBA 噴
  • 但願支持更多的數據源,如 HBase、ES、文本文件

做爲數據平臺管理員,還但願收集到更多運行細節,方便平常維護:數據庫

  • 統計信息採集,例如運行時間、數據量、消耗資源
  • 髒數據校驗和上報
  • 但願運行日誌能接入公司的日誌平臺,方便監控

2、選型

基於上述的數據同步需求,咱們計劃基於開源作改造,考察的對象主要是 DataX 和 Sqoop,它們之間的功能對好比下後端

功能 DataX Sqoop
運行模式 單進程多線程 MapReduce
MySQL讀寫 單機壓力大;讀寫粒度容易控制 MapReduce 模式重,寫出錯處理麻煩
Hive讀寫 單機壓力大 擴展性好
文件格式 orc支持 orc不支持,可添加
分佈式 不支持,能夠經過調度系統規避 支持
流控 有流控功能 須要定製
統計信息 已有一些統計,上報需定製 沒有,分佈式的數據收集不方便
數據校驗 在core部分有校驗功能 沒有,分佈式的數據收集不方便
監控 須要定製 須要定製
社區 開源不久,社區不活躍 一直活躍,核心部分變更不多

DataX 主要的缺點在於單機運行,而這個能夠經過調度系統規避,其餘方面的功能均優於 Sqoop,最終咱們選擇了基於 DataX 開發。api

3、前期設計

3.1 運行形態

使用 DataX 最重要的是解決分佈式部署和運行問題,DataX 自己是單進程的客戶端運行模式,須要考慮如何觸發運行 DataX。服務器

咱們決定複用已有的離線任務調度系統,任務觸發由調度系統負責,DataX 只負責數據同步。這樣就複用了系統能力,避免重複開發。關於調度系統,可參考文章《大數據開發平臺(Data Platform)在有讚的最佳踐》網絡

在每一個數據平臺的 worker 服務器,都會部署一個 DataX 客戶端,運行時可同時啓動多個進程,這些都由調度系統控制。多線程

3.2 執行器設計

爲了與已有的數據平臺交互,須要作一些定製修改:運維

  • 符合平臺規則的狀態上報,如啓動/運行中/結束,運行時需上報進度,結束需上報成功失敗
  • 符合平臺規則的運行日誌實時上報,用於展現
  • 統計、校驗、流控等子模塊的參數可從平臺傳入,並須要對結果作持久化
  • 須要對異常輸入作好兼容,例如 MySQL 主從切換、表結構變動

3.3 開發策略

大體的運行流程是:前置配置文件轉換、表結構校驗 -> (輸入 -> DataX 核心+業務無關的校驗 -> 輸出) -> 後置統計/持久化分佈式

儘可能保證 DataX 專一於數據同步,儘可能不隱含業務邏輯,把有贊特有的業務邏輯放到 DataX 以外,數據同步過程沒法知足的需求,纔去修改源碼。

表結構、表命名規則、地址轉換這些運行時前置校驗邏輯,以及運行結果的持久化,放在元數據系統(參考《有贊數據倉庫元數據系統實踐》),而運行狀態的監控放在調度系統。

4、源碼改造之路

4.1 支持 Hive 讀寫

DataX 並無自帶 Hive 的 reader 和 writer,而只有 HDFS 的 reader 和writer。咱們選擇在 DataX 以外封裝,把 Hive 讀寫操做的配置文件,轉換爲 HDFS 讀寫的配置文件,另外輔助上 Hive DDL 操做。具體的,咱們作了以下改造:

4.1.1 Hive 讀操做

  • 根據表名,拼接出 HDFS 路徑。有讚的數據倉庫規範裏有一條,禁止使用外部表,這使得 HDFS 路徑拼接變得容易。如果外部表,就須要從元數據系統獲取相應的路徑
  • Hive 的表結構獲取,須要依賴元數據系統。還需對 Hive 表結構作校驗,後面會詳細說明

4.1.2 Hive 寫操做

  • 寫 Hive 的配置裏不會指定 Hive 的文件格式、分隔符,須要讀取元數據,獲取這些信息填入 HDFS 的寫配置文件
  • 支持新建不存在的 Hive 表或分區,能構建出符合數據倉庫規範的建表語句

4.2 MySQL -> Hive兼容性

按 DataX 的設計理念,reader 和 writer 相互不用關心,但實際使用常常須要關聯考慮才能避免運行出錯。MySQL 加減字段,或者字段類型變動,都會致使 MySQL 和 Hive 的表結構不一致,須要避免這種不一致的運行出錯。

4.2.1 MySQL -> Hive 非分區表

非分區表都是全量導入,以 mysqlreader 配置爲準。若是 MySQL 配置字段與 Hive 實際結構不一致,則把 Hive 表 drop 掉後重建。表重建可能帶來下游 Hive SQL 出錯的風險,這個靠 SQL 的定時檢查規避。

Hive 表重建時,須要作 MySQL 字段轉換爲 Hive 類型,好比 MySQL 的 varchar 轉爲 Hive 的 string。這裏有坑,Hive 沒有無符號類型,注意 MySQL 的 int unsigned 的取值範圍,須要向上轉型,轉爲 Hive 的 bigint;同理,MySQL 的 bigint unsigned 也須要向上轉型,咱們根據實際業務狀況大膽轉爲 bigint。而 Hive 的 string 是萬能類型,若是不知道怎麼轉,用 string 是比較保險的。

4.2.2 MySQL -> Hive 分區表

Hive 分區表不能隨意變動表結構,變動可能會致使舊分區數據讀取異常。因此寫Hive 分區表時,以 Hive 表結構爲準,表結構不一致則直接報錯。咱們採起了以下的策略

MySQL字段 Hive實際字段 處理方法
a,b a,b 正常
a,b,c a,b 忽略MySQL的多餘字段,以Hive爲準
b,a a,b 順序不對,調整
a a,b MySQL少一個,報錯
a,c a,b 不匹配, 報錯
未指定字段 a,b 以Hive爲準

這麼作偏保守,對於無害的Hive分區表變動,其實能夠大膽去作,好比int類型改bigint、orc表加字段。

4.3 適配 MySQL 集羣

有贊並無獨立運行的 MySQL 實例,都是由 RDS 中間件管理着 MySQL 集羣,有讀寫分離和分表分庫兩種模式。讀寫 MySQL 有兩種選擇,經過 RDS 中間件讀寫,以及直接讀寫 MySQL 實例。

方案 優先 缺點
連實例 性能好;不影響線上業務 當備庫維護或切換地址時,須要修改配置;開發者不知道備庫地址
連 RDS 與普通應用一致;屏蔽了後端維護 對 RDS 形成額外壓力,有影響線上業務的風險;須要徹底符合公司 SQL 規範

對於寫 MySQL,寫入的數據量通常不大,DataX 選擇連 RDS,這樣就不用額外考慮主從複製。 對於讀 MySQL,考慮到有大量的全表同步任務,特別是凌晨離線任務高峯流量特別大,避免大流量對 RDS 中間件的衝擊,DataX 選擇直連到 MySQL 實例去讀取數據。爲了規避 MySQL 維護帶來的地址變動風險,咱們又作了幾件事情:

  • 元數據維護了標準的 RDS 中間件地址
  • 主庫、從庫、RDS 中間件三者地址能夠關聯和任意轉換
  • 每次 DataX 任務啓動時,獲取最新的主庫和從庫地址
  • 按期的 MySQL 連通性校驗
  • 與 DBA 創建協做關係,變動提早通知

讀取 MySQL 時,對於讀寫分離,每次獲取其中一個從庫地址並鏈接;對於分表分庫,咱們有1024分片,就要轉換出1024個從庫地址,拼接出 DataX 的配置文件。

4.4 MySQL 運維規範的兼容

4.4.1 避免慢 SQL

前提是有讚的 MySQL 建表規範,規定了建表必須有整型自增id主鍵。另外一條運維規範,SQL 運行超過2s會被強行 kill 掉。

以讀取 MySQL 全表爲例,咱們把一條全表去取的 SQL,拆分爲不少條小 SQL,而每條小 SQL 只走主鍵 id 的聚簇索引,代碼以下 select ... from table_name where id>? by id asc limit ?

4.4.2 避免過快讀寫影響其餘業務

執行完一條 SQL 後會強制 sleep 一下,讓系統不能太忙。不管是 insert 的 batchSize,仍是 select 每次分頁大小,咱們都是動態生成的,根據上一條運行的時間,運行太快就多 sleep,運行太慢就少 sleep,同時調整下一個批次的數量。

這裏還有改進的空間,能夠根據系統級的監控指標動態調整速率,好比磁盤使用率、CPU 使用率、binlog 延遲等。實際運行中,刪數據很容易引發 binlog 延遲,僅從 delete 語句運行時間沒法判斷是否刪的太快,具體緣由還沒有去深究。

4.5 更多的插件

除了最經常使用的 MySQL、Hive,以及邏輯比較簡單的文本,咱們還對 HBase 的讀寫根據業務狀況作了簡單改造。 咱們還全新開發了 eswriter,以及有贊 kvds 的 kvwriter,這些都是由相關存儲的開發者負責開發和維護插件。

4.6 與大數據體系交互

4.6.1 上報運行統計數據

DataX 自帶了運行結果的統計數據,咱們但願把這些統計數據上報到元數據系統,做爲 ETL 的過程元數據存儲下來。

基於咱們的開發策略,不要把有贊元數據系統的 api 嵌入 DataX 源碼,而是在 DataX 以外獲取 stdout,截取出打印的統計信息再上報。

4.6.2 與數據平臺的交互

數據平臺提供了 DataX 任務的編輯頁面,保存後會留下 DataX 運行配置文件以及調度週期在平臺上。調度系統會根據調度週期和配置文件,定時啓動 DataX 任務,每一個 DataX 任務以獨立進程的方式運行,進程退出後任務結束。運行中,會把 DataX 的日誌實時傳輸並展現到頁面上。

4.7 考慮更多異常

DataX 代碼中多數場景暴力的使用catch Exception,缺少對各異常場景的兼容或重試,一個大任務執行過程當中出現網絡、IO等異常容易引發任務失敗。最多見的異常就是 SQLException,須要對異常作分類處理,好比 SQL 異常考慮重試,批量處理異常改走單條依次處理,網絡異常考慮數據庫鏈接重建。 HDFS 對異常容忍度較高,DataX 較少捕獲異常。

4.8 測試場景改造

4.8.1 持續集成

爲了發現低級問題,例如表遷移了但任務還在、普通表改爲了分區表,咱們天天晚上20點之後,會把當天運行的全部重要 DataX 任務「重放」一遍。

這不是原樣重放,而是在配置文件里加入了一個測試的標識,DataX 啓動後,reader 部分只會讀取一行數據,而 writer 會把目標地址指向一個測試的空間。這個測試能保證 DataX基本功能沒問題,以及整個運行環境沒有問題。

4.8.2 全鏈路壓測場景

有贊全鏈路壓測系統經過 Hive 來生成數據,經過 DataX 把生成好的數據導入影子庫。影子庫是一種建在生產 MySQL 裏的 database,對普通應用不可見,加上 SQL 的特殊 hint 才能夠訪問。

生產環境的全鏈路壓測是個高危操做,一旦配置文件有誤可能會破壞真實的生產數據。DataX 的 MySQL 讀寫參數裏,加上了全鏈路壓測的標記時,只能讀寫特定的 MySQL 和 Hive 庫,並配置數據平臺作好醒目的提醒。

5、線上運行狀況

DataX 是在2017年二季度正式上線,到2017年Q3完成了上述的大部分特性開發,後續僅作了少許修補。到2019年Q1,已經穩定運行了超過20個月時間,目前天天運行超過6000個 DataX 任務,傳輸了超過100億行數據,是數據平臺裏比較穩定的一個組件。

期間出現過一些小問題,有一個印象深入。原生的 hdfsreader 讀取超大 orc 文件有 bug,orc 的讀 api 會把大文件分片成多份,默認大於256MB會分片,而 datax 僅讀取了第一個分片,修改成讀取全部分片解決問題。由於256MB足夠大,這個問題不多出現很隱蔽。除此以外沒有發現大的 bug,平時遇到的問題,多數是運行環境或用戶理解的問題,或是能夠克服的小問題。

6、下一步計劃

對於 DataX 其實並無再多的開發計劃。在需求列表裏積累了十幾條改進需求,而這些需求即使1年不去改進,也不會影響線上運行,諸如髒數據可讀性改進、支持 HDFS HA。這些不重要不緊急的需求,暫時不會再投入去作。

DataX 主要解決批量同步問題,沒法知足多數增量同步和實時同步的需求。對於增量同步咱們也有了成熟方案,會有另外一篇文章介紹咱們自研的增量同步產品。

相關文章
相關標籤/搜索