SQL Server的Bulk load默認爲串行,這意味着例如,一個BULK INSERT語句將生成一個線程將數據插入表中。可是,對於併發負載,您可使用多個批量插入語句插入同一張表,前提是須要閱讀多個文件。node
考慮要求所在的情景:git
在這種狀況下,使用 Apache Spark是並行批量數據加載到 SQL 表的流行方法之一。github
在本文中,咱們使用 Azure Databricks spark engine使用單個輸入文件將數據以並行流(多個線程將數據加載到表中)插入 SQL Server。目標表多是Heap、Clustered Index或Clustered Columnstore Index。本文旨在展現如何利用Spark提供的高度分佈式框架,在加載到 SQL Server或 Azure SQL的彙集列存儲索引表以前仔細對數據分區。sql
本文中分享的最有趣的觀察是展現使用Spark默認配置時列存儲表的行組質量下降,以及如何經過高效使用Spark分區來提升質量。從本質上講,提升行組質量是決定查詢性能的重要因素。數據庫
數據集:apache
數據庫:性能優化
ELT 平臺:架構
存儲:併發
先決條件:
在進一步瀏覽本文以前,請花一些時間瞭解此處將數據加載到彙集列存儲表中的概述:Data Loading performance considerations with Clustered Columnstore indexes
在此測試中,數據從位於 Azure Data Lake Storage Gen 2的 CSV 文件中加載。CSV 文件大小爲 27 GB,有 110 M 記錄,有 36 列。這是一個帶有隨機數據的自定義數據集。
批量加載或預處理(ELT\ETL)的典型架構看起來與下圖類似:
在第一次測試中,單個BULK INSERT用於將數據加載到帶有彙集列存儲索引的 Azure SQL 表中,這裏沒有意外,根據所使用的 BATCHSIZE,它花了 30 多分鐘才完成。請記住,BULK INSERT是一個單一的線程操做,所以單個流會讀取並將其寫入表中,從而下降負載吞吐量。
爲了實現寫入到 SQL Server和讀取ADLS (Azure Data Lake Storage) Gen 2的最大併發性和高吞吐量,Azure Databricks 被選爲平臺的選擇,儘管咱們還有其餘選擇,即 Azure Data Factory或其餘基於Spark引擎的平臺。
使用Azure Databricks加載數據的優勢是 Spark 引擎經過專用的 Spark API並行讀取輸入文件。這些 API將使用必定數量的分區,這些分區映射到單個或多個輸入文件,映射是在文件的一部分或整個文件上完成的。數據讀入Spark DataFrame or, DataSet or RDD (Resilient Distributed Dataset) 。在這種狀況下,數據被加載到DataFrame中,而後進行轉換(設置與目標表匹配的DataFrame schema),而後數據準備寫入 SQL 表。
要將DataFrame中的數據寫入 SQL Server中,必須使用Microsoft's Apache Spark SQL Connector。這是一個高性能的鏈接器,使您可以在大數據分析中使用事務數據,和持久化結果用於即席查詢或報告。鏈接器容許您使用任何 SQL Server(本地數據庫或雲中)做爲 Spark 做業的輸入數據源或輸出目標。
GitHub repo: Fast Data Loading in Azure SQL DB using Azure Databricks
請注意,目標表具備彙集列存儲索引,以實現高負載吞吐量,可是,您也能夠將數據加載到Heap,這也將提供良好的負載性能。對於本文的相關性,咱們只討論加載到列存儲表。咱們使用不一樣的 BATCHSIZE 值將數據加載到Clustered Columnstore Index中 -請參閱此文檔,瞭解 BATCHSIZE 在批量加載到彙集列存儲索引表期間的影響。
如下是Clustered Columnstore Index上的數據加載測試運行,BATCHSIZE爲 102400 和 1048576:
請注意,咱們正在使用 Azure Databricks使用的默認並行和分區,並將數據直接推至 SQL Server彙集列存儲索引表。咱們沒有調整 Azure Databricks使用的任何默認配置。不管所定義的批次大小,咱們全部的測試都大體在同一時間完成。
將數據加載到 SQL 中的 32 個併發線程是因爲上述已提供的數據磚羣集的大小。該集羣最多有 8 個節點,每一個節點有 4 個內核,即 8*4 = 32 個內核,最多可運行 32 個併發線程。
有關咱們使用 BATCHSIZE 1048576 插入數據的表格,如下是在 SQL 中建立的行組數:
SELECT COUNT(1) FROM sys.dm_db_column_store_row_group_physical_stats WHERE object_id = OBJECT_ID('largetable110M_1048576') 216
SELECT * FROM sys.dm_db_column_store_row_group_physical_stats WHERE object_id = OBJECT_ID('largetable110M_1048576')
在這種狀況下,咱們只有一個delta store在OPEN狀態 (total_rows = 3810) 和 215 行組處於壓縮狀態, 這是有道理的, 由於若是插入的批次大小是>102400 行, 數據再也不delta store存儲, 而是直接插入一個壓縮行組的列存儲。在這種狀況下,壓縮狀態中的全部行組都有 >102400 條記錄。如今,有關行組的問題是:
爲何咱們有216行組?
爲何當咱們的BatchSize設置爲 1048576 時,每一個行組的行數不一樣?
請注意,每一個行組的數據大約等於上述結果集中的 500,000 條記錄。
這兩個問題的答案是 Azure Databricks Spark引擎對數據分區控制了寫入彙集列存儲索引錶行組的數據行數。讓咱們來看看 Azure Databricks爲有關數據集建立的分區數:
# Get the number of partitions before re-partitioning print(df_gl.rdd.getNumPartitions()) 216
所以,咱們爲數據集建立了 216 個分區。請記住,這些是分區的默認數。每一個分區都有大約 500000 條記錄。
# Number of records in each partition from pyspark.sql.functions import spark_partition_id df_gl.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show(10000)
將Spark分區中的記錄數與行組中的記錄數進行比較,您就會發現它們是相等的。甚至分區數也等於行組數。所以,從某種意義上說,1048576 的 BATCHSIZE 正被每一個分區中的行數過分拉大。
sqldbconnection = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbconn") sqldbuser = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbuser") sqldbpwd = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbpwd") servername = "jdbc:sqlserver://" + sqldbconnection url = servername + ";" + "database_name=" + <Your Database Name> + ";" table_name = "<Your Table Name>" # Write data to SQL table with BatchSize 1048576 df_gl.write \ .format("com.microsoft.sqlserver.jdbc.spark") \ .mode("overwrite") \ .option("url", url) \ .option("dbtable", table_name) \ .option("user", sqldbuser) \ .option("password", sqldbpwd) \ .option("schemaCheckEnabled", False) \ .option("BatchSize", 1048576) \ .option("truncate", True) \ .save()
行組質量由行組數和每一個行組記錄決定。因爲彙集列存儲索引經過掃描單行組的列段掃描表,則最大化每一個行組中的行數可加強查詢性能。當行組具備大量行數時,數據壓縮會改善,這意味着從磁盤中讀取的數據更少。爲了得到最佳的查詢性能,目標是最大限度地提升彙集列索引中每一個行組的行數。行組最多可有 1048576 行。可是,須要注意的是,因爲彙集列索引,行組必須至少有 102400 行才能實現性能提高。此外,請記住,行組的最大大小(100萬)可能在每個狀況下都達到,文件行組大小不僅是最大限制的一個因素,但受到如下因素的影響。
話雖如此,如今一個重要的考慮是讓行組大小盡量接近 100 萬條記錄。在此測試中,因爲每一個行組的大小接近 500000 條記錄,咱們有兩個選項能夠達到約 100 萬條記錄的大小:
選項#1很容易在Python或Scala代碼中實現,該代碼將在Azure Databricks上運行,負載至關低。
選項#2是數據加載後須要採起的額外步驟,固然,這將消耗 SQL 上的額外 CPU ,並增長整個加載過程所需的時間。
爲了保持本文的相關性,讓咱們來討論更多關於Spark分區,以及如何從其默認值及其在下一節的影響中更改它。
Spark 引擎最典型的輸入源是一組文件,這些文件經過將每一個節點上的適當分區劃分爲一個或多個 Spark API來讀取這些文件。這是 Spark 的自動分區,將用戶從肯定分區數量的憂慮中抽象出來,若是用戶想挑戰,就需控制分區的配置。根據環境和環境設置計算的分區的默認數一般適用於大多數狀況下。可是,在某些狀況下,更好地瞭解分區是如何自動計算的,若是須要,用戶能夠更改分區計數,從而在性能上產生明顯差別。
注意:大型Spark羣集能夠生成大量並行線程,這可能致使 Azure SQL DB 上的內存授予爭議。因爲內存超時,您必須留意這種可能性,以免提早修剪。請參閱本文以瞭解更多詳細信息,瞭解表的模式和行數等也可能對內存授予產生影響。
spark.sql.files.maxPartitionBytes是控制分區大小的重要參數,默認設置爲128 MB。它能夠調整以控制分區大小,所以也會更改由此產生的分區數。
spark.default.parallelism這至關於worker nodes核心的總數。
最後,咱們有coalesce()和repartition(),可用於增長/減小分區數,甚至在數據已被讀入Spark。
只有當您想要減小分區數時,才能使用coalesce() ,由於它不涉及數據的重排。請考慮此data frame的分區數爲 16,而且您但願將其增長到 32,所以您決定運行如下命令。
df = df.coalesce(32) print(df.rdd.getNumPartitions())
可是,分區數量不會增長到 32 個,而且將保持在 16 個,由於coalesce()不涉及數據重排。這是一個性能優化的實現,由於無需昂貴的數據重排便可減小分區。
若是您想將上述示例的分區數減小到 8,則會得到預期的結果。
df = df.coalesce(8) print(df.rdd.getNumPartitions())
這將合併數據併產生 8 個分區。
repartition() 是另外一個幫助調整分區的函數。對於同一示例,您可使用如下命令將數據放入 32 個分區。
df = df.repartition(32) print(df.rdd.getNumPartitions())
最後,還有其餘功能能夠改變分區數,其中是groupBy(), groupByKey(), reduceByKey() 和 join()。當在 DataFrame 上調用這些功能時,會致使跨機器或一般跨執行器對數據進行重排,最終在默認狀況下將數據從新劃分爲 200 個分區。此默認 數字可使用spark.sql.shuffle.partitions配置進行控制。
如今,瞭解分區在 Spark 中的工做原理以及如何更改分區,是時候實施這些學習了。在上述實驗中,分區數爲 216(默認狀況下),這是由於文件的大小爲 27 GB,所以將 27 GB 除以 128 MB(默認狀況下由 Spark 定義的最大分區字節)提供了216 個分區。
對 PySpark 代碼的更改是從新分區數據並確保每一個分區如今有 1048576 行或接近它。爲此,首先在DataFrame中獲取記錄數量,而後將其除以 1048576。此劃分的結果將是用於加載數據的分區數,假設分區數爲n。可是,可能有一些分區如今有 >=1048576 行,所以,爲了確保每一個分區都<=1048576行,咱們將分區數做爲n+1。使用n+1在分區結果爲 0 的狀況下也很重要。在這種狀況下,您將有一個分區。
因爲數據已加載到DataFrame中,而 Spark 默認已建立分區,咱們如今必須再次從新分區數據,分區數等於n+1。
# Get the number of partitions before re-partitioning print(df_gl.rdd.getNumPartitions()) 216 # Get the number of rows of DataFrame and get the number of partitions to be used. rows = df_gl.count() n_partitions = rows//1048576
# Re-Partition the DataFrame df_gl_repartitioned = df_gl.repartition(n_partitions+1) # Get the number of partitions after re-partitioning print(df_gl_repartitioned.rdd.getNumPartitions()) 105 # Get the partition id and count of partitions df_gl_repartitioned.withColumn("partitionId",spark_partition_id()).groupBy("partitionId").count().show(10000)
所以,在從新劃分分區後,分區數量從216 個減小到 105 (n+1),所以每一個分區如今都有接近1048576行。
此時,讓咱們將數據再次寫入 SQL 表中,並驗證行組質量。這一次,每一個行組的行數將接近每一個分區中的行數(略低於 1048576)。讓咱們看看下面:
SELECT COUNT(1) FROM sys.dm_db_column_store_row_group_physical_stats WHERE object_id = OBJECT_ID('largetable110M_1048576') 105
從本質上講,此次總體數據加載比以前慢了 2 秒,但行組的質量要好得多。行組數量減小到一半,行組幾乎已填滿到最大容量。請注意,因爲DataFrame的從新劃分,將消耗額外的時間,這取決於數據幀的大小和分區數。
請注意,您不會老是得到每row_group 100 萬條記錄。它將取決於數據類型、列數等,以及以前討論的因素-請參閱sys.dm_db_column_store_row_group_physical_stats
這是一篇很是好的數據ETL文章,Spark和SQL Server列存儲表功能的組合。
Azure Data Factory是當前最成熟,功能最強大的ETL/ELT數據集成服務。其架構就是使用Spark做爲計算引擎。
https://github.com/mrpaulandrew/A-Day-Full-of-Azure-Data-Factory