EMR Spark Relational Cache如何支持雪花模型中的關聯匹配

做者:李呈祥,阿里巴巴計算平臺事業部EMR團隊高級技術專家,Apache Hive Committer, Apache Flink Committer,深度參與了Hadoop,Hive,Spark,Flink等開源項目的研發工做,對於SQL引擎,分佈式系統有較爲深刻的瞭解和實踐,目前主要專一於EMR產品中開源計算引擎的優化工做。sql


Relational Cache相關文章連接:apache

使用Relational Cache加速EMR Spark數據分析,微信

連接:https://yq.aliyun.com/articles/703046
網絡

使用EMR Spark Relational Cache跨集羣同步數據,

連接:https://yq.aliyun.com/articles/704649

EMR Spark Relational Cache的執行計劃重寫


背景

Join是Spark SQL中很是常見的操做,數據表按照業務語義的範式化表定義,便於用戶理解與使用,並且能夠消除冗餘數據。用戶經過join操做將相關的數據關聯後進行進一步的過濾,聚合等操做。在Spark中,Join一般是代價比較大,尤爲是當join的兩個表的數據都比較大,沒法優化爲map join時,須要經過網絡shuffle兩個表的數據,對數據按照jion字段進行從新組織。Relational Cache是EMR Spark支持的重要特性,相似於數據倉庫的物化視圖,將反範式化表(即關聯後的大表)保存爲relational cache,即可以使用cache重寫執行計劃,提升查詢效率。可是A ⋈ B ⋈ C做爲relational cache只能用來優化包含A ⋈ B ⋈ C的查詢,理論上是不能用來優化只包含A ⋈ B或A ⋈ C的查詢。若是表的數量不少,假設有n個表,則總共可能有2^n個關聯結果(固然在業務上並不會出現任意兩個表均可以關聯的狀況,可是可能的關聯數量依然會很是大),將每一個關聯結果都做爲relational cache構建代價太大,並不現實,咱們須要找到一種方式能夠經過單個Relational Cache支持優化多個關聯查詢的方式,從而在加速用戶查詢的同時,減小建立和更新relational cache的代價。Record Preserve Join是支持這種優化的很是有效的方式。app

什麼是Record Preserved Join

定義:對於表A和表B,若是A的每一條記錄都出如今A ⋈ B的結果中,而且A ⋈ B的結果並無其餘額外的記錄,那麼咱們稱A ⋈ B record preserved on A。分佈式

下面咱們看一下一個Record preserved join的簡單示例。函數

如上圖所示,activity_log爲事實表,product,user,city爲維度表,activity_log表經過user_id字段和user表關聯,經過product_id字段與product表關聯,user表經過city_id字段與city表關聯,關聯的結果以下:
oop

若是咱們把上述的activity_log,product,user以及city四表的關聯結果做爲Relational Cache,理論上只有後續的查詢包含這四個表的關聯時,纔可以使用cache優化SQL執行計劃,若是查詢只包含部分表的關聯,好比只是activity_log和user關聯,是沒有辦法使用以前的cache優化的。可是咱們仔細觀察能夠發現,每一行activity_log表中的記錄,其user_id都和user表中的其中一條且僅一條記錄相關聯,對於product_id和city_id的關聯也是一樣如此,能夠看到關聯後的結果,僅僅是爲activity_log表中的每條記錄增長更多的字段,activity_log中的每條記錄和關聯結果中的每條記錄是一一對應的關係,這種join結果就是record preserved join。因爲activity_log和user關聯的數據每一條都和cache中的數據一一對應,很少也很多,因此實際上咱們應該容許使用該cache優化包含activity_log和user關聯的查詢。好比對於上面的示例,咱們把圖2中四表關聯的結果保存爲Relational Cache activity_flat_cache,那麼對於query
優化

SELECT activity_text, user_name FROM activity_log, user WHERE activity.user_id = user.user_id AND user_name = 'jack'

同理,咱們可使用activity_flat_cache優化任意包含activity_log與其餘單個或多個維度表的關聯查詢,使用同一個cache優化多種關聯場景,大大下降relational cache維護和更新所需的存儲和計算成本。ui

爲了可以在Relational Cache中基於Record Preserved Join支持更豐富的優化場景,咱們須要首先解決兩個問題:

  1. Record preserved Join對於關聯兩表的數據有很是嚴格的約束,Relational Cache如何知道兩個表關聯的結果是否爲Record preserved Join。

  2. 已知1的信息,如何推導Relational Cache是否可用於Join查詢的執行計劃優化。

Record Preserved Join聲明

一個常見的Record Preserved Join是Left Outer Join,對於任意的表A和B,A left outer join B record preserved on A。根據表A和B中join key的數據分佈,最多見的也可能知足record preserved join的條件。在EMR Spark中,用戶能夠定義表的主外鍵和NOT NULL約束,經過表的主外鍵和NOT NULL約束,Relational Cache能夠推斷出兩表關聯是否知足Record Preserved Join。

