使用Data Lake Analytics從OSS清洗數據到AnalyticDB

前提

  • 必須是同一阿里雲region的Data Lake Analytics(DLA)到AnalyticDB的才能進行清洗操做;
  • 開通並初始化了該region的DLA服務;
  • 開通併購買了AnalyticDB的實例,實例規模和數據清洗速度強相關,與AnalyticDB的實例資源規模基本成線性比例關係。

總體執行流程示意圖:html

步驟 1:在AnalyticDB中爲DLA開通一個VPC訪問點

DLA在上海region的VPC參數信息:mysql

  • 可用區:cn-shanghai-d
  • VPC id: vpc-uf6wxkgst74es59wqareb
  • VSwitch id: vsw-uf6m7k4fcq3pgd0yjfdnm
DLA Region 可用區 VPC id VSwitch id
華東1(杭州) cn-hangzhou-g vpc-bp1g66t4f0onrvbht2et5 vsw-bp1nh5ri8di2q7tkof474
華東2(上海) cn-shanghai-d vpc-uf6wxkgst74es59wqareb vsw-uf6m7k4fcq3pgd0yjfdnm
華北2(北京) cn-beijing-g vpc-2zeawsrpzbelyjko7i0ir vsw-2zea8ct4hy4hwsrcpd52d
華南1(深圳) cn-shenzhen-a vpc-wz9622zx341dy24ozifn3 vsw-wz91ov6gj2i4u2kenpe42
華北3(張家口) cn-zhangjiakou-a vpc-8vbpi1t7c0devxwfe19sn vsw-8vbjl32xkft0ewggef6g9
新加坡 ap-southeast-a vpc-t4n3sczhu5efvwo1gsupf vsw-t4npcrmzzk64r13e3nhhm
英國(倫敦) eu-west-1a vpc-d7ovzdful8490upm8b413 vsw-d7opmgixr2h34r1975s8a

在AnalyticDB中爲DLA建立VPC的專有網絡,注意,要使用MySQL命令行鏈接AnalyticDB的經典網絡連接,執行:sql

alter database txk_cldsj set zone_id='xxx' vpc_id='xxx' vswitch_id='xxx';

其中,「zone_id」、「vpc_id」和「vswitch_id」分別填同region的DLA對應的VPC id和VSwitch id,見上表。網絡

命令執行成功後,刷新DMS for AnalyticDB控制檯頁面,應該能看到一個VPC的URL。異步

步驟 2:在AnalyticDB中建立好目標的實時表

具體AnalyticDB的建表文檔請參考:https://help.aliyun.com/document_detail/26403.htmlasync

-- 例如:

-- 目標表爲實時維度表:
CREATE DIMENSION TABLE etl_ads_db.etl_ads_dimension_table (
  col1 INT, 
  col2 STRING, 
  col3 INT, 
  col4 STRING,
  primary key (col1)
)
options (updateType='realtime');

-- 目標表爲實時分區表:
CREATE TABLE etl_ads_db.etl_ads_partition_table (
  col1 INT, 
  col2 INT, 
  col3 INT, 
  col4 INT, 
  col5 DOUBLE, 
  col6 DOUBLE, 
  col7 DOUBLE
  primary key (col1, col2, col3, col4)
)
PARTITION BY HASH KEY(col1)
PARTITION NUM 32
TABLEGROUP xxx_group
options (updateType='realtime');

步驟 3:在DLA中建立好與AnalyticDB目標表映射的表

DLA中的表名、列名與AnalyticDB目標表對應同名

這種狀況下,建表語句會比較簡單。
其中,以下參數須要指明:阿里雲

-- 目標AnalyticDB
LOCATION = 'jdbc:mysql://etl_ads_db-e85fbfe8-vpc.cn-shanghai-1.ads.aliyuncs.com:10001/etl_ads_db'

-- 目標AnalyticDB的訪問用戶名
USER='xxx'

-- 目標AnalyticDB的訪問密碼
PASSWORD='xxx'
CREATE SCHEMA `etl_dla_schema` WITH DBPROPERTIES 
( 
  CATALOG = 'ads', 
  LOCATION = 'jdbc:mysql://etl_ads_db-e85fbfe8-vpc.cn-shanghai-1.ads.aliyuncs.com:10001/etl_ads_db',
  USER='xxx',
  PASSWORD='xxx'
);

