- 移山是禧雲自研的數據遷移平臺,經過在移山的配置操做,能夠方便的實現第三方數據接入、實時數據同步、異構數據源間遷移。
- 本文主要介紹移山數據遷移平臺對異構數據源間遷移這塊的實現思路和圖形化配置實現流程。
能夠理解爲:指數據結構、存取方式、形式不同的多個數據源:前端
在創建數據倉庫的過程當中,會有大量的 ETL(Extract、Transform、Load)工做,處理數據抽取、數據轉換、數據裝載的過程當中會有須要多種異構數據源(txt、Hbase、HDFS、Mysql、SqlServer等)遷移的場景。vue
數據中的一個字段被轉移到目標數據字段中的一個過程。
好比:python
備註git
好比 數據報表系統 B 須要使用 業務系統 A 中的某一張表的數據用來作數據報表展示時,須要保證過多的數據查詢、數據聚合處理不會影響該 系統 A 的正常業務。
解決數據湖(Hbase)中的數據遷移至數據倉庫各層及關係型數據庫之間的數據遷移問題。
因爲要支持異構數據源的遷移,【移山】採用了在阿里內部被普遍使用的離線數據同步工具 DataX。github
DataX 是一個異構數據源離線同步工具,致力於實現包括關係型數據庫(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各類異構數據源之間穩定高效的數據同步功能。sql
DataX自己做爲離線數據同步框架,採用Framework + plugin架構構建。將數據源讀取和寫入抽象成爲Reader/Writer插件,歸入到整個同步框架中。shell
DataX目前已經有了比較全面的插件體系,主流的 RDBMS 數據庫、NOSQL、大數據計算系統都已經接入:
備註數據庫
針對前面提到的兩種場景,咱們開發了數據遷移功能,如下爲【移山】中數據遷移的主要實現流程。json
Vue.js + Element UI + vue-socket + codemirror
SpringBoot
python + Flask + gunicorn + supervisor + Celery
拿建立一個 HbaseReader 插件爲例,配置樣例以下:瀏覽器
{ "name": "hbase11xreader", "parameter": { "hbaseConfig": { "hbase.rootdir": "hdfs://test/test1", "hbase.cluster.distributed": "true", "hbase.zookeeper.quorum": "", }, "table": "table1", "encoding": "utf-8", "mode": "normal", "column": [{ "name": "info:column1", "type": "string" }, { "name": "info:column2", "type": "string" }, { "name": "info:column3", "type": "string" } ], "range": { "startRowkey": "", "endRowkey": "", "isBinaryRowkey": true } } }
備註
移山平臺展現插件模板的時候使用了 codemirror 在線代碼編輯器,能夠友好的展現 JSON 格式的模板數據,效果以下:
總結
在準備好了數據遷移任務所需使用的 Reader/Writer 插件後,就能夠經過移山去建立遷移任務了,其實就是經過圖形化配置去生成一個完整的 Job 配置文件,爲運行任務作準備。
該步驟主要配置任務併發數,髒數據限制信息。
備註
備註
從下拉列表裏選擇要使用的 Writer 插件,一樣配置內容會自動匹配顯示,一樣能夠對配置內容進行編輯,這裏再也不截圖。
經過前面步驟的配置,完整的 Job 配置文件已經成型, 在任務保存以前,會將該配置文件完整的展現出來,方便使用者對以前步驟裏的設置數據進行檢查,格式以下:
3.1.1 創建Websocket 服務
點擊【執行】按鈕,前端會發出執行任務的請求,瀏覽器與 DataX 服務會經過 Websocket 服務創建鏈接,方便前端實時拿到 DataX 服務產生的任務執行結果:
import io from 'vue-socket.io' Vue.use(io, domain.socketUrl)
3.1.2 將Job配置數據傳給 DataX 服務:
socket.emit('action', jobConfig);
3.2.1 接收數據,生成 json 配置文件
從 data 中取出 Job 配置數據,並將該數據寫入到一個 .json 文件裏:
job_config = data['job_config'] # 生成臨時執行文件 file_name = '/tmp/' + str(job_id) + '.json' with open(file_name, 'w') as f: f.write(job_content) f.close()
3.2.2 拼接執行命令
command = DATAX\_ROOT + ' data.py ' + file\_name
3.2.3 執行任務
利用 subprocess 的 Popen 用法來執行命令:
child_process = subprocess.Popen( command, stdin=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True)
3.2.4 經過 stdout 獲取執行信息
while child_process.poll() is None: line = child_process.stdout.readline() line = line.strip() + '\n'
3.2.5 將執行結果傳送給 Websocket 客戶端
經過socket.emit('action',line)
方法,將日誌傳送給 Websocket 客戶端(瀏覽器):
if line: emit('execute_info', line) if child_process.returncode == 0: emit('execute_info', '執行成功!') else: emit('execute_info', '執行失敗!')
前端利用 socket.on 方法接收任務執行日誌:
this.$socket.on('execute_info', (info) => { // TODO 拿到執行信息,並展現 })
在監控管理界面,能夠對當天全部的遷移任務進行監控:
歡迎你們關注個人微信公衆號閱讀更多文章: