This article describes how to handle the case where Spark DataFrames are created successfully but cannot be written to the local disk. It may serve as a useful reference for anyone facing the same problem.

Problem Description

I am using the IntelliJ IDE to execute Spark Scala code on the Microsoft Windows platform.

I have four Spark DataFrames of around 30,000 records each, and as part of my requirement I needed to take one column from each of those DataFrames.

I used Spark SQL to do this, and it executed successfully. When I call DF.show() or DF.count(), I can see the results on screen, but when I try to write the DataFrame to my local disk (a Windows directory), the job is aborted with the error below:

Exception in thread "main" org.apache.spark.SparkException: Job aborted. at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:147) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:121) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87) at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198) at main.src.countFeatures2$.countFeature$1(countFeatures2.scala:118) at main.src.countFeatures2$.getFeatureAsString$1(countFeatures2.scala:32) at main.src.countFeatures2$.main(countFeatures2.scala:40) at main.src.countFeatures2.main(countFeatures2.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 31.0 failed 1 times, most recent failure: Lost task 0.0 in stage 31.0 (TID 2636, localhost, executor driver): java.io.IOException: (null) entry in command string: null chmod 0644 D:\Test_Output_File2_temporary\0_temporary\attempt_20170830194047_0031_m_000000_0\part-00000-85c32c55-e12d-4433-979d-ccecb2fcd341.csv at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:770) at org.apache.hadoop.util.Shell.execCommand(Shell.java:866) at org.apache.hadoop.util.Shell.execCommand(Shell.java:849) at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733) at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.(RawLocalFileSystem.java:225) at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.(RawLocalFileSystem.java:209) at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307) at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296) at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328) at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.(ChecksumFileSystem.java:398) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789) at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:132) at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.(CSVRelation.scala:208) at org.apache.spark.sql.execution.datasources.csv.CSVOutputWriterFactory.newInstance(CSVRelation.scala:178) at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.(FileFormatWriter.scala:234) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:182) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)


Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:127) ... 28 more Caused by: java.io.IOException: (null) entry in command string: null chmod 0644 D:\Test_Output_File2_temporary\0_temporary\attempt_20170830194047_0031_m_000000_0\part-00000-85c32c55-e12d-4433-979d-ccecb2fcd341.csv at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:770) at org.apache.hadoop.util.Shell.execCommand(Shell.java:866) at org.apache.hadoop.util.Shell.execCommand(Shell.java:849) at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733) at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.(RawLocalFileSystem.java:225) at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.(RawLocalFileSystem.java:209) at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307) at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296) at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328) at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.(ChecksumFileSystem.java:398) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789) at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:132) at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.(CSVRelation.scala:208) at org.apache.spark.sql.execution.datasources.csv.CSVOutputWriterFactory.newInstance(CSVRelation.scala:178) at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.(FileFormatWriter.scala:234) at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:182) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Picked up _JAVA_OPTIONS: -Xmx512M

Process finished with exit code 1

I am not able to understand where it went wrong. Can anybody explain how to overcome this issue?

UPDATE: Please note that I was able to write the same files until yesterday, and no changes were made to my system or to the IDE's configuration. So I do not understand why it ran until yesterday but does not run now.

There was a similar post at this link: (null) entry in command string exception in saveAsTextFile() on PySpark. However, they are using PySpark in a Jupyter notebook, whereas my issue is with the IntelliJ IDE.

Super-simplified code that writes the output file to the local disk:

val Test_Output = spark.sql("select A.Col1, A.Col2, B.Col2, C.Col2, D.Col2 from A, B, C, D where A.primaryKey = B.primaryKey and B.primaryKey = C.primaryKey and C.primaryKey = D.primaryKey and D.primaryKey = A.primaryKey")

val Test_Output_File = Test_Output.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").option("nullValue", "0").save("D:/Test_Output_File")
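
The query above refers to tables A, B, C and D, but the question does not show how those views were registered. A minimal sketch of that setup, assuming each of the four DataFrames (hypothetically named dfA, dfB, dfC and dfD here) is exposed as a temporary view under those names:

// Hypothetical setup, not shown in the question: register each DataFrame as a
// temporary view so it can be referenced by name from spark.sql(...)
dfA.createOrReplaceTempView("A")
dfB.createOrReplaceTempView("B")
dfC.createOrReplaceTempView("C")
dfD.createOrReplaceTempView("D")

// These actions reportedly succeed, so the failure is specific to the write step:
Test_Output.show(10)
println(Test_Output.count())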

Recommended Answer

Finally I rectified this myself: I used the .persist() method when creating the DataFrames. That allowed me to write the output files without any error, though I do not understand the logic behind it.
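
For illustration, a minimal sketch of how .persist() might be applied to the simplified code above before the write; the explicit storage level and the count() call are assumptions, not the poster's exact code:

import org.apache.spark.storage.StorageLevel

// Sketch: cache the joined result before writing, as the answer describes.
// Calling .persist() with no argument defaults to MEMORY_AND_DISK for
// DataFrames in Spark 2.x; the explicit level here is only for clarity.
val Test_Output = spark.sql("select A.Col1, A.Col2, B.Col2, C.Col2, D.Col2 from A, B, C, D where A.primaryKey = B.primaryKey and B.primaryKey = C.primaryKey and C.primaryKey = D.primaryKey and D.primaryKey = A.primaryKey").persist(StorageLevel.MEMORY_AND_DISK)

Test_Output.count() // an action to materialize the cached partitions

Test_Output.coalesce(1)
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("nullValue", "0")
  .save("D:/Test_Output_File")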

Thanks for your valuable inputs on this.

This concludes the article on Spark DataFrames being created successfully but failing to write to the local disk. We hope the recommended answer is helpful.
