Problem Description
Why does the following code get a "Task not serializable" error?
Error:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.map(RDD.scala:369)
at ConnTest$.main(main.scala:41)
at ConnTest.main(main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: DoWork
Serialization stack:
- object not serializable (class: DoWork, value: DoWork@655621fd)
- field (class: ConnTest$$anonfun$2, name: doWork$1, type: class DoWork)
- object (class ConnTest$$anonfun$2, )
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 20 more
Code:
import java.time.LocalDate
import org.apache.spark.{SparkConf, SparkContext}

object ConnTest extends App {
  override def main(args: scala.Array[String]): Unit = {
    super.main(args)
    val date = LocalDate.parse(args(0))
    val conf = new SparkConf()
    val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]"))
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val jdbcSqlConn = "jdbc:sqlserver://......;"
    val listJob = new ItemListJob(sqlContext, jdbcSqlConn)
    val list = listJob.run(date).select("id").rdd.map(r => r(0).asInstanceOf[Int]).collect()
    // It returns about 3000 rows
    val doWork = new DoWork(sqlContext, jdbcSqlConn)
    val processed = sc.parallelize(list).map(d => {
      doWork.run(d, date)
    })
  }
}
class ItemListJob(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(date: LocalDate) = {
    sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"dbo.GetList('$date')"
    )).load()
  }
}
class DoWork(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(id: Int, date: LocalDate) = {
    // ...... read the data from database for id, and create a text file
    val data = sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"someFunction('$id', $date)"
    )).load()
    // .... create a text file with content of data
    (id, date)
  }
}
Update:
I changed the .map() call to the following,
val processed = sc.parallelize(dealList).toDF.map(d => {
  doWork.run(d(0).asInstanceOf[Int], rc)
})
and now I get:
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "_2")
- root class: "scala.Tuple2"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
Answer
The problem is in the following closure:
val processed = sc.parallelize(list).map(d => {
  doWork.run(d, date)
})
The closure passed to map runs on the executors, so Spark needs to serialize doWork and ship it to them; that means DoWork must be serializable. However, DoWork holds sqlContext (and, through it, sc), so you cannot simply make DoWork implement Serializable, because those objects cannot be used on the executors anyway.
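One common way around this (a minimal sketch, not part of the original answer, assuming the per-id work can be expressed as plain JDBC queries and that jdbcSqlConn carries the credentials) is to keep only serializable values, such as the JDBC connection string and the date, in the closure and open a connection on the executors, e.g. once per partition:

import java.sql.DriverManager

val processed = sc.parallelize(list).mapPartitions { ids =>
  // Runs on an executor; only jdbcSqlConn and date (both serializable) are captured,
  // so no SparkContext/SQLContext has to be shipped.
  // Assumes the SQL Server JDBC driver is on the executor classpath.
  val conn = DriverManager.getConnection(jdbcSqlConn)
  try {
    ids.map { id =>
      // ... run the per-id query with conn and write the text file here (placeholder) ...
      (id, date)
    }.toList.iterator // materialize before the connection is closed
  } finally {
    conn.close()
  }
}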
I guess you probably want to store data into a database inside DoWork. If so, you can convert the RDD to a DataFrame and save it via the jdbc method, for example:
sc.parallelize(list).toDF.write.jdbc(...)
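Filled out, that might look like the sketch below; the column name "id", the save mode, and the target table name dbo.ProcessedItems are hypothetical placeholders, and the credentials are assumed to be part of jdbcSqlConn:

import java.util.Properties
import sqlContext.implicits._ // enables .toDF on the RDD

val props = new Properties()
props.setProperty("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")

sc.parallelize(list)
  .toDF("id")                 // one-column DataFrame of the ids
  .write
  .mode("append")             // or "overwrite", depending on the target table
  .jdbc(jdbcSqlConn, "dbo.ProcessedItems", props)

The write happens on the executors in parallel, so nothing non-serializable needs to be captured in a closure.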
I cannot give more specific suggestions since you haven't provided the code inside DoWork.