使用Spark加載數據到SQL Server列存儲表

原文地址https://devblogs.microsoft.com/azure-sql/partitioning-on-spark-fast-loading-clustered-columnstore-index/html

介紹

SQL Server的Bulk load默認爲串行,這味着例如,一個BULK INSERT語句將生成一個線程將數據插入表中。可是,對於併發負載,您可使用多個批量插入語句插入同一張表,前提是須要閱讀多個文件。node

考慮要求所在的情景:git

  • 從大文件加載數據(好比,超過 20 GB)
  • 拆分文件不是一個選項,由於它將是整個大容量負載操做中的一個額外步驟。
  • 每一個傳入的數據文件大小不一樣,所以很難識別大塊數(將文件拆分爲)並動態定義爲每一個大塊執行的批量插入語句。
  • 要加載的多個文件跨越多個 GB(例如超過 20 GB 及以上),每一個GB 包含數百萬條記錄。

在這種狀況下,使用 Apache Spark是並行批量數據加載到 SQL 表的流行方法之一。github

在本文中,咱們使用 Azure Databricks spark engine使用單個輸入文件將數據以並行流(多個線程將數據加載到表中)插入 SQL Server。目標表多是HeapClustered IndexClustered Columnstore Index。本文旨在展現如何利用Spark提供的高度分佈式框架,在加載到 SQL Server或 Azure SQL的彙集列存儲索引表以前仔細對數據分區。sql

本文中分享的最有趣的觀察是展現使用Spark默認配置時列存儲表的行組質量下降,以及如何經過高效使用Spark分區來提升質量。從本質上講,提升行組質量是決定查詢性能的重要因素。數據庫

 

環境設置

數據集:apache

  • 單張表的一個自定義數據集。一個 27 GB 的 CSV 文件,110 M 記錄,共 36 列。其中列的類型有int, nvarchar, datetime等。

數據庫:性能優化

  • Azure SQL Database – Business Critical, Gen5 80vCores

ELT 平臺:架構

  • Azure Databricks – 6.6 (includes Apache Spark 2.4.5, Scala 2.11)
  • Standard_DS3_v2 14.0 GB Memory, 4 Cores, 0.75 DBU (8 Worker Nodes Max)

存儲:併發

  • Azure Data Lake Storage Gen2

先決條件:

在進一步瀏覽本文以前,請花一些時間瞭解此處將數據加載到彙集列存儲表中的概述:Data Loading performance considerations with Clustered Columnstore indexes

在此測試中,數據從位於 Azure Data Lake Storage Gen 2的 CSV 文件中加載。CSV 文件大小爲 27 GB,有 110 M 記錄,有 36 列。這是一個帶有隨機數據的自定義數據集。

批量加載或預處理(ELT\ETL)的典型架構看起來與下圖類似:

使用BULK INSERTS    

在第一次測試中,單個BULK INSERT用於將數據加載到帶有彙集列存儲索引的 Azure SQL 表中,這裏沒有意外,根據所使用的 BATCHSIZE,它花了 30 多分鐘才完成。請記住,BULK INSERT是一個單一的線程操做,所以單個流會讀取並將其寫入表中,從而下降負載吞吐量

 

使用Azure Databricks

爲了實現寫入到 SQL Server和讀取ADLS (Azure Data Lake Storage) Gen 2的最大併發性和高吞吐量,Azure Databricks 被選爲平臺的選擇,儘管咱們還有其餘選擇,即 Azure Data Factory或其餘基於Spark引擎的平臺。

使用Azure Databricks加載數據的優勢是 Spark 引擎經過專用的 Spark API並行讀取輸入文件。這些 API將使用必定數量的分區,這些分區映射到單個或多個輸入文件,映射是在文件的一部分或整個文件上完成的。數據讀入Spark DataFrame or, DataSet or RDD (Resilient Distributed Dataset) 。在這種狀況下,數據被加載到DataFrame中,而後進行轉換(設置與目標表匹配的DataFrame schema),而後數據準備寫入 SQL 表。

