移山(數據遷移平臺)的數據遷移是怎麼實現的

  1. 移山是禧雲自研的數據遷移平臺,經過在移山的配置操做,能夠方便的實現第三方數據接入、實時數據同步、異構數據源間遷移。
  2. 本文主要介紹移山數據遷移平臺對異構數據源間遷移這塊的實現思路和圖形化配置實現流程。

| 一. 什麼是異構數據源

能夠理解爲:指數據結構、存取方式、形式不同的多個數據源:前端

  • 一個公司在信息化建設中,不一樣時期、不一樣背景、面對不一樣的應用和客戶會催生出多個系統;
  • 每一個系統可能會積累大量不一樣存儲方式的數據,從簡單的 Excel 文件數據、txt 文本數據到複雜的關係型數據庫 MYSQL、Oracle 數據等,它們構成了異構數據源。

| 二. 異構數據源遷移

離線數據倉庫

  1. 爲支撐集團運營發展和決策分析,禧雲起建之初構建了完善的離線數據倉庫體系;
  2. 從業務數據源抽取過來的數據存放在數據湖(Hbase)中;
  3. 數據倉庫的數據存儲爲 HDFS,離線計算框架爲 Hive,Spark。

數據遷移場景

在創建數據倉庫的過程當中,會有大量的 ETL(Extract、Transform、Load)工做,處理數據抽取、數據轉換、數據裝載的過程當中會有須要多種異構數據源(txt、Hbase、HDFS、Mysql、SqlServer等)遷移的場景。vue

場景1: 簡單的數據轉換:字段水平的簡單映射
數據中的一個字段被轉移到目標數據字段中的一個過程。

好比:python

  • 業務數據庫(Mysql)到數據湖(Hbase);
  • 從數據湖(Hbase)到數據倉庫(HDFS);
  • 數據倉庫(HDFS)到目標關係型數據庫(Mysql)。

備註git

  • 複雜的數據轉換
  • 須要作更多的數據分析,包括通用標識符問題、複雜條件過濾、時間類型的轉換、去除重複記錄等;
  • 此時能夠藉助魔盒(開發協做平臺)來完成 Spark 計算任務的提交、工做流調度來實現;
  • 該類數據分析處理不在本文討論範圍,如有興趣瞭解能夠看這篇
場景2: 關係型數據庫之間的數據遷移
好比 數據報表系統 B 須要使用 業務系統 A 中的某一張表的數據用來作數據報表展示時,須要保證過多的數據查詢、數據聚合處理不會影響該 系統 A 的正常業務。

| 三. 移山數據遷移工具

1. 目的

解決數據湖(Hbase)中的數據遷移至數據倉庫各層及關係型數據庫之間的數據遷移問題。

2. 數據遷移工具

因爲要支持異構數據源的遷移,【移山】採用了在阿里內部被普遍使用的離線數據同步工具 DataX。github

2.1 DataX簡介