根據外鍵的定義,外鍵的值必須存在其引用的主鍵中,或者爲空,其引用的主鍵又必須不能重複,因此若是存在表A和表B,並且表A中的外鍵字段關聯表B的主鍵,且外鍵字段有NOT NULL約束,則咱們能夠肯定A INNER JOIN B的結果record preserved on A

以上面四個表的關係爲例,在Spark SQL定義主外鍵的DDL示例以下:

ALTER TABLE activity_log ADD CONSTRAINT act_pk PRIMARY KEY (activity_id);ALTER TABLE product ADD CONSTRAINT prd_pk PRIMARY KEY (product_id);ALTER TABLE user ADD CONSTRAINT user_pk PRIMARY KEY (user_id);ALTER TABLE city ADD CONSTRAINT city_pk PRIMARY KEY (city_id);ALTER TABLE activity_log ADD CONSTRAINT act_prd_fk FOREIGN KEY (product_id) REFRENCES product (product_id);ALTER TABLE activity_log ADD CONSTRAINT act_user_fk FOREIGN KEY (user_id) REFRENCES user (user_id);ALTER TABLE user ADD CONSTRAINT user_city_fk FOREIGN KEY (city_id) REFRENCES city (city_id);

結合各外鍵字段的NOT NULL約束,咱們能夠推斷出以下Record Preserved Join:

  • activity_log inner join product record preserved on activity_log

  • activity_log inner join user record preserved on activity_log

  • user inner join city record preserved on user

使用Record Preserved Join優化優化執行計劃

EMR Spark支持經過任意的SQL查詢建立Relational Cache,可能包含關聯,聚合,過濾,投影等各類操做,其中關聯也包括record preserved join和其餘join,如何利用到其中的record preserved join特性對更多的查詢優化其執行計劃,決定了咱們對於Relational Cache的利用效率。Relational Cache經過比較用戶查詢和cache視圖的執行計劃來決定是否可使用cache代替查詢執行計劃或其一部分,在匹配Join時判斷的主要步驟以下:

  1. 收集用戶查詢中的join相關信息,與Relational Cache中join相關信息,找到二者並集,且並集中全部表都是關聯的。

  2. 對於Relational Cache中的除1中並集外的其餘關聯操做,根據用戶定義的約束推斷出來的record preserved join信息,判斷Relational Cache其餘關聯操做的結果是不是record preserved on 並集結果。

  3. 使用cache替換並集,並和用戶查詢中剩餘的其餘表從新拼接join。

  4. 繼續適配執行計劃其餘部分。

例如咱們建立了relational cache (A ⋈ B) ⋈ C,且 A ⋈ B record preserved on A & A ⋈ C record preserved on A, 用戶查詢爲A ⋈ C,

在判斷過程當中,直接從約束的獲得的Record Preserved Join信息可能並不足夠,咱們還須要經過一些定律進一步推理,從而充分利用Record Preserved Join信息優化更多的查詢。

Record Preserved Join推理

根據record preserved join的定義和關係代數的基本原理,咱們能夠推導出以下定理。

  1. 等價


if A full outer join B record preserved on Athen A full outer join B = A left outer join Bif A inner join B record preserved on Athen A inner join B = A left outer join B

已知A left outer join B record preserved on A,若是同時A full outer join B record preserved on A的話,那麼咱們能夠肯定A left outer join B和A full outer join B的結果一致,能夠互相替換。對於Inner Join一樣如此。

  1. 交換

if A ⋈ B record preserved on A then B reverse() A record preserved on A def reverse(join) join match { case INNER => INNER case LEFT OUTER => RIGHT OUTER case FULL OUTER => FULL OUTER }

根據關係代數的基本定義能夠獲得reverse函數,加上record preserved join定義,能夠很方便的推導出此定理。實際的查詢中,join的順序可能和Relational Cache中並不一致,可能須要變換join順序進行比較。

  1. 結合

if A ⋈ B record preserved on A and B ⋈ C record preserved on Bthen A ⋈ B join C record preserved on A

因爲B ⋈ C record preserved on B,能夠認爲B join C的結果是在B表中新增更多的維度列,因此A ⋈ B ⋈ C的結果和A ⋈ B的結果記錄數一致,A ⋈ B record preserved on A,因此A ⋈ B ⋈ C record preserved on A。

同理,也可推導出:

if A ⋈ B record preserved on A and A ⋈ C record preserved on A then (A ⋈ B) ⋈ C record preserved on A and (A ⋈ B), (A ⋈ C) ⋈ B record preserved on A and (A ⋈ C),

