大數據平臺 - 數據採集及治理

數據採集介紹

ETL基本上就是數據採集的表明,包括數據的提取(Extract)、轉換(Transform)和加載(Load)。數據源是整個大數據平臺的上游,數據採集是數據源與數倉之間的管道。在採集過程當中針對業務場景對數據進行治理,完成數據清洗工做。html

在大數據場景下,數據源複雜、多樣,包括業務數據庫、日誌數據、圖片、視頻等多媒體數據等。數據採集形式也須要更加複雜,多樣,包括定時、實時、增量、全量等。常見的數據採集工具也多種多樣,能夠知足多種業務需求。java

一個典型的數據加載架構:
大數據平臺 - 數據採集及治理
python

常見的三個數據採集場景:mysql

  • 場景1:從支持FTP、SFTP、 HTTP等 協議的數據源獲取數據
  • 場景2:從業務數據庫獲取數據,數據採集錄入後需支撐業務系統
  • 場景3:數據源經過Kafka等消息隊列,須要實時採集數據

數據採集系統需求:git

  • 數據源管理與狀態監控
  • 定時、實時、全量、增量等多模式的數據採集及任務監控
  • 元數據管理、數據補採及數據歸檔

經常使用數據採集工具

Sqoop

Sqoop是經常使用的關係數據庫與HDFS之間的數據導入導出工具,將導入或導出命令翻譯成MapReduce程序來實現。因此經常使用於在Hadoop和傳統的數據庫(Mysq|、Postgresq|等)進行數據的傳遞。github

能夠經過Hadoop的MapReduce把數據從關係型數據庫中導入到Hadoop集羣。使用Sqoop傳輸大量結構化或半結構化數據的過程是徹底自動化的。sql

Sqoop數據傳輸示意圖:
大數據平臺 - 數據採集及治理
數據庫

Sqoop Import流程:
大數據平臺 - 數據採集及治理
apache

  • 獲取源數據表的MetaData信息
  • 根據參數提交MapReduce任務
  • 表內每行做爲一條記錄,按計劃進行數據導入

**Sqoop Export流程:***
大數據平臺 - 數據採集及治理
json

  • 獲取目標數據表的MetaData信息
  • 根據參數提交MapReduce任務
  • 對HDFS文件內每行數據按指定字符分割,導出到數據庫

Apache Flume

Apache Flume本質上是一個分佈式、可靠的、高可用的日誌收集系統,支持多種數據來源,配置靈活。Flume能夠對海量日誌進行採集,聚合和傳輸。

Flume系統分爲三個組件,分別是Source(負責數據源的讀取),Sink(負責數據的輸出),Channel(做爲數據的暫存通道),這三個組件將構成一個Agent。Flume容許用戶構建一個複雜的數據流,好比數據流經多個Agent最終落地。

Flume數據傳輸示意圖:
大數據平臺 - 數據採集及治理

Flume多數據源多Agent下的數據傳輸示意圖:
大數據平臺 - 數據採集及治理

Flume多Sink多Agent下的數據傳輸示意圖:
大數據平臺 - 數據採集及治理

關於Flume的實操內容能夠參考:

DataX

官方文檔:

DataX是阿里開源的異構數據源離線同步工具,致力於實現關係數據庫(MySQL、Oracle等)、HDFS、Hive、ODPS、 HBase、 FTP等各類異構數據源之間高效穩定的數據同步功能。DataX將複雜的網狀的同步鏈路變成了星型數據同步鏈路,具備良好的擴展性。

網狀同步鏈路和DataX星型數據同步鏈路的對比圖:
大數據平臺 - 數據採集及治理

DataX的架構示意圖:
大數據平臺 - 數據採集及治理


Datax數據採集實戰

官方文檔:

到GitHub上的下載地址下載DataX,或者拉取源碼進行編譯:

將下載好的安裝包,上傳到服務器:

