原文地址https://devblogs.microsoft.com/azure-sql/partitioning-on-spark-fast-loading-clustered-columnstore-index/

介绍

SQL Server的Bulk load默认为串行,这味着例如,一个BULK INSERT语句将生成一个线程将数据插入表中。但是,对于并发负载,您可以使用多个批量插入语句插入同一张表,前提是需要阅读多个文件。

考虑要求所在的情景:

  • 从大文件加载数据(比如,超过 20 GB)
  • 拆分文件不是一个选项,因为它将是整个大容量负载操作中的一个额外步骤。
  • 每个传入的数据文件大小不同,因此很难识别大块数(将文件拆分为)并动态定义为每个大块执行的批量插入语句。
  • 要加载的多个文件跨越多个 GB(例如超过 20 GB 及以上),每个GB 包含数百万条记录。

在这种情况下,使用 Apache Spark是并行批量数据加载到 SQL 表的流行方法之一。

在本文中,我们使用 Azure Databricks spark engine使用单个输入文件将数据以并行流(多个线程将数据加载到表中)插入 SQL Server。目标表可能是HeapClustered IndexClustered Columnstore Index。本文旨在展示如何利用Spark提供的高度分布式框架,在加载到 SQL Server或 Azure SQL的聚集列存储索引表之前仔细对数据分区。

本文中分享的最有趣的观察是展示使用Spark默认配置时列存储表的行组质量降低,以及如何通过高效使用Spark分区来提高质量。从本质上讲,提高行组质量是决定查询性能的重要因素。

环境设置

数据集:

  • 单张表的一个自定义数据集。一个 27 GB 的 CSV 文件,110 M 记录,共 36 列。其中列的类型有int, nvarchar, datetime等。

数据库:

  • Azure SQL Database – Business Critical, Gen5 80vCores

ELT 平台:

  • Azure Databricks – 6.6 (includes Apache Spark 2.4.5, Scala 2.11)
  • Standard_DS3_v2 14.0 GB Memory, 4 Cores, 0.75 DBU (8 Worker Nodes Max)

存储:

  • Azure Data Lake Storage Gen2

先决条件:

在进一步浏览本文之前,请花一些时间了解此处将数据加载到聚集列存储表中的概述:Data Loading performance considerations with Clustered Columnstore indexes

在此测试中,数据从位于 Azure Data Lake Storage Gen 2的 CSV 文件中加载。CSV 文件大小为 27 GB,有 110 M 记录,有 36 列。这是一个带有随机数据的自定义数据集。

批量加载或预处理(ELT\ETL)的典型架构看起来与下图相似:

使用Spark加载数据到SQL Server列存储表-LMLPHP

使用BULK INSERTS    

在第一次测试中,单个BULK INSERT用于将数据加载到带有聚集列存储索引的 Azure SQL 表中,这里没有意外,根据所使用的 BATCHSIZE,它花了 30 多分钟才完成。请记住,BULK INSERT是一个单一的线程操作,因此单个流会读取并将其写入表中,从而降低负载吞吐量

使用Spark加载数据到SQL Server列存储表-LMLPHP

使用Spark加载数据到SQL Server列存储表-LMLPHP

使用Azure Databricks

为了实现写入到 SQL Server和读取ADLS (Azure Data Lake Storage) Gen 2的最大并发性和高吞吐量,Azure Databricks 被选为平台的选择,尽管我们还有其他选择,即 Azure Data Factory或其他基于Spark引擎的平台。

使用Azure Databricks加载数据的优点是 Spark 引擎通过专用的 Spark API并行读取输入文件。这些 API将使用一定数量的分区,这些分区映射到单个或多个输入文件,映射是在文件的一部分或整个文件上完成的。数据读入Spark DataFrame or, DataSet or RDD (Resilient Distributed Dataset) 。在这种情况下,数据被加载到DataFrame中,然后进行转换(设置与目标表匹配的DataFrame schema),然后数据准备写入 SQL 表。

