有贊大數據技術應用的早期,咱們使用 Sqoop 做爲數據同步工具,知足了 MySQL 與 Hive 之間數據同步的平常開發需求。mysql
隨着公司業務發展,數據同步的場景愈來愈多,主要是 MySQL、Hive 與文本文件之間的數據同步,Sqoop 已經不能徹底知足咱們的需求。在2017年初,咱們已經沒法忍受 Sqoop 給咱們帶來的折磨,準備改造咱們的數據同步工具。當時有這麼些很最痛的需求:sql
做爲數據平臺管理員,還但願收集到更多運行細節,方便平常維護:數據庫
基於上述的數據同步需求,咱們計劃基於開源作改造,考察的對象主要是 DataX 和 Sqoop,它們之間的功能對好比下後端
功能 | DataX | Sqoop |
---|---|---|
運行模式 | 單進程多線程 | MapReduce |
MySQL讀寫 | 單機壓力大;讀寫粒度容易控制 | MapReduce 模式重,寫出錯處理麻煩 |
Hive讀寫 | 單機壓力大 | 擴展性好 |
文件格式 | orc支持 | orc不支持,可添加 |
分佈式 | 不支持,能夠經過調度系統規避 | 支持 |
流控 | 有流控功能 | 須要定製 |
統計信息 | 已有一些統計,上報需定製 | 沒有,分佈式的數據收集不方便 |
數據校驗 | 在core部分有校驗功能 | 沒有,分佈式的數據收集不方便 |
監控 | 須要定製 | 須要定製 |
社區 | 開源不久,社區不活躍 | 一直活躍,核心部分變更不多 |
DataX 主要的缺點在於單機運行,而這個能夠經過調度系統規避,其餘方面的功能均優於 Sqoop,最終咱們選擇了基於 DataX 開發。api
使用 DataX 最重要的是解決分佈式部署和運行問題,DataX 自己是單進程的客戶端運行模式,須要考慮如何觸發運行 DataX。服務器
咱們決定複用已有的離線任務調度系統,任務觸發由調度系統負責,DataX 只負責數據同步。這樣就複用了系統能力,避免重複開發。關於調度系統,可參考文章《大數據開發平臺(Data Platform)在有讚的最佳踐》網絡
在每一個數據平臺的 worker 服務器,都會部署一個 DataX 客戶端,運行時可同時啓動多個進程,這些都由調度系統控制。多線程
爲了與已有的數據平臺交互,須要作一些定製修改:運維
大體的運行流程是:前置配置文件轉換、表結構校驗 -> (輸入 -> DataX 核心+業務無關的校驗 -> 輸出) -> 後置統計/持久化
分佈式
儘可能保證 DataX 專一於數據同步,儘可能不隱含業務邏輯,把有贊特有的業務邏輯放到 DataX 以外,數據同步過程沒法知足的需求,纔去修改源碼。
表結構、表命名規則、地址轉換這些運行時前置校驗邏輯,以及運行結果的持久化,放在元數據系統(參考《有贊數據倉庫元數據系統實踐》),而運行狀態的監控放在調度系統。
DataX 並無自帶 Hive 的 reader 和 writer,而只有 HDFS 的 reader 和writer。咱們選擇在 DataX 以外封裝,把 Hive 讀寫操做的配置文件,轉換爲 HDFS 讀寫的配置文件,另外輔助上 Hive DDL 操做。具體的,咱們作了以下改造:
按 DataX 的設計理念,reader 和 writer 相互不用關心,但實際使用常常須要關聯考慮才能避免運行出錯。MySQL 加減字段,或者字段類型變動,都會致使 MySQL 和 Hive 的表結構不一致,須要避免這種不一致的運行出錯。
非分區表都是全量導入,以 mysqlreader 配置爲準。若是 MySQL 配置字段與 Hive 實際結構不一致,則把 Hive 表 drop 掉後重建。表重建可能帶來下游 Hive SQL 出錯的風險,這個靠 SQL 的定時檢查規避。
Hive 表重建時,須要作 MySQL 字段轉換爲 Hive 類型,好比 MySQL 的 varchar 轉爲 Hive 的 string。這裏有坑,Hive 沒有無符號類型,注意 MySQL 的 int unsigned 的取值範圍,須要向上轉型,轉爲 Hive 的 bigint;同理,MySQL 的 bigint unsigned 也須要向上轉型,咱們根據實際業務狀況大膽轉爲 bigint。而 Hive 的 string 是萬能類型,若是不知道怎麼轉,用 string 是比較保險的。
Hive 分區表不能隨意變動表結構,變動可能會致使舊分區數據讀取異常。因此寫Hive 分區表時,以 Hive 表結構爲準,表結構不一致則直接報錯。咱們採起了以下的策略
MySQL字段 | Hive實際字段 | 處理方法 |
---|---|---|
a,b | a,b | 正常 |
a,b,c | a,b | 忽略MySQL的多餘字段,以Hive爲準 |
b,a | a,b | 順序不對,調整 |
a | a,b | MySQL少一個,報錯 |
a,c | a,b | 不匹配, 報錯 |
未指定字段 | a,b | 以Hive爲準 |
這麼作偏保守,對於無害的Hive分區表變動,其實能夠大膽去作,好比int類型改bigint、orc表加字段。
有贊並無獨立運行的 MySQL 實例,都是由 RDS 中間件管理着 MySQL 集羣,有讀寫分離和分表分庫兩種模式。讀寫 MySQL 有兩種選擇,經過 RDS 中間件讀寫,以及直接讀寫 MySQL 實例。
方案 | 優先 | 缺點 |
---|---|---|
連實例 | 性能好;不影響線上業務 | 當備庫維護或切換地址時,須要修改配置;開發者不知道備庫地址 |
連 RDS | 與普通應用一致;屏蔽了後端維護 | 對 RDS 形成額外壓力,有影響線上業務的風險;須要徹底符合公司 SQL 規範 |
對於寫 MySQL,寫入的數據量通常不大,DataX 選擇連 RDS,這樣就不用額外考慮主從複製。 對於讀 MySQL,考慮到有大量的全表同步任務,特別是凌晨離線任務高峯流量特別大,避免大流量對 RDS 中間件的衝擊,DataX 選擇直連到 MySQL 實例去讀取數據。爲了規避 MySQL 維護帶來的地址變動風險,咱們又作了幾件事情:
讀取 MySQL 時,對於讀寫分離,每次獲取其中一個從庫地址並鏈接;對於分表分庫,咱們有1024分片,就要轉換出1024個從庫地址,拼接出 DataX 的配置文件。
前提是有讚的 MySQL 建表規範,規定了建表必須有整型自增id主鍵。另外一條運維規範,SQL 運行超過2s會被強行 kill 掉。
以讀取 MySQL 全表爲例,咱們把一條全表去取的 SQL,拆分爲不少條小 SQL,而每條小 SQL 只走主鍵 id 的聚簇索引,代碼以下 select ... from table_name where id>? by id asc limit ?
執行完一條 SQL 後會強制 sleep 一下,讓系統不能太忙。不管是 insert 的 batchSize,仍是 select 每次分頁大小,咱們都是動態生成的,根據上一條運行的時間,運行太快就多 sleep,運行太慢就少 sleep,同時調整下一個批次的數量。
這裏還有改進的空間,能夠根據系統級的監控指標動態調整速率,好比磁盤使用率、CPU 使用率、binlog 延遲等。實際運行中,刪數據很容易引發 binlog 延遲,僅從 delete 語句運行時間沒法判斷是否刪的太快,具體緣由還沒有去深究。
除了最經常使用的 MySQL、Hive,以及邏輯比較簡單的文本,咱們還對 HBase 的讀寫根據業務狀況作了簡單改造。 咱們還全新開發了 eswriter,以及有贊 kvds 的 kvwriter,這些都是由相關存儲的開發者負責開發和維護插件。
DataX 自帶了運行結果的統計數據,咱們但願把這些統計數據上報到元數據系統,做爲 ETL 的過程元數據存儲下來。
基於咱們的開發策略,不要把有贊元數據系統的 api 嵌入 DataX 源碼,而是在 DataX 以外獲取 stdout,截取出打印的統計信息再上報。
數據平臺提供了 DataX 任務的編輯頁面,保存後會留下 DataX 運行配置文件以及調度週期在平臺上。調度系統會根據調度週期和配置文件,定時啓動 DataX 任務,每一個 DataX 任務以獨立進程的方式運行,進程退出後任務結束。運行中,會把 DataX 的日誌實時傳輸並展現到頁面上。
DataX 代碼中多數場景暴力的使用catch Exception
,缺少對各異常場景的兼容或重試,一個大任務執行過程當中出現網絡、IO等異常容易引發任務失敗。最多見的異常就是 SQLException,須要對異常作分類處理,好比 SQL 異常考慮重試,批量處理異常改走單條依次處理,網絡異常考慮數據庫鏈接重建。 HDFS 對異常容忍度較高,DataX 較少捕獲異常。
爲了發現低級問題,例如表遷移了但任務還在、普通表改爲了分區表,咱們天天晚上20點之後,會把當天運行的全部重要 DataX 任務「重放」一遍。
這不是原樣重放,而是在配置文件里加入了一個測試的標識,DataX 啓動後,reader 部分只會讀取一行數據,而 writer 會把目標地址指向一個測試的空間。這個測試能保證 DataX基本功能沒問題,以及整個運行環境沒有問題。
有贊全鏈路壓測系統經過 Hive 來生成數據,經過 DataX 把生成好的數據導入影子庫。影子庫是一種建在生產 MySQL 裏的 database,對普通應用不可見,加上 SQL 的特殊 hint 才能夠訪問。
生產環境的全鏈路壓測是個高危操做,一旦配置文件有誤可能會破壞真實的生產數據。DataX 的 MySQL 讀寫參數裏,加上了全鏈路壓測的標記時,只能讀寫特定的 MySQL 和 Hive 庫,並配置數據平臺作好醒目的提醒。
DataX 是在2017年二季度正式上線,到2017年Q3完成了上述的大部分特性開發,後續僅作了少許修補。到2019年Q1,已經穩定運行了超過20個月時間,目前天天運行超過6000個 DataX 任務,傳輸了超過100億行數據,是數據平臺裏比較穩定的一個組件。
期間出現過一些小問題,有一個印象深入。原生的 hdfsreader 讀取超大 orc 文件有 bug,orc 的讀 api 會把大文件分片成多份,默認大於256MB會分片,而 datax 僅讀取了第一個分片,修改成讀取全部分片解決問題。由於256MB足夠大,這個問題不多出現很隱蔽。除此以外沒有發現大的 bug,平時遇到的問題,多數是運行環境或用戶理解的問題,或是能夠克服的小問題。
對於 DataX 其實並無再多的開發計劃。在需求列表裏積累了十幾條改進需求,而這些需求即使1年不去改進,也不會影響線上運行,諸如髒數據可讀性改進、支持 HDFS HA。這些不重要不緊急的需求,暫時不會再投入去作。
DataX 主要解決批量同步問題,沒法知足多數增量同步和實時同步的需求。對於增量同步咱們也有了成熟方案,會有另外一篇文章介紹咱們自研的增量同步產品。