[root@hadoop ~]# cd /usr/local/src
[root@hadoop /usr/local/src]# ls |grep datax.tar.gz 
datax.tar.gz
[root@hadoop /usr/local/src]#

將安裝包解壓到合適的目錄下:

[root@hadoop /usr/local/src]# tar -zxvf datax.tar.gz -C /usr/local
[root@hadoop /usr/local/src]# cd ../datax/
[root@hadoop /usr/local/datax]# ls
bin  conf  job  lib  plugin  script  tmp
[root@hadoop /usr/local/datax]#

執行DataX的自檢腳本:

[root@hadoop /usr/local/datax]# python bin/datax.py job/job.json
...

任務啓動時刻                    : 2020-11-13 11:21:01
任務結束時刻                    : 2020-11-13 11:21:11
任務總計耗時                    :                 10s
任務平均流量                    :          253.91KB/s
記錄寫入速度                    :          10000rec/s
讀出記錄總數                    :              100000
讀寫失敗總數                    :                   0

CSV文件數據導入Hive

檢測沒問題後,接下來簡單演示一下將CSV文件中的數據導入到Hive中。咱們須要用到hdfswriter,以及txtfilereader。官方文檔:

首先,到Hive中建立一個數據庫:

0: jdbc:hive2://localhost:10000> create database db01;
No rows affected (0.315 seconds)
0: jdbc:hive2://localhost:10000> use db01;

而後建立一張表:

