優秀的數據工程師,怎麼用 Spark 在 TiDB 上作 OLAP 分析

做者:RickyHuo
本文轉載自公衆號「大道至簡bigdata」
原文連接優秀的數據工程師,怎麼用 Spark 在 TiDB 上作 OLAP 分析mysql

TiDB 是一款定位於在線事務處理/在線分析處理的融合型數據庫產品,實現了一鍵水平伸縮,強一致性的多副本數據安全,分佈式事務,實時 OLAP 等重要特性。
TiSpark 是 PingCAP 爲解決用戶複雜 OLAP 需求而推出的產品。它藉助 Spark 平臺,同時融合 TiKV 分佈式集羣的優點。直接使用 TiSpark 完成 OLAP 操做須要瞭解 Spark,還須要一些開發工做。那麼,有沒有一些開箱即用的工具能幫咱們更快速地使用 TiSpark 在 TiDB 上完成 OLAP 分析呢?
目前開源社區上有一款工具 Waterdrop,能夠基於 Spark,在 TiSpark 的基礎上快速實現 TiDB 數據讀取和 OLAP 分析。項目地址:
https://github.com/InterestingLab/waterdrop

使用 Waterdrop 操做 TiDB

在咱們線上有這麼一個需求,從 TiDB 中讀取某一天的網站訪問數據,統計每一個域名以及服務返回狀態碼的訪問次數,最後將統計結果寫入 TiDB 另一個表中。 咱們來看看 Waterdrop 是如何實現這麼一個功能的。nginx

Waterdrop

Waterdrop 是一個很是易用,高性能,可以應對海量數據的實時數據處理產品,它構建在 Spark 之上。Waterdrop 擁有着很是豐富的插件,支持從 TiDB、Kafka、HDFS、Kudu 中讀取數據,進行各類各樣的數據處理,而後將結果寫入 TiDB、ClickHouse、Elasticsearch 或者 Kafka 中。git

準備工做

1. TiDB 表結構介紹

Input(存儲訪問日誌的表)github

CREATE TABLE access_log (
    domain VARCHAR(255),
    datetime VARCHAR(63),
    remote_addr VARCHAR(63),
    http_ver VARCHAR(15),
    body_bytes_send INT,
    status INT,
    request_time FLOAT,
    url TEXT
)
+-----------------+--------------+------+------+---------+-------+
| Field           | Type         | Null | Key  | Default | Extra |
+-----------------+--------------+------+------+---------+-------+
| domain          | varchar(255) | YES  |      | NULL    |       |
| datetime        | varchar(63)  | YES  |      | NULL    |       |
| remote_addr     | varchar(63)  | YES  |      | NULL    |       |
| http_ver        | varchar(15)  | YES  |      | NULL    |       |
| body_bytes_send | int(11)      | YES  |      | NULL    |       |
| status          | int(11)      | YES  |      | NULL    |       |
| request_time    | float        | YES  |      | NULL    |       |
| url             | text         | YES  |      | NULL    |       |
+-----------------+--------------+------+------+---------+-------+

Output(存儲結果數據的表)sql

CREATE TABLE access_collect (
    date VARCHAR(23),
    domain VARCHAR(63),
    status INT,
    hit INT
)
+--------+-------------+------+------+---------+-------+
| Field  | Type        | Null | Key  | Default | Extra |
+--------+-------------+------+------+---------+-------+
| date   | varchar(23) | YES  |      | NULL    |       |
| domain | varchar(63) | YES  |      | NULL    |       |
| status | int(11)     | YES  |      | NULL    |       |
| hit    | int(11)     | YES  |      | NULL    |       |
+--------+-------------+------+------+---------+-------+

2. 安裝 Waterdrop

有了 TiDB 輸入和輸出表以後, 咱們須要安裝 Waterdrop,安裝十分簡單,無需配置系統環境變量數據庫

1) 準備 Spark 環境apache

2) 安裝 Waterdropvim

3) 配置 Waterdrop安全

如下是簡易步驟,具體安裝能夠參照 Quick Start。微信

# 下載安裝Spark
cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz
tar -xvf https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz
wget
# 下載安裝Waterdrop
https://github.com/InterestingLab/waterdrop/releases/download/v1.2.0/waterdrop-1.2.0.zip
unzip waterdrop-1.2.0.zip
cd waterdrop-1.2.0

vim config/waterdrop-env.sh
# 指定Spark安裝路徑
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.1.0-bin-hadoop2.7}

實現 Waterdrop 處理流程

咱們僅須要編寫一個 Waterdrop 配置文件便可完成數據的讀取、處理、寫入。

Waterdrop 配置文件由四個部分組成,分別是 SparkInputFilterOutputInput 部分用於指定數據的輸入源,Filter 部分用於定義各類各樣的數據處理、聚合,Output 部分負責將處理以後的數據寫入指定的數據庫或者消息隊列。

整個處理流程爲 Input -> Filter -> Output,整個流程組成了 Waterdrop 的處理流程(Pipeline)。

如下是一個具體配置,此配置來源於線上實際應用,可是爲了演示有所簡化。

