POLARDB數據庫是阿里雲自研的下一代關係型雲數據庫,100%兼容MySQL,性能最高是MySQL的6倍,可是隨着數據量不斷增大,面臨着單條SQL沒法分析出結果的現狀。X-Pack Spark爲數據庫提供分析引擎,旨在打造數據庫閉環,藉助X-Pack Spark能夠將POLARDB數據歸檔至列式存儲Parquet文件,一條SQL完成複雜數據分析,並將分析結果迴流到業務庫提供查詢。本文主要介紹如何使用X-Pack Spark數據工做臺對POLARDB數據歸檔。java
業務須要對多張表出不一樣緯度,按天、按月的報表並對外提供查詢服務;最大表當前500G,數據量還在不斷的增長。嘗試過spark直接經過jdbc去分析POLARDB,一方面比較慢,另一方面每次掃全量的POLARDB數據,對在線業務有影響。基於如下幾點考慮選擇POLARDB+Spark的架構:mysql
Spark集羣和POLARDB需在同一個VPC下才能訪問,目前X-Pack Spark上還不支持一鍵關聯POLARDB數據庫,須要將Spark集羣的IP加到POLARDB白名單中。後續將會開放一鍵關聯POLARDB的功能。
在「HBase控制檯」->「集羣列表」中找到分析Spark實例,在「數據庫鏈接」欄中找到「VSwitch ID」交換機ID,以下圖:
而後在「專有網絡VPC控制檯」->"交換機"搜索交換機實例ID,查詢到IPV4網段。
將Spark集羣網絡加入到POLARDB白名單,進入「控制檯」->「集羣列表」找到所要關聯的POLARDB實例,而後在「基本信息」->「訪問信息」->「白名單」加入Spark集羣所屬網段。git
POLARDB中已經存在測試表,若是沒有可登陸POLARDB數據庫建立測試表,下文也以該測試表爲例。github
CREATE TABLE IF NOT EXISTS test.us_population ( state CHAR(2) NOT NULL PRIMARY KEY, city VARCHAR(10), population INTEGER, dt TIMESTAMP ); INSERT INTO test.us_population VALUES('NY','New York',8143197, CURRENT_DATE ); INSERT INTO test.us_population VALUES('CA','Los Angeles',3844829, CURRENT_DATE); INSERT INTO test.us_population VALUES('IL','Chicago',2842518, '2019-04-13'); INSERT INTO test.us_population VALUES('TX','Houston',2016582, '2019-04-14'); INSERT INTO test.us_population VALUES('PA','Philadelphia',1463281, '2019-04-13'); INSERT INTO test.us_population VALUES('AZ','Phoenix',1461575, '2019-04-15'); INSERT INTO test.us_population VALUES('SA','San Antonio',1256509, CURRENT_DATE); INSERT INTO test.us_population VALUES('SD','San Diego',1255540, CURRENT_DATE); INSERT INTO test.us_population VALUES('DL','Dallas',1213825, '2019-04-15'); INSERT INTO test.us_population VALUES('SJ','San Jose',912332,'2019-04-15');
在"HBase控制檯"->"會話管理"建立會話,指定會話名稱和執行集羣,如圖:
在編輯器中輸入Spark啓動參數,並運行會話,以便在交互式查詢中使用。sql
--driver-memory 1G --driver-cores 1 --executor-cores 1 --executor-memory 2G --num-executors 1 --name spark_on_polardb --jars /spark-demo/mysql-connector-java-5.1.34.jar
參數說明:數據庫
參數 | 說明 |
---|---|
driver-memory | spark運行driver內存大小 |
driver-cores | spark運行driver核數 |
executor-cores | spark做業執行器executor核數 |
executor-memory | 執行器內存 |
jars | spark做業依賴第三方包,地址可在資源管理中複製 |
注:上述參數在測試環境中給定偏小,大數據量時根據實際集羣規格和數據量進行配置apache
會話運行成功後以下圖所示:網絡
進入"HBase控制檯"->"交互式查詢",在會話列表中選擇上一步建立會話「spark_on_polardb」,而後新建查詢,指定查詢名稱,選擇查詢類型爲「SQL」類型,如圖:架構
在查詢輸入框中輸入Spark建表語句,與POLARDB表進行關聯,建表語句爲:編輯器
create table spark_polordb using org.apache.spark.sql.jdbc options ( driver "com.mysql.jdbc.Driver", url "jdbc:mysql://pc-xxx.rwlb.rds.aliyuncs.com:3306", dbtable "test.us_population", user 'xxx', password 'xxxxxx' )
參數說明:
參數 | 說明 |
---|---|
spark_polordb | spark中表名 |
driver | polardb驅動類名 |
url | polardb的數據庫鏈接地址 |
dbtable | 對應polardb表名,格式爲database.tablename |
user | polardb用戶名 |
password | 鏈接密碼 |
點擊運行,查詢狀態爲「success」時代表建立成功。
在上步建立查詢編輯器中輸入查詢語句,而後運行:
SELECT * FROM spark_polordb
查詢成功後返回結果如圖:
X-Pack Spark將POLARDB數據歸檔至Parquet列式存儲格式中,一方面可以獲取更優的壓縮空間,另外一方面後續分析任務中具備更高的效率。
Spark建立parquet分區表語句以下,一樣在第一步中交互式查詢編輯中輸入:
CREATE table parquetTable(state CHAR(2), city VARCHAR(10), population int) USING parquet PARTITIONED BY(dt timestamp)
參數說明:
參數 | 說明 |
---|---|
parquetTable | spark中歸檔表名 |
USING parquet | 數據存儲格式爲parquet |
PARTITIONED BY | 按照字段分區,類型爲timestamp,也能夠指定爲date |
建表成功後,能夠將POLARDB數據寫入至Parquet表。
將POLARDB數據查詢出寫入parquet表便可完成數據歸檔,操做語句爲:
INSERT INTO parquetTable partition(dt) SELECT state, city, population, dt FROM spark_polordb
運行成功後數據歸檔完成。查詢parquet表數據:
交互式查詢主要用來測試調試,歸檔通常須要作t+1的操做,天天按期把當前的數據作歸檔,這就須要使用工做流的週期調度,下面具體介紹如何使用工做流的週期調度實現t+1的歸檔。
使用工做流以前須要建立對應的Spark做業,Spark歸檔POLARDB能夠實現一個完整做業,包括如下流程:
雲Spark提供了Spark歸檔POLARDB的代碼DEMO,請參考github:SparkArchivePolarDB
具體歸檔代碼需結合實際場景,歸檔不一樣表,設置特定分區和歸檔條件等。
將打成jar包的spark歸檔demo代碼經過資源管理上傳至資源列表,jar包下載地址:Spark歸檔工具DEMO下載
本身編寫的Spark做業一樣須要打成jar包後上傳至資源列表,後面做業須要運行jar包中歸檔做業。
進入「HBase控制檯」->"數據工做臺"->「做業管理」->「建立做業」, 如圖
做業內容中主要指定了Spark做業運行參數,以及具體的歸檔做業編碼類和傳入參數等,以SparkArchivePolarDB demo爲例:
--class com.aliyun.spark.polardb.SparkOnPolarDBArchive --driver-memory 1G --driver-cores 1 --executor-cores 1 --executor-memory 2G --num-executors 1 --jars /spark-demo/mysql-connector-java-5.1.34.jar /spark-demo/spark-examples-0.0.1-SNAPSHOT.jar pc-xxx.rwlb.rds.aliyuncs.com:3306 test.us_population username passwd sparkTestPolarDB
參數說明:
參數 | 說明 |
---|---|
class | 指定spark做業運行主類 |
/spark-demo/spark-examples-0.0.1-SNAPSHOT.jar | spark做業所屬包 |
pc-xxx.rwlb.rds.aliyuncs.com:3306 | polardb的鏈接串 |
test.us_population | 歸檔polardb表 |
username | polardb用戶名 |
passwd | polardb鏈接密碼 |
sparkTestPolarDB | spark歸檔表名 |
其他參數可參見上述章節介紹
做業配置如圖:
做業運行後一段時間能夠查看到運行狀態,成功後可在交互式查詢中查看歸檔表數據。
進入交互式工做臺,使用可參考上述介紹,查看歸檔表數據:
進入「HBase控制檯」->「數據工做臺」->「工做流」,選擇新建工做流,指定工做流名稱、描述和執行集羣,
而後進入工做流設計工做臺,拖動Spark做業並進行配置,選擇上一步配置做業並連線:
選擇"工做流配置"->"調度屬性",開啓調度狀態並設置其實時間和調度週期,工做流即將進行週期性調度,如圖:
全量歸檔方式主要用來對原庫中歷史數據進行歸檔或者針對數據量比較小的表,歸檔步驟以下:
建立歸檔表時若是表數據量較大,能夠建立分區表。分區策略通常分爲時間分區和業務分區:
- 時間分區易於使用,即將相同時間的數據歸檔到同一個目錄,好比選擇按年或者按天進行時間分區,在分析時限定數據分區便可過濾掉與分析任務無關的數據。
- 業務分區字段須要具備有限的類別,好比性別、年齡、部門等。業務分區須要結合具體業務進行考慮,分區個數不宜過多,spark默認最大分區數爲1000。
- 分區方式能夠選擇靜態分區和動態分區,默認使用靜態分區,即寫入數據時必須指定寫入哪一個分區,動態分區須要將hive.exec.dynamic.partition.mode設置爲nonstrict,寫入時根據具體分區字段值動態建立分區,相同partition key值寫入同一個分區。
使用示例可參考:SparkOnPolarDBArchivedemo
在業務表中數據不存在更新和刪除的操做,僅僅是向數據表中增量寫入,這種狀況下只須要在數據表中記錄數據入庫時間或者其餘標記記錄新增數據,在Spark中使用工做流週期調度,傳入增量數據條件,按期將新增數據歸檔只Spark中便可。
針對業務數據存在更新的數據,若是原表中沒法辨別更新的數據,目前只能經過全量歸檔的方式每次對全量數據進行一次歸檔,將原歸檔表數據進行overwrite;若是存在更新數據標記,如update_time字段,因爲Spark目前不支持ACID,沒法使用merge..into功能直接更新已有數據,增量更新歸檔步驟以下:
Spark更新增量歸檔目前只能使用join關聯方式遍歷全部數據完成數據更新,但好處是儘可能避免影響在線庫POLARDB的數據訪問,每次只讀取更新和增量的部分數據,將計算工做放在廉價的Spark集羣中。
使用示例可參考:SparkOnPolarDBIncrement
另外一種方式:若是在業務側須要保留多個版本更新的數據,能夠直接將更新和增量的數據追加到歸檔表中,而後在業務側經過最新時間判斷出有效的數據,能夠避免每次更新時複雜計算過程。
業務表中若是存在delete,目前Spark沒有較好的辦法進行支持,須要在業務庫記錄刪除的關鍵字段信息,與歸檔表進行join,過濾掉join到的數據而後覆寫到歸檔表中,達到delete的效果。
在進行實際數據開發中,每每須要多個Spark做業配合完成數據歸檔以及分析工做,單個工做流中支持配置多個做業並按序執行,同時配合交互式工做臺進行數據驗證,減小不少開發中不便。目前工做臺仍在不斷優化中,在使用中遇到不便之處可隨時提出建議,便於簡化您的數據開發工做。
後續X-Pack Spark將提供一鍵歸檔功能,敬請期待。
原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。