數棧是雲原生—站式數據中臺PaaS,咱們在github和gitee上有一個有趣的開源項目:FlinkX,FlinkX是一個基於Flink的批流統一的數據同步工具,既能夠採集靜態的數據,也能夠採集實時變化的數據,是全域、異構、批流一體的數據同步引擎。你們喜歡的話請給咱們點個star!star!star!
html
github開源項目:https://github.com/DTStack/flinkxpython
gitee開源項目:https://gitee.com/dtstack_dev_0/flinkxgit
「表格存儲」是 NoSQL 的數據存儲服務,是基於雲計算技術構建的一個分佈式結構化和半結構化數據的存儲和管理服務。github
表格存儲的數據模型以「二維表」爲中心。數據庫
表有行和列的概念,可是與傳統數據庫不同,表格存儲的表是稀疏的json
每一行能夠有不一樣的列,能夠動態增長或者減小屬性列,建表時不須要爲表的屬性列定義嚴格的 schema。bash
OTS的數據遷移可使用「DataX」完成全量數據遷移。但因爲部分數據表的數據量較大,沒法在指定的時間窗口內完成全量遷移,且目前DataX只能針對主鍵值進行範圍查詢,暫不支持按照屬性列範圍抽取數據。app
因此能夠按以下兩種方式實現全量+增量的數據遷移:jvm
總而言之,言而總之,咱們不生產數據,此刻,咱們是大數據的搬運工。分佈式
接下來呢,本文就以應用側調整爲雙寫模式爲例,詳細說明OTS數據遷移、校驗過程。
其中OTS數據遷移流程具體以下圖所示:
OTS數據遷移之準備工做
一、 準備工做
爲保證新老環境的數據一致性,須要在開始數據遷移前,對目標環境的OTS數據表進行數據清空操做,Delete操做是經過DataX工具直接刪除表內數據,無需從新建表。
具體操做以下:
1) 配置DataX任務
在使用DataX執行數據清空前,需配置對應數據表使用DataX執行Delete任務所需的json文件。在清空數據的配置中,reader與writer均配置目標端的鏈接信息,且數據寫入模式配置DeleteRow便可,具體內容以下:
{ "job": { "setting": { "speed": { "channel": "5" } }, "content": [{ "reader": { "name": "otsreader", "parameter": { "endpoint": "http://xxx.vpc.ots.yyy.com/", "accessId": "dest_accessId", "accessKey": "dest_accessKey", "instanceName": " dest_instanceName", "table": " tablename ", "column": [{ "name": "xxxxx" }, { "name": "xxxxx" } ], "range": { "begin": [{ "type": "INF_MIN" }], "end": [{ "type": "INF_MAX" }] } } }, "writer": { "name": "otswriter", "parameter": { "endpoint": "http://xxx.vpc.ots.yun.yyy.com/", "accessId": "dest_accessId", "accessKey": "dest_accessKey", "instanceName": " dest_instanceName", "table": " tablename ", "primaryKey": [{ "name": "xxxxx", "type": "string" }], "column": [{ "name": "xxxxx", "type": "string" }, { "name": "xxxxx", "type": "string" } ], "writeMode": "DeleteRow" } } }] } }
2 )執行datax任務
sh del_pre.sh
#!/bin/bash nohup python datax.py del_table_1.json --jvm="-Xms16G -Xmx16G" > del_table_1.log &
二、 數據遷移
在不停服務的狀況下把源環境內數據量較大的數據表所有遷移到目標環境內對應的數據表。
1)配置DataX任務
在DataX對數據表配置相應的json文件,遷移配置的具體內容以下:
{ "job": { "setting": { "speed": { "channel": "5" } }, "content": [{ "reader": { "name": "otsreader", "parameter": { "endpoint": "http://xxx.vpc.ots.yyy.com/", "accessId": "src_accessId", "accessKey": "src_ accessKey ", "instanceName": " src_instanceName", "table": "tablename", "column": [{ "name": "xxxxx" }, { "name": "xxxxx" } ], "range": { "begin": [{ "type": "INF_MIN" }], "end": [{ "type": "INF_MAX" }] } } }, "writer": { "name": "otswriter", "parameter": { "endpoint": "http://xxx.vpc.ots.yun.zzz.com/", "accessId": "dest_accessId", "accessKey": "dest_accessKey", "instanceName": " dest_instanceName", "table": " tablename ", "primaryKey": [{ "name": "xxxxx", "type": "string" }], "column": [{ "name": "xxxxx", "type": "string" }, { "name": "xxxxx", "type": "string" } ], "writeMode": "PutRow" } } }] } }
需注意,因爲OTS自己是NoSQL系統,在遷移數據的配置中,必須配置全部的屬性列,不然會缺失對應屬性列的值。
2) 執行datax任務
sh pre_transfer.sh
#!/bin/bash nohup python datax.py table_1.json --jvm="-Xms16G -Xmx16G" >table_1.log &
吶,此時,萬事俱備,數據只待遷移!
在遷移以前,讓咱們最後再對焦一下數據遷移的目標:
下面,進入正式遷移階段!
一、OTS數據靜默
OTS的數據靜默主要是經過觀察對應表的數據是否存在變化來判斷,校驗方式主要包括行數統計、內容統計。
1)行數統計
因OTS自己不提供count接口,因此採用在hive建立OTS外部表的方式,讀取OTS數據並計算對應數據表的行數,具體操做以下:
CREATE EXTERNAL TABLE t_oilcard_expenses_old (h_card_no string) STORED BY 'com.aliyun.openservices.tablestore.hive.TableStoreStorageHandler' WITH SERDEPROPERTIES( "tablestore.columns.mapping"="card_no") TBLPROPERTIES ("tablestore.endpoint"="$endpoint ","tablestore.instance"="instanceName","tablestore.access_key_id"="ak","tablestore.access_key_secret"="sk","tablestore.table.name"="tableName");
nohup sh pre_all_count.sh >pre_all_count.log &
#!/bin/bash ./bin/hive -e "use ots;select count(h_card_no) from tableName;" >table.rs &
連續執行兩次行數統計,若兩次統計結果一致則說明數據已經靜默,數據寫入以中止。
2)內容統計
因爲部分數據表分區鍵對應的值比較單一,致使數據所有存儲在同一個分區。若採用hive統計行數會耗時過久,因此對於這個表使用datax將OTS數據導入oss的方式進行內容統計,具體操做以下:
sh check_table.sh
#!/bin/bash nohup python datax.py table.json --jvm="-Xms8G -Xmx8G">ots2oss01.log &
b、獲取OSS object的ETAG值,寫入對應文件table_check01.rs
連續執行兩次內容統計,對比兩次導出object的ETAG值,若結果一致則說明數據已經靜默,數據寫入以中止。
二、OTS數據遷移
1)準備工做
爲保證遷移後新老環境數據一致,防止目標環境因測試產生遺留髒數據,在進行數據遷移前,須要將目標環境的OTS的其他全量表進行數據清空。
「數據清空方式」主要有Drop、Delete,二者的區別以下:
a、Drop表操做
登陸OTS圖形化客戶端所在工具機,使用以下信息鏈接指定OTS實例,並進行對應表的drop操做;
AK: dest_accessId SK: dest_accessKey InstanceName: InstanceName Endpoint:endpoint
確認刪除後,再在客戶端從新建立對應的數據。
b、 Delete表操做
Delete操做是經過DataX工具直接刪除表內數據,無需從新建表。DataX所需的配置文件參考2.1.1所示。
sh del_table_01.sh
#!/bin/bash nohup python datax.py del_table.json --jvm="-Xms16G -Xmx16G">del_table.log &
2)數據遷移
在源環境中止服務的狀況下把雙寫模式中的增量表全量遷移以及其他小表所有遷移到目標環境內對應的數據表。
具體操做以下:
a、配置DataX任務
在DataX對上述數據表配置相應的json文件,遷移配置的具體內容參考2.2.1,在遷移數據的配置中,須要列全全部的屬性列。
b、執行DataX任務
sh transfer.sh
#!/bin/bash nohup python datax.py Table.json >Table.log &
三、OTS數據校驗
新老環境OTS的數據校驗方式均包括行數統計、內容統計,具體以下:
1)源環境數據統計
源環境OTS數據表的數據量統計依據數據靜默期間最後一次的統計結果便可。
2)目標環境數據統計
a、行數統計
因OTS自己不提供count接口,且目標環境ODPS支持建立OTS外部表,因此採用在ODPS建立OTS外部表的方式,讀取OTS數據並計算對應數據表的行數,具體操做以下:
nohup sh newots_count.sh >newots_count.log &
#!/bin/bash ./bin/odpscmd -e "select count(h_card_no) from tableName;" >table.rs &
b、 內容統計
因爲源環境的部分數據表採用內容統計的方式進行數據校驗,爲了方便對比數據是否一致,因此目標環境也採用內容統計的方式,具體操做參考3.1.2。