create table log_dev2(
    id int,
    name string,
    create_time int,
    creator string,
    info string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
stored as orcfile;

當庫、表建立完成後,在HDFS中會有對應的目錄文件:

[root@hadoop ~]# hdfs dfs -ls /user/hive/warehouse/db01.db
Found 1 items
drwxr-xr-x   - root supergroup          0 2020-11-13 11:30 /user/hive/warehouse/db01.db/log_dev2
[root@hadoop ~]#

準備測試數據:

[root@hadoop ~]# cat datax/db.csv
1,建立用戶,1554099545,hdfs,建立用戶 test
2,更新用戶,1554099546,yarn,更新用戶 test1
3,刪除用戶,1554099547,hdfs,刪除用戶 test2
4,更新用戶,1554189515,yarn,更新用戶 test3
5,刪除用戶,1554199525,hdfs,刪除用戶 test4
6,建立用戶,1554299345,yarn,建立用戶 test5

DataX經過json格式的配置文件來定義ETL任務,建立一個json文件:vim csv2hive.json,咱們要定義的ETL任務內容以下:

{
    "setting":{

    },
    "job":{
        "setting":{
            "speed":{
                "channel":2
            }
        },
        "content":[
            {
                "reader":{
                    "name":"txtfilereader",
                    "parameter":{
                        "path":[
                            "/root/datax/db.csv"
                        ],
                        "encoding":"UTF-8",
                        "column":[
                            {
                                "index":0,
                                "type":"long"
                            },
                            {
                                "index":1,
                                "type":"string"
                            },
                            {
                                "index":2,
                                "type":"long"
                            },
                            {
                                "index":3,
                                "type":"string"
                            },
                            {
                                "index":4,
                                "type":"string"
                            }
                        ],
                        "fieldDelimiter":","
                    }
                },
                "writer":{
                    "name":"hdfswriter",
                    "parameter":{
                        "defaultFS":"hdfs://192.168.243.161:8020",
                        "fileType":"orc",
                        "path":"/user/hive/warehouse/db01.db/log_dev2",
                        "fileName":"log_dev2.csv",
                        "column":[
                            {
                                "name":"id",
                                "type":"int"
                            },
                            {
                                "name":"name",
                                "type":"string"
                            },
                            {
                                "name":"create_time",
                                "type":"INT"
                            },
                            {
                                "name":"creator",
                                "type":"string"
                            },
                            {
                                "name":"info",
                                "type":"string"
                            }
                        ],
                        "writeMode":"append",
                        "fieldDelimiter":",",
                        "compress":"NONE"
                    }
                }
            }
        ]
    }
}
  1. datax使用json做爲配置文件,文件能夠是本地的也能夠是遠程http服務器上面
  2. json配置文件最外層是一個jobjob包含settingcontent兩部分,其中setting用於對整個job進行配置,content是數據的源和目的
  3. setting:用於設置全局channe|配置,髒數據配置,限速配置等,本例中只配置了channel個數1,也就是使用單線程執行數據傳輸
  4. content
    • reader:配置從哪裏讀數據
      • name:插件名稱,須要和工程中的插件名保持-致
      • parameter:插件對應的輸入參數
      • path:源數據文件的路徑
      • encoding:數據編碼
      • fieldDelimiter:數據分隔符
      • column:源數據按照分隔符分割以後的位置和數據類型
    • writer:配置將數據寫到哪裏去
      • name:插件名稱,須要和工程中的插件名保持一致
      • parameter:插件對應的輸入參數
      • path:目標路徑
      • fileName:目標文件名前綴
      • writeMode:寫入目標目錄的方式

經過DataX的Python腳本執行咱們定義的ETL任務:

[root@hadoop ~]# python /usr/local/datax/bin/datax.py datax/csv2hive.json
...

任務啓動時刻                    : 2020-11-15 11:10:20
任務結束時刻                    : 2020-11-15 11:10:32
任務總計耗時                    :                 12s
任務平均流量                    :               17B/s
記錄寫入速度                    :              0rec/s
讀出記錄總數                    :                   6
讀寫失敗總數                    :                   0

查看HDFS中是否已存在相應的數據文件:

[root@hadoop ~]# hdfs dfs -ls /user/hive/warehouse/db01.db/log_dev2
Found 1 items
-rw-r--r--   3 root supergroup        825 2020-11-15 11:10 /user/hive/warehouse/db01.db/log_dev2/log_dev2.csv__f19a135d_6c22_4988_ae69_df39354acb1e
[root@hadoop ~]#

到Hive中驗證導入的數據是否符合預期:

0: jdbc:hive2://localhost:10000> use db01;
No rows affected (0.706 seconds)
0: jdbc:hive2://localhost:10000> show tables;
+-----------+
| tab_name  |
+-----------+
| log_dev2  |
+-----------+
1 row selected (0.205 seconds)
0: jdbc:hive2://localhost:10000> select * from log_dev2;
+--------------+----------------+-----------------------+-------------------+----------------+
| log_dev2.id  | log_dev2.name  | log_dev2.create_time  | log_dev2.creator  | log_dev2.info  |
+--------------+----------------+-----------------------+-------------------+----------------+
| 1            | 建立用戶         | 1554099545         | hdfs              | 建立用戶 test      |
| 2            | 更新用戶         | 1554099546         | yarn              | 更新用戶 test1     |
| 3            | 刪除用戶         | 1554099547         | hdfs              | 刪除用戶 test2     |
| 4            | 更新用戶         | 1554189515         | yarn              | 更新用戶 test3     |
| 5            | 刪除用戶         | 1554199525         | hdfs              | 刪除用戶 test4     |
| 6            | 建立用戶         | 1554299345         | yarn              | 建立用戶 test5     |
+--------------+----------------+-----------------------+-------------------+----------------+
6 rows selected (1.016 seconds)
0: jdbc:hive2://localhost:10000>

MySQL數據導入Hive

接下來演示一下將MySQL數據導入Hive中。爲了實現該功能,咱們須要使用到mysqlreader來從MySQL中讀取數據,其官方文檔以下:

首先,執行以下SQL構造一些測試數據:

CREATE DATABASE datax_test;

USE `datax_test`;

CREATE TABLE `dev_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,
  `create_time` int(11) DEFAULT NULL,
  `creator` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,
  `info` varchar(2000) COLLATE utf8_unicode_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1069 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

insert  into `dev_log`(`id`,`name`,`create_time`,`creator`,`info`) values 
(1,'建立用戶',1554099545,'hdfs','建立用戶 test'),
(2,'更新用戶',1554099546,'yarn','更新用戶 test1'),
(3,'刪除用戶',1554099547,'hdfs','刪除用戶 test2'),
(4,'更新用戶',1554189515,'yarn','更新用戶 test3'),
(5,'刪除用戶',1554199525,'hdfs','刪除用戶 test4'),
(6,'建立用戶',1554299345,'yarn','建立用戶 test5');

而後到Hive的db01數據庫中再建立一張表:

create table log_dev(
    id int,
    name string,
    create_time int,
    creator string,
    info string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
stored as textfile;

建立ETL任務的配置文件:

[root@hadoop ~]# vim datax/mysql2hive.json

文件內容以下:

{
    "job":{
        "setting":{
            "speed":{
                "channel":3
            },
            "errorLimit":{
                "record":0,
                "percentage":0.02
            }
        },
        "content":[
            {
                "reader":{
                    "name":"mysqlreader",
                    "parameter":{
                        "username":"root",
                        "password":"123456a.",
                        "column":[
                            "id",
                            "name",
                            "create_time",
                            "creator",
                            "info"
                        ],
                        "where":"creator='${creator}' and create_time>${create_time}",
                        "connection":[
                            {
                                "table":[
                                    "dev_log"
                                ],
                                "jdbcUrl":[
                                    "jdbc:mysql://192.168.1.11:3306/datax_test?serverTimezone=Asia/Shanghai"
                                ]
                            }
                        ]
                    }
                },
                "writer":{
                    "name":"hdfswriter",
                    "parameter":{
                        "defaultFS":"hdfs://192.168.243.161:8020",
                        "fileType":"text",
                        "path":"/user/hive/warehouse/db01.db/log_dev",
                        "fileName":"log_dev3.csv",
                        "column":[
                            {
                                "name":"id",
                                "type":"int"
                            },
                            {
                                "name":"name",
                                "type":"string"
                            },
                            {
                                "name":"create_time",
                                "type":"INT"
                            },
                            {
                                "name":"creator",
                                "type":"string"
                            },
                            {
                                "name":"info",
                                "type":"string"
                            }
                        ],
                        "writeMode":"append",
                        "fieldDelimiter":",",
                        "compress":"GZIP"
                    }
                }
            }
        ]
    }
}
  • mysqlreader支持傳入where條件來過濾須要讀取的數據,具體參數能夠在執行datax腳本時傳入,咱們能夠經過這種變量替換的方式實現增量同步的支持

mysqlreader默認的驅動包是5.x的,因爲我這裏的MySQL版本是8.x,因此須要替換一下mysqlreader中的驅動包:

[root@hadoop ~]# cp /usr/local/src/mysql-connector-java-8.0.21.jar /usr/local/datax/plugin/reader/mysqlreader/libs/
[root@hadoop ~]# rm -rf /usr/local/datax/plugin/reader/mysqlreader/libs/mysql-connector-java-5.1.34.jar

而後執行該ETL任務:

[root@hadoop ~]# python /usr/local/datax/bin/datax.py datax/mysql2hive.json -p "-Dcreator=yarn -Dcreate_time=1554099547"
...

任務啓動時刻                    : 2020-11-15 11:38:14
任務結束時刻                    : 2020-11-15 11:38:25
任務總計耗時                    :                 11s
任務平均流量                    :                5B/s
記錄寫入速度                    :              0rec/s
讀出記錄總數                    :                   2
讀寫失敗總數                    :                   0

查看HDFS中是否已存在相應的數據文件:

[root@hadoop ~]# hdfs dfs -ls /user/hive/warehouse/db01.db/log_dev
Found 1 items
-rw-r--r--   3 root supergroup         84 2020-11-15 11:38 /user/hive/warehouse/db01.db/log_dev/log_dev3.csv__d142f3ee_126e_4056_af49_b56e45dec1ef.gz
[root@hadoop ~]#

到Hive中驗證導入的數據是否符合預期:

0: jdbc:hive2://localhost:10000> select * from log_dev;
+-------------+---------------+----------------------+------------------+---------------+
| log_dev.id  | log_dev.name  | log_dev.create_time  | log_dev.creator  | log_dev.info  |
+-------------+---------------+----------------------+------------------+---------------+
| 4           | 更新用戶        | 1554189515          | yarn             | 更新用戶 test3  |
| 6           | 建立用戶        | 1554299345          | yarn             | 建立用戶 test5  |
+-------------+---------------+----------------------+------------------+---------------+
2 rows selected (0.131 seconds)
0: jdbc:hive2://localhost:10000>

數據治理簡介

將數據採集到數倉後所面臨的問題:

  • 相比傳統數倉大數據時代數據更加多樣、更加複雜、數據量更大
  • 隨處可見的數據不統1、難以提高的數據質量、難以完成的數據模型梳理
  • 多種採集工具、多種存儲方式使數據倉庫or數據湖逐漸變成數據沼澤

數據治理須要解決的問題:

  • 數據不可知:用戶不知道有哪些數據、不知道數據和業務的關係
  • 數據不可控:沒有統一的數據標準,數據沒法集成和統一
  • 數據不可取:用戶不能便捷的取到數據,或者取到的數據不可用
  • 數據不可聯:數據之間的關係沒有體現出來,數據深層價值沒法體現

數據治理的目標:

  • 創建統一數據標準與數據規範,保障數據質量
  • 制定數據管理流程,把控數據整個生命週期
  • 造成平臺化工具,提供給用戶使用

數據治理:

  • 數據治理包括元數據管理、數據質量管理、數據血緣管理等
  • 數據治理在數據採集、數據清洗、數據計算等各個環節
  • 數據治理可貴不是技術,而是流程、協同和管理

元數據管理:

  • 管理數據的庫表結構等schema信息
  • 數據存儲空間、讀寫記錄、權限歸屬及其餘各種統計信息

數據血緣管理:

  • 數據之間的血緣關係及生命週期
  • B表的數據從A表彙總而來,那麼B和A表就具備血緣關係
  • 數據的業務屬性信息和業務數據模型

數據治理步驟簡述:

  • 統一數據規範和數據定義,打通業務模型和技術模型
  • 提高數據質量,實現數據全生命週期管理
  • 挖掘數據價值,幫助業務人員便捷靈活的使用數據

數據治理與周邊系統:

  • ODS、DWD、DM等各層次元數據歸入數據治理平臺集中管理
  • 數據採集及處理流程中產生的元數據歸入數據治理平臺,並創建血緣關係
  • 提供數據管理的服務接口,數據模型變動及時通知上下游

Apache Atlas數據治理

常見的數據治理工具:

  • Apache Atlas:Hortonworks主推的數據治理開源項目
  • Metacat:Netflix開源的元數據管理、數據發現組件
  • Navigator:Cloudera提供的數據管理的解決方案
  • WhereHows:LinkedIn內部使用並開源的數據管理解決方案

Apache Altas:

  • 數據分類:自動捕獲、定義和註釋元數據,對數據進行業務導向分類
  • 集中審計:捕獲全部步驟、應用及數據交互的訪問信息
  • 搜索與血緣:基於分類和審計關聯數據與數據的關係,並經過可視化的方式展示

Apache Altas架構圖:
大數據平臺 - 數據採集及治理

  • Type System:對須要管理的元數據對象抽象的實體,由類型構成
  • Ingest\Export:元數據的自動採集和導出工具,Export能夠做 爲事件進行觸發,使用戶能夠及時響應
  • Graph Engine:經過圖數據庫和圖計算弓|擎展示數據之間的關係

元數據捕獲:

  • Hook:來自各個組件的Hook自動捕獲數據進行存儲
  • Entity:集成的各個系統在操做時觸發事件進行寫入
  • 獲取元數據的同時,獲取數據之間的關聯關係,構建血緣
相關文章
相關標籤/搜索