要将DataFrame中的数据写入 SQL Server中,必须使用Microsoft's Apache Spark SQL Connector。这是一个高性能的连接器,使您能够在大数据分析中使用事务数据,和持久化结果用于即席查询或报告。连接器允许您使用任何 SQL Server(本地数据库或云中)作为 Spark 作业的输入数据源或输出目标。

  GitHub repo: Fast Data Loading in Azure SQL DB using Azure Databricks

请注意,目标表具有聚集列存储索引,以实现高负载吞吐量,但是,您也可以将数据加载到Heap,这也将提供良好的负载性能。对于本文的相关性,我们只讨论加载到列存储表。我们使用不同的 BATCHSIZE 值将数据加载到Clustered Columnstore Index中 -请参阅此文档,了解 BATCHSIZE 在批量加载到聚集列存储索引表期间的影响。

以下是Clustered Columnstore Index上的数据加载测试运行,BATCHSIZE为 102400 和 1048576:

使用Spark加载数据到SQL Server列存储表-LMLPHP

请注意,我们正在使用 Azure Databricks使用的默认并行和分区,并将数据直接推至 SQL Server聚集列存储索引表。我们没有调整 Azure Databricks使用的任何默认配置。无论所定义的批次大小,我们所有的测试都大致在同一时间完成。

将数据加载到 SQL 中的 32 个并发线程是由于上述已提供的数据砖群集的大小。该集群最多有 8 个节点,每个节点有 4 个内核,即 8*4 = 32 个内核,最多可运行 32 个并发线程。

查看行组(Row Groups)

有关我们使用 BATCHSIZE 1048576 插入数据的表格,以下是在 SQL 中创建的行组数:

行组总数:

SELECT COUNT(1)
FROM sys.dm_db_column_store_row_group_physical_stats
WHERE object_id = OBJECT_ID('largetable110M_1048576')
216 

行组的质量:

SELECT *
FROM sys.dm_db_column_store_row_group_physical_stats
WHERE object_id = OBJECT_ID('largetable110M_1048576')

使用Spark加载数据到SQL Server列存储表-LMLPHP

在这种情况下,我们只有一个delta store在OPEN状态 (total_rows = 3810) 和 215 行组处于压缩状态, 这是有道理的, 因为如果插入的批次大小是>102400 行, 数据不再delta store存储, 而是直接插入一个压缩行组的列存储。在这种情况下,压缩状态中的所有行组都有 >102400 条记录。现在,有关行组的问题是:

为什么我们有216行组?

为什么当我们的BatchSize设置为 1048576 时,每个行组的行数不同?

请注意,每个行组的数据大约等于上述结果集中的 500000 条记录。

这两个问题的答案是 Azure Databricks Spark引擎对数据分区控制了写入聚集列存储索引表行组的数据行数。让我们来看看 Azure Databricks为有关数据集创建的分区数:

# Get the number of partitions before re-partitioning
print(df_gl.rdd.getNumPartitions())
216

因此,我们为数据集创建了 216 个分区。请记住,这些是分区的默认数。每个分区都有大约 500000 条记录。

# Number of records in each partition
from pyspark.sql.functions
import spark_partition_id
df_gl.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show(10000)

使用Spark加载数据到SQL Server列存储表-LMLPHP

将Spark分区中的记录数与行组中的记录数进行比较,您就会发现它们是相等的。甚至分区数也等于行组数。因此,从某种意义上说,1048576 的 BATCHSIZE 正被每个分区中的行数过度拉大。

sqldbconnection = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbconn")
sqldbuser = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbuser")
sqldbpwd = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbpwd")

servername = "jdbc:sqlserver://" + sqldbconnection url = servername + ";" + "database_name=" + <Your Database Name> + ";"
table_name = "<Your Table Name>"

# Write data to SQL table with BatchSize 1048576
df_gl.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", sqldbuser) \
.option("password", sqldbpwd) \
.option("schemaCheckEnabled", False) \
.option("BatchSize", 1048576) \
.option("truncate", True) \
.save()

行组质量