DataX 是一個異構數據源離線同步工具,致力於實現包括關係型數據庫(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各類異構數據源之間穩定高效的數據同步功能。sql

2.2 DataX3.0框架設計

DataX自己做爲離線數據同步框架,採用Framework + plugin架構構建。將數據源讀取和寫入抽象成爲Reader/Writer插件,歸入到整個同步框架中。
datax1.jpgshell

  • Reader:數據採集模塊,負責採集數據源的數據,將數據發送給Framework。
  • Writer:數據寫入模塊,負責不斷向Framework取數據,並將數據寫入到目的端。
  • Framework:Framework用於鏈接Reader和Writer,做爲二者的數據傳輸通道,並處理緩衝,流控,併發,數據轉換等核心技術問題。
2.3 DataX3.0插件體系

DataX目前已經有了比較全面的插件體系,主流的 RDBMS 數據庫、NOSQL、大數據計算系統都已經接入:
數據遷移能力.png
備註數據庫

2.4 移山數據遷移功能對DataX的封裝
  • DataX 配置過程比較複雜,而且只支持命令行方式執行;
  • 爲下降使用難度,咱們將配置過程進行了圖形化處理,採用 Python 的 Flask 框架進行封裝,任務執行支持 HTTP 請求調用

| 四. 移山數據遷移實現流程

針對前面提到的兩種場景,咱們開發了數據遷移功能,如下爲【移山】中數據遷移的主要實現流程。json

技術棧

前端
Vue.js + Element UI + vue-socket + codemirror
移山服務端
SpringBoot
DataX 服務端
python + Flask + gunicorn + supervisor + Celery

1. 建立Reader/Writer插件

1.1 準備插件配置模板

拿建立一個 HbaseReader 插件爲例,配置樣例以下:瀏覽器

{
    "name": "hbase11xreader",
    "parameter": {
        "hbaseConfig": {
            "hbase.rootdir": "hdfs://test/test1",
            "hbase.cluster.distributed": "true",
            "hbase.zookeeper.quorum": "",
        },
        "table": "table1",
        "encoding": "utf-8",
        "mode": "normal",
        "column": [{
                "name": "info:column1",
                "type": "string"
            },
            {
                "name": "info:column2",
                "type": "string"
            },
            {
                "name": "info:column3",
                "type": "string"
            }
        ],
        "range": {
            "startRowkey": "",
            "endRowkey": "",
            "isBinaryRowkey": true
        }
    }
}

備註

1.2 保存配置

移山平臺展現插件模板的時候使用了 codemirror 在線代碼編輯器,能夠友好的展現 JSON 格式的模板數據,效果以下:
hbase-reader.png
總結

  • 經過藉助 codemirror 插件,JSON 模板能夠高亮的顯示,同時強大的語法檢查功能也有助於解決在編輯過程當中產生的語法錯誤,從而下降了 DataX 的Reader/Writer 插件配置難度。

2. 建立數據遷移任務

在準備好了數據遷移任務所需使用的 Reader/Writer 插件後,就能夠經過移山去建立遷移任務了,其實就是經過圖形化配置去生成一個完整的 Job 配置文件,爲運行任務作準備。

2.1 配置任務基本屬性

該步驟主要配置任務併發數,髒數據限制信息。
create-task0.png

備註

  • 最大髒記錄數:寫入數據時可容許的最大髒記錄數,超過該閾值時,程序會執行失敗;
  • 髒數據佔比:寫入數據時可容許的最大髒數據佔比,超過該閾值時,程序會執行失敗。
2.2 使用Reader插件

create-task1.png
備註

  • 能夠對自動匹配顯示的模板內容進行修改,以適應你的需求;
2.3 使用Writer插件

從下拉列表裏選擇要使用的 Writer 插件,一樣配置內容會自動匹配顯示,一樣能夠對配置內容進行編輯,這裏再也不截圖。

2.4 信息確認

經過前面步驟的配置,完整的 Job 配置文件已經成型, 在任務保存以前,會將該配置文件完整的展現出來,方便使用者對以前步驟裏的設置數據進行檢查,格式以下:
job配置文件.png

3. 執行遷移任務

3.1 前端

3.1.1 創建Websocket 服務 
點擊【執行】按鈕,前端會發出執行任務的請求,瀏覽器與 DataX 服務會經過 Websocket 服務創建鏈接,方便前端實時拿到 DataX 服務產生的任務執行結果:

import io from 'vue-socket.io'
Vue.use(io, domain.socketUrl)

3.1.2 將Job配置數據傳給 DataX 服務:

socket.emit('action', jobConfig);
3.2 DataX 服務端

3.2.1 接收數據,生成 json 配置文件 
從 data 中取出 Job 配置數據,並將該數據寫入到一個 .json 文件裏:

job_config = data['job_config']
# 生成臨時執行文件
file_name = '/tmp/' + str(job_id) + '.json'
with open(file_name, 'w') as f:
    f.write(job_content)
    f.close()

3.2.2 拼接執行命令

command = DATAX\_ROOT + ' data.py ' + file\_name

3.2.3 執行任務
利用 subprocess 的 Popen 用法來執行命令:

child_process = subprocess.Popen(
                    command,
                    stdin=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    shell=True)

3.2.4 經過 stdout 獲取執行信息

while child_process.poll() is None:
    line = child_process.stdout.readline()
    line = line.strip() + '\n'

3.2.5 將執行結果傳送給 Websocket 客戶端
經過socket.emit('action',line)方法,將日誌傳送給 Websocket 客戶端(瀏覽器):

if line:
   emit('execute_info', line)

if child_process.returncode == 0:
   emit('execute_info', '執行成功!')
else:
   emit('execute_info', '執行失敗!')

4. 移山查看任務的執行狀況

4.1 接收執行信息

前端利用 socket.on 方法接收任務執行日誌:

this.$socket.on('execute_info', (info) => {
  // TODO 拿到執行信息,並展現       
})
4.2 展現執行結果

execute-info.png

4.3 監控遷移任務

在監控管理界面,能夠對當天全部的遷移任務進行監控:
遷移任務監控.png

五. 總結

  • 經過使用移山的數據遷移功能,數據開發人員在移山中能夠無感知的使用 DataX 這個 ETL 工具,取代經過手動開發、命令行執行的傳統方式;
  • 經過圖形化配置,就可以快速的建立、執行一個數據遷移任務;
  • 經過監控管理功能,能夠方便的監控任務的執行狀況;
  • 大大的提升了數據開發人員的開發效率。

關注微信公衆號

歡迎你們關注個人微信公衆號閱讀更多文章:
微信公衆號二維碼.jpg

相關文章
相關標籤/搜索