I'm hitting a "GC overhead limit exceeded" error in Spark when using spark_apply. Here are my specs:
sparklyr v0.6.2
Spark v2.1.0
4 workers with 8 cores and 29 GB of memory each
The closure get_dates pulls data from Cassandra one row at a time. There are about 200k rows in total. The process runs for roughly an hour and a half and then gives me this memory error.
I have already tried increasing the heap size with spark.driver.memory, but it doesn't help.
Any ideas? My usage is below:
> config <- spark_config()
> config$spark.executor.cores = 1 # this ensures a max of 32 separate executors
> config$spark.cores.max = 26 # this ensures that cassandra gets some resources too, not all to spark
> config$spark.driver.memory = "4G"
> config$spark.driver.memoryOverhead = "10g"
> config$spark.executor.memory = "4G"
> config$spark.executor.memoryOverhead = "1g"
> sc <- spark_connect(master = "spark://master",
+ config = config)
> accounts <- sdf_copy_to(sc, insight %>%
+ # slice(1:100) %>%
+ {.}, "accounts", overwrite=TRUE)
> accounts <- accounts %>% sdf_repartition(78)
> dag <- spark_apply(accounts, get_dates, group_by = c("row"),
+ columns = list(row = "integer",
+ last_update_by = "character",
+ last_end_time = "character",
+ read_val = "numeric",
+ batch_id = "numeric",
+ fail_reason = "character",
+ end_time = "character",
+ meas_type = "character",
+ svcpt_id = "numeric",
+ org_id = "character",
+ last_update_date = "character",
+ validation_status = "character"
+ ))
> peak_usage <- dag %>% collect
Error: java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.spark.sql.execution.SparkPlan$$anon$1.next(SparkPlan.scala:260)
at org.apache.spark.sql.execution.SparkPlan$$anon$1.next(SparkPlan.scala:254)
at scala.collection.Iterator$class.foreach(Iterator.scala:743)
at org.apache.spark.sql.execution.SparkPlan$$anon$1.foreach(SparkPlan.scala:254)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeCollect$1.apply(SparkPlan.scala:276)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeCollect$1.apply(SparkPlan.scala:275)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:275)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2375)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2375)
at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2778)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2375)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2351)
at sparklyr.Utils$.collect(utils.scala:196)
at sparklyr.Utils.collect(utils.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sparklyr.Invoke$.invoke(invoke.scala:102)
at sparklyr.StreamHandler$.handleMethodCall(stream.scala:97)
at sparklyr.StreamHandler$.read(stream.scala:62)
at sparklyr.BackendHandler.channelRead0(handler.scala:52)
at sparklyr.BackendHandler.channelRead0(handler.scala:14)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
Best answer
Maybe I'm misreading your example, but the memory problem seems to occur when you collect, not when you use spark_apply. Try
config$spark.driver.maxResultSize <- XXX
where XXX is whatever you expect to need (for a similar job I set it to 4G). See https://spark.apache.org/docs/latest/configuration.html for more details.
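For reference, a minimal sketch of where that setting would go, reusing the question's own config (the "4G" value is only the suggestion above, not a verified fix; tune it to the size of the result you actually collect):

library(sparklyr)

config <- spark_config()
config$spark.executor.cores <- 1
config$spark.cores.max <- 26
config$spark.driver.memory <- "4G"
config$spark.executor.memory <- "4G"
# Cap on the total size of serialized results collected back to the driver
# for a single action such as collect(); the default in Spark 2.1 is 1g.
config$spark.driver.maxResultSize <- "4G"

sc <- spark_connect(master = "spark://master", config = config)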