I am running into an "OutOfMemoryError: Requested array size exceeds VM limit" error when running my Scala Spark job.

I'm running this job on an AWS EMR cluster with the following makeup:

Master: 1 m4.4xlarge 32 vCore, 64 GiB memory

Core: 1 r3.4xlarge 32 vCore, 122 GiB memory

The version of Spark I'm using is 2.2.1 on EMR release label 5.11.0.

I'm running my job in a spark shell with the following configurations:

spark-shell --conf spark.driver.memory=40G \
--conf spark.driver.maxResultSize=25G \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kryoserializer.buffer.max=2000 \
--conf spark.rpc.message.maxSize=2000 \
--conf spark.dynamicAllocation.enabled=true


What I'm attempting to do with this job is convert a one-column DataFrame of objects into a one-row DataFrame that contains a list of those objects.


case class Properties (id: String)
case class Geometry (`type`: String, coordinates: Seq[Seq[Seq[String]]])
case class Features (`type`: String, properties: Properties, geometry: Geometry)


 |-- geometry: struct (nullable = true)
 |    |-- type: string (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |-- type: string (nullable = false)
 |-- properties: struct (nullable = false)
 |    |-- id: string (nullable = true)


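I'm converting it to a list and adding it to a one-row DataFrame like so:

import org.apache.spark.sql.functions.typedLit
val x = Seq(df.collect.toList)  // assumes df is a Dataset[Features], e.g. via df.as[Features]
val withFeatures = final_df.withColumn("features", typedLit(x))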


I don't run into any issues when creating this list and it's pretty quick. However, there seems to be a limit to the size of this list when I try to write it out by doing either of the following:



I've also tried converting the list to a DataFrame as follows, but it never seems to finish.

val x = Seq(df.collect.toList)
val y = x.toDF

The largest list I've been able to get working had 813318 Features objects, each containing a Geometry object with a list of 33 elements, for a total of 29491869 elements.


Attempting to write almost any list larger than that gives me the following stack trace:

# java.lang.OutOfMemoryError: Requested array size exceeds VM limit
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 33028"...
os::fork_and_exec failed: Cannot allocate memory (12)
18/03/29 21:41:35 ERROR FileFormatWriter: Aborting job null.
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter.write(UnsafeArrayWriter.java:217)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply1_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:41)
    at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:41)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.execution.LocalTableScanExec.unsafeRows$lzycompute(LocalTableScanExec.scala:41)
    at org.apache.spark.sql.execution.LocalTableScanExec.unsafeRows(LocalTableScanExec.scala:36)
    at org.apache.spark.sql.execution.LocalTableScanExec.rdd$lzycompute(LocalTableScanExec.scala:48)
    at org.apache.spark.sql.execution.LocalTableScanExec.rdd(LocalTableScanExec.scala:48)
    at org.apache.spark.sql.execution.LocalTableScanExec.doExecute(LocalTableScanExec.scala:52)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
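A possible explanation, offered as a sketch rather than a confirmed diagnosis: the trace fails inside BufferHolder.grow while LocalTableScanExec turns the driver-side data into an UnsafeRow, and a single UnsafeRow is backed by one byte array. Assuming the Spark 2.2 growth policy doubles the needed length while it is below Int.MaxValue / 2 and otherwise requests Int.MaxValue bytes (my reading of that version, not verified here), any one row that needs more than roughly 1 GiB makes the JVM try to allocate an array of Int.MaxValue bytes. HotSpot refuses that with "Requested array size exceeds VM limit" no matter how large the heap is, which would explain why adding driver or executor memory changes nothing. The names BufferGrowthSketch, nextCapacity, exceedsVmLimit and the conservative MaxSafeArrayLength are hypothetical and only model the arithmetic.

```scala
// Models ONLY the assumed growth arithmetic of Spark 2.2's BufferHolder.grow;
// this is not Spark code and the policy itself is an assumption.
object BufferGrowthSketch {
  // Conservative JVM array-length ceiling (hypothetical constant): HotSpot cannot
  // allocate an array of exactly Int.MaxValue elements, and JDK collections stay
  // below Int.MaxValue - 8 for that reason.
  val MaxSafeArrayLength: Int = Int.MaxValue - 8

  // Capacity the assumed policy would request for a row needing `length` bytes:
  // double while below Int.MaxValue / 2, otherwise jump straight to Int.MaxValue.
  def nextCapacity(length: Int): Int =
    if (length < Int.MaxValue / 2) length * 2 else Int.MaxValue

  // True when that request would exceed the conservative array-length ceiling.
  def exceedsVmLimit(length: Int): Boolean =
    nextCapacity(length) > MaxSafeArrayLength
}
```

Under these assumptions, a row needing 1,000,000,000 bytes gets a 2,000,000,000-byte buffer, which still fits, while a row needing 1,100,000,000 bytes triggers a request for Int.MaxValue bytes and fails.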


I've tried making a million configuration changes, including throwing both more driver and executor memory at this job, but to no avail. Is there any way around this? Any ideas?



Well, there is a dataframe aggregation function that does what you want without doing a collect on the driver. For example if you wanted to collect all "feature" columns by key: df.groupBy($"key").agg(collect_list("feature")), or if you really wanted to do that for the whole dataframe without grouping: df.agg(collect_list("feature")).
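The whole-DataFrame variant of that suggestion could look roughly like the sketch below. It assumes every column of df belongs in the list, wraps each row in a struct, and names the output column "features"; CollectListSketch and featuresAsOneRow are hypothetical names. The aggregation runs on the executors rather than via collect on the driver, but the result is still a single row, so it may hit the same size limit.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_list, struct}

object CollectListSketch {
  // Packs every row of `df` into one array-of-structs column named "features".
  // A global agg (no groupBy) always yields exactly one output row.
  def featuresAsOneRow(df: DataFrame): DataFrame =
    df.agg(collect_list(struct(df.columns.map(col): _*)).as("features"))
}
```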

However I wonder why you'd want to do that, when it seems easier to work with a dataframe with one row per object than a single row containing the entire result. Even using the collect_list aggregation function I wouldn't be surprised if you still run out of memory.

