问题描述
我正在使用下面的代码将43列和大约2,000,000行的DataFrame写入SQL Server中的表:
I am using the code below to write a DataFrame of 43 columns and about 2,000,000 rows into a table in SQL Server:
dataFrame
.write
.format("jdbc")
.mode("overwrite")
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
.option("url", url)
.option("dbtable", tablename)
.option("user", user)
.option("password", password)
.save()
遗憾的是,虽然它确实适用于小型DataFrame,但它要么非常慢,要么对于大型DataFrame超时.关于如何优化它的任何提示?
Sadly, while it does work for small DataFrames it's either extremely slow or gets timed out for large ones. Any hints on how to optimize it?
我尝试设置rewriteBatchedStatements=true
谢谢.
推荐答案
我们求助于使用 azure -sqldb-spark 库,而不是Spark的默认内置导出功能.该库为您提供了一个bulkCopyToSqlDB
方法,该方法是 real 批处理插入的,并且执行速度更快 .它比内置功能实用性差一些,但以我的经验,还是值得的.
We resorted to using the azure-sqldb-spark library instead of the default built-in exporting functionality of Spark. This library gives you a bulkCopyToSqlDB
method which is a real batch insert and goes a lot faster. It's a bit less practical to use than the built-in functionality, but in my experience it's still worth it.
我们或多或少像这样使用它:
We use it more or less like this:
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
import com.microsoft.azure.sqldb.spark.query._
val options = Map(
"url" -> "***",
"databaseName" -> "***",
"user" -> "***",
"password" -> "***",
"driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver"
)
// first make sure the table exists, with the correct column types
// and is properly cleaned up if necessary
val query = dropAndCreateQuery(df, "myTable")
val createConfig = Config(options ++ Map("QueryCustom" -> query))
spark.sqlContext.sqlDBQuery(createConfig)
val bulkConfig = Config(options ++ Map(
"dbTable" -> "myTable",
"bulkCopyBatchSize" -> "20000",
"bulkCopyTableLock" -> "true",
"bulkCopyTimeout" -> "600"
))
df.bulkCopyToSqlDB(bulkConfig)
如您所见,我们自己生成了CREATE TABLE
查询.您可以 让该库创建表,但是它只能执行dataFrame.limit(0).write.sqlDB(config)
,效率仍然很低,可能需要您缓存DataFrame
,并且不允许您选择SaveMode
.
As you can see we generate the CREATE TABLE
query ourselves. You can let the library create the table, but it will just do dataFrame.limit(0).write.sqlDB(config)
which can still be pretty inefficient, probably requires you to cache your DataFrame
, and it doesn't allow you to choose the SaveMode
.
也可能很有趣:将这个库添加到我们的sbt版本中时,我们必须使用ExclusionRule
,否则assembly
任务将会失败.
Also potentially interesting: we had to use an ExclusionRule
when adding this library to our sbt build, or the assembly
task would fail.
libraryDependencies += "com.microsoft.azure" % "azure-sqldb-spark" % "1.0.2" excludeAll(
ExclusionRule(organization = "org.apache.spark")
)
这篇关于Spark:优化将DataFrame写入SQL Server的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!