X-Pack Spark歸檔POLARDB數據作分析

簡介

POLARDB數據庫是阿里雲自研的下一代關係型雲數據庫,100%兼容MySQL,性能最高是MySQL的6倍,可是隨着數據量不斷增大,面臨着單條SQL沒法分析出結果的現狀。X-Pack Spark爲數據庫提供分析引擎,旨在打造數據庫閉環,藉助X-Pack Spark能夠將POLARDB數據歸檔至列式存儲Parquet文件,一條SQL完成複雜數據分析,並將分析結果迴流到業務庫提供查詢。本文主要介紹如何使用X-Pack Spark數據工做臺對POLARDB數據歸檔。java

業務架構

業務須要對多張表出不一樣緯度,按天、按月的報表並對外提供查詢服務;最大表當前500G,數據量還在不斷的增長。嘗試過spark直接經過jdbc去分析POLARDB,一方面比較慢,另一方面每次掃全量的POLARDB數據,對在線業務有影響。基於如下幾點考慮選擇POLARDB+Spark的架構:mysql

  • 選擇POLARDB按天增量歸檔到spark列存,天天增量數據量比較少,選擇業務低峯期歸檔,對在線查詢無影響
  • 選擇Spark做爲報表分析引擎,由於Spark很適合作ETL,且內置支持數據迴流到POLARDB、MongoDB等多種在線庫
  • 選擇Spark離線數倉做爲數據的中轉站,對於分析的結果數據迴流到在線庫提供查詢,可以一條Spark SQL完成分析,不須要按維度值拆分多條分析SQL

前置條件

1. 設置Spark訪問POLARDB白名單

Spark集羣和POLARDB需在同一個VPC下才能訪問,目前X-Pack Spark上還不支持一鍵關聯POLARDB數據庫,須要將Spark集羣的IP加到POLARDB白名單中。後續將會開放一鍵關聯POLARDB的功能。
在「HBase控制檯」->「集羣列表」中找到分析Spark實例,在「數據庫鏈接」欄中找到「VSwitch ID」交換機ID,以下圖:

而後在「專有網絡VPC控制檯」->"交換機"搜索交換機實例ID,查詢到IPV4網段。

將Spark集羣網絡加入到POLARDB白名單,進入「控制檯」->「集羣列表」找到所要關聯的POLARDB實例,而後在「基本信息」->「訪問信息」->「白名單」加入Spark集羣所屬網段。git

2. 建立測試表

POLARDB中已經存在測試表,若是沒有可登陸POLARDB數據庫建立測試表,下文也以該測試表爲例。github

CREATE TABLE IF NOT EXISTS test.us_population (
    state CHAR(2) NOT NULL PRIMARY KEY,
    city VARCHAR(10),
    population INTEGER, 
    dt TIMESTAMP );

INSERT INTO test.us_population VALUES('NY','New York',8143197, CURRENT_DATE );
INSERT INTO test.us_population VALUES('CA','Los Angeles',3844829, CURRENT_DATE);
INSERT INTO test.us_population VALUES('IL','Chicago',2842518, '2019-04-13');
INSERT INTO test.us_population VALUES('TX','Houston',2016582,  '2019-04-14');
INSERT INTO test.us_population VALUES('PA','Philadelphia',1463281,  '2019-04-13');
INSERT INTO test.us_population VALUES('AZ','Phoenix',1461575, '2019-04-15');
INSERT INTO test.us_population VALUES('SA','San Antonio',1256509, CURRENT_DATE);
INSERT INTO test.us_population VALUES('SD','San Diego',1255540, CURRENT_DATE);
INSERT INTO test.us_population VALUES('DL','Dallas',1213825, '2019-04-15');
INSERT INTO test.us_population VALUES('SJ','San Jose',912332,'2019-04-15');

1、使用交互式工做臺歸檔數據(調試、測試)

建立Spark運行會話

在"HBase控制檯"->"會話管理"建立會話,指定會話名稱和執行集羣,如圖:

在編輯器中輸入Spark啓動參數,並運行會話,以便在交互式查詢中使用。sql

--driver-memory 1G 
--driver-cores 1
--executor-cores 1
--executor-memory 2G
--num-executors 1
--name spark_on_polardb
--jars /spark-demo/mysql-connector-java-5.1.34.jar

參數說明:數據庫

參數 說明
driver-memory spark運行driver內存大小
driver-cores spark運行driver核數
executor-cores spark做業執行器executor核數
executor-memory 執行器內存
jars spark做業依賴第三方包,地址可在資源管理中複製

注:上述參數在測試環境中給定偏小,大數據量時根據實際集羣規格和數據量進行配置apache

