數棧技術分享:OTS數據遷移——咱們不生產數據,咱們是大數據的搬運工

數棧是雲原生—站式數據中臺PaaS,咱們在github和gitee上有一個有趣的開源項目:FlinkX,FlinkX是一個基於Flink的批流統一的數據同步工具,既能夠採集靜態的數據,也能夠採集實時變化的數據,是全域、異構、批流一體的數據同步引擎。你們喜歡的話請給咱們點個star!star!star!
html

github開源項目:https://github.com/DTStack/flinkxpython

gitee開源項目:https://gitee.com/dtstack_dev_0/flinkxgit


「表格存儲」是 NoSQL 的數據存儲服務,是基於雲計算技術構建的一個分佈式結構化和半結構化數據的存儲和管理服務。github

表格存儲的數據模型以「二維表」爲中心。數據庫

表有行和列的概念,可是與傳統數據庫不同,表格存儲的表是稀疏的json

每一行能夠有不一樣的列,能夠動態增長或者減小屬性列,建表時不須要爲表的屬性列定義嚴格的 schema。bash

1、概述

OTS的數據遷移可使用「DataX」完成全量數據遷移。但因爲部分數據表的數據量較大,沒法在指定的時間窗口內完成全量遷移,且目前DataX只能針對主鍵值進行範圍查詢,暫不支持按照屬性列範圍抽取數據。app

因此能夠按以下兩種方式實現全量+增量的數據遷移:jvm

  • 分區鍵包含範圍信息(如時間信息、自增ID),則以指定range爲切分點,分批次遷移。
  • 分區鍵不包含範圍信息,則能夠採用在應用側雙寫的模式將數據分批次遷移,寫入目標環境同一張業務表。利用OTS的主鍵惟一性,選擇對重複數據執行覆蓋原有行的策略來保證數據惟一性。

總而言之,言而總之,咱們不生產數據,此刻,咱們是大數據的搬運工。分佈式

接下來呢,本文就以應用側調整爲雙寫模式爲例,詳細說明OTS數據遷移、校驗過程。

其中OTS數據遷移流程具體以下圖所示:

OTS數據遷移之準備工做

  • 預遷移階段:雙寫模式中的大表全量遷移
  • 正式遷移階段:雙寫模式中的增量表全量遷移、其他小表的全量遷移

2、預遷移階段

一、 準備工做

爲保證新老環境的數據一致性,須要在開始數據遷移前,對目標環境的OTS數據表進行數據清空操做,Delete操做是經過DataX工具直接刪除表內數據,無需從新建表。

具體操做以下:

1) 配置DataX任務

在使用DataX執行數據清空前,需配置對應數據表使用DataX執行Delete任務所需的json文件。在清空數據的配置中,reader與writer均配置目標端的鏈接信息,且數據寫入模式配置DeleteRow便可,具體內容以下:

{
    "job": {
        "setting": {
            "speed": {
                "channel": "5"
            }
        },
        "content": [{
            "reader": {
                "name": "otsreader",
                "parameter": {
                    "endpoint": "http://xxx.vpc.ots.yyy.com/",
                    "accessId": "dest_accessId",
                    "accessKey": "dest_accessKey",
                    "instanceName": " dest_instanceName",
                    "table": " tablename ",
                    "column": [{
                            "name": "xxxxx"
                        },
                        {
                            "name": "xxxxx"
                        }
                    ],
                    "range": {
                        "begin": [{
                            "type": "INF_MIN"
                        }],
                        "end": [{
                            "type": "INF_MAX"
                        }]
                    }
                }
            },
            "writer": {
                "name": "otswriter",
                "parameter": {
                    "endpoint": "http://xxx.vpc.ots.yun.yyy.com/",
                    "accessId": "dest_accessId",
                    "accessKey": "dest_accessKey",
                    "instanceName": " dest_instanceName",
                    "table": " tablename ",
                    "primaryKey": [{
                        "name": "xxxxx",
                        "type": "string"
                    }],
                    "column": [{
                            "name": "xxxxx",
                            "type": "string"
                        },
                        {
                            "name": "xxxxx",
                            "type": "string"
                        }
                    ],
                    "writeMode": "DeleteRow"
                }
            }
        }]
    }
}