行组质量由行组数和每个行组记录决定。由于聚集列存储索引通过扫描单行组的列段扫描表,则最大化每个行组中的行数可增强查询性能。当行组具有大量行数时,数据压缩会改善,这意味着从磁盘中读取的数据更少。为了获得最佳的查询性能,目标是最大限度地提高聚集列索引中每个行组的行数。行组最多可有 1048576 行。但是,需要注意的是,由于聚集列索引,行组必须至少有 102400 行才能实现性能提升。此外,请记住,行组的最大大小(100万)可能在每一个情况下都达到,文件行组大小不只是最大限制的一个因素,但受到以下因素的影响。

  • 字典大小限制,即 16 MB
  • 插入指定的批次大小
  • 表的分区方案,因为行组不跨分区
  • 内存压力导致行组被修剪
  • 索引重组,重建

话虽如此,现在一个重要的考虑是让行组大小尽可能接近 100 万条记录。在此测试中,由于每个行组的大小接近 500000 条记录,我们有两个选项可以达到约 100 万条记录的大小:

  • 在Spark中,更改分区数,使每个分区尽可能接近 1048576 条记录,
  • 保持Spark分区(默认值),一旦数据加载到表中,就运行 ALTER INDEX REORG,将多个压缩行组组合成一组。

选项#1很容易在Python或Scala代码中实现,该代码将在Azure Databricks上运行,负载相当低。

选项#2是数据加载后需要采取的额外步骤,当然,这将消耗 SQL 上的额外 CPU ,并增加整个加载过程所需的时间。

为了保持本文的相关性,让我们来讨论更多关于Spark分区,以及如何从其默认值及其在下一节的影响中更改它。

Spark Partitioning

Spark 引擎最典型的输入源是一组文件,这些文件通过将每个节点上的适当分区划分为一个或多个 Spark API来读取这些文件。这是 Spark 的自动分区,将用户从确定分区数量的忧虑中抽象出来,如果用户想挑战,就需控制分区的配置。根据环境和环境设置计算的分区的默认数通常适用于大多数情况下。但是,在某些情况下,更好地了解分区是如何自动计算的,如果需要,用户可以更改分区计数,从而在性能上产生明显差异。

注意:大型Spark群集可以生成大量并行线程,这可能导致 Azure SQL DB 上的内存授予争议。由于内存超时,您必须留意这种可能性,以避免提前修剪。请参阅本文以了解更多详细信息,了解表的模式和行数等也可能对内存授予产生影响。

spark.sql.files.maxPartitionBytes是控制分区大小的重要参数,默认设置为128 MB。它可以调整以控制分区大小,因此也会更改由此产生的分区数。

spark.default.parallelism这相当于worker nodes核心的总数。

最后,我们有coalesce()repartition(),可用于增加/减少分区数,甚至在数据已被读入Spark。

只有当您想要减少分区数时,才能使用coalesce() 因为它不涉及数据的重排。请考虑此data frame的分区数为 16,并且您希望将其增加到 32,因此您决定运行以下命令。

df = df.coalesce(32)
print(df.rdd.getNumPartitions())

但是,分区数量不会增加到 32 个,并且将保持在 16 个,因为coalesce()不涉及数据重排。这是一个性能优化的实现,因为无需昂贵的数据重排即可减少分区。

如果您想将上述示例的分区数减少到 8,则会获得预期的结果。

df = df.coalesce(8)
print(df.rdd.getNumPartitions())

这将合并数据并产生 8 个分区。

repartition() 另一个帮助调整分区的函数。对于同一示例,您可以使用以下命令将数据放入 32 个分区。

df = df.repartition(32)
print(df.rdd.getNumPartitions())

最后,还有其他功能可以改变分区数,其中是groupBy(), groupByKey(), reduceByKey() join()。当在 DataFrame 上调用这些功能时,会导致跨机器或通常跨执行器对数据进行重排,最终在默认情况下将数据重新划分为 200 个分区。此默认 数字可以使用spark.sql.shuffle.partitions配置进行控制。

数据加载

现在,了解分区在 Spark 中的工作原理以及如何更改分区,是时候实施这些学习了。在上述实验中,分区数为 216默认情况下),这是因为文件的大小为 27 GB,因此将 27 GB 除以 128 MB(默认情况下由 Spark 定义的最大分区字节)提供了216 个分区