會話運行成功後以下圖所示:網絡

交互式查詢歸檔數據

建立Spark映射POLARDB表

進入"HBase控制檯"->"交互式查詢",在會話列表中選擇上一步建立會話「spark_on_polardb」,而後新建查詢,指定查詢名稱,選擇查詢類型爲「SQL」類型,如圖:架構

在查詢輸入框中輸入Spark建表語句,與POLARDB表進行關聯,建表語句爲:編輯器

create table spark_polordb
using org.apache.spark.sql.jdbc
options (
  driver "com.mysql.jdbc.Driver",
  url "jdbc:mysql://pc-xxx.rwlb.rds.aliyuncs.com:3306",
  dbtable "test.us_population",
  user 'xxx',
  password 'xxxxxx'
)

參數說明:

參數 說明
spark_polordb spark中表名
driver polardb驅動類名
url polardb的數據庫鏈接地址
dbtable 對應polardb表名,格式爲database.tablename
user polardb用戶名
password 鏈接密碼

點擊運行,查詢狀態爲「success」時代表建立成功。

查詢測試

在上步建立查詢編輯器中輸入查詢語句,而後運行:

SELECT * FROM spark_polordb

查詢成功後返回結果如圖:

建立歸檔表

X-Pack Spark將POLARDB數據歸檔至Parquet列式存儲格式中,一方面可以獲取更優的壓縮空間,另外一方面後續分析任務中具備更高的效率。
Spark建立parquet分區表語句以下,一樣在第一步中交互式查詢編輯中輸入:

CREATE table parquetTable(state CHAR(2), city VARCHAR(10), population int)
USING parquet
PARTITIONED BY(dt timestamp)

參數說明:

參數 說明
parquetTable spark中歸檔表名
USING parquet 數據存儲格式爲parquet
PARTITIONED BY 按照字段分區,類型爲timestamp,也能夠指定爲date

建表成功後,能夠將POLARDB數據寫入至Parquet表。

歸檔數據

將POLARDB數據查詢出寫入parquet表便可完成數據歸檔,操做語句爲:

INSERT INTO parquetTable partition(dt) SELECT state, city, population, dt FROM spark_polordb

運行成功後數據歸檔完成。查詢parquet表數據:

2、工做流調度週期歸檔(生產T+1歸檔)

交互式查詢主要用來測試調試,歸檔通常須要作t+1的操做,天天按期把當前的數據作歸檔,這就須要使用工做流的週期調度,下面具體介紹如何使用工做流的週期調度實現t+1的歸檔。

歸檔代碼編寫

使用工做流以前須要建立對應的Spark做業,Spark歸檔POLARDB能夠實現一個完整做業,包括如下流程:

  1. 在Spark中建立POLARDB表映射表(前提POLARDB中表已經存在)
  2. 建立Spark分區歸檔表
  3. 將數據寫入歸檔表

雲Spark提供了Spark歸檔POLARDB的代碼DEMO,請參考github:SparkArchivePolarDB
具體歸檔代碼需結合實際場景,歸檔不一樣表,設置特定分區和歸檔條件等。

上傳Spark歸檔做業資源

將打成jar包的spark歸檔demo代碼經過資源管理上傳至資源列表,jar包下載地址:Spark歸檔工具DEMO下載
本身編寫的Spark做業一樣須要打成jar包後上傳至資源列表,後面做業須要運行jar包中歸檔做業。

建立Spark做業

進入「HBase控制檯」->"數據工做臺"->「做業管理」->「建立做業」, 如圖

編輯做業內容

做業內容中主要指定了Spark做業運行參數,以及具體的歸檔做業編碼類和傳入參數等,以SparkArchivePolarDB demo爲例:

--class com.aliyun.spark.polardb.SparkOnPolarDBArchive
--driver-memory 1G 
--driver-cores 1
--executor-cores 1
--executor-memory 2G
--num-executors 1
--jars /spark-demo/mysql-connector-java-5.1.34.jar
/spark-demo/spark-examples-0.0.1-SNAPSHOT.jar
pc-xxx.rwlb.rds.aliyuncs.com:3306 test.us_population username passwd sparkTestPolarDB

參數說明:

參數 說明
class 指定spark做業運行主類
/spark-demo/spark-examples-0.0.1-SNAPSHOT.jar spark做業所屬包
pc-xxx.rwlb.rds.aliyuncs.com:3306 polardb的鏈接串
test.us_population 歸檔polardb表
username polardb用戶名
passwd polardb鏈接密碼
sparkTestPolarDB spark歸檔表名

