Question
Using spark-shell to test this function:
def smallDfToCSV(fname: String, df: org.apache.spark.sql.DataFrame) : Unit = {
  import java.io._
  val pw = new PrintWriter(new File(fname))
  val header = df.head.schema.map(r=>r.name)
  pw.write( header.mkString(",")+"\n" ) // fine
  df.limit(5000).foreach( r => pw.write(r.mkString(",")+"\n") ) // error!
  // org.apache.spark.SparkException: Task not serializable
  pw.close
} // \csvWr
val df = spark.sql(query)
smallDfToCSV("./lixo.csv", df)
The error does not make sense to me, because this runs fine:
df.foreach( r => println(r.mkString(",")) )
Recommended answer
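The task cannot be serialized because PrintWriter does not implement java.io.Serializable. Any object referenced from code that runs on a Spark executor (i.e. inside a map, reduce, foreach, etc. operation on a Dataset or RDD) needs to be serializable so that it can be shipped to the executors. Your println version works because that closure only calls println and captures no non-serializable object such as a PrintWriter.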
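To see the failure outside of Spark, the sketch below performs the same kind of check that closure serialization involves: Spark serializes task closures with Java serialization, so an object that ObjectOutputStream rejects cannot be captured by a foreach closure. The helper name isJavaSerializable is hypothetical; this is an illustration, not Spark's own check.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream, PrintWriter, StringWriter}

// Hypothetical helper: tries to write obj with plain Java serialization
// and reports whether that succeeded.
def isJavaSerializable(obj: AnyRef): Boolean = {
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  try {
    out.writeObject(obj)
    true
  } catch {
    case _: NotSerializableException => false
  } finally {
    out.close()
  }
}

// A PrintWriter over an in-memory buffer stands in for the one in smallDfToCSV.
val writer = new PrintWriter(new StringWriter())
println(isJavaSerializable(writer))   // false: PrintWriter does not implement Serializable
println(isJavaSerializable("1,a,b"))  // true: String is Serializable
writer.close()
```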
I'm also curious about the intended goal of your function. Since this code would run on your executors, you would end up with partial contents of df written to lixo.csv in whatever the current working directory is for each of your executors. If you instead intend to write the entire contents of df to a file on your local machine, you must first bring the data back to the driver, for example via collect.
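A minimal sketch of that driver-side approach, assuming the 5000 rows fit comfortably in driver memory: the rows are collected first and only then written with an ordinary PrintWriter, so nothing has to be serialized. The name writeCsv is hypothetical, and, like the original function, it does no CSV quoting or escaping.

```scala
import java.io.{File, PrintWriter}

// Hypothetical driver-side writer: writes a header line plus one line per row.
// It runs only in the calling JVM, so the PrintWriter never leaves the driver.
def writeCsv(fname: String, header: Seq[String], rows: Iterator[Seq[Any]]): Unit = {
  val pw = new PrintWriter(new File(fname))
  try {
    pw.write(header.mkString(",") + "\n")
    rows.foreach(r => pw.write(r.mkString(",") + "\n"))
  } finally {
    pw.close() // always release the file handle, even if a write fails
  }
}
```

In spark-shell it could be called as writeCsv("./lixo.csv", df.columns.toSeq, df.limit(5000).collect().iterator.map(_.toSeq)), which writes the file on the machine running the driver.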