Input (TiDB)

這裏部分配置定義輸入源,以下是從 TiDB 一張表中讀取數據。

input {
    tidb {
        database = "nginx"
        pre_sql = "select * from nginx.access_log"
        table_name = "spark_nginx_input"
    }
}

Filter

在 Filter 部分,這裏咱們配置一系列的轉化, 大部分數據分析的需求,都是在 Filter 完成的。Waterdrop 提供了豐富的插件,足以知足各類數據分析需求。這裏咱們經過 SQL 插件完成數據的聚合操做。

filter {
    sql {
        table_name = "spark_nginx_log"
        sql = "select count(*) as hit, domain, status, substring(datetime, 1, 10) as date from spark_nginx_log where substring(datetime, 1, 10)='2019-01-20' group by domain, status, substring(datetime, 1, 10)"
    }
}

Output (TiDB)

最後, 咱們將處理後的結果寫入 TiDB 另一張表中。TiDB Output 是經過 JDBC 實現的。

output {
    tidb {
        url = "jdbc:mysql://127.0.0.1:4000/nginx?useUnicode=true&characterEncoding=utf8"
        table = "access_collect"
        user = "username"
        password = "password"
        save_mode = "append"
    }
}

Spark

這一部分是 Spark 的相關配置,主要配置 Spark 執行時所需的資源大小以及其餘 Spark 配置。

咱們的 TiDB Input 插件是基於 TiSpark 實現的,而 TiSpark 依賴於 TiKV 集羣和 Placement Driver (PD)。所以咱們須要指定 PD 節點信息以及 TiSpark 相關配置 spark.tispark.pd.addressesspark.sql.extensions

spark {
  spark.app.name = "Waterdrop-tidb"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  # Set for TiSpark
  spark.tispark.pd.addresses = "localhost:2379"
  spark.sql.extensions = "org.apache.spark.sql.TiExtensions"
}

運行 Waterdrop

咱們將上述四部分配置組合成咱們最終的配置文件 conf/tidb.conf

spark {
    spark.app.name = "Waterdrop-tidb"
    spark.executor.instances = 2
    spark.executor.cores = 1
    spark.executor.memory = "1g"
    # Set for TiSpark
    spark.tispark.pd.addresses = "localhost:2379"
    spark.sql.extensions = "org.apache.spark.sql.TiExtensions"
}
input {
    tidb {
        database = "nginx"
        pre_sql = "select * from nginx.access_log"
        table_name = "spark_table"
    }
}
filter {
    sql {
        table_name = "spark_nginx_log"
        sql = "select count(*) as hit, domain, status, substring(datetime, 1, 10) as date from spark_nginx_log where substring(datetime, 1, 10)='2019-01-20' group by domain, status, substring(datetime, 1, 10)"
    }
}
output {
    tidb {
        url = "jdbc:mysql://127.0.0.1:4000/nginx?useUnicode=true&characterEncoding=utf8"
        table = "access_collect"
        user = "username"
        password = "password"
        save_mode = "append"
    }
}

執行命令,指定配置文件,運行 Waterdrop ,便可實現咱們的數據處理邏輯。

  • Local

./bin/start-waterdrop.sh --config config/tidb.conf --deploy-mode client --master 'local[2]'

  • yarn-client

./bin/start-waterdrop.sh --config config/tidb.conf --deploy-mode client --master yarn

  • yarn-cluster

./bin/start-waterdrop.sh --config config/tidb.conf --deploy-mode cluster -master yarn

若是是本機測試驗證邏輯,用本地模式(Local)就能夠了,通常生產環境下,都是使用 yarn-client 或者 yarn-cluster 模式。

檢查結果

mysql> select * from access_collect;
+------------+--------+--------+------+
| date       | domain | status | hit  |
+------------+--------+--------+------+
| 2019-01-20 | b.com  |    200 |   63 |
| 2019-01-20 | a.com  |    200 |   85 |
+------------+--------+--------+------+
2 rows in set (0.21 sec)

總結

在這篇文章中,咱們介紹瞭如何使用 Waterdrop 從 TiDB 中讀取數據,作簡單的數據處理以後寫入 TiDB 另一個表中。僅經過一個配置文件即可快速完成數據的導入,無需編寫任何代碼。

除了支持 TiDB 數據源以外,Waterdrop 一樣支持 Elasticsearch,Kafka,Kudu, ClickHouse 等數據源。

與此同時,咱們正在研發一個重要功能,就是在 Waterdrop 中,利用 TiDB 的事務特性,實現從 Kafka 到 TiDB 流式數據處理,而且支持端(Kafka)到端(TiDB)的 Exactly-Once 數據一致性

但願瞭解 Waterdrop 和 TiDB,ClickHouse、Elasticsearch、Kafka 結合使用的更多功能和案例,能夠直接進入項目主頁:https://github.com/InterestingLab/waterdrop ,或者聯繫項目負責人: Garyelephan(微信: garyelephant)、RickyHuo (微信: chodomatte1994)。

相關文章
相關標籤/搜索