2 )執行datax任務

  • 登陸datax所在ECS後,進入datax所在路徑
  • 在對應的工具機分別執行del_pre.sh腳本,便可開始目標環境對應表的數據清空,具體命令以下:
sh del_pre.sh

  • del_pre.sh腳本內容以下:
#!/bin/bash
nohup python datax.py del_table_1.json --jvm="-Xms16G -Xmx16G" > del_table_1.log &


二、 數據遷移

在不停服務的狀況下把源環境內數據量較大的數據表所有遷移到目標環境內對應的數據表。

1)配置DataX任務

在DataX對數據表配置相應的json文件,遷移配置的具體內容以下:

{
    "job": {
        "setting": {
            "speed": {
                "channel": "5"
            }
        },
        "content": [{
            "reader": {
                "name": "otsreader",
                "parameter": {
                    "endpoint": "http://xxx.vpc.ots.yyy.com/",
                    "accessId": "src_accessId",
                    "accessKey": "src_ accessKey ",
                    "instanceName": " src_instanceName",
                    "table": "tablename",
                    "column": [{
                            "name": "xxxxx"
                        },
                        {
                            "name": "xxxxx"
                        }
                    ],
                    "range": {
                        "begin": [{
                            "type": "INF_MIN"
                        }],
                        "end": [{
                            "type": "INF_MAX"
                        }]
                    }
                }
            },
            "writer": {
                "name": "otswriter",
                "parameter": {
                    "endpoint": "http://xxx.vpc.ots.yun.zzz.com/",
                    "accessId": "dest_accessId",
                    "accessKey": "dest_accessKey",
                    "instanceName": " dest_instanceName",
                    "table": " tablename ",
                    "primaryKey": [{
                        "name": "xxxxx",
                        "type": "string"
                    }],
                    "column": [{
                            "name": "xxxxx",
                            "type": "string"
                        },
                        {
                            "name": "xxxxx",
                            "type": "string"
                        }
                    ],
                    "writeMode": "PutRow"
                }
            }
        }]
    }
}

需注意,因爲OTS自己是NoSQL系統,在遷移數據的配置中,必須配置全部的屬性列,不然會缺失對應屬性列的值。

2) 執行datax任務

  • 登陸datax所在ECS後,進入datax所在路徑
  • 在對應的工具機分別執行pre_transfer.sh腳本,便可開始專有域OTS到專有云OTS的數據遷移,具體命令以下:
sh pre_transfer.sh

  • pre_transfer.sh腳本內容以下:
#!/bin/bash
nohup python datax.py table_1.json --jvm="-Xms16G -Xmx16G" >table_1.log &

吶,此時,萬事俱備,數據只待遷移!

在遷移以前,讓咱們最後再對焦一下數據遷移的目標:


下面,進入正式遷移階段!

3、正式遷移階段

一、OTS數據靜默

OTS的數據靜默主要是經過觀察對應表的數據是否存在變化來判斷,校驗方式主要包括行數統計、內容統計。

1)行數統計

因OTS自己不提供count接口,因此採用在hive建立OTS外部表的方式,讀取OTS數據並計算對應數據表的行數,具體操做以下:

  • 建立外部表
    啓動hive,建立上述數據表對應的外部表;爲提升統計效率,外部表能夠只讀取OTS的主鍵列,建表語句示例以下:
CREATE EXTERNAL TABLE t_oilcard_expenses_old
(h_card_no string)
STORED BY 'com.aliyun.openservices.tablestore.hive.TableStoreStorageHandler'
WITH SERDEPROPERTIES(
"tablestore.columns.mapping"="card_no")
TBLPROPERTIES ("tablestore.endpoint"="$endpoint ","tablestore.instance"="instanceName","tablestore.access_key_id"="ak","tablestore.access_key_secret"="sk","tablestore.table.name"="tableName");

  • 進入腳本所在路徑
    登陸Hadoop集羣master所在ECS,進入hive所在目錄
  • 執行行數統計
    執行pre_all_count.sh腳本,便可開始源環境內OTS對應表的行數統計
nohup sh pre_all_count.sh >pre_all_count.log &

  • pre_all_count.sh腳本內容以下:
#!/bin/bash
  ./bin/hive -e "use ots;select count(h_card_no) from tableName;" >table.rs &

連續執行兩次行數統計,若兩次統計結果一致則說明數據已經靜默,數據寫入以中止。

2)內容統計

因爲部分數據表分區鍵對應的值比較單一,致使數據所有存儲在同一個分區。若採用hive統計行數會耗時過久,因此對於這個表使用datax將OTS數據導入oss的方式進行內容統計,具體操做以下:

  • 進入腳本所在路徑
    登陸上述表格對應的ECS,進入datax所在路徑;
  • 執行內容校驗
    a、執行check_table.sh腳本,便可將源環境內OTS數據表導出到OSS;
sh check_table.sh

  • check_table.sh腳本內容以下:
#!/bin/bash
nohup python datax.py table.json --jvm="-Xms8G -Xmx8G">ots2oss01.log &

b、獲取OSS object的ETAG值,寫入對應文件table_check01.rs
連續執行兩次內容統計,對比兩次導出object的ETAG值,若結果一致則說明數據已經靜默,數據寫入以中止。

二、OTS數據遷移

1)準備工做

爲保證遷移後新老環境數據一致,防止目標環境因測試產生遺留髒數據,在進行數據遷移前,須要將目標環境的OTS的其他全量表進行數據清空。

「數據清空方式」主要有Drop、Delete,二者的區別以下:

a、Drop表操做

登陸OTS圖形化客戶端所在工具機,使用以下信息鏈接指定OTS實例,並進行對應表的drop操做;

AK: dest_accessId
SK: dest_accessKey
InstanceName: InstanceName
Endpoint:endpoint

確認刪除後,再在客戶端從新建立對應的數據。

b、 Delete表操做

Delete操做是經過DataX工具直接刪除表內數據,無需從新建表。DataX所需的配置文件參考2.1.1所示。

  • 登陸datax所在ECS後,進入datax所在路徑
  • 在對應的工具機分別執行delete腳本,便可開始目標環境OTS的對應表的數據清空,具體命令以下:
sh del_table_01.sh

  • del_table_01.sh腳本內容以下:
#!/bin/bash
nohup python datax.py del_table.json --jvm="-Xms16G -Xmx16G">del_table.log &

2)數據遷移

在源環境中止服務的狀況下把雙寫模式中的增量表全量遷移以及其他小表所有遷移到目標環境內對應的數據表。

具體操做以下:

a、配置DataX任務

在DataX對上述數據表配置相應的json文件,遷移配置的具體內容參考2.2.1,在遷移數據的配置中,須要列全全部的屬性列。

b、執行DataX任務

  • 登陸DataX所在ECS後,進入DataX所在路徑
  • 在對應的工具機分別執行transfer.sh腳本,便可開始專有域OTS到專有云OTS的數據遷移,具體命令以下:
sh transfer.sh

  • transfer.sh腳本內容以下:
#!/bin/bash
nohup python datax.py Table.json  >Table.log &

三、OTS數據校驗

新老環境OTS的數據校驗方式均包括行數統計、內容統計,具體以下:

1)源環境數據統計

源環境OTS數據表的數據量統計依據數據靜默期間最後一次的統計結果便可。

2)目標環境數據統計

a、行數統計

因OTS自己不提供count接口,且目標環境ODPS支持建立OTS外部表,因此採用在ODPS建立OTS外部表的方式,讀取OTS數據並計算對應數據表的行數,具體操做以下:

  • 建立外部表
    登陸odpscmd,建立上述數據表對應的外部表;
  • 進入腳本所在路徑
    登陸odpscmd工具所在ECS,進入odps所在路徑;
  • 執行行數統計
    執行newots_count.sh腳本,便可進行目標環境內OTS對應表的行數統計;
nohup sh newots_count.sh >newots_count.log &

  • newots_count.sh腳本內容以下:
#!/bin/bash
./bin/odpscmd -e "select count(h_card_no) from tableName;" >table.rs &

b、 內容統計

因爲源環境的部分數據表採用內容統計的方式進行數據校驗,爲了方便對比數據是否一致,因此目標環境也採用內容統計的方式,具體操做參考3.1.2。

相關文章
相關標籤/搜索