要將DataFrame中的數據寫入 SQL Server中,必須使用Microsoft's Apache Spark SQL Connector。這是一個高性能的鏈接器,使您可以在大數據分析中使用事務數據,和持久化結果用於即席查詢或報告。鏈接器容許您使用任何 SQL Server(本地數據庫或雲中)做爲 Spark 做業的輸入數據源或輸出目標。

  GitHub repo: Fast Data Loading in Azure SQL DB using Azure Databricks

請注意,目標表具備彙集列存儲索引,以實現高負載吞吐量,可是,您也能夠將數據加載到Heap,這也將提供良好的負載性能。對於本文的相關性,咱們只討論加載到列存儲表。咱們使用不一樣的 BATCHSIZE 值將數據加載到Clustered Columnstore Index中 -請參閱此文檔,瞭解 BATCHSIZE 在批量加載到彙集列存儲索引表期間的影響。

如下是Clustered Columnstore Index上的數據加載測試運行,BATCHSIZE爲 102400 和 1048576:

 

 

 

 

請注意,咱們正在使用 Azure Databricks使用的默認並行和分區,並將數據直接推至 SQL Server彙集列存儲索引表。咱們沒有調整 Azure Databricks使用的任何默認配置。不管所定義的批次大小,咱們全部的測試都大體在同一時間完成。

將數據加載到 SQL 中的 32 個併發線程是因爲上述已提供的數據磚羣集的大小。該集羣最多有 8 個節點,每一個節點有 4 個內核,即 8*4 = 32 個內核,最多可運行 32 個併發線程。

查看行組(Row Groups)

有關咱們使用 BATCHSIZE 1048576 插入數據的表格,如下是在 SQL 中建立的行組數:

行組總數:

SELECT COUNT(1)
FROM sys.dm_db_column_store_row_group_physical_stats
WHERE object_id = OBJECT_ID('largetable110M_1048576')
216 

行組的質量:

SELECT *
FROM sys.dm_db_column_store_row_group_physical_stats
WHERE object_id = OBJECT_ID('largetable110M_1048576')

在這種狀況下,咱們只有一個delta store在OPEN狀態 (total_rows = 3810) 和 215 行組處於壓縮狀態, 這是有道理的, 由於若是插入的批次大小是>102400 行, 數據再也不delta store存儲, 而是直接插入一個壓縮行組的列存儲。在這種狀況下,壓縮狀態中的全部行組都有 >102400 條記錄。如今,有關行組的問題是:

爲何咱們有216行組?

爲何當咱們的BatchSize設置爲 1048576 時,每一個行組的行數不一樣?

請注意,每一個行組的數據大約等於上述結果集中的 500000 條記錄。

這兩個問題的答案是 Azure Databricks Spark引擎對數據分區控制了寫入彙集列存儲索引錶行組的數據行數。讓咱們來看看 Azure Databricks爲有關數據集建立的分區數:

# Get the number of partitions before re-partitioning
print(df_gl.rdd.getNumPartitions())
216

所以,咱們爲數據集建立了 216 個分區。請記住,這些是分區的默認數。每一個分區都有大約 500000 條記錄。

# Number of records in each partition
from pyspark.sql.functions
import spark_partition_id
df_gl.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show(10000)

將Spark分區中的記錄數與行組中的記錄數進行比較,您就會發現它們是相等的。甚至分區數也等於行組數。所以,從某種意義上說,1048576 的 BATCHSIZE 正被每一個分區中的行數過分拉大。

sqldbconnection = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbconn")
sqldbuser = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbuser")
sqldbpwd = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbpwd")

servername = "jdbc:sqlserver://" + sqldbconnection url = servername + ";" + "database_name=" + <Your Database Name> + ";"
table_name = "<Your Table Name>"

# Write data to SQL table with BatchSize 1048576
df_gl.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", sqldbuser) \
.option("password", sqldbpwd) \
.option("schemaCheckEnabled", False) \
.option("BatchSize", 1048576) \
.option("truncate", True) \
.save()

行組質量

