问题描述
我正在尝试从数据库中查询数据,对其进行一些转换,然后将新数据以拼花格式保存在hdfs上.
I'm trying to query data from a database, do some transformations on it and save the new data in parquet format on hdfs.
由于数据库查询返回了大量行,因此我要分批获取数据并在每个传入的批处理上运行上述过程.
Since the database query returns a large number of rows, I'm getting the data in batches and running the above process on every incoming batch.
更新2:批处理逻辑为:
import scala.collection.JavaConverters._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
class Batch(rows: List[String],
sqlContext: SQLContext) {
// The actual schema has around 60 fields
val schema = Array("name", "age", "address").map(field =>
StructField(field, StringType, true)
)
val transformedRows = rows.map(rows => {
// transformation logic (returns Array[Array[String]] type)
}).map(row => Row.fromSeq(row.toSeq))
val dataframe = sqlContext.createDataFrame(transformedRows.asJava, schema)
}
val sparkConf = new sparkConf().setAppName("Spark App")
val sparkContext = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sparkContext)
// Code to query database
// queryResponse is essentially an iterator that fetches the next batch on calling queryResponse.next
var batch_num = 0
while (queryResponse.hasNext) {
val batch = queryResponse.next
val batchToSave = new Batch(
batch.toList.map(_.getDocument.toString),
sqlContext)
batchToSave.dataframe.write.parquet(batch_num + "_Parquet")
batch_num += 1
}
我在1.6.1中的Spark版本和spark-submit是:
My Spark version in 1.6.1 and the spark-submit is:
spark-submit target/scala-2.10/Spark\ Application-assembly-1.0.jar
问题在于,经过一定数量的批处理后,出现了 java.lang.OutOfMemoryError
错误.
The problem is that after a certain number of batches, I get a java.lang.OutOfMemoryError
error.
整个堆栈跟踪为:
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOfRange(Arrays.java:2694)
at java.lang.String.<init>(String.java:203)
at java.lang.StringBuilder.toString(StringBuilder.java:405)
at scala.StringContext.standardInterpolator(StringContext.scala:125)
at scala.StringContext.s(StringContext.scala:90)
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:70)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)
at app.Application$.main(App.scala:156)
at app.Application.main(App.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
我尝试将数据合并到一个分区中,但这没有任何区别.
I tried coalescing the data into a single partition but that didn't make any difference.
dataframe.coalesce(1).write.parquet(batch_num + "_Parquet")
任何帮助将不胜感激.
更新1
不对RDD进行合并转换仍然会给出错误,但是stacktrace如下.似乎是Parquet的问题.
Not doing a coalesce transform on the RDD still gives an error but the stacktrace is as follows. Seems to be an issue with Parquet.
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1855)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1945)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)
at app.Application$.main(App.scala:156)
at app.Application.main(App.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.OutOfMemoryError: Java heap space
at org.apache.parquet.column.values.dictionary.IntList.initSlab(IntList.java:90)
at org.apache.parquet.column.values.dictionary.IntList.<init>(IntList.java:86)
at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:93)
at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:229)
at org.apache.parquet.column.ParquetProperties.dictionaryWriter(ParquetProperties.java:131)
at org.apache.parquet.column.ParquetProperties.dictWriterWithFallBack(ParquetProperties.java:178)
at org.apache.parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:203)
at org.apache.parquet.column.impl.ColumnWriterV1.<init>(ColumnWriterV1.java:83)
at org.apache.parquet.column.impl.ColumnWriteStoreV1.newMemColumn(ColumnWriteStoreV1.java:68)
at org.apache.parquet.column.impl.ColumnWriteStoreV1.getColumnWriter(ColumnWriteStoreV1.java:56)
at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:183)
at org.apache.parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:375)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:109)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:99)
at org.apache.parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:100)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:303)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetRelation.scala:94)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:286)
at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:129)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:255)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
推荐答案
今天也遇到了同样的问题.原来我的执行计划非常复杂,并且 toString 生成了150 MB的信息,再加上Scala的String插值会导致驱动程序内存不足.
Had the same problem today as well. Turned out my execution plan was rather complex and toString generated 150 MB of information which combined with String interpolation of Scala lead to the driver running out of memory.
您可以尝试增加驱动程序内存(我必须将其从8 GB翻倍到16 GB).
You can try to increase the driver memory (I had to double it from 8 GB to 16 GB).
这篇关于将Spark数据帧写入拼花格式时出现内存不足错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!