其他參數可參見上述章節介紹
做業配置如圖:

運行做業並查看結果

做業運行後一段時間能夠查看到運行狀態,成功後可在交互式查詢中查看歸檔表數據。

進入交互式工做臺,使用可參考上述介紹,查看歸檔表數據:

配置工做流

進入「HBase控制檯」->「數據工做臺」->「工做流」,選擇新建工做流,指定工做流名稱、描述和執行集羣,

而後進入工做流設計工做臺,拖動Spark做業並進行配置,選擇上一步配置做業並連線:

選擇"工做流配置"->"調度屬性",開啓調度狀態並設置其實時間和調度週期,工做流即將進行週期性調度,如圖:

3、歸檔方式(產出表的形式)

全量歸檔

全量歸檔方式主要用來對原庫中歷史數據進行歸檔或者針對數據量比較小的表,歸檔步驟以下:

  1. 使用Spark的jdbc datasource建立POLARDB的映射表;
  2. 在Spark中建立相同表結構的歸檔表,歸檔表使用Parquet列式存儲,可以最大化節約存儲空間,並加速分析性能;
  3. 經過映射表讀取POLARDB數據並寫入Spark歸檔表,注意寫入時保證字段順序一致。

建立歸檔表時若是表數據量較大,能夠建立分區表。分區策略通常分爲時間分區和業務分區:

  • 時間分區易於使用,即將相同時間的數據歸檔到同一個目錄,好比選擇按年或者按天進行時間分區,在分析時限定數據分區便可過濾掉與分析任務無關的數據。
  • 業務分區字段須要具備有限的類別,好比性別、年齡、部門等。業務分區須要結合具體業務進行考慮,分區個數不宜過多,spark默認最大分區數爲1000。
  • 分區方式能夠選擇靜態分區和動態分區,默認使用靜態分區,即寫入數據時必須指定寫入哪一個分區,動態分區須要將hive.exec.dynamic.partition.mode設置爲nonstrict,寫入時根據具體分區字段值動態建立分區,相同partition key值寫入同一個分區。

使用示例可參考:SparkOnPolarDBArchivedemo

增量歸檔

業務數據僅增量

在業務表中數據不存在更新和刪除的操做,僅僅是向數據表中增量寫入,這種狀況下只須要在數據表中記錄數據入庫時間或者其餘標記記錄新增數據,在Spark中使用工做流週期調度,傳入增量數據條件,按期將新增數據歸檔只Spark中便可。

業務數據更新

針對業務數據存在更新的數據,若是原表中沒法辨別更新的數據,目前只能經過全量歸檔的方式每次對全量數據進行一次歸檔,將原歸檔表數據進行overwrite;若是存在更新數據標記,如update_time字段,因爲Spark目前不支持ACID,沒法使用merge..into功能直接更新已有數據,增量更新歸檔步驟以下:

  1. 設置更新增量數據選擇條件(歸檔表全量歸檔時已建立),如update_time大於某個日期;
  2. 抽取增量更新的數據寫入spark臨時表;
  3. 將歷史數據歸檔表與增量更新數據表進行left out join並過濾出增量表字段爲空的數據,表示歷史數據中未參與增量更新的數據,而後與增量更新的數據進行union合併,寫入Spark臨時表;
  4. 將臨時表數據覆蓋寫入到歸檔表中做爲新的歸檔數據參與後續業務分析。

Spark更新增量歸檔目前只能使用join關聯方式遍歷全部數據完成數據更新,但好處是儘可能避免影響在線庫POLARDB的數據訪問,每次只讀取更新和增量的部分數據,將計算工做放在廉價的Spark集羣中。
使用示例可參考:SparkOnPolarDBIncrement
另外一種方式:若是在業務側須要保留多個版本更新的數據,能夠直接將更新和增量的數據追加到歸檔表中,而後在業務側經過最新時間判斷出有效的數據,能夠避免每次更新時複雜計算過程。

業務數據更新刪除

業務表中若是存在delete,目前Spark沒有較好的辦法進行支持,須要在業務庫記錄刪除的關鍵字段信息,與歸檔表進行join,過濾掉join到的數據而後覆寫到歸檔表中,達到delete的效果。

總結

在進行實際數據開發中,每每須要多個Spark做業配合完成數據歸檔以及分析工做,單個工做流中支持配置多個做業並按序執行,同時配合交互式工做臺進行數據驗證,減小不少開發中不便。目前工做臺仍在不斷優化中,在使用中遇到不便之處可隨時提出建議,便於簡化您的數據開發工做。
後續X-Pack Spark將提供一鍵歸檔功能,敬請期待。


原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索