1.傳導

if A ⋈ B record preserved on A and B ⋈ C record preserved on Bthen A ⋈ C record preserved on A// same join type, same join key

因爲A ⋈ B record preserved on A和B ⋈ C record preserved on B能夠得知A ⋈ B ⋈ C record perserved on A,若是A ⋈ B中的join字段和A ⋈ C中A的join字段一致,且B ⋈ C中的join字段和A ⋈ C中C的join字段一致,將A ⋈ B ⋈ C結果中的B相關字段去掉,即爲A join C,其結果依然record preserved on A。

雪花數據模型

Relational Cache一個重要的使用場景是決策支持系統,經過BI,報表或多維數據分析快速支持用戶的商業決策。在這種場景中,數據模型一般包括一個事實表(Fact Table)和多個維度表(Dimension Table),對於事實表和維度表的關聯關係,能夠大致分爲三種類型:

  1. Star Schema:全部的維度表都是反範式化(denormalized)的,即維度表只有一層,事實表能夠和任意維度表直接關聯。

  2. Snowflake Schema:全部的維度表都是範式化(normalized)的,即維度表有多層,事實表須要經過屢次關聯才能關聯到所有維度數據。

  3. Starflake Schema:部分維度表是範式化的,部分維度表是反範式化的。

在Star/Snowflake/Starflake數據模型中,事實表和維度表的數據存在着業務上的關聯關係,實際的數據也知足主外鍵/非空字段等約束條件,是驗證在執行計劃優化時使用Record Preserved Join的合適場景。在MOLAP引擎中,例如apache kylin,一般須要用戶描述Star/Snowflake/Starflake數據模型,結合維度和統計列信息構建Cube,用於快速響應多維分析請求。用戶的多維分析查詢可能涉及到事實表和一個或者多個維度表的關聯,實際上Star/Snowflake/Starflake數據模型的定義也隱含着事實表和維度表的Record Preserved Join約束,Relational Cache經過更基礎的字段約束定義,推導出Record Preserved Join,從而支持使用relational cache構建cube,經過執行計劃重寫,知足交互式的多維分析查詢需求。Relational Cache的Record Preserved Join推導不只可用於基於雪花模型的多維分析場景,也能夠用於其餘涉及到Join的場景,拓展relational cache可優化的查詢場景,減小維護的成本和代價。

使用Record Preserved Join優化雪花模型示例

咱們使用第二節中的表及其約束,構建Relational Cache,假設用戶須要進行多維分析,構建一個Full Cube語句以下:

CACHE TABLE activity_cubeUSING parquetAS SELECT product_name, user_name, city_name, count(1), GROUPING_ID() AS grouping_id FROM activity_log, user, product, city WHERE activity_log.product_id = product.product_id and activity_log.user_id = user.user_id and user.city_id = city.city_idGROUP BY CUBE(product_name, user_name, city_name);

用戶查詢以下:

SELECT product_name, count FROM activity_log, product WHERE activity_log.product_id = product.product_id and product_name = 'xxx';

在匹配Join時判斷的主要步驟以下:

  1. cache和用戶查詢join的並集爲:activity_log ⋈ product

  2. cache中剩餘的表爲user和city,這一步可能重複屢次,在第一輪經過activity_log ⋈ user record preserved on activity_log以及activity_log ⋈ product record preserved on activity_log使用結合律2推導出(activity_log ⋈ product) ⋈ user record preserved on (activity_log ⋈ product), 在第二輪使用結合律1和上輪的結果推導出(activity_log ⋈ product) ⋈ user ⋈ city record preserved on (activity_log ⋈ product), 從而得出結論cache能夠用於替換activity_log ⋈ product。

  3. 繼續其餘部分執行計劃的匹配和重寫。



能夠看到,基於Record Preserved Join及其推理,咱們可使用單個大寬表(包含事實表和全部維度表關聯的結果)做爲cache優化全部包含事實表activity_log的關聯查詢,以此爲基礎,咱們構建的activity_cube能夠用於優化基於各個維度組合的查詢,結合咱們在聚合層面的匹配策略,支持Starflake模型數據的交互式多維分析。

總結

Relational Cache經過Spark中表的各類字段約束信息,推導出Record Preserved Join,結合更進一步的推理規則,使得relational cache能夠經過一個寬表的cache優化多種關聯查詢的場景。在star/snowflake/starflake數據模型下,經過將事實表和全部維度表關聯並根據維度聚合後的結果(即Cube)保存爲relational cache後,經過Record Preserved Join的推導,relational cache在執行計劃優化時可使用cube數據重寫各類維度組合的多維分析查詢的執行計劃,從而知足亞秒級響應的交互式分析需求。


本文分享自微信公衆號 - Apache Spark技術交流社區(E-MapReduce_Spark)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索