DataX 是阿里巴巴集團內被普遍使用的離線數據同步工具/平臺,實現包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各類異構數據源之間高效的數據同步功能。html
若是想進一步瞭解 DataX ,請進一步查看 DataX 詳細介紹 。git
DataX 支持多種數據庫的讀寫, json 格式配置文件很容易編寫, 同步性能很好, 一般能夠達到每秒鐘 1 萬條記錄或者更高, 能夠說是至關優秀的產品, 可是缺少對增量更新的內置支持。github
其實增量更新很是簡單, 只要從目標數據庫讀取一個最大值的記錄, 多是 DateTime
或者 RowVersion
類型, 而後根據這個最大值對源數據庫要同步的表進行過濾, 而後再進行同步便可。sql
因爲 DataX 支持多種數據庫的讀寫, 一種相對簡單而且可靠的思路就是:docker
接下來就用 shell 腳原本一步一步實現增量更新。shell
個人同步環境是從 SQLServer 同步到 PostgreSQL , 部分配置以下:數據庫
{ "job": { "content": [ { "reader": { "name": "sqlserverreader", "parameter": { "username": "...", "password": "...", "connection": [ { "jdbcUrl": [ "jdbc:sqlserver://[source_server];database=[source_db]" ], "querySql": [ "SELECT DataTime, PointID, DataValue FROM dbo.Minutedata WHERE 1=1" ] } ] } }, "writer": { "name": "postgresqlwriter", "parameter": { "username": "...", "password": "...", "connection": [ { "jdbcUrl": "jdbc:postgresql://[target_server]:5432/[target_db]", "table": [ "public.minute_data" ] } ], "column": [ "data_time", "point_id", "data_value" ], "preSql": [ "TRUNCATE TABLE @table" ] } } } ], "setting": { } } }
更多的配置能夠參考 SqlServerReader 插件文檔以及 PostgresqlWriter 插件文檔。json
要實現增量更新, 首先要 PostgresqlReader 從目標數據庫讀取最大日期, 並用 TextFileWriter 寫入到一個 csv 文件, 這一步個人配置以下所示:bash
{ "job": { "content": [ { "reader": { "name": "postgresqlreader", "parameter": { "connection": [ { "jdbcUrl": [ "jdbc:postgresql://[target_server]:5432/[target_db]" ], "querySql": [ "SELECT max(data_time) FROM public.minute_data" ] } ], "password": "...", "username": "..." } }, "writer": { "name": "txtfilewriter", "parameter": { "dateFormat": "yyyy-MM-dd HH:mm:ss", "fileName": "minute_data_max_time_result", "fileFormat": "csv", "path": "/scripts/", "writeMode": "truncate" } } } ], "setting": { } } }
更多的配置能夠看考 PostgresqlDataReader 插件文檔以及 TextFileWriter 插件文檔工具
有了這兩個配置文件, 如今能夠編寫增量同步的 shell 文件, 內容以下:
#!/bin/bash ### every exit != 0 fails the script set -e # 獲取目標數據庫最大數據時間,並寫入一個 csv 文件 docker run --interactive --tty --rm --network compose --volume $(pwd):/scripts \ beginor/datax:3.0 \ /scripts/minute_data_max_time.json if [ $? -ne 0 ]; then echo "minute_data_sync.sh error, can not get max_time from target db!" exit 1 fi # 找到 DataX 寫入的文本文件,並將內容讀取到一個變量中 RESULT_FILE=`ls minute_data_max_time_result_*` MAX_TIME=`cat $RESULT_FILE` # 若是最大時間不爲 null 的話, 修改所有同步的配置,進行增量更新; if [ "$MAX_TIME" != "null" ]; then # 設置增量更新過濾條件 WHERE="DataTime > '$MAX_TIME'" sed "s/1=1/$WHERE/g" minute_data.json > minute_data_tmp.json # 將第 45 行的 truncate 語句刪除; sed '45d' minute_data_tmp.json > minute_data_inc.json # 增量更新 docker run --interactive --tty --rm --network compose --volume $(pwd):/scripts \ beginor/datax:3.0 \ /scripts/minute_data_inc.json # 刪除臨時文件 rm ./minute_data_tmp.json ./minute_data_inc.json else # 所有更新 docker run --interactive --tty --rm --network compose --volume $(pwd):/scripts \ beginor/datax:3.0 \ /scripts/minute_data.json fi
在上面的 shell 文件中, 使用我製做的 DataX docker 鏡像, 使用命令 docker pull beginor/datax:3.0
便可獲取該鏡像, 當也能夠修改這個 shell 腳本直接使用 datax 命令來執行。
由於 DataX 支持多種數據庫的讀寫, 充分利用 DataX 讀取各類數據庫的能力, 減小了不少開發工做, 畢竟 DataX 的可靠性是很好的。
文章來源:https://beginor.github.io/2018/06/29/incremental-sync-with-datax.html