行組質量由行組數和每一個行組記錄決定。因爲彙集列存儲索引經過掃描單行組的列段掃描表,則最大化每一個行組中的行數可加強查詢性能。當行組具備大量行數時,數據壓縮會改善,這意味着從磁盤中讀取的數據更少。爲了得到最佳的查詢性能,目標是最大限度地提升彙集列索引中每一個行組的行數。行組最多可有 1048576 行。可是,須要注意的是,因爲彙集列索引,行組必須至少有 102400 行才能實現性能提高。此外,請記住,行組的最大大小(100萬)可能在每個狀況下都達到,文件行組大小不僅是最大限制的一個因素,但受到如下因素的影響。

  • 字典大小限制,即 16 MB
  • 插入指定的批次大小
  • 表的分區方案,由於行組不跨分區
  • 內存壓力致使行組被修剪
  • 索引重組,重建

話雖如此,如今一個重要的考慮是讓行組大小盡量接近 100 萬條記錄。在此測試中,因爲每一個行組的大小接近 500000 條記錄,咱們有兩個選項能夠達到約 100 萬條記錄的大小:

  • 在Spark中,更改分區數,使每一個分區儘量接近 1048576 條記錄,
  • 保持Spark分區(默認值),一旦數據加載到表中,就運行 ALTER INDEX REORG,將多個壓縮行組組合成一組。

選項#1很容易在Python或Scala代碼中實現,該代碼將在Azure Databricks上運行,負載至關低。

選項#2是數據加載後須要採起的額外步驟,固然,這將消耗 SQL 上的額外 CPU ,並增長整個加載過程所需的時間。

爲了保持本文的相關性,讓咱們來討論更多關於Spark分區,以及如何從其默認值及其在下一節的影響中更改它。

 

Spark Partitioning

Spark 引擎最典型的輸入源是一組文件,這些文件經過將每一個節點上的適當分區劃分爲一個或多個 Spark API來讀取這些文件。這是 Spark 的自動分區,將用戶從肯定分區數量的憂慮中抽象出來,若是用戶想挑戰,就需控制分區的配置。根據環境和環境設置計算的分區的默認數一般適用於大多數狀況下。可是,在某些狀況下,更好地瞭解分區是如何自動計算的,若是須要,用戶能夠更改分區計數,從而在性能上產生明顯差別。

注意:大型Spark羣集能夠生成大量並行線程,這可能致使 Azure SQL DB 上的內存授予爭議。因爲內存超時,您必須留意這種可能性,以免提早修剪。請參閱本文以瞭解更多詳細信息,瞭解表的模式和行數等也可能對內存授予產生影響。

spark.sql.files.maxPartitionBytes是控制分區大小的重要參數,默認設置爲128 MB。它能夠調整以控制分區大小,所以也會更改由此產生的分區數。

spark.default.parallelism這至關於worker nodes核心的總數。

最後,咱們有coalesce()repartition(),可用於增長/減小分區數,甚至在數據已被讀入Spark。

只有當您想要減小分區數時,才能使用coalesce() 由於它不涉及數據的重排。請考慮此data frame的分區數爲 16,而且您但願將其增長到 32,所以您決定運行如下命令。

df = df.coalesce(32)
print(df.rdd.getNumPartitions())

可是,分區數量不會增長到 32 個,而且將保持在 16 個,由於coalesce()不涉及數據重排。這是一個性能優化的實現,由於無需昂貴的數據重排便可減小分區。

若是您想將上述示例的分區數減小到 8,則會得到預期的結果。

df = df.coalesce(8)
print(df.rdd.getNumPartitions())

這將合併數據併產生 8 個分區。

repartition() 另外一個幫助調整分區的函數。對於同一示例,您可使用如下命令將數據放入 32 個分區。

df = df.repartition(32)
print(df.rdd.getNumPartitions())

最後,還有其餘功能能夠改變分區數,其中是groupBy(), groupByKey(), reduceByKey() join()。當在 DataFrame 上調用這些功能時,會致使跨機器或一般跨執行器對數據進行重排,最終在默認狀況下將數據從新劃分爲 200 個分區。此默認 數字可使用spark.sql.shuffle.partitions配置進行控制。

 

數據加載

如今,瞭解分區在 Spark 中的工做原理以及如何更改分區,是時候實施這些學習了。在上述實驗中,分區數爲 216默認狀況下),這是由於文件的大小爲 27 GB,所以將 27 GB 除以 128 MB(默認狀況下由 Spark 定義的最大分區字節)提供了216 個分區