USE etl_dla_schema;

CREATE EXTERNAL TABLE etl_ads_dimension_table (
  col1 INT, 
  col2 VARCHAR(200), 
  col3 INT, 
  col4 VARCHAR(200),
  primary key (col1)
);

CREATE EXTERNAL TABLE etl_ads_partition_table (
  col1 INT, 
  col2 INT, 
  col3 INT, 
  col4 INT, 
  col5 DOUBLE, 
  col6 DOUBLE, 
  col7 DOUBLE
  primary key (col1, col2, col3, col4)
)

步驟 4:在DLA中建立表指向源OSS數據

CREATE SCHEMA oss_data_schema with DBPROPERTIES(
  LOCATION = 'oss://my_bucket/',
  catalog='oss'
);

CREATE EXTERNAL TABLE IF NOT EXISTS dla_table_1 (
    col_1 INT, 
    col_2 VARCHAR(200), 
    col_3 INT, 
    col_4 VARCHAR(200)
) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' 
STORED AS TEXTFILE 
LOCATION 'oss://my_bucket/oss_table_1';


CREATE EXTERNAL TABLE IF NOT EXISTS dla_table_2 (
  col_1 INT, 
  col_2 INT, 
  col_3 INT, 
  col_4 INT, 
  col_5 DOUBLE, 
  col_6 DOUBLE, 
  col_7 DOUBLE
) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' 
STORED AS TEXTFILE 
LOCATION 'oss://my_bucket/oss_table_2';

步驟 5:在DLA中執行INSERT FROM SELECT語句

INSERT FROM SELECT一般爲長時運行任務,建議經過異步執行方式:
注意:用MySQL命令行執行時,鏈接時,須要在命令行指定-c參數,用來識別MySQL語句前的hint:es5

mysql -hxxx -Pxxx -uxxx -pxxx db_name -c

示例:spa

-- 執行OSS到AnalyticDB的全量數據插入
/*+run-async=true*/
INSERT INTO etl_dla_schema.etl_dla_dimension_table 
SELECT * FROM oss_data_schema.dla_table_1;

-- 執行OSS到AnalyticDB的數據插入,包含對OSS數據的篩選邏輯
/*+run-async=true*/
INSERT INTO etl_dla_schema.etl_dla_partition_table (col_1, col_2, col_3, col_7)
SELECT col_1, col_2, col_3, col_7 
FROM oss_data_schema.dla_table_2 
WHERE col_1 > 1000 
LIMIT 10000;

注意:命令行

  • 若是在INSERT INTO子句和SELECT子句中沒有指定列信息,請確保源表和目標表的列定義順序一致,且類型對應匹配;
  • 若是在INSERT INTO子句和SELECT子句中指定了列的信息,請確保二者中的列的順序符合業務須要的匹配順序,且類型對應匹配。

若是在DMS for Data Lake Analytics控制檯執行,請選擇「異步執行」。

而後能夠從「執行歷史」 中,點擊「刷新」,查看任務的執行狀態。
異步執行INSERT FROM SELECT語句,會返回一個task id,經過這個task id,能夠輪詢任務執行狀況,若是status爲「SUCCESS」,則任務完成:

SHOW query_task WHERE id = '26c6b18b_1532588796832'

注意事項

  • AnalyticDB爲主鍵覆蓋邏輯,整個INSERT FROM SELECT的ETL任務失敗,用戶須要總體重試;
  • AnalyticDB消費數據有必定延時,在AnalyticDB端查詢寫入數據時,會有必定的延遲可見,具體延遲時間取決於AnalyticDB的資源規格;
  • 建議將ETL任務儘可能切成小的單位批次執行,好比,OSS數據200GB,在業務容許的狀況下,200GB的數據切成100個文件夾,每一個文件夾2GB數據,對應DLA中建100張表,100張表分別作ETL,單個ETL任務失敗,能夠只重試單個ETL任務;
  • ETL任務結束後,視狀況刪除DLA中的表,包括映射AnalyticDB中的表、以及指向OSS數據的表。


原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索