過往記憶大數據 過往記憶大數據
Delta Lake 支持 DML 命令,包括 DELETE, UPDATE, 以及 MERGE,這些命令簡化了 CDC、審計、治理以及 GDPR/CCPA 工做流等業務場景。在這篇文章中,咱們將演示如何使用這些 DML 命令,並會介紹這些命令的後背實現,同時也會介紹對應命令的一些性能調優技巧。json
若是你是剛剛學習 Delta Lake,那麼你能夠看下本章以便快速瞭解 Delta Lake 的基本原理。本節主要在文件級別上介紹 Delta Lake 表的構建。緩存
在建立新表時,Delta 將數據保存在一系列的 Parquet 文件中,並會在表的根目錄建立 _delta_log 文件夾,其中包含 Delta Lake 的事務日誌,ACID 事務日誌裏面記錄了對應表的每次更改。當您修改表時(例如,經過添加新數據或執行更新、合併或刪除),Delta Lake 將每一個新事務的記錄做爲帶編號的 JSON 文件保存在 delta_log 文件夾中,從 00...00000.json 命名開始,而後依次是 00...00001.json,00...00002.json,以此類推;每10個事務,Delta 還會在 delta_log 文件夾中生成一個「檢查點」 的 Parquet 文件,這個文件容許咱們快速從新建立表的狀態。ide
最終,當咱們查詢 Delta Lake 表時,咱們能夠先讀取事務日誌,以快速肯定哪些數據文件構成了表的最新版本,而不須要列出雲對象存儲中的全部文件,這大大提高了查詢性能。當咱們執行 DML 操做時,Delta Lake 會建立出新的文件,而不是在原來的文件裏面修改它們,並使用事務日誌來記錄全部這些操做,好比哪些文件是新增的。若是想了解更多這方面的知識,能夠參見 《深刻理解 Apache Spark Delta Lake 的事務日誌》 這篇文章。佈局
好了,有了前面的基本介紹,下面咱們就能夠深刻介紹 Delta Lake 的 DML 命令如何使用以及其背後的工做原理。下面的例子是使用 SQL 進行操做的,這個須要咱們使用 Delta Lake 0.7.0 以及 Apache Spark 3.0,詳情請參見 《在 Delta Lake 中啓用 Spark SQL DDL 和 DML》。性能
可使用 UPDATE 操做有選擇地更新與篩選條件(也稱爲謂詞,predicate)匹配的任何行。下面的代碼演示瞭如何將每種類型的謂詞用做 UPDATE 語句的一部分。注意,Delta Lake 的 update 能夠在 Python、Scala 和 SQL 中使用,但出於本文的目的,咱們這裏只是用 SQL 來介紹其使用。學習
-- Update events UPDATE events SET eventType = 'click' WHERE eventType = 'click' UPDATE: Under the hood
UPDATE 在 Delta Lake 的實現是分爲兩步走的:大數據
•首先找到並選擇那些包含與謂詞匹配於是須要更新的數據的文件。這個過程當中 Delta Lake 可使用 data skipping 技術來加速這個過程。data skipping 這個技術看起來是數磚的商業部纔有,開源版本貌似沒看到。
•將每一個匹配的文件讀入內存,更新相關行,並將結果寫入新的數據文件。 ui
整個過程以下:spa
一旦 Delta Lake 成功地執行了 UPDATE,它就會在事務日誌中添加一個提交,代表從如今開始將使用新的數據文件來代替舊的數據文件。可是,舊的數據文件沒有被刪除。相反,它只是被標記成 tombstoned —— 意思是這個文件只屬於表的舊版本數據文件而不是當前版本的數據文件。Delta Lake 可以使用它來提供數據版本控制和時間旅行(time travel)。debug
保留舊的數據文件對於調試很是有用,由於您能夠在任什麼時候候使用 Delta Lake 的時間旅行回到歷史並查詢表的之前版本數據。若是哪天咱們不當心錯誤地更新了表,並但願找出發生了什麼,咱們能夠輕鬆地比較表的兩個版本。
SELECT * FROM events VERSION AS OF 12
提升 Delta Lake 的 UPDATE 命令性能的主要方法是添加更多的謂詞來縮小搜索空間。搜索越具體,Delta Lake 須要掃描和/或修改的文件就越少。
Databricks 的商業版 Delta Lake 具備一些企業性加強,如改進的 data skipping、布隆過濾器(bloom filters)的使用和 Z-Order Optimize ,Z-Order Optimize 從新組織每一個數據文件的佈局,使類似的列值在策略上相互接近,以得到最大效率。
咱們可使用 DELETE 命令並根據謂詞(過濾條件)選擇性地刪除任意行。
DELETE FROM events WHERE date < '2017-01-01'
若是但願恢復意外的刪除操做,可使用時間旅行將表回滾到原來的狀態,以下面的 Python 代碼片斷所示。
# Read correct version of table into memory dt = spark.read.format("delta") \ .option("versionAsOf", 4) \ .load("/tmp/loans_delta") # Overwrite current table with DataFrame in memory dt.write.format("delta") \ .mode("overwrite") \ .save(deltaPath)
DELETE 的工做原理和 UPDATE 同樣。Delta Lake 對數據進行兩次掃描:第一次掃描是識別包含與謂詞條件匹配的行的任何數據文件。第二次掃描將匹配的數據文件讀入內存,此時 Delta Lake 刪除相關的行,而後將未刪除的數據寫入磁盤上的新文件中。
在 Delta Lake 成功完成刪除操做後,舊的數據文件不會被刪除——它們仍然保留在磁盤上,可是在 Delta Lake 事務日誌中這些文件被記錄爲 tombstoned (再也不是活動表的一部分)。記住,那些舊文件不會當即刪除,由於咱們可能須要使用時間旅行功能來跳到更早版本的數據。若是想刪除超過必定時間期限的文件,可使用 VACUUM 命令。
運行 VACUUM 命令永久刪除知足如下條件的全部數據文件:
•再也不是活動表的一部分,而且
•超過保留閾值(默認爲七天)。
Delta Lake 不會自動刪除舊文件——咱們必須本身運行 VACUUM 命令,以下所示。若是咱們但願指定一個與默認值不一樣的保留期限,那麼咱們能夠將其做爲參數提供。
from delta.tables import * # vacuum files not required by versions older than the default # retention period, which is 168 hours (7 days) by default dt.vacuum() deltaTable.vacuum(48) # vacuum files older than 48 hours
注意:運行 VACUUM 命令時指定的保留時間爲0小時將刪除表的最新版本中未使用的全部文件。請確保運行這個命令時,對應的表沒有寫操做,不然可能會發生數據丟失。
DELETE:性能調優
與 UPDATE 命令同樣,提升 Delta Lake DELETE 操做性能的主要方法是添加更多謂詞來縮小搜索空間。Databricks 的商業版 Delta Lake 具備一些企業性加強,如改進的 data skipping、布隆過濾器(bloom filters)的使用和 Z-Order Optimize 。
Delta Lake 的 MERGE 命令容許咱們實現 upserts 語義,其實 UPDATE 和 INSERT 的混合。爲了理解 upserts 的含義,假設咱們有一張表(目標表)和一張源表,其中包含新記錄和對現有記錄的更新。upsert 是這樣工做的:
•當源表中的一條記錄與目標表中現有的記錄相匹配時,Delta Lake 將更新該記錄。
•當沒有匹配時,Delta Lake 將插入新記錄。
MERGE INTO events USING updates ON events.eventId = updates.eventId WHEN MATCHED THEN UPDATE SET events.data = updates.data WHEN NOT MATCHED THEN INSERT (date, eventId, data) VALUES (date, eventId, data)
Delta Lake 的 MERGE 命令極大地簡化了工做流。
Delta Lake 經過下面兩步實現 MERGE:
•在目標表和源表之間執行 inner join,以選擇全部匹配的文件。
•在目標表中選中的文件和源表之間執行 outer join ,並寫出更新/刪除/插入的數據。
MERGE 的實現與 UPDATE 或者 DELETE 的主要區別是其使用了 join,這一事實容許咱們在尋求提升性能時使用一些獨特的策略。
爲了提高 MERGE 的執行性能,咱們須要瞭解上面兩個 join 中的哪一個影響了程序的執行。
若是 inner join 是執行 MERGE 的瓶頸(好比找到 Delta Lake 須要重寫的文件花費的時間太長了),那麼咱們能夠採用如下的策略解決:
•添加更多的過濾條件來減小搜索空間;
•調整 shuffle partition 的數量;
•調整 broadcast join 的閾值;.
•若是表中有不少小文件,咱們能夠先壓縮合並它們;但不要將它們壓縮爲太大的文件,由於 Delta Lake 必須複製整個文件來重寫它。
若是 outer join 是執行 MERGE 的瓶頸(好比重寫文件花費的時間太長了),那麼咱們能夠採用如下的策略解決:
•調整 shuffle partition 的數量
•這個對分區表可能會生成不少的小文件;
•在寫文件以前開啓自動重分區來減小 Reduce 產生的文件。
•調整 broadcast 的閾值。若是咱們使用 full outer join, Spark 則沒法進行 broadcast join, 可是若是咱們使用 right outer join, Spark 則可使用;咱們能夠根據實際狀況來調整 broadcast 的閾值;
•緩存 source table / DataFrame.
•緩存 source table 能夠加快第二次的掃描時間,可是記住別緩存 target table,由於這可能會致使緩存一致性問題。
Delta Lake支持 UPDATE、DELETE 以及 MERGE INTO 等 DML 命令,這極大地簡化了許多常見大數據操做的工做流程。在本文中,咱們演示瞭如何在 Delta Lake 中使用這些命令,介紹了這些 DML 命令的實現原理,並提供了一些性能調優技巧。