Spark從新分區的影響

對 PySpark 代碼的更改是從新分區數據並確保每一個分區如今有 1048576 行或接近它。爲此,首先在DataFrame中獲取記錄數量,而後將其除以 1048576。此劃分的結果將是用於加載數據的分區數,假設分區數爲n可是,可能有一些分區如今有 >=1048576 行,所以,爲了確保每一個分區都<=1048576行,咱們將分區數做爲n+1使用n+1在分區結果爲 0 的狀況下也很重要。在這種狀況下,您將有一個分區。

因爲數據已加載到DataFrame中,而 Spark 默認已建立分區,咱們如今必須再次從新分區數據,分區數等於n+1。

# Get the number of partitions before re-partitioning
print(df_gl.rdd.getNumPartitions())
216
 
# Get the number of rows of DataFrame and get the number of partitions to be used.
rows = df_gl.count()
n_partitions = rows//1048576
# Re-Partition the DataFrame df_gl_repartitioned = df_gl.repartition(n_partitions+1) # Get the number of partitions after re-partitioning print(df_gl_repartitioned.rdd.getNumPartitions()) 105 # Get the partition id and count of partitions df_gl_repartitioned.withColumn("partitionId",spark_partition_id()).groupBy("partitionId").count().show(10000)

所以,在從新劃分分區後,分區數量從216 個減小到 105 (n+1),所以每一個分區如今都有接近1048576行。

此時,讓咱們將數據再次寫入 SQL 表中,並驗證行組質量。這一次,每一個行組的行數將接近每一個分區中的行數(略低於 1048576)。讓咱們看看下面:

從新分區後的行組

SELECT COUNT(1)
FROM sys.dm_db_column_store_row_group_physical_stats 
WHERE object_id = OBJECT_ID('largetable110M_1048576')
105

從新分區後的行組質量

從本質上講,此次總體數據加載比以前慢了 2 秒,但行組的質量要好得多。行組數量減小到一半,行組幾乎已填滿到最大容量。請注意,因爲DataFrame的從新劃分,將消耗額外的時間,這取決於數據幀的大小和分區數。

請注意,您不會老是得到每row_group 100 萬條記錄。它將取決於數據類型、列數等,以及以前討論的因素-請參閱sys.dm_db_column_store_row_group_physical_stats

 

關鍵點

  1. 建議在將數據批量加載到 SQL Server時使用BatchSize(不管是 CCI 仍是Heap)。可是,若是 Azure Databricks 或任何其餘 Spark 引擎用於加載數據,則數據分區在肯定彙集列存儲索引中的行組質量方面起着重要做用。
  2. 使用BULK INSERT命令加載數據將遵照命令中提到的BATCHSIZE,除非其餘因素影響插入行組的行數。
  3. Spark 中的數據分區不該基於某些隨機數,最好動態識別分區數,並將n+1 用做分區數
  4. 因爲彙集列存儲索引經過掃描單行組的列段掃描表,則最大化每一個行組中的記錄數可加強查詢性能。爲了得到最佳的查詢性能,目標是最大限度地提升彙集列存儲索引中每一個行組的行數。
  5. Azure Databricks的數據加載速度在很大程度上取決於選擇的集羣類型及其配置。此外,請注意,到目前爲止,Azure Databricks鏈接器僅支持Apache Spark 2.4.5。微軟已經發布了對Spark 3.0的支持,它目前在預覽版中,咱們建議您在開發測試環境中完全測試此鏈接器。
  6. 根據data frame的大小、列數、數據類型等,進行從新劃分的時間會有所不一樣,所以您必須從端端角度考慮此次對總體數據加載的考慮。

 

Azure Data Factory

這是一篇很是好的數據ETL文章,Spark和SQL Server列存儲表功能的組合。

Azure Data Factory是當前最成熟,功能最強大的ETL/ELT數據集成服務。其架構就是使用Spark做爲計算引擎。

https://github.com/mrpaulandrew/A-Day-Full-of-Azure-Data-Factory

相關文章
相關標籤/搜索