數據採集介紹
ETL基本上就是數據採集的表明,包括數據的提取(Extract)、轉換(Transform)和加載(Load)。數據源是整個大數據平臺的上游,數據採集是數據源與數倉之間的管道。在採集過程當中針對業務場景對數據進行治理,完成數據清洗工做。html
在大數據場景下,數據源複雜、多樣,包括業務數據庫、日誌數據、圖片、視頻等多媒體數據等。數據採集形式也須要更加複雜,多樣,包括定時、實時、增量、全量等。常見的數據採集工具也多種多樣,能夠知足多種業務需求。java
一個典型的數據加載架構:
python
常見的三個數據採集場景:mysql
- 場景1:從支持FTP、SFTP、 HTTP等 協議的數據源獲取數據
- 場景2:從業務數據庫獲取數據,數據採集錄入後需支撐業務系統
- 場景3:數據源經過Kafka等消息隊列,須要實時採集數據
數據採集系統需求:git
- 數據源管理與狀態監控
- 定時、實時、全量、增量等多模式的數據採集及任務監控
- 元數據管理、數據補採及數據歸檔
經常使用數據採集工具
Sqoop
Sqoop是經常使用的關係數據庫與HDFS之間的數據導入導出工具,將導入或導出命令翻譯成MapReduce程序來實現。因此經常使用於在Hadoop和傳統的數據庫(Mysq|、Postgresq|等)進行數據的傳遞。github
能夠經過Hadoop的MapReduce把數據從關係型數據庫中導入到Hadoop集羣。使用Sqoop傳輸大量結構化或半結構化數據的過程是徹底自動化的。sql
Sqoop數據傳輸示意圖:
數據庫
Sqoop Import流程:
apache
- 獲取源數據表的MetaData信息
- 根據參數提交MapReduce任務
- 表內每行做爲一條記錄,按計劃進行數據導入
**Sqoop Export流程:***
json
- 獲取目標數據表的MetaData信息
- 根據參數提交MapReduce任務
- 對HDFS文件內每行數據按指定字符分割,導出到數據庫
Apache Flume
Apache Flume本質上是一個分佈式、可靠的、高可用的日誌收集系統,支持多種數據來源,配置靈活。Flume能夠對海量日誌進行採集,聚合和傳輸。
Flume系統分爲三個組件,分別是Source(負責數據源的讀取),Sink(負責數據的輸出),Channel(做爲數據的暫存通道),這三個組件將構成一個Agent。Flume容許用戶構建一個複雜的數據流,好比數據流經多個Agent最終落地。
Flume數據傳輸示意圖:
Flume多數據源多Agent下的數據傳輸示意圖:
Flume多Sink多Agent下的數據傳輸示意圖:
關於Flume的實操內容能夠參考:
DataX
官方文檔:
DataX是阿里開源的異構數據源離線同步工具,致力於實現關係數據庫(MySQL、Oracle等)、HDFS、Hive、ODPS、 HBase、 FTP等各類異構數據源之間高效穩定的數據同步功能。DataX將複雜的網狀的同步鏈路變成了星型數據同步鏈路,具備良好的擴展性。
網狀同步鏈路和DataX星型數據同步鏈路的對比圖:
DataX的架構示意圖:
Datax數據採集實戰
官方文檔:
到GitHub上的下載地址下載DataX,或者拉取源碼進行編譯:
將下載好的安裝包,上傳到服務器:
[root@hadoop ~]# cd /usr/local/src [root@hadoop /usr/local/src]# ls |grep datax.tar.gz datax.tar.gz [root@hadoop /usr/local/src]#
將安裝包解壓到合適的目錄下:
[root@hadoop /usr/local/src]# tar -zxvf datax.tar.gz -C /usr/local [root@hadoop /usr/local/src]# cd ../datax/ [root@hadoop /usr/local/datax]# ls bin conf job lib plugin script tmp [root@hadoop /usr/local/datax]#
執行DataX的自檢腳本:
[root@hadoop /usr/local/datax]# python bin/datax.py job/job.json ... 任務啓動時刻 : 2020-11-13 11:21:01 任務結束時刻 : 2020-11-13 11:21:11 任務總計耗時 : 10s 任務平均流量 : 253.91KB/s 記錄寫入速度 : 10000rec/s 讀出記錄總數 : 100000 讀寫失敗總數 : 0
CSV文件數據導入Hive
檢測沒問題後,接下來簡單演示一下將CSV文件中的數據導入到Hive中。咱們須要用到hdfswriter,以及txtfilereader。官方文檔:
- https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md
- https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md
首先,到Hive中建立一個數據庫:
0: jdbc:hive2://localhost:10000> create database db01; No rows affected (0.315 seconds) 0: jdbc:hive2://localhost:10000> use db01;
而後建立一張表:
create table log_dev2( id int, name string, create_time int, creator string, info string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as orcfile;
當庫、表建立完成後,在HDFS中會有對應的目錄文件:
[root@hadoop ~]# hdfs dfs -ls /user/hive/warehouse/db01.db Found 1 items drwxr-xr-x - root supergroup 0 2020-11-13 11:30 /user/hive/warehouse/db01.db/log_dev2 [root@hadoop ~]#
準備測試數據:
[root@hadoop ~]# cat datax/db.csv 1,建立用戶,1554099545,hdfs,建立用戶 test 2,更新用戶,1554099546,yarn,更新用戶 test1 3,刪除用戶,1554099547,hdfs,刪除用戶 test2 4,更新用戶,1554189515,yarn,更新用戶 test3 5,刪除用戶,1554199525,hdfs,刪除用戶 test4 6,建立用戶,1554299345,yarn,建立用戶 test5
DataX經過json格式的配置文件來定義ETL任務,建立一個json文件:vim csv2hive.json
,咱們要定義的ETL任務內容以下:
{ "setting":{ }, "job":{ "setting":{ "speed":{ "channel":2 } }, "content":[ { "reader":{ "name":"txtfilereader", "parameter":{ "path":[ "/root/datax/db.csv" ], "encoding":"UTF-8", "column":[ { "index":0, "type":"long" }, { "index":1, "type":"string" }, { "index":2, "type":"long" }, { "index":3, "type":"string" }, { "index":4, "type":"string" } ], "fieldDelimiter":"," } }, "writer":{ "name":"hdfswriter", "parameter":{ "defaultFS":"hdfs://192.168.243.161:8020", "fileType":"orc", "path":"/user/hive/warehouse/db01.db/log_dev2", "fileName":"log_dev2.csv", "column":[ { "name":"id", "type":"int" }, { "name":"name", "type":"string" }, { "name":"create_time", "type":"INT" }, { "name":"creator", "type":"string" }, { "name":"info", "type":"string" } ], "writeMode":"append", "fieldDelimiter":",", "compress":"NONE" } } } ] } }
- datax使用json做爲配置文件,文件能夠是本地的也能夠是遠程http服務器上面
- json配置文件最外層是一個
job
,job
包含setting
和content
兩部分,其中setting
用於對整個job
進行配置,content
是數據的源和目的 setting
:用於設置全局channe|配置,髒數據配置,限速配置等,本例中只配置了channel個數1,也就是使用單線程執行數據傳輸content
:- reader:配置從哪裏讀數據
name
:插件名稱,須要和工程中的插件名保持-致parameter
:插件對應的輸入參數path
:源數據文件的路徑encoding
:數據編碼fieldDelimiter
:數據分隔符column
:源數據按照分隔符分割以後的位置和數據類型
- writer:配置將數據寫到哪裏去
name
:插件名稱,須要和工程中的插件名保持一致parameter
:插件對應的輸入參數path
:目標路徑fileName
:目標文件名前綴writeMode
:寫入目標目錄的方式
- reader:配置從哪裏讀數據
經過DataX的Python腳本執行咱們定義的ETL任務:
[root@hadoop ~]# python /usr/local/datax/bin/datax.py datax/csv2hive.json ... 任務啓動時刻 : 2020-11-15 11:10:20 任務結束時刻 : 2020-11-15 11:10:32 任務總計耗時 : 12s 任務平均流量 : 17B/s 記錄寫入速度 : 0rec/s 讀出記錄總數 : 6 讀寫失敗總數 : 0
查看HDFS中是否已存在相應的數據文件:
[root@hadoop ~]# hdfs dfs -ls /user/hive/warehouse/db01.db/log_dev2 Found 1 items -rw-r--r-- 3 root supergroup 825 2020-11-15 11:10 /user/hive/warehouse/db01.db/log_dev2/log_dev2.csv__f19a135d_6c22_4988_ae69_df39354acb1e [root@hadoop ~]#
到Hive中驗證導入的數據是否符合預期:
0: jdbc:hive2://localhost:10000> use db01; No rows affected (0.706 seconds) 0: jdbc:hive2://localhost:10000> show tables; +-----------+ | tab_name | +-----------+ | log_dev2 | +-----------+ 1 row selected (0.205 seconds) 0: jdbc:hive2://localhost:10000> select * from log_dev2; +--------------+----------------+-----------------------+-------------------+----------------+ | log_dev2.id | log_dev2.name | log_dev2.create_time | log_dev2.creator | log_dev2.info | +--------------+----------------+-----------------------+-------------------+----------------+ | 1 | 建立用戶 | 1554099545 | hdfs | 建立用戶 test | | 2 | 更新用戶 | 1554099546 | yarn | 更新用戶 test1 | | 3 | 刪除用戶 | 1554099547 | hdfs | 刪除用戶 test2 | | 4 | 更新用戶 | 1554189515 | yarn | 更新用戶 test3 | | 5 | 刪除用戶 | 1554199525 | hdfs | 刪除用戶 test4 | | 6 | 建立用戶 | 1554299345 | yarn | 建立用戶 test5 | +--------------+----------------+-----------------------+-------------------+----------------+ 6 rows selected (1.016 seconds) 0: jdbc:hive2://localhost:10000>
MySQL數據導入Hive
接下來演示一下將MySQL數據導入Hive中。爲了實現該功能,咱們須要使用到mysqlreader來從MySQL中讀取數據,其官方文檔以下:
首先,執行以下SQL構造一些測試數據:
CREATE DATABASE datax_test; USE `datax_test`; CREATE TABLE `dev_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL, `create_time` int(11) DEFAULT NULL, `creator` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL, `info` varchar(2000) COLLATE utf8_unicode_ci DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1069 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; insert into `dev_log`(`id`,`name`,`create_time`,`creator`,`info`) values (1,'建立用戶',1554099545,'hdfs','建立用戶 test'), (2,'更新用戶',1554099546,'yarn','更新用戶 test1'), (3,'刪除用戶',1554099547,'hdfs','刪除用戶 test2'), (4,'更新用戶',1554189515,'yarn','更新用戶 test3'), (5,'刪除用戶',1554199525,'hdfs','刪除用戶 test4'), (6,'建立用戶',1554299345,'yarn','建立用戶 test5');
而後到Hive的db01
數據庫中再建立一張表:
create table log_dev( id int, name string, create_time int, creator string, info string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as textfile;
建立ETL任務的配置文件:
[root@hadoop ~]# vim datax/mysql2hive.json
文件內容以下:
{ "job":{ "setting":{ "speed":{ "channel":3 }, "errorLimit":{ "record":0, "percentage":0.02 } }, "content":[ { "reader":{ "name":"mysqlreader", "parameter":{ "username":"root", "password":"123456a.", "column":[ "id", "name", "create_time", "creator", "info" ], "where":"creator='${creator}' and create_time>${create_time}", "connection":[ { "table":[ "dev_log" ], "jdbcUrl":[ "jdbc:mysql://192.168.1.11:3306/datax_test?serverTimezone=Asia/Shanghai" ] } ] } }, "writer":{ "name":"hdfswriter", "parameter":{ "defaultFS":"hdfs://192.168.243.161:8020", "fileType":"text", "path":"/user/hive/warehouse/db01.db/log_dev", "fileName":"log_dev3.csv", "column":[ { "name":"id", "type":"int" }, { "name":"name", "type":"string" }, { "name":"create_time", "type":"INT" }, { "name":"creator", "type":"string" }, { "name":"info", "type":"string" } ], "writeMode":"append", "fieldDelimiter":",", "compress":"GZIP" } } } ] } }
- mysqlreader支持傳入
where
條件來過濾須要讀取的數據,具體參數能夠在執行datax腳本時傳入,咱們能夠經過這種變量替換的方式實現增量同步的支持
mysqlreader默認的驅動包是5.x的,因爲我這裏的MySQL版本是8.x,因此須要替換一下mysqlreader中的驅動包:
[root@hadoop ~]# cp /usr/local/src/mysql-connector-java-8.0.21.jar /usr/local/datax/plugin/reader/mysqlreader/libs/ [root@hadoop ~]# rm -rf /usr/local/datax/plugin/reader/mysqlreader/libs/mysql-connector-java-5.1.34.jar
而後執行該ETL任務:
[root@hadoop ~]# python /usr/local/datax/bin/datax.py datax/mysql2hive.json -p "-Dcreator=yarn -Dcreate_time=1554099547" ... 任務啓動時刻 : 2020-11-15 11:38:14 任務結束時刻 : 2020-11-15 11:38:25 任務總計耗時 : 11s 任務平均流量 : 5B/s 記錄寫入速度 : 0rec/s 讀出記錄總數 : 2 讀寫失敗總數 : 0
查看HDFS中是否已存在相應的數據文件:
[root@hadoop ~]# hdfs dfs -ls /user/hive/warehouse/db01.db/log_dev Found 1 items -rw-r--r-- 3 root supergroup 84 2020-11-15 11:38 /user/hive/warehouse/db01.db/log_dev/log_dev3.csv__d142f3ee_126e_4056_af49_b56e45dec1ef.gz [root@hadoop ~]#
到Hive中驗證導入的數據是否符合預期:
0: jdbc:hive2://localhost:10000> select * from log_dev; +-------------+---------------+----------------------+------------------+---------------+ | log_dev.id | log_dev.name | log_dev.create_time | log_dev.creator | log_dev.info | +-------------+---------------+----------------------+------------------+---------------+ | 4 | 更新用戶 | 1554189515 | yarn | 更新用戶 test3 | | 6 | 建立用戶 | 1554299345 | yarn | 建立用戶 test5 | +-------------+---------------+----------------------+------------------+---------------+ 2 rows selected (0.131 seconds) 0: jdbc:hive2://localhost:10000>
數據治理簡介
將數據採集到數倉後所面臨的問題:
- 相比傳統數倉大數據時代數據更加多樣、更加複雜、數據量更大
- 隨處可見的數據不統1、難以提高的數據質量、難以完成的數據模型梳理
- 多種採集工具、多種存儲方式使數據倉庫or數據湖逐漸變成數據沼澤
數據治理須要解決的問題:
- 數據不可知:用戶不知道有哪些數據、不知道數據和業務的關係
- 數據不可控:沒有統一的數據標準,數據沒法集成和統一
- 數據不可取:用戶不能便捷的取到數據,或者取到的數據不可用
- 數據不可聯:數據之間的關係沒有體現出來,數據深層價值沒法體現
數據治理的目標:
- 創建統一數據標準與數據規範,保障數據質量
- 制定數據管理流程,把控數據整個生命週期
- 造成平臺化工具,提供給用戶使用
數據治理:
- 數據治理包括元數據管理、數據質量管理、數據血緣管理等
- 數據治理在數據採集、數據清洗、數據計算等各個環節
- 數據治理可貴不是技術,而是流程、協同和管理
元數據管理:
- 管理數據的庫表結構等schema信息
- 數據存儲空間、讀寫記錄、權限歸屬及其餘各種統計信息
數據血緣管理:
- 數據之間的血緣關係及生命週期
- B表的數據從A表彙總而來,那麼B和A表就具備血緣關係
- 數據的業務屬性信息和業務數據模型
數據治理步驟簡述:
- 統一數據規範和數據定義,打通業務模型和技術模型
- 提高數據質量,實現數據全生命週期管理
- 挖掘數據價值,幫助業務人員便捷靈活的使用數據
數據治理與周邊系統:
- ODS、DWD、DM等各層次元數據歸入數據治理平臺集中管理
- 數據採集及處理流程中產生的元數據歸入數據治理平臺,並創建血緣關係
- 提供數據管理的服務接口,數據模型變動及時通知上下游
Apache Atlas數據治理
常見的數據治理工具:
- Apache Atlas:Hortonworks主推的數據治理開源項目
- Metacat:Netflix開源的元數據管理、數據發現組件
- Navigator:Cloudera提供的數據管理的解決方案
- WhereHows:LinkedIn內部使用並開源的數據管理解決方案
Apache Altas:
- 數據分類:自動捕獲、定義和註釋元數據,對數據進行業務導向分類
- 集中審計:捕獲全部步驟、應用及數據交互的訪問信息
- 搜索與血緣:基於分類和審計關聯數據與數據的關係,並經過可視化的方式展示
Apache Altas架構圖:
- Type System:對須要管理的元數據對象抽象的實體,由類型構成
- Ingest\Export:元數據的自動採集和導出工具,Export能夠做 爲事件進行觸發,使用戶能夠及時響應
- Graph Engine:經過圖數據庫和圖計算弓|擎展示數據之間的關係
元數據捕獲:
- Hook:來自各個組件的Hook自動捕獲數據進行存儲
- Entity:集成的各個系統在操做時觸發事件進行寫入
- 獲取元數據的同時,獲取數據之間的關聯關係,構建血緣