Spark重新分区的影响

对 PySpark 代码的更改是重新分区数据并确保每个分区现在有 1048576 行或接近它。为此,首先在DataFrame中获取记录数量,然后将其除以 1048576。此划分的结果将是用于加载数据的分区数,假设分区数为n但是,可能有一些分区现在有 >=1048576 行,因此,为了确保每个分区都<=1048576行,我们将分区数作为n+1使用n+1在分区结果为 0 的情况下也很重要。在这种情况下,您将有一个分区。

由于数据已加载到DataFrame中,而 Spark 默认已创建分区,我们现在必须再次重新分区数据,分区数等于n+1。

# Get the number of partitions before re-partitioning
print(df_gl.rdd.getNumPartitions())
216

# Get the number of rows of DataFrame and get the number of partitions to be used.
rows = df_gl.count()
n_partitions = rows//1048576
# Re-Partition the DataFrame df_gl_repartitioned = df_gl.repartition(n_partitions+1) # Get the number of partitions after re-partitioning print(df_gl_repartitioned.rdd.getNumPartitions()) 105 # Get the partition id and count of partitions df_gl_repartitioned.withColumn("partitionId",spark_partition_id()).groupBy("partitionId").count().show(10000)

使用Spark加载数据到SQL Server列存储表-LMLPHP

因此,在重新划分分区后,分区数量从216 个减少到 105 (n+1),因此每个分区现在都有接近1048576行。

此时,让我们将数据再次写入 SQL 表中,并验证行组质量。这一次,每个行组的行数将接近每个分区中的行数(略低于 1048576)。让我们看看下面:

重新分区后的行组

SELECT COUNT(1)
FROM sys.dm_db_column_store_row_group_physical_stats
WHERE object_id = OBJECT_ID('largetable110M_1048576')
105

重新分区后的行组质量

使用Spark加载数据到SQL Server列存储表-LMLPHP

从本质上讲,这次整体数据加载比之前慢了 2 秒,但行组的质量要好得多。行组数量减少到一半,行组几乎已填满到最大容量。请注意,由于DataFrame的重新划分,将消耗额外的时间,这取决于数据帧的大小和分区数。

请注意,您不会总是获得每row_group 100 万条记录。它将取决于数据类型、列数等,以及之前讨论的因素-请参阅sys.dm_db_column_store_row_group_physical_stats

关键点

  1. 建议在将数据批量加载到 SQL Server时使用BatchSize(无论是 CCI 还是Heap)。但是,如果 Azure Databricks 或任何其他 Spark 引擎用于加载数据,则数据分区在确定聚集列存储索引中的行组质量方面起着重要作用。
  2. 使用BULK INSERT命令加载数据将遵守命令中提到的BATCHSIZE,除非其他因素影响插入行组的行数。
  3. Spark 中的数据分区不应基于某些随机数,最好动态识别分区数,并将n+1 用作分区数
  4. 由于聚集列存储索引通过扫描单行组的列段扫描表,则最大化每个行组中的记录数可增强查询性能。为了获得最佳的查询性能,目标是最大限度地提高聚集列存储索引中每个行组的行数。
  5. Azure Databricks的数据加载速度在很大程度上取决于选择的集群类型及其配置。此外,请注意,到目前为止,Azure Databricks连接器仅支持Apache Spark 2.4.5。微软已经发布了对Spark 3.0的支持,它目前在预览版中,我们建议您在开发测试环境中彻底测试此连接器。
  6. 根据data frame的大小、列数、数据类型等,进行重新划分的时间会有所不同,因此您必须从端端角度考虑这次对整体数据加载的考虑。

Azure Data Factory

这是一篇非常好的数据ETL文章,Spark和SQL Server列存储表功能的组合。

Azure Data Factory是当前最成熟,功能最强大的ETL/ELT数据集成服务。其架构就是使用Spark作为计算引擎。

使用Spark加载数据到SQL Server列存储表-LMLPHP

https://github.com/mrpaulandrew/A-Day-Full-of-Azure-Data-Factory

03-04 08:26