DataX-On-Hadoop即便用hadoop的任務調度器,將DataX task(Reader->Channel->Writer)調度到hadoop執行集羣上執行。這樣用戶的hadoop數據能夠經過MR任務批量上傳到MaxCompute、RDS等,不須要用戶提早安裝和部署DataX軟件包,也不須要另外爲DataX準備執行集羣。可是能夠享受到DataX已有的插件邏輯、流控限速、魯棒重試等等。html
DataX https://github.com/alibaba/DataX 是阿里巴巴集團內被普遍使用的離線數據同步工具/平臺,實現包括 MySQL、Oracle、HDFS、Hive、OceanBase、HBase、OTS、MaxCompute 等各類異構數據源之間高效的數據同步功能。 DataX同步引擎內部實現了任務的切分、調度執行能力,DataX的執行不依賴Hadoop環境。java
DataX-On-Hadoop是DataX針對Hadoop調度環境實現的版本,使用hadoop的任務調度器,將DataX task(Reader->Channel->Writer)調度到hadoop執行集羣上執行。這樣用戶的hadoop數據能夠經過MR任務批量上傳到MaxCompute等,不須要用戶提早安裝和部署DataX軟件包,也不須要另外爲DataX準備執行集羣。可是能夠享受到DataX已有的插件邏輯、流控限速、魯棒重試等等。node
目前DataX-On-Hadoop支持將Hdfs中的數據上傳到公共雲MaxCompute當中。mysql
運行DataX-On-Hadoop步驟以下:git
./bin/hadoop jar datax-jar-with-dependencies.jar com.alibaba.datax.hdfs.odps.mr.HdfsToOdpsMRJob ./bvt_case/speed.json
本例子的Hdfs Reader 和Odps Writer配置信息以下:github
{ "core": { "transport": { "channel": { "speed": { "byte": "-1", "record": "-1" } } } }, "job": { "setting": { "speed": { "byte": 1048576 }, "errorLimit": { "record": 0 } }, "content": [ { "reader": { "name": "hdfsreader", "parameter": { "path": "/tmp/test_datax/big_data*", "defaultFS": "hdfs://localhost:9000", "column": [ { "index": 0, "type": "string" }, { "index": 1, "type": "string" } ], "fileType": "text", "encoding": "UTF-8", "fieldDelimiter": "," } }, "writer": { "name": "odpswriter", "parameter": { "project": "", "table": "", "partition": "pt=1,dt=2", "column": [ "id", "name" ], "accessId": "", "accessKey": "", "truncate": true, "odpsServer": "http://service.odps.aliyun.com/api", "tunnelServer": "http://dt.odps.aliyun.com", "accountType": "aliyun" } } } ] } }
針對上面的例子,介紹幾個性能、髒數據的參數:正則表達式
做業級別的性能參數配置位置示例:算法
{ "core": { "transport": { "channel": { "speed": { "byte": "-1", "record": "-1" } } } }, "job": { "setting": { "speed": { "byte": 1048576 }, "errorLimit": { "record": 0 } }, "content": [ { "reader": {}, "writer": {} } ] } }
另外,介紹幾個變量替換、做業命名參數:sql
"path": "/tmp/test_datax/dt=${dt}/abc.txt"
任務執行時能夠配置以下傳參,使得一份配置代碼能夠屢次使用:數據庫
./bin/hadoop jar datax-jar-with-dependencies.jar com.alibaba.datax.hdfs.odps.mr.HdfsToOdpsMRJob datax.json -p "-Ddt=20170427 -Dbizdate=123" -t hdfs_2_odps_mr
-t hdfs_2_odps_mr
讀寫插件詳細配置介紹,請見後續第二、3部分。
Hdfs Reader提供了讀取分佈式文件系統數據存儲的能力。在底層實現上,Hdfs Reader獲取分佈式文件系統上文件的數據,並轉換爲DataX傳輸協議傳遞給Writer。
Hdfs Reader實現了從Hadoop分佈式文件系統Hdfs中讀取文件數據並轉爲DataX協議的功能。textfile是Hive建表時默認使用的存儲格式,數據不作壓縮,本質上textfile就是以文本的形式將數據存放在hdfs中,對於DataX而言,Hdfs Reader實現上類比TxtFileReader,有諸多類似之處。orcfile,它的全名是Optimized Row Columnar file,是對RCFile作了優化。據官方文檔介紹,這種文件格式能夠提供一種高效的方法來存儲Hive數據。Hdfs Reader利用Hive提供的OrcSerde類,讀取解析orcfile文件的數據。目前Hdfs Reader支持的功能以下:
支持textfile、orcfile、rcfile、sequence file、csv和parquet格式的文件,且要求文件內容存放的是一張邏輯意義上的二維表。
支持多種類型數據讀取(使用String表示),支持列裁剪,支持列常量。
支持遞歸讀取、支持正則表達式("*"和"?")。
支持orcfile數據壓縮,目前支持SNAPPY,ZLIB兩種壓縮方式。
支持sequence file數據壓縮,目前支持lzo壓縮方式。
多個File能夠支持併發讀取。
csv類型支持壓縮格式有:gzip、bz二、zip、lzo、lzo_deflate、snappy。
咱們暫時不能作到:
{ "core": { "transport": { "channel": { "speed": { "byte": "-1048576", "record": "-1" } } } }, "job": { "setting": { "speed": { "byte": 1048576 }, "errorLimit": { "record": 0 } }, "content": [ { "reader": { "name": "hdfsreader", "parameter": { "path": "/tmp/test_datax/*", "defaultFS": "hdfs://localhost:9000", "column": [ { "index": 0, "type": "string" }, { "index": 1, "type": "string" } ], "fileType": "text", "encoding": "UTF-8", "fieldDelimiter": "," } }, "writer": {} } ] } }
path
描述:要讀取的文件路徑,若是要讀取多個文件,可使用正則表達式"*",注意這裏能夠支持填寫多個路徑。
當指定通配符,HdfsReader嘗試遍歷出多個文件信息。例如: 指定/*表明讀取/目錄下全部的文件,指定/yixiao/*表明讀取yixiao目錄下全部的文件。HdfsReader目前只支持"*"和"?"做爲文件通配符。
特別須要注意的是,DataX會將一個做業下同步的全部的文件視做同一張數據表。用戶必須本身保證全部的File可以適配同一套schema信息。而且提供給DataX權限可讀。
必選:是
默認值:無
defaultFS
fileType
描述:文件的類型,目前只支持用戶配置爲"text"、"orc"、"rc"、"seq"、"csv"。
text表示textfile文件格式
orc表示orcfile文件格式
rc表示rcfile文件格式
seq表示sequence file文件格式
csv表示普通hdfs文件格式(邏輯二維表)
特別須要注意的是,HdfsReader可以自動識別文件是orcfile、rcfile、sequence file仍是textfile或csv類型的文件,該項是必填項,HdfsReader在作數據同步以前,會檢查用戶配置的路徑下全部須要同步的文件格式是否和fileType一致,若是不一致則會拋出異常
另外須要注意的是,因爲textfile和orcfile是兩種徹底不一樣的文件格式,因此HdfsReader對這兩種文件的解析方式也存在差別,這種差別致使hive支持的複雜複合類型(好比map,array,struct,union)在轉換爲DataX支持的String類型時,轉換的結果格式略有差別,好比以map類型爲例:
orcfile map類型經hdfsreader解析轉換成datax支持的string類型後,結果爲"{job=80, team=60, person=70}"
textfile map類型經hdfsreader解析轉換成datax支持的string類型後,結果爲"job:80,team:60,person:70"
從上面的轉換結果能夠看出,數據自己沒有變化,可是表示的格式略有差別,因此若是用戶配置的文件路徑中要同步的字段在Hive中是複合類型的話,建議配置統一的文件格式。
若是須要統一複合類型解析出來的格式,咱們建議用戶在hive客戶端將textfile格式的表導成orcfile格式的表
column
描述:讀取字段列表,type指定源數據的類型,index指定當前列來自於文本第幾列(以0開始),value指定當前類型爲常量,不從源頭文件讀取數據,而是根據value值自動生成對應的列。
默認狀況下,用戶能夠所有按照string類型讀取數據,配置以下:
用戶能夠指定column字段信息,配置以下:
對於用戶指定column信息,type必須填寫,index/value必須選擇其一。
"column": ["*"]
{ "type": "long", "index": 0 //從本地文件文本第一列獲取int字段 }, { "type": "string", "value": "alibaba" //HdfsReader內部生成alibaba的字符串字段做爲當前字段 }
必選:是
默認值:所有按照string類型讀取
fieldDelimiter
另外須要注意的是,HdfsReader在讀取textfile數據時,須要指定字段分割符,若是不指定默認爲',',HdfsReader在讀取orcfile時,用戶無需指定字段分割符,hive自己的默認分隔符爲 "\u0001";若你想將每一行做爲目的端的一列,分隔符請使用行內容不存在的字符,好比不可見字符"\u0001" ,分隔符不能使用\n
encoding
nullFormat
描述:文本文件中沒法使用標準字符串定義null(空指針),DataX提供nullFormat定義哪些字符串能夠表示爲null。
例如若是用戶配置: nullFormat:"\N",那麼若是源頭數據是"\N",DataX視做null字段。
必選:否
默認值:無
compress
csvReaderConfig
默認值:無
常見配置:
"csvReaderConfig":{ "safetySwitch": false, "skipEmptyRecords": false, "useTextQualifier": false }
全部配置項及默認值,配置時 csvReaderConfig 的map中請嚴格按照如下字段名字進行配置:
hadoopConfig
描述:hadoopConfig裏能夠配置與Hadoop相關的一些高級參數,好比HA的配置。
"hadoopConfig":{ "dfs.nameservices": "testDfs", "dfs.ha.namenodes.testDfs": "namenode1,namenode2", "dfs.namenode.rpc-address.youkuDfs.namenode1": "", "dfs.namenode.rpc-address.youkuDfs.namenode2": "", "dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" }
必選:否
默認值:無
minInputSplitSize
若是用戶同步的hdfs文件是rcfile,因爲rcfile底層存儲的時候不一樣的數據類型存儲方式不同,而HdfsReader不支持對Hive元數據數據庫進行訪問查詢,所以須要用戶在column type裏指定該column在hive表中的數據類型,好比該column是bigint型。那麼type就寫爲bigint,若是是double型,則填寫double,若是是float型,則填寫float。注意:若是是varchar或者char類型,則須要填寫字節數,好比varchar(255),char(30)等,跟hive表中該字段的類型保持一致,或者也能夠填寫string類型。
若是column配置的是*,會讀取全部column,那麼datax會默認以string類型讀取全部column,此時要求column中的類型只能爲String,CHAR,VARCHAR中的一種。
RCFile中的類型默認會轉成DataX支持的內部類型,對照表以下:
RCFile在Hive表中的數據類型 | DataX 內部類型 |
---|---|
TINYINT,SMALLINT,INT,BIGINT | Long |
FLOAT,DOUBLE,DECIMAL | Double |
String,CHAR,VARCHAR | String |
BOOLEAN | Boolean |
Date,TIMESTAMP | Date |
Binary | Binary |
若是column配置的是*, 會讀取全部列; 此時Datax會默認以String類型讀取全部列. 若是列中出現Double等類型的話, 所有將轉換爲String類型。若是column配置讀取特定的列的話, DataX中的類型和Parquet文件類型的對應關係以下:
Parquet格式文件的數據類型 | DataX 內部類型 |
---|---|
int32, int64, int96 | Long |
float, double | Double |
binary | Binary |
boolean | Boolean |
fixed_len_byte_array | String |
textfile,orcfile,sequencefile:
因爲textfile和orcfile文件表的元數據信息由Hive維護並存放在Hive本身維護的數據庫(如mysql)中,目前HdfsReader不支持對Hive元數據數據庫進行訪問查詢,所以用戶在進行類型轉換的時候,必須指定數據類型,若是用戶配置的column爲"*",則全部column默認轉換爲string類型。HdfsReader提供了類型轉換的建議表以下:
DataX 內部類型 | Hive表 數據類型 |
---|---|
Long | TINYINT,SMALLINT,INT,BIGINT |
Double | FLOAT,DOUBLE |
String | String,CHAR,VARCHAR,STRUCT,MAP,ARRAY,UNION,BINARY |
Boolean | BOOLEAN |
Date | Date,TIMESTAMP |
其中:
特別提醒:
Hive在建表的時候,能夠指定分區partition,例如建立分區partition(day="20150820",hour="09"),對應的hdfs文件系統中,相應的表的目錄下則會多出/20150820和/09兩個目錄,且/20150820是/09的父目錄。瞭解了分區都會列成相應的目錄結構,在按照某個分區讀取某個表全部數據時,則只需配置好json中path的值便可。
好比須要讀取表名叫mytable01下分區day爲20150820這一天的全部數據,則配置以下:
"path": "/user/hive/warehouse/mytable01/20150820/*"
ODPSWriter插件用於實現往ODPS(即MaxCompute)插入或者更新數據,主要提供給etl開發同窗將業務數據導入MaxCompute,適合於TB,GB數量級的數據傳輸。在底層實現上,根據你配置的 項目 / 表 / 分區 / 表字段 等信息,經過 Tunnel寫入 MaxCompute 中。支持MaxCompute中如下數據類型:BIGINT、DOUBLE、STRING、DATATIME、BOOLEAN。下面列出ODPSWriter針對MaxCompute類型轉換列表:
DataX 內部類型 | MaxCompute 數據類型 |
---|---|
Long | bigint |
Double | double |
String | string |
Date | datetime |
Boolean | bool |
在底層實現上,ODPSWriter是經過MaxCompute Tunnel寫入MaxCompute系統的,有關MaxCompute的更多技術細節請參看 MaxCompute主站: https://www.aliyun.com/product/odps
{ "core": { "transport": { "channel": { "speed": { "byte": "-1048576", "record": "-1" } } } }, "job": { "setting": { "speed": { "byte": 1048576 }, "errorLimit": { "record": 0 } }, "content": [ { "reader": {}, "writer": { "name": "odpswriter", "parameter": { "project": "", "table": "", "partition": "pt=1,dt=2", "column": [ "col1", "col2" ], "accessId": "", "accessKey": "", "truncate": true, "odpsServer": "http://service.odps.aliyun.com/api", "tunnelServer": "http://dt.odps.aliyun.com", "accountType": "aliyun" } } } ] } }
accessId
accessKey
project
table
partition
column
truncate
描述:ODPSWriter經過配置"truncate": true,保證寫入的冪等性,即當出現寫入失敗再次運行時,ODPSWriter將清理前述數據,並導入新數據,這樣能夠保證每次重跑以後的數據都保持一致。
truncate選項不是原子操做!MaxCompute SQL沒法作到原子性。所以當多個任務同時向一個Table/Partition清理分區時候,可能出現併發時序問題,請務必注意!針對這類問題,咱們建議儘可能不要多個做業DDL同時操做同一份分區,或者在多個併發做業啓動前,提早建立分區。
必選:是
默認值:無
odpsServer
線上公網地址爲 http://service.cn.maxcompute.aliyun.com/api
tunnelServer
線上公網地址爲 http://dt.cn-beijing.maxcompute.aliyun-inc.com
blockSizeInMB
原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。